switch to GuzzleHttp for web push transport, parallelize notification sending
This commit is contained in:
parent
248b4277bc
commit
e2805beb16
3 changed files with 27 additions and 17 deletions
lib
|
@ -37,6 +37,7 @@ use OCA\DAV\Events\CardDeletedEvent;
|
||||||
use OCA\DAV\Events\CardUpdatedEvent;
|
use OCA\DAV\Events\CardUpdatedEvent;
|
||||||
|
|
||||||
use Psr\Log\LoggerInterface;
|
use Psr\Log\LoggerInterface;
|
||||||
|
use GuzzleHttp\Promise;
|
||||||
|
|
||||||
use OCA\DavPush\Service\SubscriptionService;
|
use OCA\DavPush\Service\SubscriptionService;
|
||||||
use OCA\DavPush\Transport\TransportManager;
|
use OCA\DavPush\Transport\TransportManager;
|
||||||
|
@ -59,13 +60,16 @@ class CalendarListener implements IEventListener {
|
||||||
$collectionName = $event->getCalendarData()['uri'];
|
$collectionName = $event->getCalendarData()['uri'];
|
||||||
$subscriptions = $this->subscriptionService->findAll($collectionName);
|
$subscriptions = $this->subscriptionService->findAll($collectionName);
|
||||||
|
|
||||||
foreach($subscriptions as $subscription) {
|
$notificationPromises = (function () use ($collectionName, $subscriptions): Generator {
|
||||||
$transport = $this->transportManager->getTransport($subscription->getTransport());
|
foreach($subscriptions as $subscription) {
|
||||||
try {
|
$transport = $this->transportManager->getTransport($subscription->getTransport());
|
||||||
$transport->notify($subscription->getUserId(), $collectionName, $subscription->getId());
|
yield $transport->notify($subscription->getUserId(), $collectionName, $subscription->getId());
|
||||||
} catch (\Exception $e) {
|
|
||||||
$this->logger->error("transport " . $subscription->getTransport() . " failed to deliver notification to subscription " . $subscription->getId());
|
|
||||||
}
|
}
|
||||||
}
|
})();
|
||||||
|
|
||||||
|
$responses = Promise\Utils::settle($notificationPromises)->wait();
|
||||||
|
|
||||||
|
// TODO: iterate over responses and log errors
|
||||||
|
// $this->logger->error("transport " . $subscription->getTransport() . " failed to deliver notification to subscription " . $subscription->getId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,9 @@ use OCA\DavPush\Transport\Transport;
|
||||||
use OCA\DavPush\Service\WebPushSubscriptionService;
|
use OCA\DavPush\Service\WebPushSubscriptionService;
|
||||||
use OCA\DavPush\Errors\WebPushSubscriptionNotFound;
|
use OCA\DavPush\Errors\WebPushSubscriptionNotFound;
|
||||||
|
|
||||||
|
use OCP\Http\Client\IClientService;
|
||||||
|
use OCP\Http\Client\IPromise;
|
||||||
|
|
||||||
use Sabre\Xml\Service;
|
use Sabre\Xml\Service;
|
||||||
|
|
||||||
class WebPushTransport extends Transport {
|
class WebPushTransport extends Transport {
|
||||||
|
@ -37,6 +40,7 @@ class WebPushTransport extends Transport {
|
||||||
|
|
||||||
public function __construct(
|
public function __construct(
|
||||||
private WebPushSubscriptionService $webPushSubscriptionService,
|
private WebPushSubscriptionService $webPushSubscriptionService,
|
||||||
|
private IClientService $httpClientService
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
private function parseOptions(array $options): array {
|
private function parseOptions(array $options): array {
|
||||||
|
@ -79,7 +83,7 @@ class WebPushTransport extends Transport {
|
||||||
];
|
];
|
||||||
}
|
}
|
||||||
|
|
||||||
public function notify(string $userId, string $collectionName, int $subscriptionId) {
|
public function notify(string $userId, string $collectionName, int $subscriptionId): IPromise {
|
||||||
$xmlService = new Service();
|
$xmlService = new Service();
|
||||||
|
|
||||||
$pushResource = $this->webPushSubscriptionService->findBySubscriptionId($subscriptionId)->getPushResource();
|
$pushResource = $this->webPushSubscriptionService->findBySubscriptionId($subscriptionId)->getPushResource();
|
||||||
|
@ -88,15 +92,15 @@ class WebPushTransport extends Transport {
|
||||||
'{DAV:Push}topic' => $collectionName,
|
'{DAV:Push}topic' => $collectionName,
|
||||||
]);
|
]);
|
||||||
|
|
||||||
$options = [
|
$httpClient = $this->httpClientService->newClient();
|
||||||
'http' => [
|
|
||||||
'method' => 'POST',
|
|
||||||
'content' => $content,
|
|
||||||
],
|
|
||||||
];
|
|
||||||
|
|
||||||
$context = stream_context_create($options);
|
return $httpClient->postAsync($pushResource, [
|
||||||
$result = file_get_contents($pushResource, false, $context);
|
"body" => $content,
|
||||||
|
"timeout" => 10,
|
||||||
|
"headers" => [
|
||||||
|
"Content-Type" => "application/xml",
|
||||||
|
],
|
||||||
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function getSubscriptionIdFromOptions(string $userId, string $collectionName, $options): ?int {
|
public function getSubscriptionIdFromOptions(string $userId, string $collectionName, $options): ?int {
|
||||||
|
|
|
@ -26,6 +26,8 @@ declare(strict_types=1);
|
||||||
|
|
||||||
namespace OCA\DavPush\Transport;
|
namespace OCA\DavPush\Transport;
|
||||||
|
|
||||||
|
use OCP\Http\Client\IPromise;
|
||||||
|
|
||||||
abstract class Transport {
|
abstract class Transport {
|
||||||
protected $id;
|
protected $id;
|
||||||
|
|
||||||
|
@ -69,5 +71,5 @@ abstract class Transport {
|
||||||
// Change mutable options of the subscription (if any exist)
|
// Change mutable options of the subscription (if any exist)
|
||||||
abstract public function updateSubscription($subsciptionId, $options);
|
abstract public function updateSubscription($subsciptionId, $options);
|
||||||
|
|
||||||
abstract public function notify(string $userId, string $collectionName, int $subscriptionId);
|
abstract public function notify(string $userId, string $collectionName, int $subscriptionId): IPromise;
|
||||||
}
|
}
|
Loading…
Reference in a new issue