From b0a26722d379a9e111c94a11cb899c9a63603890 Mon Sep 17 00:00:00 2001 From: fr33domlover Date: Fri, 3 May 2019 21:04:53 +0000 Subject: [PATCH] Do inbox forwarding in project inbox handler --- config/models | 10 + migrations/2019_05_03.model | 9 + src/Vervis/Federation.hs | 362 ++++++++++++++++++++++++++-------- src/Vervis/Migration.hs | 2 + src/Vervis/Migration/Model.hs | 4 + src/Web/ActivityPub.hs | 21 +- src/Yesod/ActivityPub.hs | 43 +++- 7 files changed, 356 insertions(+), 95 deletions(-) create mode 100644 migrations/2019_05_03.model diff --git a/config/models b/config/models index b3c2eab..3241569 100644 --- a/config/models +++ b/config/models @@ -80,6 +80,16 @@ Delivery UniqueDelivery recipient activity +Forwarding + recipient RemoteActorId + activity RemoteActivityId + activityRaw ByteString + sender ProjectId + signature ByteString + running Bool + + UniqueForwarding recipient activity + VerifKey ident LocalURI instance InstanceId diff --git a/migrations/2019_05_03.model b/migrations/2019_05_03.model new file mode 100644 index 0000000..0649070 --- /dev/null +++ b/migrations/2019_05_03.model @@ -0,0 +1,9 @@ +Forwarding + recipient RemoteActorId + activity RemoteActivityId + activityRaw ByteString + sender ProjectId + signature ByteString + running Bool + + UniqueForwarding recipient activity diff --git a/src/Vervis/Federation.hs b/src/Vervis/Federation.hs index 4e3f931..d5a0aae 100644 --- a/src/Vervis/Federation.hs +++ b/src/Vervis/Federation.hs @@ -62,6 +62,7 @@ import Yesod.Core hiding (logError, logWarn, logInfo) import Yesod.Persist.Core import qualified Data.ByteString.Lazy as BL +import qualified Data.CaseInsensitive as CI import qualified Data.List as L import qualified Data.List.NonEmpty as NE import qualified Data.List.Ordered as LO @@ -497,6 +498,81 @@ getLocalParentMessageId did shr lmid = do throwE "Local parent belongs to a different discussion" return mid +getPersonOrGroupId :: SharerId -> AppDB (Either PersonId GroupId) +getPersonOrGroupId sid = do + mpid <- getKeyBy $ UniquePersonIdent sid + mgid <- getKeyBy $ UniqueGroup sid + requireEitherM mpid mgid + "Found sharer that is neither person nor group" + "Found sharer that is both person and group" + +getTicketTeam :: SharerId -> AppDB ([PersonId], [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))]) +getTicketTeam sid = do + id_ <- getPersonOrGroupId sid + (,[]) <$> case id_ of + Left pid -> return [pid] + Right gid -> + map (groupMemberPerson . entityVal) <$> + selectList [GroupMemberGroup ==. gid] [] + +getFollowers :: FollowerSetId -> AppDB ([PersonId], [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))]) +getFollowers fsid = do + local <- selectList [FollowTarget ==. fsid] [] + remote <- E.select $ E.from $ \ (rf `E.InnerJoin` rs `E.InnerJoin` i) -> do + E.on $ rs E.^. RemoteActorInstance E.==. i E.^. InstanceId + E.on $ rf E.^. RemoteFollowActor E.==. rs E.^. RemoteActorId + E.where_ $ rf E.^. RemoteFollowTarget E.==. E.val fsid + E.orderBy [E.asc $ i E.^. InstanceId] + return + ( i E.^. InstanceId + , i E.^. InstanceHost + , rs E.^. RemoteActorId + , rs E.^. RemoteActorIdent + , rs E.^. RemoteActorInbox + , rs E.^. RemoteActorErrorSince + ) + return + ( map (followPerson . entityVal) local + , groupRemotes $ + map (\ (E.Value iid, E.Value h, E.Value rsid, E.Value luActor, E.Value luInbox, E.Value msince) -> + (iid, h, rsid, luActor, luInbox, msince) + ) + remote + ) + where + groupRemotes :: [(InstanceId, Text, RemoteActorId, LocalURI, LocalURI, Maybe UTCTime)] -> [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))] + groupRemotes = groupWithExtractBy ((==) `on` fst) fst snd . map toTuples + where + toTuples (iid, h, rsid, luA, luI, ms) = ((iid, h), (rsid, luA, luI, ms)) + +-- | Merge 2 lists ordered on fst, concatenating snd values when +-- multiple identical fsts occur. The resulting list is ordered on fst, +-- and each fst value appears only once. +-- +-- >>> mergeWith (+) [('a',3), ('a',1), ('b',5)] [('a',2), ('c',4)] +-- [('a',6), ('b',5), ('c',4)] +mergeConcat :: (Ord a, Semigroup b) => [(a, b)] -> [(a, b)] -> [(a, b)] +mergeConcat xs ys = map (second sconcat) $ groupWithExtract fst snd $ LO.mergeBy (compare `on` fst) xs ys + +fst3 :: (a, b, c) -> a +fst3 (x, _, _) = x + +fst4 :: (a, b, c, d) -> a +fst4 (x, _, _, _) = x + +thd3 :: (a, b, c) -> c +thd3 (_, _, z) = z + +fourth4 :: (a, b, c, d) -> d +fourth4 (_, _, _, w) = w + +insertMany' mk xs = zip' xs <$> insertMany (NE.toList $ mk <$> xs) + where + zip' x y = + case nonEmpty y of + Just y' | length x == length y' -> NE.zip x y' + _ -> error "insertMany' returned different length!" + handleSharerInbox :: UTCTime -> ShrIdent @@ -621,10 +697,11 @@ handleProjectInbox -> InstanceId -> Text -> RemoteActorId + -> BL.ByteString -> Object -> Activity -> ExceptT Text Handler Text -handleProjectInbox now shrRecip prjRecip iidSender hSender raidSender raw activity = +handleProjectInbox now shrRecip prjRecip iidSender hSender raidSender body raw activity = case activitySpecific activity of CreateActivity (Create note) -> handleNote (activityAudience activity) note @@ -648,16 +725,51 @@ handleProjectInbox now shrRecip prjRecip iidSender hSender raidSender raw activi if shr /= shrRecip || prj /= prjRecip then return $ recip <> " not using; context is a different project" else do + msig <- checkForward hLocal <- getsYesod $ appInstanceHost . appSettings let colls = findRelevantCollections hLocal num audience - runDBExcept $ do - (did, meparent) <- getContextAndParent num mparent - lift $ do + mremotesHttp <- runDBExcept $ do + (sid, fsid, jid, did, meparent) <- getContextAndParent num mparent + lift $ join <$> do mmid <- insertToDiscussion luNote published did meparent - for mmid $ updateOrphans luNote did - -- TODO CONTINUE inbox forwarding!!! - return $ recip <> " inserted new ticket comment" + for mmid $ \ (ractid, mid) -> do + updateOrphans luNote did mid + for msig $ \ sig -> do + remoteRecips <- deliverLocal ractid colls sid fsid + (sig,) <$> deliverRemoteDB ractid jid sig remoteRecips + lift $ for_ mremotesHttp $ \ (sig, remotesHttp) -> do + let handler e = logError $ "Project inbox handler: delivery failed! " <> T.pack (displayException e) + forkHandler handler $ deliverRemoteHttp sig remotesHttp + return $ recip <> " inserted new ticket comment" where + checkForward = join <$> do + let hSig = hForwardingSignature + msig <- maybeHeader hSig + for msig $ \ sig -> do + _proof <- withExceptT (T.pack . displayException) $ ExceptT $ + let requires = [hDigest, hActivityPubForwarder] + in prepareToVerifyHttpSigWith hSig False requires [] Nothing + forwarder <- requireHeader hActivityPubForwarder + renderUrl <- getUrlRender + let project = renderUrl $ ProjectR shrRecip prjRecip + return $ + if forwarder == encodeUtf8 project + then Just sig + else Nothing + where + maybeHeader n = do + let n' = decodeUtf8 $ CI.original n + hs <- lookupHeaders n + case hs of + [] -> return Nothing + [h] -> return $ Just h + _ -> throwE $ n' <> " multiple headers found" + requireHeader n = do + let n' = decodeUtf8 $ CI.original n + mh <- maybeHeader n + case mh of + Nothing -> throwE $ n' <> " header not found" + Just h -> return h findRelevantCollections hLocal numCtx = nub . mapMaybe decide . concatRecipients where decide u = do @@ -677,8 +789,8 @@ handleProjectInbox now shrRecip prjRecip iidSender hSender raidSender raw activi mt <- lift $ do sid <- getKeyBy404 $ UniqueSharer shrRecip jid <- getKeyBy404 $ UniqueProject prjRecip sid - getValBy $ UniqueTicket jid num - t <- fromMaybeE mt "Context: No such local ticket" + fmap (jid,sid,) <$> getValBy (UniqueTicket jid num) + (jid, sid, t) <- fromMaybeE mt "Context: No such local ticket" let did = ticketDiscuss t meparent <- for mparent $ \ parent -> case parent of @@ -695,7 +807,7 @@ handleProjectInbox now shrRecip prjRecip iidSender hSender raidSender raw activi throwE "Remote parent belongs to a different discussion" return mid Nothing -> return $ Right $ l2f hParent luParent - return (did, meparent) + return (sid, ticketFollowers t, jid, did, meparent) insertToDiscussion luNote published did meparent = do ractid <- either entityKey id <$> insertBy' RemoteActivity { remoteActivityInstance = iidSender @@ -727,7 +839,7 @@ handleProjectInbox now shrRecip prjRecip iidSender hSender raidSender raw activi Nothing -> do delete mid return Nothing - Just _ -> return $ Just mid + Just _ -> return $ Just (ractid, mid) updateOrphans luNote did mid = do let uNote = l2f hSender luNote related <- selectOrphans uNote (E.==.) @@ -757,6 +869,88 @@ handleProjectInbox now shrRecip prjRecip iidSender hSender raidSender raw activi rm E.^. RemoteMessageLostParent E.==. E.just (E.val uNote) E.&&. m E.^. MessageRoot `op` E.val did return (rm E.^. RemoteMessageId, m E.^. MessageId) + deliverLocal + :: RemoteActivityId + -> [LocalTicketRecipient] + -> SharerId + -> FollowerSetId + -> AppDB [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))] + deliverLocal ractid recips sid fsid = do + (teamPids, teamRemotes) <- + if LocalTicketTeam `elem` recips + then getTicketTeam sid + else return ([], []) + (fsPids, fsRemotes) <- + if LocalTicketParticipants `elem` recips + then getFollowers fsid + else return ([], []) + let pids = union teamPids fsPids + -- TODO inefficient, see the other TODOs about mergeConcat + remotes = map (second $ NE.nubBy ((==) `on` fst4)) $ mergeConcat teamRemotes fsRemotes + for_ pids $ \ pid -> insertUnique_ $ InboxItemRemote pid ractid + return remotes + + deliverRemoteDB + :: RemoteActivityId + -> ProjectId + -> ByteString + -> [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))] + -> AppDB + [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId))] + deliverRemoteDB ractid jid sig recips = do + let body' = BL.toStrict body + deliv raid msince = Forwarding raid ractid body' jid sig $ isNothing msince + fetchedDeliv <- for recips $ \ (i, rs) -> + (i,) <$> insertMany' (\ (raid, _, _, msince) -> deliv raid msince) rs + return $ takeNoError4 fetchedDeliv + where + takeNoError noError = mapMaybe $ \ (i, rs) -> (i,) <$> nonEmpty (mapMaybe noError $ NE.toList rs) + takeNoError4 = takeNoError noError + where + noError ((ak, luA, luI, Nothing), dlk) = Just (ak, luA, luI, dlk) + noError ((_ , _ , _ , Just _ ), _ ) = Nothing + + deliverRemoteHttp + :: ByteString + -> [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId))] + -> Handler () + deliverRemoteHttp sig fetched = do + let deliver h inbox = do + forwardActivity (l2f h inbox) sig (ProjectR shrRecip prjRecip) body + now <- liftIO getCurrentTime + traverse_ (fork . deliverFetched deliver now) fetched + where + fork = forkHandler $ \ e -> logError $ "Project inbox handler: delivery failed! " <> T.pack (displayException e) + deliverFetched deliver now ((_, h), recips@(r :| rs)) = do + let (raid, _luActor, luInbox, fwid) = r + e <- deliver h luInbox + let e' = case e of + Left err -> + if isInstanceErrorP err + then Nothing + else Just False + Right _resp -> Just True + case e' of + Nothing -> runDB $ do + let recips' = NE.toList recips + updateWhere [RemoteActorId <-. map fst4 recips', RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now] + updateWhere [ForwardingId <-. map fourth4 recips'] [ForwardingRunning =. False] + Just success -> do + runDB $ + if success + then delete fwid + else do + updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now] + update fwid [ForwardingRunning =. False] + for_ rs $ \ (raid, _luActor, luInbox, fwid) -> + fork $ do + e <- deliver h luInbox + runDB $ + case e of + Left _err -> do + updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now] + update fwid [ForwardingRunning =. False] + Right _resp -> delete fwid fixRunningDeliveries :: (MonadIO m, MonadLogger m, IsSqlBackend backend) => ReaderT backend m () fixRunningDeliveries = do @@ -772,6 +966,12 @@ fixRunningDeliveries = do , T.pack (show c) , " unlinked deliveries" ] + c'' <- updateWhereCount [ForwardingRunning ==. True] [ForwardingRunning =. False] + unless (c' == 0) $ logWarn $ T.concat + [ "fixRunningDeliveries fixed " + , T.pack (show c) + , " forwarding deliveries" + ] data LocalTicketRecipient = LocalTicketParticipants | LocalTicketTeam deriving (Eq, Ord) @@ -827,7 +1027,7 @@ deliverHttp -> LocalURI -> m (Either APPostError (Response ())) deliverHttp doc mfwd h luInbox = - postActivity (l2f h luInbox) (Left . l2f h <$> mfwd) doc + deliverActivity (l2f h luInbox) (l2f h <$> mfwd) doc isInstanceErrorHttp (InvalidUrlException _ _) = False isInstanceErrorHttp (HttpExceptionRequest _ hec) = @@ -1165,27 +1365,6 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c update obid [OutboxItemActivity =. PersistJSON doc] return (lmid, obid, doc) - -- | Merge 2 lists ordered on fst, concatenating snd values when - -- multiple identical fsts occur. The resulting list is ordered on fst, - -- and each fst value appears only once. - -- - -- >>> mergeWith (+) [('a',3), ('a',1), ('b',5)] [('a',2), ('c',4)] - -- [('a',6), ('b',5), ('c',4)] - mergeConcat :: (Ord a, Semigroup b) => [(a, b)] -> [(a, b)] -> [(a, b)] - mergeConcat xs ys = map (second sconcat) $ groupWithExtract fst snd $ LO.mergeBy (compare `on` fst) xs ys - - fst3 :: (a, b, c) -> a - fst3 (x, _, _) = x - - fst4 :: (a, b, c, d) -> a - fst4 (x, _, _, _) = x - - thd3 :: (a, b, c) -> c - thd3 (_, _, z) = z - - fourth4 :: (a, b, c, d) -> d - fourth4 (_, _, _, w) = w - -- Deliver to local recipients. For local users, find in DB and deliver. -- For local collections, expand them, deliver to local users, and return a -- list of remote actors found in them. @@ -1250,13 +1429,6 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c lift $ for_ (union recipPids morePids) $ \ pid -> insert_ $ InboxItemLocal pid obid return remotes where - getPersonOrGroupId :: SharerId -> AppDB (Either PersonId GroupId) - getPersonOrGroupId sid = do - mpid <- getKeyBy $ UniquePersonIdent sid - mgid <- getKeyBy $ UniqueGroup sid - requireEitherM mpid mgid - "Found sharer that is neither person nor group" - "Found sharer that is both person and group" getPersonId :: ShrIdent -> ExceptT Text AppDB PersonId getPersonId shr = do msid <- lift $ getKeyBy $ UniqueSharer shr @@ -1265,42 +1437,6 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c case id_ of Left pid -> return pid Right _gid -> throwE "Local Note addresses a local group" - groupRemotes :: [(InstanceId, Text, RemoteActorId, LocalURI, LocalURI, Maybe UTCTime)] -> [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))] - groupRemotes = groupWithExtractBy ((==) `on` fst) fst snd . map toTuples - where - toTuples (iid, h, rsid, luA, luI, ms) = ((iid, h), (rsid, luA, luI, ms)) - getTicketTeam :: SharerId -> AppDB ([PersonId], [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))]) - getTicketTeam sid = do - id_ <- getPersonOrGroupId sid - (,[]) <$> case id_ of - Left pid -> return [pid] - Right gid -> - map (groupMemberPerson . entityVal) <$> - selectList [GroupMemberGroup ==. gid] [] - getFollowers :: FollowerSetId -> AppDB ([PersonId], [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))]) - getFollowers fsid = do - local <- selectList [FollowTarget ==. fsid] [] - remote <- E.select $ E.from $ \ (rf `E.InnerJoin` rs `E.InnerJoin` i) -> do - E.on $ rs E.^. RemoteActorInstance E.==. i E.^. InstanceId - E.on $ rf E.^. RemoteFollowActor E.==. rs E.^. RemoteActorId - E.where_ $ rf E.^. RemoteFollowTarget E.==. E.val fsid - E.orderBy [E.asc $ i E.^. InstanceId] - return - ( i E.^. InstanceId - , i E.^. InstanceHost - , rs E.^. RemoteActorId - , rs E.^. RemoteActorIdent - , rs E.^. RemoteActorInbox - , rs E.^. RemoteActorErrorSince - ) - return - ( map (followPerson . entityVal) local - , groupRemotes $ - map (\ (E.Value iid, E.Value h, E.Value rsid, E.Value luActor, E.Value luInbox, E.Value msince) -> - (iid, h, rsid, luActor, luInbox, msince) - ) - remote - ) -- Deliver to a local sharer, if they exist as a user account deliverToLocalSharer :: OutboxItemId -> ShrIdent -> ExceptT Text AppDB () @@ -1375,13 +1511,6 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c groupByHost :: [FedURI] -> [(Text, NonEmpty LocalURI)] groupByHost = groupAllExtract furiHost (snd . f2l) - insertMany' mk xs = zip' xs <$> insertMany (NE.toList $ mk <$> xs) - where - zip' x y = - case nonEmpty y of - Just y' | length x == length y' -> NE.zip x y' - _ -> error "insertMany' returned different length!" - takeNoError noError = mapMaybe $ \ (i, rs) -> (i,) <$> nonEmpty (mapMaybe noError $ NE.toList rs) takeNoError3 = takeNoError noError where @@ -1490,7 +1619,7 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c retryOutboxDelivery :: Worker () retryOutboxDelivery = do now <- liftIO $ getCurrentTime - (udls, dls) <- runSiteDB $ do + (udls, dls, fws) <- runSiteDB $ do -- Get all unlinked deliveries which aren't running already in outbox -- post handlers unlinked' <- E.select $ E.from $ \ (udl `E.InnerJoin` ob `E.InnerJoin` ura `E.InnerJoin` i `E.LeftOuterJoin` ra) -> do @@ -1549,12 +1678,37 @@ retryOutboxDelivery = do ) let (linkedOld, linkedNew) = partitionEithers $ map (decideBySinceDL dropAfter now . adaptLinked) linked deleteWhere [DeliveryId <-. linkedOld] - return (groupUnlinked lonelyNew, groupLinked linkedNew) + -- Same for forwarding deliveries, which are always linked + forwarding <- E.select $ E.from $ \ (fw `E.InnerJoin` ra `E.InnerJoin` i `E.InnerJoin` j `E.InnerJoin` s) -> do + E.on $ j E.^. ProjectSharer E.==. s E.^. SharerId + E.on $ fw E.^. ForwardingSender E.==. j E.^. ProjectId + E.on $ ra E.^. RemoteActorInstance E.==. i E.^. InstanceId + E.on $ fw E.^. ForwardingRecipient E.==. ra E.^. RemoteActorId + E.where_ $ fw E.^. ForwardingRunning E.==. E.val False + E.orderBy [E.asc $ ra E.^. RemoteActorInstance, E.asc $ ra E.^. RemoteActorId] + return + ( i E.^. InstanceId + , i E.^. InstanceHost + , ra E.^. RemoteActorId + , ra E.^. RemoteActorInbox + , ra E.^. RemoteActorErrorSince + , fw E.^. ForwardingId + , fw E.^. ForwardingActivityRaw + , j E.^. ProjectIdent + , s E.^. SharerIdent + , fw E.^. ForwardingSignature + ) + let (forwardingOld, forwardingNew) = partitionEithers $ map (decideBySinceFW dropAfter now . adaptForwarding) forwarding + deleteWhere [ForwardingId <-. forwardingOld] + return (groupUnlinked lonelyNew, groupLinked linkedNew, groupForwarding forwardingNew) let deliver = deliverHttp waitsDL <- traverse (fork . deliverLinked deliver now) dls + waitsFW <- traverse (fork . deliverForwarding now) fws waitsUDL <- traverse (fork . deliverUnlinked deliver now) udls resultsDL <- sequence waitsDL unless (and resultsDL) $ logError "Periodic delivery DL error" + resultsFW <- sequence waitsFW + unless (and resultsFW) $ logError "Periodic delivery FW error" resultsUDL <- sequence waitsUDL unless (and resultsUDL) $ logError "Periodic delivery UDL error" where @@ -1597,6 +1751,23 @@ retryOutboxDelivery = do groupLinked = map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd) . groupWithExtractBy ((==) `on` fst) fst snd + adaptForwarding + (E.Value iid, E.Value h, E.Value raid, E.Value inbox, E.Value since, E.Value fwid, E.Value body, E.Value prj, E.Value shr, E.Value sig) = + ( ( (iid, h) + , ((raid, inbox), (fwid, BL.fromStrict body, ProjectR shr prj, sig)) + ) + , since + ) + decideBySinceFW dropAfter now (fw@(_, (_, (fwid, _, _, _))), msince) = + case msince of + Nothing -> Right fw + Just since -> + if relevant dropAfter now since + then Right fw + else Left fwid + groupForwarding + = map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd) + . groupWithExtractBy ((==) `on` fst) fst snd fork action = do wait <- asyncSite action return $ do @@ -1658,3 +1829,24 @@ retryOutboxDelivery = do unless (and results) $ logError $ "Periodic UDL delivery error for host " <> h return True + deliverForwarding now ((_, h), recips) = do + waitsR <- for recips $ \ ((raid, inbox), delivs) -> fork $ do + waitsD <- for delivs $ \ (fwid, body, sender, sig) -> fork $ do + e <- forwardActivity (l2f h inbox) sig sender body + case e of + Left _err -> return False + Right _resp -> do + runSiteDB $ delete fwid + return True + results <- sequence waitsD + runSiteDB $ + if and results + then update raid [RemoteActorErrorSince =. Nothing] + else if or results + then update raid [RemoteActorErrorSince =. Just now] + else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now] + return True + results <- sequence waitsR + unless (and results) $ + logError $ "Periodic FW delivery error for host " <> h + return True diff --git a/src/Vervis/Migration.hs b/src/Vervis/Migration.hs index 40e0870..29e1da4 100644 --- a/src/Vervis/Migration.hs +++ b/src/Vervis/Migration.hs @@ -257,6 +257,8 @@ changes = , addFieldPrimRequired "UnlinkedDelivery" True "forwarding" -- 65 , addFieldPrimRequired "Delivery" True "forwarding" + -- 66 + , addEntities model_2019_05_03 ] migrateDB :: MonadIO m => ReaderT SqlBackend m (Either Text (Int, Int)) diff --git a/src/Vervis/Migration/Model.hs b/src/Vervis/Migration/Model.hs index 56c05da..4eb4fc5 100644 --- a/src/Vervis/Migration/Model.hs +++ b/src/Vervis/Migration/Model.hs @@ -38,6 +38,7 @@ module Vervis.Migration.Model , model_2019_04_11 , model_2019_04_12 , model_2019_04_22 + , model_2019_05_03 ) where @@ -102,3 +103,6 @@ model_2019_04_12 = $(schema "2019_04_12") model_2019_04_22 :: [Entity SqlBackend] model_2019_04_22 = $(schema "2019_04_22") + +model_2019_05_03 :: [Entity SqlBackend] +model_2019_05_03 = $(schema "2019_05_03") diff --git a/src/Web/ActivityPub.hs b/src/Web/ActivityPub.hs index 3fab2e2..c66771c 100644 --- a/src/Web/ActivityPub.hs +++ b/src/Web/ActivityPub.hs @@ -54,6 +54,7 @@ module Web.ActivityPub , hForwardingSignature , hForwardedSignature , httpPostAP + , httpPostAPBytes , Fetched (..) , fetchAPID , fetchAPID' @@ -102,6 +103,7 @@ import Network.HTTP.Client.Signature import qualified Data.ByteString as B import qualified Data.ByteString.Base64 as B64 import qualified Data.ByteString.Char8 as BC +import qualified Data.ByteString.Lazy as BL import qualified Data.HashMap.Strict as M (lookup) import qualified Data.Text as T (pack, unpack) import qualified Data.Vector as V @@ -668,10 +670,25 @@ httpPostAP -> a -> m (Either APPostError (Response ())) httpPostAP manager uri headers keyid sign uSender mfwd value = + httpPostAPBytes manager uri headers keyid sign uSender mfwd $ encode value + +-- | Like 'httpPostAP', except it takes the object as a raw lazy +-- 'BL.ByteString'. It's your responsibility to make sure it's valid JSON. +httpPostAPBytes + :: MonadIO m + => Manager + -> FedURI + -> NonEmpty HeaderName + -> S.KeyId + -> (ByteString -> S.Signature) + -> Text + -> Maybe (Either FedURI ByteString) + -> BL.ByteString + -> m (Either APPostError (Response ())) +httpPostAPBytes manager uri headers keyid sign uSender mfwd body = liftIO $ runExceptT $ do req <- requestFromURI $ toURI uri - let body = encode value - digest = formatHttpBodyDigest SHA256 "SHA-256" $ hashlazy body + let digest = formatHttpBodyDigest SHA256 "SHA-256" $ hashlazy body req' = setRequestCheckStatus $ consHeader hContentType typeActivityStreams2LD $ diff --git a/src/Yesod/ActivityPub.hs b/src/Yesod/ActivityPub.hs index 1fb5326..07a5f81 100644 --- a/src/Yesod/ActivityPub.hs +++ b/src/Yesod/ActivityPub.hs @@ -15,7 +15,8 @@ module Yesod.ActivityPub ( YesodActivityPub (..) - , postActivity + , deliverActivity + , forwardActivity ) where @@ -26,6 +27,8 @@ import Data.List.NonEmpty (NonEmpty) import Data.Text (Text) import Yesod.Core +import qualified Data.ByteString.Lazy as BL + import Network.HTTP.Client import Network.HTTP.Signature import Network.HTTP.Types.Header @@ -35,23 +38,47 @@ import Web.ActivityPub import Yesod.MonadSite class Yesod site => YesodActivityPub site where - sitePostSignedHeaders :: site -> NonEmpty HeaderName - siteGetHttpSign :: (MonadSite m, SiteEnv m ~ site) - => m (KeyId, ByteString -> Signature) + sitePostSignedHeaders :: site -> NonEmpty HeaderName + siteGetHttpSign :: (MonadSite m, SiteEnv m ~ site) + => m (KeyId, ByteString -> Signature) + {- + siteSigVerRequiredHeaders :: site -> [HeaderName] + siteSigVerWantedHeaders :: site -> [HeaderName] + siteSigVerSeconds :: site -> Int + -} -postActivity +deliverActivity :: ( MonadSite m , SiteEnv m ~ site , HasHttpManager site , YesodActivityPub site ) => FedURI - -> Maybe (Either FedURI ByteString) + -> Maybe FedURI -> Doc Activity -> m (Either APPostError (Response ())) -postActivity inbox mfwd doc@(Doc hAct activity) = do +deliverActivity inbox mfwd doc@(Doc hAct activity) = do manager <- asksSite getHttpManager headers <- asksSite sitePostSignedHeaders (keyid, sign) <- siteGetHttpSign let sender = renderFedURI $ l2f hAct (activityActor activity) - httpPostAP manager inbox headers keyid sign sender mfwd doc + httpPostAP manager inbox headers keyid sign sender (Left <$> mfwd) doc + +forwardActivity + :: ( MonadSite m + , SiteEnv m ~ site + , HasHttpManager site + , YesodActivityPub site + ) + => FedURI + -> ByteString + -> Route site + -> BL.ByteString + -> m (Either APPostError (Response ())) +forwardActivity inbox sig rSender body = do + manager <- asksSite getHttpManager + headers <- asksSite sitePostSignedHeaders + (keyid, sign) <- siteGetHttpSign + renderUrl <- askUrlRender + let sender = renderUrl rSender + httpPostAPBytes manager inbox headers keyid sign sender (Just $ Right sig) body