hbs2/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs

1321 lines
41 KiB
Haskell

{-# Language MultiWayIf #-}
{-# Language RecordWildCards #-}
module HBS2.Storage.NCQ2
( module HBS2.Storage.NCQ2
, module HBS2.Storage.NCQ.Types
)
where
import HBS2.Prelude.Plated
import HBS2.Hash
import HBS2.OrDie
import HBS2.Data.Types.Refs
import HBS2.Base58
import HBS2.Net.Auth.Credentials
import HBS2.Storage
import HBS2.Misc.PrettyStuff
import HBS2.System.Logger.Simple.ANSI
import HBS2.Data.Log.Structured.NCQ
import HBS2.Data.Log.Structured.SD
import HBS2.Storage.NCQ.Types
import Data.Config.Suckless.System
import Data.Config.Suckless.Script hiding (void)
import Codec.Compression.Zstd qualified as Zstd
import Codec.Compression.Zstd.Lazy as ZstdL
import Codec.Compression.Zstd.Streaming qualified as ZstdS
import Codec.Compression.Zstd.Streaming (Result(..))
import Control.Applicative
import Data.ByteString.Builder
import Network.ByteOrder qualified as N
import Data.Bit.ThreadSafe qualified as BV
import Data.HashMap.Strict (HashMap)
import Control.Monad.Except
import Control.Monad.Trans.Cont
import Control.Monad.Trans.Maybe
import Data.Ord (Down(..),comparing)
import Control.Concurrent.STM qualified as STM
import Control.Concurrent.STM.TSem
import Data.HashPSQ qualified as HPSQ
import Data.HashPSQ (HashPSQ)
import Data.IntMap qualified as IntMap
import Data.IntMap (IntMap)
import Data.IntSet qualified as IntSet
import Data.IntSet (IntSet)
import Data.Sequence qualified as Seq
import Data.Sequence (Seq(..), (|>),(<|))
import Data.List qualified as List
import Data.ByteString.Lazy qualified as LBS
import Data.ByteString.Lazy.Char8 qualified as LBS8
import Data.ByteString (ByteString)
import Data.ByteString qualified as BS
import Data.ByteString.Char8 qualified as BS8
import Data.Char (isDigit)
import Data.Fixed
import Data.Coerce
import Data.Word
import Data.Either
import Data.Maybe
import Data.Text qualified as Text
import Data.Text.IO qualified as Text
import Data.Int
import Data.Vector qualified as V
import Data.Vector (Vector, (!))
import Lens.Micro.Platform
import Data.HashSet (HashSet)
import Data.HashSet qualified as HS
import Data.HashMap.Strict qualified as HM
import System.Directory (makeAbsolute)
import System.FilePath.Posix
import System.Posix.Fcntl
import System.Posix.Files qualified as Posix
import System.Posix.IO as PosixBase
import System.Posix.Types as Posix
import System.Posix.IO.ByteString as Posix
import System.Posix.Unistd
import System.Posix.Files ( getFileStatus
, modificationTimeHiRes
, setFileTimesHiRes
, getFdStatus
, FileStatus(..)
, setFileMode
)
import System.Posix.Files qualified as PFS
import System.IO.Error (catchIOError)
import System.IO.MMap as MMap
import System.IO.Temp (emptyTempFile)
import System.Mem
-- import Foreign.Ptr
-- import Foreign di
import qualified Data.ByteString.Internal as BSI
import Streaming.Prelude qualified as S
import UnliftIO
import UnliftIO.Concurrent(getNumCapabilities)
import UnliftIO.IO.File
import System.FileLock as FL
type FOff = Word64
newtype NCQEntry = NCQEntry ByteString
type Shard = TVar (HashMap HashRef NCQEntry)
type NCQOffset = Word64
type NCQSize = Word32
type StateVersion = Word64
data StateOP = D FileKey | F TimeSpec FileKey | P FileKey
deriving (Eq,Ord,Show)
data NCQFlag =
NCQMergeNow | NCQCompactNow
deriving (Eq,Ord,Generic)
data Location =
InFossil ByteString NCQOffset NCQSize
| InMemory ByteString
data NCQStorage2 =
NCQStorage2
{ ncqRoot :: FilePath
, ncqGen :: Int
, ncqSalt :: HashRef
, ncqFsync :: Int
, ncqWriteQLen :: Int
, ncqWriteBlock :: Int
, ncqMinLog :: Int
, ncqMaxLog :: Int
, ncqMaxCached :: Int
, ncqIdleThrsh :: Double
, ncqMemTable :: Vector Shard
, ncqWriteSem :: TSem
, ncqWriteQ :: TVar (Seq HashRef)
, ncqStorageTasks :: TVar Int
, ncqStorageStopReq :: TVar Bool
, ncqStorageSyncReq :: TVar Bool
, ncqMergeReq :: TVar Bool
, ncqMergeSem :: TSem
, ncqSyncNo :: TVar Int
, ncqCurrentFiles :: TVar (HashSet FileKey)
, ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry))
, ncqStateVersion :: TVar StateVersion
, ncqStateUsage :: TVar (IntMap (Int, HashSet FileKey))
, ncqStateName :: TVar (Maybe FilePath)
, ncqCachedEntries :: TVar Int
, ncqWrites :: TVar Int
, ncqWriteEMA :: TVar Double -- for writes-per-seconds
, ncqJobQ :: TQueue (IO ())
}
megabytes :: forall a . Integral a => a
megabytes = 1024 ^ 2
gigabytes :: forall a . Integral a => a
gigabytes = 1024 ^ 3
ncqStorageOpen2 :: MonadIO m => FilePath -> (NCQStorage2 -> NCQStorage2)-> m NCQStorage2
ncqStorageOpen2 fp upd = do
let ncqRoot = fp
let ncqGen = 0
let ncqFsync = 16 * megabytes
let ncqWriteQLen = 1024 * 16
let ncqMinLog = 256 * megabytes
let ncqMaxLog = 16 * gigabytes -- ???
let ncqWriteBlock = 1024
let ncqMaxCached = 128
let ncqIdleThrsh = 50.00
cap <- getNumCapabilities <&> fromIntegral
ncqWriteQ <- newTVarIO mempty
ncqWriteSem <- atomically $ newTSem 16 -- (fromIntegral cap)
ncqMemTable <- V.fromList <$> replicateM cap (newTVarIO mempty)
ncqStorageStopReq <- newTVarIO False
ncqStorageSyncReq <- newTVarIO False
ncqMergeReq <- newTVarIO False
ncqMergeSem <- atomically (newTSem 1)
ncqSyncNo <- newTVarIO 0
ncqCurrentFiles <- newTVarIO mempty
ncqTrackedFiles <- newTVarIO HPSQ.empty
ncqStateVersion <- newTVarIO 0
ncqStateUsage <- newTVarIO mempty
ncqStateName <- newTVarIO Nothing
ncqCachedEntries <- newTVarIO 0
ncqStorageTasks <- newTVarIO 0
ncqWrites <- newTVarIO 0
ncqWriteEMA <- newTVarIO 0.00
ncqJobQ <- newTQueueIO
let ncqSalt = "EstEFasxrCFqsGDxcY4haFcha9e4ZHRzsPbGUmDfdxLk"
let ncq = NCQStorage2{..} & upd
mkdir (ncqGetWorkDir ncq)
liftIO $ withSem ncqMergeSem do
ncqRepair ncq
ncqLoadIndexes ncq
pure ncq
ncqWithStorage :: MonadUnliftIO m => FilePath -> ( NCQStorage2 -> m a ) -> m a
ncqWithStorage fp action = flip runContT pure do
sto <- lift (ncqStorageOpen2 fp id)
w <- ContT $ withAsync (ncqStorageRun2 sto)
link w
r <- lift (action sto)
lift (ncqStorageStop2 sto)
wait w
pure r
ncqGetFileName :: NCQStorage2 -> FilePath -> FilePath
ncqGetFileName ncq fp = ncqGetWorkDir ncq </> takeFileName fp
ncqGetWorkDir :: NCQStorage2 -> FilePath
ncqGetWorkDir NCQStorage2{..} = ncqRoot </> show ncqGen
ncqGetLockFileName :: NCQStorage2 -> FilePath
ncqGetLockFileName ncq = ncqGetFileName ncq ".lock"
ncqGetNewFossilName :: MonadIO m => NCQStorage2 -> m FilePath
ncqGetNewFossilName ncq = do
liftIO $ emptyTempFile (ncqGetWorkDir ncq) "fossil-.data"
ncqGetNewStateName :: MonadIO m => NCQStorage2 -> m FilePath
ncqGetNewStateName ncq = do
liftIO $ emptyTempFile (ncqGetWorkDir ncq) "state-"
ncqGetNewCompactName :: MonadIO m => NCQStorage2 -> m FilePath
ncqGetNewCompactName n@NCQStorage2{} = do
let (p,tpl) = splitFileName (ncqGetFileName n "compact-.data")
liftIO $ emptyTempFile p tpl
ncqStorageStop2 :: MonadUnliftIO m => NCQStorage2 -> m ()
ncqStorageStop2 NCQStorage2{..} = do
atomically $ writeTVar ncqStorageStopReq True
ncqStorageSync2 :: MonadUnliftIO m => NCQStorage2 -> m ()
ncqStorageSync2 NCQStorage2{..} = do
atomically $ writeTVar ncqStorageSyncReq True
ncqShardIdx :: NCQStorage2 -> HashRef -> Int
ncqShardIdx NCQStorage2{..} h =
fromIntegral (BS.head (coerce h)) `mod` V.length ncqMemTable
{-# INLINE ncqShardIdx #-}
ncqGetShard :: NCQStorage2 -> HashRef -> Shard
ncqGetShard ncq@NCQStorage2{..} h = ncqMemTable ! ncqShardIdx ncq h
{-# INLINE ncqGetShard #-}
ncqLookupEntrySTM :: NCQStorage2 -> HashRef -> STM (Maybe NCQEntry)
ncqLookupEntrySTM ncq h = readTVar (ncqGetShard ncq h) <&> HM.lookup h
ncqPutBS :: MonadUnliftIO m
=> NCQStorage2
-> Maybe NCQSectionType
-> Maybe HashRef
-> ByteString
-> m HashRef
ncqPutBS ncq@NCQStorage2{..} mtp mhref bs' = do
let h = fromMaybe (HashRef (hashObject @HbSync bs')) mhref
let bs = ncqMakeSectionBS mtp h bs'
atomically do
waitTSem ncqWriteSem
modifyTVar' ncqWrites succ
stop <- readTVar ncqStorageStopReq
filled <- readTVar ncqWriteQ <&> Seq.length
when (not stop && filled > ncqWriteQLen) STM.retry
ncqAlterEntrySTM ncq h $ \case
Just e -> Just e
Nothing -> Just (NCQEntry bs)
modifyTVar' ncqWriteQ (|> h)
signalTSem ncqWriteSem
pure h
ncqLookupEntry :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe NCQEntry)
ncqLookupEntry sto hash = atomically (ncqLookupEntrySTM sto hash)
ncqGetEntryBS :: NCQStorage2 -> Location -> ByteString
ncqGetEntryBS _ = \case
InMemory bs -> bs
InFossil mmap off size -> do
BS.take (fromIntegral size) $ BS.drop (fromIntegral off) mmap
ncqEntrySize :: forall a . Integral a => Location -> a
ncqEntrySize = \case
InFossil _ _ size -> fromIntegral size
InMemory bs -> fromIntegral (BS.length bs)
ncqLocate2 :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe Location)
ncqLocate2 ncq@NCQStorage2{..} href = flip runContT pure $ callCC \exit -> do
now <- getTimeCoarse
lift (ncqLookupEntry ncq href) >>= maybe none (exit . Just . InMemory . coerce)
atomically do
modifyTVar' ncqWrites succ
-- FIXME: race
-- merge can-delete-file-while-in-use
tracked <- readTVarIO ncqTrackedFiles <&> HPSQ.toList
for_ tracked $ \(fk, prio, mCached) -> callCC \skip -> do
case mCached of
Just CachedEntry{..} -> do
lookupEntry href (cachedMmapedIdx, cachedNway) >>= \case
Nothing -> none
Just (offset,size) -> do
atomically $ writeTVar cachedTs now
exit (Just $ InFossil cachedMmapedData offset size)
Just PendingEntry {} -> none
Nothing -> useVersion do
let indexFile = ncqGetFileName ncq (toFileName (IndexFile fk))
let dataFile = ncqGetFileName ncq (toFileName (DataFile fk))
idxHere <- doesFileExist indexFile
unless idxHere do
err $ red "missed index" <+> "in ncqLocate" <+> pretty fk
skip ()
(idxBs, idxNway) <- liftIO (nwayHashMMapReadOnly indexFile)
>>= orThrow (NCQStorageCantMapFile indexFile)
datBs <- liftIO $ mmapFileByteString dataFile Nothing
ce <- CachedEntry idxBs datBs idxNway <$> newTVarIO now
lookupEntry href (idxBs, idxNway) >>= \case
Nothing -> none
Just (offset, size) -> do
atomically do
modifyTVar ncqTrackedFiles (HPSQ.insert fk prio (Just ce))
modifyTVar ncqCachedEntries (+1)
evictIfNeededSTM ncq (Just 1)
exit $ Just (InFossil datBs offset size)
pure mzero
where
useVersion m = ContT (bracket succV predV) >> m
where
succV = atomically (ncqStateUseSTM ncq)
predV = const $ atomically (ncqStateUseSTM ncq)
lookupEntry (hx :: HashRef) (mmaped, nway) =
liftIO (nwayHashLookup nway mmaped (coerce hx)) >>= \case
Nothing -> pure Nothing
Just entryBs -> do
pure $ Just
( fromIntegral $ N.word64 (BS.take 8 entryBs)
, fromIntegral $ N.word32 (BS.take 4 (BS.drop 8 entryBs)) )
{-# INLINE lookupEntry #-}
ncqAlterEntrySTM :: NCQStorage2 -> HashRef -> (Maybe NCQEntry -> Maybe NCQEntry) -> STM ()
ncqAlterEntrySTM ncq h alterFn = do
let shard = ncqGetShard ncq h
modifyTVar shard (HM.alter alterFn h)
ncqStorageDel :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m ()
ncqStorageDel ncq@NCQStorage2{..} h = flip runContT pure $ callCC \exit -> do
-- 1. absent
-- 1. in memtable only
-- 2. in disk
none
data RunSt =
RunNew
| RunWrite (FileKey, Fd, Int, Int)
| RunSync (FileKey, Fd, Int, Int, Bool)
| RunFin
ncqStorageRun2 :: forall m . MonadUnliftIO m
=> NCQStorage2
-> m ()
ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
closeQ <- newTQueueIO
closer <- spawnActivity $ liftIO $ fix \loop -> do
what <- atomically do
stop <- readTVar ncqStorageStopReq
tryReadTQueue closeQ >>= \case
Just e -> pure $ Just e
Nothing | not stop -> STM.retry
| otherwise -> pure Nothing
maybe1 what none $ \(fk, fh) -> do
closeFd fh
-- notice $ yellow "indexing" <+> pretty fname
idx <- ncqRunTaskNoMatterWhat ncq (ncqIndexFile ncq (DataFile fk))
ncqRunTaskNoMatterWhat ncq $ ncqStateUpdate ncq [F 0 fk]
nwayHashMMapReadOnly idx >>= \case
Nothing -> err $ "can't open index" <+> pretty idx
Just (bs,nway) -> do
nwayHashScanAll nway bs $ \_ k _ -> do
unless (k == emptyKey) $ atomically do
ncqAlterEntrySTM ncq (coerce k) (const Nothing)
-- atomically (modifyTVar' ncqCurrentFiles (HS.delete fk))
loop
spawnActivity $ forever (liftIO $ join $ atomically (readTQueue ncqJobQ))
spawnActivity measureWPS
-- FIXME: bigger-period
spawnActivity $ forever $ (>> pause @'Seconds 300) $ do
ema <- readTVarIO ncqWriteEMA
when (ema < ncqIdleThrsh * 1.5) do
ncqSweepStates ncq
ncqSweepFossils ncq
spawnActivity $ fix \again -> (>> again) do
ema <- readTVarIO ncqWriteEMA
mergeReq <- atomically $ stateTVar ncqMergeReq (,False)
if ema > ncqIdleThrsh && not mergeReq then do
pause @'Seconds 10
else do
mq <- newEmptyTMVarIO
spawnJob $ do
merged <- ncqMergeStep ncq
atomically $ putTMVar mq merged
-- TODO: detect-dead-merge
void $ race (pause @'Seconds 300) (atomically $ readTMVar mq) >>= \case
Left{} -> none
Right True -> none
Right False -> do
debug "merge: all done, wait..."
n0 <- readTVarIO ncqTrackedFiles <&> HPSQ.size
atomically do
n <- readTVar ncqTrackedFiles <&> HPSQ.size
when (n == n0) STM.retry
ContT $ bracket none $ const $ liftIO do
fhh <- atomically (STM.flushTQueue closeQ)
for_ fhh ( closeFd . snd )
flip fix RunNew $ \loop -> \case
RunFin -> do
debug "wait finalizing"
atomically $ pollSTM closer >>= maybe STM.retry (const none)
debug "exit storage"
RunNew -> do
stop <- readTVarIO ncqStorageStopReq
mt <- readTVarIO ncqWriteQ <&> Seq.null
if stop && mt then do
loop RunFin
else do
(fk,fhx) <- openNewDataFile
atomically $ modifyTVar' ncqCurrentFiles (HS.insert fk)
loop $ RunWrite (fk,fhx,0,0)
RunSync (fk, fh, w, total, continue) -> do
stop <- readTVarIO ncqStorageStopReq
sync <- readTVarIO ncqStorageSyncReq
let needClose = total >= ncqMinLog || stop
rest <- if not (sync || needClose || w > ncqFsync) then
pure w
else do
appendTailSection fh >> liftIO (fileSynchronise fh)
-- FIXME: slow!
-- to make it appear in state, but to ignore until index is done
liftIO (ncqStateUpdate ncq [P fk])
atomically do
writeTVar ncqStorageSyncReq False
modifyTVar' ncqSyncNo succ
pure 0
if | needClose && continue -> do
atomically $ writeTQueue closeQ (fk, fh)
loop RunNew
| not continue -> loop RunFin
| otherwise -> loop $ RunWrite (fk, fh, rest, total)
RunWrite (fk, fh, w, total') -> do
chunk <- atomically do
stop <- readTVar ncqStorageStopReq
sy <- readTVar ncqStorageSyncReq
chunk <- stateTVar ncqWriteQ (Seq.splitAt ncqWriteBlock)
if | Seq.null chunk && stop -> pure $ Left ()
| Seq.null chunk && not (stop || sy) -> STM.retry
| otherwise -> pure $ Right chunk
case chunk of
Left{} -> loop $ RunSync (fk, fh, w, total', False) -- exit ()
Right chu -> do
ws <- for chu $ \h -> do
atomically (ncqLookupEntrySTM ncq h) >>= \case
Just (NCQEntry bs) -> do
lift (appendSection fh bs)
_ -> pure 0
let written = sum ws
loop $ RunSync (fk, fh, w + written, total' + written, True)
where
emptyKey = BS.replicate ncqKeyLen 0
openNewDataFile :: forall mx . MonadIO mx => mx (FileKey, Fd)
openNewDataFile = do
fname <- ncqGetNewFossilName ncq
touch fname
let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 }
(fromString fname,) <$> liftIO (PosixBase.openFd fname Posix.ReadWrite flags)
spawnJob :: IO () -> m ()
spawnJob m = atomically $ writeTQueue ncqJobQ m
spawnActivity m = do
a <- ContT $ withAsync m
link a
pure a
measureWPS = void $ flip fix Nothing \loop -> \case
Nothing -> do
w <- readTVarIO ncqWrites
t <- getTimeCoarse
pause @'Seconds step >> loop (Just (w,t))
Just (w0,t0) -> do
w1 <- readTVarIO ncqWrites
t1 <- getTimeCoarse
let dt = max 1e-9 (realToFrac @_ @Double (t1 - t0)) / 1e9
dw = fromIntegral (w1 - w0)
atomically $ modifyTVar' ncqWriteEMA \ema -> alpha * (dw/dt) + 0.9 * ema
pause @'Seconds step >> loop (Just (w1,t1))
where
alpha = 0.1
step = 1.00
ncqFileFastCheck :: MonadUnliftIO m => FilePath -> m ()
ncqFileFastCheck fp = do
-- debug $ "ncqFileFastCheck" <+> pretty fp
mmaped <- liftIO $ mmapFileByteString fp Nothing
let size = BS.length mmaped
let s = BS.drop (size - 8) mmaped & N.word64
unless ( BS.length mmaped == fromIntegral s ) do
throwIO $ NCQFsckIssueExt (FsckInvalidFileSize (fromIntegral s))
ncqStorageScanDataFile :: MonadIO m
=> NCQStorage2
-> FilePath
-> ( Integer -> Integer -> HashRef -> ByteString -> m () )
-> m ()
ncqStorageScanDataFile ncq fp' action = do
let fp = ncqGetFileName ncq fp'
mmaped <- liftIO (mmapFileByteString fp Nothing)
flip runContT pure $ callCC \exit -> do
flip fix (0,mmaped) $ \next (o,bs) -> do
when (BS.length bs < ncqSLen) $ exit ()
let w = BS.take ncqSLen bs & N.word32 & fromIntegral
when (BS.length bs < ncqSLen + w) $ exit ()
let kv = BS.drop ncqSLen bs
let k = BS.take ncqKeyLen kv & coerce @_ @HashRef
let v = BS.take (ncqFullDataLen (NCQFullRecordLen w)) $ BS.drop ncqKeyLen kv
lift (action o (fromIntegral w) k v)
next (ncqSLen + o + fromIntegral w, BS.drop (w+ncqSLen) bs)
ncqIndexFile :: MonadUnliftIO m => NCQStorage2 -> DataFile FileKey -> m FilePath
ncqIndexFile n@NCQStorage2{} fk = do
let fp = toFileName fk & ncqGetFileName n
let dest = toFileName (IndexFile (coerce @_ @FileKey fk)) & ncqGetFileName n
debug $ "INDEX" <+> pretty fp <+> pretty dest
items <- S.toList_ do
ncqStorageScanDataFile n fp $ \o w k _ -> do
let rs = w - 32 & fromIntegral @_ @Word32 & N.bytestring32
let os = fromIntegral @_ @Word64 o & N.bytestring64
let record = os <> rs
-- debug $ "write record" <+> pretty (BS.length record)
S.yield (coerce k, record)
let (dir,name) = splitFileName fp
let idxTemp = (dropExtension name <> "-") `addExtension` ".cq$"
result <- nwayWriteBatch (nwayAllocDef 1.10 32 8 12) dir idxTemp items
mv result dest
pure dest
ncqAddTrackedFile :: MonadIO m => NCQStorage2 -> DataFile FileKey -> m Bool
ncqAddTrackedFile ncq@NCQStorage2{..} fkey = flip runContT pure $ callCC \exit -> do
let fname = ncqGetFileName ncq (toFileName fkey)
let idxName = ncqGetFileName ncq (toFileName (IndexFile (coerce @_ @FileKey fkey)))
idxHere <- doesFileExist idxName
unless idxHere do
err $ "Index does not exist" <+> pretty (takeFileName idxName)
exit False
stat <- liftIO $ PFS.getFileStatus fname
-- FIXME: maybe-creation-time-actually
let ts = posixToTimeSpec $ PFS.modificationTimeHiRes stat
let fk = fromString (takeFileName fname)
atomically $ ncqAddTrackedFileSTM ncq fk ts
pure True
ncqAddTrackedFileSTM :: NCQStorage2 -> FileKey -> TimeSpec -> STM ()
ncqAddTrackedFileSTM NCQStorage2{..} fk ts = do
modifyTVar' ncqTrackedFiles (HPSQ.insert fk (FilePrio (Down ts)) Nothing)
{-# INLINE ncqAddTrackedFileSTM #-}
evictIfNeededSTM :: NCQStorage2 -> Maybe Int -> STM ()
evictIfNeededSTM NCQStorage2{..} howMany = do
cur <- readTVar ncqCachedEntries
let need = fromMaybe cur howMany
excess = max 0 (cur + need - ncqMaxCached)
when (excess > 0) do
files <- readTVar ncqTrackedFiles <&> HPSQ.toList
oldest <- forM files \case
(k, prio, Just ce) -> do
ts <- readTVar (cachedTs ce)
pure (Just (ts, k, prio))
_ -> pure Nothing
let victims =
oldest
& catMaybes
& List.sortOn (\(ts,_,_) -> ts)
& List.take excess
for_ victims $ \(_,k,prio) -> do
modifyTVar ncqTrackedFiles (HPSQ.insert k prio Nothing)
modifyTVar ncqCachedEntries (subtract 1)
{- HLINT ignore "Functor law" -}
ncqListTrackedFiles :: MonadIO m => NCQStorage2 -> m [FilePath]
ncqListTrackedFiles ncq = do
let wd = ncqGetWorkDir ncq
dirFiles wd
>>= mapM (pure . takeFileName)
<&> List.filter (\f -> List.isPrefixOf "fossil-" f && List.isSuffixOf ".data" f)
<&> HS.toList . HS.fromList
ncqListStateFiles :: MonadIO m => NCQStorage2 -> m [(TimeSpec, FilePath)]
ncqListStateFiles ncq = do
let wd = ncqGetWorkDir ncq
dirFiles wd
>>= mapM (pure . takeBaseName)
<&> List.filter (List.isPrefixOf "state-")
>>= mapM (\x -> (,x) <$> timespecOf x)
<&> List.sortOn Down
where
timespecOf x = liftIO do
posixToTimeSpec . modificationTimeHiRes <$> getFileStatus (ncqGetFileName ncq x)
ncqLoadSomeIndexes :: MonadIO m => NCQStorage2 -> [FileKey] -> m ()
ncqLoadSomeIndexes ncq@NCQStorage2{..} keys = do
now <- getTimeCoarse
mapM_ (ncqAddTrackedFile ncq) (fmap DataFile keys)
loaded <- catMaybes <$> forM keys \key -> runMaybeT do
mEntry <- liftIO $ readTVarIO ncqTrackedFiles <&> HPSQ.lookup key
guard (maybe True (\(_, m) -> isNothing m) mEntry)
let idxFile = ncqGetFileName ncq (toFileName $ IndexFile key)
let datFile = ncqGetFileName ncq (toFileName $ DataFile key)
(mmIdx, nway) <- MaybeT $ liftIO $ nwayHashMMapReadOnly idxFile
mmData <- liftIO $ mmapFileByteString datFile Nothing
tnow <- newTVarIO now
pure (key, CachedEntry mmIdx mmData nway tnow)
atomically do
evictIfNeededSTM ncq (Just (List.length loaded))
for_ loaded \(k, ce) -> do
files <- readTVar ncqTrackedFiles
case HPSQ.lookup k files of
Just (p, Nothing) -> do
modifyTVar ncqTrackedFiles (HPSQ.insert k p (Just ce))
modifyTVar ncqCachedEntries (+1)
_ -> pure ()
ncqLoadIndexes :: MonadIO m => NCQStorage2 -> m ()
ncqLoadIndexes ncq@NCQStorage2{..} = do
w <- readTVarIO ncqTrackedFiles
<&> List.take (ncqMaxCached `div` 2) . HPSQ.keys
ncqLoadSomeIndexes ncq w
ncqRepair :: MonadIO m => NCQStorage2 -> m ()
ncqRepair me@NCQStorage2{..} = do
states <- ncqListStateFiles me <&> fmap snd
fossils <- flip fix states $ \next -> \case
[] -> do
warn $ yellow "no valid state found; start from scratch"
ncqListTrackedFiles me <&> fmap (DataFile . fromString)
(s:ss) -> tryLoadState s >>= \case
Just files -> do
debug $ yellow "used state" <+> pretty s
atomically $ writeTVar ncqStateName (Just $ takeFileName s)
pure files
Nothing -> do
warn $ red "inconsistent state" <+> pretty s
next ss
mapM_ (ncqAddTrackedFile me) fossils
void $ liftIO (ncqStateUpdate me mempty)
where
readState path = ncqReadStateKeys me path <&> fmap DataFile
tryLoadState path = liftIO do
debug $ "tryLoadState" <+> pretty path
state <- readState path
let checkFile fo = flip fix 0 $ \next i -> do
let dataFile = ncqGetFileName me (toFileName fo)
let indexFile = ncqGetFileName me (toFileName (IndexFile (coerce @_ @FileKey fo)))
-- debug $ "checkFile" <+> pretty dataFile
try @_ @SomeException (ncqFileFastCheck dataFile) >>= \case
Left e -> do
err (viaShow e)
here <- doesFileExist dataFile
when here do
let broken = dropExtension dataFile `addExtension` ".broken"
mv dataFile broken
rm indexFile
warn $ red "renamed" <+> pretty dataFile <+> pretty broken
pure False
Right{} | i > 1 -> pure False
Right{} -> do
exists <- doesFileExist indexFile
if exists then do
pure True
else do
debug $ "indexing" <+> pretty (toFileName fo)
r <- ncqIndexFile me fo
debug $ "indexed" <+> pretty r
next (succ i)
results <- forM state checkFile
pure $ if and results then Just state else Nothing
ncqRefHash :: NCQStorage2 -> HashRef -> HashRef
ncqRefHash NCQStorage2 {..} h = HashRef (hashObject (coerce @_ @ByteString h <> coerce ncqSalt))
ncqRunTaskNoMatterWhat :: MonadUnliftIO m => NCQStorage2 -> m a -> m a
ncqRunTaskNoMatterWhat NCQStorage2{..} task = do
atomically (modifyTVar ncqStorageTasks succ)
task `finally` atomically (modifyTVar ncqStorageTasks pred)
ncqRunTask :: MonadUnliftIO m => NCQStorage2 -> a -> m a -> m a
ncqRunTask ncq@NCQStorage2{..} def task = readTVarIO ncqStorageStopReq >>= \case
True -> pure def
False -> ncqRunTaskNoMatterWhat ncq task
ncqWaitTasks :: MonadUnliftIO m => NCQStorage2 -> m ()
ncqWaitTasks NCQStorage2{..} = atomically do
tno <- readTVar ncqStorageTasks
when (tno > 0) STM.retry
ncqStateUseSTM :: NCQStorage2 -> STM ()
ncqStateUseSTM NCQStorage2{..} = do
k <- readTVar ncqStateVersion <&> fromIntegral
modifyTVar' ncqStateUsage (IntMap.update (Just . over _1 succ) k)
ncqStateUnuseSTM :: NCQStorage2 -> STM ()
ncqStateUnuseSTM NCQStorage2{..} = do
k <- readTVar ncqStateVersion <&> fromIntegral
modifyTVar' ncqStateUsage (IntMap.update (Just . over _1 pred) k)
ncqStateUpdate :: MonadUnliftIO m => NCQStorage2 -> [StateOP] -> m Bool
ncqStateUpdate me@NCQStorage2{..} ops' = flip runContT pure $ callCC \exit -> do
t1 <- fromIntegral <$> liftIO getTimeCoarse
keys0 <- readTVarIO ncqTrackedFiles <&> HPSQ.keys
ops <- for ops' $ \case
f@(F _ fk) -> do
let datFile = ncqGetFileName me (toFileName $ DataFile fk)
e2 <- doesFileExist datFile
unless e2 do
err $ "ncqStateUpdate invariant fail" <+> pretty datFile
exit False
ts <- liftIO (getFileStatus datFile) <&>
posixToTimeSpec . PFS.modificationTimeHiRes
pure (F ts fk)
d -> pure d
changed <- atomically do
t0 <- readTVar ncqStateVersion
let k0 = fromIntegral t0
c <- if List.null ops then do
pure False
else do
writeTVar ncqStateVersion (max (succ t0) t1)
for_ ops $ \case
D fk -> modifyTVar' ncqTrackedFiles (HPSQ.delete fk)
F t fk -> ncqAddTrackedFileSTM me (coerce fk) t
P fk -> do
let onlyIfMissed = \case
Nothing -> ((), Just (FilePrio (Down 0), Just PendingEntry))
Just (p,v) -> ((), Just (p,v))
modifyTVar' ncqTrackedFiles (snd . HPSQ.alter onlyIfMissed fk)
pure True
old <- readTVar ncqTrackedFiles <&> HS.fromList . HPSQ.keys
let doAlter = \case
Nothing -> Just (0, old)
Just (u,f) -> Just (u,f)
modifyTVar' ncqStateUsage (IntMap.alter doAlter k0)
k1 <- readTVar ncqTrackedFiles <&> HPSQ.keys
pure (c && k1 /= keys0)
when changed $ liftIO do
name <- ncqDumpCurrentState me
atomically $ writeTVar ncqStateName (Just name)
pure changed
ncqDumpCurrentState :: MonadUnliftIO m => NCQStorage2 -> m FilePath
ncqDumpCurrentState me@NCQStorage2{..} = do
keys <- readTVarIO ncqTrackedFiles <&> List.sort . HPSQ.keys
name <- ncqGetNewStateName me
writeBinaryFileDurableAtomic name (BS8.unlines [coerce k | k <- keys])
pure name
ncqMergeFull :: forall m . MonadUnliftIO m => NCQStorage2 -> m ()
ncqMergeFull me = fix \next -> ncqMergeStep me >>= \case
False -> none
True -> next
-- FIXME: sometime-causes-no-such-file-or-directory
ncqMergeStep :: MonadUnliftIO m => NCQStorage2 -> m Bool
ncqMergeStep ncq@NCQStorage2{..} = do
withSem ncqMergeSem $ ncqRunTask ncq False $ flip runContT pure $ liftIO do
debug "ncqMergeStep"
tracked <- readTVarIO ncqTrackedFiles <&> HPSQ.toList
files <- for tracked $ \(f,p,e) -> do
let fn = ncqGetFileName ncq (toFileName $ DataFile f)
let idx = ncqGetFileName ncq (toFileName $ IndexFile f)
dataHere <- doesFileExist fn
sz <- case e of
Just PendingEntry -> pure (-1)
_ | dataHere -> liftIO (fileSize fn)
| otherwise -> pure (-1)
idxHere <- doesFileExist idx
pure (f, sz, p, idxHere)
let bothOk (_, sz1, _, here1) (_, sz2, _, here2) =
here1 && here2
&& sz1 > 0 && sz2 > 0
&& (sz1 + sz2) < fromIntegral ncqMaxLog
let found = flip fix (files, Nothing, Nothing) $ \next -> \case
([], _, r) -> r
(a:b:rest, Nothing, _) | bothOk a b -> do
next (b:rest, Just (view _2 a + view _2 b), Just (a,b))
(a:b:rest, Just s, _ ) | bothOk a b && view _2 a + view _2 b < s -> do
next (b:rest, Just (view _2 a + view _2 b), Just (a,b))
(_:rest, s, r) -> do
next (rest, s, r)
case found of
Just (a,b) -> mergeStep a b >> pure True
_ -> pure False
where
ncqGetNewMergeName :: MonadIO m => NCQStorage2 -> m FilePath
ncqGetNewMergeName n@NCQStorage2{} = do
let (p,tpl) = splitFileName (ncqGetFileName n "merge-.data")
liftIO $ emptyTempFile p tpl
mergeStep (a,_,p1,_) (b,_,p2,_) = do
debug $ "merge" <+> pretty a <+> pretty b
let fDataNameA = ncqGetFileName ncq $ toFileName (DataFile a)
let fIndexNameA = ncqGetFileName ncq $ toFileName (IndexFile a)
let fDataNameB = ncqGetFileName ncq $ toFileName (DataFile b)
let fIndexNameB = ncqGetFileName ncq $ toFileName (IndexFile b)
debug $ "file A" <+> pretty (timeSpecFromFilePrio p1) <+> pretty fDataNameA <+> pretty fIndexNameA
debug $ "file B" <+> pretty (timeSpecFromFilePrio p2) <+> pretty fDataNameB <+> pretty fIndexNameB
doesFileExist fDataNameA `orFail` ("not exist" <+> pretty fDataNameA)
doesFileExist fDataNameB `orFail` ("not exist" <+> pretty fDataNameB)
doesFileExist fIndexNameA `orFail` ("not exist" <+> pretty fIndexNameA)
flip runContT pure $ callCC \exit -> do
mfile <- ncqGetNewMergeName ncq
ContT $ bracket none $ const do
rm mfile
liftIO $ withBinaryFileAtomic mfile WriteMode $ \fwh -> do
debug $ "merge: okay, good to go" <+> pretty (takeFileName mfile)
(mmIdx, nway) <- nwayHashMMapReadOnly fIndexNameA
>>= orThrow (NCQMergeInvariantFailed (show $ "can't mmap" <+> pretty fIndexNameA))
debug $ "SCAN FILE A" <+> pretty fDataNameA
writeFiltered ncq fDataNameA fwh $ \_ _ _ _ -> do
pure True
debug $ "SCAN FILE B" <+> pretty fDataNameA
writeFiltered ncq fDataNameB fwh $ \_ _ k _ -> do
foundInA <- liftIO (nwayHashLookup nway mmIdx (coerce k)) <&> isJust
let skip = foundInA
pure $ not skip
appendTailSection =<< handleToFd fwh
liftIO do
result <- fileSize mfile
idx <- if result == 0 then
pure Nothing
else do
fossil <- ncqGetNewFossilName ncq
mv mfile fossil
statA <- getFileStatus fDataNameA
let ts = modificationTimeHiRes statA
setFileTimesHiRes fossil ts ts
let fk = DataFile (fromString fossil)
void $ ncqIndexFile ncq fk
pure $ Just (ts,fk)
atomically do
modifyTVar ncqTrackedFiles (HPSQ.delete a)
modifyTVar ncqTrackedFiles (HPSQ.delete b)
for_ idx $ \(ts,fk) -> do
ncqAddTrackedFileSTM ncq (coerce fk) (posixToTimeSpec ts)
for_ idx $ \(ts,DataFile fk) -> do
void $ ncqStateUpdate ncq [D a, D b, F (posixToTimeSpec ts) fk]
orFail what e = do
r <- what
unless r (throwIO (NCQMergeInvariantFailed (show e)))
ncqCompact :: MonadUnliftIO m => NCQStorage2 -> m ()
ncqCompact ncq@NCQStorage2{..} = do
q <- newTVarIO ( mempty :: HashMap FileKey (HashSet HashRef) )
ncqLinearScanForCompact ncq $ \fk h -> atomically do
modifyTVar q (HM.insertWith (<>) fk (HS.singleton h))
state0 <- readTVarIO q
for_ (HM.toList state0) $ \(fk, es) -> do
trace $ "TO DELETE" <+> pretty fk <+> pretty (HS.size es)
let fDataNameA = ncqGetFileName ncq (toFileName $ DataFile fk)
let fIndexNameA = ncqGetFileName ncq (toFileName (IndexFile fk))
flip runContT pure do
mfile <- ncqGetNewCompactName ncq
ContT $ bracket none $ const do
rm mfile
liftIO do
withBinaryFileAtomic mfile WriteMode $ \fwh -> do
writeFiltered ncq fDataNameA fwh $ \_ _ k v -> do
pure $ not $ HS.member k es
appendTailSection =<< handleToFd fwh
result <- fileSize mfile
if result == 0 then do
atomically $ modifyTVar ncqTrackedFiles (HPSQ.delete fk)
else do
fossil <- ncqGetNewFossilName ncq
mv mfile fossil
statA <- getFileStatus fDataNameA
let ts = modificationTimeHiRes statA
setFileTimesHiRes fossil ts ts
fname <- ncqIndexFile ncq (DataFile (fromString fossil))
atomically do
let fp = fromString fname
modifyTVar ncqTrackedFiles (HPSQ.delete fk)
ncqAddTrackedFileSTM ncq fp (posixToTimeSpec ts)
mapM_ rm [fDataNameA, fIndexNameA]
debug $ "compact done" <+> pretty (HM.size state0)
-- NOTE: incremental
-- now it may became incremental if we'll
-- limit amount of tombs per one pass
-- then remove all dead entries,
-- then call again to remove tombs. etc
-- as for now, seems it should work up to 10TB
-- of storage
ncqLinearScanForCompact :: MonadUnliftIO m
=> NCQStorage2
-> ( FileKey -> HashRef -> m () )
-> m Int
ncqLinearScanForCompact ncq@NCQStorage2{..} action = flip runContT pure do
tracked <- readTVarIO ncqTrackedFiles <&> HPSQ.toList
let state0 = mempty :: HashMap HashRef TimeSpec
profit <- newTVarIO 0
tombUse <- newTVarIO (mempty :: HashMap HashRef (FileKey, Int))
-- TODO: explicit-unmap-files
flip fix (tracked, state0) $ \next -> \case
([], s) -> none
((fk,p,_):rest, state) -> do
let cqFile = ncqGetFileName ncq (toFileName (IndexFile fk))
let dataFile = ncqGetFileName ncq (toFileName (DataFile fk))
(mmaped,meta@NWayHash{..}) <- liftIO $ nwayHashMMapReadOnly cqFile
>>= orThrow (NWayHashInvalidMetaData cqFile)
let emptyKey = BS.replicate nwayKeySize 0
found <- S.toList_ do
nwayHashScanAll meta mmaped $ \o k entryBs -> do
unless (k == emptyKey) do
let off = N.word64 (BS.take 8 entryBs)
let sz = N.word32 (BS.take 4 (BS.drop 8 entryBs))
when (sz == ncqPrefixLen || sz == ncqPrefixLen + 32) do
S.yield off
let kk = coerce k
case HM.lookup kk state of
Just ts | ts > timeSpecFromFilePrio p -> do
notice $ pretty kk <+> pretty (sz + ncqSLen)
atomically do
modifyTVar profit ( + (sz + ncqSLen) )
modifyTVar tombUse (HM.adjust (over _2 succ) kk)
lift $ lift $ action (fromString dataFile) kk
_ -> none
newEntries <- S.toList_ do
unless (List.null found) do
dataBs <- liftIO $ mmapFileByteString dataFile Nothing
for_ found $ \o -> do
let pre = BS.take (fromIntegral ncqPrefixLen) (BS.drop (ncqDataOffset o) dataBs)
when (pre == ncqRefPrefix || pre == ncqTombPrefix) do
let keyBs = BS.take ncqKeyLen (BS.drop (fromIntegral o + ncqSLen) dataBs)
let key = coerce (BS.copy keyBs)
unless (HM.member key state) do
S.yield (key, timeSpecFromFilePrio p)
when ( pre == ncqTombPrefix ) do
atomically $ modifyTVar tombUse (HM.insert key (fk,0))
next (rest, state <> HM.fromList newEntries)
use <- readTVarIO tombUse
let useless = [ (f,h) | (h, (f,n)) <- HM.toList use, n == 0 ]
for_ useless $ \(f,h) -> do
atomically $ modifyTVar profit (+ncqFullTombLen)
lift $ action f h
readTVarIO profit <&> fromIntegral
ncqReadStateKeys :: forall m . MonadUnliftIO m => NCQStorage2 -> FilePath -> m [FileKey]
ncqReadStateKeys me path = liftIO do
keys <- BS8.readFile (ncqGetFileName me path)
<&> filter (not . BS8.null) . BS8.lines
pure $ fmap (coerce @_ @FileKey) keys
ncqSweepFossils :: forall m . MonadUnliftIO m => NCQStorage2 -> m ()
ncqSweepFossils me@NCQStorage2{..} = withSem ncqMergeSem do
debug $ yellow "sweep fossils"
-- better be safe than sorry
current <- readTVarIO ncqCurrentFiles
-- >>= mapM (try @_ @IOException . ncqReadStateKeys me)
mentioned <- ncqListStateFiles me <&> fmap snd
>>= mapM (ncqReadStateKeys me)
<&> HS.fromList . mconcat
sfs <- ncqListStateFiles me <&> fmap snd
debug $ "STATE FILES" <+> vcat (fmap pretty sfs)
active <- readTVarIO ncqTrackedFiles <&> HS.fromList . HPSQ.keys
used' <- readTVarIO ncqStateUsage <&> IntMap.elems
let used = current
<> active
<> mentioned
<> HS.unions [ keys | (n, keys) <- used', n > 0 ]
kicked <- ncqListTrackedFiles me
<&> fmap (fromString @FileKey)
<&> filter (\x -> not (HS.member x used))
<&> HS.toList . HS.fromList
debug $ "KICK" <+> vcat (fmap pretty kicked)
debug $ "LIVE SET" <+> vcat (fmap pretty (HS.toList used))
for_ kicked $ \fo -> do
debug $ "sweep fossil file" <+> pretty fo
rm (ncqGetFileName me (toFileName (IndexFile fo)))
rm (ncqGetFileName me (toFileName (DataFile fo)))
ncqSweepStates :: MonadUnliftIO m => NCQStorage2 -> m ()
ncqSweepStates me@NCQStorage2{..} = withSem ncqMergeSem $ flip runContT pure do
debug $ yellow "remove unused states"
current' <- readTVarIO ncqStateName
current <- ContT $ for_ current'
states <- ncqListStateFiles me <&> fmap snd
flip fix (Left states) $ \next -> \case
Left [] -> none
Right [] -> none
Left (x:xs) | x == current -> next (Right xs)
| otherwise -> next (Left xs)
Right (x:xs) -> do
debug $ "Remove obsolete state" <+> pretty x
rm (ncqGetFileName me x)
next (Right xs)
writeFiltered :: forall m . MonadIO m
=> NCQStorage2
-> FilePath
-> Handle
-> ( Integer -> Integer -> HashRef -> ByteString -> m Bool)
-> m ()
writeFiltered ncq fn out filt = do
ncqStorageScanDataFile ncq fn $ \o s k v -> do
skip <- filt o s k v <&> not
when skip do
debug $ pretty k <+> pretty "skipped"
unless skip $ liftIO do
BS.hPut out (LBS.toStrict (makeEntryLBS k v))
where
makeEntryLBS h bs = do
let b = byteString (coerce @_ @ByteString h)
<> byteString bs
let wbs = toLazyByteString b
let len = LBS.length wbs
let ws = byteString (N.bytestring32 (fromIntegral len))
toLazyByteString (ws <> b)
zeroSyncEntry :: ByteString
zeroSyncEntry = ncqMakeSectionBS (Just B) zeroHash zeroPayload
where zeroPayload = N.bytestring64 0
zeroHash = HashRef (hashObject zeroPayload)
{-# INLINE zeroSyncEntry #-}
zeroSyncEntrySize :: Word64
zeroSyncEntrySize = fromIntegral (BS.length zeroSyncEntry)
{-# INLINE zeroSyncEntrySize #-}
-- 1. It's M-record
-- 2. It's last w64be == fileSize
-- 3. It's hash == hash (bytestring64be fileSize)
-- 4. recovery-strategy: start-to-end, end-to-start
fileTailRecord :: Integral a => a -> ByteString
fileTailRecord w = do
-- on open: last w64be == fileSize
let paylo = N.bytestring64 (fromIntegral w + zeroSyncEntrySize)
let h = hashObject @HbSync paylo & coerce
ncqMakeSectionBS (Just M) h paylo
{-# INLINE fileTailRecord #-}
appendSection :: forall m . MonadUnliftIO m
=> Fd
-> ByteString
-> m Int -- (FOff, Int)
appendSection fh sect = do
-- off <- liftIO $ fdSeek fh SeekFromEnd 0
-- pure (fromIntegral off, fromIntegral len)
liftIO (Posix.fdWrite fh sect) <&> fromIntegral
{-# INLINE appendSection #-}
appendTailSection :: MonadIO m => Fd -> m ()
appendTailSection fh = liftIO do
s <- Posix.fileSize <$> Posix.getFdStatus fh
void (appendSection fh (fileTailRecord s))
{-# INLINE appendTailSection #-}
withSem :: forall a m . MonadUnliftIO m => TSem -> m a -> m a
withSem sem m = bracket enter leave (const m)
where enter = atomically (waitTSem sem)
leave = const $ atomically (signalTSem sem)