mirror of https://github.com/voidlizard/hbs2
more or less works
This commit is contained in:
parent
86dab9c717
commit
bf435f1f47
|
@ -569,13 +569,14 @@ data Work =
|
||||||
-- | Download control block
|
-- | Download control block
|
||||||
data DCB =
|
data DCB =
|
||||||
DCB
|
DCB
|
||||||
{ dcbStart :: TimeSpec
|
{ dcbStart :: !TimeSpec
|
||||||
, dcbBusy :: TVar Int
|
, dcbParent :: !(Maybe HashRef)
|
||||||
, dcbDownloaded :: TVar Bool
|
, dcbBusy :: !(TVar Int)
|
||||||
|
, dcbDownloaded :: !(TVar Bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
newDcbSTM :: TimeSpec -> STM DCB
|
newDcbSTM :: TimeSpec -> Maybe HashRef -> STM DCB
|
||||||
newDcbSTM ts = DCB ts <$> newTVar 0 <*> newTVar False
|
newDcbSTM ts parent = DCB ts parent <$> newTVar 0 <*> newTVar False
|
||||||
|
|
||||||
data PSt =
|
data PSt =
|
||||||
PChoose
|
PChoose
|
||||||
|
@ -616,12 +617,12 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
debug $ green "New download request" <+> pretty h
|
debug $ green "New download request" <+> pretty h
|
||||||
atomically do
|
atomically do
|
||||||
already <- readTVar wip <&> HPSQ.member (HashRef h)
|
already <- readTVar wip <&> HPSQ.member (HashRef h)
|
||||||
dcb <- newDcbSTM now
|
dcb <- newDcbSTM now mzero
|
||||||
let w = 1.0 -- realToFrac now
|
let w = 1.0 -- realToFrac now
|
||||||
unless already do
|
unless already do
|
||||||
modifyTVar wip (HPSQ.insert (HashRef h) w dcb)
|
modifyTVar wip (HPSQ.insert (HashRef h) w dcb)
|
||||||
|
|
||||||
ContT $ withAsync $ forever $ (>> pause @'Seconds 5) do
|
ContT $ withAsync $ forever $ (>> pause @'Seconds 30) do
|
||||||
debug "Sweep blocks"
|
debug "Sweep blocks"
|
||||||
atomically do
|
atomically do
|
||||||
total <- readTVar wip <&> HPSQ.toList
|
total <- readTVar wip <&> HPSQ.toList
|
||||||
|
@ -636,13 +637,13 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
writeTVar wip (HPSQ.fromList (catMaybes alive))
|
writeTVar wip (HPSQ.fromList (catMaybes alive))
|
||||||
|
|
||||||
|
|
||||||
ContT $ withAsync $ replicateM_ 4 $ forever do
|
ContT $ withAsync $ forever do
|
||||||
what <- atomically $ readTQueue parseQ
|
what <- atomically $ readTQueue parseQ
|
||||||
missed <- findMissedBlocks sto what
|
missed <- findMissedBlocks sto what
|
||||||
now <- getTimeCoarse
|
now <- getTimeCoarse
|
||||||
for_ missed $ \hi -> do
|
for_ missed $ \hi -> do
|
||||||
atomically do
|
atomically do
|
||||||
dcb <- newDcbSTM now
|
dcb <- newDcbSTM now (Just what)
|
||||||
let w = realToFrac now
|
let w = realToFrac now
|
||||||
already <- readTVar wip <&> HPSQ.member hi
|
already <- readTVar wip <&> HPSQ.member hi
|
||||||
unless already do
|
unless already do
|
||||||
|
@ -654,15 +655,19 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
where
|
where
|
||||||
|
|
||||||
manageThreads onBlock wip pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do
|
manageThreads onBlock wip pts = do
|
||||||
|
_pnum <- newTVarIO 1
|
||||||
|
|
||||||
|
forever $ (>> pause @'Seconds 10) $ withPeerM env do
|
||||||
debug "MANAGE THREADS"
|
debug "MANAGE THREADS"
|
||||||
|
|
||||||
peers <- getKnownPeers @e <&> HS.fromList
|
peers <- getKnownPeers @e <&> HS.fromList
|
||||||
|
|
||||||
for_ peers $ \p -> do
|
for_ peers $ \p -> do
|
||||||
here <- readTVarIO pts <&> HM.member p
|
here <- readTVarIO pts <&> HM.member p
|
||||||
|
i <- atomically $ stateTVar _pnum (\i -> (i, succ i))
|
||||||
unless here do
|
unless here do
|
||||||
a <- async (peerThread onBlock p wip)
|
a <- async (peerThread i onBlock p wip)
|
||||||
atomically $ modifyTVar pts (HM.insert p a)
|
atomically $ modifyTVar pts (HM.insert p a)
|
||||||
|
|
||||||
loosers <- atomically do
|
loosers <- atomically do
|
||||||
|
@ -674,7 +679,7 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
mapM_ cancel (fmap snd loosers)
|
mapM_ cancel (fmap snd loosers)
|
||||||
|
|
||||||
peerThread onBlock p wip = flip runContT pure do
|
peerThread me onBlock p wip = flip runContT pure do
|
||||||
|
|
||||||
btimes <- newTVarIO ( mempty :: [Double] )
|
btimes <- newTVarIO ( mempty :: [Double] )
|
||||||
|
|
||||||
|
@ -685,7 +690,7 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
_sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) )
|
_sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) )
|
||||||
|
|
||||||
bm <- liftIO $ newBurstMachine 10 256 (Just 50) 0.10 0.25
|
bm <- liftIO $ newBurstMachine 5 256 (Just 80) 0.10 0.25
|
||||||
|
|
||||||
void $ ContT $ bracket none $ const do
|
void $ ContT $ bracket none $ const do
|
||||||
debug $ "Cancelling thread for" <+> pretty p
|
debug $ "Cancelling thread for" <+> pretty p
|
||||||
|
@ -721,26 +726,45 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
PChoose -> do
|
PChoose -> do
|
||||||
|
|
||||||
what <- atomically do
|
what <- atomically do
|
||||||
|
r <- newTVar ( HPSQ.empty @HashRef @Double @DCB )
|
||||||
blocks <- readTVar wip <&> HPSQ.toList
|
blocks <- readTVar wip <&> HPSQ.toList
|
||||||
let todo = blocks
|
let todo = blocks
|
||||||
flip fix todo $ \loop w -> do
|
flip fix todo $ \loop w -> do
|
||||||
erno <- readTVar _errors
|
erno <- readTVar _errors
|
||||||
if erno > 10 then
|
if erno > 10 then
|
||||||
pure Nothing
|
pure ()
|
||||||
else do
|
else do
|
||||||
case w of
|
case w of
|
||||||
[] -> retry
|
[] -> none
|
||||||
(h,_,dcb):xs -> do
|
|
||||||
|
(h,_,dcb@DCB{..}):xs -> do
|
||||||
wpsize <- readTVar wip <&> HPSQ.size
|
wpsize <- readTVar wip <&> HPSQ.size
|
||||||
let trsh = if wpsize < 10 then 3 else 0
|
let trsh = if wpsize < 10 then 3 else 0
|
||||||
busy <- readTVar (dcbBusy dcb)
|
busy <- readTVar dcbBusy
|
||||||
down <- readTVar (dcbDownloaded dcb)
|
down <- readTVar dcbDownloaded
|
||||||
absent <- readTVar _sizeCache <&> (== Just Nothing) . HM.lookup h
|
absent <- readTVar _sizeCache <&> (== Just Nothing) . HM.lookup h
|
||||||
if busy > trsh || down || absent then
|
if busy > trsh || down || absent then
|
||||||
loop xs
|
loop xs
|
||||||
else do
|
else do
|
||||||
modifyTVar (dcbBusy dcb) succ
|
sizeCache <- readTVar _sizeCache
|
||||||
pure $ Just (h,dcb)
|
|
||||||
|
let eps = case dcbParent of
|
||||||
|
Nothing -> 1.0
|
||||||
|
Just hp -> case HM.lookup hp sizeCache of
|
||||||
|
Just (Just _) -> 0.5
|
||||||
|
Just Nothing -> 1.5
|
||||||
|
Nothing -> 1.0
|
||||||
|
|
||||||
|
modifyTVar r (HPSQ.insert h eps dcb)
|
||||||
|
s <- readTVar r <&> HPSQ.size
|
||||||
|
if s >= 8 then pure () else loop xs
|
||||||
|
|
||||||
|
w <- readTVar r <&> HPSQ.findMin
|
||||||
|
case w of
|
||||||
|
Nothing -> retry
|
||||||
|
Just (h,_,d) -> do
|
||||||
|
modifyTVar (dcbBusy d) succ
|
||||||
|
pure (Just (h,d))
|
||||||
|
|
||||||
case what of
|
case what of
|
||||||
Just (hx, dcb) -> go (PInit hx dcb)
|
Just (hx, dcb) -> go (PInit hx dcb)
|
||||||
|
@ -749,7 +773,7 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
if erno > 50 then do
|
if erno > 50 then do
|
||||||
pause @'Seconds 60
|
pause @'Seconds 60
|
||||||
else do
|
else do
|
||||||
pause @'Seconds 1
|
pause @'Seconds 0.25
|
||||||
go PChoose
|
go PChoose
|
||||||
|
|
||||||
PInit hx dcb -> do
|
PInit hx dcb -> do
|
||||||
|
@ -802,7 +826,7 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
avg <- readTVarIO _avg
|
avg <- readTVarIO _avg
|
||||||
|
|
||||||
when (dtsec > avg * 1.10) do
|
when (dtsec > avg * 2) do
|
||||||
burstMachineAddErrors bm 1
|
burstMachineAddErrors bm 1
|
||||||
|
|
||||||
atomically do
|
atomically do
|
||||||
|
|
Loading…
Reference in New Issue