lesser memory footprint

This commit is contained in:
Dmitry Zuikov 2023-02-06 12:19:18 +03:00
parent ce3cf2728a
commit 1f36cc82a0
7 changed files with 5 additions and 373 deletions

View File

@ -65,7 +65,6 @@ library
exposed-modules: exposed-modules:
HBS2.Actors HBS2.Actors
, HBS2.Actors.ChunkWriter
, HBS2.Actors.Peer , HBS2.Actors.Peer
, HBS2.Base58 , HBS2.Base58
, HBS2.Clock , HBS2.Clock

View File

@ -1,264 +0,0 @@
{-# Language UndecidableInstances #-}
module HBS2.Actors.ChunkWriter
( ChunkWriter
, newChunkWriterIO
, runChunkWriter
, stopChunkWriter
, delBlock
, commitBlock
, writeChunk
, getHash
, blocksInProcess
) where
import HBS2.Prelude
import HBS2.Actors
import HBS2.Hash
import HBS2.Storage
import HBS2.Defaults
import HBS2.Net.Proto.Sessions
import Data.Functor
import Data.ByteString.Lazy (ByteString)
import Prettyprinter
import Control.Concurrent.STM
import Control.Concurrent.STM.TVar as TV
import Data.Typeable
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.IntMap qualified as IntMap
import Data.IntMap (IntMap)
class ( Eq salt
, Eq (Hash h)
, Hashable salt
, Hashable (Hash h)
, Typeable salt
, Typeable (Hash h)
, Hashed h ByteString
) => ChunkKey salt h
instance ( Hashable salt
, Typeable salt
, Eq salt
, Eq (Hash h)
, Hashable (Hash h)
, Typeable (Hash h)
, Hashed h ByteString
) => ChunkKey salt h
data Chunk h = P (IntMap ByteString)
| S ByteString
instance Hashed h ByteString => Monoid (Chunk h) where
mempty = P mempty
instance Hashed h ByteString => Semigroup (Chunk h) where
(<>) (P a) (P b) = P ( a <> b )
(<>) (S s1) (S s2) = S s3
where
s3 = s1 <> s2
(<>) p@(P{}) (S s) = S s3
where
(S s1) = toS p
s3 = s1 <> s
(<>) (S s) p@(P{}) = S s3
where
(S s1) = toS p
s3 = s <> s1
mkP :: Offset -> ByteString -> Chunk h
mkP o b = P (IntMap.singleton (fromIntegral o) b)
toS :: Hashed h ByteString => Chunk h -> Chunk h
toS s@(S{}) = s
toS (P xs) = S s
where
s = mconcat $ IntMap.elems xs
data ChunkWriter h m = forall a . ( MonadIO m
, Storage a h ByteString m
, Block ByteString ~ ByteString
) =>
ChunkWriter
{ stopped :: TVar Bool
, pipeline :: Pipeline IO ()
, storage :: a
, perBlock :: !(TVar (HashMap SKey (Chunk h)))
}
-- FIXME: delete lost blocks!
blocksInProcess :: MonadIO m => ChunkWriter h m -> m Int
blocksInProcess cw = do
liftIO $ readTVarIO (perBlock cw) <&> HashMap.size
runChunkWriter :: forall h m . ( Eq (Hash h)
, Hashable (Hash h)
, MonadIO m )
=> ChunkWriter h IO -> m ()
runChunkWriter = runChunkWriter2
runChunkWriter2 :: forall h m . ( Eq (Hash h)
, Hashable (Hash h)
, MonadIO m )
=> ChunkWriter h IO -> m ()
runChunkWriter2 w = do
-- liftIO $ createDirectoryIfMissing True ( dir w )
let tv = perBlock w
liftIO $ runPipeline (pipeline w)
-- fix \next -> do
-- keys <- liftIO $ readTVarIO tv <&> (L.take 20 . HashMap.keys)
-- liftIO $ forConcurrently_ keys $ \f -> flush w f
-- pause ( 1.00 :: Timeout 'Seconds)
-- next
stopChunkWriter :: MonadIO m => ChunkWriter h m -> m ()
stopChunkWriter w = do
liftIO $ atomically $ writeTVar (stopped w) True
newChunkWriterIO :: forall h a m . ( Key h ~ Hash h, h ~ HbSync
, Storage a h ByteString m
, Block ByteString ~ ByteString
, MonadIO m
)
=> a
-> Maybe FilePath
-> m (ChunkWriter h m)
newChunkWriterIO s _ = do
pip <- newPipeline defChunkWriterQ
mt <- liftIO $ newTVarIO mempty
running <- liftIO $ newTVarIO False
pure $
ChunkWriter
{ stopped = running
, pipeline = pip
, storage = s
, perBlock = mt
}
delBlock :: (MonadIO m, ChunkKey salt h, Pretty (Hash h))
=> ChunkWriter h IO
-> salt
-> Hash h
-> m ()
delBlock w salt h = liftIO do
let k = newSKey (salt, h)
let cache = perBlock w
liftIO $ atomically $ TV.modifyTVar' cache $ HashMap.delete k
writeChunk :: ( ChunkKey salt h
, MonadIO m
, Pretty (Hash h)
)
=> ChunkWriter h m
-> salt
-> Hash h
-> Offset
-> ByteString -> m ()
writeChunk = writeChunk2
getHash :: forall salt h m .
( ChunkKey salt h
, m ~ IO
, Block ByteString ~ ByteString
, Pretty (Hash h)
, Hashable (Hash h), Eq (Hash h)
)
=> ChunkWriter h m
-> salt
-> Hash h
-> m (Maybe (Hash h))
getHash = getHash2
commitBlock :: forall salt h m .
( ChunkKey salt h
, Hashed h ByteString
, Block ByteString ~ ByteString
, m ~ IO
, Pretty (Hash h)
)
=> ChunkWriter h m
-> salt
-> Hash h
-> m ()
commitBlock = commitBlock2
writeChunk2 :: (ChunkKey salt h, MonadIO m, Pretty (Hash h))
=> ChunkWriter h m
-> salt
-> Hash h
-> Offset
-> ByteString -> m ()
writeChunk2 w salt h o !bs = do
let cache = perBlock w
let k = newSKey (salt, h)
liftIO $ do
atomically $ modifyTVar cache (HashMap.insertWith (<>) k (mkP o bs) )
getHash2 :: forall salt h m .
( ChunkKey salt h
, Hashed h ByteString
, m ~ IO
, Block ByteString ~ ByteString
, Pretty (Hash h)
, Hashable (Hash h), Eq (Hash h)
)
=> ChunkWriter h IO
-> salt
-> Hash h
-> m (Maybe (Hash h))
getHash2 w salt h = do
let k = newSKey (salt, h)
chunk <- readTVarIO (perBlock w) <&> fmap toS . HashMap.lookup k
case chunk of
Just (S s) -> pure (Just (hashObject s))
_ -> pure Nothing
commitBlock2 :: forall salt h m .
( ChunkKey salt h
, Hashed h ByteString
, Block ByteString ~ ByteString
, m ~ IO
, Pretty (Hash h)
)
=> ChunkWriter h m
-> salt
-> Hash h
-> m ()
commitBlock2 w@(ChunkWriter {storage = stor}) salt h = do
let k = newSKey (salt, h)
chunk <- readTVarIO (perBlock w) <&> fmap toS . HashMap.lookup k
case chunk of
Just (S s) -> void $ putBlock stor s >> delBlock w salt h
_ -> pure () -- FIXME: error

View File

@ -29,20 +29,17 @@ defStorePath :: IsString a => a
defStorePath = "hbs2" defStorePath = "hbs2"
defPipelineSize :: Int defPipelineSize :: Int
defPipelineSize = 16000*4 defPipelineSize = 16000
defChunkWriterQ :: Integral a => a
defChunkWriterQ = 16000*4
defBlockDownloadQ :: Integral a => a defBlockDownloadQ :: Integral a => a
defBlockDownloadQ = 65536*4 defBlockDownloadQ = 65536
defBlockDownloadThreshold :: Integral a => a defBlockDownloadThreshold :: Integral a => a
defBlockDownloadThreshold = 2 defBlockDownloadThreshold = 2
-- typical block hash 530+ chunks * parallel wip blocks amount -- typical block hash 530+ chunks * parallel wip blocks amount
defProtoPipelineSize :: Int defProtoPipelineSize :: Int
defProtoPipelineSize = 65536*4 defProtoPipelineSize = 65536*2
defCookieTimeoutSec :: Timeout 'Seconds defCookieTimeoutSec :: Timeout 'Seconds
defCookieTimeoutSec = 1200 defCookieTimeoutSec = 1200

View File

@ -63,7 +63,7 @@ common shared-properties
-- -fno-warn-unused-binds -- -fno-warn-unused-binds
-threaded -threaded
-rtsopts -rtsopts
"-with-rtsopts=-N4 -A256m -AL256m -I0" "-with-rtsopts=-N4 -A64m -AL256m -I0"
default-language: Haskell2010 default-language: Haskell2010

View File

@ -20,7 +20,7 @@ import Streaming qualified as S
import System.IO import System.IO
pieces :: Integral a => a pieces :: Integral a => a
pieces = 8192 pieces = 1024
class SimpleStorageExtra a where class SimpleStorageExtra a where
putAsMerkle :: forall h . (IsSimpleStorageKey h, Hashed h ByteString) => SimpleStorage h -> a -> IO MerkleHash putAsMerkle :: forall h . (IsSimpleStorageKey h, Hashed h ByteString) => SimpleStorage h -> a -> IO MerkleHash

View File

@ -107,19 +107,6 @@ test-suite test-skey
main-is: TestSKey.hs main-is: TestSKey.hs
test-suite test-cw
import: shared-properties
import: common-deps
default-language: Haskell2010
other-modules:
-- other-extensions:
type: exitcode-stdio-1.0
hs-source-dirs: test
main-is: TestChunkWriter.hs
executable test-udp executable test-udp
import: shared-properties import: shared-properties

View File

@ -1,87 +0,0 @@
module Main where
import HBS2.Prelude
import HBS2.Actors.ChunkWriter
import HBS2.Hash
import HBS2.Clock
import HBS2.Storage
import HBS2.Storage.Simple
import Data.Maybe
import Control.Monad.Except
import Control.Concurrent.Async
import Control.Monad
import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy.Char8 qualified as B8
import Data.Fixed
import Data.Functor
import qualified Data.Vector.Unboxed as U
import System.FilePath.Posix
import System.IO.Temp
import System.Random.MWC
import System.Random.Shuffle
import System.TimeIt
import Control.DeepSeq
import Control.Exception (evaluate)
import Data.List qualified as L
import Prettyprinter
main :: IO ()
main = do
-- let size = 1024*1024*1
let size = 256*1024
let chu = 500
g <- initialize $ U.fromList [0xFAFA, 0xBEBE, 0xC0C0]
bytes <- B8.pack <$> replicateM size (uniformM g)
let hash = hashObject bytes
let psz = calcChunks (fromIntegral size) (fromIntegral chu)
psz' <- shuffleM psz
withSystemTempDirectory "cww-test" $ \dir -> do
let opts = [ StoragePrefix (dir </> ".test-cww")
]
storage <- simpleStorageInit opts :: IO (SimpleStorage HbSync)
w1 <- replicateM 16 $ async (simpleStorageWorker storage)
cw <- newChunkWriterIO @HbSync storage (Just (dir </> ".qqq"))
w2 <- replicateM 8 $ async $ runChunkWriter cw
let times = 1000
let info = show $ "writing" <+> pretty (show (realToFrac size / (1024*1024) :: Fixed E2))
<+> "mb"
<+> pretty times <+> "times"
timeItNamed info $ do
failed <- replicateM times $ do
forConcurrently_ psz' $ \(o,s) -> do
let t = B8.take s $ B8.drop o bytes
writeChunk cw 1 hash (fromIntegral o) t
h2 <- getHash cw 1 hash
if Just hash /= h2 then do
pure [1]
else do
commitBlock cw 1 hash
pure mempty
mapM_ cancel $ w1 <> w2
print $ "failed" <+> pretty (sum (mconcat failed))