mirror of https://github.com/voidlizard/hbs2
works?
This commit is contained in:
parent
2fff2a7203
commit
79a352a83f
|
@ -506,7 +506,7 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
|
||||||
blk <- readTVarIO _sBlockChunks2
|
blk <- readTVarIO _sBlockChunks2
|
||||||
let rs = LBS.concat $ IntMap.elems blk
|
let rs = LBS.concat $ IntMap.elems blk
|
||||||
|
|
||||||
ha <- enqueueBlock sto rs
|
ha <- putBlock sto rs
|
||||||
|
|
||||||
-- let ha = Just $ hashObject @HbSync rs
|
-- let ha = Just $ hashObject @HbSync rs
|
||||||
|
|
||||||
|
@ -549,18 +549,17 @@ data S2 =
|
||||||
newtype KnownSize = KnownSize Integer
|
newtype KnownSize = KnownSize Integer
|
||||||
|
|
||||||
instance BlockSizeCache e KnownSize where
|
instance BlockSizeCache e KnownSize where
|
||||||
cacheBlockSize _ _ _ _ = pure ()
|
cacheBlockSize _ y_ _ _ = pure ()
|
||||||
findBlockSize (KnownSize s) _ _ = pure (Just s)
|
findBlockSize (KnownSize s) _ _ = pure (Just s)
|
||||||
|
|
||||||
|
data BlockFetchResult =
|
||||||
|
BlockFetchError
|
||||||
|
| BlockFetched ByteString
|
||||||
|
| BlockAlreadyHere
|
||||||
|
|
||||||
data Work e =
|
data Work =
|
||||||
RequestSize HashRef (Maybe (Peer e))
|
RequestSize HashRef (Maybe Integer -> IO ())
|
||||||
| FetchBlock HashRef (Maybe (Peer e))
|
| FetchBlock HashRef Integer (BlockFetchResult -> IO ())
|
||||||
deriving stock (Generic)
|
|
||||||
|
|
||||||
deriving stock instance Eq (Peer e) => Eq (Work e)
|
|
||||||
deriving stock instance Ord (Peer e) => Ord (Work e)
|
|
||||||
instance (Eq (Peer e), Hashable (Peer e)) => Hashable (Work e)
|
|
||||||
|
|
||||||
downloadDispatcher :: forall e m . ( e ~ L4Proto
|
downloadDispatcher :: forall e m . ( e ~ L4Proto
|
||||||
, MonadUnliftIO m
|
, MonadUnliftIO m
|
||||||
|
@ -570,10 +569,17 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto
|
||||||
-> m ()
|
-> m ()
|
||||||
downloadDispatcher brains env = flip runContT pure do
|
downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()))
|
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), TQueue Work))
|
||||||
tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double (TVar Int) )
|
-- tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double (TVar Int) )
|
||||||
|
|
||||||
void $ ContT $ withAsync $ manageThreads tasks pts
|
wip <- newTVarIO ( mempty :: HashMap HashRef NominalDiffTime )
|
||||||
|
sizeRq <- newTVarIO ( HPSQ.empty @HashRef @TimeSpec @() )
|
||||||
|
sizeRqWip <- newTVarIO ( mempty :: HashMap (Peer e, HashRef) TimeSpec)
|
||||||
|
sizeCache <- newTVarIO ( mempty :: HashMap (Peer e, HashRef) (Maybe Integer) )
|
||||||
|
downWip <- newTVarIO ( HPSQ.empty @(Peer e, HashRef) @TimeSpec @Integer )
|
||||||
|
choosen <- newTVarIO ( mempty :: HashSet HashRef )
|
||||||
|
|
||||||
|
void $ ContT $ withAsync $ manageThreads pts
|
||||||
|
|
||||||
sto <- withPeerM env getStorage
|
sto <- withPeerM env getStorage
|
||||||
|
|
||||||
|
@ -581,51 +587,142 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
|
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
|
||||||
here <- hasBlock sto h <&> isJust
|
here <- hasBlock sto h <&> isJust
|
||||||
unless here do
|
unless here do
|
||||||
atomically do
|
now <- getTimeCoarse
|
||||||
addNewTaskSTM tasks (RequestSize (HashRef h) Nothing) 1.0
|
|
||||||
|
|
||||||
debug $ green "New download request" <+> pretty h
|
debug $ green "New download request" <+> pretty h
|
||||||
|
atomically do
|
||||||
|
modifyTVar sizeRq (HPSQ.insert (HashRef h) now ())
|
||||||
|
modifyTVar wip (HM.insert (HashRef h) 10)
|
||||||
|
|
||||||
ContT $ withAsync do
|
let onBlockSize p h s = do
|
||||||
|
let color = if isJust s then green else red
|
||||||
|
debug $ color "GOT BLOCK SIZE" <+> pretty h <+> pretty s <+> pretty p
|
||||||
|
now <- getTimeCoarse
|
||||||
|
atomically do
|
||||||
|
modifyTVar sizeRq (HPSQ.delete h)
|
||||||
|
modifyTVar sizeCache (HM.insert (p,h) s)
|
||||||
|
modifyTVar sizeRqWip (HM.delete (p,h))
|
||||||
|
maybe1 s none $ \size -> do
|
||||||
|
modifyTVar downWip (HPSQ.insert (p,h) now size)
|
||||||
|
|
||||||
-- atomically do
|
parseQ <- newTQueueIO
|
||||||
-- tss <- readTVar tasks <&> HPSQ.toList
|
|
||||||
|
|
||||||
-- let tssAlive = flip L.filter tss $ \case
|
let
|
||||||
-- (RequestSize b _,_,_) | b == blk -> False
|
deleteBlockFromWip :: HashRef -> IO ()
|
||||||
-- (FetchBlock b _,_,_) | b == blk -> False
|
deleteBlockFromWip h = do
|
||||||
-- _ -> True
|
atomically do
|
||||||
|
modifyTVar wip (HM.delete h)
|
||||||
|
modifyTVar sizeRq (HPSQ.delete h)
|
||||||
|
modifyTVar choosen (HS.delete h)
|
||||||
|
srwq <- readTVar sizeRqWip <&> HM.toList
|
||||||
|
writeTVar sizeRqWip (HM.fromList $ [ x | x@((_,hi),_) <- srwq, hi /= h ])
|
||||||
|
dw <- readTVar downWip <&> HPSQ.toList
|
||||||
|
writeTVar downWip (HPSQ.fromList $ [ x | x@((_,hi),_,_) <- dw, hi /= h ])
|
||||||
|
cs <- readTVar sizeCache <&> HM.toList
|
||||||
|
writeTVar sizeCache (HM.fromList $ [ x | x@((_,hi),_) <- cs, hi /= h ])
|
||||||
|
|
||||||
-- writeTVar tasks (HPSQ.fromList tssAlive)
|
let onBlock p h = \case
|
||||||
|
|
||||||
|
BlockAlreadyHere -> do
|
||||||
|
debug $ yellow "ALREADY HAVE BLOCK" <+> pretty h
|
||||||
|
deleteBlockFromWip h
|
||||||
|
|
||||||
|
BlockFetched bs -> do
|
||||||
|
debug $ green "GOT BLOCK" <+> pretty h <+> pretty (LBS.length bs) <+> pretty p
|
||||||
|
atomically $ writeTQueue parseQ (h, bs)
|
||||||
|
deleteBlockFromWip h
|
||||||
|
|
||||||
|
BlockFetchError -> do
|
||||||
|
now <- getTimeCoarse
|
||||||
|
debug $ red "BLOCK DOWNLOAD FAIL" <+> pretty h <+> pretty p
|
||||||
|
atomically $ modifyTVar sizeRq (HPSQ.insert h now ())
|
||||||
|
|
||||||
|
ContT $ withAsync $ forever do
|
||||||
|
let blkz = readTVarIO wip <&> fmap (,10) . HM.keys
|
||||||
|
polling (Polling 1 1) blkz $ \h -> do
|
||||||
|
debug $ "CLEANUP BLOCK" <+> pretty h
|
||||||
|
here <- hasBlock sto (coerce h) <&> isJust
|
||||||
|
when here $ do
|
||||||
|
liftIO $ deleteBlockFromWip h
|
||||||
|
|
||||||
|
-- ContT $ withAsync do
|
||||||
|
-- let blkz = readTVarIO wip <&> HM.toList
|
||||||
|
-- polling (Polling 1 1) blkz $ \h -> do
|
||||||
|
-- debug $ "POLL BLOCK" <+> pretty h
|
||||||
|
-- atomically $ modifyTVar wip (HM.adjust (*1.10) h)
|
||||||
|
-- here <- hasBlock sto (coerce h) <&> isJust
|
||||||
|
-- now <- getTimeCoarse
|
||||||
|
-- unless here $ atomically do
|
||||||
|
-- modifyTVar sizeRq (HPSQ.insert h now ())
|
||||||
|
-- modifyTVar choosen (HS.delete h)
|
||||||
|
|
||||||
|
ContT $ withAsync $ forever do
|
||||||
|
(h,bs) <- atomically $ readTQueue parseQ
|
||||||
|
let refs = extractBlockRefs (coerce h) bs
|
||||||
|
missed <- findMissedBlocks sto h
|
||||||
|
now <- getTimeCoarse
|
||||||
|
for_ (HS.fromList ( refs <> missed )) $ \h -> do
|
||||||
|
here <- hasBlock sto (coerce h) <&> isJust
|
||||||
|
unless here do
|
||||||
|
atomically $ modifyTVar sizeRq (HPSQ.insert h now ())
|
||||||
|
|
||||||
|
ContT $ withAsync $ forever do
|
||||||
|
|
||||||
|
reqs <- atomically do
|
||||||
|
xs <- readTVar sizeRq <&> HPSQ.toList
|
||||||
|
when (L.null xs) retry
|
||||||
|
writeTVar sizeRq HPSQ.empty
|
||||||
|
pure [ h | (h,_,_) <- xs ]
|
||||||
|
|
||||||
|
peers <- readTVarIO pts <&> HM.toList
|
||||||
|
|
||||||
|
let tasks = [ (p, w, h) | (p,(_,w)) <- peers, h <- reqs ]
|
||||||
|
|
||||||
|
now <- getTimeCoarse
|
||||||
|
for_ tasks $ \(p, w, h) -> do
|
||||||
|
|
||||||
|
atomically do
|
||||||
|
writeTQueue w (RequestSize h (onBlockSize p h))
|
||||||
|
modifyTVar sizeRqWip (HM.insert (p,h) now)
|
||||||
|
|
||||||
|
ContT $ withAsync $ forever do
|
||||||
|
atomically do
|
||||||
|
peers <- readTVar pts
|
||||||
|
let n = HM.size peers * 20
|
||||||
|
|
||||||
|
when ( n == 0 ) retry
|
||||||
|
|
||||||
|
(w,rest) <- readTVar downWip <&> L.splitAt n . HPSQ.toList
|
||||||
|
|
||||||
|
writeTVar downWip (HPSQ.fromList rest)
|
||||||
|
|
||||||
|
when (L.null w) retry
|
||||||
|
|
||||||
|
for_ w $ \e@((p,h),prio,size) -> do
|
||||||
|
case HM.lookup p peers of
|
||||||
|
Nothing -> modifyTVar downWip (HPSQ.insert (p,h) prio size)
|
||||||
|
Just (_,q) -> do
|
||||||
|
here <- readTVar choosen <&> HS.member h
|
||||||
|
unless here do
|
||||||
|
writeTQueue q (FetchBlock h size (onBlock p h))
|
||||||
|
modifyTVar choosen (HS.insert h)
|
||||||
|
|
||||||
pause @'Seconds 5
|
|
||||||
|
|
||||||
forever $ (>> pause @'Seconds 10) do
|
forever $ (>> pause @'Seconds 10) do
|
||||||
size <- atomically $ readTVar tasks <&> HPSQ.size
|
sw0 <- readTVarIO wip <&> HM.size
|
||||||
debug $ yellow $ "wip:" <+> pretty size
|
srqw <- readTVarIO sizeRqWip <&> HM.size
|
||||||
|
sdw <- readTVarIO downWip <&> HPSQ.size
|
||||||
|
scsize <- readTVarIO sizeCache <&> HM.size
|
||||||
|
chooSize <- readTVarIO choosen <&> HS.size
|
||||||
|
debug $ yellow $ "wip0" <+> pretty sw0
|
||||||
|
<+> "wip1:" <+> pretty srqw
|
||||||
|
<+> "wip2" <+> pretty sdw
|
||||||
|
<+> "wip3" <+> pretty chooSize
|
||||||
|
<+> "sizes" <+> pretty scsize
|
||||||
|
|
||||||
where
|
where
|
||||||
|
|
||||||
addNewTaskSTM tasks k w = do
|
|
||||||
t <- newTVar 0
|
|
||||||
modifyTVar tasks (HPSQ.insert k w t)
|
|
||||||
|
|
||||||
delTaskSTM tasks k = modifyTVar tasks (HPSQ.delete k)
|
manageThreads pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do
|
||||||
|
|
||||||
releaseTask v = atomically do
|
|
||||||
writeTVar v 0
|
|
||||||
|
|
||||||
seizeTask tasks k = atomically do
|
|
||||||
w <- readTVar tasks <&> HPSQ.lookup k
|
|
||||||
maybe1 w (pure Nothing) $ \(_,v) -> do
|
|
||||||
vv <- readTVar v
|
|
||||||
if vv > 0 then do
|
|
||||||
pure Nothing
|
|
||||||
else do
|
|
||||||
writeTVar v 1
|
|
||||||
pure (Just v)
|
|
||||||
|
|
||||||
manageThreads tasks pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do
|
|
||||||
debug "MANAGE THREADS"
|
debug "MANAGE THREADS"
|
||||||
|
|
||||||
peers <- getKnownPeers @e <&> HS.fromList
|
peers <- getKnownPeers @e <&> HS.fromList
|
||||||
|
@ -633,8 +730,9 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
for_ peers $ \p -> do
|
for_ peers $ \p -> do
|
||||||
here <- readTVarIO pts <&> HM.member p
|
here <- readTVarIO pts <&> HM.member p
|
||||||
unless here do
|
unless here do
|
||||||
a <- async (peerThread p tasks)
|
work <- newTQueueIO
|
||||||
atomically $ modifyTVar pts (HM.insert p a)
|
a <- async (peerThread p work)
|
||||||
|
atomically $ modifyTVar pts (HM.insert p (a,work))
|
||||||
|
|
||||||
loosers <- atomically do
|
loosers <- atomically do
|
||||||
xs <- readTVar pts <&> HM.toList
|
xs <- readTVar pts <&> HM.toList
|
||||||
|
@ -643,9 +741,9 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
writeTVar pts (HM.fromList alive)
|
writeTVar pts (HM.fromList alive)
|
||||||
pure dead
|
pure dead
|
||||||
|
|
||||||
mapM_ cancel (fmap snd loosers)
|
mapM_ cancel (fmap (fst.snd) loosers)
|
||||||
|
|
||||||
peerThread p tasks = flip runContT pure do
|
peerThread p work = flip runContT pure do
|
||||||
|
|
||||||
sto <- withPeerM env getStorage
|
sto <- withPeerM env getStorage
|
||||||
|
|
||||||
|
@ -660,102 +758,37 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
bmt <- ContT $ withAsync $ runBurstMachine bm
|
bmt <- ContT $ withAsync $ runBurstMachine bm
|
||||||
|
|
||||||
parseQ <- newTQueueIO @_ @(HashRef, ByteString)
|
twork <- ContT $ withAsync $ forever do
|
||||||
|
w <- atomically $ readTQueue work
|
||||||
pt <- ContT $ withAsync $ forever do
|
|
||||||
(hr,bs) <- atomically $ readTQueue parseQ
|
|
||||||
let refs = extractBlockRefs (coerce hr) bs
|
|
||||||
for_ refs $ \r -> atomically do
|
|
||||||
addNewTaskSTM tasks (RequestSize r (Just p)) 0.1
|
|
||||||
addNewTaskSTM tasks (RequestSize r Nothing) 1
|
|
||||||
|
|
||||||
miss <- findMissedBlocks sto hr
|
|
||||||
atomically do
|
|
||||||
for_ miss $ \r -> do
|
|
||||||
addNewTaskSTM tasks (RequestSize r (Just p)) 0.1
|
|
||||||
addNewTaskSTM tasks (RequestSize r Nothing) 1
|
|
||||||
|
|
||||||
tt <- ContT $ withAsync $ forever $ flip runContT pure do
|
|
||||||
|
|
||||||
callCC \again -> callCC \again2 -> do
|
|
||||||
|
|
||||||
q <- readTVarIO tasks <&> HPSQ.keys
|
|
||||||
|
|
||||||
when (L.null q) do
|
|
||||||
pause @'Seconds 1
|
|
||||||
again ()
|
|
||||||
|
|
||||||
for_ q $ \case
|
|
||||||
|
|
||||||
RequestSize blk (Just who) | who == p -> do
|
|
||||||
r <- lift $ queryBlockSizeFromPeer brains env (coerce blk) p
|
|
||||||
case r of
|
|
||||||
Left{} -> do
|
|
||||||
atomically $ delTaskSTM tasks (RequestSize blk (Just p))
|
|
||||||
again ()
|
|
||||||
|
|
||||||
Right s -> do
|
|
||||||
atomically do
|
|
||||||
modifyTVar _sizeCache (HM.insert blk s)
|
|
||||||
delTaskSTM tasks (RequestSize blk (Just p))
|
|
||||||
addNewTaskSTM tasks (FetchBlock blk (Just p)) 1
|
|
||||||
|
|
||||||
RequestSize blk Nothing -> do
|
|
||||||
r <- lift $ queryBlockSizeFromPeer brains env (coerce blk) p
|
|
||||||
case r of
|
|
||||||
Left{} -> again ()
|
|
||||||
|
|
||||||
Right s -> do
|
|
||||||
atomically do
|
|
||||||
modifyTVar _sizeCache (HM.insert blk s)
|
|
||||||
delTaskSTM tasks (RequestSize blk (Just p))
|
|
||||||
addNewTaskSTM tasks (FetchBlock blk (Just p)) 1
|
|
||||||
|
|
||||||
FetchBlock blk w -> do
|
|
||||||
s <- atomically $ readTVar _sizeCache <&> HM.lookup blk
|
|
||||||
|
|
||||||
|
case w of
|
||||||
|
RequestSize h answ -> do
|
||||||
|
here <- hasBlock sto (coerce h) <&> isJust
|
||||||
|
unless here do
|
||||||
|
debug $ yellow "RequestSize" <+> pretty h <+> pretty p
|
||||||
|
s <- queryBlockSizeFromPeer brains env (coerce h) p
|
||||||
case s of
|
case s of
|
||||||
Just (Just size) -> do
|
Left{} -> none
|
||||||
|
Right s -> liftIO (answ s)
|
||||||
|
|
||||||
debug $ "START DOWNLOAD!" <+> pretty blk <+> pretty p
|
FetchBlock h s answ -> flip fix 0 $ \next i -> do
|
||||||
|
here <- hasBlock sto (coerce h) <&> isJust
|
||||||
|
if here then do
|
||||||
|
liftIO $ answ BlockAlreadyHere
|
||||||
|
else do
|
||||||
|
debug $ yellow "START TO DOWNLOAD" <+> pretty h <+> pretty p
|
||||||
|
bu <- lift $ getCurrentBurst bm
|
||||||
|
r <- lift $ downloadFromPeer (TimeoutSec 30) bu (KnownSize s) env (coerce h) p
|
||||||
|
|
||||||
here <- hasBlock sto (coerce blk) <&> isJust
|
case r of
|
||||||
|
Right bs -> liftIO $ answ (BlockFetched bs)
|
||||||
|
|
||||||
when here $ again ()
|
Left{} | i >= 5 -> liftIO $ answ BlockFetchError
|
||||||
|
| otherwise -> next (succ i)
|
||||||
seized' <- seizeTask tasks (FetchBlock blk w)
|
|
||||||
|
|
||||||
seized <- case seized' of
|
|
||||||
Nothing -> again2 ()
|
|
||||||
Just x -> pure x
|
|
||||||
|
|
||||||
bu <- lift $ getCurrentBurst bm
|
|
||||||
what <- lift $ downloadFromPeer (TimeoutSec 5) bu brains env (coerce blk) p
|
|
||||||
case what of
|
|
||||||
Left{} -> do
|
|
||||||
lift $ burstMachineAddErrors bm 1
|
|
||||||
releaseTask seized
|
|
||||||
again ()
|
|
||||||
|
|
||||||
Right bs -> do
|
|
||||||
atomically do
|
|
||||||
tss <- readTVar tasks <&> HPSQ.toList
|
|
||||||
|
|
||||||
let tssAlive = flip L.filter tss $ \case
|
|
||||||
(RequestSize b _,_,_) | b == blk -> False
|
|
||||||
(FetchBlock b _,_,_) | b == blk -> False
|
|
||||||
_ -> True
|
|
||||||
|
|
||||||
writeTVar tasks (HPSQ.fromList tssAlive)
|
|
||||||
writeTQueue parseQ (blk, bs)
|
|
||||||
|
|
||||||
_ -> none
|
|
||||||
|
|
||||||
_ -> none
|
|
||||||
|
|
||||||
bs <- ContT $ withAsync $ forever do
|
bs <- ContT $ withAsync $ forever do
|
||||||
pause @'Seconds 10
|
pause @'Seconds 10
|
||||||
debug $ yellow "I'm thread" <+> pretty p
|
debug $ yellow "I'm thread" <+> pretty p
|
||||||
|
|
||||||
void $ waitAnyCatchCancel [bmt,tt,bs]
|
void $ waitAnyCatchCancel [bmt,bs,twork]
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue