diff --git a/.fixme/log b/.fixme/log index 9cdf90a0..72745f2d 100644 --- a/.fixme/log +++ b/.fixme/log @@ -201,4 +201,6 @@ fixme-set "workflow" "wip" "AkbZFmqRHw" fixme-set "assigned" "fastpok" "AkbZFmqRHw" fixme-set "workflow" "wip" "BZjzN7BjQ4" fixme-set "assigned" "voidlizard" "BZjzN7BjQ4" -fixme-set "assigned" "ivanovs" "4ZMqvoTMY3" \ No newline at end of file +fixme-set "assigned" "ivanovs" "4ZMqvoTMY3" +fixme-set "assigned" "voidlizard" "AR3Ppzm1E2" +fixme-set "workflow" "test" "AR3Ppzm1E2" \ No newline at end of file diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index 365911f9..8ae8e89c 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -39,6 +39,7 @@ import Data.HashMap.Strict qualified as HashMap import Data.IntMap (IntMap) import Data.IntMap qualified as IntMap import Data.IntSet qualified as IntSet +import Data.List qualified as List import Data.Maybe import Data.Set qualified as Set import Data.Set (Set) @@ -401,6 +402,31 @@ blockDownloadLoop env0 = do pl <- getPeerLocator @e + void $ liftIO $ async $ forever $ withPeerM e $ withDownload env0 do + pause @'Seconds 60 + debug "I'm peer thread sweeping thread" + + known <- knownPeers @e pl + + peers' <- forM known $ \p -> do + auth <- lift $ find (KnownPeerKey p) id <&> isJust + if auth then + pure [(p,())] + else + pure mempty + + let auth = HashMap.fromList (mconcat peers') + + pts <- asks (view peerThreads) + + r <- liftIO $ atomically $ stateTVar pts $ \x -> + let items = HashMap.toList x + in let (alive,dead) = List.partition (\(k,_) -> HashMap.member k auth ) items + in (dead, HashMap.fromList alive) + + debug $ "peers to delete" <+> pretty (length r) + + for_ r $ delPeerThread . fst void $ liftIO $ async $ forever $ withPeerM e do pause @'Seconds 5 @@ -410,6 +436,7 @@ blockDownloadLoop env0 = do for_ pee $ \p -> do pinfo' <- find (PeerInfoKey p) id + auth <- find (KnownPeerKey p) id <&> isJust maybe1 pinfo' none $ \pinfo -> do fails <- liftIO $ readTVarIO (view peerDownloadFail pinfo) @@ -419,11 +446,16 @@ blockDownloadLoop env0 = do here <- withDownload env0 $ hasPeerThread p - unless here do - debug $ "peer" <+> pretty p <+> "does not have a thread" + if | not here && auth -> do - runPeer <- liftIO $ async $ liftIO (withPeerM e $ withDownload env0 (peerDownloadLoop p)) - withDownload env0 $ newPeerThread p runPeer + debug $ "peer" <+> pretty p <+> "does not have a thread" + runPeer <- liftIO $ async $ liftIO (withPeerM e $ withDownload env0 (peerDownloadLoop p)) + withDownload env0 $ newPeerThread p runPeer + + | here && not auth -> do + pure () -- remove thread + + | otherwise -> pure () void $ liftIO $ async $ forever $ withPeerM e do pause @'Seconds 30 diff --git a/hbs2-peer/app/PeerInfo.hs b/hbs2-peer/app/PeerInfo.hs index aea9a480..437c65c7 100644 --- a/hbs2-peer/app/PeerInfo.hs +++ b/hbs2-peer/app/PeerInfo.hs @@ -154,7 +154,7 @@ peerPingLoop = do fnum <- liftIO $ readTVarIO pfails fdown <- liftIO $ readTVarIO pdownfails - when (fnum > 10) do -- FIXME: hardcode! + when (fnum > 2) do -- FIXME: hardcode! warn $ "removing peer" <+> pretty p <+> "for not responding to our pings" delPeers pl [p] expire (PeerInfoKey p) diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index 0c30e87e..1fc8fa93 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -88,6 +88,8 @@ data PeerThread e = , _peerThreadMailbox :: TQueue (PeerTask e) } +makeLenses 'PeerThread + data DownloadEnv e = DownloadEnv { _downloadQ :: TQueue (Hash HbSync) @@ -222,6 +224,16 @@ hasPeerThread p = do threads <- asks (view peerThreads) liftIO $ readTVarIO threads <&> HashMap.member p + +delPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m () +delPeerThread p = do + debug $ "delPeerThread" <+> pretty p + threads <- asks (view peerThreads) + pt <- liftIO $ atomically $ stateTVar threads (\x -> let t = HashMap.lookup p x + in (t, HashMap.delete p x)) + + maybe1 pt (pure ()) $ liftIO . cancel . view peerThreadAsync + newPeerThread :: (MyPeer e, MonadIO m) => Peer e -> Async () -> BlockDownloadM e m () newPeerThread p m = do q <- liftIO newTQueueIO