mirror of https://github.com/voidlizard/hbs2
wip
This commit is contained in:
parent
de4995f311
commit
1c6f6aba1a
|
@ -1,6 +1,7 @@
|
||||||
{-# OPTIONS_GHC -fno-warn-orphans #-}
|
{-# OPTIONS_GHC -fno-warn-orphans #-}
|
||||||
{-# Language UndecidableInstances #-}
|
{-# Language UndecidableInstances #-}
|
||||||
{-# Language AllowAmbiguousTypes #-}
|
{-# Language AllowAmbiguousTypes #-}
|
||||||
|
{-# Language ImplicitParams #-}
|
||||||
module BlockDownloadNew where
|
module BlockDownloadNew where
|
||||||
|
|
||||||
import HBS2.Prelude.Plated
|
import HBS2.Prelude.Plated
|
||||||
|
@ -42,6 +43,8 @@ import Control.Monad.Trans.Cont
|
||||||
import Control.Concurrent.STM (flushTQueue,retry)
|
import Control.Concurrent.STM (flushTQueue,retry)
|
||||||
import Data.Map qualified as Map
|
import Data.Map qualified as Map
|
||||||
import Data.Map (Map)
|
import Data.Map (Map)
|
||||||
|
import Data.HashPSQ qualified as HPSQ
|
||||||
|
import Data.HashPSQ ( HashPSQ(..) )
|
||||||
import Data.HashMap.Strict (HashMap)
|
import Data.HashMap.Strict (HashMap)
|
||||||
import Data.HashMap.Strict qualified as HM
|
import Data.HashMap.Strict qualified as HM
|
||||||
import Data.HashSet (HashSet)
|
import Data.HashSet (HashSet)
|
||||||
|
@ -462,6 +465,8 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
|
||||||
|
|
||||||
lift $ request peer req
|
lift $ request peer req
|
||||||
|
|
||||||
|
-- pause @'MilliSeconds ( rtt * 0.01 )
|
||||||
|
|
||||||
t0 <- getTimeCoarse
|
t0 <- getTimeCoarse
|
||||||
|
|
||||||
let watchdog = fix \next -> do
|
let watchdog = fix \next -> do
|
||||||
|
@ -472,11 +477,10 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
|
||||||
either (const none) (const next) r
|
either (const none) (const next) r
|
||||||
|
|
||||||
r <- liftIO $ race watchdog do
|
r <- liftIO $ race watchdog do
|
||||||
|
|
||||||
atomically do
|
atomically do
|
||||||
pieces <- readTVar _sBlockChunks2
|
pieces <- readTVar _sBlockChunks2
|
||||||
let done = and [ IntMap.member j pieces | j <- [i .. i + chunkN-1] ]
|
let done = and [ IntMap.member j pieces | j <- [i .. i + chunkN-1] ]
|
||||||
unless done retry
|
unless done retry -- $ pause @'MilliSeconds ( 0.25 * rtt ) >> next
|
||||||
|
|
||||||
atomically $ flushTQueue chuQ
|
atomically $ flushTQueue chuQ
|
||||||
|
|
||||||
|
@ -527,6 +531,14 @@ data S1 =
|
||||||
| S1QuerySize (Hash HbSync)
|
| S1QuerySize (Hash HbSync)
|
||||||
| S1CheckMissed (Hash HbSync)
|
| S1CheckMissed (Hash HbSync)
|
||||||
|
|
||||||
|
|
||||||
|
data S2 =
|
||||||
|
S2Init (Hash HbSync)
|
||||||
|
| S2CheckBlock1 (Hash HbSync) ByteString
|
||||||
|
| S2CheckBlock2 (Hash HbSync)
|
||||||
|
| S2FetchBlock (Hash HbSync)
|
||||||
|
| S2Exit
|
||||||
|
|
||||||
downloadDispatcher :: forall e m . ( e ~ L4Proto
|
downloadDispatcher :: forall e m . ( e ~ L4Proto
|
||||||
, MonadUnliftIO m
|
, MonadUnliftIO m
|
||||||
)
|
)
|
||||||
|
@ -535,12 +547,16 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto
|
||||||
-> m ()
|
-> m ()
|
||||||
downloadDispatcher brains env = flip runContT pure do
|
downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
|
let t0 = 10.00
|
||||||
|
|
||||||
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()) )
|
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()) )
|
||||||
down <- newTVarIO ( mempty :: HashMap (Peer e) (HashMap (Hash HbSync) Integer) )
|
|
||||||
use <- newTVarIO ( mempty :: HashMap (Hash HbSync) Integer )
|
|
||||||
|
|
||||||
rq <- newTQueueIO
|
rq <- newTQueueIO
|
||||||
|
|
||||||
|
seen <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef Int TimeSpec )
|
||||||
|
|
||||||
|
blkQ <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef Int NominalDiffTime )
|
||||||
|
|
||||||
sto <- withPeerM env getStorage
|
sto <- withPeerM env getStorage
|
||||||
|
|
||||||
work <- newTQueueIO
|
work <- newTQueueIO
|
||||||
|
@ -554,36 +570,58 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
liftIO $ withPeerM env do
|
liftIO $ withPeerM env do
|
||||||
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
|
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
|
||||||
debug $ green "Download request" <+> pretty h
|
now <- getTimeCoarse
|
||||||
let w = do
|
new <- atomically do
|
||||||
-- here <- hasBlock sto h <&> isJust
|
already <- readTVar seen <&> HPSQ.member (HashRef h)
|
||||||
-- let hs = if not here then [h] else mempty
|
if already then do
|
||||||
-- missed <- findMissedBlocks sto (HashRef h)
|
pure False
|
||||||
-- for_ ( hs <> fmap coerce missed ) $ \hx -> do
|
else do
|
||||||
atomically $ writeTQueue rq h
|
modifyTVar seen ( HPSQ.insert (HashRef h) 1 now )
|
||||||
-- pause @'Seconds 0.01
|
modifyTVar blkQ ( HPSQ.insert (HashRef h) 1 t0 )
|
||||||
|
pure True
|
||||||
|
when new do
|
||||||
|
debug $ green "New download request" <+> pretty h
|
||||||
|
|
||||||
atomically $ writeTQueue work w
|
let missChk = readTVarIO blkQ <&> fmap tr . HPSQ.toList
|
||||||
|
where tr (k,_,v) = (k, realToFrac v)
|
||||||
|
|
||||||
ContT $ withAsync (manageThreads pts down use)
|
let shiftPrio = \case
|
||||||
ContT $ withAsync (sizeQueryLoop pts rq down)
|
Nothing -> ((), Nothing)
|
||||||
|
Just (p,v) -> ((), Just (succ p, v * 1.10 ))
|
||||||
|
|
||||||
|
ContT $ withAsync $ forever do
|
||||||
|
polling (Polling 1 1) missChk $ \h -> do
|
||||||
|
debug $ blue "CHECK MISSED BLOCKS" <+> pretty h
|
||||||
|
|
||||||
|
missed <- findMissedBlocks sto h
|
||||||
|
|
||||||
|
here <- hasBlock sto (coerce h) <&> isJust
|
||||||
|
|
||||||
|
atomically do
|
||||||
|
for_ missed $ \hm -> modifyTVar blkQ (HPSQ.insert (coerce hm) 2 t0)
|
||||||
|
if here then do
|
||||||
|
modifyTVar blkQ (HPSQ.delete (coerce h))
|
||||||
|
else
|
||||||
|
modifyTVar blkQ ( snd . HPSQ.alter shiftPrio h )
|
||||||
|
|
||||||
|
ContT $ withAsync (manageThreads pts)
|
||||||
|
-- ContT $ withAsync (sizeQueryLoop pts rq down)
|
||||||
|
|
||||||
forever do
|
forever do
|
||||||
pause @'Seconds 10
|
pause @'Seconds 10
|
||||||
|
size <- atomically $ readTVar blkQ <&> HPSQ.size
|
||||||
debug $ yellow $ "I'm download dispatcher"
|
debug $ yellow $ "I'm download dispatcher"
|
||||||
u <- readTVarIO use <&> length . HM.keys
|
debug $ yellow $ "wip:" <+> pretty size
|
||||||
d <- readTVarIO down <&> HS.size . HS.unions . fmap HM.keysSet . HM.elems
|
|
||||||
debug $ yellow $ "wip:" <+> pretty d
|
|
||||||
|
|
||||||
where
|
where
|
||||||
|
|
||||||
manageThreads pts down use = withPeerM env $ forever do
|
manageThreads pts = withPeerM env $ forever do
|
||||||
pips <- getKnownPeers @e <&> HS.fromList
|
pips <- getKnownPeers @e <&> HS.fromList
|
||||||
|
|
||||||
for_ pips $ \p -> do
|
for_ pips $ \p -> do
|
||||||
here <- readTVarIO pts <&> HM.member p
|
here <- readTVarIO pts <&> HM.member p
|
||||||
unless here do
|
unless here do
|
||||||
a <- async $ peerDownloadLoop env p down use
|
a <- async $ peerDownloadLoop env p
|
||||||
atomically $ modifyTVar pts (HM.insert p a)
|
atomically $ modifyTVar pts (HM.insert p a)
|
||||||
|
|
||||||
debug $ "downloadDispatcher: knownPeer" <+> yellow (pretty p)
|
debug $ "downloadDispatcher: knownPeer" <+> yellow (pretty p)
|
||||||
|
@ -609,73 +647,21 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
pause @'Seconds 5
|
pause @'Seconds 5
|
||||||
|
|
||||||
sizeQueryLoop pts rq down = flip runContT pure do
|
peerDownloadLoop env p = flip runContT pure do
|
||||||
|
|
||||||
sto <- withPeerM env getStorage
|
sto <- withPeerM env getStorage
|
||||||
wip <- newTVarIO ( mempty :: HashMap (Hash HbSync) NominalDiffTime )
|
|
||||||
|
|
||||||
void $ ContT $ withAsync $ replicateM_ 4 do
|
bm <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10
|
||||||
flip fix S1Init $ \next -> \case
|
|
||||||
S1Init -> do
|
|
||||||
|
|
||||||
w <- atomically $ readTQueue rq
|
-- let blokz = atomically do
|
||||||
here <- hasBlock sto w <&> isJust
|
-- d <- readTVar down <&> fromMaybe mempty . HM.lookup p
|
||||||
|
-- pure $ HM.toList d
|
||||||
if not here then
|
|
||||||
next (S1QuerySize w)
|
|
||||||
else
|
|
||||||
next (S1CheckMissed w)
|
|
||||||
|
|
||||||
S1QuerySize h -> do
|
|
||||||
debug $ "QUERY SIZE1" <+> pretty h
|
|
||||||
atomically $ modifyTVar wip (HM.insert h 10)
|
|
||||||
next S1Init
|
|
||||||
|
|
||||||
S1CheckMissed h -> do
|
|
||||||
-- debug $ "CHECK MISSED!" <+> pretty h
|
|
||||||
next S1Init
|
|
||||||
|
|
||||||
void $ ContT $ withAsync $ forever do
|
|
||||||
pause @'Seconds 10
|
|
||||||
|
|
||||||
let blkz = readTVarIO wip <&> HM.toList
|
|
||||||
|
|
||||||
polling (Polling 1 1) blkz $ \h -> liftIO do
|
|
||||||
|
|
||||||
pips <- readTVarIO pts <&> HM.keys
|
|
||||||
|
|
||||||
forConcurrently_ pips $ \p -> do
|
|
||||||
|
|
||||||
debug $ "QUERY SIZE" <+> pretty h <+> pretty p
|
|
||||||
r <- queryBlockSizeFromPeer brains env h p
|
|
||||||
|
|
||||||
case r of
|
|
||||||
Right (Just size) -> do
|
|
||||||
atomically do
|
|
||||||
modifyTVar wip (HM.delete h)
|
|
||||||
modifyTVar down (HM.insertWith (<>) p (HM.singleton h size))
|
|
||||||
|
|
||||||
Right Nothing -> do
|
|
||||||
atomically $ modifyTVar wip (HM.adjust ((*1.10).(+60)) h)
|
|
||||||
|
|
||||||
Left{} -> do
|
|
||||||
atomically $ modifyTVar wip (HM.adjust (*1.05) h)
|
|
||||||
|
|
||||||
|
-- void $ ContT $ withAsync $ polling (Polling 1 1) blokz $ \h0 -> do
|
||||||
|
-- debug $ "POLL BLOCK DOWNLOAD" <+> pretty h0
|
||||||
|
|
||||||
forever do
|
forever do
|
||||||
pause @'Seconds 10
|
pause @'Seconds 10
|
||||||
|
debug $ yellow "Peer download loop" <+> pretty p
|
||||||
|
|
||||||
peerDownloadLoop env p down use = forever do
|
|
||||||
debug $ "Peer download loop" <+> green (pretty p)
|
|
||||||
hh <- atomically do
|
|
||||||
u <- readTVar use
|
|
||||||
q <- readTVar down <&> HM.toList . fromMaybe mempty . HM.lookup p
|
|
||||||
let blk = headMay [ (k,v) | (k, v) <- q, fromMaybe 0 (HM.lookup k u) == 0 ]
|
|
||||||
case blk of
|
|
||||||
Just (h,size) -> do
|
|
||||||
modifyTVar use (HM.insertWith (+) h 1)
|
|
||||||
pure (h,size)
|
|
||||||
|
|
||||||
Nothing -> retry
|
|
||||||
|
|
||||||
debug $ red "START TO DOWNLOAD" <+> pretty hh <+> "FROM" <+> pretty p
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue