better block processing

This commit is contained in:
Dmitry Zuikov 2023-02-05 13:58:22 +03:00
parent 2d1d5aec03
commit 0a194b2e7c
1 changed files with 204 additions and 37 deletions

View File

@ -13,6 +13,7 @@ import HBS2.Hash
import HBS2.Merkle
import HBS2.Net.PeerLocator
import HBS2.Net.Proto
import HBS2.Net.Proto.Peer
import HBS2.Net.Proto.Definition
import HBS2.Net.Proto.Sessions
import HBS2.Prelude.Plated
@ -21,6 +22,7 @@ import HBS2.System.Logger.Simple
import PeerInfo
import Numeric ( showGFloat )
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad.Reader
@ -76,17 +78,29 @@ newtype instance SessionKey e (BlockChunks e) =
DownloadSessionKey (Peer e, Cookie e)
deriving stock (Generic,Typeable)
data BsFSM = Initial
| Downloading
| Postpone
-- data MyBlkInfo e =
-- MyBlkInfo (Peer e) Integer
-- deriving stock (Eq,Ord)
data BlockState =
BlockState
{ _bsStart :: TimeSpec
, _bsTimes :: Int
, _bsState :: BsFSM
, _bsWipTo :: Double
}
makeLenses 'BlockState
data DownloadEnv e =
DownloadEnv
{ _downloadQ :: TQueue (Hash HbSync)
, _peerBusy :: TVar (HashMap (Peer e) ())
, _blockPeers :: TVar (HashMap (Hash HbSync) (HashMap (Peer e) Integer) )
, _blockWip :: Cache (Hash HbSync) ()
{ _downloadQ :: TQueue (Hash HbSync)
, _peerBusy :: TVar (HashMap (Peer e) ())
, _blockPeers :: TVar (HashMap (Hash HbSync) (HashMap (Peer e) Integer) )
, _blockWip :: Cache (Hash HbSync) ()
, _blockState :: TVar (HashMap (Hash HbSync) BlockState)
, _blockPostponed :: Cache (Hash HbSync) ()
, _blockInQ :: TVar (HashMap (Hash HbSync) ())
}
makeLenses 'DownloadEnv
@ -100,6 +114,9 @@ newDownloadEnv = liftIO do
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> Cache.newCache (Just defBlockWipTimeout)
<*> newTVarIO mempty
<*> Cache.newCache Nothing
<*> newTVarIO mempty
newtype BlockDownloadM e m a =
BlockDownloadM { fromBlockDownloadM :: ReaderT (DownloadEnv e) m a }
@ -117,19 +134,89 @@ runDownloadM m = runReaderT ( fromBlockDownloadM m ) =<< newDownloadEnv
withDownload :: (MyPeer e, MonadIO m) => DownloadEnv e -> BlockDownloadM e m a -> m a
withDownload e m = runReaderT ( fromBlockDownloadM m ) e
setBlockState :: MonadIO m => Hash HbSync -> BlockState -> BlockDownloadM e m ()
setBlockState h s = do
sh <- asks (view blockState)
liftIO $ atomically $ modifyTVar' sh (HashMap.insert h s)
calcWaitTime :: MonadIO m => BlockDownloadM e m Double
calcWaitTime = do
wip <- asks (view blockWip) >>= liftIO . Cache.size
let wipn = realToFrac wip * 4
let waiting = 5 + ( (realToFrac (toNanoSeconds defBlockWaitMax) * wipn) / 1e9 )
pure waiting
touchBlockState :: MonadIO m => Hash HbSync -> BsFSM -> BlockDownloadM e m BlockState
touchBlockState h st = do
sh <- asks (view blockState)
t <- liftIO $ getTime MonotonicCoarse
wo <- calcWaitTime
let s = BlockState t 0 st wo
sn <- liftIO $ atomically $ do
modifyTVar sh (HashMap.alter (doAlter s) h)
readTVar sh <&> fromMaybe s . HashMap.lookup h
case view bsState sn of
Initial -> do
let t0 = view bsStart sn
let dt = realToFrac (toNanoSecs t - toNanoSecs t0) / 1e9
wip <- asks (view blockWip) >>= liftIO . Cache.size
let waiting = view bsWipTo sn
if dt > waiting then do -- FIXME: remove-hardcode
debug $ "pospone block" <+> pretty h <+> pretty dt <+> pretty (showGFloat (Just 2) waiting "")
let sn1 = sn { _bsState = Postpone }
liftIO $ atomically $ modifyTVar sh (HashMap.insert h sn1)
pure sn1
else do
pure sn
_ -> pure sn
where
doAlter s1 = \case
Nothing -> Just s1
Just s -> Just $ over bsTimes succ s
getBlockState :: MonadIO m => Hash HbSync -> BlockDownloadM e m BlockState
getBlockState h = do
sh <- asks (view blockState)
touchBlockState h Initial
addDownload :: MonadIO m => Hash HbSync -> BlockDownloadM e m ()
addDownload h = do
q <- asks (view downloadQ)
wip <- asks (view blockWip)
liftIO do
atomically $ writeTQueue q h
Cache.insert wip h ()
tinq <- asks (view blockInQ)
doAdd <- liftIO $ atomically $ stateTVar tinq
\hm -> case HashMap.lookup h hm of
Nothing -> (True, HashMap.insert h () hm)
Just{} -> (False, HashMap.insert h () hm)
when doAdd $ do
q <- asks (view downloadQ)
wip <- asks (view blockWip)
liftIO do
atomically $ writeTQueue q h
Cache.insert wip h ()
void $ touchBlockState h Initial
removeFromWip :: MonadIO m => Hash HbSync -> BlockDownloadM e m ()
removeFromWip h = do
wip <- asks (view blockWip)
st <- asks (view blockState)
po <- asks (view blockPostponed)
liftIO $ Cache.delete wip h
liftIO $ Cache.delete po h
liftIO $ atomically $ modifyTVar' st (HashMap.delete h)
withFreePeer :: (MyPeer e, MonadIO m)
=> Peer e
@ -166,7 +253,30 @@ dismissPeer p = do
getBlockForDownload :: MonadIO m => BlockDownloadM e m (Hash HbSync)
getBlockForDownload = do
q <- asks (view downloadQ)
liftIO $ atomically $ readTQueue q
inq <- asks (view blockInQ)
h <- liftIO $ atomically $ readTQueue q
liftIO $ atomically $ modifyTVar inq (HashMap.delete h)
pure h
withBlockForDownload :: MonadIO m
=> (Hash HbSync -> BlockDownloadM e m ())
-> BlockDownloadM e m ()
withBlockForDownload action = do
cache <- asks (view blockPostponed)
h <- getBlockForDownload
s <- getBlockState h
let postpone = toTimeSpec @'Seconds 10 -- FIXME: remove-hardcode
case view bsState s of
Postpone -> do
debug $ "posponed:" <+> pretty h
liftIO $ Cache.insert' cache (Just postpone) h ()
_ -> action h
addBlockInfo :: (MonadIO m, MyPeer e)
=> Peer e
@ -328,7 +438,7 @@ downloadFromWithPeer peer thisBkSize h = do
updatePeerInfo True pinfo
newBurst' <- liftIO $ readTVarIO burstSizeT
let newBurst = floor (realToFrac newBurst' * 0.5 )
let newBurst = max defBurst $ floor (realToFrac newBurst' * 0.5 )
liftIO $ atomically $ modifyTVar (view peerDownloaded pinfo) (+chunksN)
@ -427,6 +537,7 @@ blockDownloadLoop :: forall e m . ( m ~ PeerM e IO
, EventListener e (BlockInfo e) m
, EventListener e (BlockChunks e) m
, EventListener e (BlockAnnounce e) m
, EventListener e (PeerHandshake e) m
, EventEmitter e (BlockChunks e) m
, Sessions e (BlockChunks e) m
, Sessions e (PeerInfo e) m
@ -460,6 +571,8 @@ blockDownloadLoop env0 = do
pinfo <- fetch True npi (PeerInfoKey p) id
updatePeerInfo False pinfo
void $ liftIO $ async $ withPeerM e $ withDownload env0 (tossPostponed e)
-- TODO: peer info loop
void $ liftIO $ async $ forever $ withPeerM e $ do
pause @'Seconds 20
@ -510,45 +623,99 @@ blockDownloadLoop env0 = do
fix \next -> do
h <- getBlockForDownload
withBlockForDownload $ \h -> do
here <- liftIO $ hasBlock stor h <&> isJust
here <- liftIO $ hasBlock stor h <&> isJust
unless here do
unless here do
peers <- getPeersForBlock h
peers <- getPeersForBlock h
when (null peers) $ do
when (null peers) $ do
lift do -- in PeerM
subscribe (BlockSizeEventKey h) $ \(BlockSizeEvent (p1,hx,s)) -> do
withDownload env (addBlockInfo p1 hx s)
lift do -- in PeerM
subscribe (BlockSizeEventKey h) $ \(BlockSizeEvent (p1,hx,s)) -> do
withDownload env (addBlockInfo p1 hx s)
pips <- knownPeers @e pl
for_ pips $ \pip -> request pip (GetBlockSize @e h)
pips <- knownPeers @e pl
for_ pips $ \pip -> request pip (GetBlockSize @e h)
p <- knownPeers @e pl >>= liftIO . shuffleM
p <- knownPeers @e pl >>= liftIO . shuffleM
-- debug $ "known peers" <+> pretty p
-- debug $ "peers/blocks" <+> pretty peers
-- debug $ "known peers" <+> pretty p
-- debug $ "peers/blocks" <+> pretty peers
p0 <- headMay <$> liftIO (shuffleM peers) -- FIXME: random choice to work faster
p0 <- headMay <$> liftIO (shuffleM peers) -- FIXME: random choice to work faster
let withAllShit f = withPeerM e $ withDownload env f
let withAllShit f = withPeerM e $ withDownload env f
maybe1 p0 (again h) $ \(p1,size) -> do
withFreePeer p1 (again h) $
liftIO do
re <- race ( pause defBlockWaitMax ) $
withAllShit $ downloadFromWithPeer p1 size h
maybe1 p0 (again h) $ \(p1,size) -> do
case re of
Left{} -> withAllShit (again h)
Right{} -> withAllShit (processBlock h)
st <- getBlockState h
setBlockState h (set bsState Downloading st)
withFreePeer p1 (again h) $
liftIO do
re <- race ( pause defBlockWaitMax ) $
withAllShit $ downloadFromWithPeer p1 size h
case re of
Left{} -> withAllShit (again h)
Right{} -> withAllShit (processBlock h)
next
tossPostponed :: forall e m . ( MonadIO m
, EventListener e (PeerHandshake e) m
, MyPeer e
)
=> PeerEnv e
-> BlockDownloadM e m ()
tossPostponed penv = do
env <- ask
waitQ <- liftIO newTQueueIO
cache <- asks (view blockPostponed)
lift $ subscribe @e AnyKnownPeerEventKey $ \(KnownPeerEvent{}) -> do
liftIO $ atomically $ writeTQueue waitQ ()
forever do
r <- liftIO $ race ( pause @'Seconds 20 ) ( atomically $ readTQueue waitQ )
let allBack = either (const False) (const True) r
blks <- liftIO $ Cache.toList cache
w <- calcWaitTime
debug $ "tossPostponed" <+> pretty (showGFloat (Just 2) w "")
<+> pretty (length blks)
for_ blks $ \case
(k,_,Nothing) | not allBack -> pure ()
| otherwise -> pushBack cache k
(k,_,Just{}) -> pushBack cache k
where
pushBack cache k = do
w <- calcWaitTime
liftIO $ Cache.delete cache k
st <- getBlockState k
t0 <- liftIO $ getTime MonotonicCoarse
setBlockState k ( set bsStart t0
. set bsState Initial
. set bsWipTo w
$ st
)
debug $ "returning block to downloads" <+> pretty k
addDownload k
-- NOTE: this is an adapter for a ResponseM monad
-- because response is working in ResponseM monad (ha!)
-- So don't be confused with types