mirror of https://github.com/voidlizard/hbs2
wip
This commit is contained in:
parent
ddd7485340
commit
933a2e31ae
|
@ -18,6 +18,7 @@ import Data.Either
|
||||||
-- import Data.Function
|
-- import Data.Function
|
||||||
-- import Data.Functor
|
-- import Data.Functor
|
||||||
|
|
||||||
|
import Data.Coerce
|
||||||
import Data.Maybe
|
import Data.Maybe
|
||||||
import Control.Concurrent.STM
|
import Control.Concurrent.STM
|
||||||
import Data.HashMap.Strict qualified as HashMap
|
import Data.HashMap.Strict qualified as HashMap
|
||||||
|
@ -48,6 +49,38 @@ tryDetect hash obj = rights [mbAnn, mbLink, mbMerkle, mbSeq] & headDef orBlob
|
||||||
|
|
||||||
data ScanLevel = ScanShallow | ScanDeep
|
data ScanLevel = ScanShallow | ScanDeep
|
||||||
|
|
||||||
|
extractBlockRefs :: Hash HbSync -> ByteString -> [Hash HbSync]
|
||||||
|
extractBlockRefs hx bs =
|
||||||
|
case tryDetect hx bs of
|
||||||
|
(SeqRef (SequentialRef _ (AnnotatedHashRef a' b))) ->
|
||||||
|
coerce <$> catMaybes [a', Just b]
|
||||||
|
|
||||||
|
AnnRef (AnnotatedHashRef ann h) -> do
|
||||||
|
coerce <$> catMaybes [ann, Just h]
|
||||||
|
|
||||||
|
Merkle (MNode _ hs) -> hs
|
||||||
|
|
||||||
|
MerkleAnn (MTreeAnn{..}) -> do
|
||||||
|
|
||||||
|
let meta = case _mtaMeta of
|
||||||
|
AnnHashRef ha -> [ha]
|
||||||
|
_ -> mempty
|
||||||
|
|
||||||
|
let c = case _mtaCrypt of
|
||||||
|
CryptAccessKeyNaClAsymm hs -> [hs]
|
||||||
|
EncryptGroupNaClSymm1 hs _ -> [hs]
|
||||||
|
EncryptGroupNaClSymm2 _ hs _ -> [hs]
|
||||||
|
_ -> mempty
|
||||||
|
|
||||||
|
let t = case _mtaTree of
|
||||||
|
MNode _ hs -> hs
|
||||||
|
_ -> mempty
|
||||||
|
|
||||||
|
meta <> c <> t
|
||||||
|
|
||||||
|
_ -> mempty
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
-- TODO: control-nesting-level-to-avoid-abuse
|
-- TODO: control-nesting-level-to-avoid-abuse
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
{-# OPTIONS_GHC -fno-warn-orphans #-}
|
{-# OPTIONS_GHC -fno-warn-orphans #-}
|
||||||
{-# Language UndecidableInstances #-}
|
{-# Language UndecidableInstances #-}
|
||||||
{-# Language AllowAmbiguousTypes #-}
|
{-# Language AllowAmbiguousTypes #-}
|
||||||
|
{-# LANGUAGE ImplicitParams #-}
|
||||||
module RPC2
|
module RPC2
|
||||||
( module RPC2.Peer
|
( module RPC2.Peer
|
||||||
, module RPC2.RefLog
|
, module RPC2.RefLog
|
||||||
|
@ -13,7 +14,9 @@ module RPC2
|
||||||
|
|
||||||
import HBS2.Prelude.Plated
|
import HBS2.Prelude.Plated
|
||||||
import HBS2.OrDie
|
import HBS2.OrDie
|
||||||
|
import HBS2.Data.Detect
|
||||||
import HBS2.Hash
|
import HBS2.Hash
|
||||||
|
import HBS2.Merkle
|
||||||
import HBS2.Defaults
|
import HBS2.Defaults
|
||||||
import HBS2.Events
|
import HBS2.Events
|
||||||
import HBS2.Net.Proto.Service
|
import HBS2.Net.Proto.Service
|
||||||
|
@ -156,6 +159,90 @@ queryBlockSizeFromPeer cache e h peer = do
|
||||||
Left{} -> pure $ Left (PeerRequestTimeout peer)
|
Left{} -> pure $ Left (PeerRequestTimeout peer)
|
||||||
Right x -> pure (Right x)
|
Right x -> pure (Right x)
|
||||||
|
|
||||||
|
|
||||||
|
-- | downloads block with dependencies recursively
|
||||||
|
downloadFromPeerRec :: forall e t cache m . ( e ~ L4Proto
|
||||||
|
, MonadUnliftIO m
|
||||||
|
, IsTimeout t
|
||||||
|
, BlockSizeCache e cache
|
||||||
|
)
|
||||||
|
=> Timeout t
|
||||||
|
-> cache
|
||||||
|
-> PeerEnv e
|
||||||
|
-> Hash HbSync
|
||||||
|
-> Peer e
|
||||||
|
-> m (Either (DownloadError e) ())
|
||||||
|
|
||||||
|
downloadFromPeerRec t cache env h0 peer = do
|
||||||
|
|
||||||
|
q <- newTQueueIO
|
||||||
|
p <- newTQueueIO
|
||||||
|
|
||||||
|
timeouts <- newTVarIO 0
|
||||||
|
|
||||||
|
sto <- withPeerM env getStorage
|
||||||
|
|
||||||
|
flip runContT pure do
|
||||||
|
callCC \exit1 -> do
|
||||||
|
|
||||||
|
void $ ContT $ withAsync $ forever do
|
||||||
|
atomically do
|
||||||
|
(hx,bs) <- readTQueue p
|
||||||
|
let refs = extractBlockRefs hx bs
|
||||||
|
mapM (writeTQueue q) refs
|
||||||
|
|
||||||
|
fix \next -> do
|
||||||
|
|
||||||
|
h <- atomically do
|
||||||
|
h1 <- tryReadTQueue q
|
||||||
|
e <- isEmptyTQueue p
|
||||||
|
|
||||||
|
case h1 of
|
||||||
|
Just x -> pure (Just x)
|
||||||
|
Nothing | e -> pure Nothing
|
||||||
|
| otherwise -> retry
|
||||||
|
|
||||||
|
r <- case h of
|
||||||
|
Nothing -> exit1 okay
|
||||||
|
Just b -> do
|
||||||
|
blk <- getBlock sto b
|
||||||
|
case blk of
|
||||||
|
Just bs -> pure (Right (b,bs))
|
||||||
|
Nothing -> do
|
||||||
|
w <- lift (downloadFromPeer t cache env b peer)
|
||||||
|
pure $ fmap (b,) w
|
||||||
|
|
||||||
|
case r of
|
||||||
|
Left (PeerRequestTimeout{}) -> do
|
||||||
|
checkTimeout timeouts next (exit1 (Left $ DownloadStuckError (HashRef h0) peer))
|
||||||
|
|
||||||
|
Left (DownloadStuckError he pe) -> do
|
||||||
|
checkTimeout timeouts next (exit1 (Left $ DownloadStuckError he pe))
|
||||||
|
|
||||||
|
Left e -> exit1 (Left e)
|
||||||
|
|
||||||
|
Right (hx,bs) -> do
|
||||||
|
resetTimeouts timeouts
|
||||||
|
atomically $ writeTQueue p (hx,bs)
|
||||||
|
next
|
||||||
|
|
||||||
|
pure okay
|
||||||
|
|
||||||
|
where
|
||||||
|
|
||||||
|
resetTimeouts timeouts = atomically $ writeTVar timeouts 0
|
||||||
|
|
||||||
|
checkTimeout timeouts n e = do
|
||||||
|
tn <- atomically do
|
||||||
|
modifyTVar timeouts succ
|
||||||
|
readTVar timeouts
|
||||||
|
|
||||||
|
if tn < 3 then n else e
|
||||||
|
|
||||||
|
|
||||||
|
okay = Right ()
|
||||||
|
|
||||||
|
|
||||||
downloadFromPeer :: forall e t cache m . ( e ~ L4Proto
|
downloadFromPeer :: forall e t cache m . ( e ~ L4Proto
|
||||||
, MonadUnliftIO m
|
, MonadUnliftIO m
|
||||||
, IsTimeout t
|
, IsTimeout t
|
||||||
|
@ -193,8 +280,6 @@ downloadFromPeer t cache env h peer = liftIO $ withPeerM env do
|
||||||
eblk <- liftIO $ withPeerM env do
|
eblk <- liftIO $ withPeerM env do
|
||||||
update @e new key id
|
update @e new key id
|
||||||
|
|
||||||
debug $ "FUCKIN WAIT FOR CHUNKS!" <+> pretty h
|
|
||||||
|
|
||||||
request peer (BlockChunks @e coo (BlockGetAllChunks h chunkSize))
|
request peer (BlockChunks @e coo (BlockGetAllChunks h chunkSize))
|
||||||
|
|
||||||
let total = L.length $ calcChunks size (fromIntegral chunkSize)
|
let total = L.length $ calcChunks size (fromIntegral chunkSize)
|
||||||
|
@ -292,6 +377,23 @@ instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) =>
|
||||||
_ -> pure nil
|
_ -> pure nil
|
||||||
|
|
||||||
|
|
||||||
|
entry $ bindMatch "query-block-from-peer:rec" \case
|
||||||
|
[ HashLike blk, StringLike addr ] -> flip runContT pure $ callCC \exit -> do
|
||||||
|
|
||||||
|
peer' <- liftIO $ try @_ @SomeException do
|
||||||
|
let pa = fromString @(PeerAddr L4Proto) addr
|
||||||
|
fromPeerAddr pa
|
||||||
|
|
||||||
|
peer <- either (const $ exit (mkSym "error:invalid-address")) pure peer'
|
||||||
|
|
||||||
|
what <- lift $ downloadFromPeerRec defChunkWaitMax rpcBrains rpcPeerEnv (coerce blk) peer
|
||||||
|
|
||||||
|
case what of
|
||||||
|
Left e -> pure $ mkList @C [ mkSym "error" , mkStr (show e) ]
|
||||||
|
Right{} -> pure $ mkList @C [ mkSym "okay" ]
|
||||||
|
|
||||||
|
_ -> pure nil
|
||||||
|
|
||||||
entry $ bindMatch "query-block-from-peer" \case
|
entry $ bindMatch "query-block-from-peer" \case
|
||||||
[ HashLike blk, StringLike addr ] -> flip runContT pure $ callCC \exit -> do
|
[ HashLike blk, StringLike addr ] -> flip runContT pure $ callCC \exit -> do
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue