This commit is contained in:
voidlizard 2024-11-14 07:07:23 +03:00
parent 772f529c4c
commit 3653eff6b0
1 changed files with 40 additions and 32 deletions

View File

@ -6,19 +6,13 @@
module BlockDownloadNew where
import HBS2.Prelude.Plated
import HBS2.Clock
import HBS2.OrDie
import HBS2.Data.Detect
import HBS2.Hash
import HBS2.Merkle
import HBS2.Defaults
import HBS2.Events
import HBS2.Net.Proto.Service
import HBS2.Net.Proto.Sessions
import HBS2.Data.Bundle
import HBS2.Data.Types.SignedBox
import HBS2.Base58
import HBS2.Data.Types.Peer
import HBS2.Data.Types.Refs
import HBS2.Actors.Peer
@ -29,18 +23,10 @@ import HBS2.Peer.Brains
import HBS2.Storage
import HBS2.Storage.Operations.Missed
import HBS2.Misc.PrettyStuff
import HBS2.Clock
import HBS2.Net.Auth.Schema
import HBS2.Peer.RPC.Internal.Types
import HBS2.Peer.RPC.API.Peer
import HBS2.Net.Messaging.TCP
import PeerTypes
import PeerInfo
import Control.Monad.Trans.Maybe
import Control.Monad.Trans.Cont
import Control.Concurrent.STM (flushTQueue,retry)
import Data.Map qualified as Map
@ -52,9 +38,6 @@ import Data.HashMap.Strict qualified as HM
import Data.HashSet (HashSet)
import Data.HashSet qualified as HS
import Data.IntMap qualified as IntMap
import Data.IntMap (IntMap)
import Data.List.Split qualified as Split
import Data.Text qualified as Text
import Data.Either
import Data.Maybe
import Data.ByteString.Lazy qualified as LBS
@ -66,14 +49,12 @@ import Data.List qualified as L
import Data.Coerce
import Numeric
import UnliftIO
import Control.Concurrent.STM qualified as STM
import Control.Concurrent.STM.TSem (TSem)
import Control.Concurrent.STM.TSem qualified as TSem
import UnliftIO.Concurrent
import System.Random
import UnliftIO.STM
import Lens.Micro.Platform
import System.Random qualified as R
import System.Random.Shuffle qualified as Shuffle
import Streaming.Prelude qualified as S
@ -570,7 +551,7 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto
-> m ()
downloadDispatcher brains env = flip runContT pure do
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()) )
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), PeerNonce) )
-- tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double (TVar Int) )
_blkNum <- newTVarIO 0
@ -635,28 +616,53 @@ downloadDispatcher brains env = flip runContT pure do
manageThreads onBlock wip pts = do
_pnum <- newTVarIO 1
_psem <- newTVarIO ( mempty :: HashMap PeerNonce TSem )
forever $ (>> pause @'Seconds 10) $ withPeerM env do
debug "MANAGE THREADS"
peers <- getKnownPeers @e <&> HS.fromList
peers <- HM.fromList <$> do
pips <- getKnownPeers @e <&> HS.fromList
S.toList_ $ for_ pips $ \p -> do
mpd <- lift $ find (KnownPeerKey p) id
maybe1 mpd (pure ()) $
\PeerData{..} -> S.yield (p, _peerOwnNonce)
for_ peers $ \p -> do
for_ (HM.toList peers) $ \(p,nonce) -> do
here <- readTVarIO pts <&> HM.member p
i <- atomically $ stateTVar _pnum (\i -> (i, succ i))
(i,sem) <- atomically do
j <- stateTVar _pnum (\i -> (i, succ i))
psem <- readTVar _psem
ssem <- case HM.lookup nonce psem of
Just s -> pure s
Nothing -> do
-- TODO: semaphore-hardcode
new <- TSem.newTSem 2
modifyTVar _psem (HM.insert nonce new)
pure new
pure (j,ssem)
unless here do
a <- async (peerThread i onBlock p wip)
atomically $ modifyTVar pts (HM.insert p a)
a <- async (peerThread sem i onBlock p wip)
atomically $ modifyTVar pts (HM.insert p (a,nonce))
loosers <- atomically do
xs <- readTVar pts <&> HM.toList
-- FIXME: filter-stopped-tasks
let (alive,dead) = L.partition ( \(x,_) -> HS.member x peers ) xs
let (alive,dead) = L.partition ( \(x,_) -> HM.member x peers ) xs
writeTVar pts (HM.fromList alive)
pure dead
mapM_ cancel (fmap snd loosers)
for_ loosers $ \(p, (a, nonce)) -> do
cancel a
atomically do
modifyTVar _psem (HM.delete nonce)
peerThread me onBlock p wip = flip runContT pure do
peerThread sem me onBlock p wip = flip runContT pure do
btimes <- newTVarIO ( mempty :: [Double] )
@ -704,6 +710,7 @@ downloadDispatcher brains env = flip runContT pure do
PChoose -> do
what <- atomically do
TSem.waitTSem sem
r <- newTVar ( HPSQ.empty @HashRef @Double @DCB )
blocks <- readTVar wip <&> HM.toList
let todo = blocks
@ -799,7 +806,7 @@ downloadDispatcher brains env = flip runContT pure do
avg <- readTVarIO _avg
when (dtsec > avg) do
when (dtsec > avg * 1.15) do
burstMachineAddErrors bm 1
atomically do
@ -818,6 +825,7 @@ downloadDispatcher brains env = flip runContT pure do
PReleaseBlock hx dcb done -> do
atomically do
TSem.signalTSem sem
if not done then do
modifyTVar (dcbBusy dcb) pred
else do