This commit is contained in:
voidlizard 2024-11-05 19:09:26 +03:00
parent 3c0e6af814
commit 686c11a7ad
2 changed files with 86 additions and 46 deletions

View File

@ -81,7 +81,7 @@ defBlockWaitMax = 60 :: Timeout 'Seconds
-- how much time wait for block from peer?
defChunkWaitMax :: Timeout 'Seconds
defChunkWaitMax = 10 :: Timeout 'Seconds
defChunkWaitMax = 30 :: Timeout 'Seconds
defSweepTimeout :: Timeout 'Seconds
defSweepTimeout = 60 -- FIXME: only for debug!

View File

@ -70,6 +70,7 @@ import Data.List qualified as L
import Data.Coerce
import Numeric
import UnliftIO
import UnliftIO.Concurrent
import Lens.Micro.Platform
import Streaming.Prelude qualified as S
@ -82,6 +83,7 @@ data DownloadError e =
| PeerMissBlockError HashRef (Peer e)
| PeerBlockHashMismatch (Peer e)
| PeerRequestTimeout (Peer e)
| Incomplete HashRef
deriving stock (Generic,Typeable)
instance Pretty (Peer e) => Show (DownloadError e) where
@ -92,6 +94,7 @@ instance Pretty (Peer e) => Show (DownloadError e) where
show (PeerBlockHashMismatch p) = show $ parens $ "PeerBlockHashMismatch" <+> pretty p
show StorageError = show "StorageError"
show (InternalError n) = show $ parens "InternalError" <+> pretty n
show (Incomplete h) = show $ parens "Incomplete" <+> pretty h
instance (Typeable e, Pretty (Peer e)) => Exception (DownloadError e)
@ -166,6 +169,13 @@ queryBlockSizeFromPeer cache e h peer = do
Right x -> pure (Right x)
data S =
SInit
| SFetchQ
| SFetchPost (Hash HbSync) ByteString
| SCheckBefore
| SCheckAfter
-- | downloads block with dependencies recursively
downloadFromPeerRec :: forall e t cache m . ( e ~ L4Proto
, MonadUnliftIO m
@ -186,65 +196,77 @@ downloadFromPeerRec t bu0 cache env h0 peer = do
p <- newTQueueIO
q <- newTQueueIO
done <- newTVarIO (mempty :: HashSet (Hash HbSync))
qq <- newTQueueIO
atomically $ writeTQueue q h0
flip runContT pure do
missed <- findMissedBlocks sto (HashRef h0)
ContT $ withAsync $ forever do
join $ atomically (readTQueue p)
mapM_ (atomically . writeTQueue q . coerce) missed
ContT $ withAsync $ forever do
h <- atomically (readTQueue qq)
void $ queryBlockSizeFromPeer cache env h peer
pause @'Seconds 1.5
flip runContT pure $ callCC \exit -> do
flip fix SInit $ \next -> \case
ContT $ withAsync $ liftIO $ forever do
atomically (readTQueue p)
SInit -> do
debug "SInit"
atomically $ writeTQueue q h0
next SCheckBefore
fix \next -> do
SCheckBefore -> do
here <- hasBlock sto h0 <&> isJust
if here then next SCheckAfter else next SFetchQ
mt <- atomically do
pe <- isEmptyTQueue p
qe <- isEmptyTQueue q
when ( qe && not pe ) retry
pure qe
SFetchQ -> do
debug "SFetchQ"
when mt $ exit $ Right ()
done <- atomically do
pe <- isEmptyTQueue p
qe <- isEmptyTQueue q
unless pe retry
pure qe
h <- atomically $ readTQueue q
if done then
next SCheckAfter
else do
here1 <- readTVarIO done <&> HS.member h
h <- atomically $ readTQueue q
mbs <- getBlock sto h
here2 <- hasBlock sto (coerce h) <&> isJust
case mbs of
Just bs -> next (SFetchPost h bs)
Nothing -> none
when (here1 || here2) next
w <- lift $ downloadFromPeer t bu0 cache env (coerce h) peer
w <- lift $ downloadFromPeer (TimeoutSec 5) bu0 cache env (coerce h) peer
case w of
Right bs -> do
next (SFetchPost h bs)
case w of
Right bs -> do
Left e -> do
err $ "DOWNLOAD ERROR" <+> viaShow e
next SFetchQ
h' <- enqueueBlock sto bs
h3 <- ContT $ maybe1 h' (pure $ Left StorageError)
SFetchPost h bs -> do
debug $ "SFetchPost" <+> pretty h
let
parse :: IO ()
parse = do
let refs = extractBlockRefs h3 bs
atomically $ mapM_ (writeTQueue q . coerce) refs
let parse = do
let refs = extractBlockRefs h bs
atomically $ mapM_ (writeTQueue q . coerce) refs
mapM_ (atomically . writeTQueue qq . coerce) refs
atomically $ writeTQueue p parse
atomically $ writeTQueue p parse
next
next SFetchQ
Left e -> do
err $ "DOWNLOAD ERROR" <+> viaShow e
-- pause @'Seconds 0.25
next
missed <- findMissedBlocks sto (HashRef h0)
unless (L.null missed) do
SCheckAfter -> do
debug "SCheckAfter"
missed <- findMissedBlocks sto (HashRef h0)
mapM_ (atomically . writeTQueue q . coerce) missed
next
mapM_ (atomically . writeTQueue qq . coerce) missed
unless (L.null missed) $ next SFetchQ
pure $ Right ()
@ -266,6 +288,16 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
pd@PeerData{..} <- find (KnownPeerKey peer) id
>>= orThrow (UnknownPeerError peer)
pinfo <- find (PeerInfoKey peer) id
>>= orThrow (UnknownPeerError peer)
rtt <- liftIO $ medianPeerRTT pinfo
<&> fmap ((*1) . realToFrac)
<&> fromMaybe 0
<&> (/1e6)
let wait = 1000
sto <- getStorage
let chunkSize = defChunkSize
@ -295,15 +327,19 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
for_ bursts $ \(i,chunkN) -> do
-- s0 <- readTVarIO _sBlockChunks2 <&> IntMap.size
-- count <- newTVarIO 0
atomically $ flushTQueue chuQ
let req = BlockChunks @e coo (BlockGetChunks h chunkSize (fromIntegral i) (fromIntegral chunkN))
lift $ request peer req
r <- liftIO $ race (pause t) do
let watchdog = fix \next -> do
s1 <- readTVarIO _sBlockChunks2 <&> IntMap.size
pause @'MilliSeconds ( max (realToFrac chunkN * rtt) 2000 )
s2 <- readTVarIO _sBlockChunks2 <&> IntMap.size
when (s1 == s2) next
r <- liftIO $ race watchdog do
atomically do
pieces <- readTVar _sBlockChunks2
@ -312,12 +348,16 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
atomically $ flushTQueue chuQ
either (const $ exit2 (Left $ DownloadStuckError (HashRef h) peer) ) pure r
case r of
Left{} -> exit2 (Left $ DownloadStuckError (HashRef h) peer)
_ -> pure ()
blk <- readTVarIO _sBlockChunks2
let rs = LBS.concat $ IntMap.elems blk
ha <- enqueueBlock sto rs
ha <- putBlock sto rs
-- let ha = Just $ hashObject @HbSync rs
lift $ expire @e key