smarter-block-download

This commit is contained in:
Dmitry Zuikov 2023-02-26 13:39:35 +03:00
parent baa253ddf6
commit 179a0b49a3
8 changed files with 376 additions and 187 deletions

View File

@ -1,6 +1,12 @@
## 2023-02-26 ## 2023-02-26
TODO: block-shuffle
Если при добавлении перемешивать блоки,
то есть надежда, что пиры скачают их в разном
порядке и будут помогать друг другу.
Но при этом может оказаться сломан стриминг (когда/если он будет)
TODO: choose-peer-lesser-rtt TODO: choose-peer-lesser-rtt
Выбирать пира с наименьшим RTT при скачивании Выбирать пира с наименьшим RTT при скачивании

View File

@ -60,13 +60,16 @@ defBlockBanTime :: TimeSpec
defBlockBanTime = toTimeSpec defBlockBanTimeSec defBlockBanTime = toTimeSpec defBlockBanTimeSec
defBlockBanTimeSec :: Timeout 'Seconds defBlockBanTimeSec :: Timeout 'Seconds
defBlockBanTimeSec = 30 :: Timeout 'Seconds defBlockBanTimeSec = 60 :: Timeout 'Seconds
defBlockWipTimeout :: TimeSpec defBlockWipTimeout :: TimeSpec
defBlockWipTimeout = toTimeSpec defCookieTimeoutSec defBlockWipTimeout = defBlockSizeCacheTime
defBlockInfoTimeout :: Timeout 'Seconds defBlockInfoTimeout :: Timeout 'Seconds
defBlockInfoTimeout = 2 defBlockInfoTimeout = 1
defBlockInfoTimeoutSpec :: TimeSpec
defBlockInfoTimeoutSpec = toTimeSpec defBlockInfoTimeout
-- how much time wait for block from peer? -- how much time wait for block from peer?
defBlockWaitMax :: Timeout 'Seconds defBlockWaitMax :: Timeout 'Seconds

View File

@ -56,7 +56,7 @@ instance HasProtocol UDP (BlockInfo UDP) where
-- FIXME: requestMinPeriod-breaks-fast-block-download -- FIXME: requestMinPeriod-breaks-fast-block-download
-- --
-- requestPeriodLim = ReqLimPerMessage 0.5 requestPeriodLim = ReqLimPerMessage 1
instance HasProtocol UDP (BlockChunks UDP) where instance HasProtocol UDP (BlockChunks UDP) where
type instance ProtocolId (BlockChunks UDP) = 2 type instance ProtocolId (BlockChunks UDP) = 2

View File

@ -31,6 +31,7 @@ import Data.ByteString.Lazy (ByteString)
import Data.Cache qualified as Cache import Data.Cache qualified as Cache
import Data.Foldable hiding (find) import Data.Foldable hiding (find)
import Data.HashMap.Strict qualified as HashMap import Data.HashMap.Strict qualified as HashMap
import Data.HashSet qualified as HashSet
import Data.IntMap (IntMap) import Data.IntMap (IntMap)
import Data.IntMap qualified as IntMap import Data.IntMap qualified as IntMap
import Data.IntSet qualified as IntSet import Data.IntSet qualified as IntSet
@ -44,29 +45,24 @@ getBlockForDownload :: MonadIO m => BlockDownloadM e m (Hash HbSync)
getBlockForDownload = do getBlockForDownload = do
q <- asks (view downloadQ) q <- asks (view downloadQ)
inq <- asks (view blockInQ) inq <- asks (view blockInQ)
h <- liftIO $ atomically $ readTQueue q liftIO $ atomically $ do
liftIO $ atomically $ modifyTVar inq (HashMap.delete h) h <- readTQueue q
pure h modifyTVar inq (HashMap.delete h)
pure h
withBlockForDownload :: MonadIO m withBlockForDownload :: (MonadIO m, MyPeer e, HasStorage m, HasPeerLocator e m)
=> (Hash HbSync -> BlockDownloadM e m ()) => Peer e
-> (Hash HbSync -> BlockDownloadM e m ())
-> BlockDownloadM e m () -> BlockDownloadM e m ()
withBlockForDownload action = do withBlockForDownload p action = do
-- FIXME: busyloop-e46ad5e0
cache <- asks (view blockPostponed)
h <- getBlockForDownload h <- getBlockForDownload
s <- getBlockState h banned <- isBanned p h
if banned then do
let postpone = toTimeSpec @'Seconds 10 -- FIXME: remove-hardcode addDownload h
else do
case view bsState s of action h
Postpone -> do
debug $ "posponed:" <+> pretty h
liftIO $ Cache.insert' cache (Just postpone) h ()
_ -> action h
addBlockInfo :: (MonadIO m, MyPeer e) addBlockInfo :: (MonadIO m, MyPeer e)
=> Peer e => Peer e
@ -93,6 +89,8 @@ getPeersForBlock h = do
processBlock :: forall e m . ( MonadIO m processBlock :: forall e m . ( MonadIO m
, HasStorage m , HasStorage m
, MyPeer e
, HasPeerLocator e (BlockDownloadM e m)
, Block ByteString ~ ByteString , Block ByteString ~ ByteString
) )
=> Hash HbSync => Hash HbSync
@ -161,7 +159,8 @@ processBlock h = do
-- GetBlockSize request -- GetBlockSize request
downloadFromWithPeer :: forall e m . DownloadFromPeerStuff e m downloadFromWithPeer :: forall e m . ( DownloadFromPeerStuff e m
, HasPeerLocator e (BlockDownloadM e m) )
=> Peer e => Peer e
-> Integer -> Integer
-> Hash HbSync -> Hash HbSync
@ -467,11 +466,11 @@ blockDownloadLoop env0 = do
downFails <- liftIO $ readTVarIO (view peerDownloadFail pinfo) downFails <- liftIO $ readTVarIO (view peerDownloadFail pinfo)
down <- liftIO $ readTVarIO (view peerDownloadedBlk pinfo) down <- liftIO $ readTVarIO (view peerDownloadedBlk pinfo)
useful <- liftIO $ readTVarIO (view peerUsefulness pinfo) useful <- liftIO $ readTVarIO (view peerUsefulness pinfo)
debug $ "peer" <+> pretty p <+> "burst:" <+> pretty burst notice $ "peer" <+> pretty p <+> "burst:" <+> pretty burst
<+> "burst-max:" <+> pretty buM <+> "burst-max:" <+> pretty buM
<+> "errors:" <+> pretty (downFails + errors) <+> "errors:" <+> pretty (downFails + errors)
<+> "down:" <+> pretty down <+> "down:" <+> pretty down
<+> "useful:" <+> pretty useful <+> "useful:" <+> pretty useful
pure () pure ()
void $ liftIO $ async $ forever $ withPeerM e $ withDownload env0 do void $ liftIO $ async $ forever $ withPeerM e $ withDownload env0 do
@ -493,12 +492,14 @@ blockDownloadLoop env0 = do
liftIO $ atomically $ writeTVar tinfo alive liftIO $ atomically $ writeTVar tinfo alive
po <- asks (view peerPostponed) >>= liftIO . readTVarIO
notice $ "maintain blocks wip" <+> pretty (Set.size aliveWip) notice $ "maintain blocks wip" <+> pretty (Set.size aliveWip)
<+> "postponed"
<+> pretty (HashMap.size po)
withDownload env0 do withDownload env0 do
env <- ask
mapM_ processBlock blks mapM_ processBlock blks
fix \next -> do fix \next -> do
@ -506,6 +507,57 @@ blockDownloadLoop env0 = do
debug "I'm a download loop. I don't do anything anymore" debug "I'm a download loop. I don't do anything anymore"
next next
postponedLoop :: forall e m . ( MyPeer e
, Sessions e (KnownPeer e) m
, Request e (BlockInfo e) m
, EventListener e (BlockInfo e) m
, DownloadFromPeerStuff e m
, HasPeerLocator e m
, m ~ PeerM e IO
)
=> DownloadEnv e -> m ()
postponedLoop env0 = do
e <- ask
void $ liftIO $ async $ withPeerM e $ withDownload env0 do
po <- asks (view peerPostponed)
pl <- getPeerLocator @e
forever do
pause @'Seconds 10
debug "findPosponedLoop"
ba <- asks (view blockBanned) >>= liftIO . Cache.keys
pipsAll <- knownPeers @e pl <&> HashSet.fromList
let blk2pip = HashMap.fromListWith (<>) [ (h, HashSet.singleton p) | (h,p) <- ba ]
& HashMap.toList
for_ blk2pip $ \(h, banned) -> do
let notBanned = HashSet.difference pipsAll banned
when (null notBanned) do
liftIO $ atomically $ modifyTVar po $ HashMap.insert h ()
void $ liftIO $ async $ withPeerM e $ withDownload env0 do
po <- asks (view peerPostponed)
forever do
-- FIXME: del-posponed-time-hardcode
pause @'Seconds 60
debug "postponedLoop"
back <- liftIO $ atomically $ stateTVar po $ \hm ->
let els = HashMap.toList hm in
-- FIXME: back-from-postponed-size-var
let (x,xs) = List.splitAt 10 els in
(fmap fst x, HashMap.fromList xs)
for_ back returnPostponed
peerDownloadLoop :: forall e m . ( MyPeer e peerDownloadLoop :: forall e m . ( MyPeer e
, Sessions e (KnownPeer e) m , Sessions e (KnownPeer e) m
, Request e (BlockInfo e) m , Request e (BlockInfo e) m
@ -516,124 +568,110 @@ peerDownloadLoop :: forall e m . ( MyPeer e
) => Peer e -> BlockDownloadM e m () ) => Peer e -> BlockDownloadM e m ()
peerDownloadLoop peer = do peerDownloadLoop peer = do
bannedBlocks <- liftIO $ Cache.newCache (Just defBlockBanTime) sizeCache <- liftIO $ Cache.newCache @_ @Integer (Just defBlockSizeCacheTime)
sizeCache <- liftIO $ Cache.newCache (Just defBlockSizeCacheTime) noBlock <- liftIO $ Cache.newCache (Just defBlockBanTime)
seenBlocks <- liftIO $ newTVarIO mempty
pe <- lift ask pe <- lift ask
e <- ask e <- ask
let withAllStuff m = withPeerM pe $ withDownload e m let doBlockSizeRequest h = do
q <- liftIO newTQueueIO
lift do
subscribe @e (BlockSizeEventKey h) $ \case
BlockSizeEvent (p1,_,s) -> do
when (p1 == peer) do
liftIO $ Cache.insert sizeCache h s
liftIO $ atomically $ writeTQueue q (Just s)
forever do NoBlockEvent{} -> do
-- TODO: ban-block-for-some-seconds
liftIO $ atomically $ writeTQueue q Nothing
pure ()
sto <- lift getStorage request peer (GetBlockSize @e h)
auth <- lift $ find (KnownPeerKey peer) id <&> isJust liftIO $ race ( pause defBlockInfoTimeout )
( atomically $ do
s <- readTQueue q
void $ flushTQueue q
pure s
)
let tryDownload pinfo h size = do
trace $ "tryDownload" <+> pretty peer <+> pretty h
here <- isBlockHereCached h
if here then do
trace $ pretty peer <+> "block" <+> pretty h <+> "is already here"
processBlock h
else do
let downFail = view peerDownloadFail pinfo
let downBlk = view peerDownloadedBlk pinfo
r <- liftIO $ race ( pause defBlockWaitMax )
$ withPeerM pe
$ withDownload e
$ downloadFromWithPeer peer size h
case r of
Left{} -> do
trace $ "FAIL" <+> pretty peer <+> "download block" <+> pretty h
liftIO $ atomically $ modifyTVar downFail succ
failedDownload peer h
Right{} -> do
trace $ "OK" <+> pretty peer <+> "dowloaded block" <+> pretty h
processBlock h
liftIO $ atomically do
writeTVar downFail 0
modifyTVar downBlk succ
fix \next -> do
auth' <- lift $ find (KnownPeerKey peer) id
pinfo' <- lift $ find (PeerInfoKey peer) id -- (view peerDownloadFail) pinfo' <- lift $ find (PeerInfoKey peer) id -- (view peerDownloadFail)
maybe1 pinfo' none $ \pinfo -> do let mbauth = (,) <$> auth' <*> pinfo'
let downFail = view peerDownloadFail pinfo maybe1 mbauth none $ \(_,pinfo) -> do
let downBlk = view peerDownloadedBlk pinfo
failNum <- liftIO $ readTVarIO downFail
-- FIXME: better-avoiding-busyloop withBlockForDownload peer $ \h -> do
-- unless notFailed do -- TODO: insert-busyloop-counter-for-block-request
-- pause @'Seconds 1 -- trace $ "withBlockForDownload" <+> pretty peer <+> pretty h
when (failNum > 5) do mbSize <- liftIO $ Cache.lookup sizeCache h
pause @'Seconds defBlockWaitMax noBlk <- liftIO $ Cache.lookup noBlock h <&> isJust
when auth do case mbSize of
Just size -> do
trace $ "HAS SIZE:" <+> pretty peer <+> pretty h <+> pretty size
updateBlockPeerSize h peer size
tryDownload pinfo h size
withBlockForDownload $ \h -> do Nothing | noBlk -> do
e <- lift ask trace $ pretty peer <+> "does not have block" <+> pretty h
ee <- ask banBlock peer h
addDownload h
st <- getBlockState h Nothing -> do
incBlockSizeReqCount h
let alterSeen = \case r <- doBlockSizeRequest h
Just x -> Just (succ x)
Nothing -> Just 1
banned <- liftIO $ Cache.lookup bannedBlocks h <&> isJust case r of
Left{} -> failedDownload peer h
if banned then do Right Nothing -> do
pl <- getPeerLocator @e -- FIXME: non-existent-block-ruins-all
ps <- knownPeers @e pl <&> length liftIO $ Cache.insert noBlock h ()
let seenTotal = view bsTimes st
if seenTotal < ps*100 then do
addDownload h
else do
let wa = min defBlockBanTimeSec (realToFrac (ceiling $ Prelude.logBase 10 (realToFrac (2 * seenTotal))))
void $ liftIO $ async $ withAllStuff (pause wa >> addDownload h)
-- trace $ "block" <+> pretty h <+> "seen" <+> pretty seenTotal <+> "times" <+> parens (pretty wa)
else do
liftIO $ atomically $ modifyTVar seenBlocks (HashMap.alter alterSeen h)
seenTimes <- liftIO $ readTVarIO seenBlocks <&> fromMaybe 0 . HashMap.lookup h
when ( seenTimes > 100 ) do
trace $ "ban block" <+> pretty h <+> "for a while" <+> parens (pretty seenTimes)
liftIO $ atomically $ modifyTVar seenBlocks (HashMap.delete h)
liftIO $ Cache.insert bannedBlocks h ()
setBlockState h (set bsState Downloading st)
r1 <- liftIO $ race ( pause defBlockInfoTimeout ) $ withPeerM e do
-- blksq <- liftIO newTQueueIO
cachedSize' <- liftIO $ Cache.lookup sizeCache h
case cachedSize' of
Just sz -> pure (Just sz)
Nothing -> do
subscribe @e (BlockSizeEventKey h) $ \case
(BlockSizeEvent (_,_,s)) -> do
-- liftIO $ atomically $ writeTQueue blksq (Just s)
liftIO $ Cache.insert sizeCache h s
(NoBlockEvent p) -> do
pure ()
-- trace $ "NoBlockEvent" <+> pretty p <+> pretty h
-- liftIO $ atomically $ writeTQueue blksq Nothing
request peer (GetBlockSize @e h)
pure Nothing
-- liftIO $ atomically $ readTQueue blksq
case r1 of
Left{} -> do
liftIO $ atomically $ modifyTVar downFail succ
addDownload h addDownload h
Right Nothing -> do Right (Just s) -> do
addDownload h -- this is a legit situation; it is handled above (block ban... etc). updateBlockPeerSize h peer s
tryDownload pinfo h s
Right (Just size) -> do next
r2 <- liftIO $ race ( pause defBlockWaitMax )
$ withPeerM e
$ withDownload ee
$ downloadFromWithPeer peer size h
case r2 of
Left{} -> do
liftIO $ atomically $ modifyTVar downFail succ
addDownload h
-- FIXME: remove-block-seen-times-hardcode
Right{} -> do
processBlock h
liftIO $ atomically do
writeTVar downFail 0
modifyTVar downBlk succ
pure ()
-- NOTE: this is an adapter for a ResponseM monad -- NOTE: this is an adapter for a ResponseM monad
-- because response is working in ResponseM monad (ha!) -- because response is working in ResponseM monad (ha!)

View File

@ -7,12 +7,14 @@ import HBS2.Hash
import HBS2.Events import HBS2.Events
import HBS2.Data.Types.Refs import HBS2.Data.Types.Refs
import HBS2.Actors.Peer import HBS2.Actors.Peer
import HBS2.Net.PeerLocator
import HBS2.Storage import HBS2.Storage
import HBS2.Merkle import HBS2.Merkle
import HBS2.System.Logger.Simple import HBS2.System.Logger.Simple
import PeerTypes import PeerTypes
import PeerConfig import PeerConfig
import BlockDownload (processBlock)
import Data.Map qualified as Map import Data.Map qualified as Map
import Data.Foldable import Data.Foldable
@ -41,6 +43,8 @@ noLogFile = err "download log not defined"
downloadQueue :: forall e m . ( MyPeer e downloadQueue :: forall e m . ( MyPeer e
, DownloadFromPeerStuff e m , DownloadFromPeerStuff e m
, HasPeerLocator e (BlockDownloadM e m)
, HasPeerLocator e m
, EventListener e (DownloadReq e) m , EventListener e (DownloadReq e) m
) => PeerConfig -> DownloadEnv e -> m () ) => PeerConfig -> DownloadEnv e -> m ()
@ -71,7 +75,7 @@ downloadQueue conf denv = do
debug $ "downloadQueue" <+> pretty fn debug $ "downloadQueue" <+> pretty fn
liftIO do lo <- liftIO do
-- FIXME: will-crash-on-big-logs -- FIXME: will-crash-on-big-logs
atomically $ waitTSem fsem atomically $ waitTSem fsem
@ -103,13 +107,16 @@ downloadQueue conf denv = do
let leftovers = [ x | x <- hashesWip , Map.member x loosers ] let leftovers = [ x | x <- hashesWip , Map.member x loosers ]
for_ leftovers $ withDownload denv . addDownload
atomically $ waitTSem fsem atomically $ waitTSem fsem
catchAny ( B8.writeFile fn ( B8.unlines (fmap (B8.pack.show.pretty) leftovers) ) ) catchAny ( B8.writeFile fn ( B8.unlines (fmap (B8.pack.show.pretty) leftovers) ) )
whimper whimper
atomically $ signalTSem fsem atomically $ signalTSem fsem
pure leftovers
for_ lo $ withDownload denv . processBlock
debug "downloadQueue okay" debug "downloadQueue okay"
-- TODO: remove-downloadQueue-pause-hardcode -- TODO: remove-downloadQueue-pause-hardcode

View File

@ -25,6 +25,10 @@ import Data.Set (Set)
import Data.Text qualified as Text import Data.Text qualified as Text
import Text.InterpolatedString.Perl6 (qc) import Text.InterpolatedString.Perl6 (qc)
data FeatureSwitch =
FeatureOn | FeatureOff
deriving (Eq,Ord,Show,Generic)
class HasCfgKey a b where class HasCfgKey a b where
-- type family CfgValue a :: Type -- type family CfgValue a :: Type
key :: Id key :: Id
@ -154,6 +158,14 @@ instance {-# OVERLAPPABLE #-} (IsString b, HasCfgKey a (Maybe b)) => HasCfgValue
| ListVal @C (Key s [LitStrVal e]) <- syn, s == key @a @(Maybe b) | ListVal @C (Key s [LitStrVal e]) <- syn, s == key @a @(Maybe b)
] ]
instance (HasCfgKey a FeatureSwitch) => HasCfgValue a FeatureSwitch where
cfgValue (PeerConfig syn) = val
where
val =
lastDef FeatureOff
[ FeatureOn
| ListVal @C (Key s [SymbolVal (Id e)]) <- syn, s == key @a @FeatureSwitch, e == "on"
]
instance {-# OVERLAPPABLE #-} (IsString b, HasCfgKey a [b]) => HasCfgValue a [b] where instance {-# OVERLAPPABLE #-} (IsString b, HasCfgKey a [b]) => HasCfgValue a [b] where
cfgValue (PeerConfig syn) = val cfgValue (PeerConfig syn) = val

View File

@ -59,6 +59,7 @@ import System.Directory
import System.Exit import System.Exit
import System.IO import System.IO
import Data.Set (Set) import Data.Set (Set)
import GHC.TypeLits
defStorageThreads :: Integral a => a defStorageThreads :: Integral a => a
defStorageThreads = 4 defStorageThreads = 4
@ -79,6 +80,7 @@ data PeerKeyFileKey
data PeerBlackListKey data PeerBlackListKey
data PeerStorageKey data PeerStorageKey
data PeerAcceptAnnounceKey data PeerAcceptAnnounceKey
data PeerTraceKey
data AcceptAnnounce = AcceptAnnounceAll data AcceptAnnounce = AcceptAnnounceAll
| AcceptAnnounceFrom (Set (PubKey 'Sign UDP)) | AcceptAnnounceFrom (Set (PubKey 'Sign UDP))
@ -90,6 +92,9 @@ instance Pretty AcceptAnnounce where
-- FIXME: better-pretty-for-AcceptAnnounceFrom -- FIXME: better-pretty-for-AcceptAnnounceFrom
AcceptAnnounceFrom xs -> parens ("accept-announce" <+> pretty (fmap AsBase58 (Set.toList xs))) AcceptAnnounceFrom xs -> parens ("accept-announce" <+> pretty (fmap AsBase58 (Set.toList xs)))
instance HasCfgKey PeerTraceKey FeatureSwitch where
key = "trace"
instance HasCfgKey PeerListenKey (Maybe String) where instance HasCfgKey PeerListenKey (Maybe String) where
key = "listen" key = "listen"
@ -354,6 +359,7 @@ runPeer opts = Exception.handle myException $ do
let rpcConf = cfgValue @PeerRpcKey conf let rpcConf = cfgValue @PeerRpcKey conf
let keyConf = cfgValue @PeerKeyFileKey conf let keyConf = cfgValue @PeerKeyFileKey conf
let storConf = cfgValue @PeerStorageKey conf <&> StoragePrefix let storConf = cfgValue @PeerStorageKey conf <&> StoragePrefix
let traceConf = cfgValue @PeerTraceKey conf :: FeatureSwitch
let listenSa = view listenOn opts <|> listenConf <|> Just defListenUDP let listenSa = view listenOn opts <|> listenConf <|> Just defListenUDP
let rpcSa = view listenRpc opts <|> rpcConf <|> Just defRpcUDP let rpcSa = view listenRpc opts <|> rpcConf <|> Just defRpcUDP
@ -363,6 +369,11 @@ runPeer opts = Exception.handle myException $ do
debug $ "storage prefix:" <+> pretty pref debug $ "storage prefix:" <+> pretty pref
debug $ pretty "trace: " <+> pretty (show traceConf)
when (traceConf == FeatureOn) do
setLogging @TRACE tracePrefix
let bls = cfgValue @PeerBlackListKey conf :: Set String let bls = cfgValue @PeerBlackListKey conf :: Set String
let blkeys = Set.fromList let blkeys = Set.fromList
@ -528,6 +539,8 @@ runPeer opts = Exception.handle myException $ do
peerThread (blockDownloadLoop denv) peerThread (blockDownloadLoop denv)
peerThread (postponedLoop denv)
peerThread (downloadQueue conf denv) peerThread (downloadQueue conf denv)
peerThread $ forever $ do peerThread $ forever $ do

View File

@ -1,6 +1,7 @@
{-# OPTIONS_GHC -fno-warn-orphans #-} {-# OPTIONS_GHC -fno-warn-orphans #-}
{-# Language TemplateHaskell #-} {-# Language TemplateHaskell #-}
{-# Language UndecidableInstances #-} {-# Language UndecidableInstances #-}
{-# Language MultiWayIf #-}
module PeerTypes where module PeerTypes where
import HBS2.Actors.Peer import HBS2.Actors.Peer
@ -15,6 +16,7 @@ import HBS2.Net.Proto.Definition
import HBS2.Net.Proto.Sessions import HBS2.Net.Proto.Sessions
import HBS2.Prelude.Plated import HBS2.Prelude.Plated
import HBS2.Storage import HBS2.Storage
import HBS2.Net.PeerLocator
import HBS2.System.Logger.Simple import HBS2.System.Logger.Simple
import PeerInfo import PeerInfo
@ -34,7 +36,12 @@ import Type.Reflection
import Numeric (showGFloat) import Numeric (showGFloat)
type MyPeer e = (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) type MyPeer e = ( Eq (Peer e)
, Hashable (Peer e)
, Pretty (Peer e)
, HasPeer e
, Block ByteString ~ ByteString
)
data DownloadReq e data DownloadReq e
@ -110,16 +117,12 @@ newtype instance SessionKey e (BlockChunks e) =
deriving newtype instance Hashable (SessionKey UDP (BlockChunks UDP)) deriving newtype instance Hashable (SessionKey UDP (BlockChunks UDP))
deriving stock instance Eq (SessionKey UDP (BlockChunks UDP)) deriving stock instance Eq (SessionKey UDP (BlockChunks UDP))
data BsFSM = Initial
| Downloading
| Postpone
data BlockState = data BlockState =
BlockState BlockState
{ _bsStart :: TimeSpec { _bsStart :: TimeSpec
, _bsTimes :: Int , _bsReqSizeTimes :: TVar Int
, _bsState :: BsFSM , _bsLastSeen :: TVar TimeSpec
, _bsWipTo :: Double , _bsHasSize :: TVar Bool
} }
makeLenses 'BlockState makeLenses 'BlockState
@ -142,9 +145,11 @@ data DownloadEnv e =
, _blockPeers :: TVar (HashMap (Hash HbSync) (HashMap (Peer e) Integer) ) , _blockPeers :: TVar (HashMap (Hash HbSync) (HashMap (Peer e) Integer) )
, _blockWip :: Cache (Hash HbSync) () , _blockWip :: Cache (Hash HbSync) ()
, _blockState :: TVar (HashMap (Hash HbSync) BlockState) , _blockState :: TVar (HashMap (Hash HbSync) BlockState)
, _blockPostponed :: Cache (Hash HbSync) ()
, _blockInQ :: TVar (HashMap (Hash HbSync) ()) , _blockInQ :: TVar (HashMap (Hash HbSync) ())
, _peerThreads :: TVar (HashMap (Peer e) (PeerThread e)) , _peerThreads :: TVar (HashMap (Peer e) (PeerThread e))
, _peerPostponed :: TVar (HashMap (Hash HbSync) ())
, _blockStored :: Cache (Hash HbSync) ()
, _blockBanned :: Cache (Hash HbSync, Peer e) ()
} }
makeLenses 'DownloadEnv makeLenses 'DownloadEnv
@ -157,9 +162,11 @@ newDownloadEnv = liftIO do
<*> newTVarIO mempty <*> newTVarIO mempty
<*> Cache.newCache (Just defBlockWipTimeout) <*> Cache.newCache (Just defBlockWipTimeout)
<*> newTVarIO mempty <*> newTVarIO mempty
<*> Cache.newCache Nothing
<*> newTVarIO mempty <*> newTVarIO mempty
<*> newTVarIO mempty <*> newTVarIO mempty
<*> newTVarIO mempty
<*> Cache.newCache (Just defBlockWipTimeout)
<*> Cache.newCache (Just defBlockBanTime)
newtype BlockDownloadM e m a = newtype BlockDownloadM e m a =
BlockDownloadM { fromBlockDownloadM :: ReaderT (DownloadEnv e) m a } BlockDownloadM { fromBlockDownloadM :: ReaderT (DownloadEnv e) m a }
@ -174,7 +181,7 @@ newtype BlockDownloadM e m a =
runDownloadM :: (MyPeer e, MonadIO m) => BlockDownloadM e m a -> m a runDownloadM :: (MyPeer e, MonadIO m) => BlockDownloadM e m a -> m a
runDownloadM m = runReaderT ( fromBlockDownloadM m ) =<< newDownloadEnv runDownloadM m = runReaderT ( fromBlockDownloadM m ) =<< newDownloadEnv
withDownload :: (MyPeer e, MonadIO m) => DownloadEnv e -> BlockDownloadM e m a -> m a withDownload :: (MyPeer e, HasPeerLocator e m, MonadIO m) => DownloadEnv e -> BlockDownloadM e m a -> m a
withDownload e m = runReaderT ( fromBlockDownloadM m ) e withDownload e m = runReaderT ( fromBlockDownloadM m ) e
setBlockState :: MonadIO m => Hash HbSync -> BlockState -> BlockDownloadM e m () setBlockState :: MonadIO m => Hash HbSync -> BlockState -> BlockDownloadM e m ()
@ -182,7 +189,53 @@ setBlockState h s = do
sh <- asks (view blockState) sh <- asks (view blockState)
liftIO $ atomically $ modifyTVar' sh (HashMap.insert h s) liftIO $ atomically $ modifyTVar' sh (HashMap.insert h s)
-- FIXME: что-то более обоснованное setBlockHasSize :: MonadIO m => Hash HbSync -> BlockDownloadM e m ()
setBlockHasSize h = do
blk <- fetchBlockState h
liftIO $ atomically $ writeTVar (view bsHasSize blk) True
fetchBlockState :: MonadIO m => Hash HbSync -> BlockDownloadM e m BlockState
fetchBlockState h = do
sh <- asks (view blockState)
liftIO do
now <- getTime MonotonicCoarse
tvlast <- newTVarIO now
tvreq <- newTVarIO 0
tvsz <- newTVarIO False
let defState = BlockState now tvreq tvlast tvsz
atomically $ stateTVar sh $ \hm -> case HashMap.lookup h hm of
Nothing -> (defState, HashMap.insert h defState hm)
Just x -> (x, hm)
banBlock :: (MyPeer e, MonadIO m) => Peer e -> Hash HbSync -> BlockDownloadM e m ()
banBlock p h = do
banned <- asks (view blockBanned)
liftIO $ Cache.insert banned (h,p) ()
isBanned :: (MyPeer e, MonadIO m) => Peer e -> Hash HbSync -> BlockDownloadM e m Bool
isBanned p h = do
banned <- asks (view blockBanned)
liftIO $ Cache.lookup banned (h,p) <&> isJust
delBlockState :: MonadIO m => Hash HbSync -> BlockDownloadM e m ()
delBlockState h = do
sh <- asks (view blockState)
liftIO $ atomically $ modifyTVar sh (HashMap.delete h)
incBlockSizeReqCount :: MonadIO m => Hash HbSync -> BlockDownloadM e m ()
incBlockSizeReqCount h = do
blk <- fetchBlockState h
now <- liftIO $ getTime MonotonicCoarse
seen <- liftIO $ readTVarIO (view bsLastSeen blk)
let elapsed = realToFrac (toNanoSecs (now - seen)) / 1e9
noSize <- liftIO $ readTVarIO (view bsHasSize blk) <&> not
when (elapsed > 1.0 && noSize) do
liftIO $ atomically $ do
writeTVar (view bsLastSeen blk) now
modifyTVar (view bsReqSizeTimes blk) succ
-- FIXME: что-то более обоснованно
calcWaitTime :: MonadIO m => BlockDownloadM e m Double calcWaitTime :: MonadIO m => BlockDownloadM e m Double
calcWaitTime = do calcWaitTime = do
wip <- asks (view blockWip) >>= liftIO . Cache.size wip <- asks (view blockWip) >>= liftIO . Cache.size
@ -190,59 +243,47 @@ calcWaitTime = do
let waiting = 5 + ( (realToFrac (toNanoSeconds defBlockWaitMax) * wipn) / 1e9 ) let waiting = 5 + ( (realToFrac (toNanoSeconds defBlockWaitMax) * wipn) / 1e9 )
pure waiting pure waiting
isBlockHereCached :: forall e m . ( MyPeer e
, MonadIO m
, HasStorage m
)
=> Hash HbSync -> BlockDownloadM e m Bool
touchBlockState :: MonadIO m => Hash HbSync -> BsFSM -> BlockDownloadM e m BlockState isBlockHereCached h = do
touchBlockState h st = do szcache <- asks (view blockStored)
sh <- asks (view blockState) sto <- lift getStorage
t <- liftIO $ getTime MonotonicCoarse
wo <- calcWaitTime
let s = BlockState t 0 st wo cached <- liftIO $ Cache.lookup szcache h
sn <- liftIO $ atomically $ do case cached of
modifyTVar sh (HashMap.alter (doAlter s) h) Just{} -> pure True
readTVar sh <&> fromMaybe s . HashMap.lookup h Nothing -> liftIO do
blk <- hasBlock sto h <&> isJust
when blk $ Cache.insert szcache h ()
pure blk
case view bsState sn of addDownload :: forall e m . ( MyPeer e
Initial -> do , MonadIO m
, HasPeerLocator e (BlockDownloadM e m)
, HasStorage m -- (BlockDownloadM e m)
, Block ByteString ~ ByteString
)
=> Hash HbSync -> BlockDownloadM e m ()
let t0 = view bsStart sn
let dt = realToFrac (toNanoSecs t - toNanoSecs t0) / 1e9
wip <- asks (view blockWip) >>= liftIO . Cache.size
let waiting = view bsWipTo sn
if dt > waiting then do -- FIXME: remove-hardcode
debug $ "pospone block" <+> pretty h <+> pretty dt <+> pretty (showGFloat (Just 2) waiting "")
let sn1 = sn { _bsState = Postpone }
liftIO $ atomically $ modifyTVar sh (HashMap.insert h sn1)
pure sn1
else do
pure sn
_ -> pure sn
where
doAlter s1 = \case
Nothing -> Just s1
Just s -> Just $ over bsTimes succ s
getBlockState :: MonadIO m => Hash HbSync -> BlockDownloadM e m BlockState
getBlockState h = do
sh <- asks (view blockState)
touchBlockState h Initial
addDownload :: MonadIO m => Hash HbSync -> BlockDownloadM e m ()
addDownload h = do addDownload h = do
po <- asks (view peerPostponed)
tinq <- asks (view blockInQ) tinq <- asks (view blockInQ)
doAdd <- do liftIO $ atomically $ stateTVar tinq doAdd <- do liftIO $ atomically $ stateTVar tinq
\hm -> case HashMap.lookup h hm of \hm -> case HashMap.lookup h hm of
Nothing -> (True, HashMap.insert h () hm) Nothing -> (True, HashMap.insert h () hm)
Just{} -> (False, HashMap.insert h () hm) Just{} -> (False, HashMap.insert h () hm)
when doAdd $ do
notPostponed <- liftIO $ readTVarIO po <&> isNothing . HashMap.lookup h
when (doAdd && notPostponed) do
q <- asks (view downloadQ) q <- asks (view downloadQ)
wip <- asks (view blockWip) wip <- asks (view blockWip)
@ -251,16 +292,54 @@ addDownload h = do
atomically $ writeTQueue q h atomically $ writeTQueue q h
Cache.insert wip h () Cache.insert wip h ()
void $ touchBlockState h Initial -- | False -> do -- not hasSize -> do
-- po <- asks (view peerPostponed)
-- liftIO $ atomically $ do
-- modifyTVar po $ HashMap.insert h ()
-- trace $ "postpone block" <+> pretty h <+> pretty brt
-- <+> "here:" <+> pretty (not missed)
-- | otherwise -> do
-- -- TODO: counter-on-this-situation
-- none
returnPostponed :: forall e m . ( MyPeer e
, MonadIO m
, HasStorage m
, HasPeerLocator e (BlockDownloadM e m)
)
=> Hash HbSync -> BlockDownloadM e m ()
returnPostponed h = do
tinq <- asks (view blockInQ)
-- TODO: atomic-operations
delFromPostponed h
delBlockState h
liftIO $ atomically $ modifyTVar' tinq (HashMap.delete h)
addDownload h
delFromPostponed :: MonadIO m => Hash HbSync -> BlockDownloadM e m ()
delFromPostponed h = do
po <- asks (view peerPostponed)
liftIO $ atomically $ do
modifyTVar' po (HashMap.delete h)
removeFromWip :: MonadIO m => Hash HbSync -> BlockDownloadM e m () removeFromWip :: MonadIO m => Hash HbSync -> BlockDownloadM e m ()
removeFromWip h = do removeFromWip h = do
wip <- asks (view blockWip) wip <- asks (view blockWip)
st <- asks (view blockState) st <- asks (view blockState)
po <- asks (view blockPostponed) sz <- asks (view blockPeers)
tinq <- asks (view blockInQ)
po <- asks (view peerPostponed)
liftIO $ Cache.delete wip h liftIO $ Cache.delete wip h
liftIO $ Cache.delete po h liftIO $ atomically $ do
liftIO $ atomically $ modifyTVar' st (HashMap.delete h) modifyTVar' st (HashMap.delete h)
modifyTVar' sz (HashMap.delete h)
modifyTVar' tinq (HashMap.delete h)
modifyTVar' po (HashMap.delete h)
hasPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m Bool hasPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m Bool
hasPeerThread p = do hasPeerThread p = do
@ -284,3 +363,34 @@ newPeerThread p m = do
threads <- asks (view peerThreads) threads <- asks (view peerThreads)
liftIO $ atomically $ modifyTVar threads $ HashMap.insert p pt liftIO $ atomically $ modifyTVar threads $ HashMap.insert p pt
failedDownload :: forall e m . ( MyPeer e
, MonadIO m
, HasPeer e
, HasPeerLocator e (BlockDownloadM e m)
, HasStorage m
)
=> Peer e
-> Hash HbSync
-> BlockDownloadM e m ()
failedDownload p h = do
addDownload h
updateBlockPeerSize :: forall e m . (MyPeer e, MonadIO m)
=> Hash HbSync
-> Peer e
-> Integer
-> BlockDownloadM e m ()
updateBlockPeerSize h p s = do
tv <- asks (view blockPeers)
setBlockHasSize h
let alt = \case
Nothing -> Just $ HashMap.singleton p s
Just hm -> Just $ HashMap.insert p s hm
liftIO $ atomically $ modifyTVar tv (HashMap.alter alt h)