This commit is contained in:
voidlizard 2024-11-14 07:07:23 +03:00
parent 6164cd4aba
commit fe444bb7f7
1 changed files with 40 additions and 32 deletions

View File

@ -6,19 +6,13 @@
module BlockDownloadNew where module BlockDownloadNew where
import HBS2.Prelude.Plated import HBS2.Prelude.Plated
import HBS2.Clock
import HBS2.OrDie import HBS2.OrDie
import HBS2.Data.Detect import HBS2.Data.Detect
import HBS2.Hash import HBS2.Hash
import HBS2.Merkle
import HBS2.Defaults import HBS2.Defaults
import HBS2.Events import HBS2.Events
import HBS2.Net.Proto.Service import HBS2.Net.Proto.Service
import HBS2.Net.Proto.Sessions import HBS2.Net.Proto.Sessions
import HBS2.Data.Bundle
import HBS2.Data.Types.SignedBox
import HBS2.Base58
import HBS2.Data.Types.Peer import HBS2.Data.Types.Peer
import HBS2.Data.Types.Refs import HBS2.Data.Types.Refs
import HBS2.Actors.Peer import HBS2.Actors.Peer
@ -29,18 +23,10 @@ import HBS2.Peer.Brains
import HBS2.Storage import HBS2.Storage
import HBS2.Storage.Operations.Missed import HBS2.Storage.Operations.Missed
import HBS2.Misc.PrettyStuff import HBS2.Misc.PrettyStuff
import HBS2.Clock
import HBS2.Net.Auth.Schema
import HBS2.Peer.RPC.Internal.Types
import HBS2.Peer.RPC.API.Peer
import HBS2.Net.Messaging.TCP
import PeerTypes import PeerTypes
import PeerInfo import PeerInfo
import Control.Monad.Trans.Maybe
import Control.Monad.Trans.Cont import Control.Monad.Trans.Cont
import Control.Concurrent.STM (flushTQueue,retry) import Control.Concurrent.STM (flushTQueue,retry)
import Data.Map qualified as Map import Data.Map qualified as Map
@ -52,9 +38,6 @@ import Data.HashMap.Strict qualified as HM
import Data.HashSet (HashSet) import Data.HashSet (HashSet)
import Data.HashSet qualified as HS import Data.HashSet qualified as HS
import Data.IntMap qualified as IntMap import Data.IntMap qualified as IntMap
import Data.IntMap (IntMap)
import Data.List.Split qualified as Split
import Data.Text qualified as Text
import Data.Either import Data.Either
import Data.Maybe import Data.Maybe
import Data.ByteString.Lazy qualified as LBS import Data.ByteString.Lazy qualified as LBS
@ -66,14 +49,12 @@ import Data.List qualified as L
import Data.Coerce import Data.Coerce
import Numeric import Numeric
import UnliftIO import UnliftIO
import Control.Concurrent.STM qualified as STM import Control.Concurrent.STM.TSem (TSem)
import Control.Concurrent.STM.TSem qualified as TSem
import UnliftIO.Concurrent import UnliftIO.Concurrent
import System.Random import UnliftIO.STM
import Lens.Micro.Platform import Lens.Micro.Platform
import System.Random qualified as R
import System.Random.Shuffle qualified as Shuffle
import Streaming.Prelude qualified as S import Streaming.Prelude qualified as S
@ -570,7 +551,7 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto
-> m () -> m ()
downloadDispatcher brains env = flip runContT pure do downloadDispatcher brains env = flip runContT pure do
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()) ) pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), PeerNonce) )
-- tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double (TVar Int) ) -- tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double (TVar Int) )
_blkNum <- newTVarIO 0 _blkNum <- newTVarIO 0
@ -635,28 +616,53 @@ downloadDispatcher brains env = flip runContT pure do
manageThreads onBlock wip pts = do manageThreads onBlock wip pts = do
_pnum <- newTVarIO 1 _pnum <- newTVarIO 1
_psem <- newTVarIO ( mempty :: HashMap PeerNonce TSem )
forever $ (>> pause @'Seconds 10) $ withPeerM env do forever $ (>> pause @'Seconds 10) $ withPeerM env do
debug "MANAGE THREADS" debug "MANAGE THREADS"
peers <- getKnownPeers @e <&> HS.fromList peers <- HM.fromList <$> do
pips <- getKnownPeers @e <&> HS.fromList
S.toList_ $ for_ pips $ \p -> do
mpd <- lift $ find (KnownPeerKey p) id
maybe1 mpd (pure ()) $
\PeerData{..} -> S.yield (p, _peerOwnNonce)
for_ peers $ \p -> do
for_ (HM.toList peers) $ \(p,nonce) -> do
here <- readTVarIO pts <&> HM.member p here <- readTVarIO pts <&> HM.member p
i <- atomically $ stateTVar _pnum (\i -> (i, succ i))
(i,sem) <- atomically do
j <- stateTVar _pnum (\i -> (i, succ i))
psem <- readTVar _psem
ssem <- case HM.lookup nonce psem of
Just s -> pure s
Nothing -> do
-- TODO: semaphore-hardcode
new <- TSem.newTSem 2
modifyTVar _psem (HM.insert nonce new)
pure new
pure (j,ssem)
unless here do unless here do
a <- async (peerThread i onBlock p wip) a <- async (peerThread sem i onBlock p wip)
atomically $ modifyTVar pts (HM.insert p a) atomically $ modifyTVar pts (HM.insert p (a,nonce))
loosers <- atomically do loosers <- atomically do
xs <- readTVar pts <&> HM.toList xs <- readTVar pts <&> HM.toList
-- FIXME: filter-stopped-tasks -- FIXME: filter-stopped-tasks
let (alive,dead) = L.partition ( \(x,_) -> HS.member x peers ) xs let (alive,dead) = L.partition ( \(x,_) -> HM.member x peers ) xs
writeTVar pts (HM.fromList alive) writeTVar pts (HM.fromList alive)
pure dead pure dead
mapM_ cancel (fmap snd loosers) for_ loosers $ \(p, (a, nonce)) -> do
cancel a
atomically do
modifyTVar _psem (HM.delete nonce)
peerThread me onBlock p wip = flip runContT pure do peerThread sem me onBlock p wip = flip runContT pure do
btimes <- newTVarIO ( mempty :: [Double] ) btimes <- newTVarIO ( mempty :: [Double] )
@ -704,6 +710,7 @@ downloadDispatcher brains env = flip runContT pure do
PChoose -> do PChoose -> do
what <- atomically do what <- atomically do
TSem.waitTSem sem
r <- newTVar ( HPSQ.empty @HashRef @Double @DCB ) r <- newTVar ( HPSQ.empty @HashRef @Double @DCB )
blocks <- readTVar wip <&> HM.toList blocks <- readTVar wip <&> HM.toList
let todo = blocks let todo = blocks
@ -799,7 +806,7 @@ downloadDispatcher brains env = flip runContT pure do
avg <- readTVarIO _avg avg <- readTVarIO _avg
when (dtsec > avg) do when (dtsec > avg * 1.15) do
burstMachineAddErrors bm 1 burstMachineAddErrors bm 1
atomically do atomically do
@ -818,6 +825,7 @@ downloadDispatcher brains env = flip runContT pure do
PReleaseBlock hx dcb done -> do PReleaseBlock hx dcb done -> do
atomically do atomically do
TSem.signalTSem sem
if not done then do if not done then do
modifyTVar (dcbBusy dcb) pred modifyTVar (dcbBusy dcb) pred
else do else do