diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index 34ced84a..49fdcdaf 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -553,18 +553,14 @@ instance BlockSizeCache e KnownSize where findBlockSize (KnownSize s) _ _ = pure (Just s) -data BState = - BNone - | BWaitSize TimeSpec - | BFetch - | BCheck +data Work e = + RequestSize (Maybe (Peer e)) HashRef + | FetchBlock (Maybe (Peer e)) HashRef + deriving stock (Generic) -data BInfo e = BInfo (TVar BState) - -data PInfo e = - PInfo - { pinfoBm :: BurstMachine - } +deriving stock instance Eq (Peer e) => Eq (Work e) +deriving stock instance Ord (Peer e) => Ord (Work e) +instance (Eq (Peer e), Hashable (Peer e)) => Hashable (Work e) downloadDispatcher :: forall e m . ( e ~ L4Proto , MonadUnliftIO m @@ -574,138 +570,144 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto -> m () downloadDispatcher brains env = flip runContT pure do + pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ())) + tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double Int ) - blkQ <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef Int (BInfo e) ) - peerQ <- newTVarIO ( HPSQ.empty :: HashPSQ (Peer e) Double (PInfo e) ) - _busy <- newTVarIO ( mempty :: HashSet (Peer e) ) - allPeers <- newTVarIO ( mempty :: HashMap (Peer e) BurstMachine ) - parseQ <- newTQueueIO @_ @(HashRef, ByteString) + void $ ContT $ withAsync $ manageThreads tasks pts sto <- withPeerM env getStorage - jobs <- newTQueueIO - - -- ContT $ withAsync $ forever $ replicateM_ 16 do - -- join $ atomically (readTQueue jobs) - - ContT $ withAsync $ forever do - (h,bs) <- atomically $ readTQueue parseQ - let refs = extractBlockRefs (coerce h) bs - missed <- findMissedBlocks sto h - for_ (HS.fromList (refs <> missed)) $ \h -> do - here <- hasBlock sto (coerce h) <&> isJust - already <- readTVarIO blkQ <&> HPSQ.member h - unless (here || already) do - newBi <- BInfo @e <$> newTVarIO BNone - atomically $ modifyTVar blkQ (HPSQ.insert h 1 newBi) - liftIO $ withPeerM env do subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do here <- hasBlock sto h <&> isJust - already <- readTVarIO blkQ <&> HPSQ.member (HashRef h) - unless (here || already) do - -- atomically $ modifyTVar blkQ (HPSQ.insert (HashRef h) 1 10) - newBi <- BInfo @e <$> newTVarIO BNone - atomically $ modifyTVar blkQ (HPSQ.insert (HashRef h) 1 newBi) + unless here do + atomically do + modifyTVar tasks (HPSQ.insert (RequestSize Nothing (HashRef h)) 1.0 0) + debug $ green "New download request" <+> pretty h - -- block processing loop - ContT $ withAsync $ forever $ flip runContT pure $ callCC \exit -> callCC \exit2 -> do - blk' <- atomically $ readTVar blkQ <&> HPSQ.findMin - (h,prio,bi@(BInfo _bstate)) <- maybe (pause @'Seconds 0.5 >> exit ()) pure blk' - - - bstate <- readTVarIO _bstate - - case bstate of - BNone -> do - now <- getTimeCoarse - who <- atomically do - p <- readTVar peerQ <&> HPSQ.findMin - case p of - Just some -> do - modifyTVar peerQ HPSQ.deleteMin - writeTVar _bstate (BWaitSize now) - modifyTVar blkQ (HPSQ.insert h (succ prio) bi) - pure some - - Nothing -> retry - - let work _bs (p,pprio,i@PInfo{..}) = liftIO $ flip runContT pure do - ContT $ bracket none $ const do - atomically $ modifyTVar peerQ (HPSQ.insert p pprio i) - - liftIO do - s <- queryBlockSizeFromPeer brains env (coerce h) p - - case s of - Right (Just size) -> do - debug $ green "GOT BLOCK SIZE" <+> pretty h <+> pretty size <+> pretty p - atomically $ writeTVar _bs BFetch - bu <- getCurrentBurst pinfoBm - - debug $ yellow "START DOWNLOAD" <+> pretty h <+> pretty p - r <- downloadFromPeer (TimeoutSec 10) bu (KnownSize size) env (coerce h) p - - case r of - Right bs -> do - debug $ green "DOWNLOAD DONE" <+> pretty h <+> pretty p - atomically do - writeTQueue parseQ (h, bs) - writeTVar _bs BCheck - - Left{} -> do - debug $ red "DOWNLOAD FAIL" <+> pretty h <+> pretty p - atomically $ writeTVar _bstate BNone - burstMachineAddErrors pinfoBm 1 - - Right Nothing -> do - debug $ yellow "NO BLOCK" <+> pretty h <+> pretty p - atomically $ writeTVar _bstate BNone - - Left{} -> do - pure () - - -- FIXME: dangeourous - void $ liftIO $ async (work _bstate who) - - BWaitSize ts -> do - now <- getTimeCoarse - let done = expired (TimeoutSec 10) (now - ts) - when done do - atomically $ writeTVar _bstate BNone - atomically $ modifyTVar blkQ (HPSQ.insert h (succ prio) bi) - - BFetch -> do - atomically $ modifyTVar blkQ (HPSQ.insert h (succ prio) bi) - - BCheck -> do - atomically $ modifyTVar blkQ (HPSQ.delete h) - - -- peer update loop - ContT $ withAsync $ withPeerM env $ forever $ (>> pause @'Seconds 5) do - - peers <- getKnownPeers @e <&> HS.fromList - - atomically do - busy <- readTVar _busy - every <- readTVar peerQ <&> HPSQ.toList - let alive = [ x | x@(p,_,_) <- every, HS.member p peers, not (HS.member p busy) ] & HPSQ.fromList - let newBusy = [ x | x <- HS.toList busy, HS.member x peers ] - writeTVar peerQ alive - writeTVar _busy (HS.fromList newBusy) - - for_ peers $ \p -> do - here <- readTVarIO allPeers <&> HM.member p - unless here do - bm <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10 - atomically do - modifyTVar allPeers (HM.insert p bm) - modifyTVar peerQ (HPSQ.insert p 1.0 (PInfo bm)) - - forever do - pause @'Seconds 10 - size <- atomically $ readTVar blkQ <&> HPSQ.size - debug $ yellow $ "I'm download dispatcher" + forever $ (>> pause @'Seconds 10) do + size <- atomically $ readTVar tasks <&> HPSQ.size debug $ yellow $ "wip:" <+> pretty size + where + manageThreads tasks pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do + debug "MANAGE THREADS" + + peers <- getKnownPeers @e <&> HS.fromList + + for_ peers $ \p -> do + here <- readTVarIO pts <&> HM.member p + unless here do + a <- async (peerThread p tasks) + atomically $ modifyTVar pts (HM.insert p a) + + loosers <- atomically do + xs <- readTVar pts <&> HM.toList + -- FIXME: filter-stopped-tasks + let (alive,dead) = L.partition ( \(x,_) -> HS.member x peers ) xs + writeTVar pts (HM.fromList alive) + pure dead + + mapM_ cancel (fmap snd loosers) + + peerThread p tasks = flip runContT pure do + + sto <- withPeerM env getStorage + + _sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) ) + + bm <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10 + + void $ ContT $ bracket none $ const do + debug $ "Cancelling thread for" <+> pretty p + + debug $ yellow "Started thread for" <+> pretty p + + bmt <- ContT $ withAsync $ runBurstMachine bm + + parseQ <- newTQueueIO @_ @(HashRef, ByteString) + + pt <- ContT $ withAsync $ forever do + (hr,bs) <- atomically $ readTQueue parseQ + let refs = extractBlockRefs (coerce hr) bs + atomically do + for_ refs $ \r -> do + modifyTVar tasks (HPSQ.insert (RequestSize (Just p) r) 0.1 0) + modifyTVar tasks (HPSQ.insert (RequestSize Nothing r) 1 0) + + miss <- findMissedBlocks sto hr + atomically do + for_ miss $ \r -> do + modifyTVar tasks (HPSQ.insert (RequestSize (Just p) r) 0.1 0) + modifyTVar tasks (HPSQ.insert (RequestSize Nothing r) 1 0) + + tt <- ContT $ withAsync $ forever $ flip runContT pure do + + callCC \again -> do + + q <- readTVarIO tasks <&> HPSQ.keys + + for_ q $ \case + + RequestSize (Just who) blk | who == p -> do + r <- lift $ queryBlockSizeFromPeer brains env (coerce blk) p + case r of + Left{} -> do + atomically $ modifyTVar tasks (HPSQ.delete (RequestSize (Just p) blk)) + again () + + Right s -> do + atomically do + modifyTVar _sizeCache (HM.insert blk s) + modifyTVar tasks (HPSQ.delete (RequestSize (Just p) blk)) + modifyTVar tasks (HPSQ.insert (FetchBlock (Just p) blk) 1 0) + + RequestSize Nothing blk -> do + r <- lift $ queryBlockSizeFromPeer brains env (coerce blk) p + case r of + Left{} -> again () + + Right s -> do + atomically do + modifyTVar _sizeCache (HM.insert blk s) + modifyTVar tasks (HPSQ.delete (RequestSize (Just p) blk)) + modifyTVar tasks (HPSQ.insert (FetchBlock (Just p) blk) 1 0) + + FetchBlock w blk -> do + s <- atomically $ readTVar _sizeCache <&> HM.lookup blk + + case s of + Just (Just size) -> do + debug $ "START DOWNLOAD!" <+> pretty blk <+> pretty p + atomically $ modifyTVar tasks (HPSQ.delete (FetchBlock w blk)) + bu <- lift $ getCurrentBurst bm + what <- lift $ downloadFromPeer (TimeoutSec 5) bu brains env (coerce blk) p + case what of + Left{} -> do + lift $ burstMachineAddErrors bm 1 + atomically $ modifyTVar tasks (HPSQ.insert (FetchBlock w blk) 1 0) + again () + + Right bs -> do + atomically do + tss <- readTVar tasks <&> HPSQ.toList + + let tssAlive = flip L.filter tss $ \case + (RequestSize _ b,_,_) | b == blk -> False + (FetchBlock _ b,_,_) | b == blk -> False + _ -> True + + writeTVar tasks (HPSQ.fromList tssAlive) + writeTQueue parseQ (blk, bs) + + _ -> none + + _ -> none + + bs <- ContT $ withAsync $ forever do + pause @'Seconds 10 + debug $ yellow "I'm thread" <+> pretty p + + void $ waitAnyCatchCancel [bmt,tt,bs] +