From f03450961be0e3380a831db23fd1a77ea48470bd Mon Sep 17 00:00:00 2001 From: voidlizard Date: Sun, 10 Nov 2024 10:01:08 +0300 Subject: [PATCH] fucked --- hbs2-peer/app/BlockDownloadNew.hs | 679 +----------------------------- hbs2-peer/app/PeerMain.hs | 2 +- 2 files changed, 13 insertions(+), 668 deletions(-) diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index 491385df..c32c028a 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -552,535 +552,6 @@ instance BlockSizeCache e KnownSize where findBlockSize (KnownSize s) _ _ = pure (Just s) -downloadDispatcher2 :: forall e m . ( e ~ L4Proto - , MonadUnliftIO m - ) - => SomeBrains e - -> PeerEnv e - -> m () - -downloadDispatcher2 brains e = do - - let blkInfoLock = 5 :: Timeout 'Seconds - let blkWaitLock = 60 :: Timeout 'Seconds - let workloadFactor = 2.5 - - sto <- withPeerM e getStorage - - let downT = 16 - let sizeT = 4 - - inQ <- newTVarIO mempty - sizeRQ <- newTQueueIO - checkQ <- newTQueueIO - - -- inQ <- withDownload env0 $ asks (view blockInQ) - -- checkQ <- withDownload env0 $ asks (view blockCheckQ) - sizeQ <- newTQueueIO - fetchQ <- newTQueueIO - parseQ <- newTQueueIO - -- sizeRQ <- withDownload env0 $ asks (view blockSizeRecvQ) - - -- FIXME: cleanup-nonce - nonces <- newTVarIO (mempty :: HashMap (Peer e) PeerNonce) - - -- FIXME: cleanup-busy - busy <- newTVarIO (mempty :: HashMap PeerNonce Double) - - rates <- newTVarIO (mempty :: IntMap.IntMap [(Peer e,PeerNonce)]) - - fetchH <- newTVarIO (mempty :: HashSet (Hash HbSync)) - sizes <- newTVarIO (mempty :: HashMap (Peer e, Hash HbSync) (Maybe Integer, TimeSpec)) - sizeReq <- newTVarIO (mempty :: HashMap (Hash HbSync) TimeSpec) - - seen <- newTVarIO (mempty :: HashMap (Hash HbSync) Int) - - peerz <- newTVarIO ( mempty :: HashMap (Peer e) BurstMachine) - - defBurst <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10 - - liftIO $ withPeerM e do - subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do - here <- hasBlock sto h <&> isJust - unless here do - atomically $ modifyTVar inQ (HM.insert h ()) - atomically $ writeTQueue sizeQ h - - flip runContT pure do - - -- UPDATE-STATS-LOOP - void $ ContT $ withAsync $ updateRates e rates nonces - - void $ ContT $ withAsync $ forever do - pips <- withPeerM e $ getKnownPeers @e <&> HS.fromList - - for_ pips $ \p -> do - here <- readTVarIO peerz <&> HM.member p - - unless here do - newBum <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10 - atomically $ modifyTVar peerz (HM.insert p newBum) - - atomically do - modifyTVar peerz ( HM.filterWithKey (\k _ -> HS.member k pips)) - - pause @Seconds 5 - - void $ ContT $ withAsync do - forever do - pause @'Seconds 120 - atomically do - q <- readTVar inQ - let isInQ x = HM.member x q - modifyTVar' fetchH (HS.filter isInQ) - modifyTVar' sizeReq (HM.filterWithKey (curry (isInQ . fst))) - modifyTVar' sizes (HM.filterWithKey (curry (isInQ . snd . fst))) - modifyTVar' seen (HM.filterWithKey (curry (isInQ . fst))) - - livePeers <- readTVar rates <&> mconcat . IntMap.elems - let liveNonce = HS.fromList (fmap snd livePeers) - let livePeer = HS.fromList (fmap fst livePeers) - - modifyTVar' busy (HM.filterWithKey (\x _ -> HS.member x liveNonce)) - modifyTVar' nonces (HM.filterWithKey (\x _ -> HS.member x livePeer)) - - replicateM_ downT $ ContT $ withAsync do - forever do - blk <- atomically $ readTQueue checkQ - here <- hasBlock sto blk <&> isJust - if not here then do - atomically $ writeTQueue sizeQ blk - else do - atomically $ writeTQueue parseQ blk - - void $ ContT $ withAsync $ liftIO $ withPeerM e do - forever do - blk <- atomically $ readTQueue parseQ - - blks <- findMissedBlocks sto (HashRef blk) - - for_ blks $ \b -> do - addDownload @e (Just blk) (fromHashRef b) - - processBlock blk - atomically $ modifyTVar inQ (HM.delete (coerce blk)) - - replicateM_ 1 $ ContT $ withAsync do - forever do - - -- pause @'Seconds 0.25 - - items <- atomically do - peekTQueue sizeRQ >> STM.flushTQueue sizeRQ - - now <- getTimeCoarse - - todo <- atomically do - w <- for items $ \(p,h,s) -> do - modifyTVar sizes (HM.insert (p,h) (s, now)) - readTVar nonces <&> HM.lookup p >>= \case - Nothing -> pure () - Just nonce -> setBusySTM nonce busy (Just (setFactor 0 (0.01-))) - pure h - - for (L.nub w) pure - - for_ todo $ \b -> do - here <- hasBlock sto b <&> isJust - - already <- atomically do - readTVar fetchH <&> HS.member b - - when (not here && not already) do - atomically $ writeTQueue fetchQ b - - replicateM_ sizeT $ ContT $ withAsync do - - -- TODO: trim-sizeReq - let blocks = readTVarIO sizeReq <&> HM.keys <&> fmap (,2) - - polling (Polling 1 1) blocks $ \h -> do - pips <- readTVarIO nonces <&> HM.keys - s <- readTVarIO sizes <&> HM.toList - - for_ pips $ \p -> do - here <- lookupSizeIO sizes p h <&> isRight - - if here then do - atomically $ modifyTVar sizeReq (HM.delete h) - else - withPeerM e $ request p (GetBlockSize @e h) - - - replicateM_ sizeT $ ContT $ withAsync do - - forever do - - blk <- atomically do - readTVar rates <&> not . IntMap.null >>= STM.check - readTQueue sizeQ - - debug $ green "PEER SIZE THREAD" <+> pretty blk - - - r <- readTVarIO rates <&> IntMap.toDescList - <&> foldMap snd - - - answ <- for r $ \(p,nonce) -> do - lookupSizeIO sizes p blk >>= \case - -- уже спрашивали, отрицает - Left{} -> do - npi <- newPeerInfo - PeerInfo{..} <- withPeerM e $ fetch True npi (PeerInfoKey p) id - - atomically do - setBusySTM nonce busy (Just (setFactor 0 (+(-0.01)))) - modifyTVar _peerDownloadMiss succ - modifyTVar seen (HM.insertWith (+) blk 1) - modifyTVar sizeReq (HM.delete blk) - - debug $ red "NONE:" <+> pretty p <+> pretty blk - pure 0 - - -- уже спрашивали, ответил - Right (Just w) -> do - - atomically do - setBusySTM nonce busy (Just (setFactor 0 (+(-0.01)))) - modifyTVar sizeReq (HM.delete blk) - - debug $ red "SIZE:" <+> pretty p <+> pretty blk <+> pretty w - pure 1 - - -- не спрашивали еще - Right Nothing -> do - (doReq, f) <- atomically do - f <- lookupBusySTM nonce busy - if f > workloadFactor then - pure (False, f) - else do - setBusySTM nonce busy (Just (setFactor 0.01 (+0.01))) - pure (True, f) - - debug $ green "BUSY" <+> pretty p <+> pretty f - - when doReq do - debug $ red "SEND REQUEST FOR SIZE" <+> pretty p <+> pretty blk - async $ do - pause blkInfoLock - atomically (setBusySTM nonce busy (Just (setFactor 0 (+(-0.01))))) - - withPeerM e $ request p (GetBlockSize @e blk) - now <- getTimeCoarse - atomically $ modifyTVar sizeReq (HM.insert blk now) - - pure 0 - - if sum answ > 0 then do - atomically do - here <- readTVar fetchH <&> HS.member blk - readTVar seen <&> HM.delete blk - unless here $ - writeTQueue fetchQ blk - - else do - howMany <- readTVarIO seen <&> (fromMaybe 0 . HM.lookup blk) - pips <- readTVarIO nonces <&> HM.size - -- FIXME: hardcode - when (howMany < 10) do - atomically $ writeTQueue sizeQ blk - - void $ ContT $ withAsync do - -- FIXME: ban-time-hardcode - let loosers = readTVarIO seen <&> fmap (,120) . HM.keys - polling (Polling 1 10) loosers $ \it -> do - atomically $ writeTQueue checkQ it - atomically $ modifyTVar seen (HM.delete it) - - replicateM_ downT $ ContT $ withAsync do - - gen <- newStdGen - - forever do - - flip runContT pure $ callCC \exit -> do - - blk <- atomically $ readTQueue fetchQ - - atomically do - modifyTVar fetchH (HS.insert blk) - - here <- hasBlock sto blk <&> isJust - - when here $ exit () - - debug $ green "PEER DOWNLOAD THREAD" <+> pretty blk - - -- TODO: already-downloaded-possible - - let ws = round . (*trimFactor) <$> randomRs (0, 2.5) gen - - work <- lift $ race (pause @'Seconds 60) $ atomically do - r0 <- readTVar rates <&> IntMap.toList - bsy <- readTVar busy - - let bx nonce = - round $ trimFactor * (1.75 / (1.0 + fromMaybe 0 (HM.lookup nonce bsy))) - - let w = [ (-(v + w0 + bx nonce), p) - | (v, (w0, peers)) <- zip ws r0, p@(_,nonce) <- peers - ] & L.sortOn fst & fmap snd - - avail' <- for w $ \(peer,nonce) -> do - p <- readTVar busy <&> HM.lookup nonce - sz <- lookupSizeSTM sizes peer blk - if p < Just workloadFactor then - pure (Just (peer,nonce, sz)) - else - pure Nothing - - let avail = catMaybes avail' - - STM.check (not $ L.null avail) - - found <- for avail $ \(pip, nonce, msz) -> case msz of - Right (Just sz) -> do - pure $ Just (blk, pip, nonce, sz) - - _ -> pure Nothing - - case headMay (catMaybes found) of - Nothing -> do - writeTQueue checkQ blk - modifyTVar fetchH (HS.delete blk) - pure Nothing - - Just what@(_,_,nonce,_) -> do - setBusySTM nonce busy (Just (setFactor 1.0 (+1.0))) - pure $ Just what - - case work of - Right (Just (b,p,nonce,s)) -> do - debug $ green "WORKER CHOOSEN" <+> pretty p <+> pretty blk <+> pretty s - - bum <- readTVarIO peerz <&> HM.findWithDefault defBurst p - bu <- liftIO $ getCurrentBurst bum - - r <- lift $ race (pause @'Seconds 60) do - downloadFromPeer (TimeoutSec 55) bu (KnownSize s) e (coerce blk) p - - atomically do - setBusySTM nonce busy (Just (setFactor 0 (const 0))) - - npi <- newPeerInfo - PeerInfo{..} <- withPeerM e $ fetch True npi (PeerInfoKey p) id - - debug $ green "DOWNLOAD DONE!" <+> pretty p <+> pretty blk <+> pretty s <+> pretty (isRight r) - - atomically $ modifyTVar fetchH (HS.delete blk) - - case r of - Right block -> do - -- mh <- putBlock sto block - atomically do - modifyTVar _peerDownloaded succ - modifyTVar _peerDownloadedBlk succ - - _ -> do - debug $ red "DOWNLOAD FAILED / TIMEOUT" - - liftIO $ burstMachineAddErrors bum 1 - - atomically do - modifyTVar _peerDownloadFail succ - modifyTVar _peerErrors succ - writeTQueue checkQ blk - - _ -> do - debug $ red "WAIT FOR PEERS TIMEOUT" <+> pretty blk - atomically $ writeTVar busy mempty - - forever do - pause @'Seconds 5 - wip <- readTVarIO inQ <&> HM.size - notice $ yellow "wip" <+> pretty wip - - where - - trimFactor :: Double - trimFactor = 100 - - setFactor d f = \case - Nothing -> Just d - Just v -> Just (g v) - where - g y = f y & max 0 - - setBusySTM nonce busy = \case - Nothing -> modifyTVar busy (HM.delete nonce) - Just fn -> modifyTVar busy (HM.alter fn nonce) - - lookupBusySTM nonce busy = - readTVar busy <&> fromMaybe 0 . HM.lookup nonce - - lookupSizeSTM sizes p h = do - readTVar sizes - <&> HM.lookup (p,h) - <&> \case - Nothing -> Right Nothing - Just (Just x,_) -> Right (Just x) - Just (Nothing,_) -> Left () - - lookupSizeIO sizes p h = do - atomically $ lookupSizeSTM sizes p h - - processBlock :: forall e s m . ( MonadIO m - , MyPeer e - , ForSignedBox s - , s ~ Encryption e - , HasPeerLocator e m - , m ~ PeerM e IO - ) - => Hash HbSync - -> m () - - processBlock h = do - - sto <- withPeerM e getStorage - - let parent = Just h - - block <- liftIO $ getBlock sto h - - let bt = tryDetect h <$> block - - -- FIXME: если блок нашёлся, то удаляем его из wip - - -- when (isJust bt) (deleteBlockFromQ h) - - let handleHrr (hrr :: Either (Hash HbSync) [HashRef]) = do - case hrr of - Left hx -> addDownload @e parent hx - Right hr -> do - - for_ hr $ \(HashRef blk) -> do - - -- debug $ pretty blk - - here <- liftIO (hasBlock sto blk) <&> isJust - - if here then do - pure () - -- debug $ "block" <+> pretty blk <+> "is already here" - -- unless (h == blk) do - -- processBlock blk -- NOTE: хуже не стало - -- FIXME: fugure out if it's really required - - else do - addDownload @e parent blk - - case bt of - Nothing -> addDownload @e mzero h - - Just (SeqRef (SequentialRef n (AnnotatedHashRef a' b))) -> do - maybe1 a' none $ \a -> do - debug $ "GOT AnnotatedHashRef" <+> pretty a - processBlock (fromHashRef a) - - addDownload @e parent (fromHashRef b) - - Just (AnnRef (AnnotatedHashRef ann hx)) -> do - maybe1 ann none $ addDownload @e parent . fromHashRef - addDownload @e parent (fromHashRef hx) - - Just (MerkleAnn ann) -> do - case _mtaMeta ann of - NoMetaData -> pure () - ShortMetadata {} -> pure () - AnnHashRef hx -> addDownload @e parent hx - - case _mtaCrypt ann of - NullEncryption -> pure () - CryptAccessKeyNaClAsymm h -> addDownload @e parent h - EncryptGroupNaClSymm h _ -> addDownload @e parent h - - trace $ "GOT WRAPPED MERKLE. requesting nodes/leaves" <+> pretty h - walkMerkleTree (_mtaTree ann) (liftIO . getBlock sto) handleHrr - - Just (Merkle{}) -> do - trace $ "GOT MERKLE. requesting nodes/leaves" <+> pretty h - walkMerkle h (liftIO . getBlock sto) handleHrr - - Just (Blob{}) -> do - -- NOTE: bundle-ref-detection-note - -- добавлять обработку BundleRefValue в tryDetect - -- слишком накладно, т.к. требует большого количества - -- констрейнтов, которые не предполагались там - -- изначально. Как временная мера -- пробуем Bundle - -- обнаруживать здесь. - pure () - - where - unboxBundleRef (BundleRefValue box) = unboxSignedBox0 box - - - - updateRates e rates nonces = withPeerM e do - - let wRtt = 5 - let wUdp = 1.5 - let wTcp = 1.1 - let wS = 1.5 - let eps = 1e-8 - - forever do - pause @'Seconds 20 - - new <- S.toList_ do - withPeerM e $ forKnownPeers @e $ \peer pd -> do - pinfo <- find (PeerInfoKey peer) id - maybe1 pinfo none $ \pip -> do - - let nonce = _peerOwnNonce pd - - atomically $ modifyTVar nonces (HM.insert peer nonce) - - sr <- readTVarIO (_peerDownloaded pip) - er <- readTVarIO (_peerDownloadFail pip) - - let s = (eps + realToFrac sr) / (eps + realToFrac (sr + er)) - -{- HLINT ignore "Functor law" -} - rtt <- medianPeerRTT pip - <&> fmap ( (/1e9) . realToFrac ) - <&> fromMaybe 1.0 - - let (udp,tcp) = case view sockType peer of - UDP -> (0, wUdp * 1.0) - TCP -> (wTcp * 1.0, 0) - - let r = udp + tcp + wS*s - lift $ S.yield (peer, nonce, (r, rtt)) - - let maxRtt = maximumDef 1.0 [ rtt | (_, _, (_, rtt)) <- new ] - - let mkRate s rtt = round $ trimFactor * (s + wRtt * (1 / (1 + rtt / maxRtt))) - - let newRates = [ (mkRate s rtt, [(p,nonce)] ) - | (p, nonce, (s, rtt)) <- new - ] - - - atomically do - writeTVar rates (IntMap.fromListWith (<>) newRates) - - debug $ green "PEER RATES" <+> line <> vcat (fmap fmt newRates) - - where - fmt (r,prs) = pretty r <+> hcat (fmap (pretty . view _1) prs) - - downloadDispatcher :: forall e m . ( e ~ L4Proto , MonadUnliftIO m ) @@ -1095,14 +566,8 @@ downloadDispatcher brains env = flip runContT pure do let seenLimit = 1000 - seen <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef TimeSpec () ) - blkQ <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef Int NominalDiffTime ) - let sizeCacheLimit = 10000 - - sizeCache <- newTVarIO ( HPSQ.empty :: HashPSQ (HashRef,Peer e) TimeSpec (Maybe Integer) ) - sto <- withPeerM env getStorage work <- newTQueueIO @@ -1111,153 +576,33 @@ downloadDispatcher brains env = flip runContT pure do readTVarIO pts >>= mapM_ (cancel . fst) atomically $ writeTVar pts mempty - ContT $ withAsync $ forever do - join $ atomically (readTQueue work) - - ContT $ withAsync $ forever do - pause @'Seconds 600 - - debug $ "CLEANUP SEEN" - atomically do - fix \next -> do - n <- readTVar seen <&> HPSQ.size - when (n > seenLimit) do - modifyTVar seen HPSQ.deleteMin - next - - debug $ "CLEANUP SIZES" - atomically do - fix \next -> do - n <- readTVar sizeCache <&> HPSQ.size - when (n > sizeCacheLimit) do - modifyTVar sizeCache HPSQ.deleteMin - next - liftIO $ withPeerM env do subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do now <- getTimeCoarse - new <- atomically do - already <- readTVar seen <&> HPSQ.member (HashRef h) - if already then do - pure False - else do - modifyTVar seen ( HPSQ.insert (HashRef h) now () ) - modifyTVar blkQ ( HPSQ.insert (HashRef h) 1 t0 ) - pure True - when new do + here <- hasBlock sto h <&> isJust + already <- readTVarIO blkQ <&> HPSQ.member (HashRef h) + unless (here || already) do + atomically $ modifyTVar blkQ (HPSQ.insert (HashRef h) 1 10) debug $ green "New download request" <+> pretty h - let missChk = readTVarIO blkQ <&> fmap tr . HPSQ.toList - where tr (k,_,v) = (k, realToFrac v) + ContT $ withAsync $ manageThreads pts - let shiftPrio t = \case - Nothing -> ((), Nothing) - Just (p,v) -> ((), Just (succ p, v * t )) - ContT $ withAsync $ - polling (Polling 1 1) missChk $ \h -> do + ContT $ withAsync do + let sq = readTVarIO blkQ <&> fmap (\(a,_,b) -> (a,b)) . HPSQ.toList + polling (Polling 1 1) sq $ \h -> do pips <- readTVarIO pts <&> HM.keys forConcurrently_ pips $ \p -> do - now <- getTimeCoarse - here <- readTVarIO sizeCache <&> HPSQ.member (h,p) - hereB <- hasBlock sto (coerce h) <&> isJust - when (not here && not hereB) do - size <- queryBlockSizeFromPeer brains env (coerce h) p - case size of - Left{} -> pure () - Right w -> do - atomically $ modifyTVar sizeCache ( HPSQ.insert (h,p) now w ) - debug $ green "GOT SIZE" <+> pretty w <+> pretty h <+> pretty p - - -- atomically $ modifyTVar blkQ ( snd . HPSQ.alter (shiftPrio 1.10) h ) - - - _wip <- newTVarIO ( mempty :: HashMap HashRef () ) - - ContT $ withAsync $ do - let ws = readTVarIO _wip <&> fmap (,10) . HM.keys - polling (Polling 1 1) ws $ \h -> do - atomically $ modifyTVar _wip (HM.delete h) - - ContT $ withAsync $ do - - pause @'Seconds 10 - - _i <- newTVarIO 0 - - flip runContT pure $ forever do - - blokz <- readTVarIO blkQ <&> fmap (view _1) . HPSQ.toList - - flip runContT pure do - - k <- for blokz $ \what -> callCC \next -> do - - atomically $ modifyTVar _i succ - i <- readTVarIO _i - - here <- hasBlock sto (coerce what) <&> isJust - - when here $ next 0 - - wip <- readTVarIO _wip <&> HM.member what - - when wip $ next 0 - - holders <- readTVarIO sizeCache <&> HPSQ.toList - - wip <- readTVarIO _wip - - let jobs = [ (h,p,s) - | ((h,p),_,Just s) <- holders - , what == h - , not (HM.member h wip) - ] & V.fromList - - when ( V.null jobs ) $ next 0 - - let j = i `mod` V.length jobs - let (h,p,s) = V.unsafeIndex jobs j - worker <- readTVarIO pts <&> HM.lookup p - - n <- maybe1 worker (pure 0) $ \(_,q) -> do - atomically do - modifyTVar _wip $ HM.insert what () - writeTQueue q (coerce h,s) - pure 1 - - debug $ green "FOUND WORKER" <+> pretty p <+> pretty h - pure n - - when ( sum k == 0 ) do - debug $ red "NOTHING" - pause @'Seconds 0.1 - - ContT $ withAsync $ forever do - polling (Polling 1 1) missChk $ \h -> do - -- debug $ blue "CHECK MISSED BLOCKS" <+> pretty h - - missed <- findMissedBlocks sto h - - here <- hasBlock sto (coerce h) <&> isJust - - atomically do - for_ missed $ \hm -> modifyTVar blkQ (HPSQ.insert (coerce hm) 2 t0) - when here do - modifyTVar blkQ (HPSQ.delete (coerce h)) - - ContT $ withAsync (manageThreads pts) - -- ContT $ withAsync (sizeQueryLoop pts rq down) + s <- queryBlockSizeFromPeer brains env (coerce h) p + case s of + Right x -> debug $ red "GOT SIZE FOR BLOCK" <+> pretty h <+> pretty x <+> pretty p + _ -> none forever do pause @'Seconds 10 size <- atomically $ readTVar blkQ <&> HPSQ.size - seenSize <- atomically $ readTVar seen <&> HPSQ.size - sizeCacheSize <- atomically $ readTVar sizeCache <&> HPSQ.size debug $ yellow $ "I'm download dispatcher" debug $ yellow $ "wip:" <+> pretty size - debug $ yellow $ "seen:" <+> pretty seenSize - debug $ yellow $ "sizes:" <+> pretty sizeCacheSize where diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index c4b4a8b6..9657aeea 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -1197,7 +1197,7 @@ runPeer opts = respawnOnError opts $ do peerThread "pexLoop" (pexLoop @e brains tcp) -- FIXME: new-download-loop - peerThread "downloadDispatcher" (downloadDispatcher2 (SomeBrains brains) env) + peerThread "downloadDispatcher" (downloadDispatcher (SomeBrains brains) env) peerThread "fillPeerMeta" (fillPeerMeta tcp tcpProbeWait)