From 4424466c84b9a78cd582ada051e0616ab7693505 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Thu, 13 Apr 2023 19:02:25 +0300 Subject: [PATCH] tcp --- docs/todo/tcp-injecting-plan.txt | 27 ++ hbs2-core/hbs2-core.cabal | 4 + hbs2-core/lib/HBS2/Defaults.hs | 16 +- hbs2-core/lib/HBS2/Net/IP/Addr.hs | 26 +- hbs2-core/lib/HBS2/Net/Messaging/TCP.hs | 445 +++++++++++++++++++ hbs2-core/lib/HBS2/Net/Messaging/UDP.hs | 53 +-- hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs | 2 +- hbs2-core/lib/HBS2/Net/Proto/Definition.hs | 95 ++-- hbs2-core/lib/HBS2/Net/Proto/Peer.hs | 2 + hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs | 91 ++-- hbs2-core/lib/HBS2/Net/Proto/Types.hs | 119 ++++- hbs2-git/lib/HBS2Git/App.hs | 1 - hbs2-git/lib/HBS2Git/Types.hs | 4 +- hbs2-peer/app/BlockDownload.hs | 123 +++-- hbs2-peer/app/BlockHttpDownload.hs | 8 +- hbs2-peer/app/Bootstrap.hs | 63 +-- hbs2-peer/app/Brains.hs | 6 +- hbs2-peer/app/PeerInfo.hs | 7 +- hbs2-peer/app/PeerMain.hs | 134 ++++-- hbs2-peer/app/PeerTypes.hs | 11 +- hbs2-peer/app/ProxyMessaging.hs | 88 ++++ hbs2-peer/app/RPC.hs | 20 +- hbs2-peer/hbs2-peer.cabal | 4 +- hbs2-tests/hbs2-tests.cabal | 99 +++++ hbs2-tests/test/TestTCP.hs | 248 +++++++++++ hbs2-tests/test/TestTCPNet.hs | 179 ++++++++ hbs2-tests/test/TestUDP.hs | 17 +- hbs2/Main.hs | 1 - nix/peer/flake.lock | 32 +- nix/peer/flake.nix | 6 +- 30 files changed, 1615 insertions(+), 316 deletions(-) create mode 100644 docs/todo/tcp-injecting-plan.txt create mode 100644 hbs2-core/lib/HBS2/Net/Messaging/TCP.hs create mode 100644 hbs2-peer/app/ProxyMessaging.hs create mode 100644 hbs2-tests/test/TestTCP.hs create mode 100644 hbs2-tests/test/TestTCPNet.hs diff --git a/docs/todo/tcp-injecting-plan.txt b/docs/todo/tcp-injecting-plan.txt new file mode 100644 index 00000000..0511f3b0 --- /dev/null +++ b/docs/todo/tcp-injecting-plan.txt @@ -0,0 +1,27 @@ + +TODO: tcp-run-proto + +TODO: tcp-add-ping + +TODO: tcp-check-ping-work + +TODO: tcp-messaging-filter-addr-on-udp + +TODO: tcp-extend-pex + +TODO: tcp-drop-deffered-after-timeout + +TODO: tcp-test-different-hosts + +TODO: tcp-test-nat + +TODO: tcp-test-vpn + +TODO: tcp-only-client-connections-to-pex + В pex возвращать только те соединения, к которым + мы сами смогли сделать, то есть как клиенты. + Это немного не прикольно, так как если мы не успели + соединиться клиентом, до того, как открыли серверное + соединение - мы не узнаем. Так что надо вести где-то + ( brains?) статистику, что смогли соединиться, как + клиент. diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index 94528e3d..e2e691b7 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -82,6 +82,7 @@ library , HBS2.Net.Messaging , HBS2.Net.Messaging.Fake , HBS2.Net.Messaging.UDP + , HBS2.Net.Messaging.TCP , HBS2.Net.PeerLocator , HBS2.Net.PeerLocator.Static , HBS2.Net.Proto @@ -133,6 +134,8 @@ library , murmur-hash , network , network-multicast + , network-simple + , network-byte-order , prettyprinter , random , random-shuffle @@ -143,6 +146,7 @@ library , split , stm , stm-chans + , streaming , suckless-conf , temporary , text diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index ff5dd470..bdcc6d9a 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -12,15 +12,15 @@ defMaxDatagramRPC = 4096 defMessageQueueSize :: Integral a => a defMessageQueueSize = 65536*10 -defBurst :: Integral a => a -defBurst = 4 - defBurstMax :: Integral a => a -defBurstMax = 64 +defBurstMax = 128 + +defBurst :: Integral a => a +defBurst = defBurstMax `div` 2 -- defChunkSize :: Integer defChunkSize :: Integral a => a -defChunkSize = 1400 +defChunkSize = 1420 -- defChunkSize = 480 defBlockSize :: Integer @@ -70,18 +70,18 @@ defBlockWipTimeout :: TimeSpec defBlockWipTimeout = defCookieTimeout defBlockInfoTimeout :: Timeout 'Seconds -defBlockInfoTimeout = 5 +defBlockInfoTimeout = 20 defBlockInfoTimeoutSpec :: TimeSpec defBlockInfoTimeoutSpec = toTimeSpec defBlockInfoTimeout -- how much time wait for block from peer? defBlockWaitMax :: Timeout 'Seconds -defBlockWaitMax = 20 :: Timeout 'Seconds +defBlockWaitMax = 60 :: Timeout 'Seconds -- how much time wait for block from peer? defChunkWaitMax :: Timeout 'Seconds -defChunkWaitMax = 10 :: Timeout 'Seconds +defChunkWaitMax = 30 :: Timeout 'Seconds defSweepTimeout :: Timeout 'Seconds defSweepTimeout = 60 -- FIXME: only for debug! diff --git a/hbs2-core/lib/HBS2/Net/IP/Addr.hs b/hbs2-core/lib/HBS2/Net/IP/Addr.hs index 8275cf62..3a389d26 100644 --- a/hbs2-core/lib/HBS2/Net/IP/Addr.hs +++ b/hbs2-core/lib/HBS2/Net/IP/Addr.hs @@ -1,6 +1,7 @@ {-# OPTIONS_GHC -fno-warn-orphans #-} module HBS2.Net.IP.Addr - ( parseAddr + ( parseAddrUDP + , parseAddrTCP , getHostPort , Pretty , IPAddrPort(..) @@ -20,11 +21,8 @@ import Data.Functor import Data.IP import Data.Maybe import Data.Text qualified as Text -import Data.Text (Text) -import Network.SockAddr import Network.Socket import Data.Word (Word16) -import Prettyprinter class AddrPriority a where addrPriority :: a -> Int @@ -44,7 +42,12 @@ instance Serialise IPv6 newtype IPAddrPort e = IPAddrPort (IP, Word16) - deriving stock (Generic,Eq,Ord) + deriving stock (Generic,Eq,Ord,Show) + +instance Hashable IPv4 +instance Hashable IPv6 +instance Hashable IP +instance Hashable (IPAddrPort e) instance Serialise (IPAddrPort e) @@ -74,15 +77,22 @@ getHostPort s = parseOnly p s & either (const Nothing) Just (h, p) <- pAddr pure (Text.unpack h, read (Text.unpack p)) -parseAddr :: Text -> IO [AddrInfo] -parseAddr s = fromMaybe mempty <$> runMaybeT do + +parseAddrUDP :: Text -> IO [AddrInfo] +parseAddrUDP = parseAddr Datagram + +parseAddrTCP :: Text -> IO [AddrInfo] +parseAddrTCP = parseAddr Stream + +parseAddr :: SocketType -> Text -> IO [AddrInfo] +parseAddr tp s = fromMaybe mempty <$> runMaybeT do (host,port) <- MaybeT $ pure $ parseOnly pAddr s & either (const Nothing) Just let hostS = Text.unpack host & Just let portS = Text.unpack port & Just MaybeT $ liftIO $ getAddrInfo (Just udp) hostS portS <&> Just where - udp = defaultHints { addrSocketType = Datagram } + udp = defaultHints { addrSocketType = tp } pAddr :: Parser (Text, Text) pAddr = pIP6 <|> pIP4 <|> pHostName diff --git a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs new file mode 100644 index 00000000..cb6dbc21 --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs @@ -0,0 +1,445 @@ +{-# Language TemplateHaskell #-} +module HBS2.Net.Messaging.TCP + ( MessagingTCP + , runMessagingTCP + , newMessagingTCP + , tcpOwnPeer + , tcpCookie + ) where + +import HBS2.Clock +import HBS2.Net.IP.Addr +import HBS2.Net.Messaging +import HBS2.Net.Proto.Types +import HBS2.Prelude.Plated + +import HBS2.System.Logger.Simple + +import Control.Concurrent.Async +import Control.Concurrent.STM +import Control.Exception +import Control.Monad +import Data.Bits +import Data.ByteString.Lazy (ByteString) +import Data.ByteString.Lazy qualified as LBS +import Data.ByteString qualified as BS +import Data.Function +import Data.Functor +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HashMap +import Data.List qualified as L +import Data.Maybe +import Data.Word +import Lens.Micro.Platform +import Network.ByteOrder hiding (ByteString) +import Network.Simple.TCP +import Network.Socket hiding (listen,connect) +import Network.Socket.ByteString.Lazy hiding (send,recv) +import Streaming.Prelude qualified as S +import System.Random hiding (next) + +data SocketClosedException = + SocketClosedException + deriving stock (Show, Typeable) + +instance Exception SocketClosedException + + +-- FIXME: control-recv-capacity-to-avoid-leaks +data MessagingTCP = + MessagingTCP + { _tcpOwnPeer :: Peer L4Proto + , _tcpCookie :: Word32 + , _tcpConnPeer :: TVar (HashMap Word64 (Peer L4Proto)) + , _tcpPeerConn :: TVar (HashMap (Peer L4Proto) Word64) + , _tcpConnUsed :: TVar (HashMap Word64 Int) + , _tcpConnQ :: TVar (HashMap Word64 (TQueue (Peer L4Proto, ByteString))) + , _tcpPeerPx :: TVar (HashMap Word32 (Peer L4Proto)) + , _tcpPeerXp :: TVar (HashMap (Peer L4Proto) Word32) + , _tcpRecv :: TQueue (Peer L4Proto, ByteString) + , _tcpDefer :: TVar (HashMap (Peer L4Proto) [(TimeSpec, ByteString)]) + , _tcpDeferEv :: TQueue () + } + +makeLenses 'MessagingTCP + +newMessagingTCP :: ( MonadIO m + , FromSockAddr 'TCP (Peer L4Proto) + ) + => PeerAddr L4Proto + -> m MessagingTCP + +newMessagingTCP pa = liftIO do + MessagingTCP <$> fromPeerAddr pa + <*> randomIO + <*> newTVarIO mempty + <*> newTVarIO mempty + <*> newTVarIO mempty + <*> newTVarIO mempty + <*> newTVarIO mempty + <*> newTVarIO mempty + <*> newTQueueIO + <*> newTVarIO mempty + <*> newTQueueIO + +instance Messaging MessagingTCP L4Proto ByteString where + + sendTo bus (To p) (From f) msg = liftIO do + let own = view tcpOwnPeer bus + + co' <- atomically $ readTVar (view tcpPeerConn bus) <&> HashMap.lookup p + + -- debug $ "sendTo" <+> brackets (pretty own) + -- <+> pretty p + -- <+> braces (pretty co') + -- <+> pretty (LBS.length msg) + + maybe1 co' defer $ \co -> do + -- trace $ "writing to" <+> pretty co + q' <- atomically $ readTVar (view tcpConnQ bus) <&> HashMap.lookup co + maybe1 q' (warn $ "no queue for" <+> pretty co) $ \q -> do + atomically $ writeTQueue q (p, msg) + + where + defer = do + warn $ "defer" <+> pretty p + t <- getTimeCoarse + atomically $ modifyTVar (view tcpDefer bus) (HashMap.insertWith (<>) p [(t, msg)]) + atomically $ writeTQueue (view tcpDeferEv bus) () + + receive bus _ = liftIO do + let q = view tcpRecv bus + + ms <- atomically do + r <- readTQueue q + rs <- flushTQueue q + pure (r:rs) + + forM ms $ \(p, msg) -> pure (From p, msg) + + +-- FIXME: why-streaming-then? +-- Ну и зачем тут вообще стриминг, +-- если чтение всё равно руками написал? +-- Если fromChunks - O(n), и reverse O(n) +-- то мы все равно пройдем все чанки, на +-- кой чёрт тогда вообще стриминг? бред +-- какой-то. +readFromSocket :: forall m . MonadIO m + => Socket + -> Int + -> m ByteString + +readFromSocket sock size = LBS.fromChunks <$> (go size & S.toList_) + where + go 0 = pure () + go n = do + r <- liftIO $ recv sock n + maybe1 r eos $ \bs -> do + let nread = BS.length bs + S.yield bs + go (max 0 (n - nread)) + + eos = do + debug "SOCKET FUCKING CLOSED!" + liftIO $ throwIO SocketClosedException + +connectionId :: Word32 -> Word32 -> Word64 +connectionId a b = (fromIntegral hi `shiftL` 32) .|. fromIntegral low + where + low = min a b + hi = max a b + + +data ConnType = Server | Client + deriving (Eq,Ord,Show,Generic) + + +sendCookie :: MonadIO m + => MessagingTCP + -> Socket + -> m () + +sendCookie env so = do + let coo = view tcpCookie env & bytestring32 + send so coo + +recvCookie :: MonadIO m + => MessagingTCP + -> Socket + -> m Word32 + +recvCookie _ so = liftIO do + scoo <- readFromSocket so 4 <&> LBS.toStrict + pure $ word32 scoo + +handshake :: MonadIO m + => ConnType + -> MessagingTCP + -> Socket + -> m Word32 + +handshake Server env so = do + cookie <- recvCookie env so + sendCookie env so + pure cookie + +handshake Client env so = do + sendCookie env so + recvCookie env so + +spawnConnection :: forall m . MonadIO m + => ConnType + -> MessagingTCP + -> Socket + -> SockAddr + -> m () + +spawnConnection tp env so sa = liftIO do + + let myCookie = view tcpCookie env + let own = view tcpOwnPeer env + let newP = fromSockAddr @'TCP sa + + theirCookie <- handshake tp env so + + let connId = connectionId myCookie theirCookie + + traceCmd own + ( "spawnConnection " + <+> viaShow tp + <+> pretty myCookie + <+> pretty connId ) + newP + + debug $ "handshake" <+> viaShow tp + <+> brackets (pretty (view tcpOwnPeer env)) + <+> pretty sa + <+> pretty theirCookie + <+> pretty connId + + used <- atomically $ do + modifyTVar (view tcpConnUsed env) (HashMap.insertWith (+) connId 1) + readTVar (view tcpConnUsed env) <&> HashMap.findWithDefault 0 connId + + debug $ "USED:" <+> viaShow tp <+> pretty own <+> pretty used + + when ( used <= 2 ) do + atomically $ modifyTVar (view tcpPeerConn env) (HashMap.insert newP connId) + + when (used == 1) do + q <- getWriteQueue connId + updatePeer connId newP + + debug $ "NEW PEER" <+> brackets (pretty own) + <+> pretty connId + <+> pretty newP + <+> parens ("used:" <+> pretty used) + + rd <- async $ fix \next -> do + + spx <- readFromSocket so 4 <&> LBS.toStrict + ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг + let px = word32 spx -- & fromIntegral + let size = word32 ssize & fromIntegral + + + bs <- readFromSocket so size + + memReqId newP px + + pxes <- readTVarIO (view tcpPeerPx env) + + let orig = fromMaybe (fromSockAddr @'TCP sa) (HashMap.lookup px pxes) + + -- debug $ "RECEIVED" <+> pretty orig <+> pretty (LBS.length bs) + + atomically $ writeTQueue (view tcpRecv env) (orig, bs) + + next + + wr <- async $ fix \next -> do + (rcpt, bs) <- atomically $ readTQueue q + + pq <- makeReqId rcpt + let qids = bytestring32 pq + let size = bytestring32 (fromIntegral $ LBS.length bs) + + let frame = LBS.fromStrict qids + <> LBS.fromStrict size -- req-size + <> bs -- payload + + sendLazy so frame --(LBS.toStrict frame) + next + + void $ waitAnyCatchCancel [rd,wr] + + cleanupConn connId + + -- gracefulClose so 1000 + debug $ "spawnConnection exit" <+> pretty sa + + where + + memReqId newP px = + atomically $ modifyTVar (view tcpPeerXp env) (HashMap.insert newP px) + + makeReqId rcpt = do + let pxes = view tcpPeerPx env + let xpes = view tcpPeerXp env + + nq <- randomIO + atomically $ do + px <- readTVar xpes <&> HashMap.lookup rcpt + case px of + Just qq -> pure qq + Nothing -> do + modifyTVar pxes (HashMap.insert nq rcpt) + modifyTVar xpes (HashMap.insert rcpt nq) + pure nq + + updatePeer connId newP = atomically $ do + modifyTVar (view tcpPeerConn env) (HashMap.insert newP connId) + modifyTVar (view tcpConnPeer env) (HashMap.insert connId newP) + + getWriteQueue connId = atomically $ do + readTVar (view tcpConnQ env) >>= \x -> do + case HashMap.lookup connId x of + Just qq -> pure qq + Nothing -> do + newQ <- newTQueue + modifyTVar (view tcpConnQ env) (HashMap.insert connId newQ) + pure newQ + + cleanupConn connId = atomically do + modifyTVar (view tcpConnUsed env) (HashMap.alter del connId) + used <- readTVar (view tcpConnUsed env) <&> HashMap.findWithDefault 0 connId + when (used == 0) do + p <- stateTVar (view tcpConnPeer env) + $ \x -> (HashMap.lookup connId x, HashMap.delete connId x) + + maybe1 p none $ \pp -> + modifyTVar (view tcpPeerConn env) (HashMap.delete pp) + + modifyTVar (view tcpConnQ env) (HashMap.delete connId) + + where + del = \case + Nothing -> Nothing + Just n | n <= 1 -> Nothing + | otherwise -> Just (pred n) + + +connectPeerTCP :: MonadIO m + => MessagingTCP + -> Peer L4Proto + -> m () + +connectPeerTCP env peer = liftIO do + pa <- toPeerAddr peer + let (L4Address _ (IPAddrPort (i,p))) = pa + + connect (show i) (show p) $ \(sock, remoteAddr) -> do + spawnConnection Client env sock remoteAddr + shutdown sock ShutdownBoth + +runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m () +runMessagingTCP env = liftIO do + own <- toPeerAddr $ view tcpOwnPeer env + let (L4Address _ (IPAddrPort (i,p))) = own + + let defs = view tcpDefer env + + void $ async $ forever do + pause @'Seconds 30 + now <- getTimeCoarse + + -- FIXME: time-hardcode-again + let expire = filter (\e -> (realToFrac (toNanoSecs (now - fst e)) / 1e9) < 30) + atomically $ modifyTVar defs + $ HashMap.mapMaybe + $ \es -> let rs = expire es + in case rs of + [] -> Nothing + xs -> Just xs + + void $ async $ forever do + + let ev = view tcpDeferEv env + + -- FIXME: wait-period-hardcode + void $ race (pause @'Seconds 0.25) (atomically $ readTQueue ev >> flushTQueue ev) + + dePips <- readTVarIO defs <&> HashMap.keys + + + forM_ dePips $ \pip -> do + msgs <- readTVarIO defs <&> HashMap.findWithDefault mempty pip + + unless (L.null msgs) do + trace $ "DEFERRED FOR" <+> pretty pip <+> pretty (length msgs) + + let len = length msgs + + when (len > 10) do + -- FIXME: deferred-message-hardcoded + atomically $ modifyTVar defs (HashMap.adjust (L.drop (len - 10)) pip) + + co' <- atomically $ readTVar (view tcpPeerConn env) <&> HashMap.lookup pip + + maybe1 co' (void $ async (connectPeerTCP env pip)) $ \co -> do + q' <- atomically $ readTVar (view tcpConnQ env) <&> HashMap.lookup co + maybe1 q' none $ \q -> do + atomically do + mss <- readTVar defs <&> HashMap.findWithDefault mempty pip + modifyTVar defs $ HashMap.delete pip + forM_ mss $ \m -> writeTQueue q (pip, snd m) + + pure () + + void $ async $ forever do + pause @'Seconds 120 + ps <- readTVarIO $ view tcpConnPeer env + let peers = HashMap.toList ps + forM_ peers $ \(c,pip) -> do + used <- readTVarIO (view tcpConnUsed env) <&> HashMap.findWithDefault 0 c + trace $ "peer" <+> brackets (pretty own) + <+> pretty pip + <+> pretty c + <+> parens ("used:" <+> pretty used) + + listen (Host (show i)) (show p) $ \(sock, sa) -> do + debug $ "Listening on" <+> pretty sa + + forever do + void $ acceptFork sock $ \(so, remote) -> do + trace $ "GOT INCOMING CONNECTION FROM" + <+> brackets (pretty own) + <+> brackets (pretty sa) + <+> pretty remote + + void $ try @SomeException $ do + + spawnConnection Server env so remote + + -- gracefulClose so 1000 + + -- TODO: probably-cleanup-peer + -- TODO: periodically-drop-inactive-connections + + debug $ "CLOSING CONNECTION" <+> pretty remote + shutdown so ShutdownBoth + close so + + +traceCmd :: forall a ann b m . ( Pretty a + , Pretty b + , MonadIO m + ) + => a -> Doc ann -> b -> m () + +traceCmd p1 s p2 = do + trace $ brackets (pretty p1) + <+> s + <+> parens (pretty p2) + diff --git a/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs index 28725edd..9ae6079c 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs @@ -1,4 +1,3 @@ -{-# Language TemplateHaskell #-} {-# Language UndecidableInstances #-} module HBS2.Net.Messaging.UDP where @@ -9,20 +8,16 @@ import HBS2.Net.Messaging import HBS2.Net.Proto import HBS2.Prelude.Plated -import Data.Foldable import Data.Function import Control.Exception import Control.Monad.Trans.Maybe import Control.Concurrent.Async import Control.Concurrent.STM -import Control.Concurrent.STM.TBQueue qualified as Q import Control.Concurrent.STM.TQueue qualified as Q0 import Control.Monad import Data.ByteString.Lazy (ByteString) -import Data.ByteString qualified as BS import Data.ByteString.Lazy qualified as LBS import Data.Functor -import Data.Hashable import Data.List qualified as L import Data.Maybe -- import Data.Text (Text) @@ -31,55 +26,22 @@ import Lens.Micro.Platform import Network.Socket import Network.Socket.ByteString import Network.Multicast -import Prettyprinter - -data UDP - --- FIXME: #ASAP change SockAddr to PeerAddr !!! -instance HasPeer UDP where - newtype instance Peer UDP = - PeerUDP - { _sockAddr :: SockAddr - } - deriving stock (Eq,Ord,Show,Generic) -instance AddrPriority (Peer UDP) where - addrPriority (PeerUDP sa) = addrPriority sa - -instance Hashable (Peer UDP) where - hashWithSalt salt p = case _sockAddr p of - SockAddrInet pn h -> hashWithSalt salt (4, fromIntegral pn, h) - SockAddrInet6 pn _ h _ -> hashWithSalt salt (6, fromIntegral pn, h) - SockAddrUnix s -> hashWithSalt salt ("unix", s) - -instance Pretty (Peer UDP) where - pretty p = pretty (_sockAddr p) - -makeLenses 'PeerUDP - - -instance (FromStringMaybe (IPAddrPort UDP), MonadIO m) => IsPeerAddr UDP m where - type instance PeerAddr UDP = IPAddrPort UDP - toPeerAddr p = pure $ fromString $ show $ pretty p - - fromPeerAddr iap = do - ai <- liftIO $ parseAddr $ fromString (show (pretty iap)) - pure $ PeerUDP $ addrAddress (head ai) -- FIXME: errors?! -- One address - one peer - one messaging data MessagingUDP = MessagingUDP { listenAddr :: SockAddr - , sink :: TQueue (From UDP, ByteString) - , inbox :: TQueue (To UDP, ByteString) + , sink :: TQueue (From L4Proto, ByteString) + , inbox :: TQueue (To L4Proto, ByteString) , sock :: TVar Socket , mcast :: Bool } -getOwnPeer :: MessagingUDP -> Peer UDP -getOwnPeer mess = PeerUDP (listenAddr mess) +getOwnPeer :: MessagingUDP -> Peer L4Proto +getOwnPeer mess = PeerL4 UDP (listenAddr mess) newMessagingUDPMulticast :: MonadIO m => String -> m (Maybe MessagingUDP) newMessagingUDPMulticast s = runMaybeT $ do @@ -103,7 +65,7 @@ newMessagingUDP reuse saddr = Just s -> do runMaybeT $ do - l <- MaybeT $ liftIO $ parseAddr (Text.pack s) <&> listToMaybe . sorted + l <- MaybeT $ liftIO $ parseAddrUDP (Text.pack s) <&> listToMaybe . sorted let a = addrAddress l so <- liftIO $ socket (addrFamily l) (addrSocketType l) (addrProtocol l) @@ -144,7 +106,8 @@ udpWorker env tso = do -- pause ( 10 :: Timeout 'Seconds ) (msg, from) <- recvFrom so defMaxDatagram -- liftIO $ print $ "recv:" <+> pretty (BS.length msg) - liftIO $ atomically $ Q0.writeTQueue (sink env) (From (PeerUDP from), LBS.fromStrict msg) + -- FIXME: ASAP-check-addr-type + liftIO $ atomically $ Q0.writeTQueue (sink env) (From (PeerL4 UDP from), LBS.fromStrict msg) sndLoop <- async $ forever $ do pause ( 10 :: Timeout 'Seconds ) @@ -171,7 +134,7 @@ runMessagingUDP udpMess = liftIO $ do w <- async $ udpWorker udpMess (sock udpMess) waitCatch w >>= either throwIO (const $ pure ()) -instance Messaging MessagingUDP UDP ByteString where +instance Messaging MessagingUDP L4Proto ByteString where sendTo bus (To whom) _ msg = liftIO do -- atomically $ Q0.writeTQueue (inbox bus) (To whom, msg) diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs index 23e8f72e..52c613a3 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs @@ -152,7 +152,7 @@ blockChunksProto adapter (BlockChunks c p) = do pure () BlockLost{} -> do - liftIO $ print "GOT BLOCK LOST MESSAGE - means IO ERROR" + -- liftIO $ print "GOT BLOCK LOST MESSAGE - means IO ERROR" pure () _ -> do diff --git a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs index 91cff8f2..e84fa2cd 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs @@ -8,10 +8,8 @@ module HBS2.Net.Proto.Definition import HBS2.Clock import HBS2.Defaults -import HBS2.Merkle import HBS2.Hash import HBS2.Net.Auth.Credentials -import HBS2.Net.Messaging.UDP import HBS2.Net.Proto import HBS2.Net.Proto.BlockAnnounce import HBS2.Net.Proto.BlockChunks @@ -26,7 +24,7 @@ import HBS2.Prelude import Data.Functor import Data.ByteString.Lazy (ByteString) import Data.ByteString qualified as BS -import Codec.Serialise (deserialiseOrFail,serialise,Serialise(..)) +import Codec.Serialise (deserialiseOrFail,serialise) import Crypto.Saltine.Core.Box qualified as Crypto import Crypto.Saltine.Class qualified as Crypto @@ -35,7 +33,7 @@ import Crypto.Saltine.Core.Box qualified as Encrypt -type instance Encryption UDP = HBS2Basic +type instance Encryption L4Proto = HBS2Basic type instance PubKey 'Sign HBS2Basic = Sign.PublicKey type instance PrivKey 'Sign HBS2Basic = Sign.SecretKey @@ -54,9 +52,9 @@ instance Serialise Encrypt.PublicKey instance Serialise Sign.SecretKey instance Serialise Encrypt.SecretKey -instance HasProtocol UDP (BlockInfo UDP) where - type instance ProtocolId (BlockInfo UDP) = 1 - type instance Encoded UDP = ByteString +instance HasProtocol L4Proto (BlockInfo L4Proto) where + type instance ProtocolId (BlockInfo L4Proto) = 1 + type instance Encoded L4Proto = ByteString decode = either (const Nothing) Just . deserialiseOrFail encode = serialise @@ -64,103 +62,108 @@ instance HasProtocol UDP (BlockInfo UDP) where -- requestPeriodLim = ReqLimPerMessage 1 -instance HasProtocol UDP (BlockChunks UDP) where - type instance ProtocolId (BlockChunks UDP) = 2 - type instance Encoded UDP = ByteString +instance HasProtocol L4Proto (BlockChunks L4Proto) where + type instance ProtocolId (BlockChunks L4Proto) = 2 + type instance Encoded L4Proto = ByteString decode = either (const Nothing) Just . deserialiseOrFail encode = serialise -instance Expires (SessionKey UDP (BlockChunks UDP)) where +instance Expires (SessionKey L4Proto (BlockChunks L4Proto)) where expiresIn _ = Just defCookieTimeoutSec -instance HasProtocol UDP (BlockAnnounce UDP) where - type instance ProtocolId (BlockAnnounce UDP) = 3 - type instance Encoded UDP = ByteString +instance HasProtocol L4Proto (BlockAnnounce L4Proto) where + type instance ProtocolId (BlockAnnounce L4Proto) = 3 + type instance Encoded L4Proto = ByteString decode = either (const Nothing) Just . deserialiseOrFail encode = serialise -instance HasProtocol UDP (PeerHandshake UDP) where - type instance ProtocolId (PeerHandshake UDP) = 4 - type instance Encoded UDP = ByteString +instance HasProtocol L4Proto (PeerHandshake L4Proto) where + type instance ProtocolId (PeerHandshake L4Proto) = 4 + type instance Encoded L4Proto = ByteString decode = either (const Nothing) Just . deserialiseOrFail encode = serialise requestPeriodLim = ReqLimPerProto 2 -instance HasProtocol UDP (PeerAnnounce UDP) where - type instance ProtocolId (PeerAnnounce UDP) = 5 - type instance Encoded UDP = ByteString +instance HasProtocol L4Proto (PeerAnnounce L4Proto) where + type instance ProtocolId (PeerAnnounce L4Proto) = 5 + type instance Encoded L4Proto = ByteString decode = either (const Nothing) Just . deserialiseOrFail encode = serialise -instance HasProtocol UDP (PeerExchange UDP) where - type instance ProtocolId (PeerExchange UDP) = 6 - type instance Encoded UDP = ByteString +instance HasProtocol L4Proto (PeerExchange L4Proto) where + type instance ProtocolId (PeerExchange L4Proto) = 6 + type instance Encoded L4Proto = ByteString decode = either (const Nothing) Just . deserialiseOrFail encode = serialise -instance HasProtocol UDP (RefLogUpdate UDP) where - type instance ProtocolId (RefLogUpdate UDP) = 7 - type instance Encoded UDP = ByteString +instance HasProtocol L4Proto (RefLogUpdate L4Proto) where + type instance ProtocolId (RefLogUpdate L4Proto) = 7 + type instance Encoded L4Proto = ByteString decode = either (const Nothing) Just . deserialiseOrFail encode = serialise requestPeriodLim = ReqLimPerMessage 600 -instance HasProtocol UDP (RefLogRequest UDP) where - type instance ProtocolId (RefLogRequest UDP) = 8 - type instance Encoded UDP = ByteString +instance HasProtocol L4Proto (RefLogRequest L4Proto) where + type instance ProtocolId (RefLogRequest L4Proto) = 8 + type instance Encoded L4Proto = ByteString decode = either (const Nothing) Just . deserialiseOrFail encode = serialise -instance HasProtocol UDP (PeerMetaProto UDP) where - type instance ProtocolId (PeerMetaProto UDP) = 9 - type instance Encoded UDP = ByteString +instance HasProtocol L4Proto (PeerMetaProto L4Proto) where + type instance ProtocolId (PeerMetaProto L4Proto) = 9 + type instance Encoded L4Proto = ByteString decode = either (const Nothing) Just . deserialiseOrFail encode = serialise -- FIXME: real-period requestPeriodLim = ReqLimPerMessage 1 -instance Expires (SessionKey UDP (BlockInfo UDP)) where +instance Expires (SessionKey L4Proto (BlockInfo L4Proto)) where expiresIn _ = Just defCookieTimeoutSec -instance Expires (EventKey UDP (BlockInfo UDP)) where +instance Expires (EventKey L4Proto (BlockInfo L4Proto)) where expiresIn _ = Just 600 -instance Expires (EventKey UDP (BlockChunks UDP)) where +instance Expires (EventKey L4Proto (BlockChunks L4Proto)) where expiresIn _ = Just 600 -instance Expires (EventKey UDP (BlockAnnounce UDP)) where +instance Expires (EventKey L4Proto (BlockAnnounce L4Proto)) where expiresIn _ = Nothing -instance Expires (SessionKey UDP (KnownPeer UDP)) where +instance Expires (SessionKey L4Proto (KnownPeer L4Proto)) where expiresIn _ = Just 3600 -instance Expires (SessionKey UDP (PeerHandshake UDP)) where +instance Expires (SessionKey L4Proto (PeerHandshake L4Proto)) where expiresIn _ = Just 60 -instance Expires (EventKey UDP (PeerAnnounce UDP)) where +instance Expires (EventKey L4Proto (PeerAnnounce L4Proto)) where expiresIn _ = Nothing -instance Expires (EventKey UDP (PeerMetaProto UDP)) where +instance Expires (EventKey L4Proto (PeerMetaProto L4Proto)) where expiresIn _ = Just 600 +-- instance MonadIO m => HasNonces () m where +-- type instance Nonce (PeerHandshake L4Proto) = BS.ByteString +-- newNonce = do +-- n <- liftIO ( Crypto.newNonce <&> Crypto.encode ) +-- pure $ BS.take 32 n -instance MonadIO m => HasNonces (PeerHandshake UDP) m where - type instance Nonce (PeerHandshake UDP) = BS.ByteString +instance MonadIO m => HasNonces (PeerHandshake L4Proto) m where + type instance Nonce (PeerHandshake L4Proto) = BS.ByteString newNonce = do n <- liftIO ( Crypto.newNonce <&> Crypto.encode ) pure $ BS.take 32 n -instance MonadIO m => HasNonces (PeerExchange UDP) m where - type instance Nonce (PeerExchange UDP) = BS.ByteString +instance MonadIO m => HasNonces (PeerExchange L4Proto) m where + type instance Nonce (PeerExchange L4Proto) = BS.ByteString newNonce = do n <- liftIO ( Crypto.newNonce <&> Crypto.encode ) pure $ BS.take 32 n -instance MonadIO m => HasNonces (RefLogUpdate UDP) m where - type instance Nonce (RefLogUpdate UDP) = BS.ByteString +instance MonadIO m => HasNonces (RefLogUpdate L4Proto) m where + type instance Nonce (RefLogUpdate L4Proto) = BS.ByteString newNonce = do n <- liftIO ( Crypto.newNonce <&> Crypto.encode ) pure $ BS.take 32 n diff --git a/hbs2-core/lib/HBS2/Net/Proto/Peer.hs b/hbs2-core/lib/HBS2/Net/Proto/Peer.hs index 648daa13..1de440a6 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Peer.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Peer.hs @@ -75,6 +75,7 @@ sendPing :: forall e m . ( MonadIO m , HasNonces (PeerHandshake e) m , Nonce (PeerHandshake e) ~ PingNonce , Pretty (Peer e) + , e ~ L4Proto ) => Peer e -> m () @@ -105,6 +106,7 @@ peerHandShakeProto :: forall e s m . ( MonadIO m , HasCredentials s m , Signatures s , s ~ Encryption e + , e ~ L4Proto ) => PeerHandshakeAdapter e m -> PeerHandshake e -> m () diff --git a/hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs b/hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs index 70b3f0c0..60d3a5fd 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs @@ -9,21 +9,26 @@ import HBS2.Net.Proto.Sessions import HBS2.Events import HBS2.Clock import HBS2.Defaults +import HBS2.Net.IP.Addr -import Data.ByteString qualified as BS -import Data.Traversable +import Control.Monad import Data.Functor import Data.Maybe import Codec.Serialise import Data.Hashable import Type.Reflection +import Data.List qualified as L import HBS2.System.Logger.Simple -import Prettyprinter + + +data PexVersion = PEX1 | PEX2 data PeerExchange e = PeerExchangeGet (Nonce (PeerExchange e)) - | PeerExchangePeers (Nonce (PeerExchange e)) [PeerAddr e] + | PeerExchangePeers (Nonce (PeerExchange e)) [IPAddrPort e] + | PeerExchangeGet2 (Nonce (PeerExchange e)) + | PeerExchangePeers2 (Nonce (PeerExchange e)) [PeerAddr e] deriving stock (Generic, Typeable) data PeerExchangePeersEv e @@ -40,7 +45,9 @@ sendPeerExchangeGet :: forall e m . ( MonadIO m sendPeerExchangeGet pip = do nonce <- newNonce @(PeerExchange e) update nonce (PeerExchangeKey @e nonce) id + -- FIXME: about-to-delete request pip (PeerExchangeGet @e nonce) + request pip (PeerExchangeGet2 @e nonce) peerExchangeProto :: forall e m . ( MonadIO m , Response e (PeerExchange e) m @@ -53,14 +60,45 @@ peerExchangeProto :: forall e m . ( MonadIO m , EventEmitter e (PeerExchangePeersEv e) m , Eq (Nonce (PeerExchange e)) , Pretty (Peer e) + , e ~ L4Proto ) => PeerExchange e -> m () -peerExchangeProto = - \case - PeerExchangeGet n -> deferred proto do - -- TODO: sort peers by their usefulness +peerExchangeProto msg = do + case msg of + PeerExchangeGet n -> peerExchangeGet PEX1 n + PeerExchangeGet2 n -> peerExchangeGet PEX2 n + PeerExchangePeers nonce pips -> peerExchangePeers1 nonce pips + PeerExchangePeers2 nonce pips -> peerExchangePeers2 nonce pips + where + proto = Proxy @(PeerExchange e) + + fromPEXAddr1 = fromPeerAddr . L4Address UDP + + peerExchangePeers1 nonce pips = do + pip <- thatPeer proto + + ok <- find (PeerExchangeKey @e nonce) id <&> isJust + + when ok do + sa <- mapM fromPEXAddr1 pips + debug $ "got pex" <+> "from" <+> pretty pip <+> pretty sa + expire @e (PeerExchangeKey nonce) + emit @e PeerExchangePeersKey (PeerExchangePeersData sa) + + peerExchangePeers2 nonce pips = do + pip <- thatPeer proto + + ok <- find (PeerExchangeKey @e nonce) id <&> isJust + + when ok do + sa <- mapM fromPeerAddr pips + debug $ "got pex" <+> "from" <+> pretty pip <+> pretty sa + expire @e (PeerExchangeKey nonce) + emit @e PeerExchangePeersKey (PeerExchangePeersData sa) + + peerExchangeGet pex n = deferred proto do that <- thatPeer proto debug $ "PeerExchangeGet" <+> "from" <+> pretty that @@ -68,32 +106,31 @@ peerExchangeProto = pl <- getPeerLocator @e pips <- knownPeers @e pl - pa' <- forM pips $ \p -> do - auth <- find (KnownPeerKey p) id <&> isJust - if auth then do - a <- toPeerAddr p - pure [a] - else - pure mempty + case pex of + PEX1 -> do - let pa = take defPexMaxPeers $ mconcat pa' + -- TODO: tcp-peer-support-in-pex + pa' <- forM pips $ \p -> do + auth <- find (KnownPeerKey p) id <&> isJust + pa <- toPeerAddr p + case pa of + (L4Address UDP x) | auth -> pure [x] + _ -> pure mempty - response (PeerExchangePeers @e n pa) + let pa = take defPexMaxPeers $ mconcat pa' - PeerExchangePeers nonce pips -> do + response (PeerExchangePeers @e n pa) - pip <- thatPeer proto + PEX2 -> do - ok <- find (PeerExchangeKey @e nonce) id <&> isJust + pa' <- forM pips $ \p -> do + auth <- find (KnownPeerKey p) id + maybe1 auth (pure mempty) ( const $ fmap L.singleton (toPeerAddr p) ) - when ok do - sa <- mapM (fromPeerAddr @e) pips - debug $ "got pex" <+> "from" <+> pretty pip <+> pretty sa - expire @e (PeerExchangeKey nonce) - emit @e PeerExchangePeersKey (PeerExchangePeersData sa) + -- FIXME: asap-random-shuffle-peers + let pa = take defPexMaxPeers $ mconcat pa' - where - proto = Proxy @(PeerExchange e) + response (PeerExchangePeers2 @e n pa) newtype instance SessionKey e (PeerExchange e) = diff --git a/hbs2-core/lib/HBS2/Net/Proto/Types.hs b/hbs2-core/lib/HBS2/Net/Proto/Types.hs index 79c5d48e..d0a08c85 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Types.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Types.hs @@ -2,20 +2,27 @@ {-# Language FunctionalDependencies #-} {-# Language AllowAmbiguousTypes #-} {-# Language UndecidableInstances #-} +{-# Language TemplateHaskell #-} +{-# Language MultiWayIf #-} module HBS2.Net.Proto.Types ( module HBS2.Net.Proto.Types ) where -import HBS2.Prelude (FromStringMaybe(..)) +import HBS2.Prelude.Plated import HBS2.Clock +import HBS2.Net.IP.Addr -import Data.Kind -import GHC.TypeLits -import Data.Proxy -import Data.Hashable -import Control.Monad.IO.Class -import System.Random qualified as Random +import Control.Applicative import Data.Digest.Murmur32 +import Data.Hashable +import Data.Kind +import Data.Text qualified as Text +import GHC.TypeLits +import Lens.Micro.Platform +import Network.Socket +import System.Random qualified as Random +import Codec.Serialise +import Data.Maybe -- e -> Transport (like, UDP or TChan) -- p -> L4 Protocol (like Ping/Pong) @@ -25,6 +32,17 @@ type family Encryption e :: Type -- FIXME: move-to-a-crypto-definition-modules data HBS2Basic +data L4Proto = UDP | TCP + deriving stock (Eq,Ord,Generic) + deriving stock (Enum,Bounded) + +instance Hashable L4Proto where + hashWithSalt s l = hashWithSalt s ("l4proto", fromEnum l) + +instance Show L4Proto where + show UDP = "udp" + show TCP = "tcp" + -- type family Encryption e :: Type class Monad m => GenCookie e m where @@ -36,7 +54,6 @@ class Monad m => HasNonces p m where newNonce :: m (Nonce p) - class HasCookie e p | p -> e where type family Cookie e :: Type getCookie :: p -> Maybe (Cookie e) @@ -47,17 +64,20 @@ type PeerNonce = Nonce () class HasPeerNonce e m where peerNonce :: m PeerNonce +-- instance {-# OVERLAPPABLE #-} HasPeerNonce e IO where +-- peerNonce = newNonce @() + data WithCookie e p = WithCookie (Cookie e) p class (Hashable (Peer e), Eq (Peer e)) => HasPeer e where data family (Peer e) :: Type -class ( FromStringMaybe (PeerAddr e) - , Eq (PeerAddr e) +class ( Eq (PeerAddr e) , Monad m + , Hashable (PeerAddr e) ) => IsPeerAddr e m where - type family PeerAddr e :: Type + data family PeerAddr e :: Type toPeerAddr :: Peer e -> m (PeerAddr e) fromPeerAddr :: PeerAddr e -> m (Peer e) @@ -102,3 +122,80 @@ instance {-# OVERLAPPABLE #-} (MonadIO m, Num (Cookie e)) => GenCookie e m where r <- liftIO $ Random.randomIO @Int pure $ fromInteger $ fromIntegral $ asWord32 $ hash32 (hash salt + r) +class FromSockAddr ( t :: L4Proto) a where + fromSockAddr :: SockAddr -> a + +instance HasPeer L4Proto where + data instance Peer L4Proto = + PeerL4 + { _sockType :: L4Proto + , _sockAddr :: SockAddr + } + deriving stock (Eq,Ord,Show,Generic) + + +instance AddrPriority (Peer L4Proto) where + addrPriority (PeerL4 _ sa) = addrPriority sa + +instance Hashable (Peer L4Proto) where + hashWithSalt salt p = case _sockAddr p of + SockAddrInet pn h -> hashWithSalt salt (4, fromEnum (_sockType p), fromIntegral pn, h) + SockAddrInet6 pn _ h _ -> hashWithSalt salt (6, fromEnum (_sockType p), fromIntegral pn, h) + SockAddrUnix s -> hashWithSalt salt ("unix", s) + +-- FIXME: support-udp-prefix +instance Pretty (Peer L4Proto) where + pretty (PeerL4 UDP p) = pretty p + pretty (PeerL4 TCP p) = "tcp://" <> pretty p + +instance FromSockAddr 'UDP (Peer L4Proto) where + fromSockAddr = PeerL4 UDP + +instance FromSockAddr 'TCP (Peer L4Proto) where + fromSockAddr = PeerL4 TCP + +makeLenses 'PeerL4 + +newtype FromIP a = FromIP { fromIP :: a } + + +-- FIXME: tcp-and-udp-support +instance (MonadIO m) => IsPeerAddr L4Proto m where +-- instance MonadIO m => IsPeerAddr L4Proto m where + data instance PeerAddr L4Proto = + L4Address L4Proto (IPAddrPort L4Proto) + deriving stock (Eq,Ord,Show,Generic) + + -- FIXME: backlog-fix-addr-conversion + toPeerAddr (PeerL4 t p) = pure $ L4Address t (fromString $ show $ pretty p) + -- + + -- FIXME: ASAP-tcp-support + fromPeerAddr (L4Address UDP iap) = do + ai <- liftIO $ parseAddrUDP $ fromString (show (pretty iap)) + pure $ PeerL4 UDP $ addrAddress (head ai) + + fromPeerAddr (L4Address TCP iap) = do + ai <- liftIO $ parseAddrTCP $ fromString (show (pretty iap)) + pure $ PeerL4 TCP $ addrAddress (head ai) + +instance Hashable (PeerAddr L4Proto) + +instance Pretty (PeerAddr L4Proto) where + pretty (L4Address UDP a) = pretty a + pretty (L4Address TCP a) = "tcp://" <> pretty a + +instance IsString (PeerAddr L4Proto) where + fromString s = fromMaybe (error "invalid address") (fromStringMay s) + +instance FromStringMaybe (PeerAddr L4Proto) where + fromStringMay s | Text.isPrefixOf "tcp://" txt = L4Address TCP <$> fromStringMay addr + | otherwise = L4Address UDP <$> fromStringMay addr + where + txt = fromString s :: Text + addr = Text.unpack $ fromMaybe txt (Text.stripPrefix "tcp://" txt <|> Text.stripPrefix "udp://" txt) + +instance Serialise L4Proto +instance Serialise (PeerAddr L4Proto) + + diff --git a/hbs2-git/lib/HBS2Git/App.hs b/hbs2-git/lib/HBS2Git/App.hs index f03da38d..c8bf68c5 100644 --- a/hbs2-git/lib/HBS2Git/App.hs +++ b/hbs2-git/lib/HBS2Git/App.hs @@ -15,7 +15,6 @@ import HBS2.Hash import HBS2.System.Logger.Simple import HBS2.Merkle import HBS2.Git.Types -import HBS2.Net.Messaging.UDP (UDP) import HBS2.Net.Proto.Definition() import HBS2.Net.Auth.Credentials hiding (getCredentials) import HBS2.Net.Proto.RefLog diff --git a/hbs2-git/lib/HBS2Git/Types.hs b/hbs2-git/lib/HBS2Git/Types.hs index 417e6991..75d183de 100644 --- a/hbs2-git/lib/HBS2Git/Types.hs +++ b/hbs2-git/lib/HBS2Git/Types.hs @@ -10,7 +10,7 @@ module HBS2Git.Types import HBS2.Prelude.Plated import HBS2.Git.Types -import HBS2.Net.Messaging.UDP (UDP) +import HBS2.Net.Proto.Types(L4Proto) import HBS2.Data.Types.Refs import HBS2.Net.Proto.Types import HBS2.Net.Auth.Credentials @@ -38,7 +38,7 @@ import Control.Monad.Catch -- FIXME: remove-udp-hardcode-asap type Schema = HBS2Basic -type HBS2L4Proto = UDP +type HBS2L4Proto = L4Proto -- FIXME: introduce-API-type type API = String diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index c21727ca..5c8230a1 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -31,6 +31,7 @@ import Control.Concurrent.STM.TSem import Control.Monad.Reader import Control.Monad.Trans.Maybe import Data.ByteString.Lazy (ByteString) +import Data.ByteString.Lazy qualified as LBS import Data.Cache qualified as Cache import Data.Foldable hiding (find) import Data.HashMap.Strict qualified as HashMap @@ -172,6 +173,7 @@ processBlock h = do downloadFromWithPeer :: forall e m . ( DownloadFromPeerStuff e m + , e ~ L4Proto , HasPeerLocator e (BlockDownloadM e m) ) => Peer e -> Integer @@ -186,15 +188,21 @@ downloadFromWithPeer peer thisBkSize h = do sto <- lift getStorage + let chunkSize = case view sockType peer of + UDP -> defChunkSize + TCP -> defChunkSize + coo <- genCookie (peer,h) let key = DownloadSessionKey (peer, coo) - let chusz = defChunkSize + let chusz = fromIntegral chunkSize -- defChunkSize dnwld <- newBlockDownload h let chuQ = view sBlockChunks dnwld let new = set sBlockChunkSize chusz . set sBlockSize (fromIntegral thisBkSize) $ dnwld + trace $ "downloadFromWithPeer STARTED" <+> pretty coo + lift $ update @e new key id let burstSizeT = view peerBurst pinfo @@ -207,11 +215,16 @@ downloadFromWithPeer peer thisBkSize h = do let bursts = calcBursts burstSize chunkNums - let w = max defChunkWaitMax $ realToFrac (toNanoSeconds defBlockWaitMax) / realToFrac (length bursts) / 1e9 * 2 + rtt <- medianPeerRTT pinfo <&> fmap ( (/1e9) . realToFrac ) + <&> fromMaybe defChunkWaitMax - let burstTime = realToFrac w :: Timeout 'Seconds -- defChunkWaitMax -- min defBlockWaitMax (0.8 * realToFrac burstSize * defChunkWaitMax) + let w = 4 * rtt * realToFrac (length bursts) - r <- liftIO $ newTVarIO (mempty :: IntMap ByteString) + let burstTime = max defChunkWaitMax $ realToFrac w :: Timeout 'Seconds + + trace $ "BURST TIME" <+> pretty burstTime + + let r = view sBlockChunks2 new rq <- liftIO newTQueueIO for_ bursts $ liftIO . atomically . writeTQueue rq @@ -223,44 +236,56 @@ downloadFromWithPeer peer thisBkSize h = do Just (i,chunksN) -> do let req = BlockGetChunks h chusz (fromIntegral i) (fromIntegral chunksN) + + void $ liftIO $ atomically $ flushTQueue chuQ + lift $ request peer (BlockChunks @e coo req) - -- TODO: here wait for all requested chunks! - -- FIXME: it may blocks forever, so must be timeout and retry + let waity = liftIO $ race ( pause burstTime >> pure False ) do + fix \zzz -> do + hc <- atomically do + forM [i .. i + chunksN-1 ] $ \j -> do + m <- readTVar r + pure (j, IntMap.member j m) - catched <- either id id <$> liftIO ( race ( pause burstTime >> pure mempty ) - ( replicateM chunksN - $ atomically - $ readTQueue chuQ ) + let here = and $ fmap snd hc + if here then do + pure here - ) - if not (null catched) then do + else do + pause rtt + zzz + + void $ liftIO $ race ( pause (2 * rtt) ) $ atomically do + void $ peekTQueue chuQ + flushTQueue chuQ + + catched <- waity <&> either id id + + if catched then do liftIO $ atomically do modifyTVar (view peerDownloaded pinfo) (+chunksN) writeTVar (view peerPingFailed pinfo) 0 else do - -- liftIO $ atomically $ modifyTVar (view peerErrors pinfo) succ - updatePeerInfo True pinfo + liftIO $ atomically $ modifyTVar (view peerErrors pinfo) succ + updatePeerInfo True peer pinfo - newBurst <- liftIO $ readTVarIO burstSizeT - -- let newBurst = max defBurst $ floor (realToFrac newBurst' * 0.5 ) + newBurst <- liftIO $ readTVarIO burstSizeT + -- let newBurst = max defBurst $ floor (realToFrac newBurst' * 0.5 ) - liftIO $ atomically $ modifyTVar (view peerDownloaded pinfo) (+chunksN) + liftIO $ atomically $ modifyTVar (view peerDownloaded pinfo) (+chunksN) - let chuchu = calcBursts newBurst [ i + n | n <- [0 .. chunksN] ] + let chuchu = calcBursts newBurst [ i + n | n <- [0 .. chunksN] ] - liftIO $ atomically $ modifyTVar (view peerErrors pinfo) succ + liftIO $ atomically $ modifyTVar (view peerErrors pinfo) succ - trace $ "new burst: " <+> pretty newBurst - trace $ "missed chunks for request" <+> pretty (i,chunksN) - trace $ "burst time" <+> pretty burstTime + trace $ "new burst: " <+> pretty newBurst + trace $ "missed chunks for request" <+> pretty (i,chunksN) + trace $ "burst time" <+> pretty burstTime - for_ chuchu $ liftIO . atomically . writeTQueue rq - - for_ catched $ \(num,bs) -> do - liftIO $ atomically $ modifyTVar' r (IntMap.insert (fromIntegral num) bs) + for_ chuchu $ liftIO . atomically . writeTQueue rq next @@ -268,13 +293,13 @@ downloadFromWithPeer peer thisBkSize h = do sz <- liftIO $ readTVarIO r <&> IntMap.size - if sz == length offsets then do + if sz >= length offsets then do pieces <- liftIO $ readTVarIO r <&> IntMap.elems let block = mconcat pieces let h1 = hashObject @HbSync block if h1 == h then do - -- debug "PROCESS BLOCK" + trace $ "PROCESS BLOCK" <+> pretty coo <+> pretty h lift $ expire @e key void $ liftIO $ putBlock sto block onBlockDownloaded brains peer h @@ -293,8 +318,14 @@ downloadFromWithPeer peer thisBkSize h = do -- however, let's try do download the tails -- by one chunk a time for_ missed $ \n -> do + trace $ "MISSED CHUNK" <+> pretty coo <+> pretty n liftIO $ atomically $ writeTQueue rq (n,1) + next + + lift $ expire @e key + trace $ "downloadFromWithPeer EXIT" <+> pretty coo + instance HasPeerLocator e m => HasPeerLocator e (BlockDownloadM e m) where getPeerLocator = lift getPeerLocator @@ -303,8 +334,12 @@ instance HasPeerLocator e m => HasPeerLocator e (BlockDownloadM e m) where -- NOTE: updatePeerInfo is CC -- updatePeerInfo is actuall doing CC (congestion control) -updatePeerInfo :: MonadIO m => Bool -> PeerInfo e -> m () -updatePeerInfo onError pinfo = do +updatePeerInfo :: forall e m . (e ~ L4Proto, MonadIO m) => Bool -> Peer e -> PeerInfo e -> m () + +updatePeerInfo _ p pinfo | view sockType p == TCP = do + liftIO $ atomically $ writeTVar (view peerBurst pinfo) 256 + +updatePeerInfo onError _ pinfo = do t1 <- liftIO getTimeCoarse @@ -332,12 +367,12 @@ updatePeerInfo onError pinfo = do (bu1, bus) <- if eps == 0 && not onError then do let bmm = fromMaybe defBurstMax buMax - let buN = min bmm (ceiling (realToFrac bu * 1.05)) + let buN = min bmm (ceiling (realToFrac bu * 1.25)) pure (buN, trimUp win $ IntSet.insert buN buSet) else do - let buM = headMay $ drop 2 $ IntSet.toDescList buSet + let buM = headMay $ drop 1 $ IntSet.toDescList buSet writeTVar (view peerBurstMax pinfo) buM - let buN = headDef defBurst $ drop 4 $ IntSet.toDescList buSet + let buN = headDef defBurst $ drop 2 $ IntSet.toDescList buSet pure (buN, trimDown win $ IntSet.insert buN buSet) @@ -381,6 +416,7 @@ blockDownloadLoop :: forall e m . ( m ~ PeerM e IO , PeerMessaging e , IsPeerAddr e m , HasPeerLocator e m + , e ~ L4Proto ) => DownloadEnv e -> m () blockDownloadLoop env0 = do @@ -414,7 +450,7 @@ blockDownloadLoop env0 = do for_ pee $ \p -> do pinfo <- fetch True npi (PeerInfoKey p) id - updatePeerInfo False pinfo + updatePeerInfo False p pinfo void $ liftIO $ async $ forever $ withAllStuff do @@ -454,6 +490,7 @@ blockDownloadLoop env0 = do when (List.null pips) do void $ liftIO $ race (pause @'Seconds 5) $ do + trace "ALL PEERS BUSY" void $ liftIO $ atomically $ do p <- readTQueue released ps <- flushTQueue released @@ -501,6 +538,9 @@ blockDownloadLoop env0 = do r <- liftIO $ race ( pause defBlockWaitMax ) $ withAllStuff $ downloadFromWithPeer p size h + + liftIO $ atomically $ writeTQueue released p + case r of Left{} -> do liftIO $ atomically $ modifyTVar downFail succ @@ -508,7 +548,7 @@ blockDownloadLoop env0 = do Right{} -> do onBlockDownloaded brains p h - processBlock h + liftIO $ withAllStuff $ processBlock h liftIO $ atomically do writeTVar downFail 0 modifyTVar downBlk succ @@ -633,8 +673,17 @@ mkAdapter = do unless (isJust dodo) $ do debug $ "session lost for peer !" <+> pretty p - dwnld <- MaybeT $ find cKey (view sBlockChunks) - liftIO $ atomically $ writeTQueue dwnld (n, bs) +-- debug $ "FINDING-SESSION:" <+> pretty c <+> pretty n +-- debug $ "GOT SHIT" <+> pretty c <+> pretty n + + se <- MaybeT $ find cKey id + let dwnld = view sBlockChunks se + let dwnld2 = view sBlockChunks2 se + + -- debug $ "WRITE SHIT" <+> pretty c <+> pretty n + liftIO $ atomically do + writeTQueue dwnld (n, bs) + modifyTVar' dwnld2 (IntMap.insert (fromIntegral n) bs) } diff --git a/hbs2-peer/app/BlockHttpDownload.hs b/hbs2-peer/app/BlockHttpDownload.hs index 07badc45..984228e3 100644 --- a/hbs2-peer/app/BlockHttpDownload.hs +++ b/hbs2-peer/app/BlockHttpDownload.hs @@ -146,7 +146,8 @@ updatePeerHttpAddrs :: forall e m . , IsPeerAddr e m , Pretty (Peer e) , Pretty (PeerAddr e) - , EventListener e( PeerMetaProto e) m + , EventListener e ( PeerMetaProto e) m + -- , e ~ L4Proto ) => m () updatePeerHttpAddrs = do @@ -154,11 +155,6 @@ updatePeerHttpAddrs = do pl <- getPeerLocator @e forever do - -- REVIEW: isnt-it-too-often - -- Не слишком ли часто обновлять http адрес? - -- Зачем раз в пять секунд? - -- -- Это попытка узнать адрес. Если раз определили его, то уже не будем снова пытаться. - -- При этом всего будет не более трёх попыток. pause @'Seconds 5 ps <- knownPeers @e pl debug $ "updatePeerHttpAddrs peers:" <+> pretty ps diff --git a/hbs2-peer/app/Bootstrap.hs b/hbs2-peer/app/Bootstrap.hs index b2fdc9ab..4b0a0ace 100644 --- a/hbs2-peer/app/Bootstrap.hs +++ b/hbs2-peer/app/Bootstrap.hs @@ -5,7 +5,6 @@ import HBS2.Prelude import HBS2.Net.Proto.Types import HBS2.Net.Proto.Peer import HBS2.Clock -import HBS2.Net.Messaging.UDP import HBS2.Net.IP.Addr import HBS2.Net.Proto.Sessions @@ -13,8 +12,7 @@ import PeerConfig import HBS2.System.Logger.Simple import Data.Functor -import Network.DNS qualified as DNS -import Network.DNS (Name(..),CharStr(..)) +import Network.DNS import Data.ByteString.Char8 qualified as B8 import Data.Foldable import Data.Maybe @@ -22,6 +20,8 @@ import Data.Set qualified as Set import Data.Set (Set) import Control.Monad import Network.Socket +import Control.Monad.Trans.Maybe + data PeerDnsBootStrapKey @@ -33,61 +33,64 @@ instance HasCfgKey PeerDnsBootStrapKey (Set String) where instance HasCfgKey PeerKnownPeer [String] where key = "known-peer" +-- FIXME: tcp-addr-support-bootstrap bootstrapDnsLoop :: forall e m . ( HasPeer e , Request e (PeerHandshake e) m , HasNonces (PeerHandshake e) m , Nonce (PeerHandshake e) ~ PingNonce , Sessions e (PeerHandshake e) m , Pretty (Peer e) + -- , FromSockAddr 'UDP (Peer e) + , e ~ L4Proto , MonadIO m - , e ~ UDP ) => PeerConfig -> m () bootstrapDnsLoop conf = do pause @'Seconds 2 + rs <- liftIO $ makeResolvSeed defaultResolvConf + forever do debug "I'm a bootstrapLoop" let dns = cfgValue @PeerDnsBootStrapKey conf <> Set.singleton "bootstrap.hbs2.net" + -- FIXME: utf8-domains for_ (Set.toList dns) $ \dn -> do debug $ "bootstrapping from" <+> pretty dn - answers <- liftIO $ DNS.queryTXT (Name $ fromString dn) <&> foldMap ( fmap mkStr . snd ) - for_ answers $ \answ -> do - pips <- liftIO $ parseAddr (fromString answ) <&> fmap (PeerUDP . addrAddress) - for_ pips $ \pip -> do - debug $ "got dns answer" <+> pretty pip - sendPing @e pip + answers <- liftIO $ withResolver rs $ \resolver -> lookupTXT resolver (B8.pack dn) <&> either mempty id + void $ runMaybeT do + for_ answers $ \answ -> do + -- FIXME: tcp-addr-support-1 + pa <- MaybeT $ pure $ fromStringMay @(PeerAddr L4Proto) (B8.unpack answ) + pip <- fromPeerAddr pa + debug $ "BOOTSTRAP:" <+> pretty pip + lift $ sendPing @e pip -- FIXME: fix-bootstrapDnsLoop-time-hardcode pause @'Seconds 300 - where - mkStr (CharStr s) = B8.unpack s -knownPeersPingLoop :: - forall e m. - ( HasPeer e, - Request e (PeerHandshake e) m, - HasNonces (PeerHandshake e) m, - Nonce (PeerHandshake e) ~ PingNonce, - Sessions e (PeerHandshake e) m, - Pretty (Peer e), - MonadIO m, - e ~ UDP - ) => - PeerConfig -> - m () +-- FIXME: tcp-addr-support-known-peers-loop +knownPeersPingLoop :: forall e m . ( HasPeer e + , Request e (PeerHandshake e) m + , HasNonces (PeerHandshake e) m + , Nonce (PeerHandshake e) ~ PingNonce + , Sessions e (PeerHandshake e) m + , Pretty (Peer e) + , e ~ L4Proto + , MonadIO m) + => PeerConfig -> m () knownPeersPingLoop conf = do -- FIXME: add validation and error handling - let parseKnownPeers xs = - fmap (PeerUDP . addrAddress) - . catMaybes - <$> (fmap headMay . parseAddr . fromString) - `mapM` xs + -- FIXME: tcp-addr-support-2 + let parseKnownPeers xs = do + let pa = foldMap (maybeToList . fromStringMay) xs + mapM fromPeerAddr pa + knownPeers' <- liftIO $ parseKnownPeers $ cfgValue @PeerKnownPeer conf forever do forM_ knownPeers' (sendPing @e) pause @'Minutes 20 + diff --git a/hbs2-peer/app/Brains.hs b/hbs2-peer/app/Brains.hs index dd04a454..0526cef7 100644 --- a/hbs2-peer/app/Brains.hs +++ b/hbs2-peer/app/Brains.hs @@ -194,7 +194,7 @@ instance (Hashable (Peer e), Pretty (Peer e)) => HasBrains e (BasicBrains e) whe downs <- liftIO $ readTVarIO (view brainsPostponeDown b) r <- forM peers $ \p -> do - let v = HashMap.lookup (p,h) downs & fromMaybe 0 & (<2) + let v = HashMap.lookup (p,h) downs & fromMaybe 0 & (<4) pure [v] let postpone = not (List.null r || or (mconcat r) ) @@ -204,9 +204,9 @@ instance (Hashable (Peer e), Pretty (Peer e)) => HasBrains e (BasicBrains e) whe shouldDownloadBlock b p h = do noPeers <- liftIO $ readTVarIO (view brainsPeers b) <&> List.null downs <- liftIO $ readTVarIO (view brainsPostponeDown b) - let doo = HashMap.lookup (p,h) downs & fromMaybe 0 & (<2) + let doo = HashMap.lookup (p,h) downs & fromMaybe 0 & (<4) -- trace $ "shouldDownloadBlock" <+> pretty noPeers <+> pretty doo - pure $ noPeers || (HashMap.lookup (p,h) downs & fromMaybe 0 & (<2)) + pure $ noPeers || (HashMap.lookup (p,h) downs & fromMaybe 0 & (<4)) advisePeersForBlock b h = do r <- liftIO $ findPeers b h diff --git a/hbs2-peer/app/PeerInfo.hs b/hbs2-peer/app/PeerInfo.hs index cdcdf2d7..3f40df96 100644 --- a/hbs2-peer/app/PeerInfo.hs +++ b/hbs2-peer/app/PeerInfo.hs @@ -115,11 +115,11 @@ type instance SessionData e (PeerInfo e) = PeerInfo e newtype instance SessionKey e (PeerInfo e) = PeerInfoKey (Peer e) -deriving newtype instance Hashable (SessionKey UDP (PeerInfo UDP)) -deriving stock instance Eq (SessionKey UDP (PeerInfo UDP)) +deriving newtype instance Hashable (SessionKey L4Proto (PeerInfo L4Proto)) +deriving stock instance Eq (SessionKey L4Proto (PeerInfo L4Proto)) -- FIXME: this? -instance Expires (SessionKey UDP (PeerInfo UDP)) where +instance Expires (SessionKey L4Proto (PeerInfo L4Proto)) where expiresIn = const (Just defCookieTimeoutSec) pexLoop :: forall e m . ( HasPeerLocator e m @@ -164,6 +164,7 @@ peerPingLoop :: forall e m . ( HasPeerLocator e m , Pretty (Peer e) , MonadIO m , m ~ PeerM e IO + , e ~ L4Proto ) => PeerConfig -> m () peerPingLoop cfg = do diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index c616f807..276dde41 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -16,6 +16,7 @@ import HBS2.Merkle import HBS2.Net.Auth.Credentials import HBS2.Net.IP.Addr import HBS2.Net.Messaging.UDP +import HBS2.Net.Messaging.TCP import HBS2.Net.PeerLocator import HBS2.Net.Proto import HBS2.Net.Proto.Definition @@ -45,6 +46,7 @@ import CheckMetrics import RefLog qualified import RefLog (reflogWorker) import HttpWorker +import ProxyMessaging import Codec.Serialise import Control.Concurrent.Async @@ -90,8 +92,8 @@ defRpcUDP = "localhost:13331" defLocalMulticast :: String defLocalMulticast = "239.192.152.145:10153" - data PeerListenKey +data PeerListenTCPKey data PeerRpcKey data PeerKeyFileKey data PeerBlackListKey @@ -102,7 +104,7 @@ data PeerTraceKey data PeerProxyFetchKey data AcceptAnnounce = AcceptAnnounceAll - | AcceptAnnounceFrom (Set (PubKey 'Sign (Encryption UDP))) + | AcceptAnnounceFrom (Set (PubKey 'Sign (Encryption L4Proto))) instance Pretty AcceptAnnounce where pretty = \case @@ -117,6 +119,9 @@ instance HasCfgKey PeerTraceKey FeatureSwitch where instance HasCfgKey PeerListenKey (Maybe String) where key = "listen" +instance HasCfgKey PeerListenTCPKey (Maybe String) where + key = "listen-tcp" + instance HasCfgKey PeerRpcKey (Maybe String) where key = "rpc" @@ -143,7 +148,7 @@ instance HasCfgValue PeerAcceptAnnounceKey AcceptAnnounce where where fromAll = headMay [ AcceptAnnounceAll | ListVal @C (Key s [SymbolVal "*"]) <- syn, s == kk ] lst = Set.fromList $ - catMaybes [ fromStringMay @(PubKey 'Sign (Encryption UDP)) (Text.unpack e) + catMaybes [ fromStringMay @(PubKey 'Sign (Encryption L4Proto)) (Text.unpack e) | ListVal @C (Key s [LitStrVal e]) <- syn, s == kk ] kk = key @PeerAcceptAnnounceKey @AcceptAnnounce @@ -161,14 +166,14 @@ makeLenses 'RPCOpt data RPCCommand = POKE | ANNOUNCE (Hash HbSync) - | PING (PeerAddr UDP) (Maybe (Peer UDP)) - | CHECK PeerNonce (PeerAddr UDP) (Hash HbSync) + | PING (PeerAddr L4Proto) (Maybe (Peer L4Proto)) + | CHECK PeerNonce (PeerAddr L4Proto) (Hash HbSync) | FETCH (Hash HbSync) | PEERS | SETLOG SetLogging | REFLOGUPDATE ByteString - | REFLOGFETCH (PubKey 'Sign (Encryption UDP)) - | REFLOGGET (PubKey 'Sign (Encryption UDP)) + | REFLOGFETCH (PubKey 'Sign (Encryption L4Proto)) + | REFLOGGET (PubKey 'Sign (Encryption L4Proto)) data PeerOpts = PeerOpts @@ -316,11 +321,11 @@ runCLI = join . customExecParser (prefs showHelpOnError) $ trace "pRefLogSend" s <- BS.readFile kr -- FIXME: UDP is weird here - creds <- pure (parseCredentials @(Encryption UDP) (AsCredFile s)) `orDie` "bad keyring file" + creds <- pure (parseCredentials @(Encryption L4Proto) (AsCredFile s)) `orDie` "bad keyring file" bs <- BS.take defChunkSize <$> BS.hGetContents stdin let pubk = view peerSignPk creds let privk = view peerSignSk creds - msg <- makeRefLogUpdate @UDP pubk privk bs <&> serialise + msg <- makeRefLogUpdate @L4Proto pubk privk bs <&> serialise runRpcCommand rpc (REFLOGUPDATE msg) pRefLogSendRaw = do @@ -410,7 +415,7 @@ instance ( Monad m -- runPeer :: forall e . (e ~ UDP, Nonce (RefLogUpdate e) ~ BS.ByteString) => PeerOpts -> IO () -runPeer :: forall e s . ( e ~ UDP +runPeer :: forall e s . ( e ~ L4Proto , FromStringMaybe (PeerAddr e) , s ~ Encryption e ) => PeerOpts -> IO () @@ -492,8 +497,8 @@ runPeer opts = Exception.handle myException $ do w <- replicateM defStorageThreads $ async $ simpleStorageWorker s - localMulticast <- (headMay <$> parseAddr (fromString defLocalMulticast) - <&> fmap (PeerUDP . addrAddress)) + localMulticast <- (headMay <$> parseAddrUDP (fromString defLocalMulticast) + <&> fmap (fromSockAddr @'UDP . addrAddress) ) `orDie` "assertion: localMulticastPeer not set" @@ -523,7 +528,24 @@ runPeer opts = Exception.handle myException $ do denv <- newDownloadEnv brains - penv <- newPeerEnv (AnyStorage s) (Fabriq mess) (getOwnPeer mess) + let tcpListen = cfgValue @PeerListenTCPKey conf & fromMaybe "" + let addr' = fromStringMay @(PeerAddr L4Proto) tcpListen + + trace $ "TCP addr:" <+> pretty tcpListen <+> pretty addr' + + tcp <- maybe1 addr' (pure Nothing) $ \addr -> do + tcpEnv <- newMessagingTCP addr + -- FIXME: handle-tcp-thread-somehow + void $ async $ runMessagingTCP tcpEnv + `catch` (\(e::SomeException) -> throwIO e ) + pure $ Just tcpEnv + + proxy <- newProxyMessaging mess tcp + + proxyThread <- async $ runProxyMessaging proxy + `catch` (\(e::SomeException) -> throwIO e ) + + penv <- newPeerEnv (AnyStorage s) (Fabriq proxy) (getOwnPeer mess) nbcache <- liftIO $ Cache.newCache (Just $ toTimeSpec ( 600 :: Timeout 'Seconds)) @@ -604,26 +626,27 @@ runPeer opts = Exception.handle myException $ do banned <- peerBanned p d let doAddPeer p = do - addPeers pl [p] + addPeers pl [p] - -- TODO: better-handling-for-new-peers - npi <- newPeerInfo + -- TODO: better-handling-for-new-peers + npi <- newPeerInfo - here <- find @e (KnownPeerKey p) id <&> isJust + here <- find @e (KnownPeerKey p) id <&> isJust - pfails <- fetch True npi (PeerInfoKey p) (view peerPingFailed) - liftIO $ atomically $ writeTVar pfails 0 - -- pdownfails <- fetch True npi (PeerInfoKey p) (view peerDownloadFail) + pfails <- fetch True npi (PeerInfoKey p) (view peerPingFailed) + liftIO $ atomically $ writeTVar pfails 0 + -- pdownfails <- fetch True npi (PeerInfoKey p) (view peerDownloadFail) - unless here do - -- liftIO $ atomically $ writeTVar pdownfails 0 + unless here do + -- liftIO $ atomically $ writeTVar pdownfails 0 - debug $ "Got authorized peer!" <+> pretty p - <+> pretty (AsBase58 (view peerSignKey d)) + debug $ "Got authorized peer!" <+> pretty p + <+> pretty (AsBase58 (view peerSignKey d)) -- FIXME: check if we've got a reference to ourselves if | pnonce == thatNonce -> do + debug $ "GOT OWN NONCE FROM" <+> pretty p delPeers pl [p] addExcluded pl [p] expire (KnownPeerKey p) @@ -642,34 +665,51 @@ runPeer opts = Exception.handle myException $ do let pd = Map.fromList $ catMaybes pd' + let proto1 = view sockType p + case Map.lookup thatNonce pd of -- TODO: prefer-local-peer-with-same-nonce-over-remote-peer -- remove remote peer -- add local peer - Just p0 | p0 /= p -> do - debug "Same peer, different address" - void $ runMaybeT do + -- FIXME: move-protocol-comparison-to-peer-nonce + -- - pinfo0 <- MaybeT $ find (PeerInfoKey p0) id - pinfo1 <- MaybeT $ find (PeerInfoKey p) id + Nothing -> doAddPeer p - rtt0 <- MaybeT $ medianPeerRTT pinfo0 - rtt1 <- MaybeT $ medianPeerRTT pinfo1 + Just p0 -> do - when ( rtt1 < rtt0 ) do - debug $ "Better rtt!" <+> pretty p0 - <+> pretty p - <+> pretty rtt0 - <+> pretty rtt1 + pa0 <- toPeerAddr p0 + pa1 <- toPeerAddr p - lift $ do - expire (KnownPeerKey p0) - delPeers pl [p] + if | pa0 == pa1 -> none + | view sockType p0 /= view sockType p -> do doAddPeer p - _ -> doAddPeer p + | otherwise -> do + + debug "Same peer, different address" + + void $ runMaybeT do + + pinfo0 <- MaybeT $ find (PeerInfoKey p0) id + pinfo1 <- MaybeT $ find (PeerInfoKey p) id + + rtt0 <- MaybeT $ medianPeerRTT pinfo0 + rtt1 <- MaybeT $ medianPeerRTT pinfo1 + + when ( rtt1 < rtt0 ) do + debug $ "Better rtt!" <+> pretty p0 + <+> pretty p + <+> pretty rtt0 + <+> pretty rtt1 + + lift $ do + expire (KnownPeerKey p0) + delPeers pl [p0] + -- addExcluded pl [p0] + doAddPeer p void $ liftIO $ async $ withPeerM env do @@ -687,6 +727,8 @@ runPeer opts = Exception.handle myException $ do debug "sending local peer announce" request localMulticast (PeerAnnounce @e pnonce) + -- peerThread (tcpWorker conf) + peerThread (httpWorker conf denv) peerThread (checkMetrics metrics) @@ -703,7 +745,7 @@ runPeer opts = Exception.handle myException $ do if useHttpDownload then do - peerThread updatePeerHttpAddrs + peerThread (updatePeerHttpAddrs) peerThread (blockHttpDownloadLoop denv) else pure mempty @@ -790,7 +832,7 @@ runPeer opts = Exception.handle myException $ do trace "REFLOGUPDATE" - let msg' = deserialiseOrFail @(RefLogUpdate UDP) bs + let msg' = deserialiseOrFail @(RefLogUpdate L4Proto) bs & either (const Nothing) Just when (isNothing msg') do @@ -956,7 +998,7 @@ rpcClientMain opt action = do setLoggingOff @DEBUG action -withRPC :: FromStringMaybe (PeerAddr UDP) => RPCOpt -> RPC UDP -> IO () +withRPC :: FromStringMaybe (PeerAddr L4Proto) => RPCOpt -> RPC L4Proto -> IO () withRPC o cmd = rpcClientMain o $ do hSetBuffering stdout LineBuffering @@ -967,7 +1009,7 @@ withRPC o cmd = rpcClientMain o $ do saddr <- pure (view rpcOptAddr o <|> rpcConf) `orDie` "RPC endpoint not set" - as <- parseAddr (fromString saddr) <&> fmap (PeerUDP . addrAddress) + as <- parseAddrUDP (fromString saddr) <&> fmap (fromSockAddr @'UDP . addrAddress) let rpc' = headMay $ L.sortBy (compare `on` addrPriority) as rpc <- pure rpc' `orDie` "Can't parse RPC endpoint" @@ -1007,7 +1049,7 @@ withRPC o cmd = rpcClientMain o $ do prpc <- async $ runRPC udp1 do env <- ask proto <- liftIO $ async $ continueWithRPC env $ do - runProto @UDP + runProto @L4Proto [ makeResponse (rpcHandler adapter) ] @@ -1066,7 +1108,7 @@ withRPC o cmd = rpcClientMain o $ do void $ waitAnyCatchCancel [mrpc, prpc] -runRpcCommand :: FromStringMaybe (IPAddrPort UDP) => RPCOpt -> RPCCommand -> IO () +runRpcCommand :: FromStringMaybe (IPAddrPort L4Proto) => RPCOpt -> RPCCommand -> IO () runRpcCommand opt = \case POKE -> withRPC opt RPCPoke PING s _ -> withRPC opt (RPCPing s) diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index f22d9d3d..569ca248 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -9,7 +9,6 @@ import HBS2.Clock import HBS2.Defaults import HBS2.Events import HBS2.Hash -import HBS2.Net.Messaging.UDP (UDP) import HBS2.Net.Proto import HBS2.Net.Proto.Peer import HBS2.Net.Proto.BlockInfo @@ -37,6 +36,7 @@ import Data.Maybe import Lens.Micro.Platform import Data.Hashable import Type.Reflection +import Data.IntMap (IntMap) type MyPeer e = ( Eq (Peer e) @@ -105,14 +105,15 @@ data BlockDownload = , _sBlockSize :: Size , _sBlockChunkSize :: ChunkSize , _sBlockChunks :: TQueue (ChunkNum, ByteString) + , _sBlockChunks2 :: TVar (IntMap ByteString) } deriving stock (Typeable) makeLenses 'BlockDownload newBlockDownload :: MonadIO m => Hash HbSync -> m BlockDownload -newBlockDownload h = do - BlockDownload h 0 0 <$> liftIO newTQueueIO +newBlockDownload h = liftIO do + BlockDownload h 0 0 <$> newTQueueIO <*> newTVarIO mempty type instance SessionData e (BlockChunks e) = BlockDownload @@ -121,8 +122,8 @@ newtype instance SessionKey e (BlockChunks e) = DownloadSessionKey (Peer e, Cookie e) deriving stock (Generic,Typeable) -deriving newtype instance Hashable (SessionKey UDP (BlockChunks UDP)) -deriving stock instance Eq (SessionKey UDP (BlockChunks UDP)) +deriving newtype instance Hashable (SessionKey L4Proto (BlockChunks L4Proto)) +deriving stock instance Eq (SessionKey L4Proto (BlockChunks L4Proto)) data BlockState = BlockState diff --git a/hbs2-peer/app/ProxyMessaging.hs b/hbs2-peer/app/ProxyMessaging.hs new file mode 100644 index 00000000..e70d093a --- /dev/null +++ b/hbs2-peer/app/ProxyMessaging.hs @@ -0,0 +1,88 @@ +{-# Language TemplateHaskell #-} +module ProxyMessaging + ( ProxyMessaging + , newProxyMessaging + , runProxyMessaging + ) where + +import HBS2.Prelude.Plated +import HBS2.Net.Messaging +import HBS2.Clock +import HBS2.Net.Proto.Types +import HBS2.Net.Messaging.UDP +import HBS2.Net.Messaging.TCP + +import HBS2.System.Logger.Simple + +import Control.Concurrent.Async +import Control.Concurrent.STM +import Control.Concurrent.STM.TQueue +import Data.ByteString.Lazy (ByteString) +import Data.ByteString.Lazy qualified as LBS +import Data.List qualified as L +import Lens.Micro.Platform +import Control.Monad + +-- TODO: protocol-encryption-goes-here + +data ProxyMessaging = + ProxyMessaging + { _proxyUDP :: MessagingUDP + , _proxyTCP :: Maybe MessagingTCP + , _proxyAnswers :: TQueue (From L4Proto, ByteString) + } + +makeLenses 'ProxyMessaging + +newProxyMessaging :: forall m . MonadIO m + => MessagingUDP + -> Maybe MessagingTCP + -> m ProxyMessaging + +newProxyMessaging u t = liftIO do + ProxyMessaging u t + <$> newTQueueIO + +runProxyMessaging :: forall m . MonadIO m + => ProxyMessaging + -> m () + +runProxyMessaging env = liftIO do + + let udp = view proxyUDP env + let answ = view proxyAnswers env + let udpPeer = getOwnPeer udp + + u <- async $ forever do + msgs <- receive udp (To udpPeer) + atomically $ do + forM_ msgs $ writeTQueue answ + + t <- async $ maybe1 (view proxyTCP env) none $ \tcp -> do + forever do + msgs <- receive tcp (To $ view tcpOwnPeer tcp) + atomically $ do + forM_ msgs $ writeTQueue answ + + liftIO $ mapM_ waitCatch [u,t] + +instance Messaging ProxyMessaging L4Proto ByteString where + + sendTo bus t@(To whom) f m = do + -- sendTo (view proxyUDP bus) t f m + -- trace $ "PROXY: SEND" <+> pretty whom + let udp = view proxyUDP bus + case view sockType whom of + UDP -> sendTo udp t f m + TCP -> maybe1 (view proxyTCP bus) none $ \tcp -> do + sendTo tcp t f m + + receive bus _ = liftIO do + -- trace "PROXY: RECEIVE" + -- receive (view proxyUDP bus) w + let answ = view proxyAnswers bus + atomically $ do + r <- readTQueue answ + rs <- flushTQueue answ + pure (r:rs) + diff --git a/hbs2-peer/app/RPC.hs b/hbs2-peer/app/RPC.hs index b1221cf3..9393ff25 100644 --- a/hbs2-peer/app/RPC.hs +++ b/hbs2-peer/app/RPC.hs @@ -4,8 +4,8 @@ module RPC where import HBS2.Prelude.Plated import HBS2.Net.Proto -import HBS2.Hash import HBS2.Net.Messaging.UDP +import HBS2.Hash import HBS2.Actors.Peer import HBS2.Net.Auth.Credentials import HBS2.Net.Proto.Definition() @@ -41,17 +41,17 @@ data RPC e = instance (Serialise (PeerAddr e), Serialise (PubKey 'Sign (Encryption e))) => Serialise (RPC e) -instance HasProtocol UDP (RPC UDP) where - type instance ProtocolId (RPC UDP) = 0xFFFFFFE0 - type instance Encoded UDP = ByteString +instance HasProtocol L4Proto (RPC L4Proto) where + type instance ProtocolId (RPC L4Proto) = 0xFFFFFFE0 + type instance Encoded L4Proto = ByteString decode = either (const Nothing) Just . deserialiseOrFail encode = serialise data RPCEnv = RPCEnv - { _rpcSelf :: Peer UDP - , _rpcFab :: Fabriq UDP + { _rpcSelf :: Peer L4Proto + , _rpcFab :: Fabriq L4Proto } makeLenses 'RPCEnv @@ -84,7 +84,7 @@ newtype RpcM m a = RpcM { fromRpcM :: ReaderT RPCEnv m a } ) runRPC :: ( MonadIO m - , PeerMessaging UDP + , PeerMessaging L4Proto ) => MessagingUDP -> RpcM m a -> m a @@ -95,13 +95,13 @@ runRPC udp m = runReaderT (fromRpcM m) (RPCEnv pip (Fabriq udp)) continueWithRPC :: RPCEnv -> RpcM m a -> m a continueWithRPC e m = runReaderT (fromRpcM m) e -instance Monad m => HasFabriq UDP (RpcM m) where +instance Monad m => HasFabriq L4Proto (RpcM m) where getFabriq = asks (view rpcFab) -instance Monad m => HasOwnPeer UDP (RpcM m) where +instance Monad m => HasOwnPeer L4Proto (RpcM m) where ownPeer = asks (view rpcSelf) -instance (Monad m, HasProtocol UDP p) => HasTimeLimits UDP p (RpcM m) where +instance (Monad m, HasProtocol L4Proto p) => HasTimeLimits L4Proto p (RpcM m) where tryLockForPeriod _ _ = pure True rpcHandler :: forall e m . ( MonadIO m diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index 198b51c0..bbc59852 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -26,6 +26,7 @@ common common-deps , data-default , deepseq , directory + , dns , filepath , hashable , microlens-platform @@ -37,7 +38,7 @@ common common-deps , prettyprinter , random , random-shuffle - , resolv + -- , resolv , safe , saltine >=0.2.0.1 , suckless-conf @@ -124,6 +125,7 @@ executable hbs2-peer , CheckMetrics , HttpWorker , Brains + , ProxyMessaging -- other-extensions: build-depends: base diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 40f3b3b2..9773da7c 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -49,6 +49,8 @@ common common-deps , uniplate , unordered-containers , vector + , prettyprinter-ansi-terminal + , interpolatedstring-perl6 common shared-properties ghc-options: @@ -157,6 +159,103 @@ executable test-udp , uniplate , vector +test-suite test-tcp + import: shared-properties + import: common-deps + default-language: Haskell2010 + + ghc-options: + -- -prof + -- -fprof-auto + + other-modules: + + -- other-extensions: + + type: exitcode-stdio-1.0 + hs-source-dirs: test + main-is: TestTCP.hs + + build-depends: + base, hbs2-core, hbs2-storage-simple + , async + , attoparsec + , bytestring + , cache + , clock + , containers + , data-default + , data-textual + , directory + , hashable + , microlens-platform + , mtl + , mwc-random + , network + , network-ip + , prettyprinter + , QuickCheck + , random + , safe + , serialise + , stm + , streaming + , tasty + , tasty-hunit + , text + , transformers + , uniplate + , vector + , network-simple + , network-byte-order + + +executable test-tcp-net + import: shared-properties + import: common-deps + default-language: Haskell2010 + + ghc-options: + -- -prof + -- -fprof-auto + + other-modules: + + -- other-extensions: + + -- type: exitcode-stdio-1.0 + hs-source-dirs: test + main-is: TestTCPNet.hs + + build-depends: + base, hbs2-core + , async + , attoparsec + , bytestring + , cache + , clock + , containers + , data-default + , data-textual + , hashable + , microlens-platform + , mtl + , mwc-random + , network + , network-ip + , prettyprinter + , QuickCheck + , random + , safe + , serialise + , stm + , streaming + , tasty + , tasty-hunit + , text + , transformers + , uniplate + , vector executable test-logger diff --git a/hbs2-tests/test/TestTCP.hs b/hbs2-tests/test/TestTCP.hs new file mode 100644 index 00000000..c2dbe1a8 --- /dev/null +++ b/hbs2-tests/test/TestTCP.hs @@ -0,0 +1,248 @@ +{-# Language TemplateHaskell #-} +module Main where + +import HBS2.Prelude.Plated +import HBS2.Net.Proto.Types +import HBS2.Clock +import HBS2.Net.Messaging.TCP +import HBS2.Actors.Peer + +import HBS2.System.Logger.Simple + +import System.IO +import Control.Monad.Reader +import Control.Monad.Writer hiding (listen) + +import Test.Tasty.HUnit + +import Data.ByteString.Lazy (ByteString) +import Control.Concurrent.Async +import Lens.Micro.Platform +import Codec.Serialise + +logPrefix s = set loggerTr (s <>) + +tracePrefix :: SetLoggerEntry +tracePrefix = logPrefix "[trace] " + +debugPrefix :: SetLoggerEntry +debugPrefix = logPrefix "[debug] " + +errorPrefix :: SetLoggerEntry +errorPrefix = logPrefix "[error] " + +warnPrefix :: SetLoggerEntry +warnPrefix = logPrefix "[warn] " + +noticePrefix :: SetLoggerEntry +noticePrefix = logPrefix "[RT] " + +testPeerAddr :: IO () +testPeerAddr = do + + let p1 = fromStringMay @(PeerAddr L4Proto) "192.168.1.2:5551" + let p2 = fromStringMay @(PeerAddr L4Proto) "udp://192.168.1.2:5551" + let p3 = fromStringMay @(PeerAddr L4Proto) "tcp://192.168.1.2:5551" + + debug $ "parsed udp addr:" <+> pretty p1 + debug $ "parsed udp addr:" <+> pretty p2 + debug $ "parsed tcp addr:" <+> pretty p3 + + assertEqual "udp address check 1" p2 p2 + assertBool "tcp and udp are different" (p1 /= p3) + + case p1 of + (Just (L4Address UDP _)) -> pure () + _ -> assertFailure "p1 is not UDP" + + case p2 of + (Just (L4Address UDP _)) -> pure () + _ -> assertFailure "p1 is not UDP" + + case p3 of + (Just (L4Address TCP _)) -> pure () + _ -> assertFailure "p3 is not TCP" + + peerUDP0 <- fromPeerAddr @L4Proto "192.168.1.1:5551" + peerUDP1 <- fromPeerAddr @L4Proto "udp://192.168.1.1:5551" + peerTCP <- fromPeerAddr @L4Proto "tcp://192.168.1.1:3001" + + debug $ "peerUDP0" <+> pretty peerUDP0 + debug $ "peerUDP1" <+> pretty peerUDP1 + debug $ "peerTCP" <+> pretty peerTCP + + pure () + + +data PingPong e = Ping Int + | Pong Int + deriving stock (Eq,Generic,Show,Read) + + +instance Serialise (PingPong e) + +instance HasProtocol L4Proto (PingPong L4Proto) where + type instance ProtocolId (PingPong L4Proto) = 1 + type instance Encoded L4Proto = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + +testCmd :: forall a ann b m . ( Pretty a + , Pretty b + , MonadIO m + ) + => a -> Doc ann -> b -> m () + +testCmd p1 s p2 = do + notice $ brackets (pretty p1) + <+> s + <+> parens (pretty p2) + +pingPongHandler :: forall e m . ( MonadIO m + , Response e (PingPong e) m + , HasProtocol e (PingPong e) + , HasOwnPeer e m + , Pretty (Peer e) + ) + => Int + -> PingPong e + -> m () + +pingPongHandler n req = do + + that <- thatPeer (Proxy @(PingPong e)) + own <- ownPeer @e + + case req of + + Ping c -> do + testCmd own (">>> RECV PING" <+> pretty c) that + + when ( c <= n ) do + testCmd own ("<<< SEND PONG" <+> pretty (succ c)) that + response (Pong @e (succ c)) + + Pong c -> do + testCmd own (">>> RECV PONG" <+> pretty c) that + testCmd own (">>> SEND PING BACK" <+> pretty (succ c)) that + + response (Ping @e c) + + +data PPEnv = + PPEnv + { _ppSelf :: Peer L4Proto + , _ppFab :: Fabriq L4Proto + } + +makeLenses 'PPEnv + +newtype PingPongM m a = PingPongM { fromPingPong :: ReaderT PPEnv m a } + deriving newtype ( Functor + , Applicative + , Monad + , MonadIO + , MonadReader PPEnv + , MonadTrans + ) + +runPingPong :: (MonadIO m, PeerMessaging L4Proto) => MessagingTCP-> PingPongM m a -> m a +runPingPong tcp m = runReaderT (fromPingPong m) (PPEnv (view tcpOwnPeer tcp) (Fabriq tcp)) + +instance Monad m => HasFabriq L4Proto (PingPongM m) where + getFabriq = asks (view ppFab) + +instance Monad m => HasOwnPeer L4Proto (PingPongM m) where + ownPeer = asks (view ppSelf) + +instance HasTimeLimits L4Proto (PingPong L4Proto) IO where + tryLockForPeriod _ _ = pure True + + +main :: IO () +main = do + hSetBuffering stdout LineBuffering + hSetBuffering stderr LineBuffering + + setLogging @DEBUG debugPrefix + setLogging @INFO defLog + setLogging @ERROR errorPrefix + setLogging @WARN warnPrefix + setLogging @NOTICE noticePrefix + setLogging @TRACE tracePrefix + + testPeerAddr + + let pa1 = fromString "tcp://127.0.0.1:3001" + let pa2 = fromString "tcp://127.0.0.1:3002" + + let pa3 = fromString "tcp://127.0.0.1:3003" + pip3 <- fromPeerAddr pa3 + + -- let pa3 = fromSockAddr @'TCP $ fromString "tcp://127.0.0.1:3003" + + env1 <- newMessagingTCP pa1 + env2 <- newMessagingTCP pa2 + + p1 <- fromPeerAddr pa1 + p2 <- fromPeerAddr pa2 + + peer1 <- async do + runMessagingTCP env1 + + peer2 <- async do + runMessagingTCP env2 + + pause @'Seconds 1 + + let runPeers m = snd <$> runWriterT m + let run m = do + x <- liftIO $ async m + tell [x] + + pause @'Seconds 1 + + setLoggingOff @TRACE + + pp1 <- async $ runPingPong env1 do + testCmd (view tcpOwnPeer env1) ("!!! SEND PING" <+> pretty 1) (view tcpOwnPeer env2) + request (view tcpOwnPeer env2) (Ping @L4Proto 1) + runProto @L4Proto + [ makeResponse (pingPongHandler 3) + ] + + pp2 <- async $ runPingPong env2 do + -- request (view tcpOwnPeer env1) (Ping @L4Proto 1) + runProto @L4Proto + [ makeResponse (pingPongHandler 3) + ] + + pause @'Seconds 1 + + testCmd "!!!" "reverse test" "!!!" + + runPingPong env2 do + testCmd (view tcpOwnPeer env2) ("!!! SEND PING" <+> pretty 1) (view tcpOwnPeer env1) + request (view tcpOwnPeer env1) (Ping @L4Proto 1) + pure () + + forever do + runPingPong env2 do + testCmd (view tcpOwnPeer env1) ("!!! SEND PING" <+> pretty 1) pip3 + request pip3 (Ping @L4Proto 1) + pure () + pause @'Seconds 2 + + -- waiter <- async $ pause @'Seconds 60 + + mapM_ wait [pp1,pp2] + -- void $ waitAnyCatchCancel $ [peer1,peer2] <> conn <> [pp1,pp2] + + setLoggingOff @DEBUG + setLoggingOff @INFO + setLoggingOff @ERROR + setLoggingOff @WARN + setLoggingOff @NOTICE + setLoggingOff @TRACE + + diff --git a/hbs2-tests/test/TestTCPNet.hs b/hbs2-tests/test/TestTCPNet.hs new file mode 100644 index 00000000..fcf3611c --- /dev/null +++ b/hbs2-tests/test/TestTCPNet.hs @@ -0,0 +1,179 @@ +{-# Language TemplateHaskell #-} +module Main where + +import HBS2.Prelude.Plated +import HBS2.Net.Proto.Types +import HBS2.Clock +import HBS2.Net.Messaging.TCP +import HBS2.Actors.Peer + +import HBS2.System.Logger.Simple + +import System.IO +import Control.Monad.Reader +import Control.Monad.Writer hiding (listen) + +import Test.Tasty.HUnit + +import Data.ByteString.Lazy (ByteString) +import Control.Concurrent.Async +import Lens.Micro.Platform +import Codec.Serialise +import System.Environment + +logPrefix s = set loggerTr (s <>) + +tracePrefix :: SetLoggerEntry +tracePrefix = logPrefix "[trace] " + +debugPrefix :: SetLoggerEntry +debugPrefix = logPrefix "[debug] " + +errorPrefix :: SetLoggerEntry +errorPrefix = logPrefix "[error] " + +warnPrefix :: SetLoggerEntry +warnPrefix = logPrefix "[warn] " + +noticePrefix :: SetLoggerEntry +noticePrefix = logPrefix "[RT] " + + +data PingPong e = Ping Int + | Pong Int + deriving stock (Eq,Generic,Show,Read) + + +instance Serialise (PingPong e) + +instance HasProtocol L4Proto (PingPong L4Proto) where + type instance ProtocolId (PingPong L4Proto) = 1 + type instance Encoded L4Proto = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + +testCmd :: forall a ann b m . ( Pretty a + , Pretty b + , MonadIO m + ) + => a -> Doc ann -> b -> m () + +testCmd p1 s p2 = do + notice $ brackets (pretty p1) + <+> s + <+> parens (pretty p2) + +pingPongHandler :: forall e m . ( MonadIO m + , Response e (PingPong e) m + , HasProtocol e (PingPong e) + , HasOwnPeer e m + , HasDeferred e (PingPong e) m + , Pretty (Peer e) + ) + => Int + -> PingPong e + -> m () + +pingPongHandler n req = do + + that <- thatPeer (Proxy @(PingPong e)) + own <- ownPeer @e + + case req of + + Ping c -> do + testCmd own ("RECV PING <<<" <+> pretty c) that + + deferred (Proxy @(PingPong e)) do + pause @'Seconds 1 + testCmd own ("SEND PONG >>>" <+> pretty (succ c)) that + response (Pong @e (succ c)) + + Pong c -> do + testCmd own ("RECV PONG <<<" <+> pretty c) that + + deferred (Proxy @(PingPong e)) do + pause @'Seconds 1 + testCmd own ("SEND PING >>>" <+> pretty (succ c)) that + response (Ping @e c) + +data PPEnv = + PPEnv + { _ppSelf :: Peer L4Proto + , _ppFab :: Fabriq L4Proto + } + +makeLenses 'PPEnv + +newtype PingPongM e m a = PingPongM { fromPingPong :: ReaderT PPEnv m a } + deriving newtype ( Functor + , Applicative + , Monad + , MonadIO + , MonadReader PPEnv + , MonadTrans + ) + +runPingPong :: (MonadIO m, PeerMessaging L4Proto) => Peer L4Proto -> Fabriq L4Proto -> PingPongM L4Proto m a -> m a +runPingPong peer tcp m = runReaderT (fromPingPong m) (PPEnv peer tcp) + +instance Monad m => HasFabriq L4Proto (PingPongM L4Proto m) where + getFabriq = asks (view ppFab) + +instance Monad m => HasOwnPeer L4Proto (PingPongM L4Proto m) where + ownPeer = asks (view ppSelf) + +instance HasTimeLimits L4Proto (PingPong L4Proto) IO where + tryLockForPeriod _ _ = pure True + + +instance HasDeferred L4Proto (PingPong L4Proto) (ResponseM L4Proto (PingPongM L4Proto IO)) where + deferred _ m = do + self <- lift $ asks (view ppSelf) + bus <- lift $ asks (view ppFab) + who <- thatPeer (Proxy @(PingPong L4Proto)) + void $ liftIO $ async $ runPingPong self bus (runResponseM who m) + +main :: IO () +main = do + hSetBuffering stdout LineBuffering + hSetBuffering stderr LineBuffering + + setLogging @DEBUG debugPrefix + setLogging @INFO defLog + setLogging @ERROR errorPrefix + setLogging @WARN warnPrefix + setLogging @NOTICE noticePrefix + setLogging @TRACE tracePrefix + + args <- getArgs >>= \case + [self,remote] -> pure (self,remote) + _ -> error "bad args" + + let self = fromString (fst args) -- "tcp://127.0.0.1:3001" + remote <- fromPeerAddr $ fromString (snd args) :: IO (Peer L4Proto) + + tcp <- newMessagingTCP self + + peer <- async do + runMessagingTCP tcp + + -- setLoggingOff @TRACE + + pp1 <- async $ runPingPong (view tcpOwnPeer tcp) (Fabriq tcp) do + testCmd (view tcpOwnPeer tcp) ("!!! SEND PING" <+> pretty 1) remote + request remote (Ping @L4Proto 1) + runProto @L4Proto + [ makeResponse (pingPongHandler 100) + ] + + void $ waitAnyCatchCancel [pp1,peer] + + setLoggingOff @DEBUG + setLoggingOff @INFO + setLoggingOff @ERROR + setLoggingOff @WARN + setLoggingOff @NOTICE + setLoggingOff @TRACE + + diff --git a/hbs2-tests/test/TestUDP.hs b/hbs2-tests/test/TestUDP.hs index 53793c14..e286391c 100644 --- a/hbs2-tests/test/TestUDP.hs +++ b/hbs2-tests/test/TestUDP.hs @@ -16,6 +16,8 @@ import Lens.Micro.Platform import Codec.Serialise import Control.Concurrent.Async +type UDP = L4Proto + debug :: (MonadIO m) => Doc ann -> m () debug p = liftIO $ hPrint stderr p @@ -38,14 +40,15 @@ pingPongHandler :: forall e m . ( MonadIO m , Response e (PingPong e) m , HasProtocol e (PingPong e) ) - => PingPong e + => Int + -> PingPong e -> m () -pingPongHandler = \case +pingPongHandler n = \case Ping c -> debug ("Ping" <+> pretty c) >> response (Pong @e c) - Pong c | c < 100000 -> debug ("Pong" <+> pretty c) >> response (Ping @e (succ c)) + Pong c | c < n -> debug ("Pong" <+> pretty c) >> response (Ping @e (succ c)) | otherwise -> pure () data PPEnv = @@ -89,15 +92,15 @@ main = do m2 <- async $ runMessagingUDP udp2 p1 <- async $ runPingPong udp1 do - request (getOwnPeer udp2) (Ping @UDP (-10000)) + request (getOwnPeer udp2) (Ping @UDP 0) runProto @UDP - [ makeResponse pingPongHandler + [ makeResponse (pingPongHandler 3) ] p2 <- async $ runPingPong udp2 do - request (getOwnPeer udp1) (Ping @UDP 0) + -- request (getOwnPeer udp1) (Ping @UDP 0) runProto @UDP - [ makeResponse pingPongHandler + [ makeResponse (pingPongHandler 3) ] mapM_ wait [p1,p2,m1,m2] diff --git a/hbs2/Main.hs b/hbs2/Main.hs index 7ff12093..61ce3a11 100644 --- a/hbs2/Main.hs +++ b/hbs2/Main.hs @@ -8,7 +8,6 @@ import HBS2.Merkle import HBS2.Net.Proto.Types import HBS2.Net.Auth.AccessKey import HBS2.Net.Auth.Credentials -import HBS2.Net.Messaging.UDP (UDP) import HBS2.Net.Proto.Definition() import HBS2.Prelude.Plated import HBS2.Storage.Simple diff --git a/nix/peer/flake.lock b/nix/peer/flake.lock index de8de24e..714dc8cc 100644 --- a/nix/peer/flake.lock +++ b/nix/peer/flake.lock @@ -29,11 +29,11 @@ "suckless-conf": "suckless-conf" }, "locked": { - "lastModified": 1677558983, - "narHash": "sha256-1KlLTPdRv2cwQkg9FKSEYHqFJ/6WT3mSliyxc22hVzI=", + "lastModified": 1679822846, + "narHash": "sha256-bXGorR8cLCVm3Cu7EcTUGNtaxPwqZH8zLrch7r/ST5c=", "owner": "voidlizard", "repo": "fixme", - "rev": "80caffb07aaa18e1fd2bcbbc2b4acfea628aaa5f", + "rev": "ff3faeeee860b2ed2edf6e69cec26e9b49b167a3", "type": "github" }, "original": { @@ -236,16 +236,16 @@ "suckless-conf": "suckless-conf_2" }, "locked": { - "lastModified": 1679596211, - "narHash": "sha256-MrfKDT4O4kEjM6KKA7taTCBsMSz4OvsxEd+oDNUfzc0=", + "lastModified": 1681115037, + "narHash": "sha256-CovUWmx6Pup3p/6zhIBAltJiUlh2ukFAI1P4U/vnXNw=", "owner": "voidlizard", "repo": "hbs2", - "rev": "df5bb49271f9aa03572a4ac34df480b674501471", + "rev": "21fb2d844076f8b380847854ebbd75cac57e3424", "type": "github" }, "original": { "owner": "voidlizard", - "ref": "master", + "ref": "injecting-tcp", "repo": "hbs2", "type": "github" } @@ -280,11 +280,11 @@ ] }, "locked": { - "lastModified": 1672641093, - "narHash": "sha256-v0Uj3gkDWPdnXZUKpJGD7RxIOncTexhN0csIop36yug=", + "lastModified": 1679933705, + "narHash": "sha256-UOd70L+FKQLmGjA3IqjFaBpaS/dZMSABtRgVDY3lBCg=", "owner": "voidlizard", "repo": "hspup", - "rev": "031d27dea1505fd68cd603da7e72eb5eefd348fd", + "rev": "6b969a9de1f9800ebfc61c51252b8647123c51bb", "type": "github" }, "original": { @@ -371,11 +371,11 @@ "nixpkgs": "nixpkgs_2" }, "locked": { - "lastModified": 1676656630, - "narHash": "sha256-FFEgtajUGdYd/Ux5lkjXXpAKosve+NAfxp/eG7m7JQY=", + "lastModified": 1679815688, + "narHash": "sha256-xLvIoeG5krM0CXfWRcwSgHGP7W8i8dsoKP5hUb182hE=", "owner": "voidlizard", "repo": "suckless-conf", - "rev": "b017bc1e9d6a11d89da294089d312203c39c0b1f", + "rev": "04c432681d3627f180a402674523736f409f964d", "type": "github" }, "original": { @@ -393,11 +393,11 @@ ] }, "locked": { - "lastModified": 1675946914, - "narHash": "sha256-OE0R9dnB+ZXpf30g1xVSMur68iKUDB53pnyA3K2e788=", + "lastModified": 1679815688, + "narHash": "sha256-xLvIoeG5krM0CXfWRcwSgHGP7W8i8dsoKP5hUb182hE=", "owner": "voidlizard", "repo": "suckless-conf", - "rev": "995e1cd52cfe2e9aa4e00ea5cd016548f7932e5a", + "rev": "04c432681d3627f180a402674523736f409f964d", "type": "github" }, "original": { diff --git a/nix/peer/flake.nix b/nix/peer/flake.nix index 9232dbc4..b529c2ee 100644 --- a/nix/peer/flake.nix +++ b/nix/peer/flake.nix @@ -5,7 +5,7 @@ inputs = { extra-container.url = "github:erikarvstedt/extra-container"; nixpkgs.url = "github:nixos/nixpkgs/nixpkgs-unstable"; - hbs2.url = "github:voidlizard/hbs2/master"; + hbs2.url = "github:voidlizard/hbs2/injecting-tcp"; hbs2.inputs.nixpkgs.follows = "nixpkgs"; home-manager.url = "github:nix-community/home-manager"; @@ -68,6 +68,7 @@ tshark tmux gitFull + iptables ]; environment.etc = { @@ -92,6 +93,7 @@ j1u3RJEr8kosBH2DR8XMY6Mj8s environment.etc."hbs2-peer/config" = { text = '' listen "0.0.0.0:7351" +listen-tcp "0.0.0.0:3003" rpc "127.0.0.1:13331" http-port 5001 key "./key" @@ -102,7 +104,7 @@ bootstrap-dns "bootstrap.hbs2.net" known-peer "10.250.0.1:7354" known-peer "10.250.0.1:7351" -poll reflog 1 "2YNGdnDBnciF1Kgmx1EZTjKUp1h5pvYAjrHoApbArpeX" +; poll reflog 1 "2YNGdnDBnciF1Kgmx1EZTjKUp1h5pvYAjrHoApbArpeX" '';