wip, trying PendingEntry state

This commit is contained in:
voidlizard 2025-07-13 16:56:01 +03:00
parent a49ee574de
commit 15bec48522
2 changed files with 17 additions and 14 deletions

View File

@ -90,11 +90,12 @@ data CachedEntry =
, cachedNway :: NWayHash , cachedNway :: NWayHash
, cachedTs :: TVar TimeSpec , cachedTs :: TVar TimeSpec
} }
| PendingEntry {}
instance Show CachedEntry where instance Show CachedEntry where
show _ = "CachedEntry{...}" show = \case
CachedEntry{} -> "CachedEntry{...}"
PendingEntry{} -> "PendingEntry{...}"
newtype NCQFullRecordLen a = newtype NCQFullRecordLen a =
NCQFullRecordLen a NCQFullRecordLen a

View File

@ -144,7 +144,6 @@ data NCQStorage2 =
, ncqMergeSem :: TSem , ncqMergeSem :: TSem
, ncqSyncNo :: TVar Int , ncqSyncNo :: TVar Int
, ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry)) , ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry))
, ncqStaged :: TVar (HashSet FileKey)
, ncqStateVersion :: TVar StateVersion , ncqStateVersion :: TVar StateVersion
, ncqStateUsage :: TVar (IntMap (Int, HashSet FileKey)) , ncqStateUsage :: TVar (IntMap (Int, HashSet FileKey))
, ncqCachedEntries :: TVar Int , ncqCachedEntries :: TVar Int
@ -177,7 +176,6 @@ ncqStorageOpen2 fp upd = do
ncqMergeSem <- atomically (newTSem 1) ncqMergeSem <- atomically (newTSem 1)
ncqSyncNo <- newTVarIO 0 ncqSyncNo <- newTVarIO 0
ncqTrackedFiles <- newTVarIO HPSQ.empty ncqTrackedFiles <- newTVarIO HPSQ.empty
ncqStaged <- newTVarIO mempty
ncqStateVersion <- newTVarIO 0 ncqStateVersion <- newTVarIO 0
ncqStateUsage <- newTVarIO mempty ncqStateUsage <- newTVarIO mempty
ncqCachedEntries <- newTVarIO 0 ncqCachedEntries <- newTVarIO 0
@ -306,7 +304,7 @@ ncqLocate2 ncq@NCQStorage2{..} href = flip runContT pure $ callCC \exit -> do
tracked <- readTVarIO ncqTrackedFiles <&> HPSQ.toList tracked <- readTVarIO ncqTrackedFiles <&> HPSQ.toList
for_ tracked $ \(fk, prio, mCached) -> do for_ tracked $ \(fk, prio, mCached) -> callCC \skip -> do
case mCached of case mCached of
Just CachedEntry{..} -> do Just CachedEntry{..} -> do
lookupEntry href (cachedMmapedIdx, cachedNway) >>= \case lookupEntry href (cachedMmapedIdx, cachedNway) >>= \case
@ -315,10 +313,18 @@ ncqLocate2 ncq@NCQStorage2{..} href = flip runContT pure $ callCC \exit -> do
atomically $ writeTVar cachedTs now atomically $ writeTVar cachedTs now
exit (Just $ InFossil cachedMmapedData offset size) exit (Just $ InFossil cachedMmapedData offset size)
Just PendingEntry {} -> none
Nothing -> useVersion do Nothing -> useVersion do
let indexFile = ncqGetFileName ncq (toFileName (IndexFile fk)) let indexFile = ncqGetFileName ncq (toFileName (IndexFile fk))
let dataFile = ncqGetFileName ncq (toFileName (DataFile fk)) let dataFile = ncqGetFileName ncq (toFileName (DataFile fk))
idxHere <- doesFileExist indexFile
unless idxHere do
err $ red "missed index" <+> "in ncqLocate" <+> pretty fk
skip ()
(idxBs, idxNway) <- liftIO (nwayHashMMapReadOnly indexFile) (idxBs, idxNway) <- liftIO (nwayHashMMapReadOnly indexFile)
>>= orThrow (NCQStorageCantMapFile indexFile) >>= orThrow (NCQStorageCantMapFile indexFile)
@ -395,7 +401,6 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
-- notice $ yellow "indexing" <+> pretty fname -- notice $ yellow "indexing" <+> pretty fname
idx <- ncqRunTaskNoMatterWhat ncq (ncqIndexFile ncq (DataFile fk)) idx <- ncqRunTaskNoMatterWhat ncq (ncqIndexFile ncq (DataFile fk))
ncqRunTaskNoMatterWhat ncq $ ncqStateUpdate ncq [F 0 fk] ncqRunTaskNoMatterWhat ncq $ ncqStateUpdate ncq [F 0 fk]
-- ncqAddTrackedFile ncq (DataFile fk)
nwayHashMMapReadOnly idx >>= \case nwayHashMMapReadOnly idx >>= \case
Nothing -> err $ "can't open index" <+> pretty idx Nothing -> err $ "can't open index" <+> pretty idx
Just (bs,nway) -> do Just (bs,nway) -> do
@ -468,9 +473,10 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
else do else do
appendTailSection fh >> liftIO (fileSynchronise fh) appendTailSection fh >> liftIO (fileSynchronise fh)
-- FIXME: slow! -- FIXME: slow!
liftIO (ncqStateUpdate ncq [F 0 fk]) -- liftIO (ncqStateUpdate ncq [F 0 fk])
atomically do atomically do
modifyTVar ncqStaged (HS.insert fk) -- to make it appear in state, but to ignore until indexed
modifyTVar ncqTrackedFiles (HPSQ.insert fk (FilePrio (Down 0)) (Just PendingEntry))
writeTVar ncqStorageSyncReq False writeTVar ncqStorageSyncReq False
modifyTVar' ncqSyncNo succ modifyTVar' ncqSyncNo succ
@ -873,11 +879,7 @@ ncqStateUpdate me@NCQStorage2{..} ops' = flip runContT pure $ callCC \exit -> do
ncqDumpCurrentState :: MonadUnliftIO m => NCQStorage2 -> m () ncqDumpCurrentState :: MonadUnliftIO m => NCQStorage2 -> m ()
ncqDumpCurrentState me@NCQStorage2{..} = do ncqDumpCurrentState me@NCQStorage2{..} = do
keys <- atomically do keys <- readTVarIO ncqTrackedFiles <&> List.sort . HPSQ.keys
a1 <- readTVar ncqTrackedFiles <&> HPSQ.keys
a2 <- readTVar ncqStaged
pure (HS.toList (a2 <> HS.fromList a1))
name <- ncqGetNewStateName me name <- ncqGetNewStateName me
writeBinaryFileDurableAtomic name (BS8.unlines [coerce k | k <- keys]) writeBinaryFileDurableAtomic name (BS8.unlines [coerce k | k <- keys])