diff --git a/hbs2-core/lib/HBS2/Data/Detect.hs b/hbs2-core/lib/HBS2/Data/Detect.hs index 4d7d71f0..75fe8571 100644 --- a/hbs2-core/lib/HBS2/Data/Detect.hs +++ b/hbs2-core/lib/HBS2/Data/Detect.hs @@ -18,6 +18,7 @@ import Data.Either -- import Data.Function -- import Data.Functor +import Data.Coerce import Data.Maybe import Control.Concurrent.STM import Data.HashMap.Strict qualified as HashMap @@ -48,6 +49,38 @@ tryDetect hash obj = rights [mbAnn, mbLink, mbMerkle, mbSeq] & headDef orBlob data ScanLevel = ScanShallow | ScanDeep +extractBlockRefs :: Hash HbSync -> ByteString -> [Hash HbSync] +extractBlockRefs hx bs = + case tryDetect hx bs of + (SeqRef (SequentialRef _ (AnnotatedHashRef a' b))) -> + coerce <$> catMaybes [a', Just b] + + AnnRef (AnnotatedHashRef ann h) -> do + coerce <$> catMaybes [ann, Just h] + + Merkle (MNode _ hs) -> hs + + MerkleAnn (MTreeAnn{..}) -> do + + let meta = case _mtaMeta of + AnnHashRef ha -> [ha] + _ -> mempty + + let c = case _mtaCrypt of + CryptAccessKeyNaClAsymm hs -> [hs] + EncryptGroupNaClSymm1 hs _ -> [hs] + EncryptGroupNaClSymm2 _ hs _ -> [hs] + _ -> mempty + + let t = case _mtaTree of + MNode _ hs -> hs + _ -> mempty + + meta <> c <> t + + _ -> mempty + + -- TODO: control-nesting-level-to-avoid-abuse diff --git a/hbs2-peer/app/RPC2.hs b/hbs2-peer/app/RPC2.hs index 80d1ff61..8bfa7e53 100644 --- a/hbs2-peer/app/RPC2.hs +++ b/hbs2-peer/app/RPC2.hs @@ -1,6 +1,7 @@ {-# OPTIONS_GHC -fno-warn-orphans #-} {-# Language UndecidableInstances #-} {-# Language AllowAmbiguousTypes #-} +{-# LANGUAGE ImplicitParams #-} module RPC2 ( module RPC2.Peer , module RPC2.RefLog @@ -13,7 +14,9 @@ module RPC2 import HBS2.Prelude.Plated import HBS2.OrDie +import HBS2.Data.Detect import HBS2.Hash +import HBS2.Merkle import HBS2.Defaults import HBS2.Events import HBS2.Net.Proto.Service @@ -156,6 +159,90 @@ queryBlockSizeFromPeer cache e h peer = do Left{} -> pure $ Left (PeerRequestTimeout peer) Right x -> pure (Right x) + +-- | downloads block with dependencies recursively +downloadFromPeerRec :: forall e t cache m . ( e ~ L4Proto + , MonadUnliftIO m + , IsTimeout t + , BlockSizeCache e cache + ) + => Timeout t + -> cache + -> PeerEnv e + -> Hash HbSync + -> Peer e + -> m (Either (DownloadError e) ()) + +downloadFromPeerRec t cache env h0 peer = do + + q <- newTQueueIO + p <- newTQueueIO + + timeouts <- newTVarIO 0 + + sto <- withPeerM env getStorage + + flip runContT pure do + callCC \exit1 -> do + + void $ ContT $ withAsync $ forever do + atomically do + (hx,bs) <- readTQueue p + let refs = extractBlockRefs hx bs + mapM (writeTQueue q) refs + + fix \next -> do + + h <- atomically do + h1 <- tryReadTQueue q + e <- isEmptyTQueue p + + case h1 of + Just x -> pure (Just x) + Nothing | e -> pure Nothing + | otherwise -> retry + + r <- case h of + Nothing -> exit1 okay + Just b -> do + blk <- getBlock sto b + case blk of + Just bs -> pure (Right (b,bs)) + Nothing -> do + w <- lift (downloadFromPeer t cache env b peer) + pure $ fmap (b,) w + + case r of + Left (PeerRequestTimeout{}) -> do + checkTimeout timeouts next (exit1 (Left $ DownloadStuckError (HashRef h0) peer)) + + Left (DownloadStuckError he pe) -> do + checkTimeout timeouts next (exit1 (Left $ DownloadStuckError he pe)) + + Left e -> exit1 (Left e) + + Right (hx,bs) -> do + resetTimeouts timeouts + atomically $ writeTQueue p (hx,bs) + next + + pure okay + + where + + resetTimeouts timeouts = atomically $ writeTVar timeouts 0 + + checkTimeout timeouts n e = do + tn <- atomically do + modifyTVar timeouts succ + readTVar timeouts + + if tn < 3 then n else e + + + okay = Right () + + downloadFromPeer :: forall e t cache m . ( e ~ L4Proto , MonadUnliftIO m , IsTimeout t @@ -193,8 +280,6 @@ downloadFromPeer t cache env h peer = liftIO $ withPeerM env do eblk <- liftIO $ withPeerM env do update @e new key id - debug $ "FUCKIN WAIT FOR CHUNKS!" <+> pretty h - request peer (BlockChunks @e coo (BlockGetAllChunks h chunkSize)) let total = L.length $ calcChunks size (fromIntegral chunkSize) @@ -292,6 +377,23 @@ instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) => _ -> pure nil + entry $ bindMatch "query-block-from-peer:rec" \case + [ HashLike blk, StringLike addr ] -> flip runContT pure $ callCC \exit -> do + + peer' <- liftIO $ try @_ @SomeException do + let pa = fromString @(PeerAddr L4Proto) addr + fromPeerAddr pa + + peer <- either (const $ exit (mkSym "error:invalid-address")) pure peer' + + what <- lift $ downloadFromPeerRec defChunkWaitMax rpcBrains rpcPeerEnv (coerce blk) peer + + case what of + Left e -> pure $ mkList @C [ mkSym "error" , mkStr (show e) ] + Right{} -> pure $ mkList @C [ mkSym "okay" ] + + _ -> pure nil + entry $ bindMatch "query-block-from-peer" \case [ HashLike blk, StringLike addr ] -> flip runContT pure $ callCC \exit -> do