fixed IPC/RPC

This commit is contained in:
Dmitry Zuikov 2024-03-26 16:43:12 +03:00
parent 30042b5a51
commit 0aec2526ea
2 changed files with 15 additions and 18 deletions

View File

@ -75,7 +75,8 @@ instance Messaging MessagingPipe PIPE ByteString where
receive bus _ = do receive bus _ = do
msg <- liftIO $ atomically $ peekTQueue q >> STM.flushTQueue q msg <- liftIO $ atomically $ peekTQueue q >> STM.flushTQueue q
for msg $ \m -> pure (From (PeerPIPE (PipeAddr who)), m) for msg $ \m -> do
pure (From (PeerPIPE (PipeAddr who)), m)
where where
q = inQ bus q = inQ bus
@ -85,7 +86,6 @@ runMessagingPipe :: MonadIO m => MessagingPipe -> m ()
runMessagingPipe bus = liftIO do runMessagingPipe bus = liftIO do
fix \next -> do fix \next -> do
frame <- LBS.hGet who 4 <&> word32 . LBS.toStrict frame <- LBS.hGet who 4 <&> word32 . LBS.toStrict
debug $ "JOPAKITA!!" <+> pretty frame
piece <- LBS.hGet who (fromIntegral frame) piece <- LBS.hGet who (fromIntegral frame)
atomically (writeTQueue (inQ bus) piece) atomically (writeTQueue (inQ bus) piece)
next next

View File

@ -13,6 +13,11 @@ import HBS2.Hash
import HBS2.Merkle import HBS2.Merkle
import HBS2.Data.Types.SignedBox import HBS2.Data.Types.SignedBox
import HBS2.Net.Messaging
import HBS2.Net.Messaging.Pipe
import HBS2.Net.Proto.Service
import HBS2.Actors.Peer
import HBS2.KeyMan.Keys.Direct import HBS2.KeyMan.Keys.Direct
import HBS2.Git.Data.LWWBlock import HBS2.Git.Data.LWWBlock
@ -192,8 +197,6 @@ runDump pks = do
-- p <- ContT $ withProcessWait cmd -- p <- ContT $ withProcessWait cmd
p <- lift $ startProcess cmd -- ContT $ withProcessWait cmd p <- lift $ startProcess cmd -- ContT $ withProcessWait cmd
pause @'Seconds 1
let ssin = getStdin p let ssin = getStdin p
let sout = getStdout p let sout = getStdout p
client <- newMessagingPipe (sout,ssin) -- ,sout) client <- newMessagingPipe (sout,ssin) -- ,sout)
@ -205,22 +208,18 @@ runDump pks = do
void $ ContT $ withAsync $ runMessagingPipe client void $ ContT $ withAsync $ runMessagingPipe client
debug "YAY!"
caller <- makeServiceCaller @BrowserPluginAPI @PIPE (localPeer client) caller <- makeServiceCaller @BrowserPluginAPI @PIPE (localPeer client)
-- pause @'Seconds 2 void $ ContT $ withAsync $ liftIO $ runReaderT (runServiceClient caller) client
forever do wtf <- callService @RpcChannelQuery caller ()
>>= orThrowUser "can't query rpc"
wtf <- callService @RpcChannelQuery caller () r <- ContT $ maybe1 wtf (pure ())
>>= orThrowUser "can't query rpc"
r <- ContT $ maybe1 wtf (pure ()) let val = Aeson.decode @Value r
let val = Aeson.decode @Value r liftIO $ LBS.putStr (A.encodePretty val)
liftIO $ LBS.putStr (A.encodePretty val)
data RpcChannelQuery data RpcChannelQuery
@ -244,12 +243,10 @@ instance (MonadUnliftIO m, HasOracleEnv m) => HandleMethod m RpcChannelQuery whe
runMaybeT do runMaybeT do
debug "WTF!!"
rv <- lift (callRpcWaitMay @RpcRefChanGet (TimeoutSec 1) rchanAPI chan) rv <- lift (callRpcWaitMay @RpcRefChanGet (TimeoutSec 1) rchanAPI chan)
>>= toMPlus >>= toMPlus >>= toMPlus >>= toMPlus
liftIO $ print $ pretty rv debug $ "AAAAAA" <+> pretty rv
facts <- S.toList_ do facts <- S.toList_ do
walkMerkle @[HashRef] (fromHashRef rv) (getBlock sto) $ \case walkMerkle @[HashRef] (fromHashRef rv) (getBlock sto) $ \case
@ -320,7 +317,7 @@ runPipe = do
chan <- asks _refchanId chan <- asks _refchanId
debug "run pipe" debug "run pipe"
liftIO $ hSetBuffering stdin NoBuffering -- liftIO $ hSetBuffering stdin NoBuffering
-- liftIO $ LBS.getContents >>= LBS.hPutStr stderr -- liftIO $ LBS.getContents >>= LBS.hPutStr stderr
-- forever (pause @'Seconds 10) -- forever (pause @'Seconds 10)