mirror of https://github.com/voidlizard/hbs2
parent
1f36cc82a0
commit
50775b4f91
|
@ -3,27 +3,41 @@
|
|||
|
||||
Ну а так, базовый PEX заработал
|
||||
|
||||
TODO: Добавлять пиров в KnownPeers
|
||||
только после того, как они
|
||||
пинганулись. Т.е пинговать
|
||||
пиров, если их еще нет.
|
||||
Не добавлять в KnownPeers до
|
||||
того, как ответили на пинг.
|
||||
|
||||
FIXME: Более мудрый алгоритм для pokePostponed
|
||||
|
||||
Сейчас оно слишком часто просыпается и забрасывает блоки в
|
||||
общую очередь, что приводит к busyloop в blockDownloadLoop.
|
||||
Введение HasTimeLimits улучшило ситуацию, но не сильно.
|
||||
Алгоритм должен быть что-то наподобие:
|
||||
|
||||
Посмотрели, сколько раз блок процессировался подряд.
|
||||
Если больше, чем X - то (что?) в общем, выкинули
|
||||
обратно в postponed.
|
||||
|
||||
С другой стороны, может GetBlockSize можно рассылать
|
||||
из pokePostponed, и просыпаться, если пришёл размер
|
||||
нашего блока
|
||||
|
||||
|
||||
TODO: Добавлять пиров в KnownPeers только после того, как они пинганулись.
|
||||
Т.е пинговать пиров, если их еще нет. Не добавлять в KnownPeers до
|
||||
того, как ответили на пинг.
|
||||
|
||||
|
||||
TODO: Научиться убирать дубликаты пиров.
|
||||
Их можно распознать по PeerNonce,
|
||||
но непонятно, какой из пиров
|
||||
оставлять.
|
||||
Иначе это будет реально большая
|
||||
проблема при скачивании.
|
||||
|
||||
Их можно распознать по PeerNonce, но непонятно, какой из пиров
|
||||
оставлять. Иначе это будет реально большая проблема при
|
||||
скачивании.
|
||||
|
||||
|
||||
TODO: Убедиться, что subscribe на перманентное
|
||||
событие НИКОГДА не вызывается в рекурсии.
|
||||
Проверить ВСЕ subscribe.
|
||||
Возможно, вставить проверки в рантайм.
|
||||
Возможно, ограничить число таких событий
|
||||
и ругаться в рантайме.
|
||||
TODO: Убедиться, что subscribe на перманентное событие
|
||||
НИКОГДА не вызывается в рекурсии.
|
||||
|
||||
Проверить ВСЕ subscribe. Возможно, вставить проверки в рантайм.
|
||||
Возможно, ограничить число таких событий и ругаться в рантайме.
|
||||
|
||||
|
||||
FIXME: При вычислении burst надо каким-то образом
|
||||
находить плато и не лезть выше него.
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
{-# OPTIONS_GHC -fno-warn-orphans #-}
|
||||
{-# Language TemplateHaskell #-}
|
||||
{-# Language UndecidableInstances #-}
|
||||
{-# Language FunctionalDependencies #-}
|
||||
|
@ -132,6 +133,7 @@ data PeerEnv e =
|
|||
, _envEvents :: TVar (HashMap SKey [Dynamic])
|
||||
, _envExpireTimes :: Cache SKey ()
|
||||
, _envSweepers :: TVar (HashMap SKey [PeerM e IO ()])
|
||||
, _envReqLimit :: Cache (Peer e, Integer) ()
|
||||
}
|
||||
|
||||
newtype PeerM e m a = PeerM { fromPeerM :: ReaderT (PeerEnv e) m a }
|
||||
|
@ -227,18 +229,44 @@ instance ( MonadIO m
|
|||
se <- asks (view envSessions)
|
||||
liftIO $ Cache.delete se (newSKey @(SessionKey e p) k)
|
||||
|
||||
class HasProtocol e p => HasTimeLimits e p m where
|
||||
withTimeLimit :: Peer e -> m () -> m ()
|
||||
|
||||
instance {-# OVERLAPPABLE #-}
|
||||
(Monad m, HasProtocol e p) => HasTimeLimits e p m where
|
||||
withTimeLimit _ m = m
|
||||
|
||||
instance (MonadIO m, HasProtocol e p)
|
||||
=> HasTimeLimits e p (PeerM e m) where
|
||||
withTimeLimit peer m = case requestMinPeriod @e @p of
|
||||
Nothing -> m
|
||||
Just lim -> do
|
||||
let proto = protoId @e @p (Proxy @p)
|
||||
ex <- asks (view envReqLimit)
|
||||
here <- liftIO $ Cache.lookup ex (peer, proto) <&> isJust
|
||||
unless here $ do
|
||||
liftIO $ Cache.insert' ex (Just (toTimeSpec lim)) (peer, proto) ()
|
||||
m
|
||||
|
||||
instance ( MonadIO m
|
||||
, HasProtocol e p
|
||||
, HasFabriq e m -- (PeerM e m)
|
||||
, HasOwnPeer e m
|
||||
, PeerMessaging e
|
||||
, HasTimeLimits e p m
|
||||
) => Request e p m where
|
||||
request p msg = do
|
||||
let proto = protoId @e @p (Proxy @p)
|
||||
pipe <- getFabriq @e
|
||||
me <- ownPeer @e
|
||||
sendTo pipe (To p) (From me) (AnyMessage @(Encoded e) @e proto (encode msg))
|
||||
|
||||
-- TODO: check if a request were sent to peer and timeout is here
|
||||
-- if not here - than send and create a new timeout
|
||||
--
|
||||
-- TODO: where to store the timeout?
|
||||
-- TODO: where the timeout come from?
|
||||
withTimeLimit @e @p p $ do
|
||||
sendTo pipe (To p) (From me) (AnyMessage @(Encoded e) @e proto (encode msg))
|
||||
|
||||
|
||||
instance ( Typeable (EventHandler e p (PeerM e IO))
|
||||
|
@ -339,6 +367,7 @@ newPeerEnv s bus p = do
|
|||
<*> liftIO (newTVarIO mempty)
|
||||
<*> liftIO (Cache.newCache (Just defCookieTimeout))
|
||||
<*> liftIO (newTVarIO mempty)
|
||||
<*> liftIO (Cache.newCache (Just defCookieTimeout))
|
||||
|
||||
runPeerM :: forall e m . ( MonadIO m
|
||||
, HasPeer e
|
||||
|
@ -392,7 +421,7 @@ runProto hh = do
|
|||
for_ messages $ \(From pip, AnyMessage n msg :: AnyMessage (Encoded e) e) -> do
|
||||
|
||||
case Map.lookup n disp of
|
||||
Nothing -> liftIO $ print "SHIT!" >> pure ()
|
||||
Nothing -> pure () -- FIXME: error counting! and statistics counting feature
|
||||
|
||||
Just (AnyProtocol { protoDecode = decoder
|
||||
, handle = h
|
||||
|
|
|
@ -30,6 +30,7 @@ blockSizeProto :: forall e m . ( MonadIO m
|
|||
blockSizeProto getBlockSize evHasBlock =
|
||||
\case
|
||||
GetBlockSize h -> do
|
||||
-- liftIO $ print "GetBlockSize"
|
||||
deferred (Proxy @(BlockInfo e))$ do
|
||||
getBlockSize h >>= \case
|
||||
Just size -> response (BlockSize @e h size)
|
||||
|
|
|
@ -38,6 +38,7 @@ instance HasProtocol UDP (BlockInfo UDP) where
|
|||
type instance Encoded UDP = ByteString
|
||||
decode = either (const Nothing) Just . deserialiseOrFail
|
||||
encode = serialise
|
||||
requestMinPeriod = Just 5
|
||||
|
||||
instance HasProtocol UDP (BlockChunks UDP) where
|
||||
type instance ProtocolId (BlockChunks UDP) = 2
|
||||
|
|
|
@ -7,6 +7,8 @@ module HBS2.Net.Proto.Types
|
|||
( module HBS2.Net.Proto.Types
|
||||
) where
|
||||
|
||||
import HBS2.Clock
|
||||
|
||||
import Data.Kind
|
||||
import GHC.TypeLits
|
||||
import Data.Proxy
|
||||
|
@ -99,6 +101,8 @@ class (KnownNat (ProtocolId p), HasPeer e) => HasProtocol e p | p -> e where
|
|||
decode :: Encoded e -> Maybe p
|
||||
encode :: p -> Encoded e
|
||||
|
||||
requestMinPeriod :: Maybe (Timeout 'Seconds)
|
||||
requestMinPeriod = Nothing
|
||||
|
||||
-- FIXME: slow and dumb
|
||||
instance {-# OVERLAPPABLE #-} (MonadIO m, Num (Cookie e)) => GenCookie e m where
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
{-# OPTIONS_GHC -fno-warn-orphans #-}
|
||||
{-# Language TemplateHaskell #-}
|
||||
{-# Language UndecidableInstances #-}
|
||||
module BlockDownload where
|
||||
|
||||
|
@ -20,7 +19,9 @@ import HBS2.Prelude.Plated
|
|||
import HBS2.Storage
|
||||
import HBS2.System.Logger.Simple
|
||||
|
||||
import PeerTypes
|
||||
import PeerInfo
|
||||
import PokePostponed
|
||||
|
||||
import Control.Concurrent.Async
|
||||
import Control.Concurrent.STM
|
||||
|
@ -47,179 +48,6 @@ import System.Random.Shuffle
|
|||
import Type.Reflection
|
||||
|
||||
|
||||
calcBursts :: forall a . Integral a => a -> [a] -> [(a,a)]
|
||||
calcBursts bu pieces = go seed
|
||||
where
|
||||
seed = fmap (,1) pieces
|
||||
|
||||
go ( (n1,s1) : (n2,s2) : xs )
|
||||
| (s1 + s2) <= bu = go ((n1, s1+s2) : xs)
|
||||
| otherwise = (n1,s1) : go ( (n2,s2) : xs)
|
||||
|
||||
go [x] = [x]
|
||||
go [] = []
|
||||
|
||||
data BlockDownload =
|
||||
BlockDownload
|
||||
{ _sBlockHash :: Hash HbSync
|
||||
, _sBlockSize :: Size
|
||||
, _sBlockChunkSize :: ChunkSize
|
||||
, _sBlockChunks :: TQueue (ChunkNum, ByteString)
|
||||
}
|
||||
deriving stock (Typeable)
|
||||
|
||||
makeLenses 'BlockDownload
|
||||
|
||||
newBlockDownload :: MonadIO m => Hash HbSync -> m BlockDownload
|
||||
newBlockDownload h = do
|
||||
BlockDownload h 0 0 <$> liftIO newTQueueIO
|
||||
|
||||
|
||||
type instance SessionData e (BlockChunks e) = BlockDownload
|
||||
|
||||
newtype instance SessionKey e (BlockChunks e) =
|
||||
DownloadSessionKey (Peer e, Cookie e)
|
||||
deriving stock (Generic,Typeable)
|
||||
|
||||
data BsFSM = Initial
|
||||
| Downloading
|
||||
| Postpone
|
||||
|
||||
data BlockState =
|
||||
BlockState
|
||||
{ _bsStart :: TimeSpec
|
||||
, _bsTimes :: Int
|
||||
, _bsState :: BsFSM
|
||||
, _bsWipTo :: Double
|
||||
}
|
||||
|
||||
makeLenses 'BlockState
|
||||
|
||||
data DownloadEnv e =
|
||||
DownloadEnv
|
||||
{ _downloadQ :: TQueue (Hash HbSync)
|
||||
, _peerBusy :: TVar (HashMap (Peer e) ())
|
||||
, _blockPeers :: TVar (HashMap (Hash HbSync) (HashMap (Peer e) Integer) )
|
||||
, _blockWip :: Cache (Hash HbSync) ()
|
||||
, _blockState :: TVar (HashMap (Hash HbSync) BlockState)
|
||||
, _blockPostponed :: Cache (Hash HbSync) ()
|
||||
, _blockInQ :: TVar (HashMap (Hash HbSync) ())
|
||||
}
|
||||
|
||||
makeLenses 'DownloadEnv
|
||||
|
||||
class (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) => MyPeer e
|
||||
instance (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) => MyPeer e
|
||||
|
||||
newDownloadEnv :: (MonadIO m, MyPeer e) => m (DownloadEnv e)
|
||||
newDownloadEnv = liftIO do
|
||||
DownloadEnv <$> newTQueueIO
|
||||
<*> newTVarIO mempty
|
||||
<*> newTVarIO mempty
|
||||
<*> Cache.newCache (Just defBlockWipTimeout)
|
||||
<*> newTVarIO mempty
|
||||
<*> Cache.newCache Nothing
|
||||
<*> newTVarIO mempty
|
||||
|
||||
newtype BlockDownloadM e m a =
|
||||
BlockDownloadM { fromBlockDownloadM :: ReaderT (DownloadEnv e) m a }
|
||||
deriving newtype ( Functor
|
||||
, Applicative
|
||||
, Monad
|
||||
, MonadIO
|
||||
, MonadReader (DownloadEnv e)
|
||||
, MonadTrans
|
||||
)
|
||||
|
||||
runDownloadM :: (MyPeer e, MonadIO m) => BlockDownloadM e m a -> m a
|
||||
runDownloadM m = runReaderT ( fromBlockDownloadM m ) =<< newDownloadEnv
|
||||
|
||||
withDownload :: (MyPeer e, MonadIO m) => DownloadEnv e -> BlockDownloadM e m a -> m a
|
||||
withDownload e m = runReaderT ( fromBlockDownloadM m ) e
|
||||
|
||||
setBlockState :: MonadIO m => Hash HbSync -> BlockState -> BlockDownloadM e m ()
|
||||
setBlockState h s = do
|
||||
sh <- asks (view blockState)
|
||||
liftIO $ atomically $ modifyTVar' sh (HashMap.insert h s)
|
||||
|
||||
-- FIXME: что-то более обоснованное
|
||||
calcWaitTime :: MonadIO m => BlockDownloadM e m Double
|
||||
calcWaitTime = do
|
||||
wip <- asks (view blockWip) >>= liftIO . Cache.size
|
||||
let wipn = realToFrac wip * 3
|
||||
let waiting = 5 + ( (realToFrac (toNanoSeconds defBlockWaitMax) * wipn) / 1e9 )
|
||||
pure waiting
|
||||
|
||||
touchBlockState :: MonadIO m => Hash HbSync -> BsFSM -> BlockDownloadM e m BlockState
|
||||
touchBlockState h st = do
|
||||
sh <- asks (view blockState)
|
||||
t <- liftIO $ getTime MonotonicCoarse
|
||||
wo <- calcWaitTime
|
||||
|
||||
let s = BlockState t 0 st wo
|
||||
|
||||
sn <- liftIO $ atomically $ do
|
||||
modifyTVar sh (HashMap.alter (doAlter s) h)
|
||||
readTVar sh <&> fromMaybe s . HashMap.lookup h
|
||||
|
||||
case view bsState sn of
|
||||
Initial -> do
|
||||
|
||||
let t0 = view bsStart sn
|
||||
let dt = realToFrac (toNanoSecs t - toNanoSecs t0) / 1e9
|
||||
|
||||
wip <- asks (view blockWip) >>= liftIO . Cache.size
|
||||
|
||||
let waiting = view bsWipTo sn
|
||||
|
||||
if dt > waiting then do -- FIXME: remove-hardcode
|
||||
debug $ "pospone block" <+> pretty h <+> pretty dt <+> pretty (showGFloat (Just 2) waiting "")
|
||||
let sn1 = sn { _bsState = Postpone }
|
||||
liftIO $ atomically $ modifyTVar sh (HashMap.insert h sn1)
|
||||
pure sn1
|
||||
else do
|
||||
pure sn
|
||||
|
||||
_ -> pure sn
|
||||
|
||||
where
|
||||
doAlter s1 = \case
|
||||
Nothing -> Just s1
|
||||
Just s -> Just $ over bsTimes succ s
|
||||
|
||||
getBlockState :: MonadIO m => Hash HbSync -> BlockDownloadM e m BlockState
|
||||
getBlockState h = do
|
||||
sh <- asks (view blockState)
|
||||
touchBlockState h Initial
|
||||
|
||||
addDownload :: MonadIO m => Hash HbSync -> BlockDownloadM e m ()
|
||||
addDownload h = do
|
||||
|
||||
tinq <- asks (view blockInQ)
|
||||
|
||||
doAdd <- liftIO $ atomically $ stateTVar tinq
|
||||
\hm -> case HashMap.lookup h hm of
|
||||
Nothing -> (True, HashMap.insert h () hm)
|
||||
Just{} -> (False, HashMap.insert h () hm)
|
||||
when doAdd $ do
|
||||
|
||||
q <- asks (view downloadQ)
|
||||
wip <- asks (view blockWip)
|
||||
|
||||
liftIO do
|
||||
atomically $ writeTQueue q h
|
||||
Cache.insert wip h ()
|
||||
|
||||
void $ touchBlockState h Initial
|
||||
|
||||
removeFromWip :: MonadIO m => Hash HbSync -> BlockDownloadM e m ()
|
||||
removeFromWip h = do
|
||||
wip <- asks (view blockWip)
|
||||
st <- asks (view blockState)
|
||||
po <- asks (view blockPostponed)
|
||||
liftIO $ Cache.delete wip h
|
||||
liftIO $ Cache.delete po h
|
||||
liftIO $ atomically $ modifyTVar' st (HashMap.delete h)
|
||||
|
||||
withFreePeer :: forall e m .
|
||||
( MyPeer e
|
||||
|
@ -584,7 +412,7 @@ blockDownloadLoop env0 = do
|
|||
pinfo <- fetch True npi (PeerInfoKey p) id
|
||||
updatePeerInfo False pinfo
|
||||
|
||||
void $ liftIO $ async $ withPeerM e $ withDownload env0 (tossPostponed e)
|
||||
void $ liftIO $ async $ withPeerM e $ withDownload env0 (pokePostponed e)
|
||||
|
||||
-- TODO: peer info loop
|
||||
void $ liftIO $ async $ forever $ withPeerM e $ do
|
||||
|
@ -678,69 +506,6 @@ blockDownloadLoop env0 = do
|
|||
next
|
||||
|
||||
|
||||
tossPostponed :: forall e m . ( MonadIO m
|
||||
, EventListener e (PeerHandshake e) m
|
||||
, MyPeer e
|
||||
)
|
||||
=> PeerEnv e
|
||||
-> BlockDownloadM e m ()
|
||||
|
||||
tossPostponed penv = do
|
||||
|
||||
env <- ask
|
||||
|
||||
waitQ <- liftIO $ newTBQueueIO 1
|
||||
|
||||
busy <- liftIO $ newTVarIO False
|
||||
|
||||
cache <- asks (view blockPostponed)
|
||||
|
||||
lift $ subscribe @e AnyKnownPeerEventKey $ \(KnownPeerEvent{}) -> do
|
||||
cant <- liftIO $ readTVarIO busy
|
||||
unless cant $ do
|
||||
debug "AnyKnownPeerEventKey"
|
||||
mt <- liftIO $ atomically $ isEmptyTBQueue waitQ
|
||||
when mt do
|
||||
liftIO $ atomically $ writeTBQueue waitQ ()
|
||||
|
||||
forever do
|
||||
r <- liftIO $ race ( pause @'Seconds 20 ) ( atomically $ readTBQueue waitQ )
|
||||
|
||||
void $ liftIO $ atomically $ flushTBQueue waitQ
|
||||
|
||||
liftIO $ atomically $ writeTVar busy True
|
||||
|
||||
void $ liftIO $ async $ do
|
||||
pause @'Seconds 5
|
||||
atomically $ writeTVar busy False
|
||||
|
||||
let allBack = either (const False) (const True) r
|
||||
|
||||
blks <- liftIO $ Cache.toList cache
|
||||
|
||||
w <- calcWaitTime
|
||||
|
||||
debug $ "tossPostponed" <+> pretty (showGFloat (Just 2) w "")
|
||||
<+> pretty (length blks)
|
||||
|
||||
for_ blks $ \case
|
||||
(k,_,Nothing) | not allBack -> pure ()
|
||||
| otherwise -> pushBack cache k
|
||||
(k,_,Just{}) -> pushBack cache k
|
||||
|
||||
where
|
||||
pushBack cache k = do
|
||||
w <- calcWaitTime
|
||||
liftIO $ Cache.delete cache k
|
||||
st <- getBlockState k
|
||||
t0 <- liftIO $ getTime MonotonicCoarse
|
||||
setBlockState k ( set bsStart t0
|
||||
. set bsState Initial
|
||||
. set bsWipTo w
|
||||
$ st
|
||||
)
|
||||
debug $ "returning block to downloads" <+> pretty k
|
||||
addDownload k
|
||||
|
||||
|
||||
-- NOTE: this is an adapter for a ResponseM monad
|
||||
|
|
|
@ -28,6 +28,7 @@ import HBS2.System.Logger.Simple hiding (info)
|
|||
import HBS2.System.Logger.Simple qualified as Log
|
||||
|
||||
import RPC
|
||||
import PeerTypes
|
||||
import BlockDownload
|
||||
import PeerInfo
|
||||
|
||||
|
@ -82,8 +83,6 @@ data PeerOpts =
|
|||
|
||||
makeLenses 'PeerOpts
|
||||
|
||||
deriving newtype instance Hashable (SessionKey UDP (BlockChunks UDP))
|
||||
deriving stock instance Eq (SessionKey UDP (BlockChunks UDP))
|
||||
|
||||
main :: IO ()
|
||||
main = do
|
||||
|
|
|
@ -0,0 +1,206 @@
|
|||
{-# OPTIONS_GHC -fno-warn-orphans #-}
|
||||
{-# Language TemplateHaskell #-}
|
||||
{-# Language UndecidableInstances #-}
|
||||
module PeerTypes where
|
||||
|
||||
import HBS2.Clock
|
||||
import HBS2.Defaults
|
||||
import HBS2.Hash
|
||||
import HBS2.Net.Proto
|
||||
import HBS2.Net.Proto.BlockInfo
|
||||
import HBS2.Net.Proto.Definition
|
||||
import HBS2.Net.Proto.Sessions
|
||||
import HBS2.Prelude.Plated
|
||||
import HBS2.Storage
|
||||
import HBS2.System.Logger.Simple
|
||||
import HBS2.Net.Messaging.UDP (UDP)
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Control.Monad.Reader
|
||||
import Data.ByteString.Lazy (ByteString)
|
||||
import Data.Cache (Cache)
|
||||
import Data.Cache qualified as Cache
|
||||
import Data.HashMap.Strict (HashMap)
|
||||
import Data.HashMap.Strict qualified as HashMap
|
||||
import Data.Maybe
|
||||
import Lens.Micro.Platform
|
||||
import Numeric (showGFloat)
|
||||
import Prettyprinter
|
||||
|
||||
calcBursts :: forall a . Integral a => a -> [a] -> [(a,a)]
|
||||
calcBursts bu pieces = go seed
|
||||
where
|
||||
seed = fmap (,1) pieces
|
||||
|
||||
go ( (n1,s1) : (n2,s2) : xs )
|
||||
| (s1 + s2) <= bu = go ((n1, s1+s2) : xs)
|
||||
| otherwise = (n1,s1) : go ( (n2,s2) : xs)
|
||||
|
||||
go [x] = [x]
|
||||
go [] = []
|
||||
|
||||
data BlockDownload =
|
||||
BlockDownload
|
||||
{ _sBlockHash :: Hash HbSync
|
||||
, _sBlockSize :: Size
|
||||
, _sBlockChunkSize :: ChunkSize
|
||||
, _sBlockChunks :: TQueue (ChunkNum, ByteString)
|
||||
}
|
||||
deriving stock (Typeable)
|
||||
|
||||
makeLenses 'BlockDownload
|
||||
|
||||
newBlockDownload :: MonadIO m => Hash HbSync -> m BlockDownload
|
||||
newBlockDownload h = do
|
||||
BlockDownload h 0 0 <$> liftIO newTQueueIO
|
||||
|
||||
|
||||
type instance SessionData e (BlockChunks e) = BlockDownload
|
||||
|
||||
newtype instance SessionKey e (BlockChunks e) =
|
||||
DownloadSessionKey (Peer e, Cookie e)
|
||||
deriving stock (Generic,Typeable)
|
||||
|
||||
deriving newtype instance Hashable (SessionKey UDP (BlockChunks UDP))
|
||||
deriving stock instance Eq (SessionKey UDP (BlockChunks UDP))
|
||||
|
||||
data BsFSM = Initial
|
||||
| Downloading
|
||||
| Postpone
|
||||
|
||||
data BlockState =
|
||||
BlockState
|
||||
{ _bsStart :: TimeSpec
|
||||
, _bsTimes :: Int
|
||||
, _bsState :: BsFSM
|
||||
, _bsWipTo :: Double
|
||||
}
|
||||
|
||||
makeLenses 'BlockState
|
||||
|
||||
data DownloadEnv e =
|
||||
DownloadEnv
|
||||
{ _downloadQ :: TQueue (Hash HbSync)
|
||||
, _peerBusy :: TVar (HashMap (Peer e) ())
|
||||
, _blockPeers :: TVar (HashMap (Hash HbSync) (HashMap (Peer e) Integer) )
|
||||
, _blockWip :: Cache (Hash HbSync) ()
|
||||
, _blockState :: TVar (HashMap (Hash HbSync) BlockState)
|
||||
, _blockPostponed :: Cache (Hash HbSync) ()
|
||||
, _blockInQ :: TVar (HashMap (Hash HbSync) ())
|
||||
}
|
||||
|
||||
makeLenses 'DownloadEnv
|
||||
|
||||
class (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) => MyPeer e
|
||||
instance (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) => MyPeer e
|
||||
|
||||
newDownloadEnv :: (MonadIO m, MyPeer e) => m (DownloadEnv e)
|
||||
newDownloadEnv = liftIO do
|
||||
DownloadEnv <$> newTQueueIO
|
||||
<*> newTVarIO mempty
|
||||
<*> newTVarIO mempty
|
||||
<*> Cache.newCache (Just defBlockWipTimeout)
|
||||
<*> newTVarIO mempty
|
||||
<*> Cache.newCache Nothing
|
||||
<*> newTVarIO mempty
|
||||
|
||||
newtype BlockDownloadM e m a =
|
||||
BlockDownloadM { fromBlockDownloadM :: ReaderT (DownloadEnv e) m a }
|
||||
deriving newtype ( Functor
|
||||
, Applicative
|
||||
, Monad
|
||||
, MonadIO
|
||||
, MonadReader (DownloadEnv e)
|
||||
, MonadTrans
|
||||
)
|
||||
|
||||
runDownloadM :: (MyPeer e, MonadIO m) => BlockDownloadM e m a -> m a
|
||||
runDownloadM m = runReaderT ( fromBlockDownloadM m ) =<< newDownloadEnv
|
||||
|
||||
withDownload :: (MyPeer e, MonadIO m) => DownloadEnv e -> BlockDownloadM e m a -> m a
|
||||
withDownload e m = runReaderT ( fromBlockDownloadM m ) e
|
||||
|
||||
setBlockState :: MonadIO m => Hash HbSync -> BlockState -> BlockDownloadM e m ()
|
||||
setBlockState h s = do
|
||||
sh <- asks (view blockState)
|
||||
liftIO $ atomically $ modifyTVar' sh (HashMap.insert h s)
|
||||
|
||||
-- FIXME: что-то более обоснованное
|
||||
calcWaitTime :: MonadIO m => BlockDownloadM e m Double
|
||||
calcWaitTime = do
|
||||
wip <- asks (view blockWip) >>= liftIO . Cache.size
|
||||
let wipn = realToFrac wip * 3
|
||||
let waiting = 5 + ( (realToFrac (toNanoSeconds defBlockWaitMax) * wipn) / 1e9 )
|
||||
pure waiting
|
||||
|
||||
|
||||
touchBlockState :: MonadIO m => Hash HbSync -> BsFSM -> BlockDownloadM e m BlockState
|
||||
touchBlockState h st = do
|
||||
sh <- asks (view blockState)
|
||||
t <- liftIO $ getTime MonotonicCoarse
|
||||
wo <- calcWaitTime
|
||||
|
||||
let s = BlockState t 0 st wo
|
||||
|
||||
sn <- liftIO $ atomically $ do
|
||||
modifyTVar sh (HashMap.alter (doAlter s) h)
|
||||
readTVar sh <&> fromMaybe s . HashMap.lookup h
|
||||
|
||||
case view bsState sn of
|
||||
Initial -> do
|
||||
|
||||
let t0 = view bsStart sn
|
||||
let dt = realToFrac (toNanoSecs t - toNanoSecs t0) / 1e9
|
||||
|
||||
wip <- asks (view blockWip) >>= liftIO . Cache.size
|
||||
|
||||
let waiting = view bsWipTo sn
|
||||
|
||||
if dt > waiting then do -- FIXME: remove-hardcode
|
||||
debug $ "pospone block" <+> pretty h <+> pretty dt <+> pretty (showGFloat (Just 2) waiting "")
|
||||
let sn1 = sn { _bsState = Postpone }
|
||||
liftIO $ atomically $ modifyTVar sh (HashMap.insert h sn1)
|
||||
pure sn1
|
||||
else do
|
||||
pure sn
|
||||
|
||||
_ -> pure sn
|
||||
|
||||
where
|
||||
doAlter s1 = \case
|
||||
Nothing -> Just s1
|
||||
Just s -> Just $ over bsTimes succ s
|
||||
|
||||
getBlockState :: MonadIO m => Hash HbSync -> BlockDownloadM e m BlockState
|
||||
getBlockState h = do
|
||||
sh <- asks (view blockState)
|
||||
touchBlockState h Initial
|
||||
|
||||
addDownload :: MonadIO m => Hash HbSync -> BlockDownloadM e m ()
|
||||
addDownload h = do
|
||||
|
||||
tinq <- asks (view blockInQ)
|
||||
|
||||
doAdd <- liftIO $ atomically $ stateTVar tinq
|
||||
\hm -> case HashMap.lookup h hm of
|
||||
Nothing -> (True, HashMap.insert h () hm)
|
||||
Just{} -> (False, HashMap.insert h () hm)
|
||||
when doAdd $ do
|
||||
|
||||
q <- asks (view downloadQ)
|
||||
wip <- asks (view blockWip)
|
||||
|
||||
liftIO do
|
||||
atomically $ writeTQueue q h
|
||||
Cache.insert wip h ()
|
||||
|
||||
void $ touchBlockState h Initial
|
||||
|
||||
removeFromWip :: MonadIO m => Hash HbSync -> BlockDownloadM e m ()
|
||||
removeFromWip h = do
|
||||
wip <- asks (view blockWip)
|
||||
st <- asks (view blockState)
|
||||
po <- asks (view blockPostponed)
|
||||
liftIO $ Cache.delete wip h
|
||||
liftIO $ Cache.delete po h
|
||||
liftIO $ atomically $ modifyTVar' st (HashMap.delete h)
|
|
@ -0,0 +1,85 @@
|
|||
module PokePostponed where
|
||||
|
||||
import HBS2.Prelude.Plated
|
||||
import HBS2.Clock
|
||||
import HBS2.Actors.Peer
|
||||
import HBS2.Net.Proto.Peer
|
||||
import HBS2.Events
|
||||
|
||||
import HBS2.System.Logger.Simple
|
||||
|
||||
import PeerTypes
|
||||
|
||||
import Data.Foldable (for_)
|
||||
import Control.Concurrent.Async
|
||||
import Control.Concurrent.STM
|
||||
import Control.Monad.Reader
|
||||
import Data.Cache qualified as Cache
|
||||
import Lens.Micro.Platform
|
||||
import Numeric ( showGFloat )
|
||||
import Prettyprinter
|
||||
|
||||
pokePostponed :: forall e m . ( MonadIO m
|
||||
, EventListener e (PeerHandshake e) m
|
||||
, MyPeer e
|
||||
)
|
||||
=> PeerEnv e
|
||||
-> BlockDownloadM e m ()
|
||||
|
||||
pokePostponed penv = do
|
||||
|
||||
env <- ask
|
||||
|
||||
waitQ <- liftIO $ newTBQueueIO 1
|
||||
|
||||
busy <- liftIO $ newTVarIO False
|
||||
|
||||
cache <- asks (view blockPostponed)
|
||||
|
||||
lift $ subscribe @e AnyKnownPeerEventKey $ \(KnownPeerEvent{}) -> do
|
||||
cant <- liftIO $ readTVarIO busy
|
||||
unless cant $ do
|
||||
debug "AnyKnownPeerEventKey"
|
||||
mt <- liftIO $ atomically $ isEmptyTBQueue waitQ
|
||||
when mt do
|
||||
liftIO $ atomically $ writeTBQueue waitQ ()
|
||||
|
||||
forever do
|
||||
-- FIXME: to defaults!
|
||||
r <- liftIO $ race ( pause @'Seconds 60 ) ( atomically $ readTBQueue waitQ )
|
||||
|
||||
void $ liftIO $ atomically $ flushTBQueue waitQ
|
||||
|
||||
liftIO $ atomically $ writeTVar busy True
|
||||
|
||||
void $ liftIO $ async $ do
|
||||
pause @'Seconds 30
|
||||
atomically $ writeTVar busy False
|
||||
|
||||
let allBack = either (const False) (const True) r
|
||||
|
||||
blks <- liftIO $ Cache.toList cache
|
||||
|
||||
w <- calcWaitTime
|
||||
|
||||
debug $ "tossPostponed" <+> pretty (showGFloat (Just 2) w "")
|
||||
<+> pretty (length blks)
|
||||
|
||||
for_ blks $ \case
|
||||
(k,_,Nothing) | not allBack -> pure ()
|
||||
| otherwise -> pushBack cache k
|
||||
(k,_,Just{}) -> pushBack cache k
|
||||
|
||||
where
|
||||
pushBack cache k = do
|
||||
w <- calcWaitTime
|
||||
liftIO $ Cache.delete cache k
|
||||
st <- getBlockState k
|
||||
t0 <- liftIO $ getTime MonotonicCoarse
|
||||
setBlockState k ( set bsStart t0
|
||||
. set bsState Initial
|
||||
. set bsWipTo w
|
||||
$ st
|
||||
)
|
||||
debug $ "returning block to downloads" <+> pretty k
|
||||
addDownload k
|
|
@ -103,7 +103,9 @@ executable hbs2-peer
|
|||
|
||||
other-modules: BlockDownload
|
||||
, PeerInfo
|
||||
, PokePostponed
|
||||
, RPC
|
||||
, PeerTypes
|
||||
|
||||
-- other-extensions:
|
||||
build-depends: base ^>=4.15.1.0
|
||||
|
|
Loading…
Reference in New Issue