Move fillPeerMeta to it's own module, add tcp-prop-wait config param

This commit is contained in:
Sergey Ivanov 2023-05-22 13:22:34 +04:00 committed by Dmitry Zuikov
parent 0424dfdc9f
commit 69b7a56695
5 changed files with 159 additions and 122 deletions

View File

@ -138,123 +138,3 @@ blockHttpDownloadLoop denv = do
_ -> do
trace $ "FAIL" <+> pretty peer <+> "download block" <+> pretty h
withDownload denv $ failedDownload peer h
---
-- FIXME: move-fillPeerMeta-to-separate-module
fillPeerMeta :: forall e m .
( m ~ PeerM e IO
, MonadIO m
, HasProtocol e (PeerMetaProto e)
, PeerSessionKey e (PeerInfo e)
, PeerMessaging e
, IsPeerAddr e m
, Pretty (Peer e)
, Pretty (PeerAddr e)
, EventListener e ( PeerMetaProto e) m
, e ~ L4Proto
)
=> Maybe MessagingTCP -> m ()
fillPeerMeta mtcp = do
debug "I'm fillPeerMeta"
pl <- getPeerLocator @e
pause @'Seconds 5 -- wait 'till everything calm down
forever do
ps <- knownPeers @e pl
debug $ "fillPeerMeta peers:" <+> pretty ps
npi <- newPeerInfo
for_ ps $ \p -> do
pinfo <- fetch True npi (PeerInfoKey p) id
mmApiAddr <- liftIO $ readTVarIO (_peerHttpApiAddress pinfo)
debug $ "Find peer meta and http address" <+> pretty p <+> "current:" <+> viaShow mmApiAddr
case mmApiAddr of
Left attemptn -> do
q <- liftIO newTQueueIO
subscribe @e (PeerMetaEventKey p) $ \case
PeerMetaEvent meta -> do
liftIO $ atomically $ writeTQueue q (Just meta)
request p (GetPeerMeta @e)
r <- liftIO $ race ( pause defGetPeerMetaTimeout )
( atomically $ do
s <- readTQueue q
void $ flushTQueue q
pure s
)
case r of
Left _ ->
liftIO $ atomically $ writeTVar (_peerHttpApiAddress pinfo) $
if attemptn < 3 then (Left (attemptn + 1)) else (Right Nothing)
Right (Just meta) -> (void . runMaybeT) do
peerMeta <- case meta of
NoMetaData -> (MaybeT . pure) Nothing
ShortMetadata t -> do
(MaybeT . pure) (parsePeerMeta t)
AnnHashRef h -> (MaybeT . pure) Nothing
liftIO $ atomically $ writeTVar (_peerMeta pinfo) (Just peerMeta)
-- 3) пробить, что есть tcp
forM_ (lookupDecode "listen-tcp" (unPeerMeta peerMeta)) \listenTCPPort -> lift do
peerTCPAddrPort <- replacePort p listenTCPPort
p <- fromPeerAddr (L4Address TCP peerTCPAddrPort)
sendPing p
forM_ mtcp \(tcp :: MessagingTCP) -> do
-- 4) выяснить, можно ли к нему открыть соединение на этот порт
-- возможно, с ним уже открыто соединение
-- или попробовать открыть или запомнить, что было открыто
-- connectPeerTCP ?
tcpAddressIsAvailable <- isJust <$> do
liftIO $ atomically $ readTVar (view tcpPeerConn tcp) <&> HashMap.lookup p
when tcpAddressIsAvailable do
-- добавить этого пира в pex
addPeers pl [p]
port <- (MaybeT . pure) (lookupDecode "http-port" (unPeerMeta peerMeta))
lift do
peerHttpApiAddr <- show . pretty <$> replacePort p port
-- check peerHttpApiAddr
r :: Maybe () <- runMaybeT do
resp <- MaybeT (liftIO $ fmap eitherToMaybe
$ race ( pause defBlockWaitMax )
(do
req <- liftIO $ parseRequest [qc|http://{peerHttpApiAddr}/metadata|]
httpLbs req
)
`catch` (\(e :: SomeException) -> debug (viaShow e) >> pure (Left ()))
)
MaybeT . pure $ case statusCode (getResponseStatus resp) of
200 -> Just ()
_ -> Nothing
liftIO $ atomically $ writeTVar (_peerHttpApiAddress pinfo) $ Right $ peerHttpApiAddr <$ r
mapM_ (liftIO . atomically . writeTVar (_peerMeta pinfo) . Just) $ peerMeta <$ r
debug $ "Got peer meta from" <+> pretty p <+> ":" <+> viaShow peerMeta
_ -> do
liftIO $ atomically $ writeTVar (_peerHttpApiAddress pinfo) $ Right Nothing
_ -> pure ()
-- FIXME: move-hardcode-to-a-config
pause @'Seconds 300
where
replacePort :: Peer e -> Word16 -> PeerM e IO (IPAddrPort e)
replacePort peer port = do
IPAddrPort (ip,_port) <- fromString @(IPAddrPort e) . show . pretty <$> toPeerAddr peer
pure $ IPAddrPort (ip,port)
lookupDecode :: (Eq k, Read v) => k -> [(k, ByteString)] -> Maybe v
lookupDecode k d =
readMay . Text.unpack . TE.decodeUtf8 =<< lookup k d

View File

@ -44,6 +44,7 @@ pattern Key n ns <- SymbolVal n : ns
data PeerListenTCPKey
data PeerDownloadLogKey
data PeerHttpPortKey
data PeerTcpPropWaitKey
data PeerUseHttpDownload
instance HasCfgKey PeerListenTCPKey (Maybe String) where
@ -52,6 +53,9 @@ instance HasCfgKey PeerListenTCPKey (Maybe String) where
instance HasCfgKey PeerHttpPortKey (Maybe Integer) where
key = "http-port"
instance HasCfgKey PeerTcpPropWaitKey (Maybe Integer) where
key = "tcp-prop-wait"
instance HasCfgKey PeerUseHttpDownload FeatureSwitch where
key = "http-download"

View File

@ -47,6 +47,7 @@ import RefLog qualified
import RefLog (reflogWorker)
import HttpWorker
import ProxyMessaging
import PeerMeta
import Codec.Serialise
import Control.Concurrent.Async
@ -151,7 +152,6 @@ instance HasCfgValue PeerAcceptAnnounceKey AcceptAnnounce where
]
kk = key @PeerAcceptAnnounceKey @AcceptAnnounce
data RPCOpt =
RPCOpt
{ _rpcOptConf :: Maybe FilePath
@ -743,7 +743,9 @@ runPeer opts = Exception.handle myException $ do
peerThread "blockDownloadLoop " (blockDownloadLoop denv)
peerThread "fillPeerMeta" (fillPeerMeta tcp)
let tcpPropWait :: Timeout 'Seconds
tcpPropWait = (fromInteger . fromMaybe 300) (cfgValue @PeerTcpPropWaitKey conf)
peerThread "fillPeerMeta" (fillPeerMeta tcp tcpPropWait)
-- FIXME: clumsy-code
-- Is it better now ?

150
hbs2-peer/app/PeerMeta.hs Normal file
View File

@ -0,0 +1,150 @@
module PeerMeta where
import HBS2.Actors.Peer
import HBS2.Clock
import HBS2.Defaults
import HBS2.Events
import HBS2.Merkle
import HBS2.Net.IP.Addr
import HBS2.Net.Messaging.TCP
import HBS2.Net.PeerLocator
import HBS2.Net.Proto
import HBS2.Net.Proto.Peer
import HBS2.Net.Proto.PeerMeta
import HBS2.Net.Proto.Sessions
import HBS2.Prelude.Plated
import HBS2.System.Logger.Simple
import PeerTypes
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad.Reader
import Control.Monad.Trans.Maybe
import Data.ByteString (ByteString)
import Data.Foldable hiding (find)
import Data.HashMap.Strict qualified as HashMap
import Data.Maybe
import Data.Text qualified as Text
import Data.Text.Encoding qualified as TE
import Data.Word
import Lens.Micro.Platform
import Network.HTTP.Simple (getResponseBody, httpLbs, parseRequest, getResponseStatus)
import Network.HTTP.Types.Status
import Text.InterpolatedString.Perl6 (qc)
import UnliftIO.Exception
fillPeerMeta :: forall e m t .
( m ~ PeerM e IO
, MonadIO m
, HasProtocol e (PeerMetaProto e)
, PeerSessionKey e (PeerInfo e)
, PeerMessaging e
, IsPeerAddr e m
, Pretty (Peer e)
, Pretty (PeerAddr e)
, EventListener e ( PeerMetaProto e) m
, e ~ L4Proto
, IsTimeout t
)
=> Maybe MessagingTCP -> Timeout t -> m ()
fillPeerMeta mtcp propPeriod = do
debug "I'm fillPeerMeta"
pl <- getPeerLocator @e
forever $ (>> (pause propPeriod)) $ do
pause @'Seconds 5 -- wait 'till everything calm down
ps <- knownPeers @e pl
debug $ "fillPeerMeta peers:" <+> pretty ps
npi <- newPeerInfo
for_ ps $ \p -> do
pinfo <- fetch True npi (PeerInfoKey p) id
mmApiAddr <- liftIO $ readTVarIO (_peerHttpApiAddress pinfo)
debug $ "Find peer meta and http address" <+> pretty p <+> "current:" <+> viaShow mmApiAddr
case mmApiAddr of
Left attemptn -> do
q <- liftIO newTQueueIO
subscribe @e (PeerMetaEventKey p) $ \case
PeerMetaEvent meta -> do
liftIO $ atomically $ writeTQueue q (Just meta)
request p (GetPeerMeta @e)
r <- liftIO $ race ( pause defGetPeerMetaTimeout )
( atomically $ do
s <- readTQueue q
void $ flushTQueue q
pure s
)
case r of
Left _ ->
liftIO $ atomically $ writeTVar (_peerHttpApiAddress pinfo) $
if attemptn < 3 then (Left (attemptn + 1)) else (Right Nothing)
Right (Just meta) -> (void . runMaybeT) do
peerMeta <- case meta of
NoMetaData -> (MaybeT . pure) Nothing
ShortMetadata t -> do
(MaybeT . pure) (parsePeerMeta t)
AnnHashRef h -> (MaybeT . pure) Nothing
liftIO $ atomically $ writeTVar (_peerMeta pinfo) (Just peerMeta)
-- 3) пробить, что есть tcp
forM_ (lookupDecode "listen-tcp" (unPeerMeta peerMeta)) \listenTCPPort -> lift do
peerTCPAddrPort <- replacePort p listenTCPPort
p <- fromPeerAddr (L4Address TCP peerTCPAddrPort)
sendPing p
forM_ mtcp \(tcp :: MessagingTCP) -> do
-- 4) выяснить, можно ли к нему открыть соединение на этот порт
-- возможно, с ним уже открыто соединение
-- или попробовать открыть или запомнить, что было открыто
-- connectPeerTCP ?
tcpAddressIsAvailable <- isJust <$> do
liftIO $ atomically $ readTVar (view tcpPeerConn tcp) <&> HashMap.lookup p
when tcpAddressIsAvailable do
-- добавить этого пира в pex
addPeers pl [p]
port <- (MaybeT . pure) (lookupDecode "http-port" (unPeerMeta peerMeta))
lift do
peerHttpApiAddr <- show . pretty <$> replacePort p port
-- check peerHttpApiAddr
r :: Maybe () <- runMaybeT do
resp <- MaybeT (liftIO $ fmap eitherToMaybe
$ race ( pause defBlockWaitMax )
(do
req <- liftIO $ parseRequest [qc|http://{peerHttpApiAddr}/metadata|]
httpLbs req
)
`catch` (\(e :: SomeException) -> debug (viaShow e) >> pure (Left ()))
)
MaybeT . pure $ case statusCode (getResponseStatus resp) of
200 -> Just ()
_ -> Nothing
liftIO $ atomically $ writeTVar (_peerHttpApiAddress pinfo) $ Right $ peerHttpApiAddr <$ r
mapM_ (liftIO . atomically . writeTVar (_peerMeta pinfo) . Just) $ peerMeta <$ r
debug $ "Got peer meta from" <+> pretty p <+> ":" <+> viaShow peerMeta
_ -> do
liftIO $ atomically $ writeTVar (_peerHttpApiAddress pinfo) $ Right Nothing
_ -> pure ()
where
replacePort :: Peer e -> Word16 -> PeerM e IO (IPAddrPort e)
replacePort peer port = do
IPAddrPort (ip,_port) <- fromString @(IPAddrPort e) . show . pretty <$> toPeerAddr peer
pure $ IPAddrPort (ip,port)
lookupDecode :: (Eq k, Read v) => k -> [(k, ByteString)] -> Maybe v
lookupDecode k d =
readMay . Text.unpack . TE.decodeUtf8 =<< lookup k d

View File

@ -119,6 +119,7 @@ executable hbs2-peer
, DownloadQ
, Bootstrap
, PeerInfo
, PeerMeta
, RPC
, PeerTypes
, PeerConfig