diff --git a/hbs2-core/lib/HBS2/Net/Messaging/Pipe.hs b/hbs2-core/lib/HBS2/Net/Messaging/Pipe.hs index 8ecaf7ee..78c579b1 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/Pipe.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/Pipe.hs @@ -20,6 +20,7 @@ import Data.Hashable import Network.ByteOrder hiding (ByteString) import System.IO.Unsafe (unsafePerformIO) import System.Posix.IO + import UnliftIO -- define new transport protocol type @@ -53,6 +54,7 @@ newMessagingPipe (pIn,pOut) = do instance Hashable PipeAddr where hashWithSalt salt (PipeAddr pip) = hashWithSalt salt ("pipe-addr", fd) where + -- FIXME: ASAP-unsafePerformIO-is-really-unsafe fd = unsafePerformIO (handleToFd pip <&> fromIntegral @_ @Word) instance HasPeer PIPE where @@ -85,10 +87,14 @@ instance Messaging MessagingPipe PIPE ByteString where runMessagingPipe :: MonadIO m => MessagingPipe -> m () runMessagingPipe bus = liftIO do fix \next -> do - frame <- LBS.hGet who 4 <&> word32 . LBS.toStrict - piece <- LBS.hGet who (fromIntegral frame) - atomically (writeTQueue (inQ bus) piece) - next + done <- hIsEOF who + unless done do + r <- try @_ @SomeException do + frame <- LBS.hGet who 4 <&> word32 . LBS.toStrict + piece <- LBS.hGet who (fromIntegral frame) + atomically (writeTQueue (inQ bus) piece) + + either (const $ pure ()) (const next) r where who = pipeIn bus diff --git a/hbs2-git/hbs2-git-oracle/lib/HBS2/Git/Oracle/Facts.hs b/hbs2-git/hbs2-git-oracle/lib/HBS2/Git/Oracle/Facts.hs index 2df02ae4..39d6caac 100644 --- a/hbs2-git/hbs2-git-oracle/lib/HBS2/Git/Oracle/Facts.hs +++ b/hbs2-git/hbs2-git-oracle/lib/HBS2/Git/Oracle/Facts.hs @@ -30,6 +30,11 @@ newtype GitLwwSeq = GitLwwSeq Word64 deriving stock (Generic,Data) deriving newtype (ToField) + +newtype GitRepoHeadSeq = GitRepoHeadSeq Word64 + deriving stock (Generic,Data) + deriving newtype (ToField) + newtype GitRefLog = GitRefLog (RefLogKey HBS2Basic) deriving stock (Generic,Data) deriving newtype (ToField) @@ -58,15 +63,16 @@ data Facts data GitRepoFacts = GitRepoFacts - { gitLwwRef :: GitLwwRef - , gitLwwSeq :: GitLwwSeq - , gitRefLog :: GitRefLog - , gitTx :: GitTx - , gitRepoHead :: GitRepoHeadRef - , gitName :: GitName - , gitBrief :: GitBrief - , gitEncrypted :: GitEncrypted - , gitExtended :: [GitRepoExtended] + { gitLwwRef :: GitLwwRef + , gitLwwSeq :: GitLwwSeq + , gitRefLog :: GitRefLog + , gitTx :: GitTx + , gitRepoHead :: GitRepoHeadRef + , gitRepoHeadSeq :: GitRepoHeadSeq + , gitName :: GitName + , gitBrief :: GitBrief + , gitEncrypted :: GitEncrypted + , gitExtended :: [GitRepoExtended] } deriving stock (Generic,Data) @@ -80,6 +86,7 @@ instance Serialise GitName instance Serialise GitBrief instance Serialise GitRepoExtended instance Serialise GitEncrypted +instance Serialise GitRepoHeadSeq instance ToField HashRef where toField = toField @String . show . pretty @@ -129,5 +136,6 @@ instance HasColumnName GitBrief where instance HasColumnName GitEncrypted where columnName = "gk" - +instance HasColumnName GitRepoHeadSeq where + columnName = "repoheadseq" diff --git a/hbs2-git/hbs2-git-oracle/lib/HBS2/Git/Oracle/Run.hs b/hbs2-git/hbs2-git-oracle/lib/HBS2/Git/Oracle/Run.hs index cd6cf996..7313ff5a 100644 --- a/hbs2-git/hbs2-git-oracle/lib/HBS2/Git/Oracle/Run.hs +++ b/hbs2-git/hbs2-git-oracle/lib/HBS2/Git/Oracle/Run.hs @@ -41,6 +41,10 @@ import System.Process.Typed import Text.InterpolatedString.Perl6 (qc) import System.Environment (getProgName, getArgs) +import System.Posix.Signals + +import System.Exit + {- HLINT ignore "Functor law" -} runOracleIndex :: forall m . MonadUnliftIO m @@ -113,6 +117,7 @@ runOracleIndex auPk = do (GitRefLog rk) (GitTx tx) (GitRepoHeadRef rhh) + (GitRepoHeadSeq (fromIntegral n)) (GitName (Just name)) (GitBrief (Just brief)) (GitEncrypted _repoHeadGK0) @@ -153,16 +158,17 @@ runDump pks = do flip runContT pure do p <- ContT $ withProcessWait cmd + -- p <- lift $ startProcess cmd let ssin = getStdin p let sout = getStdout p client <- newMessagingPipe (sout,ssin) -- ,sout) - void $ ContT $ withAsync $ runMessagingPipe client + mess <- ContT $ bracket (async $ runMessagingPipe client) cancel caller <- makeServiceCaller @BrowserPluginAPI @PIPE (localPeer client) - void $ ContT $ withAsync $ liftIO $ runReaderT (runServiceClient caller) client + broker <- ContT $ bracket (async $ liftIO $ runReaderT (runServiceClient caller) client) cancel wtf <- callService @RpcChannelQuery caller () >>= orThrowUser "can't query rpc" @@ -173,6 +179,19 @@ runDump pks = do liftIO $ LBS.putStr (A.encodePretty val) + hClose ssin + hClose sout + + waitExitCode p + + debug "CLIENT: WTF?" + + -- stopProcess p + -- error "MOTHERFUCKER!" + -- void $ callService @RpcChannelQuery caller () + -- >>= orThrowUser "can't query rpc" + -- liftIO $ exitSuccess + data RpcChannelQuery -- API definition @@ -196,21 +215,18 @@ instance (MonadUnliftIO m, HasOracleEnv m) => HandleMethod m RpcChannelQuery whe withOracleEnv env do items <- withState $ select_ @_ @(HashVal, Text, Text) [qc| - SELECT - g.ref, - gn.name, - gb.brief - FROM - gitrepo AS g - INNER JOIN - gitreponame AS gn ON g.ref = gn.ref - INNER JOIN - gitrepoheadversion AS ghv ON gn.hash = ghv.hash - LEFT JOIN - gitrepobrief AS gb ON g.ref = gb.ref AND ghv.hash = gb.hash - GROUP BY - g.ref, gn.name - |] + select lwwref, name, brief + from ( + select + lwwref + , name + , brief + , max(lwwseq) + , max(repoheadseq) + + from gitrepofact + group by lwwref,name,brief) as s0; + |] let root = object [ "rows" .= items , "desc" .= [ "entity", "name", "brief" ] @@ -242,11 +258,13 @@ runPipe = do chan <- asks _refchanId debug "run pipe" + liftIO $ void $ installHandler sigPIPE Ignore Nothing + flip runContT pure do server <- newMessagingPipe (stdin,stdout) - void $ ContT $ withAsync $ runMessagingPipe server + void $ ContT $ bracket (async $ runMessagingPipe server) cancel void $ ContT $ withAsync $ forever do debug $ yellow "updateState" @@ -254,12 +272,19 @@ runPipe = do pause @'Seconds 60 -- make server protocol responder - -- void $ ContT $ withAsync $ flip - lift $ flip runReaderT server do + serv <- ContT $ withAsync $ flip runReaderT server do runProto @PIPE [ makeResponse (makeServer @BrowserPluginAPI) ] + fix \next -> do + -- debug $ red "YAYAYAYA" + done1 <- hIsClosed stdin + done2 <- hIsClosed stdout + done3 <- hIsEOF stdin + let done = done1 || done2 || done3 + debug $ red "DONE:" <+> pretty done + unless done (pause @'Seconds 0.01 >> next) updateState :: MonadUnliftIO m => Oracle m () updateState = do diff --git a/hbs2-git/hbs2-git-oracle/lib/HBS2/Git/Oracle/State.hs b/hbs2-git/hbs2-git-oracle/lib/HBS2/Git/Oracle/State.hs index de19043a..60e830a2 100644 --- a/hbs2-git/hbs2-git-oracle/lib/HBS2/Git/Oracle/State.hs +++ b/hbs2-git/hbs2-git-oracle/lib/HBS2/Git/Oracle/State.hs @@ -46,6 +46,7 @@ gitRepoFactTable = do , reflog text not null , tx text not null , repohead text not null + , repoheadseq integer not null , name text null , brief text null , gk text null @@ -97,6 +98,7 @@ insertRepoFacts GitRepoFacts{..} = do , gitRefLog , gitTx , gitRepoHead + , gitRepoHeadSeq , gitName , gitBrief , gitEncrypted