This commit is contained in:
voidlizard 2024-11-09 09:54:32 +03:00
parent b25305db5c
commit 1e591bcbf8
1 changed files with 72 additions and 86 deletions

View File

@ -1,6 +1,7 @@
{-# OPTIONS_GHC -fno-warn-orphans #-} {-# OPTIONS_GHC -fno-warn-orphans #-}
{-# Language UndecidableInstances #-} {-# Language UndecidableInstances #-}
{-# Language AllowAmbiguousTypes #-} {-# Language AllowAmbiguousTypes #-}
{-# Language ImplicitParams #-}
module BlockDownloadNew where module BlockDownloadNew where
import HBS2.Prelude.Plated import HBS2.Prelude.Plated
@ -42,6 +43,8 @@ import Control.Monad.Trans.Cont
import Control.Concurrent.STM (flushTQueue,retry) import Control.Concurrent.STM (flushTQueue,retry)
import Data.Map qualified as Map import Data.Map qualified as Map
import Data.Map (Map) import Data.Map (Map)
import Data.HashPSQ qualified as HPSQ
import Data.HashPSQ ( HashPSQ(..) )
import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HM import Data.HashMap.Strict qualified as HM
import Data.HashSet (HashSet) import Data.HashSet (HashSet)
@ -462,6 +465,8 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
lift $ request peer req lift $ request peer req
-- pause @'MilliSeconds ( rtt * 0.01 )
t0 <- getTimeCoarse t0 <- getTimeCoarse
let watchdog = fix \next -> do let watchdog = fix \next -> do
@ -472,11 +477,10 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
either (const none) (const next) r either (const none) (const next) r
r <- liftIO $ race watchdog do r <- liftIO $ race watchdog do
atomically do
atomically do pieces <- readTVar _sBlockChunks2
pieces <- readTVar _sBlockChunks2 let done = and [ IntMap.member j pieces | j <- [i .. i + chunkN-1] ]
let done = and [ IntMap.member j pieces | j <- [i .. i + chunkN-1] ] unless done retry -- $ pause @'MilliSeconds ( 0.25 * rtt ) >> next
unless done retry
atomically $ flushTQueue chuQ atomically $ flushTQueue chuQ
@ -527,6 +531,14 @@ data S1 =
| S1QuerySize (Hash HbSync) | S1QuerySize (Hash HbSync)
| S1CheckMissed (Hash HbSync) | S1CheckMissed (Hash HbSync)
data S2 =
S2Init (Hash HbSync)
| S2CheckBlock1 (Hash HbSync) ByteString
| S2CheckBlock2 (Hash HbSync)
| S2FetchBlock (Hash HbSync)
| S2Exit
downloadDispatcher :: forall e m . ( e ~ L4Proto downloadDispatcher :: forall e m . ( e ~ L4Proto
, MonadUnliftIO m , MonadUnliftIO m
) )
@ -535,12 +547,16 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto
-> m () -> m ()
downloadDispatcher brains env = flip runContT pure do downloadDispatcher brains env = flip runContT pure do
let t0 = 10.00
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()) ) pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()) )
down <- newTVarIO ( mempty :: HashMap (Peer e) (HashMap (Hash HbSync) Integer) )
use <- newTVarIO ( mempty :: HashMap (Hash HbSync) Integer )
rq <- newTQueueIO rq <- newTQueueIO
seen <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef Int TimeSpec )
blkQ <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef Int NominalDiffTime )
sto <- withPeerM env getStorage sto <- withPeerM env getStorage
work <- newTQueueIO work <- newTQueueIO
@ -554,36 +570,58 @@ downloadDispatcher brains env = flip runContT pure do
liftIO $ withPeerM env do liftIO $ withPeerM env do
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
debug $ green "Download request" <+> pretty h now <- getTimeCoarse
let w = do new <- atomically do
-- here <- hasBlock sto h <&> isJust already <- readTVar seen <&> HPSQ.member (HashRef h)
-- let hs = if not here then [h] else mempty if already then do
-- missed <- findMissedBlocks sto (HashRef h) pure False
-- for_ ( hs <> fmap coerce missed ) $ \hx -> do else do
atomically $ writeTQueue rq h modifyTVar seen ( HPSQ.insert (HashRef h) 1 now )
-- pause @'Seconds 0.01 modifyTVar blkQ ( HPSQ.insert (HashRef h) 1 t0 )
pure True
when new do
debug $ green "New download request" <+> pretty h
atomically $ writeTQueue work w let missChk = readTVarIO blkQ <&> fmap tr . HPSQ.toList
where tr (k,_,v) = (k, realToFrac v)
ContT $ withAsync (manageThreads pts down use) let shiftPrio = \case
ContT $ withAsync (sizeQueryLoop pts rq down) Nothing -> ((), Nothing)
Just (p,v) -> ((), Just (succ p, v * 1.10 ))
ContT $ withAsync $ forever do
polling (Polling 1 1) missChk $ \h -> do
debug $ blue "CHECK MISSED BLOCKS" <+> pretty h
missed <- findMissedBlocks sto h
here <- hasBlock sto (coerce h) <&> isJust
atomically do
for_ missed $ \hm -> modifyTVar blkQ (HPSQ.insert (coerce hm) 2 t0)
if here then do
modifyTVar blkQ (HPSQ.delete (coerce h))
else
modifyTVar blkQ ( snd . HPSQ.alter shiftPrio h )
ContT $ withAsync (manageThreads pts)
-- ContT $ withAsync (sizeQueryLoop pts rq down)
forever do forever do
pause @'Seconds 10 pause @'Seconds 10
size <- atomically $ readTVar blkQ <&> HPSQ.size
debug $ yellow $ "I'm download dispatcher" debug $ yellow $ "I'm download dispatcher"
u <- readTVarIO use <&> length . HM.keys debug $ yellow $ "wip:" <+> pretty size
d <- readTVarIO down <&> HS.size . HS.unions . fmap HM.keysSet . HM.elems
debug $ yellow $ "wip:" <+> pretty d
where where
manageThreads pts down use = withPeerM env $ forever do manageThreads pts = withPeerM env $ forever do
pips <- getKnownPeers @e <&> HS.fromList pips <- getKnownPeers @e <&> HS.fromList
for_ pips $ \p -> do for_ pips $ \p -> do
here <- readTVarIO pts <&> HM.member p here <- readTVarIO pts <&> HM.member p
unless here do unless here do
a <- async $ peerDownloadLoop env p down use a <- async $ peerDownloadLoop env p
atomically $ modifyTVar pts (HM.insert p a) atomically $ modifyTVar pts (HM.insert p a)
debug $ "downloadDispatcher: knownPeer" <+> yellow (pretty p) debug $ "downloadDispatcher: knownPeer" <+> yellow (pretty p)
@ -609,73 +647,21 @@ downloadDispatcher brains env = flip runContT pure do
pause @'Seconds 5 pause @'Seconds 5
sizeQueryLoop pts rq down = flip runContT pure do peerDownloadLoop env p = flip runContT pure do
sto <- withPeerM env getStorage
wip <- newTVarIO ( mempty :: HashMap (Hash HbSync) NominalDiffTime )
void $ ContT $ withAsync $ replicateM_ 4 do sto <- withPeerM env getStorage
flip fix S1Init $ \next -> \case
S1Init -> do
w <- atomically $ readTQueue rq bm <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10
here <- hasBlock sto w <&> isJust
if not here then -- let blokz = atomically do
next (S1QuerySize w) -- d <- readTVar down <&> fromMaybe mempty . HM.lookup p
else -- pure $ HM.toList d
next (S1CheckMissed w)
S1QuerySize h -> do -- void $ ContT $ withAsync $ polling (Polling 1 1) blokz $ \h0 -> do
debug $ "QUERY SIZE1" <+> pretty h -- debug $ "POLL BLOCK DOWNLOAD" <+> pretty h0
atomically $ modifyTVar wip (HM.insert h 10)
next S1Init
S1CheckMissed h -> do forever do
-- debug $ "CHECK MISSED!" <+> pretty h pause @'Seconds 10
next S1Init debug $ yellow "Peer download loop" <+> pretty p
void $ ContT $ withAsync $ forever do
pause @'Seconds 10
let blkz = readTVarIO wip <&> HM.toList
polling (Polling 1 1) blkz $ \h -> liftIO do
pips <- readTVarIO pts <&> HM.keys
forConcurrently_ pips $ \p -> do
debug $ "QUERY SIZE" <+> pretty h <+> pretty p
r <- queryBlockSizeFromPeer brains env h p
case r of
Right (Just size) -> do
atomically do
modifyTVar wip (HM.delete h)
modifyTVar down (HM.insertWith (<>) p (HM.singleton h size))
Right Nothing -> do
atomically $ modifyTVar wip (HM.adjust ((*1.10).(+60)) h)
Left{} -> do
atomically $ modifyTVar wip (HM.adjust (*1.05) h)
forever do
pause @'Seconds 10
peerDownloadLoop env p down use = forever do
debug $ "Peer download loop" <+> green (pretty p)
hh <- atomically do
u <- readTVar use
q <- readTVar down <&> HM.toList . fromMaybe mempty . HM.lookup p
let blk = headMay [ (k,v) | (k, v) <- q, fromMaybe 0 (HM.lookup k u) == 0 ]
case blk of
Just (h,size) -> do
modifyTVar use (HM.insertWith (+) h 1)
pure (h,size)
Nothing -> retry
debug $ red "START TO DOWNLOAD" <+> pretty hh <+> "FROM" <+> pretty p