This commit is contained in:
voidlizard 2024-11-05 08:00:52 +03:00
parent b9f7399b64
commit cb5512d114
2 changed files with 137 additions and 2 deletions

View File

@ -18,6 +18,7 @@ import Data.Either
-- import Data.Function
-- import Data.Functor
import Data.Coerce
import Data.Maybe
import Control.Concurrent.STM
import Data.HashMap.Strict qualified as HashMap
@ -48,6 +49,38 @@ tryDetect hash obj = rights [mbAnn, mbLink, mbMerkle, mbSeq] & headDef orBlob
data ScanLevel = ScanShallow | ScanDeep
extractBlockRefs :: Hash HbSync -> ByteString -> [Hash HbSync]
extractBlockRefs hx bs =
case tryDetect hx bs of
(SeqRef (SequentialRef _ (AnnotatedHashRef a' b))) ->
coerce <$> catMaybes [a', Just b]
AnnRef (AnnotatedHashRef ann h) -> do
coerce <$> catMaybes [ann, Just h]
Merkle (MNode _ hs) -> hs
MerkleAnn (MTreeAnn{..}) -> do
let meta = case _mtaMeta of
AnnHashRef ha -> [ha]
_ -> mempty
let c = case _mtaCrypt of
CryptAccessKeyNaClAsymm hs -> [hs]
EncryptGroupNaClSymm1 hs _ -> [hs]
EncryptGroupNaClSymm2 _ hs _ -> [hs]
_ -> mempty
let t = case _mtaTree of
MNode _ hs -> hs
_ -> mempty
meta <> c <> t
_ -> mempty
-- TODO: control-nesting-level-to-avoid-abuse

View File

@ -1,6 +1,7 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# Language UndecidableInstances #-}
{-# Language AllowAmbiguousTypes #-}
{-# LANGUAGE ImplicitParams #-}
module RPC2
( module RPC2.Peer
, module RPC2.RefLog
@ -13,7 +14,9 @@ module RPC2
import HBS2.Prelude.Plated
import HBS2.OrDie
import HBS2.Data.Detect
import HBS2.Hash
import HBS2.Merkle
import HBS2.Defaults
import HBS2.Events
import HBS2.Net.Proto.Service
@ -156,6 +159,90 @@ queryBlockSizeFromPeer cache e h peer = do
Left{} -> pure $ Left (PeerRequestTimeout peer)
Right x -> pure (Right x)
-- | downloads block with dependencies recursively
downloadFromPeerRec :: forall e t cache m . ( e ~ L4Proto
, MonadUnliftIO m
, IsTimeout t
, BlockSizeCache e cache
)
=> Timeout t
-> cache
-> PeerEnv e
-> Hash HbSync
-> Peer e
-> m (Either (DownloadError e) ())
downloadFromPeerRec t cache env h0 peer = do
q <- newTQueueIO
p <- newTQueueIO
timeouts <- newTVarIO 0
sto <- withPeerM env getStorage
flip runContT pure do
callCC \exit1 -> do
void $ ContT $ withAsync $ forever do
atomically do
(hx,bs) <- readTQueue p
let refs = extractBlockRefs hx bs
mapM (writeTQueue q) refs
fix \next -> do
h <- atomically do
h1 <- tryReadTQueue q
e <- isEmptyTQueue p
case h1 of
Just x -> pure (Just x)
Nothing | e -> pure Nothing
| otherwise -> retry
r <- case h of
Nothing -> exit1 okay
Just b -> do
blk <- getBlock sto b
case blk of
Just bs -> pure (Right (b,bs))
Nothing -> do
w <- lift (downloadFromPeer t cache env b peer)
pure $ fmap (b,) w
case r of
Left (PeerRequestTimeout{}) -> do
checkTimeout timeouts next (exit1 (Left $ DownloadStuckError (HashRef h0) peer))
Left (DownloadStuckError he pe) -> do
checkTimeout timeouts next (exit1 (Left $ DownloadStuckError he pe))
Left e -> exit1 (Left e)
Right (hx,bs) -> do
resetTimeouts timeouts
atomically $ writeTQueue p (hx,bs)
next
pure okay
where
resetTimeouts timeouts = atomically $ writeTVar timeouts 0
checkTimeout timeouts n e = do
tn <- atomically do
modifyTVar timeouts succ
readTVar timeouts
if tn < 3 then n else e
okay = Right ()
downloadFromPeer :: forall e t cache m . ( e ~ L4Proto
, MonadUnliftIO m
, IsTimeout t
@ -193,8 +280,6 @@ downloadFromPeer t cache env h peer = liftIO $ withPeerM env do
eblk <- liftIO $ withPeerM env do
update @e new key id
debug $ "FUCKIN WAIT FOR CHUNKS!" <+> pretty h
request peer (BlockChunks @e coo (BlockGetAllChunks h chunkSize))
let total = L.length $ calcChunks size (fromIntegral chunkSize)
@ -292,6 +377,23 @@ instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) =>
_ -> pure nil
entry $ bindMatch "query-block-from-peer:rec" \case
[ HashLike blk, StringLike addr ] -> flip runContT pure $ callCC \exit -> do
peer' <- liftIO $ try @_ @SomeException do
let pa = fromString @(PeerAddr L4Proto) addr
fromPeerAddr pa
peer <- either (const $ exit (mkSym "error:invalid-address")) pure peer'
what <- lift $ downloadFromPeerRec defChunkWaitMax rpcBrains rpcPeerEnv (coerce blk) peer
case what of
Left e -> pure $ mkList @C [ mkSym "error" , mkStr (show e) ]
Right{} -> pure $ mkList @C [ mkSym "okay" ]
_ -> pure nil
entry $ bindMatch "query-block-from-peer" \case
[ HashLike blk, StringLike addr ] -> flip runContT pure $ callCC \exit -> do