fixed AR3Ppzm1E2

This commit is contained in:
Dmitry Zuikov 2023-02-22 17:58:56 +03:00
parent b21686d7e8
commit 3cec2f711f
4 changed files with 52 additions and 6 deletions

View File

@ -202,3 +202,5 @@ fixme-set "assigned" "fastpok" "AkbZFmqRHw"
fixme-set "workflow" "wip" "BZjzN7BjQ4" fixme-set "workflow" "wip" "BZjzN7BjQ4"
fixme-set "assigned" "voidlizard" "BZjzN7BjQ4" fixme-set "assigned" "voidlizard" "BZjzN7BjQ4"
fixme-set "assigned" "ivanovs" "4ZMqvoTMY3" fixme-set "assigned" "ivanovs" "4ZMqvoTMY3"
fixme-set "assigned" "voidlizard" "AR3Ppzm1E2"
fixme-set "workflow" "test" "AR3Ppzm1E2"

View File

@ -39,6 +39,7 @@ import Data.HashMap.Strict qualified as HashMap
import Data.IntMap (IntMap) import Data.IntMap (IntMap)
import Data.IntMap qualified as IntMap import Data.IntMap qualified as IntMap
import Data.IntSet qualified as IntSet import Data.IntSet qualified as IntSet
import Data.List qualified as List
import Data.Maybe import Data.Maybe
import Data.Set qualified as Set import Data.Set qualified as Set
import Data.Set (Set) import Data.Set (Set)
@ -401,6 +402,31 @@ blockDownloadLoop env0 = do
pl <- getPeerLocator @e pl <- getPeerLocator @e
void $ liftIO $ async $ forever $ withPeerM e $ withDownload env0 do
pause @'Seconds 60
debug "I'm peer thread sweeping thread"
known <- knownPeers @e pl
peers' <- forM known $ \p -> do
auth <- lift $ find (KnownPeerKey p) id <&> isJust
if auth then
pure [(p,())]
else
pure mempty
let auth = HashMap.fromList (mconcat peers')
pts <- asks (view peerThreads)
r <- liftIO $ atomically $ stateTVar pts $ \x ->
let items = HashMap.toList x
in let (alive,dead) = List.partition (\(k,_) -> HashMap.member k auth ) items
in (dead, HashMap.fromList alive)
debug $ "peers to delete" <+> pretty (length r)
for_ r $ delPeerThread . fst
void $ liftIO $ async $ forever $ withPeerM e do void $ liftIO $ async $ forever $ withPeerM e do
pause @'Seconds 5 pause @'Seconds 5
@ -410,6 +436,7 @@ blockDownloadLoop env0 = do
for_ pee $ \p -> do for_ pee $ \p -> do
pinfo' <- find (PeerInfoKey p) id pinfo' <- find (PeerInfoKey p) id
auth <- find (KnownPeerKey p) id <&> isJust
maybe1 pinfo' none $ \pinfo -> do maybe1 pinfo' none $ \pinfo -> do
fails <- liftIO $ readTVarIO (view peerDownloadFail pinfo) fails <- liftIO $ readTVarIO (view peerDownloadFail pinfo)
@ -419,12 +446,17 @@ blockDownloadLoop env0 = do
here <- withDownload env0 $ hasPeerThread p here <- withDownload env0 $ hasPeerThread p
unless here do if | not here && auth -> do
debug $ "peer" <+> pretty p <+> "does not have a thread"
debug $ "peer" <+> pretty p <+> "does not have a thread"
runPeer <- liftIO $ async $ liftIO (withPeerM e $ withDownload env0 (peerDownloadLoop p)) runPeer <- liftIO $ async $ liftIO (withPeerM e $ withDownload env0 (peerDownloadLoop p))
withDownload env0 $ newPeerThread p runPeer withDownload env0 $ newPeerThread p runPeer
| here && not auth -> do
pure () -- remove thread
| otherwise -> pure ()
void $ liftIO $ async $ forever $ withPeerM e do void $ liftIO $ async $ forever $ withPeerM e do
pause @'Seconds 30 pause @'Seconds 30

View File

@ -154,7 +154,7 @@ peerPingLoop = do
fnum <- liftIO $ readTVarIO pfails fnum <- liftIO $ readTVarIO pfails
fdown <- liftIO $ readTVarIO pdownfails fdown <- liftIO $ readTVarIO pdownfails
when (fnum > 10) do -- FIXME: hardcode! when (fnum > 2) do -- FIXME: hardcode!
warn $ "removing peer" <+> pretty p <+> "for not responding to our pings" warn $ "removing peer" <+> pretty p <+> "for not responding to our pings"
delPeers pl [p] delPeers pl [p]
expire (PeerInfoKey p) expire (PeerInfoKey p)

View File

@ -88,6 +88,8 @@ data PeerThread e =
, _peerThreadMailbox :: TQueue (PeerTask e) , _peerThreadMailbox :: TQueue (PeerTask e)
} }
makeLenses 'PeerThread
data DownloadEnv e = data DownloadEnv e =
DownloadEnv DownloadEnv
{ _downloadQ :: TQueue (Hash HbSync) { _downloadQ :: TQueue (Hash HbSync)
@ -222,6 +224,16 @@ hasPeerThread p = do
threads <- asks (view peerThreads) threads <- asks (view peerThreads)
liftIO $ readTVarIO threads <&> HashMap.member p liftIO $ readTVarIO threads <&> HashMap.member p
delPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m ()
delPeerThread p = do
debug $ "delPeerThread" <+> pretty p
threads <- asks (view peerThreads)
pt <- liftIO $ atomically $ stateTVar threads (\x -> let t = HashMap.lookup p x
in (t, HashMap.delete p x))
maybe1 pt (pure ()) $ liftIO . cancel . view peerThreadAsync
newPeerThread :: (MyPeer e, MonadIO m) => Peer e -> Async () -> BlockDownloadM e m () newPeerThread :: (MyPeer e, MonadIO m) => Peer e -> Async () -> BlockDownloadM e m ()
newPeerThread p m = do newPeerThread p m = do
q <- liftIO newTQueueIO q <- liftIO newTQueueIO