more or less works

This commit is contained in:
voidlizard 2024-11-13 18:53:22 +03:00
parent d267315898
commit 8df3485752
1 changed files with 59 additions and 35 deletions

View File

@ -569,13 +569,14 @@ data Work =
-- | Download control block -- | Download control block
data DCB = data DCB =
DCB DCB
{ dcbStart :: TimeSpec { dcbStart :: !TimeSpec
, dcbBusy :: TVar Int , dcbParent :: !(Maybe HashRef)
, dcbDownloaded :: TVar Bool , dcbBusy :: !(TVar Int)
, dcbDownloaded :: !(TVar Bool)
} }
newDcbSTM :: TimeSpec -> STM DCB newDcbSTM :: TimeSpec -> Maybe HashRef -> STM DCB
newDcbSTM ts = DCB ts <$> newTVar 0 <*> newTVar False newDcbSTM ts parent = DCB ts parent <$> newTVar 0 <*> newTVar False
data PSt = data PSt =
PChoose PChoose
@ -616,12 +617,12 @@ downloadDispatcher brains env = flip runContT pure do
debug $ green "New download request" <+> pretty h debug $ green "New download request" <+> pretty h
atomically do atomically do
already <- readTVar wip <&> HPSQ.member (HashRef h) already <- readTVar wip <&> HPSQ.member (HashRef h)
dcb <- newDcbSTM now dcb <- newDcbSTM now mzero
let w = 1.0 -- realToFrac now let w = 1.0 -- realToFrac now
unless already do unless already do
modifyTVar wip (HPSQ.insert (HashRef h) w dcb) modifyTVar wip (HPSQ.insert (HashRef h) w dcb)
ContT $ withAsync $ forever $ (>> pause @'Seconds 5) do ContT $ withAsync $ forever $ (>> pause @'Seconds 30) do
debug "Sweep blocks" debug "Sweep blocks"
atomically do atomically do
total <- readTVar wip <&> HPSQ.toList total <- readTVar wip <&> HPSQ.toList
@ -636,13 +637,13 @@ downloadDispatcher brains env = flip runContT pure do
writeTVar wip (HPSQ.fromList (catMaybes alive)) writeTVar wip (HPSQ.fromList (catMaybes alive))
ContT $ withAsync $ replicateM_ 4 $ forever do ContT $ withAsync $ forever do
what <- atomically $ readTQueue parseQ what <- atomically $ readTQueue parseQ
missed <- findMissedBlocks sto what missed <- findMissedBlocks sto what
now <- getTimeCoarse now <- getTimeCoarse
for_ missed $ \hi -> do for_ missed $ \hi -> do
atomically do atomically do
dcb <- newDcbSTM now dcb <- newDcbSTM now (Just what)
let w = realToFrac now let w = realToFrac now
already <- readTVar wip <&> HPSQ.member hi already <- readTVar wip <&> HPSQ.member hi
unless already do unless already do
@ -654,15 +655,19 @@ downloadDispatcher brains env = flip runContT pure do
where where
manageThreads onBlock wip pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do manageThreads onBlock wip pts = do
_pnum <- newTVarIO 1
forever $ (>> pause @'Seconds 10) $ withPeerM env do
debug "MANAGE THREADS" debug "MANAGE THREADS"
peers <- getKnownPeers @e <&> HS.fromList peers <- getKnownPeers @e <&> HS.fromList
for_ peers $ \p -> do for_ peers $ \p -> do
here <- readTVarIO pts <&> HM.member p here <- readTVarIO pts <&> HM.member p
i <- atomically $ stateTVar _pnum (\i -> (i, succ i))
unless here do unless here do
a <- async (peerThread onBlock p wip) a <- async (peerThread i onBlock p wip)
atomically $ modifyTVar pts (HM.insert p a) atomically $ modifyTVar pts (HM.insert p a)
loosers <- atomically do loosers <- atomically do
@ -674,7 +679,7 @@ downloadDispatcher brains env = flip runContT pure do
mapM_ cancel (fmap snd loosers) mapM_ cancel (fmap snd loosers)
peerThread onBlock p wip = flip runContT pure do peerThread me onBlock p wip = flip runContT pure do
btimes <- newTVarIO ( mempty :: [Double] ) btimes <- newTVarIO ( mempty :: [Double] )
@ -685,7 +690,7 @@ downloadDispatcher brains env = flip runContT pure do
_sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) ) _sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) )
bm <- liftIO $ newBurstMachine 10 256 (Just 50) 0.10 0.25 bm <- liftIO $ newBurstMachine 5 256 (Just 80) 0.10 0.25
void $ ContT $ bracket none $ const do void $ ContT $ bracket none $ const do
debug $ "Cancelling thread for" <+> pretty p debug $ "Cancelling thread for" <+> pretty p
@ -721,26 +726,45 @@ downloadDispatcher brains env = flip runContT pure do
PChoose -> do PChoose -> do
what <- atomically do what <- atomically do
r <- newTVar ( HPSQ.empty @HashRef @Double @DCB )
blocks <- readTVar wip <&> HPSQ.toList blocks <- readTVar wip <&> HPSQ.toList
let todo = blocks let todo = blocks
flip fix todo $ \loop w -> do flip fix todo $ \loop w -> do
erno <- readTVar _errors erno <- readTVar _errors
if erno > 10 then if erno > 10 then
pure Nothing pure ()
else do else do
case w of case w of
[] -> retry [] -> none
(h,_,dcb):xs -> do
(h,_,dcb@DCB{..}):xs -> do
wpsize <- readTVar wip <&> HPSQ.size wpsize <- readTVar wip <&> HPSQ.size
let trsh = if wpsize < 10 then 3 else 0 let trsh = if wpsize < 10 then 3 else 0
busy <- readTVar (dcbBusy dcb) busy <- readTVar dcbBusy
down <- readTVar (dcbDownloaded dcb) down <- readTVar dcbDownloaded
absent <- readTVar _sizeCache <&> (== Just Nothing) . HM.lookup h absent <- readTVar _sizeCache <&> (== Just Nothing) . HM.lookup h
if busy > trsh || down || absent then if busy > trsh || down || absent then
loop xs loop xs
else do else do
modifyTVar (dcbBusy dcb) succ sizeCache <- readTVar _sizeCache
pure $ Just (h,dcb)
let eps = case dcbParent of
Nothing -> 1.0
Just hp -> case HM.lookup hp sizeCache of
Just (Just _) -> 0.5
Just Nothing -> 1.5
Nothing -> 1.0
modifyTVar r (HPSQ.insert h eps dcb)
s <- readTVar r <&> HPSQ.size
if s >= 8 then pure () else loop xs
w <- readTVar r <&> HPSQ.findMin
case w of
Nothing -> retry
Just (h,_,d) -> do
modifyTVar (dcbBusy d) succ
pure (Just (h,d))
case what of case what of
Just (hx, dcb) -> go (PInit hx dcb) Just (hx, dcb) -> go (PInit hx dcb)
@ -749,7 +773,7 @@ downloadDispatcher brains env = flip runContT pure do
if erno > 50 then do if erno > 50 then do
pause @'Seconds 60 pause @'Seconds 60
else do else do
pause @'Seconds 1 pause @'Seconds 0.25
go PChoose go PChoose
PInit hx dcb -> do PInit hx dcb -> do
@ -802,7 +826,7 @@ downloadDispatcher brains env = flip runContT pure do
avg <- readTVarIO _avg avg <- readTVarIO _avg
when (dtsec > avg * 1.10) do when (dtsec > avg * 2) do
burstMachineAddErrors bm 1 burstMachineAddErrors bm 1
atomically do atomically do