In outbox POST handler, run async delivery using Worker instead of Handler

Worker is enough and seems much simpler. forkHandler does stuff with
forkResourceT and more stuff that I don't exactly understand and which may
involve more resource allocation. I guess forkWorker would generally be the
preferred approach, and there are bugs with delivery leading to sudden
CPU/memory peaks forcing me to kill the process. Maybe not related, just
mentioning it ^_^
This commit is contained in:
fr33domlover 2019-05-10 04:36:21 +00:00
parent e29053145f
commit 770983e829
2 changed files with 32 additions and 13 deletions

View file

@ -1148,8 +1148,7 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
(lmid, obid, doc, remotesHttp) <- case result of (lmid, obid, doc, remotesHttp) <- case result of
Left (FedError t) -> throwE t Left (FedError t) -> throwE t
Right r -> return r Right r -> return r
let handleDeliveryError e = logError $ "Outbox POST handler: delivery failed! " <> T.pack (displayException e) lift $ forkWorker "Outbox POST handler: async HTTP delivery" $ deliverRemoteHttp (furiHost uContext) obid doc remotesHttp
lift $ forkHandler handleDeliveryError $ deliverRemoteHttp (furiHost uContext) obid doc remotesHttp
return lmid return lmid
where where
verifyNothing :: Monad m => Maybe a -> e -> ExceptT e m () verifyNothing :: Monad m => Maybe a -> e -> ExceptT e m ()
@ -1547,7 +1546,7 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
, [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))] , [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
, [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))] , [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
) )
-> Handler () -> Worker ()
deliverRemoteHttp hContext obid doc (fetched, unfetched, unknown) = do deliverRemoteHttp hContext obid doc (fetched, unfetched, unknown) = do
let deliver fwd h inbox = do let deliver fwd h inbox = do
let fwd' = if h == hContext then Just fwd else Nothing let fwd' = if h == hContext then Just fwd else Nothing
@ -1557,7 +1556,7 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
traverse_ (fork . deliverUnfetched deliver now) unfetched traverse_ (fork . deliverUnfetched deliver now) unfetched
traverse_ (fork . deliverUnfetched deliver now) unknown traverse_ (fork . deliverUnfetched deliver now) unknown
where where
fork = forkHandler $ \ e -> logError $ "Outbox POST handler: delivery failed! " <> T.pack (displayException e) fork = forkWorker "Outbox POST handler: HTTP delivery"
deliverFetched deliver now ((_, h), recips@(r :| rs)) = do deliverFetched deliver now ((_, h), recips@(r :| rs)) = do
let (raid, luActor, luInbox, dlid) = r let (raid, luActor, luInbox, dlid) = r
(_, e) <- deliver luActor h luInbox (_, e) <- deliver luActor h luInbox
@ -1574,12 +1573,12 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
else Just False else Just False
Right _resp -> return $ Just True Right _resp -> return $ Just True
case e' of case e' of
Nothing -> runDB $ do Nothing -> runSiteDB $ do
let recips' = NE.toList recips let recips' = NE.toList recips
updateWhere [RemoteActorId <-. map fst4 recips', RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now] updateWhere [RemoteActorId <-. map fst4 recips', RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
updateWhere [DeliveryId <-. map fourth4 recips'] [DeliveryRunning =. False] updateWhere [DeliveryId <-. map fourth4 recips'] [DeliveryRunning =. False]
Just success -> do Just success -> do
runDB $ runSiteDB $
if success if success
then delete dlid then delete dlid
else do else do
@ -1588,7 +1587,7 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
for_ rs $ \ (raid, luActor, luInbox, dlid) -> for_ rs $ \ (raid, luActor, luInbox, dlid) ->
fork $ do fork $ do
(_, e) <- deliver luActor h luInbox (_, e) <- deliver luActor h luInbox
runDB $ runSiteDB $
case e of case e of
Left err -> do Left err -> do
logError $ T.concat logError $ T.concat
@ -1610,7 +1609,7 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
else Just Nothing else Just Nothing
Right (Right era) -> Just $ Just era Right (Right era) -> Just $ Just era
case e' of case e' of
Nothing -> runDB $ do Nothing -> runSiteDB $ do
let recips' = NE.toList recips let recips' = NE.toList recips
updateWhere [UnfetchedRemoteActorId <-. map fst3 recips', UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now] updateWhere [UnfetchedRemoteActorId <-. map fst3 recips', UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
updateWhere [UnlinkedDeliveryId <-. map thd3 recips'] [UnlinkedDeliveryRunning =. False] updateWhere [UnlinkedDeliveryId <-. map thd3 recips'] [UnlinkedDeliveryRunning =. False]
@ -1621,23 +1620,23 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
case e of case e of
Right (Right (Entity raid ra)) -> do Right (Right (Entity raid ra)) -> do
(fwd, e') <- deliver luActor h $ remoteActorInbox ra (fwd, e') <- deliver luActor h $ remoteActorInbox ra
runDB $ runSiteDB $
case e' of case e' of
Left _ -> do Left _ -> do
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now] updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
delete udlid delete udlid
insert_ $ Delivery raid obid fwd False insert_ $ Delivery raid obid fwd False
Right _ -> delete udlid Right _ -> delete udlid
_ -> runDB $ do _ -> runSiteDB $ do
updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now] updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
update udlid [UnlinkedDeliveryRunning =. False] update udlid [UnlinkedDeliveryRunning =. False]
case mera of case mera of
Nothing -> runDB $ do Nothing -> runSiteDB $ do
updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now] updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
update udlid [UnlinkedDeliveryRunning =. False] update udlid [UnlinkedDeliveryRunning =. False]
Just (Entity raid ra) -> do Just (Entity raid ra) -> do
(fwd, e'') <- deliver luActor h $ remoteActorInbox ra (fwd, e'') <- deliver luActor h $ remoteActorInbox ra
runDB $ runSiteDB $
case e'' of case e'' of
Left _ -> do Left _ -> do
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now] updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]

View file

@ -25,6 +25,7 @@ module Yesod.MonadSite
, runWorkerT , runWorkerT
, WorkerFor , WorkerFor
, runWorker , runWorker
, forkWorker
) )
where where
@ -42,10 +43,12 @@ import Data.Text (Text)
import Database.Persist.Sql import Database.Persist.Sql
import UnliftIO.Async import UnliftIO.Async
import UnliftIO.Concurrent import UnliftIO.Concurrent
import Yesod.Core import Yesod.Core hiding (logError)
import Yesod.Core.Types import Yesod.Core.Types
import Yesod.Persist.Core import Yesod.Persist.Core
import qualified Data.Text as T
class PersistConfig (SitePersistConfig site) => Site site where class PersistConfig (SitePersistConfig site) => Site site where
type SitePersistConfig site type SitePersistConfig site
siteApproot :: site -> Text siteApproot :: site -> Text
@ -123,3 +126,20 @@ type WorkerFor site = WorkerT site IO
runWorker :: (Yesod site, Site site) => WorkerFor site a -> site -> IO a runWorker :: (Yesod site, Site site) => WorkerFor site a -> site -> IO a
runWorker = runWorkerT runWorker = runWorkerT
forkWorker
:: (MonadSite m, Yesod site, Site site, SiteEnv m ~ site)
=> Text
-> WorkerFor site ()
-> m ()
forkWorker err worker = do
site <- askSite
void $ liftIO $ forkFinally (runWorker worker site) (handler site)
where
handler site r = flip runWorker site $
case r of
Left e ->
logError $
"Worker thread threw exception: " <> err <> ": " <>
T.pack (displayException e)
Right _ -> return ()