Actor system: Implement internal auto-increasing actor IDs

This commit is contained in:
Pere Lev 2024-08-22 20:54:31 +03:00
parent 0d189271cc
commit cdc97dcb8b
No known key found for this signature in database
GPG key ID: 5252C5C863E5E57D
13 changed files with 767 additions and 306 deletions

2
.gitignore vendored
View file

@ -22,3 +22,5 @@ config/ssh-host-key.pub
lib/
repos/
delivery-states/
actor-counter.sqlite3
delivery-counter.sqlite3

View file

@ -50,6 +50,7 @@ module Control.Concurrent.Actor
, MethodHandler (..)
-- * Implementing an actor
, SpawnMode (AllowSpawn)
, Stage (..)
, Actor (..)
, Next ()
@ -67,6 +68,7 @@ module Control.Concurrent.Actor
-- * Calling actor methods
, ActorHasMethod
, Ref ()
, callIO'
, sendIO'
, call'
@ -93,6 +95,8 @@ module Control.Concurrent.Actor
, HAdaptHandler
, Handler_
, Handle'
, ActorRefMap
, ActorRefMapTVar_
-- * Exported to allow some Yesod Handlers to reuse some actor actions
, runActor
@ -124,9 +128,12 @@ import Control.Monad.Trans.Reader
import Data.Foldable
import Data.Hashable
import Data.HashMap.Strict (HashMap)
import Data.Int
import Data.Maybe
import Data.Proxy
import Data.Text (Text)
import Data.Traversable
import Database.Persist.Sql (PersistField (..), PersistFieldSql (..))
import GHC.TypeLits
import UnliftIO.Exception
@ -137,6 +144,7 @@ import qualified Data.Text as T
import qualified Vary as V
import Control.Concurrent.Return
import Database.Persist.Box
--------------------------- Defining method types ----------------------------
@ -243,17 +251,20 @@ handleMethod = Proxy
-- done True
-- @
data MethodHandler (actor :: Type) (sym :: Symbol) (sig :: Signature) =
Proxy sym := (ActorKey actor -> HandlerSig (ActorStage actor) sig)
Proxy sym := (ActorIdentity actor -> HandlerSig (ActorStage actor) sig)
--------------------------- Implementing an actor ----------------------------
class Stage (a :: Type) where
data SpawnMode = NoSpawn | AllowSpawn
class KnownSpawnMode (StageSpawn a) => Stage (a :: Type) where
data StageEnv a :: Type
type StageActors a :: [Type]
type StageSpawn a :: SpawnMode
class Actor (a :: Type) where
type ActorStage a :: Type
type ActorKey a = (k :: Type) | k -> a
type ActorIdentity a :: Type
type ActorInterface a :: [Method]
data Next = Stop | Proceed
@ -266,15 +277,23 @@ class Actor a => ActorLaunch a where
type LogFunc = Loc -> LogSource -> LogLevel -> LogStr -> IO ()
type ActorRefMap a = HashMap (ActorKey a) (ActorRef a)
type ActorRefMap a = HashMap (Ref a) (ActorRef a)
data ActorRefMapTVar_ :: Type -> Exp Type
type instance Eval (ActorRefMapTVar_ a) = TVar (ActorRefMap a)
type ActorInt = Int64
type TheaterCounter :: SpawnMode -> Type
type family TheaterCounter mode = t | t -> mode where
TheaterCounter NoSpawn = ()
TheaterCounter AllowSpawn = (TheaterFor (ACounterStage ActorInt), Ref (ACounter ActorInt))
-- | A set of live actors responding to messages
data TheaterFor s = TheaterFor
{ theaterMap :: HList (Eval (Map ActorRefMapTVar_ (StageActors s)))
, theaterLog :: LogFunc
, theaterCounter :: TheaterCounter (StageSpawn s)
}
-- | Actor monad in which message reponse actions are executed. Supports
@ -541,15 +560,14 @@ askTheater :: ActFor s (TheaterFor s)
askTheater = ActFor $ lift $ asks snd
lookupActor
:: ( Eq (ActorKey a), Hashable (ActorKey a)
, H.HOccurs
:: ( H.HOccurs
(TVar (ActorRefMap a))
(HList (Eval (Map ActorRefMapTVar_ (StageActors s))))
)
=> TheaterFor s
-> ActorKey a
-> Ref a
-> IO (Maybe (ActorRef a))
lookupActor (TheaterFor hlist _) key =
lookupActor (TheaterFor hlist _ _) key =
HM.lookup key <$> readTVarIO (H.hOccurs hlist)
{-
@ -582,7 +600,6 @@ callIO'
(sig::Signature)
(stage::Type) .
( Actor a
, Eq (ActorKey a), Hashable (ActorKey a)
, m ~ (sym ::: sig)
--, Eval (LookupSig sym (ActorInterface a)) ~ Just sig
--, Eval (Parcel_ m) V.:| Eval (Map Parcel_ (ActorInterface a))
@ -593,7 +610,7 @@ callIO'
)
=> TheaterFor stage
-> Proxy m
-> ActorKey a
-> Ref a
-> HList (SignatureParams sig)
-> IO (Maybe (SignatureReturn sig))
callIO' theater proxy key args = do
@ -617,7 +634,6 @@ sendIO'
(sig::Signature)
(stage::Type) .
( Actor a
, Eq (ActorKey a), Hashable (ActorKey a)
, m ~ (sym ::: sig)
, Eval (LookupSig sym (ActorInterface a)) ~ Just sig
, Eval (Parcel_ m) V.:| Eval (Map Parcel_ (ActorInterface a))
@ -627,7 +643,7 @@ sendIO'
)
=> TheaterFor stage
-> Proxy m
-> ActorKey a
-> Ref a
-> HList (SignatureParams sig)
-> IO Bool
sendIO' theater proxy key args = do
@ -654,7 +670,6 @@ call'
(stage::Type)
(monad :: Type -> Type) .
( Actor a
, Eq (ActorKey a), Hashable (ActorKey a)
, m ~ (sym ::: sig)
, Eval (LookupSig sym (ActorInterface a)) ~ Just sig
, Eval (Parcel_ m) V.:| Eval (Map Parcel_ (ActorInterface a))
@ -666,12 +681,12 @@ call'
, MonadActorStage monad ~ stage
)
=> Proxy m
-> ActorKey a
-> Ref a
-> HList (SignatureParams sig)
-> monad (Maybe (SignatureReturn sig))
call' proxy key args = liftActor $ do
call' proxy ref args = liftActor $ do
theater <- askTheater
liftIO $ callIO' theater proxy key args
liftIO $ callIO' theater proxy ref args
-- | Like 'send', except a Proxy is passed to specify the method's name, and
-- arguments are passed as a 'HList'.
@ -684,7 +699,6 @@ send'
(stage::Type)
(monad :: Type -> Type) .
( Actor a
, Eq (ActorKey a), Hashable (ActorKey a)
, m ~ (sym ::: sig)
, Eval (LookupSig sym (ActorInterface a)) ~ Just sig
, Eval (Parcel_ m) V.:| Eval (Map Parcel_ (ActorInterface a))
@ -696,7 +710,7 @@ send'
, MonadActorStage monad ~ stage
)
=> Proxy m
-> ActorKey a
-> Ref a
-> HList (SignatureParams sig)
-> monad Bool
send' proxy key args = liftActor $ do
@ -730,7 +744,7 @@ type family CallSig (stage :: Type) (signature :: Signature) = (a :: Type) | a -
CallSig s (t :-> sig) = t -> CallSig s sig
class ActorMethodCall (sym :: Symbol) (actor :: Type) (params :: [Type]) (result :: Type) where
actorMethodCall :: ActorKey actor -> HList params -> result
actorMethodCall :: Ref actor -> HList params -> result
instance
forall
@ -744,7 +758,6 @@ instance
(params :: [Type])
(paramsRev :: [Type]) .
( Actor a
, Eq (ActorKey a), Hashable (ActorKey a)
, Eval (LookupSig sym (ActorInterface a)) ~ Just sig
, m ~ (sym ::: sig)
, ret ~ SignatureReturn sig
@ -781,7 +794,7 @@ call
, Eval (LookupSig sym (ActorInterface actor)) ~ Just sig
, r ~ CallSig (ActorStage actor) sig
)
=> ActorKey actor
=> Ref actor
-> r
call key = actorMethodCall @sym key HNil
@ -790,7 +803,7 @@ type family SendSig (stage :: Type) (signature :: Signature) = (a :: Type) | a -
SendSig s (t :-> sig) = t -> SendSig s sig
class ActorMethodSend (sym :: Symbol) (actor :: Type) (params :: [Type]) (result :: Type) where
actorMethodSend :: ActorKey actor -> HList params -> result
actorMethodSend :: Ref actor -> HList params -> result
instance
forall
@ -803,7 +816,6 @@ instance
(params :: [Type])
(paramsRev :: [Type]) .
( Actor a
, Eq (ActorKey a), Hashable (ActorKey a)
, Eval (LookupSig sym (ActorInterface a)) ~ Just sig
, m ~ (sym ::: sig)
, Eval (Parcel_ m) V.:| Eval (Map Parcel_ (ActorInterface a))
@ -839,7 +851,7 @@ send
, Eval (LookupSig sym (ActorInterface actor)) ~ Just sig
, r ~ SendSig (ActorStage actor) sig
)
=> ActorKey actor
=> Ref actor
-> r
send key = actorMethodSend @sym key HNil
@ -912,24 +924,24 @@ uncurryHandler = uncurryH
adaptHandler
:: ( ActorStage actor ~ stage
, Show (ActorKey actor)
, KnownSymbol sym
, UncurryH
(SignatureParams sig)
(HandlerSig stage sig)
(HandlerAction stage (SignatureReturn sig))
)
=> ActorKey actor
=> Ref actor
-> ActorIdentity actor
-> MethodHandler actor sym sig
-> (Proxy sym, Parcel sig)
-> (AdaptedAction (ActorStage actor), Text)
adaptHandler key (Proxy := handler) (p@Proxy, Parcel args respond) =
adaptHandler ref ident (Proxy := handler) (p@Proxy, Parcel args respond) =
(go, prefixOn)
where
prefix = T.concat ["[Actor '", T.pack $ show key, "']"]
prefix = T.concat ["[Actor '", T.pack $ show ref, "']"]
prefixOn = T.concat [prefix, " on ", T.pack $ symbolVal p]
go = do
result <- try $ uncurryHandler (handler key) args
result <- try $ uncurryHandler (handler ident) args
case result of
Left e -> do
logError $ T.concat [prefix, " exception: ", T.pack $ displayException (e :: SomeException)]
@ -942,11 +954,10 @@ adaptHandler key (Proxy := handler) (p@Proxy, Parcel args respond) =
-- This is for adaptHandler to work with hMapL
data HAdaptHandler a = HAdaptHandler (ActorKey a)
data HAdaptHandler a = HAdaptHandler (Ref a) (ActorIdentity a)
instance
( ActorStage actor ~ stage
, Show (ActorKey actor)
, KnownSymbol sym
, UncurryH
(SignatureParams sig)
@ -956,12 +967,11 @@ instance
, o ~ ( (Proxy sym, Parcel sig) -> (AdaptedAction stage, Text) )
) =>
H.ApplyAB (HAdaptHandler actor) i o where
applyAB (HAdaptHandler key) = adaptHandler key
applyAB (HAdaptHandler ref ident) = adaptHandler ref ident
data AdaptHandlerConstraint :: Type -> Method -> Exp Constraint
type instance Eval (AdaptHandlerConstraint actor (sym ::: sig)) =
( Show (ActorKey actor)
, KnownSymbol sym
( KnownSymbol sym
, UncurryH
(SignatureParams sig)
(HandlerSig (ActorStage actor) sig)
@ -976,12 +986,10 @@ type instance Eval (AdaptedHandler stage (sym ::: sig)) =
(Proxy sym, Parcel sig) -> (AdaptedAction stage, Text)
launchActorThread
:: forall (a::Type) (k::Type) (s::Type) (ms::[Method]) .
:: forall (a::Type) (s::Type) (ms::[Method]) .
( ActorLaunch a
, ActorStage a ~ s
, ActorKey a ~ k
, ActorInterface a ~ ms
, Eq k, Hashable k, Show k
, H.HOccurs
(TVar (ActorRefMap a))
(HList (Eval (Map ActorRefMapTVar_ (StageActors s))))
@ -1011,14 +1019,15 @@ launchActorThread
)
)
)
=> Chan (Invocation ms)
=> Ref a
-> Chan (Invocation ms)
-> TheaterFor s
-> k
-> ActorIdentity a
-> StageEnv s
-> IO ()
launchActorThread chan theater actor env =
launchActorThread ref chan theater actor env =
void $ forkIO $ runActor theater env $ do
let handlers' = H.hMapL (HAdaptHandler actor) handlers :: HList (Eval (Map (AdaptedHandler s) ms))
let handlers' = H.hMapL (HAdaptHandler ref actor) handlers :: HList (Eval (Map (AdaptedHandler s) ms))
logInfo $ prefix <> " starting"
loop handlers'
logInfo $ prefix <> " bye"
@ -1026,7 +1035,7 @@ launchActorThread chan theater actor env =
handlers :: HList (Eval (Map (Handler_ a) ms))
handlers = actorBehavior (Proxy @a)
prefix = T.concat ["[Actor '", T.pack $ show actor, "']"]
prefix = T.concat ["[Actor '", T.pack $ show ref, "']"]
loop :: HList (Eval (Map (AdaptedHandler s) ms)) -> ActFor s ()
loop handlers' = do
@ -1044,7 +1053,7 @@ launchActorThread chan theater actor env =
logInfo $ T.concat [prefixOn, " stopping"]
let tvar = H.hOccurs (theaterMap theater) :: TVar (ActorRefMap a)
liftIO $ atomically $ modifyTVar' tvar $ HM.delete actor
liftIO $ atomically $ modifyTVar' tvar $ HM.delete ref
return False
Proceed -> do
@ -1054,12 +1063,12 @@ launchActorThread chan theater actor env =
-- | Same as 'spawn', except it takes the theater as a parameter.
spawnIO
:: forall (a::Type) (k::Type) (s::Type) (ms::[Method]) .
:: forall (a::Type) (s::Type) (ms::[Method]) .
( ActorLaunch a
, ActorStage a ~ s
, ActorKey a ~ k
, Stage s
, StageSpawn s ~ AllowSpawn
, ActorInterface a ~ ms
, Eq k, Hashable k, Show k
, H.HOccurs
(TVar (ActorRefMap a))
(HList (Eval (Map ActorRefMapTVar_ (StageActors s))))
@ -1090,36 +1099,30 @@ spawnIO
)
)
=> TheaterFor s
-> ActorKey a
-> ActorIdentity a
-> IO (StageEnv s)
-> IO Bool
spawnIO theater@(TheaterFor hlist _) key mkEnv = do
-> IO (Ref a)
spawnIO theater@(TheaterFor hlist _ (acounterTheater, acounterRef)) ident mkEnv = do
let tvar = H.hOccurs hlist :: TVar (ActorRefMap a)
chan <- newChan
added <- atomically $ stateTVar tvar $ \ hm ->
let hm' = HM.alter (create $ ActorRef $ ActorRef' chan) key hm
in ( not (HM.member key hm) && HM.member key hm'
, hm'
)
when added $ do
next <- fromJust <$> callIO' @"next" acounterTheater Proxy acounterRef HNil
let ref = Ref next
atomically $ modifyTVar' tvar $ HM.insert ref (ActorRef $ ActorRef' chan)
env <- mkEnv
launchActorThread chan theater key env
return added
where
create actor Nothing = Just actor
create _ j@(Just _) = j
launchActorThread ref chan theater ident env
return ref
-- | Launch a new actor with the given ID and behavior. Return 'True' if the ID
-- was unused and the actor has been launched. Return 'False' if the ID is
-- already in use, thus a new actor hasn't been launched.
spawn
:: forall (m::Type->Type) (a::Type) (k::Type) (s::Type) (ms::[Method]) .
:: forall (a::Type) (m::Type->Type) (s::Type) (ms::[Method]) .
( MonadActor m, MonadActorStage m ~ s
, ActorLaunch a
, ActorStage a ~ s
, ActorKey a ~ k
, Stage s
, StageSpawn s ~ AllowSpawn
, ActorInterface a ~ ms
, Eq k, Hashable k, Show k
, H.HOccurs
(TVar (ActorRefMap a))
(HList (Eval (Map ActorRefMapTVar_ (StageActors s))))
@ -1149,22 +1152,20 @@ spawn
)
)
)
=> ActorKey a
=> ActorIdentity a
-> IO (StageEnv s)
-> m Bool
spawn key mkEnv = liftActor $ do
-> m (Ref a)
spawn ident mkEnv = liftActor $ do
theater <- askTheater
liftIO $ spawnIO theater key mkEnv
liftIO $ spawnIO theater ident mkEnv
--------------------------- Launching the actor system -----------------------
prepareActorType
:: forall (a::Type) (k::Type) (s::Type) (ms::[Method]) .
:: forall (a::Type) (s::Type) (ms::[Method]) .
( ActorLaunch a
, ActorStage a ~ s
, ActorKey a ~ k
, ActorInterface a ~ ms
, Eq k, Hashable k, Show k
, H.HOccurs
(TVar (ActorRefMap a))
(HList (Eval (Map ActorRefMapTVar_ (StageActors s))))
@ -1195,34 +1196,40 @@ prepareActorType
)
, Stage s
)
=> [(k, StageEnv s)]
=> TheaterCounter (StageSpawn s)
-> [(ActorIdentity a, StageEnv s)]
-> IO
( TVar (ActorRefMap a)
( ( TVar (ActorRefMap a)
, TheaterFor s -> IO ()
)
prepareActorType actors = do
actorsWithChans <- for actors $ \ (key, env) -> do
, [(ActorIdentity a, Ref a)]
)
prepareActorType counter actors = do
refs <- produceRefs counter $ length actors
let actorsWithRefs = zip actors refs
actorsWithChans <- for actorsWithRefs $ \ ((ident, env), ref) -> do
chan <- newChan
return (key, env, chan)
return (ref, ident, env, chan)
tvar <-
newTVarIO $ HM.fromList $
map
(\ (key, _, chan) -> (key, ActorRef $ ActorRef' chan))
(\ (ref, _, _, chan) -> (ref, ActorRef $ ActorRef' chan))
actorsWithChans
return
( tvar
, \ theater -> for_ actorsWithChans $ \ (key, env, chan) ->
launchActorThread chan theater key env
( ( tvar
, \ theater -> for_ actorsWithChans $ \ (ref, ident, env, chan) ->
launchActorThread ref chan theater ident env
)
, map (\ (ref, ident, _, _) -> (ident, ref)) actorsWithChans
)
data HPrepareActorType = HPrepareActorType
data HPrepareActorType (sm::SpawnMode) = HPrepareActorType (TheaterCounter sm)
instance
forall (a::Type) (k::Type) (s::Type) (ms::[Method]) (i::Type) (o::Type).
forall (a::Type) (s::Type) (sm::SpawnMode) (ms::[Method]) (i::Type) (o::Type).
( ActorLaunch a
, ActorStage a ~ s
, ActorKey a ~ k
, StageSpawn s ~ sm
, ActorInterface a ~ ms
, Eq k, Hashable k, Show k
, H.HOccurs
(TVar (ActorRefMap a))
(HList (Eval (Map ActorRefMapTVar_ (StageActors s))))
@ -1252,16 +1259,15 @@ instance
)
)
, Stage s
, i ~ [(k, StageEnv s)]
, o ~ IO (TVar (ActorRefMap a), TheaterFor s -> IO ())
, i ~ [(ActorIdentity a, StageEnv s)]
, o ~ IO ((TVar (ActorRefMap a), TheaterFor s -> IO ()), [(ActorIdentity a, Ref a)])
) =>
H.ApplyAB HPrepareActorType i o where
applyAB _ a = prepareActorType a
H.ApplyAB (HPrepareActorType sm) i o where
applyAB (HPrepareActorType counter) a = prepareActorType counter a
data A_ :: Type -> Exp Constraint
type instance Eval (A_ a) =
( ActorLaunch a
, Eq (ActorKey a), Hashable (ActorKey a), Show (ActorKey a)
, H.HOccurs
(TVar (ActorRefMap a))
(HList (Eval (Map ActorRefMapTVar_ (StageActors (ActorStage a)))))
@ -1293,19 +1299,42 @@ type instance Eval (A_ a) =
)
data Starter :: Type -> Exp Type
type instance Eval (Starter a) = [(ActorKey a, StageEnv (ActorStage a))]
type instance Eval (Starter a) = [(ActorIdentity a, StageEnv (ActorStage a))]
data Prepare_ :: Type -> Type -> Exp Type
type instance Eval (Prepare_ s a) = IO (TVar (ActorRefMap a), TheaterFor s -> IO ())
type instance Eval (Prepare_ s a) = IO ((TVar (ActorRefMap a), TheaterFor s -> IO ()), [(ActorIdentity a, Ref a)])
data Pair_ :: Type -> Type -> Exp Type
type instance Eval (Pair_ s a) = (TVar (ActorRefMap a), TheaterFor s -> IO ())
data Triplet_ :: Type -> Type -> Exp Type
type instance Eval (Triplet_ s a) = ((TVar (ActorRefMap a), TheaterFor s -> IO ()), [(ActorIdentity a, Ref a)])
data Launch_ :: Type -> Type -> Exp Type
type instance Eval (Launch_ s _) = TheaterFor s -> IO ()
-- | Launch the actor system
startTheater
data Finisher :: Type -> Exp Type
type instance Eval (Finisher a) = [(ActorIdentity a, Ref a)]
class KnownSpawnMode (sm :: SpawnMode) where
type SpawnModeInput sm = (i :: Type) | i -> sm
loadCounter :: SpawnModeInput sm -> IO (TheaterCounter sm)
produceRefs :: TheaterCounter sm -> Int -> IO [Ref a]
instance KnownSpawnMode NoSpawn where
type SpawnModeInput NoSpawn = ()
loadCounter () = pure ()
produceRefs () count = pure $ map Ref [0 .. toEnum count - 1]
instance KnownSpawnMode AllowSpawn where
type SpawnModeInput AllowSpawn = (LogFunc, FilePath)
loadCounter (logFunc, pathA) = loadSingleACounterTheater logFunc pathA 0
produceRefs (acounterTheater, acounterRef) count =
replicateM count $
Ref . fromJust <$>
callIO' @"next" acounterTheater Proxy acounterRef HNil
startTheater'
:: forall (s :: Type) (as :: [Type]) .
( Stage s
, StageActors s ~ as
@ -1313,54 +1342,107 @@ startTheater
, H.HMapAux
HList
HPrepareActorType
(HPrepareActorType (StageSpawn s))
(Eval (Map Starter as))
(Eval (Map (Prepare_ s) as))
, H.SameLength'
(Eval (Map Starter as))
(Eval (Map (Prepare_ s) as))
, H.SameLength'
(Eval (Map (Prepare_ s) as))
(Eval (Map Starter as))
, H.HSequence
IO
(Eval (Map (Prepare_ s) as))
IO (Eval (Map (Prepare_ s) as)) (Eval (Map (Triplet_ s) as))
, H.HZipList
(Eval (Map (Pair_ s) as))
, H.SameLength'
(Eval (Map ActorRefMapTVar_ as))
(Eval (Map (Launch_ s) as))
, H.SameLength'
(Eval (Map (Launch_ s) as))
(Eval (Map ActorRefMapTVar_ as))
, H.SameLength'
(Eval (Map (Launch_ s) as))
(Eval (Map (Pair_ s) as))
, H.SameLength'
(Eval (Map (Pair_ s) as))
(Eval (Map (Launch_ s) as))
(Eval (Map Finisher as))
(Eval (Map (Triplet_ s) as))
, H.HZipList
(Eval (Map ActorRefMapTVar_ as))
(Eval (Map (Launch_ s) as))
(Eval (Map (Pair_ s) as))
, H.HList2List
(Eval (Map (Launch_ s) as))
(TheaterFor s -> IO ())
(Eval (Map (Launch_ s) as)) (TheaterFor s -> IO ())
, H.SameLength'
(Eval (Map (Prepare_ s) as)) (Eval (Map Starter as))
, H.SameLength'
(Eval (Map (Triplet_ s) as)) (Eval (Map Finisher as))
, H.SameLength'
(Eval (Map (Launch_ s) as)) (Eval (Map (Pair_ s) as))
, H.SameLength'
(Eval (Map Starter as)) (Eval (Map (Prepare_ s) as))
, H.SameLength'
(Eval (Map (Pair_ s) as)) (Eval (Map Finisher as))
, H.SameLength'
(Eval (Map (Launch_ s) as)) (Eval (Map ActorRefMapTVar_ as))
, H.SameLength'
(Eval (Map Finisher as)) (Eval (Map (Pair_ s) as))
, H.SameLength'
(Eval (Map (Pair_ s) as)) (Eval (Map (Launch_ s) as))
, H.SameLength'
(Eval (Map Finisher as)) (Eval (Map (Triplet_ s) as))
, H.SameLength'
(Eval (Map ActorRefMapTVar_ as)) (Eval (Map (Launch_ s) as))
)
=> LogFunc
=> SpawnModeInput (StageSpawn s)
-> LogFunc
-> HList (Eval (Map Starter as))
-> IO (TheaterFor s)
startTheater logFunc actors = do
let actions = H.hMapL HPrepareActorType actors :: HList (Eval (Map (Prepare_ s) as))
mapsAndLaunches <- H.hSequence actions :: IO (HList (Eval (Map (Pair_ s) as)))
let (maps :: HList (Eval (Map ActorRefMapTVar_ as)), launches :: HList (Eval (Map (Launch_ s) as))) = H.hUnzip mapsAndLaunches
theater = TheaterFor maps logFunc
-> IO (TheaterFor s, HList (Eval (Map Finisher as)))
startTheater' input logFunc actors = do
counter <- loadCounter input
let actions = H.hMapL (HPrepareActorType counter) actors :: HList (Eval (Map (Prepare_ s) as))
mapsAndLaunchesAndResults <- H.hSequence actions :: IO (HList (Eval (Map (Triplet_ s) as)))
let (mapsAndLaunches :: HList (Eval (Map (Pair_ s) as)), results :: HList (Eval (Map Finisher as))) = H.hUnzip mapsAndLaunchesAndResults
(maps :: HList (Eval (Map ActorRefMapTVar_ as)), launches :: HList (Eval (Map (Launch_ s) as))) = H.hUnzip mapsAndLaunches
theater = TheaterFor maps logFunc counter
for_ (H.hList2List launches) $ \ launch -> launch theater
return theater
return (theater, results)
-- | Launch the actor system
startTheater
:: forall (s :: Type) (as :: [Type]) .
( Stage s
, StageSpawn s ~ AllowSpawn
, StageActors s ~ as
, Eval (Constraints (Eval (Map A_ as)))
, H.HMapAux
HList
(HPrepareActorType (StageSpawn s))
(Eval (Map Starter as))
(Eval (Map (Prepare_ s) as))
, H.HSequence
IO (Eval (Map (Prepare_ s) as)) (Eval (Map (Triplet_ s) as))
, H.HZipList
(Eval (Map (Pair_ s) as))
(Eval (Map Finisher as))
(Eval (Map (Triplet_ s) as))
, H.HZipList
(Eval (Map ActorRefMapTVar_ as))
(Eval (Map (Launch_ s) as))
(Eval (Map (Pair_ s) as))
, H.HList2List
(Eval (Map (Launch_ s) as)) (TheaterFor s -> IO ())
, H.SameLength'
(Eval (Map (Prepare_ s) as)) (Eval (Map Starter as))
, H.SameLength'
(Eval (Map (Triplet_ s) as)) (Eval (Map Finisher as))
, H.SameLength'
(Eval (Map (Launch_ s) as)) (Eval (Map (Pair_ s) as))
, H.SameLength'
(Eval (Map Starter as)) (Eval (Map (Prepare_ s) as))
, H.SameLength'
(Eval (Map (Pair_ s) as)) (Eval (Map Finisher as))
, H.SameLength'
(Eval (Map (Launch_ s) as)) (Eval (Map ActorRefMapTVar_ as))
, H.SameLength'
(Eval (Map Finisher as)) (Eval (Map (Pair_ s) as))
, H.SameLength'
(Eval (Map (Pair_ s) as)) (Eval (Map (Launch_ s) as))
, H.SameLength'
(Eval (Map Finisher as)) (Eval (Map (Triplet_ s) as))
, H.SameLength'
(Eval (Map ActorRefMapTVar_ as)) (Eval (Map (Launch_ s) as))
)
=> FilePath
-> LogFunc
-> HList (Eval (Map Starter as))
-> IO (TheaterFor s, HList (Eval (Map Finisher as)))
startTheater avarBoxPath logFunc = startTheater' (logFunc, avarBoxPath) logFunc
@ -1482,7 +1564,7 @@ sendManyIO
=> TheaterFor s
-> HList (Eval (Map Set_ (StageActors s)))
-> IO ()
sendManyIO (TheaterFor hlist _) recips =
sendManyIO (TheaterFor hlist _ _) recips =
let zipped = H.hZip hlist recips
:: HList (Eval (Map Pair__ (StageActors s)))
actions = H.hMapL HSendTo zipped
@ -1620,3 +1702,159 @@ data Parcel (s :: Signature') = Parcel
}
-}
-- We're going to define a "simple value holder" actor, which is both a good
-- simple example for using the actor system itself, and will serve us for
-- holding vat and actor auto-increasing numbering.
--
-- Let's start with the Box.
newtype Cell a = Cell a
instance PersistField a => PersistField (Cell a) where
toPersistValue (Cell v) = toPersistValue v
fromPersistValue = fmap Cell . fromPersistValue
instance PersistFieldSql a => PersistFieldSql (Cell a) where
sqlType = sqlType . fmap uncell
where
uncell (Cell v) = v
instance PersistFieldSql a => BoxableVia (Cell a) where
type BV (Cell a) = BoxableField
-- Let's use this Box in a new actor type, the value holder
data AVarStage (a :: Type)
instance Stage (AVarStage a) where
data StageEnv (AVarStage a) = AVarStageEnv (Box (Cell a))
type StageActors (AVarStage a) = '[AVar a]
type StageSpawn (AVarStage a) = NoSpawn
data AVar (a :: Type)
instance Actor (AVar a) where
type ActorStage (AVar a) = AVarStage a
type ActorInterface (AVar a) =
[ "get" ::: Return a
, "put" ::: a :-> Return ()
]
type ActorIdentity (AVar a) = ()
instance PersistFieldSql a => ActorLaunch (AVar a) where
actorBehavior _ =
(handleMethod @"get" := \ () -> do
AVarStageEnv box <- askEnv
Cell val <- withBox box obtain
done val
)
`HCons`
(handleMethod @"put" := \ () val -> do
AVarStageEnv box <- askEnv
withBox box $ bestow $ Cell val
done ()
)
`HCons`
HNil
-- So, we want to load a theater with exactly one AVar
loadSingleAVarTheater
:: PersistFieldSql a
=> LogFunc
-> FilePath
-> a
-> IO (TheaterFor (AVarStage a), Ref (AVar a))
loadSingleAVarTheater logFunc path initial = do
box <- flip runLoggingT logFunc $ loadBox path $ Cell initial
startStar () logFunc () (AVarStageEnv box)
-- Now, we have an infinite loop problem:
--
-- * Every theater needs an AVar, in order to auto-increase the key counter
-- * The AVar comes inside a Theater
--
-- Solution: Instead of using a Theater, we're going to write a different
-- version specialized for our purpose here. A single-actor theater, that
-- doesn't need to count. But since ActFor uses a Theater, we're going to use
-- Theater as well, except we just rely on avoiding any spawning.
startStar counter logFunc ident env = do
(theater, actors `HCons` HNil) <- startTheater' counter logFunc $ [(ident, env)] `HCons` HNil
ref <-
case actors of
[((), r)] -> pure r
_ -> error "startStar: Expected exactly one actor"
return (theater, ref)
-- Now, we can't really use AVar here because we need increase to be atomic,
-- and AVar has only 'get' and 'put'. We could write a whole new actor type for
-- this, or just wrap AVar (shows what wrapping looks like with current API),
-- or add a new method to AVar (which wouldn't work in a network setting,
-- unless there's a way to send a function (a->a) remotely).
--
-- Let's go for the wrapping.
data ACounterStage (a :: Type)
instance Stage (ACounterStage a) where
data StageEnv (ACounterStage a) = ACounterStageEnv (TheaterFor (AVarStage a)) (Ref (AVar a))
type StageActors (ACounterStage a) = '[ACounter a]
type StageSpawn (ACounterStage a) = NoSpawn
data ACounter (a :: Type)
instance Actor (ACounter a) where
type ActorStage (ACounter a) = ACounterStage a
type ActorInterface (ACounter a) =
'[ "next" ::: Return a
]
type ActorIdentity (ACounter a) = ()
instance (Integral a, PersistFieldSql a) => ActorLaunch (ACounter a) where
actorBehavior _ =
(handleMethod @"next" := \ () -> do
ACounterStageEnv avarTheater avarRef <- askEnv
val <- liftIO $ fromJust <$> callIO' @"get" avarTheater Proxy avarRef HNil
void $ liftIO $ sendIO' @"put" avarTheater Proxy avarRef $ (val+1) `HCons` HNil
done val
)
`HCons`
HNil
-- So, we want to load a theater with exactly one ACounter
loadSingleACounterTheater
:: (Integral a, PersistFieldSql a)
=> LogFunc
-> FilePath
-> a
-> IO (TheaterFor (ACounterStage a), Ref (ACounter a))
loadSingleACounterTheater logFunc pathA initial = do
(theaterA, avarRef) <- loadSingleAVarTheater logFunc pathA initial
startStar () logFunc () (ACounterStageEnv theaterA avarRef)
-- We now modify TheaterFor, to have 2 types of theaters: Ones that allow
-- spawning, and ones that don't.
--
-- And we're going to do it type-based.
--
-- Done. Now, how will we track the bidirectional mapping between DB key and
-- internal actor number? Let's see why we need both directions:
--
-- * In actor handlers, we need the DB key avalable somehow, i.e. startTheater
-- and spawn/IO must take this per-actor "env"
-- * Inbox POST handlers need to determine the internal ID in order to insert
-- the incoming activity
-- First let's define a type to use for the ID
newtype Ref a = Ref ActorInt deriving newtype (Eq, Show, Read, Hashable)
-- Now, for each actor type, specify an env type, and have spawn & startTheater
-- take these values
-- Done. Now, we need to keep the map updated:
--
-- [ ] Whenever an actor is deleted, remove from appActors as well (preferrably
-- even before the removal from Theater)

View file

@ -32,6 +32,7 @@ module Vervis.API
where
import Control.Applicative
import Control.Concurrent.STM.TVar
import Control.Exception hiding (Handler, try)
import Control.Monad
import Control.Monad.IO.Class
@ -69,6 +70,8 @@ import Yesod.Persist.Core
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import qualified Data.HashMap.Strict as HM
import qualified Data.HList as H
import qualified Data.List.NonEmpty as NE
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
@ -145,10 +148,16 @@ handleViaActor
-> AP.Action URIMode
-> ExceptT Text Handler OutboxItemId
handleViaActor personID maybeCap localRecips remoteRecips fwdHosts action = do
personRef <- do
peopleVar <- H.hOccurs <$> asksSite appActors
people <- liftIO $ readTVarIO peopleVar
case HM.lookup personID people of
Nothing -> error "Person not found in appActors"
Just ref -> pure ref
theater <- asksSite appTheater
let maybeCap' = first (\ (byKey, _, i) -> (byKey, i)) <$> maybeCap
msg = ClientMsg maybeCap' localRecips remoteRecips fwdHosts action
maybeResult <- liftIO $ callIO' @"client" theater Proxy personID $ msg `HCons` HNil
maybeResult <- liftIO $ callIO' @"client" @Person theater Proxy personRef $ msg `HCons` HNil
outboxItemID <-
case maybeResult of
Nothing -> error "Person not found in theater"
@ -1123,7 +1132,7 @@ createPatchTrackerC (Entity pidUser personUser) senderActor maybeCap localRecips
success <- do
theater <- asksSite appTheater
env <- asksSite appEnv
liftIO $ launchActorIO theater env loomID
liftIO $ launchActorIO @Loom theater env loomID
unless success $
error "Failed to spawn new Loom, somehow ID already in Theater"
@ -1377,7 +1386,7 @@ createRepositoryC (Entity pidUser personUser) senderActor maybeCap localRecips r
success <- do
theater <- asksSite appTheater
env <- asksSite appEnv
liftIO $ launchActorIO theater env repoID
liftIO $ launchActorIO @Repo theater env repoID
unless success $
error "Failed to spawn new Repo, somehow ID already in Theater"

View file

@ -22,6 +22,9 @@
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE UndecidableInstances #-}
-- For launchActor not to need to take a Proxy
{-# LANGUAGE AllowAmbiguousTypes #-}
module Vervis.Actor
( -- * Local actors
LocalActorBy (..)
@ -79,6 +82,7 @@ module Vervis.Actor
, ClientMsg (..)
-- * Behavior utility types
, KeyAndRef_
, StageEnv (..)
, Staje
, Act
@ -106,6 +110,7 @@ module Vervis.Actor
)
where
import Control.Concurrent.STM
import Control.Concurrent.STM.TVar
import Control.Monad
import Control.Monad.IO.Class
@ -121,6 +126,8 @@ import Data.Function
import Data.Hashable
import Data.HashMap.Strict (HashMap)
import Data.HashSet (HashSet)
import Data.HList (HList (..))
import Data.Kind
import Data.List.NonEmpty (NonEmpty)
import Data.Maybe
import Data.Text (Text)
@ -139,6 +146,7 @@ import qualified Control.Monad.Fail as F
import qualified Data.Aeson as A
import qualified Data.ByteString.Lazy as BL
import qualified Data.HashSet as HS
import qualified Data.HashMap.Strict as HM
import qualified Data.HList as H
import qualified Data.List.NonEmpty as NE
import qualified Data.List.Ordered as LO
@ -491,7 +499,7 @@ type Ret = Return (Either Text Text)
instance Actor Person where
type ActorStage Person = Staje
type ActorKey Person = PersonId
type ActorIdentity Person = PersonId
type ActorInterface Person =
[ "verse" ::: Verse :-> Ret
, "client" ::: ClientMsg :-> Return (Either Text OutboxItemId)
@ -499,41 +507,41 @@ instance Actor Person where
]
instance Actor Deck where
type ActorStage Deck = Staje
type ActorKey Deck = DeckId
type ActorIdentity Deck = DeckId
type ActorInterface Deck =
[ "verse" ::: Verse :-> Ret
, "init" ::: (Either (LocalActorBy Key, ActorId, OutboxItemId) (RemoteAuthor, LocalURI)) :-> Ret
]
instance Actor Loom where
type ActorStage Loom = Staje
type ActorKey Loom = LoomId
type ActorIdentity Loom = LoomId
type ActorInterface Loom =
'[ "verse" ::: Verse :-> Ret
]
instance Actor Repo where
type ActorStage Repo = Staje
type ActorKey Repo = RepoId
type ActorIdentity Repo = RepoId
type ActorInterface Repo =
[ "verse" ::: Verse :-> Ret
, "wait-during-push" ::: IO () :-> Ret
]
instance Actor Project where
type ActorStage Project = Staje
type ActorKey Project = ProjectId
type ActorIdentity Project = ProjectId
type ActorInterface Project =
[ "verse" ::: Verse :-> Ret
, "init" ::: (Either (LocalActorBy Key, ActorId, OutboxItemId) (RemoteAuthor, LocalURI)) :-> Ret
]
instance Actor Group where
type ActorStage Group = Staje
type ActorKey Group = GroupId
type ActorIdentity Group = GroupId
type ActorInterface Group =
[ "verse" ::: Verse :-> Ret
, "init" ::: (Either (LocalActorBy Key, ActorId, OutboxItemId) (RemoteAuthor, LocalURI)) :-> Ret
]
instance Actor Factory where
type ActorStage Factory = Staje
type ActorKey Factory = FactoryId
type ActorIdentity Factory = FactoryId
type ActorInterface Factory =
[ "verse" ::: Verse :-> Ret
, "verified" ::: PersonId :-> Ret
@ -571,6 +579,9 @@ instance VervisActor Factory where
toVerse _ = Nothing
-}
data KeyAndRef_ :: Type -> Exp Type
type instance Eval (KeyAndRef_ a) = TVar (HashMap (Key a) (Ref a))
instance Stage Staje where
data StageEnv Staje = forall y. (Typeable y, Yesod y) => Env
-- | Data to which every actor has access. Since such data can be passed to the
@ -594,9 +605,11 @@ instance Stage Staje where
, envYesodRender :: YesodRender y
, envHttpManager :: Manager
, envFetch :: ActorFetchShare
, envActors :: HList (Eval (Map KeyAndRef_ (StageActors Staje)))
}
deriving Typeable
type StageActors Staje = [Person, Project, Group, Deck, Loom, Repo, Factory]
type StageSpawn Staje = AllowSpawn
type YesodRender y = Route y -> [(Text, Text)] -> Text
@ -664,130 +677,149 @@ instance (Actor a, VervisActorLaunch a, ActorReturn a ~ Either Text Text, ActorS
-}
launchActorIO
:: ( ActorLaunch a, ActorStage a ~ Staje
, Eq (ActorKey a), Hashable (ActorKey a), Show (ActorKey a)
, H.HEq
(TVar (HashMap (ActorKey a) (ActorRef a)))
(TVar (HashMap PersonId (ActorRef Person)))
b1
, H.HOccurrence'
b1
(TVar (HashMap (ActorKey a) (ActorRef a)))
'[TVar (HashMap PersonId (ActorRef Person)),
TVar (HashMap ProjectId (ActorRef Project)),
TVar (HashMap GroupId (ActorRef Group)),
TVar (HashMap DeckId (ActorRef Deck)),
TVar (HashMap LoomId (ActorRef Loom)),
TVar (HashMap RepoId (ActorRef Repo)),
TVar (HashMap FactoryId (ActorRef Factory))]
l'1
, H.HOccurs'
(TVar (HashMap (ActorKey a) (ActorRef a)))
l'1
'[TVar (HashMap PersonId (ActorRef Person)),
TVar (HashMap ProjectId (ActorRef Project)),
TVar (HashMap GroupId (ActorRef Group)),
TVar (HashMap DeckId (ActorRef Deck)),
TVar (HashMap LoomId (ActorRef Loom)),
TVar (HashMap RepoId (ActorRef Repo)),
TVar (HashMap FactoryId (ActorRef Factory))]
, ActorStage a ~ s
:: forall a ms b l .
( ActorLaunch a, ActorStage a ~ Staje
, Hashable (ActorIdentity a)
, ActorInterface a ~ ms
, Eval (Map (AdaptedHandler s) ms)
, H.HOccurs
(TVar (ActorRefMap a))
(HList (Eval (Map ActorRefMapTVar_ (StageActors Staje))))
, Eval (Map (AdaptedHandler Staje) ms)
~
Eval
(Map
(Func (AdaptedAction s, Text))
(Func (AdaptedAction Staje, Text))
(Eval (Map Parcel_ ms))
)
, H.SameLength'
(Eval (Map (Func (AdaptedAction s, Text)) (Eval (Map Parcel_ ms))))
(Eval (Map (Func (AdaptedAction Staje, Text)) (Eval (Map Parcel_ ms))))
(Eval (Map (Handler_ a) ms))
, H.SameLength'
(Eval (Map (Handler_ a) ms))
(Eval (Map (Func (AdaptedAction s, Text)) (Eval (Map Parcel_ ms))))
(Eval (Map (Func (AdaptedAction Staje, Text)) (Eval (Map Parcel_ ms))))
, Eval (Constraints (Eval (Map (AdaptHandlerConstraint a) ms)))
, Handle' (Eval (Map Parcel_ ms)) (AdaptedAction s, Text)
, Handle' (Eval (Map Parcel_ ms)) (AdaptedAction Staje, Text)
, H.HMapAux
H.HList
HList
(HAdaptHandler a)
(Eval (Map (Handler_ a) ms))
(Eval
(Map
(Func (AdaptedAction s, Text))
(Func (AdaptedAction Staje, Text))
(Eval (Map Parcel_ ms))
)
)
, H.HEq
(TVar (HashMap (ActorIdentity a) (Ref a)))
(TVar (HashMap (Key Person) (Ref Person)))
b
, H.HOccurrence'
b
(TVar (HashMap (ActorIdentity a) (Ref a)))
[TVar (HashMap (Key Person) (Ref Person)),
TVar (HashMap (Key Project) (Ref Project)),
TVar (HashMap (Key Group) (Ref Group)),
TVar (HashMap (Key Deck) (Ref Deck)),
TVar (HashMap (Key Loom) (Ref Loom)),
TVar (HashMap (Key Repo) (Ref Repo)),
TVar (HashMap (Key Factory) (Ref Factory))]
l
, H.HOccurs'
(TVar (HashMap (ActorIdentity a) (Ref a)))
l
[TVar (HashMap (Key Person) (Ref Person)),
TVar (HashMap (Key Project) (Ref Project)),
TVar (HashMap (Key Group) (Ref Group)),
TVar (HashMap (Key Deck) (Ref Deck)),
TVar (HashMap (Key Loom) (Ref Loom)),
TVar (HashMap (Key Repo) (Ref Repo)),
TVar (HashMap (Key Factory) (Ref Factory))]
)
=> Theater
-> StageEnv Staje
-> ActorKey a
-> ActorIdentity a
-> IO Bool
launchActorIO theater env key = spawnIO theater key (pure env)
launchActorIO theater env ident = do
let tvar = H.hOccurs (envActors env)
maybeRef <- HM.lookup ident <$> readTVarIO tvar
case maybeRef of
Just _ -> pure False
Nothing -> do
ref <- spawnIO @a theater ident (pure env)
atomically $ modifyTVar' tvar $ HM.insert ident ref
return True
launchActor
:: ( ActorLaunch a, ActorStage a ~ Staje
, Eq (ActorKey a), Hashable (ActorKey a), Show (ActorKey a)
, H.HEq
(TVar (HashMap (ActorKey a) (ActorRef a)))
(TVar (HashMap PersonId (ActorRef Person)))
b0
, H.HOccurrence'
b0
(TVar (HashMap (ActorKey a) (ActorRef a)))
'[TVar (HashMap PersonId (ActorRef Person)),
TVar (HashMap ProjectId (ActorRef Project)),
TVar (HashMap GroupId (ActorRef Group)),
TVar (HashMap DeckId (ActorRef Deck)),
TVar (HashMap LoomId (ActorRef Loom)),
TVar (HashMap RepoId (ActorRef Repo)),
TVar (HashMap FactoryId (ActorRef Factory))]
l'0
, H.HOccurs'
(TVar (HashMap (ActorKey a) (ActorRef a)))
l'0
'[TVar (HashMap PersonId (ActorRef Person)),
TVar (HashMap ProjectId (ActorRef Project)),
TVar (HashMap GroupId (ActorRef Group)),
TVar (HashMap DeckId (ActorRef Deck)),
TVar (HashMap LoomId (ActorRef Loom)),
TVar (HashMap RepoId (ActorRef Repo)),
TVar (HashMap FactoryId (ActorRef Factory))]
, ActorStage a ~ s
:: forall a ms b l .
( ActorLaunch a, ActorStage a ~ Staje
, Hashable (ActorIdentity a)
, ActorInterface a ~ ms
, Eval (Map (AdaptedHandler s) ms)
, H.HOccurs
(TVar (ActorRefMap a))
(HList (Eval (Map ActorRefMapTVar_ (StageActors Staje))))
, Eval (Map (AdaptedHandler Staje) ms)
~
Eval
(Map
(Func (AdaptedAction s, Text))
(Func (AdaptedAction Staje, Text))
(Eval (Map Parcel_ ms))
)
, H.SameLength'
(Eval (Map (Func (AdaptedAction s, Text)) (Eval (Map Parcel_ ms))))
(Eval (Map (Func (AdaptedAction Staje, Text)) (Eval (Map Parcel_ ms))))
(Eval (Map (Handler_ a) ms))
, H.SameLength'
(Eval (Map (Handler_ a) ms))
(Eval (Map (Func (AdaptedAction s, Text)) (Eval (Map Parcel_ ms))))
(Eval (Map (Func (AdaptedAction Staje, Text)) (Eval (Map Parcel_ ms))))
, Eval (Constraints (Eval (Map (AdaptHandlerConstraint a) ms)))
, Handle' (Eval (Map Parcel_ ms)) (AdaptedAction s, Text)
, Handle' (Eval (Map Parcel_ ms)) (AdaptedAction Staje, Text)
, H.HMapAux
H.HList
HList
(HAdaptHandler a)
(Eval (Map (Handler_ a) ms))
(Eval
(Map
(Func (AdaptedAction s, Text))
(Func (AdaptedAction Staje, Text))
(Eval (Map Parcel_ ms))
)
)
, H.HEq
(TVar (HashMap (ActorIdentity a) (Ref a)))
(TVar (HashMap (Key Person) (Ref Person)))
b
, H.HOccurrence'
b
(TVar (HashMap (ActorIdentity a) (Ref a)))
[TVar (HashMap (Key Person) (Ref Person)),
TVar (HashMap (Key Project) (Ref Project)),
TVar (HashMap (Key Group) (Ref Group)),
TVar (HashMap (Key Deck) (Ref Deck)),
TVar (HashMap (Key Loom) (Ref Loom)),
TVar (HashMap (Key Repo) (Ref Repo)),
TVar (HashMap (Key Factory) (Ref Factory))]
l
, H.HOccurs'
(TVar (HashMap (ActorIdentity a) (Ref a)))
l
[TVar (HashMap (Key Person) (Ref Person)),
TVar (HashMap (Key Project) (Ref Project)),
TVar (HashMap (Key Group) (Ref Group)),
TVar (HashMap (Key Deck) (Ref Deck)),
TVar (HashMap (Key Loom) (Ref Loom)),
TVar (HashMap (Key Repo) (Ref Repo)),
TVar (HashMap (Key Factory) (Ref Factory))]
)
=> ActorKey a
=> ActorIdentity a
-> Act Bool
launchActor key = do
e <- askEnv
spawn key (pure e)
launchActor ident = do
env <- askEnv
let tvar = H.hOccurs (envActors env)
maybeRef <- liftIO $ HM.lookup ident <$> readTVarIO tvar
case maybeRef of
Just _ -> pure False
Nothing -> do
ref <- spawn @a ident (pure env)
liftIO $ atomically $ modifyTVar' tvar $ HM.insert ident ref
return True
data RemoteRecipient = RemoteRecipient
{ remoteRecipientActor :: RemoteActorId
@ -796,6 +828,68 @@ data RemoteRecipient = RemoteRecipient
, remoteRecipientErrorSince :: Maybe UTCTime
}
--data MapAndSet_ :: Type -> Exp Type
--type instance Eval (MapAndSet_ a) = (Eval (KeyAndRef_ a), HashSet (ActorIdentity a))
sendVerses
:: ( Actor a
, ActorStage a ~ Staje
, ActorHasMethod a "verse" (Verse :-> Return (Either Text Text))
, Eq (ActorIdentity a)
, H.HEq
(TVar (ActorRefMap a)) (TVar (ActorRefMap Person)) b
, H.HOccurrence'
b
(TVar (ActorRefMap a))
[TVar (ActorRefMap Person), TVar (ActorRefMap Project),
TVar (ActorRefMap Group), TVar (ActorRefMap Deck),
TVar (ActorRefMap Loom), TVar (ActorRefMap Repo),
TVar (ActorRefMap Factory)]
l
, H.HOccurs'
(TVar (ActorRefMap a))
l
[TVar (ActorRefMap Person), TVar (ActorRefMap Project),
TVar (ActorRefMap Group), TVar (ActorRefMap Deck),
TVar (ActorRefMap Loom), TVar (ActorRefMap Repo),
TVar (ActorRefMap Factory)]
)
=> Verse
-> (TVar (HashMap (ActorIdentity a) (Ref a)), HashSet (ActorIdentity a))
-> Act ()
sendVerses verse (tvar, s) = do
actorMap <- liftIO $ readTVarIO tvar
let refs = HM.elems $ actorMap `HM.intersection` HS.toMap s
for_ refs $ \ ref -> void $ send @"verse" ref verse
data HSendVerses = HSendVerses Verse
instance
( Actor a
, ActorStage a ~ Staje
, ActorHasMethod a "verse" (Verse :-> Return (Either Text Text))
, Eq (ActorIdentity a)
, i ~ (TVar (HashMap (ActorIdentity a) (Ref a)), HashSet (ActorIdentity a))
, H.HEq
(TVar (ActorRefMap a)) (TVar (ActorRefMap Person)) b
, H.HOccurrence'
b
(TVar (ActorRefMap a))
[TVar (ActorRefMap Person), TVar (ActorRefMap Project),
TVar (ActorRefMap Group), TVar (ActorRefMap Deck),
TVar (ActorRefMap Loom), TVar (ActorRefMap Repo),
TVar (ActorRefMap Factory)]
l
, H.HOccurs'
(TVar (ActorRefMap a))
l
[TVar (ActorRefMap Person), TVar (ActorRefMap Project),
TVar (ActorRefMap Group), TVar (ActorRefMap Deck),
TVar (ActorRefMap Loom), TVar (ActorRefMap Repo),
TVar (ActorRefMap Factory)]
) =>
H.ApplyAB HSendVerses i (Act ()) where
applyAB (HSendVerses verse) = sendVerses verse
-- Given a list of local recipients, which may include actors and collections,
--
-- * Insert activity to message queues of live actors
@ -978,13 +1072,21 @@ sendToLocalActors authorAndId body requireOwner mauthor maidAuthor recips = do
(Just (liveRecipsR, actorVerse verse)) `H.HCons`
(Just (liveRecipsF, actorVerse verse)) `H.HCons` H.HNil
-}
for_ liveRecipsP $ \ k -> void $ send @"verse" k verse
for_ liveRecipsJ $ \ k -> void $ send @"verse" k verse
for_ liveRecipsG $ \ k -> void $ send @"verse" k verse
for_ liveRecipsD $ \ k -> void $ send @"verse" k verse
for_ liveRecipsL $ \ k -> void $ send @"verse" k verse
for_ liveRecipsR $ \ k -> void $ send @"verse" k verse
for_ liveRecipsF $ \ k -> void $ send @"verse" k verse
let actorSets =
liveRecipsP `HCons` liveRecipsJ `HCons` liveRecipsG `HCons`
liveRecipsD `HCons` liveRecipsL `HCons` liveRecipsR `HCons`
liveRecipsF `HCons` HNil
actorMaps <- envActors <$> askEnv
{-
let sendVerses'
:: ( ActorStage a ~ Staje
, ActorHasMethod a "verse" (Verse :-> Return (Either Text Text))
)
=> (TVar (HashMap (ActorIdentity a) (Ref a)), HashSet (ActorIdentity a))
-> Act ()
sendVerses' = sendVerses verse
-}
H.hMapM_ (HSendVerses verse) (H.hZip actorMaps actorSets) -- :: HList (Eval (Map MapAndSet_ (StageActors Staje))))
-- Return remote followers, to whom we need to deliver via HTTP
return remoteFollowers

View file

@ -19,6 +19,7 @@ module Vervis.Actor.Factory
where
import Control.Applicative
import Control.Concurrent.STM.TVar
import Control.Exception.Base hiding (handle)
import Control.Monad
import Control.Monad.IO.Class
@ -45,6 +46,8 @@ import Database.Persist.Sql
import Optics.Core
import Yesod.Persist.Core
import qualified Data.HashMap.Strict as HM
import qualified Data.HList as H
import qualified Data.Text as T
import qualified Database.Esqueleto as E
@ -1060,8 +1063,12 @@ factoryCreateNew new now factoryMeID (Verse authorIdMsig body) detail = do
}
return
( LocalResourceDeck did
, launchActor did
, send @"init" did authorId
, launchActor @Deck did
, do tvar <- H.hOccurs <$> asksEnv envActors
actors <- liftIO $ readTVarIO tvar
case HM.lookup did actors of
Nothing -> pure False
Just ref -> send @"init" @Deck ref authorId
)
NAProject -> do
jid <- insert Project
@ -1070,8 +1077,12 @@ factoryCreateNew new now factoryMeID (Verse authorIdMsig body) detail = do
}
return
( LocalResourceProject jid
, launchActor jid
, send @"init" jid authorId
, launchActor @Project jid
, do tvar <- H.hOccurs <$> asksEnv envActors
actors <- liftIO $ readTVarIO tvar
case HM.lookup jid actors of
Nothing -> pure False
Just ref -> send @"init" @Project ref authorId
)
NATeam -> do
gid <- insert Group
@ -1080,8 +1091,12 @@ factoryCreateNew new now factoryMeID (Verse authorIdMsig body) detail = do
}
return
( LocalResourceGroup gid
, launchActor gid
, send @"init" gid authorId
, launchActor @Group gid
, do tvar <- H.hOccurs <$> asksEnv envActors
actors <- liftIO $ readTVarIO tvar
case HM.lookup gid actors of
Nothing -> pure False
Just ref -> send @"init" @Group ref authorId
)
return (lr, launch, sendInit, rid)

View file

@ -502,7 +502,7 @@ clientCreateFactory now personMeID (ClientMsg maybeCap localRecips remoteRecips
)
-- Spawn new Factory actor
success <- lift $ launchActor factoryID
success <- lift $ launchActor @Factory factoryID
unless success $
error "Failed to spawn new Factory, somehow ID already in Theater"

View file

@ -16,6 +16,9 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
-- For HWriteTVar to work
{-# LANGUAGE UndecidableInstances #-}
{- LANGUAGE RankNTypes #-}
module Vervis.Application
@ -46,6 +49,8 @@ import Control.Monad.Trans.Reader
import Data.Bifunctor
import Data.Default.Class
import Data.Foldable
import Data.Hashable
import Data.HList (HList (..))
import Data.List
import Data.List.NonEmpty (nonEmpty)
import Data.Maybe
@ -99,6 +104,8 @@ import Yesod.ActivityPub
import Yesod.Hashids
import Yesod.MonadSite
import qualified Control.Concurrent.Actor as CCA
import Control.Concurrent.Local
import Development.Git (isGitRepo)
import Data.List.NonEmpty.Local
@ -162,6 +169,17 @@ mkYesodDispatch "App" resourcesApp
loggingFunction :: App -> LogFunc
loggingFunction app = messageLoggerSource app (appLogger app)
data HWriteTVar = HWriteTVar
instance
( CCA.Actor a
, Eq (ActorIdentity a)
, Hashable (ActorIdentity a)
, i ~ (TVar (HM.HashMap (ActorIdentity a) (Ref a)), [(ActorIdentity a, Ref a)])
) =>
H.ApplyAB HWriteTVar i (IO ()) where
applyAB HWriteTVar (tvar, l) =
atomically $ writeTVar tvar $ HM.fromList l
-- | This function allocates resources (such as a database connection pool),
-- performs initialization and returns a foundation datatype value. This is also
-- the place to put your migrate statements to have automatic database
@ -208,7 +226,8 @@ makeFoundation appSettings = do
appHashidsContext
appTheater
appEnv
appPersonLauncher =
appPersonLauncher
appActors =
App {..}
-- The App {..} syntax is an example of record wild cards. For more
-- information, see:
@ -221,6 +240,7 @@ makeFoundation appSettings = do
(error "theater forced in tempFoundation")
(error "env forced in tempFoundation")
(error "launcher forced in tempFoundation")
(error "actors forced in tempFoundation")
logFunc = loggingFunction tempFoundation
-- Create the database connection pool
@ -235,7 +255,7 @@ makeFoundation appSettings = do
hashidsSalt <- loadKeyFile loadMode $ appHashidsSaltFile appSettings
let hashidsCtx = hashidsContext hashidsSalt
app = mkFoundation pool capSignKey hashidsCtx (error "theater") (error "env") (error "launcher")
app = mkFoundation pool capSignKey hashidsCtx (error "theater") (error "env") (error "launcher") (error "actors")
-- Perform database migration using our application's logging settings.
--runLoggingT (runSqlPool (runMigration migrateAll) pool) logFunc
@ -256,13 +276,22 @@ makeFoundation appSettings = do
delivery <- do
micros <- intervalMicros $ appDeliveryRetryBase appSettings
startDeliveryTheater
(sitePostSignedHeaders app) micros appHttpManager logFunc delieryStateDir
"delivery-counter.sqlite3" (sitePostSignedHeaders app) micros appHttpManager logFunc delieryStateDir
actorTVars <- do
p <- newTVarIO HM.empty
j <- newTVarIO HM.empty
g <- newTVarIO HM.empty
d <- newTVarIO HM.empty
l <- newTVarIO HM.empty
r <- newTVarIO HM.empty
f <- newTVarIO HM.empty
return $ p `HCons` j `HCons` g `HCons` d `HCons` l `HCons` r `HCons` f `HCons` HNil
let root = renderObjURI $ flip ObjURI topLocalURI $ appInstanceHost appSettings
--render :: Yesod y => y -> Route y -> [(Text, Text)] -> Text
render = yesodRender app root
env = Env appSettings pool hashidsCtx appActorKeys delivery render appHttpManager appActorFetchShare
env = Env appSettings pool hashidsCtx appActorKeys delivery render appHttpManager appActorFetchShare actorTVars
actors <- flip runWorker app $ runSiteDB $ loadTheater env
theater <- startTheater logFunc actors
(theater, actorMap) <- startTheater "actor-counter.sqlite3" logFunc actors
launcher <- startPersonLauncher theater env
let hostString = T.unpack $ renderAuthority hLocal
@ -272,8 +301,10 @@ makeFoundation appSettings = do
, configMaxCommits = 20
}
H.hMapM_ HWriteTVar (H.hZip actorTVars actorMap)
-- Return the foundation
return app { appTheater = theater, appEnv = env, appPersonLauncher = launcher }
return app { appTheater = theater, appEnv = env, appPersonLauncher = launcher, appActors = actorTVars }
where
verifyRepoDir = do
repos <- lift reposFromDir
@ -371,7 +402,7 @@ makeFoundation appSettings = do
where
handle mvar = do
(personID, sendResult) <- takeMVar mvar
success <- launchActorIO theater env personID
success <- launchActorIO @Person theater env personID
putMVar sendResult success
-- | Convert our foundation to a WAI Application by calling @toWaiAppPlain@ and
@ -446,6 +477,7 @@ sshServer foundation =
(appConnPool foundation)
(loggingFunction foundation)
(appTheater foundation)
(H.hOccurs $ appActors foundation)
mailer :: App -> IO ()
mailer foundation =

View file

@ -38,6 +38,7 @@ import Data.Traversable
import Data.Vector (Vector)
import Database.Persist.Postgresql
import Database.Persist.Sql (ConnectionPool)
import Fcf (Eval, Map)
import Network.HTTP.Client (Manager, HasHttpManager (..))
import Network.HTTP.Types.Header
import Text.Shakespeare.Text (textFile)
@ -60,6 +61,7 @@ import Yesod.Static
import qualified Data.Aeson as A
import qualified Data.ByteString.Lazy as BL (ByteString)
import qualified Data.HashMap.Strict as HM
import qualified Data.HashSet as HS
import qualified Data.HList as H
import qualified Data.Time.Units as U
@ -141,6 +143,7 @@ data App = App
, appTheater :: Theater
, appEnv :: StageEnv Staje
, appPersonLauncher :: MVar (PersonId, MVar Bool)
, appActors :: HList (Eval (Map KeyAndRef_ (StageActors Staje)))
}
-- Aliases for the routes file, because it doesn't like spaces in path piece
@ -725,9 +728,14 @@ instance AccountDB AccountPersistDB' where
error "Failed to spawn new Person, somehow ID already in Theater"
AccountPersistDB' $ do
theater <- asksSite appTheater
there <- liftIO $ sendIO' @"init" theater Proxy personID HNil
peopleVar `HCons` _ `HCons` _ `HCons` _ `HCons` _ `HCons` _ `HCons` factoriesVar `HCons` HNil <- asksSite appActors
people <- liftIO $ readTVarIO peopleVar
case HM.lookup personID people of
Nothing -> error "Failed to find new Person, somehow ID not in appActors"
Just personRef -> do
there <- liftIO $ sendIO' @"init" theater Proxy personRef HNil
unless there $
error "Failed to find new Person, somehow ID not in Theater"
error "Failed to find new Person, somehow Ref not in Theater"
factoryIDs <- runDB $ selectKeysList [] []
{-
let package = (HS.fromList factoryIDs, FactoryMsgVerified personID)
@ -740,8 +748,12 @@ instance AccountDB AccountPersistDB' where
Nothing `H.HCons`
Just package `H.HCons` H.HNil
-}
liftIO $ for_ factoryIDs $ \ (factoryID :: FactoryId) ->
void $ sendIO' @"verified" theater Proxy factoryID (personID `HCons` HNil)
factories <- liftIO $ readTVarIO factoriesVar
let factoryRefs =
HM.elems $
factories `HM.intersection` HS.toMap (HS.fromList factoryIDs)
liftIO $ for_ factoryRefs $ \ (ref :: Ref Factory) ->
void $ sendIO' @"verified" theater Proxy ref (personID `HCons` HNil)
setVerifyKey = (morphAPDB .) . setVerifyKey
setNewPasswordKey = (morphAPDB .) . setNewPasswordKey
setNewPassword = (morphAPDB .) . setNewPassword

View file

@ -172,7 +172,7 @@ import Vervis.Model
instance WA.StageWebRoute Staje where
type StageRoute Staje = Route App
askUrlRenderParams = do
Env _ _ _ _ _ render _ _ <- askEnv
Env _ _ _ _ _ render _ _ _ <- askEnv
case cast render of
Nothing -> error "Env site isn't App"
Just r -> pure r

View file

@ -1,6 +1,7 @@
{- This file is part of Vervis.
-
- Written in 2019, 2020, 2022, 2023 by fr33domlover <fr33domlover@riseup.net>.
- Written in 2019, 2020, 2022, 2023, 2024
- by fr33domlover <fr33domlover@riseup.net>.
-
- Copying is an act of love. Please copy, reuse and share.
-
@ -565,7 +566,7 @@ fetchRemoteActor' iid host luActor = do
Left ers -> Just ers
Right _ -> Nothing
Nothing -> do
Env _ pool _ _ _ _ manager fetch <- askEnv
Env _ pool _ _ _ _ manager fetch _ <- askEnv
liftIO $ runShared fetch (ObjURI host luActor) (pool, manager, iid)
deleteUnusedURAs :: (MonadIO m, MonadLogger m) => ReaderT SqlBackend m ()

View file

@ -19,6 +19,7 @@ module Vervis.Ssh
where
import Control.Applicative ((<|>), optional)
import Control.Concurrent.STM.TVar
import Control.Monad (when)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Logger
@ -29,6 +30,7 @@ import Data.Attoparsec.Text
import Data.ByteString (ByteString)
import Data.ByteString.Lazy (fromStrict)
import Data.Foldable (find)
import Data.HashMap.Strict (HashMap)
import Data.HList (HList (..))
import Data.Maybe (isJust)
import Data.Monoid ((<>))
@ -49,6 +51,7 @@ import System.Process (CreateProcess (..), StdStream (..), createProcess, proc)
import Web.Hashids
import Yesod.Core.Dispatch
import qualified Data.HashMap.Strict as HM
import qualified Data.Text as T
import qualified Formatting as F
@ -70,7 +73,7 @@ import Vervis.Settings
-- Types
-------------------------------------------------------------------------------
type ChannelBase = LoggingT (ReaderT (ConnectionPool, Theater) IO)
type ChannelBase = LoggingT (ReaderT (ConnectionPool, Theater, TVar (HashMap RepoId (Ref Repo))) IO)
type SessionBase = LoggingT (ReaderT ConnectionPool IO)
type UserAuthId = PersonId
@ -102,7 +105,7 @@ src = "SSH"
runChanDB :: SshChanDB a -> Channel a
runChanDB action = do
pool <- lift . lift $ asks fst
(pool, _, _) <- lift . lift $ ask
runSqlPool action pool
runSessDB :: SshSessDB a -> Session a
@ -267,9 +270,13 @@ runAction decodeRepoHash root _wantReply action =
Just repoID -> whenDarcsRepoExists True repoPath $ do
pid <- authId <$> askAuthDetails
liftIO $ setEnv "VERVIS_SSH_USER" (show $ fromSqlKey pid)
theater <- lift . lift $ asks snd
(_, theater, reposVar) <- lift . lift $ ask
repos <- liftIO $ readTVarIO reposVar
case HM.lookup repoID repos of
Nothing -> return $ ARFail "RepoId not found in map"
Just ref -> do
(sendValue, waitValue) <- liftIO newReturn
_ <- liftIO $ sendIO' @"wait-during-push" theater Proxy repoID $ waitValue `HCons` HNil
_ <- liftIO $ sendIO' @"wait-during-push" theater Proxy ref $ waitValue `HCons` HNil
executeWait "darcs" ["apply", "--all", "--repodir", repoPath]
liftIO $ sendValue ()
return ARProcess
@ -294,9 +301,13 @@ runAction decodeRepoHash root _wantReply action =
Just repoID -> whenGitRepoExists True repoPath $ do
pid <- authId <$> askAuthDetails
liftIO $ setEnv "VERVIS_SSH_USER" (show $ fromSqlKey pid)
theater <- lift . lift $ asks snd
(_, theater, reposVar) <- lift . lift $ ask
repos <- liftIO $ readTVarIO reposVar
case HM.lookup repoID repos of
Nothing -> return $ ARFail "RepoId not found in map"
Just ref -> do
(sendValue, waitValue) <- liftIO newReturn
_ <- liftIO $ sendIO' @"wait-during-push" theater Proxy repoID $ waitValue `HCons` HNil
_ <- liftIO $ sendIO' @"wait-during-push" theater Proxy ref $ waitValue `HCons` HNil
executeWait "git-receive-pack" [repoPath]
liftIO $ sendValue ()
return ARProcess
@ -346,8 +357,9 @@ mkConfig
-> ConnectionPool
-> LogFunc
-> Theater
-> TVar (HashMap RepoId (Ref Repo))
-> IO (Config SessionBase ChannelBase UserAuthId)
mkConfig settings ctx pool logFunc theater = do
mkConfig settings ctx pool logFunc theater reposVar = do
keyPair <- keyPairFromFile $ appSshKeyFile settings
return $ Config
{ cSession = SessionConfig
@ -360,13 +372,13 @@ mkConfig settings ctx pool logFunc theater = do
, cChannel = ChannelConfig
{ ccRequestHandler = handle (decodeKeyHashidPure ctx) (appRepoDir settings)
, ccRunBaseMonad =
flip runReaderT (pool, theater) . flip runLoggingT logFunc
flip runReaderT (pool, theater, reposVar) . flip runLoggingT logFunc
}
, cPort = fromIntegral $ appSshPort settings
, cReadyAction = ready logFunc
}
runSsh :: AppSettings -> HashidsContext -> ConnectionPool -> LogFunc -> Theater -> IO ()
runSsh settings ctx pool logFunc theater = do
config <- mkConfig settings ctx pool logFunc theater
runSsh :: AppSettings -> HashidsContext -> ConnectionPool -> LogFunc -> Theater -> TVar (HashMap RepoId (Ref Repo)) -> IO ()
runSsh settings ctx pool logFunc theater reposVar = do
config <- mkConfig settings ctx pool logFunc theater reposVar
startConfig config

View file

@ -256,40 +256,55 @@ getInbox'' grabInbox here getActorID hash = do
ibiidString = "InboxItem #" ++ show (fromSqlKey ibid)
postInbox
:: ( CCA.Actor a
:: forall a b l b0 l0 .
( CCA.Actor a
, ActorLaunch a
, ActorHasMethod a "verse" (Verse :-> Return (Either Text Text))
--, Eval (LookupSig "verse" (ActorInterface a))
-- ~
-- Just (Verse :-> Return (Either Text Text))
, ActorKey a ~ Key a
, ActorIdentity a ~ Key a
, Eq (Key a)
, Hashable (Key a)
, H.HEq
(TVar (M.HashMap (Key a) (ActorRef a)))
(TVar (M.HashMap PersonId (ActorRef Person)))
b0
(TVar (M.HashMap (Key a) (Ref a)))
(TVar (M.HashMap (Key Person) (Ref Person)))
b
, H.HOccurrence'
b
(TVar (M.HashMap (Key a) (Ref a)))
[TVar (M.HashMap (Key Person) (Ref Person)),
TVar (M.HashMap (Key Project) (Ref Project)),
TVar (M.HashMap (Key Group) (Ref Group)),
TVar (M.HashMap (Key Deck) (Ref Deck)),
TVar (M.HashMap (Key Loom) (Ref Loom)),
TVar (M.HashMap (Key Repo) (Ref Repo)),
TVar (M.HashMap (Key Factory) (Ref Factory))]
l
, H.HOccurs'
(TVar (M.HashMap (Key a) (Ref a)))
l
[TVar (M.HashMap (Key Person) (Ref Person)),
TVar (M.HashMap (Key Project) (Ref Project)),
TVar (M.HashMap (Key Group) (Ref Group)),
TVar (M.HashMap (Key Deck) (Ref Deck)),
TVar (M.HashMap (Key Loom) (Ref Loom)),
TVar (M.HashMap (Key Repo) (Ref Repo)),
TVar (M.HashMap (Key Factory) (Ref Factory))]
, H.HEq
(TVar (ActorRefMap a)) (TVar (ActorRefMap Person)) b0
, H.HOccurrence'
b0
(TVar (M.HashMap (Key a) (ActorRef a)))
'[TVar (M.HashMap PersonId (ActorRef Person)),
TVar (M.HashMap ProjectId (ActorRef Project)),
TVar (M.HashMap GroupId (ActorRef Group)),
TVar (M.HashMap DeckId (ActorRef Deck)),
TVar (M.HashMap LoomId (ActorRef Loom)),
TVar (M.HashMap RepoId (ActorRef Vervis.Model.Repo)),
TVar (M.HashMap FactoryId (ActorRef Factory))]
l'0
(TVar (ActorRefMap a))
[TVar (ActorRefMap Person), TVar (ActorRefMap Project),
TVar (ActorRefMap Group), TVar (ActorRefMap Deck),
TVar (ActorRefMap Loom), TVar (ActorRefMap Repo),
TVar (ActorRefMap Factory)]
l0
, H.HOccurs'
(TVar (M.HashMap (Key a) (ActorRef a)))
l'0
'[TVar (M.HashMap PersonId (ActorRef Person)),
TVar (M.HashMap ProjectId (ActorRef Project)),
TVar (M.HashMap GroupId (ActorRef Group)),
TVar (M.HashMap DeckId (ActorRef Deck)),
TVar (M.HashMap LoomId (ActorRef Loom)),
TVar (M.HashMap RepoId (ActorRef Vervis.Model.Repo)),
TVar (M.HashMap FactoryId (ActorRef Factory))]
(TVar (ActorRefMap a))
l0
[TVar (ActorRefMap Person), TVar (ActorRefMap Project),
TVar (ActorRefMap Group), TVar (ActorRefMap Deck),
TVar (ActorRefMap Loom), TVar (ActorRefMap Repo),
TVar (ActorRefMap Factory)]
)
=> (Key a -> LocalActorBy Key) -> Key a -> Handler ()
postInbox toLA recipID = do
@ -319,8 +334,14 @@ postInbox toLA recipID = do
recipByHash <- hashLocalActor recipByKey
msig <- checkForwarding recipByHash
return (author, luActivity, msig)
ref <- lift $ do
tvar <- H.hOccurs <$> getsYesod appActors
actors <- liftIO $ readTVarIO tvar
case M.lookup recipID actors of
Nothing -> notFound
Just (ref :: Ref a) -> pure ref
theater <- getsYesod appTheater
r <- liftIO $ callIO' @"verse" theater Proxy recipID $ Verse authorIdMsig body `HCons` HNil
r <- liftIO $ callIO' @"verse" @a theater Proxy ref $ Verse authorIdMsig body `HCons` HNil
case r of
Nothing -> notFound
Just (Left e) -> throwE e

View file

@ -31,6 +31,8 @@ module Web.Actor.Deliver
)
where
import Control.Concurrent.STM
import Control.Concurrent.STM.TVar
import Control.Exception.Base hiding (handle)
import Control.Monad
import Control.Monad.IO.Class
@ -41,6 +43,7 @@ import Control.Retry
import Data.ByteString (ByteString)
import Data.Foldable
import Data.Hashable
import Data.HashMap.Strict (HashMap)
import Data.HList (HList (..))
import Data.List
import Data.List.NonEmpty (NonEmpty)
@ -61,6 +64,7 @@ import Web.Hashids
import qualified Data.Aeson as A
import qualified Data.ByteString.Lazy as BL
import qualified Data.HashMap.Strict as HM
import qualified Data.HashSet as HS
import qualified Data.HList as H
import qualified Data.Text as T
@ -88,7 +92,7 @@ data DeliveryStage u
instance UriMode u => Actor (DeliveryActor u) where
type ActorStage (DeliveryActor u) = DeliveryStage u
type ActorKey (DeliveryActor u) = ObjURI u
type ActorIdentity (DeliveryActor u) = ObjURI u
type ActorInterface (DeliveryActor u) =
[ "deliver-local" ::: AP.Envelope u :-> Bool :-> Return ()
, "forward-remote" ::: AP.Errand u :-> Return ()
@ -114,6 +118,7 @@ instance UriMode u => Stage (DeliveryStage u) where
, envInit :: (Manager, NonEmpty HeaderName, Int)
}
type StageActors (DeliveryStage u) = '[DeliveryActor u]
type StageSpawn (DeliveryStage u) = AllowSpawn
{-
migrations :: [Migration SqlBackend IO]
@ -138,6 +143,7 @@ data DeliveryTheater u = DeliveryTheater
, _dtLog :: LogFunc
, _dtDir :: OsPath
, _dtTheater :: TheaterFor (DeliveryStage u)
, _dtMap :: TVar (HashMap (ObjURI u) (Ref (DeliveryActor u)))
}
data IdMismatch = IdMismatch deriving Show
@ -219,13 +225,14 @@ decodeUtf = pure
startDeliveryTheater
:: UriMode u
=> NonEmpty HeaderName
=> FilePath
-> NonEmpty HeaderName
-> Int
-> Manager
-> LogFunc
-> OsPath
-> IO (DeliveryTheater u)
startDeliveryTheater headers micros manager logFunc dbRootDir = do
startDeliveryTheater avarBoxPath headers micros manager logFunc dbRootDir = do
-- We first add the sqlite3 extension as needed
entries <- listDirectory dbRootDir
@ -249,7 +256,9 @@ startDeliveryTheater headers micros manager logFunc dbRootDir = do
Right uri -> return uri
env <- mkEnv (manager, headers, micros) logFunc (dbRootDir </> path)
return (u, env)
DeliveryTheater manager headers micros logFunc dbRootDir <$> startTheater logFunc (actors `H.HCons` H.HNil)
(theater, actorMap `HCons` HNil) <- startTheater avarBoxPath logFunc (actors `H.HCons` H.HNil)
actorMapVar <- newTVarIO $ HM.fromList actorMap
return $ DeliveryTheater manager headers micros logFunc dbRootDir theater actorMapVar
data DeliveryMethod u
= MethodDeliverLocal (AP.Envelope u) Bool
@ -257,18 +266,26 @@ data DeliveryMethod u
-- Since sendManyIO isn't available right now, we're using many sendIO
sendHttp :: UriMode u => DeliveryTheater u -> DeliveryMethod u -> [ObjURI u] -> IO ()
sendHttp (DeliveryTheater manager headers micros logFunc root theater) method recips =
sendHttp (DeliveryTheater manager headers micros logFunc root theater actorMapVar) method recips =
case method of
MethodDeliverLocal envelope fwd ->
for_ recips $ \ u -> do
void $ spawnIO theater u (makeEnv u)
void $ sendIO' @"deliver-local" theater Proxy u $ envelope `HCons` fwd `HCons` HNil
ref <- getRef u
void $ sendIO' @"deliver-local" theater Proxy ref $ envelope `HCons` fwd `HCons` HNil
MethodForwardRemote errand ->
for_ recips $ \ u -> do
void $ spawnIO theater u (makeEnv u)
void $ sendIO' @"forward-remote" theater Proxy u $ errand `HCons` HNil
ref <- getRef u
void $ sendIO' @"forward-remote" theater Proxy ref $ errand `HCons` HNil
where
makeEnv u =
either throwIO pure (TE.decodeUtf8' $ urlEncode False $ TE.encodeUtf8 $ renderObjURI u) >>=
encodeUtf . (<.> "sqlite3") . (root </>) . T.unpack >>=
mkEnv (manager, headers, micros) logFunc
getRef u = do
mref <- HM.lookup u <$> readTVarIO actorMapVar
case mref of
Just r -> pure r
Nothing -> do
r <- spawnIO theater u (makeEnv u)
atomically $ modifyTVar' actorMapVar $ HM.insert u r
return r