This commit is contained in:
voidlizard 2024-11-12 10:09:59 +03:00
parent cd33fc0edc
commit d74573ac5f
3 changed files with 48 additions and 18 deletions

View File

@ -13,6 +13,7 @@ import Streaming.Prelude qualified as S
import Streaming.Prelude (Stream, Of(..)) import Streaming.Prelude (Stream, Of(..))
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Control.Monad import Control.Monad
import Data.Coerce
import Data.Maybe import Data.Maybe
-- TODO: slow-dangerous -- TODO: slow-dangerous
@ -39,6 +40,12 @@ findMissedBlocks2 sto href = do
maybe1 blk none $ \bs -> do maybe1 blk none $ \bs -> do
let w = tryDetect (fromHashRef hx) bs let w = tryDetect (fromHashRef hx) bs
let refs = extractBlockRefs (coerce hx) bs
for_ refs $ \r -> do
here <- hasBlock sto (coerce r) <&> isJust
unless here $ lift $ S.yield r
r <- case w of r <- case w of
Merkle{} -> lift $ lift $ findMissedBlocks sto hx Merkle{} -> lift $ lift $ findMissedBlocks sto hx
MerkleAnn t -> lift $ lift do MerkleAnn t -> lift $ lift do

View File

@ -483,7 +483,7 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
let watchdog = fix \next -> do let watchdog = fix \next -> do
wx <- readTVarIO _wx <&> realToFrac wx <- readTVarIO _wx <&> realToFrac
-- debug $ "WATCHDOG" <+> pretty wx <+> pretty waity -- debug $ "WATCHDOG" <+> pretty wx <+> pretty waity
r <- race (pause @'MilliSeconds (max wx waity)) do r <- race (pause @'MilliSeconds (min wx waity)) do
void $ atomically $ readTQueue chuQ void $ atomically $ readTQueue chuQ
either (const none) (const next) r either (const none) (const next) r
@ -573,7 +573,7 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto
-> m () -> m ()
downloadDispatcher brains env = flip runContT pure do downloadDispatcher brains env = flip runContT pure do
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), TQueue Work)) pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), TBQueue Work))
-- tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double (TVar Int) ) -- tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double (TVar Int) )
_blkNum <- newTVarIO 0 _blkNum <- newTVarIO 0
@ -607,6 +607,10 @@ downloadDispatcher brains env = flip runContT pure do
modifyTVar sizeRqWip (HM.delete (p,h)) modifyTVar sizeRqWip (HM.delete (p,h))
modifyTVar sizeRq (HPSQ.delete h) modifyTVar sizeRq (HPSQ.delete h)
l <- atomically do
w <- readTVar pts <&> fmap snd . HM.lookup p
maybe1 w (pure 0) lengthTBQueue
-- let color = if isJust s then green else red -- let color = if isJust s then green else red
-- debug $ color "GOT BLOCK SIZE" <+> pretty h <+> pretty s <+> pretty p -- debug $ color "GOT BLOCK SIZE" <+> pretty h <+> pretty s <+> pretty p
dtt <- randomRIO (-0.05, 0.05) dtt <- randomRIO (-0.05, 0.05)
@ -614,7 +618,7 @@ downloadDispatcher brains env = flip runContT pure do
here <- hasBlock sto (coerce h) <&> isJust here <- hasBlock sto (coerce h) <&> isJust
unless here do unless here do
dt <- readTVarIO stat <&> (*(1+dtt)) . fromMaybe 1.0 . HM.lookup p dt <- readTVarIO stat <&> (*(1+dtt)) . fromMaybe 1.0 . HM.lookup p
let rate = dt let rate = dt + realToFrac l
atomically do atomically do
-- blkNum <- stateTVar _blkNum (\x -> (x, succ x)) -- blkNum <- stateTVar _blkNum (\x -> (x, succ x))
modifyTVar sizeCache (HM.insert (p,h) s) modifyTVar sizeCache (HM.insert (p,h) s)
@ -708,11 +712,10 @@ downloadDispatcher brains env = flip runContT pure do
modifyTVar choosen (HS.delete h) modifyTVar choosen (HS.delete h)
ContT $ withAsync $ forever do ContT $ withAsync $ forever do
(h,bs) <- atomically $ readTQueue parseQ (h,_) <- atomically $ readTQueue parseQ
let refs = extractBlockRefs (coerce h) bs
missed <- findMissedBlocks sto h missed <- findMissedBlocks sto h
now <- getTimeCoarse now <- getTimeCoarse
for_ (HS.fromList ( refs <> missed )) $ \h -> do for_ (HS.fromList missed) $ \h -> do
here <- hasBlock sto (coerce h) <&> isJust here <- hasBlock sto (coerce h) <&> isJust
unless here do unless here do
atomically $ modifyTVar sizeRq (HPSQ.insert h now ()) atomically $ modifyTVar sizeRq (HPSQ.insert h now ())
@ -733,14 +736,25 @@ downloadDispatcher brains env = flip runContT pure do
for_ tasks $ \(p, w, h) -> do for_ tasks $ \(p, w, h) -> do
atomically do atomically do
writeTQueue w (RequestSize h (onBlockSize p h)) full <- isFullTBQueue w
modifyTVar sizeRqWip (HM.insert (p,h) now) unless full do
writeTBQueue w (RequestSize h (onBlockSize p h))
modifyTVar sizeRqWip (HM.insert (p,h) now)
ContT $ withAsync $ forever do ContT $ withAsync $ forever do
(w1,dw,peerz) <- atomically do (w1,dw,peerz) <- atomically do
peers <- readTVar pts peers0 <- readTVar pts
peers1 <- for (HM.toList peers0) $ \e@(_,(_,q)) -> do
full <- isFullTBQueue q
if full then do
pure (Left e)
else do
pure (Right e)
let peers = HM.fromList (rights peers1)
let n = HM.size peers let n = HM.size peers
@ -750,14 +764,18 @@ downloadDispatcher brains env = flip runContT pure do
dw <- readTVar downWip dw <- readTVar downWip
let total = [ x | x@((p,_),_,_) <- L.take 32 (HPSQ.toList dw), HM.member p peers ] let total = [ x
| x@((p,h),_,_) <- HPSQ.toList dw
, HM.member p peers
, not (HS.member h already)
] & L.take 100
when (L.null total) retry when (L.null total) retry
let queue = total let queue = total
let qq = [ (h, HS.singleton p) let qq = [ (h, HS.singleton p)
| ((p,h),_,_) <- queue, not (HS.member h already) | ((p,h),_,_) <- queue
] & fmap (over _2 (V.fromList . HS.toList)) . HM.toList . HM.fromListWith (<>) ] & fmap (over _2 (V.fromList . HS.toList)) . HM.toList . HM.fromListWith (<>)
when (L.null qq) retry when (L.null qq) retry
@ -781,8 +799,13 @@ downloadDispatcher brains env = flip runContT pure do
atomically do atomically do
choo <- readTVar choosen <&> HS.member h choo <- readTVar choosen <&> HS.member h
unless choo do unless choo do
writeTQueue who (FetchBlock h size (onBlock p0 h)) full <- isFullTBQueue who
modifyTVar choosen (HS.insert h) if full then do
-- FIXME: wtf?
modifyTVar downWip (HPSQ.insert (p0,h) 0.01 size)
else do
writeTBQueue who (FetchBlock h size (onBlock p0 h))
modifyTVar choosen (HS.insert h)
forever $ (>> pause @'Seconds 10) do forever $ (>> pause @'Seconds 10) do
sw0 <- readTVarIO wip <&> HM.size sw0 <- readTVarIO wip <&> HM.size
@ -811,7 +834,7 @@ downloadDispatcher brains env = flip runContT pure do
for_ peers $ \p -> do for_ peers $ \p -> do
here <- readTVarIO pts <&> HM.member p here <- readTVarIO pts <&> HM.member p
unless here do unless here do
work <- newTQueueIO work <- newTBQueueIO (16*1024*10)
a <- async (peerThread stat p work) a <- async (peerThread stat p work)
atomically $ modifyTVar pts (HM.insert p (a,work)) atomically $ modifyTVar pts (HM.insert p (a,work))
@ -833,7 +856,7 @@ downloadDispatcher brains env = flip runContT pure do
_sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) ) _sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) )
bm <- liftIO $ newBurstMachine 5 256 (Just 50) 0.05 0.20 bm <- liftIO $ newBurstMachine 5 256 (Just 50) 0.10 0.25
void $ ContT $ bracket none $ const do void $ ContT $ bracket none $ const do
debug $ "Cancelling thread for" <+> pretty p debug $ "Cancelling thread for" <+> pretty p
@ -851,7 +874,7 @@ downloadDispatcher brains env = flip runContT pure do
writeTVar _avg avg writeTVar _avg avg
twork <- ContT $ withAsync $ forever do twork <- ContT $ withAsync $ forever do
w <- atomically $ readTQueue work w <- atomically $ readTBQueue work
case w of case w of
RequestSize h answ -> do RequestSize h answ -> do
@ -888,7 +911,7 @@ downloadDispatcher brains env = flip runContT pure do
atomically $ modifyTVar btimes ( take 100 . (dtsec :) ) atomically $ modifyTVar btimes ( take 100 . (dtsec :) )
liftIO $ answ (BlockFetched bs) liftIO $ answ (BlockFetched bs)
Left{} | i >= 5 -> liftIO $ answ BlockFetchError Left{} | i >= 2 -> liftIO $ answ BlockFetchError
| otherwise -> next (succ i) | otherwise -> next (succ i)
bs <- ContT $ withAsync $ forever do bs <- ContT $ withAsync $ forever do

View File

@ -248,7 +248,7 @@ runCat opts ss = do
MerkleAnn (MTreeAnn {_mtaCrypt = NullEncryption }) -> do MerkleAnn (MTreeAnn {_mtaCrypt = NullEncryption }) -> do
bs <- runExceptT (readFromMerkle (AnyStorage ss) (SimpleKey mhash)) bs <- runExceptT (readFromMerkle (AnyStorage ss) (SimpleKey mhash))
>>= orThrowUser "can't read/decode tree" >>= orThrowPassIO -- User "can't read/decode tree"
LBS.putStr bs LBS.putStr bs
MerkleAnn ann@(MTreeAnn {_mtaCrypt = EncryptGroupNaClSymm gkh _}) -> do MerkleAnn ann@(MTreeAnn {_mtaCrypt = EncryptGroupNaClSymm gkh _}) -> do