diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index 893c1928..574f0404 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -853,46 +853,3 @@ processBlock h = do unboxBundleRef (BundleRefValue box) = unboxSignedBox0 box --- NOTE: this is an adapter for a ResponseM monad --- because response is working in ResponseM monad (ha!) --- So don't be confused with types --- -mkAdapter :: forall e m . ( m ~ PeerM e IO - , HasProtocol e (BlockChunks e) - , Hashable (SessionKey e (BlockChunks e)) - , Sessions e (BlockChunks e) (ResponseM e m) - , Typeable (SessionKey e (BlockChunks e)) - , EventEmitter e (BlockChunks e) m - , Pretty (Peer e) - ) - => m (BlockChunksI e (ResponseM e m )) -mkAdapter = do - storage <- getStorage - pure $ - BlockChunksI - { blkSize = liftIO . hasBlock storage - , blkChunk = \h o s -> liftIO (getChunk storage h o s) - , blkGetHash = \c -> find (DownloadSessionKey @e c) (view sBlockHash) - - , blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do - let cKey = DownloadSessionKey (p,c) - - dodo <- lift $ find cKey (view sBlockChunks) - - unless (isJust dodo) $ do - debug $ "session lost for peer !" <+> pretty p - --- debug $ "FINDING-SESSION:" <+> pretty c <+> pretty n --- debug $ "GOT SHIT" <+> pretty c <+> pretty n - - se <- MaybeT $ find cKey id - let dwnld = view sBlockChunks se - let dwnld2 = view sBlockChunks2 se - - -- debug $ "WRITE SHIT" <+> pretty c <+> pretty n - liftIO $ atomically do - writeTQueue dwnld (n, bs) - modifyTVar' dwnld2 (IntMap.insert (fromIntegral n) bs) - } - - diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs new file mode 100644 index 00000000..4fc4ba37 --- /dev/null +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -0,0 +1,554 @@ +{-# OPTIONS_GHC -fno-warn-orphans #-} +{-# Language UndecidableInstances #-} +{-# Language AllowAmbiguousTypes #-} +module BlockDownloadNew where + +import HBS2.Prelude.Plated +import HBS2.OrDie +import HBS2.Data.Detect +import HBS2.Hash +import HBS2.Merkle +import HBS2.Defaults +import HBS2.Events +import HBS2.Net.Proto.Service +import HBS2.Net.Proto.Sessions + + +import HBS2.Base58 +import HBS2.Data.Types.Peer +import HBS2.Data.Types.Refs +import HBS2.Actors.Peer +import HBS2.Peer.Proto.Peer +import HBS2.Peer.Proto.BlockInfo +import HBS2.Peer.Proto.BlockChunks +import HBS2.Peer.Brains +import HBS2.Storage +import HBS2.Storage.Operations.Missed +import HBS2.Misc.PrettyStuff +import HBS2.Clock +import HBS2.Net.Auth.Schema + +import HBS2.Peer.RPC.Internal.Types +import HBS2.Peer.RPC.API.Peer + +import HBS2.Net.Messaging.TCP + +import PeerTypes +import PeerInfo + +import Control.Monad.Trans.Maybe +import Control.Monad.Trans.Cont +import Control.Concurrent.STM (flushTQueue,retry) +import Data.Map qualified as Map +import Data.Map (Map) +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HM +import Data.HashSet (HashSet) +import Data.HashSet qualified as HS +import Data.IntMap qualified as IntMap +import Data.IntMap (IntMap) +import Data.List.Split qualified as Split +import Data.Text qualified as Text +import Data.Either +import Data.Maybe +import Data.ByteString.Lazy qualified as LBS +import Data.ByteString.Lazy (ByteString) +import Data.ByteString qualified as BS +import Data.List qualified as L +import Data.Coerce +import Numeric +import UnliftIO +import UnliftIO.Concurrent +import Lens.Micro.Platform +import Streaming.Prelude qualified as S + + +data DownloadError e = + DownloadStuckError HashRef (Peer e) + | StorageError + | UnknownPeerError (Peer e) + | InternalError Int + | PeerMissBlockError HashRef (Peer e) + | PeerBlockHashMismatch (Peer e) + | PeerRequestTimeout (Peer e) + | Incomplete HashRef + deriving stock (Generic,Typeable) + +instance Pretty (Peer e) => Show (DownloadError e) where + show (DownloadStuckError h p) = show $ parens $ "DownloadStuck" <+> pretty h <+> pretty p + show (UnknownPeerError p) = show $ parens $ "UnknownPeerError" <+> pretty p + show (PeerMissBlockError h p) = show $ parens $ "PeerMissBlockError" <+> pretty h <+> pretty p + show (PeerRequestTimeout p) = show $ parens $ "PeerRequestTimeout" <+> pretty p + show (PeerBlockHashMismatch p) = show $ parens $ "PeerBlockHashMismatch" <+> pretty p + show StorageError = show "StorageError" + show (InternalError n) = show $ parens "InternalError" <+> pretty n + show (Incomplete h) = show $ parens "Incomplete" <+> pretty h + +instance (Typeable e, Pretty (Peer e)) => Exception (DownloadError e) + + +class BlockSizeCache e cache where + + cacheBlockSize :: forall m . MonadUnliftIO m + => cache + -> PubKey 'Sign (Encryption e) + -> Hash HbSync + -> Integer + -> m () + + findBlockSize :: forall m . MonadUnliftIO m + => cache + -> PubKey 'Sign (Encryption e) + -> Hash HbSync + -> m (Maybe Integer) + +instance BlockSizeCache e () where + cacheBlockSize _ _ _ _ = pure () + findBlockSize _ _ _ = pure Nothing + +instance BlockSizeCache e (SomeBrains e) where + cacheBlockSize = brainsCacheBlockSize @e + findBlockSize = brainsFindBlockSize @e + +queryBlockSizeFromPeer :: forall e cache m . ( e ~ L4Proto + , MonadUnliftIO m + , BlockSizeCache e cache + ) + => cache + -> PeerEnv e + -> Hash HbSync + -> Peer e + -> m (Either (DownloadError e) (Maybe Integer)) + +queryBlockSizeFromPeer cache e h peer = do + + what <- try @_ @(DownloadError e) $ liftIO $ withPeerM e do + + flip runContT pure $ callCC \exit -> do + + PeerData{..} <- lift $ find (KnownPeerKey peer) id + >>= orThrow (UnknownPeerError peer) + + s <- lift $ findBlockSize @e cache _peerSignKey h + + debug $ "FOUND CACHED VALUE" <+> pretty h <+> pretty s + + maybe none (exit . Just) s + + lift do + + sizeQ <- newTQueueIO + + subscribe @e (BlockSizeEventKey peer) $ \case + BlockSizeEvent (that, hx, sz) | hx == h -> do + atomically $ writeTQueue sizeQ (Just sz) + cacheBlockSize @e cache _peerSignKey h sz + + _ -> do + atomically $ writeTQueue sizeQ Nothing + + request peer (GetBlockSize @e h) + + race ( pause defBlockInfoTimeout ) (atomically $ readTQueue sizeQ ) + >>= orThrow (PeerRequestTimeout peer) + + case what of + Left{} -> pure $ Left (PeerRequestTimeout peer) + Right x -> pure (Right x) + + +data BurstMachine = + BurstMachine + { _buTimeout :: Double + , _buBurstMax :: Int + , _buStepUp :: Double + , _buStepDown :: Double + , _buCurrent :: TVar Double + , _buErrors :: TVar Int + } + +burstMachineAddErrors :: MonadUnliftIO m => BurstMachine -> Int -> m () +burstMachineAddErrors BurstMachine{..} n = + atomically $ modifyTVar _buErrors (+n) + +newBurstMachine :: MonadUnliftIO m + => Double -- ^ timeout + -> Int -- ^ max burst + -> Maybe Int -- ^ start burst + -> Double -- ^ step up + -> Double -- ^ step down + -> m BurstMachine + +newBurstMachine t0 buMax buStart up' down' = do + BurstMachine t0 buMax up down + <$> newTVarIO bu0 + <*> newTVarIO 0 + + where + bu0 = realToFrac $ fromMaybe (max 2 (buMax `div` 2)) buStart + down = min 0.85 down' + up = min 0.5 up' + +getCurrentBurst :: MonadUnliftIO m => BurstMachine -> m Int +getCurrentBurst BurstMachine{..} = readTVarIO _buCurrent <&> round + +runBurstMachine :: MonadUnliftIO m + => BurstMachine + -> m () + +runBurstMachine BurstMachine{..} = do + + + bu0 <- readTVarIO _buCurrent <&> realToFrac + let buMax = realToFrac _buBurstMax + let down = _buStepDown + let up = _buStepUp + + _dEdT <- newTVarIO 0.00 + + _rates <- newTVarIO (mempty :: Map Double Double) + + _buMaxReal <- newTVarIO buMax + + pause @'Seconds (realToFrac _buTimeout) + + flip runContT pure do + + void $ ContT $ withAsync do + forever do + pause @'Seconds (realToFrac _buTimeout * 10) + + atomically do + e <- headDef bu0 . Map.elems <$> readTVar _rates + nrates <- readTVar _rates <&> take 100 . Map.toList + writeTVar _rates (Map.fromList nrates) + modifyTVar _buMaxReal (max e) + + void $ ContT $ withAsync do + forever do + pause @'Seconds 600 + atomically $ writeTVar _buMaxReal buMax + + + void $ ContT $ withAsync do + forever do + pause @'Seconds (realToFrac _buTimeout * 2.0) + ddt <- readTVarIO _dEdT + + when (ddt <= 0) do + atomically do + buMaxReal <- readTVar _buMaxReal + current <- readTVar _buCurrent + let new = min buMaxReal (current * (1.0 + up)) + writeTVar _buCurrent new + + flip fix 0 $ \next e1 -> do + + let dt = realToFrac _buTimeout + + eNew <- atomically do + + e2 <- readTVar _buErrors + current <- readTVar _buCurrent + + new <- if e2 > e1 then do + let d = max 2.0 (current * (1.0 - down)) + nrates <- readTVar _rates <&> drop 3 . Map.toList + let newFucked = maybe d snd (headMay nrates) + writeTVar _rates (Map.fromList nrates) + pure newFucked + + else + pure current -- $ min buMaxReal (current * (1.0 + up)) + + writeTVar _buErrors 0 + writeTVar _buCurrent new + + let dedt = realToFrac (e2 - e1) / realToFrac dt + + writeTVar _dEdT (realToFrac dedt) + + modifyTVar _rates ( Map.insertWith max dedt current ) + + pure e2 + + pause @'Seconds dt + next eNew + +data S = + SInit + | SFetchQ + | SFetchPost (Hash HbSync) ByteString + | SCheckBefore + | SCheckAfter + +-- | downloads block with dependencies recursively +downloadFromPeerRec :: forall e t cache m . ( e ~ L4Proto + , MonadUnliftIO m + , IsTimeout t + , BlockSizeCache e cache + ) + => Timeout t + -> Int + -> cache + -> PeerEnv e + -> Hash HbSync + -> Peer e + -> m (Either (DownloadError e) ()) + +downloadFromPeerRec t bu0 cache env h0 peer = do + + sto <- withPeerM env getStorage + + p <- newTQueueIO + q <- newTQueueIO + qq <- newTQueueIO + toq <- newTVarIO ( mempty :: [Int] ) + + bm <- newBurstMachine 0.5 256 (Just bu0) 0.05 0.10 + + flip runContT pure do + + ContT $ withAsync $ forever do + join $ atomically (readTQueue p) + + ContT $ withAsync $ forever do + h <- atomically (readTQueue qq) + void $ queryBlockSizeFromPeer cache env h peer + pause @'Seconds 1.5 + + ContT $ withAsync $ flip fix 10000000 $ \next m0 -> do + txs <- readTVarIO toq <&> L.take 1000 + let m1 = fromMaybe m0 $ median txs + when ( m1 > m0 ) $ burstMachineAddErrors bm 1 + pause @'Seconds 3 + next m1 + + ContT $ withAsync $ runBurstMachine bm + + flip fix SInit $ \next -> \case + + SInit -> do + debug "SInit" + atomically $ writeTQueue q h0 + next SCheckBefore + + SCheckBefore -> do + here <- hasBlock sto h0 <&> isJust + if here then next SCheckAfter else next SFetchQ + + SFetchQ -> do + debug "SFetchQ" + + done <- atomically do + pe <- isEmptyTQueue p + qe <- isEmptyTQueue q + when (qe && not pe) retry + -- when (not pe) retry + pure qe + + if done then + next SCheckAfter + else do + + h <- atomically $ readTQueue q + mbs <- getBlock sto h + + case mbs of + Just bs -> next (SFetchPost h bs) + Nothing -> none + + bu <- lift $ getCurrentBurst bm + + t0 <- getTimeCoarse + w <- lift $ downloadFromPeer t bu cache env (coerce h) peer + t1 <- getTimeCoarse + let dt = toMicroSeconds $ TimeoutTS (t1 - t0) + atomically $ modifyTVar toq ( dt : ) + + case w of + Right bs -> do + next (SFetchPost h bs) + + Left e -> do + lift $ burstMachineAddErrors bm 1 + err $ "DOWNLOAD ERROR" <+> viaShow e + next SFetchQ + + SFetchPost h bs -> do + debug $ "SFetchPost" <+> pretty h + + let parse = do + let refs = extractBlockRefs h bs + atomically $ mapM_ (writeTQueue q . coerce) refs + mapM_ (atomically . writeTQueue qq . coerce) refs + + atomically $ writeTQueue p parse + + next SFetchQ + + SCheckAfter -> do + debug "SCheckAfter" + missed <- findMissedBlocks sto (HashRef h0) + mapM_ (atomically . writeTQueue q . coerce) missed + mapM_ (atomically . writeTQueue qq . coerce) missed + unless (L.null missed) $ next SFetchQ + + pure $ Right () + +downloadFromPeer :: forall e t cache m . ( e ~ L4Proto + , MonadUnliftIO m + , IsTimeout t + , BlockSizeCache e cache + ) + => Timeout t + -> Int + -> cache + -> PeerEnv e + -> Hash HbSync + -> Peer e + -> m (Either (DownloadError e) ByteString) + +downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do + + pd@PeerData{..} <- find (KnownPeerKey peer) id + >>= orThrow (UnknownPeerError peer) + + pinfo <- find (PeerInfoKey peer) id + >>= orThrow (UnknownPeerError peer) + + rtt <- liftIO $ medianPeerRTT pinfo + <&> fmap ((*1) . realToFrac) + <&> fromMaybe 1000 + <&> (/1e6) + + let waity = 10 * rtt + + sto <- getStorage + + let chunkSize = defChunkSize + + flip runContT pure $ callCC \exit -> do + + size <- lift (findBlockSize @e cache _peerSignKey h) + >>= maybe (queryBlockSize exit) pure + + coo <- genCookie (peer,h) + let key = DownloadSessionKey (peer, coo) + down@BlockDownload{..} <- newBlockDownload h + let chuQ = _sBlockChunks + let new = set sBlockChunkSize chunkSize + . set sBlockSize (fromIntegral size) + $ down + + lift $ update @e new key id + + let offsets = calcChunks size (fromIntegral chunkSize) :: [(Offset, Size)] + + let chunkNums = [ 0 .. pred (length offsets) ] + + let bursts = calcBursts bu chunkNums + + callCC $ \exit2 -> do + + _wx <- newTVarIO waity + + for_ bursts $ \(i,chunkN) -> do + + -- atomically $ flushTQueue chuQ + + let req = BlockChunks @e coo (BlockGetChunks h chunkSize (fromIntegral i) (fromIntegral chunkN)) + + lift $ request peer req + + t0 <- getTimeCoarse + + let watchdog = fix \next -> do + wx <- readTVarIO _wx <&> realToFrac + -- debug $ "WATCHDOG" <+> pretty wx <+> pretty waity + r <- race (pause @'MilliSeconds (min wx waity)) do + void $ atomically $ readTQueue chuQ + either (const none) (const next) r + + r <- liftIO $ race watchdog do + + atomically do + pieces <- readTVar _sBlockChunks2 + let done = and [ IntMap.member j pieces | j <- [i .. i + chunkN-1] ] + unless done retry + + atomically $ flushTQueue chuQ + + t1 <- getTimeCoarse + + atomically do + wx0 <- readTVar _wx + let wx1 = realToFrac (t1 - t0) * 100 / 1e6 -- millis + writeTVar _wx wx1 + + case r of + Left{} -> exit2 (Left $ DownloadStuckError (HashRef h) peer) + _ -> pure () + + + blk <- readTVarIO _sBlockChunks2 + let rs = LBS.concat $ IntMap.elems blk + + ha <- putBlock sto rs + + -- let ha = Just $ hashObject @HbSync rs + + lift $ expire @e key + + case ha of + Nothing -> pure $ Left StorageError + + Just h1 | h1 == h -> do + pure $ Right rs + + Just h1 -> do + delBlock sto h1 + pure $ Left (PeerBlockHashMismatch peer) + + where + + queryBlockSize exit = do + what <- lift $ queryBlockSizeFromPeer cache env h peer + case what of + Left{} -> exit (Left (PeerRequestTimeout peer)) + Right Nothing -> exit (Left (PeerMissBlockError (HashRef h) peer)) + Right (Just s) -> pure s + + + +downloadDispatcher :: forall e m . ( e ~ L4Proto + , MonadUnliftIO m + ) + => PeerEnv e + -> m () +downloadDispatcher env = flip runContT pure do + + pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()) ) + + ContT $ bracket none $ const do + readTVarIO pts >>= mapM_ cancel + atomically $ writeTVar pts mempty + + liftIO $ withPeerM env do + subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do + debug $ green "Download request" <+> pretty h + + pause @'Seconds 1 + + ContT $ withAsync $ withPeerM env $ forever do + pips <- getKnownPeers @e + for_ pips $ \p -> do + debug $ "downloadDispatcher: knownPeer" <+> yellow (pretty p) + + pause @'Seconds 5 + + forever do + pause @'Seconds 10 + debug $ yellow $ "I'm download dispatcher" + + diff --git a/hbs2-peer/app/CheckBlockAnnounce.hs b/hbs2-peer/app/CheckBlockAnnounce.hs index 2450de35..2c350eb6 100644 --- a/hbs2-peer/app/CheckBlockAnnounce.hs +++ b/hbs2-peer/app/CheckBlockAnnounce.hs @@ -13,8 +13,6 @@ import HBS2.Net.Proto.Types import PeerTypes import PeerConfig import CheckPeer (peerBanned) -import BlockDownload() -import DownloadQ() import Control.Monad.Trans.Maybe import Control.Monad.Reader @@ -90,13 +88,13 @@ checkBlockAnnounce :: forall e m . ( e ~ L4Proto , m ~ PeerM e IO ) => PeerConfig - -> DownloadEnv e + -> PeerEnv e -> PeerNonce -> PeerAddr e -> Hash HbSync -> m () -checkBlockAnnounce conf denv nonce pa h = void $ runMaybeT do +checkBlockAnnounce conf penv nonce pa h = void $ runMaybeT do accept <- lift $ acceptAnnouncesFromPeer conf pa @@ -108,8 +106,6 @@ checkBlockAnnounce conf denv nonce pa h = void $ runMaybeT do guard accept - lift do - withDownload denv $ do - -- TODO: use-brains-to-download-direct - addDownload Nothing h + liftIO $ withPeerM penv do + addDownload @e Nothing h diff --git a/hbs2-peer/app/DownloadQ.hs b/hbs2-peer/app/DownloadQ.hs deleted file mode 100644 index 315aa55c..00000000 --- a/hbs2-peer/app/DownloadQ.hs +++ /dev/null @@ -1,53 +0,0 @@ -{-# Language AllowAmbiguousTypes #-} -module DownloadQ where - -import HBS2.Prelude -import HBS2.Clock -import HBS2.Events -import HBS2.Data.Types.Refs -import HBS2.Net.PeerLocator -import HBS2.Peer.Brains -import HBS2.Storage -import HBS2.Storage.Operations.Missed - -import PeerTypes -import PeerConfig - -import Data.Foldable -import Control.Monad -import Lens.Micro.Platform - -downloadQueue :: forall e m . ( MyPeer e - , DownloadFromPeerStuff e m - , HasPeerLocator e (BlockDownloadM e m) - , HasPeerLocator e m - , EventListener e (DownloadReq e) m - , HasStorage m - ) => PeerConfig - -> SomeBrains e - -> DownloadEnv e -> m () - -downloadQueue _ brains denv = do - debug "DownloadQ started" - - down <- listDownloads @e brains - sto <- getStorage - - withDownload denv do - forM_ down $ \(HashRef h,_) -> do - missed <- findMissedBlocks sto (HashRef h) - for_ missed $ \h -> do - debug $ "DownloadQ:" <+> pretty h - addDownload mzero (fromHashRef h) - - -- FIXME: timeout-hardcodes - let refs = listDownloads @e brains <&> fmap (set _2 30) - - polling (Polling 5 20) refs $ \ref -> do - missed <- findMissedBlocks sto ref - trace $ "DownloadQ. check" <+> pretty ref <+> pretty (length missed) - - when (null missed) do - delDownload @e brains ref - - diff --git a/hbs2-peer/app/Fetch.hs b/hbs2-peer/app/Fetch.hs index 2096ebd9..ba91a62f 100644 --- a/hbs2-peer/app/Fetch.hs +++ b/hbs2-peer/app/Fetch.hs @@ -2,28 +2,27 @@ module Fetch where import HBS2.Prelude import HBS2.Actors.Peer +import HBS2.Events import HBS2.Data.Types.Refs import HBS2.Storage.Operations.Missed import HBS2.Net.Proto.Types import PeerTypes -import BlockDownload -import Data.Foldable (for_) - -fetchHash :: forall e m . (e ~ L4Proto, MonadIO m) +fetchHash :: forall e m . ( e ~ L4Proto + , MonadIO m + ) => PeerEnv e - -> DownloadEnv e -> HashRef -> m () -fetchHash penv denv href = do +fetchHash penv href = do debug $ "fetchAction" <+> pretty h liftIO $ withPeerM penv $ do sto <- getStorage missed <- findMissedBlocks sto href for_ missed $ \miss -> do - withDownload denv (addDownload Nothing (fromHashRef miss)) + addDownload @e Nothing (fromHashRef miss) where h = fromHashRef href diff --git a/hbs2-peer/app/HttpWorker.hs b/hbs2-peer/app/HttpWorker.hs index 6c8dc16f..a581d855 100644 --- a/hbs2-peer/app/HttpWorker.hs +++ b/hbs2-peer/app/HttpWorker.hs @@ -84,9 +84,9 @@ httpWorker :: forall e s m . ( MyPeer e , m ~ PeerM e IO , e ~ L4Proto -- , ForLWWRefProto e - ) => PeerConfig -> AnnMetaData -> DownloadEnv e -> m () + ) => PeerConfig -> AnnMetaData -> m () -httpWorker (PeerConfig syn) pmeta e = do +httpWorker (PeerConfig syn) pmeta = do sto <- getStorage let port' = runReader (cfgValue @PeerHttpPortKey) syn <&> fromIntegral diff --git a/hbs2-peer/app/MailboxProtoWorker.hs b/hbs2-peer/app/MailboxProtoWorker.hs index 0f49f57c..e99db49e 100644 --- a/hbs2-peer/app/MailboxProtoWorker.hs +++ b/hbs2-peer/app/MailboxProtoWorker.hs @@ -42,7 +42,6 @@ import HBS2.Misc.PrettyStuff import Brains import PeerConfig import PeerTypes -import BlockDownload() import DBPipe.SQLite as Q @@ -125,7 +124,6 @@ instance ForMailbox s => Hashable (MailboxDownload s) data MailboxProtoWorker (s :: CryptoScheme) e = MailboxProtoWorker { mpwPeerEnv :: PeerEnv e - , mpwDownloadEnv :: DownloadEnv e , mpwStorage :: AnyStorage , mpwCredentials :: PeerCredentials s , mpwFetchQ :: TVar (HashSet (MailboxRefKey s)) @@ -457,7 +455,7 @@ startDownloadStuff :: forall s e m . (ForMailbox s, s ~ Encryption e, MyPeer e, -> m () startDownloadStuff MailboxProtoWorker{..} href = do - liftIO $ withPeerM mpwPeerEnv $ withDownload mpwDownloadEnv + liftIO $ withPeerM mpwPeerEnv $ do debug $ "startDownloadStuff" <+> pretty href addDownload @e Nothing (coerce href) @@ -552,13 +550,12 @@ createMailboxProtoWorker :: forall s e m . ( MonadIO m ) => PeerCredentials s -> PeerEnv e - -> DownloadEnv e -> AnyStorage -> m (MailboxProtoWorker s e) -createMailboxProtoWorker pc pe de sto = do +createMailboxProtoWorker pc pe sto = do -- FIXME: queue-size-hardcode -- $class: hardcode - MailboxProtoWorker pe de sto pc + MailboxProtoWorker pe sto pc <$> newTVarIO mempty <*> newTBQueueIO 8000 <*> newTVarIO mempty diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index d091876d..1043cbff 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -44,10 +44,9 @@ import Brains import BrainyPeerLocator import ByPassWorker import PeerTypes hiding (info) -import BlockDownload +import BlockDownloadNew import CheckBlockAnnounce (checkBlockAnnounce) import CheckPeer (peerBanned) -import DownloadQ import PeerInfo import PeerConfig import Bootstrap @@ -836,11 +835,6 @@ runPeer opts = respawnOnError opts $ do brainsThread <- async $ runBasicBrains conf brains - denv <- newDownloadEnv brains - - dProbe <- newSimpleProbe "BlockDownload" - addProbe dProbe - downloadEnvSetProbe denv dProbe pl <- AnyPeerLocator <$> newBrainyPeerLocator @e (SomeBrains @e brains) mempty @@ -956,7 +950,7 @@ runPeer opts = respawnOnError opts $ do pause @'Seconds 600 liftIO $ Cache.purgeExpired nbcache - rce <- refChanWorkerEnv conf penv denv refChanNotifySource + rce <- refChanWorkerEnv conf penv refChanNotifySource rcwProbe <- newSimpleProbe "RefChanWorker" addProbe rcwProbe @@ -987,7 +981,7 @@ runPeer opts = respawnOnError opts $ do rcw <- async $ liftIO $ runRefChanRelyWorker rce refChanAdapter - mailboxWorker <- createMailboxProtoWorker pc penv denv (AnyStorage s) + mailboxWorker <- createMailboxProtoWorker pc penv (AnyStorage s) p <- newSimpleProbe "MailboxProtoWorker" mailboxProtoWorkerSetProbe mailboxWorker p @@ -1002,7 +996,7 @@ runPeer opts = respawnOnError opts $ do when (Set.member pk helpFetchKeys) do liftIO $ Cache.insert nbcache (p,h) () -- debug $ "onNoBlock" <+> pretty p <+> pretty h - withPeerM penv $ withDownload denv (addDownload mzero h) + liftIO $ withPeerM penv $ addDownload @e mzero h loop <- liftIO $ async do @@ -1014,10 +1008,10 @@ runPeer opts = respawnOnError opts $ do let doDownload h = do pro <- isReflogProcessed @e brains h if pro then do - withPeerM penv $ withDownload denv (addDownload mzero h) + withPeerM penv $ addDownload @e mzero h else do -- FIXME: separate-process-to-mark-logs-processed - withPeerM penv $ withDownload denv (addDownload Nothing h) + withPeerM penv $ addDownload @e Nothing h let doFetchRef puk = do withPeerM penv $ do @@ -1064,7 +1058,7 @@ runPeer opts = respawnOnError opts $ do subscribe @e BlockAnnounceInfoKey $ \(BlockAnnounceEvent p bi no) -> do pa <- toPeerAddr p - checkBlockAnnounce conf denv no pa (view biHash bi) + checkBlockAnnounce conf penv no pa (view biHash bi) subscribe @e AnyKnownPeerEventKey $ \(KnownPeerEvent p pd) -> do @@ -1169,7 +1163,7 @@ runPeer opts = respawnOnError opts $ do let lwwRefProtoA = lwwRefProto (LWWRefProtoAdapter { lwwFetchBlock = download }) - where download h = withPeerM env $ withDownload denv (addDownload Nothing h) + where download h = liftIO $ withPeerM env $ addDownload @e Nothing h flip runContT pure do @@ -1180,7 +1174,7 @@ runPeer opts = respawnOnError opts $ do peerThread "byPassWorker" (byPassWorker byPass) - peerThread "httpWorker" (httpWorker conf peerMeta denv) + peerThread "httpWorker" (httpWorker conf peerMeta) metricsProbe <- newSimpleProbe "ghc.runtime" addProbe metricsProbe @@ -1203,9 +1197,7 @@ runPeer opts = respawnOnError opts $ do peerThread "pexLoop" (pexLoop @e brains tcp) -- FIXME: new-download-loop - peerThread "blockDownloadLoop" (blockDownloadLoop denv) - - peerThread "blockDownloadQ" (downloadQueue conf (SomeBrains brains) denv) + peerThread "downloadDispatcher" (downloadDispatcher env) peerThread "fillPeerMeta" (fillPeerMeta tcp tcpProbeWait) @@ -1227,7 +1219,7 @@ runPeer opts = respawnOnError opts $ do liftIO $ withPeerM penv do runProto @e - [ makeResponse (blockSizeProto blk (downloadOnBlockSize denv) onNoBlock) + [ makeResponse (blockSizeProto blk onNoBlock) , makeResponse (blockChunksProto adapter) , makeResponse blockAnnounceProto , makeResponse (withCredentials @e pc . peerHandShakeProto hshakeAdapter penv) @@ -1300,7 +1292,7 @@ runPeer opts = respawnOnError opts $ do subscribe @e BlockAnnounceInfoKey $ \(BlockAnnounceEvent p bi no) -> do unless (p == self) do pa <- toPeerAddr p - checkBlockAnnounce conf denv no pa (view biHash bi) + checkBlockAnnounce conf penv no pa (view biHash bi) subscribe @e PeerAnnounceEventKey $ \pe@(PeerAnnounceEvent{}) -> do -- debug $ "Got peer announce!" <+> pretty pip @@ -1345,7 +1337,7 @@ runPeer opts = respawnOnError opts $ do , rpcBrains = SomeBrains brains , rpcByPassInfo = liftIO (getStat byPass) , rpcProbes = probes - , rpcDoFetch = liftIO . fetchHash penv denv + , rpcDoFetch = liftIO . fetchHash penv , rpcDoRefChanHeadPost = refChanHeadPostAction , rpcDoRefChanPropose = refChanProposeAction , rpcDoRefChanNotify = refChanNotifyAction diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index 56b22992..22e429fd 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -39,6 +39,7 @@ import PeerConfig import PeerLogger import Prelude hiding (log) +import Control.Monad.Trans.Maybe import Control.Monad.Reader import Control.Monad.Writer qualified as W import Data.ByteString.Lazy (ByteString) @@ -49,6 +50,7 @@ import Data.Maybe import Lens.Micro.Platform import Data.Hashable import Type.Reflection +import Data.IntMap qualified as IntMap import Data.IntMap (IntMap) import Data.IntSet (IntSet) import Data.Text qualified as Text @@ -122,11 +124,11 @@ instance Expires (SessionKey L4Proto (PeerInfo L4Proto)) where expiresIn = const (Just defCookieTimeoutSec) - type MyPeer e = ( Eq (Peer e) , Hashable (Peer e) , Pretty (Peer e) , HasPeer e + , Typeable e , ForSignedBox (Encryption e) ) @@ -243,121 +245,19 @@ downloadMonAdd :: forall m . MonadIO m downloadMonAdd env h whenDone = do atomically $ modifyTVar (view downloads env) (HashMap.insert h whenDone) -data DownloadEnv e = - DownloadEnv - { _blockInQ :: TVar (HashMap (Hash HbSync) BlockState) - , _blockInDirty :: TVar Bool - , _blockCheckQ :: TQueue (Hash HbSync) - , _blockSizeRecvQ :: TQueue (Peer e, Hash HbSync, Maybe Integer) - -- FIXME: trim!! - -- , _blockProposed :: Cache (Hash HbSync, Peer e) () - , _downloadMon :: DownloadMonEnv - , _downloadBrains :: SomeBrains e - , _downloadProbe :: TVar AnyProbe - } -makeLenses 'DownloadEnv - - -downloadEnvSetProbe :: forall e m . (MonadIO m, MyPeer e) - => DownloadEnv e - -> AnyProbe - -> m () -downloadEnvSetProbe DownloadEnv{..} p = do - atomically $ writeTVar _downloadProbe p - -newDownloadEnv :: (MonadIO m, MyPeer e, HasBrains e brains) => brains -> m (DownloadEnv e) -newDownloadEnv brains = liftIO do - DownloadEnv <$> newTVarIO mempty - <*> newTVarIO False - <*> newTQueueIO - <*> newTQueueIO - -- <*> Cache.newCache (Just (toTimeSpec (2 :: Timeout 'Seconds))) - <*> downloadMonNew - <*> pure (SomeBrains brains) - <*> newTVarIO (AnyProbe ()) - -newtype BlockDownloadM e m a = - BlockDownloadM { fromBlockDownloadM :: ReaderT (DownloadEnv e) m a } - deriving newtype ( Functor - , Applicative - , Monad - , MonadIO - , MonadUnliftIO - , MonadReader (DownloadEnv e) - , MonadTrans - ) - -withDownload :: (MyPeer e, HasPeerLocator e m, MonadIO m) => DownloadEnv e -> BlockDownloadM e m a -> m a -withDownload e m = runReaderT ( fromBlockDownloadM m ) e - - -isBlockHereCached :: forall e m . ( MyPeer e - , MonadIO m - , HasStorage m - ) - => Hash HbSync -> BlockDownloadM e m Bool - -isBlockHereCached h = do - sto <- lift getStorage - liftIO $ hasBlock sto h <&> isJust - - -type DownloadConstr e m = ( MyPeer e - , MonadIO m - , HasPeerLocator e (BlockDownloadM e m) - , HasStorage m -- (BlockDownloadM e m) +type DownloadConstr e m = ( MonadIO m + , EventEmitter e (DownloadReq e) m ) addDownload :: forall e m . ( DownloadConstr e m ) => Maybe (Hash HbSync) -> Hash HbSync - -> BlockDownloadM e m () + -> m () addDownload mbh h = do - - tinq <- asks (view blockInQ) - checkQ <- asks (view blockCheckQ) - dirty <- asks (view blockInDirty) - brains <- asks (view downloadBrains) - here <- isBlockHereCached h - - if here then do - deleteBlockFromQ h - else do - newBlock <- BlockState - <$> liftIO getTimeCoarse - <*> pure Nothing - <*> liftIO (newTVarIO BlkNew) - - claimBlockCameFrom @e brains mbh h - -- Cache.insert - liftIO $ atomically $ do - modifyTVar tinq $ HashMap.insert h newBlock - writeTQueue checkQ h - writeTVar dirty True - - -deleteBlockFromQ :: MonadIO m => Hash HbSync -> BlockDownloadM e m () -deleteBlockFromQ h = do - inq <- asks (view blockInQ) - liftIO $ atomically $ modifyTVar' inq (HashMap.delete h) - -failedDownload :: forall e m . ( MyPeer e - , MonadIO m - , HasPeer e - , HasPeerLocator e (BlockDownloadM e m) - , HasStorage m - ) - => Peer e - -> Hash HbSync - -> BlockDownloadM e m () - -failedDownload p h = do - trace $ "failedDownload" <+> pretty p <+> pretty h - addDownload mzero h - -- FIXME: brains-download-fail + emit @e DownloadReqKey (DownloadReqData h) type ForGossip e p m = ( MonadIO m @@ -527,3 +427,46 @@ authorized f req = do when auth (f req) +-- NOTE: this is an adapter for a ResponseM monad +-- because response is working in ResponseM monad (ha!) +-- So don't be confused with types +-- +mkAdapter :: forall e m . ( m ~ PeerM e IO + , HasProtocol e (BlockChunks e) + , Hashable (SessionKey e (BlockChunks e)) + , Sessions e (BlockChunks e) (ResponseM e m) + , Typeable (SessionKey e (BlockChunks e)) + , EventEmitter e (BlockChunks e) m + , Pretty (Peer e) + ) + => m (BlockChunksI e (ResponseM e m )) +mkAdapter = do + storage <- getStorage + pure $ + BlockChunksI + { blkSize = liftIO . hasBlock storage + , blkChunk = \h o s -> liftIO (getChunk storage h o s) + , blkGetHash = \c -> find (DownloadSessionKey @e c) (view sBlockHash) + + , blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do + let cKey = DownloadSessionKey (p,c) + + dodo <- lift $ find cKey (view sBlockChunks) + + unless (isJust dodo) $ do + debug $ "session lost for peer !" <+> pretty p + +-- debug $ "FINDING-SESSION:" <+> pretty c <+> pretty n +-- debug $ "GOT SHIT" <+> pretty c <+> pretty n + + se <- MaybeT $ find cKey id + let dwnld = view sBlockChunks se + let dwnld2 = view sBlockChunks2 se + + -- debug $ "WRITE SHIT" <+> pretty c <+> pretty n + liftIO $ atomically do + writeTQueue dwnld (n, bs) + modifyTVar' dwnld2 (IntMap.insert (fromIntegral n) bs) + } + + diff --git a/hbs2-peer/app/RPC2.hs b/hbs2-peer/app/RPC2.hs index 555b1419..aa55a823 100644 --- a/hbs2-peer/app/RPC2.hs +++ b/hbs2-peer/app/RPC2.hs @@ -1,7 +1,6 @@ {-# OPTIONS_GHC -fno-warn-orphans #-} {-# Language UndecidableInstances #-} {-# Language AllowAmbiguousTypes #-} -{-# LANGUAGE ImplicitParams #-} module RPC2 ( module RPC2.Peer , module RPC2.RefLog @@ -51,6 +50,7 @@ import RPC2.Mailbox() import PeerTypes import PeerInfo +import BlockDownloadNew import Control.Monad.Trans.Maybe import Control.Monad.Trans.Cont @@ -77,462 +77,6 @@ import Lens.Micro.Platform import Streaming.Prelude qualified as S -data DownloadError e = - DownloadStuckError HashRef (Peer e) - | StorageError - | UnknownPeerError (Peer e) - | InternalError Int - | PeerMissBlockError HashRef (Peer e) - | PeerBlockHashMismatch (Peer e) - | PeerRequestTimeout (Peer e) - | Incomplete HashRef - deriving stock (Generic,Typeable) - -instance Pretty (Peer e) => Show (DownloadError e) where - show (DownloadStuckError h p) = show $ parens $ "DownloadStuck" <+> pretty h <+> pretty p - show (UnknownPeerError p) = show $ parens $ "UnknownPeerError" <+> pretty p - show (PeerMissBlockError h p) = show $ parens $ "PeerMissBlockError" <+> pretty h <+> pretty p - show (PeerRequestTimeout p) = show $ parens $ "PeerRequestTimeout" <+> pretty p - show (PeerBlockHashMismatch p) = show $ parens $ "PeerBlockHashMismatch" <+> pretty p - show StorageError = show "StorageError" - show (InternalError n) = show $ parens "InternalError" <+> pretty n - show (Incomplete h) = show $ parens "Incomplete" <+> pretty h - -instance (Typeable e, Pretty (Peer e)) => Exception (DownloadError e) - - -class BlockSizeCache e cache where - - cacheBlockSize :: forall m . MonadUnliftIO m - => cache - -> PubKey 'Sign (Encryption e) - -> Hash HbSync - -> Integer - -> m () - - findBlockSize :: forall m . MonadUnliftIO m - => cache - -> PubKey 'Sign (Encryption e) - -> Hash HbSync - -> m (Maybe Integer) - -instance BlockSizeCache e () where - cacheBlockSize _ _ _ _ = pure () - findBlockSize _ _ _ = pure Nothing - -instance BlockSizeCache e (SomeBrains e) where - cacheBlockSize = brainsCacheBlockSize @e - findBlockSize = brainsFindBlockSize @e - -queryBlockSizeFromPeer :: forall e cache m . ( e ~ L4Proto - , MonadUnliftIO m - , BlockSizeCache e cache - ) - => cache - -> PeerEnv e - -> Hash HbSync - -> Peer e - -> m (Either (DownloadError e) (Maybe Integer)) - -queryBlockSizeFromPeer cache e h peer = do - - what <- try @_ @(DownloadError e) $ liftIO $ withPeerM e do - - flip runContT pure $ callCC \exit -> do - - PeerData{..} <- lift $ find (KnownPeerKey peer) id - >>= orThrow (UnknownPeerError peer) - - s <- lift $ findBlockSize @e cache _peerSignKey h - - debug $ "FOUND CACHED VALUE" <+> pretty h <+> pretty s - - maybe none (exit . Just) s - - lift do - - sizeQ <- newTQueueIO - - subscribe @e (BlockSizeEventKey peer) $ \case - BlockSizeEvent (that, hx, sz) | hx == h -> do - atomically $ writeTQueue sizeQ (Just sz) - cacheBlockSize @e cache _peerSignKey h sz - - _ -> do - atomically $ writeTQueue sizeQ Nothing - - request peer (GetBlockSize @e h) - - race ( pause defBlockInfoTimeout ) (atomically $ readTQueue sizeQ ) - >>= orThrow (PeerRequestTimeout peer) - - case what of - Left{} -> pure $ Left (PeerRequestTimeout peer) - Right x -> pure (Right x) - - -data BurstMachine = - BurstMachine - { _buTimeout :: Double - , _buBurstMax :: Int - , _buStepUp :: Double - , _buStepDown :: Double - , _buCurrent :: TVar Double - , _buErrors :: TVar Int - } - -burstMachineAddErrors :: MonadUnliftIO m => BurstMachine -> Int -> m () -burstMachineAddErrors BurstMachine{..} n = - atomically $ modifyTVar _buErrors (+n) - -newBurstMachine :: MonadUnliftIO m - => Double -- ^ timeout - -> Int -- ^ max burst - -> Maybe Int -- ^ start burst - -> Double -- ^ step up - -> Double -- ^ step down - -> m BurstMachine - -newBurstMachine t0 buMax buStart up' down' = do - BurstMachine t0 buMax up down - <$> newTVarIO bu0 - <*> newTVarIO 0 - - where - bu0 = realToFrac $ fromMaybe (max 2 (buMax `div` 2)) buStart - down = min 0.85 down' - up = min 0.5 up' - -getCurrentBurst :: MonadUnliftIO m => BurstMachine -> m Int -getCurrentBurst BurstMachine{..} = readTVarIO _buCurrent <&> round - -runBurstMachine :: MonadUnliftIO m - => BurstMachine - -> m () - -runBurstMachine BurstMachine{..} = do - - - bu0 <- readTVarIO _buCurrent <&> realToFrac - let buMax = realToFrac _buBurstMax - let down = _buStepDown - let up = _buStepUp - - _dEdT <- newTVarIO 0.00 - - _rates <- newTVarIO (mempty :: Map Double Double) - - _buMaxReal <- newTVarIO buMax - - pause @'Seconds (realToFrac _buTimeout) - - flip runContT pure do - - void $ ContT $ withAsync do - forever do - pause @'Seconds (realToFrac _buTimeout * 10) - - atomically do - e <- headDef bu0 . Map.elems <$> readTVar _rates - nrates <- readTVar _rates <&> take 100 . Map.toList - writeTVar _rates (Map.fromList nrates) - modifyTVar _buMaxReal (max e) - - void $ ContT $ withAsync do - forever do - pause @'Seconds 600 - atomically $ writeTVar _buMaxReal buMax - - - void $ ContT $ withAsync do - forever do - pause @'Seconds (realToFrac _buTimeout * 2.0) - ddt <- readTVarIO _dEdT - - when (ddt <= 0) do - atomically do - buMaxReal <- readTVar _buMaxReal - current <- readTVar _buCurrent - let new = min buMaxReal (current * (1.0 + up)) - writeTVar _buCurrent new - - flip fix 0 $ \next e1 -> do - - let dt = realToFrac _buTimeout - - eNew <- atomically do - - e2 <- readTVar _buErrors - current <- readTVar _buCurrent - - new <- if e2 > e1 then do - let d = max 2.0 (current * (1.0 - down)) - nrates <- readTVar _rates <&> drop 3 . Map.toList - let newFucked = maybe d snd (headMay nrates) - writeTVar _rates (Map.fromList nrates) - pure newFucked - - else - pure current -- $ min buMaxReal (current * (1.0 + up)) - - writeTVar _buErrors 0 - writeTVar _buCurrent new - - let dedt = realToFrac (e2 - e1) / realToFrac dt - - writeTVar _dEdT (realToFrac dedt) - - modifyTVar _rates ( Map.insertWith max dedt current ) - - pure e2 - - pause @'Seconds dt - next eNew - -data S = - SInit - | SFetchQ - | SFetchPost (Hash HbSync) ByteString - | SCheckBefore - | SCheckAfter - --- | downloads block with dependencies recursively -downloadFromPeerRec :: forall e t cache m . ( e ~ L4Proto - , MonadUnliftIO m - , IsTimeout t - , BlockSizeCache e cache - ) - => Timeout t - -> Int - -> cache - -> PeerEnv e - -> Hash HbSync - -> Peer e - -> m (Either (DownloadError e) ()) - -downloadFromPeerRec t bu0 cache env h0 peer = do - - sto <- withPeerM env getStorage - - p <- newTQueueIO - q <- newTQueueIO - qq <- newTQueueIO - toq <- newTVarIO ( mempty :: [Int] ) - - bm <- newBurstMachine 0.5 256 (Just bu0) 0.05 0.10 - - flip runContT pure do - - ContT $ withAsync $ forever do - join $ atomically (readTQueue p) - - ContT $ withAsync $ forever do - h <- atomically (readTQueue qq) - void $ queryBlockSizeFromPeer cache env h peer - pause @'Seconds 1.5 - - ContT $ withAsync $ flip fix 10000000 $ \next m0 -> do - txs <- readTVarIO toq <&> L.take 1000 - let m1 = fromMaybe m0 $ median txs - when ( m1 > m0 ) $ burstMachineAddErrors bm 1 - pause @'Seconds 3 - next m1 - - ContT $ withAsync $ runBurstMachine bm - - flip fix SInit $ \next -> \case - - SInit -> do - debug "SInit" - atomically $ writeTQueue q h0 - next SCheckBefore - - SCheckBefore -> do - here <- hasBlock sto h0 <&> isJust - if here then next SCheckAfter else next SFetchQ - - SFetchQ -> do - debug "SFetchQ" - - done <- atomically do - pe <- isEmptyTQueue p - qe <- isEmptyTQueue q - when (qe && not pe) retry - -- when (not pe) retry - pure qe - - if done then - next SCheckAfter - else do - - h <- atomically $ readTQueue q - mbs <- getBlock sto h - - case mbs of - Just bs -> next (SFetchPost h bs) - Nothing -> none - - bu <- lift $ getCurrentBurst bm - - t0 <- getTimeCoarse - w <- lift $ downloadFromPeer t bu cache env (coerce h) peer - t1 <- getTimeCoarse - let dt = toMicroSeconds $ TimeoutTS (t1 - t0) - atomically $ modifyTVar toq ( dt : ) - - case w of - Right bs -> do - next (SFetchPost h bs) - - Left e -> do - lift $ burstMachineAddErrors bm 1 - err $ "DOWNLOAD ERROR" <+> viaShow e - next SFetchQ - - SFetchPost h bs -> do - debug $ "SFetchPost" <+> pretty h - - let parse = do - let refs = extractBlockRefs h bs - atomically $ mapM_ (writeTQueue q . coerce) refs - mapM_ (atomically . writeTQueue qq . coerce) refs - - atomically $ writeTQueue p parse - - next SFetchQ - - SCheckAfter -> do - debug "SCheckAfter" - missed <- findMissedBlocks sto (HashRef h0) - mapM_ (atomically . writeTQueue q . coerce) missed - mapM_ (atomically . writeTQueue qq . coerce) missed - unless (L.null missed) $ next SFetchQ - - pure $ Right () - -downloadFromPeer :: forall e t cache m . ( e ~ L4Proto - , MonadUnliftIO m - , IsTimeout t - , BlockSizeCache e cache - ) - => Timeout t - -> Int - -> cache - -> PeerEnv e - -> Hash HbSync - -> Peer e - -> m (Either (DownloadError e) ByteString) - -downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do - - pd@PeerData{..} <- find (KnownPeerKey peer) id - >>= orThrow (UnknownPeerError peer) - - pinfo <- find (PeerInfoKey peer) id - >>= orThrow (UnknownPeerError peer) - - rtt <- liftIO $ medianPeerRTT pinfo - <&> fmap ((*1) . realToFrac) - <&> fromMaybe 1000 - <&> (/1e6) - - let waity = 10 * rtt - - sto <- getStorage - - let chunkSize = defChunkSize - - flip runContT pure $ callCC \exit -> do - - size <- lift (findBlockSize @e cache _peerSignKey h) - >>= maybe (queryBlockSize exit) pure - - coo <- genCookie (peer,h) - let key = DownloadSessionKey (peer, coo) - down@BlockDownload{..} <- newBlockDownload h - let chuQ = _sBlockChunks - let new = set sBlockChunkSize chunkSize - . set sBlockSize (fromIntegral size) - $ down - - lift $ update @e new key id - - let offsets = calcChunks size (fromIntegral chunkSize) :: [(Offset, Size)] - - let chunkNums = [ 0 .. pred (length offsets) ] - - let bursts = calcBursts bu chunkNums - - callCC $ \exit2 -> do - - _wx <- newTVarIO waity - - for_ bursts $ \(i,chunkN) -> do - - -- atomically $ flushTQueue chuQ - - let req = BlockChunks @e coo (BlockGetChunks h chunkSize (fromIntegral i) (fromIntegral chunkN)) - - lift $ request peer req - - t0 <- getTimeCoarse - - let watchdog = fix \next -> do - wx <- readTVarIO _wx <&> realToFrac - -- debug $ "WATCHDOG" <+> pretty wx <+> pretty waity - r <- race (pause @'MilliSeconds (min wx waity)) do - void $ atomically $ readTQueue chuQ - either (const none) (const next) r - - r <- liftIO $ race watchdog do - - atomically do - pieces <- readTVar _sBlockChunks2 - let done = and [ IntMap.member j pieces | j <- [i .. i + chunkN-1] ] - unless done retry - - atomically $ flushTQueue chuQ - - t1 <- getTimeCoarse - - atomically do - wx0 <- readTVar _wx - let wx1 = realToFrac (t1 - t0) * 100 / 1e6 -- millis - writeTVar _wx wx1 - - case r of - Left{} -> exit2 (Left $ DownloadStuckError (HashRef h) peer) - _ -> pure () - - - blk <- readTVarIO _sBlockChunks2 - let rs = LBS.concat $ IntMap.elems blk - - ha <- putBlock sto rs - - -- let ha = Just $ hashObject @HbSync rs - - lift $ expire @e key - - case ha of - Nothing -> pure $ Left StorageError - - Just h1 | h1 == h -> do - pure $ Right rs - - Just h1 -> do - delBlock sto h1 - pure $ Left (PeerBlockHashMismatch peer) - - where - - queryBlockSize exit = do - what <- lift $ queryBlockSizeFromPeer cache env h peer - case what of - Left{} -> exit (Left (PeerRequestTimeout peer)) - Right Nothing -> exit (Left (PeerMissBlockError (HashRef h) peer)) - Right (Just s) -> pure s - instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcRunScript where handleMethod top = do diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index bca84594..162e5524 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -38,7 +38,6 @@ import HBS2.Storage import PeerTypes hiding (downloads) import PeerConfig -import BlockDownload() import Brains import Control.Monad.Trans.Cont @@ -91,7 +90,6 @@ data RefChanWorkerEnv e = RefChanWorkerEnv { _refChanWorkerConf :: PeerConfig , _refChanPeerEnv :: PeerEnv e - , _refChanWorkerEnvDEnv :: DownloadEnv e , _refChanNotifySource :: SomeNotifySource (RefChanEvents e) , _refChanWorkerEnvHeadQ :: TQueue (RefChanId e, RefChanHeadBlockTran e) , _refChanWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, (TimeSpec, OnDownloadComplete))) @@ -121,12 +119,11 @@ refChanWorkerEnvSetProbe RefChanWorkerEnv{..} probe = do refChanWorkerEnv :: forall m e . (MonadIO m, ForRefChans e) => PeerConfig -> PeerEnv e - -> DownloadEnv e -> SomeNotifySource (RefChanEvents e) -> m (RefChanWorkerEnv e) -refChanWorkerEnv conf pe de nsource = - liftIO $ RefChanWorkerEnv @e conf pe de nsource +refChanWorkerEnv conf pe nsource = + liftIO $ RefChanWorkerEnv @e conf pe nsource <$> newTQueueIO <*> newTVarIO mempty <*> newTVarIO mempty @@ -217,8 +214,7 @@ refChanAddDownload :: forall e m . ( m ~ PeerM e IO refChanAddDownload env chan r onComlete = do penv <- ask t <- getTimeCoarse - withPeerM penv $ withDownload (_refChanWorkerEnvDEnv env) - $ addDownload @e Nothing (fromHashRef r) + liftIO $ withPeerM penv $ addDownload @e Nothing (fromHashRef r) atomically $ modifyTVar (view refChanWorkerEnvDownload env) (HashMap.insert r (chan,(t, onComlete))) @@ -903,7 +899,11 @@ logMergeProcess penv env q = withPeerM penv do atomically $ modifyTVar (mergeHeads e) (HashMap.insert h headblk) pure headblk - downloadMissedHead :: AnyStorage -> RefChanId e -> HashRef -> m () + downloadMissedHead :: EventEmitter e (DownloadReq e) m + => AnyStorage + -> RefChanId e + -> HashRef + -> m () downloadMissedHead sto chan headRef = do penv <- ask here <- liftIO $ hasBlock sto (fromHashRef headRef) <&> isJust diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index b4352ab9..ac909b7e 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -245,10 +245,9 @@ executable hbs2-peer main-is: PeerMain.hs other-modules: - BlockDownload + BlockDownloadNew , BrainyPeerLocator , ByPassWorker - , DownloadQ , DownloadMon , Bootstrap , PeerInfo diff --git a/hbs2-peer/lib/HBS2/Peer/Proto/BlockInfo.hs b/hbs2-peer/lib/HBS2/Peer/Proto/BlockInfo.hs index cbb35be0..718d0f72 100644 --- a/hbs2-peer/lib/HBS2/Peer/Proto/BlockInfo.hs +++ b/hbs2-peer/lib/HBS2/Peer/Proto/BlockInfo.hs @@ -10,6 +10,7 @@ import HBS2.Hash import HBS2.System.Logger.Simple +import Type.Reflection (someTypeRep) import Data.Hashable import Data.Maybe import Data.ByteString (ByteString) @@ -33,13 +34,12 @@ blockSizeProto :: forall e m proto . ( MonadIO m , proto ~ BlockInfo e ) => GetBlockSize HbSync m - -> HasBlockEvent HbSync e m -> ( (Peer e, Hash HbSync) -> m () ) -> BlockInfo e -> m () -- FIXME: with-auth-combinator -blockSizeProto getBlockSize evHasBlock onNoBlock = +blockSizeProto getBlockSize onNoBlock = \case GetBlockSize h -> do -- liftIO $ print "GetBlockSize" @@ -57,13 +57,13 @@ blockSizeProto getBlockSize evHasBlock onNoBlock = that <- thatPeer @proto emit @e (BlockSizeEventKey that) (NoBlockEvent (that, h)) emit @e AnyBlockSizeEventKey (AnyBlockSizeEvent h Nothing that) - evHasBlock ( that, h, Nothing ) + -- evHasBlock ( that, h, Nothing ) BlockSize h sz -> deferred @proto do that <- thatPeer @proto emit @e (BlockSizeEventKey @e that) (BlockSizeEvent (that, h, sz)) emit @e AnyBlockSizeEventKey (AnyBlockSizeEvent h (Just sz) that) - evHasBlock ( that, h, Just sz ) + -- evHasBlock ( that, h, Just sz ) data AnyBlockSizeEvent e @@ -71,8 +71,15 @@ data instance EventKey e (AnyBlockSizeEvent e) = AnyBlockSizeEventKey deriving stock (Typeable, Generic, Eq) -instance Hashable (EventKey e (AnyBlockSizeEvent e)) where - hashWithSalt s _ = hashWithSalt s ("AnyBlockSizeEventKey_1730696922" :: ByteString) +instance Typeable e => Hashable (EventKey e (AnyBlockSizeEvent e)) where + hashWithSalt s _ = + hashWithSalt s (someTypeRep (Proxy :: Proxy (EventKey e (AnyBlockSizeEvent e))) :: TypeRep) + +instance EventType (Event e (AnyBlockSizeEvent e)) where + isPersistent = True + +instance Expires (EventKey e (AnyBlockSizeEvent e)) where + expiresIn = const Nothing -- (Just defCookieTimeoutSec) data instance Event e (AnyBlockSizeEvent e) = AnyBlockSizeEvent