mirror of https://github.com/voidlizard/hbs2
wip, data file merge
This commit is contained in:
parent
88447330b6
commit
6c3dc29041
|
@ -71,6 +71,7 @@ library
|
|||
HBS2.Storage.NCQ3.Internal.Index
|
||||
HBS2.Storage.NCQ3.Internal.MMapCache
|
||||
HBS2.Storage.NCQ3.Internal.Files
|
||||
HBS2.Storage.NCQ3.Internal.Fossil
|
||||
HBS2.Storage.NCQ
|
||||
HBS2.Storage.NCQ2
|
||||
HBS2.Storage.NCQ2.Internal
|
||||
|
|
|
@ -16,5 +16,6 @@ import HBS2.Storage.NCQ3.Internal
|
|||
import HBS2.Storage.NCQ3.Internal.Run
|
||||
import HBS2.Storage.NCQ3.Internal.State
|
||||
import HBS2.Storage.NCQ3.Internal.Memtable
|
||||
import HBS2.Storage.NCQ3.Internal.Index
|
||||
|
||||
|
||||
|
|
|
@ -45,7 +45,7 @@ ncqStorageOpen3 fp upd = do
|
|||
let ncqFsync = 16 * megabytes
|
||||
let ncqWriteQLen = 1024 * 4
|
||||
let ncqMinLog = 512 * megabytes
|
||||
let ncqMaxLog = 2 * ncqMinLog
|
||||
let ncqMaxLog = 32 * gigabytes
|
||||
let ncqWriteBlock = max 128 $ ncqWriteQLen `div` 2
|
||||
let ncqMaxCachedIndex = 16
|
||||
let ncqMaxCachedData = 64
|
||||
|
@ -131,15 +131,6 @@ ncqPutBS ncq@NCQStorage3{..} mtp mhref bs' = ncqOperation ncq (pure $ fromMaybe
|
|||
|
||||
where hash0 = HashRef (hashObject @HbSync bs')
|
||||
|
||||
-- | Ask the storage where the object with the given hash is located.
--
-- A fresh reply slot is created, the pair @(hash, slot)@ is pushed onto
-- 'ncqReadReq' (bumping 'ncqWrites' in the same transaction), and the call
-- then blocks until somebody fills the slot. Everything runs under
-- 'ncqOperation' with @pure Nothing@ as its alternative action.
ncqLocate :: MonadUnliftIO m => NCQStorage3 -> HashRef -> m (Maybe Location)
ncqLocate me@NCQStorage3{..} href =
  ncqOperation me (pure Nothing) $
    newEmptyTMVarIO >>= \reply -> do
      -- Enqueue and wait are two separate transactions on purpose: the
      -- request has to be committed before anyone can answer it.
      atomically $ modifyTVar ncqWrites succ >> writeTQueue ncqReadReq (href, reply)
      atomically (takeTMVar reply)
|
||||
|
||||
ncqTryLoadState :: forall m. MonadUnliftIO m
|
||||
=> NCQStorage3
|
||||
|
|
|
@ -0,0 +1,181 @@
|
|||
module HBS2.Storage.NCQ3.Internal.Fossil where
|
||||
|
||||
import HBS2.Storage.NCQ3.Internal.Prelude
|
||||
import HBS2.Storage.NCQ3.Internal.Types
|
||||
import HBS2.Storage.NCQ3.Internal.Files
|
||||
import HBS2.Storage.NCQ3.Internal.Index
|
||||
import HBS2.Storage.NCQ3.Internal.State
|
||||
|
||||
import HBS2.Data.Types.Refs
|
||||
|
||||
import Data.HashSet qualified as HS
|
||||
import Data.List qualified as List
|
||||
import Data.ByteString.Lazy qualified as LBS
|
||||
import Data.ByteString qualified as BS
|
||||
import Control.Monad.Trans.Cont
|
||||
import Network.ByteOrder qualified as N
|
||||
import Data.ByteString.Builder
|
||||
import System.IO.Temp (emptyTempFile)
|
||||
|
||||
import System.FilePath.Posix
|
||||
import System.Posix.Files qualified as Posix
|
||||
import System.Posix.IO as PosixBase
|
||||
import System.Posix.Types as Posix
|
||||
import System.Posix.Unistd
|
||||
import System.Posix.IO.ByteString as Posix
|
||||
import System.Posix.Files ( getFileStatus
|
||||
, modificationTimeHiRes
|
||||
, setFileTimesHiRes
|
||||
, getFdStatus
|
||||
, FileStatus(..)
|
||||
, setFileMode
|
||||
)
|
||||
import System.Posix.Files qualified as PFS
|
||||
|
||||
import UnliftIO.IO.File
|
||||
|
||||
{-HLINT ignore "Functor law"-}
|
||||
|
||||
-- | One step of data-file ("fossil") merging.
--
-- Takes the pair of data files chosen by 'ncqFindMinPairOf', copies the
-- entries that pass the filter below into a fresh data file, registers and
-- indexes that file, and finally removes the two source files from the
-- storage state.
--
-- Returns 'True' when a merge was carried out and 'False' when no pair was
-- found or the pair's combined size exceeds 'ncqMaxLog'. The whole step runs
-- under 'ncqServiceSem'.
ncqFossilMergeStep :: forall m . MonadUnliftIO m
                   => NCQStorage3
                   -> m Bool

ncqFossilMergeStep me@NCQStorage3{..} = withSem ncqServiceSem $ flip runContT pure $ callCC \exit -> do

  debug "ncqFossilMergeStep"

  -- TODO: consider-sort-by-timestamps
  files <- readTVarIO ncqState
             <&> fmap DataFile . HS.toList . ncqStateFiles
             <&> List.sortOn Down

  r' <- lift $ ncqFindMinPairOf me files

  -- No candidate pair: the whole step ends here with False.
  r@(sumSize, f1, f2) <- ContT $ maybe1 r' (pure False)

  debug $ "for compacting" <+> pretty f1 <+> pretty f2 <+> pretty r <+> pretty ncqMaxLog

  when (fromIntegral sumSize > ncqMaxLog) $ exit False

  let (p,tpl) = splitFileName (ncqGetFileName me "merge-.merge")

  outFile <- liftIO $ emptyTempFile p tpl

  -- Cleanup for the temporary file; it is the release action of a bracket
  -- that wraps the rest of this step.
  ContT $ bracket none $ const do
    rm outFile

  liftIO $ withBinaryFileAtomic outFile WriteMode $ \fwh ->
    -- handleToFd closes the Handle and hands the raw descriptor over to the
    -- caller, so it must be closed explicitly; otherwise every merge step
    -- leaks one descriptor.
    -- NOTE(review): withBinaryFileAtomic still finalises 'fwh' after it has
    -- been closed by handleToFd -- confirm that this is tolerated.
    bracket (handleToFd fwh) PosixBase.closeFd $ \fd -> do

      -- Hashes already copied to the output, so a key is written only once.
      already <- newTVarIO (mempty :: HashSet HashRef )

      for_ [f1, f2] $ \fi -> do
        let fik = coerce fi
        writeFiltered me (ncqGetFileName me fi) fd $ \_ _ k _ -> do
          ncqLocate me k >>= \case
            -- Not located at all: the entry is copied as is.
            Nothing -> pure True
            -- Currently located in memory: the entry is not copied.
            Just (InMemory{}) -> pure False
            -- Located in a data file: copy it only when the file being
            -- read has a key >= the located one, and only once.
            Just (InFossil fk _ _) -> do
              let beWritten = fik >= fk
              atomically do
                here <- readTVar already <&> HS.member k
                let proceed = not here && beWritten
                when proceed (modifyTVar already (HS.insert k))
                pure proceed

      appendTailSection fd

  f3 <- DataFile <$> ncqGetNewFileKey me DataFile

  let newFile = ncqGetFileName me f3

  mv outFile newFile

  ss <- liftIO (PFS.getFileStatus newFile) <&> fromIntegral . PFS.fileSize

  ncqStateUpdate me do
    ncqStateAddFact (P (PData f3 ss))

  -- NOTE(review): the result of indexing is discarded, so the source files
  -- are dropped below even if no index was produced -- confirm this is intended.
  lift $ ncqIndexFile me f3

  ncqStateUpdate me do
    ncqStateDelDataFile (coerce f1)
    ncqStateDelDataFile (coerce f2)

  debug $ "COMPACTED" <+> pretty f1 <+> pretty f2 <+> "=>" <+> pretty f3

  pure True
|
||||
|
||||
|
||||
-- | Copy entries of one data file into an already opened descriptor, keeping
-- only those the predicate accepts.
--
-- The file is walked with 'ncqStorageScanDataFile'; the predicate receives
-- exactly the four values the scanner passes to its callback (the two
-- 'Integer's are presumably offset and size -- TODO confirm). Rejected
-- entries are only logged; accepted ones are re-encoded and appended to the
-- descriptor with 'appendSection'.
writeFiltered :: forall m . MonadIO m
              => NCQStorage3
              -> FilePath
              -> Fd
              -> ( Integer -> Integer -> HashRef -> ByteString -> m Bool)
              -> m ()

writeFiltered ncq fn out filt =
  ncqStorageScanDataFile ncq fn $ \o s k v ->
    filt o s k v >>= \case
      False -> debug $ pretty k <+> pretty "skipped"
      True  -> liftIO $ void $ appendSection out (encodeEntry k v)

  where

    -- On-disk form of an entry: a 32-bit length of (hash ++ value),
    -- followed by the hash bytes and then the value bytes.
    encodeEntry h bs =
      LBS.toStrict . toLazyByteString $
        byteString (N.bytestring32 (fromIntegral (BS.length body))) <> byteString body
      where
        body = coerce @_ @ByteString h <> bs
|
||||
|
||||
|
||||
|
||||
-- | A B-section whose payload is a zero 64-bit word and whose key is the
-- hash of that payload.
zeroSyncEntry :: ByteString
zeroSyncEntry =
  let payload = N.bytestring64 0
      digest  = HashRef (hashObject payload)
  in ncqMakeSectionBS (Just B) digest payload
{-# INLINE zeroSyncEntry #-}
|
||||
|
||||
-- | Length in bytes of the encoded 'zeroSyncEntry'.
zeroSyncEntrySize :: Word64
zeroSyncEntrySize = fromIntegral . BS.length $ zeroSyncEntry
{-# INLINE zeroSyncEntrySize #-}
|
||||
|
||||
-- | Build the tail record for a data file that currently holds @w@ bytes.
--
-- 1. It is an M-record.
-- 2. Its payload is a 64-bit word holding @w + zeroSyncEntrySize@; on open
--    this last w64be is expected to equal the file size.
-- 3. Its hash is the hash of that payload.
-- 4. Recovery strategy: start-to-end, end-to-start.
fileTailRecord :: Integral a => a -> ByteString
fileTailRecord w = ncqMakeSectionBS (Just M) digest payload
  where
    payload = N.bytestring64 (fromIntegral w + zeroSyncEntrySize)
    digest  = coerce (hashObject @HbSync payload)
{-# INLINE fileTailRecord #-}
|
||||
|
||||
-- | Write a whole section to the descriptor at its current position.
--
-- A single 'Posix.fdWrite' may accept fewer bytes than it was given, and the
-- callers ignore the returned count, so the write is repeated on the
-- remaining suffix until the section is fully written.
--
-- Returns the number of bytes written. It is smaller than the section length
-- only when a write made no progress at all.
appendSection :: forall m . MonadUnliftIO m
              => Fd
              -> ByteString
              -> m Int -- (FOff, Int)

appendSection fh sect = liftIO (go 0 sect)
  where
    go :: Int -> ByteString -> IO Int
    go done rest
      | BS.null rest = pure done
      | otherwise = do
          n <- Posix.fdWrite fh rest <&> fromIntegral
          -- No progress: stop instead of spinning; the short count is
          -- reported to the caller.
          if n <= 0
            then pure done
            else go (done + n) (BS.drop n rest)
{-# INLINE appendSection #-}
|
||||
|
||||
-- | Append the tail record for the descriptor's current size.
--
-- The size is read from the descriptor's status, turned into a record by
-- 'fileTailRecord' and written with 'appendSection'; the byte count is
-- discarded.
appendTailSection :: MonadIO m => Fd -> m ()
appendTailSection fh = liftIO $
  Posix.getFdStatus fh
    >>= void . appendSection fh . fileTailRecord . Posix.fileSize
{-# INLINE appendTailSection #-}
|
||||
|
||||
|
||||
|
|
@ -49,6 +49,16 @@ ncqLookupIndex hx (mmaped, nway) = do
|
|||
{-# INLINE ncqLookupIndex #-}
|
||||
|
||||
|
||||
-- | Ask the storage where the object with the given hash is located.
--
-- A fresh reply slot is created, the pair @(hash, slot)@ is pushed onto
-- 'ncqReadReq', and the call then blocks until somebody fills the slot.
-- Everything runs under 'ncqOperation' with @pure Nothing@ as its
-- alternative action. The 'ncqWrites' counter is not touched here.
ncqLocate :: MonadUnliftIO m => NCQStorage3 -> HashRef -> m (Maybe Location)
ncqLocate me@NCQStorage3{..} href =
  ncqOperation me (pure Nothing) $
    newEmptyTMVarIO >>= \reply -> do
      -- Enqueue and wait are two separate transactions on purpose: the
      -- request has to be committed before anyone can answer it.
      atomically $ writeTQueue ncqReadReq (href, reply)
      atomically (takeTMVar reply)
|
||||
|
||||
ncqIndexFile :: MonadUnliftIO m => NCQStorage3 -> DataFile FileKey -> m (Maybe FilePath)
|
||||
ncqIndexFile n fk = runMaybeT do
|
||||
|
||||
|
@ -108,7 +118,7 @@ ncqIndexFile n fk = runMaybeT do
|
|||
ncqIndexCompactStep :: MonadUnliftIO m
|
||||
=> NCQStorage3
|
||||
-> m Bool
|
||||
ncqIndexCompactStep me@NCQStorage3{..} = flip runContT pure $ callCC \exit -> do
|
||||
ncqIndexCompactStep me@NCQStorage3{..} = withSem ncqServiceSem $ flip runContT pure $ callCC \exit -> do
|
||||
|
||||
debug "ncqIndexCompactStep"
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@ import HBS2.Storage.NCQ3.Internal.Index
|
|||
import HBS2.Storage.NCQ3.Internal.State
|
||||
import HBS2.Storage.NCQ3.Internal.Sweep
|
||||
import HBS2.Storage.NCQ3.Internal.MMapCache
|
||||
|
||||
import HBS2.Storage.NCQ3.Internal.Fossil
|
||||
|
||||
import Control.Monad.Trans.Cont
|
||||
import Control.Monad.Trans.Maybe
|
||||
|
@ -118,21 +118,11 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do
|
|||
-- FIXME: timeout-hardcode
|
||||
pause @'Seconds 60
|
||||
|
||||
spawnActivity $ postponed 10 $ forever $ void $ runMaybeT do
|
||||
ema <- readTVarIO ncqWriteEMA
|
||||
spawnActivity $ postponed 10 $ compactLoop 10 300 do
|
||||
ncqIndexCompactStep ncq
|
||||
|
||||
when (ema > ncqIdleThrsh) $ pause @'Seconds 10 >> mzero
|
||||
|
||||
compacted <- lift $ ncqIndexCompactStep ncq
|
||||
|
||||
when compacted mzero
|
||||
|
||||
k0 <- readTVarIO ncqStateKey
|
||||
void $ lift $ race (pause @'Seconds 600) do
|
||||
flip fix k0 $ \waitState k1 -> do
|
||||
pause @'Seconds 60
|
||||
k2 <- readTVarIO ncqStateKey
|
||||
when (k2 == k1) $ waitState k2
|
||||
spawnActivity $ postponed 15 $ compactLoop 10 600 do
|
||||
ncqFossilMergeStep ncq
|
||||
|
||||
flip fix RunNew $ \loop -> \case
|
||||
RunFin -> do
|
||||
|
@ -256,6 +246,25 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do
|
|||
|
||||
-- Run the action after an initial delay of @n@ seconds.
postponed n m = do
  liftIO (pause @'Seconds n)
  m
|
||||
|
||||
-- Endless driver for a background compaction step.
--
-- Each round:
--   * if 'ncqWriteEMA' is above 'ncqIdleThrsh', sleep for the first timeout
--     and start over;
--   * otherwise run the step; if it reports progress, start over at once;
--   * otherwise wait until 'ncqStateKey' changes (polled every 60 seconds)
--     or until the second timeout expires, whichever happens first.
compactLoop :: Timeout 'Seconds -> Timeout 'Seconds -> m Bool -> m ()
compactLoop idleDelay settleTimeout step = forever $ void $ runMaybeT do
  load <- readTVarIO ncqWriteEMA

  when (load > ncqIdleThrsh) do
    pause @'Seconds idleDelay
    mzero

  progressed <- lift step

  when progressed mzero

  startKey <- readTVarIO ncqStateKey
  void $ lift $ race (pause @'Seconds settleTimeout) (waitForChange startKey)

  where
    -- Keeps polling while the state key stays the same.
    waitForChange seen = do
      pause @'Seconds 60
      current <- readTVarIO ncqStateKey
      when (current == seen) (waitForChange current)
|
||||
|
||||
|
||||
|
||||
data RunSt =
|
||||
RunNew
|
||||
| RunWrite (FileKey, Fd, Int, Int)
|
||||
|
@ -263,43 +272,3 @@ data RunSt =
|
|||
| RunFin
|
||||
|
||||
|
||||
-- | A B-section whose payload is a zero 64-bit word and whose key is the
-- hash of that payload.
zeroSyncEntry :: ByteString
zeroSyncEntry =
  let payload = N.bytestring64 0
      digest  = HashRef (hashObject payload)
  in ncqMakeSectionBS (Just B) digest payload
{-# INLINE zeroSyncEntry #-}
|
||||
|
||||
-- | Length in bytes of the encoded 'zeroSyncEntry'.
zeroSyncEntrySize :: Word64
zeroSyncEntrySize = fromIntegral . BS.length $ zeroSyncEntry
{-# INLINE zeroSyncEntrySize #-}
|
||||
|
||||
-- | Build the tail record for a data file that currently holds @w@ bytes.
--
-- 1. It is an M-record.
-- 2. Its payload is a 64-bit word holding @w + zeroSyncEntrySize@; on open
--    this last w64be is expected to equal the file size.
-- 3. Its hash is the hash of that payload.
-- 4. Recovery strategy: start-to-end, end-to-start.
fileTailRecord :: Integral a => a -> ByteString
fileTailRecord w = ncqMakeSectionBS (Just M) digest payload
  where
    payload = N.bytestring64 (fromIntegral w + zeroSyncEntrySize)
    digest  = coerce (hashObject @HbSync payload)
{-# INLINE fileTailRecord #-}
|
||||
|
||||
-- | Write a whole section to the descriptor at its current position.
--
-- A single 'Posix.fdWrite' may accept fewer bytes than it was given, and the
-- callers ignore the returned count, so the write is repeated on the
-- remaining suffix until the section is fully written.
--
-- Returns the number of bytes written. It is smaller than the section length
-- only when a write made no progress at all.
appendSection :: forall m . MonadUnliftIO m
              => Fd
              -> ByteString
              -> m Int -- (FOff, Int)

appendSection fh sect = liftIO (go 0 sect)
  where
    go :: Int -> ByteString -> IO Int
    go done rest
      | BS.null rest = pure done
      | otherwise = do
          n <- Posix.fdWrite fh rest <&> fromIntegral
          -- No progress: stop instead of spinning; the short count is
          -- reported to the caller.
          if n <= 0
            then pure done
            else go (done + n) (BS.drop n rest)
{-# INLINE appendSection #-}
|
||||
|
||||
-- | Append the tail record for the descriptor's current size.
--
-- The size is read from the descriptor's status, turned into a record by
-- 'fileTailRecord' and written with 'appendSection'; the byte count is
-- discarded.
appendTailSection :: MonadIO m => Fd -> m ()
appendTailSection fh = liftIO $
  Posix.getFdStatus fh
    >>= void . appendSection fh . fileTailRecord . Posix.fileSize
{-# INLINE appendTailSection #-}
|
||||
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@ import HBS2.Storage.Operations.ByteString
|
|||
import HBS2.Storage.NCQ3
|
||||
import HBS2.Storage.NCQ3.Internal.Files
|
||||
import HBS2.Storage.NCQ3.Internal.Index
|
||||
import HBS2.Storage.NCQ3.Internal.Fossil
|
||||
|
||||
import HBS2.System.Logger.Simple.ANSI
|
||||
|
||||
|
@ -244,3 +245,37 @@ ncq3Tests = do
|
|||
liftIO $ assertBool (show $ "found" <+> pretty h) found
|
||||
|
||||
|
||||
|
||||
-- Test: write a batch of random blocks, run one fossil merge step, and check
-- that every written hash can still be located afterwards.
-- The optional integer argument is the number of blocks (default 1000).
entry $ bindMatch "test:ncq3:merge:fossil" $ nil_ \e -> do

  -- Only the positional arguments are used; the options part is ignored.
  let (_,args) = splitOpts [] e
  let num = headDef 1000 [ fromIntegral n | LitIntVal n <- args ]
  g <- liftIO MWC.createSystemRandom

  runTest $ \TestEnv{..} -> do
    ncqWithStorage3 testEnvDir $ \sto@NCQStorage3{..} -> flip runContT pure do

      hst <- newTVarIO ( mempty :: HashSet HashRef )

      notice $ "write" <+> pretty num

      replicateM_ num do
        n <- liftIO $ uniformRM (1024, 64*1024) g
        bs <- liftIO $ genRandomBS g n
        h <- lift $ ncqPutBS sto (Just B) Nothing bs
        atomically $ modifyTVar hst (HS.insert h)

      -- Keep the step's verdict and log it, so the output shows whether a
      -- merge actually took place.
      merged <- lift (ncqFossilMergeStep sto)

      notice $ "merge done" <+> pretty merged

      -- NOTE(review): long fixed pause, presumably to let background
      -- activities settle before checking -- confirm it is still needed.
      pause @'Seconds 180

      notice "check after compaction"

      h1 <- readTVarIO hst

      for_ h1 $ \h -> lift do
        found <- ncqLocate sto h <&> isJust
        liftIO $ assertBool (show $ "found" <+> pretty h) found
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue