Make push-to-repo SSH events sequential via the Repo actor

Until now, `Vervis.Ssh` was freely running git/darcs commands,
regardless of what the `Repo` actor is doing at the same time. It means
that in the `Repo` actor implementation, every repo manipulation must
take into account the possibility for a simultaneous push.

This commit gives more control, clarity and safety to the `Repo` actor.
Since moving the push logic itself to the `Repo` actor would be
cumbersome and complicated, the push logic remains in `Vervis.Ssh`, but
some thingss happen differently:

1. Before running the git/darcs command that handles the push, it waits
   for the `Repo` actor to be available (as if pushing is one of the
   `Repo` actor's methods)
2. In the method handler, the `Repo` actor waits for `Vervis.Ssh` to say
   that the push handler is done
3. When the git/darcs command returns, `Vervis.Ssh` sends the
   notification to release the `Repo` actor from the waiting

So, from now on, `Repo` and `Loom` code, in particular code that handles PRs,
can be sure no simultaneous pushing will happen.
This commit is contained in:
Pere Lev 2024-07-29 23:06:42 +03:00
parent ea463703b5
commit a74b24f61a
No known key found for this signature in database
GPG key ID: 5252C5C863E5E57D
4 changed files with 69 additions and 32 deletions

View file

@ -42,7 +42,7 @@ module Control.Concurrent.Actor
, startTheater
, callIO
, call
--, sendIO
, sendIO
, send
, sendManyIO
, sendMany

View file

@ -502,7 +502,7 @@ instance Actor Repo where
type ActorStage Repo = Staje
type ActorKey Repo = RepoId
type ActorReturn Repo = Either Text Text
data ActorMessage Repo = MsgR Verse
data ActorMessage Repo = MsgR (Either Verse (IO ()))
instance Actor Project where
type ActorStage Project = Staje
type ActorKey Project = ProjectId
@ -533,8 +533,11 @@ instance VervisActor Loom where
actorVerse = MsgL
toVerse (MsgL v) = Just v
instance VervisActor Repo where
actorVerse = MsgR
toVerse (MsgR v) = Just v
actorVerse = MsgR . Left
toVerse (MsgR e) =
case e of
Left v -> Just v
Right _ -> Nothing
instance Stage Staje where
data StageEnv Staje = forall y. (Typeable y, Yesod y) => Env
@ -575,8 +578,10 @@ instance Message (ActorMessage Loom) where
summarize (MsgL verse) = summarizeVerse verse
refer (MsgL verse) = referVerse verse
instance Message (ActorMessage Repo) where
summarize (MsgR verse) = summarizeVerse verse
refer (MsgR verse) = referVerse verse
summarize (MsgR (Left verse)) = summarizeVerse verse
summarize (MsgR (Right _)) = "WaitPushCompletion"
refer (MsgR (Left verse)) = referVerse verse
refer (MsgR (Right _)) = "WaitPushCompletion"
instance Message (ActorMessage Project) where
summarize (MsgJ verse) = summarizeVerse verse
refer (MsgJ verse) = referVerse verse

View file

@ -54,9 +54,12 @@ import Vervis.Persist.Discussion
import Vervis.Ticket
repoBehavior :: UTCTime -> RepoId -> ActorMessage Repo -> ActE (Text, Act (), Next)
repoBehavior now repoID (MsgR _verse@(Verse _authorIdMsig body)) =
repoBehavior now repoID (MsgR (Left _verse@(Verse _authorIdMsig body))) =
case AP.activitySpecific $ actbActivity body of
_ -> throwE "Unsupported activity type for Repo"
repoBehavior _now _repoID (MsgR (Right waitValue)) = do
liftIO waitValue
done "Waited for push to complete"
instance VervisActorLaunch Repo where
actorBehavior' now repoID ve = do

View file

@ -24,7 +24,7 @@ import Control.Monad.IO.Class (liftIO)
import Control.Monad.Logger
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Maybe (MaybeT (MaybeT, runMaybeT))
import Control.Monad.Trans.Reader (ReaderT (runReaderT), ask)
import Control.Monad.Trans.Reader (ReaderT (runReaderT), ask, asks)
import Data.Attoparsec.Text
import Data.ByteString (ByteString)
import Data.ByteString.Lazy (fromStrict)
@ -50,6 +50,8 @@ import Yesod.Core.Dispatch
import qualified Data.Text as T
import qualified Formatting as F
import Control.Concurrent.Actor
import Control.Concurrent.Return
import Yesod.Hashids
import Data.Git.Local
@ -66,12 +68,12 @@ import Vervis.Settings
-- Types
-------------------------------------------------------------------------------
type ChannelBase = LoggingT (ReaderT ConnectionPool IO)
type ChannelBase = LoggingT (ReaderT (ConnectionPool, Theater) IO)
type SessionBase = LoggingT (ReaderT ConnectionPool IO)
type UserAuthId = PersonId
type Channel = ChannelT UserAuthId ChannelBase
type Session = SessionT SessionBase UserAuthId ChannelBase
type Session = SessionT ChannelBase UserAuthId SessionBase
type SshChanDB = SqlPersistT Channel
type SshSessDB = SqlPersistT Session
@ -98,7 +100,7 @@ src = "SSH"
runChanDB :: SshChanDB a -> Channel a
runChanDB action = do
pool <- lift . lift $ ask
pool <- lift . lift $ asks fst
runSqlPool action pool
runSessDB :: SshSessDB a -> Session a
@ -178,8 +180,8 @@ detectAction (Execute s) =
Right action -> Right action
detectAction _ = Left "Unsupported channel request"
execute :: FilePath -> [String] -> Channel ()
execute cmd args = do
execute' :: Bool -> FilePath -> [String] -> Channel ()
execute' wait cmd args = do
lift $ $logDebugS src $
F.sformat ("Executing " % F.string % " " % F.shown) cmd args
let config = (proc cmd args)
@ -191,7 +193,15 @@ execute cmd args = do
verifyPipe (Just h) = h
verifyPipes (mIn, mOut, mErr, ph) =
(verifyPipe mIn, verifyPipe mOut, verifyPipe mErr, ph)
spawnProcess $ verifyPipes <$> createProcess config
if wait
then spawnProcessWait $ verifyPipes <$> createProcess config
else spawnProcess $ verifyPipes <$> createProcess config
execute :: FilePath -> [String] -> Channel ()
execute = execute' False
executeWait :: FilePath -> [String] -> Channel ()
executeWait = execute' True
whenRepoExists
:: Text
@ -242,17 +252,26 @@ runAction decodeRepoHash root _wantReply action =
return ARProcess
DarcsApply repoHash -> do
let repoPath = repoDir root repoHash
can <-
maybeCan <-
case decodeRepoHash repoHash of
Nothing -> return False
Just repoID -> canPushTo repoID
Nothing -> return Nothing
Just repoID -> do
can <- canPushTo repoID
return $
if can
then whenDarcsRepoExists True repoPath $ do
then Just repoID
else Nothing
case maybeCan of
Just repoID -> whenDarcsRepoExists True repoPath $ do
pid <- authId <$> askAuthDetails
liftIO $ setEnv "VERVIS_SSH_USER" (show $ fromSqlKey pid)
execute "darcs" ["apply", "--all", "--repodir", repoPath]
theater <- lift . lift $ asks snd
(sendValue, waitValue) <- liftIO newReturn
_ <- liftIO $ sendIO theater repoID $ MsgR $ Right waitValue
executeWait "darcs" ["apply", "--all", "--repodir", repoPath]
liftIO $ sendValue ()
return ARProcess
else return $ ARFail "You can't push to this repository"
Nothing -> return $ ARFail "You can't push to this repository"
GitUploadPack repoHash -> do
let repoPath = repoDir root repoHash
whenGitRepoExists False repoPath $ do
@ -260,17 +279,26 @@ runAction decodeRepoHash root _wantReply action =
return ARProcess
GitReceivePack repoHash -> do
let repoPath = repoDir root repoHash
can <-
maybeCan <-
case decodeRepoHash repoHash of
Nothing -> return False
Just repoID -> canPushTo repoID
Nothing -> return Nothing
Just repoID -> do
can <- canPushTo repoID
return $
if can
then whenGitRepoExists True repoPath $ do
then Just repoID
else Nothing
case maybeCan of
Just repoID -> whenGitRepoExists True repoPath $ do
pid <- authId <$> askAuthDetails
liftIO $ setEnv "VERVIS_SSH_USER" (show $ fromSqlKey pid)
execute "git-receive-pack" [repoPath]
theater <- lift . lift $ asks snd
(sendValue, waitValue) <- liftIO newReturn
_ <- liftIO $ sendIO theater repoID $ MsgR $ Right waitValue
executeWait "git-receive-pack" [repoPath]
liftIO $ sendValue ()
return ARProcess
else return $ ARFail "You can't push to this repository"
Nothing -> return $ ARFail "You can't push to this repository"
handle
:: (KeyHashid Repo -> Maybe RepoId)
@ -315,8 +343,9 @@ mkConfig
-> HashidsContext
-> ConnectionPool
-> LogFunc
-> Theater
-> IO (Config SessionBase ChannelBase UserAuthId)
mkConfig settings ctx pool logFunc = do
mkConfig settings ctx pool logFunc theater = do
keyPair <- keyPairFromFile $ appSshKeyFile settings
return $ Config
{ cSession = SessionConfig
@ -329,13 +358,13 @@ mkConfig settings ctx pool logFunc = do
, cChannel = ChannelConfig
{ ccRequestHandler = handle (decodeKeyHashidPure ctx) (appRepoDir settings)
, ccRunBaseMonad =
flip runReaderT pool . flip runLoggingT logFunc
flip runReaderT (pool, theater) . flip runLoggingT logFunc
}
, cPort = fromIntegral $ appSshPort settings
, cReadyAction = ready logFunc
}
runSsh :: AppSettings -> HashidsContext -> ConnectionPool -> LogFunc -> Theater -> IO ()
runSsh settings ctx pool logFunc _theater = do
config <- mkConfig settings ctx pool logFunc
runSsh settings ctx pool logFunc theater = do
config <- mkConfig settings ctx pool logFunc theater
startConfig config