This commit is contained in:
voidlizard 2024-11-04 12:11:50 +03:00
parent 8e37ae86ce
commit 87d84b0531
3 changed files with 235 additions and 10 deletions

View File

@ -1,5 +1,6 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# Language UndecidableInstances #-}
{-# Language AllowAmbiguousTypes #-}
module RPC2
( module RPC2.Peer
, module RPC2.RefLog
@ -11,16 +12,22 @@ module RPC2
import HBS2.Prelude.Plated
import HBS2.OrDie
import HBS2.Hash
import HBS2.Defaults
import HBS2.Events
import HBS2.Net.Proto.Service
import HBS2.Net.Proto.Sessions
import HBS2.Base58
import HBS2.Data.Types.Peer
import HBS2.Data.Types.Refs
import HBS2.Actors.Peer
import HBS2.Peer.Proto.Peer
import HBS2.Peer.Proto.BlockInfo
import HBS2.Peer.Proto.BlockChunks
import HBS2.Storage
import HBS2.Clock
import HBS2.Net.Auth.Schema
@ -42,15 +49,175 @@ import PeerInfo
import Control.Monad.Trans.Maybe
import Control.Monad.Trans.Cont
import Control.Concurrent.STM (flushTQueue)
import Control.Concurrent.STM (flushTQueue,retry)
import Data.IntMap qualified as IntMap
import Data.IntMap (IntMap)
import Data.Text qualified as Text
import Data.Either
import Data.Maybe
import Data.ByteString.Lazy qualified as LBS
import Data.ByteString.Lazy (ByteString)
import Data.ByteString qualified as BS
import Data.List qualified as L
import Data.Coerce
import Numeric
import UnliftIO
import Lens.Micro.Platform
import Streaming.Prelude qualified as S
data DownloadError e =
DownloadStuckError HashRef (Peer e)
| StorageError
| UnknownPeerError (Peer e)
| PeerMissBlockError HashRef (Peer e)
| PeerBlockHashMismatch (Peer e)
| PeerRequestTimeout (Peer e)
deriving stock (Generic,Typeable)
instance Pretty (Peer e) => Show (DownloadError e) where
show (DownloadStuckError h p) = show $ parens $ "DownloadStuck" <+> pretty h <+> pretty p
show (UnknownPeerError p) = show $ parens $ "UnknownPeerError" <+> pretty p
show (PeerMissBlockError h p) = show $ parens $ "PeerMissBlockError" <+> pretty h <+> pretty p
show (PeerRequestTimeout p) = show $ parens $ "PeerRequestTimeout" <+> pretty p
show (PeerBlockHashMismatch p) = show $ parens $ "PeerBlockHashMismatch" <+> pretty p
show StorageError = show "StorageError"
instance (Typeable e, Pretty (Peer e)) => Exception (DownloadError e)
class BlockSizeCache e cache where
cacheBlockSize :: forall m . MonadUnliftIO m
=> cache
-> PubKey 'Sign (Encryption e)
-> Hash HbSync
-> Integer
-> m ()
findBlockSize :: forall m . MonadUnliftIO m
=> cache
-> PubKey 'Sign (Encryption e)
-> Hash HbSync
-> m (Maybe Integer)
instance BlockSizeCache e () where
cacheBlockSize _ _ _ _ = pure ()
findBlockSize _ _ _ = pure Nothing
queryBlockSizeFromPeer :: forall e cache m . ( e ~ L4Proto
, MonadUnliftIO m
, BlockSizeCache e cache
)
=> cache
-> PeerEnv e
-> Hash HbSync
-> Peer e
-> m (Either (DownloadError e) (Maybe Integer))
queryBlockSizeFromPeer cache e h peer = do
what <- try @_ @(DownloadError e) $ liftIO $ withPeerM e do
PeerData{..} <- find (KnownPeerKey peer) id
>>= orThrow (UnknownPeerError peer)
sizeQ <- newTQueueIO
subscribe @e (BlockSizeEventKey peer) $ \case
BlockSizeEvent (that, hx, sz) | hx == h -> do
atomically $ writeTQueue sizeQ (Just sz)
cacheBlockSize @e cache _peerSignKey h sz
_ -> do
atomically $ writeTQueue sizeQ Nothing
request peer (GetBlockSize @e h)
race ( pause defBlockInfoTimeout ) (atomically $ readTQueue sizeQ )
>>= orThrow (PeerRequestTimeout peer)
case what of
Left{} -> pure $ Left (PeerRequestTimeout peer)
Right x -> pure (Right x)
downloadFromPeer :: forall e t cache m . ( e ~ L4Proto
, MonadUnliftIO m
, IsTimeout t
, BlockSizeCache e cache
)
=> Timeout t
-> cache
-> PeerEnv e
-> Hash HbSync
-> Peer e
-> m (Either (DownloadError e) ByteString)
downloadFromPeer t cache env h peer = liftIO $ withPeerM env do
pd@PeerData{..} <- find (KnownPeerKey peer) id
>>= orThrow (UnknownPeerError peer)
sto <- liftIO $ withPeerM env $ getStorage
let chunkSize = defChunkSize
flip runContT pure $ callCC \exit -> do
size <- lift (findBlockSize @e cache _peerSignKey h)
>>= maybe (queryBlockSize exit) pure
coo <- genCookie (peer,h)
let key = DownloadSessionKey (peer, coo)
down@BlockDownload{..} <- newBlockDownload h
let chuQ = _sBlockChunks
let new = set sBlockChunkSize chunkSize
. set sBlockSize (fromIntegral size)
$ down
(what, rs) <- liftIO $ withPeerM env do
update @e new key id
debug $ "FUCKIN WAIT FOR CHUNKS!" <+> pretty h
request peer (BlockChunks @e coo (BlockGetAllChunks h chunkSize))
let total = L.length $ calcChunks size (fromIntegral chunkSize)
blk <- atomically do
wtf <- readTVar _sBlockChunks2
unless (IntMap.size wtf == total) retry
pure wtf
atomically $ flushTQueue chuQ
let rs = LBS.concat $ IntMap.elems blk
ha <- enqueueBlock sto rs
expire @e key
pure (ha, rs)
case what of
Nothing -> pure $ Left StorageError
Just h1 | h1 == h -> do
pure $ Right rs
Just h1 -> do
delBlock sto h1
pure $ Left (PeerBlockHashMismatch peer)
where
queryBlockSize exit = do
what <- lift $ queryBlockSizeFromPeer cache env h peer
case what of
Left{} -> exit (Left (PeerRequestTimeout peer))
Right Nothing -> exit (Left (PeerMissBlockError (HashRef h) peer))
Right (Just s) -> pure s
instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcRunScript where
handleMethod top = do
@ -92,6 +259,41 @@ instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) =>
_ -> pure nil
entry $ bindMatch "query-block-from-peer" \case
[ HashLike blk, StringLike addr ] -> flip runContT pure $ callCC \exit -> do
peer' <- liftIO $ try @_ @SomeException do
let pa = fromString @(PeerAddr L4Proto) addr
fromPeerAddr pa
peer <- either (const $ exit (mkSym "error:invalid-address")) pure peer'
what <- lift $ downloadFromPeer defChunkWaitMax () rpcPeerEnv (coerce blk) peer
case what of
Left e -> pure $ mkList @C [ mkSym "error" , mkStr (show e) ]
Right bs -> pure $ mkList @C [ mkSym "okay", mkInt (LBS.length bs) ]
_ -> pure nil
entry $ bindMatch "query-block-size-from-peer" \case
[ HashLike blk, StringLike addr ] -> flip runContT pure $ callCC \exit -> do
peer' <- liftIO $ try @_ @SomeException do
let pa = fromString @(PeerAddr L4Proto) addr
fromPeerAddr pa
peer <- either (const $ exit (mkSym "error:invalid-address")) pure peer'
sz <- lift $ queryBlockSizeFromPeer @e () rpcPeerEnv (coerce blk) peer
case sz of
Left e -> pure $ mkList @C [ mkSym "error", mkStr (show e) ]
Right Nothing -> pure $ mkSym "no-block"
Right (Just s) -> pure $ mkList [mkSym "size", mkInt s]
_ -> pure $ mkSym "error:invalid-args"
entry $ bindMatch "request-block-size" \case
[LitScientificVal w, HashLike blk] -> do
@ -101,12 +303,11 @@ instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) =>
answ <- newTQueueIO
forKnownPeers @e $ \p _ -> do
forKnownPeers @e $ \p pd -> do
subscribe @e (BlockSizeEventKey p) $ \case
BlockSizeEvent (that, hx, sz) | hx == h -> do
debug $ "FUCKING GOT BLOCK SIZE!" <+> pretty (HashRef hx) <+> pretty p
atomically $ writeTQueue answ (sz, that)
atomically $ writeTQueue answ (sz, that, pd)
_ -> none
@ -119,10 +320,14 @@ instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) =>
xs <- flushTQueue answ
pure (x:xs)
rr <- S.toList_ $ for_ r $ \(s,p) -> do
S.yield $ mkList @C [ mkSym "size", mkInt s, mkSym (show $ pretty p) ]
rr <- S.toList_ $ for_ r $ \(s,p, PeerData{..}) -> do
let psk = AsBase58 _peerSignKey
S.yield $ mkList @C [ mkSym "size"
, mkInt s
, mkSym (show $ pretty $ psk)
, mkSym (show $ pretty p)
]
debug $ "WTF?!" <+> pretty rr
pure $ mkList rr
_ -> do

View File

@ -6,8 +6,8 @@ import HBS2.Hash
import HBS2.Net.Proto
import HBS2.Peer.Proto.Peer
import HBS2.Prelude.Plated
import HBS2.Storage
import HBS2.Actors.Peer
import HBS2.Storage
import HBS2.Net.Proto.Sessions
import Data.Word

View File

@ -28,6 +28,7 @@ blockSizeProto :: forall e m proto . ( MonadIO m
, Response e proto m
, HasDeferred proto e m
, EventEmitter e proto m
, EventEmitter e (AnyBlockSizeEvent e) m
, Sessions e (KnownPeer e) m
, proto ~ BlockInfo e
)
@ -52,16 +53,35 @@ blockSizeProto getBlockSize evHasBlock onNoBlock =
onNoBlock (p, h)
response (NoBlock @e h)
NoBlock h -> do
NoBlock h -> deferred @proto do
that <- thatPeer @proto
emit @e (BlockSizeEventKey that) (NoBlockEvent (that, h))
emit @e AnyBlockSizeEventKey (AnyBlockSizeEvent h Nothing that)
evHasBlock ( that, h, Nothing )
BlockSize h sz -> do
BlockSize h sz -> deferred @proto do
that <- thatPeer @proto
emit @e (BlockSizeEventKey @e that) (BlockSizeEvent (that, h, sz))
emit @e AnyBlockSizeEventKey (AnyBlockSizeEvent h (Just sz) that)
evHasBlock ( that, h, Just sz )
data AnyBlockSizeEvent e
data instance EventKey e (AnyBlockSizeEvent e) =
AnyBlockSizeEventKey
deriving stock (Typeable, Generic, Eq)
instance Hashable (EventKey e (AnyBlockSizeEvent e)) where
hashWithSalt s _ = hashWithSalt s ("AnyBlockSizeEventKey_1730696922" :: ByteString)
data instance Event e (AnyBlockSizeEvent e) =
AnyBlockSizeEvent
{ anyBlockSizeHash :: Hash HbSync
, anyBlockSize :: Maybe Integer
, anyBlockSizePeer :: Peer e
}
deriving stock (Generic,Typeable)
newtype instance SessionKey e (BlockInfo e) =
BlockSizeKey (Hash HbSync)
deriving stock (Typeable,Eq,Show)