mirror of https://github.com/voidlizard/hbs2
ncqCompact
This commit is contained in:
parent
a5cd25a34a
commit
f4f2b26be6
|
@ -277,6 +277,13 @@ ncqGetNewMergeName n@NCQStorage{} = do
|
||||||
let (p,tpl) = splitFileName fn
|
let (p,tpl) = splitFileName fn
|
||||||
liftIO $ emptyTempFile p tpl
|
liftIO $ emptyTempFile p tpl
|
||||||
|
|
||||||
|
|
||||||
|
ncqGetNewCompactName :: MonadIO m => NCQStorage -> m FilePath
|
||||||
|
ncqGetNewCompactName n@NCQStorage{} = do
|
||||||
|
let fn = ncqGetFileName n "compact-.data"
|
||||||
|
let (p,tpl) = splitFileName fn
|
||||||
|
liftIO $ emptyTempFile p tpl
|
||||||
|
|
||||||
ncqGetIndexFileName :: NCQStorage -> FileKey -> FilePath
|
ncqGetIndexFileName :: NCQStorage -> FileKey -> FilePath
|
||||||
ncqGetIndexFileName ncq fk = do
|
ncqGetIndexFileName ncq fk = do
|
||||||
ncqGetFileName ncq (addExtension (dropExtension (BS8.unpack (coerce fk))) ".cq")
|
ncqGetFileName ncq (addExtension (dropExtension (BS8.unpack (coerce fk))) ".cq")
|
||||||
|
@ -1380,6 +1387,36 @@ withNCQ setopts p action = flip runContT pure do
|
||||||
pure e
|
pure e
|
||||||
|
|
||||||
|
|
||||||
|
writeFiltered :: forall m . MonadIO m
|
||||||
|
=> NCQStorage
|
||||||
|
-> FilePath
|
||||||
|
-> Handle
|
||||||
|
-> ( Integer -> Integer -> HashRef -> ByteString -> m Bool)
|
||||||
|
-> m ()
|
||||||
|
|
||||||
|
writeFiltered ncq fn out filt = do
|
||||||
|
ncqStorageScanDataFile ncq fn $ \o s k v -> do
|
||||||
|
skip <- filt o s k v <&> not
|
||||||
|
|
||||||
|
when skip do
|
||||||
|
debug $ pretty k <+> pretty "skipped"
|
||||||
|
|
||||||
|
unless skip $ liftIO do
|
||||||
|
BS.hPut out (LBS.toStrict (makeEntryLBS k v))
|
||||||
|
|
||||||
|
where
|
||||||
|
|
||||||
|
makeEntryLBS h bs = do
|
||||||
|
let b = byteString (coerce @_ @ByteString h)
|
||||||
|
<> byteString bs
|
||||||
|
|
||||||
|
let wbs = toLazyByteString b
|
||||||
|
let len = LBS.length wbs
|
||||||
|
let ws = byteString (N.bytestring32 (fromIntegral len))
|
||||||
|
|
||||||
|
toLazyByteString (ws <> b)
|
||||||
|
|
||||||
|
|
||||||
ncqStorageMerge :: MonadUnliftIO m => NCQStorage -> m ()
|
ncqStorageMerge :: MonadUnliftIO m => NCQStorage -> m ()
|
||||||
ncqStorageMerge NCQStorage{..} = atomically $ modifyTVar ncqMergeReq succ
|
ncqStorageMerge NCQStorage{..} = atomically $ modifyTVar ncqMergeReq succ
|
||||||
|
|
||||||
|
@ -1399,21 +1436,6 @@ ncqStorageMergeStep ncq@NCQStorage{..} = do
|
||||||
|
|
||||||
where
|
where
|
||||||
|
|
||||||
writeFiltered :: forall m . MonadIO m
|
|
||||||
=> FilePath
|
|
||||||
-> Handle
|
|
||||||
-> ( Integer -> Integer -> HashRef -> ByteString -> m Bool)
|
|
||||||
-> m ()
|
|
||||||
|
|
||||||
writeFiltered fn out filt = do
|
|
||||||
ncqStorageScanDataFile ncq fn $ \o s k v -> do
|
|
||||||
skip <- filt o s k v <&> not
|
|
||||||
|
|
||||||
when skip do
|
|
||||||
debug $ pretty k <+> pretty "skipped"
|
|
||||||
|
|
||||||
unless skip $ liftIO do
|
|
||||||
BS.hPut out (LBS.toStrict (makeEntryLBS k v))
|
|
||||||
|
|
||||||
mergeStep [] = none
|
mergeStep [] = none
|
||||||
mergeStep [_] = none
|
mergeStep [_] = none
|
||||||
|
@ -1450,12 +1472,12 @@ ncqStorageMergeStep ncq@NCQStorage{..} = do
|
||||||
|
|
||||||
debug $ "SCAN FILE A" <+> pretty fDataNameA
|
debug $ "SCAN FILE A" <+> pretty fDataNameA
|
||||||
|
|
||||||
writeFiltered fDataNameA fwh $ \_ _ _ v -> do
|
writeFiltered ncq fDataNameA fwh $ \_ _ _ v -> do
|
||||||
pure $ not (ncqIsTomb (LBS.fromStrict v))
|
pure $ not (ncqIsTomb (LBS.fromStrict v))
|
||||||
|
|
||||||
debug $ "SCAN FILE B" <+> pretty fDataNameA
|
debug $ "SCAN FILE B" <+> pretty fDataNameA
|
||||||
|
|
||||||
writeFiltered fDataNameB fwh $ \_ _ k v -> do
|
writeFiltered ncq fDataNameB fwh $ \_ _ k v -> do
|
||||||
let tomb = ncqIsTomb (LBS.fromStrict v)
|
let tomb = ncqIsTomb (LBS.fromStrict v)
|
||||||
foundInA <- liftIO (nwayHashLookup nway mmIdx (coerce k)) <&> isJust
|
foundInA <- liftIO (nwayHashLookup nway mmIdx (coerce k)) <&> isJust
|
||||||
let skip = tomb || foundInA
|
let skip = tomb || foundInA
|
||||||
|
@ -1494,15 +1516,6 @@ ncqStorageMergeStep ncq@NCQStorage{..} = do
|
||||||
r <- what
|
r <- what
|
||||||
unless r (throwIO (NCQMergeInvariantFailed (show e)))
|
unless r (throwIO (NCQMergeInvariantFailed (show e)))
|
||||||
|
|
||||||
makeEntryLBS h bs = do
|
|
||||||
let b = byteString (coerce @_ @ByteString h)
|
|
||||||
<> byteString bs
|
|
||||||
|
|
||||||
let wbs = toLazyByteString b
|
|
||||||
let len = LBS.length wbs
|
|
||||||
let ws = byteString (N.bytestring32 (fromIntegral len))
|
|
||||||
|
|
||||||
toLazyByteString (ws <> b)
|
|
||||||
|
|
||||||
|
|
||||||
posixToTimeSpec :: POSIXTime -> TimeSpec
|
posixToTimeSpec :: POSIXTime -> TimeSpec
|
||||||
|
@ -1517,6 +1530,8 @@ posixToTimeSpec pt =
|
||||||
-- limit amount of tombs per one pass
|
-- limit amount of tombs per one pass
|
||||||
-- then remove all dead entries,
|
-- then remove all dead entries,
|
||||||
-- then call again to remove tombs. etc
|
-- then call again to remove tombs. etc
|
||||||
|
-- as for now, seems it should work up to 10TB
|
||||||
|
-- of storage
|
||||||
ncqLinearScanForCompact :: MonadUnliftIO m
|
ncqLinearScanForCompact :: MonadUnliftIO m
|
||||||
=> NCQStorage
|
=> NCQStorage
|
||||||
-> ( FileKey -> HashRef -> m () )
|
-> ( FileKey -> HashRef -> m () )
|
||||||
|
@ -1530,6 +1545,8 @@ ncqLinearScanForCompact ncq@NCQStorage{..} action = do
|
||||||
bodyCount <- newTVarIO 0
|
bodyCount <- newTVarIO 0
|
||||||
tombUse <- newTVarIO (mempty :: HashMap HashRef (FileKey, Int))
|
tombUse <- newTVarIO (mempty :: HashMap HashRef (FileKey, Int))
|
||||||
|
|
||||||
|
-- TODO: explicit-unmap-files
|
||||||
|
|
||||||
flip fix (tracked, state0) $ \next -> \case
|
flip fix (tracked, state0) $ \next -> \case
|
||||||
([], s) -> none
|
([], s) -> none
|
||||||
((fk,p,_):rest, state) -> do
|
((fk,p,_):rest, state) -> do
|
||||||
|
@ -1589,3 +1606,57 @@ ncqLinearScanForCompact ncq@NCQStorage{..} action = do
|
||||||
readTVarIO bodyCount
|
readTVarIO bodyCount
|
||||||
|
|
||||||
|
|
||||||
|
ncqCompact :: MonadUnliftIO m => NCQStorage -> m ()
|
||||||
|
ncqCompact ncq@NCQStorage{..} = do
|
||||||
|
|
||||||
|
q <- newTVarIO ( mempty :: HashMap FileKey (HashSet HashRef) )
|
||||||
|
|
||||||
|
ncqLinearScanForCompact ncq $ \fk h -> atomically do
|
||||||
|
modifyTVar q (HM.insertWith (<>) fk (HS.singleton h))
|
||||||
|
|
||||||
|
state0 <- readTVarIO q
|
||||||
|
|
||||||
|
for_ (HM.toList state0) $ \(fk, es) -> do
|
||||||
|
notice $ "TO DELETE" <+> pretty fk <+> pretty (HS.size es)
|
||||||
|
|
||||||
|
let fDataNameA = ncqGetDataFileName ncq fk
|
||||||
|
let fIndexNameA = ncqGetIndexFileName ncq fk
|
||||||
|
|
||||||
|
flip runContT pure $ callCC \exit -> do
|
||||||
|
|
||||||
|
|
||||||
|
mfile <- ncqGetNewCompactName ncq
|
||||||
|
|
||||||
|
ContT $ bracket none $ const do
|
||||||
|
rm mfile
|
||||||
|
|
||||||
|
liftIO $ withBinaryFileAtomic mfile WriteMode $ \fwh -> do
|
||||||
|
|
||||||
|
writeFiltered ncq fDataNameA fwh $ \_ _ k v -> do
|
||||||
|
pure $ not $ HS.member k es
|
||||||
|
|
||||||
|
liftIO do
|
||||||
|
|
||||||
|
result <- fileSize mfile
|
||||||
|
|
||||||
|
if result == 0 then do
|
||||||
|
atomically $ modifyTVar ncqTrackedFiles (HPSQ.delete fk)
|
||||||
|
else do
|
||||||
|
|
||||||
|
fossil <- ncqGetNewFossilName ncq
|
||||||
|
mv mfile fossil
|
||||||
|
|
||||||
|
statA <- getFileStatus fDataNameA
|
||||||
|
|
||||||
|
let ts = modificationTimeHiRes statA
|
||||||
|
setFileTimesHiRes fossil ts ts
|
||||||
|
|
||||||
|
fname <- ncqIndexFile ncq fossil
|
||||||
|
|
||||||
|
atomically do
|
||||||
|
let fp = fromString fname
|
||||||
|
modifyTVar ncqTrackedFiles (HPSQ.delete fk)
|
||||||
|
ncqAddTrackedFilesSTM ncq [(fp, posixToTimeSpec ts)]
|
||||||
|
|
||||||
|
mapM_ rm [fDataNameA, fIndexNameA]
|
||||||
|
|
||||||
|
|
|
@ -230,6 +230,19 @@ main = do
|
||||||
|
|
||||||
pure nil
|
pure nil
|
||||||
|
|
||||||
|
entry $ bindMatch "ncq:compact" $ \syn -> lift do
|
||||||
|
|
||||||
|
tcq <- case syn of
|
||||||
|
[ isOpaqueOf @TCQ -> Just tcq ] -> do
|
||||||
|
pure tcq
|
||||||
|
|
||||||
|
e -> throwIO $ BadFormException @C (mkList e)
|
||||||
|
|
||||||
|
ncq <- getNCQ tcq
|
||||||
|
ncqCompact ncq
|
||||||
|
|
||||||
|
pure nil
|
||||||
|
|
||||||
entry $ bindMatch "ncq:merge" $ \syn -> lift do
|
entry $ bindMatch "ncq:merge" $ \syn -> lift do
|
||||||
|
|
||||||
tcq <- case syn of
|
tcq <- case syn of
|
||||||
|
|
Loading…
Reference in New Issue