mirror of https://github.com/voidlizard/hbs2
does not work
This commit is contained in:
parent
81040cbc97
commit
b7906e6f55
|
@ -49,7 +49,7 @@ tryDetect hash obj = rights [mbAnn, mbLink, mbMerkle, mbSeq] & headDef orBlob
|
||||||
|
|
||||||
data ScanLevel = ScanShallow | ScanDeep
|
data ScanLevel = ScanShallow | ScanDeep
|
||||||
|
|
||||||
extractBlockRefs :: Hash HbSync -> ByteString -> [Hash HbSync]
|
extractBlockRefs :: Hash HbSync -> ByteString -> [HashRef]
|
||||||
extractBlockRefs hx bs =
|
extractBlockRefs hx bs =
|
||||||
case tryDetect hx bs of
|
case tryDetect hx bs of
|
||||||
(SeqRef (SequentialRef _ (AnnotatedHashRef a' b))) ->
|
(SeqRef (SequentialRef _ (AnnotatedHashRef a' b))) ->
|
||||||
|
@ -58,7 +58,7 @@ extractBlockRefs hx bs =
|
||||||
AnnRef (AnnotatedHashRef ann h) -> do
|
AnnRef (AnnotatedHashRef ann h) -> do
|
||||||
coerce <$> catMaybes [ann, Just h]
|
coerce <$> catMaybes [ann, Just h]
|
||||||
|
|
||||||
Merkle (MNode _ hs) -> hs
|
Merkle (MNode _ hs) -> fmap HashRef hs
|
||||||
|
|
||||||
MerkleAnn (MTreeAnn{..}) -> do
|
MerkleAnn (MTreeAnn{..}) -> do
|
||||||
|
|
||||||
|
@ -76,7 +76,7 @@ extractBlockRefs hx bs =
|
||||||
MNode _ hs -> hs
|
MNode _ hs -> hs
|
||||||
_ -> mempty
|
_ -> mempty
|
||||||
|
|
||||||
meta <> c <> t
|
fmap HashRef (meta <> c <> t)
|
||||||
|
|
||||||
_ -> mempty
|
_ -> mempty
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
{-# Language UndecidableInstances #-}
|
{-# Language UndecidableInstances #-}
|
||||||
{-# Language AllowAmbiguousTypes #-}
|
{-# Language AllowAmbiguousTypes #-}
|
||||||
{-# Language ImplicitParams #-}
|
{-# Language ImplicitParams #-}
|
||||||
|
{-# Language RecordWildCards #-}
|
||||||
module BlockDownloadNew where
|
module BlockDownloadNew where
|
||||||
|
|
||||||
import HBS2.Prelude.Plated
|
import HBS2.Prelude.Plated
|
||||||
|
@ -552,6 +553,19 @@ instance BlockSizeCache e KnownSize where
|
||||||
findBlockSize (KnownSize s) _ _ = pure (Just s)
|
findBlockSize (KnownSize s) _ _ = pure (Just s)
|
||||||
|
|
||||||
|
|
||||||
|
data BState =
|
||||||
|
BNone
|
||||||
|
| BWaitSize TimeSpec
|
||||||
|
| BFetch
|
||||||
|
| BCheck
|
||||||
|
|
||||||
|
data BInfo e = BInfo (TVar BState)
|
||||||
|
|
||||||
|
data PInfo e =
|
||||||
|
PInfo
|
||||||
|
{ pinfoBm :: BurstMachine
|
||||||
|
}
|
||||||
|
|
||||||
downloadDispatcher :: forall e m . ( e ~ L4Proto
|
downloadDispatcher :: forall e m . ( e ~ L4Proto
|
||||||
, MonadUnliftIO m
|
, MonadUnliftIO m
|
||||||
)
|
)
|
||||||
|
@ -560,43 +574,139 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto
|
||||||
-> m ()
|
-> m ()
|
||||||
downloadDispatcher brains env = flip runContT pure do
|
downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
let t0 = 10.00
|
|
||||||
|
|
||||||
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), TQueue (Hash HbSync,Integer)) )
|
blkQ <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef Int (BInfo e) )
|
||||||
|
peerQ <- newTVarIO ( HPSQ.empty :: HashPSQ (Peer e) Double (PInfo e) )
|
||||||
let seenLimit = 1000
|
_busy <- newTVarIO ( mempty :: HashSet (Peer e) )
|
||||||
|
allPeers <- newTVarIO ( mempty :: HashMap (Peer e) BurstMachine )
|
||||||
blkQ <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef Int NominalDiffTime )
|
parseQ <- newTQueueIO @_ @(HashRef, ByteString)
|
||||||
|
|
||||||
sto <- withPeerM env getStorage
|
sto <- withPeerM env getStorage
|
||||||
|
|
||||||
work <- newTQueueIO
|
jobs <- newTQueueIO
|
||||||
|
|
||||||
ContT $ bracket none $ const do
|
-- ContT $ withAsync $ forever $ replicateM_ 16 do
|
||||||
readTVarIO pts >>= mapM_ (cancel . fst)
|
-- join $ atomically (readTQueue jobs)
|
||||||
atomically $ writeTVar pts mempty
|
|
||||||
|
ContT $ withAsync $ forever do
|
||||||
|
(h,bs) <- atomically $ readTQueue parseQ
|
||||||
|
let refs = extractBlockRefs (coerce h) bs
|
||||||
|
missed <- findMissedBlocks sto h
|
||||||
|
for_ (HS.fromList (refs <> missed)) $ \h -> do
|
||||||
|
here <- hasBlock sto (coerce h) <&> isJust
|
||||||
|
already <- readTVarIO blkQ <&> HPSQ.member h
|
||||||
|
unless (here || already) do
|
||||||
|
newBi <- BInfo @e <$> newTVarIO BNone
|
||||||
|
atomically $ modifyTVar blkQ (HPSQ.insert h 1 newBi)
|
||||||
|
|
||||||
liftIO $ withPeerM env do
|
liftIO $ withPeerM env do
|
||||||
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
|
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
|
||||||
now <- getTimeCoarse
|
|
||||||
here <- hasBlock sto h <&> isJust
|
here <- hasBlock sto h <&> isJust
|
||||||
already <- readTVarIO blkQ <&> HPSQ.member (HashRef h)
|
already <- readTVarIO blkQ <&> HPSQ.member (HashRef h)
|
||||||
unless (here || already) do
|
unless (here || already) do
|
||||||
atomically $ modifyTVar blkQ (HPSQ.insert (HashRef h) 1 10)
|
-- atomically $ modifyTVar blkQ (HPSQ.insert (HashRef h) 1 10)
|
||||||
|
newBi <- BInfo @e <$> newTVarIO BNone
|
||||||
|
atomically $ modifyTVar blkQ (HPSQ.insert (HashRef h) 1 newBi)
|
||||||
debug $ green "New download request" <+> pretty h
|
debug $ green "New download request" <+> pretty h
|
||||||
|
|
||||||
ContT $ withAsync $ manageThreads pts
|
-- block processing loop
|
||||||
|
ContT $ withAsync $ forever $ flip runContT pure $ callCC \exit -> callCC \exit2 -> do
|
||||||
|
blk' <- atomically $ readTVar blkQ <&> HPSQ.findMin
|
||||||
|
(h,prio,bi@(BInfo _bstate)) <- maybe (pause @'Seconds 0.5 >> exit ()) pure blk'
|
||||||
|
|
||||||
|
atomically do
|
||||||
|
free <- readTVar peerQ <&> HPSQ.size
|
||||||
|
unless (free > 0) retry
|
||||||
|
|
||||||
ContT $ withAsync do
|
debug $ green "JERK OFF" <+> pretty h
|
||||||
let sq = readTVarIO blkQ <&> fmap (\(a,_,b) -> (a,b)) . HPSQ.toList
|
|
||||||
polling (Polling 1 1) sq $ \h -> do
|
bstate <- readTVarIO _bstate
|
||||||
pips <- readTVarIO pts <&> HM.keys
|
|
||||||
forConcurrently_ pips $ \p -> do
|
case bstate of
|
||||||
|
BNone -> do
|
||||||
|
now <- getTimeCoarse
|
||||||
|
who' <- atomically do
|
||||||
|
p <- readTVar peerQ <&> HPSQ.findMin
|
||||||
|
case p of
|
||||||
|
Just some -> do
|
||||||
|
modifyTVar peerQ HPSQ.deleteMin
|
||||||
|
writeTVar _bstate (BWaitSize now)
|
||||||
|
pure (Just some)
|
||||||
|
|
||||||
|
Nothing -> pure Nothing
|
||||||
|
|
||||||
|
who <- maybe (exit2 ()) pure who'
|
||||||
|
|
||||||
|
let work _bs (p,pprio,i@PInfo{..}) = liftIO $ flip runContT pure do
|
||||||
|
ContT $ bracket none $ const do
|
||||||
|
atomically $ modifyTVar peerQ (HPSQ.insert p pprio i)
|
||||||
|
|
||||||
|
liftIO do
|
||||||
s <- queryBlockSizeFromPeer brains env (coerce h) p
|
s <- queryBlockSizeFromPeer brains env (coerce h) p
|
||||||
|
|
||||||
case s of
|
case s of
|
||||||
Right x -> debug $ red "GOT SIZE FOR BLOCK" <+> pretty h <+> pretty x <+> pretty p
|
Right (Just size) -> do
|
||||||
_ -> none
|
debug $ green "GOT BLOCK SIZE" <+> pretty h <+> pretty size <+> pretty p
|
||||||
|
atomically $ writeTVar _bs BFetch
|
||||||
|
bu <- getCurrentBurst pinfoBm
|
||||||
|
|
||||||
|
debug $ yellow "START DOWNLOAD" <+> pretty h <+> pretty p
|
||||||
|
r <- downloadFromPeer (TimeoutSec 10) bu (KnownSize size) env (coerce h) p
|
||||||
|
|
||||||
|
case r of
|
||||||
|
Right bs -> do
|
||||||
|
debug $ green "DOWNLOAD DONE" <+> pretty h <+> pretty p
|
||||||
|
atomically do
|
||||||
|
writeTQueue parseQ (h, bs)
|
||||||
|
writeTVar _bs BCheck
|
||||||
|
|
||||||
|
Left{} -> do
|
||||||
|
debug $ red "DOWNLOAD FAIL" <+> pretty h <+> pretty p
|
||||||
|
burstMachineAddErrors pinfoBm 1
|
||||||
|
|
||||||
|
Right Nothing -> do
|
||||||
|
debug $ yellow "NO BLOCK" <+> pretty h <+> pretty p
|
||||||
|
pure ()
|
||||||
|
|
||||||
|
Left{} -> do
|
||||||
|
pure ()
|
||||||
|
|
||||||
|
-- FIXME: dangeourous
|
||||||
|
void $ liftIO $ async (work _bstate who)
|
||||||
|
|
||||||
|
BWaitSize ts -> do
|
||||||
|
now <- getTimeCoarse
|
||||||
|
let done = expired (TimeoutSec 10) (now - ts)
|
||||||
|
when done do
|
||||||
|
atomically $ writeTVar _bstate BNone
|
||||||
|
atomically $ modifyTVar blkQ (HPSQ.insert h (succ prio) bi)
|
||||||
|
|
||||||
|
BFetch -> do
|
||||||
|
atomically $ modifyTVar blkQ (HPSQ.insert h (succ prio) bi)
|
||||||
|
|
||||||
|
BCheck -> do
|
||||||
|
atomically $ modifyTVar blkQ (HPSQ.delete h)
|
||||||
|
|
||||||
|
-- peer update loop
|
||||||
|
ContT $ withAsync $ withPeerM env $ forever $ (>> pause @'Seconds 5) do
|
||||||
|
|
||||||
|
peers <- getKnownPeers @e <&> HS.fromList
|
||||||
|
|
||||||
|
atomically do
|
||||||
|
busy <- readTVar _busy
|
||||||
|
every <- readTVar peerQ <&> HPSQ.toList
|
||||||
|
let alive = [ x | x@(p,_,_) <- every, HS.member p peers, not (HS.member p busy) ] & HPSQ.fromList
|
||||||
|
let newBusy = [ x | x <- HS.toList busy, HS.member x peers ]
|
||||||
|
writeTVar peerQ alive
|
||||||
|
writeTVar _busy (HS.fromList newBusy)
|
||||||
|
|
||||||
|
for_ peers $ \p -> do
|
||||||
|
here <- readTVarIO allPeers <&> HM.member p
|
||||||
|
unless here do
|
||||||
|
bm <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10
|
||||||
|
atomically do
|
||||||
|
modifyTVar allPeers (HM.insert p bm)
|
||||||
|
modifyTVar peerQ (HPSQ.insert p 1.0 (PInfo bm))
|
||||||
|
|
||||||
forever do
|
forever do
|
||||||
pause @'Seconds 10
|
pause @'Seconds 10
|
||||||
|
@ -604,52 +714,3 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
debug $ yellow $ "I'm download dispatcher"
|
debug $ yellow $ "I'm download dispatcher"
|
||||||
debug $ yellow $ "wip:" <+> pretty size
|
debug $ yellow $ "wip:" <+> pretty size
|
||||||
|
|
||||||
where
|
|
||||||
|
|
||||||
manageThreads pts = withPeerM env $ forever do
|
|
||||||
pips <- getKnownPeers @e <&> HS.fromList
|
|
||||||
|
|
||||||
for_ pips $ \p -> do
|
|
||||||
here <- readTVarIO pts <&> HM.member p
|
|
||||||
unless here do
|
|
||||||
q <- newTQueueIO
|
|
||||||
a <- async $ peerDownloadLoop p q
|
|
||||||
atomically $ modifyTVar pts (HM.insert p (a,q))
|
|
||||||
|
|
||||||
debug $ "downloadDispatcher: knownPeer" <+> yellow (pretty p)
|
|
||||||
|
|
||||||
dead <- atomically do
|
|
||||||
total <- readTVar pts <&> HM.toList
|
|
||||||
|
|
||||||
what <- for total $ \(p,a) -> do
|
|
||||||
let pipExist = HS.member p pips
|
|
||||||
stillAlive <- pollSTM (fst a) <&> isNothing
|
|
||||||
|
|
||||||
if pipExist && stillAlive then do
|
|
||||||
pure $ Right (p,a)
|
|
||||||
else
|
|
||||||
pure $ Left (p,a)
|
|
||||||
|
|
||||||
writeTVar pts (HM.fromList (rights what))
|
|
||||||
pure $ lefts what
|
|
||||||
|
|
||||||
for_ dead $ \(p,a) -> do
|
|
||||||
cancel (fst a)
|
|
||||||
debug $ red "terminating peer loop" <+> pretty p
|
|
||||||
|
|
||||||
pause @'Seconds 5
|
|
||||||
|
|
||||||
peerDownloadLoop p q = do
|
|
||||||
bm <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10
|
|
||||||
forever do
|
|
||||||
(h,s) <- atomically $ readTQueue q
|
|
||||||
bu <- getCurrentBurst bm
|
|
||||||
blk <- downloadFromPeer (TimeoutSec 5) bu (KnownSize s) env (coerce h) p
|
|
||||||
case blk of
|
|
||||||
Left{} -> pure ()
|
|
||||||
Right bs -> liftIO $ withPeerM env do
|
|
||||||
let refs = extractBlockRefs h bs
|
|
||||||
for_ refs $ \r -> do
|
|
||||||
addDownload @e (Just (coerce h)) r
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue