diff --git a/app/src/main/java/io/heckel/ntfy/data/Database.kt b/app/src/main/java/io/heckel/ntfy/data/Database.kt index 5b1e968..52ce86b 100644 --- a/app/src/main/java/io/heckel/ntfy/data/Database.kt +++ b/app/src/main/java/io/heckel/ntfy/data/Database.kt @@ -21,7 +21,7 @@ data class Subscription( } enum class ConnectionState { - NOT_APPLICABLE, RECONNECTING, CONNECTED + NOT_APPLICABLE, CONNECTING, CONNECTED } data class SubscriptionWithMetadata( diff --git a/app/src/main/java/io/heckel/ntfy/data/Repository.kt b/app/src/main/java/io/heckel/ntfy/data/Repository.kt index 47ecdb6..348b40a 100644 --- a/app/src/main/java/io/heckel/ntfy/data/Repository.kt +++ b/app/src/main/java/io/heckel/ntfy/data/Repository.kt @@ -137,14 +137,20 @@ class Repository(private val subscriptionDao: SubscriptionDao, private val notif ) } - fun updateStateIfChanged(subscriptionId: Long, newState: ConnectionState) { - val state = connectionStates.getOrElse(subscriptionId) { ConnectionState.NOT_APPLICABLE } - if (state !== newState) { - if (newState == ConnectionState.NOT_APPLICABLE) { - connectionStates.remove(subscriptionId) - } else { - connectionStates[subscriptionId] = newState + fun updateState(subscriptionIds: Collection, newState: ConnectionState) { + var changed = false + subscriptionIds.forEach { subscriptionId -> + val state = connectionStates.getOrElse(subscriptionId) { ConnectionState.NOT_APPLICABLE } + if (state !== newState) { + changed = true + if (newState == ConnectionState.NOT_APPLICABLE) { + connectionStates.remove(subscriptionId) + } else { + connectionStates[subscriptionId] = newState + } } + } + if (changed) { connectionStatesLiveData.postValue(connectionStates) } } diff --git a/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt b/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt index 96ab997..002611b 100644 --- a/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt +++ b/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt @@ -57,9 +57,15 @@ class ApiService { } } - fun subscribe(subscriptionId: Long, baseUrl: String, topic: String, since: Long, notify: (Notification) -> Unit, fail: (Exception) -> Unit): Call { + fun subscribe( + baseUrl: String, + topics: String, + since: Long, + notify: (topic: String, Notification) -> Unit, + fail: (Exception) -> Unit + ): Call { val sinceVal = if (since == 0L) "all" else since.toString() - val url = topicUrlJson(baseUrl, topic, sinceVal) + val url = topicUrlJson(baseUrl, topics, sinceVal) Log.d(TAG, "Opening subscription connection to $url") val request = Request.Builder().url(url).build() @@ -75,15 +81,16 @@ class ApiService { val line = source.readUtf8Line() ?: throw Exception("Unexpected response for $url: line is null") val message = gson.fromJson(line, Message::class.java) if (message.event == EVENT_MESSAGE) { + val topic = message.topic val notification = Notification( id = message.id, - subscriptionId = subscriptionId, + subscriptionId = 0, // TO BE SET downstream timestamp = message.time, message = message.message, notificationId = Random.nextInt(), deleted = false ) - notify(notification) + notify(topic, notification) } } } catch (e: Exception) { @@ -108,6 +115,7 @@ class ApiService { val id: String, val time: Long, val event: String, + val topic: String, val message: String ) diff --git a/app/src/main/java/io/heckel/ntfy/msg/SubscriberConnection.kt b/app/src/main/java/io/heckel/ntfy/msg/SubscriberConnection.kt new file mode 100644 index 0000000..9785aed --- /dev/null +++ b/app/src/main/java/io/heckel/ntfy/msg/SubscriberConnection.kt @@ -0,0 +1,109 @@ +package io.heckel.ntfy.msg + +import android.util.Log +import io.heckel.ntfy.data.ConnectionState +import io.heckel.ntfy.data.Notification +import io.heckel.ntfy.data.Subscription +import io.heckel.ntfy.data.topicUrl +import kotlinx.coroutines.* +import okhttp3.Call +import java.util.concurrent.atomic.AtomicBoolean + +class SubscriberConnection( + private val api: ApiService, + private val baseUrl: String, + private val sinceTime: Long, + private val subscriptions: Map, + private val stateChangeListener: (Collection, ConnectionState) -> Unit, + private val notificationListener: (Subscription, Notification) -> Unit, + private val serviceActive: () -> Boolean +) { + private val topicsStr = subscriptions.values.joinToString(separator = ",") { s -> s.topic } + private val url = topicUrl(baseUrl, topicsStr) + + private var since: Long = sinceTime + private lateinit var call: Call + private lateinit var job: Job + + fun start(scope: CoroutineScope) { + job = scope.launch(Dispatchers.IO) { + Log.d(TAG, "[$url] Starting connection for subscriptions: $subscriptions") + + // Retry-loop: if the connection fails, we retry unless the job or service is cancelled/stopped + var retryMillis = 0L + while (isActive && serviceActive()) { + Log.d(TAG, "[$url] (Re-)starting connection for subscriptions: $subscriptions") + val startTime = System.currentTimeMillis() + val notify = { topic: String, notification: Notification -> + since = notification.timestamp + val subscription = subscriptions.values.first { it.topic == topic } + val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id) + notificationListener(subscription, notificationWithSubscriptionId) + } + val failed = AtomicBoolean(false) + val fail = { e: Exception -> + failed.set(true) + stateChangeListener(subscriptions.values, ConnectionState.CONNECTING) + } + + // Call /json subscribe endpoint and loop until the call fails, is canceled, + // or the job or service are cancelled/stopped + try { + call = api.subscribe(baseUrl, topicsStr, since, notify, fail) + while (!failed.get() && !call.isCanceled() && isActive && serviceActive()) { + stateChangeListener(subscriptions.values, ConnectionState.CONNECTED) + Log.d(TAG,"[$url] Connection is active (failed=$failed, callCanceled=${call.isCanceled()}, jobActive=$isActive, serviceStarted=${serviceActive()}") + delay(CONNECTION_LOOP_DELAY_MILLIS) // Resumes immediately if job is cancelled + } + } catch (e: Exception) { + Log.e(TAG, "[$url] Connection failed: ${e.message}", e) + if (isActive && serviceActive()) { + // Only update if we're not canceled, otherwise this may lead to races + stateChangeListener(subscriptions.values, ConnectionState.CONNECTING) + } + } + + // If we're not cancelled yet, wait little before retrying (incremental back-off) + if (isActive && serviceActive()) { + retryMillis = nextRetryMillis(retryMillis, startTime) + Log.d(TAG, "[$url] Connection failed, retrying connection in ${retryMillis / 1000}s ...") + delay(retryMillis) + } + } + Log.d(TAG, "[$url] Connection job SHUT DOWN") + // FIXME: Do NOT update state here as this can lead to races; this leaks the subscription state map + } + } + + fun matches(otherSubscriptions: Map): Boolean { + return subscriptions.keys == otherSubscriptions.keys + } + + fun since(): Long { + return since + } + + fun cancel() { + Log.d(TAG, "[$url] Cancelling connection") + job?.cancel() + call?.cancel() + } + + private fun nextRetryMillis(retryMillis: Long, startTime: Long): Long { + val connectionDurationMillis = System.currentTimeMillis() - startTime + if (connectionDurationMillis > RETRY_RESET_AFTER_MILLIS) { + return RETRY_STEP_MILLIS + } else if (retryMillis + RETRY_STEP_MILLIS >= RETRY_MAX_MILLIS) { + return RETRY_MAX_MILLIS + } + return retryMillis + RETRY_STEP_MILLIS + } + + companion object { + private const val TAG = "NtfySubscriberConn" + private const val CONNECTION_LOOP_DELAY_MILLIS = 30_000L + private const val RETRY_STEP_MILLIS = 5_000L + private const val RETRY_MAX_MILLIS = 60_000L + private const val RETRY_RESET_AFTER_MILLIS = 60_000L // Must be larger than CONNECTION_LOOP_DELAY_MILLIS + } +} diff --git a/app/src/main/java/io/heckel/ntfy/msg/SubscriberService.kt b/app/src/main/java/io/heckel/ntfy/msg/SubscriberService.kt index e927b2b..348bfcd 100644 --- a/app/src/main/java/io/heckel/ntfy/msg/SubscriberService.kt +++ b/app/src/main/java/io/heckel/ntfy/msg/SubscriberService.kt @@ -17,9 +17,7 @@ import io.heckel.ntfy.data.Subscription import io.heckel.ntfy.data.topicUrl import io.heckel.ntfy.ui.MainActivity import kotlinx.coroutines.* -import okhttp3.Call import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.AtomicBoolean /** * @@ -31,12 +29,11 @@ class SubscriberService : Service() { private var wakeLock: PowerManager.WakeLock? = null private var isServiceStarted = false private val repository by lazy { (application as Application).repository } - private val jobs = ConcurrentHashMap() // Subscription ID -> Job - private val calls = ConcurrentHashMap() // Subscription ID -> Cal + private val connections = ConcurrentHashMap() // Base URL -> Connection private val api = ApiService() private val notifier = NotificationService(this) private var notificationManager: NotificationManager? = null - private var notification: Notification? = null + private var serviceNotification: Notification? = null override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int { Log.d(TAG, "onStartCommand executed with startId: $startId") @@ -61,9 +58,9 @@ class SubscriberService : Service() { val title = getString(R.string.channel_subscriber_notification_title) val text = getString(R.string.channel_subscriber_notification_text) notificationManager = createNotificationChannel() - notification = createNotification(title, text) + serviceNotification = createNotification(title, text) - startForeground(NOTIFICATION_SERVICE_ID, notification) + startForeground(NOTIFICATION_SERVICE_ID, serviceNotification) } override fun onDestroy() { @@ -73,7 +70,7 @@ class SubscriberService : Service() { private fun startService() { if (isServiceStarted) { - launchAndCancelJobs() + refreshConnections() return } Log.d(TAG, "Starting the foreground service task") @@ -84,17 +81,15 @@ class SubscriberService : Service() { acquire() } } - launchAndCancelJobs() + refreshConnections() } private fun stopService() { Log.d(TAG, "Stopping the foreground service") // Cancelling all remaining jobs and open HTTP calls - jobs.values.forEach { job -> job.cancel() } - calls.values.forEach { call -> call.cancel() } - jobs.clear() - calls.clear() + connections.values.forEach { connection -> connection.cancel() } + connections.clear() // Releasing wake-lock and stopping ourselves try { @@ -113,97 +108,72 @@ class SubscriberService : Service() { saveServiceState(this, ServiceState.STOPPED) } - private fun launchAndCancelJobs() = + private fun refreshConnections() = GlobalScope.launch(Dispatchers.IO) { - val subscriptions = repository.getSubscriptions().filter { s -> s.instant } - val subscriptionIds = subscriptions.map { it.id } + // Group subscriptions by base URL (Base URL -> Map Sub>. + // There is only one connection per base URL. + val subscriptions = repository.getSubscriptions() + .filter { s -> s.instant } + val subscriptionsByBaseUrl = subscriptions + .groupBy { s -> s.baseUrl } + .mapValues { entry -> entry.value.associateBy { it.id } } + Log.d(TAG, "Refreshing subscriptions") - Log.d(TAG, "- Subscriptions: $subscriptions") - Log.d(TAG, "- Jobs: $jobs") - Log.d(TAG, "- HTTP calls: $calls") - subscriptions.forEach { subscription -> - if (!jobs.containsKey(subscription.id)) { - jobs[subscription.id] = launchJob(this, subscription) + Log.d(TAG, "- Subscriptions: $subscriptionsByBaseUrl") + Log.d(TAG, "- Active connections: $connections") + + // Start new connections and restart connections (if subscriptions have changed) + subscriptionsByBaseUrl.forEach { (baseUrl, subscriptions) -> + val connection = connections[baseUrl] + var since = 0L + if (connection != null && !connection.matches(subscriptions)) { + since = connection.since() + connections.remove(baseUrl) + connection.cancel() + } + if (!connections.containsKey(baseUrl)) { + val serviceActive = { -> isServiceStarted } + val connection = SubscriberConnection(api, baseUrl, since, subscriptions, ::onStateChanged, ::onNotificationReceived, serviceActive) + connections[baseUrl] = connection + connection.start(this) } } - jobs.keys().toList().forEach { subscriptionId -> - if (!subscriptionIds.contains(subscriptionId)) { - cancelJob(subscriptionId) + + // Close connections without subscriptions + val baseUrls = subscriptionsByBaseUrl.keys + connections.keys().toList().forEach { baseUrl -> + if (!baseUrls.contains(baseUrl)) { + val connection = connections.remove(baseUrl) + connection?.cancel() } } - if (jobs.size > 0) { + + // Update foreground service notification popup + if (connections.size > 0) { synchronized(this) { val title = getString(R.string.channel_subscriber_notification_title) - val text = if (jobs.size == 1) { - getString(R.string.channel_subscriber_notification_text_one) - } else { - getString(R.string.channel_subscriber_notification_text_more, jobs.size) + val text = when (subscriptions.size) { + 1 -> getString(R.string.channel_subscriber_notification_text_one) + 2 -> getString(R.string.channel_subscriber_notification_text_two) + 3 -> getString(R.string.channel_subscriber_notification_text_three) + 4 -> getString(R.string.channel_subscriber_notification_text_four) + else -> getString(R.string.channel_subscriber_notification_text_more, subscriptions.size) } - notification = createNotification(title, text) - notificationManager?.notify(NOTIFICATION_SERVICE_ID, notification) + serviceNotification = createNotification(title, text) + notificationManager?.notify(NOTIFICATION_SERVICE_ID, serviceNotification) } } } - private fun cancelJob(subscriptionId: Long?) { - Log.d(TAG, "Cancelling job for $subscriptionId") - val job = jobs.remove(subscriptionId) - val call = calls.remove(subscriptionId) - job?.cancel() - call?.cancel() + private fun onStateChanged(subscriptions: Collection, state: ConnectionState) { + val subscriptionIds = subscriptions.map { it.id } + repository.updateState(subscriptionIds, state) } - private fun launchJob(scope: CoroutineScope, subscription: Subscription): Job = - scope.launch(Dispatchers.IO) { - val url = topicUrl(subscription.baseUrl, subscription.topic) - Log.d(TAG, "[$url] Starting connection job") - - // Retry-loop: if the connection fails, we retry unless the job or service is cancelled/stopped - var since = 0L - var retryMillis = 0L - while (isActive && isServiceStarted) { - Log.d(TAG, "[$url] (Re-)starting subscription for $subscription") - val startTime = System.currentTimeMillis() - val notify = { n: io.heckel.ntfy.data.Notification -> - since = n.timestamp - onNotificationReceived(scope, subscription, n) - } - val failed = AtomicBoolean(false) - val fail = { e: Exception -> - failed.set(true) - repository.updateStateIfChanged(subscription.id, ConnectionState.RECONNECTING) - } - - // Call /json subscribe endpoint and loop until the call fails, is canceled, - // or the job or service are cancelled/stopped - try { - val call = api.subscribe(subscription.id, subscription.baseUrl, subscription.topic, since, notify, fail) - calls[subscription.id] = call - while (!failed.get() && !call.isCanceled() && isActive && isServiceStarted) { - repository.updateStateIfChanged(subscription.id, ConnectionState.CONNECTED) - Log.d(TAG, "[$url] Connection is active (failed=$failed, callCanceled=${call.isCanceled()}, jobActive=$isActive, serviceStarted=$isServiceStarted") - delay(CONNECTION_LOOP_DELAY_MILLIS) // Resumes immediately if job is cancelled - } - } catch (e: Exception) { - Log.e(TAG, "[$url] Connection failed: ${e.message}", e) - repository.updateStateIfChanged(subscription.id, ConnectionState.RECONNECTING) - } - - // If we're not cancelled yet, wait little before retrying (incremental back-off) - if (isActive && isServiceStarted) { - retryMillis = nextRetryMillis(retryMillis, startTime) - Log.d(TAG, "Connection failed, retrying connection in ${retryMillis/1000}s ...") - delay(retryMillis) - } - } - Log.d(TAG, "[$url] Connection job SHUT DOWN") - repository.updateStateIfChanged(subscription.id, ConnectionState.NOT_APPLICABLE) - } - - private fun onNotificationReceived(scope: CoroutineScope, subscription: Subscription, n: io.heckel.ntfy.data.Notification) { + private fun onNotificationReceived(subscription: Subscription, n: io.heckel.ntfy.data.Notification) { val url = topicUrl(subscription.baseUrl, subscription.topic) Log.d(TAG, "[$url] Received notification: $n") - scope.launch(Dispatchers.IO) { + GlobalScope.launch(Dispatchers.IO) { val added = repository.addNotification(n) val detailViewOpen = repository.detailViewSubscriptionId.get() == subscription.id @@ -214,16 +184,6 @@ class SubscriberService : Service() { } } - private fun nextRetryMillis(retryMillis: Long, startTime: Long): Long { - val connectionDurationMillis = System.currentTimeMillis() - startTime - if (connectionDurationMillis > RETRY_RESET_AFTER_MILLIS) { - return RETRY_STEP_MILLIS - } else if (retryMillis + RETRY_STEP_MILLIS >= RETRY_MAX_MILLIS) { - return RETRY_MAX_MILLIS - } - return retryMillis + RETRY_STEP_MILLIS - } - private fun createNotificationChannel(): NotificationManager? { if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) { val notificationManager = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager @@ -302,10 +262,6 @@ class SubscriberService : Service() { private const val NOTIFICATION_SERVICE_ID = 2586 private const val SHARED_PREFS_ID = "SubscriberService" private const val SHARED_PREFS_SERVICE_STATE = "ServiceState" - private const val CONNECTION_LOOP_DELAY_MILLIS = 30_000L - private const val RETRY_STEP_MILLIS = 5_000L - private const val RETRY_MAX_MILLIS = 60_000L - private const val RETRY_RESET_AFTER_MILLIS = 60_000L // Must be larger than CONNECTION_LOOP_DELAY_MILLIS fun saveServiceState(context: Context, state: ServiceState) { val sharedPrefs = context.getSharedPreferences(SHARED_PREFS_ID, Context.MODE_PRIVATE) diff --git a/app/src/main/java/io/heckel/ntfy/ui/MainAdapter.kt b/app/src/main/java/io/heckel/ntfy/ui/MainAdapter.kt index 49c258b..ade97dd 100644 --- a/app/src/main/java/io/heckel/ntfy/ui/MainAdapter.kt +++ b/app/src/main/java/io/heckel/ntfy/ui/MainAdapter.kt @@ -59,7 +59,7 @@ class MainAdapter(private val onClick: (Subscription) -> Unit, private val onLon } else { context.getString(R.string.main_item_status_text_not_one, subscription.totalCount) } - if (subscription.instant && subscription.state == ConnectionState.RECONNECTING) { + if (subscription.instant && subscription.state == ConnectionState.CONNECTING) { statusMessage += ", " + context.getString(R.string.main_item_status_reconnecting) } val dateText = if (subscription.lastActive == 0L) { diff --git a/app/src/main/res/values/strings.xml b/app/src/main/res/values/strings.xml index 25068bb..2340fcf 100644 --- a/app/src/main/res/values/strings.xml +++ b/app/src/main/res/values/strings.xml @@ -9,6 +9,9 @@ Listening for incoming notifications You are subscribed to instant delivery topics You are subscribed to one instant delivery topic + You are subscribed to two instant delivery topics + You are subscribed to three instant delivery topics + You are subscribed to four instant delivery topics You are subscribed to %1$d instant delivery topics @@ -48,7 +51,7 @@ Instant delivery in doze mode - Doze mode sometimes causes delayed message delivery to conserve power. With instant delivery, notifications are + Android\'s doze mode sometimes causes delayed message delivery to conserve power. Using this option, notifications are always immediately delivered. This requires a foreground service and consumes a little more power. Cancel