From 87d84b0531670dbfdc6ecc5145fe3867a50a778c Mon Sep 17 00:00:00 2001 From: voidlizard Date: Mon, 4 Nov 2024 12:11:50 +0300 Subject: [PATCH] wip --- hbs2-peer/app/RPC2.hs | 219 ++++++++++++++++++- hbs2-peer/lib/HBS2/Peer/Proto/BlockChunks.hs | 2 +- hbs2-peer/lib/HBS2/Peer/Proto/BlockInfo.hs | 24 +- 3 files changed, 235 insertions(+), 10 deletions(-) diff --git a/hbs2-peer/app/RPC2.hs b/hbs2-peer/app/RPC2.hs index 40d2b65c..622d2adc 100644 --- a/hbs2-peer/app/RPC2.hs +++ b/hbs2-peer/app/RPC2.hs @@ -1,5 +1,6 @@ {-# OPTIONS_GHC -fno-warn-orphans #-} {-# Language UndecidableInstances #-} +{-# Language AllowAmbiguousTypes #-} module RPC2 ( module RPC2.Peer , module RPC2.RefLog @@ -11,16 +12,22 @@ module RPC2 import HBS2.Prelude.Plated +import HBS2.OrDie +import HBS2.Hash +import HBS2.Defaults import HBS2.Events import HBS2.Net.Proto.Service import HBS2.Net.Proto.Sessions + import HBS2.Base58 import HBS2.Data.Types.Peer import HBS2.Data.Types.Refs import HBS2.Actors.Peer import HBS2.Peer.Proto.Peer import HBS2.Peer.Proto.BlockInfo +import HBS2.Peer.Proto.BlockChunks +import HBS2.Storage import HBS2.Clock import HBS2.Net.Auth.Schema @@ -42,15 +49,175 @@ import PeerInfo import Control.Monad.Trans.Maybe import Control.Monad.Trans.Cont -import Control.Concurrent.STM (flushTQueue) +import Control.Concurrent.STM (flushTQueue,retry) +import Data.IntMap qualified as IntMap +import Data.IntMap (IntMap) import Data.Text qualified as Text import Data.Either import Data.Maybe +import Data.ByteString.Lazy qualified as LBS +import Data.ByteString.Lazy (ByteString) +import Data.ByteString qualified as BS +import Data.List qualified as L import Data.Coerce import Numeric import UnliftIO +import Lens.Micro.Platform import Streaming.Prelude qualified as S + +data DownloadError e = + DownloadStuckError HashRef (Peer e) + | StorageError + | UnknownPeerError (Peer e) + | PeerMissBlockError HashRef (Peer e) + | PeerBlockHashMismatch (Peer e) + | PeerRequestTimeout (Peer e) + deriving stock (Generic,Typeable) + +instance Pretty (Peer e) => Show (DownloadError e) where + show (DownloadStuckError h p) = show $ parens $ "DownloadStuck" <+> pretty h <+> pretty p + show (UnknownPeerError p) = show $ parens $ "UnknownPeerError" <+> pretty p + show (PeerMissBlockError h p) = show $ parens $ "PeerMissBlockError" <+> pretty h <+> pretty p + show (PeerRequestTimeout p) = show $ parens $ "PeerRequestTimeout" <+> pretty p + show (PeerBlockHashMismatch p) = show $ parens $ "PeerBlockHashMismatch" <+> pretty p + show StorageError = show "StorageError" + +instance (Typeable e, Pretty (Peer e)) => Exception (DownloadError e) + + +class BlockSizeCache e cache where + + cacheBlockSize :: forall m . MonadUnliftIO m + => cache + -> PubKey 'Sign (Encryption e) + -> Hash HbSync + -> Integer + -> m () + + findBlockSize :: forall m . MonadUnliftIO m + => cache + -> PubKey 'Sign (Encryption e) + -> Hash HbSync + -> m (Maybe Integer) + +instance BlockSizeCache e () where + cacheBlockSize _ _ _ _ = pure () + findBlockSize _ _ _ = pure Nothing + +queryBlockSizeFromPeer :: forall e cache m . ( e ~ L4Proto + , MonadUnliftIO m + , BlockSizeCache e cache + ) + => cache + -> PeerEnv e + -> Hash HbSync + -> Peer e + -> m (Either (DownloadError e) (Maybe Integer)) + +queryBlockSizeFromPeer cache e h peer = do + + what <- try @_ @(DownloadError e) $ liftIO $ withPeerM e do + + PeerData{..} <- find (KnownPeerKey peer) id + >>= orThrow (UnknownPeerError peer) + + sizeQ <- newTQueueIO + + subscribe @e (BlockSizeEventKey peer) $ \case + BlockSizeEvent (that, hx, sz) | hx == h -> do + atomically $ writeTQueue sizeQ (Just sz) + cacheBlockSize @e cache _peerSignKey h sz + + _ -> do + atomically $ writeTQueue sizeQ Nothing + + request peer (GetBlockSize @e h) + + race ( pause defBlockInfoTimeout ) (atomically $ readTQueue sizeQ ) + >>= orThrow (PeerRequestTimeout peer) + + case what of + Left{} -> pure $ Left (PeerRequestTimeout peer) + Right x -> pure (Right x) + +downloadFromPeer :: forall e t cache m . ( e ~ L4Proto + , MonadUnliftIO m + , IsTimeout t + , BlockSizeCache e cache + ) + => Timeout t + -> cache + -> PeerEnv e + -> Hash HbSync + -> Peer e + -> m (Either (DownloadError e) ByteString) + +downloadFromPeer t cache env h peer = liftIO $ withPeerM env do + + pd@PeerData{..} <- find (KnownPeerKey peer) id + >>= orThrow (UnknownPeerError peer) + + sto <- liftIO $ withPeerM env $ getStorage + + let chunkSize = defChunkSize + + flip runContT pure $ callCC \exit -> do + + size <- lift (findBlockSize @e cache _peerSignKey h) + >>= maybe (queryBlockSize exit) pure + + coo <- genCookie (peer,h) + let key = DownloadSessionKey (peer, coo) + down@BlockDownload{..} <- newBlockDownload h + let chuQ = _sBlockChunks + let new = set sBlockChunkSize chunkSize + . set sBlockSize (fromIntegral size) + $ down + + (what, rs) <- liftIO $ withPeerM env do + update @e new key id + + debug $ "FUCKIN WAIT FOR CHUNKS!" <+> pretty h + + request peer (BlockChunks @e coo (BlockGetAllChunks h chunkSize)) + + let total = L.length $ calcChunks size (fromIntegral chunkSize) + + blk <- atomically do + wtf <- readTVar _sBlockChunks2 + unless (IntMap.size wtf == total) retry + pure wtf + + atomically $ flushTQueue chuQ + + let rs = LBS.concat $ IntMap.elems blk + + ha <- enqueueBlock sto rs + + expire @e key + + pure (ha, rs) + + case what of + Nothing -> pure $ Left StorageError + + Just h1 | h1 == h -> do + pure $ Right rs + + Just h1 -> do + delBlock sto h1 + pure $ Left (PeerBlockHashMismatch peer) + + where + + queryBlockSize exit = do + what <- lift $ queryBlockSizeFromPeer cache env h peer + case what of + Left{} -> exit (Left (PeerRequestTimeout peer)) + Right Nothing -> exit (Left (PeerMissBlockError (HashRef h) peer)) + Right (Just s) -> pure s + instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcRunScript where handleMethod top = do @@ -92,6 +259,41 @@ instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) => _ -> pure nil + entry $ bindMatch "query-block-from-peer" \case + [ HashLike blk, StringLike addr ] -> flip runContT pure $ callCC \exit -> do + + peer' <- liftIO $ try @_ @SomeException do + let pa = fromString @(PeerAddr L4Proto) addr + fromPeerAddr pa + + peer <- either (const $ exit (mkSym "error:invalid-address")) pure peer' + + what <- lift $ downloadFromPeer defChunkWaitMax () rpcPeerEnv (coerce blk) peer + + case what of + Left e -> pure $ mkList @C [ mkSym "error" , mkStr (show e) ] + Right bs -> pure $ mkList @C [ mkSym "okay", mkInt (LBS.length bs) ] + + _ -> pure nil + + entry $ bindMatch "query-block-size-from-peer" \case + [ HashLike blk, StringLike addr ] -> flip runContT pure $ callCC \exit -> do + + peer' <- liftIO $ try @_ @SomeException do + let pa = fromString @(PeerAddr L4Proto) addr + fromPeerAddr pa + + peer <- either (const $ exit (mkSym "error:invalid-address")) pure peer' + + sz <- lift $ queryBlockSizeFromPeer @e () rpcPeerEnv (coerce blk) peer + + case sz of + Left e -> pure $ mkList @C [ mkSym "error", mkStr (show e) ] + Right Nothing -> pure $ mkSym "no-block" + Right (Just s) -> pure $ mkList [mkSym "size", mkInt s] + + _ -> pure $ mkSym "error:invalid-args" + entry $ bindMatch "request-block-size" \case [LitScientificVal w, HashLike blk] -> do @@ -101,12 +303,11 @@ instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) => answ <- newTQueueIO - forKnownPeers @e $ \p _ -> do + forKnownPeers @e $ \p pd -> do subscribe @e (BlockSizeEventKey p) $ \case BlockSizeEvent (that, hx, sz) | hx == h -> do - debug $ "FUCKING GOT BLOCK SIZE!" <+> pretty (HashRef hx) <+> pretty p - atomically $ writeTQueue answ (sz, that) + atomically $ writeTQueue answ (sz, that, pd) _ -> none @@ -119,10 +320,14 @@ instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) => xs <- flushTQueue answ pure (x:xs) - rr <- S.toList_ $ for_ r $ \(s,p) -> do - S.yield $ mkList @C [ mkSym "size", mkInt s, mkSym (show $ pretty p) ] + rr <- S.toList_ $ for_ r $ \(s,p, PeerData{..}) -> do + let psk = AsBase58 _peerSignKey + S.yield $ mkList @C [ mkSym "size" + , mkInt s + , mkSym (show $ pretty $ psk) + , mkSym (show $ pretty p) + ] - debug $ "WTF?!" <+> pretty rr pure $ mkList rr _ -> do diff --git a/hbs2-peer/lib/HBS2/Peer/Proto/BlockChunks.hs b/hbs2-peer/lib/HBS2/Peer/Proto/BlockChunks.hs index 9a8038c7..06c82315 100644 --- a/hbs2-peer/lib/HBS2/Peer/Proto/BlockChunks.hs +++ b/hbs2-peer/lib/HBS2/Peer/Proto/BlockChunks.hs @@ -6,8 +6,8 @@ import HBS2.Hash import HBS2.Net.Proto import HBS2.Peer.Proto.Peer import HBS2.Prelude.Plated -import HBS2.Storage import HBS2.Actors.Peer +import HBS2.Storage import HBS2.Net.Proto.Sessions import Data.Word diff --git a/hbs2-peer/lib/HBS2/Peer/Proto/BlockInfo.hs b/hbs2-peer/lib/HBS2/Peer/Proto/BlockInfo.hs index 11a9b357..cbb35be0 100644 --- a/hbs2-peer/lib/HBS2/Peer/Proto/BlockInfo.hs +++ b/hbs2-peer/lib/HBS2/Peer/Proto/BlockInfo.hs @@ -28,6 +28,7 @@ blockSizeProto :: forall e m proto . ( MonadIO m , Response e proto m , HasDeferred proto e m , EventEmitter e proto m + , EventEmitter e (AnyBlockSizeEvent e) m , Sessions e (KnownPeer e) m , proto ~ BlockInfo e ) @@ -52,16 +53,35 @@ blockSizeProto getBlockSize evHasBlock onNoBlock = onNoBlock (p, h) response (NoBlock @e h) - NoBlock h -> do + NoBlock h -> deferred @proto do that <- thatPeer @proto emit @e (BlockSizeEventKey that) (NoBlockEvent (that, h)) + emit @e AnyBlockSizeEventKey (AnyBlockSizeEvent h Nothing that) evHasBlock ( that, h, Nothing ) - BlockSize h sz -> do + BlockSize h sz -> deferred @proto do that <- thatPeer @proto emit @e (BlockSizeEventKey @e that) (BlockSizeEvent (that, h, sz)) + emit @e AnyBlockSizeEventKey (AnyBlockSizeEvent h (Just sz) that) evHasBlock ( that, h, Just sz ) +data AnyBlockSizeEvent e + +data instance EventKey e (AnyBlockSizeEvent e) = + AnyBlockSizeEventKey + deriving stock (Typeable, Generic, Eq) + +instance Hashable (EventKey e (AnyBlockSizeEvent e)) where + hashWithSalt s _ = hashWithSalt s ("AnyBlockSizeEventKey_1730696922" :: ByteString) + +data instance Event e (AnyBlockSizeEvent e) = + AnyBlockSizeEvent + { anyBlockSizeHash :: Hash HbSync + , anyBlockSize :: Maybe Integer + , anyBlockSizePeer :: Peer e + } + deriving stock (Generic,Typeable) + newtype instance SessionKey e (BlockInfo e) = BlockSizeKey (Hash HbSync) deriving stock (Typeable,Eq,Show)