This commit is contained in:
voidlizard 2024-11-03 12:10:02 +03:00
parent df6a6c94e0
commit 25cf9a2040
5 changed files with 188 additions and 65 deletions

View File

@ -60,6 +60,7 @@ data MessagingTCP =
, _tcpCookie :: Word32
, _tcpPeerConn :: TVar (HashMap (Peer L4Proto) Word64)
, _tcpPeerCookie :: TVar (HashMap Word32 Int)
, _tcpPeerSocket :: TVar (HashMap (Peer L4Proto) Socket)
, _tcpConnDemand :: TQueue (Peer L4Proto)
, _tcpReceived :: TBQueue (Peer L4Proto, ByteString)
, _tcpSent :: TVar (HashMap (Peer L4Proto) (TBQueue ByteString))
@ -100,6 +101,7 @@ newMessagingTCP pa = liftIO do
<*> randomIO
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTQueueIO
<*> newTBQueueIO outMessageQLen
<*> newTVarIO mempty
@ -234,6 +236,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
atomically do
pips <- readTVar _tcpPeerConn
modifyTVar _tcpSent (HM.filterWithKey (\k _ -> HM.member k pips))
modifyTVar _tcpPeerSocket (HM.filterWithKey (\k _ -> HM.member k pips))
modifyTVar _tcpPeerCookie (HM.filter (>=1))
waitAnyCatchCancel [p1,p2,probes,sweep]
@ -310,6 +313,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
atomically do
modifyTVar _tcpSent (HM.insert newP newOutQ)
modifyTVar _tcpPeerConn (HM.insert newP (connectionId myCookie cookie))
modifyTVar _tcpPeerSocket (HM.insert newP so)
wr <- ContT $ withAsync $ forever do
bs <- atomically $ readTBQueue newOutQ
@ -330,6 +334,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
void $ ContT $ bracket none $ const do
debug $ "SHUTDOWN SOCKET AND SHIT" <+> pretty remote
atomically $ modifyTVar _tcpServerThreadsCount pred
atomically $ modifyTVar _tcpPeerSocket (HM.delete newP)
shutdown so ShutdownBoth
cancel rd
cancel wr
@ -343,11 +348,9 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
runClient = flip runContT pure do
own <- toPeerAddr $ view tcpOwnPeer env
let (L4Address _ (IPAddrPort (i,_))) = own
let myCookie = view tcpCookie env
pause @'Seconds 10
pause @'Seconds 3.14
void $ ContT $ bracket none $ const $ do
what <- atomically $ stateTVar _tcpClientThreads (\x -> (HM.elems x, mempty))
@ -363,76 +366,78 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
atomically $ modifyTVar _tcpClientThreads (HM.filterWithKey (\k _ -> not (HS.member k done)))
forever $ void $ runMaybeT do
-- client sockets
-- client sockets
-- смотрим к кому надо
who <- atomically $ readTQueue _tcpConnDemand
whoAddr <- toPeerAddr who
-- смотрим к кому надо
who <- atomically $ readTQueue _tcpConnDemand
whoAddr <- toPeerAddr who
debug $ "DEMAND:" <+> pretty who
debug $ "DEMAND:" <+> pretty who
already <- atomically $ readTVar _tcpPeerConn <&> HM.member who
already <- atomically $ readTVar _tcpPeerConn <&> HM.member who
when already do
debug "SHIT? BUSYLOOP?"
mzero
when already do
debug "SHIT? BUSYLOOP?"
mzero
liftIO $ newClientThread env $ do
let (L4Address _ (IPAddrPort (ip,port))) = whoAddr
connect (show ip) (show port) $ \(so, remoteAddr) -> do
liftIO $ newClientThread env $ do
let (L4Address _ (IPAddrPort (ip,port))) = whoAddr
connect (show ip) (show port) $ \(so, remoteAddr) -> do
let ?env = env
let ?env = env
flip runContT pure $ callCC \exit -> do
flip runContT pure $ callCC \exit -> do
debug $ "OPEN CLIENT CONNECTION" <+> pretty ip <+> pretty port <+> pretty remoteAddr
cookie <- handshake Client env so
let connId = connectionId cookie myCookie
debug $ "OPEN CLIENT CONNECTION" <+> pretty ip <+> pretty port <+> pretty remoteAddr
cookie <- handshake Client env so
let connId = connectionId cookie myCookie
when (cookie == myCookie) $ do
debug $ "same peer, exit" <+> pretty remoteAddr
exit ()
when (cookie == myCookie) $ do
debug $ "same peer, exit" <+> pretty remoteAddr
exit ()
here <- useCookie cookie
here <- useCookie cookie
-- TODO: handshake notification
liftIO $ _tcpOnClientStarted whoAddr connId
-- TODO: handshake notification
liftIO $ _tcpOnClientStarted whoAddr connId
when here do
debug $ "CLIENT: ALREADY CONNECTED" <+> pretty cookie <+> pretty ip <+> pretty port
exit ()
when here do
debug $ "CLIENT: ALREADY CONNECTED" <+> pretty cookie <+> pretty ip <+> pretty port
exit ()
atomically do
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
modifyTVar _tcpPeerConn (HM.insert who connId)
modifyTVar _tcpPeerSocket (HM.insert who so)
wr <- ContT $ withAsync $ forever do
bss <- atomically do
q' <- readTVar _tcpSent <&> HM.lookup who
maybe1 q' mempty $ \q -> do
s <- readTBQueue q
sx <- flushTBQueue q
pure (s:sx)
for_ bss $ \bs -> do
-- FIXME: check-this!
let pq = myCookie -- randomIO
let qids = bytestring32 pq
let size = bytestring32 (fromIntegral $ LBS.length bs)
let frame = LBS.fromStrict qids
<> LBS.fromStrict size -- req-size
<> bs -- payload
sendLazy so frame --(LBS.toStrict frame)
void $ ContT $ bracket none $ const $ do
debug "!!! TCP: BRACKET FIRED IN CLIENT !!!"
atomically do
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
modifyTVar _tcpPeerConn (HM.insert who connId)
modifyTVar _tcpPeerConn (HM.delete who)
modifyTVar _tcpPeerCookie (HM.update killCookie cookie)
modifyTVar _tcpPeerSocket (HM.delete who)
wr <- ContT $ withAsync $ forever do
bss <- atomically do
q' <- readTVar _tcpSent <&> HM.lookup who
maybe1 q' mempty $ \q -> do
s <- readTBQueue q
sx <- flushTBQueue q
pure (s:sx)
void $ ContT $ bracket none (const $ cancel wr)
for_ bss $ \bs -> do
-- FIXME: check-this!
let pq = myCookie -- randomIO
let qids = bytestring32 pq
let size = bytestring32 (fromIntegral $ LBS.length bs)
let frame = LBS.fromStrict qids
<> LBS.fromStrict size -- req-size
<> bs -- payload
sendLazy so frame --(LBS.toStrict frame)
void $ ContT $ bracket none (const $ cancel wr)
void $ ContT $ bracket none $ const $ do
debug "!!! TCP: BRACKET FIRED IN CLIENT !!!"
atomically do
modifyTVar _tcpPeerConn (HM.delete who)
modifyTVar _tcpPeerCookie (HM.update killCookie cookie)
readFrames so who _tcpReceived
readFrames so who _tcpReceived

View File

@ -105,8 +105,11 @@ import Data.List qualified as L
import Data.Map (Map)
import Data.Map qualified as Map
import Data.Maybe
import Data.Either
import Data.Set qualified as Set
import Data.Set (Set)
import Data.Text qualified as Text
import Data.Text.IO qualified as Text
import Data.Time.Clock.POSIX
import Data.Time.Format
import Lens.Micro.Platform as Lens
@ -264,6 +267,7 @@ runCLI = do
<> command "bypass" (info pByPass (progDesc "bypass"))
<> command "gc" (info pRunGC (progDesc "run RAM garbage collector"))
<> command "probes" (info pRunProbes (progDesc "show probes"))
<> command "do" (info pDoScript (progDesc "execute a command in peer context"))
<> command "version" (info pVersion (progDesc "show program version"))
)
@ -391,11 +395,9 @@ runCLI = do
pPeers = do
rpc <- pRpcCommon
pure $ withMyRPC @PeerAPI rpc $ \caller -> do
r <- callService @RpcPeers caller ()
case r of
Left e -> err (viaShow e)
Right p -> do
print $ vcat (fmap fmt p)
r <- callRpcWaitRetry @RpcPeers (TimeoutSec 1) 2 caller ()
>>= orThrowUser "rpc timeout"
liftIO $ print $ vcat (fmap fmt r)
where
fmt (a, b) = pretty (AsBase58 a) <+> pretty b
@ -608,6 +610,20 @@ runCLI = do
liftIO $ print $ vcat (fmap pretty p)
pDoScript = do
rpc <- pRpcCommon
argz <- many (strArgument (metavar "TERM" <> help "script terms"))
pure do
let s = unlines $ unwords <$> splitForms argz
withMyRPC @PeerAPI rpc $ \caller -> do
r <- callRpcWaitRetry @RpcRunScript (TimeoutSec 1) 3 caller (Text.pack s)
>>= orThrowUser "rpc timeout"
for_ (parseTop r & fromRight mempty) \sexy -> do
liftIO $ hPutDoc stdout (pretty sexy)
refP :: ReadM (PubKey 'Sign 'HBS2Basic)
refP = maybeReader fromStringMay

View File

@ -1,15 +1,105 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# Language UndecidableInstances #-}
module RPC2
( module RPC2.Peer
, module RPC2.RefLog
, module RPC2.RefChan
, module RPC2.LWWRef
, module RPC2.Mailbox
, HandleMethod(..)
-- , module RPC2.Mailbox
) where
import HBS2.Prelude.Plated
import HBS2.Net.Proto.Service
import HBS2.Net.Proto.Sessions
import HBS2.Base58
import HBS2.Data.Types.Peer
import HBS2.Actors.Peer
import HBS2.Peer.Proto.Peer
import HBS2.Clock
import HBS2.Net.Auth.Schema
import HBS2.Peer.RPC.Internal.Types
import HBS2.Peer.RPC.API.Peer
import Data.Config.Suckless.Script
import RPC2.Peer
import RPC2.RefLog
import RPC2.RefChan
import RPC2.LWWRef
import RPC2.Mailbox()
import PeerTypes
import PeerInfo
import UnliftIO
import Data.Text qualified as Text
import Data.Either
import Data.Maybe
import Numeric
instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcRunScript where
handleMethod top = do
co <- getRpcContext @PeerAPI
let cli = parseTop top & fromRight mempty
r <- try @_ @SomeException (run (dict co) cli)
either (pure . Text.pack . show) (pure . Text.pack . show . pretty) r
where
dict RPC2Context{..} = makeDict @_ @m do
entry $ bindMatch "hey" $ const do
pure $ mkSym @C "hey"
entry $ bindMatch "peer-info" $ const do
now <- getTimeCoarse
liftIO $ withPeerM rpcPeerEnv do
pl <- getPeerLocator @e
pips <- knownPeers @e pl
npi <- newPeerInfo
r <- for pips $ \p -> do
pinfo@PeerInfo{..} <- fetch True npi (PeerInfoKey p) id
burst <- readTVarIO _peerBurst
buM <- readTVarIO _peerBurstMax
errors <- readTVarIO _peerErrorsPerSec
downFails <- readTVarIO _peerDownloadFail
downMiss <- readTVarIO _peerDownloadMiss
down <- readTVarIO _peerDownloadedBlk
rtt <- medianPeerRTT pinfo <&> fmap realToFrac
seen <- readTVarIO _peerLastWatched
let l = realToFrac (toNanoSecs $ now - seen) / 1e9
let rttMs = (/1e6) <$> rtt <&> (\x -> showGFloat (Just 2) x "") <&> (<> "ms")
let ls = showGFloat (Just 2) l "" <> "s"
mpde <- find (KnownPeerKey p) id
let pk = maybe1 mpde mempty $ \PeerData{..} -> do
[ mkList [ mkSym "key", mkSym (show $ pretty (AsBase58 _peerSignKey)) ] ]
let peerStaff = mkList @C $
pk <>
[ mkList [ mkSym "addr", mkSym (show $ pretty p) ]
, mkList [ mkSym "seen", mkSym ls ]
, mkList [ mkSym "burst", mkInt burst ]
, mkList [ mkSym "burst-max", mkInt (fromMaybe 0 buM) ]
, mkList [ mkSym "errors", mkInt (downFails + errors) ]
, mkList [ mkSym "downloaded", mkInt down ]
, mkList [ mkSym "miss", mkInt downMiss ]
]
<> maybe1 rttMs mempty (\r -> [ mkList [ mkSym "rtt", mkSym r ] ])
pure $ mkList @C [mkSym "peer", peerStaff ]
pure $ mkList r

View File

@ -38,6 +38,8 @@ data RpcPerformGC
data RpcGetProbes
data RpcRunScript
type PeerAPI = '[ RpcPoke
, RpcPing
, RpcAnnounce
@ -55,6 +57,7 @@ type PeerAPI = '[ RpcPoke
, RpcPerformGC
, RpcPollList2
, RpcGetProbes
, RpcRunScript
]
instance HasProtocol UNIX (ServiceProto PeerAPI UNIX) where
@ -119,6 +122,9 @@ type instance Output RpcPerformGC = ()
type instance Input RpcGetProbes = ()
type instance Output RpcGetProbes = [ProbeSnapshotElement]
type instance Input RpcRunScript = Text
type instance Output RpcRunScript = Text
data SetLogging =
DebugOn Bool
| TraceOn Bool

View File

@ -269,6 +269,12 @@ eatNil f = \case
class IsContext c => MkInt c s where
mkInt :: s -> Syntax c
class IsContext c => MkDouble c s where
mkDouble :: s -> Syntax c
instance (IsContext c, RealFrac s) => MkDouble c s where
mkDouble v = Literal noContext $ LitScientific (realToFrac v)
instance (Integral i, IsContext c) => MkInt c i where
mkInt n = Literal noContext $ LitInt (fromIntegral n)