diff --git a/docs/devlog.md b/docs/devlog.md index f0730fce..77c8c302 100644 --- a/docs/devlog.md +++ b/docs/devlog.md @@ -1,6 +1,12 @@ ## 2023-02-26 +TODO: block-shuffle + Если при добавлении перемешивать блоки, + то есть надежда, что пиры скачают их в разном + порядке и будут помогать друг другу. + Но при этом может оказаться сломан стриминг (когда/если он будет) + TODO: choose-peer-lesser-rtt Выбирать пира с наименьшим RTT при скачивании diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index bb9cbbea..e3c3b4e3 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -60,13 +60,16 @@ defBlockBanTime :: TimeSpec defBlockBanTime = toTimeSpec defBlockBanTimeSec defBlockBanTimeSec :: Timeout 'Seconds -defBlockBanTimeSec = 30 :: Timeout 'Seconds +defBlockBanTimeSec = 60 :: Timeout 'Seconds defBlockWipTimeout :: TimeSpec -defBlockWipTimeout = toTimeSpec defCookieTimeoutSec +defBlockWipTimeout = defBlockSizeCacheTime defBlockInfoTimeout :: Timeout 'Seconds -defBlockInfoTimeout = 2 +defBlockInfoTimeout = 1 + +defBlockInfoTimeoutSpec :: TimeSpec +defBlockInfoTimeoutSpec = toTimeSpec defBlockInfoTimeout -- how much time wait for block from peer? defBlockWaitMax :: Timeout 'Seconds diff --git a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs index 9abce093..676552cd 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs @@ -56,7 +56,7 @@ instance HasProtocol UDP (BlockInfo UDP) where -- FIXME: requestMinPeriod-breaks-fast-block-download -- - -- requestPeriodLim = ReqLimPerMessage 0.5 + requestPeriodLim = ReqLimPerMessage 1 instance HasProtocol UDP (BlockChunks UDP) where type instance ProtocolId (BlockChunks UDP) = 2 diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index 5acedb1a..146b0b8a 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -31,6 +31,7 @@ import Data.ByteString.Lazy (ByteString) import Data.Cache qualified as Cache import Data.Foldable hiding (find) import Data.HashMap.Strict qualified as HashMap +import Data.HashSet qualified as HashSet import Data.IntMap (IntMap) import Data.IntMap qualified as IntMap import Data.IntSet qualified as IntSet @@ -44,29 +45,24 @@ getBlockForDownload :: MonadIO m => BlockDownloadM e m (Hash HbSync) getBlockForDownload = do q <- asks (view downloadQ) inq <- asks (view blockInQ) - h <- liftIO $ atomically $ readTQueue q - liftIO $ atomically $ modifyTVar inq (HashMap.delete h) - pure h + liftIO $ atomically $ do + h <- readTQueue q + modifyTVar inq (HashMap.delete h) + pure h -withBlockForDownload :: MonadIO m - => (Hash HbSync -> BlockDownloadM e m ()) +withBlockForDownload :: (MonadIO m, MyPeer e, HasStorage m, HasPeerLocator e m) + => Peer e + -> (Hash HbSync -> BlockDownloadM e m ()) -> BlockDownloadM e m () -withBlockForDownload action = do - - cache <- asks (view blockPostponed) - +withBlockForDownload p action = do + -- FIXME: busyloop-e46ad5e0 h <- getBlockForDownload - s <- getBlockState h - - let postpone = toTimeSpec @'Seconds 10 -- FIXME: remove-hardcode - - case view bsState s of - Postpone -> do - debug $ "posponed:" <+> pretty h - liftIO $ Cache.insert' cache (Just postpone) h () - - _ -> action h + banned <- isBanned p h + if banned then do + addDownload h + else do + action h addBlockInfo :: (MonadIO m, MyPeer e) => Peer e @@ -93,6 +89,8 @@ getPeersForBlock h = do processBlock :: forall e m . ( MonadIO m , HasStorage m + , MyPeer e + , HasPeerLocator e (BlockDownloadM e m) , Block ByteString ~ ByteString ) => Hash HbSync @@ -161,7 +159,8 @@ processBlock h = do -- GetBlockSize request -downloadFromWithPeer :: forall e m . DownloadFromPeerStuff e m +downloadFromWithPeer :: forall e m . ( DownloadFromPeerStuff e m + , HasPeerLocator e (BlockDownloadM e m) ) => Peer e -> Integer -> Hash HbSync @@ -467,11 +466,11 @@ blockDownloadLoop env0 = do downFails <- liftIO $ readTVarIO (view peerDownloadFail pinfo) down <- liftIO $ readTVarIO (view peerDownloadedBlk pinfo) useful <- liftIO $ readTVarIO (view peerUsefulness pinfo) - debug $ "peer" <+> pretty p <+> "burst:" <+> pretty burst - <+> "burst-max:" <+> pretty buM - <+> "errors:" <+> pretty (downFails + errors) - <+> "down:" <+> pretty down - <+> "useful:" <+> pretty useful + notice $ "peer" <+> pretty p <+> "burst:" <+> pretty burst + <+> "burst-max:" <+> pretty buM + <+> "errors:" <+> pretty (downFails + errors) + <+> "down:" <+> pretty down + <+> "useful:" <+> pretty useful pure () void $ liftIO $ async $ forever $ withPeerM e $ withDownload env0 do @@ -493,12 +492,14 @@ blockDownloadLoop env0 = do liftIO $ atomically $ writeTVar tinfo alive + po <- asks (view peerPostponed) >>= liftIO . readTVarIO + notice $ "maintain blocks wip" <+> pretty (Set.size aliveWip) + <+> "postponed" + <+> pretty (HashMap.size po) withDownload env0 do - env <- ask - mapM_ processBlock blks fix \next -> do @@ -506,6 +507,57 @@ blockDownloadLoop env0 = do debug "I'm a download loop. I don't do anything anymore" next + +postponedLoop :: forall e m . ( MyPeer e + , Sessions e (KnownPeer e) m + , Request e (BlockInfo e) m + , EventListener e (BlockInfo e) m + , DownloadFromPeerStuff e m + , HasPeerLocator e m + , m ~ PeerM e IO + ) + => DownloadEnv e -> m () +postponedLoop env0 = do + e <- ask + + void $ liftIO $ async $ withPeerM e $ withDownload env0 do + + po <- asks (view peerPostponed) + pl <- getPeerLocator @e + + forever do + + pause @'Seconds 10 + debug "findPosponedLoop" + + ba <- asks (view blockBanned) >>= liftIO . Cache.keys + pipsAll <- knownPeers @e pl <&> HashSet.fromList + + let blk2pip = HashMap.fromListWith (<>) [ (h, HashSet.singleton p) | (h,p) <- ba ] + & HashMap.toList + + for_ blk2pip $ \(h, banned) -> do + let notBanned = HashSet.difference pipsAll banned + when (null notBanned) do + liftIO $ atomically $ modifyTVar po $ HashMap.insert h () + + + void $ liftIO $ async $ withPeerM e $ withDownload env0 do + po <- asks (view peerPostponed) + + forever do + -- FIXME: del-posponed-time-hardcode + pause @'Seconds 60 + debug "postponedLoop" + + back <- liftIO $ atomically $ stateTVar po $ \hm -> + let els = HashMap.toList hm in + -- FIXME: back-from-postponed-size-var + let (x,xs) = List.splitAt 10 els in + (fmap fst x, HashMap.fromList xs) + + for_ back returnPostponed + peerDownloadLoop :: forall e m . ( MyPeer e , Sessions e (KnownPeer e) m , Request e (BlockInfo e) m @@ -516,124 +568,110 @@ peerDownloadLoop :: forall e m . ( MyPeer e ) => Peer e -> BlockDownloadM e m () peerDownloadLoop peer = do - bannedBlocks <- liftIO $ Cache.newCache (Just defBlockBanTime) - sizeCache <- liftIO $ Cache.newCache (Just defBlockSizeCacheTime) - seenBlocks <- liftIO $ newTVarIO mempty + sizeCache <- liftIO $ Cache.newCache @_ @Integer (Just defBlockSizeCacheTime) + noBlock <- liftIO $ Cache.newCache (Just defBlockBanTime) pe <- lift ask e <- ask - let withAllStuff m = withPeerM pe $ withDownload e m + let doBlockSizeRequest h = do + q <- liftIO newTQueueIO + lift do + subscribe @e (BlockSizeEventKey h) $ \case + BlockSizeEvent (p1,_,s) -> do + when (p1 == peer) do + liftIO $ Cache.insert sizeCache h s + liftIO $ atomically $ writeTQueue q (Just s) - forever do + NoBlockEvent{} -> do + -- TODO: ban-block-for-some-seconds + liftIO $ atomically $ writeTQueue q Nothing + pure () - sto <- lift getStorage + request peer (GetBlockSize @e h) - auth <- lift $ find (KnownPeerKey peer) id <&> isJust + liftIO $ race ( pause defBlockInfoTimeout ) + ( atomically $ do + s <- readTQueue q + void $ flushTQueue q + pure s + ) + + let tryDownload pinfo h size = do + + trace $ "tryDownload" <+> pretty peer <+> pretty h + + here <- isBlockHereCached h + + if here then do + trace $ pretty peer <+> "block" <+> pretty h <+> "is already here" + processBlock h + else do + let downFail = view peerDownloadFail pinfo + let downBlk = view peerDownloadedBlk pinfo + + r <- liftIO $ race ( pause defBlockWaitMax ) + $ withPeerM pe + $ withDownload e + $ downloadFromWithPeer peer size h + case r of + Left{} -> do + trace $ "FAIL" <+> pretty peer <+> "download block" <+> pretty h + liftIO $ atomically $ modifyTVar downFail succ + failedDownload peer h + + Right{} -> do + trace $ "OK" <+> pretty peer <+> "dowloaded block" <+> pretty h + processBlock h + liftIO $ atomically do + writeTVar downFail 0 + modifyTVar downBlk succ + + fix \next -> do + + auth' <- lift $ find (KnownPeerKey peer) id pinfo' <- lift $ find (PeerInfoKey peer) id -- (view peerDownloadFail) - maybe1 pinfo' none $ \pinfo -> do + let mbauth = (,) <$> auth' <*> pinfo' - let downFail = view peerDownloadFail pinfo - let downBlk = view peerDownloadedBlk pinfo - failNum <- liftIO $ readTVarIO downFail + maybe1 mbauth none $ \(_,pinfo) -> do - -- FIXME: better-avoiding-busyloop - -- unless notFailed do - -- pause @'Seconds 1 + withBlockForDownload peer $ \h -> do + -- TODO: insert-busyloop-counter-for-block-request + -- trace $ "withBlockForDownload" <+> pretty peer <+> pretty h - when (failNum > 5) do - pause @'Seconds defBlockWaitMax + mbSize <- liftIO $ Cache.lookup sizeCache h + noBlk <- liftIO $ Cache.lookup noBlock h <&> isJust - when auth do + case mbSize of + Just size -> do + trace $ "HAS SIZE:" <+> pretty peer <+> pretty h <+> pretty size + updateBlockPeerSize h peer size + tryDownload pinfo h size - withBlockForDownload $ \h -> do - e <- lift ask - ee <- ask + Nothing | noBlk -> do + trace $ pretty peer <+> "does not have block" <+> pretty h + banBlock peer h + addDownload h - st <- getBlockState h + Nothing -> do + incBlockSizeReqCount h - let alterSeen = \case - Just x -> Just (succ x) - Nothing -> Just 1 + r <- doBlockSizeRequest h - banned <- liftIO $ Cache.lookup bannedBlocks h <&> isJust + case r of + Left{} -> failedDownload peer h - if banned then do - pl <- getPeerLocator @e - ps <- knownPeers @e pl <&> length - let seenTotal = view bsTimes st - - if seenTotal < ps*100 then do - addDownload h - else do - let wa = min defBlockBanTimeSec (realToFrac (ceiling $ Prelude.logBase 10 (realToFrac (2 * seenTotal)))) - void $ liftIO $ async $ withAllStuff (pause wa >> addDownload h) - -- trace $ "block" <+> pretty h <+> "seen" <+> pretty seenTotal <+> "times" <+> parens (pretty wa) - - else do - - liftIO $ atomically $ modifyTVar seenBlocks (HashMap.alter alterSeen h) - - seenTimes <- liftIO $ readTVarIO seenBlocks <&> fromMaybe 0 . HashMap.lookup h - - when ( seenTimes > 100 ) do - trace $ "ban block" <+> pretty h <+> "for a while" <+> parens (pretty seenTimes) - liftIO $ atomically $ modifyTVar seenBlocks (HashMap.delete h) - liftIO $ Cache.insert bannedBlocks h () - - setBlockState h (set bsState Downloading st) - - r1 <- liftIO $ race ( pause defBlockInfoTimeout ) $ withPeerM e do - -- blksq <- liftIO newTQueueIO - - cachedSize' <- liftIO $ Cache.lookup sizeCache h - - case cachedSize' of - Just sz -> pure (Just sz) - Nothing -> do - subscribe @e (BlockSizeEventKey h) $ \case - (BlockSizeEvent (_,_,s)) -> do - -- liftIO $ atomically $ writeTQueue blksq (Just s) - liftIO $ Cache.insert sizeCache h s - - (NoBlockEvent p) -> do - pure () - -- trace $ "NoBlockEvent" <+> pretty p <+> pretty h - -- liftIO $ atomically $ writeTQueue blksq Nothing - - request peer (GetBlockSize @e h) - pure Nothing - - -- liftIO $ atomically $ readTQueue blksq - - case r1 of - Left{} -> do - liftIO $ atomically $ modifyTVar downFail succ + Right Nothing -> do + -- FIXME: non-existent-block-ruins-all + liftIO $ Cache.insert noBlock h () addDownload h - Right Nothing -> do - addDownload h -- this is a legit situation; it is handled above (block ban... etc). + Right (Just s) -> do + updateBlockPeerSize h peer s + tryDownload pinfo h s - Right (Just size) -> do - r2 <- liftIO $ race ( pause defBlockWaitMax ) - $ withPeerM e - $ withDownload ee - $ downloadFromWithPeer peer size h - - case r2 of - Left{} -> do - liftIO $ atomically $ modifyTVar downFail succ - addDownload h - -- FIXME: remove-block-seen-times-hardcode - - Right{} -> do - processBlock h - liftIO $ atomically do - writeTVar downFail 0 - modifyTVar downBlk succ - - pure () + next -- NOTE: this is an adapter for a ResponseM monad -- because response is working in ResponseM monad (ha!) diff --git a/hbs2-peer/app/DownloadQ.hs b/hbs2-peer/app/DownloadQ.hs index 02df5754..6f0a9c18 100644 --- a/hbs2-peer/app/DownloadQ.hs +++ b/hbs2-peer/app/DownloadQ.hs @@ -7,12 +7,14 @@ import HBS2.Hash import HBS2.Events import HBS2.Data.Types.Refs import HBS2.Actors.Peer +import HBS2.Net.PeerLocator import HBS2.Storage import HBS2.Merkle import HBS2.System.Logger.Simple import PeerTypes import PeerConfig +import BlockDownload (processBlock) import Data.Map qualified as Map import Data.Foldable @@ -41,6 +43,8 @@ noLogFile = err "download log not defined" downloadQueue :: forall e m . ( MyPeer e , DownloadFromPeerStuff e m + , HasPeerLocator e (BlockDownloadM e m) + , HasPeerLocator e m , EventListener e (DownloadReq e) m ) => PeerConfig -> DownloadEnv e -> m () @@ -71,7 +75,7 @@ downloadQueue conf denv = do debug $ "downloadQueue" <+> pretty fn - liftIO do + lo <- liftIO do -- FIXME: will-crash-on-big-logs atomically $ waitTSem fsem @@ -103,13 +107,16 @@ downloadQueue conf denv = do let leftovers = [ x | x <- hashesWip , Map.member x loosers ] - for_ leftovers $ withDownload denv . addDownload atomically $ waitTSem fsem catchAny ( B8.writeFile fn ( B8.unlines (fmap (B8.pack.show.pretty) leftovers) ) ) whimper atomically $ signalTSem fsem + pure leftovers + + for_ lo $ withDownload denv . processBlock + debug "downloadQueue okay" -- TODO: remove-downloadQueue-pause-hardcode diff --git a/hbs2-peer/app/PeerConfig.hs b/hbs2-peer/app/PeerConfig.hs index 7ba54e86..1fc86223 100644 --- a/hbs2-peer/app/PeerConfig.hs +++ b/hbs2-peer/app/PeerConfig.hs @@ -25,6 +25,10 @@ import Data.Set (Set) import Data.Text qualified as Text import Text.InterpolatedString.Perl6 (qc) +data FeatureSwitch = + FeatureOn | FeatureOff + deriving (Eq,Ord,Show,Generic) + class HasCfgKey a b where -- type family CfgValue a :: Type key :: Id @@ -154,6 +158,14 @@ instance {-# OVERLAPPABLE #-} (IsString b, HasCfgKey a (Maybe b)) => HasCfgValue | ListVal @C (Key s [LitStrVal e]) <- syn, s == key @a @(Maybe b) ] +instance (HasCfgKey a FeatureSwitch) => HasCfgValue a FeatureSwitch where + cfgValue (PeerConfig syn) = val + where + val = + lastDef FeatureOff + [ FeatureOn + | ListVal @C (Key s [SymbolVal (Id e)]) <- syn, s == key @a @FeatureSwitch, e == "on" + ] instance {-# OVERLAPPABLE #-} (IsString b, HasCfgKey a [b]) => HasCfgValue a [b] where cfgValue (PeerConfig syn) = val diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index bf24b8e5..4bdb5cdb 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -59,6 +59,7 @@ import System.Directory import System.Exit import System.IO import Data.Set (Set) +import GHC.TypeLits defStorageThreads :: Integral a => a defStorageThreads = 4 @@ -79,6 +80,7 @@ data PeerKeyFileKey data PeerBlackListKey data PeerStorageKey data PeerAcceptAnnounceKey +data PeerTraceKey data AcceptAnnounce = AcceptAnnounceAll | AcceptAnnounceFrom (Set (PubKey 'Sign UDP)) @@ -90,6 +92,9 @@ instance Pretty AcceptAnnounce where -- FIXME: better-pretty-for-AcceptAnnounceFrom AcceptAnnounceFrom xs -> parens ("accept-announce" <+> pretty (fmap AsBase58 (Set.toList xs))) +instance HasCfgKey PeerTraceKey FeatureSwitch where + key = "trace" + instance HasCfgKey PeerListenKey (Maybe String) where key = "listen" @@ -354,6 +359,7 @@ runPeer opts = Exception.handle myException $ do let rpcConf = cfgValue @PeerRpcKey conf let keyConf = cfgValue @PeerKeyFileKey conf let storConf = cfgValue @PeerStorageKey conf <&> StoragePrefix + let traceConf = cfgValue @PeerTraceKey conf :: FeatureSwitch let listenSa = view listenOn opts <|> listenConf <|> Just defListenUDP let rpcSa = view listenRpc opts <|> rpcConf <|> Just defRpcUDP @@ -363,6 +369,11 @@ runPeer opts = Exception.handle myException $ do debug $ "storage prefix:" <+> pretty pref + debug $ pretty "trace: " <+> pretty (show traceConf) + + when (traceConf == FeatureOn) do + setLogging @TRACE tracePrefix + let bls = cfgValue @PeerBlackListKey conf :: Set String let blkeys = Set.fromList @@ -528,6 +539,8 @@ runPeer opts = Exception.handle myException $ do peerThread (blockDownloadLoop denv) + peerThread (postponedLoop denv) + peerThread (downloadQueue conf denv) peerThread $ forever $ do diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index 85ed392b..ae244b2b 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -1,6 +1,7 @@ {-# OPTIONS_GHC -fno-warn-orphans #-} {-# Language TemplateHaskell #-} {-# Language UndecidableInstances #-} +{-# Language MultiWayIf #-} module PeerTypes where import HBS2.Actors.Peer @@ -15,6 +16,7 @@ import HBS2.Net.Proto.Definition import HBS2.Net.Proto.Sessions import HBS2.Prelude.Plated import HBS2.Storage +import HBS2.Net.PeerLocator import HBS2.System.Logger.Simple import PeerInfo @@ -34,7 +36,12 @@ import Type.Reflection import Numeric (showGFloat) -type MyPeer e = (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) +type MyPeer e = ( Eq (Peer e) + , Hashable (Peer e) + , Pretty (Peer e) + , HasPeer e + , Block ByteString ~ ByteString + ) data DownloadReq e @@ -110,16 +117,12 @@ newtype instance SessionKey e (BlockChunks e) = deriving newtype instance Hashable (SessionKey UDP (BlockChunks UDP)) deriving stock instance Eq (SessionKey UDP (BlockChunks UDP)) -data BsFSM = Initial - | Downloading - | Postpone - data BlockState = BlockState - { _bsStart :: TimeSpec - , _bsTimes :: Int - , _bsState :: BsFSM - , _bsWipTo :: Double + { _bsStart :: TimeSpec + , _bsReqSizeTimes :: TVar Int + , _bsLastSeen :: TVar TimeSpec + , _bsHasSize :: TVar Bool } makeLenses 'BlockState @@ -142,9 +145,11 @@ data DownloadEnv e = , _blockPeers :: TVar (HashMap (Hash HbSync) (HashMap (Peer e) Integer) ) , _blockWip :: Cache (Hash HbSync) () , _blockState :: TVar (HashMap (Hash HbSync) BlockState) - , _blockPostponed :: Cache (Hash HbSync) () , _blockInQ :: TVar (HashMap (Hash HbSync) ()) , _peerThreads :: TVar (HashMap (Peer e) (PeerThread e)) + , _peerPostponed :: TVar (HashMap (Hash HbSync) ()) + , _blockStored :: Cache (Hash HbSync) () + , _blockBanned :: Cache (Hash HbSync, Peer e) () } makeLenses 'DownloadEnv @@ -157,9 +162,11 @@ newDownloadEnv = liftIO do <*> newTVarIO mempty <*> Cache.newCache (Just defBlockWipTimeout) <*> newTVarIO mempty - <*> Cache.newCache Nothing <*> newTVarIO mempty <*> newTVarIO mempty + <*> newTVarIO mempty + <*> Cache.newCache (Just defBlockWipTimeout) + <*> Cache.newCache (Just defBlockBanTime) newtype BlockDownloadM e m a = BlockDownloadM { fromBlockDownloadM :: ReaderT (DownloadEnv e) m a } @@ -174,7 +181,7 @@ newtype BlockDownloadM e m a = runDownloadM :: (MyPeer e, MonadIO m) => BlockDownloadM e m a -> m a runDownloadM m = runReaderT ( fromBlockDownloadM m ) =<< newDownloadEnv -withDownload :: (MyPeer e, MonadIO m) => DownloadEnv e -> BlockDownloadM e m a -> m a +withDownload :: (MyPeer e, HasPeerLocator e m, MonadIO m) => DownloadEnv e -> BlockDownloadM e m a -> m a withDownload e m = runReaderT ( fromBlockDownloadM m ) e setBlockState :: MonadIO m => Hash HbSync -> BlockState -> BlockDownloadM e m () @@ -182,7 +189,53 @@ setBlockState h s = do sh <- asks (view blockState) liftIO $ atomically $ modifyTVar' sh (HashMap.insert h s) --- FIXME: что-то более обоснованное +setBlockHasSize :: MonadIO m => Hash HbSync -> BlockDownloadM e m () +setBlockHasSize h = do + blk <- fetchBlockState h + liftIO $ atomically $ writeTVar (view bsHasSize blk) True + +fetchBlockState :: MonadIO m => Hash HbSync -> BlockDownloadM e m BlockState +fetchBlockState h = do + sh <- asks (view blockState) + liftIO do + now <- getTime MonotonicCoarse + tvlast <- newTVarIO now + tvreq <- newTVarIO 0 + tvsz <- newTVarIO False + let defState = BlockState now tvreq tvlast tvsz + atomically $ stateTVar sh $ \hm -> case HashMap.lookup h hm of + Nothing -> (defState, HashMap.insert h defState hm) + Just x -> (x, hm) + +banBlock :: (MyPeer e, MonadIO m) => Peer e -> Hash HbSync -> BlockDownloadM e m () +banBlock p h = do + banned <- asks (view blockBanned) + liftIO $ Cache.insert banned (h,p) () + +isBanned :: (MyPeer e, MonadIO m) => Peer e -> Hash HbSync -> BlockDownloadM e m Bool +isBanned p h = do + banned <- asks (view blockBanned) + liftIO $ Cache.lookup banned (h,p) <&> isJust + +delBlockState :: MonadIO m => Hash HbSync -> BlockDownloadM e m () +delBlockState h = do + sh <- asks (view blockState) + liftIO $ atomically $ modifyTVar sh (HashMap.delete h) + +incBlockSizeReqCount :: MonadIO m => Hash HbSync -> BlockDownloadM e m () +incBlockSizeReqCount h = do + blk <- fetchBlockState h + now <- liftIO $ getTime MonotonicCoarse + seen <- liftIO $ readTVarIO (view bsLastSeen blk) + let elapsed = realToFrac (toNanoSecs (now - seen)) / 1e9 + noSize <- liftIO $ readTVarIO (view bsHasSize blk) <&> not + + when (elapsed > 1.0 && noSize) do + liftIO $ atomically $ do + writeTVar (view bsLastSeen blk) now + modifyTVar (view bsReqSizeTimes blk) succ + +-- FIXME: что-то более обоснованно calcWaitTime :: MonadIO m => BlockDownloadM e m Double calcWaitTime = do wip <- asks (view blockWip) >>= liftIO . Cache.size @@ -190,59 +243,47 @@ calcWaitTime = do let waiting = 5 + ( (realToFrac (toNanoSeconds defBlockWaitMax) * wipn) / 1e9 ) pure waiting +isBlockHereCached :: forall e m . ( MyPeer e + , MonadIO m + , HasStorage m + ) + => Hash HbSync -> BlockDownloadM e m Bool -touchBlockState :: MonadIO m => Hash HbSync -> BsFSM -> BlockDownloadM e m BlockState -touchBlockState h st = do - sh <- asks (view blockState) - t <- liftIO $ getTime MonotonicCoarse - wo <- calcWaitTime +isBlockHereCached h = do + szcache <- asks (view blockStored) + sto <- lift getStorage - let s = BlockState t 0 st wo + cached <- liftIO $ Cache.lookup szcache h - sn <- liftIO $ atomically $ do - modifyTVar sh (HashMap.alter (doAlter s) h) - readTVar sh <&> fromMaybe s . HashMap.lookup h + case cached of + Just{} -> pure True + Nothing -> liftIO do + blk <- hasBlock sto h <&> isJust + when blk $ Cache.insert szcache h () + pure blk - case view bsState sn of - Initial -> do +addDownload :: forall e m . ( MyPeer e + , MonadIO m + , HasPeerLocator e (BlockDownloadM e m) + , HasStorage m -- (BlockDownloadM e m) + , Block ByteString ~ ByteString + ) + => Hash HbSync -> BlockDownloadM e m () - let t0 = view bsStart sn - let dt = realToFrac (toNanoSecs t - toNanoSecs t0) / 1e9 - - wip <- asks (view blockWip) >>= liftIO . Cache.size - - let waiting = view bsWipTo sn - - if dt > waiting then do -- FIXME: remove-hardcode - debug $ "pospone block" <+> pretty h <+> pretty dt <+> pretty (showGFloat (Just 2) waiting "") - let sn1 = sn { _bsState = Postpone } - liftIO $ atomically $ modifyTVar sh (HashMap.insert h sn1) - pure sn1 - else do - pure sn - - _ -> pure sn - - where - doAlter s1 = \case - Nothing -> Just s1 - Just s -> Just $ over bsTimes succ s - -getBlockState :: MonadIO m => Hash HbSync -> BlockDownloadM e m BlockState -getBlockState h = do - sh <- asks (view blockState) - touchBlockState h Initial - -addDownload :: MonadIO m => Hash HbSync -> BlockDownloadM e m () addDownload h = do + po <- asks (view peerPostponed) + tinq <- asks (view blockInQ) doAdd <- do liftIO $ atomically $ stateTVar tinq \hm -> case HashMap.lookup h hm of Nothing -> (True, HashMap.insert h () hm) Just{} -> (False, HashMap.insert h () hm) - when doAdd $ do + + notPostponed <- liftIO $ readTVarIO po <&> isNothing . HashMap.lookup h + + when (doAdd && notPostponed) do q <- asks (view downloadQ) wip <- asks (view blockWip) @@ -251,16 +292,54 @@ addDownload h = do atomically $ writeTQueue q h Cache.insert wip h () - void $ touchBlockState h Initial + -- | False -> do -- not hasSize -> do + + -- po <- asks (view peerPostponed) + -- liftIO $ atomically $ do + -- modifyTVar po $ HashMap.insert h () + + -- trace $ "postpone block" <+> pretty h <+> pretty brt + -- <+> "here:" <+> pretty (not missed) + + -- | otherwise -> do + -- -- TODO: counter-on-this-situation + -- none + +returnPostponed :: forall e m . ( MyPeer e + , MonadIO m + , HasStorage m + , HasPeerLocator e (BlockDownloadM e m) + ) + => Hash HbSync -> BlockDownloadM e m () + +returnPostponed h = do + tinq <- asks (view blockInQ) + -- TODO: atomic-operations + delFromPostponed h + delBlockState h + liftIO $ atomically $ modifyTVar' tinq (HashMap.delete h) + addDownload h + +delFromPostponed :: MonadIO m => Hash HbSync -> BlockDownloadM e m () +delFromPostponed h = do + po <- asks (view peerPostponed) + liftIO $ atomically $ do + modifyTVar' po (HashMap.delete h) removeFromWip :: MonadIO m => Hash HbSync -> BlockDownloadM e m () removeFromWip h = do wip <- asks (view blockWip) st <- asks (view blockState) - po <- asks (view blockPostponed) + sz <- asks (view blockPeers) + tinq <- asks (view blockInQ) + po <- asks (view peerPostponed) + liftIO $ Cache.delete wip h - liftIO $ Cache.delete po h - liftIO $ atomically $ modifyTVar' st (HashMap.delete h) + liftIO $ atomically $ do + modifyTVar' st (HashMap.delete h) + modifyTVar' sz (HashMap.delete h) + modifyTVar' tinq (HashMap.delete h) + modifyTVar' po (HashMap.delete h) hasPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m Bool hasPeerThread p = do @@ -284,3 +363,34 @@ newPeerThread p m = do threads <- asks (view peerThreads) liftIO $ atomically $ modifyTVar threads $ HashMap.insert p pt + +failedDownload :: forall e m . ( MyPeer e + , MonadIO m + , HasPeer e + , HasPeerLocator e (BlockDownloadM e m) + , HasStorage m + ) + => Peer e + -> Hash HbSync + -> BlockDownloadM e m () + +failedDownload p h = do + addDownload h + +updateBlockPeerSize :: forall e m . (MyPeer e, MonadIO m) + => Hash HbSync + -> Peer e + -> Integer + -> BlockDownloadM e m () + +updateBlockPeerSize h p s = do + tv <- asks (view blockPeers) + + setBlockHasSize h + + let alt = \case + Nothing -> Just $ HashMap.singleton p s + Just hm -> Just $ HashMap.insert p s hm + + liftIO $ atomically $ modifyTVar tv (HashMap.alter alt h) +