mirror of https://github.com/voidlizard/hbs2
wip, wtf
This commit is contained in:
parent
e51b72d57c
commit
0c71a7dab0
|
@ -35,6 +35,7 @@ data NCQStorageException =
|
|||
| NCQStorageCantOpenCurrent
|
||||
| NCQStorageBrokenCurrent
|
||||
| NCQMergeInvariantFailed String
|
||||
| NCQCompactInvariantFailed String
|
||||
| NCQStorageCantLock FilePath
|
||||
| NCQStorageCantMapFile FilePath
|
||||
deriving stock (Show,Typeable)
|
||||
|
@ -48,7 +49,7 @@ instance IsString FileKey where
|
|||
fromString = FileKey . BS8.pack . dropExtension . takeFileName
|
||||
|
||||
instance Pretty FileKey where
|
||||
pretty (FileKey s) = parens ("file-key" <+> pretty (BS8.unpack s))
|
||||
pretty (FileKey s) = pretty (BS8.unpack s)
|
||||
|
||||
newtype DataFile a = DataFile a
|
||||
|
||||
|
|
|
@ -119,6 +119,12 @@ type NCQSize = Word32
|
|||
|
||||
type StateVersion = Word64
|
||||
|
||||
data NCQIdxEntry =
|
||||
NCQIdxEntry
|
||||
{ ncqIdxEntryOffset :: !NCQOffset
|
||||
, ncqIdxEntrySize :: !NCQSize
|
||||
}
|
||||
|
||||
data StateOP = D FileKey | F TimeSpec FileKey | P FileKey
|
||||
deriving (Eq,Ord,Show)
|
||||
|
||||
|
@ -372,6 +378,11 @@ ncqEntryUnwrap _ source = do
|
|||
Nothing -> (k, Left v)
|
||||
{-# INLINE ncqEntryUnwrap #-}
|
||||
|
||||
|
||||
ncqIdxIsTombSize :: NCQIdxEntry -> Bool
|
||||
ncqIdxIsTombSize (NCQIdxEntry _ s) = s == ncqSLen + ncqKeyLen + ncqPrefixLen
|
||||
{-# INLINE ncqIdxIsTombSize #-}
|
||||
|
||||
ncqIsTomb :: NCQStorage2 -> Location -> Bool
|
||||
ncqIsTomb me loc = case ncqEntryUnwrap me (ncqGetEntryBS me loc) of
|
||||
(_, Right (T, _)) -> True
|
||||
|
@ -501,7 +512,7 @@ ncqSeekInFossils ncq@NCQStorage2{..} href action = useVersion ncq $ const do
|
|||
Just CachedEntry{..} -> do
|
||||
liftIO (ncqLookupIndex href (cachedMmapedIdx, cachedNway)) >>= \case
|
||||
Nothing -> go (i+1) 0 r
|
||||
Just (offset, size) -> do
|
||||
Just (NCQIdxEntry offset size) -> do
|
||||
now <- getTimeCoarse
|
||||
atomically $ writeTVar cachedTs now
|
||||
action (InFossil tfKey cachedMmapedData offset size) >>= \case
|
||||
|
@ -514,17 +525,17 @@ ncqSeekInFossils ncq@NCQStorage2{..} href action = useVersion ncq $ const do
|
|||
ncqLookupIndex :: MonadUnliftIO m
|
||||
=> HashRef
|
||||
-> (ByteString, NWayHash)
|
||||
-> m (Maybe ( NCQOffset, NCQSize ))
|
||||
-> m (Maybe NCQIdxEntry )
|
||||
ncqLookupIndex hx (mmaped, nway) = do
|
||||
fmap decodeEntry <$> nwayHashLookup nway mmaped (coerce hx)
|
||||
{-# INLINE ncqLookupIndex #-}
|
||||
|
||||
decodeEntry :: ByteString -> ( NCQOffset, NCQSize )
|
||||
decodeEntry :: ByteString -> NCQIdxEntry
|
||||
decodeEntry entryBs = do
|
||||
let (p,r) = BS.splitAt 8 entryBs
|
||||
let off = fromIntegral (N.word64 p)
|
||||
let size = fromIntegral (N.word32 (BS.take 4 r))
|
||||
( off, size )
|
||||
NCQIdxEntry off size
|
||||
{-# INLINE decodeEntry #-}
|
||||
|
||||
ncqLocateActually :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe Location)
|
||||
|
@ -593,7 +604,7 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
|
|||
CachedEntry{..} -> do
|
||||
ncqLookupIndex h (cachedMmapedIdx, cachedNway) >>= \case
|
||||
Nothing -> none
|
||||
Just (!offset,!size) -> do
|
||||
Just (NCQIdxEntry offset size) -> do
|
||||
answer (Just (InFossil fk cachedMmapedData offset size))
|
||||
next
|
||||
{-# INLINE lookupCached #-}
|
||||
|
@ -1222,10 +1233,13 @@ ncqMergeStep ncq@NCQStorage2{..} = do
|
|||
let fIndexNameA = ncqGetFileName ncq $ toFileName (IndexFile a)
|
||||
|
||||
let fDataNameB = ncqGetFileName ncq $ toFileName (DataFile b)
|
||||
let fIndexNameB = ncqGetFileName ncq $ toFileName (IndexFile b)
|
||||
|
||||
-- TODO: proper-exception-handling
|
||||
doesFileExist fDataNameA `orFail` ("not exist" <+> pretty fDataNameA)
|
||||
doesFileExist fDataNameB `orFail` ("not exist" <+> pretty fDataNameB)
|
||||
doesFileExist fIndexNameA `orFail` ("not exist" <+> pretty fIndexNameA)
|
||||
doesFileExist fIndexNameB `orFail` ("not exist" <+> pretty fIndexNameB)
|
||||
|
||||
flip runContT pure $ callCC \exit -> do
|
||||
|
||||
|
@ -1280,159 +1294,102 @@ ncqMergeStep ncq@NCQStorage2{..} = do
|
|||
r <- what
|
||||
unless r (throwIO (NCQMergeInvariantFailed (show e)))
|
||||
|
||||
ncqCompactStep :: forall m . MonadUnliftIO m => NCQStorage2 -> m ()
|
||||
ncqCompactStep me@NCQStorage2{..} = withSem ncqMergeSem $ flip runContT pure $ callCC \exit -> do
|
||||
ContT $ useVersion me
|
||||
|
||||
-- ncqCompact :: MonadUnliftIO m => NCQStorage2 -> m ()
|
||||
-- ncqCompact ncq@NCQStorage2{..} = withSem ncqMergeSem do
|
||||
files <- lift (ncqListTrackedFiles me)
|
||||
<&> filter (isNotPending . view _2) . V.toList
|
||||
<&> fmap (view _1)
|
||||
<&> zip [0 :: Int ..]
|
||||
<&> IntMap.fromList
|
||||
|
||||
-- q <- newTVarIO ( mempty :: HashMap FileKey (HashSet HashRef) )
|
||||
for_ (IntMap.elems files) $ \fk -> do
|
||||
|
||||
-- ncqLinearScanForCompact ncq $ \fk h -> atomically do
|
||||
-- modifyTVar q (HM.insertWith (<>) fk (HS.singleton h))
|
||||
let datF = ncqGetFileName me (toFileName (DataFile fk))
|
||||
dataSize <- liftIO (fileSize datF)
|
||||
garbage <- lift $ getGargabeSlow fk mempty
|
||||
|
||||
-- state0 <- readTVarIO q
|
||||
let realProfit = sum (HM.elems garbage)
|
||||
let pfl = (realToFrac realProfit / realToFrac dataSize) & realToFrac @_ @(Fixed E6)
|
||||
notice $ "profit" <+> pretty fk <+> pretty dataSize <+> pretty realProfit <+> pretty pfl
|
||||
|
||||
-- for_ (HM.toList state0) $ \(fk, es) -> do
|
||||
-- trace $ "TO DELETE" <+> pretty fk <+> pretty (HS.size es)
|
||||
-- (aIdx, fileA, nTombs) <- findFileA files >>= maybe (exit ()) pure
|
||||
|
||||
-- let fDataNameA = ncqGetFileName ncq (toFileName $ DataFile fk)
|
||||
-- notice $ green "compact: fileA" <+> pretty fileA <+> pretty aIdx <+> pretty nTombs
|
||||
|
||||
-- flip runContT pure do
|
||||
-- idxA <- lift (viewIndex fileA)
|
||||
-- tombs <- lift (getTombsInIndex idxA)
|
||||
|
||||
-- mfile <- ncqGetNewCompactName ncq
|
||||
-- let (_,self,b) = IntMap.splitLookup aIdx files
|
||||
|
||||
-- ContT $ bracket none $ const $ rm mfile
|
||||
-- notice $ green "pretty" <+> viaShow b
|
||||
|
||||
-- liftIO do
|
||||
-- withBinaryFileAtomic mfile WriteMode $ \fwh -> do
|
||||
-- writeFiltered ncq fDataNameA fwh $ \_ _ k v -> do
|
||||
-- pure $ not $ HS.member k es
|
||||
-- appendTailSection =<< handleToFd fwh
|
||||
-- for_ (IntMap.elems b) $ \fk -> callCC \skip -> do
|
||||
-- profit <- lift (getProfit fk tombs)
|
||||
|
||||
-- result <- fileSize mfile
|
||||
-- let datF = ncqGetFileName me (toFileName (DataFile fk))
|
||||
-- here <- doesFileExist datF
|
||||
|
||||
-- if result == 0 then do
|
||||
-- atomically $ modifyTVar ncqTrackedFiles (HPSQ.delete fk)
|
||||
-- else do
|
||||
-- unless here do
|
||||
-- throwIO (NCQCompactInvariantFailed (show $ "fossil exists" <+> pretty fk))
|
||||
|
||||
-- fossil <- ncqGetNewFossilName ncq
|
||||
-- mv mfile fossil
|
||||
-- dataSize <- liftIO (fileSize datF)
|
||||
|
||||
-- statA <- getFileStatus fDataNameA
|
||||
-- when (dataSize == 0) do
|
||||
-- notice $ "skipped" <+> pretty fk <+> pretty dataSize <+> pretty profit
|
||||
-- skip ()
|
||||
|
||||
-- let ts = modificationTimeHiRes statA
|
||||
-- setFileTimesHiRes fossil ts ts
|
||||
-- garbage <- lift (getGargabeSlow fk mempty)
|
||||
-- let realProfit = sum (HM.elems garbage)
|
||||
|
||||
-- void $ ncqIndexFile ncq (DataFile (fromString fossil))
|
||||
-- void $ ncqStateUpdate ncq [F (posixToTimeSpec ts) (fromString fossil)]
|
||||
-- let pfl = (realToFrac realProfit / realToFrac dataSize) & realToFrac @_ @(Fixed E6)
|
||||
|
||||
-- notice $ "profit" <+> pretty fk <+> pretty profit <+> pretty dataSize <+> pretty pfl <+> pretty realProfit
|
||||
|
||||
-- debug $ "compact done" <+> pretty (HM.size state0)
|
||||
-- none
|
||||
|
||||
where
|
||||
|
||||
-- NOTE: incremental
|
||||
-- now it may became incremental if we'll
|
||||
-- limit amount of tombs per one pass
|
||||
-- then remove all dead entries,
|
||||
-- then call again to remove tombs. etc
|
||||
-- as for now, seems it should work up to 10TB
|
||||
-- of storage
|
||||
ncqLinearScanForCompact :: MonadUnliftIO m
|
||||
=> NCQStorage2
|
||||
-> ( FileKey -> HashRef -> m () )
|
||||
-> m Int
|
||||
ncqLinearScanForCompact ncq@NCQStorage2{..} action = flip runContT pure do
|
||||
-- findFileA files = lift do
|
||||
-- tnums <- for (IntMap.toList files) $ \(i, fk) -> (i, fk,) . HS.size <$> (getTombsInIndex =<< viewIndex fk)
|
||||
-- pure $ listToMaybe ( List.sortOn ( Down . view _3 ) tnums )
|
||||
|
||||
ContT $ useVersion ncq
|
||||
viewIndex fk = do
|
||||
let idxf = ncqGetFileName me $ toFileName (IndexFile fk)
|
||||
liftIO (nwayHashMMapReadOnly idxf)
|
||||
>>= orThrow (NCQCompactInvariantFailed (show $ "index exists" <+> pretty fk))
|
||||
|
||||
tracked <- readTVarIO ncqTrackedFiles <&> V.toList
|
||||
getTombsInIndex :: MonadUnliftIO m => (ByteString, NWayHash) -> m (HashSet HashRef)
|
||||
getTombsInIndex (idxBs, nway) = HS.fromList <$> S.toList_ do
|
||||
nwayHashScanAll nway idxBs $ \_ k v -> do
|
||||
when (k /= ncqEmptyKey && ncqIdxIsTombSize (decodeEntry v) ) do
|
||||
S.yield (coerce @_ @HashRef k)
|
||||
|
||||
let state0 = mempty :: HashMap HashRef TimeSpec
|
||||
getProfit :: MonadIO m => FileKey -> HashSet HashRef -> m NCQSize
|
||||
getProfit fk tombs = do
|
||||
(bs,nw) <- viewIndex fk
|
||||
r <- S.toList_ $ nwayHashScanAll nw bs$ \_ k v -> do
|
||||
when (HS.member (coerce k) tombs) $ S.yield $ ncqIdxEntrySize (decodeEntry v)
|
||||
pure (sum r)
|
||||
|
||||
profit <- newTVarIO 0
|
||||
tombUse <- newTVarIO (mempty :: HashMap HashRef (FileKey, Int))
|
||||
getGargabeSlow :: MonadIO m => FileKey -> HashSet HashRef -> m (HashMap HashRef NCQSize)
|
||||
getGargabeSlow fk tombs = do
|
||||
let datFile = ncqGetFileName me (toFileName $ DataFile fk)
|
||||
idx <- viewIndex fk
|
||||
|
||||
-- TODO: explicit-unmap-files
|
||||
mmaped <- liftIO (mmapFileByteString datFile Nothing)
|
||||
|
||||
flip fix (tracked, state0) $ \next -> \case
|
||||
([], s) -> none
|
||||
((TrackedFile{..}):rest, state) -> do
|
||||
e <- readTVarIO tfCached
|
||||
r <- newTVarIO mempty
|
||||
runConsumeBS mmaped do
|
||||
readSections $ \size bs -> do
|
||||
let k = coerce @_ @HashRef $ fst (ncqEntryUnwrap me (LBS.toStrict bs))
|
||||
found <- isJust <$> lift (ncqLookupIndex k idx)
|
||||
let garbage = HS.member k tombs || not found
|
||||
when garbage $ atomically do
|
||||
modifyTVar' r (HM.insertWith (+) k (fromIntegral size))
|
||||
|
||||
let cqFile = ncqGetFileName ncq (toFileName (IndexFile tfKey))
|
||||
let dataFile = ncqGetFileName ncq (toFileName (DataFile tfKey))
|
||||
|
||||
c <- doesFileExist cqFile
|
||||
d <- doesFileExist dataFile
|
||||
let pending = not (isNotPending e)
|
||||
|
||||
if not c || not d || pending then
|
||||
next (rest, state)
|
||||
else do
|
||||
|
||||
|
||||
(mmaped,meta@NWayHash{..}) <- liftIO $ nwayHashMMapReadOnly cqFile
|
||||
>>= orThrow (NWayHashInvalidMetaData cqFile)
|
||||
|
||||
notice $ "SCAN" <+> pretty cqFile
|
||||
|
||||
let emptyKey = BS.replicate nwayKeySize 0
|
||||
|
||||
found <- S.toList_ do
|
||||
nwayHashScanAll meta mmaped $ \o k entryBs -> do
|
||||
unless (k == emptyKey) do
|
||||
|
||||
let off = N.word64 (BS.take 8 entryBs)
|
||||
let sz = N.word32 (BS.take 4 (BS.drop 8 entryBs))
|
||||
|
||||
-- debug $ "SCAN SHIT" <+> pretty tfKey <+> pretty off <+> pretty sz
|
||||
|
||||
-- fast-n-dirty-check-for-deleted
|
||||
when (sz <= ncqSLen + ncqKeyLen + ncqPrefixLen ) do
|
||||
debug $ red "FOUND EMPTY RECORD" <+> pretty sz
|
||||
S.yield off
|
||||
|
||||
let kk = coerce k
|
||||
|
||||
case HM.lookup kk state of
|
||||
Just ts | ts > timeSpecFromFilePrio tfTime -> do
|
||||
notice $ pretty kk <+> pretty (sz + ncqSLen)
|
||||
atomically do
|
||||
modifyTVar profit ( + (sz + ncqSLen) )
|
||||
modifyTVar tombUse (HM.adjust (over _2 succ) kk)
|
||||
lift $ lift $ action (fromString dataFile) kk
|
||||
|
||||
_ -> none
|
||||
|
||||
notice "SURVIVED 2"
|
||||
|
||||
newEntries <- S.toList_ do
|
||||
unless (List.null found) do
|
||||
notice $ red "TRY" <+> pretty dataFile
|
||||
dataBs <- liftIO $ mmapFileByteString dataFile Nothing
|
||||
notice "SURVIVED 3"
|
||||
for_ found $ \o -> do
|
||||
let pre = BS.take (fromIntegral ncqPrefixLen) (BS.drop (ncqDataOffset o) dataBs)
|
||||
|
||||
when (pre == ncqRefPrefix || pre == ncqTombPrefix) do
|
||||
let keyBs = BS.take ncqKeyLen (BS.drop (fromIntegral o + ncqSLen) dataBs)
|
||||
let key = coerce (BS.copy keyBs)
|
||||
unless (HM.member key state) do
|
||||
S.yield (key, timeSpecFromFilePrio tfTime)
|
||||
when ( pre == ncqTombPrefix ) do
|
||||
atomically $ modifyTVar tombUse (HM.insert key (tfKey,0))
|
||||
|
||||
next (rest, state <> HM.fromList newEntries)
|
||||
|
||||
use <- readTVarIO tombUse
|
||||
let useless = [ (f,h) | (h, (f,n)) <- HM.toList use, n == 0 ]
|
||||
|
||||
for_ useless $ \(f,h) -> do
|
||||
atomically $ modifyTVar profit (+ncqFullTombLen)
|
||||
lift $ action f h
|
||||
|
||||
notice "SURVIVED 3"
|
||||
|
||||
readTVarIO profit <&> fromIntegral
|
||||
readTVarIO r
|
||||
|
||||
ncqReadStateKeys :: forall m . MonadUnliftIO m => NCQStorage2 -> StateFile -> m [FileKey]
|
||||
ncqReadStateKeys me path = liftIO do
|
||||
|
|
|
@ -969,7 +969,7 @@ testNCQ2Lookup1 syn TestEnv{..} = do
|
|||
Just (CachedEntry{..}) -> do
|
||||
ncqLookupIndex h (cachedMmapedIdx, cachedNway) >>= \case
|
||||
Nothing -> none
|
||||
Just (o,s) -> atomically (putTMVar answ (Just (N2.InFossil tfKey cachedMmapedData o s))) >> next
|
||||
Just (NCQIdxEntry o s) -> atomically (putTMVar answ (Just (N2.InFossil tfKey cachedMmapedData o s))) >> next
|
||||
|
||||
Nothing -> do
|
||||
|
||||
|
@ -979,7 +979,7 @@ testNCQ2Lookup1 syn TestEnv{..} = do
|
|||
Just CachedEntry{..} -> do
|
||||
ncqLookupIndex h (cachedMmapedIdx, cachedNway) >>= \case
|
||||
Nothing -> none
|
||||
Just (o,s) -> atomically (putTMVar answ (Just (N2.InFossil tfKey cachedMmapedData o s))) >> next
|
||||
Just (NCQIdxEntry o s) -> atomically (putTMVar answ (Just (N2.InFossil tfKey cachedMmapedData o s))) >> next
|
||||
|
||||
atomically (putTMVar answ Nothing) >> next
|
||||
|
||||
|
@ -1663,34 +1663,7 @@ main = do
|
|||
|
||||
notice $ "should be deleted" <+> pretty (HS.size deleted) <+> "/" <+> pretty tnum
|
||||
|
||||
useVersion sto $ const do
|
||||
tfs <- N2.ncqListTrackedFiles sto <&> filter (isNotPending . view _2) . V.toList
|
||||
|
||||
t0 <- getTimeCoarse
|
||||
for_ tfs $ \(fk,_,_) -> void $ runMaybeT do
|
||||
|
||||
let idxf = N2.ncqGetFileName sto $ toFileName (IndexFile fk)
|
||||
|
||||
(idxBs, nway) <- liftIO $ nwayHashMMapReadOnly idxf
|
||||
>>= orThrowUser "can't mmap index"
|
||||
|
||||
stat' <- S.toList_ $ nwayHashScanAll nway idxBs $ \_ k v -> do
|
||||
unless (k == ncqEmptyKey) do
|
||||
let (o,s) = decodeEntry v
|
||||
when ( s == ncqSLen + ncqKeyLen + ncqPrefixLen ) do
|
||||
let hk = coerce @_ @HashRef k
|
||||
S.yield (fk, 1)
|
||||
|
||||
let stat = HM.fromListWith (+) stat'
|
||||
for_ (HM.toList stat) $ \(k, num) -> do
|
||||
notice $ pretty k <+> pretty num
|
||||
|
||||
t1 <- getTimeCoarse
|
||||
|
||||
let dt = realToFrac (toNanoSecs (t1 - t0)) * 1e-9 & sec3
|
||||
|
||||
notice $ "scan time" <+> pretty dt
|
||||
|
||||
ncqCompactStep sto
|
||||
|
||||
entry $ bindMatch "test:ncq2:del1" $ nil_ $ \syn -> do
|
||||
|
||||
|
|
Loading…
Reference in New Issue