diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index 38de17d7..984adc5a 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -6,19 +6,13 @@ module BlockDownloadNew where import HBS2.Prelude.Plated -import HBS2.Clock import HBS2.OrDie import HBS2.Data.Detect import HBS2.Hash -import HBS2.Merkle import HBS2.Defaults import HBS2.Events import HBS2.Net.Proto.Service import HBS2.Net.Proto.Sessions -import HBS2.Data.Bundle -import HBS2.Data.Types.SignedBox - -import HBS2.Base58 import HBS2.Data.Types.Peer import HBS2.Data.Types.Refs import HBS2.Actors.Peer @@ -29,18 +23,10 @@ import HBS2.Peer.Brains import HBS2.Storage import HBS2.Storage.Operations.Missed import HBS2.Misc.PrettyStuff -import HBS2.Clock -import HBS2.Net.Auth.Schema - -import HBS2.Peer.RPC.Internal.Types -import HBS2.Peer.RPC.API.Peer - -import HBS2.Net.Messaging.TCP import PeerTypes import PeerInfo -import Control.Monad.Trans.Maybe import Control.Monad.Trans.Cont import Control.Concurrent.STM (flushTQueue,retry) import Data.Map qualified as Map @@ -52,9 +38,6 @@ import Data.HashMap.Strict qualified as HM import Data.HashSet (HashSet) import Data.HashSet qualified as HS import Data.IntMap qualified as IntMap -import Data.IntMap (IntMap) -import Data.List.Split qualified as Split -import Data.Text qualified as Text import Data.Either import Data.Maybe import Data.ByteString.Lazy qualified as LBS @@ -66,14 +49,12 @@ import Data.List qualified as L import Data.Coerce import Numeric import UnliftIO -import Control.Concurrent.STM qualified as STM +import Control.Concurrent.STM.TSem (TSem) +import Control.Concurrent.STM.TSem qualified as TSem import UnliftIO.Concurrent -import System.Random +import UnliftIO.STM import Lens.Micro.Platform -import System.Random qualified as R -import System.Random.Shuffle qualified as Shuffle - import Streaming.Prelude qualified as S @@ -570,7 +551,7 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto -> m () downloadDispatcher brains env = flip runContT pure do - pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()) ) + pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), PeerNonce) ) -- tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double (TVar Int) ) _blkNum <- newTVarIO 0 @@ -635,28 +616,53 @@ downloadDispatcher brains env = flip runContT pure do manageThreads onBlock wip pts = do _pnum <- newTVarIO 1 + _psem <- newTVarIO ( mempty :: HashMap PeerNonce TSem ) + forever $ (>> pause @'Seconds 10) $ withPeerM env do debug "MANAGE THREADS" - peers <- getKnownPeers @e <&> HS.fromList + peers <- HM.fromList <$> do + pips <- getKnownPeers @e <&> HS.fromList + S.toList_ $ for_ pips $ \p -> do + mpd <- lift $ find (KnownPeerKey p) id + maybe1 mpd (pure ()) $ + \PeerData{..} -> S.yield (p, _peerOwnNonce) - for_ peers $ \p -> do + + for_ (HM.toList peers) $ \(p,nonce) -> do here <- readTVarIO pts <&> HM.member p - i <- atomically $ stateTVar _pnum (\i -> (i, succ i)) + + (i,sem) <- atomically do + j <- stateTVar _pnum (\i -> (i, succ i)) + psem <- readTVar _psem + + ssem <- case HM.lookup nonce psem of + Just s -> pure s + Nothing -> do + -- TODO: semaphore-hardcode + new <- TSem.newTSem 2 + modifyTVar _psem (HM.insert nonce new) + pure new + + pure (j,ssem) + unless here do - a <- async (peerThread i onBlock p wip) - atomically $ modifyTVar pts (HM.insert p a) + a <- async (peerThread sem i onBlock p wip) + atomically $ modifyTVar pts (HM.insert p (a,nonce)) loosers <- atomically do xs <- readTVar pts <&> HM.toList -- FIXME: filter-stopped-tasks - let (alive,dead) = L.partition ( \(x,_) -> HS.member x peers ) xs + let (alive,dead) = L.partition ( \(x,_) -> HM.member x peers ) xs writeTVar pts (HM.fromList alive) pure dead - mapM_ cancel (fmap snd loosers) + for_ loosers $ \(p, (a, nonce)) -> do + cancel a + atomically do + modifyTVar _psem (HM.delete nonce) - peerThread me onBlock p wip = flip runContT pure do + peerThread sem me onBlock p wip = flip runContT pure do btimes <- newTVarIO ( mempty :: [Double] ) @@ -704,6 +710,7 @@ downloadDispatcher brains env = flip runContT pure do PChoose -> do what <- atomically do + TSem.waitTSem sem r <- newTVar ( HPSQ.empty @HashRef @Double @DCB ) blocks <- readTVar wip <&> HM.toList let todo = blocks @@ -799,7 +806,7 @@ downloadDispatcher brains env = flip runContT pure do avg <- readTVarIO _avg - when (dtsec > avg) do + when (dtsec > avg * 1.15) do burstMachineAddErrors bm 1 atomically do @@ -818,6 +825,7 @@ downloadDispatcher brains env = flip runContT pure do PReleaseBlock hx dcb done -> do atomically do + TSem.signalTSem sem if not done then do modifyTVar (dcbBusy dcb) pred else do