mirror of https://github.com/voidlizard/hbs2
sort of work?
This commit is contained in:
parent
46369fc123
commit
c134ae1a6f
|
@ -553,18 +553,14 @@ instance BlockSizeCache e KnownSize where
|
||||||
findBlockSize (KnownSize s) _ _ = pure (Just s)
|
findBlockSize (KnownSize s) _ _ = pure (Just s)
|
||||||
|
|
||||||
|
|
||||||
data BState =
|
data Work e =
|
||||||
BNone
|
RequestSize (Maybe (Peer e)) HashRef
|
||||||
| BWaitSize TimeSpec
|
| FetchBlock (Maybe (Peer e)) HashRef
|
||||||
| BFetch
|
deriving stock (Generic)
|
||||||
| BCheck
|
|
||||||
|
|
||||||
data BInfo e = BInfo (TVar BState)
|
deriving stock instance Eq (Peer e) => Eq (Work e)
|
||||||
|
deriving stock instance Ord (Peer e) => Ord (Work e)
|
||||||
data PInfo e =
|
instance (Eq (Peer e), Hashable (Peer e)) => Hashable (Work e)
|
||||||
PInfo
|
|
||||||
{ pinfoBm :: BurstMachine
|
|
||||||
}
|
|
||||||
|
|
||||||
downloadDispatcher :: forall e m . ( e ~ L4Proto
|
downloadDispatcher :: forall e m . ( e ~ L4Proto
|
||||||
, MonadUnliftIO m
|
, MonadUnliftIO m
|
||||||
|
@ -574,138 +570,144 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto
|
||||||
-> m ()
|
-> m ()
|
||||||
downloadDispatcher brains env = flip runContT pure do
|
downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
|
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()))
|
||||||
|
tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double Int )
|
||||||
|
|
||||||
blkQ <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef Int (BInfo e) )
|
void $ ContT $ withAsync $ manageThreads tasks pts
|
||||||
peerQ <- newTVarIO ( HPSQ.empty :: HashPSQ (Peer e) Double (PInfo e) )
|
|
||||||
_busy <- newTVarIO ( mempty :: HashSet (Peer e) )
|
|
||||||
allPeers <- newTVarIO ( mempty :: HashMap (Peer e) BurstMachine )
|
|
||||||
parseQ <- newTQueueIO @_ @(HashRef, ByteString)
|
|
||||||
|
|
||||||
sto <- withPeerM env getStorage
|
sto <- withPeerM env getStorage
|
||||||
|
|
||||||
jobs <- newTQueueIO
|
|
||||||
|
|
||||||
-- ContT $ withAsync $ forever $ replicateM_ 16 do
|
|
||||||
-- join $ atomically (readTQueue jobs)
|
|
||||||
|
|
||||||
ContT $ withAsync $ forever do
|
|
||||||
(h,bs) <- atomically $ readTQueue parseQ
|
|
||||||
let refs = extractBlockRefs (coerce h) bs
|
|
||||||
missed <- findMissedBlocks sto h
|
|
||||||
for_ (HS.fromList (refs <> missed)) $ \h -> do
|
|
||||||
here <- hasBlock sto (coerce h) <&> isJust
|
|
||||||
already <- readTVarIO blkQ <&> HPSQ.member h
|
|
||||||
unless (here || already) do
|
|
||||||
newBi <- BInfo @e <$> newTVarIO BNone
|
|
||||||
atomically $ modifyTVar blkQ (HPSQ.insert h 1 newBi)
|
|
||||||
|
|
||||||
liftIO $ withPeerM env do
|
liftIO $ withPeerM env do
|
||||||
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
|
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
|
||||||
here <- hasBlock sto h <&> isJust
|
here <- hasBlock sto h <&> isJust
|
||||||
already <- readTVarIO blkQ <&> HPSQ.member (HashRef h)
|
unless here do
|
||||||
unless (here || already) do
|
atomically do
|
||||||
-- atomically $ modifyTVar blkQ (HPSQ.insert (HashRef h) 1 10)
|
modifyTVar tasks (HPSQ.insert (RequestSize Nothing (HashRef h)) 1.0 0)
|
||||||
newBi <- BInfo @e <$> newTVarIO BNone
|
|
||||||
atomically $ modifyTVar blkQ (HPSQ.insert (HashRef h) 1 newBi)
|
|
||||||
debug $ green "New download request" <+> pretty h
|
debug $ green "New download request" <+> pretty h
|
||||||
|
|
||||||
-- block processing loop
|
forever $ (>> pause @'Seconds 10) do
|
||||||
ContT $ withAsync $ forever $ flip runContT pure $ callCC \exit -> callCC \exit2 -> do
|
size <- atomically $ readTVar tasks <&> HPSQ.size
|
||||||
blk' <- atomically $ readTVar blkQ <&> HPSQ.findMin
|
|
||||||
(h,prio,bi@(BInfo _bstate)) <- maybe (pause @'Seconds 0.5 >> exit ()) pure blk'
|
|
||||||
|
|
||||||
|
|
||||||
bstate <- readTVarIO _bstate
|
|
||||||
|
|
||||||
case bstate of
|
|
||||||
BNone -> do
|
|
||||||
now <- getTimeCoarse
|
|
||||||
who <- atomically do
|
|
||||||
p <- readTVar peerQ <&> HPSQ.findMin
|
|
||||||
case p of
|
|
||||||
Just some -> do
|
|
||||||
modifyTVar peerQ HPSQ.deleteMin
|
|
||||||
writeTVar _bstate (BWaitSize now)
|
|
||||||
modifyTVar blkQ (HPSQ.insert h (succ prio) bi)
|
|
||||||
pure some
|
|
||||||
|
|
||||||
Nothing -> retry
|
|
||||||
|
|
||||||
let work _bs (p,pprio,i@PInfo{..}) = liftIO $ flip runContT pure do
|
|
||||||
ContT $ bracket none $ const do
|
|
||||||
atomically $ modifyTVar peerQ (HPSQ.insert p pprio i)
|
|
||||||
|
|
||||||
liftIO do
|
|
||||||
s <- queryBlockSizeFromPeer brains env (coerce h) p
|
|
||||||
|
|
||||||
case s of
|
|
||||||
Right (Just size) -> do
|
|
||||||
debug $ green "GOT BLOCK SIZE" <+> pretty h <+> pretty size <+> pretty p
|
|
||||||
atomically $ writeTVar _bs BFetch
|
|
||||||
bu <- getCurrentBurst pinfoBm
|
|
||||||
|
|
||||||
debug $ yellow "START DOWNLOAD" <+> pretty h <+> pretty p
|
|
||||||
r <- downloadFromPeer (TimeoutSec 10) bu (KnownSize size) env (coerce h) p
|
|
||||||
|
|
||||||
case r of
|
|
||||||
Right bs -> do
|
|
||||||
debug $ green "DOWNLOAD DONE" <+> pretty h <+> pretty p
|
|
||||||
atomically do
|
|
||||||
writeTQueue parseQ (h, bs)
|
|
||||||
writeTVar _bs BCheck
|
|
||||||
|
|
||||||
Left{} -> do
|
|
||||||
debug $ red "DOWNLOAD FAIL" <+> pretty h <+> pretty p
|
|
||||||
atomically $ writeTVar _bstate BNone
|
|
||||||
burstMachineAddErrors pinfoBm 1
|
|
||||||
|
|
||||||
Right Nothing -> do
|
|
||||||
debug $ yellow "NO BLOCK" <+> pretty h <+> pretty p
|
|
||||||
atomically $ writeTVar _bstate BNone
|
|
||||||
|
|
||||||
Left{} -> do
|
|
||||||
pure ()
|
|
||||||
|
|
||||||
-- FIXME: dangeourous
|
|
||||||
void $ liftIO $ async (work _bstate who)
|
|
||||||
|
|
||||||
BWaitSize ts -> do
|
|
||||||
now <- getTimeCoarse
|
|
||||||
let done = expired (TimeoutSec 10) (now - ts)
|
|
||||||
when done do
|
|
||||||
atomically $ writeTVar _bstate BNone
|
|
||||||
atomically $ modifyTVar blkQ (HPSQ.insert h (succ prio) bi)
|
|
||||||
|
|
||||||
BFetch -> do
|
|
||||||
atomically $ modifyTVar blkQ (HPSQ.insert h (succ prio) bi)
|
|
||||||
|
|
||||||
BCheck -> do
|
|
||||||
atomically $ modifyTVar blkQ (HPSQ.delete h)
|
|
||||||
|
|
||||||
-- peer update loop
|
|
||||||
ContT $ withAsync $ withPeerM env $ forever $ (>> pause @'Seconds 5) do
|
|
||||||
|
|
||||||
peers <- getKnownPeers @e <&> HS.fromList
|
|
||||||
|
|
||||||
atomically do
|
|
||||||
busy <- readTVar _busy
|
|
||||||
every <- readTVar peerQ <&> HPSQ.toList
|
|
||||||
let alive = [ x | x@(p,_,_) <- every, HS.member p peers, not (HS.member p busy) ] & HPSQ.fromList
|
|
||||||
let newBusy = [ x | x <- HS.toList busy, HS.member x peers ]
|
|
||||||
writeTVar peerQ alive
|
|
||||||
writeTVar _busy (HS.fromList newBusy)
|
|
||||||
|
|
||||||
for_ peers $ \p -> do
|
|
||||||
here <- readTVarIO allPeers <&> HM.member p
|
|
||||||
unless here do
|
|
||||||
bm <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10
|
|
||||||
atomically do
|
|
||||||
modifyTVar allPeers (HM.insert p bm)
|
|
||||||
modifyTVar peerQ (HPSQ.insert p 1.0 (PInfo bm))
|
|
||||||
|
|
||||||
forever do
|
|
||||||
pause @'Seconds 10
|
|
||||||
size <- atomically $ readTVar blkQ <&> HPSQ.size
|
|
||||||
debug $ yellow $ "I'm download dispatcher"
|
|
||||||
debug $ yellow $ "wip:" <+> pretty size
|
debug $ yellow $ "wip:" <+> pretty size
|
||||||
|
|
||||||
|
where
|
||||||
|
manageThreads tasks pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do
|
||||||
|
debug "MANAGE THREADS"
|
||||||
|
|
||||||
|
peers <- getKnownPeers @e <&> HS.fromList
|
||||||
|
|
||||||
|
for_ peers $ \p -> do
|
||||||
|
here <- readTVarIO pts <&> HM.member p
|
||||||
|
unless here do
|
||||||
|
a <- async (peerThread p tasks)
|
||||||
|
atomically $ modifyTVar pts (HM.insert p a)
|
||||||
|
|
||||||
|
loosers <- atomically do
|
||||||
|
xs <- readTVar pts <&> HM.toList
|
||||||
|
-- FIXME: filter-stopped-tasks
|
||||||
|
let (alive,dead) = L.partition ( \(x,_) -> HS.member x peers ) xs
|
||||||
|
writeTVar pts (HM.fromList alive)
|
||||||
|
pure dead
|
||||||
|
|
||||||
|
mapM_ cancel (fmap snd loosers)
|
||||||
|
|
||||||
|
peerThread p tasks = flip runContT pure do
|
||||||
|
|
||||||
|
sto <- withPeerM env getStorage
|
||||||
|
|
||||||
|
_sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) )
|
||||||
|
|
||||||
|
bm <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10
|
||||||
|
|
||||||
|
void $ ContT $ bracket none $ const do
|
||||||
|
debug $ "Cancelling thread for" <+> pretty p
|
||||||
|
|
||||||
|
debug $ yellow "Started thread for" <+> pretty p
|
||||||
|
|
||||||
|
bmt <- ContT $ withAsync $ runBurstMachine bm
|
||||||
|
|
||||||
|
parseQ <- newTQueueIO @_ @(HashRef, ByteString)
|
||||||
|
|
||||||
|
pt <- ContT $ withAsync $ forever do
|
||||||
|
(hr,bs) <- atomically $ readTQueue parseQ
|
||||||
|
let refs = extractBlockRefs (coerce hr) bs
|
||||||
|
atomically do
|
||||||
|
for_ refs $ \r -> do
|
||||||
|
modifyTVar tasks (HPSQ.insert (RequestSize (Just p) r) 0.1 0)
|
||||||
|
modifyTVar tasks (HPSQ.insert (RequestSize Nothing r) 1 0)
|
||||||
|
|
||||||
|
miss <- findMissedBlocks sto hr
|
||||||
|
atomically do
|
||||||
|
for_ miss $ \r -> do
|
||||||
|
modifyTVar tasks (HPSQ.insert (RequestSize (Just p) r) 0.1 0)
|
||||||
|
modifyTVar tasks (HPSQ.insert (RequestSize Nothing r) 1 0)
|
||||||
|
|
||||||
|
tt <- ContT $ withAsync $ forever $ flip runContT pure do
|
||||||
|
|
||||||
|
callCC \again -> do
|
||||||
|
|
||||||
|
q <- readTVarIO tasks <&> HPSQ.keys
|
||||||
|
|
||||||
|
for_ q $ \case
|
||||||
|
|
||||||
|
RequestSize (Just who) blk | who == p -> do
|
||||||
|
r <- lift $ queryBlockSizeFromPeer brains env (coerce blk) p
|
||||||
|
case r of
|
||||||
|
Left{} -> do
|
||||||
|
atomically $ modifyTVar tasks (HPSQ.delete (RequestSize (Just p) blk))
|
||||||
|
again ()
|
||||||
|
|
||||||
|
Right s -> do
|
||||||
|
atomically do
|
||||||
|
modifyTVar _sizeCache (HM.insert blk s)
|
||||||
|
modifyTVar tasks (HPSQ.delete (RequestSize (Just p) blk))
|
||||||
|
modifyTVar tasks (HPSQ.insert (FetchBlock (Just p) blk) 1 0)
|
||||||
|
|
||||||
|
RequestSize Nothing blk -> do
|
||||||
|
r <- lift $ queryBlockSizeFromPeer brains env (coerce blk) p
|
||||||
|
case r of
|
||||||
|
Left{} -> again ()
|
||||||
|
|
||||||
|
Right s -> do
|
||||||
|
atomically do
|
||||||
|
modifyTVar _sizeCache (HM.insert blk s)
|
||||||
|
modifyTVar tasks (HPSQ.delete (RequestSize (Just p) blk))
|
||||||
|
modifyTVar tasks (HPSQ.insert (FetchBlock (Just p) blk) 1 0)
|
||||||
|
|
||||||
|
FetchBlock w blk -> do
|
||||||
|
s <- atomically $ readTVar _sizeCache <&> HM.lookup blk
|
||||||
|
|
||||||
|
case s of
|
||||||
|
Just (Just size) -> do
|
||||||
|
debug $ "START DOWNLOAD!" <+> pretty blk <+> pretty p
|
||||||
|
atomically $ modifyTVar tasks (HPSQ.delete (FetchBlock w blk))
|
||||||
|
bu <- lift $ getCurrentBurst bm
|
||||||
|
what <- lift $ downloadFromPeer (TimeoutSec 5) bu brains env (coerce blk) p
|
||||||
|
case what of
|
||||||
|
Left{} -> do
|
||||||
|
lift $ burstMachineAddErrors bm 1
|
||||||
|
atomically $ modifyTVar tasks (HPSQ.insert (FetchBlock w blk) 1 0)
|
||||||
|
again ()
|
||||||
|
|
||||||
|
Right bs -> do
|
||||||
|
atomically do
|
||||||
|
tss <- readTVar tasks <&> HPSQ.toList
|
||||||
|
|
||||||
|
let tssAlive = flip L.filter tss $ \case
|
||||||
|
(RequestSize _ b,_,_) | b == blk -> False
|
||||||
|
(FetchBlock _ b,_,_) | b == blk -> False
|
||||||
|
_ -> True
|
||||||
|
|
||||||
|
writeTVar tasks (HPSQ.fromList tssAlive)
|
||||||
|
writeTQueue parseQ (blk, bs)
|
||||||
|
|
||||||
|
_ -> none
|
||||||
|
|
||||||
|
_ -> none
|
||||||
|
|
||||||
|
bs <- ContT $ withAsync $ forever do
|
||||||
|
pause @'Seconds 10
|
||||||
|
debug $ yellow "I'm thread" <+> pretty p
|
||||||
|
|
||||||
|
void $ waitAnyCatchCancel [bmt,tt,bs]
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue