diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index 54fd6515..491385df 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -14,7 +14,8 @@ import HBS2.Defaults import HBS2.Events import HBS2.Net.Proto.Service import HBS2.Net.Proto.Sessions - +import HBS2.Data.Bundle +import HBS2.Data.Types.SignedBox import HBS2.Base58 import HBS2.Data.Types.Peer @@ -57,16 +58,20 @@ import Data.Either import Data.Maybe import Data.ByteString.Lazy qualified as LBS import Data.ByteString.Lazy (ByteString) +import Data.Vector qualified as V import Data.ByteString qualified as BS import Data.List qualified as L import Data.Coerce import Numeric import UnliftIO +import Control.Concurrent.STM qualified as STM import UnliftIO.Concurrent +import System.Random import Lens.Micro.Platform import Streaming.Prelude qualified as S + data DownloadError e = DownloadStuckError HashRef (Peer e) | StorageError @@ -400,6 +405,7 @@ downloadFromPeerRec t bu0 cache env h0 peer = do pure $ Right () + downloadFromPeer :: forall e t cache m . ( e ~ L4Proto , MonadUnliftIO m , IsTimeout t @@ -499,7 +505,7 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do blk <- readTVarIO _sBlockChunks2 let rs = LBS.concat $ IntMap.elems blk - ha <- putBlock sto rs + ha <- enqueueBlock sto rs -- let ha = Just $ hashObject @HbSync rs @@ -539,6 +545,542 @@ data S2 = | S2FetchBlock (Hash HbSync) | S2Exit +newtype KnownSize = KnownSize Integer + +instance BlockSizeCache e KnownSize where + cacheBlockSize _ _ _ _ = pure () + findBlockSize (KnownSize s) _ _ = pure (Just s) + + +downloadDispatcher2 :: forall e m . ( e ~ L4Proto + , MonadUnliftIO m + ) + => SomeBrains e + -> PeerEnv e + -> m () + +downloadDispatcher2 brains e = do + + let blkInfoLock = 5 :: Timeout 'Seconds + let blkWaitLock = 60 :: Timeout 'Seconds + let workloadFactor = 2.5 + + sto <- withPeerM e getStorage + + let downT = 16 + let sizeT = 4 + + inQ <- newTVarIO mempty + sizeRQ <- newTQueueIO + checkQ <- newTQueueIO + + -- inQ <- withDownload env0 $ asks (view blockInQ) + -- checkQ <- withDownload env0 $ asks (view blockCheckQ) + sizeQ <- newTQueueIO + fetchQ <- newTQueueIO + parseQ <- newTQueueIO + -- sizeRQ <- withDownload env0 $ asks (view blockSizeRecvQ) + + -- FIXME: cleanup-nonce + nonces <- newTVarIO (mempty :: HashMap (Peer e) PeerNonce) + + -- FIXME: cleanup-busy + busy <- newTVarIO (mempty :: HashMap PeerNonce Double) + + rates <- newTVarIO (mempty :: IntMap.IntMap [(Peer e,PeerNonce)]) + + fetchH <- newTVarIO (mempty :: HashSet (Hash HbSync)) + sizes <- newTVarIO (mempty :: HashMap (Peer e, Hash HbSync) (Maybe Integer, TimeSpec)) + sizeReq <- newTVarIO (mempty :: HashMap (Hash HbSync) TimeSpec) + + seen <- newTVarIO (mempty :: HashMap (Hash HbSync) Int) + + peerz <- newTVarIO ( mempty :: HashMap (Peer e) BurstMachine) + + defBurst <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10 + + liftIO $ withPeerM e do + subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do + here <- hasBlock sto h <&> isJust + unless here do + atomically $ modifyTVar inQ (HM.insert h ()) + atomically $ writeTQueue sizeQ h + + flip runContT pure do + + -- UPDATE-STATS-LOOP + void $ ContT $ withAsync $ updateRates e rates nonces + + void $ ContT $ withAsync $ forever do + pips <- withPeerM e $ getKnownPeers @e <&> HS.fromList + + for_ pips $ \p -> do + here <- readTVarIO peerz <&> HM.member p + + unless here do + newBum <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10 + atomically $ modifyTVar peerz (HM.insert p newBum) + + atomically do + modifyTVar peerz ( HM.filterWithKey (\k _ -> HS.member k pips)) + + pause @Seconds 5 + + void $ ContT $ withAsync do + forever do + pause @'Seconds 120 + atomically do + q <- readTVar inQ + let isInQ x = HM.member x q + modifyTVar' fetchH (HS.filter isInQ) + modifyTVar' sizeReq (HM.filterWithKey (curry (isInQ . fst))) + modifyTVar' sizes (HM.filterWithKey (curry (isInQ . snd . fst))) + modifyTVar' seen (HM.filterWithKey (curry (isInQ . fst))) + + livePeers <- readTVar rates <&> mconcat . IntMap.elems + let liveNonce = HS.fromList (fmap snd livePeers) + let livePeer = HS.fromList (fmap fst livePeers) + + modifyTVar' busy (HM.filterWithKey (\x _ -> HS.member x liveNonce)) + modifyTVar' nonces (HM.filterWithKey (\x _ -> HS.member x livePeer)) + + replicateM_ downT $ ContT $ withAsync do + forever do + blk <- atomically $ readTQueue checkQ + here <- hasBlock sto blk <&> isJust + if not here then do + atomically $ writeTQueue sizeQ blk + else do + atomically $ writeTQueue parseQ blk + + void $ ContT $ withAsync $ liftIO $ withPeerM e do + forever do + blk <- atomically $ readTQueue parseQ + + blks <- findMissedBlocks sto (HashRef blk) + + for_ blks $ \b -> do + addDownload @e (Just blk) (fromHashRef b) + + processBlock blk + atomically $ modifyTVar inQ (HM.delete (coerce blk)) + + replicateM_ 1 $ ContT $ withAsync do + forever do + + -- pause @'Seconds 0.25 + + items <- atomically do + peekTQueue sizeRQ >> STM.flushTQueue sizeRQ + + now <- getTimeCoarse + + todo <- atomically do + w <- for items $ \(p,h,s) -> do + modifyTVar sizes (HM.insert (p,h) (s, now)) + readTVar nonces <&> HM.lookup p >>= \case + Nothing -> pure () + Just nonce -> setBusySTM nonce busy (Just (setFactor 0 (0.01-))) + pure h + + for (L.nub w) pure + + for_ todo $ \b -> do + here <- hasBlock sto b <&> isJust + + already <- atomically do + readTVar fetchH <&> HS.member b + + when (not here && not already) do + atomically $ writeTQueue fetchQ b + + replicateM_ sizeT $ ContT $ withAsync do + + -- TODO: trim-sizeReq + let blocks = readTVarIO sizeReq <&> HM.keys <&> fmap (,2) + + polling (Polling 1 1) blocks $ \h -> do + pips <- readTVarIO nonces <&> HM.keys + s <- readTVarIO sizes <&> HM.toList + + for_ pips $ \p -> do + here <- lookupSizeIO sizes p h <&> isRight + + if here then do + atomically $ modifyTVar sizeReq (HM.delete h) + else + withPeerM e $ request p (GetBlockSize @e h) + + + replicateM_ sizeT $ ContT $ withAsync do + + forever do + + blk <- atomically do + readTVar rates <&> not . IntMap.null >>= STM.check + readTQueue sizeQ + + debug $ green "PEER SIZE THREAD" <+> pretty blk + + + r <- readTVarIO rates <&> IntMap.toDescList + <&> foldMap snd + + + answ <- for r $ \(p,nonce) -> do + lookupSizeIO sizes p blk >>= \case + -- уже спрашивали, отрицает + Left{} -> do + npi <- newPeerInfo + PeerInfo{..} <- withPeerM e $ fetch True npi (PeerInfoKey p) id + + atomically do + setBusySTM nonce busy (Just (setFactor 0 (+(-0.01)))) + modifyTVar _peerDownloadMiss succ + modifyTVar seen (HM.insertWith (+) blk 1) + modifyTVar sizeReq (HM.delete blk) + + debug $ red "NONE:" <+> pretty p <+> pretty blk + pure 0 + + -- уже спрашивали, ответил + Right (Just w) -> do + + atomically do + setBusySTM nonce busy (Just (setFactor 0 (+(-0.01)))) + modifyTVar sizeReq (HM.delete blk) + + debug $ red "SIZE:" <+> pretty p <+> pretty blk <+> pretty w + pure 1 + + -- не спрашивали еще + Right Nothing -> do + (doReq, f) <- atomically do + f <- lookupBusySTM nonce busy + if f > workloadFactor then + pure (False, f) + else do + setBusySTM nonce busy (Just (setFactor 0.01 (+0.01))) + pure (True, f) + + debug $ green "BUSY" <+> pretty p <+> pretty f + + when doReq do + debug $ red "SEND REQUEST FOR SIZE" <+> pretty p <+> pretty blk + async $ do + pause blkInfoLock + atomically (setBusySTM nonce busy (Just (setFactor 0 (+(-0.01))))) + + withPeerM e $ request p (GetBlockSize @e blk) + now <- getTimeCoarse + atomically $ modifyTVar sizeReq (HM.insert blk now) + + pure 0 + + if sum answ > 0 then do + atomically do + here <- readTVar fetchH <&> HS.member blk + readTVar seen <&> HM.delete blk + unless here $ + writeTQueue fetchQ blk + + else do + howMany <- readTVarIO seen <&> (fromMaybe 0 . HM.lookup blk) + pips <- readTVarIO nonces <&> HM.size + -- FIXME: hardcode + when (howMany < 10) do + atomically $ writeTQueue sizeQ blk + + void $ ContT $ withAsync do + -- FIXME: ban-time-hardcode + let loosers = readTVarIO seen <&> fmap (,120) . HM.keys + polling (Polling 1 10) loosers $ \it -> do + atomically $ writeTQueue checkQ it + atomically $ modifyTVar seen (HM.delete it) + + replicateM_ downT $ ContT $ withAsync do + + gen <- newStdGen + + forever do + + flip runContT pure $ callCC \exit -> do + + blk <- atomically $ readTQueue fetchQ + + atomically do + modifyTVar fetchH (HS.insert blk) + + here <- hasBlock sto blk <&> isJust + + when here $ exit () + + debug $ green "PEER DOWNLOAD THREAD" <+> pretty blk + + -- TODO: already-downloaded-possible + + let ws = round . (*trimFactor) <$> randomRs (0, 2.5) gen + + work <- lift $ race (pause @'Seconds 60) $ atomically do + r0 <- readTVar rates <&> IntMap.toList + bsy <- readTVar busy + + let bx nonce = + round $ trimFactor * (1.75 / (1.0 + fromMaybe 0 (HM.lookup nonce bsy))) + + let w = [ (-(v + w0 + bx nonce), p) + | (v, (w0, peers)) <- zip ws r0, p@(_,nonce) <- peers + ] & L.sortOn fst & fmap snd + + avail' <- for w $ \(peer,nonce) -> do + p <- readTVar busy <&> HM.lookup nonce + sz <- lookupSizeSTM sizes peer blk + if p < Just workloadFactor then + pure (Just (peer,nonce, sz)) + else + pure Nothing + + let avail = catMaybes avail' + + STM.check (not $ L.null avail) + + found <- for avail $ \(pip, nonce, msz) -> case msz of + Right (Just sz) -> do + pure $ Just (blk, pip, nonce, sz) + + _ -> pure Nothing + + case headMay (catMaybes found) of + Nothing -> do + writeTQueue checkQ blk + modifyTVar fetchH (HS.delete blk) + pure Nothing + + Just what@(_,_,nonce,_) -> do + setBusySTM nonce busy (Just (setFactor 1.0 (+1.0))) + pure $ Just what + + case work of + Right (Just (b,p,nonce,s)) -> do + debug $ green "WORKER CHOOSEN" <+> pretty p <+> pretty blk <+> pretty s + + bum <- readTVarIO peerz <&> HM.findWithDefault defBurst p + bu <- liftIO $ getCurrentBurst bum + + r <- lift $ race (pause @'Seconds 60) do + downloadFromPeer (TimeoutSec 55) bu (KnownSize s) e (coerce blk) p + + atomically do + setBusySTM nonce busy (Just (setFactor 0 (const 0))) + + npi <- newPeerInfo + PeerInfo{..} <- withPeerM e $ fetch True npi (PeerInfoKey p) id + + debug $ green "DOWNLOAD DONE!" <+> pretty p <+> pretty blk <+> pretty s <+> pretty (isRight r) + + atomically $ modifyTVar fetchH (HS.delete blk) + + case r of + Right block -> do + -- mh <- putBlock sto block + atomically do + modifyTVar _peerDownloaded succ + modifyTVar _peerDownloadedBlk succ + + _ -> do + debug $ red "DOWNLOAD FAILED / TIMEOUT" + + liftIO $ burstMachineAddErrors bum 1 + + atomically do + modifyTVar _peerDownloadFail succ + modifyTVar _peerErrors succ + writeTQueue checkQ blk + + _ -> do + debug $ red "WAIT FOR PEERS TIMEOUT" <+> pretty blk + atomically $ writeTVar busy mempty + + forever do + pause @'Seconds 5 + wip <- readTVarIO inQ <&> HM.size + notice $ yellow "wip" <+> pretty wip + + where + + trimFactor :: Double + trimFactor = 100 + + setFactor d f = \case + Nothing -> Just d + Just v -> Just (g v) + where + g y = f y & max 0 + + setBusySTM nonce busy = \case + Nothing -> modifyTVar busy (HM.delete nonce) + Just fn -> modifyTVar busy (HM.alter fn nonce) + + lookupBusySTM nonce busy = + readTVar busy <&> fromMaybe 0 . HM.lookup nonce + + lookupSizeSTM sizes p h = do + readTVar sizes + <&> HM.lookup (p,h) + <&> \case + Nothing -> Right Nothing + Just (Just x,_) -> Right (Just x) + Just (Nothing,_) -> Left () + + lookupSizeIO sizes p h = do + atomically $ lookupSizeSTM sizes p h + + processBlock :: forall e s m . ( MonadIO m + , MyPeer e + , ForSignedBox s + , s ~ Encryption e + , HasPeerLocator e m + , m ~ PeerM e IO + ) + => Hash HbSync + -> m () + + processBlock h = do + + sto <- withPeerM e getStorage + + let parent = Just h + + block <- liftIO $ getBlock sto h + + let bt = tryDetect h <$> block + + -- FIXME: если блок нашёлся, то удаляем его из wip + + -- when (isJust bt) (deleteBlockFromQ h) + + let handleHrr (hrr :: Either (Hash HbSync) [HashRef]) = do + case hrr of + Left hx -> addDownload @e parent hx + Right hr -> do + + for_ hr $ \(HashRef blk) -> do + + -- debug $ pretty blk + + here <- liftIO (hasBlock sto blk) <&> isJust + + if here then do + pure () + -- debug $ "block" <+> pretty blk <+> "is already here" + -- unless (h == blk) do + -- processBlock blk -- NOTE: хуже не стало + -- FIXME: fugure out if it's really required + + else do + addDownload @e parent blk + + case bt of + Nothing -> addDownload @e mzero h + + Just (SeqRef (SequentialRef n (AnnotatedHashRef a' b))) -> do + maybe1 a' none $ \a -> do + debug $ "GOT AnnotatedHashRef" <+> pretty a + processBlock (fromHashRef a) + + addDownload @e parent (fromHashRef b) + + Just (AnnRef (AnnotatedHashRef ann hx)) -> do + maybe1 ann none $ addDownload @e parent . fromHashRef + addDownload @e parent (fromHashRef hx) + + Just (MerkleAnn ann) -> do + case _mtaMeta ann of + NoMetaData -> pure () + ShortMetadata {} -> pure () + AnnHashRef hx -> addDownload @e parent hx + + case _mtaCrypt ann of + NullEncryption -> pure () + CryptAccessKeyNaClAsymm h -> addDownload @e parent h + EncryptGroupNaClSymm h _ -> addDownload @e parent h + + trace $ "GOT WRAPPED MERKLE. requesting nodes/leaves" <+> pretty h + walkMerkleTree (_mtaTree ann) (liftIO . getBlock sto) handleHrr + + Just (Merkle{}) -> do + trace $ "GOT MERKLE. requesting nodes/leaves" <+> pretty h + walkMerkle h (liftIO . getBlock sto) handleHrr + + Just (Blob{}) -> do + -- NOTE: bundle-ref-detection-note + -- добавлять обработку BundleRefValue в tryDetect + -- слишком накладно, т.к. требует большого количества + -- констрейнтов, которые не предполагались там + -- изначально. Как временная мера -- пробуем Bundle + -- обнаруживать здесь. + pure () + + where + unboxBundleRef (BundleRefValue box) = unboxSignedBox0 box + + + + updateRates e rates nonces = withPeerM e do + + let wRtt = 5 + let wUdp = 1.5 + let wTcp = 1.1 + let wS = 1.5 + let eps = 1e-8 + + forever do + pause @'Seconds 20 + + new <- S.toList_ do + withPeerM e $ forKnownPeers @e $ \peer pd -> do + pinfo <- find (PeerInfoKey peer) id + maybe1 pinfo none $ \pip -> do + + let nonce = _peerOwnNonce pd + + atomically $ modifyTVar nonces (HM.insert peer nonce) + + sr <- readTVarIO (_peerDownloaded pip) + er <- readTVarIO (_peerDownloadFail pip) + + let s = (eps + realToFrac sr) / (eps + realToFrac (sr + er)) + +{- HLINT ignore "Functor law" -} + rtt <- medianPeerRTT pip + <&> fmap ( (/1e9) . realToFrac ) + <&> fromMaybe 1.0 + + let (udp,tcp) = case view sockType peer of + UDP -> (0, wUdp * 1.0) + TCP -> (wTcp * 1.0, 0) + + let r = udp + tcp + wS*s + lift $ S.yield (peer, nonce, (r, rtt)) + + let maxRtt = maximumDef 1.0 [ rtt | (_, _, (_, rtt)) <- new ] + + let mkRate s rtt = round $ trimFactor * (s + wRtt * (1 / (1 + rtt / maxRtt))) + + let newRates = [ (mkRate s rtt, [(p,nonce)] ) + | (p, nonce, (s, rtt)) <- new + ] + + + atomically do + writeTVar rates (IntMap.fromListWith (<>) newRates) + + debug $ green "PEER RATES" <+> line <> vcat (fmap fmt newRates) + + where + fmt (r,prs) = pretty r <+> hcat (fmap (pretty . view _1) prs) + + downloadDispatcher :: forall e m . ( e ~ L4Proto , MonadUnliftIO m ) @@ -549,9 +1091,7 @@ downloadDispatcher brains env = flip runContT pure do let t0 = 10.00 - pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()) ) - - rq <- newTQueueIO + pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), TQueue (Hash HbSync,Integer)) ) let seenLimit = 1000 @@ -563,14 +1103,12 @@ downloadDispatcher brains env = flip runContT pure do sizeCache <- newTVarIO ( HPSQ.empty :: HashPSQ (HashRef,Peer e) TimeSpec (Maybe Integer) ) - sizeQ <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef NominalDiffTime () ) - sto <- withPeerM env getStorage work <- newTQueueIO ContT $ bracket none $ const do - readTVarIO pts >>= mapM_ cancel + readTVarIO pts >>= mapM_ (cancel . fst) atomically $ writeTVar pts mempty ContT $ withAsync $ forever do @@ -612,9 +1150,9 @@ downloadDispatcher brains env = flip runContT pure do let missChk = readTVarIO blkQ <&> fmap tr . HPSQ.toList where tr (k,_,v) = (k, realToFrac v) - let shiftPrio = \case + let shiftPrio t = \case Nothing -> ((), Nothing) - Just (p,v) -> ((), Just (succ p, v * 1.10 )) + Just (p,v) -> ((), Just (succ p, v * t )) ContT $ withAsync $ polling (Polling 1 1) missChk $ \h -> do @@ -631,11 +1169,73 @@ downloadDispatcher brains env = flip runContT pure do atomically $ modifyTVar sizeCache ( HPSQ.insert (h,p) now w ) debug $ green "GOT SIZE" <+> pretty w <+> pretty h <+> pretty p - atomically $ modifyTVar blkQ ( snd . HPSQ.alter shiftPrio h ) + -- atomically $ modifyTVar blkQ ( snd . HPSQ.alter (shiftPrio 1.10) h ) + + + _wip <- newTVarIO ( mempty :: HashMap HashRef () ) + + ContT $ withAsync $ do + let ws = readTVarIO _wip <&> fmap (,10) . HM.keys + polling (Polling 1 1) ws $ \h -> do + atomically $ modifyTVar _wip (HM.delete h) + + ContT $ withAsync $ do + + pause @'Seconds 10 + + _i <- newTVarIO 0 + + flip runContT pure $ forever do + + blokz <- readTVarIO blkQ <&> fmap (view _1) . HPSQ.toList + + flip runContT pure do + + k <- for blokz $ \what -> callCC \next -> do + + atomically $ modifyTVar _i succ + i <- readTVarIO _i + + here <- hasBlock sto (coerce what) <&> isJust + + when here $ next 0 + + wip <- readTVarIO _wip <&> HM.member what + + when wip $ next 0 + + holders <- readTVarIO sizeCache <&> HPSQ.toList + + wip <- readTVarIO _wip + + let jobs = [ (h,p,s) + | ((h,p),_,Just s) <- holders + , what == h + , not (HM.member h wip) + ] & V.fromList + + when ( V.null jobs ) $ next 0 + + let j = i `mod` V.length jobs + let (h,p,s) = V.unsafeIndex jobs j + worker <- readTVarIO pts <&> HM.lookup p + + n <- maybe1 worker (pure 0) $ \(_,q) -> do + atomically do + modifyTVar _wip $ HM.insert what () + writeTQueue q (coerce h,s) + pure 1 + + debug $ green "FOUND WORKER" <+> pretty p <+> pretty h + pure n + + when ( sum k == 0 ) do + debug $ red "NOTHING" + pause @'Seconds 0.1 ContT $ withAsync $ forever do polling (Polling 1 1) missChk $ \h -> do - debug $ blue "CHECK MISSED BLOCKS" <+> pretty h + -- debug $ blue "CHECK MISSED BLOCKS" <+> pretty h missed <- findMissedBlocks sto h @@ -643,10 +1243,8 @@ downloadDispatcher brains env = flip runContT pure do atomically do for_ missed $ \hm -> modifyTVar blkQ (HPSQ.insert (coerce hm) 2 t0) - if here then do + when here do modifyTVar blkQ (HPSQ.delete (coerce h)) - else - modifyTVar blkQ ( snd . HPSQ.alter shiftPrio h ) ContT $ withAsync (manageThreads pts) -- ContT $ withAsync (sizeQueryLoop pts rq down) @@ -669,8 +1267,9 @@ downloadDispatcher brains env = flip runContT pure do for_ pips $ \p -> do here <- readTVarIO pts <&> HM.member p unless here do - a <- async $ peerDownloadLoop env p - atomically $ modifyTVar pts (HM.insert p a) + q <- newTQueueIO + a <- async $ peerDownloadLoop p q + atomically $ modifyTVar pts (HM.insert p (a,q)) debug $ "downloadDispatcher: knownPeer" <+> yellow (pretty p) @@ -679,7 +1278,7 @@ downloadDispatcher brains env = flip runContT pure do what <- for total $ \(p,a) -> do let pipExist = HS.member p pips - stillAlive <- pollSTM a <&> isNothing + stillAlive <- pollSTM (fst a) <&> isNothing if pipExist && stillAlive then do pure $ Right (p,a) @@ -690,26 +1289,22 @@ downloadDispatcher brains env = flip runContT pure do pure $ lefts what for_ dead $ \(p,a) -> do - cancel a + cancel (fst a) debug $ red "terminating peer loop" <+> pretty p pause @'Seconds 5 - peerDownloadLoop env p = flip runContT pure do - - sto <- withPeerM env getStorage - + peerDownloadLoop p q = do bm <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10 - - -- let blokz = atomically do - -- d <- readTVar down <&> fromMaybe mempty . HM.lookup p - -- pure $ HM.toList d - - -- void $ ContT $ withAsync $ polling (Polling 1 1) blokz $ \h0 -> do - -- debug $ "POLL BLOCK DOWNLOAD" <+> pretty h0 - forever do - pause @'Seconds 10 - debug $ yellow "Peer download loop" <+> pretty p + (h,s) <- atomically $ readTQueue q + bu <- getCurrentBurst bm + blk <- downloadFromPeer (TimeoutSec 5) bu (KnownSize s) env (coerce h) p + case blk of + Left{} -> pure () + Right bs -> liftIO $ withPeerM env do + let refs = extractBlockRefs h bs + for_ refs $ \r -> do + addDownload @e (Just (coerce h)) r diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 9657aeea..c4b4a8b6 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -1197,7 +1197,7 @@ runPeer opts = respawnOnError opts $ do peerThread "pexLoop" (pexLoop @e brains tcp) -- FIXME: new-download-loop - peerThread "downloadDispatcher" (downloadDispatcher (SomeBrains brains) env) + peerThread "downloadDispatcher" (downloadDispatcher2 (SomeBrains brains) env) peerThread "fillPeerMeta" (fillPeerMeta tcp tcpProbeWait)