diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 6467429c..12924058 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -422,8 +422,8 @@ runPeerM env f = do void $ liftIO $ stopPipeline de liftIO $ mapM_ cancel (as <> [sw]) -withPeerM :: MonadIO m => PeerEnv e -> PeerM e m a -> m () -withPeerM env action = void $ runReaderT (fromPeerM action) env +withPeerM :: MonadIO m => PeerEnv e -> PeerM e m a -> m a +withPeerM env action = runReaderT (fromPeerM action) env runProto :: forall e m . ( MonadIO m , HasOwnPeer e m diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index f249c312..077f3ab3 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -10,13 +10,13 @@ defMaxDatagramRPC :: Int defMaxDatagramRPC = 4096 defMessageQueueSize :: Integral a => a -defMessageQueueSize = 65536 +defMessageQueueSize = 65536*10 defBurst :: Integral a => a -defBurst = 4 +defBurst = 8 defBurstMax :: Integral a => a -defBurstMax = 32 +defBurstMax = 64 -- defChunkSize :: Integer defChunkSize :: Integral a => a @@ -32,7 +32,7 @@ defPipelineSize :: Int defPipelineSize = 16000 defBlockDownloadQ :: Integral a => a -defBlockDownloadQ = 65536 +defBlockDownloadQ = 65536*10 defBlockDownloadThreshold :: Integral a => a defBlockDownloadThreshold = 2 @@ -61,11 +61,11 @@ defBlockInfoTimeout = 2 -- how much time wait for block from peer? defBlockWaitMax :: Timeout 'Seconds -defBlockWaitMax = 1.5 :: Timeout 'Seconds +defBlockWaitMax = 1 :: Timeout 'Seconds -- how much time wait for block from peer? defChunkWaitMax :: Timeout 'Seconds -defChunkWaitMax = 0.15 :: Timeout 'Seconds +defChunkWaitMax = 0.5 :: Timeout 'Seconds defSweepTimeout :: Timeout 'Seconds defSweepTimeout = 30 -- FIXME: only for debug! @@ -76,5 +76,16 @@ defPeerAnnounceTime = 120 defPexMaxPeers :: Int defPexMaxPeers = 50 +defDownloadFails :: Int +defDownloadFails = 100 + +-- TODO: peer-does-not-have-a-block-ok +-- Это нормально, когда у пира нет блока. +-- У него может не быть каких-то блоков, +-- а какие-то могут быть. Нужно более умный +-- алгоритм, чем бан пира за отсутствие блока. + +defUsefulLimit :: Double +defUsefulLimit = 0.25 diff --git a/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs index f81b7124..4bcd3cee 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs @@ -70,7 +70,7 @@ instance MonadIO m => IsPeerAddr UDP m where data MessagingUDP = MessagingUDP { listenAddr :: SockAddr - , sink :: TBQueue (From UDP, ByteString) + , sink :: TQueue (From UDP, ByteString) , inbox :: TQueue (To UDP, ByteString) , sock :: TVar Socket , mcast :: Bool @@ -91,7 +91,7 @@ newMessagingUDPMulticast s = runMaybeT $ do a <- liftIO $ getSocketName so - liftIO $ MessagingUDP a <$> Q.newTBQueueIO defMessageQueueSize + liftIO $ MessagingUDP a <$> Q0.newTQueueIO <*> Q0.newTQueueIO <*> newTVarIO so <*> pure True @@ -109,7 +109,7 @@ newMessagingUDP reuse saddr = when reuse $ do liftIO $ setSocketOption so ReuseAddr 1 - liftIO $ MessagingUDP a <$> Q.newTBQueueIO defMessageQueueSize + liftIO $ MessagingUDP a <$> Q0.newTQueueIO <*> Q0.newTQueueIO <*> newTVarIO so <*> pure False @@ -119,7 +119,7 @@ newMessagingUDP reuse saddr = so <- liftIO $ socket AF_INET Datagram defaultProtocol sa <- liftIO $ getSocketName so - liftIO $ Just <$> ( MessagingUDP sa <$> Q.newTBQueueIO defMessageQueueSize + liftIO $ Just <$> ( MessagingUDP sa <$> Q0.newTQueueIO <*> Q0.newTQueueIO <*> newTVarIO so <*> pure False @@ -143,7 +143,7 @@ udpWorker env tso = do -- pause ( 10 :: Timeout 'Seconds ) (msg, from) <- recvFrom so defMaxDatagram -- liftIO $ print $ "recv:" <+> pretty (BS.length msg) - liftIO $ atomically $ Q.writeTBQueue (sink env) (From (PeerUDP from), LBS.fromStrict msg) + liftIO $ atomically $ Q0.writeTQueue (sink env) (From (PeerUDP from), LBS.fromStrict msg) sndLoop <- async $ forever $ do pause ( 10 :: Timeout 'Seconds ) @@ -182,5 +182,5 @@ instance Messaging MessagingUDP UDP ByteString where -- (msg, from) <- recvFrom so defMaxDatagram -- pure [(From (PeerUDP from), LBS.fromStrict msg)] - liftIO $ atomically $ Q.readTBQueue (sink bus) <&> L.singleton + liftIO $ atomically $ Q0.readTQueue (sink bus) <&> L.singleton diff --git a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs index 52a21c0c..9abce093 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs @@ -56,7 +56,7 @@ instance HasProtocol UDP (BlockInfo UDP) where -- FIXME: requestMinPeriod-breaks-fast-block-download -- - requestPeriodLim = ReqLimPerMessage 10 + -- requestPeriodLim = ReqLimPerMessage 0.5 instance HasProtocol UDP (BlockChunks UDP) where type instance ProtocolId (BlockChunks UDP) = 2 @@ -79,7 +79,7 @@ instance HasProtocol UDP (PeerHandshake UDP) where decode = either (const Nothing) Just . deserialiseOrFail encode = serialise - requestPeriodLim = ReqLimPerProto 10 + requestPeriodLim = ReqLimPerProto 2 instance HasProtocol UDP (PeerAnnounce UDP) where type instance ProtocolId (PeerAnnounce UDP) = 5 diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index 0b37d67c..365911f9 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -1,5 +1,6 @@ {-# OPTIONS_GHC -fno-warn-orphans #-} {-# Language UndecidableInstances #-} +{-# Language MultiWayIf #-} module BlockDownload where import HBS2.Actors.Peer @@ -48,49 +49,9 @@ import System.Random.Shuffle import Type.Reflection +none :: forall m . Monad m => m () +none = pure () -withFreePeer :: forall e m . - ( MyPeer e - , MonadIO m - , Sessions e (KnownPeer e) m - ) - => Peer e - -> BlockDownloadM e m () - -> BlockDownloadM e m () - -> BlockDownloadM e m () - -withFreePeer p n m = do - busy <- asks (view peerBusy) - - avail <- liftIO $ atomically - $ stateTVar busy $ - \s -> case HashMap.lookup p s of - Nothing -> (True, HashMap.insert p () s) - Just{} -> (False, s) - - auth <- lift $ find (KnownPeerKey p) id <&> isJust - - unless auth do - debug $ "peer " <+> pretty p <+> "not authorized (yet?)" - - if not (avail && auth) - then n - else do - r <- m - liftIO $ atomically $ modifyTVar busy $ HashMap.delete p - pure r - --- NOTE: dangerous! if called in --- wrong place/wrong time, --- it may cause a drastical --- download speed degradation - -dismissPeer :: (MyPeer e, MonadIO m) - => Peer e - -> BlockDownloadM e m () -dismissPeer p = do - busy <- asks (view peerBusy) - liftIO $ atomically $ modifyTVar busy $ HashMap.delete p getBlockForDownload :: MonadIO m => BlockDownloadM e m (Hash HbSync) getBlockForDownload = do @@ -212,20 +173,22 @@ processBlock h = do -- So make sure that this peer really answered to -- GetBlockSize request -downloadFromWithPeer :: forall e m . ( MyPeer e - , MonadIO m - , Request e (BlockInfo e) m - , Request e (BlockChunks e) m - , MonadReader (PeerEnv e ) m - , PeerMessaging e - , HasProtocol e (BlockInfo e) - , EventListener e (BlockInfo e) m - , EventListener e (BlockChunks e) m - , Sessions e (BlockChunks e) m - , Sessions e (PeerInfo e) m - , Block ByteString ~ ByteString - , HasStorage m - ) +type DownloadFromPeerStuff e m = ( MyPeer e + , MonadIO m + , Request e (BlockInfo e) m + , Request e (BlockChunks e) m + , MonadReader (PeerEnv e ) m + , PeerMessaging e + , HasProtocol e (BlockInfo e) + , EventListener e (BlockInfo e) m + , EventListener e (BlockChunks e) m + , Sessions e (BlockChunks e) m + , Sessions e (PeerInfo e) m + , Block ByteString ~ ByteString + , HasStorage m + ) + +downloadFromWithPeer :: forall e m . DownloadFromPeerStuff e m => Peer e -> Integer -> Hash HbSync @@ -258,9 +221,9 @@ downloadFromWithPeer peer thisBkSize h = do let bursts = calcBursts burstSize chunkNums - -- debug $ "bursts: " <+> pretty bursts + let w = max defChunkWaitMax $ realToFrac (toNanoSeconds defBlockWaitMax) / realToFrac (length bursts) / 1e9 * 2 - let burstTime = min defBlockWaitMax (0.8 * realToFrac burstSize * defChunkWaitMax) + let burstTime = realToFrac w :: Timeout 'Seconds -- defChunkWaitMax -- min defBlockWaitMax (0.8 * realToFrac burstSize * defChunkWaitMax) r <- liftIO $ newTVarIO (mempty :: IntMap ByteString) rq <- liftIO newTQueueIO @@ -369,25 +332,26 @@ updatePeerInfo onError pinfo = do t0 <- readTVar (view peerLastWatched pinfo) down <- readTVar (view peerDownloaded pinfo) downLast <- readTVar (view peerDownloadedLast pinfo) + -- downFail <- readTVar (view peerDownloadFail pinfo) + -- downBlk <- readTVar (view peerDownloadedBlk pinfo) let dE = realToFrac $ max 0 (errs - errsLast) let dT = realToFrac (max 1 (toNanoSecs t1 - toNanoSecs t0)) / 1e9 let eps = floor (dE / dT) - let win = min 10 $ defBurstMax - defBurst - 2 + let win = min 10 $ defBurstMax - defBurst - 2 when (down - downLast > 0 || onError) do - (bu1, bus) <- if eps == 0 then do + (bu1, bus) <- if eps == 0 && not onError then do let bmm = fromMaybe defBurstMax buMax - let buN = min bmm (ceiling $ (realToFrac bu * 1.05)) + let buN = min bmm (ceiling (realToFrac bu * 1.05)) pure (buN, trimUp win $ IntSet.insert buN buSet) else do - let buM = headMay $ drop 1 $ IntSet.toDescList buSet + let buM = headMay $ drop 2 $ IntSet.toDescList buSet writeTVar (view peerBurstMax pinfo) buM - -- let s = IntSet.size buSet - let buN = headDef defBurst $ drop 2 $ IntSet.toDescList buSet + let buN = headDef defBurst $ drop 8 $ IntSet.toDescList buSet pure (buN, trimDown win $ IntSet.insert buN buSet) @@ -397,7 +361,7 @@ updatePeerInfo onError pinfo = do writeTVar (view peerBurst pinfo) bu1 writeTVar (view peerBurstSet pinfo) bus writeTVar (view peerDownloadedLast pinfo) down - + -- writeTVar (view peerUsefulness pinfo) usefulN where trimUp n s | IntSet.size s >= n = IntSet.deleteMin s @@ -438,6 +402,29 @@ blockDownloadLoop env0 = do pl <- getPeerLocator @e + void $ liftIO $ async $ forever $ withPeerM e do + pause @'Seconds 5 + debug "I'm a peer maintaining thread" + + pee <- knownPeers @e pl + + for_ pee $ \p -> do + pinfo' <- find (PeerInfoKey p) id + maybe1 pinfo' none $ \pinfo -> do + + fails <- liftIO $ readTVarIO (view peerDownloadFail pinfo) + + when (fails >= defDownloadFails) do + warn $ "peer" <+> pretty p <+> "has too many failures:" <+> pretty fails + + here <- withDownload env0 $ hasPeerThread p + + unless here do + debug $ "peer" <+> pretty p <+> "does not have a thread" + + runPeer <- liftIO $ async $ liftIO (withPeerM e $ withDownload env0 (peerDownloadLoop p)) + withDownload env0 $ newPeerThread p runPeer + void $ liftIO $ async $ forever $ withPeerM e do pause @'Seconds 30 @@ -450,12 +437,11 @@ blockDownloadLoop env0 = do void $ liftIO $ async $ forever $ withPeerM e do - pause @'Seconds 1 + pause @'Seconds 2 pee <- knownPeers @e pl npi <- newPeerInfo - for_ pee $ \p -> do pinfo <- fetch True npi (PeerInfoKey p) id updatePeerInfo False pinfo @@ -476,9 +462,14 @@ blockDownloadLoop env0 = do burst <- liftIO $ readTVarIO (view peerBurst pinfo) buM <- liftIO $ readTVarIO (view peerBurstMax pinfo) errors <- liftIO $ readTVarIO (view peerErrorsPerSec pinfo) + downFails <- liftIO $ readTVarIO (view peerDownloadFail pinfo) + down <- liftIO $ readTVarIO (view peerDownloadedBlk pinfo) + useful <- liftIO $ readTVarIO (view peerUsefulness pinfo) debug $ "peer" <+> pretty p <+> "burst:" <+> pretty burst <+> "burst-max:" <+> pretty buM - <+> "errors:" <+> pretty errors + <+> "errors:" <+> pretty (downFails + errors) + <+> "down:" <+> pretty down + <+> "useful:" <+> pretty useful pure () void $ liftIO $ async $ forever $ withPeerM e $ withDownload env0 do @@ -506,57 +497,89 @@ blockDownloadLoop env0 = do env <- ask - let again h = do - -- debug $ "retrying block: " <+> pretty h - withPeerM e $ withDownload env (addDownload h) - mapM_ processBlock blks fix \next -> do - - withBlockForDownload $ \h -> do - - here <- liftIO $ hasBlock stor h <&> isJust - - unless here do - - peers <- getPeersForBlock h - - when (null peers) $ do - - lift do -- in PeerM - subscribe (BlockSizeEventKey h) $ \(BlockSizeEvent (p1,hx,s)) -> do - withDownload env (addBlockInfo p1 hx s) - - pips <- knownPeers @e pl - for_ pips $ \pip -> do - auth <- find (KnownPeerKey pip) id <&> isJust - - when auth $ request pip (GetBlockSize @e h) -- FIXME: request only known peers - -- move this to peer locator - - p0 <- headMay <$> liftIO (shuffleM peers) -- FIXME: random choice to work faster - - let withAllShit f = withPeerM e $ withDownload env f - - maybe1 p0 (again h) $ \(p1,size) -> do - - st <- getBlockState h - setBlockState h (set bsState Downloading st) - - withFreePeer p1 (again h) $ - liftIO do - re <- race ( pause defBlockWaitMax ) $ - withAllShit $ downloadFromWithPeer p1 size h - - case re of - Left{} -> withAllShit (again h) - Right{} -> withAllShit (processBlock h) - + pause @'Seconds 30 + debug "I'm a download loop. I don't do anything anymore" next +peerDownloadLoop :: forall e m . ( MyPeer e + , Sessions e (KnownPeer e) m + , Request e (BlockInfo e) m + , EventListener e (BlockInfo e) m + , DownloadFromPeerStuff e m + , m ~ PeerM e IO + ) => Peer e -> BlockDownloadM e m () +peerDownloadLoop peer = forever do + + sto <- lift getStorage + + auth <- lift $ find (KnownPeerKey peer) id <&> isJust + pinfo' <- lift $ find (PeerInfoKey peer) id -- (view peerDownloadFail) + + maybe1 pinfo' none $ \pinfo -> do + + let downFail = view peerDownloadFail pinfo + let downBlk = view peerDownloadedBlk pinfo + failNum <- liftIO $ readTVarIO downFail + + -- FIXME: failNum-to-defaults + let notFailed = failNum < defDownloadFails + + -- FIXME: better-avoiding-busyloop + -- unless notFailed do + -- pause @'Seconds 1 + + when (failNum > 5) do + pause @'Seconds defBlockWaitMax + + when auth do + + withBlockForDownload $ \h -> do + e <- lift ask + ee <- ask + st <- getBlockState h + setBlockState h (set bsState Downloading st) + + r1 <- liftIO $ race ( pause defBlockInfoTimeout ) $ withPeerM e do + blksq <- liftIO newTQueueIO + subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (_,_,s)) -> do + liftIO $ atomically $ writeTQueue blksq s + + request peer (GetBlockSize @e h) + + liftIO $ atomically $ readTQueue blksq + + case r1 of + Left{} -> do + liftIO $ atomically $ modifyTVar downFail succ + addDownload h + + Right size -> do + r2 <- liftIO $ race ( pause defBlockWaitMax ) + $ withPeerM e + $ withDownload ee + $ downloadFromWithPeer peer size h + + case r2 of + Left{} -> do + liftIO $ atomically $ modifyTVar downFail succ + addDownload h + +-- Right Nothing -> do +-- liftIO $ atomically $ modifyTVar downFail succ +-- addDownload h + + Right{} -> do + processBlock h + liftIO $ atomically do + writeTVar downFail 0 + modifyTVar downBlk succ + + pure () -- NOTE: this is an adapter for a ResponseM monad -- because response is working in ResponseM monad (ha!) @@ -582,6 +605,12 @@ mkAdapter = do , blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do let cKey = DownloadSessionKey (p,c) + + dodo <- lift $ find cKey (view sBlockChunks) + + unless (isJust dodo) $ do + debug $ "session lost for peer !" <+> pretty p + dwnld <- MaybeT $ find cKey (view sBlockChunks) liftIO $ atomically $ writeTQueue dwnld (n, bs) } diff --git a/hbs2-peer/app/PeerInfo.hs b/hbs2-peer/app/PeerInfo.hs index 24d43fa2..021c3839 100644 --- a/hbs2-peer/app/PeerInfo.hs +++ b/hbs2-peer/app/PeerInfo.hs @@ -40,6 +40,9 @@ data PeerInfo e = , _peerDownloaded :: TVar Int , _peerDownloadedLast :: TVar Int , _peerPingFailed :: TVar Int + , _peerDownloadedBlk :: TVar Int + , _peerDownloadFail :: TVar Int + , _peerUsefulness :: TVar Double } deriving stock (Generic,Typeable) @@ -58,6 +61,9 @@ newPeerInfo = liftIO do <*> newTVarIO 0 <*> newTVarIO 0 <*> newTVarIO 0 + <*> newTVarIO 0 + <*> newTVarIO 0 + <*> newTVarIO 0 type instance SessionData e (PeerInfo e) = PeerInfo e @@ -98,7 +104,7 @@ pexLoop = do for_ peers sendPeerExchangeGet - pause @'Seconds 60 -- FIXME: defaults + pause @'Seconds 60 -- FIXME: defaults peerPingLoop :: forall e m . ( HasPeerLocator e m , HasPeer e @@ -140,13 +146,15 @@ peerPingLoop = do for_ pips $ \p -> do npi <- newPeerInfo pfails <- fetch True npi (PeerInfoKey p) (view peerPingFailed) + pdownfails <- fetch True npi (PeerInfoKey p) (view peerDownloadFail) + liftIO $ atomically $ modifyTVar pfails succ sendPing @e p - pause @'Seconds 1 -- NOTE: it's okay? fnum <- liftIO $ readTVarIO pfails + fdown <- liftIO $ readTVarIO pdownfails - when (fnum > 3) do -- FIXME: hardcode! + when (fnum > 10) do -- FIXME: hardcode! warn $ "removing peer" <+> pretty p <+> "for not responding to our pings" delPeers pl [p] expire (PeerInfoKey p) diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index d44e18a5..b995823d 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -439,12 +439,19 @@ runPeer opts = Exception.handle myException $ do addPeers pl [p] + -- TODO: better-handling-for-new-peers npi <- newPeerInfo - pfails <- fetch True npi (PeerInfoKey p) (view peerPingFailed) - liftIO $ atomically $ writeTVar pfails 0 - debug $ "Got authorized peer!" <+> pretty p - <+> pretty (AsBase58 (view peerSignKey d)) + here <- find @e (KnownPeerKey p) id <&> isJust + + unless here do + pfails <- fetch True npi (PeerInfoKey p) (view peerPingFailed) + pdownfails <- fetch True npi (PeerInfoKey p) (view peerDownloadFail) + liftIO $ atomically $ writeTVar pfails 0 + liftIO $ atomically $ writeTVar pdownfails 0 + + debug $ "Got authorized peer!" <+> pretty p + <+> pretty (AsBase58 (view peerSignKey d)) void $ liftIO $ async $ withPeerM env do pause @'Seconds 1 @@ -535,9 +542,6 @@ runPeer opts = Exception.handle myException $ do | otherwise -> do - debug "announce from a known peer" - debug "preparing to dowload shit" - withDownload denv $ do processBlock h diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index 214ba0a6..0c30e87e 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -15,6 +15,7 @@ import HBS2.Storage import HBS2.System.Logger.Simple import HBS2.Net.Messaging.UDP (UDP) +import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad.Reader import Data.ByteString.Lazy (ByteString) @@ -78,6 +79,15 @@ data BlockState = makeLenses 'BlockState + +data PeerTask e = DoDownload + +data PeerThread e = + PeerThread + { _peerThreadAsync :: Async () + , _peerThreadMailbox :: TQueue (PeerTask e) + } + data DownloadEnv e = DownloadEnv { _downloadQ :: TQueue (Hash HbSync) @@ -87,6 +97,7 @@ data DownloadEnv e = , _blockState :: TVar (HashMap (Hash HbSync) BlockState) , _blockPostponed :: Cache (Hash HbSync) () , _blockInQ :: TVar (HashMap (Hash HbSync) ()) + , _peerThreads :: TVar (HashMap (Peer e) (PeerThread e)) } makeLenses 'DownloadEnv @@ -103,6 +114,7 @@ newDownloadEnv = liftIO do <*> newTVarIO mempty <*> Cache.newCache Nothing <*> newTVarIO mempty + <*> newTVarIO mempty newtype BlockDownloadM e m a = BlockDownloadM { fromBlockDownloadM :: ReaderT (DownloadEnv e) m a } @@ -181,7 +193,7 @@ addDownload h = do tinq <- asks (view blockInQ) - doAdd <- liftIO $ atomically $ stateTVar tinq + doAdd <- do liftIO $ atomically $ stateTVar tinq \hm -> case HashMap.lookup h hm of Nothing -> (True, HashMap.insert h () hm) Just{} -> (False, HashMap.insert h () hm) @@ -205,3 +217,15 @@ removeFromWip h = do liftIO $ Cache.delete po h liftIO $ atomically $ modifyTVar' st (HashMap.delete h) +hasPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m Bool +hasPeerThread p = do + threads <- asks (view peerThreads) + liftIO $ readTVarIO threads <&> HashMap.member p + +newPeerThread :: (MyPeer e, MonadIO m) => Peer e -> Async () -> BlockDownloadM e m () +newPeerThread p m = do + q <- liftIO newTQueueIO + let pt = PeerThread m q + threads <- asks (view peerThreads) + liftIO $ atomically $ modifyTVar threads $ HashMap.insert p pt +