From bc160b83354bf8d9cc6c4924cd2fbb2c123e9ebb Mon Sep 17 00:00:00 2001 From: voidlizard Date: Tue, 5 Nov 2024 19:09:26 +0300 Subject: [PATCH] wip --- hbs2-core/lib/HBS2/Defaults.hs | 2 +- hbs2-peer/app/RPC2.hs | 130 +++++++++++++++++++++------------ 2 files changed, 86 insertions(+), 46 deletions(-) diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index 94dc4027..224293ca 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -81,7 +81,7 @@ defBlockWaitMax = 60 :: Timeout 'Seconds -- how much time wait for block from peer? defChunkWaitMax :: Timeout 'Seconds -defChunkWaitMax = 10 :: Timeout 'Seconds +defChunkWaitMax = 30 :: Timeout 'Seconds defSweepTimeout :: Timeout 'Seconds defSweepTimeout = 60 -- FIXME: only for debug! diff --git a/hbs2-peer/app/RPC2.hs b/hbs2-peer/app/RPC2.hs index 32c9b367..9cdbb6e5 100644 --- a/hbs2-peer/app/RPC2.hs +++ b/hbs2-peer/app/RPC2.hs @@ -70,6 +70,7 @@ import Data.List qualified as L import Data.Coerce import Numeric import UnliftIO +import UnliftIO.Concurrent import Lens.Micro.Platform import Streaming.Prelude qualified as S @@ -82,6 +83,7 @@ data DownloadError e = | PeerMissBlockError HashRef (Peer e) | PeerBlockHashMismatch (Peer e) | PeerRequestTimeout (Peer e) + | Incomplete HashRef deriving stock (Generic,Typeable) instance Pretty (Peer e) => Show (DownloadError e) where @@ -92,6 +94,7 @@ instance Pretty (Peer e) => Show (DownloadError e) where show (PeerBlockHashMismatch p) = show $ parens $ "PeerBlockHashMismatch" <+> pretty p show StorageError = show "StorageError" show (InternalError n) = show $ parens "InternalError" <+> pretty n + show (Incomplete h) = show $ parens "Incomplete" <+> pretty h instance (Typeable e, Pretty (Peer e)) => Exception (DownloadError e) @@ -166,6 +169,13 @@ queryBlockSizeFromPeer cache e h peer = do Right x -> pure (Right x) +data S = + SInit + | SFetchQ + | SFetchPost (Hash HbSync) ByteString + | SCheckBefore + | SCheckAfter + -- | downloads block with dependencies recursively downloadFromPeerRec :: forall e t cache m . ( e ~ L4Proto , MonadUnliftIO m @@ -186,65 +196,77 @@ downloadFromPeerRec t bu0 cache env h0 peer = do p <- newTQueueIO q <- newTQueueIO - done <- newTVarIO (mempty :: HashSet (Hash HbSync)) + qq <- newTQueueIO - atomically $ writeTQueue q h0 + flip runContT pure do - missed <- findMissedBlocks sto (HashRef h0) + ContT $ withAsync $ forever do + join $ atomically (readTQueue p) - mapM_ (atomically . writeTQueue q . coerce) missed + ContT $ withAsync $ forever do + h <- atomically (readTQueue qq) + void $ queryBlockSizeFromPeer cache env h peer + pause @'Seconds 1.5 - flip runContT pure $ callCC \exit -> do + flip fix SInit $ \next -> \case - ContT $ withAsync $ liftIO $ forever do - atomically (readTQueue p) + SInit -> do + debug "SInit" + atomically $ writeTQueue q h0 + next SCheckBefore - fix \next -> do + SCheckBefore -> do + here <- hasBlock sto h0 <&> isJust + if here then next SCheckAfter else next SFetchQ - mt <- atomically do - pe <- isEmptyTQueue p - qe <- isEmptyTQueue q - when ( qe && not pe ) retry - pure qe + SFetchQ -> do + debug "SFetchQ" - when mt $ exit $ Right () + done <- atomically do + pe <- isEmptyTQueue p + qe <- isEmptyTQueue q + unless pe retry + pure qe - h <- atomically $ readTQueue q + if done then + next SCheckAfter + else do - here1 <- readTVarIO done <&> HS.member h + h <- atomically $ readTQueue q + mbs <- getBlock sto h - here2 <- hasBlock sto (coerce h) <&> isJust + case mbs of + Just bs -> next (SFetchPost h bs) + Nothing -> none - when (here1 || here2) next + w <- lift $ downloadFromPeer t bu0 cache env (coerce h) peer - w <- lift $ downloadFromPeer (TimeoutSec 5) bu0 cache env (coerce h) peer + case w of + Right bs -> do + next (SFetchPost h bs) - case w of - Right bs -> do + Left e -> do + err $ "DOWNLOAD ERROR" <+> viaShow e + next SFetchQ - h' <- enqueueBlock sto bs - h3 <- ContT $ maybe1 h' (pure $ Left StorageError) + SFetchPost h bs -> do + debug $ "SFetchPost" <+> pretty h - let - parse :: IO () - parse = do - let refs = extractBlockRefs h3 bs - atomically $ mapM_ (writeTQueue q . coerce) refs + let parse = do + let refs = extractBlockRefs h bs + atomically $ mapM_ (writeTQueue q . coerce) refs + mapM_ (atomically . writeTQueue qq . coerce) refs - atomically $ writeTQueue p parse + atomically $ writeTQueue p parse - next + next SFetchQ - Left e -> do - err $ "DOWNLOAD ERROR" <+> viaShow e - -- pause @'Seconds 0.25 - next - - missed <- findMissedBlocks sto (HashRef h0) - - unless (L.null missed) do + SCheckAfter -> do + debug "SCheckAfter" + missed <- findMissedBlocks sto (HashRef h0) mapM_ (atomically . writeTQueue q . coerce) missed - next + mapM_ (atomically . writeTQueue qq . coerce) missed + unless (L.null missed) $ next SFetchQ pure $ Right () @@ -266,6 +288,16 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do pd@PeerData{..} <- find (KnownPeerKey peer) id >>= orThrow (UnknownPeerError peer) + pinfo <- find (PeerInfoKey peer) id + >>= orThrow (UnknownPeerError peer) + + rtt <- liftIO $ medianPeerRTT pinfo + <&> fmap ((*1) . realToFrac) + <&> fromMaybe 0 + <&> (/1e6) + + let wait = 1000 + sto <- getStorage let chunkSize = defChunkSize @@ -295,15 +327,19 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do for_ bursts $ \(i,chunkN) -> do - -- s0 <- readTVarIO _sBlockChunks2 <&> IntMap.size - - -- count <- newTVarIO 0 atomically $ flushTQueue chuQ let req = BlockChunks @e coo (BlockGetChunks h chunkSize (fromIntegral i) (fromIntegral chunkN)) + lift $ request peer req - r <- liftIO $ race (pause t) do + let watchdog = fix \next -> do + s1 <- readTVarIO _sBlockChunks2 <&> IntMap.size + pause @'MilliSeconds ( max (realToFrac chunkN * rtt) 2000 ) + s2 <- readTVarIO _sBlockChunks2 <&> IntMap.size + when (s1 == s2) next + + r <- liftIO $ race watchdog do atomically do pieces <- readTVar _sBlockChunks2 @@ -312,12 +348,16 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do atomically $ flushTQueue chuQ - either (const $ exit2 (Left $ DownloadStuckError (HashRef h) peer) ) pure r + case r of + Left{} -> exit2 (Left $ DownloadStuckError (HashRef h) peer) + _ -> pure () blk <- readTVarIO _sBlockChunks2 let rs = LBS.concat $ IntMap.elems blk - ha <- enqueueBlock sto rs + ha <- putBlock sto rs + + -- let ha = Just $ hashObject @HbSync rs lift $ expire @e key