diff --git a/hbs2-storage-ncq/hbs2-storage-ncq.cabal b/hbs2-storage-ncq/hbs2-storage-ncq.cabal index 1fdc47b2..e0a5d0e0 100644 --- a/hbs2-storage-ncq/hbs2-storage-ncq.cabal +++ b/hbs2-storage-ncq/hbs2-storage-ncq.cabal @@ -45,14 +45,15 @@ common shared-properties , ImportQualifiedPost , LambdaCase , MultiParamTypeClasses + , NumericUnderscores , OverloadedStrings , QuasiQuotes , ScopedTypeVariables , StandaloneDeriving , TupleSections , TypeApplications - , TypeOperators , TypeFamilies + , TypeOperators library @@ -73,6 +74,7 @@ library , directory , filepath , filepattern + , hashable , memory , microlens-platform , mmap diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs index 62f090c3..13159bd1 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs @@ -54,6 +54,9 @@ newtype DataFile a = DataFile a newtype IndexFile a = IndexFile a +newtype StateFile = StateFile FileKey + deriving newtype (IsString,Eq,Ord,Pretty) + class ToFileName a where toFileName :: a -> FilePath @@ -63,7 +66,6 @@ instance ToFileName FileKey where instance ToFileName (DataFile FileKey) where toFileName (DataFile fk) = dropExtension (toFileName fk) `addExtension` ".data" - instance ToFileName (IndexFile FileKey) where toFileName (IndexFile fk) = dropExtension (toFileName fk) `addExtension` ".cq" @@ -73,6 +75,9 @@ instance ToFileName (DataFile FilePath) where instance ToFileName (IndexFile FilePath) where toFileName (IndexFile fp) = dropExtension fp `addExtension` ".cq" +instance ToFileName StateFile where + toFileName (StateFile fk) = toFileName fk + newtype FilePrio = FilePrio (Down TimeSpec) deriving newtype (Eq,Ord) deriving stock (Generic,Show) @@ -151,6 +156,14 @@ ncqTombPrefix = "T;;\x00" ncqMetaPrefix :: ByteString ncqMetaPrefix = "M;;\x00" +ncqIsMeta :: ByteString -> Maybe NCQSectionType +ncqIsMeta bs = headMay [ t | (t,x) <- meta, BS.isPrefixOf x bs ] + where meta = [ (R, ncqRefPrefix) + , (B, ncqBlockPrefix) + , (T, ncqTombPrefix) + , (M, ncqMetaPrefix) + ] + ncqMakeSectionBS :: Maybe NCQSectionType -> HashRef -> ByteString diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs index 2510ba75..7f26c97b 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs @@ -1,5 +1,6 @@ {-# Language MultiWayIf #-} {-# Language RecordWildCards #-} +{-# Language PatternSynonyms #-} module HBS2.Storage.NCQ2 ( module HBS2.Storage.NCQ2 , module HBS2.Storage.NCQ.Types @@ -29,6 +30,7 @@ import Codec.Compression.Zstd.Lazy as ZstdL import Codec.Compression.Zstd.Streaming qualified as ZstdS import Codec.Compression.Zstd.Streaming (Result(..)) +import Numeric (showHex) import Control.Applicative import Data.ByteString.Builder import Network.ByteOrder qualified as N @@ -37,9 +39,11 @@ import Data.HashMap.Strict (HashMap) import Control.Monad.Except import Control.Monad.Trans.Cont import Control.Monad.Trans.Maybe +import Data.Time.Clock.POSIX import Data.Ord (Down(..),comparing) import Control.Concurrent.STM qualified as STM import Control.Concurrent.STM.TSem +import Data.Hashable (hash) import Data.HashPSQ qualified as HPSQ import Data.HashPSQ (HashPSQ) import Data.IntMap qualified as IntMap @@ -69,7 +73,6 @@ import Lens.Micro.Platform import Data.HashSet (HashSet) import Data.HashSet qualified as HS import Data.HashMap.Strict qualified as HM -import System.Directory (makeAbsolute) import System.FilePath.Posix import System.Posix.Fcntl import System.Posix.Files qualified as Posix @@ -98,12 +101,16 @@ import UnliftIO import UnliftIO.Concurrent(getNumCapabilities) import UnliftIO.IO.File +-- FIXME: ASAP-USE-FILE-LOCK import System.FileLock as FL - type FOff = Word64 -newtype NCQEntry = NCQEntry ByteString +data NCQEntry = + NCQEntry + { ncqEntryData :: ByteString + , ncqDumped :: TVar (Maybe FileKey) + } type Shard = TVar (HashMap HashRef NCQEntry) @@ -119,15 +126,33 @@ data NCQFlag = NCQMergeNow | NCQCompactNow deriving (Eq,Ord,Generic) + data Location = - InFossil ByteString NCQOffset NCQSize + InFossil FileKey !ByteString !NCQOffset !NCQSize | InMemory ByteString +instance Pretty Location where + pretty = \case + InFossil k _ o s -> parens $ "in-fossil" <+> pretty k <+> pretty o <+> pretty s + InMemory _ -> "in-memory" + +data TrackedFile = + TrackedFile + { tfTime :: FilePrio + , tfKey :: FileKey + , tfCached :: TVar (Maybe CachedEntry) + } + +type TrackedFiles = Vector TrackedFile + data NCQStorage2 = NCQStorage2 { ncqRoot :: FilePath , ncqGen :: Int , ncqSalt :: HashRef + , ncqPostponeMerge :: Timeout 'Seconds + , ncqPostponeSweep :: Timeout 'Seconds + , ncqLuckyNum :: Int , ncqFsync :: Int , ncqWriteQLen :: Int , ncqWriteBlock :: Int @@ -136,8 +161,8 @@ data NCQStorage2 = , ncqMaxCached :: Int , ncqIdleThrsh :: Double , ncqMemTable :: Vector Shard - , ncqWriteSem :: TSem , ncqWriteQ :: TVar (Seq HashRef) + , ncqWriteOps :: Vector (TQueue (IO ())) , ncqStorageTasks :: TVar Int , ncqStorageStopReq :: TVar Bool , ncqStorageSyncReq :: TVar Bool @@ -145,14 +170,19 @@ data NCQStorage2 = , ncqMergeSem :: TSem , ncqSyncNo :: TVar Int , ncqCurrentFiles :: TVar (HashSet FileKey) - , ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry)) + , ncqTrackedFiles :: TVar TrackedFiles , ncqStateVersion :: TVar StateVersion , ncqStateUsage :: TVar (IntMap (Int, HashSet FileKey)) - , ncqStateName :: TVar (Maybe FilePath) + , ncqStateName :: TVar (Maybe StateFile) + , ncqStateSem :: TSem , ncqCachedEntries :: TVar Int , ncqWrites :: TVar Int , ncqWriteEMA :: TVar Double -- for writes-per-seconds , ncqJobQ :: TQueue (IO ()) + , ncqMiscSem :: TSem + , ncqSweepSem :: TSem + , ncqMergeTasks :: TVar Int + , ncqOnRunWriteIdle :: TVar (IO ()) } megabytes :: forall a . Integral a => a @@ -165,33 +195,45 @@ ncqStorageOpen2 :: MonadIO m => FilePath -> (NCQStorage2 -> NCQStorage2)-> m NCQ ncqStorageOpen2 fp upd = do let ncqRoot = fp let ncqGen = 0 - let ncqFsync = 16 * megabytes - let ncqWriteQLen = 1024 * 16 - let ncqMinLog = 256 * megabytes - let ncqMaxLog = 16 * gigabytes -- ??? - let ncqWriteBlock = 1024 + let ncqFsync = 8 * megabytes + let ncqWriteQLen = 1024 * 4 + let ncqMinLog = 512 * megabytes + let ncqMaxLog = 16 * gigabytes -- ??? + let ncqWriteBlock = max 128 $ ncqWriteQLen `div` 2 let ncqMaxCached = 128 let ncqIdleThrsh = 50.00 + let ncqPostponeMerge = 30.00 + let ncqPostponeSweep = 2 * ncqPostponeMerge + let ncqLuckyNum = 2 + + let shardNum = ncqLuckyNum * 2 + let wopNum = ncqLuckyNum cap <- getNumCapabilities <&> fromIntegral ncqWriteQ <- newTVarIO mempty - ncqWriteSem <- atomically $ newTSem 16 -- (fromIntegral cap) - ncqMemTable <- V.fromList <$> replicateM cap (newTVarIO mempty) + ncqMemTable <- V.fromList <$> replicateM shardNum (newTVarIO mempty) ncqStorageStopReq <- newTVarIO False ncqStorageSyncReq <- newTVarIO False ncqMergeReq <- newTVarIO False ncqMergeSem <- atomically (newTSem 1) ncqSyncNo <- newTVarIO 0 ncqCurrentFiles <- newTVarIO mempty - ncqTrackedFiles <- newTVarIO HPSQ.empty + ncqTrackedFiles <- newTVarIO V.empty ncqStateVersion <- newTVarIO 0 ncqStateUsage <- newTVarIO mempty ncqStateName <- newTVarIO Nothing + ncqStateSem <- atomically $ newTSem 1 ncqCachedEntries <- newTVarIO 0 ncqStorageTasks <- newTVarIO 0 ncqWrites <- newTVarIO 0 ncqWriteEMA <- newTVarIO 0.00 ncqJobQ <- newTQueueIO + ncqMiscSem <- atomically (newTSem 1) + ncqSweepSem <- atomically (newTSem 1) + ncqMergeTasks <- newTVarIO 0 + ncqOnRunWriteIdle <- newTVarIO none + + ncqWriteOps <- replicateM wopNum newTQueueIO <&> V.fromList let ncqSalt = "EstEFasxrCFqsGDxcY4haFcha9e4ZHRzsPbGUmDfdxLk" @@ -201,7 +243,8 @@ ncqStorageOpen2 fp upd = do liftIO $ withSem ncqMergeSem do ncqRepair ncq - ncqLoadIndexes ncq + ncqPreloadIndexes ncq + ncqSweepStates ncq pure ncq @@ -225,18 +268,24 @@ ncqGetWorkDir NCQStorage2{..} = ncqRoot show ncqGen ncqGetLockFileName :: NCQStorage2 -> FilePath ncqGetLockFileName ncq = ncqGetFileName ncq ".lock" +ncqNewUniqFileName :: MonadIO m => NCQStorage2 -> FilePath -> FilePath -> m FilePath +ncqNewUniqFileName me@NCQStorage2{..} pref suff = liftIO $ withSem ncqMiscSem do + flip fix 0 $ \next i -> do + t <- round @_ @Integer . (* 1e9) <$> getPOSIXTime + let v = show $ pretty (showHex t "") <> "-" <> pretty (showHex i "") + let n = ncqGetFileName me (pref <> v <> suff) + doesFileExist n >>= \case + False -> pure n + True -> next (succ i) + ncqGetNewFossilName :: MonadIO m => NCQStorage2 -> m FilePath -ncqGetNewFossilName ncq = do - liftIO $ emptyTempFile (ncqGetWorkDir ncq) "fossil-.data" +ncqGetNewFossilName me = ncqNewUniqFileName me "fossil-" ".data" ncqGetNewStateName :: MonadIO m => NCQStorage2 -> m FilePath -ncqGetNewStateName ncq = do - liftIO $ emptyTempFile (ncqGetWorkDir ncq) "state-" +ncqGetNewStateName me = ncqNewUniqFileName me "state-" "" ncqGetNewCompactName :: MonadIO m => NCQStorage2 -> m FilePath -ncqGetNewCompactName n@NCQStorage2{} = do - let (p,tpl) = splitFileName (ncqGetFileName n "compact-.data") - liftIO $ emptyTempFile p tpl +ncqGetNewCompactName me = ncqNewUniqFileName me "compact-" ".data" ncqStorageStop2 :: MonadUnliftIO m => NCQStorage2 -> m () ncqStorageStop2 NCQStorage2{..} = do @@ -258,6 +307,14 @@ ncqGetShard ncq@NCQStorage2{..} h = ncqMemTable ! ncqShardIdx ncq h ncqLookupEntrySTM :: NCQStorage2 -> HashRef -> STM (Maybe NCQEntry) ncqLookupEntrySTM ncq h = readTVar (ncqGetShard ncq h) <&> HM.lookup h +ncqAlterEntrySTM :: NCQStorage2 + -> HashRef + -> (Maybe NCQEntry -> Maybe NCQEntry) + -> STM () +ncqAlterEntrySTM ncq h alterFn = do + let shard = ncqGetShard ncq h + modifyTVar shard (HM.alter alterFn h) + ncqPutBS :: MonadUnliftIO m => NCQStorage2 -> Maybe NCQSectionType @@ -265,25 +322,68 @@ ncqPutBS :: MonadUnliftIO m -> ByteString -> m HashRef ncqPutBS ncq@NCQStorage2{..} mtp mhref bs' = do - let h = fromMaybe (HashRef (hashObject @HbSync bs')) mhref - let bs = ncqMakeSectionBS mtp h bs' + + waiter <- newEmptyTMVarIO + + let work = do + + let h = fromMaybe (HashRef (hashObject @HbSync bs')) mhref + let bs = ncqMakeSectionBS mtp h bs' + let shard = ncqGetShard ncq h + zero <- newTVarIO Nothing + + atomically do + + stop <- readTVar ncqStorageStopReq + filled <- readTVar ncqWriteQ <&> Seq.length + + when (not stop && filled > ncqWriteQLen) STM.retry + + upd <- stateTVar shard $ flip HM.alterF h \case + Nothing -> (True, Just (NCQEntry bs zero)) + Just e | ncqEntryData e /= bs -> (True, Just (NCQEntry bs zero)) + | otherwise -> (False, Just e) + + when upd do + modifyTVar' ncqWriteQ (|> h) + + putTMVar waiter h + atomically do - waitTSem ncqWriteSem - + nw <- readTVar ncqWrites <&> (`mod` V.length ncqWriteOps) modifyTVar' ncqWrites succ - stop <- readTVar ncqStorageStopReq - filled <- readTVar ncqWriteQ <&> Seq.length + writeTQueue (ncqWriteOps ! nw) work - when (not stop && filled > ncqWriteQLen) STM.retry + atomically $ takeTMVar waiter - ncqAlterEntrySTM ncq h $ \case - Just e -> Just e - Nothing -> Just (NCQEntry bs) +ncqEntryUnwrap :: NCQStorage2 + -> ByteString + -> (ByteString, Either ByteString (NCQSectionType, ByteString)) +ncqEntryUnwrap _ source = do + let (k,v) = BS.splitAt ncqKeyLen (BS.drop 4 source) + case ncqIsMeta v of + Just meta -> (k, Right (meta, BS.drop ncqPrefixLen v)) + Nothing -> (k, Left v) +{-# INLINE ncqEntryUnwrap #-} - modifyTVar' ncqWriteQ (|> h) - signalTSem ncqWriteSem +ncqIsTomb :: NCQStorage2 -> Location -> Bool +ncqIsTomb me loc = case ncqEntryUnwrap me (ncqGetEntryBS me loc) of + (_, Right (T, _)) -> True + _ -> False - pure h +ncqDelEntry :: MonadUnliftIO m + => NCQStorage2 + -> HashRef + -> m () +ncqDelEntry me href = do + -- всегда пишем tomb и надеемся на лучшее + -- merge/compact разберутся + -- однако не пишем, если записи еще нет + ncqLocate2 me href >>= \case + Just loc | not (ncqIsTomb me loc) -> do + void $ ncqPutBS me (Just T) (Just href) "" + + _ -> none ncqLookupEntry :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe NCQEntry) ncqLookupEntry sto hash = atomically (ncqLookupEntrySTM sto hash) @@ -291,99 +391,137 @@ ncqLookupEntry sto hash = atomically (ncqLookupEntrySTM sto hash) ncqGetEntryBS :: NCQStorage2 -> Location -> ByteString ncqGetEntryBS _ = \case InMemory bs -> bs - InFossil mmap off size -> do + InFossil _ mmap off size -> do BS.take (fromIntegral size) $ BS.drop (fromIntegral off) mmap ncqEntrySize :: forall a . Integral a => Location -> a ncqEntrySize = \case - InFossil _ _ size -> fromIntegral size + InFossil _ _ _ size -> fromIntegral size InMemory bs -> fromIntegral (BS.length bs) +useVersion :: forall m a . MonadUnliftIO m => NCQStorage2 -> (() -> m a) -> m a +useVersion ncq m = bracket succV predV m + where + succV = atomically (ncqStateUseSTM ncq) + predV = const $ atomically (ncqStateUnuseSTM ncq) -ncqLocate2 :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe Location) -ncqLocate2 ncq@NCQStorage2{..} href = flip runContT pure $ callCC \exit -> do - now <- getTimeCoarse +ncqListTrackedFilesSTM :: NCQStorage2 -> STM (Vector (FileKey, Maybe CachedEntry, TVar (Maybe CachedEntry))) +ncqListTrackedFilesSTM NCQStorage2{..} = do + fs <- readTVar ncqTrackedFiles + for fs $ \TrackedFile{..} -> (tfKey,,) <$> readTVar tfCached <*> pure tfCached - lift (ncqLookupEntry ncq href) >>= maybe none (exit . Just . InMemory . coerce) +ncqListTrackedFiles :: MonadUnliftIO m => NCQStorage2 -> m (Vector (FileKey, Maybe CachedEntry, TVar (Maybe CachedEntry))) +ncqListTrackedFiles = atomically . ncqListTrackedFilesSTM + + +ncqPreloadIndexes :: MonadUnliftIO m + => NCQStorage2 + -> m () +ncqPreloadIndexes me@NCQStorage2{..} = useVersion me $ const do + fs <- readTVarIO ncqTrackedFiles <&> take ncqMaxCached . V.toList + flip fix (fs, ncqMaxCached) $ \next (files,lim) -> do + case files of + (t@TrackedFile{..}:rest) | lim > 0 -> do + readTVarIO tfCached >>= \case + Nothing -> do + void $ ncqLoadTrackedFile me t + next (rest, pred lim) + _ -> next (rest, lim) + + _ -> none + +ncqLoadTrackedFile :: MonadUnliftIO m + => NCQStorage2 + -> TrackedFile + -> m (Maybe CachedEntry) +ncqLoadTrackedFile ncq@NCQStorage2{..} TrackedFile{..} = runMaybeT do + + let indexFile = ncqGetFileName ncq (toFileName (IndexFile tfKey)) + let dataFile = ncqGetFileName ncq (toFileName (DataFile tfKey)) + + idxHere <- liftIO $ doesFileExist indexFile + unless idxHere do + liftIO $ err $ red "missed index" <+> "in loadIndex" <+> pretty tfKey + mzero + + (idxBs, idxNway) <- MaybeT $ + liftIO (nwayHashMMapReadOnly indexFile) + + datBs <- liftIO $ mmapFileByteString dataFile Nothing + + tnow <- liftIO $ newTVarIO =<< getTimeCoarse + let ce = CachedEntry idxBs datBs idxNway tnow atomically do - modifyTVar' ncqWrites succ + writeTVar tfCached (Just ce) + modifyTVar ncqCachedEntries (+1) + evictIfNeededSTM ncq (Just 1) - -- FIXME: race - -- merge can-delete-file-while-in-use + pure ce - tracked <- readTVarIO ncqTrackedFiles <&> HPSQ.toList +data Seek a = SeekStop !a | SeekNext !a - for_ tracked $ \(fk, prio, mCached) -> callCC \skip -> do - case mCached of - Just CachedEntry{..} -> do - lookupEntry href (cachedMmapedIdx, cachedNway) >>= \case - Nothing -> none - Just (offset,size) -> do - atomically $ writeTVar cachedTs now - exit (Just $ InFossil cachedMmapedData offset size) - Just PendingEntry {} -> none +--- - Nothing -> useVersion do - let indexFile = ncqGetFileName ncq (toFileName (IndexFile fk)) - let dataFile = ncqGetFileName ncq (toFileName (DataFile fk)) +ncqSeekInFossils :: forall a f m . (MonadUnliftIO m, Monoid (f a)) + => NCQStorage2 + -> HashRef + -> (Location -> m (Seek (f a))) + -> m (f a) +ncqSeekInFossils ncq@NCQStorage2{..} href action = useVersion ncq $ const do + tracked <- readTVarIO ncqTrackedFiles + let l = V.length tracked - idxHere <- doesFileExist indexFile + let + go :: Int -> Int -> f a -> m (f a) + go i a r + | i >= l = pure r + | a > 1 = do + let TrackedFile{..} = tracked ! i + err $ "unable to load fossil" <+> pretty tfKey + go (i+1) 0 r + | otherwise = do + let TrackedFile{..} = tracked ! i + readTVarIO tfCached >>= \case - unless idxHere do - err $ red "missed index" <+> "in ncqLocate" <+> pretty fk - skip () + Just PendingEntry{} -> + go (i+1) 0 r - (idxBs, idxNway) <- liftIO (nwayHashMMapReadOnly indexFile) - >>= orThrow (NCQStorageCantMapFile indexFile) + Nothing -> do + void $ ncqLoadTrackedFile ncq TrackedFile{..} + go i (a+1) r - datBs <- liftIO $ mmapFileByteString dataFile Nothing + Just CachedEntry{..} -> do + liftIO (lookupEntry href (cachedMmapedIdx, cachedNway)) >>= \case + Nothing -> go (i+1) 0 r + Just (offset, size) -> do + now <- getTimeCoarse + atomically $ writeTVar cachedTs now + action (InFossil tfKey cachedMmapedData offset size) >>= \case + SeekStop e -> pure (r <> e) + SeekNext e -> go (i+1) 0 (r <> e) - ce <- CachedEntry idxBs datBs idxNway <$> newTVarIO now - - lookupEntry href (idxBs, idxNway) >>= \case - Nothing -> none - Just (offset, size) -> do - - atomically do - modifyTVar ncqTrackedFiles (HPSQ.insert fk prio (Just ce)) - modifyTVar ncqCachedEntries (+1) - evictIfNeededSTM ncq (Just 1) - - exit $ Just (InFossil datBs offset size) - - pure mzero + go 0 0 mempty where - useVersion m = ContT (bracket succV predV) >> m - where - succV = atomically (ncqStateUseSTM ncq) - predV = const $ atomically (ncqStateUseSTM ncq) - - lookupEntry (hx :: HashRef) (mmaped, nway) = - liftIO (nwayHashLookup nway mmaped (coerce hx)) >>= \case - Nothing -> pure Nothing - Just entryBs -> do - pure $ Just - ( fromIntegral $ N.word64 (BS.take 8 entryBs) - , fromIntegral $ N.word32 (BS.take 4 (BS.drop 8 entryBs)) ) - {-# INLINE lookupEntry #-} + lookupEntry hx (mmaped, nway) = do + fmap decodeEntry <$> nwayHashLookup nway mmaped (coerce hx) + where + {-# INLINE decodeEntry #-} + decodeEntry entryBs = do + let (p,r) = BS.splitAt 8 entryBs + let off = fromIntegral (N.word64 p) + let size = fromIntegral (N.word32 (BS.take 4 r)) + ( off, size ) -ncqAlterEntrySTM :: NCQStorage2 -> HashRef -> (Maybe NCQEntry -> Maybe NCQEntry) -> STM () -ncqAlterEntrySTM ncq h alterFn = do - let shard = ncqGetShard ncq h - modifyTVar shard (HM.alter alterFn h) - - -ncqStorageDel :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m () -ncqStorageDel ncq@NCQStorage2{..} h = flip runContT pure $ callCC \exit -> do - -- 1. absent - -- 1. in memtable only - -- 2. in disk - none +ncqLocate2 :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe Location) +ncqLocate2 ncq href = do + inMem <- ncqLookupEntry ncq href <&> fmap (InMemory . ncqEntryData) + inFo <- listToMaybe <$> ncqSeekInFossils ncq href \loc -> pure (SeekStop [loc]) + pure $ inMem <|> inFo data RunSt = RunNew @@ -407,34 +545,52 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do | otherwise -> pure Nothing maybe1 what none $ \(fk, fh) -> do + debug $ red "CLOSE FILE" <+> pretty fk closeFd fh - -- notice $ yellow "indexing" <+> pretty fname + debug $ yellow "indexing" <+> pretty fk idx <- ncqRunTaskNoMatterWhat ncq (ncqIndexFile ncq (DataFile fk)) ncqRunTaskNoMatterWhat ncq $ ncqStateUpdate ncq [F 0 fk] + debug $ "REMOVE ALL SHIT" <+> pretty idx nwayHashMMapReadOnly idx >>= \case Nothing -> err $ "can't open index" <+> pretty idx Just (bs,nway) -> do nwayHashScanAll nway bs $ \_ k _ -> do - unless (k == emptyKey) $ atomically do - ncqAlterEntrySTM ncq (coerce k) (const Nothing) - -- atomically (modifyTVar' ncqCurrentFiles (HS.delete fk)) + unless (k == emptyKey) $ atomically $ void $ runMaybeT do + NCQEntry _ tfk <- MaybeT $ ncqLookupEntrySTM ncq (coerce k) + fk' <- MaybeT $ readTVar tfk + guard (fk == fk') -- remove only own stuff + lift $ ncqAlterEntrySTM ncq (coerce k) (const Nothing) + + ncqPreloadIndexes ncq + atomically (modifyTVar' ncqCurrentFiles (HS.delete fk)) loop spawnActivity $ forever (liftIO $ join $ atomically (readTQueue ncqJobQ)) + let shLast = V.length ncqWriteOps - 1 + spawnActivity $ pooledForConcurrentlyN_ (V.length ncqWriteOps) [0..shLast] $ \i -> do + let q = ncqWriteOps ! i + forever (liftIO $ join $ atomically (readTQueue q)) + spawnActivity measureWPS -- FIXME: bigger-period - spawnActivity $ forever $ (>> pause @'Seconds 300) $ do + spawnActivity $ postponed ncqPostponeSweep $ forever $ (>> pause @'Seconds 120) $ do ema <- readTVarIO ncqWriteEMA - when (ema < ncqIdleThrsh * 1.5) do + n <- ncqListStateFiles ncq <&> List.length + when (ema < ncqIdleThrsh * 1.5 && n > 0) $ withSem ncqMergeSem do + debug $ yellow "run sweep routine" ncqSweepStates ncq ncqSweepFossils ncq - spawnActivity $ fix \again -> (>> again) do + spawnActivity $ postponed ncqPostponeMerge $ fix \again -> (>> again) do ema <- readTVarIO ncqWriteEMA mergeReq <- atomically $ stateTVar ncqMergeReq (,False) + debug $ green "MERGE ATTEMPT" <+> pretty ema <+> "~" <+> pretty ncqIdleThrsh + + let notPending x = List.length [ k | (k,e,_) <- V.toList x, isNotPending e ] + if ema > ncqIdleThrsh && not mergeReq then do pause @'Seconds 10 @@ -447,14 +603,19 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do -- TODO: detect-dead-merge void $ race (pause @'Seconds 300) (atomically $ readTMVar mq) >>= \case - Left{} -> none + Left{} -> warn $ yellow "MERGE FUCKING STALLED" Right True -> none Right False -> do + debug "merge: all done, wait..." - n0 <- readTVarIO ncqTrackedFiles <&> HPSQ.size - atomically do - n <- readTVar ncqTrackedFiles <&> HPSQ.size - when (n == n0) STM.retry + n0 <- ncqListTrackedFiles ncq <&> notPending + + -- FIXME: bigger-timeout + void $ race (pause @'Seconds 60) do + atomically do + n <- ncqListTrackedFilesSTM ncq <&> notPending + when (n == n0) STM.retry + ContT $ bracket none $ const $ liftIO do fhh <- atomically (STM.flushTQueue closeQ) @@ -475,13 +636,15 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do loop RunFin else do (fk,fhx) <- openNewDataFile - atomically $ modifyTVar' ncqCurrentFiles (HS.insert fk) + liftIO (ncqStateUpdate ncq [P fk]) + debug $ "openNewDataFile" <+> pretty fk loop $ RunWrite (fk,fhx,0,0) RunSync (fk, fh, w, total, continue) -> do stop <- readTVarIO ncqStorageStopReq sync <- readTVarIO ncqStorageSyncReq + let needClose = total >= ncqMinLog || stop rest <- if not (sync || needClose || w > ncqFsync) then @@ -490,7 +653,6 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do appendTailSection fh >> liftIO (fileSynchronise fh) -- FIXME: slow! -- to make it appear in state, but to ignore until index is done - liftIO (ncqStateUpdate ncq [P fk]) atomically do writeTVar ncqStorageSyncReq False modifyTVar' ncqSyncNo succ @@ -507,7 +669,9 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do RunWrite (fk, fh, w, total') -> do - chunk <- atomically do + let timeoutMicro = 10_000_000 + + chunk <- liftIO $ timeout timeoutMicro $ atomically do stop <- readTVar ncqStorageStopReq sy <- readTVar ncqStorageSyncReq chunk <- stateTVar ncqWriteQ (Seq.splitAt ncqWriteBlock) @@ -517,11 +681,21 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do | otherwise -> pure $ Right chunk case chunk of - Left{} -> loop $ RunSync (fk, fh, w, total', False) -- exit () - Right chu -> do + Nothing -> do + liftIO $ join $ readTVarIO ncqOnRunWriteIdle + if w == 0 then do + loop $ RunWrite (fk,fh,w,total') + else do + atomically $ writeTVar ncqStorageSyncReq True + loop $ RunSync (fk, fh, w, total', True) -- exit () + + Just (Left{}) -> loop $ RunSync (fk, fh, w, total', False) -- exit () + + Just (Right chu) -> do ws <- for chu $ \h -> do atomically (ncqLookupEntrySTM ncq h) >>= \case - Just (NCQEntry bs) -> do + Just (NCQEntry bs w) -> do + atomically (writeTVar w (Just fk)) lift (appendSection fh bs) _ -> pure 0 @@ -531,18 +705,19 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do where - emptyKey = BS.replicate ncqKeyLen 0 openNewDataFile :: forall mx . MonadIO mx => mx (FileKey, Fd) openNewDataFile = do fname <- ncqGetNewFossilName ncq + atomically $ modifyTVar' ncqCurrentFiles (HS.insert (fromString fname)) touch fname let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 } (fromString fname,) <$> liftIO (PosixBase.openFd fname Posix.ReadWrite flags) - spawnJob :: IO () -> m () - spawnJob m = atomically $ writeTQueue ncqJobQ m + spawnJob = ncqSpawnJob ncq + + postponed n m = liftIO (pause @'Seconds n) >> m spawnActivity m = do a <- ContT $ withAsync m @@ -567,6 +742,8 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do alpha = 0.1 step = 1.00 +ncqSpawnJob :: forall m . MonadIO m => NCQStorage2 -> IO () -> m () +ncqSpawnJob NCQStorage2{..} m = atomically $ writeTQueue ncqJobQ m ncqFileFastCheck :: MonadUnliftIO m => FilePath -> m () ncqFileFastCheck fp = do @@ -618,12 +795,15 @@ ncqIndexFile n@NCQStorage2{} fk = do debug $ "INDEX" <+> pretty fp <+> pretty dest items <- S.toList_ do - ncqStorageScanDataFile n fp $ \o w k _ -> do - let rs = w - 32 & fromIntegral @_ @Word32 & N.bytestring32 - let os = fromIntegral @_ @Word64 o & N.bytestring64 - let record = os <> rs - -- debug $ "write record" <+> pretty (BS.length record) - S.yield (coerce k, record) + ncqStorageScanDataFile n fp $ \o w k s -> case ncqIsMeta s of + Just M -> none + _ -> do + -- we need size in order to return block size faster + -- w/o search in fossil + let rs = (w + ncqSLen) & fromIntegral @_ @Word32 & N.bytestring32 + let os = fromIntegral @_ @Word64 o & N.bytestring64 + let record = os <> rs + S.yield (coerce k, record) let (dir,name) = splitFileName fp let idxTemp = (dropExtension name <> "-") `addExtension` ".cq$" @@ -631,117 +811,93 @@ ncqIndexFile n@NCQStorage2{} fk = do result <- nwayWriteBatch (nwayAllocDef 1.10 32 8 12) dir idxTemp items mv result dest + + ncqStateUpdate n [F 0 (coerce fk)] + pure dest -ncqAddTrackedFile :: MonadIO m => NCQStorage2 -> DataFile FileKey -> m Bool -ncqAddTrackedFile ncq@NCQStorage2{..} fkey = flip runContT pure $ callCC \exit -> do - let fname = ncqGetFileName ncq (toFileName fkey) - let idxName = ncqGetFileName ncq (toFileName (IndexFile (coerce @_ @FileKey fkey))) - idxHere <- doesFileExist idxName +ncqAddTrackedFiles :: MonadIO m => NCQStorage2 -> [DataFile FileKey] -> m () +ncqAddTrackedFiles ncq@NCQStorage2{..} files = flip runContT pure do + valid <- for files \fkey -> callCC \skip -> do + let fname = ncqGetFileName ncq (toFileName fkey) + let idxName = ncqGetFileName ncq (toFileName (IndexFile (coerce @_ @FileKey fkey))) - unless idxHere do - err $ "Index does not exist" <+> pretty (takeFileName idxName) - exit False + idxHere <- doesFileExist idxName + unless idxHere do + err $ "Index does not exist" <+> pretty (takeFileName idxName) + skip Nothing - stat <- liftIO $ PFS.getFileStatus fname - -- FIXME: maybe-creation-time-actually - let ts = posixToTimeSpec $ PFS.modificationTimeHiRes stat - let fk = fromString (takeFileName fname) + stat <- liftIO $ PFS.getFileStatus fname + let ts = posixToTimeSpec $ PFS.modificationTimeHiRes stat + let fk = fromString (takeFileName fname) - atomically $ ncqAddTrackedFileSTM ncq fk ts - pure True + pure $ Just (ts, fk) -ncqAddTrackedFileSTM :: NCQStorage2 -> FileKey -> TimeSpec -> STM () -ncqAddTrackedFileSTM NCQStorage2{..} fk ts = do - modifyTVar' ncqTrackedFiles (HPSQ.insert fk (FilePrio (Down ts)) Nothing) -{-# INLINE ncqAddTrackedFileSTM #-} + atomically $ ncqAddTrackedFilesSTM ncq (catMaybes valid) + + +ncqAddTrackedFilesSTM :: NCQStorage2 -> [(TimeSpec, FileKey)] -> STM () +ncqAddTrackedFilesSTM NCQStorage2{..} newFiles = do + a <- readTVar ncqTrackedFiles <&> V.toList + let already = HS.fromList (map tfKey a) + + b <- for newFiles \(t, f) -> + if f `HS.member` already + then pure Nothing + else do + tv <- newTVar Nothing + pure . Just $ TrackedFile (FilePrio (Down t)) f tv + + let new = V.fromList $ List.sortOn tfTime (a <> catMaybes b) + writeTVar ncqTrackedFiles new +{-# INLINE ncqAddTrackedFilesSTM #-} evictIfNeededSTM :: NCQStorage2 -> Maybe Int -> STM () -evictIfNeededSTM NCQStorage2{..} howMany = do +evictIfNeededSTM me@NCQStorage2{..} howMany = do cur <- readTVar ncqCachedEntries let need = fromMaybe cur howMany excess = max 0 (cur + need - ncqMaxCached) when (excess > 0) do - files <- readTVar ncqTrackedFiles <&> HPSQ.toList + files <- ncqListTrackedFilesSTM me - oldest <- forM files \case - (k, prio, Just ce) -> do - ts <- readTVar (cachedTs ce) - pure (Just (ts, k, prio)) + oldest <- forM (V.toList files) \case + (k, Just (CachedEntry{..}) , t) -> do + ts <- readTVar cachedTs + pure (Just (ts, k, t)) _ -> pure Nothing - let victims = - oldest - & catMaybes - & List.sortOn (\(ts,_,_) -> ts) - & List.take excess + let victims = oldest & catMaybes & List.sortOn (view _1) & List.take excess - for_ victims $ \(_,k,prio) -> do - modifyTVar ncqTrackedFiles (HPSQ.insert k prio Nothing) + for_ victims $ \(_,_,t) -> do + writeTVar t Nothing modifyTVar ncqCachedEntries (subtract 1) - {- HLINT ignore "Functor law" -} -ncqListTrackedFiles :: MonadIO m => NCQStorage2 -> m [FilePath] -ncqListTrackedFiles ncq = do +ncqListDirFossils :: MonadIO m => NCQStorage2 -> m [FilePath] +ncqListDirFossils ncq = do let wd = ncqGetWorkDir ncq dirFiles wd >>= mapM (pure . takeFileName) <&> List.filter (\f -> List.isPrefixOf "fossil-" f && List.isSuffixOf ".data" f) <&> HS.toList . HS.fromList -ncqListStateFiles :: MonadIO m => NCQStorage2 -> m [(TimeSpec, FilePath)] +ncqListStateFiles :: forall m . MonadIO m => NCQStorage2 -> m [(TimeSpec, StateFile)] ncqListStateFiles ncq = do let wd = ncqGetWorkDir ncq dirFiles wd >>= mapM (pure . takeBaseName) <&> List.filter (List.isPrefixOf "state-") - >>= mapM (\x -> (,x) <$> timespecOf x) + >>= mapM timespecOf + <&> fmap (over _2 fromString) . rights <&> List.sortOn Down where - timespecOf x = liftIO do - posixToTimeSpec . modificationTimeHiRes <$> getFileStatus (ncqGetFileName ncq x) - -ncqLoadSomeIndexes :: MonadIO m => NCQStorage2 -> [FileKey] -> m () -ncqLoadSomeIndexes ncq@NCQStorage2{..} keys = do - now <- getTimeCoarse - - mapM_ (ncqAddTrackedFile ncq) (fmap DataFile keys) - - loaded <- catMaybes <$> forM keys \key -> runMaybeT do - mEntry <- liftIO $ readTVarIO ncqTrackedFiles <&> HPSQ.lookup key - guard (maybe True (\(_, m) -> isNothing m) mEntry) - - let idxFile = ncqGetFileName ncq (toFileName $ IndexFile key) - let datFile = ncqGetFileName ncq (toFileName $ DataFile key) - - (mmIdx, nway) <- MaybeT $ liftIO $ nwayHashMMapReadOnly idxFile - mmData <- liftIO $ mmapFileByteString datFile Nothing - tnow <- newTVarIO now - pure (key, CachedEntry mmIdx mmData nway tnow) - - atomically do - evictIfNeededSTM ncq (Just (List.length loaded)) - - for_ loaded \(k, ce) -> do - files <- readTVar ncqTrackedFiles - case HPSQ.lookup k files of - Just (p, Nothing) -> do - modifyTVar ncqTrackedFiles (HPSQ.insert k p (Just ce)) - modifyTVar ncqCachedEntries (+1) - _ -> pure () - -ncqLoadIndexes :: MonadIO m => NCQStorage2 -> m () -ncqLoadIndexes ncq@NCQStorage2{..} = do - w <- readTVarIO ncqTrackedFiles - <&> List.take (ncqMaxCached `div` 2) . HPSQ.keys - ncqLoadSomeIndexes ncq w - + timespecOf x = liftIO @m $ try @_ @IOException do + (,x) . posixToTimeSpec . modificationTimeHiRes <$> getFileStatus (ncqGetFileName ncq x) ncqRepair :: MonadIO m => NCQStorage2 -> m () ncqRepair me@NCQStorage2{..} = do @@ -749,19 +905,20 @@ ncqRepair me@NCQStorage2{..} = do fossils <- flip fix states $ \next -> \case [] -> do - warn $ yellow "no valid state found; start from scratch" - ncqListTrackedFiles me <&> fmap (DataFile . fromString) + debug $ yellow "no valid state found; start from scratch" + ncqListDirFossils me <&> fmap (DataFile . fromString) (s:ss) -> tryLoadState s >>= \case Just files -> do debug $ yellow "used state" <+> pretty s - atomically $ writeTVar ncqStateName (Just $ takeFileName s) + atomically $ writeTVar ncqStateName (Just s) pure files Nothing -> do warn $ red "inconsistent state" <+> pretty s + rm (ncqGetFileName me $ toFileName s) next ss - mapM_ (ncqAddTrackedFile me) fossils + ncqAddTrackedFiles me fossils void $ liftIO (ncqStateUpdate me mempty) @@ -769,42 +926,48 @@ ncqRepair me@NCQStorage2{..} = do readState path = ncqReadStateKeys me path <&> fmap DataFile - tryLoadState path = liftIO do + tryLoadState (fk :: StateFile) = liftIO do - debug $ "tryLoadState" <+> pretty path + debug $ "tryLoadState" <+> pretty fk - state <- readState path + state <- readState fk let checkFile fo = flip fix 0 $ \next i -> do let dataFile = ncqGetFileName me (toFileName fo) let indexFile = ncqGetFileName me (toFileName (IndexFile (coerce @_ @FileKey fo))) - -- debug $ "checkFile" <+> pretty dataFile + here <- doesFileExist dataFile - try @_ @SomeException (ncqFileFastCheck dataFile) >>= \case + if not here then do + rm indexFile + pure False - Left e -> do - err (viaShow e) - here <- doesFileExist dataFile - when here do - let broken = dropExtension dataFile `addExtension` ".broken" - mv dataFile broken - rm indexFile - warn $ red "renamed" <+> pretty dataFile <+> pretty broken + else do - pure False + try @_ @SomeException (ncqFileFastCheck dataFile) >>= \case - Right{} | i > 1 -> pure False + Left e -> do + err (viaShow e) + here <- doesFileExist dataFile + when here do + let broken = dropExtension dataFile `addExtension` ".broken" + mv dataFile broken + rm indexFile + warn $ red "renamed" <+> pretty dataFile <+> pretty broken - Right{} -> do - exists <- doesFileExist indexFile - if exists then do - pure True - else do - debug $ "indexing" <+> pretty (toFileName fo) - r <- ncqIndexFile me fo - debug $ "indexed" <+> pretty r - next (succ i) + pure False + + Right{} | i > 1 -> pure False + + Right{} -> do + exists <- doesFileExist indexFile + if exists then do + pure True + else do + debug $ "indexing" <+> pretty (toFileName fo) + r <- ncqIndexFile me fo + debug $ "indexed" <+> pretty r + next (succ i) results <- forM state checkFile pure $ if and results then Just state else Nothing @@ -836,92 +999,126 @@ ncqStateUseSTM NCQStorage2{..} = do ncqStateUnuseSTM :: NCQStorage2 -> STM () ncqStateUnuseSTM NCQStorage2{..} = do k <- readTVar ncqStateVersion <&> fromIntegral + -- TODO: remove when n <= 0 modifyTVar' ncqStateUsage (IntMap.update (Just . over _1 pred) k) ncqStateUpdate :: MonadUnliftIO m => NCQStorage2 -> [StateOP] -> m Bool -ncqStateUpdate me@NCQStorage2{..} ops' = flip runContT pure $ callCC \exit -> do - t1 <- fromIntegral <$> liftIO getTimeCoarse +ncqStateUpdate me@NCQStorage2{..} ops' = withSem ncqStateSem $ flip runContT pure $ callCC \exit -> do - keys0 <- readTVarIO ncqTrackedFiles <&> HPSQ.keys + debug $ "ncqStateUpdate" <+> viaShow ops' - ops <- for ops' $ \case - f@(F _ fk) -> do - let datFile = ncqGetFileName me (toFileName $ DataFile fk) + t1 <- FilePrio . Down <$> liftIO getTimeCoarse - e2 <- doesFileExist datFile - - unless e2 do - err $ "ncqStateUpdate invariant fail" <+> pretty datFile - exit False - - ts <- liftIO (getFileStatus datFile) <&> - posixToTimeSpec . PFS.modificationTimeHiRes - - pure (F ts fk) - - d -> pure d + ops <- checkWithDisk $ \name -> do + err $ "ncqStateUpdate invariant fail" <+> pretty name + exit False changed <- atomically do - t0 <- readTVar ncqStateVersion - let k0 = fromIntegral t0 + current' <- readTVar ncqTrackedFiles <&> V.toList - c <- if List.null ops then do - pure False - else do - writeTVar ncqStateVersion (max (succ t0) t1) - for_ ops $ \case - D fk -> modifyTVar' ncqTrackedFiles (HPSQ.delete fk) - F t fk -> ncqAddTrackedFileSTM me (coerce fk) t - P fk -> do - let onlyIfMissed = \case - Nothing -> ((), Just (FilePrio (Down 0), Just PendingEntry)) - Just (p,v) -> ((), Just (p,v)) + memStateVersionSTM (HS.fromList (fmap tfKey current')) - modifyTVar' ncqTrackedFiles (snd . HPSQ.alter onlyIfMissed fk) + let current = HM.fromList [ (tfKey, e) | e@TrackedFile{..} <- current' ] - pure True + wtf <- flip fix (current, ops) $ \next (s, o) -> case o of + [] -> pure s - old <- readTVar ncqTrackedFiles <&> HS.fromList . HPSQ.keys + (D fk : rest) -> next (HM.delete fk s, rest) - let doAlter = \case - Nothing -> Just (0, old) - Just (u,f) -> Just (u,f) + (P fk : rest) | HM.member fk current -> next (s, rest) + | otherwise -> do + e <- TrackedFile t1 fk <$> newTVar (Just PendingEntry) + next (HM.insert fk e s, rest) - modifyTVar' ncqStateUsage (IntMap.alter doAlter k0) + (F t fk : rest) -> do + case HM.lookup fk s of + Nothing -> do + new <- TrackedFile (FilePrio (Down t)) fk <$> newTVar Nothing + next (HM.insert fk new s, rest) - k1 <- readTVar ncqTrackedFiles <&> HPSQ.keys + Just TrackedFile{..} -> do + pe <- readTVar tfCached + if isNotPending pe then + next (s, rest) + else do + writeTVar tfCached Nothing + next (s, rest) - pure (c && k1 /= keys0) + writeTVar ncqTrackedFiles (V.fromList $ List.sortOn tfTime (HM.elems wtf)) + + pure (HM.keysSet current /= HM.keysSet wtf) + + -- let fks = HS.fromList [ fk | F _ fk <- ops ] + -- tra <- lift $ ncqListTrackedFiles me <&> filter (not . isNotPending . view _2) . V.toList + -- let tra2 = [ k | (k,_,_) <- tra, HS.member k fks ] + + -- unless (List.null tra2) do + -- err $ red "FUCKED" <+> pretty tra2 when changed $ liftIO do name <- ncqDumpCurrentState me atomically $ writeTVar ncqStateName (Just name) + debug $ green "switched state" <+> pretty name + + -- a1 <- V.toList <$> lift (ncqListTrackedFiles me) + + -- let fsz = HS.fromList [ fk | F _ fk <- ops ] + + -- let p1 = [ fk | (fk, Just PendingEntry{}, _) <- a1, HS.member fk fsz ] + + -- unless (List.null p1) do + -- error $ show $ "PIZDA!" <+> pretty p1 <> line <> viaShow ops' pure changed -ncqDumpCurrentState :: MonadUnliftIO m => NCQStorage2 -> m FilePath + where + + memStateVersionSTM currentKeys = do + k0 <- readTVar ncqStateVersion <&> fromIntegral + + let doAlter = \case + Nothing -> Just (0, currentKeys) + Just (u,f) -> Just (u,f) + + modifyTVar' ncqStateUsage (IntMap.alter doAlter k0) + + + checkWithDisk onFail = for ops' $ \case -- + f@(F _ fk) -> do + let datFile = ncqGetFileName me (toFileName $ DataFile fk) + e2 <- doesFileExist datFile + unless e2 (onFail datFile) + + ts <- liftIO (getFileStatus datFile) <&> + posixToTimeSpec . PFS.modificationTimeHiRes + + pure (F ts fk) + + d -> pure d + + +ncqDumpCurrentState :: MonadUnliftIO m => NCQStorage2 -> m StateFile ncqDumpCurrentState me@NCQStorage2{..} = do - keys <- readTVarIO ncqTrackedFiles <&> List.sort . HPSQ.keys + files <- ncqListTrackedFiles me name <- ncqGetNewStateName me - writeBinaryFileDurableAtomic name (BS8.unlines [coerce k | k <- keys]) - pure name + writeBinaryFileDurableAtomic name (BS8.unlines [coerce k | (k,_,_) <- V.toList files]) + pure $ fromString name ncqMergeFull :: forall m . MonadUnliftIO m => NCQStorage2 -> m () ncqMergeFull me = fix \next -> ncqMergeStep me >>= \case False -> none True -> next - -- FIXME: sometime-causes-no-such-file-or-directory ncqMergeStep :: MonadUnliftIO m => NCQStorage2 -> m Bool ncqMergeStep ncq@NCQStorage2{..} = do - withSem ncqMergeSem $ ncqRunTask ncq False $ flip runContT pure $ liftIO do + withSem ncqMergeSem $ ncqRunTask ncq False do debug "ncqMergeStep" - tracked <- readTVarIO ncqTrackedFiles <&> HPSQ.toList + tracked <- ncqListTrackedFiles ncq - files <- for tracked $ \(f,p,e) -> do + files <- for tracked $ \(f,e,_) -> do let fn = ncqGetFileName ncq (toFileName $ DataFile f) let idx = ncqGetFileName ncq (toFileName $ IndexFile f) @@ -929,26 +1126,28 @@ ncqMergeStep ncq@NCQStorage2{..} = do dataHere <- doesFileExist fn sz <- case e of - Just PendingEntry -> pure (-1) + Just PendingEntry -> pure (-100) _ | dataHere -> liftIO (fileSize fn) - | otherwise -> pure (-1) + | otherwise -> pure (-3) idxHere <- doesFileExist idx - pure (f, sz, p, idxHere) + pure (f, sz, idxHere) - let bothOk (_, sz1, _, here1) (_, sz2, _, here2) = + -- debug $ red "MERGE FILES" <+> viaShow files + + let bothOk (_, sz1, here1) (_, sz2, here2) = here1 && here2 && sz1 > 0 && sz2 > 0 && (sz1 + sz2) < fromIntegral ncqMaxLog - let found = flip fix (files, Nothing, Nothing) $ \next -> \case - ([], _, r) -> r + found <- flip fix (V.toList files, Nothing, Nothing) $ \next -> \case + ([], _, r) -> pure r (a:b:rest, Nothing, _) | bothOk a b -> do next (b:rest, Just (view _2 a + view _2 b), Just (a,b)) - (a:b:rest, Just s, _ ) | bothOk a b && view _2 a + view _2 b < s -> do + (a:b:rest, Just s, _ ) | bothOk a b && (view _2 a + view _2 b) < s -> do next (b:rest, Just (view _2 a + view _2 b), Just (a,b)) (_:rest, s, r) -> do @@ -956,7 +1155,9 @@ ncqMergeStep ncq@NCQStorage2{..} = do case found of Just (a,b) -> mergeStep a b >> pure True - _ -> pure False + _ -> do + debug "merge: not found shit" + pure False where @@ -965,17 +1166,13 @@ ncqMergeStep ncq@NCQStorage2{..} = do let (p,tpl) = splitFileName (ncqGetFileName n "merge-.data") liftIO $ emptyTempFile p tpl - mergeStep (a,_,p1,_) (b,_,p2,_) = do + mergeStep (a,_,_) (b,_,_) = do debug $ "merge" <+> pretty a <+> pretty b let fDataNameA = ncqGetFileName ncq $ toFileName (DataFile a) let fIndexNameA = ncqGetFileName ncq $ toFileName (IndexFile a) let fDataNameB = ncqGetFileName ncq $ toFileName (DataFile b) - let fIndexNameB = ncqGetFileName ncq $ toFileName (IndexFile b) - - debug $ "file A" <+> pretty (timeSpecFromFilePrio p1) <+> pretty fDataNameA <+> pretty fIndexNameA - debug $ "file B" <+> pretty (timeSpecFromFilePrio p2) <+> pretty fDataNameB <+> pretty fIndexNameB doesFileExist fDataNameA `orFail` ("not exist" <+> pretty fDataNameA) doesFileExist fDataNameB `orFail` ("not exist" <+> pretty fDataNameB) @@ -997,14 +1194,16 @@ ncqMergeStep ncq@NCQStorage2{..} = do debug $ "SCAN FILE A" <+> pretty fDataNameA - writeFiltered ncq fDataNameA fwh $ \_ _ _ _ -> do - pure True + writeFiltered ncq fDataNameA fwh $ \_ _ _ v -> do + let meta = Just M == ncqIsMeta v + pure $ not meta debug $ "SCAN FILE B" <+> pretty fDataNameA - writeFiltered ncq fDataNameB fwh $ \_ _ k _ -> do + writeFiltered ncq fDataNameB fwh $ \_ _ k v -> do + let meta = Just M == ncqIsMeta v foundInA <- liftIO (nwayHashLookup nway mmIdx (coerce k)) <&> isJust - let skip = foundInA + let skip = foundInA || meta pure $ not skip appendTailSection =<< handleToFd fwh @@ -1025,12 +1224,6 @@ ncqMergeStep ncq@NCQStorage2{..} = do void $ ncqIndexFile ncq fk pure $ Just (ts,fk) - atomically do - modifyTVar ncqTrackedFiles (HPSQ.delete a) - modifyTVar ncqTrackedFiles (HPSQ.delete b) - for_ idx $ \(ts,fk) -> do - ncqAddTrackedFileSTM ncq (coerce fk) (posixToTimeSpec ts) - for_ idx $ \(ts,DataFile fk) -> do void $ ncqStateUpdate ncq [D a, D b, F (posixToTimeSpec ts) fk] @@ -1039,59 +1232,52 @@ ncqMergeStep ncq@NCQStorage2{..} = do unless r (throwIO (NCQMergeInvariantFailed (show e))) -ncqCompact :: MonadUnliftIO m => NCQStorage2 -> m () -ncqCompact ncq@NCQStorage2{..} = do +-- ncqCompact :: MonadUnliftIO m => NCQStorage2 -> m () +-- ncqCompact ncq@NCQStorage2{..} = withSem ncqMergeSem do - q <- newTVarIO ( mempty :: HashMap FileKey (HashSet HashRef) ) +-- q <- newTVarIO ( mempty :: HashMap FileKey (HashSet HashRef) ) - ncqLinearScanForCompact ncq $ \fk h -> atomically do - modifyTVar q (HM.insertWith (<>) fk (HS.singleton h)) +-- ncqLinearScanForCompact ncq $ \fk h -> atomically do +-- modifyTVar q (HM.insertWith (<>) fk (HS.singleton h)) - state0 <- readTVarIO q +-- state0 <- readTVarIO q - for_ (HM.toList state0) $ \(fk, es) -> do - trace $ "TO DELETE" <+> pretty fk <+> pretty (HS.size es) +-- for_ (HM.toList state0) $ \(fk, es) -> do +-- trace $ "TO DELETE" <+> pretty fk <+> pretty (HS.size es) - let fDataNameA = ncqGetFileName ncq (toFileName $ DataFile fk) - let fIndexNameA = ncqGetFileName ncq (toFileName (IndexFile fk)) +-- let fDataNameA = ncqGetFileName ncq (toFileName $ DataFile fk) - flip runContT pure do +-- flip runContT pure do - mfile <- ncqGetNewCompactName ncq +-- mfile <- ncqGetNewCompactName ncq - ContT $ bracket none $ const do - rm mfile +-- ContT $ bracket none $ const $ rm mfile - liftIO do - withBinaryFileAtomic mfile WriteMode $ \fwh -> do - writeFiltered ncq fDataNameA fwh $ \_ _ k v -> do - pure $ not $ HS.member k es - appendTailSection =<< handleToFd fwh +-- liftIO do +-- withBinaryFileAtomic mfile WriteMode $ \fwh -> do +-- writeFiltered ncq fDataNameA fwh $ \_ _ k v -> do +-- pure $ not $ HS.member k es +-- appendTailSection =<< handleToFd fwh - result <- fileSize mfile +-- result <- fileSize mfile - if result == 0 then do - atomically $ modifyTVar ncqTrackedFiles (HPSQ.delete fk) - else do +-- if result == 0 then do +-- atomically $ modifyTVar ncqTrackedFiles (HPSQ.delete fk) +-- else do - fossil <- ncqGetNewFossilName ncq - mv mfile fossil +-- fossil <- ncqGetNewFossilName ncq +-- mv mfile fossil - statA <- getFileStatus fDataNameA +-- statA <- getFileStatus fDataNameA - let ts = modificationTimeHiRes statA - setFileTimesHiRes fossil ts ts +-- let ts = modificationTimeHiRes statA +-- setFileTimesHiRes fossil ts ts - fname <- ncqIndexFile ncq (DataFile (fromString fossil)) +-- void $ ncqIndexFile ncq (DataFile (fromString fossil)) +-- void $ ncqStateUpdate ncq [F (posixToTimeSpec ts) (fromString fossil)] - atomically do - let fp = fromString fname - modifyTVar ncqTrackedFiles (HPSQ.delete fk) - ncqAddTrackedFileSTM ncq fp (posixToTimeSpec ts) - mapM_ rm [fDataNameA, fIndexNameA] - - debug $ "compact done" <+> pretty (HM.size state0) +-- debug $ "compact done" <+> pretty (HM.size state0) -- NOTE: incremental @@ -1107,8 +1293,9 @@ ncqLinearScanForCompact :: MonadUnliftIO m -> m Int ncqLinearScanForCompact ncq@NCQStorage2{..} action = flip runContT pure do + ContT $ useVersion ncq - tracked <- readTVarIO ncqTrackedFiles <&> HPSQ.toList + tracked <- readTVarIO ncqTrackedFiles <&> V.toList let state0 = mempty :: HashMap HashRef TimeSpec @@ -1119,53 +1306,73 @@ ncqLinearScanForCompact ncq@NCQStorage2{..} action = flip runContT pure do flip fix (tracked, state0) $ \next -> \case ([], s) -> none - ((fk,p,_):rest, state) -> do + ((TrackedFile{..}):rest, state) -> do + e <- readTVarIO tfCached - let cqFile = ncqGetFileName ncq (toFileName (IndexFile fk)) - let dataFile = ncqGetFileName ncq (toFileName (DataFile fk)) + let cqFile = ncqGetFileName ncq (toFileName (IndexFile tfKey)) + let dataFile = ncqGetFileName ncq (toFileName (DataFile tfKey)) - (mmaped,meta@NWayHash{..}) <- liftIO $ nwayHashMMapReadOnly cqFile - >>= orThrow (NWayHashInvalidMetaData cqFile) + c <- doesFileExist cqFile + d <- doesFileExist dataFile + let pending = not (isNotPending e) - let emptyKey = BS.replicate nwayKeySize 0 + if not c || not d || pending then + next (rest, state) + else do - found <- S.toList_ do - nwayHashScanAll meta mmaped $ \o k entryBs -> do - unless (k == emptyKey) do - let off = N.word64 (BS.take 8 entryBs) - let sz = N.word32 (BS.take 4 (BS.drop 8 entryBs)) + (mmaped,meta@NWayHash{..}) <- liftIO $ nwayHashMMapReadOnly cqFile + >>= orThrow (NWayHashInvalidMetaData cqFile) - when (sz == ncqPrefixLen || sz == ncqPrefixLen + 32) do + notice $ "SCAN" <+> pretty cqFile + + let emptyKey = BS.replicate nwayKeySize 0 + + found <- S.toList_ do + nwayHashScanAll meta mmaped $ \o k entryBs -> do + unless (k == emptyKey) do + + let off = N.word64 (BS.take 8 entryBs) + let sz = N.word32 (BS.take 4 (BS.drop 8 entryBs)) + + -- debug $ "SCAN SHIT" <+> pretty tfKey <+> pretty off <+> pretty sz + + -- fast-n-dirty-check-for-deleted + when (sz <= ncqSLen + ncqKeyLen + ncqPrefixLen ) do + debug $ red "FOUND EMPTY RECORD" <+> pretty sz S.yield off - let kk = coerce k + let kk = coerce k - case HM.lookup kk state of - Just ts | ts > timeSpecFromFilePrio p -> do - notice $ pretty kk <+> pretty (sz + ncqSLen) - atomically do - modifyTVar profit ( + (sz + ncqSLen) ) - modifyTVar tombUse (HM.adjust (over _2 succ) kk) - lift $ lift $ action (fromString dataFile) kk + case HM.lookup kk state of + Just ts | ts > timeSpecFromFilePrio tfTime -> do + notice $ pretty kk <+> pretty (sz + ncqSLen) + atomically do + modifyTVar profit ( + (sz + ncqSLen) ) + modifyTVar tombUse (HM.adjust (over _2 succ) kk) + lift $ lift $ action (fromString dataFile) kk - _ -> none + _ -> none - newEntries <- S.toList_ do - unless (List.null found) do - dataBs <- liftIO $ mmapFileByteString dataFile Nothing - for_ found $ \o -> do - let pre = BS.take (fromIntegral ncqPrefixLen) (BS.drop (ncqDataOffset o) dataBs) + notice "SURVIVED 2" - when (pre == ncqRefPrefix || pre == ncqTombPrefix) do - let keyBs = BS.take ncqKeyLen (BS.drop (fromIntegral o + ncqSLen) dataBs) - let key = coerce (BS.copy keyBs) - unless (HM.member key state) do - S.yield (key, timeSpecFromFilePrio p) - when ( pre == ncqTombPrefix ) do - atomically $ modifyTVar tombUse (HM.insert key (fk,0)) + newEntries <- S.toList_ do + unless (List.null found) do + notice $ red "TRY" <+> pretty dataFile + dataBs <- liftIO $ mmapFileByteString dataFile Nothing + notice "SURVIVED 3" + for_ found $ \o -> do + let pre = BS.take (fromIntegral ncqPrefixLen) (BS.drop (ncqDataOffset o) dataBs) - next (rest, state <> HM.fromList newEntries) + when (pre == ncqRefPrefix || pre == ncqTombPrefix) do + let keyBs = BS.take ncqKeyLen (BS.drop (fromIntegral o + ncqSLen) dataBs) + let key = coerce (BS.copy keyBs) + unless (HM.member key state) do + S.yield (key, timeSpecFromFilePrio tfTime) + when ( pre == ncqTombPrefix ) do + atomically $ modifyTVar tombUse (HM.insert key (tfKey,0)) + + next (rest, state <> HM.fromList newEntries) use <- readTVarIO tombUse let useless = [ (f,h) | (h, (f,n)) <- HM.toList use, n == 0 ] @@ -1174,44 +1381,46 @@ ncqLinearScanForCompact ncq@NCQStorage2{..} action = flip runContT pure do atomically $ modifyTVar profit (+ncqFullTombLen) lift $ action f h + notice "SURVIVED 3" + readTVarIO profit <&> fromIntegral -ncqReadStateKeys :: forall m . MonadUnliftIO m => NCQStorage2 -> FilePath -> m [FileKey] +ncqReadStateKeys :: forall m . MonadUnliftIO m => NCQStorage2 -> StateFile -> m [FileKey] ncqReadStateKeys me path = liftIO do - keys <- BS8.readFile (ncqGetFileName me path) + keys <- BS8.readFile (ncqGetFileName me (toFileName path)) <&> filter (not . BS8.null) . BS8.lines pure $ fmap (coerce @_ @FileKey) keys ncqSweepFossils :: forall m . MonadUnliftIO m => NCQStorage2 -> m () -ncqSweepFossils me@NCQStorage2{..} = withSem ncqMergeSem do +ncqSweepFossils me@NCQStorage2{..} = withSem ncqSweepSem do debug $ yellow "sweep fossils" -- better be safe than sorry current <- readTVarIO ncqCurrentFiles - -- >>= mapM (try @_ @IOException . ncqReadStateKeys me) - mentioned <- ncqListStateFiles me <&> fmap snd - >>= mapM (ncqReadStateKeys me) - <&> HS.fromList . mconcat - - sfs <- ncqListStateFiles me <&> fmap snd + sfs <- ncqListStateFiles me debug $ "STATE FILES" <+> vcat (fmap pretty sfs) - active <- readTVarIO ncqTrackedFiles <&> HS.fromList . HPSQ.keys + mentioned <- mapM (safeRead . ncqReadStateKeys @m me) (fmap snd sfs) + <&> HS.fromList . mconcat - used' <- readTVarIO ncqStateUsage <&> IntMap.elems + kicked' <- ncqListDirFossils me <&> fmap fromString - let used = current - <> active - <> mentioned - <> HS.unions [ keys | (n, keys) <- used', n > 0 ] + (kicked, used) <- atomically do - kicked <- ncqListTrackedFiles me - <&> fmap (fromString @FileKey) - <&> filter (\x -> not (HS.member x used)) - <&> HS.toList . HS.fromList + active <- ncqListTrackedFilesSTM me <&> HS.fromList . fmap (view _1) . V.toList + + used' <- readTVar ncqStateUsage <&> IntMap.elems + + let used = current + <> active + <> mentioned + <> HS.unions [ keys | (n, keys) <- used', n > 0 ] + + let k = filter (\x -> not (HS.member x used)) kicked' + pure (k,HS.fromList $ HS.toList used) debug $ "KICK" <+> vcat (fmap pretty kicked) @@ -1222,13 +1431,22 @@ ncqSweepFossils me@NCQStorage2{..} = withSem ncqMergeSem do rm (ncqGetFileName me (toFileName (IndexFile fo))) rm (ncqGetFileName me (toFileName (DataFile fo))) + where + safeRead m = try @_ @IOException m >>= \case + Right x -> pure x + Left e -> err ("ncqSweepFossils" <+> viaShow e) >> pure mempty + ncqSweepStates :: MonadUnliftIO m => NCQStorage2 -> m () -ncqSweepStates me@NCQStorage2{..} = withSem ncqMergeSem $ flip runContT pure do +ncqSweepStates me@NCQStorage2{..} = withSem ncqSweepSem $ flip runContT pure do + debug $ yellow "remove unused states" + current' <- readTVarIO ncqStateName current <- ContT $ for_ current' + debug $ yellow "CURRENT STATE NAME" <+> pretty current + states <- ncqListStateFiles me <&> fmap snd flip fix (Left states) $ \next -> \case @@ -1239,9 +1457,11 @@ ncqSweepStates me@NCQStorage2{..} = withSem ncqMergeSem $ flip runContT pure do Right (x:xs) -> do debug $ "Remove obsolete state" <+> pretty x - rm (ncqGetFileName me x) + rm (ncqGetFileName me (toFileName x)) next (Right xs) +ncqSetOnRunWriteIdle :: MonadUnliftIO m => NCQStorage2 -> IO () -> m () +ncqSetOnRunWriteIdle NCQStorage2{..} io = atomically (writeTVar ncqOnRunWriteIdle io) writeFiltered :: forall m . MonadIO m => NCQStorage2 @@ -1254,8 +1474,8 @@ writeFiltered ncq fn out filt = do ncqStorageScanDataFile ncq fn $ \o s k v -> do skip <- filt o s k v <&> not - when skip do - debug $ pretty k <+> pretty "skipped" + -- when skip do + -- debug $ pretty k <+> pretty "skipped" unless skip $ liftIO do BS.hPut out (LBS.toStrict (makeEntryLBS k v)) @@ -1318,3 +1538,9 @@ withSem sem m = bracket enter leave (const m) where enter = atomically (waitTSem sem) leave = const $ atomically (signalTSem sem) +isNotPending :: Maybe CachedEntry -> Bool +isNotPending = \case + Just (PendingEntry {}) -> False + _ -> True + + diff --git a/hbs2-storage-ncq/tests/benchmarks/test-ncq2-concurrent1.log b/hbs2-storage-ncq/tests/benchmarks/test-ncq2-concurrent1.log new file mode 100644 index 00000000..ffa25e98 --- /dev/null +++ b/hbs2-storage-ncq/tests/benchmarks/test-ncq2-concurrent1.log @@ -0,0 +1,35 @@ +[dmz@serenity:~/w/hbs2]$ test-ncq test:root temp and debug off and test:ncq2:concurrent1 32 10000 +baseline 10000 3.852 1552.57 403.00 +1 3.74 1566.85 418.61 +2 2.22 1570.29 705.12 +3 1.82 1563.56 856.75 +4 1.69 1571.03 927.41 +5 1.68 1567.80 927.70 +6 1.63 1562.91 953.00 +7 1.64 1559.46 949.74 +8 1.62 1556.07 958.77 +9 1.59 1562.96 980.53 +10 1.62 1565.51 965.18 +11 1.59 1557.23 973.27 +12 1.59 1564.51 980.27 +13 1.62 1563.44 959.17 +14 1.61 1566.02 967.28 +15 1.61 1566.78 967.15 +16 1.65 1572.14 951.66 +17 1.63 1558.96 951.75 +18 1.63 1561.53 952.73 +19 1.63 1557.93 951.70 +20 1.60 1552.89 969.95 +21 1.62 1562.02 961.25 +22 1.61 1567.37 968.11 +23 1.60 1565.27 972.22 +24 1.62 1568.21 962.68 +25 1.60 1556.52 967.39 +26 1.62 1555.00 958.10 +27 1.64 1573.31 953.53 +28 1.63 1557.48 952.59 +29 1.66 1560.38 938.29 +30 1.62 1561.35 960.83 +31 1.63 1563.76 954.68 +32 1.61 1562.60 966.96 + diff --git a/hbs2-tests/test/TCQ.hs b/hbs2-tests/test/TCQ.hs index 4c11dcd2..212ca50a 100644 --- a/hbs2-tests/test/TCQ.hs +++ b/hbs2-tests/test/TCQ.hs @@ -37,6 +37,9 @@ import Data.Config.Suckless.Script.File as SF import DBPipe.SQLite hiding (field) +import System.Random.MWC as MWC + +import System.IO.Temp as Temp import Data.Bits import Data.ByteString (ByteString) import Data.ByteString qualified as BS @@ -69,12 +72,14 @@ import System.IO.MMap import System.IO qualified as IO import System.Exit (exitSuccess, exitFailure) import System.Random +import System.Random.Stateful import Safe import Lens.Micro.Platform import Control.Concurrent.STM qualified as STM import Data.Hashable import UnliftIO +import UnliftIO.Async import Text.InterpolatedString.Perl6 (qc) @@ -450,6 +455,82 @@ main = do e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "sqlite:nwrite" $ nil_ \case + [ LitIntVal tn', LitIntVal n ] -> lift do + + let tn = fromIntegral tn' + let num = fromIntegral n + + + g <- liftIO MWC.createSystemRandom + + for_ [1..tn] $ \tnn -> flip runContT pure do + + let fnv = num `quot` tnn + + mkdir "temp" + + let tmp = "temp" + + -- tmp <- ContT $ Temp.withTempDirectory "temp" "nwrite" + + dbf <- liftIO $ Temp.emptyTempFile tmp "nwrite-.db" + + db <- newDBPipeEnv dbPipeOptsDef dbf + + pipe <- ContT $ withAsync (runPipe db) + + tw <- newTVarIO 0 + + withDB db do + ddl "create table if not exists block (hash blob not null primary key, value blob)" + commitAll + + withDB db do + ddl [qc| + pragma journal_mode=WAL; + pragma synchronous=normal; + |] + + + t0 <- getTimeCoarse + + ss <- replicateM num $ liftIO $ MWC.uniformRM (64*1024, 256*1024) g + + liftIO $ pooledForConcurrentlyN_ tnn ss $ \size -> do + lbs <- uniformByteStringM size g <&> LBS.fromStrict + + let ha = hashObject @HbSync lbs + + let sql = [qc|insert into block (hash, value) values(?,?) on conflict (hash) do nothing |] + + withDB db do + insert sql (coerce @_ @ByteString ha, lbs) + atomically $ modifyTVar tw (+ (32 + size)) + + withDB db do + commitAll + + w <- readTVarIO tw + t1 <- getTimeCoarse + + let t = realToFrac (toNanoSecs (t1 - t0)) / 1e9 + let tsec = realToFrac @_ @(Fixed E2) t + + let total = realToFrac w + + let speed = if t > 0 then total / t else 0 + let totMegs = realToFrac @_ @(Fixed E2) $ total / (1024**2) + let speedMbs = realToFrac @_ @(Fixed E2) $ speed / (1024**2) + + notice $ pretty tnn <+> pretty (tsec) <+> pretty totMegs <+> pretty speedMbs + + none + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "sqlite:merkle:write" $ nil_ \case [ StringLike dbf, StringLike fname ] -> lift do db <- newDBPipeEnv dbPipeOptsDef dbf diff --git a/hbs2-tests/test/TestNCQ.hs b/hbs2-tests/test/TestNCQ.hs index b485c86e..dabe1a17 100644 --- a/hbs2-tests/test/TestNCQ.hs +++ b/hbs2-tests/test/TestNCQ.hs @@ -10,6 +10,7 @@ import HBS2.Prelude.Plated import HBS2.OrDie import HBS2.Hash import HBS2.Data.Types.Refs +import HBS2.Misc.PrettyStuff import HBS2.Clock import HBS2.Merkle import HBS2.Polling @@ -20,6 +21,7 @@ import HBS2.Storage.Operations.ByteString import HBS2.System.Logger.Simple.ANSI +import HBS2.Data.Log.Structured.SD import HBS2.Storage.NCQ import HBS2.Storage.NCQ2 as N2 import HBS2.Data.Log.Structured.NCQ @@ -32,6 +34,9 @@ import Data.Config.Suckless.System import DBPipe.SQLite hiding (field) +import System.Posix.Files qualified as PFS +import Numeric (showHex) +import Data.Ord (Down(..)) import Data.Char import Data.Bits import Data.ByteString (ByteString) @@ -590,48 +595,232 @@ testNCQConcurrent1 noRead tn n TestEnv{..} = flip runContT pure do rm ncqDir -testNCQ2Simple1 :: MonadUnliftIO m - => TestEnv +testNCQ2Sweep1 :: forall c m . (MonadUnliftIO m, IsContext c) + => [Syntax c] + -> TestEnv -> m () -testNCQ2Simple1 TestEnv{..} = do - debug "testNCQ2Simple1" +testNCQ2Sweep1 syn TestEnv{..} = do + debug $ "testNCQ2Sweep1" <+> pretty syn let tmp = testEnvDir let ncqDir = tmp q <- newTQueueIO g <- liftIO MWC.createSystemRandom - bz <- replicateM 100000 $ liftIO do + let (opts, argz) = splitOpts [] syn + + let n = headDef 100000 [ fromIntegral x | LitIntVal x <- argz ] + + bz <- replicateM n $ liftIO do n <- (`mod` (256*1024)) <$> uniformM @Int g uniformByteStringM n g + notice $ "generate" <+> pretty n <+> "blocks" + ncqWithStorage ncqDir $ \sto -> liftIO do for bz $ \z -> do h <- ncqPutBS sto (Just B) Nothing z atomically $ writeTQueue q h - found <- ncqLocate2 sto h <&> maybe (-1) ncqEntrySize - assertBool (show $ "found-immediate" <+> pretty h) (found > 0) ncqWithStorage ncqDir $ \sto -> liftIO do - notice "perform merge" + notice $ red "PERFORM MERGE" ncqMergeFull sto + notice $ "full sweep unused states" + ncqWithStorage ncqDir $ \sto -> liftIO do ncqSweepStates sto ncqSweepFossils sto + notice $ "lookup" <+> pretty n <+> "blocks" + ncqWithStorage ncqDir $ \sto -> liftIO do hashes <- atomically (STM.flushTQueue q) for_ hashes $ \ha -> do found <- ncqLocate2 sto ha <&> maybe (-1) ncqEntrySize assertBool (show $ "found" <+> pretty ha) (found > 0) - -- debug $ fill 44 (pretty ha) <+> fill 8 (pretty found) + + + +testNCQ2Sweep2 :: forall c m . (MonadUnliftIO m, IsContext c) + => [Syntax c] + -> TestEnv + -> m () + +testNCQ2Sweep2 syn TestEnv{..} = do + debug $ "testNCQ2Sweep2" <+> pretty syn + let tmp = testEnvDir + let ncqDir = tmp + q <- newTQueueIO + + g <- liftIO MWC.createSystemRandom + + let (opts, argz) = splitOpts [] syn + + let n = headDef 100000 [ fromIntegral x | LitIntVal x <- argz ] + + notice $ "generate" <+> pretty n <+> "blocks" + + bz <- replicateM n $ liftIO do + n <- (`mod` (256*1024)) <$> uniformM @Int g + uniformByteStringM n g + + -- race (pause @'Seconds 260) do + + ncqWithStorage ncqDir $ \sto -> liftIO do + for_ bz $ \z -> do + h <- ncqPutBS sto (Just B) Nothing z + atomically $ writeTQueue q h + + notice "wait some time to see merge+sweep" + pause @'Seconds 240 + + ncqWithStorage ncqDir $ \sto -> liftIO do + hashes <- atomically (STM.flushTQueue q) + for_ hashes $ \ha -> do + found <- ncqLocate2 sto ha <&> maybe (-1) ncqEntrySize + assertBool (show $ "found" <+> pretty ha) (found > 0) + + + +testNCQ2Kill1 :: forall c m . (MonadUnliftIO m, IsContext c) + => [Syntax c] + -> TestEnv + -> m () + +testNCQ2Kill1 syn TestEnv{..} = flip runContT pure do + debug $ "testNCQ2Kill1" <+> pretty syn + let tmp = testEnvDir + let ncqDir = tmp + q <- newTQueueIO + + g <- liftIO MWC.createSystemRandom + + let (opts, argz) = splitOpts [] syn + + let n = headDef 100000 [ fromIntegral x | LitIntVal x <- argz ] + + notice $ "generate" <+> pretty n <+> "blocks" + + bz <- replicateM n $ liftIO do + n <- (`mod` (256*1024)) <$> uniformM @Int g + uniformByteStringM n g + + -- race (pause @'Seconds 260) do + + wIdle <- newEmptyTMVarIO + + ncq1 <- ContT $ withAsync $ ncqWithStorage ncqDir $ \sto -> liftIO do + ncqSetOnRunWriteIdle sto (atomically (putTMVar wIdle ())) + for_ bz $ \z -> do + h <- ncqPutBS sto (Just B) Nothing z + atomically $ writeTQueue q h + pause @'Seconds 300 + + notice $ red "WAIT FUCKING IDLE!" + + atomically $ takeTMVar wIdle + + notice $ red "GOT FUCKING IDLE!" <+> "lets see what happen now" + + cancel ncq1 + + liftIO $ ncqWithStorage ncqDir $ \sto -> liftIO do + hashes <- atomically (STM.flushTQueue q) + for_ hashes $ \ha -> do + found <- ncqLocate2 sto ha <&> maybe (-1) ncqEntrySize + assertBool (show $ "found" <+> pretty ha) (found > 0) + + +testNCQ2Simple1 :: forall c m . (MonadUnliftIO m, IsContext c) + => [Syntax c] + -> TestEnv + -> m () + +testNCQ2Simple1 syn TestEnv{..} = do + debug $ "testNCQ2Simple1" <+> pretty syn + let tmp = testEnvDir + let ncqDir = tmp + q <- newTQueueIO + + g <- liftIO MWC.createSystemRandom + + let (opts, argz) = splitOpts [] syn + + let n = headDef 100000 [ fromIntegral x | LitIntVal x <- argz ] + let l = headDef 5 $ drop 1 [ fromIntegral x | LitIntVal x <- argz ] + let s = headDef (256*1024) $ drop 2 [ fromIntegral (1024 * x) | LitIntVal x <- argz ] + + notice $ "insert" <+> pretty n <+> "random blocks of size" <+> pretty s + + thashes <- newTQueueIO + + ncqWithStorage ncqDir $ \sto -> liftIO do + replicateM_ n do + n <- (`mod` s) <$> uniformM @Int g + z <- uniformByteStringM n g + h <- ncqPutBS sto (Just B) Nothing z + found <- ncqLocate2 sto h <&> maybe (-1) ncqEntrySize + atomically $ writeTQueue q h + assertBool (show $ "found-immediate" <+> pretty h) (found > 0) + atomically $ writeTQueue thashes h + + t0 <- getTimeCoarse + + hs <- atomically $ STM.flushTQueue thashes + + flip fix (t0, List.length hs, hs) $ \loop (tp, num, xs) -> case xs of + [] -> none + (ha:rest) -> do + t1 <- getTimeCoarse + + t2 <- if realToFrac (toNanoSecs (t1 - t0)) / 1e9 < 1.00 then do + pure tp + else do + notice $ green "lookup" <+> pretty num + pure t1 + + found <- ncqLocate2 sto ha <&> maybe (-1) ncqEntrySize + assertBool (show $ "found" <+> pretty ha) (found > 0) + unless (List.null hs) $ loop (t1, pred num, rest) + + hashes <- atomically (STM.flushTQueue q) + + notice $ "merge data" + + ncqWithStorage ncqDir $ \sto -> liftIO do + notice "perform merge" + ncqMergeFull sto + ncqSweepStates sto + ncqSweepFossils sto + + notice $ "full sweep unused states" + + ncqWithStorage ncqDir $ \sto -> liftIO do + ncqSweepStates sto + ncqSweepFossils sto + + notice $ "lookup" <+> pretty n <+> "blocks" + + ncqWithStorage ncqDir $ \sto -> liftIO do + + replicateM_ l do + + -- performMajorGC + + (t1,_) <- timeItT do + + for_ hashes $ \ha -> do + found <- ncqLocate2 sto ha <&> maybe (-1) ncqEntrySize + assertBool (show $ "found" <+> pretty ha) (found > 0) + -- debug $ fill 44 (pretty ha) <+> fill 8 (pretty found) + + notice $ pretty (sec6 t1) <+> "lookup" <+> pretty n <+> "blocks" genRandomBS :: forall g m . (Monad m, StatefulGen g m) => g -> Int -> m ByteString genRandomBS g n = do - n <- (`mod` (64*1024)) <$> uniformM @Int g uniformByteStringM n g sec6 :: RealFrac a => a -> Fixed E6 @@ -832,7 +1021,7 @@ testNCQ2Repair1 TestEnv{..} = do assertBool (show $ "found-immediate" <+> pretty h) (found > 0) ncqWithStorage ncqDir $ \sto -> liftIO do - written <- N2.ncqListTrackedFiles sto + written <- N2.ncqListDirFossils sto debug $ "TRACKED" <+> vcat (fmap pretty written) toDestroy <- pure (headMay written) `orDie` "no file written" @@ -856,6 +1045,44 @@ testNCQ2Repair1 TestEnv{..} = do -- assertBool (show $ "found-immediate" <+> pretty ha) (found > 0) -- debug $ fill 44 (pretty ha) <+> fill 8 (pretty found) + +testWriteNThreads :: forall g m . (MonadUnliftIO m) + => FilePath + -> Int + -> Int + -> m () +testWriteNThreads ncqDir tnn n = do + + g <- liftIO MWC.createSystemRandom + + wtf <- liftIO getPOSIXTime <&> show . round + + t0 <- getTimeCoarse + + w <- ncqWithStorage (ncqDir wtf <> show tnn) $ \sto -> do + ss <- liftIO $ replicateM n $ MWC.uniformRM (64*1024, 256*1024) g + + pooledForConcurrentlyN_ tnn ss $ \len -> do + tbs <- liftIO $ genRandomBS g len + ncqPutBS sto (Just B) Nothing tbs + -- atomically $ modifyTVar' tss (+ len) + + -- 32 bytes per key, 4 per len + pure $ (List.length ss * 36) + sum ss + + t1 <- getTimeCoarse + + let t = realToFrac (toNanoSecs (t1 - t0)) / 1e9 + + let total = realToFrac w + + let speed = if t > 0 then total / t else 0 + let totMegs = realToFrac @_ @(Fixed E2) $ total / (1024**2) + let speedMbs = realToFrac @_ @(Fixed E2) $ speed / (1024**2) + + notice $ pretty tnn <+> pretty (sec2 t) <+> pretty totMegs <+> pretty speedMbs + + testNCQ2Concurrent1 :: MonadUnliftIO m => Bool -> Int @@ -867,7 +1094,7 @@ testNCQ2Concurrent1 noRead tn n TestEnv{..} = flip runContT pure do let tmp = testEnvDir let inputDir = tmp "input" - let ncqDir = tmp "ncq-test-data" + let ncqDir = tmp "ncq" debug "preparing" @@ -875,44 +1102,45 @@ testNCQ2Concurrent1 noRead tn n TestEnv{..} = flip runContT pure do debug $ pretty inputDir - filez <- liftIO $ pooledReplicateConcurrentlyN 8 n $ do - size <- randomRIO (64*1024, 256*1024) - w <- liftIO (randomIO :: IO Word8) - let tbs = BS.replicate size w -- replicateM size w <&> BS.pack - let ha = hashObject @HbSync tbs -- & show . pretty - let fn = inputDir show (pretty ha) - liftIO $ BS.writeFile fn tbs - pure (fn, ha, BS.length tbs) + g <- liftIO MWC.createSystemRandom - debug "done" + log <- liftIO $ Temp.emptyTempFile inputDir "log-.bin" - let fnv = V.fromList filez - let ssz = sum [ s | (_,_,s) <- filez ] & realToFrac + (t0,size) <- timeItT do + liftIO $ withFile log IO.AppendMode $ \hlog -> do + replicateM_ n do + size <- MWC.uniformRM (64*1024, 256*1024) g + tbs <- genRandomBS g size + let ha = hashObject @HbSync tbs + let ss = coerce ha <> tbs + let bssize = N.bytestring32 (fromIntegral $ BS.length ss) + BS.hPut hlog (bssize <> ss) + getFileSize log - notice "NO SHIT" - -- setLoggingOff @DEBUG + let mbps = realToFrac size / (1024**2) + let v0 = mbps / t0 + notice $ "baseline" <+> pretty n + <+> pretty (sec3 t0) + <+> pretty (realToFrac @_ @(Fixed E2) mbps) + <+> pretty (sec2 v0) - for_ [1..tn] $ \tnn -> do - ncq1 <- ncqStorageOpen2 ncqDir (\x -> x { ncqFsync = 64^(1024^2) } ) - w <- ContT $ withAsync (ncqStorageRun2 ncq1) + for_ [1..tn] $ \tnn -> liftIO do + testWriteNThreads ncqDir tnn n - (t,_) <- timeItT $ liftIO do - pooledForConcurrentlyN_ tnn fnv $ \(n,ha,_) -> do - co <- BS.readFile n - ncqPutBS ncq1 (Just B) Nothing co - ncqStorageStop2 ncq1 - performMajorGC - wait w - rm ncqDir - - let tt = realToFrac @_ @(Fixed E2) t - let speed = ((ssz / (1024 **2)) / t) & realToFrac @_ @(Fixed E2) - notice $ pretty tnn <+> pretty tt <+> pretty speed +testNCQ2Concurrent2 :: MonadUnliftIO m + => Int -- ^ threads + -> Int -- ^ times + -> Int -- ^ blocks + -> TestEnv + -> m () +testNCQ2Concurrent2 tn times n TestEnv{..} = flip runContT pure do + replicateM_ times do + lift $ testWriteNThreads testEnvDir tn n testNCQ2ConcurrentWriteSimple1 :: MonadUnliftIO m => Int @@ -1061,6 +1289,13 @@ main = do e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "test:ncq:concurrent2" $ nil_ $ \case + [ LitIntVal tn, LitIntVal times, LitIntVal n ] -> do + debug $ "ncq:concurrent2" <+> pretty tn <+> pretty n + runTest $ testNCQ2Concurrent2 (fromIntegral tn) (fromIntegral times) (fromIntegral n) + + e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "test:ncq:concurrent1:wo" $ nil_ $ \case [ LitIntVal tn, LitIntVal n ] -> do debug $ "ncq:concurrent1" <+> pretty tn <+> pretty n @@ -1109,8 +1344,17 @@ main = do e -> throwIO $ BadFormException @C (mkList e) - entry $ bindMatch "test:ncq2:simple1" $ nil_ $ const $ do - runTest testNCQ2Simple1 + entry $ bindMatch "test:ncq2:simple1" $ nil_ $ \e -> do + runTest (testNCQ2Simple1 e) + + entry $ bindMatch "test:ncq2:sweep1" $ nil_ $ \e -> do + runTest (testNCQ2Sweep1 e) + + entry $ bindMatch "test:ncq2:kill1" $ nil_ $ \e -> do + runTest (testNCQ2Kill1 e) + + entry $ bindMatch "test:ncq2:sweep2" $ nil_ $ \e -> do + runTest (testNCQ2Sweep2 e) entry $ bindMatch "test:ncq2:repair1" $ nil_ $ const $ do runTest testNCQ2Repair1 @@ -1121,6 +1365,147 @@ main = do e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:ncq2:wtf1" $ nil_ $ const do + runTest $ \TestEnv{..} -> do + let dir = testEnvDir + r1 <- ncqWithStorage dir $ \sto -> do + h <- ncqPutBS sto (Just B) Nothing "JOPAKITAPECHENTRESKI" + loc <- ncqLocate2 sto h `orDie` "not found shit" + let re@(k,r) = ncqEntryUnwrap sto $ ncqGetEntryBS sto loc + notice $ pretty "MEM" <+> pretty (ncqEntrySize loc) <+> pretty (coerce @_ @HashRef k) <+> viaShow r + pure re + + ncqWithStorage dir $ \sto -> do + let (k,v) = r1 + loc <- ncqLocate2 sto (coerce k) `orDie` "not found shit" + let s0 = ncqGetEntryBS sto loc + let (k1,r1) = ncqEntryUnwrap sto s0 + notice $ "FOSSIL:" <+> pretty (ncqEntrySize loc) <+> pretty (coerce @_ @HashRef k1) <+> viaShow r1 + assertBool "written-same" (r1 == v && k == k1) + + + + entry $ bindMatch "test:ncq2:scan-index" $ nil_ \case + [ StringLike dir, HashLike item ] -> do + notice $ "SCAN DIR" <+> pretty dir <+> pretty item + + ncqWithStorage dir $ \sto@NCQStorage2{..} -> do + + -- let d = N2.ncqGetFileName sto "" + + -- files <- dirFiles d <&> List.filter (List.isSuffixOf ".cq") + + -- files <- N2.ncqListTrackedFiles sto + + tracked <- N2.ncqListTrackedFiles sto + + for_ tracked $ \(k,_,_) -> do + + let indexFile = N2.ncqGetFileName sto (toFileName (IndexFile k)) + + (idxBs, idxNway) <- liftIO (nwayHashMMapReadOnly indexFile) + >>= orThrow (NCQStorageCantMapFile indexFile) + + + notice $ "scan file" <+> pretty indexFile + + stat <- liftIO $ PFS.getFileStatus indexFile + -- -- FIXME: maybe-creation-time-actually + let ts = posixToTimeSpec $ PFS.modificationTimeHiRes stat + + nwayHashScanAll idxNway idxBs $ \_ k v -> do + when (coerce k == item ) do + + let off = fromIntegral $ N.word64 (BS.take 8 v) + let size = fromIntegral $ N.word32 (BS.take 4 (BS.drop 8 v)) + + notice $ yellow "found" + <+> pretty (fromString @FileKey indexFile) + <+> pretty (fromIntegral @_ @Word64 ts) + <+> pretty (off,size,item) + <+> pretty (foldMap (`showHex` "") (BS.unpack v) ) + + -- datBs <- liftIO $ mmapFileByteString dataFile Nothing + + + none + + e -> throwIO (BadFormException (mkList e)) + + entry $ bindMatch "test:ncq2:del1" $ nil_ $ \syn -> do + + runTest $ \TestEnv{..} -> do + g <- liftIO MWC.createSystemRandom + let dir = testEnvDir + + let (_, argz) = splitOpts [] syn + let n = headDef 10000 [ fromIntegral x | LitIntVal x <- argz ] + + thashes <- newTVarIO mempty + + ncqWithStorage dir $ \sto@NCQStorage2{..} -> do + + notice $ "write+immediate delete" <+> pretty n <+> "records" + + hashes <- replicateM n do + + h <- ncqPutBS sto (Just B) Nothing =<< genRandomBS g (64*1024) + ncqDelEntry sto h + + t <- (ncqLocate2 sto h <&> fmap (N2.ncqIsTomb sto)) + >>= orThrowUser ("missed" <+> pretty h) + + assertBool "tomb/1" t + + pure h + + + pause @'Seconds 5 + + atomically $ writeTVar thashes (HS.fromList hashes) + + flip runContT pure $ callCC \exit -> do + + for_ hashes $ \h -> do + l <- lift (ncqLocate2 sto h) + >>= orThrowUser ("missed" <+> pretty h) + + unless (N2.ncqIsTomb sto l) do + let (k,e') = ncqEntryUnwrap sto (ncqGetEntryBS sto l) + + e <- orThrowUser "bad entry" e' + err $ pretty l + err $ "WTF?" <+> pretty (coerce @_ @HashRef k) <+> pretty h <+> viaShow (fst e) + lfs <- readTVarIO ncqTrackedFiles + + for_ lfs $ \TrackedFile{..} -> do + npe <- readTVarIO tfCached <&> isNotPending + err $ "FILE" <+> pretty npe <+> pretty tfKey + + exit () + + ncqWithStorage dir $ \sto -> do + -- notice "check deleted" + hashes <- readTVarIO thashes + + for_ hashes $ \h -> do + + ncqLocate2 sto h >>= \case + Nothing -> notice $ "not-found" <+> pretty h + Just loc -> do + + what <- (ncqLocate2 sto h <&> fmap (ncqGetEntryBS sto)) + >>= orThrowUser "NOT FOUND" + + let (k,wtf) = ncqEntryUnwrap sto what + let tomb = N2.ncqIsTomb sto loc + + -- debug $ pretty (coerce @_ @HashRef k) <+> viaShow wtf <+> pretty tomb + + assertBool (show $ "tomb/3" <+> pretty h) tomb + + entry $ bindMatch "test:ncq2:concurrent:write:simple1" $ nil_ $ \case [ LitIntVal tn, LitIntVal n ] -> do runTest $ testNCQ2ConcurrentWriteSimple1 ( fromIntegral tn) (fromIntegral n) @@ -1149,6 +1534,7 @@ main = do [ LitIntVal n ] -> runTest $ testFilterEmulate1 True (fromIntegral n) e -> throwIO $ BadFormException @C (mkList e) + hidden do internalEntries entry $ bindMatch "#!" $ nil_ $ const none @@ -1166,4 +1552,3 @@ main = do `finally` flushLoggers -