This commit is contained in:
voidlizard 2024-11-03 12:10:02 +03:00
parent a360dfb7ec
commit dd61c4a09c
5 changed files with 188 additions and 65 deletions

View File

@ -60,6 +60,7 @@ data MessagingTCP =
, _tcpCookie :: Word32 , _tcpCookie :: Word32
, _tcpPeerConn :: TVar (HashMap (Peer L4Proto) Word64) , _tcpPeerConn :: TVar (HashMap (Peer L4Proto) Word64)
, _tcpPeerCookie :: TVar (HashMap Word32 Int) , _tcpPeerCookie :: TVar (HashMap Word32 Int)
, _tcpPeerSocket :: TVar (HashMap (Peer L4Proto) Socket)
, _tcpConnDemand :: TQueue (Peer L4Proto) , _tcpConnDemand :: TQueue (Peer L4Proto)
, _tcpReceived :: TBQueue (Peer L4Proto, ByteString) , _tcpReceived :: TBQueue (Peer L4Proto, ByteString)
, _tcpSent :: TVar (HashMap (Peer L4Proto) (TBQueue ByteString)) , _tcpSent :: TVar (HashMap (Peer L4Proto) (TBQueue ByteString))
@ -100,6 +101,7 @@ newMessagingTCP pa = liftIO do
<*> randomIO <*> randomIO
<*> newTVarIO mempty <*> newTVarIO mempty
<*> newTVarIO mempty <*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTQueueIO <*> newTQueueIO
<*> newTBQueueIO outMessageQLen <*> newTBQueueIO outMessageQLen
<*> newTVarIO mempty <*> newTVarIO mempty
@ -234,6 +236,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
atomically do atomically do
pips <- readTVar _tcpPeerConn pips <- readTVar _tcpPeerConn
modifyTVar _tcpSent (HM.filterWithKey (\k _ -> HM.member k pips)) modifyTVar _tcpSent (HM.filterWithKey (\k _ -> HM.member k pips))
modifyTVar _tcpPeerSocket (HM.filterWithKey (\k _ -> HM.member k pips))
modifyTVar _tcpPeerCookie (HM.filter (>=1)) modifyTVar _tcpPeerCookie (HM.filter (>=1))
waitAnyCatchCancel [p1,p2,probes,sweep] waitAnyCatchCancel [p1,p2,probes,sweep]
@ -310,6 +313,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
atomically do atomically do
modifyTVar _tcpSent (HM.insert newP newOutQ) modifyTVar _tcpSent (HM.insert newP newOutQ)
modifyTVar _tcpPeerConn (HM.insert newP (connectionId myCookie cookie)) modifyTVar _tcpPeerConn (HM.insert newP (connectionId myCookie cookie))
modifyTVar _tcpPeerSocket (HM.insert newP so)
wr <- ContT $ withAsync $ forever do wr <- ContT $ withAsync $ forever do
bs <- atomically $ readTBQueue newOutQ bs <- atomically $ readTBQueue newOutQ
@ -330,6 +334,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
void $ ContT $ bracket none $ const do void $ ContT $ bracket none $ const do
debug $ "SHUTDOWN SOCKET AND SHIT" <+> pretty remote debug $ "SHUTDOWN SOCKET AND SHIT" <+> pretty remote
atomically $ modifyTVar _tcpServerThreadsCount pred atomically $ modifyTVar _tcpServerThreadsCount pred
atomically $ modifyTVar _tcpPeerSocket (HM.delete newP)
shutdown so ShutdownBoth shutdown so ShutdownBoth
cancel rd cancel rd
cancel wr cancel wr
@ -343,11 +348,9 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
runClient = flip runContT pure do runClient = flip runContT pure do
own <- toPeerAddr $ view tcpOwnPeer env
let (L4Address _ (IPAddrPort (i,_))) = own
let myCookie = view tcpCookie env let myCookie = view tcpCookie env
pause @'Seconds 10 pause @'Seconds 3.14
void $ ContT $ bracket none $ const $ do void $ ContT $ bracket none $ const $ do
what <- atomically $ stateTVar _tcpClientThreads (\x -> (HM.elems x, mempty)) what <- atomically $ stateTVar _tcpClientThreads (\x -> (HM.elems x, mempty))
@ -363,76 +366,78 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
atomically $ modifyTVar _tcpClientThreads (HM.filterWithKey (\k _ -> not (HS.member k done))) atomically $ modifyTVar _tcpClientThreads (HM.filterWithKey (\k _ -> not (HS.member k done)))
forever $ void $ runMaybeT do forever $ void $ runMaybeT do
-- client sockets -- client sockets
-- смотрим к кому надо -- смотрим к кому надо
who <- atomically $ readTQueue _tcpConnDemand who <- atomically $ readTQueue _tcpConnDemand
whoAddr <- toPeerAddr who whoAddr <- toPeerAddr who
debug $ "DEMAND:" <+> pretty who debug $ "DEMAND:" <+> pretty who
already <- atomically $ readTVar _tcpPeerConn <&> HM.member who already <- atomically $ readTVar _tcpPeerConn <&> HM.member who
when already do when already do
debug "SHIT? BUSYLOOP?" debug "SHIT? BUSYLOOP?"
mzero mzero
liftIO $ newClientThread env $ do liftIO $ newClientThread env $ do
let (L4Address _ (IPAddrPort (ip,port))) = whoAddr let (L4Address _ (IPAddrPort (ip,port))) = whoAddr
connect (show ip) (show port) $ \(so, remoteAddr) -> do connect (show ip) (show port) $ \(so, remoteAddr) -> do
let ?env = env let ?env = env
flip runContT pure $ callCC \exit -> do flip runContT pure $ callCC \exit -> do
debug $ "OPEN CLIENT CONNECTION" <+> pretty ip <+> pretty port <+> pretty remoteAddr debug $ "OPEN CLIENT CONNECTION" <+> pretty ip <+> pretty port <+> pretty remoteAddr
cookie <- handshake Client env so cookie <- handshake Client env so
let connId = connectionId cookie myCookie let connId = connectionId cookie myCookie
when (cookie == myCookie) $ do when (cookie == myCookie) $ do
debug $ "same peer, exit" <+> pretty remoteAddr debug $ "same peer, exit" <+> pretty remoteAddr
exit () exit ()
here <- useCookie cookie here <- useCookie cookie
-- TODO: handshake notification -- TODO: handshake notification
liftIO $ _tcpOnClientStarted whoAddr connId liftIO $ _tcpOnClientStarted whoAddr connId
when here do when here do
debug $ "CLIENT: ALREADY CONNECTED" <+> pretty cookie <+> pretty ip <+> pretty port debug $ "CLIENT: ALREADY CONNECTED" <+> pretty cookie <+> pretty ip <+> pretty port
exit () exit ()
atomically do
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
modifyTVar _tcpPeerConn (HM.insert who connId)
modifyTVar _tcpPeerSocket (HM.insert who so)
wr <- ContT $ withAsync $ forever do
bss <- atomically do
q' <- readTVar _tcpSent <&> HM.lookup who
maybe1 q' mempty $ \q -> do
s <- readTBQueue q
sx <- flushTBQueue q
pure (s:sx)
for_ bss $ \bs -> do
-- FIXME: check-this!
let pq = myCookie -- randomIO
let qids = bytestring32 pq
let size = bytestring32 (fromIntegral $ LBS.length bs)
let frame = LBS.fromStrict qids
<> LBS.fromStrict size -- req-size
<> bs -- payload
sendLazy so frame --(LBS.toStrict frame)
void $ ContT $ bracket none $ const $ do
debug "!!! TCP: BRACKET FIRED IN CLIENT !!!"
atomically do atomically do
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1) modifyTVar _tcpPeerConn (HM.delete who)
modifyTVar _tcpPeerConn (HM.insert who connId) modifyTVar _tcpPeerCookie (HM.update killCookie cookie)
modifyTVar _tcpPeerSocket (HM.delete who)
wr <- ContT $ withAsync $ forever do void $ ContT $ bracket none (const $ cancel wr)
bss <- atomically do
q' <- readTVar _tcpSent <&> HM.lookup who
maybe1 q' mempty $ \q -> do
s <- readTBQueue q
sx <- flushTBQueue q
pure (s:sx)
for_ bss $ \bs -> do readFrames so who _tcpReceived
-- FIXME: check-this!
let pq = myCookie -- randomIO
let qids = bytestring32 pq
let size = bytestring32 (fromIntegral $ LBS.length bs)
let frame = LBS.fromStrict qids
<> LBS.fromStrict size -- req-size
<> bs -- payload
sendLazy so frame --(LBS.toStrict frame)
void $ ContT $ bracket none (const $ cancel wr)
void $ ContT $ bracket none $ const $ do
debug "!!! TCP: BRACKET FIRED IN CLIENT !!!"
atomically do
modifyTVar _tcpPeerConn (HM.delete who)
modifyTVar _tcpPeerCookie (HM.update killCookie cookie)
readFrames so who _tcpReceived

View File

@ -105,8 +105,11 @@ import Data.List qualified as L
import Data.Map (Map) import Data.Map (Map)
import Data.Map qualified as Map import Data.Map qualified as Map
import Data.Maybe import Data.Maybe
import Data.Either
import Data.Set qualified as Set import Data.Set qualified as Set
import Data.Set (Set) import Data.Set (Set)
import Data.Text qualified as Text
import Data.Text.IO qualified as Text
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import Data.Time.Format import Data.Time.Format
import Lens.Micro.Platform as Lens import Lens.Micro.Platform as Lens
@ -264,6 +267,7 @@ runCLI = do
<> command "bypass" (info pByPass (progDesc "bypass")) <> command "bypass" (info pByPass (progDesc "bypass"))
<> command "gc" (info pRunGC (progDesc "run RAM garbage collector")) <> command "gc" (info pRunGC (progDesc "run RAM garbage collector"))
<> command "probes" (info pRunProbes (progDesc "show probes")) <> command "probes" (info pRunProbes (progDesc "show probes"))
<> command "do" (info pDoScript (progDesc "execute a command in peer context"))
<> command "version" (info pVersion (progDesc "show program version")) <> command "version" (info pVersion (progDesc "show program version"))
) )
@ -391,11 +395,9 @@ runCLI = do
pPeers = do pPeers = do
rpc <- pRpcCommon rpc <- pRpcCommon
pure $ withMyRPC @PeerAPI rpc $ \caller -> do pure $ withMyRPC @PeerAPI rpc $ \caller -> do
r <- callService @RpcPeers caller () r <- callRpcWaitRetry @RpcPeers (TimeoutSec 1) 2 caller ()
case r of >>= orThrowUser "rpc timeout"
Left e -> err (viaShow e) liftIO $ print $ vcat (fmap fmt r)
Right p -> do
print $ vcat (fmap fmt p)
where where
fmt (a, b) = pretty (AsBase58 a) <+> pretty b fmt (a, b) = pretty (AsBase58 a) <+> pretty b
@ -608,6 +610,20 @@ runCLI = do
liftIO $ print $ vcat (fmap pretty p) liftIO $ print $ vcat (fmap pretty p)
pDoScript = do
rpc <- pRpcCommon
argz <- many (strArgument (metavar "TERM" <> help "script terms"))
pure do
let s = unlines $ unwords <$> splitForms argz
withMyRPC @PeerAPI rpc $ \caller -> do
r <- callRpcWaitRetry @RpcRunScript (TimeoutSec 1) 3 caller (Text.pack s)
>>= orThrowUser "rpc timeout"
for_ (parseTop r & fromRight mempty) \sexy -> do
liftIO $ hPutDoc stdout (pretty sexy)
refP :: ReadM (PubKey 'Sign 'HBS2Basic) refP :: ReadM (PubKey 'Sign 'HBS2Basic)
refP = maybeReader fromStringMay refP = maybeReader fromStringMay

View File

@ -1,15 +1,105 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# Language UndecidableInstances #-}
module RPC2 module RPC2
( module RPC2.Peer ( module RPC2.Peer
, module RPC2.RefLog , module RPC2.RefLog
, module RPC2.RefChan , module RPC2.RefChan
, module RPC2.LWWRef , module RPC2.LWWRef
, module RPC2.Mailbox , HandleMethod(..)
-- , module RPC2.Mailbox
) where ) where
import HBS2.Prelude.Plated
import HBS2.Net.Proto.Service
import HBS2.Net.Proto.Sessions
import HBS2.Base58
import HBS2.Data.Types.Peer
import HBS2.Actors.Peer
import HBS2.Peer.Proto.Peer
import HBS2.Clock
import HBS2.Net.Auth.Schema
import HBS2.Peer.RPC.Internal.Types
import HBS2.Peer.RPC.API.Peer
import Data.Config.Suckless.Script
import RPC2.Peer import RPC2.Peer
import RPC2.RefLog import RPC2.RefLog
import RPC2.RefChan import RPC2.RefChan
import RPC2.LWWRef import RPC2.LWWRef
import RPC2.Mailbox() import RPC2.Mailbox()
import PeerTypes
import PeerInfo
import UnliftIO
import Data.Text qualified as Text
import Data.Either
import Data.Maybe
import Numeric
instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcRunScript where
handleMethod top = do
co <- getRpcContext @PeerAPI
let cli = parseTop top & fromRight mempty
r <- try @_ @SomeException (run (dict co) cli)
either (pure . Text.pack . show) (pure . Text.pack . show . pretty) r
where
dict RPC2Context{..} = makeDict @_ @m do
entry $ bindMatch "hey" $ const do
pure $ mkSym @C "hey"
entry $ bindMatch "peer-info" $ const do
now <- getTimeCoarse
liftIO $ withPeerM rpcPeerEnv do
pl <- getPeerLocator @e
pips <- knownPeers @e pl
npi <- newPeerInfo
r <- for pips $ \p -> do
pinfo@PeerInfo{..} <- fetch True npi (PeerInfoKey p) id
burst <- readTVarIO _peerBurst
buM <- readTVarIO _peerBurstMax
errors <- readTVarIO _peerErrorsPerSec
downFails <- readTVarIO _peerDownloadFail
downMiss <- readTVarIO _peerDownloadMiss
down <- readTVarIO _peerDownloadedBlk
rtt <- medianPeerRTT pinfo <&> fmap realToFrac
seen <- readTVarIO _peerLastWatched
let l = realToFrac (toNanoSecs $ now - seen) / 1e9
let rttMs = (/1e6) <$> rtt <&> (\x -> showGFloat (Just 2) x "") <&> (<> "ms")
let ls = showGFloat (Just 2) l "" <> "s"
mpde <- find (KnownPeerKey p) id
let pk = maybe1 mpde mempty $ \PeerData{..} -> do
[ mkList [ mkSym "key", mkSym (show $ pretty (AsBase58 _peerSignKey)) ] ]
let peerStaff = mkList @C $
pk <>
[ mkList [ mkSym "addr", mkSym (show $ pretty p) ]
, mkList [ mkSym "seen", mkSym ls ]
, mkList [ mkSym "burst", mkInt burst ]
, mkList [ mkSym "burst-max", mkInt (fromMaybe 0 buM) ]
, mkList [ mkSym "errors", mkInt (downFails + errors) ]
, mkList [ mkSym "downloaded", mkInt down ]
, mkList [ mkSym "miss", mkInt downMiss ]
]
<> maybe1 rttMs mempty (\r -> [ mkList [ mkSym "rtt", mkSym r ] ])
pure $ mkList @C [mkSym "peer", peerStaff ]
pure $ mkList r

View File

@ -38,6 +38,8 @@ data RpcPerformGC
data RpcGetProbes data RpcGetProbes
data RpcRunScript
type PeerAPI = '[ RpcPoke type PeerAPI = '[ RpcPoke
, RpcPing , RpcPing
, RpcAnnounce , RpcAnnounce
@ -55,6 +57,7 @@ type PeerAPI = '[ RpcPoke
, RpcPerformGC , RpcPerformGC
, RpcPollList2 , RpcPollList2
, RpcGetProbes , RpcGetProbes
, RpcRunScript
] ]
instance HasProtocol UNIX (ServiceProto PeerAPI UNIX) where instance HasProtocol UNIX (ServiceProto PeerAPI UNIX) where
@ -119,6 +122,9 @@ type instance Output RpcPerformGC = ()
type instance Input RpcGetProbes = () type instance Input RpcGetProbes = ()
type instance Output RpcGetProbes = [ProbeSnapshotElement] type instance Output RpcGetProbes = [ProbeSnapshotElement]
type instance Input RpcRunScript = Text
type instance Output RpcRunScript = Text
data SetLogging = data SetLogging =
DebugOn Bool DebugOn Bool
| TraceOn Bool | TraceOn Bool

View File

@ -269,6 +269,12 @@ eatNil f = \case
class IsContext c => MkInt c s where class IsContext c => MkInt c s where
mkInt :: s -> Syntax c mkInt :: s -> Syntax c
class IsContext c => MkDouble c s where
mkDouble :: s -> Syntax c
instance (IsContext c, RealFrac s) => MkDouble c s where
mkDouble v = Literal noContext $ LitScientific (realToFrac v)
instance (Integral i, IsContext c) => MkInt c i where instance (Integral i, IsContext c) => MkInt c i where
mkInt n = Literal noContext $ LitInt (fromIntegral n) mkInt n = Literal noContext $ LitInt (fromIntegral n)