mirror of https://github.com/voidlizard/hbs2
wip
This commit is contained in:
parent
0bd62e50cc
commit
8435f16a15
|
@ -81,7 +81,7 @@ defBlockWaitMax = 60 :: Timeout 'Seconds
|
||||||
|
|
||||||
-- how much time wait for block from peer?
|
-- how much time wait for block from peer?
|
||||||
defChunkWaitMax :: Timeout 'Seconds
|
defChunkWaitMax :: Timeout 'Seconds
|
||||||
defChunkWaitMax = 10 :: Timeout 'Seconds
|
defChunkWaitMax = 30 :: Timeout 'Seconds
|
||||||
|
|
||||||
defSweepTimeout :: Timeout 'Seconds
|
defSweepTimeout :: Timeout 'Seconds
|
||||||
defSweepTimeout = 60 -- FIXME: only for debug!
|
defSweepTimeout = 60 -- FIXME: only for debug!
|
||||||
|
|
|
@ -70,6 +70,7 @@ import Data.List qualified as L
|
||||||
import Data.Coerce
|
import Data.Coerce
|
||||||
import Numeric
|
import Numeric
|
||||||
import UnliftIO
|
import UnliftIO
|
||||||
|
import UnliftIO.Concurrent
|
||||||
import Lens.Micro.Platform
|
import Lens.Micro.Platform
|
||||||
import Streaming.Prelude qualified as S
|
import Streaming.Prelude qualified as S
|
||||||
|
|
||||||
|
@ -82,6 +83,7 @@ data DownloadError e =
|
||||||
| PeerMissBlockError HashRef (Peer e)
|
| PeerMissBlockError HashRef (Peer e)
|
||||||
| PeerBlockHashMismatch (Peer e)
|
| PeerBlockHashMismatch (Peer e)
|
||||||
| PeerRequestTimeout (Peer e)
|
| PeerRequestTimeout (Peer e)
|
||||||
|
| Incomplete HashRef
|
||||||
deriving stock (Generic,Typeable)
|
deriving stock (Generic,Typeable)
|
||||||
|
|
||||||
instance Pretty (Peer e) => Show (DownloadError e) where
|
instance Pretty (Peer e) => Show (DownloadError e) where
|
||||||
|
@ -92,6 +94,7 @@ instance Pretty (Peer e) => Show (DownloadError e) where
|
||||||
show (PeerBlockHashMismatch p) = show $ parens $ "PeerBlockHashMismatch" <+> pretty p
|
show (PeerBlockHashMismatch p) = show $ parens $ "PeerBlockHashMismatch" <+> pretty p
|
||||||
show StorageError = show "StorageError"
|
show StorageError = show "StorageError"
|
||||||
show (InternalError n) = show $ parens "InternalError" <+> pretty n
|
show (InternalError n) = show $ parens "InternalError" <+> pretty n
|
||||||
|
show (Incomplete h) = show $ parens "Incomplete" <+> pretty h
|
||||||
|
|
||||||
instance (Typeable e, Pretty (Peer e)) => Exception (DownloadError e)
|
instance (Typeable e, Pretty (Peer e)) => Exception (DownloadError e)
|
||||||
|
|
||||||
|
@ -166,6 +169,13 @@ queryBlockSizeFromPeer cache e h peer = do
|
||||||
Right x -> pure (Right x)
|
Right x -> pure (Right x)
|
||||||
|
|
||||||
|
|
||||||
|
data S =
|
||||||
|
SInit
|
||||||
|
| SFetchQ
|
||||||
|
| SFetchPost (Hash HbSync) ByteString
|
||||||
|
| SCheckBefore
|
||||||
|
| SCheckAfter
|
||||||
|
|
||||||
-- | downloads block with dependencies recursively
|
-- | downloads block with dependencies recursively
|
||||||
downloadFromPeerRec :: forall e t cache m . ( e ~ L4Proto
|
downloadFromPeerRec :: forall e t cache m . ( e ~ L4Proto
|
||||||
, MonadUnliftIO m
|
, MonadUnliftIO m
|
||||||
|
@ -186,65 +196,77 @@ downloadFromPeerRec t bu0 cache env h0 peer = do
|
||||||
|
|
||||||
p <- newTQueueIO
|
p <- newTQueueIO
|
||||||
q <- newTQueueIO
|
q <- newTQueueIO
|
||||||
done <- newTVarIO (mempty :: HashSet (Hash HbSync))
|
qq <- newTQueueIO
|
||||||
|
|
||||||
|
flip runContT pure do
|
||||||
|
|
||||||
|
ContT $ withAsync $ forever do
|
||||||
|
join $ atomically (readTQueue p)
|
||||||
|
|
||||||
|
ContT $ withAsync $ forever do
|
||||||
|
h <- atomically (readTQueue qq)
|
||||||
|
void $ queryBlockSizeFromPeer cache env h peer
|
||||||
|
pause @'Seconds 1.5
|
||||||
|
|
||||||
|
flip fix SInit $ \next -> \case
|
||||||
|
|
||||||
|
SInit -> do
|
||||||
|
debug "SInit"
|
||||||
atomically $ writeTQueue q h0
|
atomically $ writeTQueue q h0
|
||||||
|
next SCheckBefore
|
||||||
|
|
||||||
missed <- findMissedBlocks sto (HashRef h0)
|
SCheckBefore -> do
|
||||||
|
here <- hasBlock sto h0 <&> isJust
|
||||||
|
if here then next SCheckAfter else next SFetchQ
|
||||||
|
|
||||||
mapM_ (atomically . writeTQueue q . coerce) missed
|
SFetchQ -> do
|
||||||
|
debug "SFetchQ"
|
||||||
|
|
||||||
flip runContT pure $ callCC \exit -> do
|
done <- atomically do
|
||||||
|
|
||||||
ContT $ withAsync $ liftIO $ forever do
|
|
||||||
atomically (readTQueue p)
|
|
||||||
|
|
||||||
fix \next -> do
|
|
||||||
|
|
||||||
mt <- atomically do
|
|
||||||
pe <- isEmptyTQueue p
|
pe <- isEmptyTQueue p
|
||||||
qe <- isEmptyTQueue q
|
qe <- isEmptyTQueue q
|
||||||
when ( qe && not pe ) retry
|
unless pe retry
|
||||||
pure qe
|
pure qe
|
||||||
|
|
||||||
when mt $ exit $ Right ()
|
if done then
|
||||||
|
next SCheckAfter
|
||||||
|
else do
|
||||||
|
|
||||||
h <- atomically $ readTQueue q
|
h <- atomically $ readTQueue q
|
||||||
|
mbs <- getBlock sto h
|
||||||
|
|
||||||
here1 <- readTVarIO done <&> HS.member h
|
case mbs of
|
||||||
|
Just bs -> next (SFetchPost h bs)
|
||||||
|
Nothing -> none
|
||||||
|
|
||||||
here2 <- hasBlock sto (coerce h) <&> isJust
|
w <- lift $ downloadFromPeer t bu0 cache env (coerce h) peer
|
||||||
|
|
||||||
when (here1 || here2) next
|
|
||||||
|
|
||||||
w <- lift $ downloadFromPeer (TimeoutSec 5) bu0 cache env (coerce h) peer
|
|
||||||
|
|
||||||
case w of
|
case w of
|
||||||
Right bs -> do
|
Right bs -> do
|
||||||
|
next (SFetchPost h bs)
|
||||||
h' <- enqueueBlock sto bs
|
|
||||||
h3 <- ContT $ maybe1 h' (pure $ Left StorageError)
|
|
||||||
|
|
||||||
let
|
|
||||||
parse :: IO ()
|
|
||||||
parse = do
|
|
||||||
let refs = extractBlockRefs h3 bs
|
|
||||||
atomically $ mapM_ (writeTQueue q . coerce) refs
|
|
||||||
|
|
||||||
atomically $ writeTQueue p parse
|
|
||||||
|
|
||||||
next
|
|
||||||
|
|
||||||
Left e -> do
|
Left e -> do
|
||||||
err $ "DOWNLOAD ERROR" <+> viaShow e
|
err $ "DOWNLOAD ERROR" <+> viaShow e
|
||||||
-- pause @'Seconds 0.25
|
next SFetchQ
|
||||||
next
|
|
||||||
|
|
||||||
|
SFetchPost h bs -> do
|
||||||
|
debug $ "SFetchPost" <+> pretty h
|
||||||
|
|
||||||
|
let parse = do
|
||||||
|
let refs = extractBlockRefs h bs
|
||||||
|
atomically $ mapM_ (writeTQueue q . coerce) refs
|
||||||
|
mapM_ (atomically . writeTQueue qq . coerce) refs
|
||||||
|
|
||||||
|
atomically $ writeTQueue p parse
|
||||||
|
|
||||||
|
next SFetchQ
|
||||||
|
|
||||||
|
SCheckAfter -> do
|
||||||
|
debug "SCheckAfter"
|
||||||
missed <- findMissedBlocks sto (HashRef h0)
|
missed <- findMissedBlocks sto (HashRef h0)
|
||||||
|
|
||||||
unless (L.null missed) do
|
|
||||||
mapM_ (atomically . writeTQueue q . coerce) missed
|
mapM_ (atomically . writeTQueue q . coerce) missed
|
||||||
next
|
mapM_ (atomically . writeTQueue qq . coerce) missed
|
||||||
|
unless (L.null missed) $ next SFetchQ
|
||||||
|
|
||||||
pure $ Right ()
|
pure $ Right ()
|
||||||
|
|
||||||
|
@ -266,6 +288,16 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
|
||||||
pd@PeerData{..} <- find (KnownPeerKey peer) id
|
pd@PeerData{..} <- find (KnownPeerKey peer) id
|
||||||
>>= orThrow (UnknownPeerError peer)
|
>>= orThrow (UnknownPeerError peer)
|
||||||
|
|
||||||
|
pinfo <- find (PeerInfoKey peer) id
|
||||||
|
>>= orThrow (UnknownPeerError peer)
|
||||||
|
|
||||||
|
rtt <- liftIO $ medianPeerRTT pinfo
|
||||||
|
<&> fmap ((*1) . realToFrac)
|
||||||
|
<&> fromMaybe 0
|
||||||
|
<&> (/1e6)
|
||||||
|
|
||||||
|
let wait = 1000
|
||||||
|
|
||||||
sto <- getStorage
|
sto <- getStorage
|
||||||
|
|
||||||
let chunkSize = defChunkSize
|
let chunkSize = defChunkSize
|
||||||
|
@ -295,15 +327,19 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
|
||||||
|
|
||||||
for_ bursts $ \(i,chunkN) -> do
|
for_ bursts $ \(i,chunkN) -> do
|
||||||
|
|
||||||
-- s0 <- readTVarIO _sBlockChunks2 <&> IntMap.size
|
|
||||||
|
|
||||||
-- count <- newTVarIO 0
|
|
||||||
atomically $ flushTQueue chuQ
|
atomically $ flushTQueue chuQ
|
||||||
|
|
||||||
let req = BlockChunks @e coo (BlockGetChunks h chunkSize (fromIntegral i) (fromIntegral chunkN))
|
let req = BlockChunks @e coo (BlockGetChunks h chunkSize (fromIntegral i) (fromIntegral chunkN))
|
||||||
|
|
||||||
lift $ request peer req
|
lift $ request peer req
|
||||||
|
|
||||||
r <- liftIO $ race (pause t) do
|
let watchdog = fix \next -> do
|
||||||
|
s1 <- readTVarIO _sBlockChunks2 <&> IntMap.size
|
||||||
|
pause @'MilliSeconds ( max (realToFrac chunkN * rtt) 2000 )
|
||||||
|
s2 <- readTVarIO _sBlockChunks2 <&> IntMap.size
|
||||||
|
when (s1 == s2) next
|
||||||
|
|
||||||
|
r <- liftIO $ race watchdog do
|
||||||
|
|
||||||
atomically do
|
atomically do
|
||||||
pieces <- readTVar _sBlockChunks2
|
pieces <- readTVar _sBlockChunks2
|
||||||
|
@ -312,12 +348,16 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
|
||||||
|
|
||||||
atomically $ flushTQueue chuQ
|
atomically $ flushTQueue chuQ
|
||||||
|
|
||||||
either (const $ exit2 (Left $ DownloadStuckError (HashRef h) peer) ) pure r
|
case r of
|
||||||
|
Left{} -> exit2 (Left $ DownloadStuckError (HashRef h) peer)
|
||||||
|
_ -> pure ()
|
||||||
|
|
||||||
blk <- readTVarIO _sBlockChunks2
|
blk <- readTVarIO _sBlockChunks2
|
||||||
let rs = LBS.concat $ IntMap.elems blk
|
let rs = LBS.concat $ IntMap.elems blk
|
||||||
|
|
||||||
ha <- enqueueBlock sto rs
|
ha <- putBlock sto rs
|
||||||
|
|
||||||
|
-- let ha = Just $ hashObject @HbSync rs
|
||||||
|
|
||||||
lift $ expire @e key
|
lift $ expire @e key
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue