diff --git a/hbs2-core/lib/HBS2/Net/Messaging/Pipe.hs b/hbs2-core/lib/HBS2/Net/Messaging/Pipe.hs index 4a132315..f3af1d22 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/Pipe.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/Pipe.hs @@ -9,6 +9,8 @@ import HBS2.Net.Proto.Types import HBS2.Actors.Peer.Types import HBS2.Net.Messaging +import HBS2.System.Logger.Simple.ANSI + import Control.Concurrent.STM qualified as STM import Control.Monad.Reader import Data.ByteString.Builder qualified as B @@ -83,6 +85,7 @@ runMessagingPipe :: MonadIO m => MessagingPipe -> m () runMessagingPipe bus = liftIO do fix \next -> do frame <- LBS.hGet who 4 <&> word32 . LBS.toStrict + debug $ "JOPAKITA!!" <+> pretty frame piece <- LBS.hGet who (fromIntegral frame) atomically (writeTQueue (inQ bus) piece) next diff --git a/hbs2-git/hbs2-git-oracle/app/Main.hs b/hbs2-git/hbs2-git-oracle/app/Main.hs index 6dfa5a6b..1a06ffb0 100644 --- a/hbs2-git/hbs2-git-oracle/app/Main.hs +++ b/hbs2-git/hbs2-git-oracle/app/Main.hs @@ -7,15 +7,17 @@ import HBS2.Git.Oracle.Run import Options.Applicative as O -type PKS = PubKey 'Sign HBS2Basic data RunMode = RunIndex PKS - | RunDump + | RunDump PKS + | RunPipe main :: IO () main = do - let parser = hsubparser ( pRunIndexCmd <> pRunDumpCmd ) + let parser = hsubparser $ pRunIndexCmd <> + pRunDumpCmd <> + pRunPipeCmd join $ execParser (O.info (parser <**> helper) ( fullDesc @@ -35,8 +37,12 @@ main = do pRunDumpCmd = command "dump" ( O.info pRunDump (progDesc "run index") ) pRunDump = do chan <- option pkey ( long "refchan" <> short 'r' <> help "refchan to post" ) - pure $ runApp chan RunDump + pure $ runApp chan (RunDump chan) + pRunPipeCmd = command "pipe" ( O.info pRunPipe (progDesc "run pipe mode") ) + pRunPipe = do + chan <- option pkey ( long "refchan" <> short 'r' <> help "refchan for queries" ) + pure $ runApp chan RunPipe runApp :: MonadUnliftIO m @@ -50,9 +56,11 @@ runApp chan mode = do setLogging @ERROR (toStderr . logPrefix "[error] ") setLogging @NOTICE (toStderr . logPrefix "[debug] ") - runWithOracleEnv chan $ case mode of - RunIndex a -> runOracleIndex a - RunDump{} -> runDump + + case mode of + RunIndex a -> runWithOracleEnv chan $ runOracleIndex a + RunPipe{} -> runWithOracleEnv chan $ runPipe + RunDump pks -> runDump pks `finally` do setLoggingOff @DEBUG @@ -61,4 +69,3 @@ runApp chan mode = do setLoggingOff @NOTICE - diff --git a/hbs2-git/hbs2-git-oracle/lib/HBS2/Git/Oracle/Prelude.hs b/hbs2-git/hbs2-git-oracle/lib/HBS2/Git/Oracle/Prelude.hs index b9f025de..9e3a1984 100644 --- a/hbs2-git/hbs2-git-oracle/lib/HBS2/Git/Oracle/Prelude.hs +++ b/hbs2-git/hbs2-git-oracle/lib/HBS2/Git/Oracle/Prelude.hs @@ -7,12 +7,16 @@ module HBS2.Git.Oracle.Prelude , module HBS2.Net.Auth.Credentials , module HBS2.Storage + , module HBS2.Misc.PrettyStuff , module HBS2.System.Logger.Simple.ANSI + , module HBS2.Net.Messaging + , module HBS2.Net.Proto.Service + , module HBS2.Net.Messaging.Pipe + , module HBS2.Peer.Proto.RefLog , module HBS2.Peer.Proto.LWWRef , module HBS2.Peer.Proto.RefChan - , module HBS2.Net.Proto.Service , module HBS2.Peer.RPC.API.Peer , module HBS2.Peer.RPC.API.RefLog , module HBS2.Peer.RPC.API.RefChan @@ -35,8 +39,12 @@ import HBS2.Net.Auth.Schema import HBS2.Net.Auth.Credentials import HBS2.Net.Proto.Service import HBS2.Peer.Proto.RefChan +import HBS2.Net.Messaging +import HBS2.Net.Messaging.Pipe +import HBS2.Actors.Peer import HBS2.Storage +import HBS2.Misc.PrettyStuff import HBS2.System.Logger.Simple.ANSI import HBS2.Peer.Proto.LWWRef diff --git a/hbs2-git/hbs2-git-oracle/lib/HBS2/Git/Oracle/Run.hs b/hbs2-git/hbs2-git-oracle/lib/HBS2/Git/Oracle/Run.hs index 2e021065..5def5a35 100644 --- a/hbs2-git/hbs2-git-oracle/lib/HBS2/Git/Oracle/Run.hs +++ b/hbs2-git/hbs2-git-oracle/lib/HBS2/Git/Oracle/Run.hs @@ -1,8 +1,14 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE UndecidableInstances #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE NumericUnderscores #-} module HBS2.Git.Oracle.Run where import HBS2.Git.Oracle.Prelude import HBS2.Git.Oracle.App +import HBS2.Actors.Peer + import HBS2.Hash import HBS2.Merkle import HBS2.Data.Types.SignedBox @@ -12,11 +18,13 @@ import HBS2.KeyMan.Keys.Direct import HBS2.Git.Data.LWWBlock import HBS2.Git.Data.Tx +import Data.ByteString.Lazy (ByteString) + import Data.Maybe import Lens.Micro.Platform hiding ( (.=) ) -import Data.Aeson +import Data.Aeson as Aeson import Data.Aeson.Encode.Pretty qualified as A import Data.Word import Streaming.Prelude qualified as S @@ -26,10 +34,13 @@ import Data.Coerce import Data.Ord import Data.Text qualified as Text import Data.HashMap.Strict qualified as HM -import Control.Monad.Trans.Except -import Data.List import Data.ByteString.Lazy qualified as LBS -import Safe +import System.Process.Typed + +import System.Environment (getProgName, getArgs) +import System.Exit + +type PKS = PubKey 'Sign HBS2Basic {- HLINT ignore "Functor law" -} @@ -162,57 +173,168 @@ runOracleIndex auPk = do runDump :: forall m . MonadUnliftIO m + => PKS + -> m () + +runDump pks = do + self <- liftIO getProgName + + debug $ "fucking dump!" <+> pretty self + + let cmd = proc "hbs2-git-oracle" ["pipe", "-r", show (pretty (AsBase58 pks))] + & setStdin createPipe + & setStdout createPipe + + -- let w + + flip runContT pure do + + -- p <- ContT $ withProcessWait cmd + p <- lift $ startProcess cmd -- ContT $ withProcessWait cmd + + pause @'Seconds 1 + + let ssin = getStdin p + let sout = getStdout p + client <- newMessagingPipe (sout,ssin) -- ,sout) + + -- forever do + -- liftIO $ LBS.hPutStr ssin "\x10 AAAAAAAAAAAAAAAAAAAAAAA\r\n" + -- hFlush ssin + -- pause @'Seconds 1 + + void $ ContT $ withAsync $ runMessagingPipe client + + debug "YAY!" + + caller <- makeServiceCaller @BrowserPluginAPI @PIPE (localPeer client) + + -- pause @'Seconds 2 + + forever do + + wtf <- callService @RpcChannelQuery caller () + >>= orThrowUser "can't query rpc" + + r <- ContT $ maybe1 wtf (pure ()) + + let val = Aeson.decode @Value r + + liftIO $ LBS.putStr (A.encodePretty val) + +data RpcChannelQuery + +-- API definition +type BrowserPluginAPI = '[ RpcChannelQuery ] + +-- API endpoint definition +type instance Input RpcChannelQuery = () +type instance Output RpcChannelQuery = Maybe ByteString + +class HasOracleEnv m where + getOracleEnv :: m OracleEnv + +-- API handler +instance (MonadUnliftIO m, HasOracleEnv m) => HandleMethod m RpcChannelQuery where + handleMethod _ = do + env <- getOracleEnv + let chan = _refchanId env + let rchanAPI = _refchanAPI env + let sto = _storage env + + runMaybeT do + + debug "WTF!!" + + rv <- lift (callRpcWaitMay @RpcRefChanGet (TimeoutSec 1) rchanAPI chan) + >>= toMPlus >>= toMPlus + + liftIO $ print $ pretty rv + + facts <- S.toList_ do + walkMerkle @[HashRef] (fromHashRef rv) (getBlock sto) $ \case + Left{} -> pure () + Right txs -> do + for_ txs $ \htx -> void $ runMaybeT do + getBlock sto (fromHashRef htx) + >>= toMPlus + <&> deserialiseOrFail @(RefChanUpdate L4Proto) + >>= toMPlus + >>= \case + Propose _ box -> pure box + _ -> mzero + <&> unboxSignedBox0 + >>= toMPlus + <&> snd + >>= \(ProposeTran _ box) -> toMPlus (unboxSignedBox0 box) + <&> snd + <&> deserialiseOrFail @GitRepoFacts . LBS.fromStrict + >>= toMPlus + >>= lift . S.yield + + let rf = [ (HashRef (hashObject $ serialise f), f) + | f@GitRepoFact1{} <- universeBi facts + ] & HM.fromListWith (\v1 v2 -> if gitLwwSeq v1 > gitLwwSeq v2 then v1 else v2) + + + let rhf = [ (h,f) | (GitRepoHeadFact h f) <- universeBi facts ] + & HM.fromList + + items <- S.toList_ $ for_ (HM.toList rf) $ \(k, GitRepoFact1{..}) -> do + let d = HM.lookup k rhf + let nm = maybe "" gitRepoName d + let brief = maybe "" gitRepoBrief d + + S.yield $ object [ "item_id" .= show (pretty gitLwwRef) + , "item_title" .= show (pretty nm) + , "item_brief" .= show (pretty brief) + ] + + let root = object [ "items" .= items + , "state" .= show (pretty rv) + ] + + pure $ A.encodePretty root + +-- Codec for protocol +instance HasProtocol PIPE (ServiceProto BrowserPluginAPI PIPE) where + type instance ProtocolId (ServiceProto BrowserPluginAPI PIPE) = 0xDEADF00D123 + type instance Encoded PIPE = ByteString + decode = either (error.show) Just . deserialiseOrFail + encode = serialise + +-- Some "deferred" implementation for our monad +-- note -- plain asyncs may cause to resource leak +instance (MonadUnliftIO m, HasProtocol PIPE (ServiceProto api PIPE)) + => HasDeferred (ServiceProto api PIPE) PIPE m where + deferred m = void (async m) + +-- FIXME: looks-hacky +instance (Monad (t (Oracle m)), MonadIO m, MonadTrans t) => HasOracleEnv (ResponseM PIPE (t (Oracle m))) where + getOracleEnv = lift $ lift ask + +runPipe :: forall m . MonadUnliftIO m => Oracle m () -runDump = do + +runPipe = do chan <- asks _refchanId - rchanAPI <- asks _refchanAPI - sto <- asks _storage + debug "run pipe" - void $ runMaybeT do + liftIO $ hSetBuffering stdin NoBuffering - rv <- lift (callRpcWaitMay @RpcRefChanGet (TimeoutSec 1) rchanAPI chan) - >>= toMPlus >>= toMPlus + -- liftIO $ LBS.getContents >>= LBS.hPutStr stderr + -- forever (pause @'Seconds 10) - liftIO $ print $ pretty rv + flip runContT pure do - facts <- S.toList_ do - walkMerkle @[HashRef] (fromHashRef rv) (getBlock sto) $ \case - Left{} -> pure () - Right txs -> do - for_ txs $ \htx -> void $ runMaybeT do - getBlock sto (fromHashRef htx) - >>= toMPlus - <&> deserialiseOrFail @(RefChanUpdate L4Proto) - >>= toMPlus - >>= \case - Propose _ box -> pure box - _ -> mzero - <&> unboxSignedBox0 - >>= toMPlus - <&> snd - >>= \(ProposeTran _ box) -> toMPlus (unboxSignedBox0 box) - <&> snd - <&> deserialiseOrFail @GitRepoFacts . LBS.fromStrict - >>= toMPlus - >>= lift . S.yield + server <- newMessagingPipe (stdin,stdout) - let rf = [ (HashRef (hashObject $ serialise f), f) - | f@GitRepoFact1{} <- universeBi facts - ] & HM.fromListWith (\v1 v2 -> if gitLwwSeq v1 > gitLwwSeq v2 then v1 else v2) + void $ ContT $ withAsync $ runMessagingPipe server - - let rhf = [ (h,f) | (GitRepoHeadFact h f) <- universeBi facts ] - & HM.fromList - - items <- S.toList_ $ for_ (HM.toList rf) $ \(k, GitRepoFact1{..}) -> do - let d = HM.lookup k rhf - let nm = maybe "" gitRepoName d - let brief = maybe "" gitRepoBrief d - - S.yield $ object [ "item_id" .= show (pretty gitLwwRef) - , "item_title" .= show (pretty nm) - , "item_brief" .= show (pretty brief) - ] - - liftIO $ LBS.putStr $ A.encodePretty $ object [ "items" .= items ] + -- make server protocol responder + -- void $ ContT $ withAsync $ flip + lift $ flip runReaderT server do + runProto @PIPE + [ makeResponse (makeServer @BrowserPluginAPI) + ]