This commit is contained in:
voidlizard 2024-11-05 12:50:19 +03:00
parent 9c10655a0c
commit 54a805ae6e
1 changed files with 72 additions and 149 deletions

View File

@ -59,6 +59,7 @@ import Data.HashSet (HashSet)
import Data.HashSet qualified as HS import Data.HashSet qualified as HS
import Data.IntMap qualified as IntMap import Data.IntMap qualified as IntMap
import Data.IntMap (IntMap) import Data.IntMap (IntMap)
import Data.List.Split qualified as Split
import Data.Text qualified as Text import Data.Text qualified as Text
import Data.Either import Data.Either
import Data.Maybe import Data.Maybe
@ -77,6 +78,7 @@ data DownloadError e =
DownloadStuckError HashRef (Peer e) DownloadStuckError HashRef (Peer e)
| StorageError | StorageError
| UnknownPeerError (Peer e) | UnknownPeerError (Peer e)
| InternalError Int
| PeerMissBlockError HashRef (Peer e) | PeerMissBlockError HashRef (Peer e)
| PeerBlockHashMismatch (Peer e) | PeerBlockHashMismatch (Peer e)
| PeerRequestTimeout (Peer e) | PeerRequestTimeout (Peer e)
@ -89,6 +91,7 @@ instance Pretty (Peer e) => Show (DownloadError e) where
show (PeerRequestTimeout p) = show $ parens $ "PeerRequestTimeout" <+> pretty p show (PeerRequestTimeout p) = show $ parens $ "PeerRequestTimeout" <+> pretty p
show (PeerBlockHashMismatch p) = show $ parens $ "PeerBlockHashMismatch" <+> pretty p show (PeerBlockHashMismatch p) = show $ parens $ "PeerBlockHashMismatch" <+> pretty p
show StorageError = show "StorageError" show StorageError = show "StorageError"
show (InternalError n) = show $ parens "InternalError" <+> pretty n
instance (Typeable e, Pretty (Peer e)) => Exception (DownloadError e) instance (Typeable e, Pretty (Peer e)) => Exception (DownloadError e)
@ -170,18 +173,25 @@ downloadFromPeerRec :: forall e t cache m . ( e ~ L4Proto
, BlockSizeCache e cache , BlockSizeCache e cache
) )
=> Timeout t => Timeout t
-> Int
-> cache -> cache
-> PeerEnv e -> PeerEnv e
-> Hash HbSync -> Hash HbSync
-> Peer e -> Peer e
-> m (Either (DownloadError e) ()) -> m (Either (DownloadError e) ())
downloadFromPeerRec t cache env h0 peer = do downloadFromPeerRec t bu0 cache env h0 peer = do
sto <- withPeerM env getStorage sto <- withPeerM env getStorage
q <- newTQueueIO q <- newTQueueIO
done <- newTVarIO (mempty :: HashSet (Hash HbSync))
atomically $ writeTQueue q h0
missed <- findMissedBlocks sto (HashRef h0)
mapM_ (atomically . writeTQueue q . coerce) missed
flip runContT pure $ callCC \exit -> do flip runContT pure $ callCC \exit -> do
@ -189,144 +199,58 @@ downloadFromPeerRec t cache env h0 peer = do
mt <- atomically $ isEmptyTQueue q mt <- atomically $ isEmptyTQueue q
when mt do
missed <- findMissedBlocks sto (HashRef h0)
mapM_ (atomically . writeTQueue q) missed
mt <- atomically $ isEmptyTQueue q
when mt $ exit $ Right () when mt $ exit $ Right ()
h <- atomically $ readTQueue q h <- atomically $ readTQueue q
w <- lift $ downloadFromPeer (TimeoutSec 5) cache env (coerce h) peer here1 <- readTVarIO done <&> HS.member h
here2 <- hasBlock sto (coerce h) <&> isJust
when (here1 || here2) next
w <- lift $ downloadFromPeer (TimeoutSec 5) bu0 cache env (coerce h) peer
case w of case w of
Right bs -> do Right bs -> do
h <- enqueueBlock sto bs h' <- enqueueBlock sto bs
pause @'Seconds 0.25 h3 <- ContT $ maybe1 h' (pure $ Left StorageError)
let refs = extractBlockRefs h3 bs
atomically $ mapM_ (writeTQueue q . coerce) refs
next next
Left e -> do Left e -> do
err $ "DOWNLOAD ERROR" <+> pretty h err $ "DOWNLOAD ERROR" <+> viaShow e
-- pause @'Seconds 0.25 -- pause @'Seconds 0.25
next next
missed <- findMissedBlocks sto (HashRef h0)
unless (L.null missed) do
mapM_ (atomically . writeTQueue q . coerce) missed
next
pure $ Right () pure $ Right ()
-- w <- newTVarIO (mempty :: HashSet (Hash HbSync) )
-- q <- newTQueueIO
-- p <- newTQueueIO
-- timeouts <- newTVarIO 0
-- sto <- withPeerM env getStorage
-- let addBlocks hx = atomically do
-- for_ hx $ \h -> do
-- writeTQueue q h
-- -- here <- readTVar w <&> HS.member h
-- -- unless here do
-- -- modifyTVar w (HS.insert h)
-- flip runContT pure do
-- atomically $ writeTQueue q h0
-- callCC \exit1 -> do
-- void $ ContT $ withAsync $ forever do
-- atomically do
-- (hx,bs) <- readTQueue p
-- let refs = extractBlockRefs hx bs
-- mapM (writeTQueue q) refs
-- fix \next -> do
-- h <- atomically do
-- h1 <- tryReadTQueue q
-- e <- isEmptyTQueue p
-- case h1 of
-- Just x -> pure (Just x)
-- Nothing | e -> pure Nothing
-- | otherwise -> retry
-- r <- case h of
-- Nothing -> exit1 okay
-- Just b -> do
-- debug $ "BLOCK TO DOWNLOAD" <+> pretty b
-- missed <- findMissedBlocks sto (HashRef b)
-- addBlocks (fmap coerce missed)
-- blk <- getBlock sto b
-- case blk of
-- Just bs -> pure (Right (b,bs))
-- Nothing -> do
-- debug $ "GO DOWNLOAD" <+> pretty b
-- w <- lift (downloadFromPeer (TimeoutSec 20) cache env b peer)
-- when (isLeft w) do
-- addBlocks [b]
-- pure $ fmap (b,) w
-- case r of
-- Left (PeerRequestTimeout{}) -> do
-- debug "DOWNLOAD STUCK!"
-- checkTimeout timeouts next (exit1 (Left $ DownloadStuckError (HashRef h0) peer))
-- Left (DownloadStuckError he pe) -> do
-- checkTimeout timeouts next (exit1 (Left $ DownloadStuckError he pe))
-- Left e -> exit1 (Left e)
-- Right (hx,bs) -> do
-- resetTimeouts timeouts
-- let refs = extractBlockRefs hx bs
-- for_ refs $ \z -> do
-- debug $ "PARSED REF" <+> pretty z
-- atomically $ mapM (writeTQueue q) refs
-- pause @'Seconds 0.01
-- -- atomically $ writeTQueue p (hx,bs)
-- next
-- pure okay
-- where
-- resetTimeouts timeouts = atomically $ writeTVar timeouts 0
-- checkTimeout timeouts n e = do
-- tn <- atomically do
-- modifyTVar timeouts succ
-- readTVar timeouts
-- if tn < 10 then n else e
-- okay = Right ()
downloadFromPeer :: forall e t cache m . ( e ~ L4Proto downloadFromPeer :: forall e t cache m . ( e ~ L4Proto
, MonadUnliftIO m , MonadUnliftIO m
, IsTimeout t , IsTimeout t
, BlockSizeCache e cache , BlockSizeCache e cache
) )
=> Timeout t => Timeout t
-> Int
-> cache -> cache
-> PeerEnv e -> PeerEnv e
-> Hash HbSync -> Hash HbSync
-> Peer e -> Peer e
-> m (Either (DownloadError e) ByteString) -> m (Either (DownloadError e) ByteString)
downloadFromPeer t cache env h peer = liftIO $ withPeerM env do downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
pd@PeerData{..} <- find (KnownPeerKey peer) id pd@PeerData{..} <- find (KnownPeerKey peer) id
>>= orThrow (UnknownPeerError peer) >>= orThrow (UnknownPeerError peer)
sto <- liftIO $ withPeerM env $ getStorage sto <- getStorage
let chunkSize = defChunkSize let chunkSize = defChunkSize
@ -343,46 +267,39 @@ downloadFromPeer t cache env h peer = liftIO $ withPeerM env do
. set sBlockSize (fromIntegral size) . set sBlockSize (fromIntegral size)
$ down $ down
eblk <- liftIO $ withPeerM env do lift $ update @e new key id
update @e new key id
request peer (BlockChunks @e coo (BlockGetAllChunks h chunkSize)) let offsets = calcChunks size (fromIntegral chunkSize) :: [(Offset, Size)]
let total = L.length $ calcChunks size (fromIntegral chunkSize) let chunkNums = [ 0 .. pred (length offsets) ]
flip runContT pure do let bursts = calcBursts bu chunkNums
callCC $ \exit2 -> do
for_ bursts $ \(i,chunkN) -> do
let req = BlockChunks @e coo (BlockGetChunks h chunkSize (fromIntegral i) (fromIntegral chunkN))
lift $ request peer req
r <- liftIO $ race (pause t) do
worker <- ContT $ withAsync do
atomically do atomically do
wtf <- readTVar _sBlockChunks2 -- readTQueue chuQ
unless (IntMap.size wtf >= total) retry pieces <- readTVar _sBlockChunks2
pure wtf let done = and [ IntMap.member j pieces | j <- [i .. i + chunkN-1] ]
unless done retry
waiter <- ContT $ withAsync $ fix \next -> do atomically $ flushTQueue chuQ
r <- race (pause t) (atomically $ readTQueue chuQ)
case r of
Left{} -> cancel worker
Right{} -> next
result <- waitCatch worker either (const $ exit2 (Left $ DownloadStuckError (HashRef h) peer) ) pure r
void $ ContT $ bracket none $ const do
cancel waiter
atomically $ flushTQueue chuQ
expire @e key
pure result
callCC \exit2 -> do
blk <- case eblk of
Right x -> pure x
Left{} -> exit2 (Left (PeerRequestTimeout peer))
blk <- readTVarIO _sBlockChunks2
let rs = LBS.concat $ IntMap.elems blk let rs = LBS.concat $ IntMap.elems blk
ha <- enqueueBlock sto rs ha <- enqueueBlock sto rs
lift $ expire @e key
case ha of case ha of
Nothing -> pure $ Left StorageError Nothing -> pure $ Left StorageError
@ -442,26 +359,32 @@ instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) =>
_ -> pure nil _ -> pure nil
entry $ bindMatch "query-block-from-peer:rec" $ \syn -> do
entry $ bindMatch "query-block-from-peer:rec" \case flip runContT pure $ callCC \exit -> do
[ HashLike blk, StringLike addr ] -> flip runContT pure $ callCC \exit -> do
peer' <- liftIO $ try @_ @SomeException do (blk,addr,bu0) <- case syn of
let pa = fromString @(PeerAddr L4Proto) addr [ HashLike blk, StringLike addr ] -> pure (blk, addr, 4)
fromPeerAddr pa [ HashLike blk, StringLike addr, LitIntVal x ] -> pure (blk, addr, fromIntegral x)
peer <- either (const $ exit (mkSym "error:invalid-address")) pure peer' _ -> exit nil
what <- lift $ downloadFromPeerRec defChunkWaitMax rpcBrains rpcPeerEnv (coerce blk) peer callCC \exit2 -> do
case what of peer' <- liftIO $ try @_ @SomeException do
Left e -> pure $ mkList @C [ mkSym "error" , mkStr (show e) ] let pa = fromString @(PeerAddr L4Proto) addr
Right{} -> pure $ mkList @C [ mkSym "okay" ] fromPeerAddr pa
_ -> pure nil peer <- either (const $ exit2 (mkSym "error:invalid-address")) pure peer'
what <- lift $ downloadFromPeerRec defChunkWaitMax bu0 rpcBrains rpcPeerEnv (coerce blk) peer
case what of
Left e -> pure $ mkList @C [ mkSym "error" , mkStr (show e) ]
Right{} -> pure $ mkList @C [ mkSym "okay" ]
entry $ bindMatch "query-block-from-peer" \case entry $ bindMatch "query-block-from-peer" \case
[ HashLike blk, StringLike addr ] -> flip runContT pure $ callCC \exit -> do ( HashLike blk :StringLike addr : opts) -> flip runContT pure $ callCC \exit -> do
peer' <- liftIO $ try @_ @SomeException do peer' <- liftIO $ try @_ @SomeException do
let pa = fromString @(PeerAddr L4Proto) addr let pa = fromString @(PeerAddr L4Proto) addr
@ -469,7 +392,7 @@ instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) =>
peer <- either (const $ exit (mkSym "error:invalid-address")) pure peer' peer <- either (const $ exit (mkSym "error:invalid-address")) pure peer'
what <- lift $ downloadFromPeer defChunkWaitMax rpcBrains rpcPeerEnv (coerce blk) peer what <- lift $ downloadFromPeer defChunkWaitMax 4 rpcBrains rpcPeerEnv (coerce blk) peer
case what of case what of
Left e -> pure $ mkList @C [ mkSym "error" , mkStr (show e) ] Left e -> pure $ mkList @C [ mkSym "error" , mkStr (show e) ]