From 31c49e216945b90ff7f37d558ffba843d603e852 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Sat, 21 Jan 2023 12:37:48 +0300 Subject: [PATCH] wip --- hbs2-tests/test/Peer2Main.hs | 130 +++++++++++++++++++++++++++++------ 1 file changed, 109 insertions(+), 21 deletions(-) diff --git a/hbs2-tests/test/Peer2Main.hs b/hbs2-tests/test/Peer2Main.hs index 70689b16..8c7aadab 100644 --- a/hbs2-tests/test/Peer2Main.hs +++ b/hbs2-tests/test/Peer2Main.hs @@ -4,30 +4,43 @@ {-# Language AllowAmbiguousTypes #-} module Main where -import HBS2.Prelude.Plated import HBS2.Actors +import HBS2.Actors.ChunkWriter +import HBS2.Clock +import HBS2.Defaults import HBS2.Hash import HBS2.Net.Messaging import HBS2.Net.Messaging.Fake import HBS2.Net.Proto import HBS2.Net.Proto.BlockChunks import HBS2.Net.Proto.BlockInfo +import HBS2.Prelude.Plated import HBS2.Storage -import HBS2.Defaults +import HBS2.Storage.Simple +import HBS2.Storage.Simple.Extra + +import Test.Tasty.HUnit -import Control.Monad.Reader -import Data.Foldable import Codec.Serialise hiding (encode,decode) +import Control.Concurrent.Async +import Control.Monad.Reader import Data.ByteString.Lazy (ByteString) +import Data.ByteString.Lazy.Char8 qualified as B8 import Data.Default +import Data.Foldable import Data.Map (Map) import Data.Map qualified as Map import Data.Word import GHC.TypeLits import Lens.Micro.Platform -import Control.Concurrent.Async - import Prettyprinter hiding (pipe) +import System.Directory +import System.Exit +import System.FilePath.Posix +import System.IO + +debug :: (MonadIO m) => Doc ann -> m () +debug p = liftIO $ hPrint stderr p data Fake @@ -91,10 +104,9 @@ class Monad m => HasOwnPeer e m where ownPeer :: m (Peer e) -data AnyStorage = forall s . Storage s HbSync ByteString IO => AnyStorage s +data AnyStorage = forall zu . Storage zu HbSync ByteString IO => AnyStorage zu -instance Storage s HbSync ByteString IO - => Storage AnyStorage HbSync ByteString IO where +instance (IsKey HbSync, Key HbSync ~ Hash HbSync) => Storage AnyStorage HbSync ByteString IO where putBlock (AnyStorage s) = putBlock s enqueueBlock (AnyStorage s) = enqueueBlock s @@ -102,6 +114,8 @@ instance Storage s HbSync ByteString IO getChunk (AnyStorage s) = getChunk s hasBlock (AnyStorage s) = hasBlock s +class HasStorage m where + getStorage :: m AnyStorage data Fabriq e = forall bus . Messaging bus e ByteString => Fabriq bus @@ -144,6 +158,7 @@ data PeerEnv e = PeerEnv { _envSelf :: Peer e , _envFab :: Fabriq e + , _envStorage :: AnyStorage , _envDeferred :: Pipeline IO () } @@ -188,9 +203,12 @@ instance Monad m => HasOwnPeer e (PeerM e m) where instance Monad m => HasFabriq e (PeerM e m) where getFabriq = asks (view envFab) -runPeerM :: MonadIO m => Peer e -> Fabriq e -> PeerM e m a -> m () -runPeerM p bus f = do - env <- PeerEnv p bus <$> newPipeline defProtoPipelineSize +instance Monad m => HasStorage (PeerM e m) where + getStorage = asks (view envStorage) + +runPeerM :: MonadIO m => AnyStorage -> Fabriq e -> Peer e -> PeerM e m a -> m () +runPeerM s bus p f = do + env <- PeerEnv p bus s <$> newPipeline defProtoPipelineSize let de = view envDeferred env as <- liftIO $ async $ runPipeline de void $ runReaderT (fromPeerM f) env @@ -238,6 +256,7 @@ runProto hh = do instance ( HasProtocol e p , Serialise (Encoded e) , MonadTrans (ResponseM e) + , HasStorage (PeerM e IO) ) => Response e p (ResponseM e (PeerM e IO)) where thatPeer _ = asks (view answTo) @@ -246,7 +265,8 @@ instance ( HasProtocol e p who <- asks (view answTo) fab <- lift $ getFabriq @e pip <- lift $ asks (view envDeferred) - liftIO $ addJob pip $ runPeerM who fab (runResponseM who action) + ss <- lift getStorage + liftIO $ addJob pip $ runPeerM ss fab who (runResponseM who action) response msg = do let proto = protoId @e @p (Proxy @p) @@ -256,18 +276,86 @@ instance ( HasProtocol e p let bs = serialise (AnyMessage @e proto (encode msg)) sendTo fab (To who) (From self) bs + +runTestPeer :: Peer Fake + -> (SimpleStorage HbSync -> IO ()) + -> IO () + +runTestPeer p zu = do + + dir <- liftIO $ canonicalizePath ( ".peers" show p) + let chDir = dir "tmp-chunks" + liftIO $ createDirectoryIfMissing True dir + + let opts = [ StoragePrefix dir + ] + + stor <- simpleStorageInit @_ @_ @HbSync opts + cww <- newChunkWriterIO stor (Just chDir) + + sw <- liftIO $ async $ simpleStorageWorker stor + cw <- liftIO $ async $ runChunkWriter cww + + zu stor + + simpleStorageStop stor + stopChunkWriter cww + + mapM_ cancel [sw,cw] + + + + main :: IO () main = do - print "preved" + hSetBuffering stderr LineBuffering - fake <- newFakeP2P True + void $ race (pause (10 :: Timeout 'Seconds)) $ do - runPeerM (FakePeer 0) (Fabriq fake) $ do - runProto @Fake - [ makeResponse (blockSizeProto undefined undefined) - -- , makeResponse (blockChunksProto undefined) - ] + fake <- newFakeP2P True <&> Fabriq + + let (p0:ps) = [0..1] :: [Peer Fake] + + -- others + others <- forM ps $ \p -> async $ runTestPeer p $ \s -> do + let findBlk = hasBlock s + + let size = 1024*1024 + + let blk = B8.concat [ fromString (take 1 $ show x) + | x <- replicate size (fromIntegral p :: Int) + ] + + root <- putAsMerkle s blk + + debug $ "I'm" <+> pretty p <+> pretty root + + runPeerM (AnyStorage s) fake p $ do + runProto @Fake + [ makeResponse (blockSizeProto findBlk dontHandle) + -- , makeResponse (blockChunksProto undefined) + ] + + our <- async $ runTestPeer p0 $ \s -> do + let blk = hasBlock s + runPeerM (AnyStorage s) fake p0 $ do + runProto @Fake + [ makeResponse (blockSizeProto blk dontHandle) + -- , makeResponse (blockChunksProto undefined) + ] + + pause ( 5 :: Timeout 'Seconds) + + mapM_ cancel (our:others) + + (_, e) <- waitAnyCatchCancel (our:others) + + debug (pretty $ show e) + debug "we're done" + assertBool "success" True + exitSuccess + + assertBool "failed" False - pure ()