mirror of https://github.com/voidlizard/hbs2
wip
This commit is contained in:
parent
39ac3e8832
commit
e8d019eaa2
|
@ -108,6 +108,9 @@ class Expires a where
|
||||||
-- FIXME: dangerous!
|
-- FIXME: dangerous!
|
||||||
expiresIn _ = Nothing
|
expiresIn _ = Nothing
|
||||||
|
|
||||||
|
timeSpecDeltaSeconds :: RealFrac a => TimeSpec -> TimeSpec -> a
|
||||||
|
timeSpecDeltaSeconds a b = realToFrac . (*1e-9) . realToFrac $ toNanoSecs (max a b - min a b)
|
||||||
|
|
||||||
getEpoch :: MonadIO m => m Word64
|
getEpoch :: MonadIO m => m Word64
|
||||||
getEpoch = liftIO getPOSIXTime <&> floor
|
getEpoch = liftIO getPOSIXTime <&> floor
|
||||||
|
|
||||||
|
|
|
@ -368,13 +368,18 @@ ncqPutBS ncq@NCQStorage2{..} mtp mhref bs' = do
|
||||||
ncqEntryUnwrap :: NCQStorage2
|
ncqEntryUnwrap :: NCQStorage2
|
||||||
-> ByteString
|
-> ByteString
|
||||||
-> (ByteString, Either ByteString (NCQSectionType, ByteString))
|
-> (ByteString, Either ByteString (NCQSectionType, ByteString))
|
||||||
ncqEntryUnwrap _ source = do
|
ncqEntryUnwrap n source = do
|
||||||
let (k,v) = BS.splitAt ncqKeyLen (BS.drop 4 source)
|
let (k,v) = BS.splitAt ncqKeyLen (BS.drop 4 source)
|
||||||
case ncqIsMeta v of
|
(k, ncqEntryUnwrapValue n v)
|
||||||
Just meta -> (k, Right (meta, BS.drop ncqPrefixLen v))
|
|
||||||
Nothing -> (k, Left v)
|
|
||||||
{-# INLINE ncqEntryUnwrap #-}
|
{-# INLINE ncqEntryUnwrap #-}
|
||||||
|
|
||||||
|
ncqEntryUnwrapValue :: NCQStorage2
|
||||||
|
-> ByteString
|
||||||
|
-> Either ByteString (NCQSectionType, ByteString)
|
||||||
|
ncqEntryUnwrapValue _ v = case ncqIsMeta v of
|
||||||
|
Just meta -> Right (meta, BS.drop ncqPrefixLen v)
|
||||||
|
Nothing -> Left v
|
||||||
|
{-# INLINE ncqEntryUnwrapValue #-}
|
||||||
|
|
||||||
ncqIdxIsTombSize :: NCQIdxEntry -> Bool
|
ncqIdxIsTombSize :: NCQIdxEntry -> Bool
|
||||||
ncqIdxIsTombSize (NCQIdxEntry _ s) = s == ncqSLen + ncqKeyLen + ncqPrefixLen
|
ncqIdxIsTombSize (NCQIdxEntry _ s) = s == ncqSLen + ncqKeyLen + ncqPrefixLen
|
||||||
|
@ -535,12 +540,6 @@ decodeEntry entryBs = do
|
||||||
NCQIdxEntry off size
|
NCQIdxEntry off size
|
||||||
{-# INLINE decodeEntry #-}
|
{-# INLINE decodeEntry #-}
|
||||||
|
|
||||||
ncqLocateActually :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe Location)
|
|
||||||
ncqLocateActually ncq href = do
|
|
||||||
inMem <- ncqLookupEntry ncq href <&> fmap (InMemory . ncqEntryData)
|
|
||||||
inFo <- listToMaybe <$> ncqSeekInFossils ncq href \loc -> pure (SeekStop [loc])
|
|
||||||
pure $ inMem <|> inFo
|
|
||||||
|
|
||||||
ncqLocate2 :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe Location)
|
ncqLocate2 :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe Location)
|
||||||
ncqLocate2 NCQStorage2{..} href = do
|
ncqLocate2 NCQStorage2{..} href = do
|
||||||
answ <- newEmptyTMVarIO
|
answ <- newEmptyTMVarIO
|
||||||
|
@ -578,7 +577,6 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
|
||||||
debug $ yellow "indexing" <+> pretty fk
|
debug $ yellow "indexing" <+> pretty fk
|
||||||
idx <- ncqRunTaskNoMatterWhat ncq (ncqIndexFile ncq (DataFile fk))
|
idx <- ncqRunTaskNoMatterWhat ncq (ncqIndexFile ncq (DataFile fk))
|
||||||
ncqRunTaskNoMatterWhat ncq $ ncqStateUpdate ncq [F 0 fk]
|
ncqRunTaskNoMatterWhat ncq $ ncqStateUpdate ncq [F 0 fk]
|
||||||
debug $ "REMOVE ALL SHIT" <+> pretty idx
|
|
||||||
nwayHashMMapReadOnly idx >>= \case
|
nwayHashMMapReadOnly idx >>= \case
|
||||||
Nothing -> err $ "can't open index" <+> pretty idx
|
Nothing -> err $ "can't open index" <+> pretty idx
|
||||||
Just (bs,nway) -> do
|
Just (bs,nway) -> do
|
||||||
|
@ -1253,22 +1251,36 @@ ncqMergeStep ncq@NCQStorage2{..} = do
|
||||||
|
|
||||||
debug $ "merge: okay, good to go" <+> pretty (takeFileName mfile)
|
debug $ "merge: okay, good to go" <+> pretty (takeFileName mfile)
|
||||||
|
|
||||||
(mmIdx, nway) <- nwayHashMMapReadOnly fIndexNameA
|
idxA <- nwayHashMMapReadOnly fIndexNameA
|
||||||
>>= orThrow (NCQMergeInvariantFailed (show $ "can't mmap" <+> pretty fIndexNameA))
|
>>= orThrow (NCQMergeInvariantFailed (show $ "can't mmap" <+> pretty fIndexNameA))
|
||||||
|
|
||||||
|
|
||||||
|
idxB <- nwayHashMMapReadOnly fIndexNameB
|
||||||
|
>>= orThrow (NCQMergeInvariantFailed (show $ "can't mmap" <+> pretty fIndexNameB))
|
||||||
|
|
||||||
debug $ "SCAN FILE A" <+> pretty fDataNameA
|
debug $ "SCAN FILE A" <+> pretty fDataNameA
|
||||||
|
|
||||||
writeFiltered ncq fDataNameA fwh $ \_ _ _ v -> do
|
-- we write only record from A, that last in index(A) and not meta
|
||||||
|
|
||||||
|
writeFiltered ncq fDataNameA fwh $ \o _ k v -> do
|
||||||
let meta = Just M == ncqIsMeta v
|
let meta = Just M == ncqIsMeta v
|
||||||
pure $ not meta
|
liftIO (ncqLookupIndex (coerce k) idxA ) >>= \case
|
||||||
|
Just (NCQIdxEntry o1 _) | o1 == fromIntegral o -> pure $ not meta
|
||||||
|
_ -> pure $ False
|
||||||
|
|
||||||
|
-- we write only record from B, that last in index(B)
|
||||||
|
-- and not meta and not already written 'A' pass
|
||||||
|
|
||||||
debug $ "SCAN FILE B" <+> pretty fDataNameA
|
debug $ "SCAN FILE B" <+> pretty fDataNameA
|
||||||
|
|
||||||
writeFiltered ncq fDataNameB fwh $ \_ _ k v -> do
|
writeFiltered ncq fDataNameB fwh $ \o _ k v -> do
|
||||||
let meta = Just M == ncqIsMeta v
|
let meta = Just M == ncqIsMeta v
|
||||||
foundInA <- liftIO (nwayHashLookup nway mmIdx (coerce k)) <&> isJust
|
foundInA <- liftIO (ncqLookupIndex (coerce k) idxA) <&> isJust
|
||||||
let skip = foundInA || meta
|
actual <- liftIO (ncqLookupIndex (coerce k) idxB ) >>= \case
|
||||||
pure $ not skip
|
Just (NCQIdxEntry o1 _) | o1 == fromIntegral o -> pure $ not meta
|
||||||
|
_ -> pure $ False
|
||||||
|
|
||||||
|
pure $ not ( foundInA || meta || not actual )
|
||||||
|
|
||||||
appendTailSection =<< handleToFd fwh
|
appendTailSection =<< handleToFd fwh
|
||||||
|
|
||||||
|
@ -1305,15 +1317,30 @@ ncqCompactStep me@NCQStorage2{..} = withSem ncqMergeSem $ flip runContT pure $ c
|
||||||
<&> zip [0 :: Int ..]
|
<&> zip [0 :: Int ..]
|
||||||
<&> IntMap.fromList
|
<&> IntMap.fromList
|
||||||
|
|
||||||
for_ (IntMap.elems files) $ \fk -> do
|
(i,fkA,tombsA) <- lift (findFileA files) >>= maybe (exit ()) pure
|
||||||
|
|
||||||
let datF = ncqGetFileName me (toFileName (DataFile fk))
|
let (_,_,rest) = IntMap.splitLookup i files
|
||||||
dataSize <- liftIO (fileSize datF)
|
|
||||||
garbage <- lift $ getGargabeSlow fk mempty
|
|
||||||
|
|
||||||
let realProfit = sum (HM.elems garbage)
|
garbage0 <- lift $ getGarbageSlow fkA mempty
|
||||||
let pfl = (realToFrac realProfit / realToFrac dataSize) & realToFrac @_ @(Fixed E6)
|
|
||||||
notice $ "profit" <+> pretty fk <+> pretty dataSize <+> pretty realProfit <+> pretty pfl
|
-- FIXME: hardcode
|
||||||
|
(j,fkB,tombsB) <- lift (findClosestAmongst rest (HM.keysSet garbage0) 0.15)
|
||||||
|
>>= maybe (exit ()) pure
|
||||||
|
|
||||||
|
notice $ "found" <+> pretty fkA <+> pretty fkB
|
||||||
|
|
||||||
|
|
||||||
|
-- for_ (IntMap.elems rest) $ \fk -> do
|
||||||
|
|
||||||
|
-- let datF = ncqGetFileName me (toFileName (DataFile fk))
|
||||||
|
-- dataSize <- liftIO (fileSize datF)
|
||||||
|
-- garbage <- lift $ getGarbageSlow fk tombsA
|
||||||
|
|
||||||
|
-- let realProfit = sum (HM.elems garbage)
|
||||||
|
-- let kUse = realToFrac realProfit / (1 + realToFrac dataSize) :: Fixed E3
|
||||||
|
|
||||||
|
|
||||||
|
-- notice $ "profit" <+> pretty fk <+> pretty dataSize <+> pretty realProfit <+> pretty kUse
|
||||||
|
|
||||||
-- (aIdx, fileA, nTombs) <- findFileA files >>= maybe (exit ()) pure
|
-- (aIdx, fileA, nTombs) <- findFileA files >>= maybe (exit ()) pure
|
||||||
|
|
||||||
|
@ -1352,9 +1379,24 @@ ncqCompactStep me@NCQStorage2{..} = withSem ncqMergeSem $ flip runContT pure $ c
|
||||||
|
|
||||||
where
|
where
|
||||||
|
|
||||||
-- findFileA files = lift do
|
findFileA files = do
|
||||||
-- tnums <- for (IntMap.toList files) $ \(i, fk) -> (i, fk,) . HS.size <$> (getTombsInIndex =<< viewIndex fk)
|
tnums <- for (IntMap.toList files) $ \(i, fk) -> (i, fk,) <$> (getTombsInIndex =<< viewIndex fk)
|
||||||
-- pure $ listToMaybe ( List.sortOn ( Down . view _3 ) tnums )
|
pure $ listToMaybe ( List.sortOn ( Down . HS.size . view _3 ) tnums )
|
||||||
|
|
||||||
|
findClosestAmongst rest tombs ratio = flip runContT pure $ callCC \exit -> do
|
||||||
|
|
||||||
|
for_ (IntMap.toList rest) $ \(i,fk) -> do
|
||||||
|
|
||||||
|
let datF = ncqGetFileName me (toFileName (DataFile fk))
|
||||||
|
dataSize <- liftIO (fileSize datF)
|
||||||
|
garbage <- lift (getGarbageSlow fk tombs)
|
||||||
|
|
||||||
|
let realProfit = sum (HM.elems garbage)
|
||||||
|
let kUse = realToFrac realProfit / (1 + realToFrac dataSize)
|
||||||
|
|
||||||
|
when (kUse >= ratio) $ exit $ Just (i, fk, HM.keysSet garbage)
|
||||||
|
|
||||||
|
pure Nothing
|
||||||
|
|
||||||
viewIndex fk = do
|
viewIndex fk = do
|
||||||
let idxf = ncqGetFileName me $ toFileName (IndexFile fk)
|
let idxf = ncqGetFileName me $ toFileName (IndexFile fk)
|
||||||
|
@ -1374,21 +1416,28 @@ ncqCompactStep me@NCQStorage2{..} = withSem ncqMergeSem $ flip runContT pure $ c
|
||||||
when (HS.member (coerce k) tombs) $ S.yield $ let (NCQIdxEntry _ s) = decodeEntry v in s
|
when (HS.member (coerce k) tombs) $ S.yield $ let (NCQIdxEntry _ s) = decodeEntry v in s
|
||||||
pure (sum r)
|
pure (sum r)
|
||||||
|
|
||||||
getGargabeSlow :: MonadIO m => FileKey -> HashSet HashRef -> m (HashMap HashRef NCQSize)
|
getGarbageSlow :: MonadUnliftIO m => FileKey -> HashSet HashRef -> m (HashMap HashRef NCQSize)
|
||||||
getGargabeSlow fk tombs = do
|
getGarbageSlow fk tombs = do
|
||||||
let datFile = ncqGetFileName me (toFileName $ DataFile fk)
|
let datFile = ncqGetFileName me (toFileName $ DataFile fk)
|
||||||
idx <- viewIndex fk
|
idx <- viewIndex fk
|
||||||
|
|
||||||
mmaped <- liftIO (mmapFileByteString datFile Nothing)
|
|
||||||
|
|
||||||
r <- newTVarIO mempty
|
r <- newTVarIO mempty
|
||||||
runConsumeBS mmaped do
|
|
||||||
readSections $ \size bs -> do
|
ncqStorageScanDataFile me datFile $ \o s k v -> do
|
||||||
let k = coerce @_ @HashRef $ fst (ncqEntryUnwrap me (LBS.toStrict bs))
|
case ncqEntryUnwrapValue me v of
|
||||||
found <- isJust <$> lift (ncqLookupIndex k idx)
|
Left bs -> atomically $ modifyTVar' r (HM.insertWith (+) k (fromIntegral s))
|
||||||
let garbage = HS.member k tombs || not found
|
Right (t, bs) -> do
|
||||||
|
ncqLookupIndex k idx >>= \case
|
||||||
|
Nothing -> do
|
||||||
|
-- notice $ "not found in index" <+> pretty k
|
||||||
|
atomically $ modifyTVar' r (HM.insertWith (+) k (fromIntegral s))
|
||||||
|
|
||||||
|
Just (NCQIdxEntry oi _) -> do
|
||||||
|
let garbage = HS.member k tombs || oi /= fromIntegral o
|
||||||
|
when garbage do
|
||||||
|
-- notice $ "offset mismatch or tomb" <+> pretty o <+> pretty oi <+> pretty k
|
||||||
when garbage $ atomically do
|
when garbage $ atomically do
|
||||||
modifyTVar' r (HM.insertWith (+) k (fromIntegral size))
|
modifyTVar' r (HM.insertWith (+) k (fromIntegral s))
|
||||||
|
|
||||||
readTVarIO r
|
readTVarIO r
|
||||||
|
|
||||||
|
|
|
@ -792,17 +792,17 @@ testNCQ2Simple1 syn TestEnv{..} = do
|
||||||
|
|
||||||
notice $ "merge data"
|
notice $ "merge data"
|
||||||
|
|
||||||
-- ncqWithStorage ncqDir $ \sto -> liftIO do
|
ncqWithStorage ncqDir $ \sto -> liftIO do
|
||||||
-- notice "perform merge"
|
notice "perform merge"
|
||||||
-- ncqMergeFull sto
|
ncqMergeFull sto
|
||||||
-- ncqSweepStates sto
|
ncqSweepStates sto
|
||||||
-- ncqSweepFossils sto
|
ncqSweepFossils sto
|
||||||
|
|
||||||
-- notice $ "full sweep unused states"
|
notice $ "full sweep unused states"
|
||||||
|
|
||||||
-- ncqWithStorage ncqDir $ \sto -> liftIO do
|
ncqWithStorage ncqDir $ \sto -> liftIO do
|
||||||
-- ncqSweepStates sto
|
ncqSweepStates sto
|
||||||
-- ncqSweepFossils sto
|
ncqSweepFossils sto
|
||||||
|
|
||||||
notice $ "lookup" <+> pretty n <+> "blocks"
|
notice $ "lookup" <+> pretty n <+> "blocks"
|
||||||
|
|
||||||
|
@ -1663,17 +1663,30 @@ main = do
|
||||||
|
|
||||||
notice $ "should be deleted" <+> pretty (HS.size deleted) <+> "/" <+> pretty tnum
|
notice $ "should be deleted" <+> pretty (HS.size deleted) <+> "/" <+> pretty tnum
|
||||||
|
|
||||||
|
t0 <- getTimeCoarse
|
||||||
|
|
||||||
ncqCompactStep sto
|
ncqCompactStep sto
|
||||||
|
|
||||||
|
t1 <- getTimeCoarse
|
||||||
|
|
||||||
|
let dt = timeSpecDeltaSeconds @(Fixed E6) t0 t1
|
||||||
|
|
||||||
|
notice $ "ncqCompactStep time" <+> pretty dt
|
||||||
|
|
||||||
|
none
|
||||||
|
|
||||||
|
|
||||||
entry $ bindMatch "test:ncq2:del1" $ nil_ $ \syn -> do
|
entry $ bindMatch "test:ncq2:del1" $ nil_ $ \syn -> do
|
||||||
|
|
||||||
runTest $ \TestEnv{..} -> do
|
runTest $ \TestEnv{..} -> do
|
||||||
g <- liftIO MWC.createSystemRandom
|
g <- liftIO MWC.createSystemRandom
|
||||||
let dir = testEnvDir
|
let dir = testEnvDir
|
||||||
|
|
||||||
let (_, argz) = splitOpts [] syn
|
let (opts, argz) = splitOpts [("-m",0)] syn
|
||||||
let n = headDef 10000 [ fromIntegral x | LitIntVal x <- argz ]
|
let n = headDef 10000 [ fromIntegral x | LitIntVal x <- argz ]
|
||||||
|
|
||||||
|
let merge = or [ True | ListVal [StringLike "-m"] <- opts ]
|
||||||
|
|
||||||
thashes <- newTVarIO mempty
|
thashes <- newTVarIO mempty
|
||||||
|
|
||||||
ncqWithStorage dir $ \sto@NCQStorage2{..} -> do
|
ncqWithStorage dir $ \sto@NCQStorage2{..} -> do
|
||||||
|
@ -1692,9 +1705,6 @@ main = do
|
||||||
|
|
||||||
pure h
|
pure h
|
||||||
|
|
||||||
|
|
||||||
pause @'Seconds 5
|
|
||||||
|
|
||||||
atomically $ writeTVar thashes (HS.fromList hashes)
|
atomically $ writeTVar thashes (HS.fromList hashes)
|
||||||
|
|
||||||
flip runContT pure $ callCC \exit -> do
|
flip runContT pure $ callCC \exit -> do
|
||||||
|
@ -1717,6 +1727,10 @@ main = do
|
||||||
|
|
||||||
exit ()
|
exit ()
|
||||||
|
|
||||||
|
when merge do
|
||||||
|
ncqWithStorage dir \sto -> do
|
||||||
|
ncqMergeFull sto
|
||||||
|
|
||||||
ncqWithStorage dir $ \sto -> do
|
ncqWithStorage dir $ \sto -> do
|
||||||
-- notice "check deleted"
|
-- notice "check deleted"
|
||||||
hashes <- readTVarIO thashes
|
hashes <- readTVarIO thashes
|
||||||
|
|
Loading…
Reference in New Issue