mirror of https://github.com/voidlizard/hbs2
NCQv1 storage removed
This commit is contained in:
parent
d04926ee05
commit
ed44770f55
|
@ -76,7 +76,6 @@ library
|
|||
HBS2.Storage.NCQ3.Internal.Flags
|
||||
HBS2.Storage.NCQ3.Internal.Fsync
|
||||
HBS2.Storage.NCQ3.Internal.CLI
|
||||
HBS2.Storage.NCQ
|
||||
HBS2.Storage.NCQ.Types
|
||||
-- other-modules:
|
||||
-- other-extensions:
|
||||
|
|
|
@ -1183,26 +1183,6 @@ executable test-scripts
|
|||
, zstd
|
||||
|
||||
|
||||
executable tcq
|
||||
import: shared-properties
|
||||
import: common-deps
|
||||
default-language: Haskell2010
|
||||
ghc-options:
|
||||
hs-source-dirs: test
|
||||
main-is: TCQ.hs
|
||||
build-depends:
|
||||
base, hbs2-core, hbs2-log-structured, hbs2-storage-ncq
|
||||
, network
|
||||
, string-conversions
|
||||
, db-pipe
|
||||
, suckless-conf
|
||||
, network-byte-order
|
||||
, text
|
||||
, time
|
||||
, mmap
|
||||
, zstd
|
||||
, unix
|
||||
|
||||
|
||||
executable test-ncq
|
||||
import: shared-properties
|
||||
|
|
|
@ -1,786 +0,0 @@
|
|||
{-# Language AllowAmbiguousTypes #-}
|
||||
{-# Language UndecidableInstances #-}
|
||||
{-# Language MultiWayIf #-}
|
||||
{-# Language RecordWildCards #-}
|
||||
{-# Language ViewPatterns #-}
|
||||
{-# OPTIONS_GHC -Wno-orphans #-}
|
||||
module Main where
|
||||
|
||||
import HBS2.Prelude.Plated
|
||||
import HBS2.OrDie
|
||||
import HBS2.Hash
|
||||
import HBS2.Data.Types.Refs
|
||||
import HBS2.Clock
|
||||
import HBS2.Merkle
|
||||
|
||||
import HBS2.Storage
|
||||
import HBS2.Storage.Simple
|
||||
import HBS2.Storage.Operations.ByteString
|
||||
import HBS2.Storage.Operations.Delete
|
||||
import HBS2.Net.Auth.Credentials
|
||||
import HBS2.Peer.Proto.RefLog
|
||||
import HBS2.Peer.Proto.LWWRef
|
||||
import HBS2.Data.Types.SignedBox
|
||||
|
||||
import HBS2.System.Logger.Simple.ANSI
|
||||
|
||||
import HBS2.Storage.NCQ
|
||||
import HBS2.Data.Log.Structured.SD
|
||||
import HBS2.Data.Log.Structured.NCQ
|
||||
|
||||
import HBS2.CLI.Run.Internal.Merkle
|
||||
|
||||
import Data.Config.Suckless.Syntax
|
||||
import Data.Config.Suckless.Script as SC
|
||||
import Data.Config.Suckless.System as SF
|
||||
import Data.Config.Suckless.Script.File as SF
|
||||
|
||||
import DBPipe.SQLite hiding (field)
|
||||
|
||||
import System.Random.MWC as MWC
|
||||
|
||||
import System.IO.Temp as Temp
|
||||
import Data.Bits
|
||||
import Data.ByteString (ByteString)
|
||||
import Data.ByteString qualified as BS
|
||||
import Data.ByteString.Lazy qualified as LBS
|
||||
import Data.Text.Encoding qualified as TE
|
||||
import Data.ByteString.Char8 qualified as BS8
|
||||
import Data.ByteString.Builder
|
||||
import Data.Maybe
|
||||
import Data.Word
|
||||
import Data.List qualified as List
|
||||
import Data.Vector qualified as V
|
||||
import Data.Vector ((!))
|
||||
import Control.Monad.Trans.Cont
|
||||
import Control.Monad.Trans.Maybe
|
||||
import Control.Monad.Except (runExceptT)
|
||||
import Network.ByteOrder qualified as N
|
||||
import Data.Coerce
|
||||
import Data.HashPSQ qualified as HPSQ
|
||||
import Data.HashSet qualified as HS
|
||||
import Data.HashMap.Strict (HashMap)
|
||||
import Data.HashMap.Strict qualified as HM
|
||||
import Data.IntMap qualified as IntMap
|
||||
import Data.IntMap (IntMap)
|
||||
import Data.Fixed
|
||||
import System.Environment
|
||||
import System.Directory
|
||||
import System.Posix.Fcntl
|
||||
import System.Posix.IO
|
||||
import System.IO.MMap
|
||||
import System.IO qualified as IO
|
||||
import System.Exit (exitSuccess, exitFailure)
|
||||
import System.Random
|
||||
import System.Random.Stateful
|
||||
import Safe
|
||||
import Lens.Micro.Platform
|
||||
import Control.Concurrent.STM qualified as STM
|
||||
import Data.Hashable
|
||||
|
||||
import UnliftIO
|
||||
import UnliftIO.Async
|
||||
|
||||
import Text.InterpolatedString.Perl6 (qc)
|
||||
|
||||
import Streaming.Prelude qualified as S
|
||||
import System.TimeIt
|
||||
|
||||
import System.IO.Unsafe (unsafePerformIO)
|
||||
|
||||
{- HLINT ignore "Functor law" -}
|
||||
|
||||
setupLogger :: MonadIO m => m ()
|
||||
setupLogger = do
|
||||
-- setLogging @DEBUG $ toStderr . logPrefix "[debug] "
|
||||
setLogging @ERROR $ toStderr . logPrefix "[error] "
|
||||
setLogging @WARN $ toStderr . logPrefix "[warn] "
|
||||
setLogging @NOTICE $ toStdout . logPrefix ""
|
||||
|
||||
flushLoggers :: MonadIO m => m ()
|
||||
flushLoggers = do
|
||||
silence
|
||||
|
||||
silence :: MonadIO m => m ()
|
||||
silence = do
|
||||
setLoggingOff @DEBUG
|
||||
setLoggingOff @ERROR
|
||||
setLoggingOff @WARN
|
||||
setLoggingOff @NOTICE
|
||||
setLoggingOff @TRACE
|
||||
|
||||
|
||||
data TCQError =
|
||||
TCQAlreadyOpen FilePath
|
||||
| TCQGone FilePath
|
||||
deriving stock (Show,Typeable)
|
||||
|
||||
instance Exception TCQError
|
||||
|
||||
newtype TCQ =
|
||||
TCQ FilePath
|
||||
deriving newtype (Eq,Ord,Show,Typeable)
|
||||
|
||||
|
||||
main :: IO ()
|
||||
main = do
|
||||
|
||||
instances <- newTVarIO (mempty :: HashMap FilePath (NCQStorage, Async ()))
|
||||
|
||||
tvd <- newTVarIO mempty
|
||||
|
||||
let runScript dict argz what = liftIO do
|
||||
script <- either (error.show) pure $ parseTop what
|
||||
runM dict do
|
||||
bindCliArgs argz
|
||||
void $ evalTop script
|
||||
|
||||
let finalizeStorages = do
|
||||
debug "finalize ncq"
|
||||
r <- readTVarIO instances <&> HM.toList
|
||||
mapM_ ncqStorageStop (fmap (fst.snd) r)
|
||||
mapM_ wait (fmap (snd.snd) r)
|
||||
|
||||
let getNCQ (TCQ p) = do
|
||||
readTVarIO instances
|
||||
<&> HM.lookup p
|
||||
<&> fmap fst
|
||||
>>= orThrow (TCQGone p)
|
||||
|
||||
let getTCQ (TCQ p) = do
|
||||
readTVarIO instances
|
||||
<&> HM.lookup p
|
||||
>>= orThrow (TCQGone p)
|
||||
|
||||
let dict = makeDict @C do
|
||||
|
||||
entry $ bindMatch "--help" $ nil_ \case
|
||||
HelpEntryBound what -> helpEntry what
|
||||
[StringLike s] -> helpList True (Just s)
|
||||
_ -> helpList True Nothing
|
||||
|
||||
|
||||
hidden $ internalEntries
|
||||
|
||||
SF.entries
|
||||
|
||||
entry $ bindMatch "#!" $ nil_ $ const none
|
||||
|
||||
entry $ bindMatch "stdin" $ nil_ $ \case
|
||||
argz -> do
|
||||
liftIO getContents >>= runScript dict argz
|
||||
|
||||
entry $ bindMatch "file" $ nil_ $ \case
|
||||
( StringLike fn : argz ) -> do
|
||||
liftIO (readFile fn) >>= runScript dict argz
|
||||
|
||||
e -> error (show $ pretty $ mkList e)
|
||||
|
||||
|
||||
entry $ bindMatch "debug" $ nil_ \case
|
||||
|
||||
[ LitBoolVal False ] -> do
|
||||
setLoggingOff @DEBUG
|
||||
|
||||
[ StringLike "off" ] -> do
|
||||
setLoggingOff @DEBUG
|
||||
|
||||
_ ->
|
||||
setLogging @DEBUG $ toStderr . logPrefix "[debug] "
|
||||
|
||||
entry $ bindMatch "ncq:open" $ \case
|
||||
[ StringLike path ] -> do
|
||||
debug $ "ncq:open" <+> pretty path
|
||||
ncq <- ncqStorageOpen path
|
||||
r <- async (ncqStorageRun ncq)
|
||||
|
||||
e <- atomically do
|
||||
already <- readTVar instances <&> HM.member path
|
||||
if already then
|
||||
pure $ Left $ TCQAlreadyOpen path
|
||||
else do
|
||||
modifyTVar instances (HM.insert path (ncq,r))
|
||||
pure $ Right ()
|
||||
|
||||
either throwIO pure e
|
||||
|
||||
mkOpaque (TCQ path)
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
|
||||
entry $ bindMatch "ncq:poke" $ \case
|
||||
[ isOpaqueOf @TCQ -> Just tcq ] -> lift do
|
||||
ncq <- getNCQ tcq
|
||||
pure $ mkSym "okay"
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "ncq:fossilize" $ nil_ \case
|
||||
[ isOpaqueOf @TCQ -> Just tcq ] -> lift do
|
||||
ncq <- getNCQ tcq
|
||||
ncqIndexRightNow ncq
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "ncq:merge:step" $ \syn -> lift do
|
||||
|
||||
tcq <- case syn of
|
||||
[ isOpaqueOf @TCQ -> Just tcq ] -> do
|
||||
pure tcq
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
ncq <- getNCQ tcq
|
||||
ncqStorageMergeStep ncq
|
||||
|
||||
pure nil
|
||||
|
||||
entry $ bindMatch "ncq:compact" $ \syn -> lift do
|
||||
|
||||
tcq <- case syn of
|
||||
[ isOpaqueOf @TCQ -> Just tcq ] -> do
|
||||
pure tcq
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
ncq <- getNCQ tcq
|
||||
ncqCompact ncq
|
||||
|
||||
pure nil
|
||||
|
||||
entry $ bindMatch "ncq:compact:scan" $ \syn -> lift do
|
||||
|
||||
tcq <- case syn of
|
||||
[ isOpaqueOf @TCQ -> Just tcq ] -> do
|
||||
pure tcq
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
ncq <- getNCQ tcq
|
||||
r <- ncqLinearScanForCompact ncq (\_ _ -> none)
|
||||
|
||||
pure $ mkInt r
|
||||
|
||||
entry $ bindMatch "ncq:merge" $ \syn -> lift do
|
||||
|
||||
tcq <- case syn of
|
||||
[ isOpaqueOf @TCQ -> Just tcq ] -> do
|
||||
pure tcq
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
ncq <- getNCQ tcq
|
||||
ncqStorageMerge ncq
|
||||
|
||||
pure nil
|
||||
|
||||
entry $ bindMatch "ncq:close" $ nil_ \case
|
||||
[ isOpaqueOf @TCQ -> Just tcq ] -> lift do
|
||||
ncq <- getNCQ tcq
|
||||
ncqStorageStop ncq
|
||||
|
||||
void $ runMaybeT do
|
||||
(s,r) <- readTVarIO instances
|
||||
<&> HM.lookup (coerce tcq)
|
||||
>>= toMPlus
|
||||
|
||||
wait r
|
||||
atomically $ modifyTVar instances (HM.delete (coerce tcq))
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
|
||||
entry $ bindMatch "ncq:fsck" $ nil_ \case
|
||||
[ StringLike fpath ] -> lift do
|
||||
issues <- ncqFsck fpath
|
||||
|
||||
for_ issues $ \i -> do
|
||||
err $ viaShow i
|
||||
|
||||
unless (List.null issues) exitFailure
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "ncq:cached:entries" $ \case
|
||||
[ isOpaqueOf @TCQ -> Just tcq ] -> lift do
|
||||
NCQStorage{..} <- getNCQ tcq
|
||||
readTVarIO ncqCachedEntries <&> mkInt
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "ncq:locate" $ \case
|
||||
[ isOpaqueOf @TCQ -> Just tcq, HashLike hash ] -> lift do
|
||||
ncq <- getNCQ tcq
|
||||
ncqLocate ncq hash >>= \case
|
||||
Just x -> do
|
||||
parseSyntax (show $ pretty x) & either (error.show) pure
|
||||
|
||||
_ -> pure nil
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "ncq:has" $ \case
|
||||
[ isOpaqueOf @TCQ -> Just tcq, HashLike hash ] -> lift do
|
||||
ncq <- getNCQ tcq
|
||||
ncqStorageHasBlock ncq hash <&> maybe nil mkInt
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
|
||||
entry $ bindMatch "ncq:del" $ nil_ \case
|
||||
[ isOpaqueOf @TCQ -> Just tcq, HashLike hash ] -> lift do
|
||||
ncq <- getNCQ tcq
|
||||
ncqStorageDel ncq hash
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "ncq:flush" $ nil_ \case
|
||||
[ isOpaqueOf @TCQ -> Just tcq ] -> lift do
|
||||
ncq <- getNCQ tcq
|
||||
ncqStorageFlush ncq
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "ncq:stop" $ nil_ \case
|
||||
[ isOpaqueOf @TCQ -> Just tcq ] -> lift do
|
||||
(ncq, w) <- getTCQ tcq
|
||||
ncqStorageStop ncq
|
||||
debug "wait storage to stop"
|
||||
wait w
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "ncq:set:ref" $ \case
|
||||
[ isOpaqueOf @TCQ -> Just tcq, HashLike ref , HashLike val ] -> lift do
|
||||
ncq <- getNCQ tcq
|
||||
ncqStorageSetRef ncq ref val
|
||||
pure nil
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "ncq:del:ref" $ \case
|
||||
[ isOpaqueOf @TCQ -> Just tcq , HashLike ref ] -> lift do
|
||||
ncq <- getNCQ tcq
|
||||
ncqStorageDelRef ncq ref
|
||||
pure nil
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "ncq:get:ref" $ \case
|
||||
[ isOpaqueOf @TCQ -> Just tcq, HashLike w ] -> lift do
|
||||
ncq <- getNCQ tcq
|
||||
ref <- ncqStorageGetRef ncq w
|
||||
debug $ "ref" <+> pretty w <+> pretty ref
|
||||
pure $ maybe nil (mkSym . show . pretty) ref
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "ncq:get:reflog" $ \case
|
||||
[ isOpaqueOf @TCQ -> Just tcq, SignPubKeyLike reflog ] -> lift do
|
||||
ncq <- getNCQ tcq
|
||||
let sto = AnyStorage ncq
|
||||
let ha = hashObject @HbSync (RefLogKey @HBS2Basic reflog)
|
||||
debug $ "refhash" <+> pretty ha
|
||||
ref <- getRef sto (RefLogKey @HBS2Basic reflog)
|
||||
pure $ maybe nil (mkSym . show . pretty) ref
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "ncq:get:lwwref" $ \case
|
||||
[ isOpaqueOf @TCQ -> Just tcq, SignPubKeyLike lww ] -> lift do
|
||||
ncq <- getNCQ tcq
|
||||
let sto = AnyStorage ncq
|
||||
val <- runMaybeT do
|
||||
rv <- getRef sto (LWWRefKey @HBS2Basic lww) >>= toMPlus
|
||||
getBlock sto rv >>= toMPlus
|
||||
<&> unboxSignedBox @(LWWRef 'HBS2Basic) @HBS2Basic
|
||||
>>= toMPlus
|
||||
<&> snd
|
||||
|
||||
pure $ maybe nil (mkSym . show . pretty) val
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "ncq:refhash" $ \case
|
||||
[ isOpaqueOf @TCQ -> Just tcq, HashLike w ] -> lift do
|
||||
ncq <- getNCQ tcq
|
||||
let rf = ncqRefHash ncq w
|
||||
pure $ mkSym ( show $ pretty $ rf )
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "ncq:hash" $ \case
|
||||
[ isOpaqueOf @ByteString -> Just bs ] -> lift do
|
||||
pure $ mkSym ( show $ pretty $ hashObject @HbSync bs )
|
||||
|
||||
[ StringLike s ] -> lift do
|
||||
pure $ mkSym ( show $ pretty $ hashObject @HbSync (BS8.pack s) )
|
||||
|
||||
e -> pure nil
|
||||
|
||||
entry $ bindMatch "ncq:get" $ \case
|
||||
[ isOpaqueOf @TCQ -> Just tcq, HashLike hash ] -> lift do
|
||||
ncq <- getNCQ tcq
|
||||
ncqStorageGetBlock ncq hash >>= maybe (pure nil) mkOpaque
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "ncq:put" $ \syn -> do
|
||||
(tcq,bs) <- case syn of
|
||||
[ isOpaqueOf @TCQ -> Just tcq, isOpaqueOf @ByteString -> Just bs ] -> lift do
|
||||
pure (tcq, LBS.fromStrict bs)
|
||||
|
||||
[ isOpaqueOf @TCQ -> Just tcq, TextLike s ] -> lift do
|
||||
pure (tcq, LBS.fromStrict (TE.encodeUtf8 s))
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
lift do
|
||||
ncq <- getNCQ tcq
|
||||
r <- ncqStoragePutBlock ncq bs
|
||||
pure $ maybe nil (mkSym . show . pretty) r
|
||||
|
||||
entry $ bindMatch "ncq:merkle:hashes" $ \case
|
||||
[ isOpaqueOf @TCQ -> Just tcq, HashLike h ] -> lift do
|
||||
ncq <- getNCQ tcq
|
||||
liftIO do
|
||||
let sto = AnyStorage ncq
|
||||
mkList <$> S.toList_ do
|
||||
walkMerkle (coerce h) (getBlock sto) $ \case
|
||||
Left{} -> throwIO MissedBlockError
|
||||
Right (hrr :: [HashRef]) -> do
|
||||
forM_ hrr $ \hx -> do
|
||||
S.yield (mkSym $ show $ pretty hx)
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
|
||||
entry $ bindMatch "sqlite:nwrite" $ nil_ \case
|
||||
[ LitIntVal tn', LitIntVal n ] -> lift do
|
||||
|
||||
let tn = fromIntegral tn'
|
||||
let num = fromIntegral n
|
||||
|
||||
|
||||
g <- liftIO MWC.createSystemRandom
|
||||
|
||||
for_ [1..tn] $ \tnn -> flip runContT pure do
|
||||
|
||||
let fnv = num `quot` tnn
|
||||
|
||||
mkdir "temp"
|
||||
|
||||
let tmp = "temp"
|
||||
|
||||
-- tmp <- ContT $ Temp.withTempDirectory "temp" "nwrite"
|
||||
|
||||
dbf <- liftIO $ Temp.emptyTempFile tmp "nwrite-.db"
|
||||
|
||||
db <- newDBPipeEnv dbPipeOptsDef dbf
|
||||
|
||||
pipe <- ContT $ withAsync (runPipe db)
|
||||
|
||||
tw <- newTVarIO 0
|
||||
|
||||
withDB db do
|
||||
ddl "create table if not exists block (hash blob not null primary key, value blob)"
|
||||
commitAll
|
||||
|
||||
withDB db do
|
||||
ddl [qc|
|
||||
pragma journal_mode=WAL;
|
||||
pragma synchronous=normal;
|
||||
|]
|
||||
|
||||
|
||||
t0 <- getTimeCoarse
|
||||
|
||||
ss <- replicateM num $ liftIO $ MWC.uniformRM (64*1024, 256*1024) g
|
||||
|
||||
liftIO $ pooledForConcurrentlyN_ tnn ss $ \size -> do
|
||||
lbs <- uniformByteStringM size g <&> LBS.fromStrict
|
||||
|
||||
let ha = hashObject @HbSync lbs
|
||||
|
||||
let sql = [qc|insert into block (hash, value) values(?,?) on conflict (hash) do nothing |]
|
||||
|
||||
withDB db do
|
||||
insert sql (coerce @_ @ByteString ha, lbs)
|
||||
atomically $ modifyTVar tw (+ (32 + size))
|
||||
|
||||
withDB db do
|
||||
commitAll
|
||||
|
||||
w <- readTVarIO tw
|
||||
t1 <- getTimeCoarse
|
||||
|
||||
let t = realToFrac (toNanoSecs (t1 - t0)) / 1e9
|
||||
let tsec = realToFrac @_ @(Fixed E2) t
|
||||
|
||||
let total = realToFrac w
|
||||
|
||||
let speed = if t > 0 then total / t else 0
|
||||
let totMegs = realToFrac @_ @(Fixed E2) $ total / (1024**2)
|
||||
let speedMbs = realToFrac @_ @(Fixed E2) $ speed / (1024**2)
|
||||
|
||||
notice $ pretty tnn <+> pretty (tsec) <+> pretty totMegs <+> pretty speedMbs
|
||||
|
||||
none
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
|
||||
entry $ bindMatch "sqlite:merkle:write" $ nil_ \case
|
||||
[ StringLike dbf, StringLike fname ] -> lift do
|
||||
db <- newDBPipeEnv dbPipeOptsDef dbf
|
||||
|
||||
|
||||
withDB db do
|
||||
ddl "create table if not exists block (hash blob not null primary key, value blob)"
|
||||
commitAll
|
||||
|
||||
withDB db do
|
||||
ddl [qc|
|
||||
pragma journal_mode=WAL;
|
||||
pragma synchronous=normal;
|
||||
|]
|
||||
|
||||
flip runContT pure do
|
||||
pipe <- ContT $ withAsync (runPipe db)
|
||||
|
||||
lbs <- liftIO $ LBS.readFile fname
|
||||
|
||||
chu <- S.toList_ (readChunkedBS lbs (256*1024))
|
||||
|
||||
let sql = [qc|insert into block (hash, value) values(?,?) on conflict (hash) do nothing |]
|
||||
|
||||
withDB db do
|
||||
hashes <- for chu $ \chunk -> do
|
||||
let ha = hashObject @HbSync chunk
|
||||
insert sql (coerce @_ @ByteString ha, chunk)
|
||||
pure ha
|
||||
|
||||
let pt = toPTree (MaxSize 1024) (MaxNum 256) hashes
|
||||
|
||||
m <- makeMerkle 0 pt $ \(ha,_,bss) -> do
|
||||
insert sql (coerce @_ @ByteString ha, bss)
|
||||
|
||||
withDB db do
|
||||
commitAll
|
||||
|
||||
pure $ mkSym @C (show $ pretty m)
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "ncq:merkle:write" $ \syn -> do
|
||||
(tcq,fname) <- case syn of
|
||||
[ isOpaqueOf @TCQ -> Just tcq, StringLike f ] -> lift do
|
||||
pure (tcq, f)
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
lift do
|
||||
ncq <- getNCQ tcq
|
||||
|
||||
lbs <- liftIO $ LBS.readFile fname
|
||||
|
||||
chu <- S.toList_ (readChunkedBS lbs (256*1024))
|
||||
hashes <- forConcurrently chu $ \chunk -> do
|
||||
ncqStoragePutBlock ncq chunk >>= orThrowUser "can't save"
|
||||
|
||||
-- FIXME: handle-hardcode
|
||||
let pt = toPTree (MaxSize 1024) (MaxNum 256) hashes -- FIXME: settings
|
||||
|
||||
m <- makeMerkle 0 pt $ \(_,_,bss) -> liftIO do
|
||||
void $ ncqStoragePutBlock ncq bss >>= orThrowUser "can't save"
|
||||
|
||||
pure $ mkSym (show $ pretty m)
|
||||
|
||||
|
||||
entry $ bindMatch "ncq:merkle:del" $ nil_ \syn -> do
|
||||
(sto,root) <- case syn of
|
||||
[ isOpaqueOf @TCQ -> Just tcq, HashLike root ] -> lift do
|
||||
|
||||
ncq <- AnyStorage <$> getNCQ tcq
|
||||
pure (ncq, root)
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
lift do
|
||||
deleteMerkleTree sto root
|
||||
|
||||
|
||||
entry $ bindMatch "ncq:merkle:read:stdout" $ nil_ \syn -> do
|
||||
(tcq,h) <- case syn of
|
||||
[ isOpaqueOf @TCQ -> Just tcq, HashLike f ] -> lift do
|
||||
pure (tcq, f)
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
lift do
|
||||
ncq <- getNCQ tcq
|
||||
|
||||
lbs <- runExceptT (getTreeContents (AnyStorage ncq) h)
|
||||
>>= orThrowPassIO
|
||||
|
||||
LBS.putStr lbs
|
||||
|
||||
|
||||
entry $ bindMatch "ncq:sd:scan:test" $ \case
|
||||
[StringLike fn] -> liftIO do
|
||||
|
||||
isDir <- SF.doesDirectoryExist fn
|
||||
|
||||
files <- if not isDir then
|
||||
pure [fn]
|
||||
else do
|
||||
S.toList_ do
|
||||
glob ["**/*.data"] [] fn $ \f -> do
|
||||
S.yield f
|
||||
pure True
|
||||
|
||||
|
||||
ttombs <- newTVarIO 0
|
||||
rrefs <- newTVarIO 0
|
||||
|
||||
for_ files $ \fp -> do
|
||||
|
||||
mmaped <- liftIO $ mmapFileByteString fp Nothing
|
||||
|
||||
runConsumeBS mmaped do
|
||||
readSections $ \size bs -> do
|
||||
let ssz = LBS.length bs
|
||||
let (_, rest1) = LBS.splitAt 32 bs
|
||||
let (prefix, _) = LBS.splitAt ncqPrefixLen rest1 & over _1 LBS.toStrict
|
||||
|
||||
if | prefix == ncqTombPrefix -> do
|
||||
atomically $ modifyTVar ttombs succ
|
||||
| prefix == ncqRefPrefix -> do
|
||||
atomically $ modifyTVar rrefs succ
|
||||
| otherwise -> none
|
||||
|
||||
r <- readTVarIO rrefs
|
||||
t <- readTVarIO ttombs
|
||||
|
||||
pure $ mkList [mkInt t, mkInt r]
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "ncq:scan:for-compact" $ nil_ \syn -> do
|
||||
ncq@NCQStorage{..} <- case syn of
|
||||
[ isOpaqueOf @TCQ -> Just tcq ] -> lift $ getNCQ tcq
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
ncqLinearScanForCompact ncq $ \fk h -> do
|
||||
notice $ "TO DELETE" <+> pretty fk <+> pretty h
|
||||
|
||||
entry $ bindMatch "ncq:nway:scan:cq:test" $ \case
|
||||
[StringLike fn] -> liftIO do
|
||||
|
||||
isDir <- SF.doesDirectoryExist fn
|
||||
|
||||
files <- if not isDir then
|
||||
pure [fn]
|
||||
else do
|
||||
S.toList_ do
|
||||
glob ["**/*.cq"] [] fn $ \f -> do
|
||||
S.yield f
|
||||
pure True
|
||||
|
||||
counters <- newTVarIO mempty
|
||||
|
||||
for_ files $ \f -> do
|
||||
|
||||
(mmaped,meta@NWayHash{..}) <- nwayHashMMapReadOnly f >>= orThrow (NWayHashInvalidMetaData f)
|
||||
|
||||
let emptyKey = BS.replicate nwayKeySize 0
|
||||
nwayHashScanAll meta mmaped $ \o k v -> do
|
||||
unless (k == emptyKey) do
|
||||
atomically do
|
||||
let k1 = hash k `mod` (2 ^ 17)
|
||||
modifyTVar counters (IntMap.insertWith (+) k1 1)
|
||||
|
||||
r <- readTVarIO counters <&> IntMap.size
|
||||
pure $ mkInt r
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
|
||||
entry $ bindMatch "ncq:nway:stats" $ \case
|
||||
[StringLike fn] -> liftIO do
|
||||
|
||||
mt_ <- newTVarIO 0
|
||||
total_ <- newTVarIO 0
|
||||
|
||||
(mmaped,meta@NWayHash{..}) <- nwayHashMMapReadOnly fn >>= orThrow (NWayHashInvalidMetaData fn)
|
||||
|
||||
let emptyKey = BS.replicate nwayKeySize 0
|
||||
nwayHashScanAll meta mmaped $ \o k v -> do
|
||||
atomically do
|
||||
modifyTVar total_ succ
|
||||
when (k == emptyKey) do
|
||||
modifyTVar mt_ succ
|
||||
|
||||
mt <- readTVarIO mt_
|
||||
total <- readTVarIO total_
|
||||
let used = total - mt
|
||||
|
||||
let ratio = realToFrac @_ @(Fixed E3) (realToFrac used / realToFrac total)
|
||||
|
||||
let stats = mkForm @C "stats" [ mkForm "empty" [mkInt mt]
|
||||
, mkForm "used" [mkInt used]
|
||||
, mkForm "total" [mkInt total]
|
||||
, mkForm "ratio" [mkDouble ratio]
|
||||
]
|
||||
|
||||
pure $ mkList [mkForm "metadata" [mkSyntax meta], stats]
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
|
||||
setupLogger
|
||||
|
||||
argz <- liftIO getArgs
|
||||
|
||||
forms <- parseTop (unlines $ unwords <$> splitForms argz)
|
||||
& either (error.show) pure
|
||||
|
||||
atomically $ writeTVar tvd dict
|
||||
|
||||
flip runContT pure do
|
||||
|
||||
ContT $ bracket none $ const do
|
||||
finalizeStorages
|
||||
flushLoggers
|
||||
|
||||
lift do
|
||||
case forms of
|
||||
|
||||
( cmd@(ListVal [StringLike "file", StringLike fn]) : _ ) -> do
|
||||
void $ run dict [cmd]
|
||||
|
||||
( cmd@(ListVal [StringLike "stdin"]) : _ ) -> do
|
||||
void $ run dict [cmd]
|
||||
|
||||
( cmd@(ListVal [StringLike "--help"]) : _ ) -> do
|
||||
void $ run dict [cmd]
|
||||
|
||||
[] -> do
|
||||
eof <- liftIO IO.isEOF
|
||||
if eof then
|
||||
void $ run dict [mkForm "help" []]
|
||||
else do
|
||||
what <- liftIO getContents
|
||||
>>= either (error.show) pure . parseTop
|
||||
|
||||
run dict what >>= eatNil display
|
||||
|
||||
e -> void $ run dict e
|
||||
|
||||
-- (runEval tvd forms >>= eatNil display)
|
||||
-- `finally` (finalizeStorages >> flushLoggers)
|
||||
|
||||
|
||||
|
|
@ -1,560 +1,19 @@
|
|||
{-# Language AllowAmbiguousTypes #-}
|
||||
{-# Language UndecidableInstances #-}
|
||||
{-# Language MultiWayIf #-}
|
||||
{-# Language RecordWildCards #-}
|
||||
{-# Language ViewPatterns #-}
|
||||
{-# OPTIONS_GHC -Wno-orphans #-}
|
||||
module Main where
|
||||
|
||||
import HBS2.Prelude.Plated
|
||||
import HBS2.OrDie
|
||||
import HBS2.Hash
|
||||
import HBS2.Data.Types.Refs
|
||||
import HBS2.Misc.PrettyStuff
|
||||
import HBS2.Clock
|
||||
import HBS2.Merkle
|
||||
import HBS2.Polling
|
||||
|
||||
import HBS2.Storage
|
||||
import HBS2.Storage.Simple
|
||||
import HBS2.Storage.Operations.ByteString
|
||||
|
||||
import HBS2.System.Logger.Simple.ANSI
|
||||
|
||||
import HBS2.Data.Log.Structured.SD
|
||||
import HBS2.Storage.NCQ
|
||||
import HBS2.Data.Log.Structured.NCQ
|
||||
|
||||
import HBS2.CLI.Run.Internal.Merkle
|
||||
|
||||
import Data.Config.Suckless.Syntax
|
||||
import Data.Config.Suckless.Script as SC
|
||||
import Data.Config.Suckless.System
|
||||
|
||||
import NCQTestCommon
|
||||
import NCQ3
|
||||
|
||||
import DBPipe.SQLite hiding (field)
|
||||
|
||||
import Codec.Compression.Zstd qualified as Zstd
|
||||
|
||||
import System.Posix.Files qualified as PFS
|
||||
import Numeric (showHex)
|
||||
import Data.Ord (Down(..))
|
||||
import Data.Char
|
||||
import Data.Bits
|
||||
import Data.ByteString (ByteString)
|
||||
import Data.ByteString qualified as BS
|
||||
import Data.ByteString.Lazy qualified as LBS
|
||||
import Data.Text.Encoding qualified as TE
|
||||
import Data.ByteString.Char8 qualified as BS8
|
||||
import Data.ByteString.Builder
|
||||
import Data.Hashable (hash)
|
||||
import Data.Maybe
|
||||
import Data.Either
|
||||
import Data.Word
|
||||
import Data.List qualified as List
|
||||
import Data.Vector qualified as V
|
||||
import Data.Vector ((!))
|
||||
import Control.Monad.Trans.Cont
|
||||
import Control.Monad.Trans.Maybe
|
||||
import Control.Monad.Except (runExceptT)
|
||||
import Network.ByteOrder qualified as N
|
||||
import Data.Coerce
|
||||
import Data.HashPSQ qualified as HPSQ
|
||||
import Data.HashSet qualified as HS
|
||||
import Data.HashSet (HashSet)
|
||||
import Data.HashMap.Strict (HashMap)
|
||||
import Data.HashMap.Strict qualified as HM
|
||||
import Data.IntMap qualified as IntMap
|
||||
import Data.IntMap (IntMap)
|
||||
import Data.IntSet (IntSet)
|
||||
import Data.IntSet qualified as IntSet
|
||||
import Data.Fixed
|
||||
import System.Environment
|
||||
import System.FilePath.Posix
|
||||
import System.Directory
|
||||
import System.Posix.Fcntl
|
||||
import System.Posix.IO
|
||||
import System.IO.MMap
|
||||
import System.IO qualified as IO
|
||||
import System.Exit (exitSuccess, exitFailure)
|
||||
import System.Random
|
||||
import System.Random.MWC as MWC
|
||||
import System.Random.Stateful
|
||||
import System.Random.Shuffle (shuffleM)
|
||||
import Safe
|
||||
import Lens.Micro.Platform
|
||||
import Control.Concurrent.STM qualified as STM
|
||||
import System.IO.Temp qualified as Temp
|
||||
import System.Mem
|
||||
|
||||
import UnliftIO
|
||||
import UnliftIO.Async
|
||||
|
||||
import Test.Tasty.HUnit
|
||||
import Text.InterpolatedString.Perl6 (qc)
|
||||
|
||||
import Streaming.Prelude qualified as S
|
||||
import System.TimeIt
|
||||
|
||||
import System.IO.Unsafe (unsafePerformIO)
|
||||
|
||||
import Data.BloomFilter.Easy as Bloom
|
||||
|
||||
{- HLINT ignore "Functor law" -}
|
||||
|
||||
|
||||
|
||||
|
||||
testNCQFuckupRecovery1 :: MonadUnliftIO m
|
||||
=> TestEnv
|
||||
-> m ()
|
||||
|
||||
testNCQFuckupRecovery1 TestEnv{..} = flip runContT pure do
|
||||
|
||||
let ncqDir = testEnvDir </> "ncq"
|
||||
|
||||
(cur,ha,h0) <- lift $ withNCQ id ncqDir $ \ncq -> do
|
||||
let sto = AnyStorage ncq
|
||||
|
||||
source <- LBS.take (100 * 1024^2) <$> liftIO (LBS.readFile "/dev/urandom")
|
||||
|
||||
let h0 = hashObject @HbSync source
|
||||
|
||||
hash <- runExceptT (writeAsMerkle sto source <&> HashRef)
|
||||
>>= orThrowPassIO @_ @SomeException
|
||||
|
||||
notice $ "stored" <+> pretty hash <+> pretty (LBS.length source)
|
||||
|
||||
pure (ncqGetCurrentName ncq, hash, h0)
|
||||
|
||||
liftIO do
|
||||
ss <- randomRIO (1, 32*1024)
|
||||
shit <- LBS.take ss <$> LBS.readFile "/dev/urandom"
|
||||
BS.appendFile cur (LBS.toStrict shit)
|
||||
newSize <- getFileSize cur
|
||||
notice $ "CURRENT-FILE" <+> pretty cur <+> "successfully corrupted" <+> pretty newSize
|
||||
|
||||
notice $ "CURRENT-FILE" <+> pretty cur
|
||||
|
||||
lift $ withNCQ id ncqDir $ \ncq -> do
|
||||
notice $ "REOPEN STORAGE"
|
||||
let sto = AnyStorage ncq
|
||||
|
||||
lbs <- runExceptT (getTreeContents sto ha)
|
||||
>>= orThrowPassIO
|
||||
|
||||
let h1 = hashObject @HbSync lbs
|
||||
|
||||
when (h0 /= h1) do
|
||||
error "corrupted state"
|
||||
|
||||
notice $ "loaded" <+> pretty ha <+> pretty (LBS.length lbs)
|
||||
|
||||
|
||||
|
||||
testNCQLongWrite :: MonadUnliftIO m => Int -> TestEnv -> m ()
|
||||
testNCQLongWrite n TestEnv{..} = flip runContT pure do
|
||||
let ncqDir = testEnvDir </> "ncq-simple"
|
||||
|
||||
-- Step 1: Write block
|
||||
lift $ withNCQ id ncqDir $ \ncq -> liftIO do
|
||||
let sto = AnyStorage ncq
|
||||
replicateM_ n do
|
||||
size <- randomRIO (1, 256*1024)
|
||||
let payload = LBS.replicate size 0x41 -- 0x41 = 'A'
|
||||
h <- putBlock sto payload
|
||||
assertBool "block written" (isJust h)
|
||||
|
||||
|
||||
testNCQLongWriteRead :: MonadUnliftIO m => Int -> TestEnv -> m ()
|
||||
testNCQLongWriteRead n TestEnv{..} = flip runContT pure do
|
||||
let ncqDir = testEnvDir </> "ncq-simple"
|
||||
|
||||
wq <- newTQueueIO
|
||||
|
||||
-- Step 1: Write block
|
||||
lift $ withNCQ id ncqDir $ \ncq -> liftIO do
|
||||
let sto = AnyStorage ncq
|
||||
replicateM_ n do
|
||||
size <- randomRIO (1, 256*1024)
|
||||
let payload = LBS.replicate size 0x41 -- 0x41 = 'A'
|
||||
h <- putBlock sto payload
|
||||
assertBool "block written" (isJust h)
|
||||
for_ h $ \hhh -> do
|
||||
atomically $ writeTQueue wq (HashRef hhh)
|
||||
|
||||
r <- atomically $ STM.flushTQueue wq
|
||||
|
||||
for_ r $ \h -> do
|
||||
s <- ncqLocate ncq h
|
||||
assertBool "actually written" (isJust s)
|
||||
|
||||
testNCQSimple1 :: MonadUnliftIO m => TestEnv -> m ()
|
||||
testNCQSimple1 TestEnv{..} = flip runContT pure do
|
||||
let ncqDir = testEnvDir </> "ncq-simple"
|
||||
|
||||
for_ [ 0 .. 18 ] $ \s -> do
|
||||
let size = 2 ^ s
|
||||
let payload = LBS.replicate size 0x41 -- 0x41 = 'A'
|
||||
let expectedHash = hashObject @HbSync payload
|
||||
|
||||
-- Step 1: Write block
|
||||
lift $ withNCQ id ncqDir $ \ncq -> do
|
||||
let sto = AnyStorage ncq
|
||||
h <- putBlock sto payload `orDie` "failed to write block"
|
||||
liftIO $ assertBool "hashes match (write)" (h == expectedHash)
|
||||
|
||||
-- Step 2: Read back
|
||||
lift $ withNCQ id ncqDir $ \ncq -> do
|
||||
let sto = AnyStorage ncq
|
||||
blk <- getBlock sto (coerce expectedHash) `orDie` "block not found"
|
||||
sx <- hasBlock sto (coerce expectedHash)
|
||||
|
||||
loc <- ncqLocate ncq (coerce expectedHash)
|
||||
>>= orThrowUser "not found"
|
||||
|
||||
blk0 <- ncqStorageGet_ ncq loc
|
||||
|
||||
let sblk0 = LBS.length <$> blk0
|
||||
|
||||
liftIO $ print $ "block size"
|
||||
<+> pretty sx
|
||||
<+> ";"
|
||||
<+> pretty (LBS.length blk)
|
||||
<+> ";"
|
||||
<+> pretty size
|
||||
<+> ";"
|
||||
<+> pretty sblk0
|
||||
<+> pretty loc
|
||||
|
||||
liftIO $ do
|
||||
assertBool "block has correct length" (LBS.length blk == size)
|
||||
assertBool "block contents are correct" (blk == payload)
|
||||
|
||||
|
||||
testNCQSimple2 :: MonadUnliftIO m => Int -> TestEnv -> m ()
|
||||
testNCQSimple2 n TestEnv{..} = flip runContT pure do
|
||||
let ncqDir = testEnvDir </> "ncq-simple2"
|
||||
|
||||
let alph_ = V.fromList ['A' .. 'z']
|
||||
cnt <- newTVarIO 0
|
||||
|
||||
let alphx = liftIO do
|
||||
i <- atomically $ stateTVar cnt (\x -> (x, succ x))
|
||||
pure $ alph_ ! ( i `mod` V.length alph_)
|
||||
|
||||
-- Step 1: write N blocks
|
||||
hashes <- lift $ withNCQ id ncqDir $ \ncq -> do
|
||||
let sto = AnyStorage ncq
|
||||
replicateM n do
|
||||
size <- liftIO $ randomRIO (0, 256 * 1024)
|
||||
chr <- alphx
|
||||
let payload = LBS.replicate size (fromIntegral $ ord chr)
|
||||
let h = hashObject @HbSync payload
|
||||
h' <- putBlock sto payload `orDie` "putBlock failed"
|
||||
loc <- ncqLocate ncq (coerce h)
|
||||
s <- hasBlock sto h
|
||||
w <- getBlock sto h
|
||||
let w' = fromMaybe mempty w
|
||||
|
||||
if w == Just payload then do
|
||||
debug $ "okay" <+> pretty loc
|
||||
else do
|
||||
err $ pretty s <> "/" <> pretty size
|
||||
<+> viaShow (LBS.take 48 w')
|
||||
<+> ".."
|
||||
<+> viaShow (LBS.take 8 $ LBS.reverse w')
|
||||
<> line
|
||||
<+> pretty loc
|
||||
|
||||
error "ABORTED"
|
||||
|
||||
liftIO $ assertBool "hash matches" (h == h')
|
||||
pure (h, size, payload)
|
||||
|
||||
let testRead ncq = do
|
||||
let sto = AnyStorage ncq
|
||||
forM_ hashes $ \(h, expectedSize, expectedPayload) -> do
|
||||
loc <- ncqLocate ncq (coerce h) >>= orThrowUser "not found"
|
||||
blk <- getBlock sto (coerce h) `orDie` "block not found"
|
||||
sx <- hasBlock sto (coerce h)
|
||||
blk0 <- ncqStorageGet_ ncq loc
|
||||
let sblk0 = LBS.length <$> blk0
|
||||
let actualSize = LBS.length blk
|
||||
|
||||
debug $ "block size"
|
||||
<+> pretty sx
|
||||
<+> ";"
|
||||
<+> pretty actualSize
|
||||
<+> ";"
|
||||
<+> pretty expectedSize
|
||||
<+> ";"
|
||||
<+> pretty sblk0
|
||||
<+> pretty loc
|
||||
|
||||
liftIO do
|
||||
assertBool "size match" (actualSize == expectedSize)
|
||||
assertBool "payload match" (blk == expectedPayload)
|
||||
|
||||
-- Step 2: reopen and verify
|
||||
lift $ withNCQ id ncqDir $ \ncq -> do
|
||||
testRead ncq
|
||||
-- ncqIndexRightNow ncq
|
||||
pause @'Seconds 2
|
||||
|
||||
liftIO $ print $ "LAST PASS"
|
||||
-- Step 3: reopen and verify - fossil
|
||||
lift $ withNCQ id ncqDir $ \ncq -> do
|
||||
testRead ncq
|
||||
|
||||
testNCQ1 :: MonadUnliftIO m
|
||||
=> Int
|
||||
-> TestEnv
|
||||
-> m ()
|
||||
|
||||
testNCQ1 n TestEnv{..} = flip runContT pure $ callCC \stop -> do
|
||||
|
||||
let tmp = testEnvDir
|
||||
|
||||
let inputDir = tmp </> "input"
|
||||
let ncqDir = tmp </> "ncq-test-data"
|
||||
|
||||
for_ [inputDir] mkdir
|
||||
|
||||
twritten <- newTVarIO (mempty :: HashSet HashRef)
|
||||
|
||||
nSize <- newTVarIO 0
|
||||
|
||||
tssQ <- newTQueueIO
|
||||
|
||||
forM_ [1..n] $ \i -> liftIO do
|
||||
withBinaryFile "/dev/urandom" ReadMode \urandom -> do
|
||||
let fname = inputDir </> show i <> ".bin"
|
||||
size <- randomRIO (1, 256*1024)
|
||||
atomically $ modifyTVar' nSize (+size)
|
||||
file <- BS.copy <$> BS.hGetSome urandom size
|
||||
BS.writeFile fname file
|
||||
let !ha = hashObject @HbSync file
|
||||
let !len = fromIntegral $ BS.length file
|
||||
-- atomically $ writeTQueue tssQ (fname, (ha, fromIntegral $! BS.length file))
|
||||
-- l <- getFileSize fname
|
||||
-- atomically $ writeTQueue tssQ (fname, (ha, l))
|
||||
atomically $ writeTQueue tssQ (fname, (ha, len))
|
||||
-- performGC
|
||||
|
||||
fss <- atomically (STM.flushTQueue tssQ)
|
||||
|
||||
stop ()
|
||||
|
||||
liftIO do
|
||||
withNCQ id ncqDir $ \ncq -> flip runContT pure do
|
||||
|
||||
let sto = AnyStorage ncq
|
||||
let fileMap = HM.fromList [ (ha,(s,fn)) | (fn,(ha,s)) <- fss ]
|
||||
|
||||
let
|
||||
written :: forall m a . (Fractional a, MonadIO m) => m [(HashRef, a)]
|
||||
written = readTVarIO twritten <&> HS.toList <&> fmap (,0.1)
|
||||
|
||||
ContT $ withAsync $ forever do
|
||||
polling (Polling 0.25 0.25) written $ \(HashRef hz) -> liftIO do
|
||||
what <- getBlock sto hz >>= orThrowUser ("block not found" <+> pretty hz)
|
||||
let h2 = hashObject @HbSync what
|
||||
|
||||
(s,_) <- HM.lookup hz fileMap & orThrowUser "fileMap entry missed"
|
||||
|
||||
ssz <- hasBlock sto hz
|
||||
>>= orThrowUser ("block size not found" <+> pretty hz)
|
||||
|
||||
when (ssz /= s) do
|
||||
error $ show $ "size mismatch" <+> pretty hz
|
||||
|
||||
when (hz /= h2) do
|
||||
error $ show $ pretty "hash does not match" <+> pretty hz <+> pretty s
|
||||
|
||||
liftIO $ forConcurrently_ fss $ \(fn, (ha,s)) -> do
|
||||
co <- liftIO (BS.readFile fn) <&> LBS.fromStrict
|
||||
h1 <- putBlock sto co >>= orThrowUser "block not written"
|
||||
lbs2 <- getBlock sto ha >>= orThrowUser "block not found"
|
||||
let h2 = hashObject @HbSync lbs2
|
||||
|
||||
when (ha /= h2 || h1 /= ha) do
|
||||
error $ show $ pretty "hash does not match" <+> pretty h1 <+> pretty s
|
||||
|
||||
atomically $ modifyTVar twritten (HS.insert (HashRef h1))
|
||||
|
||||
debug $ "putBlock" <+> pretty ha <+> pretty h2
|
||||
|
||||
liftIO $ forConcurrently_ fss $ \(fn, (ha,s)) -> do
|
||||
lbs2 <- getBlock sto ha >>= orThrowUser "block not found"
|
||||
let h2 = hashObject @HbSync lbs2
|
||||
|
||||
when (ha /= h2) do
|
||||
error $ show $ pretty "hash does not match" <+> pretty ha <+> pretty s
|
||||
|
||||
debug $ "getBlock" <+> pretty ha <+> pretty h2
|
||||
|
||||
liftIO do
|
||||
withNCQ id ncqDir $ \ncq -> flip runContT pure do
|
||||
|
||||
let sto = AnyStorage ncq
|
||||
|
||||
for_ fss $ \(fn, (ha,s)) -> do
|
||||
lbs2 <- getBlock sto ha >>= orThrowUser "block not found"
|
||||
let h2 = hashObject @HbSync lbs2
|
||||
|
||||
when (ha /= h2) do
|
||||
error $ show $ pretty "hash does not match" <+> pretty ha <+> pretty s
|
||||
|
||||
debug $ "getBlock" <+> pretty ha <+> pretty h2
|
||||
|
||||
|
||||
testNCQTree1 :: MonadUnliftIO m
|
||||
=> Int
|
||||
-> TestEnv
|
||||
-> m ()
|
||||
|
||||
testNCQTree1 n TestEnv{..} = flip runContT pure do
|
||||
|
||||
let size = 1024 * 1024 * fromIntegral n
|
||||
|
||||
let tmp = testEnvDir
|
||||
|
||||
let inputDir = tmp </> "input"
|
||||
let ncqDir = tmp </> "ncq-test-data"
|
||||
|
||||
treeLbs <- LBS.take size <$> liftIO (LBS.readFile ("/dev/urandom"))
|
||||
|
||||
let h1 = hashObject @HbSync treeLbs
|
||||
|
||||
lift $ withNCQ id ncqDir $ \ncq1 -> do
|
||||
|
||||
let sto = AnyStorage ncq1
|
||||
|
||||
r <- createTreeWithMetadata sto Nothing mempty treeLbs
|
||||
>>= orThrowPassIO
|
||||
|
||||
lbs2 <- runExceptT (getTreeContents sto r)
|
||||
>>= orThrowPassIO
|
||||
|
||||
let h2 = hashObject @HbSync lbs2
|
||||
|
||||
|
||||
let l1 = LBS.length treeLbs
|
||||
let l2 = LBS.length treeLbs
|
||||
display (mkList @C [mkSym r, mkSym h1, mkSym h2, mkInt l1, mkInt l2])
|
||||
|
||||
liftIO $ assertBool "hashes equal" (h1 == h2)
|
||||
|
||||
-- display (mkSym @C $ show $ pretty r)
|
||||
|
||||
testNCQRefs1 :: MonadUnliftIO m
|
||||
=> Int
|
||||
-> TestEnv
|
||||
-> m ()
|
||||
|
||||
testNCQRefs1 n TestEnv{..} = flip runContT pure do
|
||||
|
||||
let tmp = testEnvDir
|
||||
|
||||
let ncqDir = tmp </> "ncq-test-data"
|
||||
|
||||
refs <- liftIO $ replicateM n $ do
|
||||
ref <- SomeRefKey <$> randomIO @Word64
|
||||
val <- randomIO @Word64 <&> hashObject . serialise
|
||||
pure (ref, val)
|
||||
|
||||
lift $ withNCQ id ncqDir $ \ncq -> do
|
||||
let sto = AnyStorage ncq
|
||||
|
||||
for_ refs $ \(k,v) -> do
|
||||
updateRef sto k v
|
||||
|
||||
for_ refs $ \(k,v0) -> liftIO do
|
||||
v1 <- getRef sto k
|
||||
assertBool "refs equal 1" (Just v0 == v1)
|
||||
|
||||
notice $ "all" <+> pretty n <+> "refs found"
|
||||
|
||||
debug "restart storage"
|
||||
|
||||
lift $ withNCQ id ncqDir $ \ncq -> do
|
||||
let sto = AnyStorage ncq
|
||||
|
||||
for_ refs $ \(k,v0) -> liftIO do
|
||||
v1 <- getRef sto k
|
||||
assertBool "refs equal 2" (Just v0 == v1)
|
||||
delRef sto k
|
||||
|
||||
notice $ "all" <+> pretty n <+> "refs found after restart"
|
||||
|
||||
for_ refs $ \(k,_) -> liftIO do
|
||||
v1 <- getRef sto k
|
||||
assertBool "ref deleted" (isNothing v1)
|
||||
|
||||
notice $ "all" <+> pretty n <+> "refs deleted"
|
||||
|
||||
|
||||
testNCQConcurrent1 :: MonadUnliftIO m
|
||||
=> Bool
|
||||
-> Int
|
||||
-> Int
|
||||
-> TestEnv
|
||||
-> m ()
|
||||
|
||||
testNCQConcurrent1 noRead tn n TestEnv{..} = flip runContT pure do
|
||||
|
||||
let tmp = testEnvDir
|
||||
let inputDir = tmp </> "input"
|
||||
let ncqDir = tmp </> "ncq-test-data"
|
||||
|
||||
debug "preparing"
|
||||
|
||||
mkdir inputDir
|
||||
|
||||
debug $ pretty inputDir
|
||||
|
||||
filez <- liftIO $ pooledReplicateConcurrentlyN 8 n $ do
|
||||
size <- randomRIO (64*1024, 256*1024)
|
||||
w <- liftIO (randomIO :: IO Word8)
|
||||
let tbs = BS.replicate size w -- replicateM size w <&> BS.pack
|
||||
let ha = hashObject @HbSync tbs -- & show . pretty
|
||||
let fn = inputDir </> show (pretty ha)
|
||||
liftIO $ BS.writeFile fn tbs
|
||||
pure (fn, ha, BS.length tbs)
|
||||
|
||||
debug "done"
|
||||
|
||||
let fnv = V.fromList filez
|
||||
let ssz = sum [ s | (_,_,s) <- filez ] & realToFrac
|
||||
|
||||
setLoggingOff @DEBUG
|
||||
|
||||
for_ [1 .. tn] $ \tnn -> do
|
||||
|
||||
(t,_) <- timeItT $ liftIO $ withNCQ id ncqDir $ \ncq1 -> do
|
||||
|
||||
pooledForConcurrentlyN_ tnn fnv $ \(n,ha,_) -> do
|
||||
co <- BS.readFile n <&> LBS.fromStrict
|
||||
putBlock ncq1 co
|
||||
|
||||
pooledReplicateConcurrentlyN_ tnn (10 * V.length fnv) do
|
||||
unless noRead do
|
||||
i <- randomRIO (0, V.length fnv - 1)
|
||||
let (n,ha,_) = fnv ! i
|
||||
sz <- getBlock ncq1 ha
|
||||
none
|
||||
|
||||
let tt = realToFrac @_ @(Fixed E2) t
|
||||
let speed = ((ssz / (1024 **2)) / t) & realToFrac @_ @(Fixed E2)
|
||||
notice $ pretty tnn <+> pretty tt <+> pretty speed
|
||||
|
||||
rm ncqDir
|
||||
|
||||
|
||||
|
||||
main :: IO ()
|
||||
main = do
|
||||
|
||||
|
|
Loading…
Reference in New Issue