NCQ2: track fossil files through a versioned state.

Adds StateOP / StateVersion, the ncqStateVersion and ncqStateUsage
fields, ncqStateUpdate (apply ops, bump version, dump state to disk)
and ncqDumpCurrentState; ncqLocate2 now holds a usage reference on the
current state version while it opens an uncached fossil.

Review fix: the release half of `useVersion` called ncqStateUseSTM, so
each such lookup incremented the usage counter twice and never released
it. It now calls ncqStateUnuseSTM.

Line breaks and context indentation were restored by hand; apply with
whitespace tolerance (e.g. `git apply --ignore-whitespace`).

diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs
index 9c994dc3..a22f9e78 100644
--- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs
+++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs
@@ -110,6 +110,11 @@ type Shard = TVar (HashMap HashRef NCQEntry)
 type NCQOffset = Word64
 type NCQSize = Word32
 
+type StateVersion = Word64
+
+data StateOP = D FileKey | F TimeSpec FileKey -- D: untrack a file; F: track a file with a timestamp
+               deriving (Eq,Ord,Show)
+
 data NCQFlag =
     NCQMergeNow | NCQCompactNow
   deriving (Eq,Ord,Generic)
@@ -137,6 +142,8 @@ data NCQStorage2 =
   , ncqStorageSyncReq :: TVar Bool
   , ncqSyncNo :: TVar Int
   , ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry))
+  , ncqStateVersion :: TVar StateVersion
+  , ncqStateUsage :: TVar (IntMap (Int, HashSet FileKey))
   , ncqCachedEntries :: TVar Int
   , ncqWrites :: TVar Int
   , ncqWriteEMA :: TVar Double -- for writes-per-seconds
@@ -165,6 +172,8 @@ ncqStorageOpen2 fp upd = do
   ncqStorageSyncReq <- newTVarIO False
   ncqSyncNo <- newTVarIO 0
   ncqTrackedFiles <- newTVarIO HPSQ.empty
+  ncqStateVersion <- newTVarIO 0
+  ncqStateUsage <- newTVarIO mempty
   ncqCachedEntries <- newTVarIO 0
   ncqStorageTasks <- newTVarIO 0
   ncqWrites <- newTVarIO 0
@@ -206,6 +215,11 @@ ncqGetNewFossilName :: MonadIO m => NCQStorage2 -> m FilePath
 ncqGetNewFossilName ncq = do
   liftIO $ emptyTempFile (ncqGetWorkDir ncq) "fossil-.data"
 
+-- | Create an empty, uniquely named @state-@ file in the work dir.
+ncqGetNewStateName :: MonadIO m => NCQStorage2 -> m FilePath
+ncqGetNewStateName ncq = do
+  liftIO $ emptyTempFile (ncqGetWorkDir ncq) "state-"
+
 ncqGetNewCompactName :: MonadIO m => NCQStorage2 -> m FilePath
 ncqGetNewCompactName n@NCQStorage2{} = do
   let (p,tpl) = splitFileName (ncqGetFileName n "compact-.data")
@@ -279,7 +293,8 @@ ncqLocate2 ncq@NCQStorage2{..} href = flip runContT pure $ callCC \exit -> do
 
   lift (ncqLookupEntry ncq href) >>= maybe none (exit . Just . InMemory . coerce)
 
-  -- atomically $ modifyTVar' ncqWrites succ
+  atomically do
+    modifyTVar' ncqWrites succ
 
   -- FIXME: race
   -- merge can-delete-file-while-in-use
@@ -295,7 +310,7 @@ ncqLocate2 ncq@NCQStorage2{..} href = flip runContT pure $ callCC \exit -> do
             atomically $ writeTVar cachedTs now
             exit (Just $ InFossil cachedMmapedData offset size)
 
-      Nothing -> do
+      Nothing -> useVersion do
         let indexFile = ncqGetFileName ncq (toFileName (IndexFile fk))
         let dataFile = ncqGetFileName ncq (toFileName (DataFile fk))
 
@@ -320,6 +335,14 @@ ncqLocate2 ncq@NCQStorage2{..} href = flip runContT pure $ callCC \exit -> do
     pure mzero
 
   where
+
+    -- Hold a usage reference on the current state version around the
+    -- continuation: acquired on entry, released by the bracket on exit.
+    useVersion m = ContT (bracket succV predV) >> m
+      where
+        succV = atomically (ncqStateUseSTM ncq)
+        predV = const $ atomically (ncqStateUnuseSTM ncq)
+
     lookupEntry (hx :: HashRef) (mmaped, nway) =
       liftIO (nwayHashLookup nway mmaped (coerce hx)) >>= \case
         Nothing -> pure Nothing
@@ -368,7 +391,8 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
           closeFd fh
           -- notice $ yellow "indexing" <+> pretty fname
           idx <- ncqRunTaskNoMatterWhat ncq (ncqIndexFile ncq (DataFile fk))
-          ncqAddTrackedFile ncq (DataFile fk)
+          ncqStateUpdate ncq [F 0 fk]
+          -- ncqAddTrackedFile ncq (DataFile fk)
           nwayHashMMapReadOnly idx >>= \case
             Nothing -> err $ "can't open index" <+> pretty idx
             Just (bs,nway) -> do
@@ -391,7 +415,9 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
     mq <- newEmptyTMVarIO
 
     spawnJob $ do
-      merged <- ncqStorageMergeStep ncq
+      -- TODO: back-to-merge
+      -- merged <- ncqStorageMergeStep ncq
+      let merged = True
       atomically $ putTMVar mq merged
 
     -- TODO: detect-dead-merge
@@ -434,11 +460,12 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
 
         rest <- if not (sync || needClose || w > ncqFsync) then
                   pure w
-                else liftIO do
-                  appendTailSection fh >> fileSynchronise fh
+                else do
+                  appendTailSection fh >> liftIO (fileSynchronise fh)
                   atomically do
                     writeTVar ncqStorageSyncReq False
                     modifyTVar' ncqSyncNo succ
+                  pure 0
 
         if | needClose && continue -> do
 
@@ -675,6 +702,9 @@ ncqRepair :: MonadIO m => NCQStorage2 -> m ()
 ncqRepair me@NCQStorage2{} = do
   fossils <- ncqListTrackedFiles me
 
+  -- TODO: use-state
+  warn $ red "TODO: use state for load"
+
   for_ fossils $ \fo -> liftIO $ flip fix 0 \next i -> do
     let dataFile = ncqGetFileName me $ toFileName (DataFile fo)
     try @_ @SomeException (ncqFileFastCheck dataFile) >>= \case
@@ -699,6 +729,8 @@ ncqRepair me@NCQStorage2{} = do
                 Right{} -> do
                   err $ "skip indexing" <+> pretty dataFile
 
+  void (liftIO $ ncqStateUpdate me mempty)
+
 ncqRefHash :: NCQStorage2 -> HashRef -> HashRef
 ncqRefHash NCQStorage2 {..} h = HashRef (hashObject (coerce @_ @ByteString h <> coerce ncqSalt))
 
@@ -717,6 +749,82 @@ ncqWaitTasks NCQStorage2{..} = atomically do
   tno <- readTVar ncqStorageTasks
   when (tno > 0) STM.retry
 
+-- | Bump the user counter of the current state version.
+-- No-op when 'ncqStateUsage' has no entry for that version.
+ncqStateUseSTM :: NCQStorage2 -> STM ()
+ncqStateUseSTM NCQStorage2{..} = do
+  k <- readTVar ncqStateVersion <&> fromIntegral
+  modifyTVar' ncqStateUsage (IntMap.update (Just . over _1 succ) k)
+
+-- | Drop one user from the current state version (see 'ncqStateUseSTM').
+-- NOTE(review): the version is re-read here, so a use/unuse pair that
+-- straddles 'ncqStateUpdate' touches two different counters.
+ncqStateUnuseSTM :: NCQStorage2 -> STM ()
+ncqStateUnuseSTM NCQStorage2{..} = do
+  k <- readTVar ncqStateVersion <&> fromIntegral
+  modifyTVar' ncqStateUsage (IntMap.update (Just . over _1 pred) k)
+
+-- | Apply 'StateOP's to 'ncqTrackedFiles' and advance 'ncqStateVersion'.
+-- Bails out with 'False' if an 'F' op names a file whose data or index
+-- file is missing. An empty op list changes nothing but still registers
+-- a usage entry for the current version. Returns whether state changed.
+ncqStateUpdate :: MonadUnliftIO m => NCQStorage2 -> [StateOP] -> m Bool
+ncqStateUpdate me@NCQStorage2{..} ops' = flip runContT pure $ callCC \exit -> do
+  t1 <- fromIntegral <$> liftIO getTimeCoarse
+
+  ops <- for ops' $ \case
+    f@(F _ fk) -> do
+      let idxFile = ncqGetFileName me (toFileName $ IndexFile fk)
+      let datFile = ncqGetFileName me (toFileName $ DataFile fk)
+
+      e1 <- doesFileExist idxFile
+      e2 <- doesFileExist datFile
+
+      unless (e1 && e2) do
+        err $ "ncqStateUpdate invariant fail" <+> pretty idxFile <+> pretty datFile
+        exit False
+
+      ts <- liftIO (getFileStatus datFile) <&>
+              posixToTimeSpec . PFS.modificationTimeHiRes
+
+      pure (F ts fk)
+
+    d -> pure d
+
+  changed <- atomically do
+    t0 <- readTVar ncqStateVersion
+    let k0 = fromIntegral t0
+
+    c <- if List.null ops then do
+           pure False
+         else do
+           writeTVar ncqStateVersion (max (succ t0) t1)
+           for_ ops $ \case
+             D fk -> modifyTVar' ncqTrackedFiles (HPSQ.delete fk)
+             F t fk -> ncqAddTrackedFileSTM me (coerce fk) t
+           pure True
+
+    old <- readTVar ncqTrackedFiles <&> HS.fromList . HPSQ.keys
+
+    let doAlter = \case
+          Nothing -> Just (0, old)
+          Just (u,f) -> Just (u,f)
+
+    modifyTVar' ncqStateUsage (IntMap.alter doAlter k0)
+
+    pure c
+
+  when changed (lift $ ncqDumpCurrentState me)
+
+  pure changed
+
+-- | Write the sorted tracked file keys, one per line, to a fresh
+-- @state-@ file in the work dir.
+ncqDumpCurrentState :: MonadUnliftIO m => NCQStorage2 -> m ()
+ncqDumpCurrentState me@NCQStorage2{..} = do
+  keys <- readTVarIO ncqTrackedFiles <&> List.sort . HPSQ.keys
+  name <- ncqGetNewStateName me
+  writeBinaryFileDurableAtomic name (BS8.unlines [coerce k | k <- keys])
 
 -- FIXME: sometime-causes-no-such-file-or-directory
 ncqStorageMergeStep :: MonadUnliftIO m => NCQStorage2 -> m Bool
@@ -823,8 +931,8 @@ ncqStorageMergeStep ncq@NCQStorage2{..} = ncqRunTask ncq False $ flip runContT
         for_ idx $ \(ts,fk) -> do
           ncqAddTrackedFileSTM ncq (coerce fk) (posixToTimeSpec ts)
 
-      mapM_ rm [fDataNameA, fDataNameB, fIndexNameB, fIndexNameA]
-
+      for_ idx $ \(ts,DataFile fk) -> do
+        void $ ncqStateUpdate ncq [D a, D b, F (posixToTimeSpec ts) fk]
 
     orFail what e = do
       r <- what