mirror of https://github.com/voidlizard/hbs2
wip
This commit is contained in:
parent
369d2c013c
commit
5bffbc1d03
|
@ -60,6 +60,7 @@ data MessagingTCP =
|
|||
, _tcpCookie :: Word32
|
||||
, _tcpPeerConn :: TVar (HashMap (Peer L4Proto) Word64)
|
||||
, _tcpPeerCookie :: TVar (HashMap Word32 Int)
|
||||
, _tcpPeerSocket :: TVar (HashMap (Peer L4Proto) Socket)
|
||||
, _tcpConnDemand :: TQueue (Peer L4Proto)
|
||||
, _tcpReceived :: TBQueue (Peer L4Proto, ByteString)
|
||||
, _tcpSent :: TVar (HashMap (Peer L4Proto) (TBQueue ByteString))
|
||||
|
@ -100,6 +101,7 @@ newMessagingTCP pa = liftIO do
|
|||
<*> randomIO
|
||||
<*> newTVarIO mempty
|
||||
<*> newTVarIO mempty
|
||||
<*> newTVarIO mempty
|
||||
<*> newTQueueIO
|
||||
<*> newTBQueueIO outMessageQLen
|
||||
<*> newTVarIO mempty
|
||||
|
@ -234,6 +236,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
|||
atomically do
|
||||
pips <- readTVar _tcpPeerConn
|
||||
modifyTVar _tcpSent (HM.filterWithKey (\k _ -> HM.member k pips))
|
||||
modifyTVar _tcpPeerSocket (HM.filterWithKey (\k _ -> HM.member k pips))
|
||||
modifyTVar _tcpPeerCookie (HM.filter (>=1))
|
||||
|
||||
waitAnyCatchCancel [p1,p2,probes,sweep]
|
||||
|
@ -310,6 +313,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
|||
atomically do
|
||||
modifyTVar _tcpSent (HM.insert newP newOutQ)
|
||||
modifyTVar _tcpPeerConn (HM.insert newP (connectionId myCookie cookie))
|
||||
modifyTVar _tcpPeerSocket (HM.insert newP so)
|
||||
|
||||
wr <- ContT $ withAsync $ forever do
|
||||
bs <- atomically $ readTBQueue newOutQ
|
||||
|
@ -330,6 +334,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
|||
void $ ContT $ bracket none $ const do
|
||||
debug $ "SHUTDOWN SOCKET AND SHIT" <+> pretty remote
|
||||
atomically $ modifyTVar _tcpServerThreadsCount pred
|
||||
atomically $ modifyTVar _tcpPeerSocket (HM.delete newP)
|
||||
shutdown so ShutdownBoth
|
||||
cancel rd
|
||||
cancel wr
|
||||
|
@ -343,11 +348,9 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
|||
|
||||
runClient = flip runContT pure do
|
||||
|
||||
own <- toPeerAddr $ view tcpOwnPeer env
|
||||
let (L4Address _ (IPAddrPort (i,_))) = own
|
||||
let myCookie = view tcpCookie env
|
||||
|
||||
pause @'Seconds 10
|
||||
pause @'Seconds 3.14
|
||||
|
||||
void $ ContT $ bracket none $ const $ do
|
||||
what <- atomically $ stateTVar _tcpClientThreads (\x -> (HM.elems x, mempty))
|
||||
|
@ -363,76 +366,78 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
|||
atomically $ modifyTVar _tcpClientThreads (HM.filterWithKey (\k _ -> not (HS.member k done)))
|
||||
|
||||
forever $ void $ runMaybeT do
|
||||
-- client sockets
|
||||
-- client sockets
|
||||
|
||||
-- смотрим к кому надо
|
||||
who <- atomically $ readTQueue _tcpConnDemand
|
||||
whoAddr <- toPeerAddr who
|
||||
-- смотрим к кому надо
|
||||
who <- atomically $ readTQueue _tcpConnDemand
|
||||
whoAddr <- toPeerAddr who
|
||||
|
||||
debug $ "DEMAND:" <+> pretty who
|
||||
debug $ "DEMAND:" <+> pretty who
|
||||
|
||||
already <- atomically $ readTVar _tcpPeerConn <&> HM.member who
|
||||
already <- atomically $ readTVar _tcpPeerConn <&> HM.member who
|
||||
|
||||
when already do
|
||||
debug "SHIT? BUSYLOOP?"
|
||||
mzero
|
||||
when already do
|
||||
debug "SHIT? BUSYLOOP?"
|
||||
mzero
|
||||
|
||||
liftIO $ newClientThread env $ do
|
||||
let (L4Address _ (IPAddrPort (ip,port))) = whoAddr
|
||||
connect (show ip) (show port) $ \(so, remoteAddr) -> do
|
||||
liftIO $ newClientThread env $ do
|
||||
let (L4Address _ (IPAddrPort (ip,port))) = whoAddr
|
||||
connect (show ip) (show port) $ \(so, remoteAddr) -> do
|
||||
|
||||
let ?env = env
|
||||
let ?env = env
|
||||
|
||||
flip runContT pure $ callCC \exit -> do
|
||||
flip runContT pure $ callCC \exit -> do
|
||||
|
||||
debug $ "OPEN CLIENT CONNECTION" <+> pretty ip <+> pretty port <+> pretty remoteAddr
|
||||
cookie <- handshake Client env so
|
||||
let connId = connectionId cookie myCookie
|
||||
debug $ "OPEN CLIENT CONNECTION" <+> pretty ip <+> pretty port <+> pretty remoteAddr
|
||||
cookie <- handshake Client env so
|
||||
let connId = connectionId cookie myCookie
|
||||
|
||||
when (cookie == myCookie) $ do
|
||||
debug $ "same peer, exit" <+> pretty remoteAddr
|
||||
exit ()
|
||||
when (cookie == myCookie) $ do
|
||||
debug $ "same peer, exit" <+> pretty remoteAddr
|
||||
exit ()
|
||||
|
||||
here <- useCookie cookie
|
||||
here <- useCookie cookie
|
||||
|
||||
-- TODO: handshake notification
|
||||
liftIO $ _tcpOnClientStarted whoAddr connId
|
||||
-- TODO: handshake notification
|
||||
liftIO $ _tcpOnClientStarted whoAddr connId
|
||||
|
||||
when here do
|
||||
debug $ "CLIENT: ALREADY CONNECTED" <+> pretty cookie <+> pretty ip <+> pretty port
|
||||
exit ()
|
||||
when here do
|
||||
debug $ "CLIENT: ALREADY CONNECTED" <+> pretty cookie <+> pretty ip <+> pretty port
|
||||
exit ()
|
||||
|
||||
atomically do
|
||||
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
|
||||
modifyTVar _tcpPeerConn (HM.insert who connId)
|
||||
modifyTVar _tcpPeerSocket (HM.insert who so)
|
||||
|
||||
wr <- ContT $ withAsync $ forever do
|
||||
bss <- atomically do
|
||||
q' <- readTVar _tcpSent <&> HM.lookup who
|
||||
maybe1 q' mempty $ \q -> do
|
||||
s <- readTBQueue q
|
||||
sx <- flushTBQueue q
|
||||
pure (s:sx)
|
||||
|
||||
for_ bss $ \bs -> do
|
||||
-- FIXME: check-this!
|
||||
let pq = myCookie -- randomIO
|
||||
let qids = bytestring32 pq
|
||||
let size = bytestring32 (fromIntegral $ LBS.length bs)
|
||||
|
||||
let frame = LBS.fromStrict qids
|
||||
<> LBS.fromStrict size -- req-size
|
||||
<> bs -- payload
|
||||
|
||||
sendLazy so frame --(LBS.toStrict frame)
|
||||
|
||||
void $ ContT $ bracket none $ const $ do
|
||||
debug "!!! TCP: BRACKET FIRED IN CLIENT !!!"
|
||||
atomically do
|
||||
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
|
||||
modifyTVar _tcpPeerConn (HM.insert who connId)
|
||||
modifyTVar _tcpPeerConn (HM.delete who)
|
||||
modifyTVar _tcpPeerCookie (HM.update killCookie cookie)
|
||||
modifyTVar _tcpPeerSocket (HM.delete who)
|
||||
|
||||
wr <- ContT $ withAsync $ forever do
|
||||
bss <- atomically do
|
||||
q' <- readTVar _tcpSent <&> HM.lookup who
|
||||
maybe1 q' mempty $ \q -> do
|
||||
s <- readTBQueue q
|
||||
sx <- flushTBQueue q
|
||||
pure (s:sx)
|
||||
void $ ContT $ bracket none (const $ cancel wr)
|
||||
|
||||
for_ bss $ \bs -> do
|
||||
-- FIXME: check-this!
|
||||
let pq = myCookie -- randomIO
|
||||
let qids = bytestring32 pq
|
||||
let size = bytestring32 (fromIntegral $ LBS.length bs)
|
||||
|
||||
let frame = LBS.fromStrict qids
|
||||
<> LBS.fromStrict size -- req-size
|
||||
<> bs -- payload
|
||||
|
||||
sendLazy so frame --(LBS.toStrict frame)
|
||||
|
||||
void $ ContT $ bracket none (const $ cancel wr)
|
||||
|
||||
void $ ContT $ bracket none $ const $ do
|
||||
debug "!!! TCP: BRACKET FIRED IN CLIENT !!!"
|
||||
atomically do
|
||||
modifyTVar _tcpPeerConn (HM.delete who)
|
||||
modifyTVar _tcpPeerCookie (HM.update killCookie cookie)
|
||||
|
||||
readFrames so who _tcpReceived
|
||||
readFrames so who _tcpReceived
|
||||
|
||||
|
|
|
@ -105,8 +105,11 @@ import Data.List qualified as L
|
|||
import Data.Map (Map)
|
||||
import Data.Map qualified as Map
|
||||
import Data.Maybe
|
||||
import Data.Either
|
||||
import Data.Set qualified as Set
|
||||
import Data.Set (Set)
|
||||
import Data.Text qualified as Text
|
||||
import Data.Text.IO qualified as Text
|
||||
import Data.Time.Clock.POSIX
|
||||
import Data.Time.Format
|
||||
import Lens.Micro.Platform as Lens
|
||||
|
@ -264,6 +267,7 @@ runCLI = do
|
|||
<> command "bypass" (info pByPass (progDesc "bypass"))
|
||||
<> command "gc" (info pRunGC (progDesc "run RAM garbage collector"))
|
||||
<> command "probes" (info pRunProbes (progDesc "show probes"))
|
||||
<> command "do" (info pDoScript (progDesc "execute a command in peer context"))
|
||||
<> command "version" (info pVersion (progDesc "show program version"))
|
||||
)
|
||||
|
||||
|
@ -391,11 +395,9 @@ runCLI = do
|
|||
pPeers = do
|
||||
rpc <- pRpcCommon
|
||||
pure $ withMyRPC @PeerAPI rpc $ \caller -> do
|
||||
r <- callService @RpcPeers caller ()
|
||||
case r of
|
||||
Left e -> err (viaShow e)
|
||||
Right p -> do
|
||||
print $ vcat (fmap fmt p)
|
||||
r <- callRpcWaitRetry @RpcPeers (TimeoutSec 1) 2 caller ()
|
||||
>>= orThrowUser "rpc timeout"
|
||||
liftIO $ print $ vcat (fmap fmt r)
|
||||
where
|
||||
fmt (a, b) = pretty (AsBase58 a) <+> pretty b
|
||||
|
||||
|
@ -608,6 +610,20 @@ runCLI = do
|
|||
|
||||
liftIO $ print $ vcat (fmap pretty p)
|
||||
|
||||
pDoScript = do
|
||||
rpc <- pRpcCommon
|
||||
argz <- many (strArgument (metavar "TERM" <> help "script terms"))
|
||||
pure do
|
||||
let s = unlines $ unwords <$> splitForms argz
|
||||
|
||||
withMyRPC @PeerAPI rpc $ \caller -> do
|
||||
|
||||
r <- callRpcWaitRetry @RpcRunScript (TimeoutSec 1) 3 caller (Text.pack s)
|
||||
>>= orThrowUser "rpc timeout"
|
||||
|
||||
for_ (parseTop r & fromRight mempty) \sexy -> do
|
||||
liftIO $ hPutDoc stdout (pretty sexy)
|
||||
|
||||
refP :: ReadM (PubKey 'Sign 'HBS2Basic)
|
||||
refP = maybeReader fromStringMay
|
||||
|
||||
|
|
|
@ -1,15 +1,105 @@
|
|||
{-# OPTIONS_GHC -fno-warn-orphans #-}
|
||||
{-# Language UndecidableInstances #-}
|
||||
module RPC2
|
||||
( module RPC2.Peer
|
||||
, module RPC2.RefLog
|
||||
, module RPC2.RefChan
|
||||
, module RPC2.LWWRef
|
||||
, module RPC2.Mailbox
|
||||
, HandleMethod(..)
|
||||
-- , module RPC2.Mailbox
|
||||
) where
|
||||
|
||||
|
||||
import HBS2.Prelude.Plated
|
||||
import HBS2.Net.Proto.Service
|
||||
import HBS2.Net.Proto.Sessions
|
||||
|
||||
import HBS2.Base58
|
||||
import HBS2.Data.Types.Peer
|
||||
import HBS2.Actors.Peer
|
||||
import HBS2.Peer.Proto.Peer
|
||||
import HBS2.Clock
|
||||
import HBS2.Net.Auth.Schema
|
||||
|
||||
import HBS2.Peer.RPC.Internal.Types
|
||||
import HBS2.Peer.RPC.API.Peer
|
||||
|
||||
import Data.Config.Suckless.Script
|
||||
|
||||
import RPC2.Peer
|
||||
import RPC2.RefLog
|
||||
import RPC2.RefChan
|
||||
import RPC2.LWWRef
|
||||
import RPC2.Mailbox()
|
||||
|
||||
import PeerTypes
|
||||
import PeerInfo
|
||||
|
||||
import UnliftIO
|
||||
|
||||
import Data.Text qualified as Text
|
||||
import Data.Either
|
||||
import Data.Maybe
|
||||
import Numeric
|
||||
|
||||
instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcRunScript where
|
||||
handleMethod top = do
|
||||
|
||||
co <- getRpcContext @PeerAPI
|
||||
|
||||
let cli = parseTop top & fromRight mempty
|
||||
|
||||
r <- try @_ @SomeException (run (dict co) cli)
|
||||
|
||||
either (pure . Text.pack . show) (pure . Text.pack . show . pretty) r
|
||||
|
||||
where
|
||||
|
||||
dict RPC2Context{..} = makeDict @_ @m do
|
||||
entry $ bindMatch "hey" $ const do
|
||||
pure $ mkSym @C "hey"
|
||||
|
||||
entry $ bindMatch "peer-info" $ const do
|
||||
|
||||
now <- getTimeCoarse
|
||||
|
||||
liftIO $ withPeerM rpcPeerEnv do
|
||||
pl <- getPeerLocator @e
|
||||
pips <- knownPeers @e pl
|
||||
npi <- newPeerInfo
|
||||
|
||||
r <- for pips $ \p -> do
|
||||
pinfo@PeerInfo{..} <- fetch True npi (PeerInfoKey p) id
|
||||
burst <- readTVarIO _peerBurst
|
||||
buM <- readTVarIO _peerBurstMax
|
||||
errors <- readTVarIO _peerErrorsPerSec
|
||||
downFails <- readTVarIO _peerDownloadFail
|
||||
downMiss <- readTVarIO _peerDownloadMiss
|
||||
down <- readTVarIO _peerDownloadedBlk
|
||||
rtt <- medianPeerRTT pinfo <&> fmap realToFrac
|
||||
seen <- readTVarIO _peerLastWatched
|
||||
let l = realToFrac (toNanoSecs $ now - seen) / 1e9
|
||||
|
||||
let rttMs = (/1e6) <$> rtt <&> (\x -> showGFloat (Just 2) x "") <&> (<> "ms")
|
||||
let ls = showGFloat (Just 2) l "" <> "s"
|
||||
|
||||
mpde <- find (KnownPeerKey p) id
|
||||
let pk = maybe1 mpde mempty $ \PeerData{..} -> do
|
||||
[ mkList [ mkSym "key", mkSym (show $ pretty (AsBase58 _peerSignKey)) ] ]
|
||||
|
||||
let peerStaff = mkList @C $
|
||||
pk <>
|
||||
[ mkList [ mkSym "addr", mkSym (show $ pretty p) ]
|
||||
, mkList [ mkSym "seen", mkSym ls ]
|
||||
, mkList [ mkSym "burst", mkInt burst ]
|
||||
, mkList [ mkSym "burst-max", mkInt (fromMaybe 0 buM) ]
|
||||
, mkList [ mkSym "errors", mkInt (downFails + errors) ]
|
||||
, mkList [ mkSym "downloaded", mkInt down ]
|
||||
, mkList [ mkSym "miss", mkInt downMiss ]
|
||||
]
|
||||
<> maybe1 rttMs mempty (\r -> [ mkList [ mkSym "rtt", mkSym r ] ])
|
||||
|
||||
pure $ mkList @C [mkSym "peer", peerStaff ]
|
||||
|
||||
pure $ mkList r
|
||||
|
||||
|
|
|
@ -38,6 +38,8 @@ data RpcPerformGC
|
|||
|
||||
data RpcGetProbes
|
||||
|
||||
data RpcRunScript
|
||||
|
||||
type PeerAPI = '[ RpcPoke
|
||||
, RpcPing
|
||||
, RpcAnnounce
|
||||
|
@ -55,6 +57,7 @@ type PeerAPI = '[ RpcPoke
|
|||
, RpcPerformGC
|
||||
, RpcPollList2
|
||||
, RpcGetProbes
|
||||
, RpcRunScript
|
||||
]
|
||||
|
||||
instance HasProtocol UNIX (ServiceProto PeerAPI UNIX) where
|
||||
|
@ -119,6 +122,9 @@ type instance Output RpcPerformGC = ()
|
|||
type instance Input RpcGetProbes = ()
|
||||
type instance Output RpcGetProbes = [ProbeSnapshotElement]
|
||||
|
||||
type instance Input RpcRunScript = Text
|
||||
type instance Output RpcRunScript = Text
|
||||
|
||||
data SetLogging =
|
||||
DebugOn Bool
|
||||
| TraceOn Bool
|
||||
|
|
|
@ -269,6 +269,12 @@ eatNil f = \case
|
|||
class IsContext c => MkInt c s where
|
||||
mkInt :: s -> Syntax c
|
||||
|
||||
class IsContext c => MkDouble c s where
|
||||
mkDouble :: s -> Syntax c
|
||||
|
||||
instance (IsContext c, RealFrac s) => MkDouble c s where
|
||||
mkDouble v = Literal noContext $ LitScientific (realToFrac v)
|
||||
|
||||
instance (Integral i, IsContext c) => MkInt c i where
|
||||
mkInt n = Literal noContext $ LitInt (fromIntegral n)
|
||||
|
||||
|
|
Loading…
Reference in New Issue