From dd61c4a09c3faf85178bcf83b7334802bf2b936a Mon Sep 17 00:00:00 2001 From: voidlizard Date: Sun, 3 Nov 2024 12:10:02 +0300 Subject: [PATCH] wip --- hbs2-core/lib/HBS2/Net/Messaging/TCP.hs | 123 +++++++++--------- hbs2-peer/app/PeerMain.hs | 26 +++- hbs2-peer/app/RPC2.hs | 92 ++++++++++++- hbs2-peer/lib/HBS2/Peer/RPC/API/Peer.hs | 6 + .../Data/Config/Suckless/Script/Internal.hs | 6 + 5 files changed, 188 insertions(+), 65 deletions(-) diff --git a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs index 6a2bf6f8..b14943b3 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs @@ -60,6 +60,7 @@ data MessagingTCP = , _tcpCookie :: Word32 , _tcpPeerConn :: TVar (HashMap (Peer L4Proto) Word64) , _tcpPeerCookie :: TVar (HashMap Word32 Int) + , _tcpPeerSocket :: TVar (HashMap (Peer L4Proto) Socket) , _tcpConnDemand :: TQueue (Peer L4Proto) , _tcpReceived :: TBQueue (Peer L4Proto, ByteString) , _tcpSent :: TVar (HashMap (Peer L4Proto) (TBQueue ByteString)) @@ -100,6 +101,7 @@ newMessagingTCP pa = liftIO do <*> randomIO <*> newTVarIO mempty <*> newTVarIO mempty + <*> newTVarIO mempty <*> newTQueueIO <*> newTBQueueIO outMessageQLen <*> newTVarIO mempty @@ -234,6 +236,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do atomically do pips <- readTVar _tcpPeerConn modifyTVar _tcpSent (HM.filterWithKey (\k _ -> HM.member k pips)) + modifyTVar _tcpPeerSocket (HM.filterWithKey (\k _ -> HM.member k pips)) modifyTVar _tcpPeerCookie (HM.filter (>=1)) waitAnyCatchCancel [p1,p2,probes,sweep] @@ -310,6 +313,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do atomically do modifyTVar _tcpSent (HM.insert newP newOutQ) modifyTVar _tcpPeerConn (HM.insert newP (connectionId myCookie cookie)) + modifyTVar _tcpPeerSocket (HM.insert newP so) wr <- ContT $ withAsync $ forever do bs <- atomically $ readTBQueue newOutQ @@ -330,6 +334,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do void $ ContT $ bracket none $ const do debug $ "SHUTDOWN SOCKET AND SHIT" <+> pretty remote atomically $ modifyTVar _tcpServerThreadsCount pred + atomically $ modifyTVar _tcpPeerSocket (HM.delete newP) shutdown so ShutdownBoth cancel rd cancel wr @@ -343,11 +348,9 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do runClient = flip runContT pure do - own <- toPeerAddr $ view tcpOwnPeer env - let (L4Address _ (IPAddrPort (i,_))) = own let myCookie = view tcpCookie env - pause @'Seconds 10 + pause @'Seconds 3.14 void $ ContT $ bracket none $ const $ do what <- atomically $ stateTVar _tcpClientThreads (\x -> (HM.elems x, mempty)) @@ -363,76 +366,78 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do atomically $ modifyTVar _tcpClientThreads (HM.filterWithKey (\k _ -> not (HS.member k done))) forever $ void $ runMaybeT do - -- client sockets + -- client sockets - -- смотрим к кому надо - who <- atomically $ readTQueue _tcpConnDemand - whoAddr <- toPeerAddr who + -- смотрим к кому надо + who <- atomically $ readTQueue _tcpConnDemand + whoAddr <- toPeerAddr who - debug $ "DEMAND:" <+> pretty who + debug $ "DEMAND:" <+> pretty who - already <- atomically $ readTVar _tcpPeerConn <&> HM.member who + already <- atomically $ readTVar _tcpPeerConn <&> HM.member who - when already do - debug "SHIT? BUSYLOOP?" - mzero + when already do + debug "SHIT? BUSYLOOP?" + mzero - liftIO $ newClientThread env $ do - let (L4Address _ (IPAddrPort (ip,port))) = whoAddr - connect (show ip) (show port) $ \(so, remoteAddr) -> do + liftIO $ newClientThread env $ do + let (L4Address _ (IPAddrPort (ip,port))) = whoAddr + connect (show ip) (show port) $ \(so, remoteAddr) -> do - let ?env = env + let ?env = env - flip runContT pure $ callCC \exit -> do + flip runContT pure $ callCC \exit -> do - debug $ "OPEN CLIENT CONNECTION" <+> pretty ip <+> pretty port <+> pretty remoteAddr - cookie <- handshake Client env so - let connId = connectionId cookie myCookie + debug $ "OPEN CLIENT CONNECTION" <+> pretty ip <+> pretty port <+> pretty remoteAddr + cookie <- handshake Client env so + let connId = connectionId cookie myCookie - when (cookie == myCookie) $ do - debug $ "same peer, exit" <+> pretty remoteAddr - exit () + when (cookie == myCookie) $ do + debug $ "same peer, exit" <+> pretty remoteAddr + exit () - here <- useCookie cookie + here <- useCookie cookie - -- TODO: handshake notification - liftIO $ _tcpOnClientStarted whoAddr connId + -- TODO: handshake notification + liftIO $ _tcpOnClientStarted whoAddr connId - when here do - debug $ "CLIENT: ALREADY CONNECTED" <+> pretty cookie <+> pretty ip <+> pretty port - exit () + when here do + debug $ "CLIENT: ALREADY CONNECTED" <+> pretty cookie <+> pretty ip <+> pretty port + exit () + atomically do + modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1) + modifyTVar _tcpPeerConn (HM.insert who connId) + modifyTVar _tcpPeerSocket (HM.insert who so) + + wr <- ContT $ withAsync $ forever do + bss <- atomically do + q' <- readTVar _tcpSent <&> HM.lookup who + maybe1 q' mempty $ \q -> do + s <- readTBQueue q + sx <- flushTBQueue q + pure (s:sx) + + for_ bss $ \bs -> do + -- FIXME: check-this! + let pq = myCookie -- randomIO + let qids = bytestring32 pq + let size = bytestring32 (fromIntegral $ LBS.length bs) + + let frame = LBS.fromStrict qids + <> LBS.fromStrict size -- req-size + <> bs -- payload + + sendLazy so frame --(LBS.toStrict frame) + + void $ ContT $ bracket none $ const $ do + debug "!!! TCP: BRACKET FIRED IN CLIENT !!!" atomically do - modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1) - modifyTVar _tcpPeerConn (HM.insert who connId) + modifyTVar _tcpPeerConn (HM.delete who) + modifyTVar _tcpPeerCookie (HM.update killCookie cookie) + modifyTVar _tcpPeerSocket (HM.delete who) - wr <- ContT $ withAsync $ forever do - bss <- atomically do - q' <- readTVar _tcpSent <&> HM.lookup who - maybe1 q' mempty $ \q -> do - s <- readTBQueue q - sx <- flushTBQueue q - pure (s:sx) + void $ ContT $ bracket none (const $ cancel wr) - for_ bss $ \bs -> do - -- FIXME: check-this! - let pq = myCookie -- randomIO - let qids = bytestring32 pq - let size = bytestring32 (fromIntegral $ LBS.length bs) - - let frame = LBS.fromStrict qids - <> LBS.fromStrict size -- req-size - <> bs -- payload - - sendLazy so frame --(LBS.toStrict frame) - - void $ ContT $ bracket none (const $ cancel wr) - - void $ ContT $ bracket none $ const $ do - debug "!!! TCP: BRACKET FIRED IN CLIENT !!!" - atomically do - modifyTVar _tcpPeerConn (HM.delete who) - modifyTVar _tcpPeerCookie (HM.update killCookie cookie) - - readFrames so who _tcpReceived + readFrames so who _tcpReceived diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index a4ee79d3..412398bf 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -105,8 +105,11 @@ import Data.List qualified as L import Data.Map (Map) import Data.Map qualified as Map import Data.Maybe +import Data.Either import Data.Set qualified as Set import Data.Set (Set) +import Data.Text qualified as Text +import Data.Text.IO qualified as Text import Data.Time.Clock.POSIX import Data.Time.Format import Lens.Micro.Platform as Lens @@ -264,6 +267,7 @@ runCLI = do <> command "bypass" (info pByPass (progDesc "bypass")) <> command "gc" (info pRunGC (progDesc "run RAM garbage collector")) <> command "probes" (info pRunProbes (progDesc "show probes")) + <> command "do" (info pDoScript (progDesc "execute a command in peer context")) <> command "version" (info pVersion (progDesc "show program version")) ) @@ -391,11 +395,9 @@ runCLI = do pPeers = do rpc <- pRpcCommon pure $ withMyRPC @PeerAPI rpc $ \caller -> do - r <- callService @RpcPeers caller () - case r of - Left e -> err (viaShow e) - Right p -> do - print $ vcat (fmap fmt p) + r <- callRpcWaitRetry @RpcPeers (TimeoutSec 1) 2 caller () + >>= orThrowUser "rpc timeout" + liftIO $ print $ vcat (fmap fmt r) where fmt (a, b) = pretty (AsBase58 a) <+> pretty b @@ -608,6 +610,20 @@ runCLI = do liftIO $ print $ vcat (fmap pretty p) + pDoScript = do + rpc <- pRpcCommon + argz <- many (strArgument (metavar "TERM" <> help "script terms")) + pure do + let s = unlines $ unwords <$> splitForms argz + + withMyRPC @PeerAPI rpc $ \caller -> do + + r <- callRpcWaitRetry @RpcRunScript (TimeoutSec 1) 3 caller (Text.pack s) + >>= orThrowUser "rpc timeout" + + for_ (parseTop r & fromRight mempty) \sexy -> do + liftIO $ hPutDoc stdout (pretty sexy) + refP :: ReadM (PubKey 'Sign 'HBS2Basic) refP = maybeReader fromStringMay diff --git a/hbs2-peer/app/RPC2.hs b/hbs2-peer/app/RPC2.hs index dd1c9e6d..2bb15603 100644 --- a/hbs2-peer/app/RPC2.hs +++ b/hbs2-peer/app/RPC2.hs @@ -1,15 +1,105 @@ +{-# OPTIONS_GHC -fno-warn-orphans #-} +{-# Language UndecidableInstances #-} module RPC2 ( module RPC2.Peer , module RPC2.RefLog , module RPC2.RefChan , module RPC2.LWWRef - , module RPC2.Mailbox + , HandleMethod(..) + -- , module RPC2.Mailbox ) where +import HBS2.Prelude.Plated +import HBS2.Net.Proto.Service +import HBS2.Net.Proto.Sessions + +import HBS2.Base58 +import HBS2.Data.Types.Peer +import HBS2.Actors.Peer +import HBS2.Peer.Proto.Peer +import HBS2.Clock +import HBS2.Net.Auth.Schema + +import HBS2.Peer.RPC.Internal.Types +import HBS2.Peer.RPC.API.Peer + +import Data.Config.Suckless.Script + import RPC2.Peer import RPC2.RefLog import RPC2.RefChan import RPC2.LWWRef import RPC2.Mailbox() +import PeerTypes +import PeerInfo + +import UnliftIO + +import Data.Text qualified as Text +import Data.Either +import Data.Maybe +import Numeric + +instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcRunScript where + handleMethod top = do + + co <- getRpcContext @PeerAPI + + let cli = parseTop top & fromRight mempty + + r <- try @_ @SomeException (run (dict co) cli) + + either (pure . Text.pack . show) (pure . Text.pack . show . pretty) r + + where + + dict RPC2Context{..} = makeDict @_ @m do + entry $ bindMatch "hey" $ const do + pure $ mkSym @C "hey" + + entry $ bindMatch "peer-info" $ const do + + now <- getTimeCoarse + + liftIO $ withPeerM rpcPeerEnv do + pl <- getPeerLocator @e + pips <- knownPeers @e pl + npi <- newPeerInfo + + r <- for pips $ \p -> do + pinfo@PeerInfo{..} <- fetch True npi (PeerInfoKey p) id + burst <- readTVarIO _peerBurst + buM <- readTVarIO _peerBurstMax + errors <- readTVarIO _peerErrorsPerSec + downFails <- readTVarIO _peerDownloadFail + downMiss <- readTVarIO _peerDownloadMiss + down <- readTVarIO _peerDownloadedBlk + rtt <- medianPeerRTT pinfo <&> fmap realToFrac + seen <- readTVarIO _peerLastWatched + let l = realToFrac (toNanoSecs $ now - seen) / 1e9 + + let rttMs = (/1e6) <$> rtt <&> (\x -> showGFloat (Just 2) x "") <&> (<> "ms") + let ls = showGFloat (Just 2) l "" <> "s" + + mpde <- find (KnownPeerKey p) id + let pk = maybe1 mpde mempty $ \PeerData{..} -> do + [ mkList [ mkSym "key", mkSym (show $ pretty (AsBase58 _peerSignKey)) ] ] + + let peerStaff = mkList @C $ + pk <> + [ mkList [ mkSym "addr", mkSym (show $ pretty p) ] + , mkList [ mkSym "seen", mkSym ls ] + , mkList [ mkSym "burst", mkInt burst ] + , mkList [ mkSym "burst-max", mkInt (fromMaybe 0 buM) ] + , mkList [ mkSym "errors", mkInt (downFails + errors) ] + , mkList [ mkSym "downloaded", mkInt down ] + , mkList [ mkSym "miss", mkInt downMiss ] + ] + <> maybe1 rttMs mempty (\r -> [ mkList [ mkSym "rtt", mkSym r ] ]) + + pure $ mkList @C [mkSym "peer", peerStaff ] + + pure $ mkList r + diff --git a/hbs2-peer/lib/HBS2/Peer/RPC/API/Peer.hs b/hbs2-peer/lib/HBS2/Peer/RPC/API/Peer.hs index 99f3c6ff..50b317a0 100644 --- a/hbs2-peer/lib/HBS2/Peer/RPC/API/Peer.hs +++ b/hbs2-peer/lib/HBS2/Peer/RPC/API/Peer.hs @@ -38,6 +38,8 @@ data RpcPerformGC data RpcGetProbes +data RpcRunScript + type PeerAPI = '[ RpcPoke , RpcPing , RpcAnnounce @@ -55,6 +57,7 @@ type PeerAPI = '[ RpcPoke , RpcPerformGC , RpcPollList2 , RpcGetProbes + , RpcRunScript ] instance HasProtocol UNIX (ServiceProto PeerAPI UNIX) where @@ -119,6 +122,9 @@ type instance Output RpcPerformGC = () type instance Input RpcGetProbes = () type instance Output RpcGetProbes = [ProbeSnapshotElement] +type instance Input RpcRunScript = Text +type instance Output RpcRunScript = Text + data SetLogging = DebugOn Bool | TraceOn Bool diff --git a/miscellaneous/suckless-conf/lib/Data/Config/Suckless/Script/Internal.hs b/miscellaneous/suckless-conf/lib/Data/Config/Suckless/Script/Internal.hs index dea95478..30aad285 100644 --- a/miscellaneous/suckless-conf/lib/Data/Config/Suckless/Script/Internal.hs +++ b/miscellaneous/suckless-conf/lib/Data/Config/Suckless/Script/Internal.hs @@ -269,6 +269,12 @@ eatNil f = \case class IsContext c => MkInt c s where mkInt :: s -> Syntax c +class IsContext c => MkDouble c s where + mkDouble :: s -> Syntax c + +instance (IsContext c, RealFrac s) => MkDouble c s where + mkDouble v = Literal noContext $ LitScientific (realToFrac v) + instance (Integral i, IsContext c) => MkInt c i where mkInt n = Literal noContext $ LitInt (fromIntegral n)