From f1d9513ad55324ed6b991827fbe91301d8b61510 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Fri, 20 Jan 2023 06:48:00 +0300 Subject: [PATCH] wip --- hbs2-core/hbs2-core.cabal | 1 + hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs | 105 ++++++++ .../lib/HBS2/Storage/Simple.hs | 1 - .../lib/HBS2/Storage/Simple/Extra.hs | 1 + hbs2-tests/test/Main.hs | 244 +++++++----------- 5 files changed, 206 insertions(+), 146 deletions(-) create mode 100644 hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index d2315427..baf979d3 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -80,6 +80,7 @@ library , HBS2.Net.Proto , HBS2.Net.Proto.Types , HBS2.Net.Proto.BlockInfo + , HBS2.Net.Proto.BlockChunks , HBS2.Prelude , HBS2.Prelude.Plated , HBS2.Storage diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs new file mode 100644 index 00000000..5196a756 --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs @@ -0,0 +1,105 @@ +{-# Language RankNTypes #-} +module HBS2.Net.Proto.BlockChunks where + +import HBS2.Prelude.Plated +import HBS2.Hash +import HBS2.Net.Proto +import HBS2.Storage + +import Data.Word +import Prettyprinter +import Data.ByteString.Lazy (ByteString) +import Data.Foldable + +newtype ChunkSize = ChunkSize Word16 + deriving newtype (Num,Enum,Real,Integral,Pretty) + deriving stock (Eq,Ord,Show,Data,Generic) + + +newtype ChunkNum = ChunkNum Word16 + deriving newtype (Num,Enum,Real,Integral,Pretty) + deriving stock (Eq,Ord,Show,Data,Generic) + + +type OnBlockReady h m = Hash h -> m () + + +type GetBlockChunk h m = Hash h -> Offset -> Size -> m (Maybe ByteString) + + +type AcceptChunk h e m = Response e (BlockChunks e) m + => ( Cookie e, Peer e, Hash HbSync, ChunkNum, ByteString ) -> m () + +type GetBlockHash h e m = (Peer e, Cookie e) -> m (Maybe (Hash h)) + +data BlockChunksI e m = + BlockChunksI + { blkSize :: GetBlockSize HbSync m + , blkChunk :: GetBlockChunk HbSync m + , blkGetHash :: GetBlockHash HbSync e m + , blkAcceptChunk :: AcceptChunk HbSync e m + } + + +data BlockChunks e = BlockChunks (Cookie e) (BlockChunksProto e) + deriving stock (Generic) + +data BlockChunksProto e = BlockGetAllChunks (Hash HbSync) ChunkSize + | BlockNoChunks + | BlockChunk ChunkNum ByteString + | BlockLost + deriving stock (Generic) + + +instance HasCookie e (BlockChunks e) where + type instance Cookie e = Word32 + getCookie (BlockChunks c _) = Just c + +instance Serialise ChunkSize +instance Serialise ChunkNum +instance Serialise (BlockChunksProto e) +instance Serialise (BlockChunks e) + + +blockChunksProto :: forall e m . ( MonadIO m + , Response e (BlockChunks e) m + , Pretty (Peer e) + ) + => BlockChunksI e m + -> BlockChunks e + -> m () + +blockChunksProto adapter (BlockChunks c p) = + case p of + BlockGetAllChunks h size -> deferred proto do + bsz' <- blkSize adapter h + + maybe1 bsz' (pure ()) $ \bsz -> do + + let offsets' = calcChunks bsz (fromIntegral size) :: [(Offset, Size)] + let offsets = zip offsets' [0..] + + for_ offsets $ \((o,sz),i) -> do + chunk <- blkChunk adapter h o sz + maybe (pure ()) (response_ . BlockChunk @e i) chunk + + BlockChunk n bs -> do + who <- thatPeer proto + h <- blkGetHash adapter (who, c) + + maybe1 h (response_ (BlockLost @e)) $ \hh -> do + void $ blkAcceptChunk adapter (c, who, hh, n, bs) + + BlockNoChunks {} -> do + -- TODO: notification + pure () + + BlockLost{} -> do + pure () + + where + proto = Proxy @(BlockChunks e) + response_ pt = response (BlockChunks c pt) + + + diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs index 1d297942..8536d0c9 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs @@ -2,7 +2,6 @@ {-# Language ScopedTypeVariables #-} module HBS2.Storage.Simple ( module HBS2.Storage.Simple - , module HBS2.Storage ) where import Control.Concurrent.Async diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs b/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs index 3de4db52..e514776b 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs @@ -4,6 +4,7 @@ module HBS2.Storage.Simple.Extra where import HBS2.Merkle import HBS2.Hash import HBS2.Prelude +import HBS2.Storage import HBS2.Storage.Simple import HBS2.Data.Types.Refs import HBS2.Defaults diff --git a/hbs2-tests/test/Main.hs b/hbs2-tests/test/Main.hs index 2a864ea4..a8914d4d 100644 --- a/hbs2-tests/test/Main.hs +++ b/hbs2-tests/test/Main.hs @@ -9,10 +9,13 @@ import HBS2.Hash -- import HBS2.Net.Messaging import HBS2.Net.Proto import HBS2.Net.Proto.BlockInfo +import HBS2.Net.Proto.BlockChunks +import HBS2.Net.Messaging import HBS2.Net.Messaging.Fake import HBS2.Actors.Peer import HBS2.Defaults +import HBS2.Storage import HBS2.Storage.Simple import HBS2.Storage.Simple.Extra import HBS2.Actors.ChunkWriter @@ -22,12 +25,17 @@ import Test.Tasty.HUnit import Codec.Serialise import Control.Concurrent.Async +import Control.Monad +import Control.Monad.Trans.Maybe import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy.Char8 qualified as B8 +import Data.Cache (Cache) +import Data.Cache qualified as Cache import Data.Foldable import Data.Hashable +import Data.Map (Map) +import Data.Map qualified as Map import Data.Maybe -import Data.Traversable import Data.Word import Lens.Micro.Platform import Prettyprinter @@ -35,11 +43,7 @@ import System.Directory import System.Exit import System.FilePath.Posix import System.IO -import Data.Cache (Cache) -import Data.Cache qualified as Cache -import Data.Map (Map) -import Data.Map qualified as Map -import Control.Monad.Trans.Maybe +import Control.Concurrent import Control.Concurrent.STM import Control.Concurrent.STM.TQueue qualified as Q @@ -47,21 +51,12 @@ import Control.Concurrent.STM.TQueue qualified as Q debug :: (MonadIO m) => Doc ann -> m () debug p = liftIO $ hPrint stderr p -newtype ChunkSize = ChunkSize Word16 - deriving newtype (Num,Enum,Real,Integral,Pretty) - deriving stock (Eq,Ord,Show,Data,Generic) - - -newtype ChunkNum = ChunkNum Word16 - deriving newtype (Num,Enum,Real,Integral,Pretty) - deriving stock (Eq,Ord,Show,Data,Generic) -- FIXME: peer should be a part of the key -- therefore, key is ( p | cookie ) -- but client's cookie in protocol should be just ( cookie :: Word32 ) -type OnBlockReady h m = Hash h -> m () data BlockDownload m = BlockDownload @@ -90,81 +85,8 @@ newBlockDownload :: forall m . MonadIO m newBlockDownload h = BlockDownload h 0 0 0 -type GetBlockChunk h m = Hash h -> Offset -> Size -> m (Maybe ByteString) -type AcceptChunk h e m = Response e (BlockChunks e) m - => ( Cookie e, Peer e, Hash HbSync, ChunkNum, ByteString ) -> m () - -type GetBlockHash h e m = (Peer e, Cookie e) -> m (Maybe (Hash h)) - -data BlockChunksI e m = - BlockChunksI - { blkSize :: GetBlockSize HbSync m - , blkChunk :: GetBlockChunk HbSync m - , blkGetHash :: GetBlockHash HbSync e m - , blkAcceptChunk :: AcceptChunk HbSync e m - } - - -instance HasCookie e (BlockChunks e) where - type instance Cookie e = Word32 - getCookie (BlockChunks c _) = Just c - -data BlockChunks e = BlockChunks (Cookie e) (BlockChunksProto e) - deriving stock (Generic) - -data BlockChunksProto e = BlockGetAllChunks (Hash HbSync) ChunkSize - | BlockNoChunks - | BlockChunk ChunkNum ByteString - | BlockLost - deriving stock (Generic) - - -instance Serialise ChunkSize -instance Serialise ChunkNum -instance Serialise (BlockChunksProto e) -instance Serialise (BlockChunks e) - - -blockChunksProto :: forall e m . ( MonadIO m - , Response e (BlockChunks e) m - , Pretty (Peer e) - ) - => BlockChunksI e m - -> BlockChunks e - -> m () - -blockChunksProto adapter (BlockChunks c p) = - case p of - BlockGetAllChunks h size -> deferred proto do - bsz <- blkSize adapter h - - let offsets' = calcChunks (fromJust bsz) (fromIntegral size) :: [(Offset, Size)] - let offsets = zip offsets' [0..] - - for_ offsets $ \((o,sz),i) -> do - chunk <- blkChunk adapter h o sz - maybe (pure ()) (response_ . BlockChunk @e i) chunk - - BlockChunk n bs -> do - who <- thatPeer proto - h <- blkGetHash adapter (who, c) - - maybe1 h (response_ (BlockLost @e)) $ \hh -> do - void $ blkAcceptChunk adapter (c, who, hh, n, bs) - - BlockNoChunks {} -> do - -- TODO: notification - pure () - - BlockLost{} -> do - pure () - - where - proto = Proxy @(BlockChunks e) - response_ pt = response (BlockChunks c pt) - data Fake instance HasPeer Fake where @@ -235,23 +157,40 @@ delSession se l k = liftIO do expireSession se l = liftIO do Cache.purgeExpired (view l se) -runFakePeer :: forall e m . (e ~ Fake, m ~ IO) => Sessions e m -> EngineEnv e -> m () -runFakePeer se env = do + +-- newtype FullPeerM m a = RealPeerM { fromRealPeerM :: ReaderT } + +runFakePeer :: forall e b . ( e ~ Fake + -- , m ~ IO + , Messaging b e ByteString + -- , MonadIO m + -- , Response e p m + -- , EngineM e m + ) + => Peer e + -> b + -> EngineM e IO () + -> IO () +runFakePeer p bus work = do + + env <- newEnv p bus + + se <- emptySessions @e let pid = fromIntegral (hash (env ^. self)) :: Word8 - dir <- canonicalizePath ( ".peers" show pid) + dir <- liftIO $ canonicalizePath ( ".peers" show pid) let chDir = dir "tmp-chunks" - createDirectoryIfMissing True dir + liftIO $ createDirectoryIfMissing True dir let opts = [ StoragePrefix dir ] - storage <- simpleStorageInit opts :: IO (SimpleStorage HbSync) + storage <- simpleStorageInit opts -- :: IO (SimpleStorage HbSync) - w <- async $ simpleStorageWorker storage + w <- liftIO $ async $ simpleStorageWorker storage cww <- newChunkWriterIO storage (Just chDir) @@ -342,16 +281,18 @@ runFakePeer se env = do -- ТАК ЧТО ТУТ ЖДЁМ, ДОПУСТИМ 2*mbSize и отваливаемся } - runPeer env - [ makeResponse (blockSizeProto (hasBlock storage) handleBlockInfo) - , makeResponse (blockChunksProto adapter) - ] + peer <- async $ runPeer env + [ makeResponse (blockSizeProto (hasBlock storage) handleBlockInfo) + , makeResponse (blockChunksProto adapter) + ] + + runEngineM env work simpleStorageStop storage stopChunkWriter cww - mapM_ cancel [w,cw] + mapM_ cancel [w,cw,peer] test1 :: IO () @@ -361,77 +302,90 @@ test1 = do fake <- newFakeP2P True - let peers@[p0,p1] = [0..1] :: [Peer Fake] - - envs@[e0,e1] <- forM peers $ \p -> newEnv p fake - - mtS <- emptySessions @Fake - let ee = zip (repeat mtS) envs - void $ race (pause (2 :: Timeout 'Seconds)) $ do - peerz <- mapM (async . uncurry runFakePeer) ee + let p0 = 0 :: Peer Fake + let p1 = 1 :: Peer Fake - runEngineM e0 $ do + p1Thread <- async $ runFakePeer p1 fake (liftIO $ forever yield) - let h = fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" + p0Thread <- async $ runFakePeer p0 fake $ do - -- TODO: #ASAP generate unique cookie!! - -- - -- FIXME: withAllCrap $ do ... - let s0 = (fst . head) ee + request p1 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ")) + request p1 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt")) - newCookie <- genCookie @Fake (p1, h) -- <<~~~ FIXME: generate a good session id! + let h = fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" - let cKey@(_, cookie) = (p1, newCookie) - let chsz = defChunkSize + -- cookie <- newSession ??? - debug $ "new cookie:" <+> pretty cookie + -- newCookie <- genCookie @Fake (p1, h) -- <<~~~ FIXME: generate a good session id! + -- let cKey@(_, cookie) = (p1, newCookie) - qblk <- liftIO Q.newTQueueIO + pure () - let onBlockReady bh = do - liftIO $ atomically $ Q.writeTQueue qblk bh + let peerz = p0Thread : [p1Thread] - let def = newBlockDownload h onBlockReady + -- peerz <- mapM (async . uncurry runFakePeer) ee - -- create sessions before sequesting anything - updSession s0 def sBlockDownload cKey (set sBlockChunkSize chsz) + --runEngineM e0 $ do - request p1 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ")) - request p1 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt")) - request p0 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ")) - request p0 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt")) + -- -- TODO: #ASAP generate unique cookie!! + -- -- + -- -- FIXME: withAllCrap $ do ... + -- let s0 = (fst . head) ee - -- TODO: #ASAP block ready notification + -- newCookie <- genCookie @Fake (p1, h) -- <<~~~ FIXME: generate a good session id! - debug $ "REQUEST BLOCK:" <+> pretty h <+> "from" <+> pretty p1 + -- let cKey@(_, cookie) = (p1, newCookie) + -- let chsz = defChunkSize - request p1 (BlockChunks @Fake cookie (BlockGetAllChunks h chsz)) + -- debug $ "new cookie:" <+> pretty cookie - blk <- liftIO $ atomically $ Q.readTQueue qblk + -- qblk <- liftIO Q.newTQueueIO - debug $ "BLOCK READY:" <+> pretty blk + -- let onBlockReady bh = do + -- liftIO $ atomically $ Q.writeTQueue qblk bh - -- TODO: смотрим, что за блок - -- если Merkle - то качаем рекурсивно - -- если ссылка - то смотрим, что за ссылка - -- проверяем пруфы - -- качаем рекурсивно + -- let def = newBlockDownload h onBlockReady - -- let mbLink = deserialiseOrFail @Merkle obj + -- -- create sessions before sequesting anything + -- updSession s0 def sBlockDownload cKey (set sBlockChunkSize chsz) - pure () + -- request p1 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ")) + -- request p1 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt")) - mapM_ cancel peerz + -- request p0 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ")) + -- request p0 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt")) - (_, e) <- waitAnyCatchCancel peerz + -- -- TODO: #ASAP block ready notification - debug (pretty $ show e) - debug "we're done" - assertBool "success" True - exitSuccess + -- debug $ "REQUEST BLOCK:" <+> pretty h <+> "from" <+> pretty p1 + + -- request p1 (BlockChunks @Fake cookie (BlockGetAllChunks h chsz)) + + -- blk <- liftIO $ atomically $ Q.readTQueue qblk + + -- debug $ "BLOCK READY:" <+> pretty blk + + -- -- TODO: смотрим, что за блок + -- -- если Merkle - то качаем рекурсивно + -- -- если ссылка - то смотрим, что за ссылка + -- -- проверяем пруфы + -- -- качаем рекурсивно + + -- -- let mbLink = deserialiseOrFail @Merkle obj + + -- pure () + + mapM_ cancel peerz + + (_, e) <- waitAnyCatchCancel peerz + + debug (pretty $ show e) + debug "we're done" + assertBool "success" True + exitSuccess assertBool "failed" False