From 1f36cc82a0afec30643aaac315df87845322ce5d Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Mon, 6 Feb 2023 12:19:18 +0300 Subject: [PATCH] lesser memory footprint --- hbs2-core/hbs2-core.cabal | 1 - hbs2-core/lib/HBS2/Actors/ChunkWriter.hs | 264 ------------------ hbs2-core/lib/HBS2/Defaults.hs | 9 +- hbs2-peer/hbs2-peer.cabal | 2 +- .../lib/HBS2/Storage/Simple/Extra.hs | 2 +- hbs2-tests/hbs2-tests.cabal | 13 - hbs2-tests/test/TestChunkWriter.hs | 87 ------ 7 files changed, 5 insertions(+), 373 deletions(-) delete mode 100644 hbs2-core/lib/HBS2/Actors/ChunkWriter.hs delete mode 100644 hbs2-tests/test/TestChunkWriter.hs diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index 407ec0f1..bca17434 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -65,7 +65,6 @@ library exposed-modules: HBS2.Actors - , HBS2.Actors.ChunkWriter , HBS2.Actors.Peer , HBS2.Base58 , HBS2.Clock diff --git a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs deleted file mode 100644 index 99b29fb7..00000000 --- a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs +++ /dev/null @@ -1,264 +0,0 @@ -{-# Language UndecidableInstances #-} -module HBS2.Actors.ChunkWriter - ( ChunkWriter - , newChunkWriterIO - , runChunkWriter - , stopChunkWriter - , delBlock - , commitBlock - , writeChunk - , getHash - , blocksInProcess - ) where - -import HBS2.Prelude -import HBS2.Actors -import HBS2.Hash -import HBS2.Storage -import HBS2.Defaults -import HBS2.Net.Proto.Sessions - -import Data.Functor -import Data.ByteString.Lazy (ByteString) -import Prettyprinter - -import Control.Concurrent.STM -import Control.Concurrent.STM.TVar as TV - -import Data.Typeable - -import Data.HashMap.Strict (HashMap) -import Data.HashMap.Strict qualified as HashMap - -import Data.IntMap qualified as IntMap -import Data.IntMap (IntMap) - - -class ( Eq salt - , Eq (Hash h) - , Hashable salt - , Hashable (Hash h) - , Typeable salt - , Typeable (Hash h) - , Hashed h ByteString - ) => ChunkKey salt h - -instance ( Hashable salt - , Typeable salt - , Eq salt - , Eq (Hash h) - , Hashable (Hash h) - , Typeable (Hash h) - , Hashed h ByteString - ) => ChunkKey salt h - -data Chunk h = P (IntMap ByteString) - | S ByteString - -instance Hashed h ByteString => Monoid (Chunk h) where - mempty = P mempty - -instance Hashed h ByteString => Semigroup (Chunk h) where - (<>) (P a) (P b) = P ( a <> b ) - - (<>) (S s1) (S s2) = S s3 - where - s3 = s1 <> s2 - - (<>) p@(P{}) (S s) = S s3 - where - (S s1) = toS p - s3 = s1 <> s - - (<>) (S s) p@(P{}) = S s3 - where - (S s1) = toS p - s3 = s <> s1 - -mkP :: Offset -> ByteString -> Chunk h -mkP o b = P (IntMap.singleton (fromIntegral o) b) - -toS :: Hashed h ByteString => Chunk h -> Chunk h -toS s@(S{}) = s -toS (P xs) = S s - where - s = mconcat $ IntMap.elems xs - -data ChunkWriter h m = forall a . ( MonadIO m - , Storage a h ByteString m - , Block ByteString ~ ByteString - ) => - ChunkWriter - { stopped :: TVar Bool - , pipeline :: Pipeline IO () - , storage :: a - , perBlock :: !(TVar (HashMap SKey (Chunk h))) - } - - --- FIXME: delete lost blocks! -blocksInProcess :: MonadIO m => ChunkWriter h m -> m Int -blocksInProcess cw = do - liftIO $ readTVarIO (perBlock cw) <&> HashMap.size - -runChunkWriter :: forall h m . ( Eq (Hash h) - , Hashable (Hash h) - , MonadIO m ) - => ChunkWriter h IO -> m () - -runChunkWriter = runChunkWriter2 - - -runChunkWriter2 :: forall h m . ( Eq (Hash h) - , Hashable (Hash h) - , MonadIO m ) - => ChunkWriter h IO -> m () - -runChunkWriter2 w = do - -- liftIO $ createDirectoryIfMissing True ( dir w ) - let tv = perBlock w - liftIO $ runPipeline (pipeline w) - -- fix \next -> do - -- keys <- liftIO $ readTVarIO tv <&> (L.take 20 . HashMap.keys) - -- liftIO $ forConcurrently_ keys $ \f -> flush w f - -- pause ( 1.00 :: Timeout 'Seconds) - -- next - -stopChunkWriter :: MonadIO m => ChunkWriter h m -> m () -stopChunkWriter w = do - liftIO $ atomically $ writeTVar (stopped w) True - -newChunkWriterIO :: forall h a m . ( Key h ~ Hash h, h ~ HbSync - , Storage a h ByteString m - , Block ByteString ~ ByteString - , MonadIO m - ) - => a - -> Maybe FilePath - -> m (ChunkWriter h m) - -newChunkWriterIO s _ = do - pip <- newPipeline defChunkWriterQ - - mt <- liftIO $ newTVarIO mempty - - running <- liftIO $ newTVarIO False - - pure $ - ChunkWriter - { stopped = running - , pipeline = pip - , storage = s - , perBlock = mt - } - - -delBlock :: (MonadIO m, ChunkKey salt h, Pretty (Hash h)) - => ChunkWriter h IO - -> salt - -> Hash h - -> m () - -delBlock w salt h = liftIO do - let k = newSKey (salt, h) - let cache = perBlock w - liftIO $ atomically $ TV.modifyTVar' cache $ HashMap.delete k - -writeChunk :: ( ChunkKey salt h - , MonadIO m - , Pretty (Hash h) - ) - => ChunkWriter h m - -> salt - -> Hash h - -> Offset - -> ByteString -> m () - -writeChunk = writeChunk2 - - -getHash :: forall salt h m . - ( ChunkKey salt h - , m ~ IO - , Block ByteString ~ ByteString - , Pretty (Hash h) - , Hashable (Hash h), Eq (Hash h) - ) - => ChunkWriter h m - -> salt - -> Hash h - -> m (Maybe (Hash h)) - -getHash = getHash2 - - -commitBlock :: forall salt h m . - ( ChunkKey salt h - , Hashed h ByteString - , Block ByteString ~ ByteString - , m ~ IO - , Pretty (Hash h) - ) - => ChunkWriter h m - -> salt - -> Hash h - -> m () - -commitBlock = commitBlock2 - -writeChunk2 :: (ChunkKey salt h, MonadIO m, Pretty (Hash h)) - => ChunkWriter h m - -> salt - -> Hash h - -> Offset - -> ByteString -> m () - -writeChunk2 w salt h o !bs = do - - let cache = perBlock w - let k = newSKey (salt, h) - liftIO $ do - atomically $ modifyTVar cache (HashMap.insertWith (<>) k (mkP o bs) ) - -getHash2 :: forall salt h m . - ( ChunkKey salt h - , Hashed h ByteString - , m ~ IO - , Block ByteString ~ ByteString - , Pretty (Hash h) - , Hashable (Hash h), Eq (Hash h) - ) - => ChunkWriter h IO - -> salt - -> Hash h - -> m (Maybe (Hash h)) - -getHash2 w salt h = do - let k = newSKey (salt, h) - chunk <- readTVarIO (perBlock w) <&> fmap toS . HashMap.lookup k - case chunk of - Just (S s) -> pure (Just (hashObject s)) - _ -> pure Nothing - - -commitBlock2 :: forall salt h m . - ( ChunkKey salt h - , Hashed h ByteString - , Block ByteString ~ ByteString - , m ~ IO - , Pretty (Hash h) - ) - => ChunkWriter h m - -> salt - -> Hash h - -> m () - -commitBlock2 w@(ChunkWriter {storage = stor}) salt h = do - let k = newSKey (salt, h) - chunk <- readTVarIO (perBlock w) <&> fmap toS . HashMap.lookup k - - case chunk of - Just (S s) -> void $ putBlock stor s >> delBlock w salt h - _ -> pure () -- FIXME: error - - diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index 85e3125d..0b03fbfd 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -29,20 +29,17 @@ defStorePath :: IsString a => a defStorePath = "hbs2" defPipelineSize :: Int -defPipelineSize = 16000*4 - -defChunkWriterQ :: Integral a => a -defChunkWriterQ = 16000*4 +defPipelineSize = 16000 defBlockDownloadQ :: Integral a => a -defBlockDownloadQ = 65536*4 +defBlockDownloadQ = 65536 defBlockDownloadThreshold :: Integral a => a defBlockDownloadThreshold = 2 -- typical block hash 530+ chunks * parallel wip blocks amount defProtoPipelineSize :: Int -defProtoPipelineSize = 65536*4 +defProtoPipelineSize = 65536*2 defCookieTimeoutSec :: Timeout 'Seconds defCookieTimeoutSec = 1200 diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index dc3b8249..20badc1b 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -63,7 +63,7 @@ common shared-properties -- -fno-warn-unused-binds -threaded -rtsopts - "-with-rtsopts=-N4 -A256m -AL256m -I0" + "-with-rtsopts=-N4 -A64m -AL256m -I0" default-language: Haskell2010 diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs b/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs index 2854ee1e..75ad3127 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs @@ -20,7 +20,7 @@ import Streaming qualified as S import System.IO pieces :: Integral a => a -pieces = 8192 +pieces = 1024 class SimpleStorageExtra a where putAsMerkle :: forall h . (IsSimpleStorageKey h, Hashed h ByteString) => SimpleStorage h -> a -> IO MerkleHash diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 5e7a92ad..6059517c 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -107,19 +107,6 @@ test-suite test-skey main-is: TestSKey.hs -test-suite test-cw - import: shared-properties - import: common-deps - default-language: Haskell2010 - - other-modules: - - -- other-extensions: - - type: exitcode-stdio-1.0 - hs-source-dirs: test - main-is: TestChunkWriter.hs - executable test-udp import: shared-properties diff --git a/hbs2-tests/test/TestChunkWriter.hs b/hbs2-tests/test/TestChunkWriter.hs deleted file mode 100644 index f18fb157..00000000 --- a/hbs2-tests/test/TestChunkWriter.hs +++ /dev/null @@ -1,87 +0,0 @@ -module Main where - -import HBS2.Prelude -import HBS2.Actors.ChunkWriter -import HBS2.Hash -import HBS2.Clock -import HBS2.Storage -import HBS2.Storage.Simple - -import Data.Maybe -import Control.Monad.Except -import Control.Concurrent.Async -import Control.Monad -import Data.ByteString.Lazy (ByteString) -import Data.ByteString.Lazy.Char8 qualified as B8 -import Data.Fixed -import Data.Functor -import qualified Data.Vector.Unboxed as U -import System.FilePath.Posix -import System.IO.Temp -import System.Random.MWC -import System.Random.Shuffle -import System.TimeIt - -import Control.DeepSeq -import Control.Exception (evaluate) - -import Data.List qualified as L -import Prettyprinter - -main :: IO () -main = do - - -- let size = 1024*1024*1 - let size = 256*1024 - let chu = 500 - - g <- initialize $ U.fromList [0xFAFA, 0xBEBE, 0xC0C0] - - bytes <- B8.pack <$> replicateM size (uniformM g) - - let hash = hashObject bytes - - let psz = calcChunks (fromIntegral size) (fromIntegral chu) - - psz' <- shuffleM psz - - withSystemTempDirectory "cww-test" $ \dir -> do - - let opts = [ StoragePrefix (dir ".test-cww") - ] - - storage <- simpleStorageInit opts :: IO (SimpleStorage HbSync) - - w1 <- replicateM 16 $ async (simpleStorageWorker storage) - - cw <- newChunkWriterIO @HbSync storage (Just (dir ".qqq")) - - w2 <- replicateM 8 $ async $ runChunkWriter cw - - let times = 1000 - - let info = show $ "writing" <+> pretty (show (realToFrac size / (1024*1024) :: Fixed E2)) - <+> "mb" - <+> pretty times <+> "times" - - timeItNamed info $ do - - failed <- replicateM times $ do - - forConcurrently_ psz' $ \(o,s) -> do - let t = B8.take s $ B8.drop o bytes - writeChunk cw 1 hash (fromIntegral o) t - - h2 <- getHash cw 1 hash - - if Just hash /= h2 then do - pure [1] - else do - commitBlock cw 1 hash - pure mempty - - mapM_ cancel $ w1 <> w2 - - print $ "failed" <+> pretty (sum (mconcat failed)) - -