diff --git a/docs/devlog.md b/docs/devlog.md index 4970fef7..ce1a1d92 100644 --- a/docs/devlog.md +++ b/docs/devlog.md @@ -3,27 +3,41 @@ Ну а так, базовый PEX заработал -TODO: Добавлять пиров в KnownPeers - только после того, как они - пинганулись. Т.е пинговать - пиров, если их еще нет. - Не добавлять в KnownPeers до - того, как ответили на пинг. + +FIXME: Более мудрый алгоритм для pokePostponed + + Сейчас оно слишком часто просыпается и забрасывает блоки в + общую очередь, что приводит к busyloop в blockDownloadLoop. + Введение HasTimeLimits улучшило ситуацию, но не сильно. + Алгоритм должен быть что-то наподобие: + + Посмотрели, сколько раз блок процессировался подряд. + Если больше, чем X - то (что?) в общем, выкинули + обратно в postponed. + + С другой стороны, может GetBlockSize можно рассылать + из pokePostponed, и просыпаться, если пришёл размер + нашего блока + + +TODO: Добавлять пиров в KnownPeers только после того, как они пинганулись. + Т.е пинговать пиров, если их еще нет. Не добавлять в KnownPeers до + того, как ответили на пинг. + TODO: Научиться убирать дубликаты пиров. - Их можно распознать по PeerNonce, - но непонятно, какой из пиров - оставлять. - Иначе это будет реально большая - проблема при скачивании. + + Их можно распознать по PeerNonce, но непонятно, какой из пиров + оставлять. Иначе это будет реально большая проблема при + скачивании. -TODO: Убедиться, что subscribe на перманентное - событие НИКОГДА не вызывается в рекурсии. - Проверить ВСЕ subscribe. - Возможно, вставить проверки в рантайм. - Возможно, ограничить число таких событий - и ругаться в рантайме. +TODO: Убедиться, что subscribe на перманентное событие + НИКОГДА не вызывается в рекурсии. + + Проверить ВСЕ subscribe. Возможно, вставить проверки в рантайм. + Возможно, ограничить число таких событий и ругаться в рантайме. + FIXME: При вычислении burst надо каким-то образом находить плато и не лезть выше него. diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index fc4d73a0..2bfe9e0c 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -1,3 +1,4 @@ +{-# OPTIONS_GHC -fno-warn-orphans #-} {-# Language TemplateHaskell #-} {-# Language UndecidableInstances #-} {-# Language FunctionalDependencies #-} @@ -132,6 +133,7 @@ data PeerEnv e = , _envEvents :: TVar (HashMap SKey [Dynamic]) , _envExpireTimes :: Cache SKey () , _envSweepers :: TVar (HashMap SKey [PeerM e IO ()]) + , _envReqLimit :: Cache (Peer e, Integer) () } newtype PeerM e m a = PeerM { fromPeerM :: ReaderT (PeerEnv e) m a } @@ -227,18 +229,44 @@ instance ( MonadIO m se <- asks (view envSessions) liftIO $ Cache.delete se (newSKey @(SessionKey e p) k) +class HasProtocol e p => HasTimeLimits e p m where + withTimeLimit :: Peer e -> m () -> m () + +instance {-# OVERLAPPABLE #-} + (Monad m, HasProtocol e p) => HasTimeLimits e p m where + withTimeLimit _ m = m + +instance (MonadIO m, HasProtocol e p) + => HasTimeLimits e p (PeerM e m) where + withTimeLimit peer m = case requestMinPeriod @e @p of + Nothing -> m + Just lim -> do + let proto = protoId @e @p (Proxy @p) + ex <- asks (view envReqLimit) + here <- liftIO $ Cache.lookup ex (peer, proto) <&> isJust + unless here $ do + liftIO $ Cache.insert' ex (Just (toTimeSpec lim)) (peer, proto) () + m instance ( MonadIO m , HasProtocol e p , HasFabriq e m -- (PeerM e m) , HasOwnPeer e m , PeerMessaging e + , HasTimeLimits e p m ) => Request e p m where request p msg = do let proto = protoId @e @p (Proxy @p) pipe <- getFabriq @e me <- ownPeer @e - sendTo pipe (To p) (From me) (AnyMessage @(Encoded e) @e proto (encode msg)) + + -- TODO: check if a request were sent to peer and timeout is here + -- if not here - than send and create a new timeout + -- + -- TODO: where to store the timeout? + -- TODO: where the timeout come from? + withTimeLimit @e @p p $ do + sendTo pipe (To p) (From me) (AnyMessage @(Encoded e) @e proto (encode msg)) instance ( Typeable (EventHandler e p (PeerM e IO)) @@ -339,6 +367,7 @@ newPeerEnv s bus p = do <*> liftIO (newTVarIO mempty) <*> liftIO (Cache.newCache (Just defCookieTimeout)) <*> liftIO (newTVarIO mempty) + <*> liftIO (Cache.newCache (Just defCookieTimeout)) runPeerM :: forall e m . ( MonadIO m , HasPeer e @@ -392,7 +421,7 @@ runProto hh = do for_ messages $ \(From pip, AnyMessage n msg :: AnyMessage (Encoded e) e) -> do case Map.lookup n disp of - Nothing -> liftIO $ print "SHIT!" >> pure () + Nothing -> pure () -- FIXME: error counting! and statistics counting feature Just (AnyProtocol { protoDecode = decoder , handle = h diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs index 6111f77e..0c83e05e 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs @@ -30,6 +30,7 @@ blockSizeProto :: forall e m . ( MonadIO m blockSizeProto getBlockSize evHasBlock = \case GetBlockSize h -> do + -- liftIO $ print "GetBlockSize" deferred (Proxy @(BlockInfo e))$ do getBlockSize h >>= \case Just size -> response (BlockSize @e h size) diff --git a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs index 952f8b05..aac1bf41 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs @@ -38,6 +38,7 @@ instance HasProtocol UDP (BlockInfo UDP) where type instance Encoded UDP = ByteString decode = either (const Nothing) Just . deserialiseOrFail encode = serialise + requestMinPeriod = Just 5 instance HasProtocol UDP (BlockChunks UDP) where type instance ProtocolId (BlockChunks UDP) = 2 diff --git a/hbs2-core/lib/HBS2/Net/Proto/Types.hs b/hbs2-core/lib/HBS2/Net/Proto/Types.hs index 9b624d1f..d0e8ae10 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Types.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Types.hs @@ -7,6 +7,8 @@ module HBS2.Net.Proto.Types ( module HBS2.Net.Proto.Types ) where +import HBS2.Clock + import Data.Kind import GHC.TypeLits import Data.Proxy @@ -99,6 +101,8 @@ class (KnownNat (ProtocolId p), HasPeer e) => HasProtocol e p | p -> e where decode :: Encoded e -> Maybe p encode :: p -> Encoded e + requestMinPeriod :: Maybe (Timeout 'Seconds) + requestMinPeriod = Nothing -- FIXME: slow and dumb instance {-# OVERLAPPABLE #-} (MonadIO m, Num (Cookie e)) => GenCookie e m where diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index 07d90417..269c70ac 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -1,5 +1,4 @@ {-# OPTIONS_GHC -fno-warn-orphans #-} -{-# Language TemplateHaskell #-} {-# Language UndecidableInstances #-} module BlockDownload where @@ -20,7 +19,9 @@ import HBS2.Prelude.Plated import HBS2.Storage import HBS2.System.Logger.Simple +import PeerTypes import PeerInfo +import PokePostponed import Control.Concurrent.Async import Control.Concurrent.STM @@ -47,179 +48,6 @@ import System.Random.Shuffle import Type.Reflection -calcBursts :: forall a . Integral a => a -> [a] -> [(a,a)] -calcBursts bu pieces = go seed - where - seed = fmap (,1) pieces - - go ( (n1,s1) : (n2,s2) : xs ) - | (s1 + s2) <= bu = go ((n1, s1+s2) : xs) - | otherwise = (n1,s1) : go ( (n2,s2) : xs) - - go [x] = [x] - go [] = [] - -data BlockDownload = - BlockDownload - { _sBlockHash :: Hash HbSync - , _sBlockSize :: Size - , _sBlockChunkSize :: ChunkSize - , _sBlockChunks :: TQueue (ChunkNum, ByteString) - } - deriving stock (Typeable) - -makeLenses 'BlockDownload - -newBlockDownload :: MonadIO m => Hash HbSync -> m BlockDownload -newBlockDownload h = do - BlockDownload h 0 0 <$> liftIO newTQueueIO - - -type instance SessionData e (BlockChunks e) = BlockDownload - -newtype instance SessionKey e (BlockChunks e) = - DownloadSessionKey (Peer e, Cookie e) - deriving stock (Generic,Typeable) - -data BsFSM = Initial - | Downloading - | Postpone - -data BlockState = - BlockState - { _bsStart :: TimeSpec - , _bsTimes :: Int - , _bsState :: BsFSM - , _bsWipTo :: Double - } - -makeLenses 'BlockState - -data DownloadEnv e = - DownloadEnv - { _downloadQ :: TQueue (Hash HbSync) - , _peerBusy :: TVar (HashMap (Peer e) ()) - , _blockPeers :: TVar (HashMap (Hash HbSync) (HashMap (Peer e) Integer) ) - , _blockWip :: Cache (Hash HbSync) () - , _blockState :: TVar (HashMap (Hash HbSync) BlockState) - , _blockPostponed :: Cache (Hash HbSync) () - , _blockInQ :: TVar (HashMap (Hash HbSync) ()) - } - -makeLenses 'DownloadEnv - -class (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) => MyPeer e -instance (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) => MyPeer e - -newDownloadEnv :: (MonadIO m, MyPeer e) => m (DownloadEnv e) -newDownloadEnv = liftIO do - DownloadEnv <$> newTQueueIO - <*> newTVarIO mempty - <*> newTVarIO mempty - <*> Cache.newCache (Just defBlockWipTimeout) - <*> newTVarIO mempty - <*> Cache.newCache Nothing - <*> newTVarIO mempty - -newtype BlockDownloadM e m a = - BlockDownloadM { fromBlockDownloadM :: ReaderT (DownloadEnv e) m a } - deriving newtype ( Functor - , Applicative - , Monad - , MonadIO - , MonadReader (DownloadEnv e) - , MonadTrans - ) - -runDownloadM :: (MyPeer e, MonadIO m) => BlockDownloadM e m a -> m a -runDownloadM m = runReaderT ( fromBlockDownloadM m ) =<< newDownloadEnv - -withDownload :: (MyPeer e, MonadIO m) => DownloadEnv e -> BlockDownloadM e m a -> m a -withDownload e m = runReaderT ( fromBlockDownloadM m ) e - -setBlockState :: MonadIO m => Hash HbSync -> BlockState -> BlockDownloadM e m () -setBlockState h s = do - sh <- asks (view blockState) - liftIO $ atomically $ modifyTVar' sh (HashMap.insert h s) - --- FIXME: что-то более обоснованное -calcWaitTime :: MonadIO m => BlockDownloadM e m Double -calcWaitTime = do - wip <- asks (view blockWip) >>= liftIO . Cache.size - let wipn = realToFrac wip * 3 - let waiting = 5 + ( (realToFrac (toNanoSeconds defBlockWaitMax) * wipn) / 1e9 ) - pure waiting - -touchBlockState :: MonadIO m => Hash HbSync -> BsFSM -> BlockDownloadM e m BlockState -touchBlockState h st = do - sh <- asks (view blockState) - t <- liftIO $ getTime MonotonicCoarse - wo <- calcWaitTime - - let s = BlockState t 0 st wo - - sn <- liftIO $ atomically $ do - modifyTVar sh (HashMap.alter (doAlter s) h) - readTVar sh <&> fromMaybe s . HashMap.lookup h - - case view bsState sn of - Initial -> do - - let t0 = view bsStart sn - let dt = realToFrac (toNanoSecs t - toNanoSecs t0) / 1e9 - - wip <- asks (view blockWip) >>= liftIO . Cache.size - - let waiting = view bsWipTo sn - - if dt > waiting then do -- FIXME: remove-hardcode - debug $ "pospone block" <+> pretty h <+> pretty dt <+> pretty (showGFloat (Just 2) waiting "") - let sn1 = sn { _bsState = Postpone } - liftIO $ atomically $ modifyTVar sh (HashMap.insert h sn1) - pure sn1 - else do - pure sn - - _ -> pure sn - - where - doAlter s1 = \case - Nothing -> Just s1 - Just s -> Just $ over bsTimes succ s - -getBlockState :: MonadIO m => Hash HbSync -> BlockDownloadM e m BlockState -getBlockState h = do - sh <- asks (view blockState) - touchBlockState h Initial - -addDownload :: MonadIO m => Hash HbSync -> BlockDownloadM e m () -addDownload h = do - - tinq <- asks (view blockInQ) - - doAdd <- liftIO $ atomically $ stateTVar tinq - \hm -> case HashMap.lookup h hm of - Nothing -> (True, HashMap.insert h () hm) - Just{} -> (False, HashMap.insert h () hm) - when doAdd $ do - - q <- asks (view downloadQ) - wip <- asks (view blockWip) - - liftIO do - atomically $ writeTQueue q h - Cache.insert wip h () - - void $ touchBlockState h Initial - -removeFromWip :: MonadIO m => Hash HbSync -> BlockDownloadM e m () -removeFromWip h = do - wip <- asks (view blockWip) - st <- asks (view blockState) - po <- asks (view blockPostponed) - liftIO $ Cache.delete wip h - liftIO $ Cache.delete po h - liftIO $ atomically $ modifyTVar' st (HashMap.delete h) withFreePeer :: forall e m . ( MyPeer e @@ -584,7 +412,7 @@ blockDownloadLoop env0 = do pinfo <- fetch True npi (PeerInfoKey p) id updatePeerInfo False pinfo - void $ liftIO $ async $ withPeerM e $ withDownload env0 (tossPostponed e) + void $ liftIO $ async $ withPeerM e $ withDownload env0 (pokePostponed e) -- TODO: peer info loop void $ liftIO $ async $ forever $ withPeerM e $ do @@ -678,69 +506,6 @@ blockDownloadLoop env0 = do next -tossPostponed :: forall e m . ( MonadIO m - , EventListener e (PeerHandshake e) m - , MyPeer e - ) - => PeerEnv e - -> BlockDownloadM e m () - -tossPostponed penv = do - - env <- ask - - waitQ <- liftIO $ newTBQueueIO 1 - - busy <- liftIO $ newTVarIO False - - cache <- asks (view blockPostponed) - - lift $ subscribe @e AnyKnownPeerEventKey $ \(KnownPeerEvent{}) -> do - cant <- liftIO $ readTVarIO busy - unless cant $ do - debug "AnyKnownPeerEventKey" - mt <- liftIO $ atomically $ isEmptyTBQueue waitQ - when mt do - liftIO $ atomically $ writeTBQueue waitQ () - - forever do - r <- liftIO $ race ( pause @'Seconds 20 ) ( atomically $ readTBQueue waitQ ) - - void $ liftIO $ atomically $ flushTBQueue waitQ - - liftIO $ atomically $ writeTVar busy True - - void $ liftIO $ async $ do - pause @'Seconds 5 - atomically $ writeTVar busy False - - let allBack = either (const False) (const True) r - - blks <- liftIO $ Cache.toList cache - - w <- calcWaitTime - - debug $ "tossPostponed" <+> pretty (showGFloat (Just 2) w "") - <+> pretty (length blks) - - for_ blks $ \case - (k,_,Nothing) | not allBack -> pure () - | otherwise -> pushBack cache k - (k,_,Just{}) -> pushBack cache k - - where - pushBack cache k = do - w <- calcWaitTime - liftIO $ Cache.delete cache k - st <- getBlockState k - t0 <- liftIO $ getTime MonotonicCoarse - setBlockState k ( set bsStart t0 - . set bsState Initial - . set bsWipTo w - $ st - ) - debug $ "returning block to downloads" <+> pretty k - addDownload k -- NOTE: this is an adapter for a ResponseM monad diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 721d3995..86a83bc2 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -28,6 +28,7 @@ import HBS2.System.Logger.Simple hiding (info) import HBS2.System.Logger.Simple qualified as Log import RPC +import PeerTypes import BlockDownload import PeerInfo @@ -82,8 +83,6 @@ data PeerOpts = makeLenses 'PeerOpts -deriving newtype instance Hashable (SessionKey UDP (BlockChunks UDP)) -deriving stock instance Eq (SessionKey UDP (BlockChunks UDP)) main :: IO () main = do diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs new file mode 100644 index 00000000..32ba7c19 --- /dev/null +++ b/hbs2-peer/app/PeerTypes.hs @@ -0,0 +1,206 @@ +{-# OPTIONS_GHC -fno-warn-orphans #-} +{-# Language TemplateHaskell #-} +{-# Language UndecidableInstances #-} +module PeerTypes where + +import HBS2.Clock +import HBS2.Defaults +import HBS2.Hash +import HBS2.Net.Proto +import HBS2.Net.Proto.BlockInfo +import HBS2.Net.Proto.Definition +import HBS2.Net.Proto.Sessions +import HBS2.Prelude.Plated +import HBS2.Storage +import HBS2.System.Logger.Simple +import HBS2.Net.Messaging.UDP (UDP) + +import Control.Concurrent.STM +import Control.Monad.Reader +import Data.ByteString.Lazy (ByteString) +import Data.Cache (Cache) +import Data.Cache qualified as Cache +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HashMap +import Data.Maybe +import Lens.Micro.Platform +import Numeric (showGFloat) +import Prettyprinter + +calcBursts :: forall a . Integral a => a -> [a] -> [(a,a)] +calcBursts bu pieces = go seed + where + seed = fmap (,1) pieces + + go ( (n1,s1) : (n2,s2) : xs ) + | (s1 + s2) <= bu = go ((n1, s1+s2) : xs) + | otherwise = (n1,s1) : go ( (n2,s2) : xs) + + go [x] = [x] + go [] = [] + +data BlockDownload = + BlockDownload + { _sBlockHash :: Hash HbSync + , _sBlockSize :: Size + , _sBlockChunkSize :: ChunkSize + , _sBlockChunks :: TQueue (ChunkNum, ByteString) + } + deriving stock (Typeable) + +makeLenses 'BlockDownload + +newBlockDownload :: MonadIO m => Hash HbSync -> m BlockDownload +newBlockDownload h = do + BlockDownload h 0 0 <$> liftIO newTQueueIO + + +type instance SessionData e (BlockChunks e) = BlockDownload + +newtype instance SessionKey e (BlockChunks e) = + DownloadSessionKey (Peer e, Cookie e) + deriving stock (Generic,Typeable) + +deriving newtype instance Hashable (SessionKey UDP (BlockChunks UDP)) +deriving stock instance Eq (SessionKey UDP (BlockChunks UDP)) + +data BsFSM = Initial + | Downloading + | Postpone + +data BlockState = + BlockState + { _bsStart :: TimeSpec + , _bsTimes :: Int + , _bsState :: BsFSM + , _bsWipTo :: Double + } + +makeLenses 'BlockState + +data DownloadEnv e = + DownloadEnv + { _downloadQ :: TQueue (Hash HbSync) + , _peerBusy :: TVar (HashMap (Peer e) ()) + , _blockPeers :: TVar (HashMap (Hash HbSync) (HashMap (Peer e) Integer) ) + , _blockWip :: Cache (Hash HbSync) () + , _blockState :: TVar (HashMap (Hash HbSync) BlockState) + , _blockPostponed :: Cache (Hash HbSync) () + , _blockInQ :: TVar (HashMap (Hash HbSync) ()) + } + +makeLenses 'DownloadEnv + +class (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) => MyPeer e +instance (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) => MyPeer e + +newDownloadEnv :: (MonadIO m, MyPeer e) => m (DownloadEnv e) +newDownloadEnv = liftIO do + DownloadEnv <$> newTQueueIO + <*> newTVarIO mempty + <*> newTVarIO mempty + <*> Cache.newCache (Just defBlockWipTimeout) + <*> newTVarIO mempty + <*> Cache.newCache Nothing + <*> newTVarIO mempty + +newtype BlockDownloadM e m a = + BlockDownloadM { fromBlockDownloadM :: ReaderT (DownloadEnv e) m a } + deriving newtype ( Functor + , Applicative + , Monad + , MonadIO + , MonadReader (DownloadEnv e) + , MonadTrans + ) + +runDownloadM :: (MyPeer e, MonadIO m) => BlockDownloadM e m a -> m a +runDownloadM m = runReaderT ( fromBlockDownloadM m ) =<< newDownloadEnv + +withDownload :: (MyPeer e, MonadIO m) => DownloadEnv e -> BlockDownloadM e m a -> m a +withDownload e m = runReaderT ( fromBlockDownloadM m ) e + +setBlockState :: MonadIO m => Hash HbSync -> BlockState -> BlockDownloadM e m () +setBlockState h s = do + sh <- asks (view blockState) + liftIO $ atomically $ modifyTVar' sh (HashMap.insert h s) + +-- FIXME: что-то более обоснованное +calcWaitTime :: MonadIO m => BlockDownloadM e m Double +calcWaitTime = do + wip <- asks (view blockWip) >>= liftIO . Cache.size + let wipn = realToFrac wip * 3 + let waiting = 5 + ( (realToFrac (toNanoSeconds defBlockWaitMax) * wipn) / 1e9 ) + pure waiting + + +touchBlockState :: MonadIO m => Hash HbSync -> BsFSM -> BlockDownloadM e m BlockState +touchBlockState h st = do + sh <- asks (view blockState) + t <- liftIO $ getTime MonotonicCoarse + wo <- calcWaitTime + + let s = BlockState t 0 st wo + + sn <- liftIO $ atomically $ do + modifyTVar sh (HashMap.alter (doAlter s) h) + readTVar sh <&> fromMaybe s . HashMap.lookup h + + case view bsState sn of + Initial -> do + + let t0 = view bsStart sn + let dt = realToFrac (toNanoSecs t - toNanoSecs t0) / 1e9 + + wip <- asks (view blockWip) >>= liftIO . Cache.size + + let waiting = view bsWipTo sn + + if dt > waiting then do -- FIXME: remove-hardcode + debug $ "pospone block" <+> pretty h <+> pretty dt <+> pretty (showGFloat (Just 2) waiting "") + let sn1 = sn { _bsState = Postpone } + liftIO $ atomically $ modifyTVar sh (HashMap.insert h sn1) + pure sn1 + else do + pure sn + + _ -> pure sn + + where + doAlter s1 = \case + Nothing -> Just s1 + Just s -> Just $ over bsTimes succ s + +getBlockState :: MonadIO m => Hash HbSync -> BlockDownloadM e m BlockState +getBlockState h = do + sh <- asks (view blockState) + touchBlockState h Initial + +addDownload :: MonadIO m => Hash HbSync -> BlockDownloadM e m () +addDownload h = do + + tinq <- asks (view blockInQ) + + doAdd <- liftIO $ atomically $ stateTVar tinq + \hm -> case HashMap.lookup h hm of + Nothing -> (True, HashMap.insert h () hm) + Just{} -> (False, HashMap.insert h () hm) + when doAdd $ do + + q <- asks (view downloadQ) + wip <- asks (view blockWip) + + liftIO do + atomically $ writeTQueue q h + Cache.insert wip h () + + void $ touchBlockState h Initial + +removeFromWip :: MonadIO m => Hash HbSync -> BlockDownloadM e m () +removeFromWip h = do + wip <- asks (view blockWip) + st <- asks (view blockState) + po <- asks (view blockPostponed) + liftIO $ Cache.delete wip h + liftIO $ Cache.delete po h + liftIO $ atomically $ modifyTVar' st (HashMap.delete h) diff --git a/hbs2-peer/app/PokePostponed.hs b/hbs2-peer/app/PokePostponed.hs new file mode 100644 index 00000000..945229ce --- /dev/null +++ b/hbs2-peer/app/PokePostponed.hs @@ -0,0 +1,85 @@ +module PokePostponed where + +import HBS2.Prelude.Plated +import HBS2.Clock +import HBS2.Actors.Peer +import HBS2.Net.Proto.Peer +import HBS2.Events + +import HBS2.System.Logger.Simple + +import PeerTypes + +import Data.Foldable (for_) +import Control.Concurrent.Async +import Control.Concurrent.STM +import Control.Monad.Reader +import Data.Cache qualified as Cache +import Lens.Micro.Platform +import Numeric ( showGFloat ) +import Prettyprinter + +pokePostponed :: forall e m . ( MonadIO m + , EventListener e (PeerHandshake e) m + , MyPeer e + ) + => PeerEnv e + -> BlockDownloadM e m () + +pokePostponed penv = do + + env <- ask + + waitQ <- liftIO $ newTBQueueIO 1 + + busy <- liftIO $ newTVarIO False + + cache <- asks (view blockPostponed) + + lift $ subscribe @e AnyKnownPeerEventKey $ \(KnownPeerEvent{}) -> do + cant <- liftIO $ readTVarIO busy + unless cant $ do + debug "AnyKnownPeerEventKey" + mt <- liftIO $ atomically $ isEmptyTBQueue waitQ + when mt do + liftIO $ atomically $ writeTBQueue waitQ () + + forever do + -- FIXME: to defaults! + r <- liftIO $ race ( pause @'Seconds 60 ) ( atomically $ readTBQueue waitQ ) + + void $ liftIO $ atomically $ flushTBQueue waitQ + + liftIO $ atomically $ writeTVar busy True + + void $ liftIO $ async $ do + pause @'Seconds 30 + atomically $ writeTVar busy False + + let allBack = either (const False) (const True) r + + blks <- liftIO $ Cache.toList cache + + w <- calcWaitTime + + debug $ "tossPostponed" <+> pretty (showGFloat (Just 2) w "") + <+> pretty (length blks) + + for_ blks $ \case + (k,_,Nothing) | not allBack -> pure () + | otherwise -> pushBack cache k + (k,_,Just{}) -> pushBack cache k + + where + pushBack cache k = do + w <- calcWaitTime + liftIO $ Cache.delete cache k + st <- getBlockState k + t0 <- liftIO $ getTime MonotonicCoarse + setBlockState k ( set bsStart t0 + . set bsState Initial + . set bsWipTo w + $ st + ) + debug $ "returning block to downloads" <+> pretty k + addDownload k diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index 20badc1b..2d4febbf 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -103,7 +103,9 @@ executable hbs2-peer other-modules: BlockDownload , PeerInfo + , PokePostponed , RPC + , PeerTypes -- other-extensions: build-depends: base ^>=4.15.1.0