This commit is contained in:
Dmitry Zuikov 2024-03-26 16:25:33 +03:00
parent 3388aeaf64
commit 30042b5a51
4 changed files with 198 additions and 58 deletions

View File

@ -9,6 +9,8 @@ import HBS2.Net.Proto.Types
import HBS2.Actors.Peer.Types
import HBS2.Net.Messaging
import HBS2.System.Logger.Simple.ANSI
import Control.Concurrent.STM qualified as STM
import Control.Monad.Reader
import Data.ByteString.Builder qualified as B
@ -83,6 +85,7 @@ runMessagingPipe :: MonadIO m => MessagingPipe -> m ()
runMessagingPipe bus = liftIO do
fix \next -> do
frame <- LBS.hGet who 4 <&> word32 . LBS.toStrict
debug $ "JOPAKITA!!" <+> pretty frame
piece <- LBS.hGet who (fromIntegral frame)
atomically (writeTQueue (inQ bus) piece)
next

View File

@ -7,15 +7,17 @@ import HBS2.Git.Oracle.Run
import Options.Applicative as O
type PKS = PubKey 'Sign HBS2Basic
data RunMode =
RunIndex PKS
| RunDump
| RunDump PKS
| RunPipe
main :: IO ()
main = do
let parser = hsubparser ( pRunIndexCmd <> pRunDumpCmd )
let parser = hsubparser $ pRunIndexCmd <>
pRunDumpCmd <>
pRunPipeCmd
join $ execParser (O.info (parser <**> helper)
( fullDesc
@ -35,8 +37,12 @@ main = do
pRunDumpCmd = command "dump" ( O.info pRunDump (progDesc "run index") )
pRunDump = do
chan <- option pkey ( long "refchan" <> short 'r' <> help "refchan to post" )
pure $ runApp chan RunDump
pure $ runApp chan (RunDump chan)
pRunPipeCmd = command "pipe" ( O.info pRunPipe (progDesc "run pipe mode") )
pRunPipe = do
chan <- option pkey ( long "refchan" <> short 'r' <> help "refchan for queries" )
pure $ runApp chan RunPipe
runApp :: MonadUnliftIO m
@ -50,9 +56,11 @@ runApp chan mode = do
setLogging @ERROR (toStderr . logPrefix "[error] ")
setLogging @NOTICE (toStderr . logPrefix "[debug] ")
runWithOracleEnv chan $ case mode of
RunIndex a -> runOracleIndex a
RunDump{} -> runDump
case mode of
RunIndex a -> runWithOracleEnv chan $ runOracleIndex a
RunPipe{} -> runWithOracleEnv chan $ runPipe
RunDump pks -> runDump pks
`finally` do
setLoggingOff @DEBUG
@ -61,4 +69,3 @@ runApp chan mode = do
setLoggingOff @NOTICE

View File

@ -7,12 +7,16 @@ module HBS2.Git.Oracle.Prelude
, module HBS2.Net.Auth.Credentials
, module HBS2.Storage
, module HBS2.Misc.PrettyStuff
, module HBS2.System.Logger.Simple.ANSI
, module HBS2.Net.Messaging
, module HBS2.Net.Proto.Service
, module HBS2.Net.Messaging.Pipe
, module HBS2.Peer.Proto.RefLog
, module HBS2.Peer.Proto.LWWRef
, module HBS2.Peer.Proto.RefChan
, module HBS2.Net.Proto.Service
, module HBS2.Peer.RPC.API.Peer
, module HBS2.Peer.RPC.API.RefLog
, module HBS2.Peer.RPC.API.RefChan
@ -35,8 +39,12 @@ import HBS2.Net.Auth.Schema
import HBS2.Net.Auth.Credentials
import HBS2.Net.Proto.Service
import HBS2.Peer.Proto.RefChan
import HBS2.Net.Messaging
import HBS2.Net.Messaging.Pipe
import HBS2.Actors.Peer
import HBS2.Storage
import HBS2.Misc.PrettyStuff
import HBS2.System.Logger.Simple.ANSI
import HBS2.Peer.Proto.LWWRef

View File

@ -1,8 +1,14 @@
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE NumericUnderscores #-}
module HBS2.Git.Oracle.Run where
import HBS2.Git.Oracle.Prelude
import HBS2.Git.Oracle.App
import HBS2.Actors.Peer
import HBS2.Hash
import HBS2.Merkle
import HBS2.Data.Types.SignedBox
@ -12,11 +18,13 @@ import HBS2.KeyMan.Keys.Direct
import HBS2.Git.Data.LWWBlock
import HBS2.Git.Data.Tx
import Data.ByteString.Lazy (ByteString)
import Data.Maybe
import Lens.Micro.Platform hiding ( (.=) )
import Data.Aeson
import Data.Aeson as Aeson
import Data.Aeson.Encode.Pretty qualified as A
import Data.Word
import Streaming.Prelude qualified as S
@ -26,10 +34,13 @@ import Data.Coerce
import Data.Ord
import Data.Text qualified as Text
import Data.HashMap.Strict qualified as HM
import Control.Monad.Trans.Except
import Data.List
import Data.ByteString.Lazy qualified as LBS
import Safe
import System.Process.Typed
import System.Environment (getProgName, getArgs)
import System.Exit
type PKS = PubKey 'Sign HBS2Basic
{- HLINT ignore "Functor law" -}
@ -162,57 +173,168 @@ runOracleIndex auPk = do
runDump :: forall m . MonadUnliftIO m
=> PKS
-> m ()
runDump pks = do
self <- liftIO getProgName
debug $ "fucking dump!" <+> pretty self
let cmd = proc "hbs2-git-oracle" ["pipe", "-r", show (pretty (AsBase58 pks))]
& setStdin createPipe
& setStdout createPipe
-- let w
flip runContT pure do
-- p <- ContT $ withProcessWait cmd
p <- lift $ startProcess cmd -- ContT $ withProcessWait cmd
pause @'Seconds 1
let ssin = getStdin p
let sout = getStdout p
client <- newMessagingPipe (sout,ssin) -- ,sout)
-- forever do
-- liftIO $ LBS.hPutStr ssin "\x10 AAAAAAAAAAAAAAAAAAAAAAA\r\n"
-- hFlush ssin
-- pause @'Seconds 1
void $ ContT $ withAsync $ runMessagingPipe client
debug "YAY!"
caller <- makeServiceCaller @BrowserPluginAPI @PIPE (localPeer client)
-- pause @'Seconds 2
forever do
wtf <- callService @RpcChannelQuery caller ()
>>= orThrowUser "can't query rpc"
r <- ContT $ maybe1 wtf (pure ())
let val = Aeson.decode @Value r
liftIO $ LBS.putStr (A.encodePretty val)
data RpcChannelQuery
-- API definition
type BrowserPluginAPI = '[ RpcChannelQuery ]
-- API endpoint definition
type instance Input RpcChannelQuery = ()
type instance Output RpcChannelQuery = Maybe ByteString
class HasOracleEnv m where
getOracleEnv :: m OracleEnv
-- API handler
instance (MonadUnliftIO m, HasOracleEnv m) => HandleMethod m RpcChannelQuery where
handleMethod _ = do
env <- getOracleEnv
let chan = _refchanId env
let rchanAPI = _refchanAPI env
let sto = _storage env
runMaybeT do
debug "WTF!!"
rv <- lift (callRpcWaitMay @RpcRefChanGet (TimeoutSec 1) rchanAPI chan)
>>= toMPlus >>= toMPlus
liftIO $ print $ pretty rv
facts <- S.toList_ do
walkMerkle @[HashRef] (fromHashRef rv) (getBlock sto) $ \case
Left{} -> pure ()
Right txs -> do
for_ txs $ \htx -> void $ runMaybeT do
getBlock sto (fromHashRef htx)
>>= toMPlus
<&> deserialiseOrFail @(RefChanUpdate L4Proto)
>>= toMPlus
>>= \case
Propose _ box -> pure box
_ -> mzero
<&> unboxSignedBox0
>>= toMPlus
<&> snd
>>= \(ProposeTran _ box) -> toMPlus (unboxSignedBox0 box)
<&> snd
<&> deserialiseOrFail @GitRepoFacts . LBS.fromStrict
>>= toMPlus
>>= lift . S.yield
let rf = [ (HashRef (hashObject $ serialise f), f)
| f@GitRepoFact1{} <- universeBi facts
] & HM.fromListWith (\v1 v2 -> if gitLwwSeq v1 > gitLwwSeq v2 then v1 else v2)
let rhf = [ (h,f) | (GitRepoHeadFact h f) <- universeBi facts ]
& HM.fromList
items <- S.toList_ $ for_ (HM.toList rf) $ \(k, GitRepoFact1{..}) -> do
let d = HM.lookup k rhf
let nm = maybe "" gitRepoName d
let brief = maybe "" gitRepoBrief d
S.yield $ object [ "item_id" .= show (pretty gitLwwRef)
, "item_title" .= show (pretty nm)
, "item_brief" .= show (pretty brief)
]
let root = object [ "items" .= items
, "state" .= show (pretty rv)
]
pure $ A.encodePretty root
-- Codec for protocol
instance HasProtocol PIPE (ServiceProto BrowserPluginAPI PIPE) where
type instance ProtocolId (ServiceProto BrowserPluginAPI PIPE) = 0xDEADF00D123
type instance Encoded PIPE = ByteString
decode = either (error.show) Just . deserialiseOrFail
encode = serialise
-- Some "deferred" implementation for our monad
-- note -- plain asyncs may cause to resource leak
instance (MonadUnliftIO m, HasProtocol PIPE (ServiceProto api PIPE))
=> HasDeferred (ServiceProto api PIPE) PIPE m where
deferred m = void (async m)
-- FIXME: looks-hacky
instance (Monad (t (Oracle m)), MonadIO m, MonadTrans t) => HasOracleEnv (ResponseM PIPE (t (Oracle m))) where
getOracleEnv = lift $ lift ask
runPipe :: forall m . MonadUnliftIO m
=> Oracle m ()
runDump = do
runPipe = do
chan <- asks _refchanId
rchanAPI <- asks _refchanAPI
sto <- asks _storage
debug "run pipe"
void $ runMaybeT do
liftIO $ hSetBuffering stdin NoBuffering
rv <- lift (callRpcWaitMay @RpcRefChanGet (TimeoutSec 1) rchanAPI chan)
>>= toMPlus >>= toMPlus
-- liftIO $ LBS.getContents >>= LBS.hPutStr stderr
-- forever (pause @'Seconds 10)
liftIO $ print $ pretty rv
flip runContT pure do
facts <- S.toList_ do
walkMerkle @[HashRef] (fromHashRef rv) (getBlock sto) $ \case
Left{} -> pure ()
Right txs -> do
for_ txs $ \htx -> void $ runMaybeT do
getBlock sto (fromHashRef htx)
>>= toMPlus
<&> deserialiseOrFail @(RefChanUpdate L4Proto)
>>= toMPlus
>>= \case
Propose _ box -> pure box
_ -> mzero
<&> unboxSignedBox0
>>= toMPlus
<&> snd
>>= \(ProposeTran _ box) -> toMPlus (unboxSignedBox0 box)
<&> snd
<&> deserialiseOrFail @GitRepoFacts . LBS.fromStrict
>>= toMPlus
>>= lift . S.yield
server <- newMessagingPipe (stdin,stdout)
let rf = [ (HashRef (hashObject $ serialise f), f)
| f@GitRepoFact1{} <- universeBi facts
] & HM.fromListWith (\v1 v2 -> if gitLwwSeq v1 > gitLwwSeq v2 then v1 else v2)
void $ ContT $ withAsync $ runMessagingPipe server
let rhf = [ (h,f) | (GitRepoHeadFact h f) <- universeBi facts ]
& HM.fromList
items <- S.toList_ $ for_ (HM.toList rf) $ \(k, GitRepoFact1{..}) -> do
let d = HM.lookup k rhf
let nm = maybe "" gitRepoName d
let brief = maybe "" gitRepoBrief d
S.yield $ object [ "item_id" .= show (pretty gitLwwRef)
, "item_title" .= show (pretty nm)
, "item_brief" .= show (pretty brief)
]
liftIO $ LBS.putStr $ A.encodePretty $ object [ "items" .= items ]
-- make server protocol responder
-- void $ ContT $ withAsync $ flip
lift $ flip runReaderT server do
runProto @PIPE
[ makeResponse (makeServer @BrowserPluginAPI)
]