{-# Language TemplateHaskell #-} module Main where import HBS2.Prelude import HBS2.Hash import HBS2.Data.Types.Refs import HBS2.Storage import HBS2.Storage.Simple import HBS2.Storage.Compact import System.TimeIt import DBPipe.SQLite import System.Environment import System.FilePath import System.IO.Temp import System.Random (randomRIO) import Control.Monad (replicateM) import Data.ByteString.Lazy qualified as LBS import Data.Word (Word8) import Data.Coerce import Data.Function import Text.InterpolatedString.Perl6 (qc) import Control.Monad import UnliftIO import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap import Data.Map (Map) import Data.Map qualified as Map import Data.List.Split (chunksOf) import Streaming.Prelude (Of,Stream) import Streaming.Prelude qualified as S import Control.Concurrent.STM (retry,flushTQueue) import Codec.Serialise -- Генерация одного случайного байта randomByte :: IO Word8 randomByte = randomRIO (0, 255) -- Генерация одной случайной байтовой строки заданной длины randomByteString :: Int -> IO LBS.ByteString randomByteString len = LBS.pack <$> replicateM len randomByte -- Генерация списка из n случайных байтовых строк заданной длины -- randomByteStrings :: Int -> Int -> IO (S.Stream (S.Of LBS.ByteString )) -- randomByteStrings n len = replicateM n (randomByteString len) randomByteStrings :: MonadIO m => Int -> Int -> Stream (Of LBS.ByteString) m () randomByteStrings n len = replicateM_ n $ do bs <- liftIO $ randomByteString len S.yield bs -- chunkStream :: Monad m => Int -> Stream (Of LBS.ByteString) m r -> Stream (Of [LBS.ByteString]) m r -- chunkStream n = S.mapsM (sequence . take n) . S.chunksOf n main :: IO () main = do (ns:ss:pref:_) <- getArgs let n = readDef @Int 100 ns let s = readDef @Int 256 ss let p = pref bss <- S.toList_ $ randomByteStrings n s -- let bss41 = randomByteStrings (n `div` 2) s -- let bss42 = randomByteStrings (n`div` 2) s -- let bss43 = randomByteStrings (n`div`4) s -- let bss44 = randomByteStrings (n`div`4) s let path = pref ".test-storage" storage <- simpleStorageInit [StoragePrefix path] :: IO (SimpleStorage HbSync) workers <- replicateM 4 $ async (simpleStorageWorker storage) env <- newDBPipeEnv dbPipeOptsDef (path "bench.db") withDB env do ddl [qc| create table if not exists wtf ( hash text not null , val text not null , primary key (hash) ) |] commitAll print $ "preparing to write" <+> pretty n <+> "chunks" timeItNamed "write chunks to log" do fh <- openFile (path "lsm") AppendMode forM_ bss $ \bs -> do let h = hashObject @HbSync bs & pretty & show LBS.hPut fh (serialise (h,bs)) hClose fh timeItNamed "write chunks to simple storage" do mapM_ (enqueueBlock storage) bss timeItNamed "write chunks to sqlite test" do withDB env $ transactional do forM_ bss $ \bs -> do let h = hashObject @HbSync bs & pretty & show insert [qc|insert into wtf (hash,val) values(?,?)|] (h,bs) timeItNamed "write chunks to log 2" do buf <- newIORef (mempty, 0 :: Int) fh <- openFile (path "lsm2") AppendMode forM_ bss $ \bs -> do let h = hashObject @HbSync bs & pretty & show num <- atomicModifyIORef buf (\(chunks,sz) -> ((serialise (h,bs) : chunks,sz+1),sz+1)) when (num >= 16) do w <- atomicModifyIORef buf (\(chunks,_) -> ((mempty,0),chunks)) LBS.hPut fh (mconcat w) (w,_) <- readIORef buf LBS.hPut fh (mconcat w) hClose fh let cn = length bss `div` 2 let chu = chunksOf cn bss timeItNamed "write chunks to compact-storage" do temp <- liftIO $ emptyTempFile "." "compact-storage" sto <- compactStorageOpen mempty temp w <- for chu $ \css -> do async do for_ css $ \bs -> do let h = hashObject @HbSync bs compactStoragePut sto (coerce h) (LBS.toStrict bs) mapM_ wait w compactStorageClose sto timeItNamed "write chunks to LSM-mock" do if n*s > 1073741824 then do print "too much" else do let k = 6 let batch = 100 rem <- newTVarIO n quit <- newTVarIO False out <- newTQueueIO queue <- newTQueueIO w1 <- async do fix \next -> do r <- readTVarIO rem when (r > 0) do b <- S.toList_ (randomByteStrings (min r batch) s) atomically $ do writeTQueue queue b modifyTVar rem (+ (-batch)) next atomically $ writeTVar quit True as <- forM [1..k] \j -> async do mem <- newIORef mempty -- mem <- newTVarIO mempty fix \next -> do b <- atomically $ do q <- readTVar quit if q then return Nothing else (Just <$> readTQueue queue) `orElse` ( readTVar quit >>= \z -> if z then pure Nothing else retry ) case b of Nothing -> pure () Just bss -> do new <- for bss $ \bs -> do let h = hashObject @HbSync bs & HashRef pure $ (h, bs) modifyIORef' mem (HashMap.union (HashMap.fromList new)) -- atomically $ modifyTVar mem (Map.union (Map.fromList new)) next co <- readIORef mem -- co <- readTVarIO mem atomically $ writeTQueue out co mapM_ wait (w1:as) result <- atomically $ flushTQueue out h <- openFile (path "lsm3") WriteMode LBS.hPut h (serialise result) hClose h