This commit is contained in:
voidlizard 2024-11-09 09:54:32 +03:00
parent ec9e7230cc
commit 8ef0c2a6a4
1 changed files with 72 additions and 86 deletions

View File

@ -1,6 +1,7 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# Language UndecidableInstances #-}
{-# Language AllowAmbiguousTypes #-}
{-# Language ImplicitParams #-}
module BlockDownloadNew where
import HBS2.Prelude.Plated
@ -42,6 +43,8 @@ import Control.Monad.Trans.Cont
import Control.Concurrent.STM (flushTQueue,retry)
import Data.Map qualified as Map
import Data.Map (Map)
import Data.HashPSQ qualified as HPSQ
import Data.HashPSQ ( HashPSQ(..) )
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HM
import Data.HashSet (HashSet)
@ -462,6 +465,8 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
lift $ request peer req
-- pause @'MilliSeconds ( rtt * 0.01 )
t0 <- getTimeCoarse
let watchdog = fix \next -> do
@ -472,11 +477,10 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
either (const none) (const next) r
r <- liftIO $ race watchdog do
atomically do
pieces <- readTVar _sBlockChunks2
let done = and [ IntMap.member j pieces | j <- [i .. i + chunkN-1] ]
unless done retry
atomically do
pieces <- readTVar _sBlockChunks2
let done = and [ IntMap.member j pieces | j <- [i .. i + chunkN-1] ]
unless done retry -- $ pause @'MilliSeconds ( 0.25 * rtt ) >> next
atomically $ flushTQueue chuQ
@ -527,6 +531,14 @@ data S1 =
| S1QuerySize (Hash HbSync)
| S1CheckMissed (Hash HbSync)
data S2 =
S2Init (Hash HbSync)
| S2CheckBlock1 (Hash HbSync) ByteString
| S2CheckBlock2 (Hash HbSync)
| S2FetchBlock (Hash HbSync)
| S2Exit
downloadDispatcher :: forall e m . ( e ~ L4Proto
, MonadUnliftIO m
)
@ -535,12 +547,16 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto
-> m ()
downloadDispatcher brains env = flip runContT pure do
let t0 = 10.00
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()) )
down <- newTVarIO ( mempty :: HashMap (Peer e) (HashMap (Hash HbSync) Integer) )
use <- newTVarIO ( mempty :: HashMap (Hash HbSync) Integer )
rq <- newTQueueIO
seen <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef Int TimeSpec )
blkQ <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef Int NominalDiffTime )
sto <- withPeerM env getStorage
work <- newTQueueIO
@ -554,36 +570,58 @@ downloadDispatcher brains env = flip runContT pure do
liftIO $ withPeerM env do
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
debug $ green "Download request" <+> pretty h
let w = do
-- here <- hasBlock sto h <&> isJust
-- let hs = if not here then [h] else mempty
-- missed <- findMissedBlocks sto (HashRef h)
-- for_ ( hs <> fmap coerce missed ) $ \hx -> do
atomically $ writeTQueue rq h
-- pause @'Seconds 0.01
now <- getTimeCoarse
new <- atomically do
already <- readTVar seen <&> HPSQ.member (HashRef h)
if already then do
pure False
else do
modifyTVar seen ( HPSQ.insert (HashRef h) 1 now )
modifyTVar blkQ ( HPSQ.insert (HashRef h) 1 t0 )
pure True
when new do
debug $ green "New download request" <+> pretty h
atomically $ writeTQueue work w
let missChk = readTVarIO blkQ <&> fmap tr . HPSQ.toList
where tr (k,_,v) = (k, realToFrac v)
ContT $ withAsync (manageThreads pts down use)
ContT $ withAsync (sizeQueryLoop pts rq down)
let shiftPrio = \case
Nothing -> ((), Nothing)
Just (p,v) -> ((), Just (succ p, v * 1.10 ))
ContT $ withAsync $ forever do
polling (Polling 1 1) missChk $ \h -> do
debug $ blue "CHECK MISSED BLOCKS" <+> pretty h
missed <- findMissedBlocks sto h
here <- hasBlock sto (coerce h) <&> isJust
atomically do
for_ missed $ \hm -> modifyTVar blkQ (HPSQ.insert (coerce hm) 2 t0)
if here then do
modifyTVar blkQ (HPSQ.delete (coerce h))
else
modifyTVar blkQ ( snd . HPSQ.alter shiftPrio h )
ContT $ withAsync (manageThreads pts)
-- ContT $ withAsync (sizeQueryLoop pts rq down)
forever do
pause @'Seconds 10
size <- atomically $ readTVar blkQ <&> HPSQ.size
debug $ yellow $ "I'm download dispatcher"
u <- readTVarIO use <&> length . HM.keys
d <- readTVarIO down <&> HS.size . HS.unions . fmap HM.keysSet . HM.elems
debug $ yellow $ "wip:" <+> pretty d
debug $ yellow $ "wip:" <+> pretty size
where
manageThreads pts down use = withPeerM env $ forever do
manageThreads pts = withPeerM env $ forever do
pips <- getKnownPeers @e <&> HS.fromList
for_ pips $ \p -> do
here <- readTVarIO pts <&> HM.member p
unless here do
a <- async $ peerDownloadLoop env p down use
a <- async $ peerDownloadLoop env p
atomically $ modifyTVar pts (HM.insert p a)
debug $ "downloadDispatcher: knownPeer" <+> yellow (pretty p)
@ -609,73 +647,21 @@ downloadDispatcher brains env = flip runContT pure do
pause @'Seconds 5
sizeQueryLoop pts rq down = flip runContT pure do
sto <- withPeerM env getStorage
wip <- newTVarIO ( mempty :: HashMap (Hash HbSync) NominalDiffTime )
peerDownloadLoop env p = flip runContT pure do
void $ ContT $ withAsync $ replicateM_ 4 do
flip fix S1Init $ \next -> \case
S1Init -> do
sto <- withPeerM env getStorage
w <- atomically $ readTQueue rq
here <- hasBlock sto w <&> isJust
bm <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10
if not here then
next (S1QuerySize w)
else
next (S1CheckMissed w)
-- let blokz = atomically do
-- d <- readTVar down <&> fromMaybe mempty . HM.lookup p
-- pure $ HM.toList d
S1QuerySize h -> do
debug $ "QUERY SIZE1" <+> pretty h
atomically $ modifyTVar wip (HM.insert h 10)
next S1Init
-- void $ ContT $ withAsync $ polling (Polling 1 1) blokz $ \h0 -> do
-- debug $ "POLL BLOCK DOWNLOAD" <+> pretty h0
S1CheckMissed h -> do
-- debug $ "CHECK MISSED!" <+> pretty h
next S1Init
void $ ContT $ withAsync $ forever do
pause @'Seconds 10
let blkz = readTVarIO wip <&> HM.toList
polling (Polling 1 1) blkz $ \h -> liftIO do
pips <- readTVarIO pts <&> HM.keys
forConcurrently_ pips $ \p -> do
debug $ "QUERY SIZE" <+> pretty h <+> pretty p
r <- queryBlockSizeFromPeer brains env h p
case r of
Right (Just size) -> do
atomically do
modifyTVar wip (HM.delete h)
modifyTVar down (HM.insertWith (<>) p (HM.singleton h size))
Right Nothing -> do
atomically $ modifyTVar wip (HM.adjust ((*1.10).(+60)) h)
Left{} -> do
atomically $ modifyTVar wip (HM.adjust (*1.05) h)
forever do
pause @'Seconds 10
debug $ yellow "Peer download loop" <+> pretty p
forever do
pause @'Seconds 10
peerDownloadLoop env p down use = forever do
debug $ "Peer download loop" <+> green (pretty p)
hh <- atomically do
u <- readTVar use
q <- readTVar down <&> HM.toList . fromMaybe mempty . HM.lookup p
let blk = headMay [ (k,v) | (k, v) <- q, fromMaybe 0 (HM.lookup k u) == 0 ]
case blk of
Just (h,size) -> do
modifyTVar use (HM.insertWith (+) h 1)
pure (h,size)
Nothing -> retry
debug $ red "START TO DOWNLOAD" <+> pretty hh <+> "FROM" <+> pretty p