Run the delivery worker priodically, settings control how often to run

This commit is contained in:
fr33domlover 2019-04-18 10:38:01 +00:00
parent c9c7da5902
commit f37b9b3f52
12 changed files with 285 additions and 109 deletions

View file

@ -155,3 +155,8 @@ reject-on-max-keys: true
drop-delivery-after: drop-delivery-after:
amount: 25 amount: 25
unit: weeks unit: weeks
# How often to retry failed deliveries
retry-delivery-every:
amount: 1
unit: hours

View file

@ -15,13 +15,17 @@
module Control.Concurrent.Local module Control.Concurrent.Local
( forkCheck ( forkCheck
, periodically
) )
where where
import Prelude import Prelude
import Control.Concurrent import Control.Concurrent
import Control.Monad
import Control.Monad.IO.Class
import Data.Functor (void) import Data.Functor (void)
import Data.Time.Interval
-- | Like 'forkIO', but if the new thread terminates with an exception, -- | Like 'forkIO', but if the new thread terminates with an exception,
-- re-throw it in the current thread. -- re-throw it in the current thread.
@ -29,3 +33,12 @@ forkCheck :: IO () -> IO ()
forkCheck run = do forkCheck run = do
tid <- myThreadId tid <- myThreadId
void $ forkFinally run $ either (throwTo tid) (const $ return ()) void $ forkFinally run $ either (throwTo tid) (const $ return ())
periodically :: MonadIO m => TimeInterval -> m () -> m ()
periodically interval action =
let micros = microseconds interval
in if 0 < micros && micros <= toInteger (maxBound :: Int)
then
let micros' = fromInteger micros
in forever $ liftIO (threadDelay micros') >> action
else error $ "periodically: interval out of range: " ++ show micros

View file

@ -32,8 +32,7 @@
-- * It could be nice to provide defaults for plain IO and for UnliftIO -- * It could be nice to provide defaults for plain IO and for UnliftIO
-- * The action is constant, could make it more flexible -- * The action is constant, could make it more flexible
module Control.Concurrent.ResultShare module Control.Concurrent.ResultShare
( ResultShareSettings (..) ( ResultShare ()
, ResultShare ()
, newResultShare , newResultShare
, runShared , runShared
) )
@ -41,7 +40,7 @@ where
import Prelude import Prelude
import Control.Concurrent.MVar import Control.Concurrent
import Control.Concurrent.STM.TVar import Control.Concurrent.STM.TVar
import Control.Monad import Control.Monad
import Control.Monad.IO.Class import Control.Monad.IO.Class
@ -51,22 +50,16 @@ import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as M import qualified Data.HashMap.Strict as M
data ResultShareSettings m k v a = ResultShareSettings data ResultShare k v a = ResultShare
{ resultShareFork :: m () -> m ()
, resultShareAction :: k -> a -> m v
}
data ResultShare m k v a = ResultShare
{ _rsMap :: TVar (HashMap k (MVar v)) { _rsMap :: TVar (HashMap k (MVar v))
, _rsFork :: m () -> m () , _rsAction :: k -> a -> IO v
, _rsAction :: k -> a -> m v
} }
newResultShare newResultShare
:: MonadIO n => ResultShareSettings m k v a -> n (ResultShare m k v a) :: MonadIO m => (k -> a -> IO v) -> m (ResultShare k v a)
newResultShare (ResultShareSettings fork action) = do newResultShare action = do
tvar <- liftIO $ newTVarIO M.empty tvar <- liftIO $ newTVarIO M.empty
return $ ResultShare tvar fork action return $ ResultShare tvar action
-- TODO this is copied from stm-2.5, remove when we upgrade LTS -- TODO this is copied from stm-2.5, remove when we upgrade LTS
stateTVar :: TVar s -> (s -> (a, s)) -> STM a stateTVar :: TVar s -> (s -> (a, s)) -> STM a
@ -77,9 +70,9 @@ stateTVar var f = do
return a return a
runShared runShared
:: (MonadIO m, Eq k, Hashable k) => ResultShare m k v a -> k -> a -> m v :: (MonadIO m, Eq k, Hashable k) => ResultShare k v a -> k -> a -> m v
runShared (ResultShare tvar fork action) key param = do runShared (ResultShare tvar action) key param = liftIO $ do
(mvar, new) <- liftIO $ do (mvar, new) <- do
existing <- M.lookup key <$> readTVarIO tvar existing <- M.lookup key <$> readTVarIO tvar
case existing of case existing of
Just v -> return (v, False) Just v -> return (v, False)
@ -89,9 +82,8 @@ runShared (ResultShare tvar fork action) key param = do
case M.lookup key m of case M.lookup key m of
Just v' -> ((v', False), m) Just v' -> ((v', False), m)
Nothing -> ((v , True) , M.insert key v m) Nothing -> ((v , True) , M.insert key v m)
when new $ fork $ do when new $ void $ forkIO $ do
result <- action key param result <- action key param
liftIO $ do
atomically $ modifyTVar' tvar $ M.delete key atomically $ modifyTVar' tvar $ M.delete key
putMVar mvar result putMVar mvar result
liftIO $ readMVar mvar readMVar mvar

View file

@ -44,6 +44,8 @@ import qualified Data.ByteString as B (writeFile, readFile)
import Crypto.PublicVerifKey import Crypto.PublicVerifKey
import Data.KeyFile import Data.KeyFile
import Control.Concurrent.Local
-- | Ed25519 signing key, we generate it on the server and use for signing. We -- | Ed25519 signing key, we generate it on the server and use for signing. We
-- also make its public key available to whoever wishes to verify our -- also make its public key available to whoever wishes to verify our
-- signatures. -- signatures.
@ -157,22 +159,13 @@ generateActorKey = mk <$> generateSecretKey
-- storing them in a 'TVar'. It manages a pait of keys, and each time it toggles -- storing them in a 'TVar'. It manages a pait of keys, and each time it toggles
-- which key gets rotated. -- which key gets rotated.
actorKeyRotator :: TimeInterval -> TVar (ActorKey, ActorKey, Bool) -> IO () actorKeyRotator :: TimeInterval -> TVar (ActorKey, ActorKey, Bool) -> IO ()
actorKeyRotator interval keys = actorKeyRotator interval keys = periodically interval $ do
let micros = microseconds interval
in if 0 < micros && micros <= toInteger (maxBound :: Int)
then
let micros' = fromInteger micros
in forever $ do
threadDelay micros'
fresh <- generateActorKey fresh <- generateActorKey
atomically $ atomically $
modifyTVar' keys $ \ (k1, k2, new1) -> modifyTVar' keys $ \ (k1, k2, new1) ->
if new1 if new1
then (k1 , fresh, False) then (k1 , fresh, False)
else (fresh, k2 , True) else (fresh, k2 , True)
else
error $
"actorKeyRotator: interval out of range: " ++ show micros
actorKeyPublicBin :: ActorKey -> PublicVerifKey actorKeyPublicBin :: ActorKey -> PublicVerifKey
actorKeyPublicBin = fromEd25519 . actorKeyPublic actorKeyPublicBin = fromEd25519 . actorKeyPublic

View file

@ -56,13 +56,13 @@ import Yesod.Mail.Send (runMailer)
import qualified Data.Text as T (unpack) import qualified Data.Text as T (unpack)
import qualified Data.HashMap.Strict as M (empty) import qualified Data.HashMap.Strict as M (empty)
import Control.Concurrent.Local (forkCheck)
import Database.Persist.Schema.PostgreSQL (schemaBackend) import Database.Persist.Schema.PostgreSQL (schemaBackend)
import Control.Concurrent.ResultShare import Control.Concurrent.ResultShare
import Data.KeyFile import Data.KeyFile
import Yesod.MonadSite
import Control.Concurrent.Local
import Web.Hashids.Local import Web.Hashids.Local
import Vervis.ActorKey (generateActorKey, actorKeyRotator) import Vervis.ActorKey (generateActorKey, actorKeyRotator)
@ -128,7 +128,7 @@ makeFoundation appSettings = do
appInstanceMutex <- newInstanceMutex appInstanceMutex <- newInstanceMutex
appActorFetchShare <- newResultShare actorFetchShareSettings appActorFetchShare <- newResultShare actorFetchShareAction
appActivities <- newTVarIO mempty appActivities <- newTVarIO mempty
@ -239,6 +239,11 @@ actorKeyPeriodicRotator :: App -> IO ()
actorKeyPeriodicRotator app = actorKeyPeriodicRotator app =
actorKeyRotator (appActorKeyRotation $ appSettings app) (appActorKeys app) actorKeyRotator (appActorKeyRotation $ appSettings app) (appActorKeys app)
deliveryRunner :: App -> IO ()
deliveryRunner app =
let interval = appDeliveryRetryFreq $ appSettings app
in runWorker (periodically interval retryOutboxDelivery) app
sshServer :: App -> IO () sshServer :: App -> IO ()
sshServer foundation = sshServer foundation =
runSsh runSsh
@ -280,6 +285,9 @@ appMain = do
-- Run actor signature key periodic generation thread -- Run actor signature key periodic generation thread
forkCheck $ actorKeyPeriodicRotator foundation forkCheck $ actorKeyPeriodicRotator foundation
-- Run periodic activity delivery retry runner
forkCheck $ deliveryRunner foundation
-- Run SSH server -- Run SSH server
forkCheck $ sshServer foundation forkCheck $ sshServer foundation

View file

@ -34,6 +34,7 @@ import Control.Monad.Trans.Maybe
import Control.Monad.Trans.Reader import Control.Monad.Trans.Reader
import Data.Aeson (Object) import Data.Aeson (Object)
import Data.Bifunctor import Data.Bifunctor
import Data.ByteString (ByteString)
import Data.Either import Data.Either
import Data.Foldable import Data.Foldable
import Data.Function import Data.Function
@ -49,6 +50,7 @@ import Data.Tuple
import Database.Persist hiding (deleteBy) import Database.Persist hiding (deleteBy)
import Database.Persist.Sql hiding (deleteBy) import Database.Persist.Sql hiding (deleteBy)
import Network.HTTP.Client import Network.HTTP.Client
import Network.HTTP.Signature
import Network.HTTP.Types.Header import Network.HTTP.Types.Header
import Network.HTTP.Types.URI import Network.HTTP.Types.URI
import Network.TLS import Network.TLS
@ -69,6 +71,7 @@ import Web.ActivityPub
import Yesod.Auth.Unverified import Yesod.Auth.Unverified
import Yesod.FedURI import Yesod.FedURI
import Yesod.Hashids import Yesod.Hashids
import Yesod.MonadSite
import Data.Either.Local import Data.Either.Local
import Data.List.Local import Data.List.Local
@ -356,17 +359,26 @@ newtype FedError = FedError Text deriving Show
instance Exception FedError instance Exception FedError
getHttpSign
:: (MonadSite m, SiteEnv m ~ App) => m (ByteString -> (KeyId, Signature))
getHttpSign = do getHttpSign = do
(akey1, akey2, new1) <- liftIO . readTVarIO =<< getsYesod appActorKeys (akey1, akey2, new1) <- liftIO . readTVarIO =<< asksSite appActorKeys
renderUrl <- getUrlRender renderUrl <- askUrlRender
let (keyID, akey) = let (keyID, akey) =
if new1 if new1
then (renderUrl ActorKey1R, akey1) then (renderUrl ActorKey1R, akey1)
else (renderUrl ActorKey2R, akey2) else (renderUrl ActorKey2R, akey2)
return $ \ b -> (KeyId $ encodeUtf8 keyID, actorKeySign akey b) return $ \ b -> (KeyId $ encodeUtf8 keyID, actorKeySign akey b)
deliverHttp
:: (MonadSite m, SiteEnv m ~ App)
=> (ByteString -> (KeyId, Signature))
-> Doc Activity
-> Text
-> LocalURI
-> m (Either APPostError (Response ()))
deliverHttp sign doc h luInbox = do deliverHttp sign doc h luInbox = do
manager <- getsYesod appHttpManager manager <- asksSite appHttpManager
let inbox = l2f h luInbox let inbox = l2f h luInbox
headers = hRequestTarget :| [hHost, hDate, hActivityPubActor] headers = hRequestTarget :| [hHost, hDate, hActivityPubActor]
httpPostAP manager inbox headers sign docActor doc httpPostAP manager inbox headers sign docActor doc
@ -965,11 +977,12 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
let (uraid, luActor, udlid) = r let (uraid, luActor, udlid) = r
e <- fetchRemoteActor iid h luActor e <- fetchRemoteActor iid h luActor
let e' = case e of let e' = case e of
Left err -> Left err -> Just Nothing
Right (Left err) ->
if isInstanceErrorG err if isInstanceErrorG err
then Nothing then Nothing
else Just Nothing else Just Nothing
Right era -> Just $ Just era Right (Right era) -> Just $ Just era
case e' of case e' of
Nothing -> runDB $ do Nothing -> runDB $ do
let recips' = NE.toList recips let recips' = NE.toList recips
@ -980,10 +993,7 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
fork $ do fork $ do
e <- fetchRemoteActor iid h luActor e <- fetchRemoteActor iid h luActor
case e of case e of
Left _ -> runDB $ do Right (Right (Entity raid ra)) -> do
updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
update udlid [UnlinkedDeliveryRunning =. False]
Right (Entity raid ra) -> do
e' <- deliver h $ remoteActorInbox ra e' <- deliver h $ remoteActorInbox ra
runDB $ runDB $
case e' of case e' of
@ -992,6 +1002,9 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
delete udlid delete udlid
insert_ $ Delivery raid obid False insert_ $ Delivery raid obid False
Right _ -> delete udlid Right _ -> delete udlid
_ -> runDB $ do
updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
update udlid [UnlinkedDeliveryRunning =. False]
case mera of case mera of
Nothing -> runDB $ do Nothing -> runDB $ do
updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now] updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
@ -1006,10 +1019,10 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
insert_ $ Delivery raid obid False insert_ $ Delivery raid obid False
Right _ -> delete udlid Right _ -> delete udlid
retryOutboxDelivery :: Handler () retryOutboxDelivery :: Worker ()
retryOutboxDelivery = do retryOutboxDelivery = do
now <- liftIO getCurrentTime now <- liftIO $ getCurrentTime
(udls, dls) <- runDB $ do (udls, dls) <- runSiteDB $ do
-- Get all unlinked deliveries which aren't running already in outbox -- Get all unlinked deliveries which aren't running already in outbox
-- post handlers -- post handlers
unlinked' <- E.select $ E.from $ \ (udl `E.InnerJoin` ob `E.InnerJoin` ura `E.InnerJoin` i `E.LeftOuterJoin` ra) -> do unlinked' <- E.select $ E.from $ \ (udl `E.InnerJoin` ob `E.InnerJoin` ura `E.InnerJoin` i `E.LeftOuterJoin` ra) -> do
@ -1043,7 +1056,7 @@ retryOutboxDelivery = do
-- We're left with the lonely ones. We'll check which actors have been -- We're left with the lonely ones. We'll check which actors have been
-- unreachable for too long, and we'll delete deliveries for them. The -- unreachable for too long, and we'll delete deliveries for them. The
-- rest of the actors we'll try to reach by HTTP. -- rest of the actors we'll try to reach by HTTP.
dropAfter <- getsYesod $ appDropDeliveryAfter . appSettings dropAfter <- lift $ asksSite $ appDropDeliveryAfter . appSettings
let (lonelyOld, lonelyNew) = partitionEithers $ map (decideBySinceUDL dropAfter now) lonely let (lonelyOld, lonelyNew) = partitionEithers $ map (decideBySinceUDL dropAfter now) lonely
deleteWhere [UnlinkedDeliveryId <-. lonelyOld] deleteWhere [UnlinkedDeliveryId <-. lonelyOld]
-- Now let's grab the linked deliveries, and similarly delete old ones -- Now let's grab the linked deliveries, and similarly delete old ones
@ -1115,14 +1128,14 @@ retryOutboxDelivery = do
= map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd) = map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd)
. groupWithExtractBy ((==) `on` fst) fst snd . groupWithExtractBy ((==) `on` fst) fst snd
fork action = do fork action = do
mvar <- liftIO newEmptyMVar wait <- asyncSite action
let handle e = do return $ do
liftIO $ putMVar mvar False result <- wait
case result of
Left e -> do
logError $ "Periodic delivery error! " <> T.pack (displayException e) logError $ "Periodic delivery error! " <> T.pack (displayException e)
forkHandler handle $ do return False
success <- action Right success -> return success
liftIO $ putMVar mvar success
return $ liftIO $ readMVar mvar
deliverLinked deliver now ((_, h), recips) = do deliverLinked deliver now ((_, h), recips) = do
waitsR <- for recips $ \ ((raid, inbox), delivs) -> fork $ do waitsR <- for recips $ \ ((raid, inbox), delivs) -> fork $ do
waitsD <- for delivs $ \ (dlid, doc) -> fork $ do waitsD <- for delivs $ \ (dlid, doc) -> fork $ do
@ -1130,10 +1143,10 @@ retryOutboxDelivery = do
case e of case e of
Left _err -> return False Left _err -> return False
Right _resp -> do Right _resp -> do
runDB $ delete dlid runSiteDB $ delete dlid
return True return True
results <- sequence waitsD results <- sequence waitsD
runDB $ runSiteDB $
if and results if and results
then update raid [RemoteActorErrorSince =. Nothing] then update raid [RemoteActorErrorSince =. Nothing]
else if or results else if or results
@ -1148,26 +1161,26 @@ retryOutboxDelivery = do
waitsR <- for recips $ \ ((uraid, luRecip), delivs) -> fork $ do waitsR <- for recips $ \ ((uraid, luRecip), delivs) -> fork $ do
e <- fetchRemoteActor iid h luRecip e <- fetchRemoteActor iid h luRecip
case e of case e of
Left _ -> runDB $ updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now] Right (Right (Entity raid ra)) -> do
Right (Entity raid ra) -> do
waitsD <- for delivs $ \ (udlid, obid, doc) -> fork $ do waitsD <- for delivs $ \ (udlid, obid, doc) -> fork $ do
e' <- deliver doc h $ remoteActorInbox ra e' <- deliver doc h $ remoteActorInbox ra
case e' of case e' of
Left _err -> do Left _err -> do
runDB $ do runSiteDB $ do
delete udlid delete udlid
insert_ $ Delivery raid obid False insert_ $ Delivery raid obid False
return False return False
Right _resp -> do Right _resp -> do
runDB $ delete udlid runSiteDB $ delete udlid
return True return True
results <- sequence waitsD results <- sequence waitsD
runDB $ runSiteDB $
if and results if and results
then update raid [RemoteActorErrorSince =. Nothing] then update raid [RemoteActorErrorSince =. Nothing]
else if or results else if or results
then update raid [RemoteActorErrorSince =. Just now] then update raid [RemoteActorErrorSince =. Just now]
else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now] else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
_ -> runSiteDB $ updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
return True return True
results <- sequence waitsR results <- sequence waitsR
unless (and results) $ unless (and results) $

View file

@ -32,6 +32,7 @@ import Data.PEM (pemContent)
import Data.Text.Encoding (decodeUtf8') import Data.Text.Encoding (decodeUtf8')
import Data.Time.Interval (TimeInterval, fromTimeUnit, toTimeUnit) import Data.Time.Interval (TimeInterval, fromTimeUnit, toTimeUnit)
import Data.Time.Units (Second, Minute, Day) import Data.Time.Units (Second, Minute, Day)
import Database.Persist.Postgresql
import Database.Persist.Sql (ConnectionPool, runSqlPool) import Database.Persist.Sql (ConnectionPool, runSqlPool)
import Graphics.SVGFonts.ReadFont (PreparedFont) import Graphics.SVGFonts.ReadFont (PreparedFont)
import Network.HTTP.Client import Network.HTTP.Client
@ -70,6 +71,7 @@ import Network.FedURI
import Web.ActivityAccess import Web.ActivityAccess
import Web.ActivityPub import Web.ActivityPub
import Yesod.Hashids import Yesod.Hashids
import Yesod.MonadSite
import Text.Email.Local import Text.Email.Local
import Text.Jasmine.Local (discardm) import Text.Jasmine.Local (discardm)
@ -105,7 +107,7 @@ data App = App
, appInstanceMutex :: InstanceMutex , appInstanceMutex :: InstanceMutex
, appCapSignKey :: AccessTokenSecretKey , appCapSignKey :: AccessTokenSecretKey
, appHashidsContext :: HashidsContext , appHashidsContext :: HashidsContext
, appActorFetchShare :: ResultShare (HandlerFor App) FedURI (Either (Maybe APGetError) (Entity RemoteActor)) InstanceId , appActorFetchShare :: ActorFetchShare App
, appActivities :: TVar (Vector (UTCTime, ActivityReport)) , appActivities :: TVar (Vector (UTCTime, ActivityReport))
} }
@ -135,14 +137,23 @@ type Form a = Html -> MForm (HandlerT App IO) (FormResult a, Widget)
type AppDB = YesodDB App type AppDB = YesodDB App
type Worker = WorkerFor App
type WorkerDB = PersistConfigBackend (SitePersistConfig App) Worker
instance Site App where
type SitePersistConfig App = PostgresConf
siteApproot = ("https://" <>) . appInstanceHost . appSettings
sitePersistConfig = appDatabaseConf . appSettings
sitePersistPool = appConnPool
siteLogger = appLogger
-- Please see the documentation for the Yesod typeclass. There are a number -- Please see the documentation for the Yesod typeclass. There are a number
-- of settings which can be configured by overriding methods here. -- of settings which can be configured by overriding methods here.
instance Yesod App where instance Yesod App where
-- Controls the base of generated URLs. For more information on modifying, -- Controls the base of generated URLs. For more information on modifying,
-- see: https://github.com/yesodweb/yesod/wiki/Overriding-approot -- see: https://github.com/yesodweb/yesod/wiki/Overriding-approot
approot = ApprootMaster $ mkroot . appInstanceHost . appSettings approot = ApprootMaster siteApproot
where
mkroot h = "https://" <> h
-- Store session data on the client in encrypted cookies, -- Store session data on the client in encrypted cookies,
-- default session idle timeout is 120 minutes -- default session idle timeout is 120 minutes
@ -445,9 +456,7 @@ instance Yesod App where
-- How to run database actions. -- How to run database actions.
instance YesodPersist App where instance YesodPersist App where
type YesodPersistBackend App = SqlBackend type YesodPersistBackend App = SqlBackend
runDB action = do runDB = runSiteDB
master <- getYesod
runSqlPool action $ appConnPool master
instance YesodPersistRunner App where instance YesodPersistRunner App where
getDBRunner = defaultGetDBRunner appConnPool getDBRunner = defaultGetDBRunner appConnPool

View file

@ -296,15 +296,18 @@ postOutboxR shr = do
iid <- runDB $ either entityKey id <$> insertBy' (Instance h) iid <- runDB $ either entityKey id <$> insertBy' (Instance h)
result <- fetchRemoteActor iid h lto result <- fetchRemoteActor iid h lto
case result of case result of
Left err -> do Left err -> setErrorMsg $ displayException err
Right (Left err) -> setErrorMsg $ show err
Right (Right (Entity _ ra)) -> return $ Just $ remoteActorInbox ra
where
setErrorMsg err = do
setMessage $ toHtml $ T.concat setMessage $ toHtml $ T.concat
[ "Tried to fetch recipient actor <" [ "Tried to fetch recipient actor <"
, renderFedURI $ l2f h lto , renderFedURI $ l2f h lto
, "> and got an error: " , "> and got an error: "
, T.pack (show err) , T.pack err
] ]
return Nothing return Nothing
Right (Entity _ ra) -> return $ Just $ remoteActorInbox ra
getActorKey :: ((ActorKey, ActorKey, Bool) -> ActorKey) -> Route App -> Handler TypedContent getActorKey :: ((ActorKey, ActorKey, Bool) -> ActorKey) -> Route App -> Handler TypedContent
getActorKey choose route = selectRep $ provideAP $ do getActorKey choose route = selectRep $ provideAP $ do

View file

@ -18,12 +18,13 @@
module Vervis.RemoteActorStore module Vervis.RemoteActorStore
( InstanceMutex () ( InstanceMutex ()
, newInstanceMutex , newInstanceMutex
, ActorFetchShare
, YesodRemoteActorStore (..) , YesodRemoteActorStore (..)
, withHostLock , withHostLock
, keyListedByActorShared , keyListedByActorShared
, VerifKeyDetail (..) , VerifKeyDetail (..)
, addVerifKey , addVerifKey
, actorFetchShareSettings , actorFetchShareAction
, fetchRemoteActor , fetchRemoteActor
, deleteUnusedURAs , deleteUnusedURAs
) )
@ -31,6 +32,7 @@ where
import Prelude import Prelude
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newMVar) import Control.Concurrent.MVar (MVar, newMVar)
import Control.Concurrent.ResultShare import Control.Concurrent.ResultShare
import Control.Concurrent.STM.TVar import Control.Concurrent.STM.TVar
@ -60,6 +62,7 @@ import Crypto.PublicVerifKey
import Database.Persist.Local import Database.Persist.Local
import Network.FedURI import Network.FedURI
import Web.ActivityPub import Web.ActivityPub
import Yesod.MonadSite
import Vervis.Model import Vervis.Model
@ -76,13 +79,15 @@ data RoomMode
= RoomModeInstant = RoomModeInstant
| RoomModeCached RoomModeDB | RoomModeCached RoomModeDB
type ActorFetchShare site = ResultShare FedURI (Either SomeException (Either (Maybe APGetError) (Entity RemoteActor))) (site, InstanceId)
class Yesod site => YesodRemoteActorStore site where class Yesod site => YesodRemoteActorStore site where
siteInstanceMutex :: site -> InstanceMutex siteInstanceMutex :: site -> InstanceMutex
siteInstanceRoomMode :: site -> Maybe Int siteInstanceRoomMode :: site -> Maybe Int
siteActorRoomMode :: site -> Maybe Int siteActorRoomMode :: site -> Maybe Int
siteRejectOnMaxKeys :: site -> Bool siteRejectOnMaxKeys :: site -> Bool
siteActorFetchShare :: site -> ResultShare (HandlerFor site) FedURI (Either (Maybe APGetError) (Entity RemoteActor)) InstanceId siteActorFetchShare :: site -> ActorFetchShare site
-- TODO this is copied from stm-2.5, remove when we upgrade LTS -- TODO this is copied from stm-2.5, remove when we upgrade LTS
stateTVar :: TVar s -> (s -> (a, s)) -> STM a stateTVar :: TVar s -> (s -> (a, s)) -> STM a
@ -454,42 +459,48 @@ addVerifKey h uinb vkd =
lift $ insert_ $ VerifKey luKey iid mexpires key (Just rsid) lift $ insert_ $ VerifKey luKey iid mexpires key (Just rsid)
return (iid, rsid) return (iid, rsid)
actorFetchShareSettings actorFetchShareAction
:: ( YesodPersist site :: ( Yesod site
, YesodPersist site
, PersistUniqueWrite (YesodPersistBackend site) , PersistUniqueWrite (YesodPersistBackend site)
, BaseBackend (YesodPersistBackend site) ~ SqlBackend , BaseBackend (YesodPersistBackend site) ~ SqlBackend
, HasHttpManager site , HasHttpManager site
, Site site
, PersistConfigPool (SitePersistConfig site) ~ ConnectionPool
, PersistConfigBackend (SitePersistConfig site) ~ SqlPersistT
) )
=> ResultShareSettings (HandlerFor site) FedURI (Either (Maybe APGetError) (Entity RemoteActor)) InstanceId => FedURI -> (site, InstanceId) -> IO (Either SomeException (Either (Maybe APGetError) (Entity RemoteActor)))
actorFetchShareSettings = ResultShareSettings actorFetchShareAction u (site, iid) = try $ flip runWorkerT site $ do
{ resultShareFork = forkHandler $ \ e -> logError $ "ActorFetchShare action failed! " <> T.pack (displayException e)
, resultShareAction = \ u iid -> do
let (h, lu) = f2l u let (h, lu) = f2l u
mers <- runDB $ getBy $ UniqueRemoteActor iid lu mers <- runSiteDB $ getBy $ UniqueRemoteActor iid lu
case mers of case mers of
Just ers -> return $ Right ers Just ers -> return $ Right ers
Nothing -> do Nothing -> do
manager <- getsYesod getHttpManager manager <- asksSite getHttpManager
eactor <- fetchAPID' manager actorId h lu eactor <- fetchAPID' manager actorId h lu
for eactor $ \ actor -> runDB $ for eactor $ \ actor -> runSiteDB $
let ra = RemoteActor lu iid (actorInbox actor) Nothing let ra = RemoteActor lu iid (actorInbox actor) Nothing
in either id (flip Entity ra) <$> insertBy' ra in either id (flip Entity ra) <$> insertBy' ra
}
fetchRemoteActor fetchRemoteActor
:: ( YesodPersist site :: ( YesodPersist site
, PersistUniqueRead (YesodPersistBackend site) , PersistUniqueRead (YesodPersistBackend site)
, BaseBackend (YesodPersistBackend site) ~ SqlBackend , BaseBackend (YesodPersistBackend site) ~ SqlBackend
, YesodRemoteActorStore site , YesodRemoteActorStore site
, MonadSite m
, SiteEnv m ~ site
, Site site
, PersistConfigPool (SitePersistConfig site) ~ ConnectionPool
, PersistConfigBackend (SitePersistConfig site) ~ SqlPersistT
) )
=> InstanceId -> Text -> LocalURI -> HandlerFor site (Either (Maybe APGetError) (Entity RemoteActor)) => InstanceId -> Text -> LocalURI -> m (Either SomeException (Either (Maybe APGetError) (Entity RemoteActor)))
fetchRemoteActor iid host luActor = do fetchRemoteActor iid host luActor = do
mers <- runDB $ getBy $ UniqueRemoteActor iid luActor mers <- runSiteDB $ getBy $ UniqueRemoteActor iid luActor
case mers of case mers of
Just ers -> return $ Right ers Just ers -> return $ Right $ Right ers
Nothing -> do Nothing -> do
afs <- getsYesod siteActorFetchShare site <- askSite
runShared afs (l2f host luActor) iid liftIO $ runShared (siteActorFetchShare site) (l2f host luActor) (site, iid)
deleteUnusedURAs = do deleteUnusedURAs = do
uraids <- E.select $ E.from $ \ ura -> do uraids <- E.select $ E.from $ \ ura -> do

View file

@ -146,6 +146,8 @@ data AppSettings = AppSettings
-- time, we stop trying to deliver and we remove them from follower lists -- time, we stop trying to deliver and we remove them from follower lists
-- of local actors. -- of local actors.
, appDropDeliveryAfter :: NominalDiffTime , appDropDeliveryAfter :: NominalDiffTime
-- | How much time to wait between retries of failed deliveries.
, appDeliveryRetryFreq :: TimeInterval
} }
instance FromJSON AppSettings where instance FromJSON AppSettings where
@ -193,6 +195,7 @@ instance FromJSON AppSettings where
appHashidsSaltFile <- o .: "hashids-salt-file" appHashidsSaltFile <- o .: "hashids-salt-file"
appRejectOnMaxKeys <- o .: "reject-on-max-keys" appRejectOnMaxKeys <- o .: "reject-on-max-keys"
appDropDeliveryAfter <- ndt <$> o .: "drop-delivery-after" appDropDeliveryAfter <- ndt <$> o .: "drop-delivery-after"
appDeliveryRetryFreq <- interval <$> o .: "retry-delivery-every"
return AppSettings {..} return AppSettings {..}
where where

125
src/Yesod/MonadSite.hs Normal file
View file

@ -0,0 +1,125 @@
{- This file is part of Vervis.
-
- Written in 2019 by fr33domlover <fr33domlover@riseup.net>.
-
- Copying is an act of love. Please copy, reuse and share.
-
- The author(s) have dedicated all copyright and related and neighboring
- rights to this software to the public domain worldwide. This software is
- distributed without any warranty.
-
- You should have received a copy of the CC0 Public Domain Dedication along
- with this software. If not, see
- <http://creativecommons.org/publicdomain/zero/1.0/>.
-}
-- | A typeclass providing a subset of what 'HandlerFor' does, allowing to
-- write monadic actions that can run both inside a request handler and outside
-- of the web server context.
module Yesod.MonadSite
( Site (..)
, MonadSite (..)
, asksSite
, runSiteDB
, WorkerT ()
, runWorkerT
, WorkerFor
, runWorker
)
where
import Prelude
import Control.Exception
import Control.Monad.Fail
import Control.Monad.IO.Class
import Control.Monad.IO.Unlift
import Control.Monad.Logger.CallStack
import Control.Monad.Trans.Class
import Control.Monad.Trans.Reader
import Data.Functor
import Data.Text (Text)
import Database.Persist.Sql
import UnliftIO.Async
import UnliftIO.Concurrent
import Yesod.Core
import Yesod.Core.Types
import Yesod.Persist.Core
class PersistConfig (SitePersistConfig site) => Site site where
type SitePersistConfig site
siteApproot :: site -> Text
sitePersistConfig :: site -> SitePersistConfig site
sitePersistPool :: site -> PersistConfigPool (SitePersistConfig site)
siteLogger :: site -> Logger
class (MonadUnliftIO m, MonadLogger m) => MonadSite m where
type SiteEnv m
askSite :: m (SiteEnv m)
askUrlRender :: m (Route (SiteEnv m) -> Text)
forkSite :: (SomeException -> m ()) -> m () -> m ()
asyncSite :: m a -> m (m (Either SomeException a))
asksSite :: MonadSite m => (SiteEnv m -> a) -> m a
asksSite f = f <$> askSite
runSiteDB
:: (MonadSite m, Site (SiteEnv m))
=> PersistConfigBackend (SitePersistConfig (SiteEnv m)) m a
-> m a
runSiteDB action = do
site <- askSite
runPool (sitePersistConfig site) action (sitePersistPool site)
instance MonadSite (HandlerFor site) where
type SiteEnv (HandlerFor site) = site
askSite = getYesod
askUrlRender = getUrlRender
forkSite = forkHandler
asyncSite action = do
mvar <- newEmptyMVar
let handle e = putMVar mvar $ Left e
forkHandler handle $ do
result <- action
putMVar mvar $ Right result
return $ liftIO $ readMVar mvar
newtype WorkerT site m a = WorkerT
{ unWorkerT :: LoggingT (ReaderT site m) a
}
deriving
( Functor, Applicative, Monad, MonadFail, MonadIO, MonadLogger
, MonadLoggerIO
)
instance MonadUnliftIO m => MonadUnliftIO (WorkerT site m) where
askUnliftIO =
WorkerT $ withUnliftIO $ \ u ->
return $ UnliftIO $ unliftIO u . unWorkerT
withRunInIO inner =
WorkerT $ withRunInIO $ \ run -> inner (run . unWorkerT)
instance MonadTrans (WorkerT site) where
lift = WorkerT . lift . lift
instance (MonadUnliftIO m, Yesod site, Site site) => MonadSite (WorkerT site m) where
type SiteEnv (WorkerT site m) = site
askSite = WorkerT $ lift ask
askUrlRender = do
site <- askSite
return $ \ route -> yesodRender site (siteApproot site) route []
forkSite handler action = void $ forkFinally action handler'
where
handler' (Left e) = handler e
handler' (Right _) = pure ()
asyncSite action = waitCatch <$> async action
runWorkerT :: (Yesod site, Site site) => WorkerT site m a -> site -> m a
runWorkerT (WorkerT action) site = runReaderT (runLoggingT action logFunc) site
where
logFunc = messageLoggerSource site (siteLogger site)
type WorkerFor site = WorkerT site IO
runWorker :: (Yesod site, Site site) => WorkerFor site a -> site -> IO a
runWorker = runWorkerT

View file

@ -100,6 +100,7 @@ library
Yesod.Auth.Unverified.Internal Yesod.Auth.Unverified.Internal
Yesod.FedURI Yesod.FedURI
Yesod.Hashids Yesod.Hashids
Yesod.MonadSite
Yesod.Paginate.Local Yesod.Paginate.Local
Yesod.Persist.Local Yesod.Persist.Local
Yesod.SessionEntity Yesod.SessionEntity