Remember for deliveries in the DB, whether they should sign forwarding
This commit is contained in:
3 changed files with 50 additions and 34 deletions
@ -67,6 +67,7 @@ InboxItemRemote
recipient UnfetchedRemoteActorId
recipient UnfetchedRemoteActorId
activity OutboxItemId
activity OutboxItemId
forwarding Bool
running Bool
running Bool
UniqueUnlinkedDelivery recipient activity
UniqueUnlinkedDelivery recipient activity
@ -74,6 +75,7 @@ UnlinkedDelivery
recipient RemoteActorId
recipient RemoteActorId
activity OutboxItemId
activity OutboxItemId
forwarding Bool
running Bool
running Bool
UniqueDelivery recipient activity
UniqueDelivery recipient activity
@ -929,7 +929,7 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
moreRemotes <- deliverLocal pid obid localRecips mcollections
moreRemotes <- deliverLocal pid obid localRecips mcollections
unless (federation || null moreRemotes) $
unless (federation || null moreRemotes) $
throwE "Federation disabled but remote collection members found"
throwE "Federation disabled but remote collection members found"
remotesHttp <- lift $ deliverRemoteDB obid remoteRecips moreRemotes
remotesHttp <- lift $ deliverRemoteDB (furiHost uContext) obid remoteRecips moreRemotes
return (lmid, obid, doc, remotesHttp)
return (lmid, obid, doc, remotesHttp)
(lmid, obid, doc, remotesHttp) <- case result of
(lmid, obid, doc, remotesHttp) <- case result of
Left (FedError t) -> throwE t
Left (FedError t) -> throwE t
@ -1318,7 +1318,8 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
Right _gid -> throwE "Local Note addresses a local group"
Right _gid -> throwE "Local Note addresses a local group"
:: OutboxItemId
:: Text
-> OutboxItemId
-> [FedURI]
-> [FedURI]
-> [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))]
-> [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))]
-> AppDB
-> AppDB
@ -1326,7 +1327,7 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
, [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
, [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
, [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
, [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
deliverRemoteDB obid recips known = do
deliverRemoteDB hContext obid recips known = do
recips' <- for (groupByHost recips) $ \ (h, lus) -> do
recips' <- for (groupByHost recips) $ \ (h, lus) -> do
let lus' = NE.nub lus
let lus' = NE.nub lus
(iid, inew) <- idAndNew <$> insertBy' (Instance h)
(iid, inew) <- idAndNew <$> insertBy' (Instance h)
@ -1353,13 +1354,16 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
-- TODO see the earlier TODO about merge, it applies here too
-- TODO see the earlier TODO about merge, it applies here too
allFetched = map (second $ NE.nubBy ((==) `on` fst4)) $ mergeConcat known moreKnown
allFetched = map (second $ NE.nubBy ((==) `on` fst4)) $ mergeConcat known moreKnown
fetchedDeliv <- for allFetched $ \ (i, rs) ->
fetchedDeliv <- for allFetched $ \ (i, rs) ->
(i,) <$> insertMany' (\ (raid, _, _, msince) -> Delivery raid obid $ isNothing msince) rs
let fwd = snd i == hContext
in (i,) <$> insertMany' (\ (raid, _, _, msince) -> Delivery raid obid fwd $ isNothing msince) rs
unfetchedDeliv <- for unfetched $ \ (i, rs) ->
unfetchedDeliv <- for unfetched $ \ (i, rs) ->
(i,) <$> insertMany' (\ (uraid, _, msince) -> UnlinkedDelivery uraid obid $ isNothing msince) rs
let fwd = snd i == hContext
in (i,) <$> insertMany' (\ (uraid, _, msince) -> UnlinkedDelivery uraid obid fwd $ isNothing msince) rs
unknownDeliv <- for stillUnknown $ \ (i, lus) -> do
unknownDeliv <- for stillUnknown $ \ (i, lus) -> do
-- TODO maybe for URA insertion we should do insertUnique?
-- TODO maybe for URA insertion we should do insertUnique?
rs <- insertMany' (\ lu -> UnfetchedRemoteActor (fst i) lu Nothing) lus
rs <- insertMany' (\ lu -> UnfetchedRemoteActor (fst i) lu Nothing) lus
(i,) <$> insertMany' (\ (_, uraid) -> UnlinkedDelivery uraid obid True) rs
let fwd = snd i == hContext
(i,) <$> insertMany' (\ (_, uraid) -> UnlinkedDelivery uraid obid fwd True) rs
( takeNoError4 fetchedDeliv
( takeNoError4 fetchedDeliv
, takeNoError3 unfetchedDeliv
, takeNoError3 unfetchedDeliv
@ -1398,8 +1402,9 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
-> Handler ()
-> Handler ()
deliverRemoteHttp hContext obid doc (fetched, unfetched, unknown) = do
deliverRemoteHttp hContext obid doc (fetched, unfetched, unknown) = do
let deliver fwd h =
let deliver fwd h inbox = do
deliverHttp doc (if h == hContext then Just fwd else Nothing) h
let fwd' = if h == hContext then Just fwd else Nothing
(isJust fwd',) <$> deliverHttp doc fwd' h inbox
now <- liftIO getCurrentTime
now <- liftIO getCurrentTime
traverse_ (fork . deliverFetched deliver now) fetched
traverse_ (fork . deliverFetched deliver now) fetched
traverse_ (fork . deliverUnfetched deliver now) unfetched
traverse_ (fork . deliverUnfetched deliver now) unfetched
@ -1408,7 +1413,7 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
fork = forkHandler $ \ e -> logError $ "Outbox POST handler: delivery failed! " <> T.pack (displayException e)
fork = forkHandler $ \ e -> logError $ "Outbox POST handler: delivery failed! " <> T.pack (displayException e)
deliverFetched deliver now ((_, h), recips@(r :| rs)) = do
deliverFetched deliver now ((_, h), recips@(r :| rs)) = do
let (raid, luActor, luInbox, dlid) = r
let (raid, luActor, luInbox, dlid) = r
e <- deliver luActor h luInbox
(_, e) <- deliver luActor h luInbox
let e' = case e of
let e' = case e of
Left err ->
Left err ->
if isInstanceErrorP err
if isInstanceErrorP err
@ -1429,7 +1434,7 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
update dlid [DeliveryRunning =. False]
update dlid [DeliveryRunning =. False]
for_ rs $ \ (raid, luActor, luInbox, dlid) ->
for_ rs $ \ (raid, luActor, luInbox, dlid) ->
fork $ do
fork $ do
e <- deliver luActor h luInbox
(_, e) <- deliver luActor h luInbox
runDB $
runDB $
case e of
case e of
Left _err -> do
Left _err -> do
@ -1457,13 +1462,13 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
e <- fetchRemoteActor iid h luActor
e <- fetchRemoteActor iid h luActor
case e of
case e of
Right (Right (Entity raid ra)) -> do
Right (Right (Entity raid ra)) -> do
e' <- deliver luActor h $ remoteActorInbox ra
(fwd, e') <- deliver luActor h $ remoteActorInbox ra
runDB $
runDB $
case e' of
case e' of
Left _ -> do
Left _ -> do
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
delete udlid
delete udlid
insert_ $ Delivery raid obid False
insert_ $ Delivery raid obid fwd False
Right _ -> delete udlid
Right _ -> delete udlid
_ -> runDB $ do
_ -> runDB $ do
updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
@ -1473,13 +1478,13 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
update udlid [UnlinkedDeliveryRunning =. False]
update udlid [UnlinkedDeliveryRunning =. False]
Just (Entity raid ra) -> do
Just (Entity raid ra) -> do
e'' <- deliver luActor h $ remoteActorInbox ra
(fwd, e'') <- deliver luActor h $ remoteActorInbox ra
runDB $
runDB $
case e'' of
case e'' of
Left _ -> do
Left _ -> do
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
delete udlid
delete udlid
insert_ $ Delivery raid obid False
insert_ $ Delivery raid obid fwd False
Right _ -> delete udlid
Right _ -> delete udlid
retryOutboxDelivery :: Worker ()
retryOutboxDelivery :: Worker ()
@ -1504,6 +1509,7 @@ retryOutboxDelivery = do
, ura E.^. UnfetchedRemoteActorSince
, ura E.^. UnfetchedRemoteActorSince
, udl E.^. UnlinkedDeliveryId
, udl E.^. UnlinkedDeliveryId
, udl E.^. UnlinkedDeliveryActivity
, udl E.^. UnlinkedDeliveryActivity
, udl E.^. UnlinkedDeliveryForwarding
, ob E.^. OutboxItemActivity
, ob E.^. OutboxItemActivity
, ra E.?. RemoteActorId
, ra E.?. RemoteActorId
@ -1534,9 +1540,11 @@ retryOutboxDelivery = do
( i E.^. InstanceId
( i E.^. InstanceId
, i E.^. InstanceHost
, i E.^. InstanceHost
, ra E.^. RemoteActorId
, ra E.^. RemoteActorId
, ra E.^. RemoteActorIdent
, ra E.^. RemoteActorInbox
, ra E.^. RemoteActorInbox
, ra E.^. RemoteActorErrorSince
, ra E.^. RemoteActorErrorSince
, dl E.^. DeliveryId
, dl E.^. DeliveryId
, dl E.^. DeliveryForwarding
, ob E.^. OutboxItemActivity
, ob E.^. OutboxItemActivity
let (linkedOld, linkedNew) = partitionEithers $ map (decideBySinceDL dropAfter now . adaptLinked) linked
let (linkedOld, linkedNew) = partitionEithers $ map (decideBySinceDL dropAfter now . adaptLinked) linked
@ -1551,18 +1559,18 @@ retryOutboxDelivery = do
unless (and resultsUDL) $ logError "Periodic delivery UDL error"
unless (and resultsUDL) $ logError "Periodic delivery UDL error"
(E.Value iid, E.Value h, E.Value uraid, E.Value luRecip, E.Value since, E.Value udlid, E.Value obid, E.Value act, E.Value mraid) =
(E.Value iid, E.Value h, E.Value uraid, E.Value luRecip, E.Value since, E.Value udlid, E.Value obid, E.Value fwd, E.Value act, E.Value mraid) =
( mraid
( mraid
, ( ( (iid, h)
, ( ( (iid, h)
, ((uraid, luRecip), (udlid, obid, persistJSONValue act))
, ((uraid, luRecip), (udlid, fwd, obid, persistJSONValue act))
, since
, since
unlinkedID ((_, (_, (udlid, _, _))), _) = udlid
unlinkedID ((_, (_, (udlid, _, _, _))), _) = udlid
toLinked (raid, ((_, (_, (_, obid, _))), _)) = Delivery raid obid False
toLinked (raid, ((_, (_, (_, fwd, obid, _))), _)) = Delivery raid obid fwd False
relevant dropAfter now since = addUTCTime dropAfter since > now
relevant dropAfter now since = addUTCTime dropAfter since > now
decideBySinceUDL dropAfter now (udl@(_, (_, (udlid, _, _))), msince) =
decideBySinceUDL dropAfter now (udl@(_, (_, (udlid, _, _, _))), msince) =
case msince of
case msince of
Nothing -> Right udl
Nothing -> Right udl
Just since ->
Just since ->
@ -1573,13 +1581,13 @@ retryOutboxDelivery = do
= map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd)
= map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd)
. groupWithExtractBy ((==) `on` fst) fst snd
. groupWithExtractBy ((==) `on` fst) fst snd
(E.Value iid, E.Value h, E.Value raid, E.Value inbox, E.Value since, E.Value dlid, E.Value act) =
(E.Value iid, E.Value h, E.Value raid, E.Value ident, E.Value inbox, E.Value since, E.Value dlid, E.Value fwd, E.Value act) =
( ( (iid, h)
( ( (iid, h)
, ((raid, inbox), (dlid, persistJSONValue act))
, ((raid, (ident, inbox)), (dlid, fwd, persistJSONValue act))
, since
, since
decideBySinceDL dropAfter now (dl@(_, (_, (dlid, _))), msince) =
decideBySinceDL dropAfter now (dl@(_, (_, (dlid, _, _))), msince) =
case msince of
case msince of
Nothing -> Right dl
Nothing -> Right dl
Just since ->
Just since ->
@ -1599,9 +1607,10 @@ retryOutboxDelivery = do
return False
return False
Right success -> return success
Right success -> return success
deliverLinked deliver now ((_, h), recips) = do
deliverLinked deliver now ((_, h), recips) = do
waitsR <- for recips $ \ ((raid, inbox), delivs) -> fork $ do
waitsR <- for recips $ \ ((raid, (ident, inbox)), delivs) -> fork $ do
waitsD <- for delivs $ \ (dlid, doc) -> fork $ do
waitsD <- for delivs $ \ (dlid, fwd, doc) -> fork $ do
e <- deliver doc Nothing h inbox
let fwd' = if fwd then Just ident else Nothing
e <- deliver doc fwd' h inbox
case e of
case e of
Left _err -> return False
Left _err -> return False
Right _resp -> do
Right _resp -> do
@ -1624,13 +1633,14 @@ retryOutboxDelivery = do
e <- fetchRemoteActor iid h luRecip
e <- fetchRemoteActor iid h luRecip
case e of
case e of
Right (Right (Entity raid ra)) -> do
Right (Right (Entity raid ra)) -> do
waitsD <- for delivs $ \ (udlid, obid, doc) -> fork $ do
waitsD <- for delivs $ \ (udlid, fwd, obid, doc) -> fork $ do
e' <- deliver doc Nothing h $ remoteActorInbox ra
let fwd' = if fwd then Just luRecip else Nothing
e' <- deliver doc fwd' h $ remoteActorInbox ra
case e' of
case e' of
Left _err -> do
Left _err -> do
runSiteDB $ do
runSiteDB $ do
delete udlid
delete udlid
insert_ $ Delivery raid obid False
insert_ $ Delivery raid obid fwd False
return False
return False
Right _resp -> do
Right _resp -> do
runSiteDB $ delete udlid
runSiteDB $ delete udlid
@ -253,6 +253,10 @@ changes =
, removeField "RemoteMessage" "raw"
, removeField "RemoteMessage" "raw"
-- 63
-- 63
, removeEntity "RemoteRawObject"
, removeEntity "RemoteRawObject"
-- 64
, addFieldPrimRequired "UnlinkedDelivery" True "forwarding"
-- 65
, addFieldPrimRequired "Delivery" True "forwarding"
migrateDB :: MonadIO m => ReaderT SqlBackend m (Either Text (Int, Int))
migrateDB :: MonadIO m => ReaderT SqlBackend m (Either Text (Int, Int))
Add table
Reference in a new issue