Locking around UP registration, #230
This commit is contained in:
parent
a2551bc7f0
commit
821a1ac222
2 changed files with 65 additions and 49 deletions
|
@ -11,7 +11,10 @@ import io.heckel.ntfy.util.randomString
|
|||
import io.heckel.ntfy.util.topicUrlUp
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import java.util.*
|
||||
import kotlin.random.Random
|
||||
|
||||
|
@ -44,46 +47,51 @@ class BroadcastReceiver : android.content.BroadcastReceiver() {
|
|||
return
|
||||
}
|
||||
GlobalScope.launch(Dispatchers.IO) {
|
||||
val existingSubscription = repository.getSubscriptionByConnectorToken(connectorToken)
|
||||
if (existingSubscription != null) {
|
||||
if (existingSubscription.upAppId == appId) {
|
||||
val endpoint = topicUrlUp(existingSubscription.baseUrl, existingSubscription.topic)
|
||||
Log.d(TAG, "Subscription with connectorToken $connectorToken exists. Sending endpoint $endpoint.")
|
||||
distributor.sendEndpoint(appId, connectorToken, endpoint)
|
||||
} else {
|
||||
Log.d(TAG, "Subscription with connectorToken $connectorToken exists for a different app. Refusing registration.")
|
||||
distributor.sendRegistrationFailed(appId, connectorToken, "Connector token already exists")
|
||||
// We're doing all of this inside a critical section, because of possible races.
|
||||
// See https://github.com/binwiederhier/ntfy/issues/230 for details.
|
||||
|
||||
mutex.withLock {
|
||||
val existingSubscription = repository.getSubscriptionByConnectorToken(connectorToken)
|
||||
if (existingSubscription != null) {
|
||||
if (existingSubscription.upAppId == appId) {
|
||||
val endpoint = topicUrlUp(existingSubscription.baseUrl, existingSubscription.topic)
|
||||
Log.d(TAG, "Subscription with connectorToken $connectorToken exists. Sending endpoint $endpoint.")
|
||||
distributor.sendEndpoint(appId, connectorToken, endpoint)
|
||||
} else {
|
||||
Log.d(TAG, "Subscription with connectorToken $connectorToken exists for a different app. Refusing registration.")
|
||||
distributor.sendRegistrationFailed(appId, connectorToken, "Connector token already exists")
|
||||
}
|
||||
return@launch
|
||||
}
|
||||
return@launch
|
||||
}
|
||||
|
||||
// Add subscription
|
||||
val baseUrl = repository.getDefaultBaseUrl() ?: context.getString(R.string.app_base_url)
|
||||
val topic = UP_PREFIX + randomString(TOPIC_RANDOM_ID_LENGTH)
|
||||
val endpoint = topicUrlUp(baseUrl, topic)
|
||||
val subscription = Subscription(
|
||||
id = Random.nextLong(),
|
||||
baseUrl = baseUrl,
|
||||
topic = topic,
|
||||
instant = true, // No Firebase, always instant!
|
||||
mutedUntil = 0,
|
||||
upAppId = appId,
|
||||
upConnectorToken = connectorToken,
|
||||
totalCount = 0,
|
||||
newCount = 0,
|
||||
lastActive = Date().time/1000
|
||||
)
|
||||
Log.d(TAG, "Adding subscription with for app $appId (connectorToken $connectorToken): $subscription")
|
||||
try {
|
||||
// Note, this may fail due to a SQL constraint exception, see https://github.com/binwiederhier/ntfy/issues/185
|
||||
repository.addSubscription(subscription)
|
||||
distributor.sendEndpoint(appId, connectorToken, endpoint)
|
||||
// Add subscription
|
||||
val baseUrl = repository.getDefaultBaseUrl() ?: context.getString(R.string.app_base_url)
|
||||
val topic = UP_PREFIX + randomString(TOPIC_RANDOM_ID_LENGTH)
|
||||
val endpoint = topicUrlUp(baseUrl, topic)
|
||||
val subscription = Subscription(
|
||||
id = Random.nextLong(),
|
||||
baseUrl = baseUrl,
|
||||
topic = topic,
|
||||
instant = true, // No Firebase, always instant!
|
||||
mutedUntil = 0,
|
||||
upAppId = appId,
|
||||
upConnectorToken = connectorToken,
|
||||
totalCount = 0,
|
||||
newCount = 0,
|
||||
lastActive = Date().time/1000
|
||||
)
|
||||
Log.d(TAG, "Adding subscription with for app $appId (connectorToken $connectorToken): $subscription")
|
||||
try {
|
||||
// Note, this may fail due to a SQL constraint exception, see https://github.com/binwiederhier/ntfy/issues/185
|
||||
repository.addSubscription(subscription)
|
||||
distributor.sendEndpoint(appId, connectorToken, endpoint)
|
||||
|
||||
// Refresh (and maybe start) foreground service
|
||||
SubscriberServiceManager.refresh(app)
|
||||
} catch (e: Exception) {
|
||||
Log.w(TAG, "Failed to add subscription", e)
|
||||
distributor.sendRegistrationFailed(appId, connectorToken, e.message)
|
||||
// Refresh (and maybe start) foreground service
|
||||
SubscriberServiceManager.refresh(app)
|
||||
} catch (e: Exception) {
|
||||
Log.w(TAG, "Failed to add subscription", e)
|
||||
distributor.sendRegistrationFailed(appId, connectorToken, e.message)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -95,19 +103,24 @@ class BroadcastReceiver : android.content.BroadcastReceiver() {
|
|||
val distributor = Distributor(app)
|
||||
Log.d(TAG, "UNREGISTER received (connectorToken=$connectorToken)")
|
||||
GlobalScope.launch(Dispatchers.IO) {
|
||||
val existingSubscription = repository.getSubscriptionByConnectorToken(connectorToken)
|
||||
if (existingSubscription == null) {
|
||||
Log.d(TAG, "Subscription with connectorToken $connectorToken does not exist. Ignoring.")
|
||||
return@launch
|
||||
// We're doing all of this inside a critical section, because of possible races.
|
||||
// See https://github.com/binwiederhier/ntfy/issues/230 for details.
|
||||
|
||||
mutex.withLock {
|
||||
val existingSubscription = repository.getSubscriptionByConnectorToken(connectorToken)
|
||||
if (existingSubscription == null) {
|
||||
Log.d(TAG, "Subscription with connectorToken $connectorToken does not exist. Ignoring.")
|
||||
return@launch
|
||||
}
|
||||
|
||||
// Remove subscription
|
||||
Log.d(TAG, "Removing subscription ${existingSubscription.id} with connectorToken $connectorToken")
|
||||
repository.removeSubscription(existingSubscription.id)
|
||||
existingSubscription.upAppId?.let { appId -> distributor.sendUnregistered(appId, connectorToken) }
|
||||
|
||||
// Refresh (and maybe stop) foreground service
|
||||
SubscriberServiceManager.refresh(context)
|
||||
}
|
||||
|
||||
// Remove subscription
|
||||
Log.d(TAG, "Removing subscription ${existingSubscription.id} with connectorToken $connectorToken")
|
||||
repository.removeSubscription(existingSubscription.id)
|
||||
existingSubscription.upAppId?.let { appId -> distributor.sendUnregistered(appId, connectorToken) }
|
||||
|
||||
// Refresh (and maybe stop) foreground service
|
||||
SubscriberServiceManager.refresh(context)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -115,5 +128,7 @@ class BroadcastReceiver : android.content.BroadcastReceiver() {
|
|||
private const val TAG = "NtfyUpBroadcastRecv"
|
||||
private const val UP_PREFIX = "up"
|
||||
private const val TOPIC_RANDOM_ID_LENGTH = 12
|
||||
|
||||
val mutex = Mutex() // https://github.com/binwiederhier/ntfy/issues/230
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ Bugs:
|
|||
* Make messages with links selectable (#226, thanks to @StoyanDimitrov for reporting)
|
||||
* Restoring topics or settings from backup doesn't work (#223, thanks to @shadow00 for reporting)
|
||||
* Fix app icon on old Android versions (#120, thanks to @shadow00 for reporting)
|
||||
* Fix races in UnifiedPush registration (#230, thanks to @Jakob for reporting)
|
||||
|
||||
**Thanks for testing:**
|
||||
|
||||
|
|
Loading…
Reference in a new issue