diff --git a/hbs2-storage-ncq/hbs2-storage-ncq.cabal b/hbs2-storage-ncq/hbs2-storage-ncq.cabal index 8ca9561c..9d59aaae 100644 --- a/hbs2-storage-ncq/hbs2-storage-ncq.cabal +++ b/hbs2-storage-ncq/hbs2-storage-ncq.cabal @@ -73,10 +73,6 @@ library HBS2.Storage.NCQ3.Internal.Files HBS2.Storage.NCQ3.Internal.Fossil HBS2.Storage.NCQ - HBS2.Storage.NCQ2 - HBS2.Storage.NCQ2.Internal - HBS2.Storage.NCQ2.Internal.Types - HBS2.Storage.NCQ2.Internal.Probes HBS2.Storage.NCQ.Types -- other-modules: -- other-extensions: diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs deleted file mode 100644 index 2c0d73ee..00000000 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs +++ /dev/null @@ -1,1419 +0,0 @@ -{-# Language MultiWayIf #-} -{-# Language RecordWildCards #-} -{-# Language PatternSynonyms #-} -module HBS2.Storage.NCQ2 - ( module HBS2.Storage.NCQ2 - , module HBS2.Storage.NCQ.Types - , module HBS2.Storage.NCQ2.Internal - ) - where - -import HBS2.Prelude.Plated -import HBS2.Hash -import HBS2.OrDie -import HBS2.Data.Types.Refs -import HBS2.Misc.PrettyStuff -import HBS2.System.Logger.Simple.ANSI - -import HBS2.Data.Log.Structured.NCQ - -import HBS2.Storage.NCQ.Types -import HBS2.Storage.NCQ2.Internal - -import Data.Config.Suckless.System - - -import Numeric (showHex) -import Data.ByteString.Builder -import Network.ByteOrder qualified as N -import Data.HashMap.Strict (HashMap) -import Control.Monad.Trans.Cont -import Control.Monad.Trans.Maybe -import Data.Ord (Down(..),comparing) -import Control.Concurrent.STM qualified as STM -import Control.Concurrent.STM.TSem -import Data.IntMap qualified as IntMap -import Data.Sequence qualified as Seq -import Data.List qualified as List -import Data.ByteString.Lazy qualified as LBS -import Data.ByteString (ByteString) -import Data.ByteString qualified as BS -import Data.ByteString.Char8 qualified as BS8 -import Data.Coerce -import Data.Sequence ((|>)) -import Data.Word -import Data.Either -import Data.Maybe -import Data.Vector qualified as V -import Data.Vector (Vector, (!)) -import Lens.Micro.Platform -import Data.HashSet (HashSet) -import Data.HashSet qualified as HS -import Data.HashMap.Strict qualified as HM -import System.FilePath.Posix -import System.Posix.Files qualified as Posix -import System.Posix.IO as PosixBase -import System.Posix.Types as Posix -import System.Posix.IO.ByteString as Posix -import System.Posix.Unistd -import System.Posix.Files ( getFileStatus - , modificationTimeHiRes - , setFileTimesHiRes - , getFdStatus - , FileStatus(..) - , setFileMode - ) -import System.Posix.Files qualified as PFS -import System.IO.MMap as MMap -import System.IO.Temp (emptyTempFile) -import Streaming.Prelude qualified as S - -import System.Random.MWC as MWC - -import UnliftIO -import UnliftIO.Concurrent(getNumCapabilities) -import UnliftIO.IO.File - --- FIXME: ASAP-USE-FILE-LOCK -import System.FileLock as FL - - -ncqStorageOpen2 :: MonadIO m => FilePath -> (NCQStorage2 -> NCQStorage2)-> m NCQStorage2 -ncqStorageOpen2 fp upd = do - let ncqRoot = fp - let ncqGen = 0 - let ncqFsync = 16 * megabytes - let ncqWriteQLen = 1024 * 4 - let ncqMinLog = 512 * megabytes - -- let ncqMaxLog = 16 * gigabytes -- ??? - let ncqMaxLog = 2 * ncqMinLog -- * gigabytes -- ??? - let ncqWriteBlock = max 128 $ ncqWriteQLen `div` 2 - let ncqMaxCached = 128 - let ncqIdleThrsh = 50.00 - let ncqPostponeMerge = 300.00 - let ncqPostponeSweep = 2 * ncqPostponeMerge - let ncqLuckyNum = 2 - - let shardNum = ncqLuckyNum * 2 - let wopNum = ncqLuckyNum - - cap <- getNumCapabilities <&> fromIntegral - ncqWriteQ <- newTVarIO mempty - ncqMemTable <- V.fromList <$> replicateM shardNum (newTVarIO mempty) - ncqStorageStopReq <- newTVarIO False - ncqStorageSyncReq <- newTVarIO False - ncqMergeReq <- newTVarIO False - ncqMergeSem <- atomically (newTSem 1) - ncqSyncNo <- newTVarIO 0 - ncqCurrentFiles <- newTVarIO mempty - ncqTrackedFiles <- newTVarIO V.empty - ncqStateVersion <- newTVarIO 0 - ncqStateUsage <- newTVarIO mempty - ncqStateName <- newTVarIO Nothing - ncqStateSem <- atomically $ newTSem 1 - ncqCachedEntries <- newTVarIO 0 - ncqStorageTasks <- newTVarIO 0 - ncqWrites <- newTVarIO 0 - ncqWriteEMA <- newTVarIO 0.00 - ncqJobQ <- newTQueueIO - ncqMiscSem <- atomically (newTSem 1) - ncqSweepSem <- atomically (newTSem 1) - ncqMergeTasks <- newTVarIO 0 - ncqOnRunWriteIdle <- newTVarIO none - ncqFactFiles <- newTVarIO mempty - ncqFacts <- newTVarIO mempty - - ncqReadReq <- newTQueueIO - - ncqWriteOps <- replicateM wopNum newTQueueIO <&> V.fromList - - ncqRndGen <- liftIO MWC.createSystemRandom - - let ncqSalt = "EstEFasxrCFqsGDxcY4haFcha9e4ZHRzsPbGUmDfdxLk" - - let ncq = NCQStorage2{..} & upd - - mkdir (ncqGetWorkDir ncq) - - liftIO $ withSem ncqMergeSem do - ncqRepair ncq - ncqPreloadIndexes ncq - ncqSweepStates ncq - - pure ncq - - -ncqWithStorage :: MonadUnliftIO m => FilePath -> ( NCQStorage2 -> m a ) -> m a -ncqWithStorage fp action = flip runContT pure do - sto <- lift (ncqStorageOpen2 fp id) - w <- ContT $ withAsync (ncqStorageRun2 sto) - link w - r <- lift (action sto) - lift (ncqStorageStop2 sto) - wait w - pure r - - -ncqLookupEntrySTM :: NCQStorage2 -> HashRef -> STM (Maybe NCQEntry) -ncqLookupEntrySTM ncq h = readTVar (ncqGetShard ncq h) <&> HM.lookup h - -ncqAlterEntrySTM :: NCQStorage2 - -> HashRef - -> (Maybe NCQEntry -> Maybe NCQEntry) - -> STM () -ncqAlterEntrySTM ncq h alterFn = do - let shard = ncqGetShard ncq h - modifyTVar shard (HM.alter alterFn h) - -ncqPutBS :: MonadUnliftIO m - => NCQStorage2 - -> Maybe NCQSectionType - -> Maybe HashRef - -> ByteString - -> m HashRef -ncqPutBS ncq@NCQStorage2{..} mtp mhref bs' = do - - waiter <- newEmptyTMVarIO - - let work = do - - let h = fromMaybe (HashRef (hashObject @HbSync bs')) mhref - let bs = ncqMakeSectionBS mtp h bs' - let shard = ncqGetShard ncq h - zero <- newTVarIO Nothing - - atomically do - - stop <- readTVar ncqStorageStopReq - filled <- readTVar ncqWriteQ <&> Seq.length - - when (not stop && filled > ncqWriteQLen) STM.retry - - upd <- stateTVar shard $ flip HM.alterF h \case - Nothing -> (True, Just (NCQEntry bs zero)) - Just e | ncqEntryData e /= bs -> (True, Just (NCQEntry bs zero)) - | otherwise -> (False, Just e) - - when upd do - modifyTVar ncqWriteQ (|> h) - - putTMVar waiter h - - atomically do - nw <- readTVar ncqWrites <&> (`mod` V.length ncqWriteOps) - modifyTVar ncqWrites succ - writeTQueue (ncqWriteOps ! nw) work - - atomically $ takeTMVar waiter - -ncqEntryUnwrap :: NCQStorage2 - -> ByteString - -> (ByteString, Either ByteString (NCQSectionType, ByteString)) -ncqEntryUnwrap n source = do - let (k,v) = BS.splitAt ncqKeyLen (BS.drop 4 source) - (k, ncqEntryUnwrapValue n v) -{-# INLINE ncqEntryUnwrap #-} - -ncqEntryUnwrapValue :: NCQStorage2 - -> ByteString - -> Either ByteString (NCQSectionType, ByteString) -ncqEntryUnwrapValue _ v = case ncqIsMeta v of - Just meta -> Right (meta, BS.drop ncqPrefixLen v) - Nothing -> Left v -{-# INLINE ncqEntryUnwrapValue #-} - -ncqIdxIsTombSize :: NCQIdxEntry -> Bool -ncqIdxIsTombSize (NCQIdxEntry _ s) = s == ncqSLen + ncqKeyLen + ncqPrefixLen -{-# INLINE ncqIdxIsTombSize #-} - -ncqIsTomb :: NCQStorage2 -> Location -> Bool -ncqIsTomb me loc = case ncqEntryUnwrap me (ncqGetEntryBS me loc) of - (_, Right (T, _)) -> True - _ -> False - -ncqDelEntry :: MonadUnliftIO m - => NCQStorage2 - -> HashRef - -> m () -ncqDelEntry me href = do - -- всегда пишем tomb и надеемся на лучшее - -- merge/compact разберутся - -- однако не пишем, если записи еще нет - ncqLocate2 me href >>= \case - Just loc | not (ncqIsTomb me loc) -> do - void $ ncqPutBS me (Just T) (Just href) "" - - _ -> none - -ncqLookupEntry :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe NCQEntry) -ncqLookupEntry sto hash = atomically (ncqLookupEntrySTM sto hash) - -ncqGetEntryBS :: NCQStorage2 -> Location -> ByteString -ncqGetEntryBS _ = \case - InMemory bs -> bs - InFossil _ mmap off size -> do - BS.take (fromIntegral size) $ BS.drop (fromIntegral off) mmap - -ncqEntrySize :: forall a . Integral a => Location -> a -ncqEntrySize = \case - InFossil _ _ _ size -> fromIntegral size - InMemory bs -> fromIntegral (BS.length bs) - - - - -ncqPreloadIndexes :: MonadUnliftIO m - => NCQStorage2 - -> m () -ncqPreloadIndexes me@NCQStorage2{..} = useVersion me $ const do - fs <- readTVarIO ncqTrackedFiles <&> take ncqMaxCached . V.toList - flip fix (fs, ncqMaxCached) $ \next (files,lim) -> do - case files of - (t@TrackedFile{..}:rest) | lim > 0 -> do - readTVarIO tfCached >>= \case - Nothing -> do - void $ ncqLoadTrackedFile me t - next (rest, pred lim) - _ -> next (rest, lim) - - _ -> none - -ncqLoadTrackedFile :: MonadUnliftIO m - => NCQStorage2 - -> TrackedFile - -> m (Maybe CachedEntry) -ncqLoadTrackedFile ncq@NCQStorage2{..} TrackedFile{..} = runMaybeT do - - let indexFile = ncqGetFileName ncq (toFileName (IndexFile tfKey)) - let dataFile = ncqGetFileName ncq (toFileName (DataFile tfKey)) - - idxHere <- liftIO $ doesFileExist indexFile - unless idxHere do - liftIO $ err $ red "missed index" <+> "in loadIndex" <+> pretty tfKey - mzero - - (idxBs, idxNway) <- MaybeT $ - liftIO (nwayHashMMapReadOnly indexFile) - - datBs <- liftIO $ mmapFileByteString dataFile Nothing - - tnow <- liftIO $ newTVarIO =<< getTimeCoarse - let ce = CachedEntry idxBs datBs idxNway tnow - - atomically do - writeTVar tfCached (Just ce) - modifyTVar ncqCachedEntries (+1) - evictIfNeededSTM ncq (Just 1) - - pure ce - -data Seek a = SeekStop !a | SeekNext !a - - ---- - -ncqSeekInFossils :: forall a f m . (MonadUnliftIO m, Monoid (f a)) - => NCQStorage2 - -> HashRef - -> (Location -> m (Seek (f a))) - -> m (f a) -ncqSeekInFossils ncq@NCQStorage2{..} href action = useVersion ncq $ const do - tracked <- readTVarIO ncqTrackedFiles - let l = V.length tracked - - let - go :: Int -> Int -> f a -> m (f a) - go i a r - | i >= l = pure r - | a > 1 = do - let TrackedFile{..} = tracked ! i - err $ "unable to load fossil" <+> pretty tfKey - go (i+1) 0 r - | otherwise = do - let TrackedFile{..} = tracked ! i - readTVarIO tfCached >>= \case - - Just PendingEntry{} -> - go (i+1) 0 r - - Nothing -> do - void $ ncqLoadTrackedFile ncq TrackedFile{..} - go i (a+1) r - - Just CachedEntry{..} -> do - liftIO (ncqLookupIndex href (cachedMmapedIdx, cachedNway)) >>= \case - Nothing -> go (i+1) 0 r - Just (NCQIdxEntry offset size) -> do - now <- getTimeCoarse - atomically $ writeTVar cachedTs now - action (InFossil tfKey cachedMmapedData offset size) >>= \case - SeekStop e -> pure (r <> e) - SeekNext e -> go (i+1) 0 (r <> e) - - go 0 0 mempty - - - -ncqLocate2 :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe Location) -ncqLocate2 NCQStorage2{..} href = do - answ <- newEmptyTMVarIO - - atomically do - modifyTVar ncqWrites succ - writeTQueue ncqReadReq (href, answ) - - atomically $ takeTMVar answ - -data RunSt = - RunNew - | RunWrite (FileKey, Fd, Int, Int) - | RunSync (FileKey, Fd, Int, Int, Bool) - | RunFin - -ncqStorageRun2 :: forall m . MonadUnliftIO m - => NCQStorage2 - -> m () -ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do - - closeQ <- newTQueueIO - - closer <- spawnActivity $ liftIO $ fix \loop -> do - what <- atomically do - stop <- readTVar ncqStorageStopReq - tryReadTQueue closeQ >>= \case - Just e -> pure $ Just e - Nothing | not stop -> STM.retry - | otherwise -> pure Nothing - - maybe1 what none $ \(fk, fh) -> do - debug $ red "CLOSE FILE" <+> pretty fk - closeFd fh - debug $ yellow "indexing" <+> pretty fk - idx <- ncqRunTaskNoMatterWhat ncq (ncqIndexFile ncq (DataFile fk)) - ncqRunTaskNoMatterWhat ncq $ ncqStateUpdate ncq [F 0 fk] - nwayHashMMapReadOnly idx >>= \case - Nothing -> err $ "can't open index" <+> pretty idx - Just (bs,nway) -> do - nwayHashScanAll nway bs $ \_ k _ -> do - unless (k == emptyKey) $ atomically $ void $ runMaybeT do - NCQEntry _ tfk <- MaybeT $ ncqLookupEntrySTM ncq (coerce k) - fk' <- MaybeT $ readTVar tfk - guard (fk == fk') -- remove only own stuff - lift $ ncqAlterEntrySTM ncq (coerce k) (const Nothing) - - ncqPreloadIndexes ncq - atomically (modifyTVar ncqCurrentFiles (HS.delete fk)) - loop - - spawnActivity $ forever (liftIO $ join $ atomically (readTQueue ncqJobQ)) - - replicateM_ 2 $ spawnActivity $ fix \next -> do - (h, answ) <- atomically $ readTQueue ncqReadReq - - let answer l = atomically (putTMVar answ l) - - let lookupCached fk = \case - PendingEntry{} -> none - CachedEntry{..} -> do - ncqLookupIndex h (cachedMmapedIdx, cachedNway) >>= \case - Nothing -> none - Just (NCQIdxEntry offset size) -> do - answer (Just (InFossil fk cachedMmapedData offset size)) - next - {-# INLINE lookupCached #-} - - ncqLookupEntry ncq h >>= \case - Nothing -> none - Just e -> answer (Just (InMemory (ncqEntryData e))) >> next - - useVersion ncq $ const do - - tracked <- readTVarIO ncqTrackedFiles - - for_ tracked $ \(TrackedFile{..}) -> do - readTVarIO tfCached >>= \case - Just ce -> lookupCached tfKey ce - Nothing -> ncqLoadTrackedFile ncq TrackedFile{..} >>= \case - Nothing -> err $ "unable to load index" <+> pretty tfKey - Just ce -> lookupCached tfKey ce - - next - - let shLast = V.length ncqWriteOps - 1 - spawnActivity $ pooledForConcurrentlyN_ (V.length ncqWriteOps) [0..shLast] $ \i -> do - let q = ncqWriteOps ! i - forever (liftIO $ join $ atomically (readTQueue q)) - - spawnActivity measureWPS - - -- FIXME: bigger-period - spawnActivity $ postponed ncqPostponeSweep $ forever $ (>> pause @'Seconds 120) $ do - ema <- readTVarIO ncqWriteEMA - n <- ncqListStateFiles ncq <&> List.length - when (ema < ncqIdleThrsh * 1.5 && n > 0) $ withSem ncqMergeSem do - debug $ yellow "run sweep routine" - ncqSweepStates ncq - ncqSweepFossils ncq - - spawnActivity $ postponed ncqPostponeMerge $ fix \again -> (>> again) do - ema <- readTVarIO ncqWriteEMA - mergeReq <- atomically $ stateTVar ncqMergeReq (,False) - - debug $ green "MERGE ATTEMPT" <+> pretty ema <+> "~" <+> pretty ncqIdleThrsh - - let notPending x = List.length [ k | (k,e,_) <- V.toList x, isNotPending e ] - - if ema > ncqIdleThrsh && not mergeReq then do - pause @'Seconds 10 - - else do - mq <- newEmptyTMVarIO - - spawnJob $ do - merged <- ncqMergeStep ncq - atomically $ putTMVar mq merged - - -- TODO: detect-dead-merge - void $ race (pause @'Seconds 300) (atomically $ readTMVar mq) >>= \case - Left{} -> warn $ yellow "MERGE FUCKING STALLED" - Right True -> none - Right False -> do - - debug "merge: all done, wait..." - n0 <- ncqListTrackedFiles ncq <&> notPending - - -- FIXME: bigger-timeout - void $ race (pause @'Seconds 60) do - atomically do - n <- ncqListTrackedFilesSTM ncq <&> notPending - when (n == n0) STM.retry - - - spawnActivity factsDB - - spawnActivity $ postponed 20 $ forever do - ema <- readTVarIO ncqWriteEMA - when (ema < 50 ) do - -- ncqKeyNumIntersectionProbe ncq - ncqTombCountProbe ncq - - pause @'Seconds 10 - - ContT $ bracket none $ const $ liftIO do - fhh <- atomically (STM.flushTQueue closeQ) - for_ fhh ( closeFd . snd ) - - flip fix RunNew $ \loop -> \case - - RunFin -> do - debug "wait finalizing" - atomically $ pollSTM closer >>= maybe STM.retry (const none) - debug "exit storage" - - RunNew -> do - stop <- readTVarIO ncqStorageStopReq - mt <- readTVarIO ncqWriteQ <&> Seq.null - - if stop && mt then do - loop RunFin - else do - (fk,fhx) <- openNewDataFile - liftIO (ncqStateUpdate ncq [P fk]) - debug $ "openNewDataFile" <+> pretty fk - loop $ RunWrite (fk,fhx,0,0) - - RunSync (fk, fh, w, total, continue) -> do - - stop <- readTVarIO ncqStorageStopReq - sync <- readTVarIO ncqStorageSyncReq - - let needClose = total >= ncqMinLog || stop - - rest <- if not (sync || needClose || w > ncqFsync) then - pure w - else do - appendTailSection fh >> liftIO (fileSynchronise fh) - -- FIXME: slow! - -- to make it appear in state, but to ignore until index is done - atomically do - writeTVar ncqStorageSyncReq False - modifyTVar ncqSyncNo succ - - pure 0 - - if | needClose && continue -> do - atomically $ writeTQueue closeQ (fk, fh) - loop RunNew - - | not continue -> loop RunFin - - | otherwise -> loop $ RunWrite (fk, fh, rest, total) - - RunWrite (fk, fh, w, total') -> do - - let timeoutMicro = 10_000_000 - - chunk <- liftIO $ timeout timeoutMicro $ atomically do - stop <- readTVar ncqStorageStopReq - sy <- readTVar ncqStorageSyncReq - chunk <- stateTVar ncqWriteQ (Seq.splitAt ncqWriteBlock) - - if | Seq.null chunk && stop -> pure $ Left () - | Seq.null chunk && not (stop || sy) -> STM.retry - | otherwise -> pure $ Right chunk - - case chunk of - Nothing -> do - liftIO $ join $ readTVarIO ncqOnRunWriteIdle - if w == 0 then do - loop $ RunWrite (fk,fh,w,total') - else do - atomically $ writeTVar ncqStorageSyncReq True - loop $ RunSync (fk, fh, w, total', True) -- exit () - - Just (Left{}) -> loop $ RunSync (fk, fh, w, total', False) -- exit () - - Just (Right chu) -> do - ws <- for chu $ \h -> do - atomically (ncqLookupEntrySTM ncq h) >>= \case - Just (NCQEntry bs w) -> do - atomically (writeTVar w (Just fk)) - lift (appendSection fh bs) - - _ -> pure 0 - - let written = sum ws - loop $ RunSync (fk, fh, w + written, total' + written, True) - - where - - emptyKey = ncqEmptyKey - - openNewDataFile :: forall mx . MonadIO mx => mx (FileKey, Fd) - openNewDataFile = do - fname <- ncqGetNewFossilName ncq - atomically $ modifyTVar ncqCurrentFiles (HS.insert (fromString fname)) - touch fname - let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 } - (fromString fname,) <$> liftIO (PosixBase.openFd fname Posix.ReadWrite flags) - - spawnJob = ncqSpawnJob ncq - - postponed n m = liftIO (pause @'Seconds n) >> m - - spawnActivity m = do - a <- ContT $ withAsync m - link a - pure a - - measureWPS = void $ flip fix Nothing \loop -> \case - Nothing -> do - w <- readTVarIO ncqWrites - t <- getTimeCoarse - pause @'Seconds step >> loop (Just (w,t)) - - Just (w0,t0) -> do - w1 <- readTVarIO ncqWrites - t1 <- getTimeCoarse - let dt = max 1e-9 (realToFrac @_ @Double (t1 - t0)) / 1e9 - dw = fromIntegral (w1 - w0) - atomically $ modifyTVar ncqWriteEMA \ema -> alpha * (dw/dt) + 0.9 * ema - pause @'Seconds step >> loop (Just (w1,t1)) - - where - alpha = 0.1 - step = 1.00 - - factsDB = do - let dir = ncqGetFactsDir ncq - debug $ yellow "factsDB started" <+> pretty dir - mkdir dir - - forever do - debug $ yellow "factsDB wip" - pause @'Seconds 10 - - -ncqSpawnJob :: forall m . MonadIO m => NCQStorage2 -> IO () -> m () -ncqSpawnJob NCQStorage2{..} m = atomically $ writeTQueue ncqJobQ m - -ncqFileFastCheck :: MonadUnliftIO m => FilePath -> m () -ncqFileFastCheck fp = do - - -- debug $ "ncqFileFastCheck" <+> pretty fp - - mmaped <- liftIO $ mmapFileByteString fp Nothing - let size = BS.length mmaped - let s = BS.drop (size - 8) mmaped & N.word64 - - unless ( BS.length mmaped == fromIntegral s ) do - throwIO $ NCQFsckIssueExt (FsckInvalidFileSize (fromIntegral s)) - - -ncqStorageScanDataFile :: MonadIO m - => NCQStorage2 - -> FilePath - -> ( Integer -> Integer -> HashRef -> ByteString -> m () ) - -> m () -ncqStorageScanDataFile ncq fp' action = do - let fp = ncqGetFileName ncq fp' - mmaped <- liftIO (mmapFileByteString fp Nothing) - - flip runContT pure $ callCC \exit -> do - flip fix (0,mmaped) $ \next (o,bs) -> do - - when (BS.length bs < ncqSLen) $ exit () - - let w = BS.take ncqSLen bs & N.word32 & fromIntegral - - when (BS.length bs < ncqSLen + w) $ exit () - - let kv = BS.drop ncqSLen bs - - let k = BS.take ncqKeyLen kv & coerce @_ @HashRef - let v = BS.take (ncqFullDataLen (NCQFullRecordLen w)) $ BS.drop ncqKeyLen kv - - lift (action o (fromIntegral w) k v) - - next (ncqSLen + o + fromIntegral w, BS.drop (w+ncqSLen) bs) - - -ncqIndexFile :: MonadUnliftIO m => NCQStorage2 -> DataFile FileKey -> m FilePath -ncqIndexFile n@NCQStorage2{} fk = do - - let fp = toFileName fk & ncqGetFileName n - let dest = toFileName (IndexFile (coerce @_ @FileKey fk)) & ncqGetFileName n - - debug $ "INDEX" <+> pretty fp <+> pretty dest - - items <- S.toList_ do - ncqStorageScanDataFile n fp $ \o w k s -> case ncqIsMeta s of - Just M -> none - _ -> do - -- we need size in order to return block size faster - -- w/o search in fossil - let rs = (w + ncqSLen) & fromIntegral @_ @Word32 & N.bytestring32 - let os = fromIntegral @_ @Word64 o & N.bytestring64 - let record = os <> rs - S.yield (coerce k, record) - - let (dir,name) = splitFileName fp - let idxTemp = (dropExtension name <> "-") `addExtension` ".cq$" - - result <- nwayWriteBatch (nwayAllocDef 1.10 32 8 12) dir idxTemp items - - mv result dest - - ncqStateUpdate n [F 0 (coerce fk)] - - pure dest - - -ncqAddTrackedFiles :: MonadIO m => NCQStorage2 -> [DataFile FileKey] -> m () -ncqAddTrackedFiles ncq@NCQStorage2{..} files = flip runContT pure do - valid <- for files \fkey -> callCC \skip -> do - let fname = ncqGetFileName ncq (toFileName fkey) - let idxName = ncqGetFileName ncq (toFileName (IndexFile (coerce @_ @FileKey fkey))) - - idxHere <- doesFileExist idxName - unless idxHere do - err $ "Index does not exist" <+> pretty (takeFileName idxName) - skip Nothing - - stat <- liftIO $ PFS.getFileStatus fname - let ts = posixToTimeSpec $ PFS.modificationTimeHiRes stat - let fk = fromString (takeFileName fname) - - pure $ Just (ts, fk) - - atomically $ ncqAddTrackedFilesSTM ncq (catMaybes valid) - - -ncqAddTrackedFilesSTM :: NCQStorage2 -> [(TimeSpec, FileKey)] -> STM () -ncqAddTrackedFilesSTM NCQStorage2{..} newFiles = do - a <- readTVar ncqTrackedFiles <&> V.toList - let already = HS.fromList (map tfKey a) - - b <- for newFiles \(t, f) -> - if f `HS.member` already - then pure Nothing - else do - tv <- newTVar Nothing - pure . Just $ TrackedFile (FilePrio (Down t)) f tv - - let new = V.fromList $ List.sortOn tfTime (a <> catMaybes b) - writeTVar ncqTrackedFiles new -{-# INLINE ncqAddTrackedFilesSTM #-} - -evictIfNeededSTM :: NCQStorage2 -> Maybe Int -> STM () -evictIfNeededSTM me@NCQStorage2{..} howMany = do - cur <- readTVar ncqCachedEntries - - let need = fromMaybe cur howMany - excess = max 0 (cur + need - ncqMaxCached) - - when (excess > 0) do - files <- ncqListTrackedFilesSTM me - - oldest <- forM (V.toList files) \case - (k, Just (CachedEntry{..}) , t) -> do - ts <- readTVar cachedTs - pure (Just (ts, k, t)) - _ -> pure Nothing - - let victims = oldest & catMaybes & List.sortOn (view _1) & List.take excess - - for_ victims $ \(_,_,t) -> do - writeTVar t Nothing - modifyTVar ncqCachedEntries (subtract 1) - -{- HLINT ignore "Functor law" -} - -ncqListDirFossils :: MonadIO m => NCQStorage2 -> m [FilePath] -ncqListDirFossils ncq = do - let wd = ncqGetWorkDir ncq - dirFiles wd - >>= mapM (pure . takeFileName) - <&> List.filter (\f -> List.isPrefixOf "fossil-" f && List.isSuffixOf ".data" f) - <&> HS.toList . HS.fromList - -ncqListStateFiles :: forall m . MonadIO m => NCQStorage2 -> m [(TimeSpec, StateFile FileKey)] -ncqListStateFiles ncq = do - let wd = ncqGetWorkDir ncq - dirFiles wd - >>= mapM (pure . takeBaseName) - <&> List.filter (List.isPrefixOf "state-") - >>= mapM timespecOf - <&> fmap (over _2 fromString) . rights - <&> List.sortOn Down - - where - timespecOf x = liftIO @m $ try @_ @IOException do - (,x) . posixToTimeSpec . modificationTimeHiRes <$> getFileStatus (ncqGetFileName ncq x) - -ncqRepair :: MonadIO m => NCQStorage2 -> m () -ncqRepair me@NCQStorage2{..} = do - states <- ncqListStateFiles me <&> fmap snd - - fossils <- flip fix states $ \next -> \case - [] -> do - debug $ yellow "no valid state found; start from scratch" - ncqListDirFossils me <&> fmap (DataFile . fromString) - - (s:ss) -> tryLoadState s >>= \case - Just files -> do - debug $ yellow "used state" <+> pretty s - atomically $ writeTVar ncqStateName (Just s) - pure files - Nothing -> do - warn $ red "inconsistent state" <+> pretty s - rm (ncqGetFileName me $ toFileName s) - next ss - - ncqAddTrackedFiles me fossils - - void $ liftIO (ncqStateUpdate me mempty) - - where - - readState path = ncqReadStateKeys me path <&> fmap DataFile - - tryLoadState (fk :: StateFile FileKey) = liftIO do - - debug $ "tryLoadState" <+> pretty fk - - state <- readState fk - - let checkFile fo = flip fix 0 $ \next i -> do - let dataFile = ncqGetFileName me (toFileName fo) - let indexFile = ncqGetFileName me (toFileName (IndexFile (coerce @_ @FileKey fo))) - - here <- doesFileExist dataFile - - if not here then do - rm indexFile - pure False - - else do - - try @_ @SomeException (ncqFileFastCheck dataFile) >>= \case - - Left e -> do - err (viaShow e) - here <- doesFileExist dataFile - when here do - let broken = dropExtension dataFile `addExtension` ".broken" - mv dataFile broken - rm indexFile - warn $ red "renamed" <+> pretty dataFile <+> pretty broken - - pure False - - Right{} | i > 1 -> pure False - - Right{} -> do - exists <- doesFileExist indexFile - if exists then do - pure True - else do - debug $ "indexing" <+> pretty (toFileName fo) - r <- ncqIndexFile me fo - debug $ "indexed" <+> pretty r - next (succ i) - - results <- forM state checkFile - pure $ if and results then Just state else Nothing - - -ncqRefHash :: NCQStorage2 -> HashRef -> HashRef -ncqRefHash NCQStorage2 {..} h = HashRef (hashObject (coerce @_ @ByteString h <> coerce ncqSalt)) - -ncqRunTaskNoMatterWhat :: MonadUnliftIO m => NCQStorage2 -> m a -> m a -ncqRunTaskNoMatterWhat NCQStorage2{..} task = do - atomically (modifyTVar ncqStorageTasks succ) - task `finally` atomically (modifyTVar ncqStorageTasks pred) - -ncqRunTask :: MonadUnliftIO m => NCQStorage2 -> a -> m a -> m a -ncqRunTask ncq@NCQStorage2{..} def task = readTVarIO ncqStorageStopReq >>= \case - True -> pure def - False -> ncqRunTaskNoMatterWhat ncq task - -ncqWaitTasks :: MonadUnliftIO m => NCQStorage2 -> m () -ncqWaitTasks NCQStorage2{..} = atomically do - tno <- readTVar ncqStorageTasks - when (tno > 0) STM.retry - - -ncqStateUpdate :: MonadUnliftIO m => NCQStorage2 -> [StateOP] -> m Bool -ncqStateUpdate me@NCQStorage2{..} ops' = withSem ncqStateSem $ flip runContT pure $ callCC \exit -> do - - debug $ "ncqStateUpdate" <+> viaShow ops' - - t1 <- FilePrio . Down <$> liftIO getTimeCoarse - - ops <- checkWithDisk $ \name -> do - err $ "ncqStateUpdate invariant fail" <+> pretty name - exit False - - changed <- atomically do - current' <- readTVar ncqTrackedFiles <&> V.toList - - memStateVersionSTM (HS.fromList (fmap tfKey current')) - - let current = HM.fromList [ (tfKey, e) | e@TrackedFile{..} <- current' ] - - wtf <- flip fix (current, ops) $ \next (s, o) -> case o of - [] -> pure s - - (D fk : rest) -> next (HM.delete fk s, rest) - - (P fk : rest) | HM.member fk current -> next (s, rest) - | otherwise -> do - e <- TrackedFile t1 fk <$> newTVar (Just PendingEntry) - next (HM.insert fk e s, rest) - - (F t fk : rest) -> do - case HM.lookup fk s of - Nothing -> do - new <- TrackedFile (FilePrio (Down t)) fk <$> newTVar Nothing - next (HM.insert fk new s, rest) - - Just TrackedFile{..} -> do - pe <- readTVar tfCached - if isNotPending pe then - next (s, rest) - else do - writeTVar tfCached Nothing - next (s, rest) - - writeTVar ncqTrackedFiles (V.fromList $ List.sortOn tfTime (HM.elems wtf)) - - pure (HM.keysSet current /= HM.keysSet wtf) - - -- let fks = HS.fromList [ fk | F _ fk <- ops ] - -- tra <- lift $ ncqListTrackedFiles me <&> filter (not . isNotPending . view _2) . V.toList - -- let tra2 = [ k | (k,_,_) <- tra, HS.member k fks ] - - -- unless (List.null tra2) do - -- err $ red "FUCKED" <+> pretty tra2 - - when changed $ liftIO do - name <- ncqDumpCurrentState me - atomically $ writeTVar ncqStateName (Just name) - debug $ green "switched state" <+> pretty name - - -- a1 <- V.toList <$> lift (ncqListTrackedFiles me) - - -- let fsz = HS.fromList [ fk | F _ fk <- ops ] - - -- let p1 = [ fk | (fk, Just PendingEntry{}, _) <- a1, HS.member fk fsz ] - - -- unless (List.null p1) do - -- error $ show $ "PIZDA!" <+> pretty p1 <> line <> viaShow ops' - - pure changed - - where - - memStateVersionSTM currentKeys = do - k0 <- readTVar ncqStateVersion <&> fromIntegral - - let doAlter = \case - Nothing -> Just (0, currentKeys) - Just (u,f) -> Just (u,f) - - modifyTVar ncqStateUsage (IntMap.alter doAlter k0) - - checkWithDisk onFail = for ops' $ \case -- - f@(F _ fk) -> do - let datFile = ncqGetFileName me (toFileName $ DataFile fk) - e2 <- doesFileExist datFile - unless e2 (onFail datFile) - - ts <- liftIO (getFileStatus datFile) <&> - posixToTimeSpec . PFS.modificationTimeHiRes - - pure (F ts fk) - - d -> pure d - - -ncqDumpCurrentState :: MonadUnliftIO m => NCQStorage2 -> m (StateFile FileKey) -ncqDumpCurrentState me@NCQStorage2{..} = do - files <- ncqListTrackedFiles me - name <- ncqGetNewStateName me - writeBinaryFileDurableAtomic name (BS8.unlines [coerce k | (k,_,_) <- V.toList files]) - pure $ fromString name - -ncqMergeFull :: forall m . MonadUnliftIO m => NCQStorage2 -> m () -ncqMergeFull me = fix \next -> ncqMergeStep me >>= \case - False -> none - True -> next - --- FIXME: sometime-causes-no-such-file-or-directory -ncqMergeStep :: MonadUnliftIO m => NCQStorage2 -> m Bool -ncqMergeStep ncq@NCQStorage2{..} = do - withSem ncqMergeSem $ ncqRunTask ncq False do - - debug "ncqMergeStep" - - tracked <- ncqListTrackedFiles ncq - - files <- for tracked $ \(f,e,_) -> do - - let fn = ncqGetFileName ncq (toFileName $ DataFile f) - let idx = ncqGetFileName ncq (toFileName $ IndexFile f) - - dataHere <- doesFileExist fn - - sz <- case e of - Just PendingEntry -> pure (-100) - _ | dataHere -> liftIO (fileSize fn) - | otherwise -> pure (-3) - - idxHere <- doesFileExist idx - - pure (f, sz, idxHere) - - -- debug $ red "MERGE FILES" <+> viaShow files - - let bothOk (_, sz1, here1) (_, sz2, here2) = - here1 && here2 - && sz1 > 0 && sz2 > 0 - && (sz1 + sz2) < fromIntegral ncqMaxLog - - found <- flip fix (V.toList files, Nothing, Nothing) $ \next -> \case - ([], _, r) -> pure r - - (a:b:rest, Nothing, _) | bothOk a b -> do - next (b:rest, Just (view _2 a + view _2 b), Just (a,b)) - - (a:b:rest, Just s, _ ) | bothOk a b && (view _2 a + view _2 b) < s -> do - next (b:rest, Just (view _2 a + view _2 b), Just (a,b)) - - (_:rest, s, r) -> do - next (rest, s, r) - - case found of - Just (a,b) -> mergeStep a b >> pure True - _ -> do - debug "merge: not found shit" - pure False - - where - - ncqGetNewMergeName :: MonadIO m => NCQStorage2 -> m FilePath - ncqGetNewMergeName n@NCQStorage2{} = do - let (p,tpl) = splitFileName (ncqGetFileName n "merge-.data") - liftIO $ emptyTempFile p tpl - - mergeStep (a,_,_) (b,_,_) = do - debug $ "merge" <+> pretty a <+> pretty b - - let fDataNameA = ncqGetFileName ncq $ toFileName (DataFile a) - let fIndexNameA = ncqGetFileName ncq $ toFileName (IndexFile a) - - let fDataNameB = ncqGetFileName ncq $ toFileName (DataFile b) - let fIndexNameB = ncqGetFileName ncq $ toFileName (IndexFile b) - - -- TODO: proper-exception-handling - doesFileExist fDataNameA `orFail` ("not exist" <+> pretty fDataNameA) - doesFileExist fDataNameB `orFail` ("not exist" <+> pretty fDataNameB) - doesFileExist fIndexNameA `orFail` ("not exist" <+> pretty fIndexNameA) - doesFileExist fIndexNameB `orFail` ("not exist" <+> pretty fIndexNameB) - - flip runContT pure $ callCC \exit -> do - - mfile <- ncqGetNewMergeName ncq - - ContT $ bracket none $ const do - rm mfile - - liftIO $ withBinaryFileAtomic mfile WriteMode $ \fwh -> do - - debug $ "merge: okay, good to go" <+> pretty (takeFileName mfile) - - idxA <- nwayHashMMapReadOnly fIndexNameA - >>= orThrow (NCQMergeInvariantFailed (show $ "can't mmap" <+> pretty fIndexNameA)) - - - idxB <- nwayHashMMapReadOnly fIndexNameB - >>= orThrow (NCQMergeInvariantFailed (show $ "can't mmap" <+> pretty fIndexNameB)) - - debug $ "SCAN FILE A" <+> pretty fDataNameA - - -- we write only record from A, that last in index(A) and not meta - - writeFiltered ncq fDataNameA fwh $ \o _ k v -> do - let meta = Just M == ncqIsMeta v - liftIO (ncqLookupIndex (coerce k) idxA ) >>= \case - Just (NCQIdxEntry o1 _) | o1 == fromIntegral o -> pure $ not meta - _ -> pure $ False - - -- we write only record from B, that last in index(B) - -- and not meta and not already written 'A' pass - - debug $ "SCAN FILE B" <+> pretty fDataNameA - - writeFiltered ncq fDataNameB fwh $ \o _ k v -> do - let meta = Just M == ncqIsMeta v - foundInA <- liftIO (ncqLookupIndex (coerce k) idxA) <&> isJust - actual <- liftIO (ncqLookupIndex (coerce k) idxB ) >>= \case - Just (NCQIdxEntry o1 _) | o1 == fromIntegral o -> pure $ not meta - _ -> pure $ False - - pure $ not ( foundInA || meta || not actual ) - - appendTailSection =<< handleToFd fwh - - liftIO do - - result <- fileSize mfile - - idx <- if result == 0 then - pure Nothing - else do - fossil <- ncqGetNewFossilName ncq - mv mfile fossil - statA <- getFileStatus fDataNameA - let ts = modificationTimeHiRes statA - setFileTimesHiRes fossil ts ts - let fk = DataFile (fromString fossil) - void $ ncqIndexFile ncq fk - pure $ Just (ts,fk) - - for_ idx $ \(ts,DataFile fk) -> do - void $ ncqStateUpdate ncq [D a, D b, F (posixToTimeSpec ts) fk] - - orFail what e = do - r <- what - unless r (throwIO (NCQMergeInvariantFailed (show e))) - -ncqCompactStep :: forall m . MonadUnliftIO m => NCQStorage2 -> m () -ncqCompactStep me@NCQStorage2{..} = withSem ncqMergeSem $ flip runContT pure $ callCC \exit -> do - ContT $ useVersion me - - files <- lift (ncqListTrackedFiles me) - <&> filter (isNotPending . view _2) . V.toList - <&> fmap (view _1) - <&> zip [0 :: Int ..] - <&> IntMap.fromList - - (i,fkA,tombsA) <- lift (findFileA files) >>= maybe (exit ()) pure - - let (_,_,rest) = IntMap.splitLookup i files - - garbage0 <- lift $ getGarbageSlow fkA mempty - - -- FIXME: hardcode - (j,fkB,tombsB) <- lift (findClosestAmongst rest (HM.keysSet garbage0) 0.15) - >>= maybe (exit ()) pure - - notice $ "found" <+> pretty fkA <+> pretty fkB - - - -- for_ (IntMap.elems rest) $ \fk -> do - - -- let datF = ncqGetFileName me (toFileName (DataFile fk)) - -- dataSize <- liftIO (fileSize datF) - -- garbage <- lift $ getGarbageSlow fk tombsA - - -- let realProfit = sum (HM.elems garbage) - -- let kUse = realToFrac realProfit / (1 + realToFrac dataSize) :: Fixed E3 - - - -- notice $ "profit" <+> pretty fk <+> pretty dataSize <+> pretty realProfit <+> pretty kUse - - -- (aIdx, fileA, nTombs) <- findFileA files >>= maybe (exit ()) pure - - -- notice $ green "compact: fileA" <+> pretty fileA <+> pretty aIdx <+> pretty nTombs - - -- idxA <- lift (viewIndex fileA) - -- tombs <- lift (getTombsInIndex idxA) - - -- let (_,self,b) = IntMap.splitLookup aIdx files - - -- notice $ green "pretty" <+> viaShow b - - -- for_ (IntMap.elems b) $ \fk -> callCC \skip -> do - -- profit <- lift (getProfit fk tombs) - - -- let datF = ncqGetFileName me (toFileName (DataFile fk)) - -- here <- doesFileExist datF - - -- unless here do - -- throwIO (NCQCompactInvariantFailed (show $ "fossil exists" <+> pretty fk)) - - -- dataSize <- liftIO (fileSize datF) - - -- when (dataSize == 0) do - -- notice $ "skipped" <+> pretty fk <+> pretty dataSize <+> pretty profit - -- skip () - - -- garbage <- lift (getGargabeSlow fk mempty) - -- let realProfit = sum (HM.elems garbage) - - -- let pfl = (realToFrac realProfit / realToFrac dataSize) & realToFrac @_ @(Fixed E6) - - -- notice $ "profit" <+> pretty fk <+> pretty profit <+> pretty dataSize <+> pretty pfl <+> pretty realProfit - - -- none - - where - - findFileA files = do - tnums <- for (IntMap.toList files) $ \(i, fk) -> (i, fk,) <$> (getTombsInIndex =<< viewIndex fk) - pure $ listToMaybe ( List.sortOn ( Down . HS.size . view _3 ) tnums ) - - findClosestAmongst rest tombs ratio = flip runContT pure $ callCC \exit -> do - - for_ (IntMap.toList rest) $ \(i,fk) -> do - - let datF = ncqGetFileName me (toFileName (DataFile fk)) - dataSize <- liftIO (fileSize datF) - garbage <- lift (getGarbageSlow fk tombs) - - let realProfit = sum (HM.elems garbage) - let kUse = realToFrac realProfit / (1 + realToFrac dataSize) - - when (kUse >= ratio) $ exit $ Just (i, fk, HM.keysSet garbage) - - pure Nothing - - viewIndex fk = do - let idxf = ncqGetFileName me $ toFileName (IndexFile fk) - liftIO (nwayHashMMapReadOnly idxf) - >>= orThrow (NCQCompactInvariantFailed (show $ "index exists" <+> pretty fk)) - - getTombsInIndex :: MonadUnliftIO m => (ByteString, NWayHash) -> m (HashSet HashRef) - getTombsInIndex (idxBs, nway) = HS.fromList <$> S.toList_ do - nwayHashScanAll nway idxBs $ \_ k v -> do - when (k /= ncqEmptyKey && ncqIdxIsTombSize (decodeEntry v) ) do - S.yield (coerce @_ @HashRef k) - - getProfit :: MonadIO m => FileKey -> HashSet HashRef -> m NCQSize - getProfit fk tombs = do - (bs,nw) <- viewIndex fk - r <- S.toList_ $ nwayHashScanAll nw bs$ \_ k v -> do - when (HS.member (coerce k) tombs) $ S.yield $ let (NCQIdxEntry _ s) = decodeEntry v in s - pure (sum r) - - getGarbageSlow :: MonadUnliftIO m => FileKey -> HashSet HashRef -> m (HashMap HashRef NCQSize) - getGarbageSlow fk tombs = do - let datFile = ncqGetFileName me (toFileName $ DataFile fk) - idx <- viewIndex fk - - r <- newTVarIO mempty - - ncqStorageScanDataFile me datFile $ \o s k v -> do - case ncqEntryUnwrapValue me v of - Left bs -> atomically $ modifyTVar' r (HM.insertWith (+) k (fromIntegral s)) - Right (t, bs) -> do - ncqLookupIndex k idx >>= \case - Nothing -> do - -- notice $ "not found in index" <+> pretty k - atomically $ modifyTVar' r (HM.insertWith (+) k (fromIntegral s)) - - Just (NCQIdxEntry oi _) -> do - let garbage = HS.member k tombs || oi /= fromIntegral o - when garbage do - -- notice $ "offset mismatch or tomb" <+> pretty o <+> pretty oi <+> pretty k - when garbage $ atomically do - modifyTVar' r (HM.insertWith (+) k (fromIntegral s)) - - readTVarIO r - -ncqReadStateKeys :: forall m . MonadUnliftIO m => NCQStorage2 -> StateFile FileKey -> m [FileKey] -ncqReadStateKeys me path = liftIO do - keys <- BS8.readFile (ncqGetFileName me (toFileName path)) - <&> filter (not . BS8.null) . BS8.lines - pure $ fmap (coerce @_ @FileKey) keys - -ncqSweepFossils :: forall m . MonadUnliftIO m => NCQStorage2 -> m () -ncqSweepFossils me@NCQStorage2{..} = withSem ncqSweepSem do - debug $ yellow "sweep fossils" - - -- better be safe than sorry - - current <- readTVarIO ncqCurrentFiles - - sfs <- ncqListStateFiles me - - debug $ "STATE FILES" <+> vcat (fmap pretty sfs) - - mentioned <- mapM (safeRead . ncqReadStateKeys @m me) (fmap snd sfs) - <&> HS.fromList . mconcat - - kicked' <- ncqListDirFossils me <&> fmap fromString - - (kicked, used) <- atomically do - - active <- ncqListTrackedFilesSTM me <&> HS.fromList . fmap (view _1) . V.toList - - used' <- readTVar ncqStateUsage <&> IntMap.elems - - let used = current - <> active - <> mentioned - <> HS.unions [ keys | (n, keys) <- used', n > 0 ] - - let k = filter (\x -> not (HS.member x used)) kicked' - pure (k,HS.fromList $ HS.toList used) - - debug $ "KICK" <+> vcat (fmap pretty kicked) - - debug $ "LIVE SET" <+> vcat (fmap pretty (HS.toList used)) - - for_ kicked $ \fo -> do - debug $ "sweep fossil file" <+> pretty fo - rm (ncqGetFileName me (toFileName (IndexFile fo))) - rm (ncqGetFileName me (toFileName (DataFile fo))) - - where - safeRead m = try @_ @IOException m >>= \case - Right x -> pure x - Left e -> err ("ncqSweepFossils" <+> viaShow e) >> pure mempty - -ncqSweepStates :: MonadUnliftIO m => NCQStorage2 -> m () -ncqSweepStates me@NCQStorage2{..} = withSem ncqSweepSem $ flip runContT pure do - - debug $ yellow "remove unused states" - - current' <- readTVarIO ncqStateName - - current <- ContT $ for_ current' - - debug $ yellow "CURRENT STATE NAME" <+> pretty current - - states <- ncqListStateFiles me <&> fmap snd - - flip fix (Left states) $ \next -> \case - Left [] -> none - Right [] -> none - Left (x:xs) | x == current -> next (Right xs) - | otherwise -> next (Left xs) - - Right (x:xs) -> do - debug $ "Remove obsolete state" <+> pretty x - rm (ncqGetFileName me (toFileName x)) - next (Right xs) - -ncqSetOnRunWriteIdle :: MonadUnliftIO m => NCQStorage2 -> IO () -> m () -ncqSetOnRunWriteIdle NCQStorage2{..} io = atomically (writeTVar ncqOnRunWriteIdle io) - -ncqAddFacts :: MonadUnliftIO m => NCQStorage2 -> [FactE] -> m () -ncqAddFacts me facts = do - none - -writeFiltered :: forall m . MonadIO m - => NCQStorage2 - -> FilePath - -> Handle - -> ( Integer -> Integer -> HashRef -> ByteString -> m Bool) - -> m () - -writeFiltered ncq fn out filt = do - ncqStorageScanDataFile ncq fn $ \o s k v -> do - skip <- filt o s k v <&> not - - -- when skip do - -- debug $ pretty k <+> pretty "skipped" - - unless skip $ liftIO do - BS.hPut out (LBS.toStrict (makeEntryLBS k v)) - - where - - makeEntryLBS h bs = do - let b = byteString (coerce @_ @ByteString h) - <> byteString bs - - let wbs = toLazyByteString b - let len = LBS.length wbs - let ws = byteString (N.bytestring32 (fromIntegral len)) - - toLazyByteString (ws <> b) - - -zeroSyncEntry :: ByteString -zeroSyncEntry = ncqMakeSectionBS (Just B) zeroHash zeroPayload - where zeroPayload = N.bytestring64 0 - zeroHash = HashRef (hashObject zeroPayload) -{-# INLINE zeroSyncEntry #-} - -zeroSyncEntrySize :: Word64 -zeroSyncEntrySize = fromIntegral (BS.length zeroSyncEntry) -{-# INLINE zeroSyncEntrySize #-} - --- 1. It's M-record --- 2. It's last w64be == fileSize --- 3. It's hash == hash (bytestring64be fileSize) --- 4. recovery-strategy: start-to-end, end-to-start -fileTailRecord :: Integral a => a -> ByteString -fileTailRecord w = do - -- on open: last w64be == fileSize - let paylo = N.bytestring64 (fromIntegral w + zeroSyncEntrySize) - let h = hashObject @HbSync paylo & coerce - ncqMakeSectionBS (Just M) h paylo -{-# INLINE fileTailRecord #-} - -appendSection :: forall m . MonadUnliftIO m - => Fd - -> ByteString - -> m Int -- (FOff, Int) - -appendSection fh sect = do - -- off <- liftIO $ fdSeek fh SeekFromEnd 0 - -- pure (fromIntegral off, fromIntegral len) - liftIO (Posix.fdWrite fh sect) <&> fromIntegral -{-# INLINE appendSection #-} - -appendTailSection :: MonadIO m => Fd -> m () -appendTailSection fh = liftIO do - s <- Posix.fileSize <$> Posix.getFdStatus fh - void (appendSection fh (fileTailRecord s)) -{-# INLINE appendTailSection #-} - - - - diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal.hs deleted file mode 100644 index 5c2e933d..00000000 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal.hs +++ /dev/null @@ -1,11 +0,0 @@ -module HBS2.Storage.NCQ2.Internal - ( module HBS2.Storage.NCQ2.Internal - , module Export - )where - -import HBS2.Storage.NCQ2.Internal.Types as Export -import HBS2.Storage.NCQ2.Internal.Probes as Export - - - - diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Probes.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Probes.hs deleted file mode 100644 index 6df1658d..00000000 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Probes.hs +++ /dev/null @@ -1,102 +0,0 @@ -{-# Language RecordWildCards #-} -module HBS2.Storage.NCQ2.Internal.Probes where - -import HBS2.Prelude -import HBS2.Hash -import HBS2.Data.Types.Refs -import HBS2.System.Logger.Simple.ANSI -import HBS2.Misc.PrettyStuff - -import HBS2.Data.Log.Structured.NCQ - -import HBS2.Storage.NCQ2.Internal.Types -import HBS2.Storage.NCQ.Types - -import Control.Monad.Trans.Maybe -import Data.Coerce -import Data.HashMap.Strict qualified as HM -import Data.List qualified as List -import Data.Maybe -import Data.Vector ((!)) -import Data.Vector qualified as V -import Lens.Micro.Platform -import System.Random.MWC qualified as MWC -import UnliftIO - - -randomTrackedFile :: MonadUnliftIO m => NCQStorage2 -> m (Maybe FileKey) -randomTrackedFile ncq@NCQStorage2{..} = runMaybeT do - files0 <- lift (ncqListTrackedFiles ncq) - let files = V.toList $ V.filter (isNotPending . view _2) files0 - guard (not (null files)) - i <- liftIO $ MWC.uniformRM (0, length files - 1) ncqRndGen - pure (view _1 (files !! i)) - -randomTrackedFilePair :: MonadUnliftIO m => NCQStorage2 -> m (Maybe (FileKey, FileKey)) -randomTrackedFilePair ncq@NCQStorage2{..} = runMaybeT do - files0 <- lift (ncqListTrackedFiles ncq) - let files = V.toList $ V.filter (isNotPending . view _2) files0 - guard (length files >= 2) - - (a, b) <- liftIO $ fix \loop -> do - i <- MWC.uniformRM (0, length files - 1) ncqRndGen - j <- MWC.uniformRM (0, length files - 1) ncqRndGen - if i == j then loop else pure (min i j, max i j) - - let fa = view _1 (files !! a) - let fb = view _1 (files !! b) - pure (fa, fb) - - -ncqTombCountProbeFor :: MonadUnliftIO m => NCQStorage2 -> FileKey -> m (Maybe Int) -ncqTombCountProbeFor ncq@NCQStorage2{..} fkey = runMaybeT do - let fIndex = ncqGetFileName ncq $ toFileName (IndexFile fkey) - - (bs, nh) <- liftIO (nwayHashMMapReadOnly fIndex) >>= toMPlus - - liftIO do - ref <- newTVarIO 0 - nwayHashScanAll nh bs $ \_ k v -> do - let NCQIdxEntry _ s = decodeEntry v - when (k /= ncqEmptyKey && s < 64) $ - atomically $ modifyTVar' ref (+1) - readTVarIO ref - -ncqKeyNumIntersectionProbeFor :: MonadUnliftIO m => NCQStorage2 -> (FileKey, FileKey) -> m (Maybe Int) -ncqKeyNumIntersectionProbeFor ncq@NCQStorage2{..} (fka, fkb) = runMaybeT do - let key = FactKey $ coerce $ hashObject @HbSync $ serialise $ List.sort [fka, fkb] - - known <- lift (readTVarIO ncqFacts <&> HM.member key) - guard (not known) - - let fIndexA = ncqGetFileName ncq (toFileName (IndexFile fka)) - let fIndexB = ncqGetFileName ncq (toFileName (IndexFile fkb)) - - idxPair' <- liftIO $ try @_ @IOException do - (,) <$> nwayHashMMapReadOnly fIndexA - <*> nwayHashMMapReadOnly fIndexB - - ((bs1,n1),(bs2,n2)) <- case idxPair' of - Right (Just x, Just y) -> pure (x,y) - _ -> warn ("can't load index pair" <+> pretty (fka, fkb)) >> mzero - - liftIO do - ref <- newTVarIO 0 - nwayHashScanAll n1 bs1 $ \_ k _ -> when (k /= ncqEmptyKey) do - here <- ncqLookupIndex (coerce k) (bs2,n2) - when (isJust here) $ atomically $ modifyTVar' ref (+1) - readTVarIO ref - - -ncqTombCountProbe :: MonadUnliftIO m => NCQStorage2 -> m () -ncqTombCountProbe ncq = useVersion ncq $ const $ void $ runMaybeT do - fk <- MaybeT (randomTrackedFile ncq) - count <- MaybeT (ncqTombCountProbeFor ncq fk) - debug $ yellow "ncqTombCountProbe" <+> pretty fk <+> pretty count - -ncqKeyNumIntersectionProbe :: MonadUnliftIO m => NCQStorage2 -> m () -ncqKeyNumIntersectionProbe ncq = useVersion ncq $ const $ void $ runMaybeT do - (fa, fb) <- MaybeT (randomTrackedFilePair ncq) - n <- MaybeT (ncqKeyNumIntersectionProbeFor ncq (fa, fb)) - debug $ yellow "ncqKeyNumIntersectionProbe" <+> pretty fa <+> pretty fb <+> pretty n - diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Types.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Types.hs deleted file mode 100644 index 68ecfa63..00000000 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Types.hs +++ /dev/null @@ -1,269 +0,0 @@ -{-# Language RecordWildCards #-} -module HBS2.Storage.NCQ2.Internal.Types where - -import HBS2.Prelude.Plated -import HBS2.Hash -import HBS2.Data.Types.Refs -import HBS2.Base58 -import HBS2.Net.Auth.Credentials -import HBS2.Storage -import HBS2.Misc.PrettyStuff -import HBS2.System.Logger.Simple.ANSI - -import HBS2.Data.Log.Structured.SD -import HBS2.Data.Log.Structured.NCQ - -import HBS2.Storage.NCQ.Types - -import Data.Config.Suckless.System - -import Numeric (showHex) -import Network.ByteOrder qualified as N -import Data.HashMap.Strict (HashMap) -import Control.Concurrent.STM.TSem -import Data.IntMap qualified as IntMap -import Data.IntMap (IntMap) -import Data.Sequence qualified as Seq -import Data.Sequence (Seq(..), (|>),(<|)) -import Data.ByteString (ByteString) -import Data.ByteString qualified as BS -import Data.Coerce -import Data.Word -import Data.Vector qualified as V -import Data.Vector (Vector, (!)) -import Lens.Micro.Platform -import Data.HashSet (HashSet) -import System.FilePath.Posix - -import Control.Monad.ST -import System.Random.MWC as MWC - -import UnliftIO - - -type FOff = Word64 - -data NCQEntry = - NCQEntry - { ncqEntryData :: !ByteString - , ncqDumped :: !(TVar (Maybe FileKey)) - } - -type Shard = TVar (HashMap HashRef NCQEntry) - -type NCQOffset = Word64 -type NCQSize = Word32 - -type StateVersion = Word64 - -data NCQIdxEntry = - NCQIdxEntry {-# UNPACK#-} !NCQOffset !NCQSize - -data StateOP = D FileKey | F TimeSpec FileKey | P FileKey - deriving (Eq,Ord,Show) - -data NCQFlag = - NCQMergeNow | NCQCompactNow - deriving (Eq,Ord,Generic) - -data Location = - InFossil {-# UNPACK #-} !FileKey !ByteString !NCQOffset !NCQSize - | InMemory {-# UNPACK #-} !ByteString - -instance Pretty Location where - pretty = \case - InFossil k _ o s -> parens $ "in-fossil" <+> pretty k <+> pretty o <+> pretty s - InMemory _ -> "in-memory" - -data TrackedFile = - TrackedFile - { tfTime :: FilePrio - , tfKey :: FileKey - , tfCached :: TVar (Maybe CachedEntry) - } - -data FactE = KeyIntersection FileKey FileKey Int - deriving (Eq,Ord,Show,Generic) - -type FactSeq = POSIXTime - -newtype FactKey = - FactKey ByteString - deriving newtype (Eq,Ord,Hashable) - -data Fact = - Facot - { factWritten :: Maybe FactSeq - , factE :: FactE - } - deriving (Eq,Ord,Show,Generic) - -instance Hashable FactE -instance Hashable Fact - -type TrackedFiles = Vector TrackedFile - -data NCQStorage2 = - NCQStorage2 - { ncqRoot :: FilePath - , ncqGen :: Int - , ncqSalt :: HashRef - , ncqPostponeMerge :: Timeout 'Seconds - , ncqPostponeSweep :: Timeout 'Seconds - , ncqLuckyNum :: Int - , ncqFsync :: Int - , ncqWriteQLen :: Int - , ncqWriteBlock :: Int - , ncqMinLog :: Int - , ncqMaxLog :: Int - , ncqMaxCached :: Int - , ncqIdleThrsh :: Double - , ncqMemTable :: Vector Shard - , ncqWriteQ :: TVar (Seq HashRef) - , ncqWriteOps :: Vector (TQueue (IO ())) - , ncqReadReq :: TQueue (HashRef, TMVar (Maybe Location)) - , ncqStorageTasks :: TVar Int - , ncqStorageStopReq :: TVar Bool - , ncqStorageSyncReq :: TVar Bool - , ncqMergeReq :: TVar Bool - , ncqMergeSem :: TSem - , ncqSyncNo :: TVar Int - , ncqCurrentFiles :: TVar (HashSet FileKey) - , ncqTrackedFiles :: TVar TrackedFiles - , ncqStateVersion :: TVar StateVersion - , ncqStateUsage :: TVar (IntMap (Int, HashSet FileKey)) - , ncqStateName :: TVar (Maybe (StateFile FileKey)) - , ncqStateSem :: TSem - , ncqCachedEntries :: TVar Int - , ncqWrites :: TVar Int - , ncqWriteEMA :: TVar Double -- for writes-per-seconds - , ncqJobQ :: TQueue (IO ()) - , ncqMiscSem :: TSem - , ncqSweepSem :: TSem - , ncqMergeTasks :: TVar Int - , ncqOnRunWriteIdle :: TVar (IO ()) - - , ncqFactFiles :: TVar (HashSet FileKey) - , ncqFacts :: TVar (HashMap FactKey Fact) - , ncqRndGen :: Gen RealWorld - } - - -ncqGetFileName :: NCQStorage2 -> FilePath -> FilePath -ncqGetFileName ncq fp = ncqGetWorkDir ncq takeFileName fp - -ncqGetWorkDir :: NCQStorage2 -> FilePath -ncqGetWorkDir NCQStorage2{..} = ncqRoot show ncqGen - -ncqGetLockFileName :: NCQStorage2 -> FilePath -ncqGetLockFileName ncq = ncqGetFileName ncq ".lock" - -ncqNewUniqFileName :: MonadIO m => NCQStorage2 -> FilePath -> FilePath -> m FilePath -ncqNewUniqFileName me@NCQStorage2{..} pref suff = liftIO $ withSem ncqMiscSem do - flip fix 0 $ \next i -> do - t <- round @_ @Integer . (* 1e9) <$> getPOSIXTime - let v = show $ pretty (showHex t "") <> "-" <> pretty (showHex i "") - let n = ncqGetFileName me (pref <> v <> suff) - doesFileExist n >>= \case - False -> pure n - True -> next (succ i) - -ncqEmptyKey :: ByteString -ncqEmptyKey = BS.replicate ncqKeyLen 0 - -ncqGetFactsDir :: NCQStorage2 -> FilePath -ncqGetFactsDir me = ncqGetWorkDir me ".facts" - -ncqGetNewFossilName :: MonadIO m => NCQStorage2 -> m FilePath -ncqGetNewFossilName me = ncqNewUniqFileName me "fossil-" ".data" - -ncqGetNewStateName :: MonadIO m => NCQStorage2 -> m FilePath -ncqGetNewStateName me = ncqNewUniqFileName me "state-" "" - -ncqGetNewCompactName :: MonadIO m => NCQStorage2 -> m FilePath -ncqGetNewCompactName me = ncqNewUniqFileName me "compact-" ".data" - -ncqGetNewFactFileName :: MonadIO m => NCQStorage2 -> m FilePath -ncqGetNewFactFileName me = do - ncqNewUniqFileName me (d "fact-") ".f" - where d = ncqGetFactsDir me - -ncqStorageStop2 :: MonadUnliftIO m => NCQStorage2 -> m () -ncqStorageStop2 NCQStorage2{..} = do - atomically $ writeTVar ncqStorageStopReq True - -ncqStorageSync2 :: MonadUnliftIO m => NCQStorage2 -> m () -ncqStorageSync2 NCQStorage2{..} = do - atomically $ writeTVar ncqStorageSyncReq True - -ncqShardIdx :: NCQStorage2 -> HashRef -> Int -ncqShardIdx NCQStorage2{..} h = - fromIntegral (BS.head (coerce h)) `mod` V.length ncqMemTable -{-# INLINE ncqShardIdx #-} - -ncqGetShard :: NCQStorage2 -> HashRef -> Shard -ncqGetShard ncq@NCQStorage2{..} h = ncqMemTable ! ncqShardIdx ncq h -{-# INLINE ncqGetShard #-} - - -useVersion :: forall m a . MonadUnliftIO m => NCQStorage2 -> (() -> m a) -> m a -useVersion ncq m = bracket succV predV m - where - succV = atomically (ncqStateUseSTM ncq) - predV = const $ atomically (ncqStateUnuseSTM ncq) - - -ncqStateUseSTM :: NCQStorage2 -> STM () -ncqStateUseSTM NCQStorage2{..} = do - k <- readTVar ncqStateVersion <&> fromIntegral - modifyTVar ncqStateUsage (IntMap.update (Just . over _1 succ) k) -{-# INLINE ncqStateUseSTM #-} - -ncqStateUnuseSTM :: NCQStorage2 -> STM () -ncqStateUnuseSTM NCQStorage2{..} = do - k <- readTVar ncqStateVersion <&> fromIntegral - -- TODO: remove when n <= 0 - modifyTVar ncqStateUsage (IntMap.update (Just . over _1 pred) k) -{-# INLINE ncqStateUnuseSTM #-} - -withSem :: forall a m . MonadUnliftIO m => TSem -> m a -> m a -withSem sem m = bracket enter leave (const m) - where enter = atomically (waitTSem sem) - leave = const $ atomically (signalTSem sem) - - - -ncqLookupIndex :: MonadUnliftIO m - => HashRef - -> (ByteString, NWayHash) - -> m (Maybe NCQIdxEntry ) -ncqLookupIndex hx (mmaped, nway) = do - fmap decodeEntry <$> nwayHashLookup nway mmaped (coerce hx) -{-# INLINE ncqLookupIndex #-} - -decodeEntry :: ByteString -> NCQIdxEntry -decodeEntry entryBs = do - let (p,r) = BS.splitAt 8 entryBs - let off = fromIntegral (N.word64 p) - let size = fromIntegral (N.word32 (BS.take 4 r)) - NCQIdxEntry off size -{-# INLINE decodeEntry #-} - - -isNotPending :: Maybe CachedEntry -> Bool -isNotPending = \case - Just (PendingEntry {}) -> False - _ -> True - -isPending :: Maybe CachedEntry -> Bool -isPending = not . isNotPending - - -ncqListTrackedFilesSTM :: NCQStorage2 -> STM (Vector (FileKey, Maybe CachedEntry, TVar (Maybe CachedEntry))) -ncqListTrackedFilesSTM NCQStorage2{..} = do - fs <- readTVar ncqTrackedFiles - for fs $ \TrackedFile{..} -> (tfKey,,) <$> readTVar tfCached <*> pure tfCached - -ncqListTrackedFiles :: MonadUnliftIO m => NCQStorage2 -> m (Vector (FileKey, Maybe CachedEntry, TVar (Maybe CachedEntry))) -ncqListTrackedFiles = atomically . ncqListTrackedFilesSTM - diff --git a/hbs2-tests/test/TestNCQ.hs b/hbs2-tests/test/TestNCQ.hs index 071ee419..38e1dada 100644 --- a/hbs2-tests/test/TestNCQ.hs +++ b/hbs2-tests/test/TestNCQ.hs @@ -23,7 +23,6 @@ import HBS2.System.Logger.Simple.ANSI import HBS2.Data.Log.Structured.SD import HBS2.Storage.NCQ -import HBS2.Storage.NCQ2 as N2 import HBS2.Data.Log.Structured.NCQ import HBS2.CLI.Run.Internal.Merkle @@ -555,770 +554,6 @@ testNCQConcurrent1 noRead tn n TestEnv{..} = flip runContT pure do rm ncqDir -testNCQ2Sweep1 :: forall c m . (MonadUnliftIO m, IsContext c) - => [Syntax c] - -> TestEnv - -> m () - -testNCQ2Sweep1 syn TestEnv{..} = do - debug $ "testNCQ2Sweep1" <+> pretty syn - let tmp = testEnvDir - let ncqDir = tmp - q <- newTQueueIO - - g <- liftIO MWC.createSystemRandom - - let (opts, argz) = splitOpts [] syn - - let n = headDef 100000 [ fromIntegral x | LitIntVal x <- argz ] - - bz <- replicateM n $ liftIO do - n <- (`mod` (256*1024)) <$> uniformM @Int g - uniformByteStringM n g - - notice $ "generate" <+> pretty n <+> "blocks" - - ncqWithStorage ncqDir $ \sto -> liftIO do - for bz $ \z -> do - h <- ncqPutBS sto (Just B) Nothing z - atomically $ writeTQueue q h - - ncqWithStorage ncqDir $ \sto -> liftIO do - notice $ red "PERFORM MERGE" - ncqMergeFull sto - - notice $ "full sweep unused states" - - ncqWithStorage ncqDir $ \sto -> liftIO do - ncqSweepStates sto - ncqSweepFossils sto - - notice $ "lookup" <+> pretty n <+> "blocks" - - ncqWithStorage ncqDir $ \sto -> liftIO do - hashes <- atomically (STM.flushTQueue q) - for_ hashes $ \ha -> do - found <- ncqLocate2 sto ha <&> maybe (-1) ncqEntrySize - assertBool (show $ "found" <+> pretty ha) (found > 0) - - - -testNCQ2Sweep2 :: forall c m . (MonadUnliftIO m, IsContext c) - => [Syntax c] - -> TestEnv - -> m () - -testNCQ2Sweep2 syn TestEnv{..} = do - debug $ "testNCQ2Sweep2" <+> pretty syn - let tmp = testEnvDir - let ncqDir = tmp - q <- newTQueueIO - - g <- liftIO MWC.createSystemRandom - - let (opts, argz) = splitOpts [] syn - - let n = headDef 100000 [ fromIntegral x | LitIntVal x <- argz ] - - notice $ "generate" <+> pretty n <+> "blocks" - - bz <- replicateM n $ liftIO do - n <- (`mod` (256*1024)) <$> uniformM @Int g - uniformByteStringM n g - - -- race (pause @'Seconds 260) do - - ncqWithStorage ncqDir $ \sto -> liftIO do - for_ bz $ \z -> do - h <- ncqPutBS sto (Just B) Nothing z - atomically $ writeTQueue q h - - notice "wait some time to see merge+sweep" - pause @'Seconds 240 - - ncqWithStorage ncqDir $ \sto -> liftIO do - hashes <- atomically (STM.flushTQueue q) - for_ hashes $ \ha -> do - found <- ncqLocate2 sto ha <&> maybe (-1) ncqEntrySize - assertBool (show $ "found" <+> pretty ha) (found > 0) - - - -testNCQ2Kill1 :: forall c m . (MonadUnliftIO m, IsContext c) - => [Syntax c] - -> TestEnv - -> m () - -testNCQ2Kill1 syn TestEnv{..} = flip runContT pure do - debug $ "testNCQ2Kill1" <+> pretty syn - let tmp = testEnvDir - let ncqDir = tmp - q <- newTQueueIO - - g <- liftIO MWC.createSystemRandom - - let (opts, argz) = splitOpts [] syn - - let n = headDef 100000 [ fromIntegral x | LitIntVal x <- argz ] - - notice $ "generate" <+> pretty n <+> "blocks" - - bz <- replicateM n $ liftIO do - n <- (`mod` (256*1024)) <$> uniformM @Int g - uniformByteStringM n g - - -- race (pause @'Seconds 260) do - - wIdle <- newEmptyTMVarIO - - ncq1 <- ContT $ withAsync $ ncqWithStorage ncqDir $ \sto -> liftIO do - ncqSetOnRunWriteIdle sto (atomically (putTMVar wIdle ())) - for_ bz $ \z -> do - h <- ncqPutBS sto (Just B) Nothing z - atomically $ writeTQueue q h - pause @'Seconds 300 - - notice $ red "WAIT FUCKING IDLE!" - - atomically $ takeTMVar wIdle - - notice $ red "GOT FUCKING IDLE!" <+> "lets see what happen now" - - cancel ncq1 - - liftIO $ ncqWithStorage ncqDir $ \sto -> liftIO do - hashes <- atomically (STM.flushTQueue q) - for_ hashes $ \ha -> do - found <- ncqLocate2 sto ha <&> maybe (-1) ncqEntrySize - assertBool (show $ "found" <+> pretty ha) (found > 0) - - -testNCQ2Simple1 :: forall c m . (MonadUnliftIO m, IsContext c) - => [Syntax c] - -> TestEnv - -> m () - -testNCQ2Simple1 syn TestEnv{..} = do - debug $ "testNCQ2Simple1" <+> pretty syn - let tmp = testEnvDir - let ncqDir = tmp - q <- newTQueueIO - - g <- liftIO MWC.createSystemRandom - - let (opts, argz) = splitOpts [] syn - - let n = headDef 100000 [ fromIntegral x | LitIntVal x <- argz ] - let l = headDef 5 $ drop 1 [ fromIntegral x | LitIntVal x <- argz ] - let s = headDef (256*1024) $ drop 2 [ fromIntegral (1024 * x) | LitIntVal x <- argz ] - - - notice $ "insert" <+> pretty n <+> "random blocks of size" <+> pretty s - - thashes <- newTQueueIO - - ncqWithStorage ncqDir $ \sto -> liftIO do - replicateM_ n do - n <- (`mod` s) <$> uniformM @Int g - z <- uniformByteStringM n g - h <- ncqPutBS sto (Just B) Nothing z - found <- ncqLocate2 sto h <&> maybe (-1) ncqEntrySize - atomically $ writeTQueue q h - assertBool (show $ "found-immediate" <+> pretty h) (found > 0) - atomically $ writeTQueue thashes h - - t0 <- getTimeCoarse - - hs <- atomically $ STM.flushTQueue thashes - - flip fix (t0, List.length hs, hs) $ \loop (tp, num, xs) -> case xs of - [] -> none - (ha:rest) -> do - t1 <- getTimeCoarse - - t2 <- if realToFrac (toNanoSecs (t1 - t0)) / 1e9 < 1.00 then do - pure tp - else do - notice $ green "lookup" <+> pretty num - pure t1 - - found <- ncqLocate2 sto ha <&> maybe (-1) ncqEntrySize - assertBool (show $ "found" <+> pretty ha) (found > 0) - unless (List.null hs) $ loop (t1, pred num, rest) - - hashes <- atomically (STM.flushTQueue q) - - notice $ "merge data" - - ncqWithStorage ncqDir $ \sto -> liftIO do - notice "perform merge" - ncqMergeFull sto - ncqSweepStates sto - ncqSweepFossils sto - - notice $ "full sweep unused states" - - ncqWithStorage ncqDir $ \sto -> liftIO do - ncqSweepStates sto - ncqSweepFossils sto - - notice $ "lookup" <+> pretty n <+> "blocks" - - ncqWithStorage ncqDir $ \sto -> liftIO do - - replicateM_ l do - - t0 <- getTimeCoarse - - pooledForConcurrentlyN_ 8 hashes $ \ha -> do - found <- ncqLocate2 sto ha <&> maybe (-1) ncqEntrySize - assertBool (show $ "found" <+> pretty ha) (found > 0) - -- debug $ fill 44 (pretty ha) <+> fill 8 (pretty found) - - t1 <- getTimeCoarse - let dt = realToFrac (toNanoSecs (t1 - t0)) / 1e9 :: Fixed E3 - - notice $ pretty (sec6 dt) <+> "lookup" <+> pretty n <+> "blocks" - -testNCQ2Lookup2:: forall c m . (MonadUnliftIO m, IsContext c) - => [Syntax c] - -> TestEnv - -> m () - -testNCQ2Lookup2 syn TestEnv{..} = do - debug $ "testNCQ2Lookup2" <+> pretty syn - let tmp = testEnvDir - let ncqDir = tmp - q <- newTQueueIO - - g <- liftIO MWC.createSystemRandom - - let (opts, argz) = splitOpts [("-m",0)] syn - - let n = headDef 100000 [ fromIntegral x | LitIntVal x <- argz ] - let nt = max 2 . headDef 1 $ [ fromIntegral x | LitIntVal x <- drop 1 argz ] - let nl = headDef 3 $ [ fromIntegral x | LitIntVal x <- drop 2 argz ] - let r = (64*1024, 256*1024) - - let merge = headDef False [ True | ListVal [StringLike "-m"] <- opts ] - - notice $ "insert" <+> pretty n <+> "random blocks of size" <+> parens (pretty r) <+> pretty opts - - thashes <- newTQueueIO - - sizes <- liftIO $ replicateM n (uniformRM r g ) - - res <- newTQueueIO - - ncqWithStorage ncqDir $ \sto -> liftIO do - pooledForConcurrentlyN_ 8 sizes $ \size -> do - z <- uniformByteStringM size g - h <- ncqPutBS sto (Just B) Nothing z - atomically $ writeTQueue thashes h - - hs <- atomically $ STM.flushTQueue thashes - - when merge do - notice "merge full" - ncqMergeFull sto - - ffs <- N2.ncqListTrackedFiles sto - notice $ "database prepared" <+> pretty (List.length ffs) <+> pretty (List.length hs) - - replicateM_ nl do - - tfound <- newTVarIO 0 - - t0 <- getTimeCoarse - - liftIO $ pooledForConcurrentlyN_ nt hs $ \h -> do - found <- ncqLocate2 sto h <&> isJust - when found do - atomically $ modifyTVar' tfound succ - - t1 <- getTimeCoarse - - let dt = realToFrac (toNanoSecs (t1 - t0)) / 1e9 :: Fixed E3 - atomically $ writeTQueue res dt - - found <- readTVarIO tfound - - notice $ "scan all files" <+> pretty dt <+> pretty found - - m <- atomically (STM.flushTQueue res) - <&> List.sort - <&> \x -> atDef 0 x (List.length x `quot` 2) - - notice $ "median" <+> pretty m - - -testNCQ2Lookup1:: forall c m . (MonadUnliftIO m, IsContext c) - => [Syntax c] - -> TestEnv - -> m () - -testNCQ2Lookup1 syn TestEnv{..} = do - debug $ "testNCQ2Lookup1" <+> pretty syn - let tmp = testEnvDir - let ncqDir = tmp - q <- newTQueueIO - - g <- liftIO MWC.createSystemRandom - - let (opts, argz) = splitOpts [("-r",1),("-m",0)] syn - - let n = headDef 100000 [ fromIntegral x | LitIntVal x <- argz ] - let nt = max 2 . headDef 1 $ [ fromIntegral x | LitIntVal x <- drop 1 argz ] - let nl = headDef 3 $ [ fromIntegral x | LitIntVal x <- drop 2 argz ] - let r = (4*1024, 64*1024) - - let rt = headDef 2 [ fromIntegral x | ListVal [StringLike "-r", LitIntVal x ] <- opts ] - let merge = headDef False [ True | ListVal [StringLike "-m"] <- opts ] - - notice $ "insert" <+> pretty n <+> "random blocks of size" <+> parens (pretty r) <+> pretty opts - - thashes <- newTQueueIO - - sizes <- liftIO $ replicateM n (uniformRM r g ) - - ncqWithStorage ncqDir $ \sto -> liftIO do - pooledForConcurrentlyN_ 8 sizes $ \size -> do - z <- uniformByteStringM size g - h <- ncqPutBS sto (Just B) Nothing z - atomically $ writeTQueue thashes h - - - hs <- atomically $ STM.flushTQueue thashes - - when merge do - notice "merge full" - ncqMergeFull sto - - ffs <- N2.ncqListTrackedFiles sto - notice $ "database prepared" <+> pretty (List.length ffs) <+> pretty (List.length hs) - - res <- newTQueueIO - - replicateM_ nl do - - tfound <- newTVarIO 0 - - t0 <- getTimeCoarse - - void $ flip runContT pure $ callCC \exit -> do - - readQ <- newTQueueIO - - reader <- replicateM rt $ ContT $ withAsync $ fix \next -> do - - (h, answ) <- atomically $ readTQueue readQ - - ncqLookupEntry sto h >>= \case - Nothing -> none - Just e -> atomically (putTMVar answ (Just (InMemory (ncqEntryData e)))) >> next - - ffs <- readTVarIO $ (N2.ncqTrackedFiles sto) - - for_ ffs $ \TrackedFile{..} -> do - readTVarIO tfCached >>= \case - - Just (PendingEntry{}) -> none - - Just (CachedEntry{..}) -> do - ncqLookupIndex h (cachedMmapedIdx, cachedNway) >>= \case - Nothing -> none - Just (NCQIdxEntry o s) -> atomically (putTMVar answ (Just (N2.InFossil tfKey cachedMmapedData o s))) >> next - - Nothing -> do - - ncqLoadTrackedFile sto TrackedFile{..} >>= \case - Nothing -> err "FUCK" >> next - Just PendingEntry -> next - Just CachedEntry{..} -> do - ncqLookupIndex h (cachedMmapedIdx, cachedNway) >>= \case - Nothing -> none - Just (NCQIdxEntry o s) -> atomically (putTMVar answ (Just (N2.InFossil tfKey cachedMmapedData o s))) >> next - - atomically (putTMVar answ Nothing) >> next - - liftIO $ pooledForConcurrentlyN_ nt hs $ \h -> do - answ <- newEmptyTMVarIO - atomically $ writeTQueue readQ (h, answ) - found <- atomically $ takeTMVar answ - - when (isJust found) do - atomically $ modifyTVar' tfound succ - - t1 <- getTimeCoarse - - let dt = realToFrac (toNanoSecs (t1 - t0)) / 1e9 :: Fixed E3 - atomically $ writeTQueue res dt - - found <- readTVarIO tfound - - notice $ "scan all files" <+> pretty dt <+> pretty found - - m <- atomically (STM.flushTQueue res) - <&> List.sort - <&> \x -> atDef 0 x (List.length x `quot` 2) - - notice $ "median" <+> pretty m - - - -testNCQ2Merge1 :: MonadUnliftIO m - => Int - -> TestEnv - -> m () - -testNCQ2Merge1 n TestEnv{..} = do - let tmp = testEnvDir - let ncqDir = tmp - - g <- liftIO MWC.createSystemRandom - - let fake = n `div` 3 - - ncqWithStorage ncqDir $ \sto -> liftIO do - - notice $ "write" <+> pretty n <+> "random blocks" - - ws <- flip fix (mempty :: HashSet HashRef) $ \loop -> \case - hs | HS.size hs >= n -> pure hs - | otherwise -> do - - s <- liftIO $ genRandomBS g (256 * 1024) - h <- ncqPutBS sto (Just B) Nothing s - loop (HS.insert h hs) - - notice $ "written" <+> pretty (HS.size ws) - - assertBool "all written" (HS.size ws == n) - - nHashes <- HS.fromList . filter (not . flip HS.member ws) <$> replicateM fake do - liftIO (genRandomBS g (64*1024)) <&> HashRef . hashObject - - notice $ "gen" <+> pretty (HS.size nHashes) <+> pretty "missed hashes" - - - (t1,n1) <- over _2 sum <$> timeItT do - for (HS.toList ws) $ \h -> do - r <- ncqLocate2 sto h - - unless (isJust r) do - err $ "not found" <+> pretty h - - pure $ maybe 0 (const 1) r - - notice $ pretty (sec3 t1) <+> pretty n1 <+> pretty (n1 == HS.size ws) - - assertBool "all written" (n1 == HS.size ws) - - ncqWaitTasks sto - - let hashes = HS.toList ws <> HS.toList nHashes - - (t2,_) <- timeItT do - for hashes $ \h -> do - r <- ncqLocate2 sto h - pure $ maybe 0 (const 1) r - - notice $ "before-merge" <+> pretty (sec3 t1) <+> pretty (List.length hashes) - - notice $ "merge whatever possible" - - n <- flip fix 0 \next i -> do - N2.ncqMergeStep sto >>= \case - False -> pure i - True -> next (succ i) - - notice $ "merged" <+> pretty n - - (t3,r) <- timeItT do - for hashes $ \h -> do - ncqLocate2 sto h >>= \case - Nothing -> pure $ Left h - Just{} -> pure $ Right h - - let w1 = HS.fromList (rights r) - let n2 = HS.fromList (lefts r) - - notice $ "after-merge" <+> pretty (sec3 t3) <+> pretty (HS.size w1) <+> pretty (HS.size n2) - - pause @'Seconds 300 - -testFilterEmulate1 :: MonadUnliftIO m - => Bool - -> Int - -> TestEnv - -> m () - -testFilterEmulate1 doMerge n TestEnv{..} = do - let tmp = testEnvDir - let ncqDir = tmp - - g <- liftIO MWC.createSystemRandom - - bz <- replicateM n $ liftIO do - n <- (`mod` (64*1024)) <$> uniformM @Int g - uniformByteStringM n g - - - hs' <- newTVarIO (mempty :: HashSet HashRef) - noHs' <- newTVarIO (mempty :: HashSet HashRef) - - ncqWithStorage ncqDir $ \sto -> liftIO do - - for bz $ \z -> do - h <- ncqPutBS sto (Just B) Nothing z - atomically $ modifyTVar' hs' (HS.insert h) - - replicateM_ (max 100 (n `div` 3)) $ liftIO do - n <- (`mod` (64*1024)) <$> uniformM @Int g - fake <- HashRef . hashObject @HbSync <$> uniformByteStringM n g - atomically $ modifyTVar' noHs' (HS.insert fake) - - hs <- readTVarIO hs' - noHs <- readTVarIO noHs' - - let allShit = HS.toList hs <> HS.toList noHs - - let bloom = easyList 0.01 (fmap (coerce @_ @ByteString) (HS.toList hs)) - - let bucno e = hash e `mod` 4096 - - let dumb = IntSet.fromList [ bucno k | k <- HS.toList hs ] - - ncqWithStorage ncqDir $ \sto -> liftIO do - - when doMerge do - notice "merge data" - fix $ \next -> ncqMergeStep sto >>= \case - True -> next - False -> none - - for_ [1..5] $ \i -> do - - notice $ "-- pass" <+> pretty i <+> "--" - - (t1,_) <- timeItT do - for_ allShit $ \ha -> do - void $ ncqLocate2 sto ha - - notice $ "lookup-no-filter" <+> pretty (realToFrac @_ @(Fixed E3) t1) - - (t2,_) <- timeItT do - for_ allShit $ \ha -> do - unless (HS.member ha noHs) do - void $ ncqLocate2 sto ha - - notice $ "lookup-fake-filter" <+> pretty (realToFrac @_ @(Fixed E3) t2) - - (t3,_) <- timeItT do - for_ allShit $ \ha -> do - let here = IntSet.member (bucno ha) dumb - when here do - void $ ncqLocate2 sto ha - - notice $ "lookup-dumb-filter" <+> pretty (realToFrac @_ @(Fixed E3) t3) - - (t4,_) <- timeItT do - for_ allShit $ \ha -> do - let here = Bloom.elem (coerce ha) bloom - when here do - void $ ncqLocate2 sto ha - - notice $ "lookup-simple-bloom-filter" <+> pretty (realToFrac @_ @(Fixed E3) t4) - - -testNCQ2Repair1:: MonadUnliftIO m - => TestEnv - -> m () - -testNCQ2Repair1 TestEnv{..} = do - debug "testNCQ2Repair1" - let tmp = testEnvDir - let ncqDir = tmp - q <- newTQueueIO - - g <- liftIO MWC.createSystemRandom - - bz <- replicateM 3000 $ liftIO do - n <- (`mod` (256*1024)) <$> uniformM @Int g - uniformByteStringM n g - - ncqWithStorage ncqDir $ \sto -> liftIO do - for_ bz $ \z -> do - h <- ncqPutBS sto (Just B) Nothing z - atomically $ writeTQueue q h - found <- ncqLocate2 sto h <&> maybe (-1) ncqEntrySize - assertBool (show $ "found-immediate" <+> pretty h) (found > 0) - - ncqWithStorage ncqDir $ \sto -> liftIO do - written <- N2.ncqListDirFossils sto - debug $ "TRACKED" <+> vcat (fmap pretty written) - toDestroy <- pure (headMay written) `orDie` "no file written" - - debug $ "adding garbage to" <+> pretty toDestroy - - k <- (`mod` 4096) <$> uniformM @Int g - shit <- uniformByteStringM k g - let df = toFileName (DataFile toDestroy) - let f = N2.ncqGetFileName sto df - let cq = N2.ncqGetFileName sto (toFileName (IndexFile toDestroy)) - rm cq - BS.appendFile f shit - - notice "after destroying storage" - - ncqWithStorage ncqDir $ \sto -> liftIO do - hashes <- atomically (STM.flushTQueue q) - for_ hashes $ \ha -> do - found <- ncqLocate2 sto ha <&> maybe (-1) ncqEntrySize - none - -- assertBool (show $ "found-immediate" <+> pretty ha) (found > 0) - -- debug $ fill 44 (pretty ha) <+> fill 8 (pretty found) - - -testWriteNThreads :: forall g m . (MonadUnliftIO m) - => FilePath - -> Int - -> Int - -> m () -testWriteNThreads ncqDir tnn n = do - - g <- liftIO MWC.createSystemRandom - - wtf <- liftIO getPOSIXTime <&> show . round - - t0 <- getTimeCoarse - - w <- ncqWithStorage (ncqDir wtf <> show tnn) $ \sto -> do - ss <- liftIO $ replicateM n $ MWC.uniformRM (64*1024, 256*1024) g - - pooledForConcurrentlyN_ tnn ss $ \len -> do - tbs <- liftIO $ genRandomBS g len - ncqPutBS sto (Just B) Nothing tbs - -- atomically $ modifyTVar' tss (+ len) - - -- 32 bytes per key, 4 per len - pure $ (List.length ss * 36) + sum ss - - t1 <- getTimeCoarse - - let t = realToFrac (toNanoSecs (t1 - t0)) / 1e9 - - let total = realToFrac w - - let speed = if t > 0 then total / t else 0 - let totMegs = realToFrac @_ @(Fixed E2) $ total / (1024**2) - let speedMbs = realToFrac @_ @(Fixed E2) $ speed / (1024**2) - - notice $ pretty tnn <+> pretty (sec2 t) <+> pretty totMegs <+> pretty speedMbs - - -testNCQ2Concurrent1 :: MonadUnliftIO m - => Bool - -> Int - -> Int - -> TestEnv - -> m () - -testNCQ2Concurrent1 noRead tn n TestEnv{..} = flip runContT pure do - - let tmp = testEnvDir - let inputDir = tmp "input" - let ncqDir = tmp "ncq" - - debug "preparing" - - mkdir inputDir - - debug $ pretty inputDir - - g <- liftIO MWC.createSystemRandom - - log <- liftIO $ Temp.emptyTempFile inputDir "log-.bin" - - (t0,size) <- timeItT do - liftIO $ withFile log IO.AppendMode $ \hlog -> do - replicateM_ n do - size <- MWC.uniformRM (64*1024, 256*1024) g - tbs <- genRandomBS g size - let ha = hashObject @HbSync tbs - let ss = coerce ha <> tbs - let bssize = N.bytestring32 (fromIntegral $ BS.length ss) - BS.hPut hlog (bssize <> ss) - getFileSize log - - - let mbps = realToFrac size / (1024**2) - let v0 = mbps / t0 - notice $ "baseline" <+> pretty n - <+> pretty (sec3 t0) - <+> pretty (realToFrac @_ @(Fixed E2) mbps) - <+> pretty (sec2 v0) - - - for_ [1..tn] $ \tnn -> liftIO do - testWriteNThreads ncqDir tnn n - - - -testNCQ2Concurrent2 :: MonadUnliftIO m - => Int -- ^ threads - -> Int -- ^ times - -> Int -- ^ blocks - -> TestEnv - -> m () - -testNCQ2Concurrent2 tn times n TestEnv{..} = flip runContT pure do - replicateM_ times do - lift $ testWriteNThreads testEnvDir tn n - -testNCQ2ConcurrentWriteSimple1 :: MonadUnliftIO m - => Int - -> Int - -> TestEnv - -> m () - -testNCQ2ConcurrentWriteSimple1 tn n TestEnv{..} = flip runContT pure do - - let tmp = testEnvDir - let inputDir = tmp "input" - let ncqDir = tmp "ncq-test-data" - - debug "preparing" - - mkdir inputDir - - debug $ pretty inputDir - - filez <- liftIO $ pooledReplicateConcurrentlyN 8 n $ do - size <- randomRIO (64*1024, 256*1024) - w <- liftIO (randomIO :: IO Word8) - let tbs = BS.replicate size w -- replicateM size w <&> BS.pack - let ha = hashObject @HbSync tbs -- & show . pretty - let fn = inputDir show (pretty ha) - liftIO $ BS.writeFile fn tbs - pure (fn, ha, BS.length tbs) - - debug "done" - - let fnv = V.fromList filez - let ssz = sum [ s | (_,_,s) <- filez ] & realToFrac - - -- setLoggingOff @DEBUG - - ncq1 <- ncqStorageOpen2 ncqDir (\x -> x { ncqFsync = 64^(1024^2) } ) - w <- ContT $ withAsync (ncqStorageRun2 ncq1) - - liftIO $ pooledForConcurrentlyN_ tn fnv $ \(n,ha,_) -> do - co <- BS.readFile n - ncqPutBS ncq1 (Just B) Nothing co - - liftIO $ ncqStorageStop2 ncq1 - wait w main :: IO () main = do @@ -1422,14 +657,6 @@ main = do e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq:concurrent2" $ nil_ $ \case - [ LitIntVal tn, LitIntVal times, LitIntVal n ] -> do - debug $ "ncq:concurrent2" <+> pretty tn <+> pretty n - runTest $ testNCQ2Concurrent2 (fromIntegral tn) (fromIntegral times) (fromIntegral n) - - e -> throwIO $ BadFormException @C (mkList e) - entry $ bindMatch "test:ncq:concurrent1:wo" $ nil_ $ \case [ LitIntVal tn, LitIntVal n ] -> do debug $ "ncq:concurrent1" <+> pretty tn <+> pretty n @@ -1465,305 +692,6 @@ main = do e -> throwIO $ BadFormException @C (mkList e) - entry $ bindMatch "test:ncq2:merge1" $ nil_ $ \case - [ LitIntVal n ] -> do - runTest $ testNCQ2Merge1 (fromIntegral n) - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq2:concurrent1" $ nil_ $ \case - [ LitIntVal tn, LitIntVal n ] -> do - debug $ "ncq2:concurrent1" <+> pretty tn <+> pretty n - runTest $ testNCQ2Concurrent1 False ( fromIntegral tn) (fromIntegral n) - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq2:simple1" $ nil_ $ \e -> do - runTest (testNCQ2Simple1 e) - - entry $ bindMatch "test:ncq2:lookup1" $ nil_ $ \e -> do - runTest (testNCQ2Lookup1 e) - - entry $ bindMatch "test:ncq2:lookup2" $ nil_ $ \e -> do - runTest (testNCQ2Lookup2 e) - - entry $ bindMatch "test:ncq2:sweep1" $ nil_ $ \e -> do - runTest (testNCQ2Sweep1 e) - - entry $ bindMatch "test:ncq2:kill1" $ nil_ $ \e -> do - runTest (testNCQ2Kill1 e) - - entry $ bindMatch "test:ncq2:sweep2" $ nil_ $ \e -> do - runTest (testNCQ2Sweep2 e) - - entry $ bindMatch "test:ncq2:repair1" $ nil_ $ const $ do - runTest testNCQ2Repair1 - - entry $ bindMatch "test:ncq2:filefastcheck" $ nil_ $ \case - [ StringLike fn ] -> do - ncqFileFastCheck fn - - e -> throwIO $ BadFormException @C (mkList e) - - - entry $ bindMatch "test:ncq2:wtf1" $ nil_ $ const do - runTest $ \TestEnv{..} -> do - let dir = testEnvDir - r1 <- ncqWithStorage dir $ \sto -> do - h <- ncqPutBS sto (Just B) Nothing "JOPAKITAPECHENTRESKI" - loc <- ncqLocate2 sto h `orDie` "not found shit" - let re@(k,r) = ncqEntryUnwrap sto $ ncqGetEntryBS sto loc - notice $ pretty "MEM" <+> pretty (ncqEntrySize loc) <+> pretty (coerce @_ @HashRef k) <+> viaShow r - pure re - - ncqWithStorage dir $ \sto -> do - let (k,v) = r1 - loc <- ncqLocate2 sto (coerce k) `orDie` "not found shit" - let s0 = ncqGetEntryBS sto loc - let (k1,r1) = ncqEntryUnwrap sto s0 - notice $ "FOSSIL:" <+> pretty (ncqEntrySize loc) <+> pretty (coerce @_ @HashRef k1) <+> viaShow r1 - assertBool "written-same" (r1 == v && k == k1) - - - - entry $ bindMatch "test:ncq2:scan-index" $ nil_ \case - [ StringLike dir, HashLike item ] -> do - notice $ "SCAN DIR" <+> pretty dir <+> pretty item - - ncqWithStorage dir $ \sto@NCQStorage2{..} -> do - - -- let d = N2.ncqGetFileName sto "" - - -- files <- dirFiles d <&> List.filter (List.isSuffixOf ".cq") - - -- files <- N2.ncqListTrackedFiles sto - - tracked <- N2.ncqListTrackedFiles sto - - for_ tracked $ \(k,_,_) -> do - - let indexFile = N2.ncqGetFileName sto (toFileName (IndexFile k)) - - (idxBs, idxNway) <- liftIO (nwayHashMMapReadOnly indexFile) - >>= orThrow (NCQStorageCantMapFile indexFile) - - - notice $ "scan file" <+> pretty indexFile - - stat <- liftIO $ PFS.getFileStatus indexFile - -- -- FIXME: maybe-creation-time-actually - let ts = posixToTimeSpec $ PFS.modificationTimeHiRes stat - - nwayHashScanAll idxNway idxBs $ \_ k v -> do - when (coerce k == item ) do - - let off = fromIntegral $ N.word64 (BS.take 8 v) - let size = fromIntegral $ N.word32 (BS.take 4 (BS.drop 8 v)) - - notice $ yellow "found" - <+> pretty (fromString @FileKey indexFile) - <+> pretty (fromIntegral @_ @Word64 ts) - <+> pretty (off,size,item) - <+> pretty (foldMap (`showHex` "") (BS.unpack v) ) - - -- datBs <- liftIO $ mmapFileByteString dataFile Nothing - - - none - - e -> throwIO (BadFormException (mkList e)) - - - entry $ bindMatch "test:ncq2:del2" $ nil_ $ \syn -> do - - runTest $ \TestEnv{..} -> do - g <- liftIO MWC.createSystemRandom - let dir = testEnvDir - - let (_, argz) = splitOpts [] syn - let n = headDef 50000 [ fromIntegral x | LitIntVal x <- argz ] - let p0 = headDef 0.25 [ realToFrac x | LitScientificVal x <- drop 1 argz ] - - thashes <- newTVarIO mempty - - ncqWithStorage dir $ \sto@NCQStorage2{..} -> do - - sizes <- replicateM n $ uniformRM (32*1024, 256*1024) g - - notice $ "write" <+> pretty n <+> "blocks" - pooledForConcurrentlyN_ 16 sizes $ \s -> do - h <- ncqPutBS sto (Just B) Nothing =<< genRandomBS g s - - p1 <- uniformRM @Double (0, 1) g - - when (p1 < p0) do - ncqDelEntry sto h - atomically $ modifyTVar thashes (HS.insert h) - - deleted <- readTVarIO thashes - - tombs <- for (HS.toList deleted) $ \d -> do - ncqLocate2 sto d <&> maybe False (N2.ncqIsTomb sto) - - let tnum = sum [ 1 | x <- tombs, x ] - - notice $ "should be deleted" <+> pretty (HS.size deleted) <+> "/" <+> pretty tnum - - t0 <- getTimeCoarse - - ncqCompactStep sto - - t1 <- getTimeCoarse - - let dt = timeSpecDeltaSeconds @(Fixed E6) t0 t1 - - notice $ "ncqCompactStep time" <+> pretty dt - - none - - - entry $ bindMatch "test:ncq2:del1" $ nil_ $ \syn -> do - - runTest $ \TestEnv{..} -> do - g <- liftIO MWC.createSystemRandom - let dir = testEnvDir - - let (opts, argz) = splitOpts [("-m",0)] syn - let n = headDef 10000 [ fromIntegral x | LitIntVal x <- argz ] - - let merge = or [ True | ListVal [StringLike "-m"] <- opts ] - - thashes <- newTVarIO mempty - - ncqWithStorage dir $ \sto@NCQStorage2{..} -> do - - notice $ "write+immediate delete" <+> pretty n <+> "records" - - hashes <- replicateM n do - - h <- ncqPutBS sto (Just B) Nothing =<< genRandomBS g (64*1024) - ncqDelEntry sto h - - t <- (ncqLocate2 sto h <&> fmap (N2.ncqIsTomb sto)) - >>= orThrowUser ("missed" <+> pretty h) - - assertBool "tomb/1" t - - pure h - - atomically $ writeTVar thashes (HS.fromList hashes) - - flip runContT pure $ callCC \exit -> do - - for_ hashes $ \h -> do - l <- lift (ncqLocate2 sto h) - >>= orThrowUser ("missed" <+> pretty h) - - unless (N2.ncqIsTomb sto l) do - let (k,e') = ncqEntryUnwrap sto (ncqGetEntryBS sto l) - - e <- orThrowUser "bad entry" e' - err $ pretty l - err $ "WTF?" <+> pretty (coerce @_ @HashRef k) <+> pretty h <+> viaShow (fst e) - lfs <- readTVarIO ncqTrackedFiles - - for_ lfs $ \TrackedFile{..} -> do - npe <- readTVarIO tfCached <&> isNotPending - err $ "FILE" <+> pretty npe <+> pretty tfKey - - exit () - - when merge do - ncqWithStorage dir \sto -> do - ncqMergeFull sto - - ncqWithStorage dir $ \sto -> do - -- notice "check deleted" - hashes <- readTVarIO thashes - - for_ hashes $ \h -> do - - ncqLocate2 sto h >>= \case - Nothing -> notice $ "not-found" <+> pretty h - Just loc -> do - - what <- (ncqLocate2 sto h <&> fmap (ncqGetEntryBS sto)) - >>= orThrowUser "NOT FOUND" - - let (k,wtf) = ncqEntryUnwrap sto what - let tomb = N2.ncqIsTomb sto loc - - -- debug $ pretty (coerce @_ @HashRef k) <+> viaShow wtf <+> pretty tomb - - assertBool (show $ "tomb/3" <+> pretty h) tomb - - - entry $ bindMatch "test:ncq2:concurrent:write:simple1" $ nil_ $ \case - [ LitIntVal tn, LitIntVal n ] -> do - runTest $ testNCQ2ConcurrentWriteSimple1 ( fromIntegral tn) (fromIntegral n) - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq2:ema" $ nil_ $ const do - notice "test:ncq2:ema" - runTest $ \TestEnv{..} -> do - g <- liftIO MWC.createSystemRandom - let dir = testEnvDir "ncq1" - let n = 50000 - ncqWithStorage dir $ \sto -> do - replicateM_ n do - ncqPutBS sto (Just B) Nothing =<< genRandomBS g (256*1024) - - notice $ "written" <+> pretty n - - pause @'Seconds 120 - - entry $ bindMatch "test:filter:emulate-1" $ nil_ $ \case - [ LitIntVal n ] -> runTest $ testFilterEmulate1 False (fromIntegral n) - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:filter:emulate:merged" $ nil_ $ \case - [ LitIntVal n ] -> runTest $ testFilterEmulate1 True (fromIntegral n) - e -> throwIO $ BadFormException @C (mkList e) - - - entry $ bindMatch "test:ncq2:facts-db1" $ nil_ $ \e -> do - notice "test:ncq2:probes-db1" - runTest $ \TestEnv{..} -> do - g <- liftIO MWC.createSystemRandom - let dir = testEnvDir - let n = 100000 - let p = 0.25 - - sizes <- replicateM n (uniformRM (4096, 256*1024) g) - - hashes <- newTVarIO (mempty :: IntMap HashRef) - - ncqWithStorage dir $ \sto -> void $ flip runContT pure do - notice $ "write" <+> pretty (List.length sizes) <+> pretty "random blocks" - - ContT $ withAsync $ forever do - pause @'Seconds 0.01 - p1 <- uniformRM (0,1) g - when (p1 <= p) do - hss <- readTVarIO hashes - let s = maybe 0 fst $ IntMap.lookupMax hss - i <- uniformRM (0,s) g - let hm = IntMap.lookup i hss - for_ hm $ \h -> do - ncqDelEntry sto h - atomically $ modifyTVar hashes (IntMap.delete i) - - liftIO $ pooledForConcurrentlyN_ 8 sizes $ \s -> do - h <- ncqPutBS sto (Just B) Nothing =<< genRandomBS g s - atomically do - i <- readTVar hashes <&> IntMap.size - modifyTVar hashes (IntMap.insert i h) - - notice $ "written" <+> pretty n - - pause @'Seconds 300 - -- NCQ3 tests ncq3Tests