diff --git a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs index cb6dbc21..05bb68af 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs @@ -202,6 +202,8 @@ spawnConnection tp env so sa = liftIO do let newP = fromSockAddr @'TCP sa theirCookie <- handshake tp env so + -- TCP address available + -- FIXME: how to use this info let connId = connectionId myCookie theirCookie @@ -341,6 +343,7 @@ connectPeerTCP env peer = liftIO do connect (show i) (show p) $ \(sock, remoteAddr) -> do spawnConnection Client env sock remoteAddr + -- FIXME: tcp-pex. Где-то здесь добавить этих пиров в пекс ? shutdown sock ShutdownBoth runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m () diff --git a/hbs2-core/lib/HBS2/Net/Proto/PeerMeta.hs b/hbs2-core/lib/HBS2/Net/Proto/PeerMeta.hs index 3b4f8e7d..2a0ceb47 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/PeerMeta.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/PeerMeta.hs @@ -59,8 +59,9 @@ newtype instance Event e (PeerMetaProto e) = PeerMetaEvent AnnMetaData deriving stock (Typeable) -newtype PeerMeta = PeerMeta [(Text, ByteString)] +newtype PeerMeta = PeerMeta { unPeerMeta :: [(Text, ByteString)] } deriving stock (Generic) + deriving newtype (Semigroup, Monoid) instance Serialise PeerMeta diff --git a/hbs2-peer/app/BlockHttpDownload.hs b/hbs2-peer/app/BlockHttpDownload.hs index 62565a86..02236260 100644 --- a/hbs2-peer/app/BlockHttpDownload.hs +++ b/hbs2-peer/app/BlockHttpDownload.hs @@ -33,7 +33,8 @@ import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad.Reader import Control.Monad.Trans.Maybe -import Data.ByteString.Lazy (ByteString) +import Data.ByteString (ByteString) +import Data.ByteString.Lazy qualified as LBS import Data.Cache qualified as Cache import Data.Foldable hiding (find) import Data.HashMap.Strict qualified as HashMap @@ -44,6 +45,7 @@ import Data.List qualified as List import Data.Maybe import Data.Text qualified as Text import Data.Text.Encoding qualified as TE +import Data.Word import Lens.Micro.Platform import Network.HTTP.Simple (getResponseBody, httpLbs, parseRequest, getResponseStatus) import Network.HTTP.Types.Status @@ -65,7 +67,7 @@ blockHttpDownloadLoop :: forall e m . -- FIXME: backlog-do-something-with-that -- это не ревью, это надо что-то с этим -- сделать, неудачное решение - , Block ByteString ~ ByteString + , Block LBS.ByteString ~ LBS.ByteString ) => DownloadEnv e -> m () blockHttpDownloadLoop denv = do @@ -138,7 +140,7 @@ blockHttpDownloadLoop denv = do --- -updatePeerHttpAddrs :: forall e m . +fillPeerMeta :: forall e m . ( m ~ PeerM e IO , MonadIO m , HasProtocol e (PeerMetaProto e) @@ -148,24 +150,24 @@ updatePeerHttpAddrs :: forall e m . , Pretty (Peer e) , Pretty (PeerAddr e) , EventListener e ( PeerMetaProto e) m - -- , e ~ L4Proto + , e ~ L4Proto ) => m () -updatePeerHttpAddrs = do - debug "I'm updatePeerHttpAddrs" +fillPeerMeta = do + debug "I'm fillPeerMeta" pl <- getPeerLocator @e forever do pause @'Seconds 5 ps <- knownPeers @e pl - debug $ "updatePeerHttpAddrs peers:" <+> pretty ps + debug $ "fillPeerMeta peers:" <+> pretty ps npi <- newPeerInfo for_ ps $ \p -> do pinfo <- fetch True npi (PeerInfoKey p) id mmApiAddr <- liftIO $ readTVarIO (_peerHttpApiAddress pinfo) - debug $ "Find peer http address" <+> pretty p <+> "current:" <+> viaShow mmApiAddr + debug $ "Find peer meta and http address" <+> pretty p <+> "current:" <+> viaShow mmApiAddr case mmApiAddr of Left attemptn -> do @@ -187,16 +189,29 @@ updatePeerHttpAddrs = do liftIO $ atomically $ writeTVar (_peerHttpApiAddress pinfo) $ if attemptn < 3 then (Left (attemptn + 1)) else (Right Nothing) Right (Just meta) -> (void . runMaybeT) do - port <- case meta of + peerMeta <- case meta of NoMetaData -> (MaybeT . pure) Nothing ShortMetadata t -> do - PeerMeta d <- (MaybeT . pure) (parsePeerMeta t) - httpPortBS <- (MaybeT . pure) (lookup "http-port" d) - (MaybeT . pure . readMay . Text.unpack . TE.decodeUtf8) httpPortBS + (MaybeT . pure) (parsePeerMeta t) AnnHashRef h -> (MaybeT . pure) Nothing + liftIO $ atomically $ writeTVar (_peerMeta pinfo) (Just peerMeta) + + -- 3) пробить, что есть tcp + forM_ (lookupDecode "listen-tcp" (unPeerMeta peerMeta)) \listenTCPPort -> lift do + peerTCPAddrPort <- replacePort p listenTCPPort + -- 4) выяснить, можно ли к нему открыть соединение на этот порт + -- возможно, с ним уже открыто соединение + -- или попробовать открыть или запомнить, что было открыто + -- connectPeerTCP ? + -- 5) добавить этих пиров в пекс + -- p :: Peer e <- fromPeerAddr (L4Address TCP (peerTCPAddrPort :: IPAddrPort L4Proto) :: PeerAddr e) + sendPing =<< fromPeerAddr (L4Address TCP peerTCPAddrPort) + + port <- (MaybeT . pure) (lookupDecode "http-port" (unPeerMeta peerMeta)) + lift do - IPAddrPort (ip,_port) <- fromString @(IPAddrPort e) . show . pretty <$> toPeerAddr p - let peerHttpApiAddr = show . pretty $ IPAddrPort (ip,port) + + peerHttpApiAddr <- show . pretty <$> replacePort p port -- check peerHttpApiAddr r <- liftIO $ race ( pause defBlockWaitMax ) do @@ -213,3 +228,13 @@ updatePeerHttpAddrs = do liftIO $ atomically $ writeTVar (_peerHttpApiAddress pinfo) $ Right Nothing _ -> pure () + + where + replacePort :: Peer e -> Word16 -> PeerM e IO (IPAddrPort e) + replacePort peer port = do + IPAddrPort (ip,_port) <- fromString @(IPAddrPort e) . show . pretty <$> toPeerAddr peer + pure $ IPAddrPort (ip,port) + + lookupDecode :: (Eq k, Read v) => k -> [(k, ByteString)] -> Maybe v + lookupDecode k d = + readMay . Text.unpack . TE.decodeUtf8 =<< lookup k d diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 7ec05b85..a41e762b 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -54,6 +54,7 @@ import Control.Concurrent.STM import Control.Exception as Exception import Control.Monad.Reader import Control.Monad.Trans.Maybe +import Control.Monad.Trans.Writer.CPS qualified as W import Crypto.Saltine (sodiumInit) import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy qualified as LBS @@ -712,10 +713,9 @@ runPeer opts = Exception.handle myException $ do debug "sending first peer announce" request localMulticast (PeerAnnounce @e pnonce) - let wo = fmap L.singleton - let peerThread = wo . liftIO . async . withPeerM env + let peerThread = W.tell . L.singleton <=< liftIO . async . withPeerM env - workers <- do + workers <- W.execWriterT do peerThread $ forever $ do pause defPeerAnnounceTime -- FIXME: setting! @@ -738,13 +738,12 @@ runPeer opts = Exception.handle myException $ do peerThread (blockDownloadLoop denv) + peerThread fillPeerMeta + -- FIXME: clumsy-code - if useHttpDownload - then do - -- FIXME: discarded-async-value-for-updatePeerHttpAddrs - peerThread updatePeerHttpAddrs - peerThread (blockHttpDownloadLoop denv) - else pure mempty + -- Is it better now ? + when useHttpDownload do + peerThread (blockHttpDownloadLoop denv) peerThread (postponedLoop denv) diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index 348f6f17..9aa36b5a 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -64,6 +64,7 @@ data PeerInfo e = -- Acts like a circular buffer. , _peerHttpApiAddress :: TVar (Either Int (Maybe String)) , _peerHttpDownloaded :: TVar Int + , _peerMeta :: TVar (Maybe PeerMeta) } deriving stock (Generic,Typeable) @@ -87,6 +88,7 @@ newPeerInfo = liftIO do <*> newTVarIO [] <*> newTVarIO (Left 0) <*> newTVarIO 0 + <*> newTVarIO Nothing type instance SessionData e (PeerInfo e) = PeerInfo e