From a74b24f61a0cb190b199a4dd6907c7078f36f6a9 Mon Sep 17 00:00:00 2001 From: Pere Lev Date: Mon, 29 Jul 2024 23:06:42 +0300 Subject: [PATCH] Make push-to-repo SSH events sequential via the Repo actor Until now, `Vervis.Ssh` was freely running git/darcs commands, regardless of what the `Repo` actor is doing at the same time. It means that in the `Repo` actor implementation, every repo manipulation must take into account the possibility for a simultaneous push. This commit gives more control, clarity and safety to the `Repo` actor. Since moving the push logic itself to the `Repo` actor would be cumbersome and complicated, the push logic remains in `Vervis.Ssh`, but some thingss happen differently: 1. Before running the git/darcs command that handles the push, it waits for the `Repo` actor to be available (as if pushing is one of the `Repo` actor's methods) 2. In the method handler, the `Repo` actor waits for `Vervis.Ssh` to say that the push handler is done 3. When the git/darcs command returns, `Vervis.Ssh` sends the notification to release the `Repo` actor from the waiting So, from now on, `Repo` and `Loom` code, in particular code that handles PRs, can be sure no simultaneous pushing will happen. --- src/Control/Concurrent/Actor.hs | 2 +- src/Vervis/Actor.hs | 15 ++++--- src/Vervis/Actor/Repo.hs | 5 ++- src/Vervis/Ssh.hs | 79 ++++++++++++++++++++++----------- 4 files changed, 69 insertions(+), 32 deletions(-) diff --git a/src/Control/Concurrent/Actor.hs b/src/Control/Concurrent/Actor.hs index 7cb9675..f714d9a 100644 --- a/src/Control/Concurrent/Actor.hs +++ b/src/Control/Concurrent/Actor.hs @@ -42,7 +42,7 @@ module Control.Concurrent.Actor , startTheater , callIO , call - --, sendIO + , sendIO , send , sendManyIO , sendMany diff --git a/src/Vervis/Actor.hs b/src/Vervis/Actor.hs index 9149e8f..598bbfd 100644 --- a/src/Vervis/Actor.hs +++ b/src/Vervis/Actor.hs @@ -502,7 +502,7 @@ instance Actor Repo where type ActorStage Repo = Staje type ActorKey Repo = RepoId type ActorReturn Repo = Either Text Text - data ActorMessage Repo = MsgR Verse + data ActorMessage Repo = MsgR (Either Verse (IO ())) instance Actor Project where type ActorStage Project = Staje type ActorKey Project = ProjectId @@ -533,8 +533,11 @@ instance VervisActor Loom where actorVerse = MsgL toVerse (MsgL v) = Just v instance VervisActor Repo where - actorVerse = MsgR - toVerse (MsgR v) = Just v + actorVerse = MsgR . Left + toVerse (MsgR e) = + case e of + Left v -> Just v + Right _ -> Nothing instance Stage Staje where data StageEnv Staje = forall y. (Typeable y, Yesod y) => Env @@ -575,8 +578,10 @@ instance Message (ActorMessage Loom) where summarize (MsgL verse) = summarizeVerse verse refer (MsgL verse) = referVerse verse instance Message (ActorMessage Repo) where - summarize (MsgR verse) = summarizeVerse verse - refer (MsgR verse) = referVerse verse + summarize (MsgR (Left verse)) = summarizeVerse verse + summarize (MsgR (Right _)) = "WaitPushCompletion" + refer (MsgR (Left verse)) = referVerse verse + refer (MsgR (Right _)) = "WaitPushCompletion" instance Message (ActorMessage Project) where summarize (MsgJ verse) = summarizeVerse verse refer (MsgJ verse) = referVerse verse diff --git a/src/Vervis/Actor/Repo.hs b/src/Vervis/Actor/Repo.hs index fc133fe..17bd00f 100644 --- a/src/Vervis/Actor/Repo.hs +++ b/src/Vervis/Actor/Repo.hs @@ -54,9 +54,12 @@ import Vervis.Persist.Discussion import Vervis.Ticket repoBehavior :: UTCTime -> RepoId -> ActorMessage Repo -> ActE (Text, Act (), Next) -repoBehavior now repoID (MsgR _verse@(Verse _authorIdMsig body)) = +repoBehavior now repoID (MsgR (Left _verse@(Verse _authorIdMsig body))) = case AP.activitySpecific $ actbActivity body of _ -> throwE "Unsupported activity type for Repo" +repoBehavior _now _repoID (MsgR (Right waitValue)) = do + liftIO waitValue + done "Waited for push to complete" instance VervisActorLaunch Repo where actorBehavior' now repoID ve = do diff --git a/src/Vervis/Ssh.hs b/src/Vervis/Ssh.hs index fb468da..cbb7bc3 100644 --- a/src/Vervis/Ssh.hs +++ b/src/Vervis/Ssh.hs @@ -24,7 +24,7 @@ import Control.Monad.IO.Class (liftIO) import Control.Monad.Logger import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Maybe (MaybeT (MaybeT, runMaybeT)) -import Control.Monad.Trans.Reader (ReaderT (runReaderT), ask) +import Control.Monad.Trans.Reader (ReaderT (runReaderT), ask, asks) import Data.Attoparsec.Text import Data.ByteString (ByteString) import Data.ByteString.Lazy (fromStrict) @@ -50,6 +50,8 @@ import Yesod.Core.Dispatch import qualified Data.Text as T import qualified Formatting as F +import Control.Concurrent.Actor +import Control.Concurrent.Return import Yesod.Hashids import Data.Git.Local @@ -66,12 +68,12 @@ import Vervis.Settings -- Types ------------------------------------------------------------------------------- -type ChannelBase = LoggingT (ReaderT ConnectionPool IO) +type ChannelBase = LoggingT (ReaderT (ConnectionPool, Theater) IO) type SessionBase = LoggingT (ReaderT ConnectionPool IO) type UserAuthId = PersonId type Channel = ChannelT UserAuthId ChannelBase -type Session = SessionT SessionBase UserAuthId ChannelBase +type Session = SessionT ChannelBase UserAuthId SessionBase type SshChanDB = SqlPersistT Channel type SshSessDB = SqlPersistT Session @@ -98,7 +100,7 @@ src = "SSH" runChanDB :: SshChanDB a -> Channel a runChanDB action = do - pool <- lift . lift $ ask + pool <- lift . lift $ asks fst runSqlPool action pool runSessDB :: SshSessDB a -> Session a @@ -178,8 +180,8 @@ detectAction (Execute s) = Right action -> Right action detectAction _ = Left "Unsupported channel request" -execute :: FilePath -> [String] -> Channel () -execute cmd args = do +execute' :: Bool -> FilePath -> [String] -> Channel () +execute' wait cmd args = do lift $ $logDebugS src $ F.sformat ("Executing " % F.string % " " % F.shown) cmd args let config = (proc cmd args) @@ -191,7 +193,15 @@ execute cmd args = do verifyPipe (Just h) = h verifyPipes (mIn, mOut, mErr, ph) = (verifyPipe mIn, verifyPipe mOut, verifyPipe mErr, ph) - spawnProcess $ verifyPipes <$> createProcess config + if wait + then spawnProcessWait $ verifyPipes <$> createProcess config + else spawnProcess $ verifyPipes <$> createProcess config + +execute :: FilePath -> [String] -> Channel () +execute = execute' False + +executeWait :: FilePath -> [String] -> Channel () +executeWait = execute' True whenRepoExists :: Text @@ -242,17 +252,26 @@ runAction decodeRepoHash root _wantReply action = return ARProcess DarcsApply repoHash -> do let repoPath = repoDir root repoHash - can <- + maybeCan <- case decodeRepoHash repoHash of - Nothing -> return False - Just repoID -> canPushTo repoID - if can - then whenDarcsRepoExists True repoPath $ do + Nothing -> return Nothing + Just repoID -> do + can <- canPushTo repoID + return $ + if can + then Just repoID + else Nothing + case maybeCan of + Just repoID -> whenDarcsRepoExists True repoPath $ do pid <- authId <$> askAuthDetails liftIO $ setEnv "VERVIS_SSH_USER" (show $ fromSqlKey pid) - execute "darcs" ["apply", "--all", "--repodir", repoPath] + theater <- lift . lift $ asks snd + (sendValue, waitValue) <- liftIO newReturn + _ <- liftIO $ sendIO theater repoID $ MsgR $ Right waitValue + executeWait "darcs" ["apply", "--all", "--repodir", repoPath] + liftIO $ sendValue () return ARProcess - else return $ ARFail "You can't push to this repository" + Nothing -> return $ ARFail "You can't push to this repository" GitUploadPack repoHash -> do let repoPath = repoDir root repoHash whenGitRepoExists False repoPath $ do @@ -260,17 +279,26 @@ runAction decodeRepoHash root _wantReply action = return ARProcess GitReceivePack repoHash -> do let repoPath = repoDir root repoHash - can <- + maybeCan <- case decodeRepoHash repoHash of - Nothing -> return False - Just repoID -> canPushTo repoID - if can - then whenGitRepoExists True repoPath $ do + Nothing -> return Nothing + Just repoID -> do + can <- canPushTo repoID + return $ + if can + then Just repoID + else Nothing + case maybeCan of + Just repoID -> whenGitRepoExists True repoPath $ do pid <- authId <$> askAuthDetails liftIO $ setEnv "VERVIS_SSH_USER" (show $ fromSqlKey pid) - execute "git-receive-pack" [repoPath] + theater <- lift . lift $ asks snd + (sendValue, waitValue) <- liftIO newReturn + _ <- liftIO $ sendIO theater repoID $ MsgR $ Right waitValue + executeWait "git-receive-pack" [repoPath] + liftIO $ sendValue () return ARProcess - else return $ ARFail "You can't push to this repository" + Nothing -> return $ ARFail "You can't push to this repository" handle :: (KeyHashid Repo -> Maybe RepoId) @@ -315,8 +343,9 @@ mkConfig -> HashidsContext -> ConnectionPool -> LogFunc + -> Theater -> IO (Config SessionBase ChannelBase UserAuthId) -mkConfig settings ctx pool logFunc = do +mkConfig settings ctx pool logFunc theater = do keyPair <- keyPairFromFile $ appSshKeyFile settings return $ Config { cSession = SessionConfig @@ -329,13 +358,13 @@ mkConfig settings ctx pool logFunc = do , cChannel = ChannelConfig { ccRequestHandler = handle (decodeKeyHashidPure ctx) (appRepoDir settings) , ccRunBaseMonad = - flip runReaderT pool . flip runLoggingT logFunc + flip runReaderT (pool, theater) . flip runLoggingT logFunc } , cPort = fromIntegral $ appSshPort settings , cReadyAction = ready logFunc } runSsh :: AppSettings -> HashidsContext -> ConnectionPool -> LogFunc -> Theater -> IO () -runSsh settings ctx pool logFunc _theater = do - config <- mkConfig settings ctx pool logFunc +runSsh settings ctx pool logFunc theater = do + config <- mkConfig settings ctx pool logFunc theater startConfig config