diff --git a/.gitignore b/.gitignore index cae71c3..1a8670b 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,5 @@ config/ssh-host-key.pub lib/ repos/ delivery-states/ +actor-counter.sqlite3 +delivery-counter.sqlite3 diff --git a/src/Control/Concurrent/Actor.hs b/src/Control/Concurrent/Actor.hs index 6516400..0522bb2 100644 --- a/src/Control/Concurrent/Actor.hs +++ b/src/Control/Concurrent/Actor.hs @@ -50,6 +50,7 @@ module Control.Concurrent.Actor , MethodHandler (..) -- * Implementing an actor + , SpawnMode (AllowSpawn) , Stage (..) , Actor (..) , Next () @@ -67,6 +68,7 @@ module Control.Concurrent.Actor -- * Calling actor methods , ActorHasMethod + , Ref () , callIO' , sendIO' , call' @@ -93,6 +95,8 @@ module Control.Concurrent.Actor , HAdaptHandler , Handler_ , Handle' + , ActorRefMap + , ActorRefMapTVar_ -- * Exported to allow some Yesod Handlers to reuse some actor actions , runActor @@ -124,9 +128,12 @@ import Control.Monad.Trans.Reader import Data.Foldable import Data.Hashable import Data.HashMap.Strict (HashMap) +import Data.Int +import Data.Maybe import Data.Proxy import Data.Text (Text) import Data.Traversable +import Database.Persist.Sql (PersistField (..), PersistFieldSql (..)) import GHC.TypeLits import UnliftIO.Exception @@ -137,6 +144,7 @@ import qualified Data.Text as T import qualified Vary as V import Control.Concurrent.Return +import Database.Persist.Box --------------------------- Defining method types ---------------------------- @@ -243,17 +251,20 @@ handleMethod = Proxy -- done True -- @ data MethodHandler (actor :: Type) (sym :: Symbol) (sig :: Signature) = - Proxy sym := (ActorKey actor -> HandlerSig (ActorStage actor) sig) + Proxy sym := (ActorIdentity actor -> HandlerSig (ActorStage actor) sig) --------------------------- Implementing an actor ---------------------------- -class Stage (a :: Type) where +data SpawnMode = NoSpawn | AllowSpawn + +class KnownSpawnMode (StageSpawn a) => Stage (a :: Type) where data StageEnv a :: Type type StageActors a :: [Type] + type StageSpawn a :: SpawnMode class Actor (a :: Type) where type ActorStage a :: Type - type ActorKey a = (k :: Type) | k -> a + type ActorIdentity a :: Type type ActorInterface a :: [Method] data Next = Stop | Proceed @@ -266,15 +277,23 @@ class Actor a => ActorLaunch a where type LogFunc = Loc -> LogSource -> LogLevel -> LogStr -> IO () -type ActorRefMap a = HashMap (ActorKey a) (ActorRef a) +type ActorRefMap a = HashMap (Ref a) (ActorRef a) data ActorRefMapTVar_ :: Type -> Exp Type type instance Eval (ActorRefMapTVar_ a) = TVar (ActorRefMap a) +type ActorInt = Int64 + +type TheaterCounter :: SpawnMode -> Type +type family TheaterCounter mode = t | t -> mode where + TheaterCounter NoSpawn = () + TheaterCounter AllowSpawn = (TheaterFor (ACounterStage ActorInt), Ref (ACounter ActorInt)) + -- | A set of live actors responding to messages data TheaterFor s = TheaterFor - { theaterMap :: HList (Eval (Map ActorRefMapTVar_ (StageActors s))) - , theaterLog :: LogFunc + { theaterMap :: HList (Eval (Map ActorRefMapTVar_ (StageActors s))) + , theaterLog :: LogFunc + , theaterCounter :: TheaterCounter (StageSpawn s) } -- | Actor monad in which message reponse actions are executed. Supports @@ -541,15 +560,14 @@ askTheater :: ActFor s (TheaterFor s) askTheater = ActFor $ lift $ asks snd lookupActor - :: ( Eq (ActorKey a), Hashable (ActorKey a) - , H.HOccurs + :: ( H.HOccurs (TVar (ActorRefMap a)) (HList (Eval (Map ActorRefMapTVar_ (StageActors s)))) ) => TheaterFor s - -> ActorKey a + -> Ref a -> IO (Maybe (ActorRef a)) -lookupActor (TheaterFor hlist _) key = +lookupActor (TheaterFor hlist _ _) key = HM.lookup key <$> readTVarIO (H.hOccurs hlist) {- @@ -582,7 +600,6 @@ callIO' (sig::Signature) (stage::Type) . ( Actor a - , Eq (ActorKey a), Hashable (ActorKey a) , m ~ (sym ::: sig) --, Eval (LookupSig sym (ActorInterface a)) ~ Just sig --, Eval (Parcel_ m) V.:| Eval (Map Parcel_ (ActorInterface a)) @@ -593,7 +610,7 @@ callIO' ) => TheaterFor stage -> Proxy m - -> ActorKey a + -> Ref a -> HList (SignatureParams sig) -> IO (Maybe (SignatureReturn sig)) callIO' theater proxy key args = do @@ -617,7 +634,6 @@ sendIO' (sig::Signature) (stage::Type) . ( Actor a - , Eq (ActorKey a), Hashable (ActorKey a) , m ~ (sym ::: sig) , Eval (LookupSig sym (ActorInterface a)) ~ Just sig , Eval (Parcel_ m) V.:| Eval (Map Parcel_ (ActorInterface a)) @@ -627,7 +643,7 @@ sendIO' ) => TheaterFor stage -> Proxy m - -> ActorKey a + -> Ref a -> HList (SignatureParams sig) -> IO Bool sendIO' theater proxy key args = do @@ -654,7 +670,6 @@ call' (stage::Type) (monad :: Type -> Type) . ( Actor a - , Eq (ActorKey a), Hashable (ActorKey a) , m ~ (sym ::: sig) , Eval (LookupSig sym (ActorInterface a)) ~ Just sig , Eval (Parcel_ m) V.:| Eval (Map Parcel_ (ActorInterface a)) @@ -666,12 +681,12 @@ call' , MonadActorStage monad ~ stage ) => Proxy m - -> ActorKey a + -> Ref a -> HList (SignatureParams sig) -> monad (Maybe (SignatureReturn sig)) -call' proxy key args = liftActor $ do +call' proxy ref args = liftActor $ do theater <- askTheater - liftIO $ callIO' theater proxy key args + liftIO $ callIO' theater proxy ref args -- | Like 'send', except a Proxy is passed to specify the method's name, and -- arguments are passed as a 'HList'. @@ -684,7 +699,6 @@ send' (stage::Type) (monad :: Type -> Type) . ( Actor a - , Eq (ActorKey a), Hashable (ActorKey a) , m ~ (sym ::: sig) , Eval (LookupSig sym (ActorInterface a)) ~ Just sig , Eval (Parcel_ m) V.:| Eval (Map Parcel_ (ActorInterface a)) @@ -696,7 +710,7 @@ send' , MonadActorStage monad ~ stage ) => Proxy m - -> ActorKey a + -> Ref a -> HList (SignatureParams sig) -> monad Bool send' proxy key args = liftActor $ do @@ -730,7 +744,7 @@ type family CallSig (stage :: Type) (signature :: Signature) = (a :: Type) | a - CallSig s (t :-> sig) = t -> CallSig s sig class ActorMethodCall (sym :: Symbol) (actor :: Type) (params :: [Type]) (result :: Type) where - actorMethodCall :: ActorKey actor -> HList params -> result + actorMethodCall :: Ref actor -> HList params -> result instance forall @@ -744,7 +758,6 @@ instance (params :: [Type]) (paramsRev :: [Type]) . ( Actor a - , Eq (ActorKey a), Hashable (ActorKey a) , Eval (LookupSig sym (ActorInterface a)) ~ Just sig , m ~ (sym ::: sig) , ret ~ SignatureReturn sig @@ -781,7 +794,7 @@ call , Eval (LookupSig sym (ActorInterface actor)) ~ Just sig , r ~ CallSig (ActorStage actor) sig ) - => ActorKey actor + => Ref actor -> r call key = actorMethodCall @sym key HNil @@ -790,7 +803,7 @@ type family SendSig (stage :: Type) (signature :: Signature) = (a :: Type) | a - SendSig s (t :-> sig) = t -> SendSig s sig class ActorMethodSend (sym :: Symbol) (actor :: Type) (params :: [Type]) (result :: Type) where - actorMethodSend :: ActorKey actor -> HList params -> result + actorMethodSend :: Ref actor -> HList params -> result instance forall @@ -803,7 +816,6 @@ instance (params :: [Type]) (paramsRev :: [Type]) . ( Actor a - , Eq (ActorKey a), Hashable (ActorKey a) , Eval (LookupSig sym (ActorInterface a)) ~ Just sig , m ~ (sym ::: sig) , Eval (Parcel_ m) V.:| Eval (Map Parcel_ (ActorInterface a)) @@ -839,7 +851,7 @@ send , Eval (LookupSig sym (ActorInterface actor)) ~ Just sig , r ~ SendSig (ActorStage actor) sig ) - => ActorKey actor + => Ref actor -> r send key = actorMethodSend @sym key HNil @@ -912,24 +924,24 @@ uncurryHandler = uncurryH adaptHandler :: ( ActorStage actor ~ stage - , Show (ActorKey actor) , KnownSymbol sym , UncurryH (SignatureParams sig) (HandlerSig stage sig) (HandlerAction stage (SignatureReturn sig)) ) - => ActorKey actor + => Ref actor + -> ActorIdentity actor -> MethodHandler actor sym sig -> (Proxy sym, Parcel sig) -> (AdaptedAction (ActorStage actor), Text) -adaptHandler key (Proxy := handler) (p@Proxy, Parcel args respond) = +adaptHandler ref ident (Proxy := handler) (p@Proxy, Parcel args respond) = (go, prefixOn) where - prefix = T.concat ["[Actor '", T.pack $ show key, "']"] + prefix = T.concat ["[Actor '", T.pack $ show ref, "']"] prefixOn = T.concat [prefix, " on ", T.pack $ symbolVal p] go = do - result <- try $ uncurryHandler (handler key) args + result <- try $ uncurryHandler (handler ident) args case result of Left e -> do logError $ T.concat [prefix, " exception: ", T.pack $ displayException (e :: SomeException)] @@ -942,11 +954,10 @@ adaptHandler key (Proxy := handler) (p@Proxy, Parcel args respond) = -- This is for adaptHandler to work with hMapL -data HAdaptHandler a = HAdaptHandler (ActorKey a) +data HAdaptHandler a = HAdaptHandler (Ref a) (ActorIdentity a) instance ( ActorStage actor ~ stage - , Show (ActorKey actor) , KnownSymbol sym , UncurryH (SignatureParams sig) @@ -956,12 +967,11 @@ instance , o ~ ( (Proxy sym, Parcel sig) -> (AdaptedAction stage, Text) ) ) => H.ApplyAB (HAdaptHandler actor) i o where - applyAB (HAdaptHandler key) = adaptHandler key + applyAB (HAdaptHandler ref ident) = adaptHandler ref ident data AdaptHandlerConstraint :: Type -> Method -> Exp Constraint type instance Eval (AdaptHandlerConstraint actor (sym ::: sig)) = - ( Show (ActorKey actor) - , KnownSymbol sym + ( KnownSymbol sym , UncurryH (SignatureParams sig) (HandlerSig (ActorStage actor) sig) @@ -976,12 +986,10 @@ type instance Eval (AdaptedHandler stage (sym ::: sig)) = (Proxy sym, Parcel sig) -> (AdaptedAction stage, Text) launchActorThread - :: forall (a::Type) (k::Type) (s::Type) (ms::[Method]) . + :: forall (a::Type) (s::Type) (ms::[Method]) . ( ActorLaunch a , ActorStage a ~ s - , ActorKey a ~ k , ActorInterface a ~ ms - , Eq k, Hashable k, Show k , H.HOccurs (TVar (ActorRefMap a)) (HList (Eval (Map ActorRefMapTVar_ (StageActors s)))) @@ -1011,14 +1019,15 @@ launchActorThread ) ) ) - => Chan (Invocation ms) + => Ref a + -> Chan (Invocation ms) -> TheaterFor s - -> k + -> ActorIdentity a -> StageEnv s -> IO () -launchActorThread chan theater actor env = +launchActorThread ref chan theater actor env = void $ forkIO $ runActor theater env $ do - let handlers' = H.hMapL (HAdaptHandler actor) handlers :: HList (Eval (Map (AdaptedHandler s) ms)) + let handlers' = H.hMapL (HAdaptHandler ref actor) handlers :: HList (Eval (Map (AdaptedHandler s) ms)) logInfo $ prefix <> " starting" loop handlers' logInfo $ prefix <> " bye" @@ -1026,7 +1035,7 @@ launchActorThread chan theater actor env = handlers :: HList (Eval (Map (Handler_ a) ms)) handlers = actorBehavior (Proxy @a) - prefix = T.concat ["[Actor '", T.pack $ show actor, "']"] + prefix = T.concat ["[Actor '", T.pack $ show ref, "']"] loop :: HList (Eval (Map (AdaptedHandler s) ms)) -> ActFor s () loop handlers' = do @@ -1044,7 +1053,7 @@ launchActorThread chan theater actor env = logInfo $ T.concat [prefixOn, " stopping"] let tvar = H.hOccurs (theaterMap theater) :: TVar (ActorRefMap a) - liftIO $ atomically $ modifyTVar' tvar $ HM.delete actor + liftIO $ atomically $ modifyTVar' tvar $ HM.delete ref return False Proceed -> do @@ -1054,12 +1063,12 @@ launchActorThread chan theater actor env = -- | Same as 'spawn', except it takes the theater as a parameter. spawnIO - :: forall (a::Type) (k::Type) (s::Type) (ms::[Method]) . + :: forall (a::Type) (s::Type) (ms::[Method]) . ( ActorLaunch a , ActorStage a ~ s - , ActorKey a ~ k + , Stage s + , StageSpawn s ~ AllowSpawn , ActorInterface a ~ ms - , Eq k, Hashable k, Show k , H.HOccurs (TVar (ActorRefMap a)) (HList (Eval (Map ActorRefMapTVar_ (StageActors s)))) @@ -1090,36 +1099,30 @@ spawnIO ) ) => TheaterFor s - -> ActorKey a + -> ActorIdentity a -> IO (StageEnv s) - -> IO Bool -spawnIO theater@(TheaterFor hlist _) key mkEnv = do + -> IO (Ref a) +spawnIO theater@(TheaterFor hlist _ (acounterTheater, acounterRef)) ident mkEnv = do let tvar = H.hOccurs hlist :: TVar (ActorRefMap a) chan <- newChan - added <- atomically $ stateTVar tvar $ \ hm -> - let hm' = HM.alter (create $ ActorRef $ ActorRef' chan) key hm - in ( not (HM.member key hm) && HM.member key hm' - , hm' - ) - when added $ do - env <- mkEnv - launchActorThread chan theater key env - return added - where - create actor Nothing = Just actor - create _ j@(Just _) = j + next <- fromJust <$> callIO' @"next" acounterTheater Proxy acounterRef HNil + let ref = Ref next + atomically $ modifyTVar' tvar $ HM.insert ref (ActorRef $ ActorRef' chan) + env <- mkEnv + launchActorThread ref chan theater ident env + return ref -- | Launch a new actor with the given ID and behavior. Return 'True' if the ID -- was unused and the actor has been launched. Return 'False' if the ID is -- already in use, thus a new actor hasn't been launched. spawn - :: forall (m::Type->Type) (a::Type) (k::Type) (s::Type) (ms::[Method]) . + :: forall (a::Type) (m::Type->Type) (s::Type) (ms::[Method]) . ( MonadActor m, MonadActorStage m ~ s , ActorLaunch a , ActorStage a ~ s - , ActorKey a ~ k + , Stage s + , StageSpawn s ~ AllowSpawn , ActorInterface a ~ ms - , Eq k, Hashable k, Show k , H.HOccurs (TVar (ActorRefMap a)) (HList (Eval (Map ActorRefMapTVar_ (StageActors s)))) @@ -1149,22 +1152,20 @@ spawn ) ) ) - => ActorKey a + => ActorIdentity a -> IO (StageEnv s) - -> m Bool -spawn key mkEnv = liftActor $ do + -> m (Ref a) +spawn ident mkEnv = liftActor $ do theater <- askTheater - liftIO $ spawnIO theater key mkEnv + liftIO $ spawnIO theater ident mkEnv --------------------------- Launching the actor system ----------------------- prepareActorType - :: forall (a::Type) (k::Type) (s::Type) (ms::[Method]) . + :: forall (a::Type) (s::Type) (ms::[Method]) . ( ActorLaunch a , ActorStage a ~ s - , ActorKey a ~ k , ActorInterface a ~ ms - , Eq k, Hashable k, Show k , H.HOccurs (TVar (ActorRefMap a)) (HList (Eval (Map ActorRefMapTVar_ (StageActors s)))) @@ -1195,34 +1196,40 @@ prepareActorType ) , Stage s ) - => [(k, StageEnv s)] + => TheaterCounter (StageSpawn s) + -> [(ActorIdentity a, StageEnv s)] -> IO - ( TVar (ActorRefMap a) - , TheaterFor s -> IO () + ( ( TVar (ActorRefMap a) + , TheaterFor s -> IO () + ) + , [(ActorIdentity a, Ref a)] ) -prepareActorType actors = do - actorsWithChans <- for actors $ \ (key, env) -> do +prepareActorType counter actors = do + refs <- produceRefs counter $ length actors + let actorsWithRefs = zip actors refs + actorsWithChans <- for actorsWithRefs $ \ ((ident, env), ref) -> do chan <- newChan - return (key, env, chan) + return (ref, ident, env, chan) tvar <- newTVarIO $ HM.fromList $ map - (\ (key, _, chan) -> (key, ActorRef $ ActorRef' chan)) + (\ (ref, _, _, chan) -> (ref, ActorRef $ ActorRef' chan)) actorsWithChans return - ( tvar - , \ theater -> for_ actorsWithChans $ \ (key, env, chan) -> - launchActorThread chan theater key env + ( ( tvar + , \ theater -> for_ actorsWithChans $ \ (ref, ident, env, chan) -> + launchActorThread ref chan theater ident env + ) + , map (\ (ref, ident, _, _) -> (ident, ref)) actorsWithChans ) -data HPrepareActorType = HPrepareActorType +data HPrepareActorType (sm::SpawnMode) = HPrepareActorType (TheaterCounter sm) instance - forall (a::Type) (k::Type) (s::Type) (ms::[Method]) (i::Type) (o::Type). + forall (a::Type) (s::Type) (sm::SpawnMode) (ms::[Method]) (i::Type) (o::Type). ( ActorLaunch a , ActorStage a ~ s - , ActorKey a ~ k + , StageSpawn s ~ sm , ActorInterface a ~ ms - , Eq k, Hashable k, Show k , H.HOccurs (TVar (ActorRefMap a)) (HList (Eval (Map ActorRefMapTVar_ (StageActors s)))) @@ -1252,16 +1259,15 @@ instance ) ) , Stage s - , i ~ [(k, StageEnv s)] - , o ~ IO (TVar (ActorRefMap a), TheaterFor s -> IO ()) + , i ~ [(ActorIdentity a, StageEnv s)] + , o ~ IO ((TVar (ActorRefMap a), TheaterFor s -> IO ()), [(ActorIdentity a, Ref a)]) ) => - H.ApplyAB HPrepareActorType i o where - applyAB _ a = prepareActorType a + H.ApplyAB (HPrepareActorType sm) i o where + applyAB (HPrepareActorType counter) a = prepareActorType counter a data A_ :: Type -> Exp Constraint type instance Eval (A_ a) = ( ActorLaunch a - , Eq (ActorKey a), Hashable (ActorKey a), Show (ActorKey a) , H.HOccurs (TVar (ActorRefMap a)) (HList (Eval (Map ActorRefMapTVar_ (StageActors (ActorStage a))))) @@ -1293,19 +1299,42 @@ type instance Eval (A_ a) = ) data Starter :: Type -> Exp Type -type instance Eval (Starter a) = [(ActorKey a, StageEnv (ActorStage a))] +type instance Eval (Starter a) = [(ActorIdentity a, StageEnv (ActorStage a))] data Prepare_ :: Type -> Type -> Exp Type -type instance Eval (Prepare_ s a) = IO (TVar (ActorRefMap a), TheaterFor s -> IO ()) +type instance Eval (Prepare_ s a) = IO ((TVar (ActorRefMap a), TheaterFor s -> IO ()), [(ActorIdentity a, Ref a)]) data Pair_ :: Type -> Type -> Exp Type type instance Eval (Pair_ s a) = (TVar (ActorRefMap a), TheaterFor s -> IO ()) +data Triplet_ :: Type -> Type -> Exp Type +type instance Eval (Triplet_ s a) = ((TVar (ActorRefMap a), TheaterFor s -> IO ()), [(ActorIdentity a, Ref a)]) + data Launch_ :: Type -> Type -> Exp Type type instance Eval (Launch_ s _) = TheaterFor s -> IO () --- | Launch the actor system -startTheater +data Finisher :: Type -> Exp Type +type instance Eval (Finisher a) = [(ActorIdentity a, Ref a)] + +class KnownSpawnMode (sm :: SpawnMode) where + type SpawnModeInput sm = (i :: Type) | i -> sm + loadCounter :: SpawnModeInput sm -> IO (TheaterCounter sm) + produceRefs :: TheaterCounter sm -> Int -> IO [Ref a] + +instance KnownSpawnMode NoSpawn where + type SpawnModeInput NoSpawn = () + loadCounter () = pure () + produceRefs () count = pure $ map Ref [0 .. toEnum count - 1] + +instance KnownSpawnMode AllowSpawn where + type SpawnModeInput AllowSpawn = (LogFunc, FilePath) + loadCounter (logFunc, pathA) = loadSingleACounterTheater logFunc pathA 0 + produceRefs (acounterTheater, acounterRef) count = + replicateM count $ + Ref . fromJust <$> + callIO' @"next" acounterTheater Proxy acounterRef HNil + +startTheater' :: forall (s :: Type) (as :: [Type]) . ( Stage s , StageActors s ~ as @@ -1313,54 +1342,107 @@ startTheater , H.HMapAux HList - HPrepareActorType + (HPrepareActorType (StageSpawn s)) (Eval (Map Starter as)) (Eval (Map (Prepare_ s) as)) - , H.SameLength' - (Eval (Map Starter as)) - (Eval (Map (Prepare_ s) as)) - , H.SameLength' - (Eval (Map (Prepare_ s) as)) - (Eval (Map Starter as)) - , H.HSequence - IO - (Eval (Map (Prepare_ s) as)) + IO (Eval (Map (Prepare_ s) as)) (Eval (Map (Triplet_ s) as)) + , H.HZipList (Eval (Map (Pair_ s) as)) - - , H.SameLength' - (Eval (Map ActorRefMapTVar_ as)) - (Eval (Map (Launch_ s) as)) - , H.SameLength' - (Eval (Map (Launch_ s) as)) - (Eval (Map ActorRefMapTVar_ as)) - - , H.SameLength' - (Eval (Map (Launch_ s) as)) - (Eval (Map (Pair_ s) as)) - , H.SameLength' - (Eval (Map (Pair_ s) as)) - (Eval (Map (Launch_ s) as)) - + (Eval (Map Finisher as)) + (Eval (Map (Triplet_ s) as)) , H.HZipList (Eval (Map ActorRefMapTVar_ as)) (Eval (Map (Launch_ s) as)) (Eval (Map (Pair_ s) as)) - , H.HList2List - (Eval (Map (Launch_ s) as)) - (TheaterFor s -> IO ()) + (Eval (Map (Launch_ s) as)) (TheaterFor s -> IO ()) + , H.SameLength' + (Eval (Map (Prepare_ s) as)) (Eval (Map Starter as)) + , H.SameLength' + (Eval (Map (Triplet_ s) as)) (Eval (Map Finisher as)) + , H.SameLength' + (Eval (Map (Launch_ s) as)) (Eval (Map (Pair_ s) as)) + , H.SameLength' + (Eval (Map Starter as)) (Eval (Map (Prepare_ s) as)) + , H.SameLength' + (Eval (Map (Pair_ s) as)) (Eval (Map Finisher as)) + , H.SameLength' + (Eval (Map (Launch_ s) as)) (Eval (Map ActorRefMapTVar_ as)) + , H.SameLength' + (Eval (Map Finisher as)) (Eval (Map (Pair_ s) as)) + , H.SameLength' + (Eval (Map (Pair_ s) as)) (Eval (Map (Launch_ s) as)) + , H.SameLength' + (Eval (Map Finisher as)) (Eval (Map (Triplet_ s) as)) + , H.SameLength' + (Eval (Map ActorRefMapTVar_ as)) (Eval (Map (Launch_ s) as)) ) - => LogFunc + => SpawnModeInput (StageSpawn s) + -> LogFunc -> HList (Eval (Map Starter as)) - -> IO (TheaterFor s) -startTheater logFunc actors = do - let actions = H.hMapL HPrepareActorType actors :: HList (Eval (Map (Prepare_ s) as)) - mapsAndLaunches <- H.hSequence actions :: IO (HList (Eval (Map (Pair_ s) as))) - let (maps :: HList (Eval (Map ActorRefMapTVar_ as)), launches :: HList (Eval (Map (Launch_ s) as))) = H.hUnzip mapsAndLaunches - theater = TheaterFor maps logFunc + -> IO (TheaterFor s, HList (Eval (Map Finisher as))) +startTheater' input logFunc actors = do + counter <- loadCounter input + let actions = H.hMapL (HPrepareActorType counter) actors :: HList (Eval (Map (Prepare_ s) as)) + mapsAndLaunchesAndResults <- H.hSequence actions :: IO (HList (Eval (Map (Triplet_ s) as))) + let (mapsAndLaunches :: HList (Eval (Map (Pair_ s) as)), results :: HList (Eval (Map Finisher as))) = H.hUnzip mapsAndLaunchesAndResults + (maps :: HList (Eval (Map ActorRefMapTVar_ as)), launches :: HList (Eval (Map (Launch_ s) as))) = H.hUnzip mapsAndLaunches + theater = TheaterFor maps logFunc counter for_ (H.hList2List launches) $ \ launch -> launch theater - return theater + return (theater, results) + +-- | Launch the actor system +startTheater + :: forall (s :: Type) (as :: [Type]) . + ( Stage s + , StageSpawn s ~ AllowSpawn + , StageActors s ~ as + , Eval (Constraints (Eval (Map A_ as))) + + , H.HMapAux + HList + (HPrepareActorType (StageSpawn s)) + (Eval (Map Starter as)) + (Eval (Map (Prepare_ s) as)) + , H.HSequence + IO (Eval (Map (Prepare_ s) as)) (Eval (Map (Triplet_ s) as)) + , H.HZipList + (Eval (Map (Pair_ s) as)) + (Eval (Map Finisher as)) + (Eval (Map (Triplet_ s) as)) + , H.HZipList + (Eval (Map ActorRefMapTVar_ as)) + (Eval (Map (Launch_ s) as)) + (Eval (Map (Pair_ s) as)) + , H.HList2List + (Eval (Map (Launch_ s) as)) (TheaterFor s -> IO ()) + , H.SameLength' + (Eval (Map (Prepare_ s) as)) (Eval (Map Starter as)) + , H.SameLength' + (Eval (Map (Triplet_ s) as)) (Eval (Map Finisher as)) + , H.SameLength' + (Eval (Map (Launch_ s) as)) (Eval (Map (Pair_ s) as)) + , H.SameLength' + (Eval (Map Starter as)) (Eval (Map (Prepare_ s) as)) + , H.SameLength' + (Eval (Map (Pair_ s) as)) (Eval (Map Finisher as)) + , H.SameLength' + (Eval (Map (Launch_ s) as)) (Eval (Map ActorRefMapTVar_ as)) + , H.SameLength' + (Eval (Map Finisher as)) (Eval (Map (Pair_ s) as)) + , H.SameLength' + (Eval (Map (Pair_ s) as)) (Eval (Map (Launch_ s) as)) + , H.SameLength' + (Eval (Map Finisher as)) (Eval (Map (Triplet_ s) as)) + , H.SameLength' + (Eval (Map ActorRefMapTVar_ as)) (Eval (Map (Launch_ s) as)) + ) + => FilePath + -> LogFunc + -> HList (Eval (Map Starter as)) + -> IO (TheaterFor s, HList (Eval (Map Finisher as))) +startTheater avarBoxPath logFunc = startTheater' (logFunc, avarBoxPath) logFunc @@ -1482,7 +1564,7 @@ sendManyIO => TheaterFor s -> HList (Eval (Map Set_ (StageActors s))) -> IO () -sendManyIO (TheaterFor hlist _) recips = +sendManyIO (TheaterFor hlist _ _) recips = let zipped = H.hZip hlist recips :: HList (Eval (Map Pair__ (StageActors s))) actions = H.hMapL HSendTo zipped @@ -1620,3 +1702,159 @@ data Parcel (s :: Signature') = Parcel } -} + +-- We're going to define a "simple value holder" actor, which is both a good +-- simple example for using the actor system itself, and will serve us for +-- holding vat and actor auto-increasing numbering. +-- +-- Let's start with the Box. + +newtype Cell a = Cell a +instance PersistField a => PersistField (Cell a) where + toPersistValue (Cell v) = toPersistValue v + fromPersistValue = fmap Cell . fromPersistValue + +instance PersistFieldSql a => PersistFieldSql (Cell a) where + sqlType = sqlType . fmap uncell + where + uncell (Cell v) = v + +instance PersistFieldSql a => BoxableVia (Cell a) where + type BV (Cell a) = BoxableField + +-- Let's use this Box in a new actor type, the value holder + +data AVarStage (a :: Type) + +instance Stage (AVarStage a) where + data StageEnv (AVarStage a) = AVarStageEnv (Box (Cell a)) + type StageActors (AVarStage a) = '[AVar a] + type StageSpawn (AVarStage a) = NoSpawn + +data AVar (a :: Type) + +instance Actor (AVar a) where + type ActorStage (AVar a) = AVarStage a + type ActorInterface (AVar a) = + [ "get" ::: Return a + , "put" ::: a :-> Return () + ] + type ActorIdentity (AVar a) = () + +instance PersistFieldSql a => ActorLaunch (AVar a) where + actorBehavior _ = + (handleMethod @"get" := \ () -> do + AVarStageEnv box <- askEnv + Cell val <- withBox box obtain + done val + ) + `HCons` + (handleMethod @"put" := \ () val -> do + AVarStageEnv box <- askEnv + withBox box $ bestow $ Cell val + done () + ) + `HCons` + HNil + +-- So, we want to load a theater with exactly one AVar + +loadSingleAVarTheater + :: PersistFieldSql a + => LogFunc + -> FilePath + -> a + -> IO (TheaterFor (AVarStage a), Ref (AVar a)) +loadSingleAVarTheater logFunc path initial = do + box <- flip runLoggingT logFunc $ loadBox path $ Cell initial + startStar () logFunc () (AVarStageEnv box) + +-- Now, we have an infinite loop problem: +-- +-- * Every theater needs an AVar, in order to auto-increase the key counter +-- * The AVar comes inside a Theater +-- +-- Solution: Instead of using a Theater, we're going to write a different +-- version specialized for our purpose here. A single-actor theater, that +-- doesn't need to count. But since ActFor uses a Theater, we're going to use +-- Theater as well, except we just rely on avoiding any spawning. + +startStar counter logFunc ident env = do + (theater, actors `HCons` HNil) <- startTheater' counter logFunc $ [(ident, env)] `HCons` HNil + ref <- + case actors of + [((), r)] -> pure r + _ -> error "startStar: Expected exactly one actor" + return (theater, ref) + +-- Now, we can't really use AVar here because we need increase to be atomic, +-- and AVar has only 'get' and 'put'. We could write a whole new actor type for +-- this, or just wrap AVar (shows what wrapping looks like with current API), +-- or add a new method to AVar (which wouldn't work in a network setting, +-- unless there's a way to send a function (a->a) remotely). +-- +-- Let's go for the wrapping. + +data ACounterStage (a :: Type) + +instance Stage (ACounterStage a) where + data StageEnv (ACounterStage a) = ACounterStageEnv (TheaterFor (AVarStage a)) (Ref (AVar a)) + type StageActors (ACounterStage a) = '[ACounter a] + type StageSpawn (ACounterStage a) = NoSpawn + +data ACounter (a :: Type) + +instance Actor (ACounter a) where + type ActorStage (ACounter a) = ACounterStage a + type ActorInterface (ACounter a) = + '[ "next" ::: Return a + ] + type ActorIdentity (ACounter a) = () + +instance (Integral a, PersistFieldSql a) => ActorLaunch (ACounter a) where + actorBehavior _ = + (handleMethod @"next" := \ () -> do + ACounterStageEnv avarTheater avarRef <- askEnv + val <- liftIO $ fromJust <$> callIO' @"get" avarTheater Proxy avarRef HNil + void $ liftIO $ sendIO' @"put" avarTheater Proxy avarRef $ (val+1) `HCons` HNil + done val + ) + `HCons` + HNil + +-- So, we want to load a theater with exactly one ACounter + +loadSingleACounterTheater + :: (Integral a, PersistFieldSql a) + => LogFunc + -> FilePath + -> a + -> IO (TheaterFor (ACounterStage a), Ref (ACounter a)) +loadSingleACounterTheater logFunc pathA initial = do + (theaterA, avarRef) <- loadSingleAVarTheater logFunc pathA initial + startStar () logFunc () (ACounterStageEnv theaterA avarRef) + +-- We now modify TheaterFor, to have 2 types of theaters: Ones that allow +-- spawning, and ones that don't. +-- +-- And we're going to do it type-based. +-- +-- Done. Now, how will we track the bidirectional mapping between DB key and +-- internal actor number? Let's see why we need both directions: +-- +-- * In actor handlers, we need the DB key avalable somehow, i.e. startTheater +-- and spawn/IO must take this per-actor "env" +-- * Inbox POST handlers need to determine the internal ID in order to insert +-- the incoming activity + +-- First let's define a type to use for the ID + +newtype Ref a = Ref ActorInt deriving newtype (Eq, Show, Read, Hashable) + +-- Now, for each actor type, specify an env type, and have spawn & startTheater +-- take these values + +-- Done. Now, we need to keep the map updated: +-- +-- [ ] Whenever an actor is deleted, remove from appActors as well (preferrably +-- even before the removal from Theater) diff --git a/src/Vervis/API.hs b/src/Vervis/API.hs index 4b09f15..2e20808 100644 --- a/src/Vervis/API.hs +++ b/src/Vervis/API.hs @@ -32,6 +32,7 @@ module Vervis.API where import Control.Applicative +import Control.Concurrent.STM.TVar import Control.Exception hiding (Handler, try) import Control.Monad import Control.Monad.IO.Class @@ -69,6 +70,8 @@ import Yesod.Persist.Core import qualified Data.ByteString as B import qualified Data.ByteString.Lazy as BL +import qualified Data.HashMap.Strict as HM +import qualified Data.HList as H import qualified Data.List.NonEmpty as NE import qualified Data.Text as T import qualified Data.Text.Encoding as TE @@ -145,10 +148,16 @@ handleViaActor -> AP.Action URIMode -> ExceptT Text Handler OutboxItemId handleViaActor personID maybeCap localRecips remoteRecips fwdHosts action = do + personRef <- do + peopleVar <- H.hOccurs <$> asksSite appActors + people <- liftIO $ readTVarIO peopleVar + case HM.lookup personID people of + Nothing -> error "Person not found in appActors" + Just ref -> pure ref theater <- asksSite appTheater let maybeCap' = first (\ (byKey, _, i) -> (byKey, i)) <$> maybeCap msg = ClientMsg maybeCap' localRecips remoteRecips fwdHosts action - maybeResult <- liftIO $ callIO' @"client" theater Proxy personID $ msg `HCons` HNil + maybeResult <- liftIO $ callIO' @"client" @Person theater Proxy personRef $ msg `HCons` HNil outboxItemID <- case maybeResult of Nothing -> error "Person not found in theater" @@ -1123,7 +1132,7 @@ createPatchTrackerC (Entity pidUser personUser) senderActor maybeCap localRecips success <- do theater <- asksSite appTheater env <- asksSite appEnv - liftIO $ launchActorIO theater env loomID + liftIO $ launchActorIO @Loom theater env loomID unless success $ error "Failed to spawn new Loom, somehow ID already in Theater" @@ -1377,7 +1386,7 @@ createRepositoryC (Entity pidUser personUser) senderActor maybeCap localRecips r success <- do theater <- asksSite appTheater env <- asksSite appEnv - liftIO $ launchActorIO theater env repoID + liftIO $ launchActorIO @Repo theater env repoID unless success $ error "Failed to spawn new Repo, somehow ID already in Theater" diff --git a/src/Vervis/Actor.hs b/src/Vervis/Actor.hs index d671c1e..a13c04b 100644 --- a/src/Vervis/Actor.hs +++ b/src/Vervis/Actor.hs @@ -22,6 +22,9 @@ {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE UndecidableInstances #-} +-- For launchActor not to need to take a Proxy +{-# LANGUAGE AllowAmbiguousTypes #-} + module Vervis.Actor ( -- * Local actors LocalActorBy (..) @@ -79,6 +82,7 @@ module Vervis.Actor , ClientMsg (..) -- * Behavior utility types + , KeyAndRef_ , StageEnv (..) , Staje , Act @@ -106,6 +110,7 @@ module Vervis.Actor ) where +import Control.Concurrent.STM import Control.Concurrent.STM.TVar import Control.Monad import Control.Monad.IO.Class @@ -121,6 +126,8 @@ import Data.Function import Data.Hashable import Data.HashMap.Strict (HashMap) import Data.HashSet (HashSet) +import Data.HList (HList (..)) +import Data.Kind import Data.List.NonEmpty (NonEmpty) import Data.Maybe import Data.Text (Text) @@ -139,6 +146,7 @@ import qualified Control.Monad.Fail as F import qualified Data.Aeson as A import qualified Data.ByteString.Lazy as BL import qualified Data.HashSet as HS +import qualified Data.HashMap.Strict as HM import qualified Data.HList as H import qualified Data.List.NonEmpty as NE import qualified Data.List.Ordered as LO @@ -491,7 +499,7 @@ type Ret = Return (Either Text Text) instance Actor Person where type ActorStage Person = Staje - type ActorKey Person = PersonId + type ActorIdentity Person = PersonId type ActorInterface Person = [ "verse" ::: Verse :-> Ret , "client" ::: ClientMsg :-> Return (Either Text OutboxItemId) @@ -499,41 +507,41 @@ instance Actor Person where ] instance Actor Deck where type ActorStage Deck = Staje - type ActorKey Deck = DeckId + type ActorIdentity Deck = DeckId type ActorInterface Deck = [ "verse" ::: Verse :-> Ret , "init" ::: (Either (LocalActorBy Key, ActorId, OutboxItemId) (RemoteAuthor, LocalURI)) :-> Ret ] instance Actor Loom where type ActorStage Loom = Staje - type ActorKey Loom = LoomId + type ActorIdentity Loom = LoomId type ActorInterface Loom = '[ "verse" ::: Verse :-> Ret ] instance Actor Repo where type ActorStage Repo = Staje - type ActorKey Repo = RepoId + type ActorIdentity Repo = RepoId type ActorInterface Repo = [ "verse" ::: Verse :-> Ret , "wait-during-push" ::: IO () :-> Ret ] instance Actor Project where type ActorStage Project = Staje - type ActorKey Project = ProjectId + type ActorIdentity Project = ProjectId type ActorInterface Project = [ "verse" ::: Verse :-> Ret , "init" ::: (Either (LocalActorBy Key, ActorId, OutboxItemId) (RemoteAuthor, LocalURI)) :-> Ret ] instance Actor Group where type ActorStage Group = Staje - type ActorKey Group = GroupId + type ActorIdentity Group = GroupId type ActorInterface Group = [ "verse" ::: Verse :-> Ret , "init" ::: (Either (LocalActorBy Key, ActorId, OutboxItemId) (RemoteAuthor, LocalURI)) :-> Ret ] instance Actor Factory where type ActorStage Factory = Staje - type ActorKey Factory = FactoryId + type ActorIdentity Factory = FactoryId type ActorInterface Factory = [ "verse" ::: Verse :-> Ret , "verified" ::: PersonId :-> Ret @@ -571,6 +579,9 @@ instance VervisActor Factory where toVerse _ = Nothing -} +data KeyAndRef_ :: Type -> Exp Type +type instance Eval (KeyAndRef_ a) = TVar (HashMap (Key a) (Ref a)) + instance Stage Staje where data StageEnv Staje = forall y. (Typeable y, Yesod y) => Env -- | Data to which every actor has access. Since such data can be passed to the @@ -594,9 +605,11 @@ instance Stage Staje where , envYesodRender :: YesodRender y , envHttpManager :: Manager , envFetch :: ActorFetchShare + , envActors :: HList (Eval (Map KeyAndRef_ (StageActors Staje))) } deriving Typeable type StageActors Staje = [Person, Project, Group, Deck, Loom, Repo, Factory] + type StageSpawn Staje = AllowSpawn type YesodRender y = Route y -> [(Text, Text)] -> Text @@ -664,130 +677,149 @@ instance (Actor a, VervisActorLaunch a, ActorReturn a ~ Either Text Text, ActorS -} launchActorIO - :: ( ActorLaunch a, ActorStage a ~ Staje - , Eq (ActorKey a), Hashable (ActorKey a), Show (ActorKey a) - , H.HEq - (TVar (HashMap (ActorKey a) (ActorRef a))) - (TVar (HashMap PersonId (ActorRef Person))) - b1 - , H.HOccurrence' - b1 - (TVar (HashMap (ActorKey a) (ActorRef a))) - '[TVar (HashMap PersonId (ActorRef Person)), - TVar (HashMap ProjectId (ActorRef Project)), - TVar (HashMap GroupId (ActorRef Group)), - TVar (HashMap DeckId (ActorRef Deck)), - TVar (HashMap LoomId (ActorRef Loom)), - TVar (HashMap RepoId (ActorRef Repo)), - TVar (HashMap FactoryId (ActorRef Factory))] - l'1 - , H.HOccurs' - (TVar (HashMap (ActorKey a) (ActorRef a))) - l'1 - '[TVar (HashMap PersonId (ActorRef Person)), - TVar (HashMap ProjectId (ActorRef Project)), - TVar (HashMap GroupId (ActorRef Group)), - TVar (HashMap DeckId (ActorRef Deck)), - TVar (HashMap LoomId (ActorRef Loom)), - TVar (HashMap RepoId (ActorRef Repo)), - TVar (HashMap FactoryId (ActorRef Factory))] - - , ActorStage a ~ s + :: forall a ms b l . + ( ActorLaunch a, ActorStage a ~ Staje + , Hashable (ActorIdentity a) , ActorInterface a ~ ms - , Eval (Map (AdaptedHandler s) ms) + , H.HOccurs + (TVar (ActorRefMap a)) + (HList (Eval (Map ActorRefMapTVar_ (StageActors Staje)))) + , Eval (Map (AdaptedHandler Staje) ms) ~ Eval (Map - (Func (AdaptedAction s, Text)) + (Func (AdaptedAction Staje, Text)) (Eval (Map Parcel_ ms)) ) , H.SameLength' - (Eval (Map (Func (AdaptedAction s, Text)) (Eval (Map Parcel_ ms)))) + (Eval (Map (Func (AdaptedAction Staje, Text)) (Eval (Map Parcel_ ms)))) (Eval (Map (Handler_ a) ms)) , H.SameLength' (Eval (Map (Handler_ a) ms)) - (Eval (Map (Func (AdaptedAction s, Text)) (Eval (Map Parcel_ ms)))) + (Eval (Map (Func (AdaptedAction Staje, Text)) (Eval (Map Parcel_ ms)))) , Eval (Constraints (Eval (Map (AdaptHandlerConstraint a) ms))) - , Handle' (Eval (Map Parcel_ ms)) (AdaptedAction s, Text) + , Handle' (Eval (Map Parcel_ ms)) (AdaptedAction Staje, Text) , H.HMapAux - H.HList + HList (HAdaptHandler a) (Eval (Map (Handler_ a) ms)) (Eval (Map - (Func (AdaptedAction s, Text)) + (Func (AdaptedAction Staje, Text)) (Eval (Map Parcel_ ms)) ) ) + , H.HEq + (TVar (HashMap (ActorIdentity a) (Ref a))) + (TVar (HashMap (Key Person) (Ref Person))) + b + , H.HOccurrence' + b + (TVar (HashMap (ActorIdentity a) (Ref a))) + [TVar (HashMap (Key Person) (Ref Person)), + TVar (HashMap (Key Project) (Ref Project)), + TVar (HashMap (Key Group) (Ref Group)), + TVar (HashMap (Key Deck) (Ref Deck)), + TVar (HashMap (Key Loom) (Ref Loom)), + TVar (HashMap (Key Repo) (Ref Repo)), + TVar (HashMap (Key Factory) (Ref Factory))] + l + , H.HOccurs' + (TVar (HashMap (ActorIdentity a) (Ref a))) + l + [TVar (HashMap (Key Person) (Ref Person)), + TVar (HashMap (Key Project) (Ref Project)), + TVar (HashMap (Key Group) (Ref Group)), + TVar (HashMap (Key Deck) (Ref Deck)), + TVar (HashMap (Key Loom) (Ref Loom)), + TVar (HashMap (Key Repo) (Ref Repo)), + TVar (HashMap (Key Factory) (Ref Factory))] ) => Theater -> StageEnv Staje - -> ActorKey a + -> ActorIdentity a -> IO Bool -launchActorIO theater env key = spawnIO theater key (pure env) +launchActorIO theater env ident = do + let tvar = H.hOccurs (envActors env) + maybeRef <- HM.lookup ident <$> readTVarIO tvar + case maybeRef of + Just _ -> pure False + Nothing -> do + ref <- spawnIO @a theater ident (pure env) + atomically $ modifyTVar' tvar $ HM.insert ident ref + return True launchActor - :: ( ActorLaunch a, ActorStage a ~ Staje - , Eq (ActorKey a), Hashable (ActorKey a), Show (ActorKey a) - , H.HEq - (TVar (HashMap (ActorKey a) (ActorRef a))) - (TVar (HashMap PersonId (ActorRef Person))) - b0 - , H.HOccurrence' - b0 - (TVar (HashMap (ActorKey a) (ActorRef a))) - '[TVar (HashMap PersonId (ActorRef Person)), - TVar (HashMap ProjectId (ActorRef Project)), - TVar (HashMap GroupId (ActorRef Group)), - TVar (HashMap DeckId (ActorRef Deck)), - TVar (HashMap LoomId (ActorRef Loom)), - TVar (HashMap RepoId (ActorRef Repo)), - TVar (HashMap FactoryId (ActorRef Factory))] - l'0 - , H.HOccurs' - (TVar (HashMap (ActorKey a) (ActorRef a))) - l'0 - '[TVar (HashMap PersonId (ActorRef Person)), - TVar (HashMap ProjectId (ActorRef Project)), - TVar (HashMap GroupId (ActorRef Group)), - TVar (HashMap DeckId (ActorRef Deck)), - TVar (HashMap LoomId (ActorRef Loom)), - TVar (HashMap RepoId (ActorRef Repo)), - TVar (HashMap FactoryId (ActorRef Factory))] - - , ActorStage a ~ s + :: forall a ms b l . + ( ActorLaunch a, ActorStage a ~ Staje + , Hashable (ActorIdentity a) , ActorInterface a ~ ms - , Eval (Map (AdaptedHandler s) ms) + , H.HOccurs + (TVar (ActorRefMap a)) + (HList (Eval (Map ActorRefMapTVar_ (StageActors Staje)))) + , Eval (Map (AdaptedHandler Staje) ms) ~ Eval (Map - (Func (AdaptedAction s, Text)) + (Func (AdaptedAction Staje, Text)) (Eval (Map Parcel_ ms)) ) , H.SameLength' - (Eval (Map (Func (AdaptedAction s, Text)) (Eval (Map Parcel_ ms)))) + (Eval (Map (Func (AdaptedAction Staje, Text)) (Eval (Map Parcel_ ms)))) (Eval (Map (Handler_ a) ms)) , H.SameLength' (Eval (Map (Handler_ a) ms)) - (Eval (Map (Func (AdaptedAction s, Text)) (Eval (Map Parcel_ ms)))) + (Eval (Map (Func (AdaptedAction Staje, Text)) (Eval (Map Parcel_ ms)))) , Eval (Constraints (Eval (Map (AdaptHandlerConstraint a) ms))) - , Handle' (Eval (Map Parcel_ ms)) (AdaptedAction s, Text) + , Handle' (Eval (Map Parcel_ ms)) (AdaptedAction Staje, Text) , H.HMapAux - H.HList + HList (HAdaptHandler a) (Eval (Map (Handler_ a) ms)) (Eval (Map - (Func (AdaptedAction s, Text)) + (Func (AdaptedAction Staje, Text)) (Eval (Map Parcel_ ms)) ) ) + , H.HEq + (TVar (HashMap (ActorIdentity a) (Ref a))) + (TVar (HashMap (Key Person) (Ref Person))) + b + , H.HOccurrence' + b + (TVar (HashMap (ActorIdentity a) (Ref a))) + [TVar (HashMap (Key Person) (Ref Person)), + TVar (HashMap (Key Project) (Ref Project)), + TVar (HashMap (Key Group) (Ref Group)), + TVar (HashMap (Key Deck) (Ref Deck)), + TVar (HashMap (Key Loom) (Ref Loom)), + TVar (HashMap (Key Repo) (Ref Repo)), + TVar (HashMap (Key Factory) (Ref Factory))] + l + , H.HOccurs' + (TVar (HashMap (ActorIdentity a) (Ref a))) + l + [TVar (HashMap (Key Person) (Ref Person)), + TVar (HashMap (Key Project) (Ref Project)), + TVar (HashMap (Key Group) (Ref Group)), + TVar (HashMap (Key Deck) (Ref Deck)), + TVar (HashMap (Key Loom) (Ref Loom)), + TVar (HashMap (Key Repo) (Ref Repo)), + TVar (HashMap (Key Factory) (Ref Factory))] ) - => ActorKey a + => ActorIdentity a -> Act Bool -launchActor key = do - e <- askEnv - spawn key (pure e) +launchActor ident = do + env <- askEnv + let tvar = H.hOccurs (envActors env) + maybeRef <- liftIO $ HM.lookup ident <$> readTVarIO tvar + case maybeRef of + Just _ -> pure False + Nothing -> do + ref <- spawn @a ident (pure env) + liftIO $ atomically $ modifyTVar' tvar $ HM.insert ident ref + return True data RemoteRecipient = RemoteRecipient { remoteRecipientActor :: RemoteActorId @@ -796,6 +828,68 @@ data RemoteRecipient = RemoteRecipient , remoteRecipientErrorSince :: Maybe UTCTime } +--data MapAndSet_ :: Type -> Exp Type +--type instance Eval (MapAndSet_ a) = (Eval (KeyAndRef_ a), HashSet (ActorIdentity a)) + +sendVerses + :: ( Actor a + , ActorStage a ~ Staje + , ActorHasMethod a "verse" (Verse :-> Return (Either Text Text)) + , Eq (ActorIdentity a) + , H.HEq + (TVar (ActorRefMap a)) (TVar (ActorRefMap Person)) b + , H.HOccurrence' + b + (TVar (ActorRefMap a)) + [TVar (ActorRefMap Person), TVar (ActorRefMap Project), + TVar (ActorRefMap Group), TVar (ActorRefMap Deck), + TVar (ActorRefMap Loom), TVar (ActorRefMap Repo), + TVar (ActorRefMap Factory)] + l + , H.HOccurs' + (TVar (ActorRefMap a)) + l + [TVar (ActorRefMap Person), TVar (ActorRefMap Project), + TVar (ActorRefMap Group), TVar (ActorRefMap Deck), + TVar (ActorRefMap Loom), TVar (ActorRefMap Repo), + TVar (ActorRefMap Factory)] + ) + => Verse + -> (TVar (HashMap (ActorIdentity a) (Ref a)), HashSet (ActorIdentity a)) + -> Act () +sendVerses verse (tvar, s) = do + actorMap <- liftIO $ readTVarIO tvar + let refs = HM.elems $ actorMap `HM.intersection` HS.toMap s + for_ refs $ \ ref -> void $ send @"verse" ref verse + +data HSendVerses = HSendVerses Verse +instance + ( Actor a + , ActorStage a ~ Staje + , ActorHasMethod a "verse" (Verse :-> Return (Either Text Text)) + , Eq (ActorIdentity a) + , i ~ (TVar (HashMap (ActorIdentity a) (Ref a)), HashSet (ActorIdentity a)) + , H.HEq + (TVar (ActorRefMap a)) (TVar (ActorRefMap Person)) b + , H.HOccurrence' + b + (TVar (ActorRefMap a)) + [TVar (ActorRefMap Person), TVar (ActorRefMap Project), + TVar (ActorRefMap Group), TVar (ActorRefMap Deck), + TVar (ActorRefMap Loom), TVar (ActorRefMap Repo), + TVar (ActorRefMap Factory)] + l + , H.HOccurs' + (TVar (ActorRefMap a)) + l + [TVar (ActorRefMap Person), TVar (ActorRefMap Project), + TVar (ActorRefMap Group), TVar (ActorRefMap Deck), + TVar (ActorRefMap Loom), TVar (ActorRefMap Repo), + TVar (ActorRefMap Factory)] + ) => + H.ApplyAB HSendVerses i (Act ()) where + applyAB (HSendVerses verse) = sendVerses verse + -- Given a list of local recipients, which may include actors and collections, -- -- * Insert activity to message queues of live actors @@ -978,13 +1072,21 @@ sendToLocalActors authorAndId body requireOwner mauthor maidAuthor recips = do (Just (liveRecipsR, actorVerse verse)) `H.HCons` (Just (liveRecipsF, actorVerse verse)) `H.HCons` H.HNil -} - for_ liveRecipsP $ \ k -> void $ send @"verse" k verse - for_ liveRecipsJ $ \ k -> void $ send @"verse" k verse - for_ liveRecipsG $ \ k -> void $ send @"verse" k verse - for_ liveRecipsD $ \ k -> void $ send @"verse" k verse - for_ liveRecipsL $ \ k -> void $ send @"verse" k verse - for_ liveRecipsR $ \ k -> void $ send @"verse" k verse - for_ liveRecipsF $ \ k -> void $ send @"verse" k verse + let actorSets = + liveRecipsP `HCons` liveRecipsJ `HCons` liveRecipsG `HCons` + liveRecipsD `HCons` liveRecipsL `HCons` liveRecipsR `HCons` + liveRecipsF `HCons` HNil + actorMaps <- envActors <$> askEnv + {- + let sendVerses' + :: ( ActorStage a ~ Staje + , ActorHasMethod a "verse" (Verse :-> Return (Either Text Text)) + ) + => (TVar (HashMap (ActorIdentity a) (Ref a)), HashSet (ActorIdentity a)) + -> Act () + sendVerses' = sendVerses verse + -} + H.hMapM_ (HSendVerses verse) (H.hZip actorMaps actorSets) -- :: HList (Eval (Map MapAndSet_ (StageActors Staje)))) -- Return remote followers, to whom we need to deliver via HTTP return remoteFollowers diff --git a/src/Vervis/Actor/Factory.hs b/src/Vervis/Actor/Factory.hs index 73980cf..5f71227 100644 --- a/src/Vervis/Actor/Factory.hs +++ b/src/Vervis/Actor/Factory.hs @@ -19,6 +19,7 @@ module Vervis.Actor.Factory where import Control.Applicative +import Control.Concurrent.STM.TVar import Control.Exception.Base hiding (handle) import Control.Monad import Control.Monad.IO.Class @@ -45,6 +46,8 @@ import Database.Persist.Sql import Optics.Core import Yesod.Persist.Core +import qualified Data.HashMap.Strict as HM +import qualified Data.HList as H import qualified Data.Text as T import qualified Database.Esqueleto as E @@ -1060,8 +1063,12 @@ factoryCreateNew new now factoryMeID (Verse authorIdMsig body) detail = do } return ( LocalResourceDeck did - , launchActor did - , send @"init" did authorId + , launchActor @Deck did + , do tvar <- H.hOccurs <$> asksEnv envActors + actors <- liftIO $ readTVarIO tvar + case HM.lookup did actors of + Nothing -> pure False + Just ref -> send @"init" @Deck ref authorId ) NAProject -> do jid <- insert Project @@ -1070,8 +1077,12 @@ factoryCreateNew new now factoryMeID (Verse authorIdMsig body) detail = do } return ( LocalResourceProject jid - , launchActor jid - , send @"init" jid authorId + , launchActor @Project jid + , do tvar <- H.hOccurs <$> asksEnv envActors + actors <- liftIO $ readTVarIO tvar + case HM.lookup jid actors of + Nothing -> pure False + Just ref -> send @"init" @Project ref authorId ) NATeam -> do gid <- insert Group @@ -1080,8 +1091,12 @@ factoryCreateNew new now factoryMeID (Verse authorIdMsig body) detail = do } return ( LocalResourceGroup gid - , launchActor gid - , send @"init" gid authorId + , launchActor @Group gid + , do tvar <- H.hOccurs <$> asksEnv envActors + actors <- liftIO $ readTVarIO tvar + case HM.lookup gid actors of + Nothing -> pure False + Just ref -> send @"init" @Group ref authorId ) return (lr, launch, sendInit, rid) diff --git a/src/Vervis/Actor/Person/Client.hs b/src/Vervis/Actor/Person/Client.hs index 49b90ee..6a27028 100644 --- a/src/Vervis/Actor/Person/Client.hs +++ b/src/Vervis/Actor/Person/Client.hs @@ -502,7 +502,7 @@ clientCreateFactory now personMeID (ClientMsg maybeCap localRecips remoteRecips ) -- Spawn new Factory actor - success <- lift $ launchActor factoryID + success <- lift $ launchActor @Factory factoryID unless success $ error "Failed to spawn new Factory, somehow ID already in Theater" diff --git a/src/Vervis/Application.hs b/src/Vervis/Application.hs index 28c281b..487a0f3 100644 --- a/src/Vervis/Application.hs +++ b/src/Vervis/Application.hs @@ -16,6 +16,9 @@ {-# OPTIONS_GHC -fno-warn-orphans #-} +-- For HWriteTVar to work +{-# LANGUAGE UndecidableInstances #-} + {- LANGUAGE RankNTypes #-} module Vervis.Application @@ -46,6 +49,8 @@ import Control.Monad.Trans.Reader import Data.Bifunctor import Data.Default.Class import Data.Foldable +import Data.Hashable +import Data.HList (HList (..)) import Data.List import Data.List.NonEmpty (nonEmpty) import Data.Maybe @@ -99,6 +104,8 @@ import Yesod.ActivityPub import Yesod.Hashids import Yesod.MonadSite +import qualified Control.Concurrent.Actor as CCA + import Control.Concurrent.Local import Development.Git (isGitRepo) import Data.List.NonEmpty.Local @@ -162,6 +169,17 @@ mkYesodDispatch "App" resourcesApp loggingFunction :: App -> LogFunc loggingFunction app = messageLoggerSource app (appLogger app) +data HWriteTVar = HWriteTVar +instance + ( CCA.Actor a + , Eq (ActorIdentity a) + , Hashable (ActorIdentity a) + , i ~ (TVar (HM.HashMap (ActorIdentity a) (Ref a)), [(ActorIdentity a, Ref a)]) + ) => + H.ApplyAB HWriteTVar i (IO ()) where + applyAB HWriteTVar (tvar, l) = + atomically $ writeTVar tvar $ HM.fromList l + -- | This function allocates resources (such as a database connection pool), -- performs initialization and returns a foundation datatype value. This is also -- the place to put your migrate statements to have automatic database @@ -208,7 +226,8 @@ makeFoundation appSettings = do appHashidsContext appTheater appEnv - appPersonLauncher = + appPersonLauncher + appActors = App {..} -- The App {..} syntax is an example of record wild cards. For more -- information, see: @@ -221,6 +240,7 @@ makeFoundation appSettings = do (error "theater forced in tempFoundation") (error "env forced in tempFoundation") (error "launcher forced in tempFoundation") + (error "actors forced in tempFoundation") logFunc = loggingFunction tempFoundation -- Create the database connection pool @@ -235,7 +255,7 @@ makeFoundation appSettings = do hashidsSalt <- loadKeyFile loadMode $ appHashidsSaltFile appSettings let hashidsCtx = hashidsContext hashidsSalt - app = mkFoundation pool capSignKey hashidsCtx (error "theater") (error "env") (error "launcher") + app = mkFoundation pool capSignKey hashidsCtx (error "theater") (error "env") (error "launcher") (error "actors") -- Perform database migration using our application's logging settings. --runLoggingT (runSqlPool (runMigration migrateAll) pool) logFunc @@ -256,13 +276,22 @@ makeFoundation appSettings = do delivery <- do micros <- intervalMicros $ appDeliveryRetryBase appSettings startDeliveryTheater - (sitePostSignedHeaders app) micros appHttpManager logFunc delieryStateDir + "delivery-counter.sqlite3" (sitePostSignedHeaders app) micros appHttpManager logFunc delieryStateDir + actorTVars <- do + p <- newTVarIO HM.empty + j <- newTVarIO HM.empty + g <- newTVarIO HM.empty + d <- newTVarIO HM.empty + l <- newTVarIO HM.empty + r <- newTVarIO HM.empty + f <- newTVarIO HM.empty + return $ p `HCons` j `HCons` g `HCons` d `HCons` l `HCons` r `HCons` f `HCons` HNil let root = renderObjURI $ flip ObjURI topLocalURI $ appInstanceHost appSettings --render :: Yesod y => y -> Route y -> [(Text, Text)] -> Text render = yesodRender app root - env = Env appSettings pool hashidsCtx appActorKeys delivery render appHttpManager appActorFetchShare + env = Env appSettings pool hashidsCtx appActorKeys delivery render appHttpManager appActorFetchShare actorTVars actors <- flip runWorker app $ runSiteDB $ loadTheater env - theater <- startTheater logFunc actors + (theater, actorMap) <- startTheater "actor-counter.sqlite3" logFunc actors launcher <- startPersonLauncher theater env let hostString = T.unpack $ renderAuthority hLocal @@ -272,8 +301,10 @@ makeFoundation appSettings = do , configMaxCommits = 20 } + H.hMapM_ HWriteTVar (H.hZip actorTVars actorMap) + -- Return the foundation - return app { appTheater = theater, appEnv = env, appPersonLauncher = launcher } + return app { appTheater = theater, appEnv = env, appPersonLauncher = launcher, appActors = actorTVars } where verifyRepoDir = do repos <- lift reposFromDir @@ -371,7 +402,7 @@ makeFoundation appSettings = do where handle mvar = do (personID, sendResult) <- takeMVar mvar - success <- launchActorIO theater env personID + success <- launchActorIO @Person theater env personID putMVar sendResult success -- | Convert our foundation to a WAI Application by calling @toWaiAppPlain@ and @@ -446,6 +477,7 @@ sshServer foundation = (appConnPool foundation) (loggingFunction foundation) (appTheater foundation) + (H.hOccurs $ appActors foundation) mailer :: App -> IO () mailer foundation = diff --git a/src/Vervis/Foundation.hs b/src/Vervis/Foundation.hs index f483609..d9fded6 100644 --- a/src/Vervis/Foundation.hs +++ b/src/Vervis/Foundation.hs @@ -38,6 +38,7 @@ import Data.Traversable import Data.Vector (Vector) import Database.Persist.Postgresql import Database.Persist.Sql (ConnectionPool) +import Fcf (Eval, Map) import Network.HTTP.Client (Manager, HasHttpManager (..)) import Network.HTTP.Types.Header import Text.Shakespeare.Text (textFile) @@ -60,6 +61,7 @@ import Yesod.Static import qualified Data.Aeson as A import qualified Data.ByteString.Lazy as BL (ByteString) +import qualified Data.HashMap.Strict as HM import qualified Data.HashSet as HS import qualified Data.HList as H import qualified Data.Time.Units as U @@ -141,6 +143,7 @@ data App = App , appTheater :: Theater , appEnv :: StageEnv Staje , appPersonLauncher :: MVar (PersonId, MVar Bool) + , appActors :: HList (Eval (Map KeyAndRef_ (StageActors Staje))) } -- Aliases for the routes file, because it doesn't like spaces in path piece @@ -725,9 +728,14 @@ instance AccountDB AccountPersistDB' where error "Failed to spawn new Person, somehow ID already in Theater" AccountPersistDB' $ do theater <- asksSite appTheater - there <- liftIO $ sendIO' @"init" theater Proxy personID HNil - unless there $ - error "Failed to find new Person, somehow ID not in Theater" + peopleVar `HCons` _ `HCons` _ `HCons` _ `HCons` _ `HCons` _ `HCons` factoriesVar `HCons` HNil <- asksSite appActors + people <- liftIO $ readTVarIO peopleVar + case HM.lookup personID people of + Nothing -> error "Failed to find new Person, somehow ID not in appActors" + Just personRef -> do + there <- liftIO $ sendIO' @"init" theater Proxy personRef HNil + unless there $ + error "Failed to find new Person, somehow Ref not in Theater" factoryIDs <- runDB $ selectKeysList [] [] {- let package = (HS.fromList factoryIDs, FactoryMsgVerified personID) @@ -740,8 +748,12 @@ instance AccountDB AccountPersistDB' where Nothing `H.HCons` Just package `H.HCons` H.HNil -} - liftIO $ for_ factoryIDs $ \ (factoryID :: FactoryId) -> - void $ sendIO' @"verified" theater Proxy factoryID (personID `HCons` HNil) + factories <- liftIO $ readTVarIO factoriesVar + let factoryRefs = + HM.elems $ + factories `HM.intersection` HS.toMap (HS.fromList factoryIDs) + liftIO $ for_ factoryRefs $ \ (ref :: Ref Factory) -> + void $ sendIO' @"verified" theater Proxy ref (personID `HCons` HNil) setVerifyKey = (morphAPDB .) . setVerifyKey setNewPasswordKey = (morphAPDB .) . setNewPasswordKey setNewPassword = (morphAPDB .) . setNewPassword diff --git a/src/Vervis/Recipient.hs b/src/Vervis/Recipient.hs index c58445c..f84c8e2 100644 --- a/src/Vervis/Recipient.hs +++ b/src/Vervis/Recipient.hs @@ -172,7 +172,7 @@ import Vervis.Model instance WA.StageWebRoute Staje where type StageRoute Staje = Route App askUrlRenderParams = do - Env _ _ _ _ _ render _ _ <- askEnv + Env _ _ _ _ _ render _ _ _ <- askEnv case cast render of Nothing -> error "Env site isn't App" Just r -> pure r diff --git a/src/Vervis/RemoteActorStore.hs b/src/Vervis/RemoteActorStore.hs index c6cc79e..dcf6969 100644 --- a/src/Vervis/RemoteActorStore.hs +++ b/src/Vervis/RemoteActorStore.hs @@ -1,6 +1,7 @@ {- This file is part of Vervis. - - - Written in 2019, 2020, 2022, 2023 by fr33domlover . + - Written in 2019, 2020, 2022, 2023, 2024 + - by fr33domlover . - - ♡ Copying is an act of love. Please copy, reuse and share. - @@ -565,7 +566,7 @@ fetchRemoteActor' iid host luActor = do Left ers -> Just ers Right _ -> Nothing Nothing -> do - Env _ pool _ _ _ _ manager fetch <- askEnv + Env _ pool _ _ _ _ manager fetch _ <- askEnv liftIO $ runShared fetch (ObjURI host luActor) (pool, manager, iid) deleteUnusedURAs :: (MonadIO m, MonadLogger m) => ReaderT SqlBackend m () diff --git a/src/Vervis/Ssh.hs b/src/Vervis/Ssh.hs index ed7f3b0..ddd8033 100644 --- a/src/Vervis/Ssh.hs +++ b/src/Vervis/Ssh.hs @@ -19,6 +19,7 @@ module Vervis.Ssh where import Control.Applicative ((<|>), optional) +import Control.Concurrent.STM.TVar import Control.Monad (when) import Control.Monad.IO.Class (liftIO) import Control.Monad.Logger @@ -29,6 +30,7 @@ import Data.Attoparsec.Text import Data.ByteString (ByteString) import Data.ByteString.Lazy (fromStrict) import Data.Foldable (find) +import Data.HashMap.Strict (HashMap) import Data.HList (HList (..)) import Data.Maybe (isJust) import Data.Monoid ((<>)) @@ -49,6 +51,7 @@ import System.Process (CreateProcess (..), StdStream (..), createProcess, proc) import Web.Hashids import Yesod.Core.Dispatch +import qualified Data.HashMap.Strict as HM import qualified Data.Text as T import qualified Formatting as F @@ -70,7 +73,7 @@ import Vervis.Settings -- Types ------------------------------------------------------------------------------- -type ChannelBase = LoggingT (ReaderT (ConnectionPool, Theater) IO) +type ChannelBase = LoggingT (ReaderT (ConnectionPool, Theater, TVar (HashMap RepoId (Ref Repo))) IO) type SessionBase = LoggingT (ReaderT ConnectionPool IO) type UserAuthId = PersonId @@ -102,7 +105,7 @@ src = "SSH" runChanDB :: SshChanDB a -> Channel a runChanDB action = do - pool <- lift . lift $ asks fst + (pool, _, _) <- lift . lift $ ask runSqlPool action pool runSessDB :: SshSessDB a -> Session a @@ -267,12 +270,16 @@ runAction decodeRepoHash root _wantReply action = Just repoID -> whenDarcsRepoExists True repoPath $ do pid <- authId <$> askAuthDetails liftIO $ setEnv "VERVIS_SSH_USER" (show $ fromSqlKey pid) - theater <- lift . lift $ asks snd - (sendValue, waitValue) <- liftIO newReturn - _ <- liftIO $ sendIO' @"wait-during-push" theater Proxy repoID $ waitValue `HCons` HNil - executeWait "darcs" ["apply", "--all", "--repodir", repoPath] - liftIO $ sendValue () - return ARProcess + (_, theater, reposVar) <- lift . lift $ ask + repos <- liftIO $ readTVarIO reposVar + case HM.lookup repoID repos of + Nothing -> return $ ARFail "RepoId not found in map" + Just ref -> do + (sendValue, waitValue) <- liftIO newReturn + _ <- liftIO $ sendIO' @"wait-during-push" theater Proxy ref $ waitValue `HCons` HNil + executeWait "darcs" ["apply", "--all", "--repodir", repoPath] + liftIO $ sendValue () + return ARProcess Nothing -> return $ ARFail "You can't push to this repository" GitUploadPack repoHash -> do let repoPath = repoDir root repoHash @@ -294,12 +301,16 @@ runAction decodeRepoHash root _wantReply action = Just repoID -> whenGitRepoExists True repoPath $ do pid <- authId <$> askAuthDetails liftIO $ setEnv "VERVIS_SSH_USER" (show $ fromSqlKey pid) - theater <- lift . lift $ asks snd - (sendValue, waitValue) <- liftIO newReturn - _ <- liftIO $ sendIO' @"wait-during-push" theater Proxy repoID $ waitValue `HCons` HNil - executeWait "git-receive-pack" [repoPath] - liftIO $ sendValue () - return ARProcess + (_, theater, reposVar) <- lift . lift $ ask + repos <- liftIO $ readTVarIO reposVar + case HM.lookup repoID repos of + Nothing -> return $ ARFail "RepoId not found in map" + Just ref -> do + (sendValue, waitValue) <- liftIO newReturn + _ <- liftIO $ sendIO' @"wait-during-push" theater Proxy ref $ waitValue `HCons` HNil + executeWait "git-receive-pack" [repoPath] + liftIO $ sendValue () + return ARProcess Nothing -> return $ ARFail "You can't push to this repository" handle @@ -346,8 +357,9 @@ mkConfig -> ConnectionPool -> LogFunc -> Theater + -> TVar (HashMap RepoId (Ref Repo)) -> IO (Config SessionBase ChannelBase UserAuthId) -mkConfig settings ctx pool logFunc theater = do +mkConfig settings ctx pool logFunc theater reposVar = do keyPair <- keyPairFromFile $ appSshKeyFile settings return $ Config { cSession = SessionConfig @@ -360,13 +372,13 @@ mkConfig settings ctx pool logFunc theater = do , cChannel = ChannelConfig { ccRequestHandler = handle (decodeKeyHashidPure ctx) (appRepoDir settings) , ccRunBaseMonad = - flip runReaderT (pool, theater) . flip runLoggingT logFunc + flip runReaderT (pool, theater, reposVar) . flip runLoggingT logFunc } , cPort = fromIntegral $ appSshPort settings , cReadyAction = ready logFunc } -runSsh :: AppSettings -> HashidsContext -> ConnectionPool -> LogFunc -> Theater -> IO () -runSsh settings ctx pool logFunc theater = do - config <- mkConfig settings ctx pool logFunc theater +runSsh :: AppSettings -> HashidsContext -> ConnectionPool -> LogFunc -> Theater -> TVar (HashMap RepoId (Ref Repo)) -> IO () +runSsh settings ctx pool logFunc theater reposVar = do + config <- mkConfig settings ctx pool logFunc theater reposVar startConfig config diff --git a/src/Vervis/Web/Actor.hs b/src/Vervis/Web/Actor.hs index 0a9e712..2c1e8fe 100644 --- a/src/Vervis/Web/Actor.hs +++ b/src/Vervis/Web/Actor.hs @@ -256,40 +256,55 @@ getInbox'' grabInbox here getActorID hash = do ibiidString = "InboxItem #" ++ show (fromSqlKey ibid) postInbox - :: ( CCA.Actor a + :: forall a b l b0 l0 . + ( CCA.Actor a , ActorLaunch a , ActorHasMethod a "verse" (Verse :-> Return (Either Text Text)) - --, Eval (LookupSig "verse" (ActorInterface a)) - -- ~ - -- Just (Verse :-> Return (Either Text Text)) - , ActorKey a ~ Key a + , ActorIdentity a ~ Key a , Eq (Key a) , Hashable (Key a) , H.HEq - (TVar (M.HashMap (Key a) (ActorRef a))) - (TVar (M.HashMap PersonId (ActorRef Person))) - b0 + (TVar (M.HashMap (Key a) (Ref a))) + (TVar (M.HashMap (Key Person) (Ref Person))) + b + , H.HOccurrence' + b + (TVar (M.HashMap (Key a) (Ref a))) + [TVar (M.HashMap (Key Person) (Ref Person)), + TVar (M.HashMap (Key Project) (Ref Project)), + TVar (M.HashMap (Key Group) (Ref Group)), + TVar (M.HashMap (Key Deck) (Ref Deck)), + TVar (M.HashMap (Key Loom) (Ref Loom)), + TVar (M.HashMap (Key Repo) (Ref Repo)), + TVar (M.HashMap (Key Factory) (Ref Factory))] + l + , H.HOccurs' + (TVar (M.HashMap (Key a) (Ref a))) + l + [TVar (M.HashMap (Key Person) (Ref Person)), + TVar (M.HashMap (Key Project) (Ref Project)), + TVar (M.HashMap (Key Group) (Ref Group)), + TVar (M.HashMap (Key Deck) (Ref Deck)), + TVar (M.HashMap (Key Loom) (Ref Loom)), + TVar (M.HashMap (Key Repo) (Ref Repo)), + TVar (M.HashMap (Key Factory) (Ref Factory))] + , H.HEq + (TVar (ActorRefMap a)) (TVar (ActorRefMap Person)) b0 , H.HOccurrence' b0 - (TVar (M.HashMap (Key a) (ActorRef a))) - '[TVar (M.HashMap PersonId (ActorRef Person)), - TVar (M.HashMap ProjectId (ActorRef Project)), - TVar (M.HashMap GroupId (ActorRef Group)), - TVar (M.HashMap DeckId (ActorRef Deck)), - TVar (M.HashMap LoomId (ActorRef Loom)), - TVar (M.HashMap RepoId (ActorRef Vervis.Model.Repo)), - TVar (M.HashMap FactoryId (ActorRef Factory))] - l'0 + (TVar (ActorRefMap a)) + [TVar (ActorRefMap Person), TVar (ActorRefMap Project), + TVar (ActorRefMap Group), TVar (ActorRefMap Deck), + TVar (ActorRefMap Loom), TVar (ActorRefMap Repo), + TVar (ActorRefMap Factory)] + l0 , H.HOccurs' - (TVar (M.HashMap (Key a) (ActorRef a))) - l'0 - '[TVar (M.HashMap PersonId (ActorRef Person)), - TVar (M.HashMap ProjectId (ActorRef Project)), - TVar (M.HashMap GroupId (ActorRef Group)), - TVar (M.HashMap DeckId (ActorRef Deck)), - TVar (M.HashMap LoomId (ActorRef Loom)), - TVar (M.HashMap RepoId (ActorRef Vervis.Model.Repo)), - TVar (M.HashMap FactoryId (ActorRef Factory))] + (TVar (ActorRefMap a)) + l0 + [TVar (ActorRefMap Person), TVar (ActorRefMap Project), + TVar (ActorRefMap Group), TVar (ActorRefMap Deck), + TVar (ActorRefMap Loom), TVar (ActorRefMap Repo), + TVar (ActorRefMap Factory)] ) => (Key a -> LocalActorBy Key) -> Key a -> Handler () postInbox toLA recipID = do @@ -319,8 +334,14 @@ postInbox toLA recipID = do recipByHash <- hashLocalActor recipByKey msig <- checkForwarding recipByHash return (author, luActivity, msig) + ref <- lift $ do + tvar <- H.hOccurs <$> getsYesod appActors + actors <- liftIO $ readTVarIO tvar + case M.lookup recipID actors of + Nothing -> notFound + Just (ref :: Ref a) -> pure ref theater <- getsYesod appTheater - r <- liftIO $ callIO' @"verse" theater Proxy recipID $ Verse authorIdMsig body `HCons` HNil + r <- liftIO $ callIO' @"verse" @a theater Proxy ref $ Verse authorIdMsig body `HCons` HNil case r of Nothing -> notFound Just (Left e) -> throwE e diff --git a/src/Web/Actor/Deliver.hs b/src/Web/Actor/Deliver.hs index 7251dcc..1b7b1a0 100644 --- a/src/Web/Actor/Deliver.hs +++ b/src/Web/Actor/Deliver.hs @@ -31,6 +31,8 @@ module Web.Actor.Deliver ) where +import Control.Concurrent.STM +import Control.Concurrent.STM.TVar import Control.Exception.Base hiding (handle) import Control.Monad import Control.Monad.IO.Class @@ -41,6 +43,7 @@ import Control.Retry import Data.ByteString (ByteString) import Data.Foldable import Data.Hashable +import Data.HashMap.Strict (HashMap) import Data.HList (HList (..)) import Data.List import Data.List.NonEmpty (NonEmpty) @@ -61,6 +64,7 @@ import Web.Hashids import qualified Data.Aeson as A import qualified Data.ByteString.Lazy as BL +import qualified Data.HashMap.Strict as HM import qualified Data.HashSet as HS import qualified Data.HList as H import qualified Data.Text as T @@ -87,8 +91,8 @@ data DeliveryActor u data DeliveryStage u instance UriMode u => Actor (DeliveryActor u) where - type ActorStage (DeliveryActor u) = DeliveryStage u - type ActorKey (DeliveryActor u) = ObjURI u + type ActorStage (DeliveryActor u) = DeliveryStage u + type ActorIdentity (DeliveryActor u) = ObjURI u type ActorInterface (DeliveryActor u) = [ "deliver-local" ::: AP.Envelope u :-> Bool :-> Return () , "forward-remote" ::: AP.Errand u :-> Return () @@ -114,6 +118,7 @@ instance UriMode u => Stage (DeliveryStage u) where , envInit :: (Manager, NonEmpty HeaderName, Int) } type StageActors (DeliveryStage u) = '[DeliveryActor u] + type StageSpawn (DeliveryStage u) = AllowSpawn {- migrations :: [Migration SqlBackend IO] @@ -138,6 +143,7 @@ data DeliveryTheater u = DeliveryTheater , _dtLog :: LogFunc , _dtDir :: OsPath , _dtTheater :: TheaterFor (DeliveryStage u) + , _dtMap :: TVar (HashMap (ObjURI u) (Ref (DeliveryActor u))) } data IdMismatch = IdMismatch deriving Show @@ -219,13 +225,14 @@ decodeUtf = pure startDeliveryTheater :: UriMode u - => NonEmpty HeaderName + => FilePath + -> NonEmpty HeaderName -> Int -> Manager -> LogFunc -> OsPath -> IO (DeliveryTheater u) -startDeliveryTheater headers micros manager logFunc dbRootDir = do +startDeliveryTheater avarBoxPath headers micros manager logFunc dbRootDir = do -- We first add the sqlite3 extension as needed entries <- listDirectory dbRootDir @@ -249,7 +256,9 @@ startDeliveryTheater headers micros manager logFunc dbRootDir = do Right uri -> return uri env <- mkEnv (manager, headers, micros) logFunc (dbRootDir path) return (u, env) - DeliveryTheater manager headers micros logFunc dbRootDir <$> startTheater logFunc (actors `H.HCons` H.HNil) + (theater, actorMap `HCons` HNil) <- startTheater avarBoxPath logFunc (actors `H.HCons` H.HNil) + actorMapVar <- newTVarIO $ HM.fromList actorMap + return $ DeliveryTheater manager headers micros logFunc dbRootDir theater actorMapVar data DeliveryMethod u = MethodDeliverLocal (AP.Envelope u) Bool @@ -257,18 +266,26 @@ data DeliveryMethod u -- Since sendManyIO isn't available right now, we're using many sendIO sendHttp :: UriMode u => DeliveryTheater u -> DeliveryMethod u -> [ObjURI u] -> IO () -sendHttp (DeliveryTheater manager headers micros logFunc root theater) method recips = +sendHttp (DeliveryTheater manager headers micros logFunc root theater actorMapVar) method recips = case method of MethodDeliverLocal envelope fwd -> for_ recips $ \ u -> do - void $ spawnIO theater u (makeEnv u) - void $ sendIO' @"deliver-local" theater Proxy u $ envelope `HCons` fwd `HCons` HNil + ref <- getRef u + void $ sendIO' @"deliver-local" theater Proxy ref $ envelope `HCons` fwd `HCons` HNil MethodForwardRemote errand -> for_ recips $ \ u -> do - void $ spawnIO theater u (makeEnv u) - void $ sendIO' @"forward-remote" theater Proxy u $ errand `HCons` HNil + ref <- getRef u + void $ sendIO' @"forward-remote" theater Proxy ref $ errand `HCons` HNil where makeEnv u = either throwIO pure (TE.decodeUtf8' $ urlEncode False $ TE.encodeUtf8 $ renderObjURI u) >>= encodeUtf . (<.> "sqlite3") . (root ) . T.unpack >>= mkEnv (manager, headers, micros) logFunc + getRef u = do + mref <- HM.lookup u <$> readTVarIO actorMapVar + case mref of + Just r -> pure r + Nothing -> do + r <- spawnIO theater u (makeEnv u) + atomically $ modifyTVar' actorMapVar $ HM.insert u r + return r