Smarter treatment of recipients that are collections

- Allow client to specify recipients that don't need to be delivered to
- When fetching recipient, recognize collections and don't try to deliver to
  them
- Remember collections in DB, and use that to skip HTTP delivery
This commit is contained in:
fr33domlover 2019-05-17 22:42:01 +00:00
parent 48882d65ad
commit 6d304b9307
11 changed files with 233 additions and 102 deletions

View file

@ -131,6 +131,12 @@ Instance
UniqueInstance host
RemoteCollection
instance InstanceId
ident LocalURI
UniqueRemoteCollection instance ident
FollowerSet
Follow

View file

@ -0,0 +1,5 @@
RemoteCollection
instance InstanceId
ident Text
UniqueRemoteCollection instance ident

View file

@ -19,6 +19,7 @@ module Data.Aeson.Local
, fromEither
, frg
, (.=?)
, (.=%)
, WithValue (..)
)
where
@ -59,6 +60,13 @@ infixr 8 .=?
_ .=? Nothing = mempty
k .=? (Just v) = k .= v
infixr 8 .=%
(.=%) :: ToJSON v => Text -> [v] -> Series
k .=% v =
if null v
then mempty
else k .= v
data WithValue a = WithValue
{ wvRaw :: Object
, wvParsed :: a

View file

@ -1051,6 +1051,11 @@ isInstanceErrorG (Just e) =
APGetErrorJSON _ -> False
APGetErrorContentType _ -> False
data Recip
= RecipRA (Entity RemoteActor)
| RecipURA (Entity UnfetchedRemoteActor)
| RecipRC (Entity RemoteCollection)
-- | Handle a Note submitted by a local user to their outbox. It can be either
-- a comment on a local ticket, or a comment on some remote context. Return an
-- error message if the Note is rejected, otherwise the new 'LocalMessageId'.
@ -1477,16 +1482,18 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
else do
es <- for lus' $ \ lu -> do
ma <- runMaybeT
$ Left <$> MaybeT (getBy $ UniqueRemoteActor iid lu)
<|> Right <$> MaybeT (getBy $ UniqueUnfetchedRemoteActor iid lu)
$ RecipRA <$> MaybeT (getBy $ UniqueRemoteActor iid lu)
<|> RecipURA <$> MaybeT (getBy $ UniqueUnfetchedRemoteActor iid lu)
<|> RecipRC <$> MaybeT (getBy $ UniqueRemoteCollection iid lu)
return $
case ma of
Nothing -> Left lu
Just e ->
Right $ case e of
Left (Entity raid ra) -> Left (raid, remoteActorIdent ra, remoteActorInbox ra, remoteActorErrorSince ra)
Right (Entity uraid ura) -> Right (uraid, unfetchedRemoteActorIdent ura, unfetchedRemoteActorSince ura)
let (unknown, newKnown) = partitionEithers $ NE.toList es
Nothing -> Just $ Left lu
Just r ->
case r of
RecipRA (Entity raid ra) -> Just $ Right $ Left (raid, remoteActorIdent ra, remoteActorInbox ra, remoteActorErrorSince ra)
RecipURA (Entity uraid ura) -> Just $ Right $ Right (uraid, unfetchedRemoteActorIdent ura, unfetchedRemoteActorSince ura)
RecipRC _ -> Nothing
let (unknown, newKnown) = partitionEithers $ catMaybes $ NE.toList es
(fetched, unfetched) = partitionEithers newKnown
return ((iid, h), (nonEmpty fetched, nonEmpty unfetched, nonEmpty unknown))
let moreKnown = mapMaybe (\ (i, (f, _, _)) -> (i,) <$> f) recips'
@ -1615,18 +1622,21 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
if isInstanceErrorG err
then Nothing
else Just Nothing
Right (Right era) -> Just $ Just era
Right (Right mera) -> Just $ Just mera
case e' of
Nothing -> runSiteDB $ do
let recips' = NE.toList recips
updateWhere [UnfetchedRemoteActorId <-. map fst3 recips', UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
updateWhere [UnlinkedDeliveryId <-. map thd3 recips'] [UnlinkedDeliveryRunning =. False]
Just mera -> do
Just mmera -> do
for_ rs $ \ (uraid, luActor, udlid) ->
fork $ do
e <- fetchRemoteActor iid h luActor
case e of
Right (Right (Entity raid ra)) -> do
Right (Right mera) ->
case mera of
Nothing -> runSiteDB $ delete udlid
Just (Entity raid ra) -> do
(fwd, e') <- deliver luActor h $ remoteActorInbox ra
runSiteDB $
case e' of
@ -1638,10 +1648,13 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
_ -> runSiteDB $ do
updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
update udlid [UnlinkedDeliveryRunning =. False]
case mera of
case mmera of
Nothing -> runSiteDB $ do
updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
update udlid [UnlinkedDeliveryRunning =. False]
Just mera ->
case mera of
Nothing -> runSiteDB $ delete udlid
Just (Entity raid ra) -> do
(fwd, e'') <- deliver luActor h $ remoteActorInbox ra
runSiteDB $
@ -1661,7 +1674,9 @@ retryOutboxDelivery = do
(udls, dls, fws) <- runSiteDB $ do
-- Get all unlinked deliveries which aren't running already in outbox
-- post handlers
unlinked' <- E.select $ E.from $ \ (udl `E.InnerJoin` ob `E.InnerJoin` ura `E.InnerJoin` i `E.LeftOuterJoin` ra) -> do
unlinked' <- E.select $ E.from $ \ (udl `E.InnerJoin` ob `E.InnerJoin` ura `E.InnerJoin` i `E.LeftOuterJoin` ra `E.LeftOuterJoin` rc) -> do
E.on $ E.just (ura E.^. UnfetchedRemoteActorInstance) E.==. rc E.?. RemoteCollectionInstance
E.&&. E.just (ura E.^. UnfetchedRemoteActorIdent) E.==. rc E.?. RemoteCollectionIdent
E.on $ E.just (ura E.^. UnfetchedRemoteActorInstance) E.==. ra E.?. RemoteActorInstance
E.&&. E.just (ura E.^. UnfetchedRemoteActorIdent) E.==. ra E.?. RemoteActorIdent
E.on $ ura E.^. UnfetchedRemoteActorInstance E.==. i E.^. InstanceId
@ -1680,6 +1695,7 @@ retryOutboxDelivery = do
, udl E.^. UnlinkedDeliveryForwarding
, ob E.^. OutboxItemActivity
, ra E.?. RemoteActorId
, rc E.?. RemoteCollectionId
)
-- Strip the E.Value wrappers and organize the records for the
-- filtering and grouping we'll need to do
@ -1689,7 +1705,7 @@ retryOutboxDelivery = do
(found, lonely) = partitionMaybes unlinked
-- Turn the found ones into linked deliveries
deleteWhere [UnlinkedDeliveryId <-. map (unlinkedID . snd) found]
insertMany_ $ map toLinked found
insertMany_ $ mapMaybe toLinked found
-- We're left with the lonely ones. We'll check which actors have been
-- unreachable for too long, and we'll delete deliveries for them. The
-- rest of the actors we'll try to reach by HTTP.
@ -1785,8 +1801,8 @@ retryOutboxDelivery = do
logInfo "Periodic delivery done"
where
adaptUnlinked
(E.Value iid, E.Value h, E.Value uraid, E.Value luRecip, E.Value since, E.Value udlid, E.Value obid, E.Value fwd, E.Value act, E.Value mraid) =
( mraid
(E.Value iid, E.Value h, E.Value uraid, E.Value luRecip, E.Value since, E.Value udlid, E.Value obid, E.Value fwd, E.Value act, E.Value mraid, E.Value mrcid) =
( Left <$> mraid <|> Right <$> mrcid
, ( ( (iid, h)
, ((uraid, luRecip), (udlid, fwd, obid, persistJSONValue act))
)
@ -1794,7 +1810,8 @@ retryOutboxDelivery = do
)
)
unlinkedID ((_, (_, (udlid, _, _, _))), _) = udlid
toLinked (raid, ((_, (_, (_, fwd, obid, _))), _)) = Delivery raid obid fwd False
toLinked (Left raid, ((_, (_, (_, fwd, obid, _))), _)) = Just $ Delivery raid obid fwd False
toLinked (Right _ , _ ) = Nothing
relevant dropAfter now since = addUTCTime dropAfter since > now
decideBySinceUDL dropAfter now (udl@(_, (_, (udlid, _, _, _))), msince) =
case msince of
@ -1889,7 +1906,10 @@ retryOutboxDelivery = do
renderFedURI (l2f h luRecip)
e <- fetchRemoteActor iid h luRecip
case e of
Right (Right (Entity raid ra)) -> do
Right (Right mera) ->
case mera of
Nothing -> runSiteDB $ deleteWhere [UnlinkedDeliveryId <-. map fst4 (NE.toList delivs)]
Just (Entity raid ra) -> do
waitsD <- for delivs $ \ (udlid, fwd, obid, doc) -> fork $ do
let fwd' = if fwd then Just luRecip else Nothing
e' <- deliver doc fwd' h $ remoteActorInbox ra

View file

@ -166,11 +166,12 @@ getTopReply replyP = do
postTopReply
:: Text
-> [Route App]
-> [Route App]
-> Route App
-> Route App
-> (LocalMessageId -> Route App)
-> Handler Html
postTopReply hDest recips context replyP after = do
postTopReply hDest recipsA recipsC context replyP after = do
((result, widget), enctype) <- runFormPost newMessageForm
elmid <- runExceptT $ do
msg <- case result of
@ -185,6 +186,7 @@ postTopReply hDest recips context replyP after = do
lift $ runDB $ sharerIdent <$> get404 (personIdent p)
let (hLocal, luAuthor) = f2l $ encodeRouteFed $ SharerR shrAuthor
uContext = encodeRecipRoute context
recips = recipsA ++ recipsC
note = Note
{ noteId = Nothing
, noteAttrib = luAuthor
@ -194,6 +196,7 @@ postTopReply hDest recips context replyP after = do
, audienceCc = []
, audienceBcc = []
, audienceGeneral = []
, audienceNonActors = map encodeRecipRoute recipsC
}
, noteReplyTo = Just uContext
, noteContext = Just uContext
@ -224,6 +227,7 @@ getReply replyG replyP getdid midParent = do
postReply
:: Text
-> [Route App]
-> [Route App]
-> Route App
-> (MessageId -> Route App)
-> (MessageId -> Route App)
@ -231,7 +235,7 @@ postReply
-> AppDB DiscussionId
-> MessageId
-> Handler Html
postReply hDest recips context replyG replyP after getdid midParent = do
postReply hDest recipsA recipsC context replyG replyP after getdid midParent = do
((result, widget), enctype) <- runFormPost newMessageForm
elmid <- runExceptT $ do
msg <- case result of
@ -262,6 +266,7 @@ postReply hDest recips context replyG replyP after getdid midParent = do
return (shr, parent)
let (hLocal, luAuthor) = f2l $ encodeRouteFed $ SharerR shrAuthor
uContext = encodeRecipRoute context
recips = recipsA ++ recipsC
note = Note
{ noteId = Nothing
, noteAttrib = luAuthor
@ -271,6 +276,7 @@ postReply hDest recips context replyG replyP after getdid midParent = do
, audienceCc = []
, audienceBcc = []
, audienceGeneral = []
, audienceNonActors = map encodeRecipRoute recipsC
}
, noteReplyTo = Just uParent
, noteContext = Just uContext

View file

@ -327,11 +327,11 @@ postOutboxR shrAuthor = do
let encodeRecipRoute = l2f hTicket . encodeRouteLocal
uTicket = encodeRecipRoute $ TicketR shrTicket prj num
(hLocal, luAuthor) = f2l $ encodeRouteFed $ SharerR shrAuthor
recips =
[ ProjectR shrTicket prj
, TicketParticipantsR shrTicket prj num
collections =
[ TicketParticipantsR shrTicket prj num
, TicketTeamR shrTicket prj num
]
recips = ProjectR shrTicket prj : collections
note = Note
{ noteId = Nothing
, noteAttrib = luAuthor
@ -341,6 +341,7 @@ postOutboxR shrAuthor = do
, audienceCc = []
, audienceBcc = []
, audienceGeneral = []
, audienceNonActors = map encodeRecipRoute collections
}
, noteReplyTo = Just $ fromMaybe uTicket muParent
, noteContext = Just uTicket

View file

@ -648,10 +648,8 @@ postTicketDiscussionR shr prj num = do
hLocal <- getsYesod $ appInstanceHost . appSettings
postTopReply
hLocal
[ ProjectR shr prj
, TicketParticipantsR shr prj num
, TicketTeamR shr prj num
]
[ProjectR shr prj]
[TicketParticipantsR shr prj num, TicketTeamR shr prj num]
(TicketR shr prj num)
(TicketDiscussionR shr prj num)
(const $ TicketR shr prj num)
@ -668,10 +666,8 @@ postTicketMessageR shr prj num mkhid = do
hLocal <- getsYesod $ appInstanceHost . appSettings
postReply
hLocal
[ ProjectR shr prj
, TicketParticipantsR shr prj num
, TicketTeamR shr prj num
]
[ProjectR shr prj]
[TicketParticipantsR shr prj num, TicketTeamR shr prj num]
(TicketR shr prj num)
(TicketReplyR shr prj num . encodeHid)
(TicketMessageR shr prj num . encodeHid)

View file

@ -274,6 +274,8 @@ changes =
, addUnique "InboxItemLocal" $ Unique "UniqueInboxItemLocalItem" ["item"]
-- 73
, addUnique "InboxItemRemote" $ Unique "UniqueInboxItemRemoteItem" ["item"]
-- 74
, addEntities model_2019_05_17
]
migrateDB :: MonadIO m => ReaderT SqlBackend m (Either Text (Int, Int))

View file

@ -39,6 +39,7 @@ module Vervis.Migration.Model
, model_2019_04_12
, model_2019_04_22
, model_2019_05_03
, model_2019_05_17
)
where
@ -106,3 +107,6 @@ model_2019_04_22 = $(schema "2019_04_22")
model_2019_05_03 :: [Entity SqlBackend]
model_2019_05_03 = $(schema "2019_05_03")
model_2019_05_17 :: [Entity SqlBackend]
model_2019_05_17 = $(schema "2019_05_17")

View file

@ -32,6 +32,7 @@ where
import Prelude
import Control.Applicative
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newMVar)
import Control.Concurrent.ResultShare
@ -41,6 +42,7 @@ import Control.Monad
import Control.Monad.Logger.CallStack
import Control.Monad.STM
import Control.Monad.Trans.Except
import Control.Monad.Trans.Maybe
import Data.Foldable
import Data.HashMap.Strict (HashMap)
import Data.Maybe
@ -79,7 +81,7 @@ data RoomMode
= RoomModeInstant
| RoomModeCached RoomModeDB
type ActorFetchShare site = ResultShare FedURI (Either (Maybe APGetError) (Entity RemoteActor)) (site, InstanceId)
type ActorFetchShare site = ResultShare FedURI (Either (Maybe APGetError) (Maybe (Entity RemoteActor))) (site, InstanceId)
class Yesod site => YesodRemoteActorStore site where
siteInstanceMutex :: site -> InstanceMutex
@ -469,18 +471,31 @@ actorFetchShareAction
, PersistConfigPool (SitePersistConfig site) ~ ConnectionPool
, PersistConfigBackend (SitePersistConfig site) ~ SqlPersistT
)
=> FedURI -> (site, InstanceId) -> IO (Either (Maybe APGetError) (Entity RemoteActor))
=> FedURI
-> (site, InstanceId)
-> IO (Either (Maybe APGetError) (Maybe (Entity RemoteActor)))
actorFetchShareAction u (site, iid) = flip runWorkerT site $ do
let (h, lu) = f2l u
mers <- runSiteDB $ getBy $ UniqueRemoteActor iid lu
case mers of
Just ers -> return $ Right ers
mrecip <- runSiteDB $ runMaybeT
$ Left <$> MaybeT (getBy $ UniqueRemoteActor iid lu)
<|> Right <$> MaybeT (getBy $ UniqueRemoteCollection iid lu)
case mrecip of
Just recip ->
return $ Right $
case recip of
Left ers -> Just ers
Right _ -> Nothing
Nothing -> do
manager <- asksSite getHttpManager
eactor <- fetchAPID' manager actorId h lu
for eactor $ \ actor -> runSiteDB $
erecip <- fetchRecipient manager h lu
for erecip $ \ recip ->
case recip of
RecipientActor actor -> runSiteDB $
let ra = RemoteActor lu iid (actorInbox actor) Nothing
in either id (flip Entity ra) <$> insertBy' ra
in Just . either id (flip Entity ra) <$> insertBy' ra
RecipientCollection _ -> runSiteDB $ do
insertUnique_ $ RemoteCollection iid lu
return Nothing
fetchRemoteActor
:: ( YesodPersist site
@ -493,11 +508,17 @@ fetchRemoteActor
, PersistConfigPool (SitePersistConfig site) ~ ConnectionPool
, PersistConfigBackend (SitePersistConfig site) ~ SqlPersistT
)
=> InstanceId -> Text -> LocalURI -> m (Either SomeException (Either (Maybe APGetError) (Entity RemoteActor)))
=> InstanceId -> Text -> LocalURI -> m (Either SomeException (Either (Maybe APGetError) (Maybe (Entity RemoteActor))))
fetchRemoteActor iid host luActor = do
mers <- runSiteDB $ getBy $ UniqueRemoteActor iid luActor
case mers of
Just ers -> return $ Right $ Right ers
mrecip <- runSiteDB $ runMaybeT
$ Left <$> MaybeT (getBy $ UniqueRemoteActor iid luActor)
<|> Right <$> MaybeT (getBy $ UniqueRemoteCollection iid luActor)
case mrecip of
Just recip ->
return $ Right $ Right $
case recip of
Left ers -> Just ers
Right _ -> Nothing
Nothing -> do
site <- askSite
liftIO $ runShared (siteActorFetchShare site) (l2f host luActor) (site, iid)

View file

@ -31,6 +31,9 @@ module Web.ActivityPub
, Owner (..)
, PublicKey (..)
, Actor (..)
, CollectionType (..)
, Collection (..)
, Recipient (..)
-- * Activity
, Note (..)
@ -57,6 +60,7 @@ module Web.ActivityPub
, Fetched (..)
, fetchAPID
, fetchAPID'
, fetchRecipient
, keyListedByActor
, fetchUnknownKey
, fetchKnownPersonalKey
@ -327,6 +331,63 @@ instance ActivityPub Actor where
<> "inbox" .= l2f host inbox
<> "publicKey" `pair` encodePublicKeySet host pkeys
data CollectionType = CollectionTypeUnordered | CollectionTypeOrdered
instance FromJSON CollectionType where
parseJSON = withText "CollectionType" parse
where
parse "Collection" = pure CollectionTypeUnordered
parse "OrderedCollection" = pure CollectionTypeOrdered
parse t = fail $ "Unknown collection type: " ++ T.unpack t
instance ToJSON CollectionType where
toJSON = error "toJSON CollectionType"
toEncoding ct =
toEncoding $ case ct of
CollectionTypeUnordered -> "Collection" :: Text
CollectionTypeOrdered -> "OrderedCollection"
data Collection a = Collection
{ collectionId :: LocalURI
, collectionType :: CollectionType
, collectionTotalItems :: Maybe Int
, collectionCurrent :: Maybe LocalURI
, collectionFirst :: Maybe LocalURI
, collectionLast :: Maybe LocalURI
, collectionItems :: [a]
}
instance (FromJSON a, ToJSON a) => ActivityPub (Collection a) where
jsonldContext _ = ContextAS2
parseObject o = do
(host, id_) <- f2l <$> o .: "id"
fmap (host,) $
Collection id_
<$> o .: "type"
<*> o .:? "totalItems"
<*> withHostMaybe host (fmap f2l <$> o .:? "current")
<*> withHostMaybe host (fmap f2l <$> o .:? "first")
<*> withHostMaybe host (fmap f2l <$> o .:? "last")
<*> optional (o .: "items" <|> o .: "orderedItems") .!= []
toSeries host (Collection id_ typ total curr firzt last items)
= "id" .= l2f host id_
<> "type" .= typ
<> "totalItems" .=? total
<> "current" .=? (l2f host <$> curr)
<> "first" .=? (l2f host <$> firzt)
<> "last" .=? (l2f host <$> last)
<> "items" .=% items
data Recipient = RecipientActor Actor | RecipientCollection (Collection FedURI)
instance ActivityPub Recipient where
jsonldContext _ = ContextAS2
parseObject o =
second RecipientActor <$> parseObject o <|>
second RecipientCollection <$> parseObject o
toSeries h (RecipientActor a) = toSeries h a
toSeries h (RecipientCollection c) = toSeries h c
data Audience = Audience
{ audienceTo :: [FedURI]
, audienceBto :: [FedURI]
@ -372,11 +433,6 @@ encodeAudience (Audience to bto cc bcc aud nons)
<> "bcc" .=% bcc
<> "audience" .=% aud
<> (frg <> "nonActors") .=% nons
where
t .=% v =
if null v
then mempty
else t .= v
data Note = Note
{ noteId :: Maybe LocalURI
@ -394,7 +450,7 @@ withHost h a = do
then return v
else fail "URI host mismatch"
withHostM h a = do
withHostMaybe h a = do
mp <- a
for mp $ \ (h', v) ->
if h == h'
@ -409,7 +465,7 @@ instance ActivityPub Note where
(h, attrib) <- f2l <$> o .: "attributedTo"
fmap (h,) $
Note
<$> withHostM h (fmap f2l <$> o .:? "id")
<$> withHostMaybe h (fmap f2l <$> o .:? "id")
<*> pure attrib
<*> parseAudience o
<*> o .:? "inReplyTo"
@ -747,6 +803,12 @@ fetchAPID' m getId h lu = runExceptT $ do
then return v
else throwE Nothing
fetchRecipient :: MonadIO m => Manager -> Text -> LocalURI -> m (Either (Maybe APGetError) Recipient)
fetchRecipient m = fetchAPID' m getId
where
getId (RecipientActor a) = actorId a
getId (RecipientCollection c) = collectionId c
fetchAPID :: (MonadIO m, ActivityPub a) => Manager -> (a -> LocalURI) -> Text -> LocalURI -> m (Either String a)
fetchAPID m getId h lu = first showError <$> fetchAPID' m getId h lu
where