Merge pull request #55 from wunter8/652-rate-topics-header
Add `Rate-Topics` header to WebSocket connection and JSON stream
This commit is contained in:
commit
3c86c7a33f
5 changed files with 16 additions and 7 deletions
|
@ -108,6 +108,7 @@ class ApiService {
|
||||||
fun subscribe(
|
fun subscribe(
|
||||||
baseUrl: String,
|
baseUrl: String,
|
||||||
topics: String,
|
topics: String,
|
||||||
|
unifiedPushTopics: String,
|
||||||
since: String?,
|
since: String?,
|
||||||
user: User?,
|
user: User?,
|
||||||
notify: (topic: String, Notification) -> Unit,
|
notify: (topic: String, Notification) -> Unit,
|
||||||
|
@ -116,7 +117,7 @@ class ApiService {
|
||||||
val sinceVal = since ?: "all"
|
val sinceVal = since ?: "all"
|
||||||
val url = topicUrlJson(baseUrl, topics, sinceVal)
|
val url = topicUrlJson(baseUrl, topics, sinceVal)
|
||||||
Log.d(TAG, "Opening subscription connection to $url")
|
Log.d(TAG, "Opening subscription connection to $url")
|
||||||
val request = requestBuilder(url, user).build()
|
val request = requestBuilder(url, user, unifiedPushTopics).build()
|
||||||
val call = subscriberClient.newCall(request)
|
val call = subscriberClient.newCall(request)
|
||||||
call.enqueue(object : Callback {
|
call.enqueue(object : Callback {
|
||||||
override fun onResponse(call: Call, response: Response) {
|
override fun onResponse(call: Call, response: Response) {
|
||||||
|
@ -178,13 +179,16 @@ class ApiService {
|
||||||
const val EVENT_KEEPALIVE = "keepalive"
|
const val EVENT_KEEPALIVE = "keepalive"
|
||||||
const val EVENT_POLL_REQUEST = "poll_request"
|
const val EVENT_POLL_REQUEST = "poll_request"
|
||||||
|
|
||||||
fun requestBuilder(url: String, user: User?): Request.Builder {
|
fun requestBuilder(url: String, user: User?, unifiedPushTopics: String? = null): Request.Builder {
|
||||||
val builder = Request.Builder()
|
val builder = Request.Builder()
|
||||||
.url(url)
|
.url(url)
|
||||||
.addHeader("User-Agent", USER_AGENT)
|
.addHeader("User-Agent", USER_AGENT)
|
||||||
if (user != null) {
|
if (user != null) {
|
||||||
builder.addHeader("Authorization", Credentials.basic(user.username, user.password, UTF_8))
|
builder.addHeader("Authorization", Credentials.basic(user.username, user.password, UTF_8))
|
||||||
}
|
}
|
||||||
|
if (unifiedPushTopics != null) {
|
||||||
|
builder.addHeader("Rate-Topics", unifiedPushTopics)
|
||||||
|
}
|
||||||
return builder
|
return builder
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,5 +8,6 @@ interface Connection {
|
||||||
|
|
||||||
data class ConnectionId(
|
data class ConnectionId(
|
||||||
val baseUrl: String,
|
val baseUrl: String,
|
||||||
val topicsToSubscriptionIds: Map<String, Long>
|
val topicsToSubscriptionIds: Map<String, Long>,
|
||||||
|
val topicIsUnifiedPush: Map<String, Boolean>
|
||||||
)
|
)
|
||||||
|
|
|
@ -21,8 +21,10 @@ class JsonConnection(
|
||||||
) : Connection {
|
) : Connection {
|
||||||
private val baseUrl = connectionId.baseUrl
|
private val baseUrl = connectionId.baseUrl
|
||||||
private val topicsToSubscriptionIds = connectionId.topicsToSubscriptionIds
|
private val topicsToSubscriptionIds = connectionId.topicsToSubscriptionIds
|
||||||
|
private val topicIsUnifiedPush = connectionId.topicIsUnifiedPush
|
||||||
private val subscriptionIds = topicsToSubscriptionIds.values
|
private val subscriptionIds = topicsToSubscriptionIds.values
|
||||||
private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",")
|
private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",")
|
||||||
|
private val unifiedPushTopicsStr = topicIsUnifiedPush.filter { entry -> entry.value }.keys.joinToString(separator = ",")
|
||||||
private val url = topicUrl(baseUrl, topicsStr)
|
private val url = topicUrl(baseUrl, topicsStr)
|
||||||
|
|
||||||
private var since: String? = sinceId
|
private var since: String? = sinceId
|
||||||
|
@ -56,7 +58,7 @@ class JsonConnection(
|
||||||
// Call /json subscribe endpoint and loop until the call fails, is canceled,
|
// Call /json subscribe endpoint and loop until the call fails, is canceled,
|
||||||
// or the job or service are cancelled/stopped
|
// or the job or service are cancelled/stopped
|
||||||
try {
|
try {
|
||||||
call = api.subscribe(baseUrl, topicsStr, since, user, notify, fail)
|
call = api.subscribe(baseUrl, topicsStr, unifiedPushTopicsStr, since, user, notify, fail)
|
||||||
while (!failed.get() && !call.isCanceled() && isActive && serviceActive()) {
|
while (!failed.get() && !call.isCanceled() && isActive && serviceActive()) {
|
||||||
stateChangeListener(subscriptionIds, ConnectionState.CONNECTED)
|
stateChangeListener(subscriptionIds, ConnectionState.CONNECTED)
|
||||||
Log.d(TAG,"[$url] Connection is active (failed=$failed, callCanceled=${call.isCanceled()}, jobActive=$isActive, serviceStarted=${serviceActive()}")
|
Log.d(TAG,"[$url] Connection is active (failed=$failed, callCanceled=${call.isCanceled()}, jobActive=$isActive, serviceStarted=${serviceActive()}")
|
||||||
|
|
|
@ -172,8 +172,8 @@ class SubscriberService : Service() {
|
||||||
.filter { s -> s.instant }
|
.filter { s -> s.instant }
|
||||||
val activeConnectionIds = connections.keys().toList().toSet()
|
val activeConnectionIds = connections.keys().toList().toSet()
|
||||||
val desiredConnectionIds = instantSubscriptions // Set<ConnectionId>
|
val desiredConnectionIds = instantSubscriptions // Set<ConnectionId>
|
||||||
.groupBy { s -> ConnectionId(s.baseUrl, emptyMap()) }
|
.groupBy { s -> ConnectionId(s.baseUrl, emptyMap(), emptyMap()) }
|
||||||
.map { entry -> entry.key.copy(topicsToSubscriptionIds = entry.value.associate { s -> s.topic to s.id }) }
|
.map { entry -> entry.key.copy(topicsToSubscriptionIds = entry.value.associate { s -> s.topic to s.id }, topicIsUnifiedPush = entry.value.associate { s -> s.topic to (s.upConnectorToken != null) }) }
|
||||||
.toSet()
|
.toSet()
|
||||||
val newConnectionIds = desiredConnectionIds.subtract(activeConnectionIds)
|
val newConnectionIds = desiredConnectionIds.subtract(activeConnectionIds)
|
||||||
val obsoleteConnectionIds = activeConnectionIds.subtract(desiredConnectionIds)
|
val obsoleteConnectionIds = activeConnectionIds.subtract(desiredConnectionIds)
|
||||||
|
|
|
@ -56,8 +56,10 @@ class WsConnection(
|
||||||
private val since = AtomicReference<String?>(sinceId)
|
private val since = AtomicReference<String?>(sinceId)
|
||||||
private val baseUrl = connectionId.baseUrl
|
private val baseUrl = connectionId.baseUrl
|
||||||
private val topicsToSubscriptionIds = connectionId.topicsToSubscriptionIds
|
private val topicsToSubscriptionIds = connectionId.topicsToSubscriptionIds
|
||||||
|
private val topicIsUnifiedPush = connectionId.topicIsUnifiedPush
|
||||||
private val subscriptionIds = topicsToSubscriptionIds.values
|
private val subscriptionIds = topicsToSubscriptionIds.values
|
||||||
private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",")
|
private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",")
|
||||||
|
private val unifiedPushTopicsStr = topicIsUnifiedPush.filter { entry -> entry.value }.keys.joinToString(separator = ",")
|
||||||
private val shortUrl = topicShortUrl(baseUrl, topicsStr)
|
private val shortUrl = topicShortUrl(baseUrl, topicsStr)
|
||||||
|
|
||||||
init {
|
init {
|
||||||
|
@ -78,7 +80,7 @@ class WsConnection(
|
||||||
val sinceId = since.get()
|
val sinceId = since.get()
|
||||||
val sinceVal = sinceId ?: "all"
|
val sinceVal = sinceId ?: "all"
|
||||||
val urlWithSince = topicUrlWs(baseUrl, topicsStr, sinceVal)
|
val urlWithSince = topicUrlWs(baseUrl, topicsStr, sinceVal)
|
||||||
val request = requestBuilder(urlWithSince, user).build()
|
val request = requestBuilder(urlWithSince, user, unifiedPushTopicsStr).build()
|
||||||
Log.d(TAG, "$shortUrl (gid=$globalId): Opening $urlWithSince with listener ID $nextListenerId ...")
|
Log.d(TAG, "$shortUrl (gid=$globalId): Opening $urlWithSince with listener ID $nextListenerId ...")
|
||||||
webSocket = client.newWebSocket(request, Listener(nextListenerId))
|
webSocket = client.newWebSocket(request, Listener(nextListenerId))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue