Do inbox forwarding in project inbox handler
This commit is contained in:
parent
5d5c56695e
commit
b0a26722d3
7 changed files with 356 additions and 95 deletions
|
@ -80,6 +80,16 @@ Delivery
|
|||
|
||||
UniqueDelivery recipient activity
|
||||
|
||||
Forwarding
|
||||
recipient RemoteActorId
|
||||
activity RemoteActivityId
|
||||
activityRaw ByteString
|
||||
sender ProjectId
|
||||
signature ByteString
|
||||
running Bool
|
||||
|
||||
UniqueForwarding recipient activity
|
||||
|
||||
VerifKey
|
||||
ident LocalURI
|
||||
instance InstanceId
|
||||
|
|
9
migrations/2019_05_03.model
Normal file
9
migrations/2019_05_03.model
Normal file
|
@ -0,0 +1,9 @@
|
|||
Forwarding
|
||||
recipient RemoteActorId
|
||||
activity RemoteActivityId
|
||||
activityRaw ByteString
|
||||
sender ProjectId
|
||||
signature ByteString
|
||||
running Bool
|
||||
|
||||
UniqueForwarding recipient activity
|
|
@ -62,6 +62,7 @@ import Yesod.Core hiding (logError, logWarn, logInfo)
|
|||
import Yesod.Persist.Core
|
||||
|
||||
import qualified Data.ByteString.Lazy as BL
|
||||
import qualified Data.CaseInsensitive as CI
|
||||
import qualified Data.List as L
|
||||
import qualified Data.List.NonEmpty as NE
|
||||
import qualified Data.List.Ordered as LO
|
||||
|
@ -497,6 +498,81 @@ getLocalParentMessageId did shr lmid = do
|
|||
throwE "Local parent belongs to a different discussion"
|
||||
return mid
|
||||
|
||||
getPersonOrGroupId :: SharerId -> AppDB (Either PersonId GroupId)
|
||||
getPersonOrGroupId sid = do
|
||||
mpid <- getKeyBy $ UniquePersonIdent sid
|
||||
mgid <- getKeyBy $ UniqueGroup sid
|
||||
requireEitherM mpid mgid
|
||||
"Found sharer that is neither person nor group"
|
||||
"Found sharer that is both person and group"
|
||||
|
||||
getTicketTeam :: SharerId -> AppDB ([PersonId], [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))])
|
||||
getTicketTeam sid = do
|
||||
id_ <- getPersonOrGroupId sid
|
||||
(,[]) <$> case id_ of
|
||||
Left pid -> return [pid]
|
||||
Right gid ->
|
||||
map (groupMemberPerson . entityVal) <$>
|
||||
selectList [GroupMemberGroup ==. gid] []
|
||||
|
||||
getFollowers :: FollowerSetId -> AppDB ([PersonId], [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))])
|
||||
getFollowers fsid = do
|
||||
local <- selectList [FollowTarget ==. fsid] []
|
||||
remote <- E.select $ E.from $ \ (rf `E.InnerJoin` rs `E.InnerJoin` i) -> do
|
||||
E.on $ rs E.^. RemoteActorInstance E.==. i E.^. InstanceId
|
||||
E.on $ rf E.^. RemoteFollowActor E.==. rs E.^. RemoteActorId
|
||||
E.where_ $ rf E.^. RemoteFollowTarget E.==. E.val fsid
|
||||
E.orderBy [E.asc $ i E.^. InstanceId]
|
||||
return
|
||||
( i E.^. InstanceId
|
||||
, i E.^. InstanceHost
|
||||
, rs E.^. RemoteActorId
|
||||
, rs E.^. RemoteActorIdent
|
||||
, rs E.^. RemoteActorInbox
|
||||
, rs E.^. RemoteActorErrorSince
|
||||
)
|
||||
return
|
||||
( map (followPerson . entityVal) local
|
||||
, groupRemotes $
|
||||
map (\ (E.Value iid, E.Value h, E.Value rsid, E.Value luActor, E.Value luInbox, E.Value msince) ->
|
||||
(iid, h, rsid, luActor, luInbox, msince)
|
||||
)
|
||||
remote
|
||||
)
|
||||
where
|
||||
groupRemotes :: [(InstanceId, Text, RemoteActorId, LocalURI, LocalURI, Maybe UTCTime)] -> [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))]
|
||||
groupRemotes = groupWithExtractBy ((==) `on` fst) fst snd . map toTuples
|
||||
where
|
||||
toTuples (iid, h, rsid, luA, luI, ms) = ((iid, h), (rsid, luA, luI, ms))
|
||||
|
||||
-- | Merge 2 lists ordered on fst, concatenating snd values when
|
||||
-- multiple identical fsts occur. The resulting list is ordered on fst,
|
||||
-- and each fst value appears only once.
|
||||
--
|
||||
-- >>> mergeWith (+) [('a',3), ('a',1), ('b',5)] [('a',2), ('c',4)]
|
||||
-- [('a',6), ('b',5), ('c',4)]
|
||||
mergeConcat :: (Ord a, Semigroup b) => [(a, b)] -> [(a, b)] -> [(a, b)]
|
||||
mergeConcat xs ys = map (second sconcat) $ groupWithExtract fst snd $ LO.mergeBy (compare `on` fst) xs ys
|
||||
|
||||
fst3 :: (a, b, c) -> a
|
||||
fst3 (x, _, _) = x
|
||||
|
||||
fst4 :: (a, b, c, d) -> a
|
||||
fst4 (x, _, _, _) = x
|
||||
|
||||
thd3 :: (a, b, c) -> c
|
||||
thd3 (_, _, z) = z
|
||||
|
||||
fourth4 :: (a, b, c, d) -> d
|
||||
fourth4 (_, _, _, w) = w
|
||||
|
||||
insertMany' mk xs = zip' xs <$> insertMany (NE.toList $ mk <$> xs)
|
||||
where
|
||||
zip' x y =
|
||||
case nonEmpty y of
|
||||
Just y' | length x == length y' -> NE.zip x y'
|
||||
_ -> error "insertMany' returned different length!"
|
||||
|
||||
handleSharerInbox
|
||||
:: UTCTime
|
||||
-> ShrIdent
|
||||
|
@ -621,10 +697,11 @@ handleProjectInbox
|
|||
-> InstanceId
|
||||
-> Text
|
||||
-> RemoteActorId
|
||||
-> BL.ByteString
|
||||
-> Object
|
||||
-> Activity
|
||||
-> ExceptT Text Handler Text
|
||||
handleProjectInbox now shrRecip prjRecip iidSender hSender raidSender raw activity =
|
||||
handleProjectInbox now shrRecip prjRecip iidSender hSender raidSender body raw activity =
|
||||
case activitySpecific activity of
|
||||
CreateActivity (Create note) ->
|
||||
handleNote (activityAudience activity) note
|
||||
|
@ -648,16 +725,51 @@ handleProjectInbox now shrRecip prjRecip iidSender hSender raidSender raw activi
|
|||
if shr /= shrRecip || prj /= prjRecip
|
||||
then return $ recip <> " not using; context is a different project"
|
||||
else do
|
||||
msig <- checkForward
|
||||
hLocal <- getsYesod $ appInstanceHost . appSettings
|
||||
let colls = findRelevantCollections hLocal num audience
|
||||
runDBExcept $ do
|
||||
(did, meparent) <- getContextAndParent num mparent
|
||||
lift $ do
|
||||
mremotesHttp <- runDBExcept $ do
|
||||
(sid, fsid, jid, did, meparent) <- getContextAndParent num mparent
|
||||
lift $ join <$> do
|
||||
mmid <- insertToDiscussion luNote published did meparent
|
||||
for mmid $ updateOrphans luNote did
|
||||
-- TODO CONTINUE inbox forwarding!!!
|
||||
return $ recip <> " inserted new ticket comment"
|
||||
for mmid $ \ (ractid, mid) -> do
|
||||
updateOrphans luNote did mid
|
||||
for msig $ \ sig -> do
|
||||
remoteRecips <- deliverLocal ractid colls sid fsid
|
||||
(sig,) <$> deliverRemoteDB ractid jid sig remoteRecips
|
||||
lift $ for_ mremotesHttp $ \ (sig, remotesHttp) -> do
|
||||
let handler e = logError $ "Project inbox handler: delivery failed! " <> T.pack (displayException e)
|
||||
forkHandler handler $ deliverRemoteHttp sig remotesHttp
|
||||
return $ recip <> " inserted new ticket comment"
|
||||
where
|
||||
checkForward = join <$> do
|
||||
let hSig = hForwardingSignature
|
||||
msig <- maybeHeader hSig
|
||||
for msig $ \ sig -> do
|
||||
_proof <- withExceptT (T.pack . displayException) $ ExceptT $
|
||||
let requires = [hDigest, hActivityPubForwarder]
|
||||
in prepareToVerifyHttpSigWith hSig False requires [] Nothing
|
||||
forwarder <- requireHeader hActivityPubForwarder
|
||||
renderUrl <- getUrlRender
|
||||
let project = renderUrl $ ProjectR shrRecip prjRecip
|
||||
return $
|
||||
if forwarder == encodeUtf8 project
|
||||
then Just sig
|
||||
else Nothing
|
||||
where
|
||||
maybeHeader n = do
|
||||
let n' = decodeUtf8 $ CI.original n
|
||||
hs <- lookupHeaders n
|
||||
case hs of
|
||||
[] -> return Nothing
|
||||
[h] -> return $ Just h
|
||||
_ -> throwE $ n' <> " multiple headers found"
|
||||
requireHeader n = do
|
||||
let n' = decodeUtf8 $ CI.original n
|
||||
mh <- maybeHeader n
|
||||
case mh of
|
||||
Nothing -> throwE $ n' <> " header not found"
|
||||
Just h -> return h
|
||||
findRelevantCollections hLocal numCtx = nub . mapMaybe decide . concatRecipients
|
||||
where
|
||||
decide u = do
|
||||
|
@ -677,8 +789,8 @@ handleProjectInbox now shrRecip prjRecip iidSender hSender raidSender raw activi
|
|||
mt <- lift $ do
|
||||
sid <- getKeyBy404 $ UniqueSharer shrRecip
|
||||
jid <- getKeyBy404 $ UniqueProject prjRecip sid
|
||||
getValBy $ UniqueTicket jid num
|
||||
t <- fromMaybeE mt "Context: No such local ticket"
|
||||
fmap (jid,sid,) <$> getValBy (UniqueTicket jid num)
|
||||
(jid, sid, t) <- fromMaybeE mt "Context: No such local ticket"
|
||||
let did = ticketDiscuss t
|
||||
meparent <- for mparent $ \ parent ->
|
||||
case parent of
|
||||
|
@ -695,7 +807,7 @@ handleProjectInbox now shrRecip prjRecip iidSender hSender raidSender raw activi
|
|||
throwE "Remote parent belongs to a different discussion"
|
||||
return mid
|
||||
Nothing -> return $ Right $ l2f hParent luParent
|
||||
return (did, meparent)
|
||||
return (sid, ticketFollowers t, jid, did, meparent)
|
||||
insertToDiscussion luNote published did meparent = do
|
||||
ractid <- either entityKey id <$> insertBy' RemoteActivity
|
||||
{ remoteActivityInstance = iidSender
|
||||
|
@ -727,7 +839,7 @@ handleProjectInbox now shrRecip prjRecip iidSender hSender raidSender raw activi
|
|||
Nothing -> do
|
||||
delete mid
|
||||
return Nothing
|
||||
Just _ -> return $ Just mid
|
||||
Just _ -> return $ Just (ractid, mid)
|
||||
updateOrphans luNote did mid = do
|
||||
let uNote = l2f hSender luNote
|
||||
related <- selectOrphans uNote (E.==.)
|
||||
|
@ -757,6 +869,88 @@ handleProjectInbox now shrRecip prjRecip iidSender hSender raidSender raw activi
|
|||
rm E.^. RemoteMessageLostParent E.==. E.just (E.val uNote) E.&&.
|
||||
m E.^. MessageRoot `op` E.val did
|
||||
return (rm E.^. RemoteMessageId, m E.^. MessageId)
|
||||
deliverLocal
|
||||
:: RemoteActivityId
|
||||
-> [LocalTicketRecipient]
|
||||
-> SharerId
|
||||
-> FollowerSetId
|
||||
-> AppDB [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))]
|
||||
deliverLocal ractid recips sid fsid = do
|
||||
(teamPids, teamRemotes) <-
|
||||
if LocalTicketTeam `elem` recips
|
||||
then getTicketTeam sid
|
||||
else return ([], [])
|
||||
(fsPids, fsRemotes) <-
|
||||
if LocalTicketParticipants `elem` recips
|
||||
then getFollowers fsid
|
||||
else return ([], [])
|
||||
let pids = union teamPids fsPids
|
||||
-- TODO inefficient, see the other TODOs about mergeConcat
|
||||
remotes = map (second $ NE.nubBy ((==) `on` fst4)) $ mergeConcat teamRemotes fsRemotes
|
||||
for_ pids $ \ pid -> insertUnique_ $ InboxItemRemote pid ractid
|
||||
return remotes
|
||||
|
||||
deliverRemoteDB
|
||||
:: RemoteActivityId
|
||||
-> ProjectId
|
||||
-> ByteString
|
||||
-> [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))]
|
||||
-> AppDB
|
||||
[((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId))]
|
||||
deliverRemoteDB ractid jid sig recips = do
|
||||
let body' = BL.toStrict body
|
||||
deliv raid msince = Forwarding raid ractid body' jid sig $ isNothing msince
|
||||
fetchedDeliv <- for recips $ \ (i, rs) ->
|
||||
(i,) <$> insertMany' (\ (raid, _, _, msince) -> deliv raid msince) rs
|
||||
return $ takeNoError4 fetchedDeliv
|
||||
where
|
||||
takeNoError noError = mapMaybe $ \ (i, rs) -> (i,) <$> nonEmpty (mapMaybe noError $ NE.toList rs)
|
||||
takeNoError4 = takeNoError noError
|
||||
where
|
||||
noError ((ak, luA, luI, Nothing), dlk) = Just (ak, luA, luI, dlk)
|
||||
noError ((_ , _ , _ , Just _ ), _ ) = Nothing
|
||||
|
||||
deliverRemoteHttp
|
||||
:: ByteString
|
||||
-> [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId))]
|
||||
-> Handler ()
|
||||
deliverRemoteHttp sig fetched = do
|
||||
let deliver h inbox = do
|
||||
forwardActivity (l2f h inbox) sig (ProjectR shrRecip prjRecip) body
|
||||
now <- liftIO getCurrentTime
|
||||
traverse_ (fork . deliverFetched deliver now) fetched
|
||||
where
|
||||
fork = forkHandler $ \ e -> logError $ "Project inbox handler: delivery failed! " <> T.pack (displayException e)
|
||||
deliverFetched deliver now ((_, h), recips@(r :| rs)) = do
|
||||
let (raid, _luActor, luInbox, fwid) = r
|
||||
e <- deliver h luInbox
|
||||
let e' = case e of
|
||||
Left err ->
|
||||
if isInstanceErrorP err
|
||||
then Nothing
|
||||
else Just False
|
||||
Right _resp -> Just True
|
||||
case e' of
|
||||
Nothing -> runDB $ do
|
||||
let recips' = NE.toList recips
|
||||
updateWhere [RemoteActorId <-. map fst4 recips', RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
|
||||
updateWhere [ForwardingId <-. map fourth4 recips'] [ForwardingRunning =. False]
|
||||
Just success -> do
|
||||
runDB $
|
||||
if success
|
||||
then delete fwid
|
||||
else do
|
||||
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
|
||||
update fwid [ForwardingRunning =. False]
|
||||
for_ rs $ \ (raid, _luActor, luInbox, fwid) ->
|
||||
fork $ do
|
||||
e <- deliver h luInbox
|
||||
runDB $
|
||||
case e of
|
||||
Left _err -> do
|
||||
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
|
||||
update fwid [ForwardingRunning =. False]
|
||||
Right _resp -> delete fwid
|
||||
|
||||
fixRunningDeliveries :: (MonadIO m, MonadLogger m, IsSqlBackend backend) => ReaderT backend m ()
|
||||
fixRunningDeliveries = do
|
||||
|
@ -772,6 +966,12 @@ fixRunningDeliveries = do
|
|||
, T.pack (show c)
|
||||
, " unlinked deliveries"
|
||||
]
|
||||
c'' <- updateWhereCount [ForwardingRunning ==. True] [ForwardingRunning =. False]
|
||||
unless (c' == 0) $ logWarn $ T.concat
|
||||
[ "fixRunningDeliveries fixed "
|
||||
, T.pack (show c)
|
||||
, " forwarding deliveries"
|
||||
]
|
||||
|
||||
data LocalTicketRecipient = LocalTicketParticipants | LocalTicketTeam
|
||||
deriving (Eq, Ord)
|
||||
|
@ -827,7 +1027,7 @@ deliverHttp
|
|||
-> LocalURI
|
||||
-> m (Either APPostError (Response ()))
|
||||
deliverHttp doc mfwd h luInbox =
|
||||
postActivity (l2f h luInbox) (Left . l2f h <$> mfwd) doc
|
||||
deliverActivity (l2f h luInbox) (l2f h <$> mfwd) doc
|
||||
|
||||
isInstanceErrorHttp (InvalidUrlException _ _) = False
|
||||
isInstanceErrorHttp (HttpExceptionRequest _ hec) =
|
||||
|
@ -1165,27 +1365,6 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
|
|||
update obid [OutboxItemActivity =. PersistJSON doc]
|
||||
return (lmid, obid, doc)
|
||||
|
||||
-- | Merge 2 lists ordered on fst, concatenating snd values when
|
||||
-- multiple identical fsts occur. The resulting list is ordered on fst,
|
||||
-- and each fst value appears only once.
|
||||
--
|
||||
-- >>> mergeWith (+) [('a',3), ('a',1), ('b',5)] [('a',2), ('c',4)]
|
||||
-- [('a',6), ('b',5), ('c',4)]
|
||||
mergeConcat :: (Ord a, Semigroup b) => [(a, b)] -> [(a, b)] -> [(a, b)]
|
||||
mergeConcat xs ys = map (second sconcat) $ groupWithExtract fst snd $ LO.mergeBy (compare `on` fst) xs ys
|
||||
|
||||
fst3 :: (a, b, c) -> a
|
||||
fst3 (x, _, _) = x
|
||||
|
||||
fst4 :: (a, b, c, d) -> a
|
||||
fst4 (x, _, _, _) = x
|
||||
|
||||
thd3 :: (a, b, c) -> c
|
||||
thd3 (_, _, z) = z
|
||||
|
||||
fourth4 :: (a, b, c, d) -> d
|
||||
fourth4 (_, _, _, w) = w
|
||||
|
||||
-- Deliver to local recipients. For local users, find in DB and deliver.
|
||||
-- For local collections, expand them, deliver to local users, and return a
|
||||
-- list of remote actors found in them.
|
||||
|
@ -1250,13 +1429,6 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
|
|||
lift $ for_ (union recipPids morePids) $ \ pid -> insert_ $ InboxItemLocal pid obid
|
||||
return remotes
|
||||
where
|
||||
getPersonOrGroupId :: SharerId -> AppDB (Either PersonId GroupId)
|
||||
getPersonOrGroupId sid = do
|
||||
mpid <- getKeyBy $ UniquePersonIdent sid
|
||||
mgid <- getKeyBy $ UniqueGroup sid
|
||||
requireEitherM mpid mgid
|
||||
"Found sharer that is neither person nor group"
|
||||
"Found sharer that is both person and group"
|
||||
getPersonId :: ShrIdent -> ExceptT Text AppDB PersonId
|
||||
getPersonId shr = do
|
||||
msid <- lift $ getKeyBy $ UniqueSharer shr
|
||||
|
@ -1265,42 +1437,6 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
|
|||
case id_ of
|
||||
Left pid -> return pid
|
||||
Right _gid -> throwE "Local Note addresses a local group"
|
||||
groupRemotes :: [(InstanceId, Text, RemoteActorId, LocalURI, LocalURI, Maybe UTCTime)] -> [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))]
|
||||
groupRemotes = groupWithExtractBy ((==) `on` fst) fst snd . map toTuples
|
||||
where
|
||||
toTuples (iid, h, rsid, luA, luI, ms) = ((iid, h), (rsid, luA, luI, ms))
|
||||
getTicketTeam :: SharerId -> AppDB ([PersonId], [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))])
|
||||
getTicketTeam sid = do
|
||||
id_ <- getPersonOrGroupId sid
|
||||
(,[]) <$> case id_ of
|
||||
Left pid -> return [pid]
|
||||
Right gid ->
|
||||
map (groupMemberPerson . entityVal) <$>
|
||||
selectList [GroupMemberGroup ==. gid] []
|
||||
getFollowers :: FollowerSetId -> AppDB ([PersonId], [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))])
|
||||
getFollowers fsid = do
|
||||
local <- selectList [FollowTarget ==. fsid] []
|
||||
remote <- E.select $ E.from $ \ (rf `E.InnerJoin` rs `E.InnerJoin` i) -> do
|
||||
E.on $ rs E.^. RemoteActorInstance E.==. i E.^. InstanceId
|
||||
E.on $ rf E.^. RemoteFollowActor E.==. rs E.^. RemoteActorId
|
||||
E.where_ $ rf E.^. RemoteFollowTarget E.==. E.val fsid
|
||||
E.orderBy [E.asc $ i E.^. InstanceId]
|
||||
return
|
||||
( i E.^. InstanceId
|
||||
, i E.^. InstanceHost
|
||||
, rs E.^. RemoteActorId
|
||||
, rs E.^. RemoteActorIdent
|
||||
, rs E.^. RemoteActorInbox
|
||||
, rs E.^. RemoteActorErrorSince
|
||||
)
|
||||
return
|
||||
( map (followPerson . entityVal) local
|
||||
, groupRemotes $
|
||||
map (\ (E.Value iid, E.Value h, E.Value rsid, E.Value luActor, E.Value luInbox, E.Value msince) ->
|
||||
(iid, h, rsid, luActor, luInbox, msince)
|
||||
)
|
||||
remote
|
||||
)
|
||||
|
||||
-- Deliver to a local sharer, if they exist as a user account
|
||||
deliverToLocalSharer :: OutboxItemId -> ShrIdent -> ExceptT Text AppDB ()
|
||||
|
@ -1375,13 +1511,6 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
|
|||
groupByHost :: [FedURI] -> [(Text, NonEmpty LocalURI)]
|
||||
groupByHost = groupAllExtract furiHost (snd . f2l)
|
||||
|
||||
insertMany' mk xs = zip' xs <$> insertMany (NE.toList $ mk <$> xs)
|
||||
where
|
||||
zip' x y =
|
||||
case nonEmpty y of
|
||||
Just y' | length x == length y' -> NE.zip x y'
|
||||
_ -> error "insertMany' returned different length!"
|
||||
|
||||
takeNoError noError = mapMaybe $ \ (i, rs) -> (i,) <$> nonEmpty (mapMaybe noError $ NE.toList rs)
|
||||
takeNoError3 = takeNoError noError
|
||||
where
|
||||
|
@ -1490,7 +1619,7 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
|
|||
retryOutboxDelivery :: Worker ()
|
||||
retryOutboxDelivery = do
|
||||
now <- liftIO $ getCurrentTime
|
||||
(udls, dls) <- runSiteDB $ do
|
||||
(udls, dls, fws) <- runSiteDB $ do
|
||||
-- Get all unlinked deliveries which aren't running already in outbox
|
||||
-- post handlers
|
||||
unlinked' <- E.select $ E.from $ \ (udl `E.InnerJoin` ob `E.InnerJoin` ura `E.InnerJoin` i `E.LeftOuterJoin` ra) -> do
|
||||
|
@ -1549,12 +1678,37 @@ retryOutboxDelivery = do
|
|||
)
|
||||
let (linkedOld, linkedNew) = partitionEithers $ map (decideBySinceDL dropAfter now . adaptLinked) linked
|
||||
deleteWhere [DeliveryId <-. linkedOld]
|
||||
return (groupUnlinked lonelyNew, groupLinked linkedNew)
|
||||
-- Same for forwarding deliveries, which are always linked
|
||||
forwarding <- E.select $ E.from $ \ (fw `E.InnerJoin` ra `E.InnerJoin` i `E.InnerJoin` j `E.InnerJoin` s) -> do
|
||||
E.on $ j E.^. ProjectSharer E.==. s E.^. SharerId
|
||||
E.on $ fw E.^. ForwardingSender E.==. j E.^. ProjectId
|
||||
E.on $ ra E.^. RemoteActorInstance E.==. i E.^. InstanceId
|
||||
E.on $ fw E.^. ForwardingRecipient E.==. ra E.^. RemoteActorId
|
||||
E.where_ $ fw E.^. ForwardingRunning E.==. E.val False
|
||||
E.orderBy [E.asc $ ra E.^. RemoteActorInstance, E.asc $ ra E.^. RemoteActorId]
|
||||
return
|
||||
( i E.^. InstanceId
|
||||
, i E.^. InstanceHost
|
||||
, ra E.^. RemoteActorId
|
||||
, ra E.^. RemoteActorInbox
|
||||
, ra E.^. RemoteActorErrorSince
|
||||
, fw E.^. ForwardingId
|
||||
, fw E.^. ForwardingActivityRaw
|
||||
, j E.^. ProjectIdent
|
||||
, s E.^. SharerIdent
|
||||
, fw E.^. ForwardingSignature
|
||||
)
|
||||
let (forwardingOld, forwardingNew) = partitionEithers $ map (decideBySinceFW dropAfter now . adaptForwarding) forwarding
|
||||
deleteWhere [ForwardingId <-. forwardingOld]
|
||||
return (groupUnlinked lonelyNew, groupLinked linkedNew, groupForwarding forwardingNew)
|
||||
let deliver = deliverHttp
|
||||
waitsDL <- traverse (fork . deliverLinked deliver now) dls
|
||||
waitsFW <- traverse (fork . deliverForwarding now) fws
|
||||
waitsUDL <- traverse (fork . deliverUnlinked deliver now) udls
|
||||
resultsDL <- sequence waitsDL
|
||||
unless (and resultsDL) $ logError "Periodic delivery DL error"
|
||||
resultsFW <- sequence waitsFW
|
||||
unless (and resultsFW) $ logError "Periodic delivery FW error"
|
||||
resultsUDL <- sequence waitsUDL
|
||||
unless (and resultsUDL) $ logError "Periodic delivery UDL error"
|
||||
where
|
||||
|
@ -1597,6 +1751,23 @@ retryOutboxDelivery = do
|
|||
groupLinked
|
||||
= map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd)
|
||||
. groupWithExtractBy ((==) `on` fst) fst snd
|
||||
adaptForwarding
|
||||
(E.Value iid, E.Value h, E.Value raid, E.Value inbox, E.Value since, E.Value fwid, E.Value body, E.Value prj, E.Value shr, E.Value sig) =
|
||||
( ( (iid, h)
|
||||
, ((raid, inbox), (fwid, BL.fromStrict body, ProjectR shr prj, sig))
|
||||
)
|
||||
, since
|
||||
)
|
||||
decideBySinceFW dropAfter now (fw@(_, (_, (fwid, _, _, _))), msince) =
|
||||
case msince of
|
||||
Nothing -> Right fw
|
||||
Just since ->
|
||||
if relevant dropAfter now since
|
||||
then Right fw
|
||||
else Left fwid
|
||||
groupForwarding
|
||||
= map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd)
|
||||
. groupWithExtractBy ((==) `on` fst) fst snd
|
||||
fork action = do
|
||||
wait <- asyncSite action
|
||||
return $ do
|
||||
|
@ -1658,3 +1829,24 @@ retryOutboxDelivery = do
|
|||
unless (and results) $
|
||||
logError $ "Periodic UDL delivery error for host " <> h
|
||||
return True
|
||||
deliverForwarding now ((_, h), recips) = do
|
||||
waitsR <- for recips $ \ ((raid, inbox), delivs) -> fork $ do
|
||||
waitsD <- for delivs $ \ (fwid, body, sender, sig) -> fork $ do
|
||||
e <- forwardActivity (l2f h inbox) sig sender body
|
||||
case e of
|
||||
Left _err -> return False
|
||||
Right _resp -> do
|
||||
runSiteDB $ delete fwid
|
||||
return True
|
||||
results <- sequence waitsD
|
||||
runSiteDB $
|
||||
if and results
|
||||
then update raid [RemoteActorErrorSince =. Nothing]
|
||||
else if or results
|
||||
then update raid [RemoteActorErrorSince =. Just now]
|
||||
else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
|
||||
return True
|
||||
results <- sequence waitsR
|
||||
unless (and results) $
|
||||
logError $ "Periodic FW delivery error for host " <> h
|
||||
return True
|
||||
|
|
|
@ -257,6 +257,8 @@ changes =
|
|||
, addFieldPrimRequired "UnlinkedDelivery" True "forwarding"
|
||||
-- 65
|
||||
, addFieldPrimRequired "Delivery" True "forwarding"
|
||||
-- 66
|
||||
, addEntities model_2019_05_03
|
||||
]
|
||||
|
||||
migrateDB :: MonadIO m => ReaderT SqlBackend m (Either Text (Int, Int))
|
||||
|
|
|
@ -38,6 +38,7 @@ module Vervis.Migration.Model
|
|||
, model_2019_04_11
|
||||
, model_2019_04_12
|
||||
, model_2019_04_22
|
||||
, model_2019_05_03
|
||||
)
|
||||
where
|
||||
|
||||
|
@ -102,3 +103,6 @@ model_2019_04_12 = $(schema "2019_04_12")
|
|||
|
||||
model_2019_04_22 :: [Entity SqlBackend]
|
||||
model_2019_04_22 = $(schema "2019_04_22")
|
||||
|
||||
model_2019_05_03 :: [Entity SqlBackend]
|
||||
model_2019_05_03 = $(schema "2019_05_03")
|
||||
|
|
|
@ -54,6 +54,7 @@ module Web.ActivityPub
|
|||
, hForwardingSignature
|
||||
, hForwardedSignature
|
||||
, httpPostAP
|
||||
, httpPostAPBytes
|
||||
, Fetched (..)
|
||||
, fetchAPID
|
||||
, fetchAPID'
|
||||
|
@ -102,6 +103,7 @@ import Network.HTTP.Client.Signature
|
|||
import qualified Data.ByteString as B
|
||||
import qualified Data.ByteString.Base64 as B64
|
||||
import qualified Data.ByteString.Char8 as BC
|
||||
import qualified Data.ByteString.Lazy as BL
|
||||
import qualified Data.HashMap.Strict as M (lookup)
|
||||
import qualified Data.Text as T (pack, unpack)
|
||||
import qualified Data.Vector as V
|
||||
|
@ -668,10 +670,25 @@ httpPostAP
|
|||
-> a
|
||||
-> m (Either APPostError (Response ()))
|
||||
httpPostAP manager uri headers keyid sign uSender mfwd value =
|
||||
httpPostAPBytes manager uri headers keyid sign uSender mfwd $ encode value
|
||||
|
||||
-- | Like 'httpPostAP', except it takes the object as a raw lazy
|
||||
-- 'BL.ByteString'. It's your responsibility to make sure it's valid JSON.
|
||||
httpPostAPBytes
|
||||
:: MonadIO m
|
||||
=> Manager
|
||||
-> FedURI
|
||||
-> NonEmpty HeaderName
|
||||
-> S.KeyId
|
||||
-> (ByteString -> S.Signature)
|
||||
-> Text
|
||||
-> Maybe (Either FedURI ByteString)
|
||||
-> BL.ByteString
|
||||
-> m (Either APPostError (Response ()))
|
||||
httpPostAPBytes manager uri headers keyid sign uSender mfwd body =
|
||||
liftIO $ runExceptT $ do
|
||||
req <- requestFromURI $ toURI uri
|
||||
let body = encode value
|
||||
digest = formatHttpBodyDigest SHA256 "SHA-256" $ hashlazy body
|
||||
let digest = formatHttpBodyDigest SHA256 "SHA-256" $ hashlazy body
|
||||
req' =
|
||||
setRequestCheckStatus $
|
||||
consHeader hContentType typeActivityStreams2LD $
|
||||
|
|
|
@ -15,7 +15,8 @@
|
|||
|
||||
module Yesod.ActivityPub
|
||||
( YesodActivityPub (..)
|
||||
, postActivity
|
||||
, deliverActivity
|
||||
, forwardActivity
|
||||
)
|
||||
where
|
||||
|
||||
|
@ -26,6 +27,8 @@ import Data.List.NonEmpty (NonEmpty)
|
|||
import Data.Text (Text)
|
||||
import Yesod.Core
|
||||
|
||||
import qualified Data.ByteString.Lazy as BL
|
||||
|
||||
import Network.HTTP.Client
|
||||
import Network.HTTP.Signature
|
||||
import Network.HTTP.Types.Header
|
||||
|
@ -35,23 +38,47 @@ import Web.ActivityPub
|
|||
import Yesod.MonadSite
|
||||
|
||||
class Yesod site => YesodActivityPub site where
|
||||
sitePostSignedHeaders :: site -> NonEmpty HeaderName
|
||||
siteGetHttpSign :: (MonadSite m, SiteEnv m ~ site)
|
||||
=> m (KeyId, ByteString -> Signature)
|
||||
sitePostSignedHeaders :: site -> NonEmpty HeaderName
|
||||
siteGetHttpSign :: (MonadSite m, SiteEnv m ~ site)
|
||||
=> m (KeyId, ByteString -> Signature)
|
||||
{-
|
||||
siteSigVerRequiredHeaders :: site -> [HeaderName]
|
||||
siteSigVerWantedHeaders :: site -> [HeaderName]
|
||||
siteSigVerSeconds :: site -> Int
|
||||
-}
|
||||
|
||||
postActivity
|
||||
deliverActivity
|
||||
:: ( MonadSite m
|
||||
, SiteEnv m ~ site
|
||||
, HasHttpManager site
|
||||
, YesodActivityPub site
|
||||
)
|
||||
=> FedURI
|
||||
-> Maybe (Either FedURI ByteString)
|
||||
-> Maybe FedURI
|
||||
-> Doc Activity
|
||||
-> m (Either APPostError (Response ()))
|
||||
postActivity inbox mfwd doc@(Doc hAct activity) = do
|
||||
deliverActivity inbox mfwd doc@(Doc hAct activity) = do
|
||||
manager <- asksSite getHttpManager
|
||||
headers <- asksSite sitePostSignedHeaders
|
||||
(keyid, sign) <- siteGetHttpSign
|
||||
let sender = renderFedURI $ l2f hAct (activityActor activity)
|
||||
httpPostAP manager inbox headers keyid sign sender mfwd doc
|
||||
httpPostAP manager inbox headers keyid sign sender (Left <$> mfwd) doc
|
||||
|
||||
forwardActivity
|
||||
:: ( MonadSite m
|
||||
, SiteEnv m ~ site
|
||||
, HasHttpManager site
|
||||
, YesodActivityPub site
|
||||
)
|
||||
=> FedURI
|
||||
-> ByteString
|
||||
-> Route site
|
||||
-> BL.ByteString
|
||||
-> m (Either APPostError (Response ()))
|
||||
forwardActivity inbox sig rSender body = do
|
||||
manager <- asksSite getHttpManager
|
||||
headers <- asksSite sitePostSignedHeaders
|
||||
(keyid, sign) <- siteGetHttpSign
|
||||
renderUrl <- askUrlRender
|
||||
let sender = renderUrl rSender
|
||||
httpPostAPBytes manager inbox headers keyid sign sender (Just $ Right sig) body
|
||||
|
|
Loading…
Reference in a new issue