new-udp-download-sequence

This commit is contained in:
Dmitry Zuikov 2023-04-05 08:59:23 +03:00
parent f035827731
commit 7ed6116c45
6 changed files with 235 additions and 363 deletions

View File

@ -1,3 +1,7 @@
(fixme-set "workflow" "test" "6kx1sdj7ej")
(fixme-set "assigned" "voidlizard" "6kx1sdj7ej")
fixme-del "2N9TakVmJZ"
fixme-del "A7YuQZdSy9"
(fixme-set "workflow" "test" "7naMmLv2Fn")
fixme-del "7GUKGpHTpV"
fixme-del "4DzcYjazuz"

View File

@ -13,7 +13,7 @@ defMessageQueueSize :: Integral a => a
defMessageQueueSize = 65536*10
defBurst :: Integral a => a
defBurst = 2
defBurst = 4
defBurstMax :: Integral a => a
defBurstMax = 64
@ -64,7 +64,7 @@ defBlockPostponeTime :: TimeSpec
defBlockPostponeTime = toTimeSpec ( 45 :: Timeout 'Seconds)
defBlockBanTimeSec :: Timeout 'Seconds
defBlockBanTimeSec = 30 :: Timeout 'Seconds
defBlockBanTimeSec = 60 :: Timeout 'Seconds
defBlockWipTimeout :: TimeSpec
defBlockWipTimeout = defCookieTimeout

View File

@ -27,12 +27,15 @@ import Brains
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TSem
import Control.Monad.Reader
import Control.Monad.Trans.Maybe
import Data.ByteString.Lazy (ByteString)
import Data.Cache qualified as Cache
import Data.Foldable hiding (find)
import Data.HashMap.Strict qualified as HashMap
import Data.HashSet (HashSet)
import Data.HashSet qualified as HashSet
import Data.IntMap (IntMap)
import Data.IntMap qualified as IntMap
import Data.IntSet qualified as IntSet
@ -41,9 +44,8 @@ import Data.Maybe
import Lens.Micro.Platform
import System.Random (randomRIO)
import System.Random.Shuffle (shuffleM)
import Numeric (showGFloat)
getBlockForDownload :: forall e m . (MonadIO m, IsPeerAddr e m, MyPeer e)
getBlockForDownload :: forall e m . (MonadIO m, IsPeerAddr e m, MyPeer e, HasStorage m)
=> Peer e
-> BlockDownloadM e m (Maybe (Hash HbSync))
@ -53,42 +55,37 @@ getBlockForDownload peer = do
brains <- asks (view downloadBrains)
prop <- asks (view blockProposed)
inq <- liftIO $ readTVarIO tinq
let size = HashMap.size inq
sto <- lift getStorage
if size == 0 then
inq <- liftIO $ readTVarIO tinq
-- let size = HashMap.size inq
let allBlks = HashMap.keys inq
hs' <- forM allBlks $ \blk -> do
here <- liftIO $ hasBlock sto blk <&> isJust
newOne <- shouldDownloadBlock @e brains peer blk
if not here && newOne then do
pure $ Just blk
else do
po <- shouldPostponeBlock @e brains blk
when po do
postponeBlock blk
pure Nothing
let hs = catMaybes hs'
let size = length hs
if size == 0 then do
pure Nothing
else do
i <- randomRIO (0, size - 1)
let blk = HashMap.keys inq !! i
peers <- advisePeersForBlock @e brains blk
pure $ Just blk
proposed <- liftIO $ Cache.lookup prop (blk, peer) <&> isJust
r <- if | proposed -> do
pure Nothing
| List.null peers -> do
pure $ Just blk
| pa `elem` peers -> do
pure $ Just blk
| otherwise -> do
newOne <- shouldDownloadBlock @e brains peer blk
let chance = if newOne then 1 else 5
lucky <- liftIO $ shuffleM (True : replicate chance False) <&> headDef False
if lucky then
pure $ Just blk
else do
pure Nothing
case r of
Nothing -> none
Just h -> do
liftIO $ Cache.insert prop (h, peer) ()
pure r
processBlock :: forall e m . ( MonadIO m
, HasStorage m
@ -383,6 +380,7 @@ blockDownloadLoop :: forall e m . ( m ~ PeerM e IO
, Block ByteString ~ ByteString
, PeerMessaging e
, IsPeerAddr e m
, HasPeerLocator e m
)
=> DownloadEnv e -> m ()
blockDownloadLoop env0 = do
@ -395,64 +393,7 @@ blockDownloadLoop env0 = do
pause @'Seconds 3.81
void $ liftIO $ async $ forever $ withPeerM e $ withDownload env0 do
pause @'Seconds 10
debug "I'm peer thread sweeping thread"
known <- knownPeers @e pl
peers' <- forM known $ \p -> do
auth <- lift $ find (KnownPeerKey p) id <&> isJust
pinfo <- lift $ find (PeerInfoKey p) id <&> isJust
if auth && pinfo then
pure [(p,())]
else
pure mempty
let auth = HashMap.fromList (mconcat peers')
pts <- asks (view peerThreads)
r <- liftIO $ atomically $ stateTVar pts $ \x ->
let items = HashMap.toList x
in let (alive,dead) = List.partition (\(k,_) -> HashMap.member k auth ) items
in (dead, HashMap.fromList alive)
debug $ "peers to delete" <+> pretty (length r)
for_ r $ killPeerThread . fst
void $ liftIO $ async $ forever $ withPeerM e do
pause @'Seconds 1
-- debug "I'm a peer maintaining thread"
brains <- withDownload env0 $ asks (view downloadBrains)
pee <- knownPeers @e pl
onKnownPeers brains pee
for_ pee $ \p -> do
pinfo' <- find (PeerInfoKey p) id
auth <- find (KnownPeerKey p) id <&> isJust
maybe1 pinfo' none $ \pinfo -> do
fails <- liftIO $ readTVarIO (view peerDownloadFail pinfo)
when (fails >= defDownloadFails) do
trace $ "peer" <+> pretty p <+> "has too many failures:" <+> pretty fails
here <- withDownload env0 $ hasPeerThread p
if | not here && auth -> do
debug $ "peer" <+> pretty p <+> "does not have a thread"
runPeer <- liftIO $ async $ liftIO (withPeerM e $ withDownload env0 (peerDownloadLoop p))
withDownload env0 $ newPeerThread p runPeer
| not auth -> do
pure ()
| otherwise -> pure ()
let withAllStuff = withPeerM e . withDownload env0
void $ liftIO $ async $ forever $ withPeerM e do
pause @'Seconds 30
@ -475,53 +416,117 @@ blockDownloadLoop env0 = do
pinfo <- fetch True npi (PeerInfoKey p) id
updatePeerInfo False pinfo
-- TODO: peer info loop
void $ liftIO $ async $ forever $ withPeerM e $ do
pause @'Seconds 10
pee <- knownPeers @e pl
npi <- newPeerInfo
debug $ "known peers" <+> pretty pee
for_ pee $ \p -> do
pinfo <- fetch True npi (PeerInfoKey p) id
burst <- liftIO $ readTVarIO (view peerBurst pinfo)
buM <- liftIO $ readTVarIO (view peerBurstMax pinfo)
errors <- liftIO $ readTVarIO (view peerErrorsPerSec pinfo)
downFails <- liftIO $ readTVarIO (view peerDownloadFail pinfo)
down <- liftIO $ readTVarIO (view peerDownloadedBlk pinfo)
rtt <- liftIO $ medianPeerRTT pinfo <&> fmap realToFrac
let rttMs = (/1e6) <$> rtt <&> (\x -> showGFloat (Just 2) x "") <&> (<> "ms")
notice $ "peer" <+> pretty p <+> "burst:" <+> pretty burst
<+> "burst-max:" <+> pretty buM
<+> "errors:" <+> pretty (downFails + errors)
<+> "down:" <+> pretty down
<+> "rtt:" <+> pretty rttMs
pure ()
void $ liftIO $ async $ forever $ withPeerM e $ withDownload env0 do
void $ liftIO $ async $ forever $ withAllStuff do
pause @'Seconds 5 -- FIXME: put to defaults
-- we need to show download stats
wipNum <- asks (view blockInQ) >>= liftIO . readTVarIO <&> HashMap.size
let po = 0
po <- postponedNum
notice $ "maintain blocks wip" <+> pretty wipNum
<+> "postponed"
<+> pretty po
busyPeers <- liftIO $ newTVarIO (mempty :: HashSet (Peer e))
released <- liftIO newTQueueIO
npi <- newPeerInfo
liftIO $ withAllStuff do
brains <- asks (view downloadBrains)
fix \next -> do
wipNum <- asks (view blockInQ) >>= liftIO . readTVarIO <&> HashMap.size
when (wipNum == 0) do
pause @'Seconds 1
next
allPips <- lift $ getKnownPeers @e
onKnownPeers brains allPips
pips <- flip filterM allPips $
\p -> liftIO do
busy <- readTVarIO busyPeers <&> HashSet.member p
pure $ not busy
when (List.null pips) do
void $ liftIO $ race (pause @'Seconds 5) $ do
void $ liftIO $ atomically $ do
p <- readTQueue released
ps <- flushTQueue released
for_ (p:ps) $ \x -> do
modifyTVar busyPeers (HashSet.delete x)
next
for_ pips $ \p -> do
h0 <- getBlockForDownload p
-- trace $ "getBlockForDownload" <+> pretty p <+> pretty h0
-- FIXME: busyloop-when-no-block-for-peer
maybe1 h0 (pure ()) $ \h -> do
liftIO $ atomically $ do
modifyTVar busyPeers (HashSet.insert p)
void $ liftIO $ async $ withAllStuff do
-- trace $ "start downloading shit" <+> pretty p <+> pretty h
lift $ onBlockDownloadAttempt brains p h
pinfo <- lift $ fetch True npi (PeerInfoKey p) id
size' <- blockSize brains p h
esize <- case size' of
Nothing -> do
doBlockSizeRequest p h
Just s -> pure (Right (Just s))
case esize of
Left{} -> pure ()
Right Nothing -> do
let downMiss = view peerDownloadMiss pinfo
liftIO $ atomically $ modifyTVar downMiss succ
Right (Just size) -> do
-- trace $ "BLOCK SIZE" <+> pretty p <+> pretty h <+> pretty size
let downFail = view peerDownloadFail pinfo
let downBlk = view peerDownloadedBlk pinfo
r <- liftIO $ race ( pause defBlockWaitMax )
$ withAllStuff
$ downloadFromWithPeer p size h
case r of
Left{} -> do
liftIO $ atomically $ modifyTVar downFail succ
failedDownload p h
Right{} -> do
onBlockDownloaded brains p h
processBlock h
liftIO $ atomically do
writeTVar downFail 0
modifyTVar downBlk succ
-- trace $ "exit download thread" <+> pretty p <+> pretty h
liftIO $ atomically $ writeTQueue released p
next
withDownload env0 do
mapM_ processBlock blks
proposed <- asks (view blockProposed)
forever do
void $ liftIO $ async $ forever do
pause @'Seconds 20
debug "block download loop. does not do anything"
-- debug "block download loop. does not do anything"
liftIO $ Cache.purgeExpired proposed
@ -551,14 +556,14 @@ postponedLoop env0 = do
void $ liftIO $ async $ withPeerM e $ withDownload env0 do
forever do
pause @'Seconds 20
pause @'Seconds 30
trace "UNPOSTPONE LOOP"
po <- asks (view blockPostponedTo) >>= liftIO . Cache.toList
for_ po $ \(h, _, expired) -> do
when (isJust expired) do
unpostponeBlock h
peerDownloadLoop :: forall e m . ( MyPeer e
doBlockSizeRequest :: forall e m . ( MyPeer e
, Sessions e (KnownPeer e) m
, Request e (BlockInfo e) m
, EventListener e (BlockInfo e) m
@ -566,135 +571,37 @@ peerDownloadLoop :: forall e m . ( MyPeer e
, HasPeerLocator e m
, IsPeerAddr e m
, m ~ PeerM e IO
) => Peer e -> BlockDownloadM e m ()
peerDownloadLoop peer = do
)
=> Peer e
-> Hash HbSync
-> BlockDownloadM e m (Either () (Maybe Integer))
pe <- lift ask
e <- ask
doBlockSizeRequest peer h = do
brains <- asks (view downloadBrains)
let doBlockSizeRequest h = do
q <- liftIO newTQueueIO
lift do
subscribe @e (BlockSizeEventKey h) $ \case
BlockSizeEvent (p1,_,s) -> do
when (p1 == peer) do
liftIO $ atomically $ writeTQueue q (Just s)
onBlockSize brains peer h s
q <- liftIO newTQueueIO
lift do
subscribe @e (BlockSizeEventKey h) $ \case
BlockSizeEvent (p1,_,s) -> do
when (p1 == peer) do
liftIO $ atomically $ writeTQueue q (Just s)
onBlockSize brains peer h s
NoBlockEvent{} -> do
-- TODO: ban-block-for-some-seconds
liftIO $ atomically $ writeTQueue q Nothing
pure ()
NoBlockEvent{} -> do
-- TODO: ban-block-for-some-seconds
liftIO $ atomically $ writeTQueue q Nothing
pure ()
request peer (GetBlockSize @e h)
request peer (GetBlockSize @e h)
liftIO $ race ( pause defBlockInfoTimeout )
( atomically $ do
s <- readTQueue q
void $ flushTQueue q
pure s
)
liftIO $ race ( pause defBlockInfoTimeout )
( atomically $ do
s <- readTQueue q
void $ flushTQueue q
pure s
)
let tryDownload pinfo h size = do
trace $ "tryDownload" <+> pretty peer <+> pretty h
here <- isBlockHereCached h
if here then do
trace $ pretty peer <+> "block" <+> pretty h <+> "is already here"
processBlock h
else do
lift $ onBlockDownloadAttempt brains peer h
let downFail = view peerDownloadFail pinfo
let downBlk = view peerDownloadedBlk pinfo
r <- liftIO $ race ( pause defBlockWaitMax )
$ withPeerM pe
$ withDownload e
$ downloadFromWithPeer peer size h
case r of
Left{} -> do
trace $ "FAIL" <+> pretty peer <+> "download block" <+> pretty h
liftIO $ atomically $ modifyTVar downFail succ
failedDownload peer h
Right{} -> do
trace $ "OK" <+> pretty peer <+> "dowloaded block" <+> pretty h
onBlockDownloaded brains peer h
processBlock h
liftIO $ atomically do
writeTVar downFail 0
modifyTVar downBlk succ
let warnExit = warn $ "peer loop exit" <+> pretty peer
-- let stopLoop = none
idle <- liftIO $ newTVarIO 0
fix \next -> do
let thenNext m = m >> next
npi <- newPeerInfo
auth' <- lift $ find (KnownPeerKey peer) id
pinfo <- lift $ fetch True npi (PeerInfoKey peer) id
let mbauth = (,) <$> auth' <*> pure pinfo
let noAuth = do
let authNone = if isNothing auth' then "noauth" else ""
warn ( "lost peer auth" <+> pretty peer <+> pretty authNone )
warnExit
maybe1 mbauth noAuth $ \_ -> do
pt' <- getPeerThread peer
maybe1 pt' warnExit $ \pt -> do
liftIO $ atomically $ modifyTVar (view peerBlocksWip pt) (max 0 . pred)
mbh <- getBlockForDownload peer
case mbh of
Nothing -> thenNext do
idleNum <- liftIO $ atomically $ stateTVar idle $ \x -> (x, succ x)
when (idleNum > 5) do
trace $ "peer IDLE" <+> pretty peer
liftIO $ atomically $ writeTVar idle 0
x <- lift $ randomRIO (2.85, 10.47)
pause @'Seconds (realToFrac x)
Just h -> thenNext do
liftIO $ atomically $ writeTVar idle 0
trace $ "start download block" <+> pretty peer <+> pretty h
mbSize2 <- blockSize brains peer h
case mbSize2 of
Just size -> do
trace $ "HAS SIZE:" <+> pretty peer <+> pretty h <+> pretty size
tryDownload pinfo h size
Nothing -> do
r <- doBlockSizeRequest h
case r of
(Right (Just s)) -> do
tryDownload pinfo h s
pure ()
_ -> pure ()
warnExit
void $ delPeerThreadData peer
-- NOTE: this is an adapter for a ResponseM monad
-- because response is working in ResponseM monad (ha!)

View File

@ -164,11 +164,11 @@ instance (Hashable (Peer e), Pretty (Peer e)) => HasBrains e (BasicBrains e) whe
commitNow b True
-- FIXME: wait-till-really-commited
sz <- liftIO $ selectBlockSize b p h
trace $ "BRAINS: onBlockSize" <+> pretty p <+> pretty h <+> pretty sz
-- trace $ "BRAINS: onBlockSize" <+> pretty p <+> pretty h <+> pretty sz
pure ()
onBlockDownloadAttempt b peer h = do
-- trace "BRAINS: onBlockDownloadAttempt"
-- trace $ "BRAINS: onBlockDownloadAttempt" <+> pretty peer <+> pretty h
noPeers <- liftIO $ readTVarIO (view brainsPeers b) <&> List.null
unless noPeers do
let cache = view brainsExpire b
@ -182,7 +182,7 @@ instance (Hashable (Peer e), Pretty (Peer e)) => HasBrains e (BasicBrains e) whe
updateOP b $ insertPeer b h p
onBlockPostponed b h = do
trace $ "BRAINS: onBlockPostponed" <+> pretty h
-- trace $ "BRAINS: onBlockPostponed" <+> pretty h
cleanupPostponed b h
claimBlockCameFrom b f t = do
@ -204,6 +204,8 @@ instance (Hashable (Peer e), Pretty (Peer e)) => HasBrains e (BasicBrains e) whe
shouldDownloadBlock b p h = do
noPeers <- liftIO $ readTVarIO (view brainsPeers b) <&> List.null
downs <- liftIO $ readTVarIO (view brainsPostponeDown b)
let doo = HashMap.lookup (p,h) downs & fromMaybe 0 & (<2)
-- trace $ "shouldDownloadBlock" <+> pretty noPeers <+> pretty doo
pure $ noPeers || (HashMap.lookup (p,h) downs & fromMaybe 0 & (<2))
advisePeersForBlock b h = do

View File

@ -17,18 +17,17 @@ import HBS2.System.Logger.Simple
import PeerConfig
import Data.Maybe
import Data.Set qualified as Set
import Data.List qualified as List
import Data.Foldable hiding (find)
import Lens.Micro.Platform
import Control.Concurrent.STM.TVar
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad
import Control.Concurrent.Async
import System.Random.Shuffle
import Control.Monad.Reader
import Data.Foldable hiding (find)
import Data.IntSet (IntSet)
import Prettyprinter
import Data.List qualified as List
import Data.Maybe
import Lens.Micro.Platform
import Numeric (showGFloat)
import System.Random.Shuffle
data PeerPingIntervalKey
@ -51,6 +50,7 @@ data PeerInfo e =
, _peerPingFailed :: TVar Int
, _peerDownloadedBlk :: TVar Int
, _peerDownloadFail :: TVar Int
, _peerDownloadMiss :: TVar Int
, _peerUsefulness :: TVar Double
, _peerRTTBuffer :: TVar [Integer] -- ^ Contains a list of the last few round-trip time (RTT) values, measured in nanoseconds.
-- Acts like a circular buffer.
@ -105,6 +105,7 @@ newPeerInfo = liftIO do
<*> newTVarIO 0
<*> newTVarIO 0
<*> newTVarIO 0
<*> newTVarIO 0
<*> newTVarIO []
type instance SessionData e (PeerInfo e) = PeerInfo e
@ -160,10 +161,15 @@ peerPingLoop :: forall e m . ( HasPeerLocator e m
, EventListener e (PeerHandshake e) m
, Pretty (Peer e)
, MonadIO m
, m ~ PeerM e IO
)
=> PeerConfig -> m ()
peerPingLoop cfg = do
e <- ask
pl <- getPeerLocator @e
let pingTime = cfgValue @PeerPingIntervalKey cfg
& fromMaybe 30
& realToFrac
@ -178,6 +184,38 @@ peerPingLoop cfg = do
-- subscribe @e AnyKnownPeerEventKey $ \(KnownPeerEvent p _) -> do
-- liftIO $ atomically $ writeTQueue wake [p]
-- TODO: peer info loop
void $ liftIO $ async $ forever $ withPeerM e $ do
pause @'Seconds 10
pee <- knownPeers @e pl
npi <- newPeerInfo
debug $ "known peers" <+> pretty pee
for_ pee $ \p -> do
pinfo <- fetch True npi (PeerInfoKey p) id
burst <- liftIO $ readTVarIO (view peerBurst pinfo)
buM <- liftIO $ readTVarIO (view peerBurstMax pinfo)
errors <- liftIO $ readTVarIO (view peerErrorsPerSec pinfo)
downFails <- liftIO $ readTVarIO (view peerDownloadFail pinfo)
downMiss <- liftIO $ readTVarIO (view peerDownloadMiss pinfo)
down <- liftIO $ readTVarIO (view peerDownloadedBlk pinfo)
rtt <- liftIO $ medianPeerRTT pinfo <&> fmap realToFrac
let rttMs = (/1e6) <$> rtt <&> (\x -> showGFloat (Just 2) x "") <&> (<> "ms")
notice $ "peer" <+> pretty p <+> "burst:" <+> pretty burst
<+> "burst-max:" <+> pretty buM
<+> "errors:" <+> pretty (downFails + errors)
<+> "down:" <+> pretty down
<+> "miss:" <+> pretty downMiss
<+> "rtt:" <+> pretty rttMs
pure ()
forever do
-- FIXME: defaults
@ -192,7 +230,6 @@ peerPingLoop cfg = do
debug "peerPingLoop"
pl <- getPeerLocator @e
pips <- knownPeers @e pl <&> (<> sas) <&> List.nub
for_ pips $ \p -> do

View File

@ -30,13 +30,13 @@ import Control.Monad.Reader
import Data.ByteString.Lazy (ByteString)
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Data.HashSet (HashSet)
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.Maybe
import Lens.Micro.Platform
import Data.Hashable
import Type.Reflection
import Numeric (showGFloat)
type MyPeer e = ( Eq (Peer e)
@ -135,22 +135,10 @@ data BlockState =
makeLenses 'BlockState
newtype PeerTask e = DoDownload (Hash HbSync)
deriving newtype (Pretty)
data PeerThread e =
PeerThread
{ _peerThreadAsync :: Async ()
, _peerThreadMailbox :: TQueue (PeerTask e)
, _peerBlocksWip :: TVar Int
}
makeLenses 'PeerThread
data DownloadEnv e =
DownloadEnv
{ _blockInQ :: TVar (HashMap (Hash HbSync) ())
, _peerThreads :: TVar (HashMap (Peer e) (PeerThread e))
, _blockPostponed :: TVar (HashMap (Hash HbSync) () )
, _blockPostponedTo :: Cache (Hash HbSync) ()
, _blockDelayTo :: TQueue (Hash HbSync)
@ -164,7 +152,6 @@ makeLenses 'DownloadEnv
newDownloadEnv :: (MonadIO m, MyPeer e, HasBrains e brains) => brains -> m (DownloadEnv e)
newDownloadEnv brains = liftIO do
DownloadEnv <$> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> Cache.newCache (Just defBlockBanTime)
<*> newTQueueIO
@ -219,11 +206,7 @@ addDownload mbh h = do
removeFromWip h
else do
maybe1 mbh none $ \hp -> claimBlockCameFrom @e brains hp h
postpone <- shouldPostponeBlock @e brains h
if postpone then do
postponeBlock h
else do
liftIO $ atomically $ modifyTVar tinq $ HashMap.insert h ()
liftIO $ atomically $ modifyTVar tinq $ HashMap.insert h ()
postponedNum :: forall e m . (MyPeer e, MonadIO m) => BlockDownloadM e m Int
postponedNum = do
@ -249,11 +232,16 @@ postponeBlock h = do
tto <- asks (view blockPostponedTo)
tinq <- asks (view blockInQ)
liftIO $ do
liftIO $ atomically $ modifyTVar tinq $ HashMap.delete h
already <- atomically $ readTVar po <&> HashMap.member h
unless already do
atomically $ modifyTVar po (HashMap.insert h ())
postponed <- atomically $ do
already <- readTVar po <&> HashMap.member h
unless already do
modifyTVar tinq $ HashMap.delete h
modifyTVar po (HashMap.insert h ())
pure $ not already
when postponed do
Cache.insert tto h ()
onBlockPostponed @e brains h
@ -276,87 +264,6 @@ removeFromWip h = do
liftIO $ atomically $ do
modifyTVar' tinq (HashMap.delete h)
hasPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m Bool
hasPeerThread p = do
threads <- asks (view peerThreads)
liftIO $ readTVarIO threads <&> HashMap.member p
getPeerThreads :: (MyPeer e, MonadIO m) => BlockDownloadM e m [(Peer e, PeerThread e)]
getPeerThreads = do
threads <- asks (view peerThreads)
liftIO $ atomically $ readTVar threads <&> HashMap.toList
getPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m (Maybe (PeerThread e))
getPeerThread p = do
threads <- asks (view peerThreads)
liftIO $ atomically $ readTVar threads <&> HashMap.lookup p
getPeerTask :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m (Maybe (PeerTask e))
getPeerTask p = do
threads <- asks (view peerThreads)
pt' <- liftIO $ atomically $ readTVar threads <&> HashMap.lookup p
maybe1 pt' (pure Nothing) $ \pt -> do
liftIO $ atomically $ readTQueue (view peerThreadMailbox pt) <&> Just
addPeerTask :: (MyPeer e, MonadIO m)
=> Peer e
-> PeerTask e
-> BlockDownloadM e m ()
addPeerTask p t = do
trace $ "ADD-PEER-TASK" <+> pretty p <+> pretty t
threads <- asks (view peerThreads)
liftIO $ atomically $ do
pt' <- readTVar threads <&> HashMap.lookup p
maybe1 pt' none $ \pt -> do
writeTQueue (view peerThreadMailbox pt) t
modifyTVar (view peerBlocksWip pt) succ
delPeerThreadData :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m (Maybe (PeerThread e))
delPeerThreadData p = do
debug $ "delPeerThreadData" <+> pretty p
threads <- asks (view peerThreads)
liftIO $ atomically $ stateTVar threads (\x -> let t = HashMap.lookup p x
in (t, HashMap.delete p x))
killPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m ()
killPeerThread p = do
debug $ "delPeerThread" <+> pretty p
pt <- delPeerThreadData p
maybe1 pt (pure ()) $ liftIO . cancel . view peerThreadAsync
newPeerThread :: ( MyPeer e
, MonadIO m
, Sessions e (PeerInfo e) m
-- , Sessions e (PeerInfo e) (BlockDownloadM e m)
)
=> Peer e
-> Async ()
-> BlockDownloadM e m ()
newPeerThread p m = do
npi <- newPeerInfo
void $ lift $ fetch True npi (PeerInfoKey p) id
q <- liftIO newTQueueIO
tnum <- liftIO $ newTVarIO 0
let pt = PeerThread m q tnum
threads <- asks (view peerThreads)
liftIO $ atomically $ modifyTVar threads $ HashMap.insert p pt
getPeerTaskWip :: ( MyPeer e
, MonadIO m
-- , Sessions e (PeerInfo e) m
-- , Sessions e (PeerInfo e) (BlockDownloadM e m)
)
=> Peer e
-> BlockDownloadM e m Int
getPeerTaskWip p = do
threads <- asks (view peerThreads)
pt' <- liftIO $ atomically $ readTVar threads <&> HashMap.lookup p
maybe1 pt' (pure 0) $ \pt -> do
liftIO $ readTVarIO (view peerBlocksWip pt)
failedDownload :: forall e m . ( MyPeer e
, MonadIO m
, HasPeer e
@ -386,4 +293,19 @@ forKnownPeers m = do
pd' <- find (KnownPeerKey p) id
maybe1 pd' (pure ()) (m p)
getKnownPeers :: forall e m . ( MonadIO m
, HasPeerLocator e m
, Sessions e (KnownPeer e) m
, HasPeer e
)
=> m [Peer e]
getKnownPeers = do
pl <- getPeerLocator @e
pips <- knownPeers @e pl
r <- forM pips $ \p -> do
pd' <- find (KnownPeerKey p) id
maybe1 pd' (pure mempty) (const $ pure [p])
pure $ mconcat r