Only use one connection per base URL

This commit is contained in:
Philipp Heckel 2021-11-16 14:08:52 -05:00
parent 0ab3bdc2a0
commit cc25c5fc5c
7 changed files with 197 additions and 115 deletions

View file

@ -21,7 +21,7 @@ data class Subscription(
} }
enum class ConnectionState { enum class ConnectionState {
NOT_APPLICABLE, RECONNECTING, CONNECTED NOT_APPLICABLE, CONNECTING, CONNECTED
} }
data class SubscriptionWithMetadata( data class SubscriptionWithMetadata(

View file

@ -137,14 +137,20 @@ class Repository(private val subscriptionDao: SubscriptionDao, private val notif
) )
} }
fun updateStateIfChanged(subscriptionId: Long, newState: ConnectionState) { fun updateState(subscriptionIds: Collection<Long>, newState: ConnectionState) {
val state = connectionStates.getOrElse(subscriptionId) { ConnectionState.NOT_APPLICABLE } var changed = false
if (state !== newState) { subscriptionIds.forEach { subscriptionId ->
if (newState == ConnectionState.NOT_APPLICABLE) { val state = connectionStates.getOrElse(subscriptionId) { ConnectionState.NOT_APPLICABLE }
connectionStates.remove(subscriptionId) if (state !== newState) {
} else { changed = true
connectionStates[subscriptionId] = newState if (newState == ConnectionState.NOT_APPLICABLE) {
connectionStates.remove(subscriptionId)
} else {
connectionStates[subscriptionId] = newState
}
} }
}
if (changed) {
connectionStatesLiveData.postValue(connectionStates) connectionStatesLiveData.postValue(connectionStates)
} }
} }

View file

@ -57,9 +57,15 @@ class ApiService {
} }
} }
fun subscribe(subscriptionId: Long, baseUrl: String, topic: String, since: Long, notify: (Notification) -> Unit, fail: (Exception) -> Unit): Call { fun subscribe(
baseUrl: String,
topics: String,
since: Long,
notify: (topic: String, Notification) -> Unit,
fail: (Exception) -> Unit
): Call {
val sinceVal = if (since == 0L) "all" else since.toString() val sinceVal = if (since == 0L) "all" else since.toString()
val url = topicUrlJson(baseUrl, topic, sinceVal) val url = topicUrlJson(baseUrl, topics, sinceVal)
Log.d(TAG, "Opening subscription connection to $url") Log.d(TAG, "Opening subscription connection to $url")
val request = Request.Builder().url(url).build() val request = Request.Builder().url(url).build()
@ -75,15 +81,16 @@ class ApiService {
val line = source.readUtf8Line() ?: throw Exception("Unexpected response for $url: line is null") val line = source.readUtf8Line() ?: throw Exception("Unexpected response for $url: line is null")
val message = gson.fromJson(line, Message::class.java) val message = gson.fromJson(line, Message::class.java)
if (message.event == EVENT_MESSAGE) { if (message.event == EVENT_MESSAGE) {
val topic = message.topic
val notification = Notification( val notification = Notification(
id = message.id, id = message.id,
subscriptionId = subscriptionId, subscriptionId = 0, // TO BE SET downstream
timestamp = message.time, timestamp = message.time,
message = message.message, message = message.message,
notificationId = Random.nextInt(), notificationId = Random.nextInt(),
deleted = false deleted = false
) )
notify(notification) notify(topic, notification)
} }
} }
} catch (e: Exception) { } catch (e: Exception) {
@ -108,6 +115,7 @@ class ApiService {
val id: String, val id: String,
val time: Long, val time: Long,
val event: String, val event: String,
val topic: String,
val message: String val message: String
) )

View file

@ -0,0 +1,109 @@
package io.heckel.ntfy.msg
import android.util.Log
import io.heckel.ntfy.data.ConnectionState
import io.heckel.ntfy.data.Notification
import io.heckel.ntfy.data.Subscription
import io.heckel.ntfy.data.topicUrl
import kotlinx.coroutines.*
import okhttp3.Call
import java.util.concurrent.atomic.AtomicBoolean
class SubscriberConnection(
private val api: ApiService,
private val baseUrl: String,
private val sinceTime: Long,
private val subscriptions: Map<Long, Subscription>,
private val stateChangeListener: (Collection<Subscription>, ConnectionState) -> Unit,
private val notificationListener: (Subscription, Notification) -> Unit,
private val serviceActive: () -> Boolean
) {
private val topicsStr = subscriptions.values.joinToString(separator = ",") { s -> s.topic }
private val url = topicUrl(baseUrl, topicsStr)
private var since: Long = sinceTime
private lateinit var call: Call
private lateinit var job: Job
fun start(scope: CoroutineScope) {
job = scope.launch(Dispatchers.IO) {
Log.d(TAG, "[$url] Starting connection for subscriptions: $subscriptions")
// Retry-loop: if the connection fails, we retry unless the job or service is cancelled/stopped
var retryMillis = 0L
while (isActive && serviceActive()) {
Log.d(TAG, "[$url] (Re-)starting connection for subscriptions: $subscriptions")
val startTime = System.currentTimeMillis()
val notify = { topic: String, notification: Notification ->
since = notification.timestamp
val subscription = subscriptions.values.first { it.topic == topic }
val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id)
notificationListener(subscription, notificationWithSubscriptionId)
}
val failed = AtomicBoolean(false)
val fail = { e: Exception ->
failed.set(true)
stateChangeListener(subscriptions.values, ConnectionState.CONNECTING)
}
// Call /json subscribe endpoint and loop until the call fails, is canceled,
// or the job or service are cancelled/stopped
try {
call = api.subscribe(baseUrl, topicsStr, since, notify, fail)
while (!failed.get() && !call.isCanceled() && isActive && serviceActive()) {
stateChangeListener(subscriptions.values, ConnectionState.CONNECTED)
Log.d(TAG,"[$url] Connection is active (failed=$failed, callCanceled=${call.isCanceled()}, jobActive=$isActive, serviceStarted=${serviceActive()}")
delay(CONNECTION_LOOP_DELAY_MILLIS) // Resumes immediately if job is cancelled
}
} catch (e: Exception) {
Log.e(TAG, "[$url] Connection failed: ${e.message}", e)
if (isActive && serviceActive()) {
// Only update if we're not canceled, otherwise this may lead to races
stateChangeListener(subscriptions.values, ConnectionState.CONNECTING)
}
}
// If we're not cancelled yet, wait little before retrying (incremental back-off)
if (isActive && serviceActive()) {
retryMillis = nextRetryMillis(retryMillis, startTime)
Log.d(TAG, "[$url] Connection failed, retrying connection in ${retryMillis / 1000}s ...")
delay(retryMillis)
}
}
Log.d(TAG, "[$url] Connection job SHUT DOWN")
// FIXME: Do NOT update state here as this can lead to races; this leaks the subscription state map
}
}
fun matches(otherSubscriptions: Map<Long, Subscription>): Boolean {
return subscriptions.keys == otherSubscriptions.keys
}
fun since(): Long {
return since
}
fun cancel() {
Log.d(TAG, "[$url] Cancelling connection")
job?.cancel()
call?.cancel()
}
private fun nextRetryMillis(retryMillis: Long, startTime: Long): Long {
val connectionDurationMillis = System.currentTimeMillis() - startTime
if (connectionDurationMillis > RETRY_RESET_AFTER_MILLIS) {
return RETRY_STEP_MILLIS
} else if (retryMillis + RETRY_STEP_MILLIS >= RETRY_MAX_MILLIS) {
return RETRY_MAX_MILLIS
}
return retryMillis + RETRY_STEP_MILLIS
}
companion object {
private const val TAG = "NtfySubscriberConn"
private const val CONNECTION_LOOP_DELAY_MILLIS = 30_000L
private const val RETRY_STEP_MILLIS = 5_000L
private const val RETRY_MAX_MILLIS = 60_000L
private const val RETRY_RESET_AFTER_MILLIS = 60_000L // Must be larger than CONNECTION_LOOP_DELAY_MILLIS
}
}

View file

@ -17,9 +17,7 @@ import io.heckel.ntfy.data.Subscription
import io.heckel.ntfy.data.topicUrl import io.heckel.ntfy.data.topicUrl
import io.heckel.ntfy.ui.MainActivity import io.heckel.ntfy.ui.MainActivity
import kotlinx.coroutines.* import kotlinx.coroutines.*
import okhttp3.Call
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
/** /**
* *
@ -31,12 +29,11 @@ class SubscriberService : Service() {
private var wakeLock: PowerManager.WakeLock? = null private var wakeLock: PowerManager.WakeLock? = null
private var isServiceStarted = false private var isServiceStarted = false
private val repository by lazy { (application as Application).repository } private val repository by lazy { (application as Application).repository }
private val jobs = ConcurrentHashMap<Long, Job>() // Subscription ID -> Job private val connections = ConcurrentHashMap<String, SubscriberConnection>() // Base URL -> Connection
private val calls = ConcurrentHashMap<Long, Call>() // Subscription ID -> Cal
private val api = ApiService() private val api = ApiService()
private val notifier = NotificationService(this) private val notifier = NotificationService(this)
private var notificationManager: NotificationManager? = null private var notificationManager: NotificationManager? = null
private var notification: Notification? = null private var serviceNotification: Notification? = null
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int { override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
Log.d(TAG, "onStartCommand executed with startId: $startId") Log.d(TAG, "onStartCommand executed with startId: $startId")
@ -61,9 +58,9 @@ class SubscriberService : Service() {
val title = getString(R.string.channel_subscriber_notification_title) val title = getString(R.string.channel_subscriber_notification_title)
val text = getString(R.string.channel_subscriber_notification_text) val text = getString(R.string.channel_subscriber_notification_text)
notificationManager = createNotificationChannel() notificationManager = createNotificationChannel()
notification = createNotification(title, text) serviceNotification = createNotification(title, text)
startForeground(NOTIFICATION_SERVICE_ID, notification) startForeground(NOTIFICATION_SERVICE_ID, serviceNotification)
} }
override fun onDestroy() { override fun onDestroy() {
@ -73,7 +70,7 @@ class SubscriberService : Service() {
private fun startService() { private fun startService() {
if (isServiceStarted) { if (isServiceStarted) {
launchAndCancelJobs() refreshConnections()
return return
} }
Log.d(TAG, "Starting the foreground service task") Log.d(TAG, "Starting the foreground service task")
@ -84,17 +81,15 @@ class SubscriberService : Service() {
acquire() acquire()
} }
} }
launchAndCancelJobs() refreshConnections()
} }
private fun stopService() { private fun stopService() {
Log.d(TAG, "Stopping the foreground service") Log.d(TAG, "Stopping the foreground service")
// Cancelling all remaining jobs and open HTTP calls // Cancelling all remaining jobs and open HTTP calls
jobs.values.forEach { job -> job.cancel() } connections.values.forEach { connection -> connection.cancel() }
calls.values.forEach { call -> call.cancel() } connections.clear()
jobs.clear()
calls.clear()
// Releasing wake-lock and stopping ourselves // Releasing wake-lock and stopping ourselves
try { try {
@ -113,97 +108,72 @@ class SubscriberService : Service() {
saveServiceState(this, ServiceState.STOPPED) saveServiceState(this, ServiceState.STOPPED)
} }
private fun launchAndCancelJobs() = private fun refreshConnections() =
GlobalScope.launch(Dispatchers.IO) { GlobalScope.launch(Dispatchers.IO) {
val subscriptions = repository.getSubscriptions().filter { s -> s.instant } // Group subscriptions by base URL (Base URL -> Map<SubId -> Sub>.
val subscriptionIds = subscriptions.map { it.id } // There is only one connection per base URL.
val subscriptions = repository.getSubscriptions()
.filter { s -> s.instant }
val subscriptionsByBaseUrl = subscriptions
.groupBy { s -> s.baseUrl }
.mapValues { entry -> entry.value.associateBy { it.id } }
Log.d(TAG, "Refreshing subscriptions") Log.d(TAG, "Refreshing subscriptions")
Log.d(TAG, "- Subscriptions: $subscriptions") Log.d(TAG, "- Subscriptions: $subscriptionsByBaseUrl")
Log.d(TAG, "- Jobs: $jobs") Log.d(TAG, "- Active connections: $connections")
Log.d(TAG, "- HTTP calls: $calls")
subscriptions.forEach { subscription -> // Start new connections and restart connections (if subscriptions have changed)
if (!jobs.containsKey(subscription.id)) { subscriptionsByBaseUrl.forEach { (baseUrl, subscriptions) ->
jobs[subscription.id] = launchJob(this, subscription) val connection = connections[baseUrl]
var since = 0L
if (connection != null && !connection.matches(subscriptions)) {
since = connection.since()
connections.remove(baseUrl)
connection.cancel()
}
if (!connections.containsKey(baseUrl)) {
val serviceActive = { -> isServiceStarted }
val connection = SubscriberConnection(api, baseUrl, since, subscriptions, ::onStateChanged, ::onNotificationReceived, serviceActive)
connections[baseUrl] = connection
connection.start(this)
} }
} }
jobs.keys().toList().forEach { subscriptionId ->
if (!subscriptionIds.contains(subscriptionId)) { // Close connections without subscriptions
cancelJob(subscriptionId) val baseUrls = subscriptionsByBaseUrl.keys
connections.keys().toList().forEach { baseUrl ->
if (!baseUrls.contains(baseUrl)) {
val connection = connections.remove(baseUrl)
connection?.cancel()
} }
} }
if (jobs.size > 0) {
// Update foreground service notification popup
if (connections.size > 0) {
synchronized(this) { synchronized(this) {
val title = getString(R.string.channel_subscriber_notification_title) val title = getString(R.string.channel_subscriber_notification_title)
val text = if (jobs.size == 1) { val text = when (subscriptions.size) {
getString(R.string.channel_subscriber_notification_text_one) 1 -> getString(R.string.channel_subscriber_notification_text_one)
} else { 2 -> getString(R.string.channel_subscriber_notification_text_two)
getString(R.string.channel_subscriber_notification_text_more, jobs.size) 3 -> getString(R.string.channel_subscriber_notification_text_three)
4 -> getString(R.string.channel_subscriber_notification_text_four)
else -> getString(R.string.channel_subscriber_notification_text_more, subscriptions.size)
} }
notification = createNotification(title, text) serviceNotification = createNotification(title, text)
notificationManager?.notify(NOTIFICATION_SERVICE_ID, notification) notificationManager?.notify(NOTIFICATION_SERVICE_ID, serviceNotification)
} }
} }
} }
private fun cancelJob(subscriptionId: Long?) { private fun onStateChanged(subscriptions: Collection<Subscription>, state: ConnectionState) {
Log.d(TAG, "Cancelling job for $subscriptionId") val subscriptionIds = subscriptions.map { it.id }
val job = jobs.remove(subscriptionId) repository.updateState(subscriptionIds, state)
val call = calls.remove(subscriptionId)
job?.cancel()
call?.cancel()
} }
private fun launchJob(scope: CoroutineScope, subscription: Subscription): Job = private fun onNotificationReceived(subscription: Subscription, n: io.heckel.ntfy.data.Notification) {
scope.launch(Dispatchers.IO) {
val url = topicUrl(subscription.baseUrl, subscription.topic)
Log.d(TAG, "[$url] Starting connection job")
// Retry-loop: if the connection fails, we retry unless the job or service is cancelled/stopped
var since = 0L
var retryMillis = 0L
while (isActive && isServiceStarted) {
Log.d(TAG, "[$url] (Re-)starting subscription for $subscription")
val startTime = System.currentTimeMillis()
val notify = { n: io.heckel.ntfy.data.Notification ->
since = n.timestamp
onNotificationReceived(scope, subscription, n)
}
val failed = AtomicBoolean(false)
val fail = { e: Exception ->
failed.set(true)
repository.updateStateIfChanged(subscription.id, ConnectionState.RECONNECTING)
}
// Call /json subscribe endpoint and loop until the call fails, is canceled,
// or the job or service are cancelled/stopped
try {
val call = api.subscribe(subscription.id, subscription.baseUrl, subscription.topic, since, notify, fail)
calls[subscription.id] = call
while (!failed.get() && !call.isCanceled() && isActive && isServiceStarted) {
repository.updateStateIfChanged(subscription.id, ConnectionState.CONNECTED)
Log.d(TAG, "[$url] Connection is active (failed=$failed, callCanceled=${call.isCanceled()}, jobActive=$isActive, serviceStarted=$isServiceStarted")
delay(CONNECTION_LOOP_DELAY_MILLIS) // Resumes immediately if job is cancelled
}
} catch (e: Exception) {
Log.e(TAG, "[$url] Connection failed: ${e.message}", e)
repository.updateStateIfChanged(subscription.id, ConnectionState.RECONNECTING)
}
// If we're not cancelled yet, wait little before retrying (incremental back-off)
if (isActive && isServiceStarted) {
retryMillis = nextRetryMillis(retryMillis, startTime)
Log.d(TAG, "Connection failed, retrying connection in ${retryMillis/1000}s ...")
delay(retryMillis)
}
}
Log.d(TAG, "[$url] Connection job SHUT DOWN")
repository.updateStateIfChanged(subscription.id, ConnectionState.NOT_APPLICABLE)
}
private fun onNotificationReceived(scope: CoroutineScope, subscription: Subscription, n: io.heckel.ntfy.data.Notification) {
val url = topicUrl(subscription.baseUrl, subscription.topic) val url = topicUrl(subscription.baseUrl, subscription.topic)
Log.d(TAG, "[$url] Received notification: $n") Log.d(TAG, "[$url] Received notification: $n")
scope.launch(Dispatchers.IO) { GlobalScope.launch(Dispatchers.IO) {
val added = repository.addNotification(n) val added = repository.addNotification(n)
val detailViewOpen = repository.detailViewSubscriptionId.get() == subscription.id val detailViewOpen = repository.detailViewSubscriptionId.get() == subscription.id
@ -214,16 +184,6 @@ class SubscriberService : Service() {
} }
} }
private fun nextRetryMillis(retryMillis: Long, startTime: Long): Long {
val connectionDurationMillis = System.currentTimeMillis() - startTime
if (connectionDurationMillis > RETRY_RESET_AFTER_MILLIS) {
return RETRY_STEP_MILLIS
} else if (retryMillis + RETRY_STEP_MILLIS >= RETRY_MAX_MILLIS) {
return RETRY_MAX_MILLIS
}
return retryMillis + RETRY_STEP_MILLIS
}
private fun createNotificationChannel(): NotificationManager? { private fun createNotificationChannel(): NotificationManager? {
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) { if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
val notificationManager = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager val notificationManager = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
@ -302,10 +262,6 @@ class SubscriberService : Service() {
private const val NOTIFICATION_SERVICE_ID = 2586 private const val NOTIFICATION_SERVICE_ID = 2586
private const val SHARED_PREFS_ID = "SubscriberService" private const val SHARED_PREFS_ID = "SubscriberService"
private const val SHARED_PREFS_SERVICE_STATE = "ServiceState" private const val SHARED_PREFS_SERVICE_STATE = "ServiceState"
private const val CONNECTION_LOOP_DELAY_MILLIS = 30_000L
private const val RETRY_STEP_MILLIS = 5_000L
private const val RETRY_MAX_MILLIS = 60_000L
private const val RETRY_RESET_AFTER_MILLIS = 60_000L // Must be larger than CONNECTION_LOOP_DELAY_MILLIS
fun saveServiceState(context: Context, state: ServiceState) { fun saveServiceState(context: Context, state: ServiceState) {
val sharedPrefs = context.getSharedPreferences(SHARED_PREFS_ID, Context.MODE_PRIVATE) val sharedPrefs = context.getSharedPreferences(SHARED_PREFS_ID, Context.MODE_PRIVATE)

View file

@ -59,7 +59,7 @@ class MainAdapter(private val onClick: (Subscription) -> Unit, private val onLon
} else { } else {
context.getString(R.string.main_item_status_text_not_one, subscription.totalCount) context.getString(R.string.main_item_status_text_not_one, subscription.totalCount)
} }
if (subscription.instant && subscription.state == ConnectionState.RECONNECTING) { if (subscription.instant && subscription.state == ConnectionState.CONNECTING) {
statusMessage += ", " + context.getString(R.string.main_item_status_reconnecting) statusMessage += ", " + context.getString(R.string.main_item_status_reconnecting)
} }
val dateText = if (subscription.lastActive == 0L) { val dateText = if (subscription.lastActive == 0L) {

View file

@ -9,6 +9,9 @@
<string name="channel_subscriber_notification_title">Listening for incoming notifications</string> <string name="channel_subscriber_notification_title">Listening for incoming notifications</string>
<string name="channel_subscriber_notification_text">You are subscribed to instant delivery topics</string> <string name="channel_subscriber_notification_text">You are subscribed to instant delivery topics</string>
<string name="channel_subscriber_notification_text_one">You are subscribed to one instant delivery topic</string> <string name="channel_subscriber_notification_text_one">You are subscribed to one instant delivery topic</string>
<string name="channel_subscriber_notification_text_two">You are subscribed to two instant delivery topics</string>
<string name="channel_subscriber_notification_text_three">You are subscribed to three instant delivery topics</string>
<string name="channel_subscriber_notification_text_four">You are subscribed to four instant delivery topics</string>
<string name="channel_subscriber_notification_text_more">You are subscribed to %1$d instant delivery topics</string> <string name="channel_subscriber_notification_text_more">You are subscribed to %1$d instant delivery topics</string>
<!-- Common refresh toasts --> <!-- Common refresh toasts -->
@ -48,7 +51,7 @@
</string> </string>
<string name="add_dialog_instant_delivery">Instant delivery in doze mode</string> <string name="add_dialog_instant_delivery">Instant delivery in doze mode</string>
<string name="add_dialog_instant_delivery_description"> <string name="add_dialog_instant_delivery_description">
Doze mode sometimes causes delayed message delivery to conserve power. With instant delivery, notifications are Android\'s doze mode sometimes causes delayed message delivery to conserve power. Using this option, notifications are
always immediately delivered. This requires a foreground service and consumes a little more power. always immediately delivered. This requires a foreground service and consumes a little more power.
</string> </string>
<string name="add_dialog_button_cancel">Cancel</string> <string name="add_dialog_button_cancel">Cancel</string>