Write C2S Offer{Ticket} handler, not used in any route handlers yet
This patch doesn't just add the handler code, it also does lots of refactoring and moves around pieces of code that are used in multiple places. There is still lots of refactoring to make though. In this patch I tried to make minimal changes to the existing Note handler to avoid breaking it. In later patches I'll do some more serious refactoring, hopefully resulting with less mess in the code.
This commit is contained in:
parent
d6b999eaf3
commit
55fdb5437c
6 changed files with 540 additions and 328 deletions
|
@ -18,12 +18,14 @@ module Data.List.NonEmpty.Local
|
|||
, groupWithExtractBy
|
||||
, groupWithExtractBy1
|
||||
, groupAllExtract
|
||||
, unionGroupsOrdWith
|
||||
)
|
||||
where
|
||||
|
||||
import Data.Function
|
||||
import Data.List.NonEmpty (NonEmpty (..))
|
||||
|
||||
import qualified Data.List.Ordered as LO
|
||||
import qualified Data.List.NonEmpty as NE
|
||||
|
||||
extract :: (a -> b) -> (a -> c) -> NonEmpty a -> (b, NonEmpty c)
|
||||
|
@ -56,3 +58,29 @@ groupWithExtractBy1 eq f g = NE.map (extract f g) . NE.groupBy1 (eq `on` f)
|
|||
|
||||
groupAllExtract :: Ord b => (a -> b) -> (a -> c) -> [a] -> [(b, NonEmpty c)]
|
||||
groupAllExtract f g = map (extract f g) . NE.groupAllWith f
|
||||
|
||||
unionOrdByNE :: (a -> a -> Ordering) -> NonEmpty a -> NonEmpty a -> NonEmpty a
|
||||
unionOrdByNE cmp (x :| xs) (y :| ys) =
|
||||
case cmp x y of
|
||||
LT -> x :| LO.unionBy cmp xs (y : ys)
|
||||
EQ -> x :| LO.unionBy cmp xs ys
|
||||
GT -> y :| LO.unionBy cmp (x : xs) ys
|
||||
|
||||
unionGroupsOrdWith
|
||||
:: (Ord c, Ord d)
|
||||
=> (a -> c)
|
||||
-> (b -> d)
|
||||
-> [(a, NonEmpty b)]
|
||||
-> [(a, NonEmpty b)]
|
||||
-> [(a, NonEmpty b)]
|
||||
unionGroupsOrdWith groupOrd itemOrd = go
|
||||
where
|
||||
go [] ys = ys
|
||||
go xs [] = xs
|
||||
go xs@((i, as) : zs) ys@((j, bs) : ws) =
|
||||
case (compare `on` groupOrd) i j of
|
||||
LT -> (i, as) : go zs ys
|
||||
EQ ->
|
||||
let cs = unionOrdByNE (compare `on` itemOrd) as bs
|
||||
in (i, cs) : go zs ws
|
||||
GT -> (j, bs) : go xs ws
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
module Vervis.API
|
||||
( createNoteC
|
||||
, offerTicketC
|
||||
, getFollowersCollection
|
||||
)
|
||||
where
|
||||
|
@ -41,6 +42,7 @@ import Data.Maybe
|
|||
import Data.Semigroup
|
||||
import Data.Text (Text)
|
||||
import Data.Text.Encoding
|
||||
import Data.Time.Calendar
|
||||
import Data.Time.Clock
|
||||
import Data.Time.Units
|
||||
import Data.Traversable
|
||||
|
@ -74,13 +76,15 @@ import Crypto.PublicVerifKey
|
|||
import Database.Persist.JSON
|
||||
import Network.FedURI
|
||||
import Network.HTTP.Digest
|
||||
import Web.ActivityPub hiding (Follow)
|
||||
import Web.ActivityPub hiding (Follow, Ticket)
|
||||
import Yesod.ActivityPub
|
||||
import Yesod.Auth.Unverified
|
||||
import Yesod.FedURI
|
||||
import Yesod.Hashids
|
||||
import Yesod.MonadSite
|
||||
|
||||
import qualified Web.ActivityPub as AP
|
||||
|
||||
import Control.Monad.Trans.Except.Local
|
||||
import Data.Aeson.Local
|
||||
import Data.Either.Local
|
||||
|
@ -97,13 +101,36 @@ import Vervis.API.Recipient
|
|||
import Vervis.Foundation
|
||||
import Vervis.Model
|
||||
import Vervis.Model.Ident
|
||||
import Vervis.Model.Ticket
|
||||
import Vervis.RemoteActorStore
|
||||
import Vervis.Settings
|
||||
|
||||
data Recip
|
||||
= RecipRA (Entity RemoteActor)
|
||||
| RecipURA (Entity UnfetchedRemoteActor)
|
||||
| RecipRC (Entity RemoteCollection)
|
||||
verifyIsLoggedInUser
|
||||
:: LocalURI
|
||||
-> Text
|
||||
-> ExceptT Text AppDB (PersonId, OutboxId, ShrIdent)
|
||||
verifyIsLoggedInUser lu t = do
|
||||
Entity pid p <- requireVerifiedAuth
|
||||
s <- lift $ getJust $ personIdent p
|
||||
route2local <- getEncodeRouteLocal
|
||||
let shr = sharerIdent s
|
||||
if route2local (SharerR shr) == lu
|
||||
then return (pid, personOutbox p, shr)
|
||||
else throwE t
|
||||
|
||||
verifyAuthor
|
||||
:: ShrIdent
|
||||
-> LocalURI
|
||||
-> Text
|
||||
-> ExceptT Text AppDB (PersonId, OutboxId)
|
||||
verifyAuthor shr lu t = ExceptT $ do
|
||||
Entity sid s <- getBy404 $ UniqueSharer shr
|
||||
Entity pid p <- getBy404 $ UniquePersonIdent sid
|
||||
encodeRouteLocal <- getEncodeRouteLocal
|
||||
return $
|
||||
if encodeRouteLocal (SharerR shr) == lu
|
||||
then Right (pid, personOutbox p)
|
||||
else Left t
|
||||
|
||||
parseComment :: LocalURI -> ExceptT Text Handler (ShrIdent, LocalMessageId)
|
||||
parseComment luParent = do
|
||||
|
@ -123,8 +150,7 @@ createNoteC host (Note mluNote luAttrib aud muParent muContext mpublished source
|
|||
verifyNothingE mluNote "Note specifies an id"
|
||||
verifyNothingE mpublished "Note specifies published"
|
||||
uContext <- fromMaybeE muContext "Note without context"
|
||||
recips <- nonEmptyE (concatRecipients aud) "Note without recipients"
|
||||
(mparent, localRecips, mticket, remoteRecips) <- parseRecipsContextParent recips uContext muParent
|
||||
(mparent, localRecips, mticket, remoteRecips) <- parseRecipsContextParent uContext muParent
|
||||
federation <- getsYesod $ appFederation . appSettings
|
||||
unless (federation || null remoteRecips) $
|
||||
throwE "Federation disabled, but remote recipients specified"
|
||||
|
@ -201,7 +227,7 @@ createNoteC host (Note mluNote luAttrib aud muParent muContext mpublished source
|
|||
moreRemotes <- deliverLocal pid obiid localRecips mcollections
|
||||
unless (federation || null moreRemotes) $
|
||||
throwE "Federation disabled but remote collection members found"
|
||||
remotesHttp <- lift $ deliverRemoteDB (furiHost uContext) obiid remoteRecips moreRemotes
|
||||
remotesHttp <- lift $ deliverRemoteDB' (furiHost uContext) obiid remoteRecips moreRemotes
|
||||
return (lmid, obiid, doc, remotesHttp)
|
||||
lift $ forkWorker "Outbox POST handler: async HTTP delivery" $ deliverRemoteHttp (furiHost uContext) obiid doc remotesHttp
|
||||
return lmid
|
||||
|
@ -213,29 +239,29 @@ createNoteC host (Note mluNote luAttrib aud muParent muContext mpublished source
|
|||
Just ne -> return ne
|
||||
|
||||
parseRecipsContextParent
|
||||
:: NonEmpty FedURI
|
||||
-> FedURI
|
||||
:: FedURI
|
||||
-> Maybe FedURI
|
||||
-> ExceptT Text Handler
|
||||
( Maybe (Either (ShrIdent, LocalMessageId) (Text, LocalURI))
|
||||
, [ShrIdent]
|
||||
, Maybe (ShrIdent, PrjIdent, Int)
|
||||
, [FedURI]
|
||||
, [(Text, NonEmpty LocalURI)]
|
||||
)
|
||||
parseRecipsContextParent recips uContext muParent = do
|
||||
(localsSet, remotes) <- parseRecipients recips
|
||||
parseRecipsContextParent uContext muParent = do
|
||||
(localsSet, remotes) <- do
|
||||
mrecips <- parseAudience aud
|
||||
fromMaybeE mrecips "Note without recipients"
|
||||
let (hContext, luContext) = f2l uContext
|
||||
parent <- parseParent uContext muParent
|
||||
local <- hostIsLocal hContext
|
||||
let remotes' = remotes L.\\ audienceNonActors aud
|
||||
if local
|
||||
then do
|
||||
ticket <- parseContextTicket luContext
|
||||
shrs <- verifyTicketRecipients ticket localsSet
|
||||
return (parent, shrs, Just ticket, remotes')
|
||||
return (parent, shrs, Just ticket, remotes)
|
||||
else do
|
||||
shrs <- verifyOnlySharers localsSet
|
||||
return (parent, shrs, Nothing, remotes')
|
||||
return (parent, shrs, Nothing, remotes)
|
||||
where
|
||||
parseParent :: FedURI -> Maybe FedURI -> ExceptT Text Handler (Maybe (Either (ShrIdent, LocalMessageId) (Text, LocalURI)))
|
||||
parseParent _ Nothing = return Nothing
|
||||
|
@ -287,19 +313,6 @@ createNoteC host (Note mluNote luAttrib aud muParent muContext mpublished source
|
|||
verifyOnlySharers :: LocalRecipientSet -> ExceptT Text Handler [ShrIdent]
|
||||
verifyOnlySharers lrs = catMaybes <$> traverse (atMostSharer "Note with remote context but local project-related recipients") lrs
|
||||
|
||||
verifyIsLoggedInUser
|
||||
:: LocalURI
|
||||
-> Text
|
||||
-> ExceptT Text AppDB (PersonId, OutboxId, ShrIdent)
|
||||
verifyIsLoggedInUser lu t = do
|
||||
Entity pid p <- requireVerifiedAuth
|
||||
s <- lift $ getJust $ personIdent p
|
||||
route2local <- getEncodeRouteLocal
|
||||
let shr = sharerIdent s
|
||||
if route2local (SharerR shr) == lu
|
||||
then return (pid, personOutbox p, shr)
|
||||
else throwE t
|
||||
|
||||
insertMessage
|
||||
:: LocalURI
|
||||
-> ShrIdent
|
||||
|
@ -389,45 +402,7 @@ createNoteC host (Note mluNote luAttrib aud muParent muContext mpublished source
|
|||
(jfsPids, jfsRemotes) <- getFollowers fsidJ
|
||||
return
|
||||
( L.delete pidAuthor $ union teamPids $ union tfsPids jfsPids
|
||||
-- TODO this is inefficient! The way this combines
|
||||
-- same-host sharer lists is:
|
||||
--
|
||||
-- (1) concatenate them
|
||||
-- (2) nubBy fst to remove duplicates
|
||||
--
|
||||
-- But we have knowledge that:
|
||||
--
|
||||
-- (1) in each of the 2 lists we're combining, each
|
||||
-- instance occurs only once
|
||||
-- (2) in each actor list, each actor occurs only
|
||||
-- once
|
||||
--
|
||||
-- So we can improve this code by:
|
||||
--
|
||||
-- (1) Not assume arbitrary number of consecutive
|
||||
-- repetition of the same instance, we may only
|
||||
-- have repetition if the same instance occurs
|
||||
-- in both lists
|
||||
-- (2) Don't <> the lists, instead apply unionBy or
|
||||
-- something better (unionBy assumes one list
|
||||
-- may have repetition, but removes repetition
|
||||
-- from the other; we know both lists have no
|
||||
-- repetition, can we use that to do this
|
||||
-- faster than unionBy?)
|
||||
--
|
||||
-- Also, if we ask the DB to sort by actor, then in
|
||||
-- the (2) point above, instead of unionBy we can use
|
||||
-- the knowledge the lists are sorted, and apply
|
||||
-- LO.unionBy instead. Or even better, because
|
||||
-- LO.unionBy doesn't assume no repetitions (possibly
|
||||
-- though it still does it the fastest way).
|
||||
--
|
||||
-- So, in mergeConcat, don't start with merging,
|
||||
-- because we lose the knowledge that each list's
|
||||
-- instances aren't repeated. Use a custom merge
|
||||
-- where we can unionBy or LO.unionBy whenever both
|
||||
-- lists have the same instance.
|
||||
, map (second $ NE.nubBy ((==) `on` fst4)) $ mergeConcat3 teamRemotes tfsRemotes jfsRemotes
|
||||
, teamRemotes `unionRemotes` tfsRemotes `unionRemotes` jfsRemotes
|
||||
)
|
||||
lift $ do
|
||||
for_ mticket $ \ (_, _, ibidProject, _) -> do
|
||||
|
@ -465,209 +440,182 @@ createNoteC host (Note mluNote luAttrib aud muParent muContext mpublished source
|
|||
Right _gid -> throwE "Local Note addresses a local group"
|
||||
-}
|
||||
|
||||
deliverRemoteDB
|
||||
:: Text
|
||||
-> OutboxItemId
|
||||
-> [FedURI]
|
||||
-> [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))]
|
||||
-> AppDB
|
||||
( [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, DeliveryId))]
|
||||
, [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
|
||||
, [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
|
||||
)
|
||||
deliverRemoteDB hContext obid recips known = do
|
||||
recips' <- for (groupByHost recips) $ \ (h, lus) -> do
|
||||
let lus' = NE.nub lus
|
||||
(iid, inew) <- idAndNew <$> insertBy' (Instance h)
|
||||
if inew
|
||||
then return ((iid, h), (Nothing, Nothing, Just lus'))
|
||||
else do
|
||||
es <- for lus' $ \ lu -> do
|
||||
ma <- runMaybeT
|
||||
$ RecipRA <$> MaybeT (getBy $ UniqueRemoteActor iid lu)
|
||||
<|> RecipURA <$> MaybeT (getBy $ UniqueUnfetchedRemoteActor iid lu)
|
||||
<|> RecipRC <$> MaybeT (getBy $ UniqueRemoteCollection iid lu)
|
||||
return $
|
||||
case ma of
|
||||
Nothing -> Just $ Left lu
|
||||
Just r ->
|
||||
case r of
|
||||
RecipRA (Entity raid ra) -> Just $ Right $ Left (raid, remoteActorIdent ra, remoteActorInbox ra, remoteActorErrorSince ra)
|
||||
RecipURA (Entity uraid ura) -> Just $ Right $ Right (uraid, unfetchedRemoteActorIdent ura, unfetchedRemoteActorSince ura)
|
||||
RecipRC _ -> Nothing
|
||||
let (unknown, newKnown) = partitionEithers $ catMaybes $ NE.toList es
|
||||
(fetched, unfetched) = partitionEithers newKnown
|
||||
return ((iid, h), (nonEmpty fetched, nonEmpty unfetched, nonEmpty unknown))
|
||||
let moreKnown = mapMaybe (\ (i, (f, _, _)) -> (i,) <$> f) recips'
|
||||
unfetched = mapMaybe (\ (i, (_, uf, _)) -> (i,) <$> uf) recips'
|
||||
stillUnknown = mapMaybe (\ (i, (_, _, uk)) -> (i,) <$> uk) recips'
|
||||
-- TODO see the earlier TODO about merge, it applies here too
|
||||
allFetched = map (second $ NE.nubBy ((==) `on` fst4)) $ mergeConcat known moreKnown
|
||||
fetchedDeliv <- for allFetched $ \ (i, rs) ->
|
||||
let fwd = snd i == hContext
|
||||
in (i,) <$> insertMany' (\ (raid, _, _, msince) -> Delivery raid obid fwd $ isNothing msince) rs
|
||||
unfetchedDeliv <- for unfetched $ \ (i, rs) ->
|
||||
let fwd = snd i == hContext
|
||||
in (i,) <$> insertMany' (\ (uraid, _, msince) -> UnlinkedDelivery uraid obid fwd $ isNothing msince) rs
|
||||
unknownDeliv <- for stillUnknown $ \ (i, lus) -> do
|
||||
-- TODO maybe for URA insertion we should do insertUnique?
|
||||
rs <- insertMany' (\ lu -> UnfetchedRemoteActor (fst i) lu Nothing) lus
|
||||
let fwd = snd i == hContext
|
||||
(i,) <$> insertMany' (\ (_, uraid) -> UnlinkedDelivery uraid obid fwd True) rs
|
||||
return
|
||||
( takeNoError4 fetchedDeliv
|
||||
, takeNoError3 unfetchedDeliv
|
||||
, map
|
||||
(second $ NE.map $ \ ((lu, ak), dlk) -> (ak, lu, dlk))
|
||||
unknownDeliv
|
||||
)
|
||||
offerTicketC
|
||||
:: ShrIdent
|
||||
-> TextHtml
|
||||
-> Audience
|
||||
-> Offer
|
||||
-> Handler (Either Text OutboxItemId)
|
||||
offerTicketC shrUser summary audience offer@(Offer ticket uTarget) = runExceptT $ do
|
||||
(hProject, shrProject, prjProject) <- parseTarget uTarget
|
||||
deps <- checkOffer hProject shrProject prjProject
|
||||
(localRecips, remoteRecips) <- do
|
||||
mrecips <- parseAudience audience
|
||||
fromMaybeE mrecips "Offer with no recipients"
|
||||
federation <- asksSite $ appFederation . appSettings
|
||||
unless (federation || null remoteRecips) $
|
||||
throwE "Federation disabled, but remote recipients specified"
|
||||
checkRecips hProject shrProject prjProject localRecips
|
||||
now <- liftIO getCurrentTime
|
||||
(obiid, doc, remotesHttp) <- runDBExcept $ do
|
||||
(pidAuthor, obidAuthor) <-
|
||||
verifyAuthor
|
||||
shrUser
|
||||
(AP.ticketAttributedTo ticket)
|
||||
"Ticket attributed to different actor"
|
||||
mprojAndDeps <- do
|
||||
targetIsLocal <- hostIsLocal hProject
|
||||
if targetIsLocal
|
||||
then Just <$> getProjectAndDeps shrProject prjProject deps
|
||||
else return Nothing
|
||||
(obiid, doc) <- lift $ insertToOutbox now obidAuthor
|
||||
moreRemotes <-
|
||||
lift $ deliverLocal shrProject prjProject now pidAuthor mprojAndDeps obiid localRecips
|
||||
unless (federation || null moreRemotes) $
|
||||
throwE "Federation disabled but remote collection members found"
|
||||
remotesHttp <- lift $ deliverRemoteDB' hProject obiid remoteRecips moreRemotes
|
||||
return (obiid, doc, remotesHttp)
|
||||
lift $ forkWorker "Outbox POST handler: async HTTP delivery" $ deliverRemoteHttp hProject obiid doc remotesHttp
|
||||
return obiid
|
||||
where
|
||||
checkOffer hProject shrProject prjProject = do
|
||||
verifyNothingE (AP.ticketLocal ticket) "Ticket with 'id'"
|
||||
verifyNothingE (AP.ticketPublished ticket) "Ticket with 'published'"
|
||||
verifyNothingE (AP.ticketUpdated ticket) "Ticket with 'updated'"
|
||||
verifyNothingE (AP.ticketName ticket) "Ticket with 'name'"
|
||||
verifyNothingE (AP.ticketAssignedTo ticket) "Ticket with 'assignedTo'"
|
||||
when (AP.ticketIsResolved ticket) $ throwE "Ticket resolved"
|
||||
unless (null $ AP.ticketDependedBy ticket) $ throwE "Ticket has rdeps"
|
||||
traverse checkDep' $ AP.ticketDependsOn ticket
|
||||
where
|
||||
groupByHost :: [FedURI] -> [(Text, NonEmpty LocalURI)]
|
||||
groupByHost = groupAllExtract furiHost (snd . f2l)
|
||||
|
||||
takeNoError noError = mapMaybe $ \ (i, rs) -> (i,) <$> nonEmpty (mapMaybe noError $ NE.toList rs)
|
||||
takeNoError3 = takeNoError noError
|
||||
where
|
||||
noError ((ak, lu, Nothing), dlk) = Just (ak, lu, dlk)
|
||||
noError ((_ , _ , Just _ ), _ ) = Nothing
|
||||
takeNoError4 = takeNoError noError
|
||||
where
|
||||
noError ((ak, luA, luI, Nothing), dlk) = Just (ak, luA, luI, dlk)
|
||||
noError ((_ , _ , _ , Just _ ), _ ) = Nothing
|
||||
|
||||
deliverRemoteHttp
|
||||
:: Text
|
||||
-> OutboxItemId
|
||||
-> Doc Activity
|
||||
-> ( [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, DeliveryId))]
|
||||
, [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
|
||||
, [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
|
||||
)
|
||||
-> Worker ()
|
||||
deliverRemoteHttp hContext obid doc (fetched, unfetched, unknown) = do
|
||||
logDebug' "Starting"
|
||||
let deliver fwd h inbox = do
|
||||
let fwd' = if h == hContext then Just fwd else Nothing
|
||||
(isJust fwd',) <$> deliverHttp doc fwd' h inbox
|
||||
now <- liftIO getCurrentTime
|
||||
logDebug' $
|
||||
"Launching fetched " <> T.pack (show $ map (snd . fst) fetched)
|
||||
traverse_ (fork . deliverFetched deliver now) fetched
|
||||
logDebug' $
|
||||
"Launching unfetched " <> T.pack (show $ map (snd . fst) unfetched)
|
||||
traverse_ (fork . deliverUnfetched deliver now) unfetched
|
||||
logDebug' $
|
||||
"Launching unknown " <> T.pack (show $ map (snd . fst) unknown)
|
||||
traverse_ (fork . deliverUnfetched deliver now) unknown
|
||||
logDebug' "Done (async delivery may still be running)"
|
||||
checkDep' = checkDep hProject shrProject prjProject
|
||||
checkRecips hProject shrProject prjProject localRecips = do
|
||||
local <- hostIsLocal hProject
|
||||
if local
|
||||
then traverse (verifyOfferRecips shrProject prjProject) localRecips
|
||||
else traverse (verifyOnlySharer . snd) localRecips
|
||||
where
|
||||
logDebug' t = logDebug $ prefix <> t
|
||||
verifyOfferRecips shr prj (shr', lsrSet) =
|
||||
if shr == shr'
|
||||
then unless (lsrSet == offerRecips prj) $
|
||||
throwE "Unexpected offer target recipient set"
|
||||
else verifyOnlySharer lsrSet
|
||||
where
|
||||
prefix =
|
||||
T.concat
|
||||
[ "Outbox POST handler: deliverRemoteHttp obid#"
|
||||
, T.pack $ show $ fromSqlKey obid
|
||||
, ": "
|
||||
offerRecips prj = LocalSharerRelatedSet
|
||||
{ localRecipSharerDirect = LocalSharerDirectSet False
|
||||
, localRecipProjectRelated =
|
||||
[ ( prj
|
||||
, LocalProjectRelatedSet
|
||||
{ localRecipProjectDirect =
|
||||
LocalProjectDirectSet True True True
|
||||
, localRecipTicketRelated = []
|
||||
}
|
||||
)
|
||||
]
|
||||
fork = forkWorker "Outbox POST handler: HTTP delivery"
|
||||
deliverFetched deliver now ((_, h), recips@(r :| rs)) = do
|
||||
logDebug'' "Starting"
|
||||
let (raid, luActor, luInbox, dlid) = r
|
||||
(_, e) <- deliver luActor h luInbox
|
||||
e' <- case e of
|
||||
Left err -> do
|
||||
logError $ T.concat
|
||||
[ "Outbox DL delivery #", T.pack $ show dlid
|
||||
, " error for <", renderFedURI $ l2f h luActor
|
||||
, ">: ", T.pack $ displayException err
|
||||
]
|
||||
return $
|
||||
if isInstanceErrorP err
|
||||
then Nothing
|
||||
else Just False
|
||||
Right _resp -> return $ Just True
|
||||
case e' of
|
||||
Nothing -> runSiteDB $ do
|
||||
let recips' = NE.toList recips
|
||||
updateWhere [RemoteActorId <-. map fst4 recips', RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
|
||||
updateWhere [DeliveryId <-. map fourth4 recips'] [DeliveryRunning =. False]
|
||||
Just success -> do
|
||||
runSiteDB $
|
||||
if success
|
||||
then delete dlid
|
||||
else do
|
||||
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
|
||||
update dlid [DeliveryRunning =. False]
|
||||
for_ rs $ \ (raid, luActor, luInbox, dlid) ->
|
||||
fork $ do
|
||||
(_, e) <- deliver luActor h luInbox
|
||||
runSiteDB $
|
||||
case e of
|
||||
Left err -> do
|
||||
logError $ T.concat
|
||||
[ "Outbox DL delivery #", T.pack $ show dlid
|
||||
, " error for <", renderFedURI $ l2f h luActor
|
||||
, ">: ", T.pack $ displayException err
|
||||
]
|
||||
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
|
||||
update dlid [DeliveryRunning =. False]
|
||||
Right _resp -> delete dlid
|
||||
}
|
||||
verifyOnlySharer lsrSet =
|
||||
unless (null $ localRecipProjectRelated lsrSet) $
|
||||
throwE "Unexpected recipients unrelated to offer target"
|
||||
insertToOutbox now obid = do
|
||||
hLocal <- asksSite siteInstanceHost
|
||||
let activity mluAct = Doc hLocal Activity
|
||||
{ activityId = mluAct
|
||||
, activityActor = AP.ticketAttributedTo ticket
|
||||
, activitySummary = Just summary
|
||||
, activityAudience = audience
|
||||
, activitySpecific = OfferActivity offer
|
||||
}
|
||||
obiid <- insert OutboxItem
|
||||
{ outboxItemOutbox = obid
|
||||
, outboxItemActivity = PersistJSON $ activity Nothing
|
||||
, outboxItemPublished = now
|
||||
}
|
||||
encodeRouteLocal <- getEncodeRouteLocal
|
||||
obikhid <- encodeKeyHashid obiid
|
||||
let luAct = encodeRouteLocal $ SharerOutboxItemR shrUser obikhid
|
||||
doc = activity $ Just luAct
|
||||
update obiid [OutboxItemActivity =. PersistJSON doc]
|
||||
return (obiid, doc)
|
||||
deliverLocal shrProject prjProject now pidAuthor mprojAndDeps obiid recips = do
|
||||
(pids, remotes) <- forCollect recips $ \ (shr, LocalSharerRelatedSet sharer projects) -> do
|
||||
(pids, remotes) <-
|
||||
traverseCollect (uncurry $ deliverLocalProject shr) projects
|
||||
pids' <- do
|
||||
mpid <-
|
||||
if localRecipSharer sharer
|
||||
then runMaybeT $ do
|
||||
sid <- MaybeT $ getKeyBy $ UniqueSharer shr
|
||||
MaybeT $ getKeyBy $ UniquePersonIdent sid
|
||||
else return Nothing
|
||||
return $
|
||||
case mpid of
|
||||
Nothing -> pids
|
||||
Just pid -> LO.insertSet pid pids
|
||||
return (pids', remotes)
|
||||
for_ (L.delete pidAuthor pids) $ \ pid -> do
|
||||
ibid <- personInbox <$> getJust pid
|
||||
ibiid <- insert $ InboxItem True
|
||||
insert_ $ InboxItemLocal ibid obiid ibiid
|
||||
return remotes
|
||||
where
|
||||
traverseCollect action values =
|
||||
bimap collectPids collectRemotes . unzip <$> traverse action values
|
||||
where
|
||||
logDebug'' t = logDebug' $ T.concat ["deliverFetched ", h, t]
|
||||
deliverUnfetched deliver now ((iid, h), recips@(r :| rs)) = do
|
||||
logDebug'' "Starting"
|
||||
let (uraid, luActor, udlid) = r
|
||||
e <- fetchRemoteActor iid h luActor
|
||||
let e' = case e of
|
||||
Left err -> Just Nothing
|
||||
Right (Left err) ->
|
||||
if isInstanceErrorG err
|
||||
then Nothing
|
||||
else Just Nothing
|
||||
Right (Right mera) -> Just $ Just mera
|
||||
case e' of
|
||||
Nothing -> runSiteDB $ do
|
||||
let recips' = NE.toList recips
|
||||
updateWhere [UnfetchedRemoteActorId <-. map fst3 recips', UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
|
||||
updateWhere [UnlinkedDeliveryId <-. map thd3 recips'] [UnlinkedDeliveryRunning =. False]
|
||||
Just mmera -> do
|
||||
for_ rs $ \ (uraid, luActor, udlid) ->
|
||||
fork $ do
|
||||
e <- fetchRemoteActor iid h luActor
|
||||
case e of
|
||||
Right (Right mera) ->
|
||||
case mera of
|
||||
Nothing -> runSiteDB $ delete udlid
|
||||
Just (Entity raid ra) -> do
|
||||
(fwd, e') <- deliver luActor h $ remoteActorInbox ra
|
||||
runSiteDB $
|
||||
case e' of
|
||||
Left _ -> do
|
||||
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
|
||||
delete udlid
|
||||
insert_ $ Delivery raid obid fwd False
|
||||
Right _ -> delete udlid
|
||||
_ -> runSiteDB $ do
|
||||
updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
|
||||
update udlid [UnlinkedDeliveryRunning =. False]
|
||||
case mmera of
|
||||
Nothing -> runSiteDB $ do
|
||||
updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
|
||||
update udlid [UnlinkedDeliveryRunning =. False]
|
||||
Just mera ->
|
||||
case mera of
|
||||
Nothing -> runSiteDB $ delete udlid
|
||||
Just (Entity raid ra) -> do
|
||||
(fwd, e'') <- deliver luActor h $ remoteActorInbox ra
|
||||
runSiteDB $
|
||||
case e'' of
|
||||
Left _ -> do
|
||||
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
|
||||
delete udlid
|
||||
insert_ $ Delivery raid obid fwd False
|
||||
Right _ -> delete udlid
|
||||
collectPids = foldl' LO.union []
|
||||
collectRemotes = foldl' unionRemotes []
|
||||
forCollect = flip traverseCollect
|
||||
deliverLocalProject shr prj (LocalProjectRelatedSet project _) =
|
||||
case mprojAndDeps of
|
||||
Just (sid, jid, ibid, fsid, tids)
|
||||
| shr == shrProject &&
|
||||
prj == prjProject &&
|
||||
localRecipProject project -> do
|
||||
insertToInbox ibid
|
||||
insertTicket jid tids
|
||||
(pidsTeam, remotesTeam) <-
|
||||
if localRecipProjectTeam project
|
||||
then getProjectTeam sid
|
||||
else return ([], [])
|
||||
(pidsFollowers, remotesFollowers) <-
|
||||
if localRecipProjectFollowers project
|
||||
then getFollowers fsid
|
||||
else return ([], [])
|
||||
return
|
||||
( LO.union pidsTeam pidsFollowers
|
||||
, unionRemotes remotesTeam remotesFollowers
|
||||
)
|
||||
_ -> return ([], [])
|
||||
where
|
||||
logDebug'' t = logDebug' $ T.concat ["deliverUnfetched ", h, t]
|
||||
insertToInbox ibid = do
|
||||
ibiid <- insert $ InboxItem False
|
||||
insert_ $ InboxItemLocal ibid obiid ibiid
|
||||
insertTicket jid tidsDeps = do
|
||||
next <-
|
||||
((subtract 1) . projectNextTicket) <$>
|
||||
updateGet jid [ProjectNextTicket +=. 1]
|
||||
did <- insert Discussion
|
||||
fsid <- insert FollowerSet
|
||||
tid <- insert Ticket
|
||||
{ ticketProject = jid
|
||||
, ticketNumber = next
|
||||
, ticketCreated = now
|
||||
, ticketTitle = unTextHtml $ AP.ticketSummary ticket
|
||||
, ticketSource =
|
||||
unTextPandocMarkdown $ AP.ticketSource ticket
|
||||
, ticketDescription = unTextHtml $ AP.ticketContent ticket
|
||||
, ticketAssignee = Nothing
|
||||
, ticketStatus = TSNew
|
||||
, ticketClosed = UTCTime (ModifiedJulianDay 0) 0
|
||||
, ticketCloser = Nothing
|
||||
, ticketDiscuss = did
|
||||
, ticketFollowers = fsid
|
||||
}
|
||||
insert TicketAuthorLocal
|
||||
{ ticketAuthorLocalTicket = tid
|
||||
, ticketAuthorLocalAuthor = pidAuthor
|
||||
, ticketAuthorLocalOffer = obiid
|
||||
}
|
||||
insertMany_ $ map (TicketDependency tid) tidsDeps
|
||||
|
||||
getFollowersCollection
|
||||
:: Route App -> AppDB FollowerSetId -> Handler TypedContent
|
||||
|
|
|
@ -20,7 +20,7 @@ module Vervis.API.Recipient
|
|||
, LocalSharerDirectSet (..)
|
||||
, LocalSharerRelatedSet (..)
|
||||
, LocalRecipientSet
|
||||
, parseRecipients
|
||||
, parseAudience
|
||||
)
|
||||
where
|
||||
|
||||
|
@ -30,19 +30,23 @@ import Control.Monad.Trans.Except
|
|||
import Data.Bifunctor
|
||||
import Data.Either
|
||||
import Data.Foldable
|
||||
import Data.List.NonEmpty (NonEmpty)
|
||||
import Data.List ((\\))
|
||||
import Data.List.NonEmpty (NonEmpty, nonEmpty)
|
||||
import Data.Text (Text)
|
||||
import Data.Traversable
|
||||
|
||||
import qualified Data.List.NonEmpty as NE
|
||||
import qualified Data.Text as T
|
||||
|
||||
import Network.FedURI
|
||||
import Web.ActivityPub
|
||||
import Yesod.ActivityPub
|
||||
import Yesod.FedURI
|
||||
import Yesod.MonadSite
|
||||
|
||||
import Data.List.NonEmpty.Local
|
||||
|
||||
import Vervis.ActivityPub
|
||||
import Vervis.Foundation
|
||||
import Vervis.Model.Ident
|
||||
|
||||
|
@ -159,26 +163,31 @@ data LocalTicketDirectSet = LocalTicketDirectSet
|
|||
{ localRecipTicketTeam :: Bool
|
||||
, localRecipTicketFollowers :: Bool
|
||||
}
|
||||
deriving Eq
|
||||
|
||||
data LocalProjectDirectSet = LocalProjectDirectSet
|
||||
{ localRecipProject :: Bool
|
||||
, localRecipProjectTeam :: Bool
|
||||
, localRecipProjectFollowers :: Bool
|
||||
}
|
||||
deriving Eq
|
||||
|
||||
data LocalProjectRelatedSet = LocalProjectRelatedSet
|
||||
{ localRecipProjectDirect :: LocalProjectDirectSet
|
||||
, localRecipTicketRelated :: [(Int, LocalTicketDirectSet)]
|
||||
}
|
||||
deriving Eq
|
||||
|
||||
data LocalSharerDirectSet = LocalSharerDirectSet
|
||||
{ localRecipSharer :: Bool
|
||||
}
|
||||
deriving Eq
|
||||
|
||||
data LocalSharerRelatedSet = LocalSharerRelatedSet
|
||||
{ localRecipSharerDirect :: LocalSharerDirectSet
|
||||
, localRecipProjectRelated :: [(PrjIdent, LocalProjectRelatedSet)]
|
||||
}
|
||||
deriving Eq
|
||||
|
||||
type LocalRecipientSet = [(ShrIdent, LocalSharerRelatedSet)]
|
||||
|
||||
|
@ -275,3 +284,17 @@ parseRecipients recips = do
|
|||
case parseLocalRecipient route of
|
||||
Nothing -> Left route
|
||||
Just recip -> Right recip
|
||||
|
||||
parseAudience
|
||||
:: (MonadSite m, SiteEnv m ~ App)
|
||||
=> Audience
|
||||
-> ExceptT Text m (Maybe (LocalRecipientSet, [(Text, NonEmpty LocalURI)]))
|
||||
parseAudience audience = do
|
||||
let recips = concatRecipients audience
|
||||
for (nonEmpty recips) $ \ recipsNE -> do
|
||||
(localsSet, remotes) <- parseRecipients recipsNE
|
||||
return
|
||||
(localsSet, groupByHost $ remotes \\ audienceNonActors audience)
|
||||
where
|
||||
groupByHost :: [FedURI] -> [(Text, NonEmpty LocalURI)]
|
||||
groupByHost = groupAllExtract furiHost (snd . f2l)
|
||||
|
|
|
@ -23,9 +23,9 @@ module Vervis.ActivityPub
|
|||
, concatRecipients
|
||||
, getPersonOrGroupId
|
||||
, getTicketTeam
|
||||
, getProjectTeam
|
||||
, getFollowers
|
||||
, mergeConcat
|
||||
, mergeConcat3
|
||||
, unionRemotes
|
||||
, insertMany'
|
||||
, isInstanceErrorP
|
||||
, isInstanceErrorG
|
||||
|
@ -33,9 +33,15 @@ module Vervis.ActivityPub
|
|||
, deliverRemoteDB
|
||||
, deliverRemoteHTTP
|
||||
, checkForward
|
||||
, parseTarget
|
||||
, checkDep
|
||||
, getProjectAndDeps
|
||||
, deliverRemoteDB'
|
||||
, deliverRemoteHttp
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Applicative
|
||||
import Control.Exception hiding (Handler, try)
|
||||
import Control.Monad
|
||||
import Control.Monad.IO.Class
|
||||
|
@ -43,9 +49,11 @@ import Control.Monad.IO.Unlift
|
|||
import Control.Monad.Logger.CallStack
|
||||
import Control.Monad.Trans.Class
|
||||
import Control.Monad.Trans.Except
|
||||
import Control.Monad.Trans.Maybe
|
||||
import Control.Monad.Trans.Reader
|
||||
import Data.Bifunctor
|
||||
import Data.ByteString (ByteString)
|
||||
import Data.Either
|
||||
import Data.Foldable
|
||||
import Data.Function
|
||||
import Data.List.NonEmpty (NonEmpty (..), nonEmpty)
|
||||
|
@ -89,6 +97,7 @@ import Database.Persist.Local
|
|||
import Vervis.Foundation
|
||||
import Vervis.Model
|
||||
import Vervis.Model.Ident
|
||||
import Vervis.RemoteActorStore
|
||||
import Vervis.Settings
|
||||
|
||||
hostIsLocal :: (MonadSite m, SiteEnv m ~ App) => Text -> m Bool
|
||||
|
@ -184,16 +193,18 @@ getTicketTeam sid = do
|
|||
Left pid -> return [pid]
|
||||
Right gid ->
|
||||
map (groupMemberPerson . entityVal) <$>
|
||||
selectList [GroupMemberGroup ==. gid] []
|
||||
selectList [GroupMemberGroup ==. gid] [Asc GroupMemberPerson]
|
||||
|
||||
getProjectTeam = getTicketTeam
|
||||
|
||||
getFollowers :: FollowerSetId -> AppDB ([PersonId], [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))])
|
||||
getFollowers fsid = do
|
||||
local <- selectList [FollowTarget ==. fsid] []
|
||||
local <- selectList [FollowTarget ==. fsid] [Asc FollowPerson]
|
||||
remote <- E.select $ E.from $ \ (rf `E.InnerJoin` rs `E.InnerJoin` i) -> do
|
||||
E.on $ rs E.^. RemoteActorInstance E.==. i E.^. InstanceId
|
||||
E.on $ rf E.^. RemoteFollowActor E.==. rs E.^. RemoteActorId
|
||||
E.where_ $ rf E.^. RemoteFollowTarget E.==. E.val fsid
|
||||
E.orderBy [E.asc $ i E.^. InstanceId]
|
||||
E.orderBy [E.asc $ i E.^. InstanceId, E.asc $ rs E.^. RemoteActorId]
|
||||
return
|
||||
( i E.^. InstanceId
|
||||
, i E.^. InstanceHost
|
||||
|
@ -216,17 +227,11 @@ getFollowers fsid = do
|
|||
where
|
||||
toTuples (iid, h, rsid, luA, luI, ms) = ((iid, h), (rsid, luA, luI, ms))
|
||||
|
||||
-- | Merge 2 lists ordered on fst, concatenating snd values when
|
||||
-- multiple identical fsts occur. The resulting list is ordered on fst,
|
||||
-- and each fst value appears only once.
|
||||
--
|
||||
-- >>> mergeWith (+) [('a',3), ('a',1), ('b',5)] [('a',2), ('c',4)]
|
||||
-- [('a',6), ('b',5), ('c',4)]
|
||||
mergeConcat :: (Ord a, Semigroup b) => [(a, b)] -> [(a, b)] -> [(a, b)]
|
||||
mergeConcat xs ys = map (second sconcat) $ groupWithExtract fst snd $ LO.mergeBy (compare `on` fst) xs ys
|
||||
|
||||
mergeConcat3 :: (Ord a, Semigroup b) => [(a, b)] -> [(a, b)] -> [(a, b)] -> [(a, b)]
|
||||
mergeConcat3 xs ys zs = mergeConcat xs $ LO.mergeBy (compare `on` fst) ys zs
|
||||
unionRemotes
|
||||
:: [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))]
|
||||
-> [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))]
|
||||
-> [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))]
|
||||
unionRemotes = unionGroupsOrdWith fst fst4
|
||||
|
||||
insertMany' mk xs = zip' xs <$> insertMany (NE.toList $ mk <$> xs)
|
||||
where
|
||||
|
@ -361,3 +366,250 @@ checkForward shrRecip prjRecip = join <$> do
|
|||
case mh of
|
||||
Nothing -> throwE $ n' <> " header not found"
|
||||
Just h -> return h
|
||||
|
||||
parseTarget u = do
|
||||
let (h, lu) = f2l u
|
||||
(shr, prj) <- parseProject lu
|
||||
return (h, shr, prj)
|
||||
where
|
||||
parseProject lu = do
|
||||
route <- case decodeRouteLocal lu of
|
||||
Nothing -> throwE "Expected project route, got invalid route"
|
||||
Just r -> return r
|
||||
case route of
|
||||
ProjectR shr prj -> return (shr, prj)
|
||||
_ -> throwE "Expected project route, got non-project route"
|
||||
|
||||
checkDep hProject shrProject prjProject u = do
|
||||
let (h, lu) = f2l u
|
||||
unless (h == hProject) $
|
||||
throwE "Dep belongs to different host"
|
||||
(shrTicket, prjTicket, num) <- parseTicket lu
|
||||
unless (shrTicket == shrProject) $
|
||||
throwE "Dep belongs to different sharer under same host"
|
||||
unless (prjTicket == prjProject) $
|
||||
throwE "Dep belongs to different project under same sharer"
|
||||
return num
|
||||
where
|
||||
parseTicket lu = do
|
||||
route <- case decodeRouteLocal lu of
|
||||
Nothing -> throwE "Expected ticket route, got invalid route"
|
||||
Just r -> return r
|
||||
case route of
|
||||
TicketR shr prj num -> return (shr, prj, num)
|
||||
_ -> throwE "Expected ticket route, got non-ticket route"
|
||||
|
||||
getProjectAndDeps shr prj deps = do
|
||||
msid <- lift $ getKeyBy $ UniqueSharer shr
|
||||
sid <- fromMaybeE msid "Offer target: no such local sharer"
|
||||
mej <- lift $ getBy $ UniqueProject prj sid
|
||||
Entity jid j <- fromMaybeE mej "Offer target: no such local project"
|
||||
tids <- for deps $ \ dep -> do
|
||||
mtid <- lift $ getKeyBy $ UniqueTicket jid dep
|
||||
fromMaybeE mtid "Local dep: No such ticket number in DB"
|
||||
return (sid, jid, projectInbox j, projectFollowers j, tids)
|
||||
|
||||
data Recip
|
||||
= RecipRA (Entity RemoteActor)
|
||||
| RecipURA (Entity UnfetchedRemoteActor)
|
||||
| RecipRC (Entity RemoteCollection)
|
||||
|
||||
deliverRemoteDB'
|
||||
:: Text
|
||||
-> OutboxItemId
|
||||
-> [(Text, NonEmpty LocalURI)]
|
||||
-> [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime))]
|
||||
-> AppDB
|
||||
( [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, DeliveryId))]
|
||||
, [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
|
||||
, [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
|
||||
)
|
||||
deliverRemoteDB' hContext obid recips known = do
|
||||
recips' <- for recips $ \ (h, lus) -> do
|
||||
let lus' = NE.nub lus
|
||||
(iid, inew) <- idAndNew <$> insertBy' (Instance h)
|
||||
if inew
|
||||
then return ((iid, h), (Nothing, Nothing, Just lus'))
|
||||
else do
|
||||
es <- for lus' $ \ lu -> do
|
||||
ma <- runMaybeT
|
||||
$ RecipRA <$> MaybeT (getBy $ UniqueRemoteActor iid lu)
|
||||
<|> RecipURA <$> MaybeT (getBy $ UniqueUnfetchedRemoteActor iid lu)
|
||||
<|> RecipRC <$> MaybeT (getBy $ UniqueRemoteCollection iid lu)
|
||||
return $
|
||||
case ma of
|
||||
Nothing -> Just $ Left lu
|
||||
Just r ->
|
||||
case r of
|
||||
RecipRA (Entity raid ra) -> Just $ Right $ Left (raid, remoteActorIdent ra, remoteActorInbox ra, remoteActorErrorSince ra)
|
||||
RecipURA (Entity uraid ura) -> Just $ Right $ Right (uraid, unfetchedRemoteActorIdent ura, unfetchedRemoteActorSince ura)
|
||||
RecipRC _ -> Nothing
|
||||
let (unknown, newKnown) = partitionEithers $ catMaybes $ NE.toList es
|
||||
(fetched, unfetched) = partitionEithers newKnown
|
||||
return ((iid, h), (nonEmpty fetched, nonEmpty unfetched, nonEmpty unknown))
|
||||
let moreKnown = mapMaybe (\ (i, (f, _, _)) -> (i,) <$> f) recips'
|
||||
unfetched = mapMaybe (\ (i, (_, uf, _)) -> (i,) <$> uf) recips'
|
||||
stillUnknown = mapMaybe (\ (i, (_, _, uk)) -> (i,) <$> uk) recips'
|
||||
allFetched = unionRemotes known moreKnown
|
||||
fetchedDeliv <- for allFetched $ \ (i, rs) ->
|
||||
let fwd = snd i == hContext
|
||||
in (i,) <$> insertMany' (\ (raid, _, _, msince) -> Delivery raid obid fwd $ isNothing msince) rs
|
||||
unfetchedDeliv <- for unfetched $ \ (i, rs) ->
|
||||
let fwd = snd i == hContext
|
||||
in (i,) <$> insertMany' (\ (uraid, _, msince) -> UnlinkedDelivery uraid obid fwd $ isNothing msince) rs
|
||||
unknownDeliv <- for stillUnknown $ \ (i, lus) -> do
|
||||
-- TODO maybe for URA insertion we should do insertUnique?
|
||||
rs <- insertMany' (\ lu -> UnfetchedRemoteActor (fst i) lu Nothing) lus
|
||||
let fwd = snd i == hContext
|
||||
(i,) <$> insertMany' (\ (_, uraid) -> UnlinkedDelivery uraid obid fwd True) rs
|
||||
return
|
||||
( takeNoError4 fetchedDeliv
|
||||
, takeNoError3 unfetchedDeliv
|
||||
, map
|
||||
(second $ NE.map $ \ ((lu, ak), dlk) -> (ak, lu, dlk))
|
||||
unknownDeliv
|
||||
)
|
||||
where
|
||||
takeNoError noError = mapMaybe $ \ (i, rs) -> (i,) <$> nonEmpty (mapMaybe noError $ NE.toList rs)
|
||||
takeNoError3 = takeNoError noError
|
||||
where
|
||||
noError ((ak, lu, Nothing), dlk) = Just (ak, lu, dlk)
|
||||
noError ((_ , _ , Just _ ), _ ) = Nothing
|
||||
takeNoError4 = takeNoError noError
|
||||
where
|
||||
noError ((ak, luA, luI, Nothing), dlk) = Just (ak, luA, luI, dlk)
|
||||
noError ((_ , _ , _ , Just _ ), _ ) = Nothing
|
||||
|
||||
deliverRemoteHttp
|
||||
:: Text
|
||||
-> OutboxItemId
|
||||
-> Doc Activity
|
||||
-> ( [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, LocalURI, DeliveryId))]
|
||||
, [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
|
||||
, [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
|
||||
)
|
||||
-> Worker ()
|
||||
deliverRemoteHttp hContext obid doc (fetched, unfetched, unknown) = do
|
||||
logDebug' "Starting"
|
||||
let deliver fwd h inbox = do
|
||||
let fwd' = if h == hContext then Just fwd else Nothing
|
||||
(isJust fwd',) <$> deliverHttp doc fwd' h inbox
|
||||
now <- liftIO getCurrentTime
|
||||
logDebug' $
|
||||
"Launching fetched " <> T.pack (show $ map (snd . fst) fetched)
|
||||
traverse_ (fork . deliverFetched deliver now) fetched
|
||||
logDebug' $
|
||||
"Launching unfetched " <> T.pack (show $ map (snd . fst) unfetched)
|
||||
traverse_ (fork . deliverUnfetched deliver now) unfetched
|
||||
logDebug' $
|
||||
"Launching unknown " <> T.pack (show $ map (snd . fst) unknown)
|
||||
traverse_ (fork . deliverUnfetched deliver now) unknown
|
||||
logDebug' "Done (async delivery may still be running)"
|
||||
where
|
||||
logDebug' t = logDebug $ prefix <> t
|
||||
where
|
||||
prefix =
|
||||
T.concat
|
||||
[ "Outbox POST handler: deliverRemoteHttp obid#"
|
||||
, T.pack $ show $ fromSqlKey obid
|
||||
, ": "
|
||||
]
|
||||
fork = forkWorker "Outbox POST handler: HTTP delivery"
|
||||
deliverFetched deliver now ((_, h), recips@(r :| rs)) = do
|
||||
logDebug'' "Starting"
|
||||
let (raid, luActor, luInbox, dlid) = r
|
||||
(_, e) <- deliver luActor h luInbox
|
||||
e' <- case e of
|
||||
Left err -> do
|
||||
logError $ T.concat
|
||||
[ "Outbox DL delivery #", T.pack $ show dlid
|
||||
, " error for <", renderFedURI $ l2f h luActor
|
||||
, ">: ", T.pack $ displayException err
|
||||
]
|
||||
return $
|
||||
if isInstanceErrorP err
|
||||
then Nothing
|
||||
else Just False
|
||||
Right _resp -> return $ Just True
|
||||
case e' of
|
||||
Nothing -> runSiteDB $ do
|
||||
let recips' = NE.toList recips
|
||||
updateWhere [RemoteActorId <-. map fst4 recips', RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
|
||||
updateWhere [DeliveryId <-. map fourth4 recips'] [DeliveryRunning =. False]
|
||||
Just success -> do
|
||||
runSiteDB $
|
||||
if success
|
||||
then delete dlid
|
||||
else do
|
||||
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
|
||||
update dlid [DeliveryRunning =. False]
|
||||
for_ rs $ \ (raid, luActor, luInbox, dlid) ->
|
||||
fork $ do
|
||||
(_, e) <- deliver luActor h luInbox
|
||||
runSiteDB $
|
||||
case e of
|
||||
Left err -> do
|
||||
logError $ T.concat
|
||||
[ "Outbox DL delivery #", T.pack $ show dlid
|
||||
, " error for <", renderFedURI $ l2f h luActor
|
||||
, ">: ", T.pack $ displayException err
|
||||
]
|
||||
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
|
||||
update dlid [DeliveryRunning =. False]
|
||||
Right _resp -> delete dlid
|
||||
where
|
||||
logDebug'' t = logDebug' $ T.concat ["deliverFetched ", h, t]
|
||||
deliverUnfetched deliver now ((iid, h), recips@(r :| rs)) = do
|
||||
logDebug'' "Starting"
|
||||
let (uraid, luActor, udlid) = r
|
||||
e <- fetchRemoteActor iid h luActor
|
||||
let e' = case e of
|
||||
Left err -> Just Nothing
|
||||
Right (Left err) ->
|
||||
if isInstanceErrorG err
|
||||
then Nothing
|
||||
else Just Nothing
|
||||
Right (Right mera) -> Just $ Just mera
|
||||
case e' of
|
||||
Nothing -> runSiteDB $ do
|
||||
let recips' = NE.toList recips
|
||||
updateWhere [UnfetchedRemoteActorId <-. map fst3 recips', UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
|
||||
updateWhere [UnlinkedDeliveryId <-. map thd3 recips'] [UnlinkedDeliveryRunning =. False]
|
||||
Just mmera -> do
|
||||
for_ rs $ \ (uraid, luActor, udlid) ->
|
||||
fork $ do
|
||||
e <- fetchRemoteActor iid h luActor
|
||||
case e of
|
||||
Right (Right mera) ->
|
||||
case mera of
|
||||
Nothing -> runSiteDB $ delete udlid
|
||||
Just (Entity raid ra) -> do
|
||||
(fwd, e') <- deliver luActor h $ remoteActorInbox ra
|
||||
runSiteDB $
|
||||
case e' of
|
||||
Left _ -> do
|
||||
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
|
||||
delete udlid
|
||||
insert_ $ Delivery raid obid fwd False
|
||||
Right _ -> delete udlid
|
||||
_ -> runSiteDB $ do
|
||||
updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
|
||||
update udlid [UnlinkedDeliveryRunning =. False]
|
||||
case mmera of
|
||||
Nothing -> runSiteDB $ do
|
||||
updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
|
||||
update udlid [UnlinkedDeliveryRunning =. False]
|
||||
Just mera ->
|
||||
case mera of
|
||||
Nothing -> runSiteDB $ delete udlid
|
||||
Just (Entity raid ra) -> do
|
||||
(fwd, e'') <- deliver luActor h $ remoteActorInbox ra
|
||||
runSiteDB $
|
||||
case e'' of
|
||||
Left _ -> do
|
||||
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
|
||||
delete udlid
|
||||
insert_ $ Delivery raid obid fwd False
|
||||
Right _ -> delete udlid
|
||||
where
|
||||
logDebug'' t = logDebug' $ T.concat ["deliverUnfetched ", h, t]
|
||||
|
|
|
@ -372,8 +372,7 @@ projectCreateNoteF now shrRecip prjRecip author body (Note mluNote _ _ muParent
|
|||
then getFollowers fsidProject
|
||||
else return ([], [])
|
||||
let pids = union teamPids tfsPids `union` jfsPids
|
||||
-- TODO inefficient, see the other TODOs about mergeConcat
|
||||
remotes = map (second $ NE.nubBy ((==) `on` fst4)) $ mergeConcat3 teamRemotes tfsRemotes jfsRemotes
|
||||
remotes = teamRemotes `unionRemotes` tfsRemotes `unionRemotes` jfsRemotes
|
||||
for_ pids $ \ pid -> do
|
||||
ibid <- personInbox <$> getJust pid
|
||||
ibiid <- insert $ InboxItem True
|
||||
|
|
|
@ -72,26 +72,9 @@ checkOffer ticket hProject shrProject prjProject = do
|
|||
verifyNothingE (AP.ticketAssignedTo ticket) "Ticket with 'assignedTo'"
|
||||
when (AP.ticketIsResolved ticket) $ throwE "Ticket resolved"
|
||||
unless (null $ AP.ticketDependedBy ticket) $ throwE "Ticket has rdeps"
|
||||
traverse checkDep $ AP.ticketDependsOn ticket
|
||||
traverse checkDep' $ AP.ticketDependsOn ticket
|
||||
where
|
||||
checkDep u = do
|
||||
let (h, lu) = f2l u
|
||||
unless (h == hProject) $
|
||||
throwE "Dep belongs to different host"
|
||||
(shrTicket, prjTicket, num) <- parseTicket lu
|
||||
unless (shrTicket == shrProject) $
|
||||
throwE "Dep belongs to different sharer under same host"
|
||||
unless (prjTicket == prjProject) $
|
||||
throwE "Dep belongs to different project under same sharer"
|
||||
return num
|
||||
where
|
||||
parseTicket lu = do
|
||||
route <- case decodeRouteLocal lu of
|
||||
Nothing -> throwE "Expected ticket route, got invalid route"
|
||||
Just r -> return r
|
||||
case route of
|
||||
TicketR shr prj num -> return (shr, prj, num)
|
||||
_ -> throwE "Expected ticket route, got non-ticket route"
|
||||
checkDep' = checkDep hProject shrProject prjProject
|
||||
|
||||
sharerOfferTicketF
|
||||
:: UTCTime
|
||||
|
@ -113,18 +96,6 @@ sharerOfferTicketF now shrRecip author body (Offer ticket uTarget) = do
|
|||
when local $ checkTargetAndDeps shrProject prjProject deps
|
||||
lift $ insertToInbox luOffer ibidRecip
|
||||
where
|
||||
parseTarget u = do
|
||||
let (h, lu) = f2l u
|
||||
(shr, prj) <- parseProject lu
|
||||
return (h, shr, prj)
|
||||
where
|
||||
parseProject lu = do
|
||||
route <- case decodeRouteLocal lu of
|
||||
Nothing -> throwE "Expected project route, got invalid route"
|
||||
Just r -> return r
|
||||
case route of
|
||||
ProjectR shr prj -> return (shr, prj)
|
||||
_ -> throwE "Expected project route, got non-project route"
|
||||
checkTargetAndDeps shrProject prjProject deps = do
|
||||
msid <- lift $ getKeyBy $ UniqueSharer shrProject
|
||||
sid <- fromMaybeE msid "Offer target: no such local sharer"
|
||||
|
@ -183,7 +154,8 @@ projectOfferTicketF
|
|||
findRelevantCollections hLocal $
|
||||
activityAudience $ actbActivity body
|
||||
mremotesHttp <- runDBExcept $ do
|
||||
(sid, jid, ibid, fsid, tids) <- getProjectAndDeps deps
|
||||
(sid, jid, ibid, fsid, tids) <-
|
||||
getProjectAndDeps shrRecip prjRecip deps
|
||||
lift $ join <$> do
|
||||
mractid <- insertTicket luOffer jid ibid tids
|
||||
for mractid $ \ ractid -> for msig $ \ sig -> do
|
||||
|
@ -229,15 +201,6 @@ projectOfferTicketF
|
|||
| shr == shrRecip && prj == prjRecip
|
||||
-> Just OfferTicketRecipProjectFollowers
|
||||
_ -> Nothing
|
||||
getProjectAndDeps deps = do
|
||||
msid <- lift $ getKeyBy $ UniqueSharer shrRecip
|
||||
sid <- fromMaybeE msid "Offer target: no such local sharer"
|
||||
mej <- lift $ getBy $ UniqueProject prjRecip sid
|
||||
Entity jid j <- fromMaybeE mej "Offer target: no such local project"
|
||||
tids <- for deps $ \ dep -> do
|
||||
mtid <- lift $ getKeyBy $ UniqueTicket jid dep
|
||||
fromMaybeE mtid "Local dep: No such ticket number in DB"
|
||||
return (sid, jid, projectInbox j, projectFollowers j, tids)
|
||||
insertTicket luOffer jid ibid deps = do
|
||||
let iidAuthor = remoteAuthorInstance author
|
||||
raidAuthor = remoteAuthorId author
|
||||
|
@ -298,8 +261,7 @@ projectOfferTicketF
|
|||
then getFollowers fsid
|
||||
else return ([], [])
|
||||
let pids = union teamPids fsPids
|
||||
-- TODO inefficient, see the other TODOs about mergeConcat
|
||||
remotes = map (second $ NE.nubBy ((==) `on` fst4)) $ mergeConcat teamRemotes fsRemotes
|
||||
remotes = unionRemotes teamRemotes fsRemotes
|
||||
for_ pids $ \ pid -> do
|
||||
ibid <- personInbox <$> getJust pid
|
||||
ibiid <- insert $ InboxItem True
|
||||
|
|
Loading…
Reference in a new issue