mirror of https://github.com/voidlizard/hbs2
wip
This commit is contained in:
parent
0326f08392
commit
3e737feb0c
|
@ -18,6 +18,7 @@ import Data.Either
|
|||
-- import Data.Function
|
||||
-- import Data.Functor
|
||||
|
||||
import Data.Coerce
|
||||
import Data.Maybe
|
||||
import Control.Concurrent.STM
|
||||
import Data.HashMap.Strict qualified as HashMap
|
||||
|
@ -48,6 +49,38 @@ tryDetect hash obj = rights [mbAnn, mbLink, mbMerkle, mbSeq] & headDef orBlob
|
|||
|
||||
data ScanLevel = ScanShallow | ScanDeep
|
||||
|
||||
extractBlockRefs :: Hash HbSync -> ByteString -> [Hash HbSync]
|
||||
extractBlockRefs hx bs =
|
||||
case tryDetect hx bs of
|
||||
(SeqRef (SequentialRef _ (AnnotatedHashRef a' b))) ->
|
||||
coerce <$> catMaybes [a', Just b]
|
||||
|
||||
AnnRef (AnnotatedHashRef ann h) -> do
|
||||
coerce <$> catMaybes [ann, Just h]
|
||||
|
||||
Merkle (MNode _ hs) -> hs
|
||||
|
||||
MerkleAnn (MTreeAnn{..}) -> do
|
||||
|
||||
let meta = case _mtaMeta of
|
||||
AnnHashRef ha -> [ha]
|
||||
_ -> mempty
|
||||
|
||||
let c = case _mtaCrypt of
|
||||
CryptAccessKeyNaClAsymm hs -> [hs]
|
||||
EncryptGroupNaClSymm1 hs _ -> [hs]
|
||||
EncryptGroupNaClSymm2 _ hs _ -> [hs]
|
||||
_ -> mempty
|
||||
|
||||
let t = case _mtaTree of
|
||||
MNode _ hs -> hs
|
||||
_ -> mempty
|
||||
|
||||
meta <> c <> t
|
||||
|
||||
_ -> mempty
|
||||
|
||||
|
||||
|
||||
-- TODO: control-nesting-level-to-avoid-abuse
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
{-# OPTIONS_GHC -fno-warn-orphans #-}
|
||||
{-# Language UndecidableInstances #-}
|
||||
{-# Language AllowAmbiguousTypes #-}
|
||||
{-# LANGUAGE ImplicitParams #-}
|
||||
module RPC2
|
||||
( module RPC2.Peer
|
||||
, module RPC2.RefLog
|
||||
|
@ -13,7 +14,9 @@ module RPC2
|
|||
|
||||
import HBS2.Prelude.Plated
|
||||
import HBS2.OrDie
|
||||
import HBS2.Data.Detect
|
||||
import HBS2.Hash
|
||||
import HBS2.Merkle
|
||||
import HBS2.Defaults
|
||||
import HBS2.Events
|
||||
import HBS2.Net.Proto.Service
|
||||
|
@ -156,6 +159,90 @@ queryBlockSizeFromPeer cache e h peer = do
|
|||
Left{} -> pure $ Left (PeerRequestTimeout peer)
|
||||
Right x -> pure (Right x)
|
||||
|
||||
|
||||
-- | downloads block with dependencies recursively
|
||||
downloadFromPeerRec :: forall e t cache m . ( e ~ L4Proto
|
||||
, MonadUnliftIO m
|
||||
, IsTimeout t
|
||||
, BlockSizeCache e cache
|
||||
)
|
||||
=> Timeout t
|
||||
-> cache
|
||||
-> PeerEnv e
|
||||
-> Hash HbSync
|
||||
-> Peer e
|
||||
-> m (Either (DownloadError e) ())
|
||||
|
||||
downloadFromPeerRec t cache env h0 peer = do
|
||||
|
||||
q <- newTQueueIO
|
||||
p <- newTQueueIO
|
||||
|
||||
timeouts <- newTVarIO 0
|
||||
|
||||
sto <- withPeerM env getStorage
|
||||
|
||||
flip runContT pure do
|
||||
callCC \exit1 -> do
|
||||
|
||||
void $ ContT $ withAsync $ forever do
|
||||
atomically do
|
||||
(hx,bs) <- readTQueue p
|
||||
let refs = extractBlockRefs hx bs
|
||||
mapM (writeTQueue q) refs
|
||||
|
||||
fix \next -> do
|
||||
|
||||
h <- atomically do
|
||||
h1 <- tryReadTQueue q
|
||||
e <- isEmptyTQueue p
|
||||
|
||||
case h1 of
|
||||
Just x -> pure (Just x)
|
||||
Nothing | e -> pure Nothing
|
||||
| otherwise -> retry
|
||||
|
||||
r <- case h of
|
||||
Nothing -> exit1 okay
|
||||
Just b -> do
|
||||
blk <- getBlock sto b
|
||||
case blk of
|
||||
Just bs -> pure (Right (b,bs))
|
||||
Nothing -> do
|
||||
w <- lift (downloadFromPeer t cache env b peer)
|
||||
pure $ fmap (b,) w
|
||||
|
||||
case r of
|
||||
Left (PeerRequestTimeout{}) -> do
|
||||
checkTimeout timeouts next (exit1 (Left $ DownloadStuckError (HashRef h0) peer))
|
||||
|
||||
Left (DownloadStuckError he pe) -> do
|
||||
checkTimeout timeouts next (exit1 (Left $ DownloadStuckError he pe))
|
||||
|
||||
Left e -> exit1 (Left e)
|
||||
|
||||
Right (hx,bs) -> do
|
||||
resetTimeouts timeouts
|
||||
atomically $ writeTQueue p (hx,bs)
|
||||
next
|
||||
|
||||
pure okay
|
||||
|
||||
where
|
||||
|
||||
resetTimeouts timeouts = atomically $ writeTVar timeouts 0
|
||||
|
||||
checkTimeout timeouts n e = do
|
||||
tn <- atomically do
|
||||
modifyTVar timeouts succ
|
||||
readTVar timeouts
|
||||
|
||||
if tn < 3 then n else e
|
||||
|
||||
|
||||
okay = Right ()
|
||||
|
||||
|
||||
downloadFromPeer :: forall e t cache m . ( e ~ L4Proto
|
||||
, MonadUnliftIO m
|
||||
, IsTimeout t
|
||||
|
@ -193,8 +280,6 @@ downloadFromPeer t cache env h peer = liftIO $ withPeerM env do
|
|||
eblk <- liftIO $ withPeerM env do
|
||||
update @e new key id
|
||||
|
||||
debug $ "FUCKIN WAIT FOR CHUNKS!" <+> pretty h
|
||||
|
||||
request peer (BlockChunks @e coo (BlockGetAllChunks h chunkSize))
|
||||
|
||||
let total = L.length $ calcChunks size (fromIntegral chunkSize)
|
||||
|
@ -292,6 +377,23 @@ instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) =>
|
|||
_ -> pure nil
|
||||
|
||||
|
||||
entry $ bindMatch "query-block-from-peer:rec" \case
|
||||
[ HashLike blk, StringLike addr ] -> flip runContT pure $ callCC \exit -> do
|
||||
|
||||
peer' <- liftIO $ try @_ @SomeException do
|
||||
let pa = fromString @(PeerAddr L4Proto) addr
|
||||
fromPeerAddr pa
|
||||
|
||||
peer <- either (const $ exit (mkSym "error:invalid-address")) pure peer'
|
||||
|
||||
what <- lift $ downloadFromPeerRec defChunkWaitMax rpcBrains rpcPeerEnv (coerce blk) peer
|
||||
|
||||
case what of
|
||||
Left e -> pure $ mkList @C [ mkSym "error" , mkStr (show e) ]
|
||||
Right{} -> pure $ mkList @C [ mkSym "okay" ]
|
||||
|
||||
_ -> pure nil
|
||||
|
||||
entry $ bindMatch "query-block-from-peer" \case
|
||||
[ HashLike blk, StringLike addr ] -> flip runContT pure $ callCC \exit -> do
|
||||
|
||||
|
|
Loading…
Reference in New Issue