From 71808979faab5bfd78ec803273904ed3022d69a1 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Sun, 10 Nov 2024 12:14:58 +0300 Subject: [PATCH] does not work --- hbs2-core/lib/HBS2/Data/Detect.hs | 6 +- hbs2-peer/app/BlockDownloadNew.hs | 203 +++++++++++++++++++----------- 2 files changed, 135 insertions(+), 74 deletions(-) diff --git a/hbs2-core/lib/HBS2/Data/Detect.hs b/hbs2-core/lib/HBS2/Data/Detect.hs index 75fe8571..214e08cb 100644 --- a/hbs2-core/lib/HBS2/Data/Detect.hs +++ b/hbs2-core/lib/HBS2/Data/Detect.hs @@ -49,7 +49,7 @@ tryDetect hash obj = rights [mbAnn, mbLink, mbMerkle, mbSeq] & headDef orBlob data ScanLevel = ScanShallow | ScanDeep -extractBlockRefs :: Hash HbSync -> ByteString -> [Hash HbSync] +extractBlockRefs :: Hash HbSync -> ByteString -> [HashRef] extractBlockRefs hx bs = case tryDetect hx bs of (SeqRef (SequentialRef _ (AnnotatedHashRef a' b))) -> @@ -58,7 +58,7 @@ extractBlockRefs hx bs = AnnRef (AnnotatedHashRef ann h) -> do coerce <$> catMaybes [ann, Just h] - Merkle (MNode _ hs) -> hs + Merkle (MNode _ hs) -> fmap HashRef hs MerkleAnn (MTreeAnn{..}) -> do @@ -76,7 +76,7 @@ extractBlockRefs hx bs = MNode _ hs -> hs _ -> mempty - meta <> c <> t + fmap HashRef (meta <> c <> t) _ -> mempty diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index c32c028a..68dd9753 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -2,6 +2,7 @@ {-# Language UndecidableInstances #-} {-# Language AllowAmbiguousTypes #-} {-# Language ImplicitParams #-} +{-# Language RecordWildCards #-} module BlockDownloadNew where import HBS2.Prelude.Plated @@ -552,6 +553,19 @@ instance BlockSizeCache e KnownSize where findBlockSize (KnownSize s) _ _ = pure (Just s) +data BState = + BNone + | BWaitSize TimeSpec + | BFetch + | BCheck + +data BInfo e = BInfo (TVar BState) + +data PInfo e = + PInfo + { pinfoBm :: BurstMachine + } + downloadDispatcher :: forall e m . ( e ~ L4Proto , MonadUnliftIO m ) @@ -560,43 +574,139 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto -> m () downloadDispatcher brains env = flip runContT pure do - let t0 = 10.00 - pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), TQueue (Hash HbSync,Integer)) ) - - let seenLimit = 1000 - - blkQ <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef Int NominalDiffTime ) + blkQ <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef Int (BInfo e) ) + peerQ <- newTVarIO ( HPSQ.empty :: HashPSQ (Peer e) Double (PInfo e) ) + _busy <- newTVarIO ( mempty :: HashSet (Peer e) ) + allPeers <- newTVarIO ( mempty :: HashMap (Peer e) BurstMachine ) + parseQ <- newTQueueIO @_ @(HashRef, ByteString) sto <- withPeerM env getStorage - work <- newTQueueIO + jobs <- newTQueueIO - ContT $ bracket none $ const do - readTVarIO pts >>= mapM_ (cancel . fst) - atomically $ writeTVar pts mempty + -- ContT $ withAsync $ forever $ replicateM_ 16 do + -- join $ atomically (readTQueue jobs) + + ContT $ withAsync $ forever do + (h,bs) <- atomically $ readTQueue parseQ + let refs = extractBlockRefs (coerce h) bs + missed <- findMissedBlocks sto h + for_ (HS.fromList (refs <> missed)) $ \h -> do + here <- hasBlock sto (coerce h) <&> isJust + already <- readTVarIO blkQ <&> HPSQ.member h + unless (here || already) do + newBi <- BInfo @e <$> newTVarIO BNone + atomically $ modifyTVar blkQ (HPSQ.insert h 1 newBi) liftIO $ withPeerM env do subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do - now <- getTimeCoarse here <- hasBlock sto h <&> isJust already <- readTVarIO blkQ <&> HPSQ.member (HashRef h) unless (here || already) do - atomically $ modifyTVar blkQ (HPSQ.insert (HashRef h) 1 10) + -- atomically $ modifyTVar blkQ (HPSQ.insert (HashRef h) 1 10) + newBi <- BInfo @e <$> newTVarIO BNone + atomically $ modifyTVar blkQ (HPSQ.insert (HashRef h) 1 newBi) debug $ green "New download request" <+> pretty h - ContT $ withAsync $ manageThreads pts + -- block processing loop + ContT $ withAsync $ forever $ flip runContT pure $ callCC \exit -> callCC \exit2 -> do + blk' <- atomically $ readTVar blkQ <&> HPSQ.findMin + (h,prio,bi@(BInfo _bstate)) <- maybe (pause @'Seconds 0.5 >> exit ()) pure blk' + atomically do + free <- readTVar peerQ <&> HPSQ.size + unless (free > 0) retry - ContT $ withAsync do - let sq = readTVarIO blkQ <&> fmap (\(a,_,b) -> (a,b)) . HPSQ.toList - polling (Polling 1 1) sq $ \h -> do - pips <- readTVarIO pts <&> HM.keys - forConcurrently_ pips $ \p -> do - s <- queryBlockSizeFromPeer brains env (coerce h) p - case s of - Right x -> debug $ red "GOT SIZE FOR BLOCK" <+> pretty h <+> pretty x <+> pretty p - _ -> none + debug $ green "JERK OFF" <+> pretty h + + bstate <- readTVarIO _bstate + + case bstate of + BNone -> do + now <- getTimeCoarse + who' <- atomically do + p <- readTVar peerQ <&> HPSQ.findMin + case p of + Just some -> do + modifyTVar peerQ HPSQ.deleteMin + writeTVar _bstate (BWaitSize now) + pure (Just some) + + Nothing -> pure Nothing + + who <- maybe (exit2 ()) pure who' + + let work _bs (p,pprio,i@PInfo{..}) = liftIO $ flip runContT pure do + ContT $ bracket none $ const do + atomically $ modifyTVar peerQ (HPSQ.insert p pprio i) + + liftIO do + s <- queryBlockSizeFromPeer brains env (coerce h) p + + case s of + Right (Just size) -> do + debug $ green "GOT BLOCK SIZE" <+> pretty h <+> pretty size <+> pretty p + atomically $ writeTVar _bs BFetch + bu <- getCurrentBurst pinfoBm + + debug $ yellow "START DOWNLOAD" <+> pretty h <+> pretty p + r <- downloadFromPeer (TimeoutSec 10) bu (KnownSize size) env (coerce h) p + + case r of + Right bs -> do + debug $ green "DOWNLOAD DONE" <+> pretty h <+> pretty p + atomically do + writeTQueue parseQ (h, bs) + writeTVar _bs BCheck + + Left{} -> do + debug $ red "DOWNLOAD FAIL" <+> pretty h <+> pretty p + burstMachineAddErrors pinfoBm 1 + + Right Nothing -> do + debug $ yellow "NO BLOCK" <+> pretty h <+> pretty p + pure () + + Left{} -> do + pure () + + -- FIXME: dangeourous + void $ liftIO $ async (work _bstate who) + + BWaitSize ts -> do + now <- getTimeCoarse + let done = expired (TimeoutSec 10) (now - ts) + when done do + atomically $ writeTVar _bstate BNone + atomically $ modifyTVar blkQ (HPSQ.insert h (succ prio) bi) + + BFetch -> do + atomically $ modifyTVar blkQ (HPSQ.insert h (succ prio) bi) + + BCheck -> do + atomically $ modifyTVar blkQ (HPSQ.delete h) + + -- peer update loop + ContT $ withAsync $ withPeerM env $ forever $ (>> pause @'Seconds 5) do + + peers <- getKnownPeers @e <&> HS.fromList + + atomically do + busy <- readTVar _busy + every <- readTVar peerQ <&> HPSQ.toList + let alive = [ x | x@(p,_,_) <- every, HS.member p peers, not (HS.member p busy) ] & HPSQ.fromList + let newBusy = [ x | x <- HS.toList busy, HS.member x peers ] + writeTVar peerQ alive + writeTVar _busy (HS.fromList newBusy) + + for_ peers $ \p -> do + here <- readTVarIO allPeers <&> HM.member p + unless here do + bm <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10 + atomically do + modifyTVar allPeers (HM.insert p bm) + modifyTVar peerQ (HPSQ.insert p 1.0 (PInfo bm)) forever do pause @'Seconds 10 @@ -604,52 +714,3 @@ downloadDispatcher brains env = flip runContT pure do debug $ yellow $ "I'm download dispatcher" debug $ yellow $ "wip:" <+> pretty size - where - - manageThreads pts = withPeerM env $ forever do - pips <- getKnownPeers @e <&> HS.fromList - - for_ pips $ \p -> do - here <- readTVarIO pts <&> HM.member p - unless here do - q <- newTQueueIO - a <- async $ peerDownloadLoop p q - atomically $ modifyTVar pts (HM.insert p (a,q)) - - debug $ "downloadDispatcher: knownPeer" <+> yellow (pretty p) - - dead <- atomically do - total <- readTVar pts <&> HM.toList - - what <- for total $ \(p,a) -> do - let pipExist = HS.member p pips - stillAlive <- pollSTM (fst a) <&> isNothing - - if pipExist && stillAlive then do - pure $ Right (p,a) - else - pure $ Left (p,a) - - writeTVar pts (HM.fromList (rights what)) - pure $ lefts what - - for_ dead $ \(p,a) -> do - cancel (fst a) - debug $ red "terminating peer loop" <+> pretty p - - pause @'Seconds 5 - - peerDownloadLoop p q = do - bm <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10 - forever do - (h,s) <- atomically $ readTQueue q - bu <- getCurrentBurst bm - blk <- downloadFromPeer (TimeoutSec 5) bu (KnownSize s) env (coerce h) p - case blk of - Left{} -> pure () - Right bs -> liftIO $ withPeerM env do - let refs = extractBlockRefs h bs - for_ refs $ \r -> do - addDownload @e (Just (coerce h)) r - -