diff --git a/hbs2-peer/app/BlockHttpDownload.hs b/hbs2-peer/app/BlockHttpDownload.hs index 426fa549..e362819f 100644 --- a/hbs2-peer/app/BlockHttpDownload.hs +++ b/hbs2-peer/app/BlockHttpDownload.hs @@ -138,123 +138,3 @@ blockHttpDownloadLoop denv = do _ -> do trace $ "FAIL" <+> pretty peer <+> "download block" <+> pretty h withDownload denv $ failedDownload peer h - - ---- - --- FIXME: move-fillPeerMeta-to-separate-module -fillPeerMeta :: forall e m . - ( m ~ PeerM e IO - , MonadIO m - , HasProtocol e (PeerMetaProto e) - , PeerSessionKey e (PeerInfo e) - , PeerMessaging e - , IsPeerAddr e m - , Pretty (Peer e) - , Pretty (PeerAddr e) - , EventListener e ( PeerMetaProto e) m - , e ~ L4Proto - ) - => Maybe MessagingTCP -> m () -fillPeerMeta mtcp = do - debug "I'm fillPeerMeta" - pl <- getPeerLocator @e - - pause @'Seconds 5 -- wait 'till everything calm down - forever do - - ps <- knownPeers @e pl - debug $ "fillPeerMeta peers:" <+> pretty ps - npi <- newPeerInfo - for_ ps $ \p -> do - - pinfo <- fetch True npi (PeerInfoKey p) id - mmApiAddr <- liftIO $ readTVarIO (_peerHttpApiAddress pinfo) - - debug $ "Find peer meta and http address" <+> pretty p <+> "current:" <+> viaShow mmApiAddr - case mmApiAddr of - Left attemptn -> do - - q <- liftIO newTQueueIO - - subscribe @e (PeerMetaEventKey p) $ \case - PeerMetaEvent meta -> do - liftIO $ atomically $ writeTQueue q (Just meta) - request p (GetPeerMeta @e) - - r <- liftIO $ race ( pause defGetPeerMetaTimeout ) - ( atomically $ do - s <- readTQueue q - void $ flushTQueue q - pure s - ) - case r of - Left _ -> - liftIO $ atomically $ writeTVar (_peerHttpApiAddress pinfo) $ - if attemptn < 3 then (Left (attemptn + 1)) else (Right Nothing) - Right (Just meta) -> (void . runMaybeT) do - peerMeta <- case meta of - NoMetaData -> (MaybeT . pure) Nothing - ShortMetadata t -> do - (MaybeT . pure) (parsePeerMeta t) - AnnHashRef h -> (MaybeT . pure) Nothing - liftIO $ atomically $ writeTVar (_peerMeta pinfo) (Just peerMeta) - - -- 3) пробить, что есть tcp - forM_ (lookupDecode "listen-tcp" (unPeerMeta peerMeta)) \listenTCPPort -> lift do - peerTCPAddrPort <- replacePort p listenTCPPort - p <- fromPeerAddr (L4Address TCP peerTCPAddrPort) - sendPing p - - forM_ mtcp \(tcp :: MessagingTCP) -> do - -- 4) выяснить, можно ли к нему открыть соединение на этот порт - -- возможно, с ним уже открыто соединение - -- или попробовать открыть или запомнить, что было открыто - -- connectPeerTCP ? - tcpAddressIsAvailable <- isJust <$> do - liftIO $ atomically $ readTVar (view tcpPeerConn tcp) <&> HashMap.lookup p - when tcpAddressIsAvailable do - -- добавить этого пира в pex - addPeers pl [p] - - port <- (MaybeT . pure) (lookupDecode "http-port" (unPeerMeta peerMeta)) - - lift do - - peerHttpApiAddr <- show . pretty <$> replacePort p port - -- check peerHttpApiAddr - - r :: Maybe () <- runMaybeT do - resp <- MaybeT (liftIO $ fmap eitherToMaybe - $ race ( pause defBlockWaitMax ) - (do - req <- liftIO $ parseRequest [qc|http://{peerHttpApiAddr}/metadata|] - httpLbs req - ) - `catch` (\(e :: SomeException) -> debug (viaShow e) >> pure (Left ())) - ) - MaybeT . pure $ case statusCode (getResponseStatus resp) of - 200 -> Just () - _ -> Nothing - - liftIO $ atomically $ writeTVar (_peerHttpApiAddress pinfo) $ Right $ peerHttpApiAddr <$ r - mapM_ (liftIO . atomically . writeTVar (_peerMeta pinfo) . Just) $ peerMeta <$ r - debug $ "Got peer meta from" <+> pretty p <+> ":" <+> viaShow peerMeta - - _ -> do - liftIO $ atomically $ writeTVar (_peerHttpApiAddress pinfo) $ Right Nothing - - _ -> pure () - - -- FIXME: move-hardcode-to-a-config - pause @'Seconds 300 - - where - replacePort :: Peer e -> Word16 -> PeerM e IO (IPAddrPort e) - replacePort peer port = do - IPAddrPort (ip,_port) <- fromString @(IPAddrPort e) . show . pretty <$> toPeerAddr peer - pure $ IPAddrPort (ip,port) - - lookupDecode :: (Eq k, Read v) => k -> [(k, ByteString)] -> Maybe v - lookupDecode k d = - readMay . Text.unpack . TE.decodeUtf8 =<< lookup k d diff --git a/hbs2-peer/app/PeerConfig.hs b/hbs2-peer/app/PeerConfig.hs index 57cbacef..2c0958a9 100644 --- a/hbs2-peer/app/PeerConfig.hs +++ b/hbs2-peer/app/PeerConfig.hs @@ -44,6 +44,7 @@ pattern Key n ns <- SymbolVal n : ns data PeerListenTCPKey data PeerDownloadLogKey data PeerHttpPortKey +data PeerTcpPropWaitKey data PeerUseHttpDownload instance HasCfgKey PeerListenTCPKey (Maybe String) where @@ -52,6 +53,9 @@ instance HasCfgKey PeerListenTCPKey (Maybe String) where instance HasCfgKey PeerHttpPortKey (Maybe Integer) where key = "http-port" +instance HasCfgKey PeerTcpPropWaitKey (Maybe Integer) where + key = "tcp-prop-wait" + instance HasCfgKey PeerUseHttpDownload FeatureSwitch where key = "http-download" diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 43bbb4a5..1036c8c5 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -47,6 +47,7 @@ import RefLog qualified import RefLog (reflogWorker) import HttpWorker import ProxyMessaging +import PeerMeta import Codec.Serialise import Control.Concurrent.Async @@ -151,7 +152,6 @@ instance HasCfgValue PeerAcceptAnnounceKey AcceptAnnounce where ] kk = key @PeerAcceptAnnounceKey @AcceptAnnounce - data RPCOpt = RPCOpt { _rpcOptConf :: Maybe FilePath @@ -743,7 +743,9 @@ runPeer opts = Exception.handle myException $ do peerThread "blockDownloadLoop " (blockDownloadLoop denv) - peerThread "fillPeerMeta" (fillPeerMeta tcp) + let tcpPropWait :: Timeout 'Seconds + tcpPropWait = (fromInteger . fromMaybe 300) (cfgValue @PeerTcpPropWaitKey conf) + peerThread "fillPeerMeta" (fillPeerMeta tcp tcpPropWait) -- FIXME: clumsy-code -- Is it better now ? diff --git a/hbs2-peer/app/PeerMeta.hs b/hbs2-peer/app/PeerMeta.hs new file mode 100644 index 00000000..10962521 --- /dev/null +++ b/hbs2-peer/app/PeerMeta.hs @@ -0,0 +1,150 @@ +module PeerMeta where + +import HBS2.Actors.Peer +import HBS2.Clock +import HBS2.Defaults +import HBS2.Events +import HBS2.Merkle +import HBS2.Net.IP.Addr +import HBS2.Net.Messaging.TCP +import HBS2.Net.PeerLocator +import HBS2.Net.Proto +import HBS2.Net.Proto.Peer +import HBS2.Net.Proto.PeerMeta +import HBS2.Net.Proto.Sessions +import HBS2.Prelude.Plated +import HBS2.System.Logger.Simple + +import PeerTypes + +import Control.Concurrent.Async +import Control.Concurrent.STM +import Control.Monad.Reader +import Control.Monad.Trans.Maybe +import Data.ByteString (ByteString) +import Data.Foldable hiding (find) +import Data.HashMap.Strict qualified as HashMap +import Data.Maybe +import Data.Text qualified as Text +import Data.Text.Encoding qualified as TE +import Data.Word +import Lens.Micro.Platform +import Network.HTTP.Simple (getResponseBody, httpLbs, parseRequest, getResponseStatus) +import Network.HTTP.Types.Status +import Text.InterpolatedString.Perl6 (qc) +import UnliftIO.Exception + + +fillPeerMeta :: forall e m t . + ( m ~ PeerM e IO + , MonadIO m + , HasProtocol e (PeerMetaProto e) + , PeerSessionKey e (PeerInfo e) + , PeerMessaging e + , IsPeerAddr e m + , Pretty (Peer e) + , Pretty (PeerAddr e) + , EventListener e ( PeerMetaProto e) m + , e ~ L4Proto + , IsTimeout t + ) + => Maybe MessagingTCP -> Timeout t -> m () +fillPeerMeta mtcp propPeriod = do + debug "I'm fillPeerMeta" + pl <- getPeerLocator @e + + forever $ (>> (pause propPeriod)) $ do + pause @'Seconds 5 -- wait 'till everything calm down + + ps <- knownPeers @e pl + debug $ "fillPeerMeta peers:" <+> pretty ps + npi <- newPeerInfo + for_ ps $ \p -> do + + pinfo <- fetch True npi (PeerInfoKey p) id + mmApiAddr <- liftIO $ readTVarIO (_peerHttpApiAddress pinfo) + + debug $ "Find peer meta and http address" <+> pretty p <+> "current:" <+> viaShow mmApiAddr + case mmApiAddr of + Left attemptn -> do + + q <- liftIO newTQueueIO + + subscribe @e (PeerMetaEventKey p) $ \case + PeerMetaEvent meta -> do + liftIO $ atomically $ writeTQueue q (Just meta) + request p (GetPeerMeta @e) + + r <- liftIO $ race ( pause defGetPeerMetaTimeout ) + ( atomically $ do + s <- readTQueue q + void $ flushTQueue q + pure s + ) + case r of + Left _ -> + liftIO $ atomically $ writeTVar (_peerHttpApiAddress pinfo) $ + if attemptn < 3 then (Left (attemptn + 1)) else (Right Nothing) + Right (Just meta) -> (void . runMaybeT) do + peerMeta <- case meta of + NoMetaData -> (MaybeT . pure) Nothing + ShortMetadata t -> do + (MaybeT . pure) (parsePeerMeta t) + AnnHashRef h -> (MaybeT . pure) Nothing + liftIO $ atomically $ writeTVar (_peerMeta pinfo) (Just peerMeta) + + -- 3) пробить, что есть tcp + forM_ (lookupDecode "listen-tcp" (unPeerMeta peerMeta)) \listenTCPPort -> lift do + peerTCPAddrPort <- replacePort p listenTCPPort + p <- fromPeerAddr (L4Address TCP peerTCPAddrPort) + sendPing p + + forM_ mtcp \(tcp :: MessagingTCP) -> do + -- 4) выяснить, можно ли к нему открыть соединение на этот порт + -- возможно, с ним уже открыто соединение + -- или попробовать открыть или запомнить, что было открыто + -- connectPeerTCP ? + tcpAddressIsAvailable <- isJust <$> do + liftIO $ atomically $ readTVar (view tcpPeerConn tcp) <&> HashMap.lookup p + when tcpAddressIsAvailable do + -- добавить этого пира в pex + addPeers pl [p] + + port <- (MaybeT . pure) (lookupDecode "http-port" (unPeerMeta peerMeta)) + + lift do + + peerHttpApiAddr <- show . pretty <$> replacePort p port + -- check peerHttpApiAddr + + r :: Maybe () <- runMaybeT do + resp <- MaybeT (liftIO $ fmap eitherToMaybe + $ race ( pause defBlockWaitMax ) + (do + req <- liftIO $ parseRequest [qc|http://{peerHttpApiAddr}/metadata|] + httpLbs req + ) + `catch` (\(e :: SomeException) -> debug (viaShow e) >> pure (Left ())) + ) + MaybeT . pure $ case statusCode (getResponseStatus resp) of + 200 -> Just () + _ -> Nothing + + liftIO $ atomically $ writeTVar (_peerHttpApiAddress pinfo) $ Right $ peerHttpApiAddr <$ r + mapM_ (liftIO . atomically . writeTVar (_peerMeta pinfo) . Just) $ peerMeta <$ r + debug $ "Got peer meta from" <+> pretty p <+> ":" <+> viaShow peerMeta + + _ -> do + liftIO $ atomically $ writeTVar (_peerHttpApiAddress pinfo) $ Right Nothing + + _ -> pure () + + where + replacePort :: Peer e -> Word16 -> PeerM e IO (IPAddrPort e) + replacePort peer port = do + IPAddrPort (ip,_port) <- fromString @(IPAddrPort e) . show . pretty <$> toPeerAddr peer + pure $ IPAddrPort (ip,port) + + lookupDecode :: (Eq k, Read v) => k -> [(k, ByteString)] -> Maybe v + lookupDecode k d = + readMay . Text.unpack . TE.decodeUtf8 =<< lookup k d diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index ff97558b..8092c059 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -119,6 +119,7 @@ executable hbs2-peer , DownloadQ , Bootstrap , PeerInfo + , PeerMeta , RPC , PeerTypes , PeerConfig