diff --git a/.fixme/log b/.fixme/log index 8e990696..8b35d6d2 100644 --- a/.fixme/log +++ b/.fixme/log @@ -1,3 +1,7 @@ (fixme-set "workflow" "test" "6kx1sdj7ej") -(fixme-set "assigned" "voidlizard" "6kx1sdj7ej") \ No newline at end of file +fixme-del "2N9TakVmJZ" +fixme-del "A7YuQZdSy9" +(fixme-set "workflow" "test" "7naMmLv2Fn") +fixme-del "7GUKGpHTpV" +fixme-del "4DzcYjazuz" \ No newline at end of file diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index 5d99a5fe..d94e44ab 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -13,7 +13,7 @@ defMessageQueueSize :: Integral a => a defMessageQueueSize = 65536*10 defBurst :: Integral a => a -defBurst = 2 +defBurst = 4 defBurstMax :: Integral a => a defBurstMax = 64 @@ -64,7 +64,7 @@ defBlockPostponeTime :: TimeSpec defBlockPostponeTime = toTimeSpec ( 45 :: Timeout 'Seconds) defBlockBanTimeSec :: Timeout 'Seconds -defBlockBanTimeSec = 30 :: Timeout 'Seconds +defBlockBanTimeSec = 60 :: Timeout 'Seconds defBlockWipTimeout :: TimeSpec defBlockWipTimeout = defCookieTimeout diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index 08d9e266..c21727ca 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -27,12 +27,15 @@ import Brains import Control.Concurrent.Async import Control.Concurrent.STM +import Control.Concurrent.STM.TSem import Control.Monad.Reader import Control.Monad.Trans.Maybe import Data.ByteString.Lazy (ByteString) import Data.Cache qualified as Cache import Data.Foldable hiding (find) import Data.HashMap.Strict qualified as HashMap +import Data.HashSet (HashSet) +import Data.HashSet qualified as HashSet import Data.IntMap (IntMap) import Data.IntMap qualified as IntMap import Data.IntSet qualified as IntSet @@ -41,9 +44,8 @@ import Data.Maybe import Lens.Micro.Platform import System.Random (randomRIO) import System.Random.Shuffle (shuffleM) -import Numeric (showGFloat) -getBlockForDownload :: forall e m . (MonadIO m, IsPeerAddr e m, MyPeer e) +getBlockForDownload :: forall e m . (MonadIO m, IsPeerAddr e m, MyPeer e, HasStorage m) => Peer e -> BlockDownloadM e m (Maybe (Hash HbSync)) @@ -53,42 +55,37 @@ getBlockForDownload peer = do brains <- asks (view downloadBrains) prop <- asks (view blockProposed) - inq <- liftIO $ readTVarIO tinq - let size = HashMap.size inq + sto <- lift getStorage - if size == 0 then + inq <- liftIO $ readTVarIO tinq + -- let size = HashMap.size inq + + let allBlks = HashMap.keys inq + + hs' <- forM allBlks $ \blk -> do + here <- liftIO $ hasBlock sto blk <&> isJust + newOne <- shouldDownloadBlock @e brains peer blk + + if not here && newOne then do + pure $ Just blk + else do + po <- shouldPostponeBlock @e brains blk + + when po do + postponeBlock blk + + pure Nothing + + let hs = catMaybes hs' + let size = length hs + + if size == 0 then do pure Nothing else do i <- randomRIO (0, size - 1) let blk = HashMap.keys inq !! i - peers <- advisePeersForBlock @e brains blk + pure $ Just blk - proposed <- liftIO $ Cache.lookup prop (blk, peer) <&> isJust - - r <- if | proposed -> do - pure Nothing - - | List.null peers -> do - pure $ Just blk - - | pa `elem` peers -> do - pure $ Just blk - - | otherwise -> do - newOne <- shouldDownloadBlock @e brains peer blk - let chance = if newOne then 1 else 5 - lucky <- liftIO $ shuffleM (True : replicate chance False) <&> headDef False - if lucky then - pure $ Just blk - else do - pure Nothing - - case r of - Nothing -> none - Just h -> do - liftIO $ Cache.insert prop (h, peer) () - - pure r processBlock :: forall e m . ( MonadIO m , HasStorage m @@ -383,6 +380,7 @@ blockDownloadLoop :: forall e m . ( m ~ PeerM e IO , Block ByteString ~ ByteString , PeerMessaging e , IsPeerAddr e m + , HasPeerLocator e m ) => DownloadEnv e -> m () blockDownloadLoop env0 = do @@ -395,64 +393,7 @@ blockDownloadLoop env0 = do pause @'Seconds 3.81 - void $ liftIO $ async $ forever $ withPeerM e $ withDownload env0 do - pause @'Seconds 10 - debug "I'm peer thread sweeping thread" - - known <- knownPeers @e pl - - peers' <- forM known $ \p -> do - auth <- lift $ find (KnownPeerKey p) id <&> isJust - pinfo <- lift $ find (PeerInfoKey p) id <&> isJust - if auth && pinfo then - pure [(p,())] - else - pure mempty - - let auth = HashMap.fromList (mconcat peers') - - pts <- asks (view peerThreads) - - r <- liftIO $ atomically $ stateTVar pts $ \x -> - let items = HashMap.toList x - in let (alive,dead) = List.partition (\(k,_) -> HashMap.member k auth ) items - in (dead, HashMap.fromList alive) - - debug $ "peers to delete" <+> pretty (length r) - - for_ r $ killPeerThread . fst - - void $ liftIO $ async $ forever $ withPeerM e do - pause @'Seconds 1 - -- debug "I'm a peer maintaining thread" - - brains <- withDownload env0 $ asks (view downloadBrains) - pee <- knownPeers @e pl - - onKnownPeers brains pee - - for_ pee $ \p -> do - pinfo' <- find (PeerInfoKey p) id - auth <- find (KnownPeerKey p) id <&> isJust - maybe1 pinfo' none $ \pinfo -> do - - fails <- liftIO $ readTVarIO (view peerDownloadFail pinfo) - - when (fails >= defDownloadFails) do - trace $ "peer" <+> pretty p <+> "has too many failures:" <+> pretty fails - - here <- withDownload env0 $ hasPeerThread p - - if | not here && auth -> do - - debug $ "peer" <+> pretty p <+> "does not have a thread" - runPeer <- liftIO $ async $ liftIO (withPeerM e $ withDownload env0 (peerDownloadLoop p)) - withDownload env0 $ newPeerThread p runPeer - - | not auth -> do - pure () - - | otherwise -> pure () + let withAllStuff = withPeerM e . withDownload env0 void $ liftIO $ async $ forever $ withPeerM e do pause @'Seconds 30 @@ -475,53 +416,117 @@ blockDownloadLoop env0 = do pinfo <- fetch True npi (PeerInfoKey p) id updatePeerInfo False pinfo - -- TODO: peer info loop - void $ liftIO $ async $ forever $ withPeerM e $ do - pause @'Seconds 10 - pee <- knownPeers @e pl - npi <- newPeerInfo - - debug $ "known peers" <+> pretty pee - - for_ pee $ \p -> do - pinfo <- fetch True npi (PeerInfoKey p) id - burst <- liftIO $ readTVarIO (view peerBurst pinfo) - buM <- liftIO $ readTVarIO (view peerBurstMax pinfo) - errors <- liftIO $ readTVarIO (view peerErrorsPerSec pinfo) - downFails <- liftIO $ readTVarIO (view peerDownloadFail pinfo) - down <- liftIO $ readTVarIO (view peerDownloadedBlk pinfo) - rtt <- liftIO $ medianPeerRTT pinfo <&> fmap realToFrac - - let rttMs = (/1e6) <$> rtt <&> (\x -> showGFloat (Just 2) x "") <&> (<> "ms") - - notice $ "peer" <+> pretty p <+> "burst:" <+> pretty burst - <+> "burst-max:" <+> pretty buM - <+> "errors:" <+> pretty (downFails + errors) - <+> "down:" <+> pretty down - <+> "rtt:" <+> pretty rttMs - pure () - - void $ liftIO $ async $ forever $ withPeerM e $ withDownload env0 do + void $ liftIO $ async $ forever $ withAllStuff do pause @'Seconds 5 -- FIXME: put to defaults -- we need to show download stats wipNum <- asks (view blockInQ) >>= liftIO . readTVarIO <&> HashMap.size - let po = 0 + po <- postponedNum notice $ "maintain blocks wip" <+> pretty wipNum <+> "postponed" <+> pretty po + busyPeers <- liftIO $ newTVarIO (mempty :: HashSet (Peer e)) + released <- liftIO newTQueueIO + + npi <- newPeerInfo + + liftIO $ withAllStuff do + brains <- asks (view downloadBrains) + + fix \next -> do + wipNum <- asks (view blockInQ) >>= liftIO . readTVarIO <&> HashMap.size + + when (wipNum == 0) do + pause @'Seconds 1 + next + + allPips <- lift $ getKnownPeers @e + + onKnownPeers brains allPips + + pips <- flip filterM allPips $ + \p -> liftIO do + busy <- readTVarIO busyPeers <&> HashSet.member p + pure $ not busy + + when (List.null pips) do + void $ liftIO $ race (pause @'Seconds 5) $ do + void $ liftIO $ atomically $ do + p <- readTQueue released + ps <- flushTQueue released + for_ (p:ps) $ \x -> do + modifyTVar busyPeers (HashSet.delete x) + next + + for_ pips $ \p -> do + h0 <- getBlockForDownload p + + -- trace $ "getBlockForDownload" <+> pretty p <+> pretty h0 + + -- FIXME: busyloop-when-no-block-for-peer + maybe1 h0 (pure ()) $ \h -> do + + liftIO $ atomically $ do + modifyTVar busyPeers (HashSet.insert p) + + void $ liftIO $ async $ withAllStuff do + + -- trace $ "start downloading shit" <+> pretty p <+> pretty h + + lift $ onBlockDownloadAttempt brains p h + + pinfo <- lift $ fetch True npi (PeerInfoKey p) id + size' <- blockSize brains p h + + esize <- case size' of + Nothing -> do + doBlockSizeRequest p h + + Just s -> pure (Right (Just s)) + + case esize of + Left{} -> pure () + Right Nothing -> do + let downMiss = view peerDownloadMiss pinfo + liftIO $ atomically $ modifyTVar downMiss succ + + Right (Just size) -> do + -- trace $ "BLOCK SIZE" <+> pretty p <+> pretty h <+> pretty size + let downFail = view peerDownloadFail pinfo + let downBlk = view peerDownloadedBlk pinfo + + r <- liftIO $ race ( pause defBlockWaitMax ) + $ withAllStuff + $ downloadFromWithPeer p size h + case r of + Left{} -> do + liftIO $ atomically $ modifyTVar downFail succ + failedDownload p h + + Right{} -> do + onBlockDownloaded brains p h + processBlock h + liftIO $ atomically do + writeTVar downFail 0 + modifyTVar downBlk succ + + -- trace $ "exit download thread" <+> pretty p <+> pretty h + liftIO $ atomically $ writeTQueue released p + + next + withDownload env0 do mapM_ processBlock blks proposed <- asks (view blockProposed) - forever do + void $ liftIO $ async $ forever do pause @'Seconds 20 - debug "block download loop. does not do anything" + -- debug "block download loop. does not do anything" liftIO $ Cache.purgeExpired proposed @@ -551,14 +556,14 @@ postponedLoop env0 = do void $ liftIO $ async $ withPeerM e $ withDownload env0 do forever do - pause @'Seconds 20 + pause @'Seconds 30 trace "UNPOSTPONE LOOP" po <- asks (view blockPostponedTo) >>= liftIO . Cache.toList for_ po $ \(h, _, expired) -> do when (isJust expired) do unpostponeBlock h -peerDownloadLoop :: forall e m . ( MyPeer e +doBlockSizeRequest :: forall e m . ( MyPeer e , Sessions e (KnownPeer e) m , Request e (BlockInfo e) m , EventListener e (BlockInfo e) m @@ -566,135 +571,37 @@ peerDownloadLoop :: forall e m . ( MyPeer e , HasPeerLocator e m , IsPeerAddr e m , m ~ PeerM e IO - ) => Peer e -> BlockDownloadM e m () -peerDownloadLoop peer = do + ) + => Peer e + -> Hash HbSync + -> BlockDownloadM e m (Either () (Maybe Integer)) - pe <- lift ask - e <- ask +doBlockSizeRequest peer h = do brains <- asks (view downloadBrains) - let doBlockSizeRequest h = do - q <- liftIO newTQueueIO - lift do - subscribe @e (BlockSizeEventKey h) $ \case - BlockSizeEvent (p1,_,s) -> do - when (p1 == peer) do - liftIO $ atomically $ writeTQueue q (Just s) - onBlockSize brains peer h s + q <- liftIO newTQueueIO + lift do + subscribe @e (BlockSizeEventKey h) $ \case + BlockSizeEvent (p1,_,s) -> do + when (p1 == peer) do + liftIO $ atomically $ writeTQueue q (Just s) + onBlockSize brains peer h s - NoBlockEvent{} -> do - -- TODO: ban-block-for-some-seconds - liftIO $ atomically $ writeTQueue q Nothing - pure () + NoBlockEvent{} -> do + -- TODO: ban-block-for-some-seconds + liftIO $ atomically $ writeTQueue q Nothing + pure () - request peer (GetBlockSize @e h) + request peer (GetBlockSize @e h) - liftIO $ race ( pause defBlockInfoTimeout ) - ( atomically $ do - s <- readTQueue q - void $ flushTQueue q - pure s - ) + liftIO $ race ( pause defBlockInfoTimeout ) + ( atomically $ do + s <- readTQueue q + void $ flushTQueue q + pure s + ) - let tryDownload pinfo h size = do - - trace $ "tryDownload" <+> pretty peer <+> pretty h - - here <- isBlockHereCached h - - if here then do - trace $ pretty peer <+> "block" <+> pretty h <+> "is already here" - processBlock h - else do - lift $ onBlockDownloadAttempt brains peer h - let downFail = view peerDownloadFail pinfo - let downBlk = view peerDownloadedBlk pinfo - - r <- liftIO $ race ( pause defBlockWaitMax ) - $ withPeerM pe - $ withDownload e - $ downloadFromWithPeer peer size h - case r of - Left{} -> do - trace $ "FAIL" <+> pretty peer <+> "download block" <+> pretty h - liftIO $ atomically $ modifyTVar downFail succ - failedDownload peer h - - Right{} -> do - trace $ "OK" <+> pretty peer <+> "dowloaded block" <+> pretty h - onBlockDownloaded brains peer h - processBlock h - liftIO $ atomically do - writeTVar downFail 0 - modifyTVar downBlk succ - - let warnExit = warn $ "peer loop exit" <+> pretty peer - -- let stopLoop = none - - idle <- liftIO $ newTVarIO 0 - - fix \next -> do - - let thenNext m = m >> next - - npi <- newPeerInfo - - auth' <- lift $ find (KnownPeerKey peer) id - pinfo <- lift $ fetch True npi (PeerInfoKey peer) id - - let mbauth = (,) <$> auth' <*> pure pinfo - - let noAuth = do - let authNone = if isNothing auth' then "noauth" else "" - warn ( "lost peer auth" <+> pretty peer <+> pretty authNone ) - warnExit - - maybe1 mbauth noAuth $ \_ -> do - - pt' <- getPeerThread peer - - maybe1 pt' warnExit $ \pt -> do - - liftIO $ atomically $ modifyTVar (view peerBlocksWip pt) (max 0 . pred) - - mbh <- getBlockForDownload peer - - case mbh of - Nothing -> thenNext do - idleNum <- liftIO $ atomically $ stateTVar idle $ \x -> (x, succ x) - - when (idleNum > 5) do - trace $ "peer IDLE" <+> pretty peer - liftIO $ atomically $ writeTVar idle 0 - x <- lift $ randomRIO (2.85, 10.47) - pause @'Seconds (realToFrac x) - - Just h -> thenNext do - - liftIO $ atomically $ writeTVar idle 0 - - trace $ "start download block" <+> pretty peer <+> pretty h - - mbSize2 <- blockSize brains peer h - - case mbSize2 of - Just size -> do - trace $ "HAS SIZE:" <+> pretty peer <+> pretty h <+> pretty size - tryDownload pinfo h size - - Nothing -> do - r <- doBlockSizeRequest h - case r of - (Right (Just s)) -> do - tryDownload pinfo h s - pure () - - _ -> pure () - - - warnExit - void $ delPeerThreadData peer -- NOTE: this is an adapter for a ResponseM monad -- because response is working in ResponseM monad (ha!) diff --git a/hbs2-peer/app/Brains.hs b/hbs2-peer/app/Brains.hs index 00f0ac22..dd04a454 100644 --- a/hbs2-peer/app/Brains.hs +++ b/hbs2-peer/app/Brains.hs @@ -164,11 +164,11 @@ instance (Hashable (Peer e), Pretty (Peer e)) => HasBrains e (BasicBrains e) whe commitNow b True -- FIXME: wait-till-really-commited sz <- liftIO $ selectBlockSize b p h - trace $ "BRAINS: onBlockSize" <+> pretty p <+> pretty h <+> pretty sz + -- trace $ "BRAINS: onBlockSize" <+> pretty p <+> pretty h <+> pretty sz pure () onBlockDownloadAttempt b peer h = do - -- trace "BRAINS: onBlockDownloadAttempt" + -- trace $ "BRAINS: onBlockDownloadAttempt" <+> pretty peer <+> pretty h noPeers <- liftIO $ readTVarIO (view brainsPeers b) <&> List.null unless noPeers do let cache = view brainsExpire b @@ -182,7 +182,7 @@ instance (Hashable (Peer e), Pretty (Peer e)) => HasBrains e (BasicBrains e) whe updateOP b $ insertPeer b h p onBlockPostponed b h = do - trace $ "BRAINS: onBlockPostponed" <+> pretty h + -- trace $ "BRAINS: onBlockPostponed" <+> pretty h cleanupPostponed b h claimBlockCameFrom b f t = do @@ -204,6 +204,8 @@ instance (Hashable (Peer e), Pretty (Peer e)) => HasBrains e (BasicBrains e) whe shouldDownloadBlock b p h = do noPeers <- liftIO $ readTVarIO (view brainsPeers b) <&> List.null downs <- liftIO $ readTVarIO (view brainsPostponeDown b) + let doo = HashMap.lookup (p,h) downs & fromMaybe 0 & (<2) + -- trace $ "shouldDownloadBlock" <+> pretty noPeers <+> pretty doo pure $ noPeers || (HashMap.lookup (p,h) downs & fromMaybe 0 & (<2)) advisePeersForBlock b h = do diff --git a/hbs2-peer/app/PeerInfo.hs b/hbs2-peer/app/PeerInfo.hs index caeff75a..9457a3f1 100644 --- a/hbs2-peer/app/PeerInfo.hs +++ b/hbs2-peer/app/PeerInfo.hs @@ -17,18 +17,17 @@ import HBS2.System.Logger.Simple import PeerConfig -import Data.Maybe -import Data.Set qualified as Set -import Data.List qualified as List -import Data.Foldable hiding (find) -import Lens.Micro.Platform -import Control.Concurrent.STM.TVar +import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad -import Control.Concurrent.Async -import System.Random.Shuffle +import Control.Monad.Reader +import Data.Foldable hiding (find) import Data.IntSet (IntSet) -import Prettyprinter +import Data.List qualified as List +import Data.Maybe +import Lens.Micro.Platform +import Numeric (showGFloat) +import System.Random.Shuffle data PeerPingIntervalKey @@ -51,6 +50,7 @@ data PeerInfo e = , _peerPingFailed :: TVar Int , _peerDownloadedBlk :: TVar Int , _peerDownloadFail :: TVar Int + , _peerDownloadMiss :: TVar Int , _peerUsefulness :: TVar Double , _peerRTTBuffer :: TVar [Integer] -- ^ Contains a list of the last few round-trip time (RTT) values, measured in nanoseconds. -- Acts like a circular buffer. @@ -105,6 +105,7 @@ newPeerInfo = liftIO do <*> newTVarIO 0 <*> newTVarIO 0 <*> newTVarIO 0 + <*> newTVarIO 0 <*> newTVarIO [] type instance SessionData e (PeerInfo e) = PeerInfo e @@ -160,10 +161,15 @@ peerPingLoop :: forall e m . ( HasPeerLocator e m , EventListener e (PeerHandshake e) m , Pretty (Peer e) , MonadIO m + , m ~ PeerM e IO ) => PeerConfig -> m () peerPingLoop cfg = do + e <- ask + + pl <- getPeerLocator @e + let pingTime = cfgValue @PeerPingIntervalKey cfg & fromMaybe 30 & realToFrac @@ -178,6 +184,38 @@ peerPingLoop cfg = do -- subscribe @e AnyKnownPeerEventKey $ \(KnownPeerEvent p _) -> do -- liftIO $ atomically $ writeTQueue wake [p] + + -- TODO: peer info loop + void $ liftIO $ async $ forever $ withPeerM e $ do + pause @'Seconds 10 + pee <- knownPeers @e pl + + npi <- newPeerInfo + + debug $ "known peers" <+> pretty pee + + for_ pee $ \p -> do + pinfo <- fetch True npi (PeerInfoKey p) id + burst <- liftIO $ readTVarIO (view peerBurst pinfo) + buM <- liftIO $ readTVarIO (view peerBurstMax pinfo) + errors <- liftIO $ readTVarIO (view peerErrorsPerSec pinfo) + downFails <- liftIO $ readTVarIO (view peerDownloadFail pinfo) + downMiss <- liftIO $ readTVarIO (view peerDownloadMiss pinfo) + down <- liftIO $ readTVarIO (view peerDownloadedBlk pinfo) + rtt <- liftIO $ medianPeerRTT pinfo <&> fmap realToFrac + + let rttMs = (/1e6) <$> rtt <&> (\x -> showGFloat (Just 2) x "") <&> (<> "ms") + + notice $ "peer" <+> pretty p <+> "burst:" <+> pretty burst + <+> "burst-max:" <+> pretty buM + <+> "errors:" <+> pretty (downFails + errors) + <+> "down:" <+> pretty down + <+> "miss:" <+> pretty downMiss + <+> "rtt:" <+> pretty rttMs + pure () + + + forever do -- FIXME: defaults @@ -192,7 +230,6 @@ peerPingLoop cfg = do debug "peerPingLoop" - pl <- getPeerLocator @e pips <- knownPeers @e pl <&> (<> sas) <&> List.nub for_ pips $ \p -> do diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index 3790aebf..f22d9d3d 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -30,13 +30,13 @@ import Control.Monad.Reader import Data.ByteString.Lazy (ByteString) import Data.Cache (Cache) import Data.Cache qualified as Cache +import Data.HashSet (HashSet) import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap import Data.Maybe import Lens.Micro.Platform import Data.Hashable import Type.Reflection -import Numeric (showGFloat) type MyPeer e = ( Eq (Peer e) @@ -135,22 +135,10 @@ data BlockState = makeLenses 'BlockState -newtype PeerTask e = DoDownload (Hash HbSync) - deriving newtype (Pretty) - -data PeerThread e = - PeerThread - { _peerThreadAsync :: Async () - , _peerThreadMailbox :: TQueue (PeerTask e) - , _peerBlocksWip :: TVar Int - } - -makeLenses 'PeerThread data DownloadEnv e = DownloadEnv { _blockInQ :: TVar (HashMap (Hash HbSync) ()) - , _peerThreads :: TVar (HashMap (Peer e) (PeerThread e)) , _blockPostponed :: TVar (HashMap (Hash HbSync) () ) , _blockPostponedTo :: Cache (Hash HbSync) () , _blockDelayTo :: TQueue (Hash HbSync) @@ -164,7 +152,6 @@ makeLenses 'DownloadEnv newDownloadEnv :: (MonadIO m, MyPeer e, HasBrains e brains) => brains -> m (DownloadEnv e) newDownloadEnv brains = liftIO do DownloadEnv <$> newTVarIO mempty - <*> newTVarIO mempty <*> newTVarIO mempty <*> Cache.newCache (Just defBlockBanTime) <*> newTQueueIO @@ -219,11 +206,7 @@ addDownload mbh h = do removeFromWip h else do maybe1 mbh none $ \hp -> claimBlockCameFrom @e brains hp h - postpone <- shouldPostponeBlock @e brains h - if postpone then do - postponeBlock h - else do - liftIO $ atomically $ modifyTVar tinq $ HashMap.insert h () + liftIO $ atomically $ modifyTVar tinq $ HashMap.insert h () postponedNum :: forall e m . (MyPeer e, MonadIO m) => BlockDownloadM e m Int postponedNum = do @@ -249,11 +232,16 @@ postponeBlock h = do tto <- asks (view blockPostponedTo) tinq <- asks (view blockInQ) + liftIO $ do - liftIO $ atomically $ modifyTVar tinq $ HashMap.delete h - already <- atomically $ readTVar po <&> HashMap.member h - unless already do - atomically $ modifyTVar po (HashMap.insert h ()) + postponed <- atomically $ do + already <- readTVar po <&> HashMap.member h + unless already do + modifyTVar tinq $ HashMap.delete h + modifyTVar po (HashMap.insert h ()) + pure $ not already + + when postponed do Cache.insert tto h () onBlockPostponed @e brains h @@ -276,87 +264,6 @@ removeFromWip h = do liftIO $ atomically $ do modifyTVar' tinq (HashMap.delete h) -hasPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m Bool -hasPeerThread p = do - threads <- asks (view peerThreads) - liftIO $ readTVarIO threads <&> HashMap.member p - -getPeerThreads :: (MyPeer e, MonadIO m) => BlockDownloadM e m [(Peer e, PeerThread e)] -getPeerThreads = do - threads <- asks (view peerThreads) - liftIO $ atomically $ readTVar threads <&> HashMap.toList - -getPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m (Maybe (PeerThread e)) -getPeerThread p = do - threads <- asks (view peerThreads) - liftIO $ atomically $ readTVar threads <&> HashMap.lookup p - -getPeerTask :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m (Maybe (PeerTask e)) -getPeerTask p = do - threads <- asks (view peerThreads) - pt' <- liftIO $ atomically $ readTVar threads <&> HashMap.lookup p - maybe1 pt' (pure Nothing) $ \pt -> do - liftIO $ atomically $ readTQueue (view peerThreadMailbox pt) <&> Just - -addPeerTask :: (MyPeer e, MonadIO m) - => Peer e - -> PeerTask e - -> BlockDownloadM e m () -addPeerTask p t = do - trace $ "ADD-PEER-TASK" <+> pretty p <+> pretty t - threads <- asks (view peerThreads) - liftIO $ atomically $ do - pt' <- readTVar threads <&> HashMap.lookup p - maybe1 pt' none $ \pt -> do - writeTQueue (view peerThreadMailbox pt) t - modifyTVar (view peerBlocksWip pt) succ - -delPeerThreadData :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m (Maybe (PeerThread e)) -delPeerThreadData p = do - debug $ "delPeerThreadData" <+> pretty p - threads <- asks (view peerThreads) - liftIO $ atomically $ stateTVar threads (\x -> let t = HashMap.lookup p x - in (t, HashMap.delete p x)) - -killPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m () -killPeerThread p = do - debug $ "delPeerThread" <+> pretty p - pt <- delPeerThreadData p - maybe1 pt (pure ()) $ liftIO . cancel . view peerThreadAsync - -newPeerThread :: ( MyPeer e - , MonadIO m - , Sessions e (PeerInfo e) m - -- , Sessions e (PeerInfo e) (BlockDownloadM e m) - ) - => Peer e - -> Async () - -> BlockDownloadM e m () - -newPeerThread p m = do - - npi <- newPeerInfo - void $ lift $ fetch True npi (PeerInfoKey p) id - - q <- liftIO newTQueueIO - tnum <- liftIO $ newTVarIO 0 - let pt = PeerThread m q tnum - threads <- asks (view peerThreads) - liftIO $ atomically $ modifyTVar threads $ HashMap.insert p pt - -getPeerTaskWip :: ( MyPeer e - , MonadIO m - -- , Sessions e (PeerInfo e) m - -- , Sessions e (PeerInfo e) (BlockDownloadM e m) - ) - => Peer e - -> BlockDownloadM e m Int -getPeerTaskWip p = do - threads <- asks (view peerThreads) - pt' <- liftIO $ atomically $ readTVar threads <&> HashMap.lookup p - maybe1 pt' (pure 0) $ \pt -> do - liftIO $ readTVarIO (view peerBlocksWip pt) - failedDownload :: forall e m . ( MyPeer e , MonadIO m , HasPeer e @@ -386,4 +293,19 @@ forKnownPeers m = do pd' <- find (KnownPeerKey p) id maybe1 pd' (pure ()) (m p) +getKnownPeers :: forall e m . ( MonadIO m + , HasPeerLocator e m + , Sessions e (KnownPeer e) m + , HasPeer e + ) + => m [Peer e] + +getKnownPeers = do + pl <- getPeerLocator @e + pips <- knownPeers @e pl + r <- forM pips $ \p -> do + pd' <- find (KnownPeerKey p) id + maybe1 pd' (pure mempty) (const $ pure [p]) + pure $ mconcat r +