diff --git a/Makefile b/Makefile index c3c045d0..cec88f99 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,7 @@ BINS := \ fixme-new \ hbs2-git3 \ git-remote-hbs23 \ - + hbs2-ncq \ RT_DIR := tests/RT diff --git a/cabal.project b/cabal.project index 8bc432f9..d4f96be6 100644 --- a/cabal.project +++ b/cabal.project @@ -14,4 +14,6 @@ debug-info: True -- executable-static: True -- profiling: True --library-profiling: False +debug-info: True + diff --git a/hbs2-cli/lib/HBS2/CLI/Run/Peer.hs b/hbs2-cli/lib/HBS2/CLI/Run/Peer.hs index 9b98e5ac..2d14f551 100644 --- a/hbs2-cli/lib/HBS2/CLI/Run/Peer.hs +++ b/hbs2-cli/lib/HBS2/CLI/Run/Peer.hs @@ -84,12 +84,12 @@ peerEntries = do [isOpaqueOf @LBS.ByteString -> Just lbs] -> do sto <- getStorage - (putBlock sto lbs <&> fmap (mkStr . show . pretty . HashRef) ) + (putBlock sto lbs <&> fmap (mkSym . show . pretty . HashRef) ) >>= orThrowUser "storage error" [isOpaqueOf @BS.ByteString -> Just bs] -> do sto <- getStorage - (putBlock sto (LBS.fromStrict bs) <&> fmap (mkStr . show . pretty . HashRef) ) + (putBlock sto (LBS.fromStrict bs) <&> fmap (mkSym . show . pretty . HashRef) ) >>= orThrowUser "storage error" -- FIXME: deprecate-this @@ -103,7 +103,14 @@ peerEntries = do sto <- getStorage lift $ putTextLit sto s - _ -> throwIO $ BadFormException @c nil + [] -> do + bs <- liftIO BS.getContents + sto <- getStorage + putBlock sto (LBS.fromStrict bs) >>= \case + Nothing -> pure nil + Just h -> pure $ mkSym (show $ pretty $ HashRef h) + + e -> throwIO $ BadFormException @c (mkList e) brief "checks if peer available" $ noArgs diff --git a/hbs2-git3/hbs2-git3.cabal b/hbs2-git3/hbs2-git3.cabal index 64cb1b51..4a83ee2b 100644 --- a/hbs2-git3/hbs2-git3.cabal +++ b/hbs2-git3/hbs2-git3.cabal @@ -56,6 +56,7 @@ common shared-properties build-depends: hbs2-core + , hbs2-log-structured , hbs2-peer , hbs2-storage-simple , hbs2-keyman-direct-lib diff --git a/hbs2-git3/lib/HBS2/Data/Log/Structured.hs b/hbs2-git3/lib/HBS2/Data/Log/Structured.hs index 3640bda9..29dc3542 100644 --- a/hbs2-git3/lib/HBS2/Data/Log/Structured.hs +++ b/hbs2-git3/lib/HBS2/Data/Log/Structured.hs @@ -1,256 +1,7 @@ -module HBS2.Data.Log.Structured where - -import HBS2.Prelude.Plated -import HBS2.OrDie - -import Network.ByteOrder qualified as N -import Data.ByteString.Builder qualified as B -import Data.ByteString.Lazy (ByteString) -import Data.ByteString.Lazy qualified as LBS -import Data.ByteString qualified as BS -import Data.Maybe -import Network.ByteOrder hiding (ByteString) -import Control.Monad.State - -import Codec.Compression.Zstd qualified as Zstd -import Codec.Compression.Zstd.Streaming qualified as Zstd -import Codec.Compression.Zstd.Streaming (Result(..)) - -import Control.Exception -import Control.Monad.Trans.Maybe -import Lens.Micro.Platform - --- import UnliftIO - -class ReadLogOpts a where - -data ReadLogError = SomeReadLogError - deriving stock (Typeable, Show) - -instance Exception ReadLogError - -instance ReadLogOpts () - -type NumBytes = Int - -class Monad m => BytesReader m where - noBytesLeft :: m Bool - readBytes :: NumBytes -> m ByteString - - readBytesMaybe :: NumBytes -> m (Maybe ByteString) - readBytesMaybe n = do - bs <- readBytes n - if LBS.length bs == fromIntegral n then pure (Just bs) else pure Nothing - -newtype ConsumeLBS m a = ConsumeLBS { fromConsumeLBS :: StateT ByteString m a } - deriving newtype ( Applicative - , Functor - , Monad - , MonadState ByteString - , MonadIO - , MonadTrans - ) - -readChunkThrow :: MonadIO m => Int -> ConsumeLBS m ByteString -readChunkThrow 
n = do - lbs <- get - let (this, that) = LBS.splitAt (fromIntegral n) lbs - if LBS.length this /= fromIntegral n then - liftIO $ throwIO SomeReadLogError - else do - put $! that - pure this - -readChunkSimple :: Monad m => Int -> ConsumeLBS m ByteString -readChunkSimple n = do - lbs <- get - let (this, that) = LBS.splitAt (fromIntegral n) lbs - put $! that - pure this - -reminds :: Monad m => ConsumeLBS m Int -reminds = gets (fromIntegral . LBS.length) - -consumed :: Monad m => ConsumeLBS m Bool -consumed = gets LBS.null - -runConsumeLBS :: Monad m => ByteString -> ConsumeLBS m a -> m a -runConsumeLBS s m = evalStateT (fromConsumeLBS m) s - -newtype ConsumeBS m a = ConsumeBS { fromConsumeBS :: StateT BS.ByteString m a } - deriving newtype ( Applicative - , Functor - , Monad - , MonadState BS.ByteString - , MonadIO - , MonadTrans - ) - - -instance Monad m => BytesReader (ConsumeLBS m) where - readBytes = readChunkSimple - noBytesLeft = consumed - -instance Monad m => BytesReader (ConsumeBS m) where - noBytesLeft = gets BS.null - readBytes n = do - s <- get - let (a,b) = BS.splitAt n s - put $! b - pure (LBS.fromStrict a) - -{- HLINT ignore "Eta reduce"-} -toSectionList :: BS.ByteString -> [BS.ByteString] -toSectionList source = go source +module HBS2.Data.Log.Structured + ( module Exported + ) where - go bs | BS.length bs < 4 = [] - | otherwise = go1 (BS.splitAt 4 bs & over _1 (fromIntegral . N.word32)) - go1 (len,rest) | BS.length rest < len = [] - - go1 (len,rest) = do - let (sect, rest1) = BS.splitAt len rest - sect : go rest1 - -validateSorted :: BS.ByteString -> Bool -validateSorted bs = do - let sections = toSectionList bs - let r = flip fix (Nothing, sections, 0) $ \next -> \case - (_, [], e) -> e - (Nothing, x:xs, e) -> next (Just x, xs, e) - (Just v, x:_, e) | v > x -> (e+1) - (Just _, x:xs, e) -> next (Just x, xs, e) - r == 0 - - -scanBS :: Monad m => BS.ByteString -> ( BS.ByteString -> m () ) -> m () -scanBS bs action = do - let hsz = 4 - flip fix bs $ \next bss -> do - if BS.length bss < hsz then pure () - else do - let (ssize, rest) = BS.splitAt hsz bss - let size = N.word32 ssize & fromIntegral - let (sdata, rest2) = BS.splitAt size rest - if BS.length sdata < size then - pure () - else do - action sdata - next rest2 - -runConsumeBS :: Monad m => BS.ByteString -> ConsumeBS m a -> m a -runConsumeBS s m = evalStateT (fromConsumeBS m) s - - -readSections :: forall m . (MonadIO m, BytesReader m) - => ( Int -> ByteString -> m () ) - -> m () - -readSections action = fix \next -> do - done <- noBytesLeft - if done then - pure () - else do - ssize <- readBytesMaybe 4 - >>= orThrow SomeReadLogError - <&> fromIntegral . N.word32 . LBS.toStrict - - sdata <- readBytesMaybe ssize - >>= orThrow SomeReadLogError - - action ssize sdata - next - -writeSection :: forall m . Monad m - => ByteString - -> ( ByteString -> m () ) - -> m () - -writeSection bs output = do - let bssize = bytestring32 (fromIntegral $ LBS.length bs) - let section = B.byteString bssize <> B.lazyByteString bs - output (B.toLazyByteString section) - - -writeSections :: forall m . Monad m - => m (Maybe ByteString) - -> ( ByteString -> m () ) - -> m () - -writeSections source sink = fix \next -> do - source >>= maybe none (\bs -> writeSection bs sink >> next) - - -data CompressedStreamError = - CompressedStreamWriteError - deriving stock (Typeable,Show) - -instance Exception CompressedStreamError - -writeCompressedChunkZstd :: forall m . 
MonadIO m - => ( ByteString -> m () ) - -> Result - -> Maybe ByteString - -> m Result - -writeCompressedChunkZstd sink stream mlbs = do - flip fix ( LBS.toChunks lbs, stream) $ \next -> \case - - ([], r@(Done s)) -> sink (LBS.fromStrict s) >> pure r - - (_, Done{}) -> liftIO (throwIO CompressedStreamWriteError) - - (_, Error{})-> liftIO (throwIO CompressedStreamWriteError) - - (w, Produce s continue) -> do - sink (LBS.fromStrict s) - c <- liftIO continue - next (w, c) - - (_, Consume consume) | isNothing mlbs -> do - r <- liftIO (consume mempty) - next ([], r) - - ([], r@(Consume{})) -> pure r - - (x:xs, r@(Consume consume)) -> do - what <- liftIO (consume x) - next (xs, what) - - where - lbs = fromMaybe mempty mlbs - - -writeCompressedStreamZstd :: forall m . MonadIO m - => Result - -> m (Maybe ByteString) - -> ( ByteString -> m () ) - -> m () -writeCompressedStreamZstd stream source sink = do - flip fix stream $ \next sn -> do - source >>= \case - Nothing -> writeCompressedChunkZstd sink sn Nothing >> none - Just lbs -> writeCompressedChunkZstd sink sn (Just lbs) >>= next - - -binarySearchBS :: Monad m - => Int -- ^ record size - -> ( BS.ByteString -> BS.ByteString ) -- ^ key extractor - -> BS.ByteString -- ^ key - -> BS.ByteString -- ^ source - -> m (Maybe Int) - -binarySearchBS rs getKey s source = do - let maxn = BS.length source `div` rs - loop 0 maxn - where - loop l u | u <= l = pure Nothing - | otherwise = do - let e = getKey (BS.drop ( k * rs ) source) - case compare e s of - EQ -> pure $ Just (k * rs) - LT -> loop (k+1) u - GT -> loop l k - - where k = (l + u) `div` 2 +import HBS2.Data.Log.Structured.SD as Exported diff --git a/hbs2-log-structured/LICENSE b/hbs2-log-structured/LICENSE new file mode 100644 index 00000000..3cbe915d --- /dev/null +++ b/hbs2-log-structured/LICENSE @@ -0,0 +1,30 @@ +Copyright (c) 2023, + +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials provided + with the distribution. + + * Neither the name of nor the names of other + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
diff --git a/hbs2-log-structured/hbs2-log-structured.cabal b/hbs2-log-structured/hbs2-log-structured.cabal new file mode 100644 index 00000000..2c347a8c --- /dev/null +++ b/hbs2-log-structured/hbs2-log-structured.cabal @@ -0,0 +1,103 @@ +cabal-version: 3.0 +name: hbs2-log-structured +version: 0.25.0.1 +-- synopsis: +-- description: +license: BSD-3-Clause +license-file: LICENSE +-- author: +-- maintainer: +-- copyright: +category: Database +build-type: Simple +-- extra-doc-files: CHANGELOG.md +-- extra-source-files: + +common shared-properties + ghc-options: + -Wall + -- -fno-warn-unused-matches + -- -fno-warn-unused-do-bind + -- -Werror=missing-methods + -- -Werror=incomplete-patterns + -- -fno-warn-unused-binds + -- -threaded + -- -rtsopts + -- "-with-rtsopts=-N4 -A64m -AL256m -I0" + + default-language: Haskell2010 + + default-extensions: + ApplicativeDo + , BangPatterns + , BlockArguments + , ConstraintKinds + , DataKinds + , DeriveDataTypeable + , DeriveGeneric + , DerivingStrategies + , DerivingVia + , ExtendedDefaultRules + , FlexibleContexts + , FlexibleInstances + , GADTs + , GeneralizedNewtypeDeriving + , ImportQualifiedPost + , LambdaCase + , MultiParamTypeClasses + , OverloadedStrings + , QuasiQuotes + , ScopedTypeVariables + , StandaloneDeriving + , TupleSections + , TypeApplications + , TypeOperators + , TypeFamilies + + +library + import: shared-properties + exposed-modules: + HBS2.Data.Log.Structured.SD + HBS2.Data.Log.Structured.NCQ + + -- other-modules: + -- other-extensions: + build-depends: base, hbs2-core, suckless-conf + , async + , atomic-write + , binary + , bytestring + , bytestring-mmap + , cache + , containers + , directory + , filepath + , filepattern + , memory + , microlens-platform + , mmap + , mtl + , network-byte-order + , prettyprinter + , random + , safe + , serialise + , stm + , stm-chans + , streaming + , temporary + , time + , transformers + , uniplate + , unix + , unliftio + , unordered-containers + , vector + , zstd + + + hs-source-dirs: lib + default-language: Haskell2010 + + diff --git a/hbs2-log-structured/lib/HBS2/Data/Log/Structured/NCQ.hs b/hbs2-log-structured/lib/HBS2/Data/Log/Structured/NCQ.hs new file mode 100644 index 00000000..466dd3f3 --- /dev/null +++ b/hbs2-log-structured/lib/HBS2/Data/Log/Structured/NCQ.hs @@ -0,0 +1,351 @@ +{-# Language MultiWayIf #-} +{-# Language AllowAmbiguousTypes #-} +{-# Language UndecidableInstances #-} +{-# Language RecordWildCards #-} +module HBS2.Data.Log.Structured.NCQ where + +-- ^ N-way pseudo-cuckoo disk hash tables + +import HBS2.Prelude.Plated +import HBS2.OrDie +import HBS2.Hash +import HBS2.Data.Types.Refs +import HBS2.Merkle + +import Data.Config.Suckless.Syntax +import Data.Config.Suckless.Script +import Data.Config.Suckless.System + +import Data.Bits +import Data.ByteString (ByteString) +import Data.ByteString qualified as BS +import Data.ByteString.Char8 qualified as BS8 +import Data.ByteString.Builder +import Data.Maybe +import Data.Word +import Data.List qualified as List +import Data.Vector qualified as V +import Data.Vector ((!)) +import Control.Monad.Trans.Cont +import Control.Monad.Trans.Maybe +import Network.ByteOrder qualified as N +import Data.Coerce +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HM +import Data.IntMap qualified as IntMap +import Data.IntMap (IntMap) +import Data.Fixed +import System.Environment +import System.Posix.Fcntl +import System.Posix.IO +import System.Posix.Files (setFileSize) +import System.FilePath.Posix +import System.IO.MMap 
+import System.IO.Temp
+import System.IO qualified as IO
+import Safe
+import Lens.Micro.Platform
+import Control.Concurrent.STM qualified as STM
+import UnliftIO
+
+
+
+nextPowerOf2 :: Word64 -> Word64
+nextPowerOf2 0 = 1 -- round 0 up to 1 (the smallest power of two)
+nextPowerOf2 n =
+  let n' = n - 1
+  in foldl (\x shift -> x .|. (x `shiftR` shift)) n' [1,2,4,8,16,32,64] + 1
+
+data NWayHashException =
+  NWayHashInvalidMetaData String
+  deriving stock (Show, Typeable)
+
+instance Exception NWayHashException
+
+
+type NWayPageOff = Word64
+type NWayPageBuckets = Word64
+
+data NWayHash =
+  NWayHash
+  { nwayKeySize     :: Int
+  , nwayKeyPartSize :: Int
+  , nwayValueSize   :: Int
+  , nwayBucketSize  :: Int
+  , nwayPages       :: [(NWayPageOff, NWayPageBuckets)]
+  }
+  deriving stock (Show)
+
+nwayItemSize :: NWayHash -> Int
+nwayItemSize NWayHash{..} = nwayKeySize + nwayValueSize
+
+instance IsContext c => MkSyntax c NWayHash where
+  mkSyntax NWayHash{..} =
+    mkList [ mkForm "keysize"     [mkInt nwayKeySize]
+           , mkForm "keypartsize" [mkInt nwayKeyPartSize]
+           , mkForm "valuesize"   [mkInt nwayValueSize]
+           , mkForm "bucksize"    [mkInt nwayBucketSize]
+           , mkForm "buckets"     [mkInt x | x <- fmap snd nwayPages]
+           , mkForm "cqfile"      [mkInt 1]
+           ]
+
+instance Pretty NWayHash where
+  pretty = pretty . mkSyntax @C
+
+nwayHashMMapReadOnly :: MonadUnliftIO m => FilePath -> m (Maybe (ByteString, NWayHash))
+nwayHashMMapReadOnly fn = runMaybeT do
+
+  bs0 <- liftIO $ mmapFileByteString fn Nothing
+
+  let size = BS.length bs0
+  let (_,metasize) = BS.splitAt (size - 4) bs0 & over _2 (fromIntegral . N.word32)
+  let (_,meta) = BS.splitAt (size - metasize - 4) bs0 & over _2 (BS8.unpack . BS.take metasize)
+
+  let bs1 = BS.take (BS.length bs0 - 4 - metasize) bs0
+
+  metaSyn <- parseTop meta & toMPlus
+
+  nwayKeySize <- headMay [ x | MatchOption "keysize" (LitIntVal x) <- metaSyn ]
+                   & orThrow (NWayHashInvalidMetaData "keysize")
+                   <&> fromIntegral
+
+  nwayValueSize <- headMay [ x | MatchOption "valuesize" (LitIntVal x) <- metaSyn ]
+                   & orThrow (NWayHashInvalidMetaData "valuesize")
+                   <&> fromIntegral
+
+  nwayBucketSize <- headMay [ x | MatchOption "bucksize" (LitIntVal x) <- metaSyn ]
+                   & orThrow (NWayHashInvalidMetaData "bucksize")
+                   <&> fromIntegral
+
+  nwayKeyPartSize <- headMay [ x | MatchOption "keypartsize" (LitIntVal x) <- metaSyn ]
+                   & orThrow (NWayHashInvalidMetaData "keypartsize")
+                   <&> fromIntegral
+
+  let buckets' = [ bsz | ListVal (SymbolVal "buckets" : bsz) <- metaSyn ]
+                   & mconcat
+
+  let buckets = [ fromIntegral n :: NWayPageBuckets | LitIntVal n <- buckets' ]
+
+  let isize = fromIntegral nwayKeySize + fromIntegral nwayValueSize
+
+  let nwayPages = List.scanl' (\sz x -> sz + x*isize* fromIntegral nwayBucketSize) 0 buckets
+                   & flip zip buckets
+
+  when (List.null nwayPages) do
+    throwIO $ NWayHashInvalidMetaData "buckets"
+
+  pure (bs1,NWayHash{..})
+
+bucketSizes :: Int -> [Int]
+bucketSizes maxSize = takeWhile (<= maxSize) fibs
+  where
+    fibs = 0 : 1 : zipWith (+) fibs (tail fibs)
+
+nwayHashLookup :: MonadUnliftIO m
+               => NWayHash
+               -> ByteString
+               -> ByteString
+               -> m (Maybe ByteString)
+
+nwayHashLookup nw@NWayHash{..} mmaped keyBs = do
+  let keySize = fromIntegral nwayKeySize
+  let valSize = fromIntegral nwayValueSize
+  let itemSize = fromIntegral $ nwayItemSize nw
+  let buckL = fromIntegral nwayBucketSize :: Word64
+  let buckSize = fromIntegral $ fromIntegral buckL * nwayItemSize nw
+  let emptyKey = BS.replicate keySize 0
+
+  let hxs = chunks nwayKeyPartSize (keyBs :: ByteString)
+
+  flip runContT pure $ callCC
\exit -> do + + for_ (zip nwayPages hxs) $ \((pageOff,nbuck), hx) -> do + let ki = N.word64 hx + let buck = ki `mod` nbuck + -- let buck = ki .&. (nbuck - 1) + let baseOff = pageOff + buck * buckSize + let buckEnd = baseOff + itemSize * buckL + -- liftIO $ print $ niceHash <+> pretty ki <+> pretty buck <+> pretty baseOff <+> pretty nbuck <+> pretty pageOff + + flip fix (baseOff :: NWayPageOff) $ \nextEntry -> \case + eOff | eOff >= buckEnd -> none + | otherwise -> do + let es = BS.drop (fromIntegral eOff) mmaped + let ks = BS.take keySize es + + if | ks == keyBs -> do + exit $ Just (BS.take valSize (BS.drop keySize es)) + + | ks == emptyKey -> do + exit Nothing + + | otherwise -> do + nextEntry (eOff + itemSize) + + pure Nothing + +data NWayHashAlloc = + NWayHashAlloc + { nwayAllocRatio :: Fixed E3 + , nwayAllocKeySize :: Int + , nwayAllocKeyPartSize :: Int + , nwayAllocValueSize :: Int + , nwayAllocBucketSize :: Int + , nwayAllocMinBuckets :: Int + , nwayAllocBucketNum :: NWayHashAlloc -> Int -> Int + , nwayAllocResize :: NWayHashAlloc -> Int -> Int -> Int -> Maybe Int + } + +nwayAllocDef :: Fixed E3 -> Int -> Int -> Int -> NWayHashAlloc +nwayAllocDef r ks kps vs = + NWayHashAlloc r ks kps vs 4 512 nwayAllocPow2 nwayAllocResizeDefault + +nwayAllocPow2 :: NWayHashAlloc -> Int -> Int +nwayAllocPow2 NWayHashAlloc{..} num = fromIntegral $ + nextPowerOf2 (ceiling (nwayAllocRatio * (realToFrac num / realToFrac nwayAllocBucketSize))) + +nwayAllocResizeDefault :: NWayHashAlloc -> Int -> Int -> Int -> Maybe Int +nwayAllocResizeDefault NWayHashAlloc{..} i c num = Nothing + +nwayWriteBatch :: MonadUnliftIO m + => NWayHashAlloc + -> FilePath -- ^ dir + -> FilePath -- ^ template + -> [(ByteString, ByteString)] + -> m FilePath + +nwayWriteBatch nwa@NWayHashAlloc{..} path tpl items = do + + let ks = nwayAllocKeySize + + let vs = nwayAllocValueSize + let kpiece = nwayAllocKeyPartSize + + let itemsInBuck = nwayAllocBucketSize + let itemSize = fromIntegral $ ks + vs + let buckSize = fromIntegral $ itemSize * itemsInBuck + + let kparts = ks `div` fromIntegral kpiece + + fn0 <- liftIO (emptyTempFile path tpl) + fn <- liftIO (emptyTempFile path (takeBaseName fn0 <>".part")) + + h0 <- openFile fn ReadWriteMode + fd <- liftIO $ handleToFd h0 + h <- liftIO $ fdToHandle fd + + flip runContT pure do + + buckets <- newTQueueIO + leftovers <- newTQueueIO + + void $ ContT $ bracket none $ const do + hClose h + + wq <- newTQueueIO + + writer <- ContT $ withAsync do + fix \next -> do + ops <- atomically do + void (peekTQueue wq) + STM.flushTQueue wq + + for_ ops $ \case + Just (_,op) -> op + Nothing -> none + + unless (any isNothing ops) next + + flip fix (Nothing,0,0,items) \nextPage (numBuck,pageOff,i,es) -> do + + let buckNum = case numBuck of + Just x -> x + Nothing -> max nwayAllocMinBuckets (nwayAllocBucketNum nwa (List.length es)) + + atomically $ writeTQueue buckets buckNum + + tvx <- replicateM (fromIntegral buckNum) ( newTVarIO 0 ) + let alloc = V.fromList tvx + + let pageSize = buckNum * buckSize + + liftIO do + fileAllocate fd pageOff (fromIntegral pageSize) + + for_ es $ \(k,v) -> do + let ki = BS.take kpiece (BS.drop (i*kpiece) k ) & N.word64 + let bn = ki `mod` fromIntegral buckNum + let buckOff = fromIntegral pageOff + bn * fromIntegral buckSize + + eIdx <- atomically do + e <- readTVar (alloc ! fromIntegral bn) + if e >= itemsInBuck then do + writeTQueue leftovers (k,v) + pure Nothing + else do + writeTVar (alloc ! 
fromIntegral bn) (e+1) + pure $ Just e + + for_ eIdx \e -> liftIO do + let woff = fromIntegral buckOff + fromIntegral (e * itemSize) + let op = liftIO do + hSeek h AbsoluteSeek woff + BS.hPut h (k <> BS.take (fromIntegral vs) v) + + atomically (writeTQueue wq (Just (woff, op))) + + lo <- atomically $ STM.flushTQueue leftovers + + if | List.null lo -> none + + | i + 1 < fromIntegral kparts -> do + let resize = nwayAllocResize nwa i buckNum (List.length lo) + nextPage (resize, pageOff + fromIntegral pageSize, succ i, lo) + + | otherwise -> do + -- TODO: check-how-it-works + liftIO (setFileSize fn pageOff) + nextPage (Just (buckNum*2), pageOff, i, lo) + + atomically $ writeTQueue wq Nothing + wait writer + + -- finalize write + bucklist <- atomically $ STM.flushTQueue buckets + + let meta = [ mkForm @C "keysize" [mkInt ks] + , mkForm "keypartsize" [mkInt kpiece] + , mkForm "valuesize" [mkInt vs] + , mkForm "bucksize" [mkInt itemsInBuck] + , mkForm "buckets" (fmap mkInt bucklist) + , mkForm "cqfile" [mkInt 1] + ] + + let metabs = BS8.pack $ show $ vsep (fmap pretty meta) + let metaSize = fromIntegral $ BS.length metabs + + liftIO do + hSeek h SeekFromEnd 0 + BS.hPut h metabs + BS.hPut h (N.bytestring32 metaSize) + mv fn fn0 + + pure fn0 + +nwayHashScanAll :: MonadIO m + => NWayHash + -> ByteString + -> ( NWayPageOff -> ByteString -> ByteString -> m () ) + -> m () + +nwayHashScanAll n@NWayHash{..} mmaped action = do + let itemSize = fromIntegral $ nwayItemSize n + flip fix (0,mmaped) $ \next (o,bs) -> do + if BS.null bs then + none + else do + let ks = BS.take nwayKeySize bs + let vs = BS.take nwayValueSize (BS.drop 32 bs) + action o ks vs + next (o+itemSize, BS.drop (fromIntegral itemSize) bs) + diff --git a/hbs2-log-structured/lib/HBS2/Data/Log/Structured/SD.hs b/hbs2-log-structured/lib/HBS2/Data/Log/Structured/SD.hs new file mode 100644 index 00000000..c0f52409 --- /dev/null +++ b/hbs2-log-structured/lib/HBS2/Data/Log/Structured/SD.hs @@ -0,0 +1,253 @@ +module HBS2.Data.Log.Structured.SD where + +import HBS2.Prelude.Plated +import HBS2.OrDie + +import Network.ByteOrder qualified as N +import Data.ByteString.Builder qualified as B +import Data.ByteString.Lazy (ByteString) +import Data.ByteString.Lazy qualified as LBS +import Data.ByteString qualified as BS +import Data.Maybe +import Network.ByteOrder hiding (ByteString) +import Control.Monad.State + +import Codec.Compression.Zstd.Streaming (Result(..)) + +import Control.Exception +import Lens.Micro.Platform + +-- import UnliftIO + +class ReadLogOpts a where + +data ReadLogError = SomeReadLogError + deriving stock (Typeable, Show) + +instance Exception ReadLogError + +instance ReadLogOpts () + +type NumBytes = Int + +class Monad m => BytesReader m where + noBytesLeft :: m Bool + readBytes :: NumBytes -> m ByteString + + readBytesMaybe :: NumBytes -> m (Maybe ByteString) + readBytesMaybe n = do + bs <- readBytes n + if LBS.length bs == fromIntegral n then pure (Just bs) else pure Nothing + +newtype ConsumeLBS m a = ConsumeLBS { fromConsumeLBS :: StateT ByteString m a } + deriving newtype ( Applicative + , Functor + , Monad + , MonadState ByteString + , MonadIO + , MonadTrans + ) + +readChunkThrow :: MonadIO m => Int -> ConsumeLBS m ByteString +readChunkThrow n = do + lbs <- get + let (this, that) = LBS.splitAt (fromIntegral n) lbs + if LBS.length this /= fromIntegral n then + liftIO $ throwIO SomeReadLogError + else do + put $! 
that + pure this + +readChunkSimple :: Monad m => Int -> ConsumeLBS m ByteString +readChunkSimple n = do + lbs <- get + let (this, that) = LBS.splitAt (fromIntegral n) lbs + put $! that + pure this + +reminds :: Monad m => ConsumeLBS m Int +reminds = gets (fromIntegral . LBS.length) + +consumed :: Monad m => ConsumeLBS m Bool +consumed = gets LBS.null + +runConsumeLBS :: Monad m => ByteString -> ConsumeLBS m a -> m a +runConsumeLBS s m = evalStateT (fromConsumeLBS m) s + +newtype ConsumeBS m a = ConsumeBS { fromConsumeBS :: StateT BS.ByteString m a } + deriving newtype ( Applicative + , Functor + , Monad + , MonadState BS.ByteString + , MonadIO + , MonadTrans + ) + + +instance Monad m => BytesReader (ConsumeLBS m) where + readBytes = readChunkSimple + noBytesLeft = consumed + +instance Monad m => BytesReader (ConsumeBS m) where + noBytesLeft = gets BS.null + readBytes n = do + s <- get + let (a,b) = BS.splitAt n s + put $! b + pure (LBS.fromStrict a) + +{- HLINT ignore "Eta reduce"-} +toSectionList :: BS.ByteString -> [BS.ByteString] +toSectionList source = go source + where + go bs | BS.length bs < 4 = [] + | otherwise = go1 (BS.splitAt 4 bs & over _1 (fromIntegral . N.word32)) + + go1 (len,rest) | BS.length rest < len = [] + + go1 (len,rest) = do + let (sect, rest1) = BS.splitAt len rest + sect : go rest1 + +validateSorted :: BS.ByteString -> Bool +validateSorted bs = do + let sections = toSectionList bs + let r = flip fix (Nothing, sections, 0) $ \next -> \case + (_, [], e) -> e + (Nothing, x:xs, e) -> next (Just x, xs, e) + (Just v, x:_, e) | v > x -> (e+1) + (Just _, x:xs, e) -> next (Just x, xs, e) + r == 0 + + +scanBS :: Monad m => BS.ByteString -> ( BS.ByteString -> m () ) -> m () +scanBS bs action = do + let hsz = 4 + flip fix bs $ \next bss -> do + if BS.length bss < hsz then pure () + else do + let (ssize, rest) = BS.splitAt hsz bss + let size = N.word32 ssize & fromIntegral + let (sdata, rest2) = BS.splitAt size rest + if BS.length sdata < size then + pure () + else do + action sdata + next rest2 + +runConsumeBS :: Monad m => BS.ByteString -> ConsumeBS m a -> m a +runConsumeBS s m = evalStateT (fromConsumeBS m) s + + +readSections :: forall m . (MonadIO m, BytesReader m) + => ( Int -> ByteString -> m () ) + -> m () + +readSections action = fix \next -> do + done <- noBytesLeft + if done then + pure () + else do + ssize <- readBytesMaybe 4 + >>= orThrow SomeReadLogError + <&> fromIntegral . N.word32 . LBS.toStrict + + sdata <- readBytesMaybe ssize + >>= orThrow SomeReadLogError + + action ssize sdata + next + +writeSection :: forall m . Monad m + => ByteString + -> ( ByteString -> m () ) + -> m () + +writeSection bs output = do + let bssize = bytestring32 (fromIntegral $ LBS.length bs) + let section = B.byteString bssize <> B.lazyByteString bs + output (B.toLazyByteString section) + + +writeSections :: forall m . Monad m + => m (Maybe ByteString) + -> ( ByteString -> m () ) + -> m () + +writeSections source sink = fix \next -> do + source >>= maybe none (\bs -> writeSection bs sink >> next) + + +data CompressedStreamError = + CompressedStreamWriteError + deriving stock (Typeable,Show) + +instance Exception CompressedStreamError + +writeCompressedChunkZstd :: forall m . 
MonadIO m + => ( ByteString -> m () ) + -> Result + -> Maybe ByteString + -> m Result + +writeCompressedChunkZstd sink stream mlbs = do + flip fix ( LBS.toChunks lbs, stream) $ \next -> \case + + ([], r@(Done s)) -> sink (LBS.fromStrict s) >> pure r + + (_, Done{}) -> liftIO (throwIO CompressedStreamWriteError) + + (_, Error{})-> liftIO (throwIO CompressedStreamWriteError) + + (w, Produce s continue) -> do + sink (LBS.fromStrict s) + c <- liftIO continue + next (w, c) + + (_, Consume consume) | isNothing mlbs -> do + r <- liftIO (consume mempty) + next ([], r) + + ([], r@(Consume{})) -> pure r + + (x:xs, r@(Consume consume)) -> do + what <- liftIO (consume x) + next (xs, what) + + where + lbs = fromMaybe mempty mlbs + + +writeCompressedStreamZstd :: forall m . MonadIO m + => Result + -> m (Maybe ByteString) + -> ( ByteString -> m () ) + -> m () +writeCompressedStreamZstd stream source sink = do + flip fix stream $ \next sn -> do + source >>= \case + Nothing -> writeCompressedChunkZstd sink sn Nothing >> none + Just lbs -> writeCompressedChunkZstd sink sn (Just lbs) >>= next + + +binarySearchBS :: Monad m + => Int -- ^ record size + -> ( BS.ByteString -> BS.ByteString ) -- ^ key extractor + -> BS.ByteString -- ^ key + -> BS.ByteString -- ^ source + -> m (Maybe Int) + +binarySearchBS rs getKey s source = do + let maxn = BS.length source `div` rs + loop 0 maxn + where + loop l u | u <= l = pure Nothing + | otherwise = do + let e = getKey (BS.drop ( k * rs ) source) + case compare e s of + EQ -> pure $ Just (k * rs) + LT -> loop (k+1) u + GT -> loop l k + + where k = (l + u) `div` 2 + diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index 2fcda313..849d479c 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -495,6 +495,7 @@ downloadFromPeer bu cache env h peer = liftIO $ withPeerM env do let w0 = 2.0 :: Timeout 'MilliSeconds + -- FIXME: possible-busyloop let watchdog = flip fix 0 \next x -> do r <- race (pause @'MilliSeconds wx) do atomically do diff --git a/hbs2-storage-ncq/app/Main.hs b/hbs2-storage-ncq/app/Main.hs new file mode 100644 index 00000000..0edd9754 --- /dev/null +++ b/hbs2-storage-ncq/app/Main.hs @@ -0,0 +1,48 @@ +module Main where + +import HBS2.Prelude.Plated +import HBS2.Storage.NCQ + +import Data.Config.Suckless.Script + +import System.Environment +import UnliftIO + + +runTop :: forall c m . 
( IsContext c + , NCQPerks m + , MonadUnliftIO m + , Exception (BadFormException c) + ) => [Syntax c] -> m () +runTop forms = do + + + let dict = makeDict @c do + + internalEntries + + entry $ bindMatch "--help" $ nil_ \case + HelpEntryBound what -> helpEntry what + [StringLike s] -> helpList False (Just s) + _ -> helpList False Nothing + + entry $ bindMatch "ncq:init" $ nil_ $ \case + [ StringLike path ] -> do + ncqStorageInit path + + e -> throwIO $ BadFormException @c (mkList e) + + tvd <- newTVarIO dict + runEval tvd forms >>= eatNil display + + +main :: IO () +main = do + argz <- getArgs + + forms <- parseTop (unlines $ unwords <$> splitForms argz) + & either (error.show) pure + + runTop forms + + diff --git a/hbs2-storage-ncq/hbs2-storage-ncq.cabal b/hbs2-storage-ncq/hbs2-storage-ncq.cabal new file mode 100644 index 00000000..ed6f6ce9 --- /dev/null +++ b/hbs2-storage-ncq/hbs2-storage-ncq.cabal @@ -0,0 +1,154 @@ +cabal-version: 3.0 +name: hbs2-storage-ncq +version: 0.25.0.1 +-- synopsis: +-- description: +license: BSD-3-Clause +license-file: LICENSE +-- author: +-- maintainer: +-- copyright: +category: Database +build-type: Simple +-- extra-doc-files: CHANGELOG.md +-- extra-source-files: + +common shared-properties + ghc-options: + -Wall + -- -fno-warn-unused-matches + -- -fno-warn-unused-do-bind + -- -Werror=missing-methods + -- -Werror=incomplete-patterns + -- -fno-warn-unused-binds + -- -threaded + -- -rtsopts + -- "-with-rtsopts=-N4 -A64m -AL256m -I0" + + default-language: Haskell2010 + + default-extensions: + ApplicativeDo + , BangPatterns + , BlockArguments + , ConstraintKinds + , DataKinds + , DeriveDataTypeable + , DeriveGeneric + , DerivingStrategies + , DerivingVia + , ExtendedDefaultRules + , FlexibleContexts + , FlexibleInstances + , GADTs + , GeneralizedNewtypeDeriving + , ImportQualifiedPost + , LambdaCase + , MultiParamTypeClasses + , OverloadedStrings + , QuasiQuotes + , ScopedTypeVariables + , StandaloneDeriving + , TupleSections + , TypeApplications + , TypeOperators + , TypeFamilies + + +library + import: shared-properties + exposed-modules: HBS2.Storage.NCQ + -- other-modules: + -- other-extensions: + build-depends: base, hbs2-core, hbs2-log-structured, suckless-conf + , async + , binary + , bytestring + , bytestring-mmap + , containers + , directory + , filepath + , filepattern + , memory + , microlens-platform + , mmap + , mtl + , network-byte-order + , prettyprinter + , psqueues + , random + , safe + , serialise + , stm + , stm-chans + , streaming + , temporary + , time + , text + , transformers + , uniplate + , unix + , unliftio + , unordered-containers + , vector + + + hs-source-dirs: lib + default-language: Haskell2010 + + +executable hbs2-ncq + import: shared-properties + + ghc-options: + -Wall + -- -fno-warn-unused-matches + -- -fno-warn-unused-do-bind + -- -Werror=missing-methods + -- -Werror=incomplete-patterns + -- -fno-warn-unused-binds + -threaded + -rtsopts + "-with-rtsopts=-N8 -A64m -AL256m -I0" + + main-is: Main.hs + -- other-modules: + -- other-extensions: + build-depends: base, hbs2-core, hbs2-storage-ncq, suckless-conf + , aeson + , async + , base58-bytestring + , binary + , bytestring + , cborg + , clock + , containers + , directory + , filepath + , hashable + , memory + , microlens-platform + , mtl + , optparse-applicative + , prettyprinter + , random + , safe + , serialise + , streaming + , split + , text + , temporary + , transformers + , uniplate + , timeit + , stm + , unliftio + , network-byte-order + , unordered-containers + + 
hs-source-dirs: app + default-language: Haskell2010 + + + + diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs new file mode 100644 index 00000000..029d3403 --- /dev/null +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -0,0 +1,757 @@ +{-# Language RecordWildCards #-} +module HBS2.Storage.NCQ where + +import HBS2.Prelude.Plated +import HBS2.Hash +import HBS2.Data.Types.Refs +import HBS2.Base58 +import HBS2.Storage +import HBS2.Misc.PrettyStuff +import HBS2.System.Logger.Simple.ANSI + +import HBS2.Data.Log.Structured.NCQ +import HBS2.Data.Log.Structured.SD + +import Data.Config.Suckless.System +import Data.Config.Suckless.Script hiding (void) + +import Control.Applicative +import Data.ByteString.Builder +import Network.ByteOrder qualified as N +import Data.HashMap.Strict (HashMap) +import Control.Monad.Trans.Cont +import Control.Monad.Trans.Maybe +import Control.Concurrent.STM qualified as STM +import Control.Concurrent.STM.TSem +import Data.HashPSQ qualified as HPSQ +import Data.HashPSQ (HashPSQ) +import Data.IntMap qualified as IntMap +import Data.IntMap (IntMap) +import Data.Sequence as Seq +import Data.List qualified as List +import Data.ByteString.Lazy qualified as LBS +import Data.ByteString (ByteString) +import Data.ByteString qualified as BS +import Data.ByteString.Char8 qualified as BS8 +import Data.Char (isDigit) +import Data.Coerce +import Data.Word +import Data.Either +import Data.Maybe +import Data.Text qualified as Text +import Data.Text.IO qualified as Text +import Lens.Micro.Platform +import Data.HashSet (HashSet) +import Data.HashSet qualified as HS +import Data.HashMap.Strict qualified as HM +import System.FilePath.Posix +import System.Posix.Fcntl +import System.Posix.Files qualified as Posix +import System.Posix.IO as PosixBase +import System.Posix.Types as Posix +import System.Posix.IO.ByteString as Posix +import System.Posix.Unistd +import System.IO.MMap as MMap +import System.IO.Temp (emptyTempFile) +-- import Foreign.Ptr +-- import Foreign di +import qualified Data.ByteString.Internal as BSI +import Streaming.Prelude qualified as S + +import UnliftIO +import UnliftIO.Concurrent(getNumCapabilities) +import UnliftIO.IO.File + +{- HLINT ignore "Functor law" -} + +type NCQPerks m = MonadIO m + +data NCQStorageException = + NCQStorageAlreadyExist String + deriving stock (Show,Typeable) + +instance Exception NCQStorageException + + +newtype FileKey = FileKey ByteString + deriving newtype (Eq,Ord,Hashable,Show) + +instance IsString FileKey where + fromString = FileKey . BS8.pack . dropExtension . 
takeFileName
+
+instance Pretty FileKey where
+  pretty (FileKey s) = parens ("file-key" <+> pretty (BS8.unpack s))
+
+data NCQStorage =
+  NCQStorage
+  { ncqRoot            :: FilePath
+  , ncqGen             :: Int
+  , ncqSyncSize        :: Int
+  , ncqMinLog          :: Int
+  , ncqMaxLog          :: Int
+  , ncqMaxCachedIdx    :: Int
+  , ncqMaxCachedData   :: Int
+  , ncqRefsMem         :: TVar (HashMap HashRef HashRef)
+  , ncqRefsDirty       :: TVar Bool
+  , ncqWriteQueue      :: TVar (HashPSQ HashRef TimeSpec LBS.ByteString)
+  , ncqWaitIndex       :: TVar (HashPSQ HashRef TimeSpec (Word64,Word64))
+  , ncqTrackedFiles    :: TVar (HashSet FileKey)
+  , ncqCachedIndexes   :: TVar (HashPSQ FileKey TimeSpec (ByteString,NWayHash))
+  , ncqCachedData      :: TVar (HashPSQ FileKey TimeSpec ByteString)
+  , ncqNotWritten      :: TVar Word64
+  , ncqLastWritten     :: TVar TimeSpec
+  , ncqCurrentHandleW  :: TVar Fd
+  , ncqCurrentHandleR  :: TVar Fd
+  , ncqCurrentUsage    :: TVar (IntMap Int)
+  , ncqCurrentReadReq  :: TVar (Seq (Fd, Word64, Word64, TMVar ByteString))
+  , ncqFlushNow        :: TQueue ()
+  , ncqOpenDone        :: TMVar Bool
+  , ncqStopped         :: TVar Bool
+  }
+
+
+data Location =
+  InWriteQueue LBS.ByteString
+  | InCurrent (Word64, Word64)
+  | InFossil FileKey (Word64, Word64)
+  deriving stock (Eq,Show)
+
+instance Pretty Location where
+  pretty = \case
+    InWriteQueue{}   -> "write-queue"
+    InCurrent (o,l)  -> pretty $ mkForm @C "current" [mkInt o, mkInt l]
+    InFossil f (o,l) -> pretty $ mkForm @C "fossil " [mkSym (show (pretty f)), mkList [mkInt o, mkInt l]]
+
+type IsHCQKey h = ( Eq (Key h)
+                  , Hashable (Key h)
+                  , IsKey h
+                  , Key h ~ Hash h
+                  , ToByteString (AsBase58 (Hash h))
+                  , FromByteString (AsBase58 (Hash h))
+                  )
+
+ncqGetCurrentName_ :: FilePath -> Int -> FilePath
+ncqGetCurrentName_ root gen = root </> show (pretty gen) </> "current.data"
+
+ncqGetFileName :: NCQStorage -> FilePath -> FilePath
+ncqGetFileName NCQStorage{..} f = ncqRoot </> show (pretty ncqGen) </> takeFileName f
+
+ncqGetCurrentName :: NCQStorage -> FilePath
+ncqGetCurrentName NCQStorage{..} = ncqGetCurrentName_ ncqRoot ncqGen
+
+ncqGetCurrentDir :: NCQStorage -> FilePath
+ncqGetCurrentDir ncq = takeDirectory (ncqGetCurrentName ncq)
+
+ncqGetCurrentSizeName_ :: FilePath -> Int -> FilePath
+ncqGetCurrentSizeName_ root gen = dropExtension (ncqGetCurrentName_ root gen) <> ".size"
+
+ncqGetCurrentSizeName :: NCQStorage -> FilePath
+ncqGetCurrentSizeName NCQStorage{..} = dropExtension (ncqGetCurrentName_ ncqRoot ncqGen) <> ".size"
+
+ncqGetNewFossilName :: MonadIO m => NCQStorage -> m FilePath
+ncqGetNewFossilName n@NCQStorage{} = do
+  let fn = ncqGetFileName n "fossil-.data"
+  let (p,tpl) = splitFileName fn
+  liftIO $ emptyTempFile p tpl
+
+ncqGetIndexFileName :: NCQStorage -> FileKey -> FilePath
+ncqGetIndexFileName ncq fk = do
+  ncqGetFileName ncq (addExtension (dropExtension (BS8.unpack (coerce fk))) ".cq")
+
+ncqGetDataFileName :: NCQStorage -> FileKey -> FilePath
+ncqGetDataFileName ncq fk = do
+  ncqGetFileName ncq (addExtension (dropExtension (BS8.unpack (coerce fk))) ".data")
+
+ncqGetErrorLogName :: NCQStorage -> FilePath
+ncqGetErrorLogName ncq = do
+  ncqGetFileName ncq "errors.log"
+
+-- ncqCheckCurrentSize :: MonadIO m => NCQStorage -> m (Either Integer Integer)
+-- ncqCheckCurrentSize ncq = liftIO $ readCurrent `catch` (\(_ :: IOError) -> pure $ Left 0)
+--   where
+--     readCurrent = do
+--       let name = ncqGetCurrentName ncq
+--       a <- liftIO (BS.readFile (ncqGetCurrentSizeName ncq)) <&> N.word64
+--       b <- fileSize name
+--       pure $ if a == fromIntegral b then Right (fromIntegral a) else Left (fromIntegral a)
+
+
+ncqAddCachedSTM :: TimeSpec -- ^ now
-> Int -- ^ limit + -> TVar (HashPSQ FileKey TimeSpec a) -- ^ entry + -> FileKey -- ^ key + -> a -- ^ value + -> STM () +ncqAddCachedSTM now limit tv k v = do + + cache <- readTVar tv + + unless (HPSQ.member k cache) do + + let dst = if HPSQ.size cache + 1 > limit then + maybe cache (view _4) (HPSQ.minView cache) + else + cache + + writeTVar tv (HPSQ.insert k now v dst) + + +ncqAddTrackedFilesSTM :: NCQStorage -> [FileKey] -> STM () +ncqAddTrackedFilesSTM NCQStorage{..} keys = do + modifyTVar ncqTrackedFiles (HS.union (HS.fromList keys)) + +ncqReadTrackedFiles :: MonadIO m => NCQStorage -> m () +ncqReadTrackedFiles ncq@NCQStorage{} = do + + files <- dirFiles (ncqGetCurrentDir ncq) + >>= mapM (pure . takeBaseName) + <&> List.filter (List.isPrefixOf "fossil-") + <&> fmap fromString + + atomically $ ncqAddTrackedFilesSTM ncq files + +ncqWriteError :: MonadIO m => NCQStorage -> Text -> m () +ncqWriteError ncq txt = liftIO do + p <- getPOSIXTime <&> round @_ @Integer + let msg = Text.pack $ show $ "error" <+> fill 12 (pretty p) <+> pretty txt <> line + Text.appendFile (ncqGetErrorLogName ncq) msg + +ncqIndexFile :: MonadUnliftIO m => NCQStorage -> FilePath -> m FilePath +ncqIndexFile n@NCQStorage{} fp' = do + + let fp = ncqGetFileName n fp' + & takeBaseName + & (`addExtension` ".cq") + & ncqGetFileName n + + items <- S.toList_ do + ncqStorageScanDataFile n fp' $ \o w k v -> do + let rs = w - 32 & fromIntegral @_ @Word32 & N.bytestring32 + let os = fromIntegral @_ @Word64 o & N.bytestring64 + let record = os <> rs + -- debug $ "write record" <+> pretty (BS.length record) + S.yield (coerce k, record) + + let (dir,name) = splitFileName fp + + result <- nwayWriteBatch (nwayAllocDef 1.10 32 8 12) dir name items + + mv result fp + + pure fp + +ncqStorageStop :: MonadUnliftIO m => NCQStorage -> m () +ncqStorageStop ncq@NCQStorage{..} = do + atomically $ writeTVar ncqStopped True + ncqStorageSync ncq + atomically $ fix \next -> do + done <- readTVar ncqWriteQueue <&> HPSQ.null + unless done STM.retry + +ncqStorageRun :: MonadUnliftIO m => NCQStorage -> m () +ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do + + let dumpTimeout = round 10e6 + let dumpData = 1024 ^ 2 + let syncData = fromIntegral ncqSyncSize + + ContT $ bracket none $ const $ liftIO do + -- writeJournal syncData + readTVarIO ncqCurrentHandleW >>= closeFd + + debug "RUNNING STORAGE!" 
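+  -- A reader thread below services batched read requests taken from
+  -- ncqCurrentReadReq; an indexer thread turns fossilized logs into .cq
+  -- indexes via ncqIndexFile; the main loop then dumps the write queue
+  -- with writeJournal, triggered by elapsed time, accumulated bytes, or
+  -- an explicit ncqFlushNow signal.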
+ + -- cap <- (10*) <$> getNumCapabilities + cap <- getNumCapabilities + + reader <- ContT $ withAsync $ forever do + + reqs <- atomically do + xs <- stateTVar ncqCurrentReadReq (Seq.splitAt cap) + when (List.null xs) STM.retry + pure xs + + for_ reqs $ \(fd,off,l,answ) -> liftIO do + atomically $ modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd)) + fdSeek fd AbsoluteSeek (fromIntegral $ 4 + 32 + off) + bs <- Posix.fdRead fd (fromIntegral l) + atomically $ putTMVar answ bs + + link reader + + indexQ <- newTQueueIO + + indexer <- ContT $ withAsync $ forever do + (fd, fn) <- atomically (readTQueue indexQ) + key <- ncqIndexFile ncq fn <&> fromString @FileKey + + atomically do + ncqAddTrackedFilesSTM ncq [key] + modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd)) + + ncqLoadSomeIndexes ncq [key] + + link indexer + + fix \loop -> do + + flush <- liftIO $ race (pause @'Seconds ( realToFrac dumpTimeout / 4e6 )) $ atomically do + peekTQueue ncqFlushNow >> STM.flushTQueue ncqFlushNow + pure True + + let flushNow = fromRight False flush + + now <- getTimeCoarse + lastW <- readTVarIO ncqLastWritten + bytes <- readTVarIO ncqNotWritten + + let dumpByTime = toMicroSeconds (TimeoutTS (now - lastW)) > dumpTimeout && bytes > 0 + + when (dumpByTime || bytes >= dumpData || flushNow) do + -- debug "NCQStorage: dump data!" + liftIO $ writeJournal indexQ syncData + + done <- atomically do + mt <- readTVar ncqWriteQueue <&> HPSQ.null + stop <- readTVar ncqStopped + pure (mt && stop) + + unless done loop + + where + + writeJournal indexQ syncData = liftIO do + + trace $ "writeJournal" <+> pretty syncData + + fh <- readTVarIO ncqCurrentHandleW + + fdSeek fh SeekFromEnd 0 + + init <- readTVarIO ncqWriteQueue + + wResult <- flip fix (0,init) \next (written,q) -> case HPSQ.minView q of + Nothing -> pure mempty + Just (h,_,bs,rest) -> do + + off <- fdSeek fh SeekFromEnd 0 + let b = byteString (coerce @_ @ByteString h) <> lazyByteString bs + let wbs = toLazyByteString b + let len = LBS.length wbs + let ws = N.bytestring32 (fromIntegral len) + let w = 4 + len + + liftIO (Posix.fdWrite fh (ws <> LBS.toStrict wbs)) + + let kks = LBS.take 32 (toLazyByteString b) & coerce @_ @HashRef . LBS.toStrict + -- debug $ "WRITE SHIT!" <+> pretty len <+> pretty kks <+> pretty (LBS.length bs) + + written' <- if written < syncData then do + pure (written + w) + else do + fileSynchronise fh + pure 0 + + ((h, (fromIntegral off, fromIntegral len)) : ) <$> next (written', rest) + + fileSynchronise fh + size <- fdSeek fh SeekFromEnd 0 + + now1 <- getTimeCoarse + atomically do + q0 <- readTVar ncqWriteQueue + w0 <- readTVar ncqWaitIndex + b0 <- readTVar ncqNotWritten + + wbytes <- newTVar 0 + + (rq,rw) <- flip fix (q0,w0,wResult) \next (q,w,r) -> do + case r of + [] -> pure (q,w) + ((h,(o,l)):xs) -> do + modifyTVar wbytes (+l) + next (HPSQ.delete h q, HPSQ.insert h now1 (o,l) w,xs) + + writeTVar ncqWriteQueue rq + writeTVar ncqWaitIndex rw + bw <- readTVar wbytes + writeTVar ncqNotWritten (max 0 (b0 - bw)) + + writeBinaryFileDurable (ncqGetCurrentSizeName ncq) (N.bytestring64 (fromIntegral size)) + + when (fromIntegral size >= ncqMinLog) do + + (n,u) <- atomically do + r <- readTVar ncqCurrentHandleR <&> fromIntegral + u <- readTVar ncqCurrentUsage <&> fromMaybe 0 . 
IntMap.lookup r
+          pure (fromIntegral @_ @Word32 r, u)
+
+        let current = ncqGetCurrentName ncq
+
+        fossilized <- ncqGetNewFossilName ncq
+
+        warn $ "NEED TRUNCATE" <+> pretty current <+> viaShow size <+> pretty n <+> pretty u
+
+        mv current fossilized
+
+        atomically do
+          r <- readTVar ncqCurrentHandleR
+          -- NOTE: extra-use
+          --   add an extra 1 here for the indexing step.
+          --   the original file is closed only once it has been indexed,
+          --   i.e. we must subtract 1 again after indexing.
+          modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral r) 1)
+          writeTQueue indexQ (r, fossilized)
+
+        let flags = defaultFileFlags { exclusive = True }
+
+        touch current
+        writeBinaryFileDurable (ncqGetCurrentSizeName ncq) (N.bytestring64 0)
+
+        liftIO (PosixBase.openFd current Posix.ReadWrite flags)
+          >>= atomically . writeTVar ncqCurrentHandleW
+
+        liftIO (PosixBase.openFd current Posix.ReadWrite flags)
+          >>= atomically . writeTVar ncqCurrentHandleR
+
+        debug $ "TRUNCATED, moved to" <+> pretty fossilized
+
+        toClose <- atomically do
+          w <- readTVar ncqCurrentUsage <&> IntMap.toList
+          let (alive,dead) = List.partition( (>0) . snd) w
+          writeTVar ncqCurrentUsage (IntMap.fromList alive)
+          pure dead
+
+        for_ toClose $ \(f,_) -> do
+          when (f > 0) do
+            debug $ "CLOSE FD" <+> pretty f
+            Posix.closeFd (fromIntegral f)
+
+ncqStoragePut :: MonadUnliftIO m => NCQStorage -> LBS.ByteString -> m (Maybe HashRef)
+ncqStoragePut ncq@NCQStorage{..} lbs = flip runContT pure $ callCC \exit -> do
+
+  stopped <- readTVarIO ncqStopped
+
+  when stopped $ exit Nothing
+
+  let h = hashObject @HbSync lbs & coerce
+
+  ncqLocate ncq h >>= \case
+    Just{} -> exit (Just h)
+    _ -> none
+
+  now <- getTimeCoarse
+  atomically do
+    ql <- readTVar ncqWriteQueue <&> HPSQ.size
+    -- FIXME: hardcode
+    when (ql > 8192) STM.retry
+    modifyTVar ncqWriteQueue (HPSQ.insert h now lbs)
+    modifyTVar ncqNotWritten (+ (fromIntegral $ 36 + LBS.length lbs))
+  pure (Just h)
+
+ncqLocatedSize :: Location -> Integer
+ncqLocatedSize = \case
+  InWriteQueue lbs -> fromIntegral $ LBS.length lbs
+  InCurrent (_,s)  -> fromIntegral s
+  InFossil _ (_,s) -> fromIntegral s
+
+ncqLocate :: MonadIO m => NCQStorage -> HashRef -> m (Maybe Location)
+ncqLocate ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do
+
+  l1 <- atomically do
+    inQ <- readTVar ncqWriteQueue <&> (fmap snd . HPSQ.lookup h) <&> fmap InWriteQueue
+    inC <- readTVar ncqWaitIndex <&> (fmap snd . HPSQ.lookup h) <&> fmap InCurrent
+    pure (inQ <|> inC)
+
+  for_ l1 $ exit . Just
+
+  now <- getTimeCoarse
+
+  (cachedIdx, rest) <- atomically do
+    cached <- readTVar ncqCachedIndexes
+    other' <- readTVar ncqTrackedFiles <&> HS.toList
+    let other = [ x | x <- other', not (HPSQ.member x cached) ]
+    pure (cached, other)
+
+
+  for_ (HPSQ.toList cachedIdx) $ \(fk,_,nway) -> do
+    lookupEntry h nway <&> fmap (InFossil fk) >>= \case
+      Nothing -> pure Nothing -- none
+      other -> do
+        atomically $ modifyTVar ncqCachedIndexes (HPSQ.insert fk now nway)
+        exit other
+
+  -- TODO: use-filter-for-faster-scan
+  --   1. Which filter?
+  --   2. How and when do we rebuild it?
+  --   2.1 On open? That would increase the open time (though it could run in parallel)
+  --
+
+  for_ rest $ \r -> runMaybeT do
+    let fn = ncqGetIndexFileName ncq r
+
+    nway' <- liftIO (nwayHashMMapReadOnly fn)
+
+    when (isNothing nway') do
+      err ("NCQStorage: can't mmap file" <+> pretty fn)
+
+    nway <- toMPlus nway'
+
+    e <- lookupEntry h nway <&> fmap (InFossil r) >>= toMPlus
+
+    liftIO (mmapFileByteString (ncqGetDataFileName ncq r) Nothing) >>= \mmaped ->
+      atomically do
+        ncqAddCachedSTM now ncqMaxCachedIdx ncqCachedIndexes r nway
+        ncqAddCachedSTM now ncqMaxCachedData ncqCachedData r mmaped
+
+    lift (exit (Just e))
+
+  pure Nothing
+
+  where
+
+    lookupEntry (hx :: HashRef) (mmaped, nway) = runMaybeT do
+
+      entryBs <- liftIO (nwayHashLookup nway mmaped (coerce hx))
+                   >>= toMPlus
+
+      pure $ ( fromIntegral $ N.word64 (BS.take 8 entryBs),
+               fromIntegral $ N.word32 (BS.take 4 (BS.drop 8 entryBs)))
+
+ncqStorageHasBlock :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe Integer)
+ncqStorageHasBlock ncq h = ncqLocate ncq h <&> fmap ncqLocatedSize
+
+ncqStorageScanDataFile :: MonadIO m
+                       => NCQStorage
+                       -> FilePath
+                       -> ( Integer -> Integer -> HashRef -> ByteString -> m () )
+                       -> m ()
+ncqStorageScanDataFile ncq fp' action = do
+  let fp = ncqGetFileName ncq fp'
+  mmaped <- liftIO (mmapFileByteString fp Nothing)
+
+  flip runContT pure $ callCC \exit -> do
+    flip fix (0,mmaped) $ \next (o,bs) -> do
+
+      when (BS.length bs < 4) $ exit ()
+
+      let w = BS.take 4 bs & N.word32 & fromIntegral
+
+      when (BS.length bs < 4 + w) $ exit ()
+
+      let kv = BS.drop 4 bs
+
+      let k = BS.take 32 kv & coerce @_ @HashRef
+      let v = BS.take (w-32) $ BS.drop 32 kv
+
+      lift (action o (fromIntegral w) k v)
+
+      next (4 + o + fromIntegral w, BS.drop (w+4) bs)
+
+ncqStorageGet :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe LBS.ByteString)
+ncqStorageGet ncq@NCQStorage{..} h = do
+
+  ncqLocate ncq h >>= \case
+    Nothing -> pure Nothing
+    Just (InWriteQueue lbs) -> pure $ Just lbs
+
+    Just (InCurrent (o,l)) -> do
+      -- FIXME: timeout!
+      answ <- atomically do
+        a <- newEmptyTMVar
+        fd <- readTVar ncqCurrentHandleR
+        modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral fd) 1)
+        modifyTVar ncqCurrentReadReq ( |> (fd, o, l, a) )
+        pure a
+      atomically $ takeTMVar answ <&> Just . LBS.fromStrict
+
+    Just (InFossil key (o,l)) -> do
+
+      mmaped <- readTVarIO ncqCachedData <&> HPSQ.lookup key >>= \case
+        Just (_,mmaped) -> do
+          now <- getTimeCoarse
+          atomically $ modifyTVar ncqCachedData (HPSQ.insert key now mmaped)
+          pure mmaped
+
+        Nothing -> do
+          now <- getTimeCoarse
+          let fn = ncqGetDataFileName ncq key
+          -- TODO: possible-exception!
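+          -- NOTE: mmapFileByteString throws an IOException if the fossil data
+          -- file has gone missing, so this call can currently propagate an
+          -- exception to the caller (see the TODO above); catching it and
+          -- treating it as a miss would be one possible way to handle that.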
+          newMmaped <- liftIO (mmapFileByteString fn Nothing)
+          atomically $ ncqAddCachedSTM now ncqMaxCachedData ncqCachedData key newMmaped
+          pure newMmaped
+
+      pure $ Just $ LBS.fromStrict $ BS.take (fromIntegral l) (BS.drop (fromIntegral o+4+32) mmaped)
+
+ncqStorageDel :: MonadUnliftIO m => NCQStorage -> HashRef -> m NCQStorage
+ncqStorageDel sto h = do
+  error "not implemented yet"
+
+ncqStorageSync :: MonadUnliftIO m => NCQStorage -> m ()
+ncqStorageSync NCQStorage{..} = do
+  atomically $ writeTQueue ncqFlushNow ()
+
+
+ncqLoadSomeIndexes :: MonadIO m => NCQStorage -> [FileKey] -> m ()
+ncqLoadSomeIndexes ncq@NCQStorage{..} keys = do
+  now <- getTimeCoarse
+  for_ keys $ \key -> do
+    let fn = ncqGetIndexFileName ncq key
+    liftIO (nwayHashMMapReadOnly fn) >>= \case
+      Nothing -> err $ "NCQStorage: can't mmap index file" <+> pretty fn
+      Just nway -> atomically do
+        ncqAddCachedSTM now ncqMaxCachedIdx ncqCachedIndexes key nway
+
+ncqLoadIndexes :: MonadIO m => NCQStorage -> m ()
+ncqLoadIndexes ncq@NCQStorage{..} = do
+  debug "WIP: ncqStorageLoadIndexes"
+  w <- readTVarIO ncqTrackedFiles <&> List.take (ncqMaxCachedIdx `div` 2) . HS.toList
+  ncqLoadSomeIndexes ncq w
+
+ncqFixIndexes :: MonadUnliftIO m => NCQStorage -> m ()
+ncqFixIndexes ncq@NCQStorage{..} = do
+  debug "ncqFixIndexes"
+
+  keys <- readTVarIO ncqTrackedFiles
+
+  for_ keys $ \k -> do
+    let idxName = ncqGetIndexFileName ncq k
+    here <- doesFileExist idxName
+
+    unless here do
+      warn $ "missed-index" <+> pretty k
+      let dataName = ncqGetDataFileName ncq k
+      newKey <- ncqIndexFile ncq dataName <&> fromString @FileKey
+      atomically $ ncqAddTrackedFilesSTM ncq [newKey]
+
+ncqStorageOpen :: MonadUnliftIO m => FilePath -> m NCQStorage
+ncqStorageOpen fp = do
+  ncq@NCQStorage{..} <- ncqStorageInit_ False fp
+  ncqReadTrackedFiles ncq
+  ncqFixIndexes ncq
+  ncqLoadIndexes ncq
+  readCurrent ncq
+  atomically $ putTMVar ncqOpenDone True
+  pure ncq
+
+  where
+    readCurrent ncq@NCQStorage{..} = do
+      let fn = ncqGetCurrentName ncq
+      -- liftIO $ print $ pretty "FILE" <+> pretty fn
+      bs0 <- liftIO $ mmapFileByteString fn Nothing
+
+      now <- getTimeCoarse
+
+      flip runContT pure $ callCC \exit -> do
+        flip fix (0,bs0) $ \next (o,bs) -> do
+          when (BS.length bs < 4) $ exit ()
+          let w = BS.take 4 bs & N.word32 & fromIntegral
+          let p = BS.take w (BS.drop 4 bs)
+
+          when (BS.length p < w) do
+            err $ "broken file" <+> pretty fn
+            exit ()
+
+          let k = BS.take 32 p & coerce
+          let vs = w - 32
+
+          -- trace $ "GOT RECORD"
+          --           <+> pretty w
+          --           <+> pretty k
+          --           <+> pretty o
+          --           <+> pretty vs
+
+          atomically $ modifyTVar ncqWaitIndex (HPSQ.insert k now (fromIntegral o, fromIntegral vs))
+
+          next (o+w+4, BS.drop (w+4) bs)
+
+ncqStorageInit :: MonadUnliftIO m => FilePath -> m NCQStorage
+ncqStorageInit = ncqStorageInit_ True
+
+
+ncqStorageInit_ :: MonadUnliftIO m => Bool -> FilePath -> m NCQStorage
+ncqStorageInit_ check path = do
+
+  let ncqGen = 0
+
+  here <- doesPathExist path
+
+  when (here && check) $ throwIO (NCQStorageAlreadyExist path)
+
+  mkdir (path </> show ncqGen)
+
+  unless here do
+    now <- liftIO $ getPOSIXTime <&> round @_ @Int
+
+    let meta = [ mkForm @C "created" [ mkInt now ] ]
+    let metas = show $ vsep (fmap pretty meta)
+
+    liftIO $ appendFile (path </> "metadata") metas
+
+  let ncqRoot = path
+
+  ncqRefsMem <- newTVarIO mempty
+  ncqRefsDirty <- newTVarIO False
+
+  let ncqSyncSize = 32 * (1024 ^ 2)
+  let ncqMinLog = 2 * (1024 ^ 3)
+  let ncqMaxLog = 10 * (1024 ^ 3)
+
+  let ncqMaxCachedIdx = 64
+  let ncqMaxCachedData = ncqMaxCachedIdx `div` 2
+
+  ncqWriteQueue
<- newTVarIO HPSQ.empty + + ncqNotWritten <- newTVarIO 0 + ncqLastWritten <- getTimeCoarse >>= newTVarIO + ncqWaitIndex <- newTVarIO HPSQ.empty + + ncqFlushNow <- newTQueueIO + ncqOpenDone <- newEmptyTMVarIO + ncqCurrentReadReq <- newTVarIO mempty + ncqCurrentUsage <- newTVarIO mempty + ncqStopped <- newTVarIO False + ncqCachedIndexes <- newTVarIO HPSQ.empty + ncqCachedData <- newTVarIO HPSQ.empty + ncqTrackedFiles <- newTVarIO mempty + + let currentName = ncqGetCurrentName_ path ncqGen + + let currentSize = ncqGetCurrentSizeName_ path ncqGen + + hereCurrent <- doesPathExist currentName + + when hereCurrent $ liftIO do + let ncqCurrentHandleW = undefined + let ncqCurrentHandleR = undefined + let ncq0 = NCQStorage{..} + lastSz <- try @_ @IOException (BS.readFile currentSize) + <&> either (const 0) N.word64 + + currSz <- try @_ @IOException (fileSize currentName) + <&> fromRight 0 + <&> fromIntegral + + when (lastSz /= currSz ) do + fossilized <- ncqGetNewFossilName ncq0 + let fn = takeFileName fossilized + let msg = fromString $ show $ "wrong-size" <+> pretty lastSz <+> pretty fn + err $ pretty msg + ncqWriteError ncq0 msg + mv currentName fossilized + + touch currentName + + let flags = defaultFileFlags { exclusive = True } + + ncqCurrentHandleW <- liftIO (PosixBase.openFd currentName Posix.ReadWrite flags) + >>= newTVarIO + + ncqCurrentHandleR <- liftIO (PosixBase.openFd currentName Posix.ReadOnly flags) + >>= newTVarIO + + debug $ "currentFileName" <+> pretty (ncqGetCurrentName_ path ncqGen) + + pure $ NCQStorage{..} + + diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index a6e90f67..77ab447b 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -32,6 +32,7 @@ common common-deps , microlens-platform , mtl , mwc-random + , psqueues , prettyprinter , QuickCheck , random @@ -1177,3 +1178,25 @@ executable test-scripts , time , zstd + +executable test-cq-storage + import: shared-properties + import: common-deps + default-language: Haskell2010 + ghc-options: + hs-source-dirs: test + main-is: TestCQ.hs + build-depends: + base, hbs2-core, hbs2-log-structured, hbs2-storage-ncq + , network + , string-conversions + , db-pipe + , suckless-conf + , network-byte-order + , text + , time + , mmap + , zstd + , unix + + diff --git a/hbs2-tests/test/TestCQ.hs b/hbs2-tests/test/TestCQ.hs new file mode 100644 index 00000000..72febc2a --- /dev/null +++ b/hbs2-tests/test/TestCQ.hs @@ -0,0 +1,520 @@ +{-# Language AllowAmbiguousTypes #-} +{-# Language UndecidableInstances #-} +{-# Language MultiWayIf #-} +{-# Language RecordWildCards #-} +module Main where + +import HBS2.Prelude.Plated +import HBS2.OrDie +import HBS2.Hash +import HBS2.Data.Types.Refs +import HBS2.Clock +import HBS2.Merkle + +import HBS2.Storage +import HBS2.Storage.Simple +import HBS2.Storage.Operations.ByteString + +import HBS2.System.Logger.Simple.ANSI + +import HBS2.Storage.NCQ +import HBS2.Data.Log.Structured.NCQ + + +import Data.Config.Suckless.Syntax +import Data.Config.Suckless.Script +import Data.Config.Suckless.System + +import DBPipe.SQLite hiding (field) + +import Data.Bits +import Data.ByteString (ByteString) +import Data.ByteString qualified as BS +import Data.ByteString.Lazy qualified as LBS +import Data.ByteString.Char8 qualified as BS8 +import Data.ByteString.Builder +import Data.Maybe +import Data.Word +import Data.List qualified as List +import Data.Vector qualified as V +import Data.Vector ((!)) +import Control.Monad.Trans.Cont +import Control.Monad.Trans.Maybe +import 
Network.ByteOrder qualified as N +import Data.Coerce +import Data.HashPSQ qualified as HPSQ +import Data.HashSet qualified as HS +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HM +import Data.IntMap qualified as IntMap +import Data.IntMap (IntMap) +import Data.Fixed +import System.Environment +import System.Directory +import System.Posix.Fcntl +import System.Posix.IO +import System.IO.MMap +import System.IO qualified as IO +import System.Random +import Safe +import Lens.Micro.Platform +import Control.Concurrent.STM qualified as STM +import UnliftIO +import Text.InterpolatedString.Perl6 (qc) + +import Streaming.Prelude qualified as S +import System.TimeIt + + +setupLogger :: MonadIO m => m () +setupLogger = do + setLogging @DEBUG $ toStderr . logPrefix "[debug] " + setLogging @ERROR $ toStderr . logPrefix "[error] " + setLogging @WARN $ toStderr . logPrefix "[warn] " + setLogging @NOTICE $ toStdout . logPrefix "" + +flushLoggers :: MonadIO m => m () +flushLoggers = do + silence + +silence :: MonadIO m => m () +silence = do + setLoggingOff @DEBUG + setLoggingOff @ERROR + setLoggingOff @WARN + setLoggingOff @NOTICE + setLoggingOff @TRACE + +main :: IO () +main = do + + let dict = makeDict @C do + + entry $ bindMatch "--help" $ nil_ \case + HelpEntryBound what -> helpEntry what + [StringLike s] -> helpList False (Just s) + _ -> helpList False Nothing + + internalEntries + + entry $ bindMatch "test:sqlite" $ nil_ $ \case + [StringLike fn] -> liftIO do + hashes <- readFile fn <&> mapMaybe (fromStringMay @HashRef) . lines + + let dbname = "jopakita.db" + rm dbname + newDb <- newDBPipeEnv dbPipeOptsDef dbname + + withDB newDb do + ddl [qc|CREATE TABLE kv (k BLOB PRIMARY KEY, v int)|] + + timeItNamed "sqlite -- test insert" do + withDB newDb $ transactional do + for_ hashes $ \h -> do + let k = coerce @_ @ByteString h + insert [qc|insert into kv (k,v) values(?,?)|] (k,0) + + replicateM_ 5 do + withDB newDb do + timeItNamed "sqlite -- select test" do + -- fn <- newTVarIO 0 + -- fns <- newTVarIO 0 + q <- newTQueueIO + for_ hashes $ \h -> do + let k = coerce @_ @ByteString h + + founds <- select [qc|select k,v from kv where k = ?|] (Only k) + + for_ founds $ \(s :: ByteString,n :: Int) -> do + atomically $ writeTQueue q (s,n) + + found <- atomically (STM.flushTQueue q) <&> List.length + liftIO $ IO.hPrint stderr $ "FOUND" <+> pretty found + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:hashmap" $ nil_ $ \case + [StringLike fn] -> liftIO do + hashes <- readFile fn <&> mapMaybe (fromStringMay @HashRef) . 
lines + let hma = HM.fromList [(h,()) | h <- hashes ] + + replicateM_ 5 do + timeItNamed (show $ "HashMap lookup test" <+> pretty (List.length hashes)) do + q <- newTQueueIO + for_ hashes $ \h -> do + when (HM.member h hma) do + atomically $ writeTQueue q h + + n <- atomically ( STM.flushTQueue q) <&> List.length + liftIO $ print $ "FOUND" <+> pretty n + + e -> throwIO $ BadFormException @C (mkList e) + + + entry $ bindMatch "test:nway:scan" $ nil_ $ \case + [ StringLike fn ]-> liftIO do + (mmaped,meta@NWayHash{..}) <- nwayHashMMapReadOnly fn >>= orThrow (NWayHashInvalidMetaData fn) + let emptyKey = BS.replicate nwayKeySize 0 + nwayHashScanAll meta mmaped $ \o k v -> do + unless (k == emptyKey) do + liftIO $ print $ "scan:found" <+> fill 44 (pretty (coerce @_ @HashRef k)) <+> pretty o + + e -> throwIO $ BadFormException @C (mkList e) + + + entry $ bindMatch "test:nway:lookup" $ nil_ $ \case + + [ StringLike fn ] -> liftIO do + + hashes <- getContents <&> mapMaybe (fromStringMay @HashRef) . lines + + (mmaped, nw) <- nwayHashMMapReadOnly fn >>= orThrow (NWayHashInvalidMetaData fn) + + replicateM_ 5 do + timeItNamed (show $ "lookup:nway" <+> pretty (List.length hashes)) do + rQ <- newTQueueIO + + for_ hashes $ \h -> do + r <- nwayHashLookup nw mmaped (coerce @_ @ByteString h) + when (isJust r) do + atomically $ writeTQueue rQ (h,r) + + found <- atomically $ STM.flushTQueue rQ + liftIO $ print $ "FOUND" <+> pretty (List.length found) + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:nway:stats" $ \case + [StringLike fn] -> liftIO do + + mt_ <- newTVarIO 0 + total_ <- newTVarIO 0 + + (mmaped,meta@NWayHash{..}) <- nwayHashMMapReadOnly fn >>= orThrow (NWayHashInvalidMetaData fn) + + let emptyKey = BS.replicate nwayKeySize 0 + nwayHashScanAll meta mmaped $ \o k v -> do + atomically do + modifyTVar total_ succ + when (k == emptyKey) do + modifyTVar mt_ succ + + mt <- readTVarIO mt_ + total <- readTVarIO total_ + let used = total - mt + + let ratio = realToFrac @_ @(Fixed E3) (realToFrac used / realToFrac total) + + let stats = mkForm @C "stats" [ mkForm "empty" [mkInt mt] + , mkForm "used" [mkInt used] + , mkForm "total" [mkInt total] + , mkForm "ratio" [mkDouble ratio] + ] + + pure $ mkList [mkForm "metadata" [mkSyntax meta], stats] + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:nway:metadata" $ \case + [StringLike fn] -> liftIO do + (_, nw) <- nwayHashMMapReadOnly fn >>= orThrowUser "can't mmape file" + pure $ mkSyntax nw + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:nway:write" $ nil_ $ \case + [StringLike fn] -> liftIO do + hashes <- getContents <&> mapMaybe (fromStringMay @HashRef) . lines + let items = [ (coerce @_ @ByteString x, N.bytestring64 0) | x <- hashes ] + nwayWriteBatch (nwayAllocDef 1.10 32 8 8) "." 
fn items + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:ncq:index" $ \case + [ StringLike p, StringLike fsrc ]-> lift $ flip runContT pure do + + ncq <- lift $ ncqStorageOpen p + writer <- ContT $ withAsync $ ncqStorageRun ncq + link writer + + fres <- lift $ ncqIndexFile ncq fsrc + + pure $ mkSym fres + + e -> throwIO $ BadFormException @C (mkList e) + + + entry $ bindMatch "test:ncq:raw:get" $ \case + + [StringLike fn, HashLike h] -> liftIO $ flip runContT pure do + + ncq <- lift $ ncqStorageOpen fn + writer <- ContT $ withAsync $ ncqStorageRun ncq + link writer + + lift do + ncqStorageGet ncq h >>= \case + Nothing -> pure nil + Just bs -> do + -- debug $ "GET" <+> pretty (LBS.length bs) <+> pretty (hashObject @HbSync bs) + mkOpaque bs + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:ncq:has" $ \case + + [StringLike fn, HashLike h] -> liftIO $ flip runContT pure do + + ncq <- lift $ ncqStorageOpen fn + writer <- ContT $ withAsync $ ncqStorageRun ncq + link writer + + lift do + ncqStorageHasBlock ncq h >>= \case + Nothing -> pure nil + Just x -> pure $ mkInt x + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:ncq:raw:up" $ nil_ $ \case + + [StringLike fn] -> liftIO $ flip runContT pure do + + ncq@NCQStorage{..} <- lift $ ncqStorageOpen fn + + writer <- ContT $ withAsync $ ncqStorageRun ncq + link writer + + trf <- readTVarIO ncqTrackedFiles <&> HS.toList + + for_ trf $ \tf -> do + notice $ "tracked" <+> pretty tf + + tri <- readTVarIO ncqCachedIndexes <&> HPSQ.toList + + for_ tri $ \(k,_,_) -> do + notice $ "cached-index" <+> pretty k + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:ncq:raw" $ \case + [StringLike fn] -> liftIO $ flip runContT pure do + + debug "SHIT" + + ncq <- lift $ ncqStorageOpen fn + + writer <- ContT $ withAsync $ ncqStorageRun ncq + link writer + + h <- lift $ ncqStoragePut ncq "JOPAKITA!" + h2 <- lift $ ncqStoragePut ncq "PECHENTRESKI!" + + liftIO $ ncqStorageStop ncq + + pure $ mkList [mkSym (show $ pretty h), mkSym (show $ pretty h2)] + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:ncq:raw:list" $ nil_ \case + [StringLike p, StringLike f] -> liftIO $ flip runContT pure do + + ncq <- lift $ ncqStorageOpen p + + writer <- ContT $ withAsync $ ncqStorageRun ncq + link writer + + lift $ ncqStorageScanDataFile ncq f $ \o _ k v -> do + liftIO $ print $ pretty k -- <+> pretty o <+> pretty (BS.length v) + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:ncq:raw:find-some" $ nil_ \case + [StringLike fn] -> liftIO $ flip runContT pure do + hashes <- liftIO $ getContents <&> mapMaybe (fromStringMay @HashRef) . lines + + ncq <- lift $ ncqStorageOpen fn + + writer <- ContT $ withAsync $ ncqStorageRun ncq + link writer + + liftIO $ for_ hashes $ \h -> runMaybeT do + what <- liftIO (ncqStorageHasBlock ncq h) >>= toMPlus + -- let h1 = hashObject @HbSync what + -- liftIO $ print $ "block" <+> pretty h <+> pretty h1 <+> pretty (LBS.length what) + liftIO $ print $ "block" <+> pretty h <+> pretty what -- (LBS.length what) + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:ncq:raw:dump-some" $ nil_ \case + [StringLike fn] -> liftIO $ flip runContT pure do + hashes <- liftIO $ getContents <&> mapMaybe (fromStringMay @HashRef) . 
lines + + xdg <- liftIO $ getXdgDirectory XdgData "hbs2" <&> fromString @StoragePrefix + + s <- simpleStorageInit @HbSync (Just xdg) + + w <- ContT $ withAsync $ simpleStorageWorker s + link w + + let sto = AnyStorage s + + rm fn + dump <- openFile fn WriteMode + + for_ hashes $ \h -> runMaybeT do + blk <- getBlock sto (coerce h) >>= toMPlus + debug $ "read" <+> pretty (LBS.length blk) + none + -- liftIO $ LBS.hPut dump blk + + hClose dump + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:ncq:raw:locate" $ nil_ \case + [StringLike fn] -> liftIO $ flip runContT pure do + hashes <- liftIO $ getContents <&> mapMaybe (fromStringMay @HashRef) . lines + + ncq <- lift $ ncqStorageOpen fn + + writer <- ContT $ withAsync $ ncqStorageRun ncq + link writer + + timeItNamed (show $ "lookup" <+> pretty (List.length hashes)) do + for_ hashes $ \h -> liftIO do + ncqLocate ncq h >>= \case + Nothing -> print $ pretty "not-found" <+> pretty h + Just l -> print $ pretty "found" <+> pretty h <+> pretty l + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:ncq:raw:put" $ \case + [StringLike fn] -> liftIO $ flip runContT pure do + + what <- liftIO BS.getContents + + ncq <- lift $ ncqStorageOpen fn + + writer <- ContT $ withAsync $ ncqStorageRun ncq + link writer + + href <- liftIO $ ncqStoragePut ncq (LBS.fromStrict what) + + liftIO $ ncqStorageStop ncq + + pure $ maybe nil (mkSym . show . pretty) href + + e -> throwIO $ BadFormException @C (mkList e) + + + + entry $ bindMatch "test:ncq:raw:merkle:write" $ nil_ \case + [StringLike fn, StringLike what] -> liftIO $ flip runContT pure do + + ncq <- lift $ ncqStorageOpen fn + + writer <- ContT $ withAsync $ ncqStorageRun ncq + link writer + + ContT $ bracket none $ const do + none + + lbs <- liftIO $ LBS.readFile what + + ta <- getTimeCoarse + + (t1,hashes) <- timeItT $ liftIO do + chu <- S.toList_ (readChunkedBS lbs (256*1024)) + forConcurrently chu $ \chunk -> do + ncqStoragePut ncq chunk >>= orThrowUser "can't save" + + tb <- getTimeCoarse + + notice $ "stored in" <+> pretty t1 + <+> pretty (realToFrac @_ @(Fixed E6) (realToFrac (toMicroSeconds (TimeoutTS (tb - ta))) / 1e6)) + + -- FIXME: handle-hardcode + let pt = toPTree (MaxSize 1024) (MaxNum 256) hashes -- FIXME: settings + + m <- makeMerkle 0 pt $ \(_,_,bss) -> liftIO do + void $ ncqStoragePut ncq bss >>= orThrowUser "can't save" + + liftIO $ print $ pretty m + + liftIO $ ncqStorageStop ncq + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:ncq:raw:write-some" $ nil_ \case + [StringLike fn] -> liftIO $ flip runContT pure do + + hashes <- liftIO $ getContents <&> mapMaybe (fromStringMay @HashRef) . 
lines + + xdg <- liftIO $ getXdgDirectory XdgData "hbs2" <&> fromString @StoragePrefix + + s <- simpleStorageInit @HbSync (Just xdg) + + w <- ContT $ withAsync $ simpleStorageWorker s + link w + + let sto = AnyStorage s + + ncq <- lift $ ncqStorageOpen fn + + writer <- ContT $ withAsync $ ncqStorageRun ncq + link writer + + ContT $ bracket none $ const do + none + + for_ hashes $ \h -> runMaybeT do + blk <- getBlock sto (coerce h) >>= toMPlus + liftIO do + let l = LBS.length blk + -- print $ pretty h <+> pretty l + ncqStoragePut ncq blk + + liftIO $ ncqStorageStop ncq + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:retry" $ nil_ $ const $ flip runContT pure do + + q <- newTQueueIO + w <- newTVarIO 0 + + p1 <- ContT $ withAsync $ forever do + pause @'Seconds 0.001 + x <- randomIO @Word64 + atomically do + writeTQueue q x + modifyTVar w succ + + p2 <- ContT $ withAsync $ do + atomically $ fix \next -> do + e <- readTQueue q + if (e == 0xDEADF00D) then none else next + + p3 <- ContT $ withAsync $ do + pause @'Seconds 10 + + waitAnyCatchCancel [p1,p2,p3] + + s <- atomically $ STM.flushTQueue q + n <- readTVarIO w + + liftIO $ print $ "so?" <+> pretty n <+> pretty (length s) + + setupLogger + + argz <- liftIO getArgs + + forms <- parseTop (unlines $ unwords <$> splitForms argz) + & either (error.show) pure + + tvd <- newTVarIO dict + + (runEval tvd forms >>= eatNil display) + `finally` flushLoggers +
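
For orientation, here is a minimal smoke-test sketch of the NCQ storage API added by this patch, condensed from the "test:ncq:raw" and "test:ncq:raw:get" entries above. Only the ncqStorage* calls and the HBS2.Storage.NCQ module come from the patch itself; the module name NCQSmoke, the ncqSmoke function and the payload string are illustrative assumptions, not part of the change.

-- Hypothetical smoke test, not part of this patch: it only reuses the
-- NCQ calls introduced above (ncqStorageOpen / ncqStorageRun /
-- ncqStoragePut / ncqStorageGet / ncqStorageStop).
module NCQSmoke where

import HBS2.Prelude.Plated
import HBS2.Storage.NCQ

import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Cont
import Data.ByteString.Lazy.Char8 qualified as LBS8
import UnliftIO

ncqSmoke :: FilePath -> IO ()
ncqSmoke path = flip runContT pure $ do
  -- open an existing store (ncqStorageInit would create a fresh one)
  ncq <- lift $ ncqStorageOpen path

  -- the writer loop has to be running, otherwise puts are never flushed
  writer <- ContT $ withAsync $ ncqStorageRun ncq
  link writer

  -- put a block, then read it back through the same storage handle
  mh <- lift $ ncqStoragePut ncq (LBS8.pack "ncq-smoke-payload")

  for_ mh $ \h -> lift do
    mbs <- ncqStorageGet ncq h
    liftIO $ print $ pretty h <+> pretty (LBS8.length <$> mbs)

  lift $ ncqStorageStop ncq

The sketch assumes path points at a store that already has a current log, e.g. one created earlier via ncqStorageInit or the test:ncq entries of TestCQ.hs.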