diff --git a/.fixme/log b/.fixme/log index 9185428e..e69de29b 100644 --- a/.fixme/log +++ b/.fixme/log @@ -1,5 +0,0 @@ - -fixme-del "Hwmrzssg8X" -fixme-del "Cos1uYVyys" -fixme-del "6KCMs4gLkt" -fixme-del "AiKNngFjfk" \ No newline at end of file diff --git a/docs/devlog.md b/docs/devlog.md index 082252bc..722bf8ae 100644 --- a/docs/devlog.md +++ b/docs/devlog.md @@ -1254,4 +1254,11 @@ FIXME: Обработка ошибок в асинхронном приложе всё еще 0. +## 2023-04-03 + +PR: implement-http-block-download-worker + branch: iv/http-block-download-worker-5 + commit: c1b32d9b7d4ad46f1924bf340374d64c29cefb67 + Скачивание блока по http. + Решение 7gN8M32Ugm (http-block-download-worker) diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index 5d4b7605..94528e3d 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -93,6 +93,7 @@ library , HBS2.Net.Proto.Peer , HBS2.Net.Proto.PeerAnnounce , HBS2.Net.Proto.PeerExchange + , HBS2.Net.Proto.PeerMeta , HBS2.Net.Proto.Sessions , HBS2.Net.Proto.RefLog , HBS2.Net.Proto.Types diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index d94e44ab..ff5dd470 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -95,6 +95,9 @@ defPexMaxPeers = 50 defDownloadFails :: Int defDownloadFails = 100 +defGetPeerMetaTimeout :: Timeout 'Seconds +defGetPeerMetaTimeout = 10 + -- TODO: peer-does-not-have-a-block-ok -- Это нормально, когда у пира нет блока. -- У него может не быть каких-то блоков, diff --git a/hbs2-core/lib/HBS2/Merkle.hs b/hbs2-core/lib/HBS2/Merkle.hs index fa7c472b..2af873ea 100644 --- a/hbs2-core/lib/HBS2/Merkle.hs +++ b/hbs2-core/lib/HBS2/Merkle.hs @@ -78,7 +78,7 @@ makeLenses ''MNodeData instance Serialise MNodeData data AnnMetaData = NoMetaData | ShortMetadata Text | AnnHashRef (Hash HbSync) - deriving stock (Generic,Data,Show) + deriving stock (Generic,Data,Show,Eq) instance Serialise AnnMetaData diff --git a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs index 8623b407..91cff8f2 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs @@ -19,6 +19,7 @@ import HBS2.Net.Proto.BlockInfo import HBS2.Net.Proto.Peer import HBS2.Net.Proto.PeerAnnounce import HBS2.Net.Proto.PeerExchange +import HBS2.Net.Proto.PeerMeta import HBS2.Net.Proto.RefLog import HBS2.Prelude @@ -112,6 +113,12 @@ instance HasProtocol UDP (RefLogRequest UDP) where decode = either (const Nothing) Just . deserialiseOrFail encode = serialise +instance HasProtocol UDP (PeerMetaProto UDP) where + type instance ProtocolId (PeerMetaProto UDP) = 9 + type instance Encoded UDP = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + -- FIXME: real-period requestPeriodLim = ReqLimPerMessage 1 @@ -136,6 +143,9 @@ instance Expires (SessionKey UDP (PeerHandshake UDP)) where instance Expires (EventKey UDP (PeerAnnounce UDP)) where expiresIn _ = Nothing +instance Expires (EventKey UDP (PeerMetaProto UDP)) where + expiresIn _ = Just 600 + instance MonadIO m => HasNonces (PeerHandshake UDP) m where type instance Nonce (PeerHandshake UDP) = BS.ByteString diff --git a/hbs2-core/lib/HBS2/Net/Proto/PeerMeta.hs b/hbs2-core/lib/HBS2/Net/Proto/PeerMeta.hs new file mode 100644 index 00000000..07a5839b --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Proto/PeerMeta.hs @@ -0,0 +1,72 @@ +module HBS2.Net.Proto.PeerMeta where + +import HBS2.Base58 +import HBS2.Events +import HBS2.Hash +import HBS2.Merkle +import HBS2.Net.Proto +import HBS2.Net.Proto.Peer +import HBS2.Net.Proto.Sessions +import HBS2.Prelude.Plated + +import Codec.Serialise +import Control.Monad +import Data.ByteString ( ByteString ) +import Data.ByteString.Lazy qualified as LBS +import Data.Functor +import Data.Maybe +import Data.Text.Encoding qualified as TE + +data PeerMetaProto e + = GetPeerMeta + | ThePeerMeta AnnMetaData + deriving stock (Eq,Generic,Show) + +instance Serialise (PeerMetaProto e) + + +peerMetaProto :: forall e m . ( MonadIO m + , Response e (PeerMetaProto e) m + , HasDeferred e (PeerMetaProto e) m + , EventEmitter e (PeerMetaProto e) m + , Sessions e (KnownPeer e) m + ) + => m AnnMetaData + -> PeerMetaProto e + -> m () + +peerMetaProto getPeerMeta = + \case + GetPeerMeta -> do + p <- thatPeer (Proxy @(PeerMetaProto e)) + auth <- find (KnownPeerKey p) id <&> isJust + when auth do + deferred (Proxy @(PeerMetaProto e)) do + getPeerMeta >>= \meta -> response (ThePeerMeta @e meta) + + ThePeerMeta meta -> do + that <- thatPeer (Proxy @(PeerMetaProto e)) + emit @e (PeerMetaEventKey that) (PeerMetaEvent meta) + +newtype instance EventKey e (PeerMetaProto e) = + PeerMetaEventKey (Peer e) + deriving stock (Typeable, Generic) + +deriving instance Eq (Peer e) => Eq (EventKey e (PeerMetaProto e)) +deriving instance (Eq (Peer e), Hashable (Peer e)) => Hashable (EventKey e (PeerMetaProto e)) + +newtype instance Event e (PeerMetaProto e) + = PeerMetaEvent AnnMetaData + deriving stock (Typeable) + +newtype PeerMeta = PeerMeta [(Text, ByteString)] + deriving stock (Generic) + +instance Serialise PeerMeta + +annMetaFromPeerMeta :: PeerMeta -> AnnMetaData +annMetaFromPeerMeta = + ShortMetadata . TE.decodeUtf8 . toBase58 . LBS.toStrict . serialise + +parsePeerMeta :: Text -> Maybe PeerMeta +parsePeerMeta = either (const Nothing) Just . deserialiseOrFail . LBS.fromStrict <=< fromBase58 . TE.encodeUtf8 diff --git a/hbs2-core/lib/HBS2/Prelude.hs b/hbs2-core/lib/HBS2/Prelude.hs index 601e6f41..9f0e1503 100644 --- a/hbs2-core/lib/HBS2/Prelude.hs +++ b/hbs2-core/lib/HBS2/Prelude.hs @@ -4,6 +4,7 @@ module HBS2.Prelude , MonadIO(..) , void, guard, when, unless , maybe1 + , eitherToMaybe , Hashable , lift , AsFileName(..) @@ -36,6 +37,8 @@ none = pure () maybe1 :: Maybe a -> b -> (a -> b) -> b maybe1 mb n j = maybe n j mb +eitherToMaybe :: Either a b -> Maybe b +eitherToMaybe = either (const Nothing) Just newtype AsFileName a = AsFileName a diff --git a/hbs2-peer/app/BlockHttpDownload.hs b/hbs2-peer/app/BlockHttpDownload.hs new file mode 100644 index 00000000..07badc45 --- /dev/null +++ b/hbs2-peer/app/BlockHttpDownload.hs @@ -0,0 +1,218 @@ +{-# OPTIONS_GHC -fno-warn-orphans #-} +{-# Language UndecidableInstances #-} +{-# Language MultiWayIf #-} +module BlockHttpDownload where + +import HBS2.Actors.Peer +import HBS2.Clock +import HBS2.Data.Detect +import HBS2.Data.Types.Refs +import HBS2.Defaults +import HBS2.Events +import HBS2.Hash +import HBS2.Merkle +import HBS2.Net.IP.Addr +import HBS2.Net.PeerLocator +import HBS2.Net.Proto +import HBS2.Net.Proto.Definition +import HBS2.Net.Proto.Peer +import HBS2.Net.Proto.PeerMeta +import HBS2.Net.Proto.RefLog +import HBS2.Net.Proto.Sessions +import HBS2.Prelude +import HBS2.Prelude.Plated +import HBS2.Storage +import HBS2.System.Logger.Simple + +import PeerTypes +import PeerInfo +import BlockDownload +import Brains + +import Control.Concurrent.Async +import Control.Concurrent.STM +import Control.Monad.Reader +import Control.Monad.Trans.Maybe +import Data.ByteString.Lazy (ByteString) +import Data.Cache qualified as Cache +import Data.Foldable hiding (find) +import Data.HashMap.Strict qualified as HashMap +import Data.IntMap (IntMap) +import Data.IntMap qualified as IntMap +import Data.IntSet qualified as IntSet +import Data.List qualified as List +import Data.Maybe +import Data.Text qualified as Text +import Data.Text.Encoding qualified as TE +import Lens.Micro.Platform +import Network.HTTP.Simple (getResponseBody, httpLbs, parseRequest, getResponseStatus) +import Network.HTTP.Types.Status +import Network.Socket +import Streaming (Stream, Of) +import Streaming.Prelude qualified as S +import System.Random (randomRIO) +import System.Random.Shuffle (shuffleM) +import Text.InterpolatedString.Perl6 (qc) + +blockHttpDownloadLoop :: forall e m . + ( m ~ PeerM e IO + , MonadIO m + , HasProtocol e (BlockInfo e) + , Sessions e (KnownPeer e) m + , PeerSessionKey e (PeerInfo e) + , Pretty (Peer e) + , IsPeerAddr e m + -- FIXME: backlog-do-something-with-that + -- это не ревью, это надо что-то с этим + -- сделать, неудачное решение + , Block ByteString ~ ByteString + ) + => DownloadEnv e -> m () +blockHttpDownloadLoop denv = do + + e <- ask + + pl <- getPeerLocator @e + + pause @'Seconds 3.81 + + debug "I'm blockHttpDownloadLoop" + +--- + + let streamPeers :: Stream (Of (Peer e, PeerInfo e)) m () + streamPeers = flip fix [] \goPeers -> \case + [] -> do + pause @'Seconds 1.44 + ps <- knownPeers @e pl + when (null ps) do + trace $ "No peers to use for http download" + pause @'Seconds 4 + goPeers ps + peer:ps -> do + authorized <- lift $ find (KnownPeerKey peer) id <&> isJust + pinfo <- lift $ find (PeerInfoKey peer) id <&> isJust + when (authorized && pinfo) do + npi <- lift newPeerInfo + pinfo <- lift $ fetch True npi (PeerInfoKey peer) id + S.yield (peer, pinfo) + goPeers ps + + let streamPeerAddrs = S.catMaybes $ streamPeers & S.mapM \(peer, pinfo) -> + (fmap (peer, pinfo, ) . join . eitherToMaybe) <$> do + r <- liftIO $ readTVarIO (_peerHttpApiAddress pinfo) + -- debug $ "streamPeerAddrs" <+> pretty peer <+> viaShow (viaShow <$> r) + pure r + + let streamBlockHs = S.catMaybes $ streamPeerAddrs & S.mapM \(peer, pinfo, apiAddr) -> do + -- inq <- liftIO $ readTVarIO (_blockInQ denv) + -- TODO: change to only use blockInQ + -- do we need analog of getBlockForDownload ? + mbh <- withDownload denv $ getBlockForDownload peer + -- debug $ "streamBlockHs" <+> pretty peer <+> pretty apiAddr <+> viaShow (pretty <$> mbh) + pure $ (peer, pinfo, apiAddr, ) <$> mbh + + streamBlockHs & S.mapM_ \(peer, pinfo, apiAddr, h) -> do + + trace $ "Querying via http from" <+> pretty (peer, apiAddr) <+> "block" <+> pretty h + r <- liftIO $ race ( pause defBlockWaitMax ) + $ do + req <- liftIO $ parseRequest [qc|http://{apiAddr}/cat/{pretty h}|] + resp <- httpLbs req + + case statusCode (getResponseStatus resp) of + 200 -> pure $ Just (getResponseBody resp) + _ -> pure Nothing + + case r of + Right (Just block) -> do + trace $ "SUCCESS" <+> pretty peer <+> "http-download block" <+> pretty h + sto <- getStorage + liftIO $ putBlock sto block + withDownload denv $ processBlock h + _ -> do + trace $ "FAIL" <+> pretty peer <+> "download block" <+> pretty h + withDownload denv $ failedDownload peer h + + +--- + +updatePeerHttpAddrs :: forall e m . + ( m ~ PeerM e IO + , MonadIO m + , HasProtocol e (PeerMetaProto e) + , PeerSessionKey e (PeerInfo e) + , PeerMessaging e + , IsPeerAddr e m + , Pretty (Peer e) + , Pretty (PeerAddr e) + , EventListener e( PeerMetaProto e) m + ) + => m () +updatePeerHttpAddrs = do + debug "I'm updatePeerHttpAddrs" + pl <- getPeerLocator @e + forever do + + -- REVIEW: isnt-it-too-often + -- Не слишком ли часто обновлять http адрес? + -- Зачем раз в пять секунд? + -- -- Это попытка узнать адрес. Если раз определили его, то уже не будем снова пытаться. + -- При этом всего будет не более трёх попыток. + pause @'Seconds 5 + ps <- knownPeers @e pl + debug $ "updatePeerHttpAddrs peers:" <+> pretty ps + npi <- newPeerInfo + for_ ps $ \p -> do + + pinfo <- fetch True npi (PeerInfoKey p) id + mmApiAddr <- liftIO $ readTVarIO (_peerHttpApiAddress pinfo) + + debug $ "Find peer http address" <+> pretty p <+> "current:" <+> viaShow mmApiAddr + case mmApiAddr of + Left attemptn -> do + + q <- liftIO newTQueueIO + + subscribe @e (PeerMetaEventKey p) $ \case + PeerMetaEvent meta -> do + liftIO $ atomically $ writeTQueue q (Just meta) + request p (GetPeerMeta @e) + + r <- liftIO $ race ( pause defGetPeerMetaTimeout ) + ( atomically $ do + s <- readTQueue q + void $ flushTQueue q + pure s + ) + case r of + Left _ -> + liftIO $ atomically $ writeTVar (_peerHttpApiAddress pinfo) $ + if attemptn < 3 then (Left (attemptn + 1)) else (Right Nothing) + Right (Just meta) -> (void . runMaybeT) do + port <- case meta of + NoMetaData -> (MaybeT . pure) Nothing + ShortMetadata t -> do + PeerMeta d <- (MaybeT . pure) (parsePeerMeta t) + httpPortBS <- (MaybeT . pure) (lookup "http-port" d) + (MaybeT . pure . readMay . Text.unpack . TE.decodeUtf8) httpPortBS + AnnHashRef h -> (MaybeT . pure) Nothing + lift do + IPAddrPort (ip,_port) <- fromString @(IPAddrPort e) . show . pretty <$> toPeerAddr p + let peerHttpApiAddr = show . pretty $ IPAddrPort (ip,port) + + -- check peerHttpApiAddr + r <- liftIO $ race ( pause defBlockWaitMax ) do + req <- liftIO $ parseRequest [qc|http://{peerHttpApiAddr}/metadata|] + resp <- httpLbs req + case statusCode (getResponseStatus resp) of + 200 -> pure True + _ -> pure False + liftIO $ atomically $ writeTVar (_peerHttpApiAddress pinfo) $ Right $ + case r of + Right True -> Just peerHttpApiAddr + _ -> Nothing + _ -> do + liftIO $ atomically $ writeTVar (_peerHttpApiAddress pinfo) $ Right Nothing + + _ -> pure () diff --git a/hbs2-peer/app/HttpWorker.hs b/hbs2-peer/app/HttpWorker.hs index a90cd3d9..1a7077d1 100644 --- a/hbs2-peer/app/HttpWorker.hs +++ b/hbs2-peer/app/HttpWorker.hs @@ -2,6 +2,7 @@ module HttpWorker where import HBS2.Prelude import HBS2.Actors.Peer +import HBS2.Net.Proto.PeerMeta import HBS2.Storage import HBS2.Data.Types.Refs import HBS2.Net.Proto.Types @@ -12,6 +13,9 @@ import PeerTypes import PeerConfig import Data.Functor +import Data.Maybe +import Data.Text qualified as Text +import Data.Text.Encoding qualified as TE import Data.ByteString.Lazy qualified as LBS import Network.HTTP.Types.Status import Network.Wai.Middleware.RequestLogger @@ -66,6 +70,12 @@ httpWorker conf e = do maybe1 va (status status404) $ \val -> do text [qc|{pretty val}|] + get "/metadata" do + let mport = cfgValue @PeerHttpPortKey conf <&> fromIntegral + raw $ serialise . annMetaFromPeerMeta . PeerMeta . catMaybes $ + [ mport <&> \port -> ("http-port", TE.encodeUtf8 . Text.pack . show $ port) + ] + put "/" do -- FIXME: optional-header-based-authorization -- signed nonce + peer key? diff --git a/hbs2-peer/app/PeerConfig.hs b/hbs2-peer/app/PeerConfig.hs index 02017e83..778c32fa 100644 --- a/hbs2-peer/app/PeerConfig.hs +++ b/hbs2-peer/app/PeerConfig.hs @@ -43,10 +43,14 @@ pattern Key n ns <- SymbolVal n : ns data PeerDownloadLogKey data PeerHttpPortKey +data PeerUseHttpDownload instance HasCfgKey PeerHttpPortKey (Maybe Integer) where key = "http-port" +instance HasCfgKey PeerUseHttpDownload FeatureSwitch where + key = "http-download" + instance HasCfgKey PeerDownloadLogKey (Maybe String) where key = "download-log" diff --git a/hbs2-peer/app/PeerInfo.hs b/hbs2-peer/app/PeerInfo.hs index 9457a3f1..cdcdf2d7 100644 --- a/hbs2-peer/app/PeerInfo.hs +++ b/hbs2-peer/app/PeerInfo.hs @@ -54,6 +54,7 @@ data PeerInfo e = , _peerUsefulness :: TVar Double , _peerRTTBuffer :: TVar [Integer] -- ^ Contains a list of the last few round-trip time (RTT) values, measured in nanoseconds. -- Acts like a circular buffer. + , _peerHttpApiAddress :: TVar (Either Int (Maybe String)) } deriving stock (Generic,Typeable) @@ -107,6 +108,7 @@ newPeerInfo = liftIO do <*> newTVarIO 0 <*> newTVarIO 0 <*> newTVarIO [] + <*> newTVarIO (Left 0) type instance SessionData e (PeerInfo e) = PeerInfo e diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 81872434..c616f807 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -12,6 +12,7 @@ import HBS2.Defaults import HBS2.Events import HBS2.Hash import HBS2.Data.Types.Refs (RefLogKey(..)) +import HBS2.Merkle import HBS2.Net.Auth.Credentials import HBS2.Net.IP.Addr import HBS2.Net.Messaging.UDP @@ -21,6 +22,7 @@ import HBS2.Net.Proto.Definition import HBS2.Net.Proto.Peer import HBS2.Net.Proto.PeerAnnounce import HBS2.Net.Proto.PeerExchange +import HBS2.Net.Proto.PeerMeta import HBS2.Net.Proto.RefLog import HBS2.Net.Proto.Sessions import HBS2.OrDie @@ -34,6 +36,7 @@ import Brains import RPC import PeerTypes import BlockDownload +import BlockHttpDownload import DownloadQ import PeerInfo import PeerConfig @@ -60,6 +63,10 @@ import Data.Maybe import Data.Set qualified as Set import Data.Set (Set) import Data.Text qualified as Text +import Data.Text (Text) +import Data.Text.Encoding qualified as TE +import GHC.Stats +import GHC.TypeLits import Lens.Micro.Platform import Network.Socket import Options.Applicative @@ -448,6 +455,8 @@ runPeer opts = Exception.handle myException $ do let wlkeys = toKeys (whs `Set.difference` bls) let helpFetchKeys = cfgValue @PeerProxyFetchKey conf & toKeys + let useHttpDownload = cfgValue @PeerUseHttpDownload conf & (== FeatureOn) + let accptAnn = cfgValue @PeerAcceptAnnounceKey conf :: AcceptAnnounce print $ pretty accptAnn @@ -518,6 +527,12 @@ runPeer opts = Exception.handle myException $ do nbcache <- liftIO $ Cache.newCache (Just $ toTimeSpec ( 600 :: Timeout 'Seconds)) + let mkPeerMeta = do + let mport = cfgValue @PeerHttpPortKey conf <&> fromIntegral + pure $ annMetaFromPeerMeta . PeerMeta . catMaybes $ + [ mport <&> \port -> ("http-port", TE.encodeUtf8 . Text.pack . show $ port) + ] + void $ async $ forever do pause @'Seconds 600 liftIO $ Cache.purgeExpired nbcache @@ -686,6 +701,12 @@ runPeer opts = Exception.handle myException $ do peerThread (blockDownloadLoop denv) + if useHttpDownload + then do + peerThread updatePeerHttpAddrs + peerThread (blockHttpDownloadLoop denv) + else pure mempty + peerThread (postponedLoop denv) peerThread (downloadQueue conf denv) @@ -792,6 +813,7 @@ runPeer opts = Exception.handle myException $ do , makeResponse peerExchangeProto , makeResponse (refLogUpdateProto reflogAdapter) , makeResponse (refLogRequestProto reflogReqAdapter) + , makeResponse (peerMetaProto mkPeerMeta) ] void $ liftIO $ waitAnyCatchCancel workers diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index bd3a28da..198b51c0 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -58,6 +58,7 @@ common common-deps , ekg-core , scotty , warp + , http-conduit , http-types , wai-extra @@ -112,6 +113,7 @@ executable hbs2-peer main-is: PeerMain.hs other-modules: BlockDownload + , BlockHttpDownload , DownloadQ , Bootstrap , PeerInfo