Vervis.ActivityPub: Implement general-purpose full local delivery
Delivery of an activity into local inboxes is being done using custom local functions. Each C2S or S2S handler has its own specific variant for this. As part of the ongoing refactoring and evolution of the federation code, I implemented a general-purpose local delivery function: It takes a LocalRecipientSet and simply delivers to everyone, no handler-specific assumptions or limitations. To limit the recipient set according to handler specific rules, just filter/adapt/edit it before passing to the delivery function. The function isn't exported yet, but the existing 'deliverLocal' that delivers only to actors and to author's followers is now implemented via the new general-purpose function. I hope that's a step towards doing all the local delivery using this one function, simplifying the complicated federation code.
This commit is contained in:
parent
adc107bb4c
commit
a53fbcf1c0
2 changed files with 193 additions and 19 deletions
|
@ -708,37 +708,81 @@ deliverLocal
|
||||||
:: ShrIdent
|
:: ShrIdent
|
||||||
-> InboxId
|
-> InboxId
|
||||||
-> FollowerSetId
|
-> FollowerSetId
|
||||||
-> Key OutboxItem
|
-> OutboxItemId
|
||||||
-> [(ShrIdent, LocalSharerRelatedSet)]
|
-> LocalRecipientSet
|
||||||
-> AppDB
|
-> AppDB
|
||||||
[ ( (InstanceId, Host)
|
[ ( (InstanceId, Host)
|
||||||
, NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime)
|
, NonEmpty (RemoteActorId, LocalURI, LocalURI, Maybe UTCTime)
|
||||||
)
|
)
|
||||||
]
|
]
|
||||||
deliverLocal shrAuthor ibidAuthor fsidAuthor obiid recips = do
|
deliverLocal shrAuthor ibidAuthor _fsidAuthor obiid = fmap (map $ second $ NE.map fromRR) . deliverLocal' True shrAuthor ibidAuthor obiid . map (uncurry clearCollections)
|
||||||
(pidsFollowers, remotesFollowers) <-
|
where
|
||||||
if authorFollowers shrAuthor recips
|
clearCollections shr (LocalSharerRelatedSet s js rs) =
|
||||||
then getFollowers fsidAuthor
|
( shr
|
||||||
else return ([], [])
|
, LocalSharerRelatedSet
|
||||||
ibidsFollowers <-
|
(clearSharer shr s)
|
||||||
map (personInbox . entityVal) <$>
|
(map (second clearProject) js)
|
||||||
selectList [PersonId <-. pidsFollowers] [Asc PersonInbox]
|
(map (second clearRepo) rs)
|
||||||
|
)
|
||||||
|
where
|
||||||
|
clearSharer shr (LocalSharerDirectSet s f) =
|
||||||
|
let f' = if shr == shrAuthor then f else False
|
||||||
|
in LocalSharerDirectSet s f'
|
||||||
|
clearProject (LocalProjectRelatedSet (LocalProjectDirectSet j _t _f) _ts) =
|
||||||
|
LocalProjectRelatedSet (LocalProjectDirectSet j False False) []
|
||||||
|
clearRepo (LocalRepoRelatedSet (LocalRepoDirectSet r _t _f)) =
|
||||||
|
LocalRepoRelatedSet $ LocalRepoDirectSet r False False
|
||||||
|
fromRR (RemoteRecipient raid luA luI msince) = (raid, luA, luI, msince)
|
||||||
|
|
||||||
|
data RemoteRecipient = RemoteRecipient
|
||||||
|
{ remoteRecipientActor :: RemoteActorId
|
||||||
|
, remoteRecipientId :: LocalURI
|
||||||
|
, remoteRecipientInbox :: LocalURI
|
||||||
|
, remoteRecipientErrorSince :: Maybe UTCTime
|
||||||
|
}
|
||||||
|
|
||||||
|
-- | Given a list of local recipients, which may include actors and
|
||||||
|
-- collections,
|
||||||
|
--
|
||||||
|
-- * Insert activity to inboxes of actors
|
||||||
|
-- * If collections are listed, insert activity to the local members and return
|
||||||
|
-- the remote members
|
||||||
|
deliverLocal'
|
||||||
|
:: Bool -- ^ Whether to deliver to collection only if owner actor is addressed
|
||||||
|
-> ShrIdent
|
||||||
|
-> InboxId
|
||||||
|
-> OutboxItemId
|
||||||
|
-> LocalRecipientSet
|
||||||
|
-> AppDB [((InstanceId, Host), NonEmpty RemoteRecipient)]
|
||||||
|
deliverLocal' requireOwner shrAuthor ibidAuthor obiid recips = do
|
||||||
ibidsSharer <- L.delete ibidAuthor <$> getSharerInboxes recips
|
ibidsSharer <- L.delete ibidAuthor <$> getSharerInboxes recips
|
||||||
ibidsOther <- concat <$> traverse getOtherInboxes recips
|
ibidsOther <- concat <$> traverse getOtherInboxes recips
|
||||||
let ibids = LO.union ibidsFollowers ibidsSharer ++ ibidsOther
|
|
||||||
|
(ibidsFollowers, remotesFollowers) <- do
|
||||||
|
fsidsSharer <- getSharerFollowerSets recips
|
||||||
|
fsidsOther <- concat <$> traverse getOtherFollowerSets recips
|
||||||
|
let fsids = fsidsSharer ++ fsidsOther
|
||||||
|
(,) <$> getLocalFollowers fsids <*> getRemoteFollowers fsids
|
||||||
|
|
||||||
|
ibidsTeams <- foldl' LO.union [] <$> traverse getTeams recips
|
||||||
|
|
||||||
|
let ibids = L.delete ibidAuthor (ibidsFollowers `LO.union` ibidsTeams `LO.union` ibidsSharer) ++ ibidsOther
|
||||||
ibiids <- insertMany $ replicate (length ibids) $ InboxItem True
|
ibiids <- insertMany $ replicate (length ibids) $ InboxItem True
|
||||||
insertMany_ $
|
insertMany_ $
|
||||||
map (\ (ibid, ibiid) -> InboxItemLocal ibid obiid ibiid)
|
map (\ (ibid, ibiid) -> InboxItemLocal ibid obiid ibiid)
|
||||||
(zip ibids ibiids)
|
(zip ibids ibiids)
|
||||||
return remotesFollowers
|
return remotesFollowers
|
||||||
where
|
where
|
||||||
|
getSharerInboxes :: LocalRecipientSet -> AppDB [InboxId]
|
||||||
getSharerInboxes sharers = do
|
getSharerInboxes sharers = do
|
||||||
let shrs =
|
let shrs =
|
||||||
[shr | (shr, s) <- sharers
|
[shr | (shr, s) <- sharers
|
||||||
, localRecipSharer $ localRecipSharerDirect s
|
, localRecipSharer $ localRecipSharerDirect s
|
||||||
]
|
]
|
||||||
sids <- selectKeysList [SharerIdent <-. shrs] []
|
sids <- selectKeysList [SharerIdent <-. shrs] []
|
||||||
map (personInbox . entityVal) <$> selectList [PersonIdent <-. sids] [Asc PersonInbox]
|
map (personInbox . entityVal) <$> selectList [PersonIdent <-. sids] [Asc PersonInbox]
|
||||||
|
|
||||||
|
getOtherInboxes :: (ShrIdent, LocalSharerRelatedSet) -> AppDB [InboxId]
|
||||||
getOtherInboxes (shr, LocalSharerRelatedSet _ projects repos) = do
|
getOtherInboxes (shr, LocalSharerRelatedSet _ projects repos) = do
|
||||||
msid <- getKeyBy $ UniqueSharer shr
|
msid <- getKeyBy $ UniqueSharer shr
|
||||||
case msid of
|
case msid of
|
||||||
|
@ -758,12 +802,142 @@ deliverLocal shrAuthor ibidAuthor fsidAuthor obiid recips = do
|
||||||
getRepoInboxes sid repos =
|
getRepoInboxes sid repos =
|
||||||
let rps =
|
let rps =
|
||||||
[rp | (rp, r) <- repos
|
[rp | (rp, r) <- repos
|
||||||
, localRecipRepo $ localRecipRepoDirect r
|
, localRecipRepo $ localRecipRepoDirect r
|
||||||
]
|
]
|
||||||
in map (repoInbox . entityVal) <$>
|
in map (repoInbox . entityVal) <$>
|
||||||
selectList [RepoSharer ==. sid, RepoIdent <-. rps] []
|
selectList [RepoSharer ==. sid, RepoIdent <-. rps] []
|
||||||
authorFollowers shr lrset =
|
|
||||||
case lookup shr lrset of
|
getSharerFollowerSets :: LocalRecipientSet -> AppDB [FollowerSetId]
|
||||||
Just s
|
getSharerFollowerSets sharers = do
|
||||||
| localRecipSharerFollowers $ localRecipSharerDirect s -> True
|
let shrs =
|
||||||
_ -> False
|
[shr | (shr, s) <- sharers
|
||||||
|
, let d = localRecipSharerDirect s
|
||||||
|
in localRecipSharerFollowers d &&
|
||||||
|
(localRecipSharer d || not requireOwner || shr == shrAuthor)
|
||||||
|
]
|
||||||
|
sids <- selectKeysList [SharerIdent <-. shrs] []
|
||||||
|
map (personFollowers . entityVal) <$> selectList [PersonIdent <-. sids] []
|
||||||
|
|
||||||
|
getOtherFollowerSets :: (ShrIdent, LocalSharerRelatedSet) -> AppDB [FollowerSetId]
|
||||||
|
getOtherFollowerSets (shr, LocalSharerRelatedSet _ projects repos) = do
|
||||||
|
msid <- getKeyBy $ UniqueSharer shr
|
||||||
|
case msid of
|
||||||
|
Nothing -> return []
|
||||||
|
Just sid ->
|
||||||
|
(++)
|
||||||
|
<$> getProjectFollowerSets sid projects
|
||||||
|
<*> getRepoFollowerSets sid repos
|
||||||
|
where
|
||||||
|
getProjectFollowerSets sid projects = do
|
||||||
|
let prjsJ =
|
||||||
|
[prj | (prj, j) <- projects
|
||||||
|
, let d = localRecipProjectDirect j
|
||||||
|
in localRecipProjectFollowers d &&
|
||||||
|
(localRecipProject d || not requireOwner)
|
||||||
|
]
|
||||||
|
fsidsJ <-
|
||||||
|
map (projectFollowers . entityVal) <$>
|
||||||
|
selectList [ProjectSharer ==. sid, ProjectIdent <-. prjsJ] []
|
||||||
|
let prjsT =
|
||||||
|
if requireOwner
|
||||||
|
then
|
||||||
|
[ (prj, localRecipTicketRelated j)
|
||||||
|
| (prj, j) <- projects
|
||||||
|
, localRecipProject $ localRecipProjectDirect j
|
||||||
|
]
|
||||||
|
else
|
||||||
|
map (second localRecipTicketRelated) projects
|
||||||
|
fsidssT <- for prjsT $ \ (prj, tickets) -> do
|
||||||
|
mjid <- getKeyBy $ UniqueProject prj sid
|
||||||
|
case mjid of
|
||||||
|
Nothing -> return []
|
||||||
|
Just jid -> getTicketFollowerSets jid tickets
|
||||||
|
return $ fsidsJ ++ map E.unValue (concat fsidssT)
|
||||||
|
where
|
||||||
|
getTicketFollowerSets jid tickets = do
|
||||||
|
let ltkhids =
|
||||||
|
[ltkhid | (ltkhid, t) <- tickets
|
||||||
|
, localRecipTicketFollowers t
|
||||||
|
]
|
||||||
|
ltids <- catMaybes <$> traverse decodeKeyHashid ltkhids
|
||||||
|
E.select $ E.from $ \ (lt `E.InnerJoin` t `E.InnerJoin` tpl `E.LeftOuterJoin` tup `E.LeftOuterJoin` tar) -> do
|
||||||
|
E.on $ E.just (tpl E.^. TicketProjectLocalId) E.==. tar E.?. TicketAuthorRemoteTicket
|
||||||
|
E.on $ E.just (tpl E.^. TicketProjectLocalId) E.==. tup E.?. TicketUnderProjectProject
|
||||||
|
E.on $ t E.^. TicketId E.==. tpl E.^. TicketProjectLocalTicket
|
||||||
|
E.on $ lt E.^. LocalTicketTicket E.==. t E.^. TicketId
|
||||||
|
E.where_ $
|
||||||
|
tpl E.^. TicketProjectLocalProject E.==. E.val jid E.&&.
|
||||||
|
E.not_
|
||||||
|
( E.isNothing (tup E.?. TicketUnderProjectId) E.&&.
|
||||||
|
E.isNothing (tar E.?. TicketAuthorRemoteId)
|
||||||
|
)
|
||||||
|
return $ lt E.^. LocalTicketFollowers
|
||||||
|
getRepoFollowerSets sid repos =
|
||||||
|
let rps =
|
||||||
|
[rp | (rp, r) <- repos
|
||||||
|
, let d = localRecipRepoDirect r
|
||||||
|
in localRecipRepoFollowers d &&
|
||||||
|
(localRecipRepo d || not requireOwner)
|
||||||
|
]
|
||||||
|
in map (repoFollowers . entityVal) <$>
|
||||||
|
selectList [RepoSharer ==. sid, RepoIdent <-. rps] []
|
||||||
|
|
||||||
|
getLocalFollowers :: [FollowerSetId] -> AppDB [InboxId]
|
||||||
|
getLocalFollowers fsids = do
|
||||||
|
pids <-
|
||||||
|
map (followPerson . entityVal) <$>
|
||||||
|
selectList [FollowTarget <-. fsids] []
|
||||||
|
map (personInbox . entityVal) <$>
|
||||||
|
selectList [PersonId <-. pids] [Asc PersonInbox]
|
||||||
|
|
||||||
|
getRemoteFollowers :: [FollowerSetId] -> AppDB [((InstanceId, Host), NonEmpty RemoteRecipient)]
|
||||||
|
getRemoteFollowers fsids =
|
||||||
|
fmap groupRemotes $
|
||||||
|
E.select $ E.from $ \ (rf `E.InnerJoin` ra `E.InnerJoin` ro `E.InnerJoin` i) -> do
|
||||||
|
E.on $ ro E.^. RemoteObjectInstance E.==. i E.^. InstanceId
|
||||||
|
E.on $ ra E.^. RemoteActorIdent E.==. ro E.^. RemoteObjectId
|
||||||
|
E.on $ rf E.^. RemoteFollowActor E.==. ra E.^. RemoteActorId
|
||||||
|
E.where_ $ rf E.^. RemoteFollowTarget `E.in_` E.valList fsids
|
||||||
|
E.orderBy [E.asc $ i E.^. InstanceId, E.asc $ ra E.^. RemoteActorId]
|
||||||
|
return
|
||||||
|
( i E.^. InstanceId
|
||||||
|
, i E.^. InstanceHost
|
||||||
|
, ra E.^. RemoteActorId
|
||||||
|
, ro E.^. RemoteObjectIdent
|
||||||
|
, ra E.^. RemoteActorInbox
|
||||||
|
, ra E.^. RemoteActorErrorSince
|
||||||
|
)
|
||||||
|
where
|
||||||
|
groupRemotes = groupWithExtractBy ((==) `on` fst) fst snd . map toTuples
|
||||||
|
where
|
||||||
|
toTuples (E.Value iid, E.Value h, E.Value raid, E.Value luA, E.Value luI, E.Value ms) = ((iid, h), RemoteRecipient raid luA luI ms)
|
||||||
|
|
||||||
|
getTeams :: (ShrIdent, LocalSharerRelatedSet) -> AppDB [InboxId]
|
||||||
|
getTeams (shr, LocalSharerRelatedSet _ projects repos) = do
|
||||||
|
msid <- getKeyBy $ UniqueSharer shr
|
||||||
|
case msid of
|
||||||
|
Nothing -> return []
|
||||||
|
Just sid ->
|
||||||
|
LO.union
|
||||||
|
<$> getProjectTeams sid projects
|
||||||
|
<*> getRepoTeams sid repos
|
||||||
|
where
|
||||||
|
getProjectTeams sid projects = do
|
||||||
|
let prjs =
|
||||||
|
[prj | (prj, LocalProjectRelatedSet d ts) <- projects
|
||||||
|
, (localRecipProject d || not requireOwner) &&
|
||||||
|
(localRecipProjectTeam d || any (localRecipTicketTeam . snd) ts)
|
||||||
|
]
|
||||||
|
jids <- selectKeysList [ProjectSharer ==. sid, ProjectIdent <-. prjs] []
|
||||||
|
pids <- map (projectCollabPerson . entityVal) <$> selectList [ProjectCollabProject <-. jids] []
|
||||||
|
map (personInbox . entityVal) <$> selectList [PersonId <-. pids] [Asc PersonInbox]
|
||||||
|
getRepoTeams sid repos = do
|
||||||
|
let rps =
|
||||||
|
[rp | (rp, r) <- repos
|
||||||
|
, let d = localRecipRepoDirect r
|
||||||
|
in localRecipRepoTeam d &&
|
||||||
|
(localRecipRepo d || not requireOwner)
|
||||||
|
]
|
||||||
|
rids <- selectKeysList [RepoSharer ==. sid, RepoIdent <-. rps] []
|
||||||
|
pids <- map (repoCollabPerson . entityVal) <$> selectList [RepoCollabRepo <-. rids] []
|
||||||
|
map (personInbox . entityVal) <$> selectList [PersonId <-. pids] [Asc PersonInbox]
|
||||||
|
|
|
@ -20,6 +20,7 @@ module Yesod.Hashids
|
||||||
, encodeKeyHashidPure
|
, encodeKeyHashidPure
|
||||||
, getEncodeKeyHashid
|
, getEncodeKeyHashid
|
||||||
, encodeKeyHashid
|
, encodeKeyHashid
|
||||||
|
, decodeKeyHashid
|
||||||
, decodeKeyHashidF
|
, decodeKeyHashidF
|
||||||
, decodeKeyHashidM
|
, decodeKeyHashidM
|
||||||
, decodeKeyHashidE
|
, decodeKeyHashidE
|
||||||
|
@ -39,7 +40,6 @@ import Database.Persist.Sql
|
||||||
import Web.Hashids
|
import Web.Hashids
|
||||||
import Web.PathPieces
|
import Web.PathPieces
|
||||||
import Yesod.Core
|
import Yesod.Core
|
||||||
import Yesod.Core.Handler
|
|
||||||
|
|
||||||
import Yesod.MonadSite
|
import Yesod.MonadSite
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue