From 26b408c828e06568f039c417e0aee6c2e08c2757 Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Sat, 15 Jan 2022 13:31:34 -0500 Subject: [PATCH] WIP: Websockets --- .../java/io/heckel/ntfy/msg/ApiService.kt | 4 +- ...scriberConnection.kt => JsonConnection.kt} | 13 +- .../heckel/ntfy/service/SubscriberService.kt | 19 +- .../io/heckel/ntfy/service/WsConnection.kt | 207 ++++++++++++++++++ app/src/main/java/io/heckel/ntfy/util/Util.kt | 1 + 5 files changed, 233 insertions(+), 11 deletions(-) rename app/src/main/java/io/heckel/ntfy/service/{SubscriberConnection.kt => JsonConnection.kt} (95%) create mode 100644 app/src/main/java/io/heckel/ntfy/service/WsConnection.kt diff --git a/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt b/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt index a39069a..3058a65 100644 --- a/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt +++ b/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt @@ -173,7 +173,7 @@ class ApiService { /* This annotation ensures that proguard still works in production builds, * see https://stackoverflow.com/a/62753300/1440785 */ @Keep - private data class Message( + data class Message( val id: String, val time: Long, val event: String, @@ -187,7 +187,7 @@ class ApiService { ) @Keep - private data class MessageAttachment( + data class MessageAttachment( val name: String, val type: String?, val size: Long?, diff --git a/app/src/main/java/io/heckel/ntfy/service/SubscriberConnection.kt b/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt similarity index 95% rename from app/src/main/java/io/heckel/ntfy/service/SubscriberConnection.kt rename to app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt index 9a70b8b..f2b1fd3 100644 --- a/app/src/main/java/io/heckel/ntfy/service/SubscriberConnection.kt +++ b/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt @@ -11,7 +11,8 @@ import kotlinx.coroutines.* import okhttp3.Call import java.util.concurrent.atomic.AtomicBoolean -class SubscriberConnection( +class JsonConnection( + private val scope: CoroutineScope, private val repository: Repository, private val api: ApiService, private val baseUrl: String, @@ -20,7 +21,7 @@ class SubscriberConnection( private val stateChangeListener: (Collection, ConnectionState) -> Unit, private val notificationListener: (Subscription, Notification) -> Unit, private val serviceActive: () -> Boolean -) { +) : Connection { private val subscriptionIds = topicsToSubscriptionIds.values private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",") private val url = topicUrl(baseUrl, topicsStr) @@ -29,7 +30,7 @@ class SubscriberConnection( private lateinit var call: Call private lateinit var job: Job - fun start(scope: CoroutineScope) { + override fun start() { job = scope.launch(Dispatchers.IO) { Log.d(TAG, "[$url] Starting connection for subscriptions: $topicsToSubscriptionIds") @@ -81,17 +82,17 @@ class SubscriberConnection( } } - fun since(): Long { + override fun since(): Long { return since } - fun cancel() { + override fun cancel() { Log.d(TAG, "[$url] Cancelling connection") if (this::job.isInitialized) job?.cancel() if (this::call.isInitialized) call?.cancel() } - fun matches(otherSubscriptionIds: Collection): Boolean { + override fun matches(otherSubscriptionIds: Collection): Boolean { return subscriptionIds.toSet() == otherSubscriptionIds.toSet() } diff --git a/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt b/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt index 882d250..6f65d95 100644 --- a/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt +++ b/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt @@ -54,12 +54,20 @@ import java.util.concurrent.ConcurrentHashMap * - https://github.com/robertohuertasm/endless-service/blob/master/app/src/main/java/com/robertohuertas/endless/EndlessService.kt * - https://gist.github.com/varunon9/f2beec0a743c96708eb0ef971a9ff9cd */ + +interface Connection { + fun start() + fun cancel() + fun since(): Long + fun matches(otherSubscriptionIds: Collection): Boolean +} + class SubscriberService : Service() { private var wakeLock: PowerManager.WakeLock? = null private var isServiceStarted = false private val repository by lazy { (application as Application).repository } private val dispatcher by lazy { NotificationDispatcher(this, repository) } - private val connections = ConcurrentHashMap() // Base URL -> Connection + private val connections = ConcurrentHashMap() // Base URL -> Connection private val api = ApiService() private var notificationManager: NotificationManager? = null private var serviceNotification: Notification? = null @@ -174,9 +182,14 @@ class SubscriberService : Service() { } if (!connections.containsKey(baseUrl)) { val serviceActive = { -> isServiceStarted } - val connection = SubscriberConnection(repository, api, baseUrl, since, subscriptions, ::onStateChanged, ::onNotificationReceived, serviceActive) + val connection = if (true) { + val alarmManager = getSystemService(ALARM_SERVICE) as AlarmManager + WsConnection(repository, baseUrl, since, subscriptions, ::onStateChanged, ::onNotificationReceived, alarmManager) + } else { + JsonConnection(this, repository, api, baseUrl, since, subscriptions, ::onStateChanged, ::onNotificationReceived, serviceActive) + } connections[baseUrl] = connection - connection.start(this) + connection.start() } } diff --git a/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt b/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt new file mode 100644 index 0000000..fa15629 --- /dev/null +++ b/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt @@ -0,0 +1,207 @@ +package io.heckel.ntfy.service + +import android.app.AlarmManager +import android.os.Build +import android.util.Log +import com.google.gson.Gson +import io.heckel.ntfy.data.* +import io.heckel.ntfy.msg.ApiService +import io.heckel.ntfy.util.joinTags +import io.heckel.ntfy.util.toPriority +import io.heckel.ntfy.util.topicUrlWs +import okhttp3.* +import java.util.* +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicLong +import kotlin.random.Random + +class WsConnection( + private val repository: Repository, + private val baseUrl: String, + private val sinceTime: Long, + private val topicsToSubscriptionIds: Map, // Topic -> Subscription ID + private val stateChangeListener: (Collection, ConnectionState) -> Unit, + private val notificationListener: (Subscription, Notification) -> Unit, + private val alarmManager: AlarmManager +) : Connection { + private val client: OkHttpClient + //private val reconnectHandler = Handler() + //private val reconnectCallback = Runnable { start() } + private var errorCount = 0 + private var webSocket: WebSocket? = null + private var state: State? = null + private val gson = Gson() + + private val subscriptionIds = topicsToSubscriptionIds.values + private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",") + private val sinceVal = if (sinceTime == 0L) "all" else sinceTime.toString() + private val wsurl = topicUrlWs(baseUrl, topicsStr, sinceVal) + + init { + val builder = OkHttpClient.Builder() + .readTimeout(0, TimeUnit.MILLISECONDS) + //.pingInterval(1, TimeUnit.MINUTES) + .pingInterval(30, TimeUnit.SECONDS) + .connectTimeout(10, TimeUnit.SECONDS) + client = builder.build() + } + + private fun request(): Request { + return Request.Builder() + .url(wsurl) + .get() + .build() + } + + @Synchronized + override fun start() { + if (state == State.Connecting || state == State.Connected) { + return + } + cancel() + state = State.Connecting + val nextId = ID.incrementAndGet() + Log.d(TAG, "WebSocket($nextId): starting...") + webSocket = client.newWebSocket(request(), Listener(nextId)) + } + + @Synchronized + override fun cancel() { + if (webSocket != null) { + Log.d(TAG, "WebSocket(" + ID.get() + "): closing existing connection.") + state = State.Disconnected + webSocket!!.close(1000, "") + webSocket = null + } + } + + override fun since(): Long { + return 0L + } + + override fun matches(otherSubscriptionIds: Collection): Boolean { + return subscriptionIds.toSet() == otherSubscriptionIds.toSet() + } + + @Synchronized + fun scheduleReconnect(seconds: Long) { + if (state == State.Connecting || state == State.Connected) { + return + } + state = State.Scheduled + if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) { + Log.d(TAG, + "WebSocket: scheduling a restart in " + + seconds + + " second(s) (via alarm manager)" + ) + val future = Calendar.getInstance() + future.add(Calendar.SECOND, seconds.toInt()) + alarmManager.setExact( + AlarmManager.RTC_WAKEUP, + future.timeInMillis, + "reconnect-tag", { start() }, + null + ) + } else { + Log.d(TAG, "WebSocket: scheduling a restart in $seconds second(s)") + //reconnectHandler.removeCallbacks(reconnectCallback) + //reconnectHandler.postDelayed(reconnectCallback, TimeUnit.SECONDS.toMillis(seconds)) + } + } + + private inner class Listener(private val id: Long) : WebSocketListener() { + override fun onOpen(webSocket: WebSocket, response: Response) { + syncExec { + state = State.Connected + Log.d(TAG, "WebSocket(" + id + "): opened") + if (errorCount > 0) { + Log.d(TAG, "reconnected") + errorCount = 0 + } + stateChangeListener(subscriptionIds, ConnectionState.CONNECTED) + } + } + + override fun onMessage(webSocket: WebSocket, text: String) { + syncExec { + Log.d(TAG, "WebSocket(" + id + "): received message " + text) + val message = gson.fromJson(text, ApiService.Message::class.java) + if (message.event == ApiService.EVENT_MESSAGE) { + val topic = message.topic + val attachment = if (message.attachment?.url != null) { + Attachment( + name = message.attachment.name, + type = message.attachment.type, + size = message.attachment.size, + expires = message.attachment.expires, + url = message.attachment.url, + ) + } else null + val notification = Notification( + id = message.id, + subscriptionId = 0, // TO BE SET downstream + timestamp = message.time, + title = message.title ?: "", + message = message.message, + priority = toPriority(message.priority), + tags = joinTags(message.tags), + click = message.click ?: "", + attachment = attachment, + notificationId = Random.nextInt(), + deleted = false + ) + val subscriptionId = topicsToSubscriptionIds[topic] ?: return@syncExec + val subscription = repository.getSubscription(subscriptionId) ?: return@syncExec + val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id) + notificationListener(subscription, notificationWithSubscriptionId) + } + } + } + + override fun onClosed(webSocket: WebSocket, code: Int, reason: String) { + syncExec { + if (state == State.Connected) { + Log.w(TAG, "WebSocket(" + id + "): closed") + } + state = State.Disconnected + } + } + + override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { + val code = if (response != null) "StatusCode: " + response.code else "" + val message = response?.message ?: "" + Log.e(TAG, "WebSocket($id): failure $code Message: $message", t) + syncExec { + stateChangeListener(subscriptionIds, ConnectionState.CONNECTING) + state = State.Disconnected + if ((response != null) && (response.code >= 400) && (response.code <= 499)) { + Log.d(TAG, "bad request") + cancel() + return@syncExec + } + errorCount++ + val minutes: Int = Math.min(errorCount * 2 - 1, 20) + //scheduleReconnect(TimeUnit.MINUTES.toSeconds(minutes.toLong())) + scheduleReconnect(30) + } + } + + private fun syncExec(runnable: Runnable) { + synchronized(this) { + if (ID.get() == id) { + runnable.run() + } + } + } + } + + internal enum class State { + Scheduled, Connecting, Connected, Disconnected + } + + companion object { + private const val TAG = "NtfyWsConnection" + private val ID = AtomicLong(0) + } +} diff --git a/app/src/main/java/io/heckel/ntfy/util/Util.kt b/app/src/main/java/io/heckel/ntfy/util/Util.kt index f224abb..583e9ec 100644 --- a/app/src/main/java/io/heckel/ntfy/util/Util.kt +++ b/app/src/main/java/io/heckel/ntfy/util/Util.kt @@ -17,6 +17,7 @@ import kotlin.math.abs fun topicUrl(baseUrl: String, topic: String) = "${baseUrl}/${topic}" fun topicUrlUp(baseUrl: String, topic: String) = "${baseUrl}/${topic}?up=1" // UnifiedPush fun topicUrlJson(baseUrl: String, topic: String, since: String) = "${topicUrl(baseUrl, topic)}/json?since=$since" +fun topicUrlWs(baseUrl: String, topic: String, since: String) = "${topicUrl(baseUrl, topic)}/ws?since=$since" fun topicUrlJsonPoll(baseUrl: String, topic: String, since: String) = "${topicUrl(baseUrl, topic)}/json?poll=1&since=$since" fun topicShortUrl(baseUrl: String, topic: String) = topicUrl(baseUrl, topic)