Fix websocket global ID bug

This commit is contained in:
Philipp Heckel 2022-02-02 12:11:04 -05:00
parent 82177253a7
commit 74b260f9a0
4 changed files with 51 additions and 49 deletions

View file

@ -1,20 +1,15 @@
<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
package="io.heckel.ntfy">
<!--
Permissions
- INTERNET is needed because we need to talk to the ntfy server(s)
- FOREGROUND_SERVICE is needed to support "use another server" feature
- WAKE_LOCK & RECEIVE_BOOT_COMPLETED are required to restart the foreground service
if it is stopped; see https://robertohuertas.com/2019/06/29/android_foreground_services/
-->
<!-- Permissions -->
<uses-permission android:name="android.permission.INTERNET"/>
<uses-permission android:name="android.permission.FOREGROUND_SERVICE"/>
<uses-permission android:name="android.permission.WAKE_LOCK"/>
<uses-permission android:name="android.permission.RECEIVE_BOOT_COMPLETED"/>
<uses-permission android:name="android.permission.VIBRATE"/>
<uses-permission android:name="android.permission.FOREGROUND_SERVICE"/> <!-- For instant delivery foregrounds service -->
<uses-permission android:name="android.permission.WAKE_LOCK"/> <!-- To keep foreground service awake; soon not needed anymore -->
<uses-permission android:name="android.permission.RECEIVE_BOOT_COMPLETED"/> <!-- To restart service on reboot -->
<uses-permission android:name="android.permission.VIBRATE"/> <!-- Incoming notifications should be able to vibrate the phone -->
<uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" android:maxSdkVersion="28"/> <!-- Only required on SDK <= 28 -->
<uses-permission android:name="android.permission.REQUEST_INSTALL_PACKAGES"/> <!-- Required to install packages downloaded through ntfy; craazyy! -->
<uses-permission android:name="android.permission.REQUEST_INSTALL_PACKAGES"/> <!-- To install packages downloaded through ntfy; craazyy! -->
<uses-permission android:name="android.permission.SCHEDULE_EXACT_ALARM"/> <!-- To reschedule the websocket retry -->
<application
android:name=".app.Application"

View file

@ -32,7 +32,7 @@ class ApiService {
val url = topicUrl(baseUrl, topic)
Log.d(TAG, "Publishing to $url")
val builder = builder(url, user)
val builder = requestBuilder(url, user)
.put(message.toRequestBody())
if (priority in 1..5) {
builder.addHeader("X-Priority", priority.toString())
@ -59,7 +59,7 @@ class ApiService {
val url = topicUrlJsonPoll(baseUrl, topic, sinceVal)
Log.d(TAG, "Polling topic $url")
val request = builder(url, user).build()
val request = requestBuilder(url, user).build()
client.newCall(request).execute().use { response ->
if (!response.isSuccessful) {
throw Exception("Unexpected response ${response.code} when polling topic $url")
@ -86,7 +86,7 @@ class ApiService {
val sinceVal = if (since == 0L) "all" else since.toString()
val url = topicUrlJson(baseUrl, topics, sinceVal)
Log.d(TAG, "Opening subscription connection to $url")
val request = builder(url, user).build()
val request = requestBuilder(url, user).build()
val call = subscriberClient.newCall(request)
call.enqueue(object : Callback {
override fun onResponse(call: Call, response: Response) {
@ -122,7 +122,7 @@ class ApiService {
Log.d(TAG, "Checking read access for user ${user.username} against ${topicUrl(baseUrl, topic)}")
}
val url = topicUrlAuth(baseUrl, topic)
val request = builder(url, user).build()
val request = requestBuilder(url, user).build()
client.newCall(request).execute().use { response ->
return if (user == null) {
response.isSuccessful || response.code == 404 // Treat 404 as success (old server; to be removed in future versions)
@ -132,16 +132,6 @@ class ApiService {
}
}
private fun builder(url: String, user: User?): Request.Builder {
val builder = Request.Builder()
.url(url)
.addHeader("User-Agent", USER_AGENT)
if (user != null) {
builder.addHeader("Authorization", Credentials.basic(user.username, user.password, UTF_8))
}
return builder
}
companion object {
val USER_AGENT = "ntfy/${BuildConfig.VERSION_NAME} (${BuildConfig.FLAVOR}; Android ${Build.VERSION.RELEASE}; SDK ${Build.VERSION.SDK_INT})"
private const val TAG = "NtfyApiService"
@ -151,5 +141,15 @@ class ApiService {
const val EVENT_MESSAGE = "message"
const val EVENT_KEEPALIVE = "keepalive"
const val EVENT_POLL_REQUEST = "poll_request"
fun requestBuilder(url: String, user: User?): Request.Builder {
val builder = Request.Builder()
.url(url)
.addHeader("User-Agent", USER_AGENT)
if (user != null) {
builder.addHeader("Authorization", Credentials.basic(user.username, user.password, UTF_8))
}
return builder
}
}
}

View file

@ -7,6 +7,7 @@ import android.os.Looper
import io.heckel.ntfy.db.*
import io.heckel.ntfy.log.Log
import io.heckel.ntfy.msg.ApiService
import io.heckel.ntfy.msg.ApiService.Companion.requestBuilder
import io.heckel.ntfy.msg.NotificationParser
import io.heckel.ntfy.util.topicUrl
import io.heckel.ntfy.util.topicUrlWs
@ -44,6 +45,7 @@ class WsConnection(
.build()
private var errorCount = 0
private var webSocket: WebSocket? = null
private val webSocketId = AtomicLong(0)
private var state: State? = null
private var closed = false
@ -57,24 +59,18 @@ class WsConnection(
@Synchronized
override fun start() {
if (closed || state == State.Connecting || state == State.Connected) {
Log.d(TAG,"[$url] WebSocket: Not (re-)starting, because connection is marked closed/connecting/connected")
return
}
if (webSocket != null) {
webSocket!!.close(WS_CLOSE_NORMAL, "")
}
state = State.Connecting
val nextId = ID.incrementAndGet()
val nextId = webSocketId.incrementAndGet()
val sinceVal = if (since == 0L) "all" else since.toString()
val urlWithSince = topicUrlWs(baseUrl, topicsStr, sinceVal)
val builder = Request.Builder()
.get()
.url(urlWithSince)
.addHeader("User-Agent", ApiService.USER_AGENT)
if (user != null) {
builder.addHeader("Authorization", Credentials.basic(user.username, user.password, StandardCharsets.UTF_8))
}
val request = builder.build()
Log.d(TAG, "[$url] WebSocket($nextId): opening $urlWithSince ...")
val request = requestBuilder(urlWithSince, user).build()
Log.d(TAG, "[$url] WebSocket: Opening $urlWithSince with listener ID $nextId ...")
webSocket = client.newWebSocket(request, Listener(nextId))
}
@ -82,9 +78,10 @@ class WsConnection(
override fun close() {
closed = true
if (webSocket == null) {
Log.d(TAG,"[$url] WebSocket: Not closing existing connection, because there is no active web socket")
return
}
Log.d(TAG, "[$url] WebSocket(${ID.get()}): closing existing connection")
Log.d(TAG, "[$url] WebSocket: Closing existing connection")
state = State.Disconnected
webSocket!!.close(WS_CLOSE_NORMAL, "")
webSocket = null
@ -98,6 +95,7 @@ class WsConnection(
@Synchronized
fun scheduleReconnect(seconds: Int) {
if (closed || state == State.Connecting || state == State.Connected) {
Log.d(TAG,"[$url] WebSocket: Not rescheduling connection, because connection is marked closed/connecting/connected")
return
}
state = State.Scheduled
@ -115,8 +113,8 @@ class WsConnection(
private inner class Listener(private val id: Long) : WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
syncExec {
Log.d(TAG, "[$url] WebSocket($id): opened")
syncExec("onOpen") {
Log.d(TAG, "[$url] WebSocket ($id): Opened connection")
state = State.Connected
if (errorCount > 0) {
errorCount = 0
@ -126,10 +124,11 @@ class WsConnection(
}
override fun onMessage(webSocket: WebSocket, text: String) {
syncExec {
Log.d(TAG, "[$url] WebSocket($id): received message: $text")
syncExec("onMessage") {
Log.d(TAG, "[$url] WebSocket ($id): Received message: $text")
val notificationWithTopic = parser.parseWithTopic(text, subscriptionId = 0, notificationId = Random.nextInt())
if (notificationWithTopic == null) {
Log.d(TAG, "[$url] WebSocket ($id): Unable to parse message. Discarding.")
return@syncExec
}
val topic = notificationWithTopic.topic
@ -143,17 +142,21 @@ class WsConnection(
}
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
syncExec {
Log.w(TAG, "[$url] WebSocket($id): closed")
syncExec("onClosed") {
Log.w(TAG, "[$url] WebSocket ($id): Closed connection")
state = State.Disconnected
}
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
Log.e(TAG, "[$url] WebSocket($id): failure ${response?.code}: ${response?.message}", t)
syncExec {
syncExec("onFailure") {
if (response == null) {
Log.e(TAG, "[$url] WebSocket ($id): Connection failed (response is null): ${t.message}", t)
} else {
Log.e(TAG, "[$url] WebSocket ($id): Connection failed (response code ${response.code}, message: ${response.message}): ${t.message}", t)
}
if (closed) {
Log.d(TAG, "WebSocket($id): Connection marked as closed. Not retrying.")
Log.d(TAG, "WebSocket ($id): Connection marked as closed. Not retrying.")
return@syncExec
}
stateChangeListener(subscriptionIds, ConnectionState.CONNECTING)
@ -164,10 +167,14 @@ class WsConnection(
}
}
private fun syncExec(fn: () -> Unit) {
private fun syncExec(tag: String, fn: () -> Unit) {
synchronized(this) {
if (ID.get() == id) {
if (webSocketId.get() == id) {
Log.d(TAG, "[$url] WebSocket ($id): Begin $tag")
fn()
Log.d(TAG, "[$url] WebSocket ($id): End $tag")
} else {
Log.d(TAG, "[$url] WebSocket ($id): Skipping synchronized block '$tag', because ID does not match ${webSocketId.get()}")
}
}
}
@ -182,6 +189,5 @@ class WsConnection(
private const val RECONNECT_TAG = "WsReconnect"
private const val WS_CLOSE_NORMAL = 1000
private val RETRY_SECONDS = listOf(5, 10, 15, 20, 30, 45, 60, 120)
private val ID = AtomicLong(0)
}
}

View file

@ -66,6 +66,7 @@ class SettingsActivity : AppCompatActivity(), PreferenceFragmentCompat.OnPrefere
.beginTransaction()
.replace(R.id.settings_layout, settingsFragment)
.commit()
title = getString(R.string.settings_title)
} else {
title = savedInstanceState.getCharSequence(TITLE_TAG)
}