This commit is contained in:
voidlizard 2024-11-07 13:53:43 +03:00
parent c762b48bb8
commit ec9e7230cc
3 changed files with 139 additions and 12 deletions

View File

@ -3,7 +3,7 @@
module HBS2.Clock
( module HBS2.Clock
, module System.Clock
, POSIXTime, getPOSIXTime
, POSIXTime, getPOSIXTime, NominalDiffTime
)where
import Data.Functor

View File

@ -4,6 +4,7 @@
module BlockDownloadNew where
import HBS2.Prelude.Plated
import HBS2.Clock
import HBS2.OrDie
import HBS2.Data.Detect
import HBS2.Hash
@ -521,34 +522,160 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
data S1 =
S1Init
| S1QuerySize (Hash HbSync)
| S1CheckMissed (Hash HbSync)
downloadDispatcher :: forall e m . ( e ~ L4Proto
, MonadUnliftIO m
)
=> PeerEnv e
=> SomeBrains e
-> PeerEnv e
-> m ()
downloadDispatcher env = flip runContT pure do
downloadDispatcher brains env = flip runContT pure do
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()) )
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()) )
down <- newTVarIO ( mempty :: HashMap (Peer e) (HashMap (Hash HbSync) Integer) )
use <- newTVarIO ( mempty :: HashMap (Hash HbSync) Integer )
rq <- newTQueueIO
sto <- withPeerM env getStorage
work <- newTQueueIO
ContT $ bracket none $ const do
readTVarIO pts >>= mapM_ cancel
atomically $ writeTVar pts mempty
ContT $ withAsync $ forever do
join $ atomically (readTQueue work)
liftIO $ withPeerM env do
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
debug $ green "Download request" <+> pretty h
let w = do
-- here <- hasBlock sto h <&> isJust
-- let hs = if not here then [h] else mempty
-- missed <- findMissedBlocks sto (HashRef h)
-- for_ ( hs <> fmap coerce missed ) $ \hx -> do
atomically $ writeTQueue rq h
-- pause @'Seconds 0.01
pause @'Seconds 1
atomically $ writeTQueue work w
ContT $ withAsync $ withPeerM env $ forever do
pips <- getKnownPeers @e
for_ pips $ \p -> do
debug $ "downloadDispatcher: knownPeer" <+> yellow (pretty p)
pause @'Seconds 5
ContT $ withAsync (manageThreads pts down use)
ContT $ withAsync (sizeQueryLoop pts rq down)
forever do
pause @'Seconds 10
debug $ yellow $ "I'm download dispatcher"
u <- readTVarIO use <&> length . HM.keys
d <- readTVarIO down <&> HS.size . HS.unions . fmap HM.keysSet . HM.elems
debug $ yellow $ "wip:" <+> pretty d
where
manageThreads pts down use = withPeerM env $ forever do
pips <- getKnownPeers @e <&> HS.fromList
for_ pips $ \p -> do
here <- readTVarIO pts <&> HM.member p
unless here do
a <- async $ peerDownloadLoop env p down use
atomically $ modifyTVar pts (HM.insert p a)
debug $ "downloadDispatcher: knownPeer" <+> yellow (pretty p)
dead <- atomically do
total <- readTVar pts <&> HM.toList
what <- for total $ \(p,a) -> do
let pipExist = HS.member p pips
stillAlive <- pollSTM a <&> isNothing
if pipExist && stillAlive then do
pure $ Right (p,a)
else
pure $ Left (p,a)
writeTVar pts (HM.fromList (rights what))
pure $ lefts what
for_ dead $ \(p,a) -> do
cancel a
debug $ red "terminating peer loop" <+> pretty p
pause @'Seconds 5
sizeQueryLoop pts rq down = flip runContT pure do
sto <- withPeerM env getStorage
wip <- newTVarIO ( mempty :: HashMap (Hash HbSync) NominalDiffTime )
void $ ContT $ withAsync $ replicateM_ 4 do
flip fix S1Init $ \next -> \case
S1Init -> do
w <- atomically $ readTQueue rq
here <- hasBlock sto w <&> isJust
if not here then
next (S1QuerySize w)
else
next (S1CheckMissed w)
S1QuerySize h -> do
debug $ "QUERY SIZE1" <+> pretty h
atomically $ modifyTVar wip (HM.insert h 10)
next S1Init
S1CheckMissed h -> do
-- debug $ "CHECK MISSED!" <+> pretty h
next S1Init
void $ ContT $ withAsync $ forever do
pause @'Seconds 10
let blkz = readTVarIO wip <&> HM.toList
polling (Polling 1 1) blkz $ \h -> liftIO do
pips <- readTVarIO pts <&> HM.keys
forConcurrently_ pips $ \p -> do
debug $ "QUERY SIZE" <+> pretty h <+> pretty p
r <- queryBlockSizeFromPeer brains env h p
case r of
Right (Just size) -> do
atomically do
modifyTVar wip (HM.delete h)
modifyTVar down (HM.insertWith (<>) p (HM.singleton h size))
Right Nothing -> do
atomically $ modifyTVar wip (HM.adjust ((*1.10).(+60)) h)
Left{} -> do
atomically $ modifyTVar wip (HM.adjust (*1.05) h)
forever do
pause @'Seconds 10
peerDownloadLoop env p down use = forever do
debug $ "Peer download loop" <+> green (pretty p)
hh <- atomically do
u <- readTVar use
q <- readTVar down <&> HM.toList . fromMaybe mempty . HM.lookup p
let blk = headMay [ (k,v) | (k, v) <- q, fromMaybe 0 (HM.lookup k u) == 0 ]
case blk of
Just (h,size) -> do
modifyTVar use (HM.insertWith (+) h 1)
pure (h,size)
Nothing -> retry
debug $ red "START TO DOWNLOAD" <+> pretty hh <+> "FROM" <+> pretty p

View File

@ -1197,7 +1197,7 @@ runPeer opts = respawnOnError opts $ do
peerThread "pexLoop" (pexLoop @e brains tcp)
-- FIXME: new-download-loop
peerThread "downloadDispatcher" (downloadDispatcher env)
peerThread "downloadDispatcher" (downloadDispatcher (SomeBrains brains) env)
peerThread "fillPeerMeta" (fillPeerMeta tcp tcpProbeWait)