WIP: Websockets
This commit is contained in:
5 changed files with 233 additions and 11 deletions
@ -173,7 +173,7 @@ class ApiService {
/* This annotation ensures that proguard still works in production builds,
* see https://stackoverflow.com/a/62753300/1440785 */
private data class Message(
data class Message(
val id: String,
val time: Long,
val event: String,
@ -187,7 +187,7 @@ class ApiService {
private data class MessageAttachment(
data class MessageAttachment(
val name: String,
val type: String?,
val size: Long?,
@ -11,7 +11,8 @@ import kotlinx.coroutines.*
import okhttp3.Call
import java.util.concurrent.atomic.AtomicBoolean
class SubscriberConnection(
class JsonConnection(
private val scope: CoroutineScope,
private val repository: Repository,
private val api: ApiService,
private val baseUrl: String,
@ -20,7 +21,7 @@ class SubscriberConnection(
private val stateChangeListener: (Collection<Long>, ConnectionState) -> Unit,
private val notificationListener: (Subscription, Notification) -> Unit,
private val serviceActive: () -> Boolean
) {
) : Connection {
private val subscriptionIds = topicsToSubscriptionIds.values
private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",")
private val url = topicUrl(baseUrl, topicsStr)
@ -29,7 +30,7 @@ class SubscriberConnection(
private lateinit var call: Call
private lateinit var job: Job
fun start(scope: CoroutineScope) {
override fun start() {
job = scope.launch(Dispatchers.IO) {
Log.d(TAG, "[$url] Starting connection for subscriptions: $topicsToSubscriptionIds")
@ -81,17 +82,17 @@ class SubscriberConnection(
fun since(): Long {
override fun since(): Long {
return since
fun cancel() {
override fun cancel() {
Log.d(TAG, "[$url] Cancelling connection")
if (this::job.isInitialized) job?.cancel()
if (this::call.isInitialized) call?.cancel()
fun matches(otherSubscriptionIds: Collection<Long>): Boolean {
override fun matches(otherSubscriptionIds: Collection<Long>): Boolean {
return subscriptionIds.toSet() == otherSubscriptionIds.toSet()
@ -54,12 +54,20 @@ import java.util.concurrent.ConcurrentHashMap
* - https://github.com/robertohuertasm/endless-service/blob/master/app/src/main/java/com/robertohuertas/endless/EndlessService.kt
* - https://gist.github.com/varunon9/f2beec0a743c96708eb0ef971a9ff9cd
interface Connection {
fun start()
fun cancel()
fun since(): Long
fun matches(otherSubscriptionIds: Collection<Long>): Boolean
class SubscriberService : Service() {
private var wakeLock: PowerManager.WakeLock? = null
private var isServiceStarted = false
private val repository by lazy { (application as Application).repository }
private val dispatcher by lazy { NotificationDispatcher(this, repository) }
private val connections = ConcurrentHashMap<String, SubscriberConnection>() // Base URL -> Connection
private val connections = ConcurrentHashMap<String, Connection>() // Base URL -> Connection
private val api = ApiService()
private var notificationManager: NotificationManager? = null
private var serviceNotification: Notification? = null
@ -174,9 +182,14 @@ class SubscriberService : Service() {
if (!connections.containsKey(baseUrl)) {
val serviceActive = { -> isServiceStarted }
val connection = SubscriberConnection(repository, api, baseUrl, since, subscriptions, ::onStateChanged, ::onNotificationReceived, serviceActive)
val connection = if (true) {
val alarmManager = getSystemService(ALARM_SERVICE) as AlarmManager
WsConnection(repository, baseUrl, since, subscriptions, ::onStateChanged, ::onNotificationReceived, alarmManager)
} else {
JsonConnection(this, repository, api, baseUrl, since, subscriptions, ::onStateChanged, ::onNotificationReceived, serviceActive)
connections[baseUrl] = connection
Normal file
Normal file
@ -0,0 +1,207 @@
package io.heckel.ntfy.service
import android.app.AlarmManager
import android.os.Build
import android.util.Log
import com.google.gson.Gson
import io.heckel.ntfy.data.*
import io.heckel.ntfy.msg.ApiService
import io.heckel.ntfy.util.joinTags
import io.heckel.ntfy.util.toPriority
import io.heckel.ntfy.util.topicUrlWs
import okhttp3.*
import java.util.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import kotlin.random.Random
class WsConnection(
private val repository: Repository,
private val baseUrl: String,
private val sinceTime: Long,
private val topicsToSubscriptionIds: Map<String, Long>, // Topic -> Subscription ID
private val stateChangeListener: (Collection<Long>, ConnectionState) -> Unit,
private val notificationListener: (Subscription, Notification) -> Unit,
private val alarmManager: AlarmManager
) : Connection {
private val client: OkHttpClient
//private val reconnectHandler = Handler()
//private val reconnectCallback = Runnable { start() }
private var errorCount = 0
private var webSocket: WebSocket? = null
private var state: State? = null
private val gson = Gson()
private val subscriptionIds = topicsToSubscriptionIds.values
private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",")
private val sinceVal = if (sinceTime == 0L) "all" else sinceTime.toString()
private val wsurl = topicUrlWs(baseUrl, topicsStr, sinceVal)
init {
val builder = OkHttpClient.Builder()
.readTimeout(0, TimeUnit.MILLISECONDS)
//.pingInterval(1, TimeUnit.MINUTES)
.pingInterval(30, TimeUnit.SECONDS)
.connectTimeout(10, TimeUnit.SECONDS)
client = builder.build()
private fun request(): Request {
return Request.Builder()
override fun start() {
if (state == State.Connecting || state == State.Connected) {
state = State.Connecting
val nextId = ID.incrementAndGet()
Log.d(TAG, "WebSocket($nextId): starting...")
webSocket = client.newWebSocket(request(), Listener(nextId))
override fun cancel() {
if (webSocket != null) {
Log.d(TAG, "WebSocket(" + ID.get() + "): closing existing connection.")
state = State.Disconnected
webSocket!!.close(1000, "")
webSocket = null
override fun since(): Long {
return 0L
override fun matches(otherSubscriptionIds: Collection<Long>): Boolean {
return subscriptionIds.toSet() == otherSubscriptionIds.toSet()
fun scheduleReconnect(seconds: Long) {
if (state == State.Connecting || state == State.Connected) {
state = State.Scheduled
"WebSocket: scheduling a restart in "
+ seconds
+ " second(s) (via alarm manager)"
val future = Calendar.getInstance()
future.add(Calendar.SECOND, seconds.toInt())
"reconnect-tag", { start() },
} else {
Log.d(TAG, "WebSocket: scheduling a restart in $seconds second(s)")
//reconnectHandler.postDelayed(reconnectCallback, TimeUnit.SECONDS.toMillis(seconds))
private inner class Listener(private val id: Long) : WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
syncExec {
state = State.Connected
Log.d(TAG, "WebSocket(" + id + "): opened")
if (errorCount > 0) {
Log.d(TAG, "reconnected")
errorCount = 0
stateChangeListener(subscriptionIds, ConnectionState.CONNECTED)
override fun onMessage(webSocket: WebSocket, text: String) {
syncExec {
Log.d(TAG, "WebSocket(" + id + "): received message " + text)
val message = gson.fromJson(text, ApiService.Message::class.java)
if (message.event == ApiService.EVENT_MESSAGE) {
val topic = message.topic
val attachment = if (message.attachment?.url != null) {
name = message.attachment.name,
type = message.attachment.type,
size = message.attachment.size,
expires = message.attachment.expires,
url = message.attachment.url,
} else null
val notification = Notification(
id = message.id,
subscriptionId = 0, // TO BE SET downstream
timestamp = message.time,
title = message.title ?: "",
message = message.message,
priority = toPriority(message.priority),
tags = joinTags(message.tags),
click = message.click ?: "",
attachment = attachment,
notificationId = Random.nextInt(),
deleted = false
val subscriptionId = topicsToSubscriptionIds[topic] ?: return@syncExec
val subscription = repository.getSubscription(subscriptionId) ?: return@syncExec
val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id)
notificationListener(subscription, notificationWithSubscriptionId)
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
syncExec {
if (state == State.Connected) {
Log.w(TAG, "WebSocket(" + id + "): closed")
state = State.Disconnected
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
val code = if (response != null) "StatusCode: " + response.code else ""
val message = response?.message ?: ""
Log.e(TAG, "WebSocket($id): failure $code Message: $message", t)
syncExec {
stateChangeListener(subscriptionIds, ConnectionState.CONNECTING)
state = State.Disconnected
if ((response != null) && (response.code >= 400) && (response.code <= 499)) {
Log.d(TAG, "bad request")
val minutes: Int = Math.min(errorCount * 2 - 1, 20)
private fun syncExec(runnable: Runnable) {
synchronized(this) {
if (ID.get() == id) {
internal enum class State {
Scheduled, Connecting, Connected, Disconnected
companion object {
private const val TAG = "NtfyWsConnection"
private val ID = AtomicLong(0)
@ -17,6 +17,7 @@ import kotlin.math.abs
fun topicUrl(baseUrl: String, topic: String) = "${baseUrl}/${topic}"
fun topicUrlUp(baseUrl: String, topic: String) = "${baseUrl}/${topic}?up=1" // UnifiedPush
fun topicUrlJson(baseUrl: String, topic: String, since: String) = "${topicUrl(baseUrl, topic)}/json?since=$since"
fun topicUrlWs(baseUrl: String, topic: String, since: String) = "${topicUrl(baseUrl, topic)}/ws?since=$since"
fun topicUrlJsonPoll(baseUrl: String, topic: String, since: String) = "${topicUrl(baseUrl, topic)}/json?poll=1&since=$since"
fun topicShortUrl(baseUrl: String, topic: String) =
topicUrl(baseUrl, topic)
Add table
Reference in a new issue