Support forwarding activities from repo actors
This commit is contained in:
parent
17e59af1c4
commit
e68a659221
6 changed files with 70 additions and 10 deletions
|
@ -132,6 +132,12 @@ ForwarderProject
|
|||
|
||||
UniqueForwarderProject task
|
||||
|
||||
ForwarderRepo
|
||||
task ForwardingId
|
||||
sender RepoId
|
||||
|
||||
UniqueForwarderRepo task
|
||||
|
||||
VerifKey
|
||||
ident LocalRefURI
|
||||
instance InstanceId
|
||||
|
|
5
migrations/2020_05_25_fwd_sender_repo.model
Normal file
5
migrations/2020_05_25_fwd_sender_repo.model
Normal file
|
@ -0,0 +1,5 @@
|
|||
ForwarderRepo
|
||||
task ForwardingId
|
||||
sender RepoId
|
||||
|
||||
UniqueForwarderRepo task
|
|
@ -34,8 +34,10 @@ module Vervis.ActivityPub
|
|||
, deliverHttpBL
|
||||
, deliverRemoteDB_J
|
||||
, deliverRemoteDB_S
|
||||
, deliverRemoteDB_R
|
||||
, deliverRemoteHTTP_J
|
||||
, deliverRemoteHTTP_S
|
||||
, deliverRemoteHTTP_R
|
||||
, checkForward
|
||||
, parseTarget
|
||||
--, checkDep
|
||||
|
@ -363,6 +365,16 @@ deliverRemoteDB_S
|
|||
[((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId, ForwarderSharerId))]
|
||||
deliverRemoteDB_S = deliverRemoteDB_ ForwarderSharer
|
||||
|
||||
deliverRemoteDB_R
|
||||
:: BL.ByteString
|
||||
-> RemoteActivityId
|
||||
-> RepoId
|
||||
-> ByteString
|
||||
-> [((InstanceId, Host), NonEmpty RemoteRecipient)]
|
||||
-> AppDB
|
||||
[((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId, ForwarderRepoId))]
|
||||
deliverRemoteDB_R = deliverRemoteDB_ ForwarderRepo
|
||||
|
||||
deliverRemoteHTTP'
|
||||
:: (MonadSite m, SiteEnv m ~ App, PersistRecordBackend fwder SqlBackend)
|
||||
=> UTCTime
|
||||
|
@ -434,6 +446,18 @@ deliverRemoteHTTP_S
|
|||
-> m ()
|
||||
deliverRemoteHTTP_S now shr = deliverRemoteHTTP' now $ LocalActorSharer shr
|
||||
|
||||
deliverRemoteHTTP_R
|
||||
:: (MonadSite m, SiteEnv m ~ App)
|
||||
=> UTCTime
|
||||
-> ShrIdent
|
||||
-> RpIdent
|
||||
-> BL.ByteString
|
||||
-> ByteString
|
||||
-> [((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId, ForwarderRepoId))]
|
||||
-> m ()
|
||||
deliverRemoteHTTP_R now shr rp =
|
||||
deliverRemoteHTTP' now $ LocalActorRepo shr rp
|
||||
|
||||
checkForward recip = join <$> do
|
||||
let hSig = hForwardingSignature
|
||||
msig <- maybeHeader hSig
|
||||
|
|
|
@ -373,13 +373,17 @@ fixRunningDeliveries = do
|
|||
, " forwarding deliveries"
|
||||
]
|
||||
|
||||
data Fwder = FwderProject ForwarderProjectId | FwderSharer ForwarderSharerId
|
||||
data Fwder
|
||||
= FwderProject ForwarderProjectId
|
||||
| FwderSharer ForwarderSharerId
|
||||
| FwderRepo ForwarderRepoId
|
||||
|
||||
partitionFwders :: [Fwder] -> ([ForwarderProjectId], [ForwarderSharerId])
|
||||
partitionFwders = foldl' f ([], [])
|
||||
partitionFwders :: [Fwder] -> ([ForwarderProjectId], [ForwarderSharerId], [ForwarderRepoId])
|
||||
partitionFwders = foldl' f ([], [], [])
|
||||
where
|
||||
f (js, ss) (FwderProject j) = (j : js, ss)
|
||||
f (js, ss) (FwderSharer s) = (js , s : ss)
|
||||
f (js, ss, rs) (FwderProject j) = (j : js, ss , rs)
|
||||
f (js, ss, rs) (FwderSharer s) = (js , s : ss, rs)
|
||||
f (js, ss, rs) (FwderRepo r) = (js , ss , r : rs)
|
||||
|
||||
retryOutboxDelivery :: Worker ()
|
||||
retryOutboxDelivery = do
|
||||
|
@ -448,7 +452,11 @@ retryOutboxDelivery = do
|
|||
let (linkedOld, linkedNew) = partitionEithers $ map (decideBySinceDL dropAfter now . adaptLinked) linked
|
||||
deleteWhere [DeliveryId <-. linkedOld]
|
||||
-- Same for forwarding deliveries, which are always linked
|
||||
forwarding <- E.select $ E.from $ \ (fw `E.InnerJoin` ra `E.InnerJoin` ro `E.InnerJoin` i `E.LeftOuterJoin` (fwj `E.InnerJoin` j `E.InnerJoin` s) `E.LeftOuterJoin` (fws `E.InnerJoin` s2)) -> do
|
||||
forwarding <- E.select $ E.from $ \ (fw `E.InnerJoin` ra `E.InnerJoin` ro `E.InnerJoin` i `E.LeftOuterJoin` (fwj `E.InnerJoin` j `E.InnerJoin` s) `E.LeftOuterJoin` (fws `E.InnerJoin` s2) `E.LeftOuterJoin` (fwr `E.InnerJoin` r `E.InnerJoin` s3)) -> do
|
||||
E.on $ r E.?. RepoSharer E.==. s3 E.?. SharerId
|
||||
E.on $ fwr E.?. ForwarderRepoSender E.==. r E.?. RepoId
|
||||
E.on $ E.just (fw E.^. ForwardingId) E.==. fwr E.?. ForwarderRepoTask
|
||||
|
||||
E.on $ fws E.?. ForwarderSharerSender E.==. s2 E.?. SharerId
|
||||
E.on $ E.just (fw E.^. ForwardingId) E.==. fws E.?. ForwarderSharerTask
|
||||
|
||||
|
@ -477,13 +485,18 @@ retryOutboxDelivery = do
|
|||
, fws E.?. ForwarderSharerId
|
||||
, s2 E.?. SharerIdent
|
||||
|
||||
, fwr E.?. ForwarderRepoId
|
||||
, s3 E.?. SharerIdent
|
||||
, r E.?. RepoIdent
|
||||
|
||||
, fw E.^. ForwardingSignature
|
||||
)
|
||||
let (forwardingOld, forwardingNew) = partitionEithers $ map (decideBySinceFW dropAfter now . adaptForwarding) forwarding
|
||||
(fwidsOld, fwdersOld) = unzip forwardingOld
|
||||
(fwjidsOld, fwsidsOld) = partitionFwders fwdersOld
|
||||
(fwjidsOld, fwsidsOld, fwridsOld) = partitionFwders fwdersOld
|
||||
deleteWhere [ForwarderProjectId <-. fwjidsOld]
|
||||
deleteWhere [ForwarderSharerId <-. fwsidsOld]
|
||||
deleteWhere [ForwarderRepoId <-. fwridsOld]
|
||||
deleteWhere [ForwardingId <-. fwidsOld]
|
||||
return (groupUnlinked lonelyNew, groupLinked linkedNew, groupForwarding forwardingNew)
|
||||
let deliver = deliverHttpBL
|
||||
|
@ -575,6 +588,7 @@ retryOutboxDelivery = do
|
|||
, E.Value fwid, E.Value body
|
||||
, E.Value mfwjid, E.Value mprj, E.Value mshr
|
||||
, E.Value mfwsid, E.Value mshr2
|
||||
, E.Value mfwrid, E.Value mrp, E.Value mshr3
|
||||
, E.Value sig
|
||||
) =
|
||||
( ( (iid, h)
|
||||
|
@ -583,11 +597,14 @@ retryOutboxDelivery = do
|
|||
, BL.fromStrict body
|
||||
, let project = together3 mfwjid mprj mshr
|
||||
sharer = together2 mfwsid mshr2
|
||||
in case (project, sharer) of
|
||||
(Just (fwjid, shr, prj), Nothing) ->
|
||||
repo = together3 mfwrid mrp mshr3
|
||||
in case (project, sharer, repo) of
|
||||
(Just (fwjid, shr, prj), Nothing, Nothing) ->
|
||||
(FwderProject fwjid, ProjectR shr prj)
|
||||
(Nothing, Just (fwsid, shr)) ->
|
||||
(Nothing, Just (fwsid, shr), Nothing) ->
|
||||
(FwderSharer fwsid, SharerR shr)
|
||||
(Nothing, Nothing, Just (fwrid, shr, rp)) ->
|
||||
(FwderRepo fwrid, RepoR shr rp)
|
||||
_ -> error $ "Non-single fwder for fw#" ++ show fwid
|
||||
, sig
|
||||
)
|
||||
|
@ -599,6 +616,7 @@ retryOutboxDelivery = do
|
|||
together2 (Just x) (Just y) = Just (x, y)
|
||||
together2 Nothing Nothing = Nothing
|
||||
together2 _ _ = error $ "Got weird forwarder for fw#" ++ show fwid
|
||||
together3 :: Maybe a -> Maybe b -> Maybe c -> Maybe (a, b, c)
|
||||
together3 (Just x) (Just y) (Just z) = Just (x, y, z)
|
||||
together3 Nothing Nothing Nothing = Nothing
|
||||
together3 _ _ _ = error $ "Got weird forwarder for fw#" ++ show fwid
|
||||
|
@ -705,6 +723,7 @@ retryOutboxDelivery = do
|
|||
case fwder of
|
||||
FwderProject k -> delete k
|
||||
FwderSharer k -> delete k
|
||||
FwderRepo k -> delete k
|
||||
delete fwid
|
||||
return True
|
||||
results <- sequence waitsD
|
||||
|
|
|
@ -1585,6 +1585,8 @@ changes hLocal ctx =
|
|||
, addFieldPrimRequired "Patch" defaultTime "created"
|
||||
-- 251
|
||||
, addFieldPrimOptional "TicketRepoLocal" (Nothing :: Maybe Text) "branch"
|
||||
-- 252
|
||||
, addEntities model_2020_05_25
|
||||
]
|
||||
|
||||
migrateDB
|
||||
|
|
|
@ -198,6 +198,7 @@ module Vervis.Migration.Model
|
|||
, TicketContextLocal247Generic (..)
|
||||
, TicketProjectLocal247Generic (..)
|
||||
, model_2020_05_17
|
||||
, model_2020_05_25
|
||||
)
|
||||
where
|
||||
|
||||
|
@ -395,3 +396,6 @@ makeEntitiesMigration "247"
|
|||
|
||||
model_2020_05_17 :: [Entity SqlBackend]
|
||||
model_2020_05_17 = $(schema "2020_05_17_patch")
|
||||
|
||||
model_2020_05_25 :: [Entity SqlBackend]
|
||||
model_2020_05_25 = $(schema "2020_05_25_fwd_sender_repo")
|
||||
|
|
Loading…
Reference in a new issue