-- Mirror of https://github.com/voidlizard/hbs2 (Haskell source, 1592 lines, 50 KiB).
{-# Language MultiWayIf #-}
|
|
{-# Language RecordWildCards #-}
|
|
{-# Language PatternSynonyms #-}
|
|
module HBS2.Storage.NCQ2
|
|
( module HBS2.Storage.NCQ2
|
|
, module HBS2.Storage.NCQ.Types
|
|
)
|
|
where
|
|
|
|
import HBS2.Prelude.Plated
|
|
import HBS2.Hash
|
|
import HBS2.OrDie
|
|
import HBS2.Data.Types.Refs
|
|
import HBS2.Base58
|
|
import HBS2.Net.Auth.Credentials
|
|
import HBS2.Storage
|
|
import HBS2.Misc.PrettyStuff
|
|
import HBS2.System.Logger.Simple.ANSI
|
|
|
|
import HBS2.Data.Log.Structured.NCQ
|
|
import HBS2.Data.Log.Structured.SD
|
|
|
|
import HBS2.Storage.NCQ.Types
|
|
|
|
import Data.Config.Suckless.System
|
|
import Data.Config.Suckless.Script hiding (void)
|
|
|
|
import Codec.Compression.Zstd qualified as Zstd
|
|
import Codec.Compression.Zstd.Lazy as ZstdL
|
|
import Codec.Compression.Zstd.Streaming qualified as ZstdS
|
|
import Codec.Compression.Zstd.Streaming (Result(..))
|
|
|
|
import Numeric (showHex)
|
|
import Control.Applicative
|
|
import Data.ByteString.Builder
|
|
import Network.ByteOrder qualified as N
|
|
import Data.Bit.ThreadSafe qualified as BV
|
|
import Data.HashMap.Strict (HashMap)
|
|
import Control.Monad.Except
|
|
import Control.Monad.Trans.Cont
|
|
import Control.Monad.Trans.Maybe
|
|
import Data.Time.Clock.POSIX
|
|
import Data.Ord (Down(..),comparing)
|
|
import Control.Concurrent.STM qualified as STM
|
|
import Control.Concurrent.STM.TSem
|
|
import Data.Hashable (hash)
|
|
import Data.HashPSQ qualified as HPSQ
|
|
import Data.HashPSQ (HashPSQ)
|
|
import Data.IntMap qualified as IntMap
|
|
import Data.IntMap (IntMap)
|
|
import Data.IntSet qualified as IntSet
|
|
import Data.IntSet (IntSet)
|
|
import Data.Sequence qualified as Seq
|
|
import Data.Sequence (Seq(..), (|>),(<|))
|
|
import Data.List qualified as List
|
|
import Data.ByteString.Lazy qualified as LBS
|
|
import Data.ByteString.Lazy.Char8 qualified as LBS8
|
|
import Data.ByteString (ByteString)
|
|
import Data.ByteString qualified as BS
|
|
import Data.ByteString.Char8 qualified as BS8
|
|
import Data.Char (isDigit)
|
|
import Data.Fixed
|
|
import Data.Coerce
|
|
import Data.Word
|
|
import Data.Either
|
|
import Data.Maybe
|
|
import Data.Text qualified as Text
|
|
import Data.Text.IO qualified as Text
|
|
import Data.Int
|
|
import Data.Vector qualified as V
|
|
import Data.Vector (Vector, (!))
|
|
import Lens.Micro.Platform
|
|
import Data.HashSet (HashSet)
|
|
import Data.HashSet qualified as HS
|
|
import Data.HashMap.Strict qualified as HM
|
|
import System.FilePath.Posix
|
|
import System.Posix.Fcntl
|
|
import System.Posix.Files qualified as Posix
|
|
import System.Posix.IO as PosixBase
|
|
import System.Posix.Types as Posix
|
|
import System.Posix.IO.ByteString as Posix
|
|
import System.Posix.Unistd
|
|
import System.Posix.Files ( getFileStatus
|
|
, modificationTimeHiRes
|
|
, setFileTimesHiRes
|
|
, getFdStatus
|
|
, FileStatus(..)
|
|
, setFileMode
|
|
)
|
|
import System.Posix.Files qualified as PFS
|
|
import System.IO.Error (catchIOError)
|
|
import System.IO.MMap as MMap
|
|
import System.IO.Temp (emptyTempFile)
|
|
import System.Mem
|
|
-- import Foreign.Ptr
|
|
-- import Foreign di
|
|
import qualified Data.ByteString.Internal as BSI
|
|
import Streaming.Prelude qualified as S
|
|
|
|
import UnliftIO
|
|
import UnliftIO.Concurrent(getNumCapabilities)
|
|
import UnliftIO.IO.File
|
|
|
|
-- FIXME: ASAP-USE-FILE-LOCK
|
|
import System.FileLock as FL
|
|
|
|
-- | A file offset.
type FOff = Word64

-- | An entry of the in-memory table.
--
-- 'ncqEntryData' holds the serialized section bytes (as built by
-- 'ncqMakeSectionBS' in 'ncqPutBS'); 'ncqDumped' starts as 'Nothing' and is
-- set by the writer loop to the key of the data file the entry was appended to.
data NCQEntry =
  NCQEntry
  { ncqEntryData :: !ByteString
  , ncqDumped    :: !(TVar (Maybe FileKey))
  }

-- | One shard of the in-memory table, keyed by 'HashRef'.
type Shard = TVar (HashMap HashRef NCQEntry)

-- | Offset of an entry inside a data (fossil) file.
type NCQOffset = Word64
-- | Size of an entry inside a data (fossil) file.
type NCQSize = Word32

-- | Version number of the storage state (see 'ncqStateVersion').
type StateVersion = Word64

-- | Operations passed to 'ncqStateUpdate'.
--
-- In the code visible here, 'P' is recorded when a new data file is opened
-- and 'F' (with time 0) once a data file has been indexed; 'D' is not used in
-- this section — NOTE(review): presumably "delete", confirm in ncqStateUpdate.
data StateOP = D FileKey | F TimeSpec FileKey | P FileKey
  deriving (Eq,Ord,Show)

-- | Flags requesting a merge or a compaction (not referenced in this section).
data NCQFlag =
  NCQMergeNow | NCQCompactNow
  deriving (Eq,Ord,Generic)
|
|
|
|
-- | Where an entry was found.
--
-- 'InFossil' carries the file key, the mmap-ed data file, and the entry's
-- offset and size inside it; 'InMemory' carries the entry bytes taken from
-- the in-memory table.
data Location =
    InFossil {-# UNPACK #-} !FileKey !ByteString !NCQOffset !NCQSize
  | InMemory {-# UNPACK #-} !ByteString

-- | Human-readable rendering; the payload bytes are never printed.
instance Pretty Location where
  pretty = \case
    InFossil k _ o s -> parens $ "in-fossil" <+> pretty k <+> pretty o <+> pretty s
    InMemory _ -> "in-memory"
|
|
|
|
-- | A data file known to the storage.
--
-- 'tfTime' is built from the file's modification time wrapped in 'Down'
-- (see 'ncqAddTrackedFilesSTM'), so sorting by it presumably puts the newest
-- files first. 'tfCached' is 'Nothing' until the file's index has been
-- loaded (see 'ncqLoadTrackedFile'), and is reset to 'Nothing' on eviction.
data TrackedFile =
  TrackedFile
  { tfTime   :: FilePrio
  , tfKey    :: FileKey
  , tfCached :: TVar (Maybe CachedEntry)
  }

-- | Tracked files, kept sorted by 'tfTime'.
type TrackedFiles = Vector TrackedFile
|
|
|
|
-- | Runtime handle of the NCQ2 storage. Defaults are set by 'ncqStorageOpen2'.
data NCQStorage2 =
  NCQStorage2
  { ncqRoot           :: FilePath  -- ^ root directory; data lives in @ncqRoot </> show ncqGen@
  , ncqGen            :: Int
  , ncqSalt           :: HashRef   -- ^ mixed into keys by 'ncqRefHash'
  , ncqPostponeMerge  :: Timeout 'Seconds  -- ^ delay before the merge loop starts
  , ncqPostponeSweep  :: Timeout 'Seconds  -- ^ delay before the sweep loop starts
  , ncqLuckyNum       :: Int       -- ^ base for shard / write-queue counts in 'ncqStorageOpen2'
  , ncqFsync          :: Int       -- ^ bytes written since last sync that force a sync
  , ncqWriteQLen      :: Int       -- ^ write-queue length above which 'ncqPutBS' waits
  , ncqWriteBlock     :: Int       -- ^ max keys taken from the write queue per writer step
  , ncqMinLog         :: Int       -- ^ total bytes after which the current data file is closed
  , ncqMaxLog         :: Int       -- NOTE(review): not referenced in this section
  , ncqMaxCached      :: Int       -- ^ target number of loaded indexes
  , ncqIdleThrsh      :: Double    -- ^ write-rate (EMA) threshold for merge/sweep
  , ncqMemTable       :: Vector Shard  -- ^ sharded in-memory table
  , ncqWriteQ         :: TVar (Seq HashRef)  -- ^ keys waiting to be appended by the writer loop
  , ncqWriteOps       :: Vector (TQueue (IO ()))  -- ^ write job queues, filled round-robin by 'ncqPutBS'
  , ncqReadReq        :: TQueue (HashRef, TMVar (Maybe Location))  -- ^ requests from 'ncqLocate2'
  , ncqStorageTasks   :: TVar Int  -- ^ number of tasks started by 'ncqRunTaskNoMatterWhat'
  , ncqStorageStopReq :: TVar Bool
  , ncqStorageSyncReq :: TVar Bool
  , ncqMergeReq       :: TVar Bool  -- ^ forces a merge attempt even when writes are busy
  , ncqMergeSem       :: TSem
  , ncqSyncNo         :: TVar Int   -- ^ incremented after every sync of a data file
  , ncqCurrentFiles   :: TVar (HashSet FileKey)  -- ^ data files opened by the writer and not yet indexed
  , ncqTrackedFiles   :: TVar TrackedFiles
  , ncqStateVersion   :: TVar StateVersion
  , ncqStateUsage     :: TVar (IntMap (Int, HashSet FileKey))  -- ^ per version: use count (first component); second presumably that version's files
  , ncqStateName      :: TVar (Maybe StateFile)  -- ^ state file chosen by 'ncqRepair'
  , ncqStateSem       :: TSem
  , ncqCachedEntries  :: TVar Int   -- ^ number of loaded ('CachedEntry') index slots
  , ncqWrites         :: TVar Int   -- ^ number of 'ncqPutBS' calls so far
  , ncqWriteEMA       :: TVar Double -- for writes-per-seconds
  , ncqJobQ           :: TQueue (IO ())  -- ^ jobs queued by 'ncqSpawnJob'
  , ncqMiscSem        :: TSem
  , ncqSweepSem       :: TSem
  , ncqMergeTasks     :: TVar Int
  , ncqOnRunWriteIdle :: TVar (IO ())  -- ^ hook run when the writer waits for input and times out

  }
|
|
|
|
-- | One mebibyte (2^20 bytes), at any integral type.
megabytes :: forall a . Integral a => a
megabytes = 1024 * 1024

-- | One gibibyte (2^30 bytes), at any integral type.
gigabytes :: forall a . Integral a => a
gigabytes = 1024 * megabytes
|
|
|
|
-- | Create a storage handle rooted at @fp@ and prepare its working directory.
--
-- All fields start from the defaults below, then @upd@ is applied to the
-- record. Values derived from defaults (write block size, sweep delay,
-- number of shards and write queues) are computed before @upd@ runs, so
-- overriding e.g. 'ncqLuckyNum' does not change the shard count.
--
-- Under 'ncqMergeSem' it then repairs the state, preloads indexes and
-- sweeps old state files. The storage loop itself is started separately by
-- 'ncqStorageRun2'.
ncqStorageOpen2 :: MonadIO m => FilePath -> (NCQStorage2 -> NCQStorage2)-> m NCQStorage2
ncqStorageOpen2 fp upd = do
  let ncqRoot = fp
  let ncqGen = 0
  let ncqFsync = 16 * megabytes
  let ncqWriteQLen = 1024 * 4
  let ncqMinLog = 512 * megabytes
  let ncqMaxLog = 16 * gigabytes -- ???
  let ncqWriteBlock = max 128 $ ncqWriteQLen `div` 2
  let ncqMaxCached = 128
  let ncqIdleThrsh = 50.00
  let ncqPostponeMerge = 600.00
  let ncqPostponeSweep = 2 * ncqPostponeMerge
  let ncqLuckyNum = 2

  let shardNum = ncqLuckyNum * 2
  let wopNum = ncqLuckyNum

  ncqWriteQ <- newTVarIO mempty
  ncqMemTable <- V.fromList <$> replicateM shardNum (newTVarIO mempty)
  ncqStorageStopReq <- newTVarIO False
  ncqStorageSyncReq <- newTVarIO False
  ncqMergeReq <- newTVarIO False
  ncqMergeSem <- atomically (newTSem 1)
  ncqSyncNo <- newTVarIO 0
  ncqCurrentFiles <- newTVarIO mempty
  ncqTrackedFiles <- newTVarIO V.empty
  ncqStateVersion <- newTVarIO 0
  ncqStateUsage <- newTVarIO mempty
  ncqStateName <- newTVarIO Nothing
  ncqStateSem <- atomically $ newTSem 1
  ncqCachedEntries <- newTVarIO 0
  ncqStorageTasks <- newTVarIO 0
  ncqWrites <- newTVarIO 0
  ncqWriteEMA <- newTVarIO 0.00
  ncqJobQ <- newTQueueIO
  ncqMiscSem <- atomically (newTSem 1)
  ncqSweepSem <- atomically (newTSem 1)
  ncqMergeTasks <- newTVarIO 0
  ncqOnRunWriteIdle <- newTVarIO none

  ncqReadReq <- newTQueueIO

  ncqWriteOps <- replicateM wopNum newTQueueIO <&> V.fromList

  let ncqSalt = "EstEFasxrCFqsGDxcY4haFcha9e4ZHRzsPbGUmDfdxLk"

  let ncq = NCQStorage2{..} & upd

  mkdir (ncqGetWorkDir ncq)

  liftIO $ withSem ncqMergeSem do
    ncqRepair ncq
    ncqPreloadIndexes ncq
    ncqSweepStates ncq

  pure ncq
|
|
|
|
|
|
-- | Open the storage at @fp@, run its main loop in a linked background thread,
-- and run @action@ against it.
--
-- After @action@ returns, a stop is requested and the main loop is awaited
-- before the result is returned. If @action@ throws, 'withAsync' cancels the
-- loop.
ncqWithStorage :: MonadUnliftIO m => FilePath -> ( NCQStorage2 -> m a ) -> m a
ncqWithStorage fp action = do
  sto <- ncqStorageOpen2 fp id
  withAsync (ncqStorageRun2 sto) \runner -> do
    link runner
    result <- action sto
    ncqStorageStop2 sto
    wait runner
    pure result
|
|
|
|
-- | Place a file name (its directory part is discarded) in the work directory.
ncqGetFileName :: NCQStorage2 -> FilePath -> FilePath
ncqGetFileName ncq = (ncqGetWorkDir ncq </>) . takeFileName

-- | Working directory of the current generation: @ncqRoot </> show ncqGen@.
ncqGetWorkDir :: NCQStorage2 -> FilePath
ncqGetWorkDir ncq = ncqRoot ncq </> show (ncqGen ncq)

-- | Path of the @.lock@ file in the work directory.
ncqGetLockFileName :: NCQStorage2 -> FilePath
ncqGetLockFileName = flip ncqGetFileName ".lock"

-- | Produce a not-yet-existing path @pref <> "<ns-time-hex>-<n-hex>" <> suff@
-- in the work directory. The counter @n@ starts at 0 and is bumped until the
-- name is free. Serialized through 'ncqMiscSem'.
ncqNewUniqFileName :: MonadIO m => NCQStorage2 -> FilePath -> FilePath -> m FilePath
ncqNewUniqFileName me pref suff = liftIO $ withSem (ncqMiscSem me) (attempt 0)
  where
    attempt :: Integer -> IO FilePath
    attempt i = do
      stamp <- round @_ @Integer . (* 1e9) <$> getPOSIXTime
      let tag = show $ pretty (showHex stamp "") <> "-" <> pretty (showHex i "")
          candidate = ncqGetFileName me (pref <> tag <> suff)
      taken <- doesFileExist candidate
      if taken then attempt (succ i) else pure candidate

-- | Fresh @fossil-*.data@ path.
ncqGetNewFossilName :: MonadIO m => NCQStorage2 -> m FilePath
ncqGetNewFossilName me = ncqNewUniqFileName me "fossil-" ".data"

-- | Fresh @state-*@ path (no extension).
ncqGetNewStateName :: MonadIO m => NCQStorage2 -> m FilePath
ncqGetNewStateName me = ncqNewUniqFileName me "state-" ""

-- | Fresh @compact-*.data@ path.
ncqGetNewCompactName :: MonadIO m => NCQStorage2 -> m FilePath
ncqGetNewCompactName me = ncqNewUniqFileName me "compact-" ".data"
|
|
|
|
-- | Ask the main loop to finish: it drains pending writes and exits.
ncqStorageStop2 :: MonadUnliftIO m => NCQStorage2 -> m ()
ncqStorageStop2 ncq = atomically (writeTVar (ncqStorageStopReq ncq) True)

-- | Ask the writer loop to sync the current data file at its next step.
ncqStorageSync2 :: MonadUnliftIO m => NCQStorage2 -> m ()
ncqStorageSync2 ncq = atomically (writeTVar (ncqStorageSyncReq ncq) True)
|
|
|
|
-- | Shard index of a key: its first byte modulo the number of shards.
ncqShardIdx :: NCQStorage2 -> HashRef -> Int
ncqShardIdx ncq h = shardOf (coerce h)
  where
    shardOf bytes = fromIntegral (BS.head bytes) `mod` V.length (ncqMemTable ncq)
{-# INLINE ncqShardIdx #-}

-- | The in-memory shard responsible for a key.
ncqGetShard :: NCQStorage2 -> HashRef -> Shard
ncqGetShard ncq = (ncqMemTable ncq !) . ncqShardIdx ncq
{-# INLINE ncqGetShard #-}

-- | Look a key up in the in-memory table.
ncqLookupEntrySTM :: NCQStorage2 -> HashRef -> STM (Maybe NCQEntry)
ncqLookupEntrySTM ncq h = HM.lookup h <$> readTVar (ncqGetShard ncq h)

-- | Insert, update or delete a key in the in-memory table with 'HM.alter'.
ncqAlterEntrySTM :: NCQStorage2
                 -> HashRef
                 -> (Maybe NCQEntry -> Maybe NCQEntry)
                 -> STM ()
ncqAlterEntrySTM ncq h alterFn =
  modifyTVar (ncqGetShard ncq h) (HM.alter alterFn h)
|
|
|
|
-- | Store a section and return its key.
--
-- The key is @mhref@ if given, otherwise the hash of the payload. The work is
-- queued on one of 'ncqWriteOps' (chosen round-robin by 'ncqWrites') and this
-- call blocks until a write worker has processed it. The worker waits while
-- the write queue is longer than 'ncqWriteQLen' (unless a stop is requested),
-- then stores the entry in the memtable. The key is queued for the writer
-- loop only when the entry is new or its bytes differ from the stored ones.
ncqPutBS :: MonadUnliftIO m
         => NCQStorage2
         -> Maybe NCQSectionType
         -> Maybe HashRef
         -> ByteString
         -> m HashRef
ncqPutBS ncq@NCQStorage2{..} mtp mhref bs' = do
  reply <- newEmptyTMVarIO

  atomically do
    cursor <- readTVar ncqWrites
    modifyTVar ncqWrites succ
    writeTQueue (ncqWriteOps ! (cursor `mod` V.length ncqWriteOps)) (job reply)

  atomically (takeTMVar reply)

  where
    job reply = do
      let key     = fromMaybe (HashRef (hashObject @HbSync bs')) mhref
          section = ncqMakeSectionBS mtp key bs'
          shard   = ncqGetShard ncq key

      dumped <- newTVarIO Nothing

      -- (changed?, new value) for HM.alterF; identical bytes keep the old entry
      let upsert = \case
            Just e | ncqEntryData e == section -> (False, Just e)
            _ -> (True, Just (NCQEntry section dumped))

      atomically do
        stopping <- readTVar ncqStorageStopReq
        queued   <- Seq.length <$> readTVar ncqWriteQ
        -- back-pressure: wait for the writer loop unless stopping
        when (queued > ncqWriteQLen && not stopping) STM.retry

        changed <- stateTVar shard (HM.alterF upsert key)
        when changed $ modifyTVar ncqWriteQ (|> key)

        putTMVar reply key
|
|
|
|
-- | Split stored section bytes into the key and the value.
--
-- The first 4 bytes are skipped, then 'ncqKeyLen' bytes form the key. If the
-- rest is a meta section (per 'ncqIsMeta'), the result is its type and the
-- data after 'ncqPrefixLen' bytes; otherwise it is the raw rest.
ncqEntryUnwrap :: NCQStorage2
               -> ByteString
               -> (ByteString, Either ByteString (NCQSectionType, ByteString))
ncqEntryUnwrap _ source = (key, classify body)
  where
    (key, body) = BS.splitAt ncqKeyLen (BS.drop 4 source)
    classify v = maybe (Left v) (\meta -> Right (meta, BS.drop ncqPrefixLen v)) (ncqIsMeta v)
{-# INLINE ncqEntryUnwrap #-}

-- | Whether the entry at a location is a tomb ('T') section.
ncqIsTomb :: NCQStorage2 -> Location -> Bool
ncqIsTomb me loc
  | (_, Right (T, _)) <- ncqEntryUnwrap me (ncqGetEntryBS me loc) = True
  | otherwise = False
|
|
|
|
-- | Delete a key by writing a tomb section for it.
--
-- A tomb is always written and merge/compact are expected to sort it out.
-- However, nothing is written if the key is not found at all, or if what is
-- found is already a tomb.
ncqDelEntry :: MonadUnliftIO m
            => NCQStorage2
            -> HashRef
            -> m ()
ncqDelEntry me href = do
  found <- ncqLocate2 me href
  case found of
    Just loc | not (ncqIsTomb me loc) ->
      void $ ncqPutBS me (Just T) (Just href) ""
    _ -> none

-- | Look a key up in the in-memory table only.
ncqLookupEntry :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe NCQEntry)
ncqLookupEntry sto = atomically . ncqLookupEntrySTM sto
|
|
|
|
-- | Raw bytes of the entry at a location.
--
-- In-memory entries are returned as is; fossil entries are the @size@ bytes
-- starting at @off@ in the mmap-ed data file (no copy).
ncqGetEntryBS :: NCQStorage2 -> Location -> ByteString
ncqGetEntryBS _ (InMemory bs) = bs
ncqGetEntryBS _ (InFossil _ mmap off size) = slice
  where
    slice = BS.take (fromIntegral size) (BS.drop (fromIntegral off) mmap)

-- | Size in bytes of the entry at a location.
ncqEntrySize :: forall a . Integral a => Location -> a
ncqEntrySize (InFossil _ _ _ size) = fromIntegral size
ncqEntrySize (InMemory bs) = fromIntegral (BS.length bs)
|
|
|
|
-- | Run an action while holding a use of the current state version.
--
-- On entry, the use counter of the version current at that moment is
-- incremented in 'ncqStateUsage'. On exit (normal or exceptional), the counter
-- of that /same/ version is decremented.
--
-- The version is captured at acquisition. Re-reading 'ncqStateVersion' on
-- release (which is what 'ncqStateUnuseSTM' does) would decrement a
-- different version's counter if 'ncqStateVersion' changed while the action
-- ran, leaving the original version's counter permanently raised.
useVersion :: forall m a . MonadUnliftIO m => NCQStorage2 -> (() -> m a) -> m a
useVersion NCQStorage2{..} m = bracket acquire release (const (m ()))
  where
    acquire = atomically do
      ver <- readTVar ncqStateVersion <&> fromIntegral
      modifyTVar ncqStateUsage (IntMap.update (Just . over _1 succ) ver)
      pure ver

    release ver = atomically $
      modifyTVar ncqStateUsage (IntMap.update (Just . over _1 pred) ver)
|
|
|
|
-- | Snapshot of tracked files: key, current cache slot contents, and the slot.
ncqListTrackedFilesSTM :: NCQStorage2 -> STM (Vector (FileKey, Maybe CachedEntry, TVar (Maybe CachedEntry)))
ncqListTrackedFilesSTM ncq = readTVar (ncqTrackedFiles ncq) >>= traverse describe
  where
    describe tf = do
      cached <- readTVar (tfCached tf)
      pure (tfKey tf, cached, tfCached tf)

-- | 'ncqListTrackedFilesSTM' in its own transaction.
ncqListTrackedFiles :: MonadUnliftIO m => NCQStorage2 -> m (Vector (FileKey, Maybe CachedEntry, TVar (Maybe CachedEntry)))
ncqListTrackedFiles ncq = atomically (ncqListTrackedFilesSTM ncq)
|
|
|
|
|
|
-- | Load indexes of tracked files that are not cached yet.
--
-- Only the first 'ncqMaxCached' tracked files are considered (in
-- 'ncqTrackedFiles' order), and at most 'ncqMaxCached' loads are attempted.
-- Files whose slot is already filled are skipped. Runs under 'useVersion'.
ncqPreloadIndexes :: MonadUnliftIO m
                  => NCQStorage2
                  -> m ()
ncqPreloadIndexes me@NCQStorage2{..} = useVersion me $ \_ -> do
  candidates <- take ncqMaxCached . V.toList <$> readTVarIO ncqTrackedFiles
  let go _ [] = pure ()
      go budget (tf : rest)
        | budget <= 0 = pure ()
        | otherwise = readTVarIO (tfCached tf) >>= \case
            Nothing -> ncqLoadTrackedFile me tf >> go (budget - 1) rest
            Just _  -> go budget rest
  go ncqMaxCached candidates
|
|
|
|
-- | Map the index and data file of a tracked file and store them in its cache
-- slot.
--
-- Returns 'Nothing' (logging an error) when the index file is missing, and
-- 'Nothing' when 'nwayHashMMapReadOnly' cannot open the index. On success,
-- the new 'CachedEntry' gets the current coarse time as its last-use stamp.
-- 'evictIfNeededSTM' is then asked to make room.
--
-- 'ncqCachedEntries' is incremented only when the slot did not already hold a
-- 'CachedEntry'. Several threads (the read workers, 'ncqSeekInFossils',
-- 'ncqPreloadIndexes') may load the same file concurrently. Counting every
-- load would inflate the counter and make eviction drop more entries than
-- necessary.
ncqLoadTrackedFile :: MonadUnliftIO m
                   => NCQStorage2
                   -> TrackedFile
                   -> m (Maybe CachedEntry)
ncqLoadTrackedFile ncq@NCQStorage2{..} TrackedFile{..} = runMaybeT do

  let indexFile = ncqGetFileName ncq (toFileName (IndexFile tfKey))
  let dataFile = ncqGetFileName ncq (toFileName (DataFile tfKey))

  idxHere <- liftIO $ doesFileExist indexFile
  unless idxHere do
    liftIO $ err $ red "missed index" <+> "in loadIndex" <+> pretty tfKey
    mzero

  (idxBs, idxNway) <- MaybeT $
    liftIO (nwayHashMMapReadOnly indexFile)

  datBs <- liftIO $ mmapFileByteString dataFile Nothing

  tnow <- liftIO $ newTVarIO =<< getTimeCoarse
  let ce = CachedEntry idxBs datBs idxNway tnow

  atomically do
    prev <- readTVar tfCached
    writeTVar tfCached (Just ce)
    -- count the slot only if it was not already occupied by a loaded entry
    case prev of
      Just CachedEntry{} -> pure ()
      _ -> modifyTVar ncqCachedEntries (+1)
    evictIfNeededSTM ncq (Just 1)

  pure ce
|
|
|
|
-- | Result of visiting one fossil hit in 'ncqSeekInFossils'.
--
-- The carried value is appended to the accumulated result. 'SeekStop' ends
-- the scan; 'SeekNext' continues with the next tracked file.
data Seek a = SeekStop !a | SeekNext !a

---
|
|
|
|
-- | Walk tracked files in order and call @action@ for each file whose index
-- contains @href@, accumulating results with '<>'.
--
-- Files whose slot is 'PendingEntry' are skipped. Uncached files are loaded
-- on demand; a file that is still not cached after two load attempts is
-- reported and skipped. A hit refreshes the entry's last-use stamp. The scan
-- stops early when @action@ returns 'SeekStop'. Runs under 'useVersion'.
ncqSeekInFossils :: forall a f m . (MonadUnliftIO m, Monoid (f a))
                 => NCQStorage2
                 -> HashRef
                 -> (Location -> m (Seek (f a)))
                 -> m (f a)
ncqSeekInFossils ncq@NCQStorage2{..} href action = useVersion ncq $ \_ -> do
  files <- readTVarIO ncqTrackedFiles
  let total = V.length files

      scan :: Int -> Int -> f a -> m (f a)
      scan idx tries acc
        | idx >= total = pure acc
        | otherwise = do
            let tf = files ! idx
            if tries > 1
              then do
                err $ "unable to load fossil" <+> pretty (tfKey tf)
                scan (idx + 1) 0 acc
              else readTVarIO (tfCached tf) >>= \case
                Just PendingEntry{} ->
                  scan (idx + 1) 0 acc

                Nothing -> do
                  void $ ncqLoadTrackedFile ncq tf
                  scan idx (tries + 1) acc

                Just CachedEntry{..} ->
                  liftIO (ncqLookupIndex href (cachedMmapedIdx, cachedNway)) >>= \case
                    Nothing -> scan (idx + 1) 0 acc
                    Just (offset, size) -> do
                      getTimeCoarse >>= atomically . writeTVar cachedTs
                      action (InFossil (tfKey tf) cachedMmapedData offset size) >>= \case
                        SeekStop found -> pure (acc <> found)
                        SeekNext found -> scan (idx + 1) 0 (acc <> found)

  scan 0 0 mempty
|
|
|
|
|
|
-- | Look a key up in an mmap-ed n-way index.
--
-- An index value is decoded as an 8-byte offset followed by a 4-byte size
-- (read with 'N.word64' / 'N.word32').
ncqLookupIndex :: MonadUnliftIO m
               => HashRef
               -> (ByteString, NWayHash)
               -> m (Maybe ( NCQOffset, NCQSize ))
ncqLookupIndex hx (mmaped, nway) =
  fmap decodeEntry <$> nwayHashLookup nway mmaped (coerce hx)
  where
    {-# INLINE decodeEntry #-}
    decodeEntry entryBs =
      let (offBytes, rest) = BS.splitAt 8 entryBs
      in ( fromIntegral (N.word64 offBytes)
         , fromIntegral (N.word32 (BS.take 4 rest))
         )
{-# INLINE ncqLookupIndex #-}
|
|
|
|
-- | Locate a key directly from the calling thread.
--
-- Both the memtable and the fossils are searched (the first fossil hit is
-- kept). The memtable hit wins when both exist.
ncqLocateActually :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe Location)
ncqLocateActually ncq href = do
  inMem <- fmap (InMemory . ncqEntryData) <$> ncqLookupEntry ncq href
  inFossil <- ncqSeekInFossils ncq href (\loc -> pure (SeekStop [loc]))
  pure (inMem <|> listToMaybe inFossil)

-- | Locate a key through the read workers of 'ncqStorageRun2' and wait for
-- the answer.
ncqLocate2 :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe Location)
ncqLocate2 ncq href = do
  reply <- newEmptyTMVarIO
  atomically $ writeTQueue (ncqReadReq ncq) (href, reply)
  atomically $ takeTMVar reply
|
|
|
|
-- | States of the writer loop in 'ncqStorageRun2'.
--
-- The tuples are used there as (data file key, open fd, bytes written since
-- the last sync, total bytes written to the file). 'RunSync' additionally
-- carries whether the loop should keep running after the sync.
data RunSt =
    RunNew
  | RunWrite (FileKey, Fd, Int, Int)
  | RunSync (FileKey, Fd, Int, Int, Bool)
  | RunFin
|
|
|
|
-- | Main loop of the storage. Returns after 'ncqStorageStop2' once pending
-- writes are flushed.
--
-- Background activities (all linked, cancelled when this returns):
--
-- * closer: closes finished data files, indexes them, records them in the
--   state and drops their persisted entries from the memtable;
-- * job runner for 'ncqSpawnJob';
-- * two read workers answering 'ncqLocate2' requests;
-- * one worker per 'ncqWriteOps' queue, running 'ncqPutBS' jobs;
-- * write-rate estimator ('ncqWriteEMA');
-- * periodic sweep and merge loops.
--
-- The foreground is the writer state machine over 'RunSt'.
ncqStorageRun2 :: forall m . MonadUnliftIO m
               => NCQStorage2
               -> m ()
ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do

  closeQ <- newTQueueIO

  -- Closer: exits once a stop is requested and closeQ is empty.
  closer <- spawnActivity $ liftIO $ fix \loop -> do
    what <- atomically do
      stop <- readTVar ncqStorageStopReq
      tryReadTQueue closeQ >>= \case
        Just e -> pure $ Just e
        Nothing | not stop  -> STM.retry
                | otherwise -> pure Nothing

    maybe1 what none $ \(fk, fh) -> do
      debug $ red "CLOSE FILE" <+> pretty fk
      closeFd fh
      debug $ yellow "indexing" <+> pretty fk
      idx <- ncqRunTaskNoMatterWhat ncq (ncqIndexFile ncq (DataFile fk))
      void $ ncqRunTaskNoMatterWhat ncq $ ncqStateUpdate ncq [F 0 fk]
      debug $ "REMOVE ALL SHIT" <+> pretty idx
      nwayHashMMapReadOnly idx >>= \case
        Nothing -> err $ "can't open index" <+> pretty idx
        Just (bs,nway) -> do
          nwayHashScanAll nway bs $ \_ k _ -> do
            unless (k == emptyKey) $ atomically $ void $ runMaybeT do
              NCQEntry _ tfk <- MaybeT $ ncqLookupEntrySTM ncq (coerce k)
              fk' <- MaybeT $ readTVar tfk
              guard (fk == fk') -- remove only own stuff
              lift $ ncqAlterEntrySTM ncq (coerce k) (const Nothing)

      -- the file is closed and indexed either way: make its index available
      -- and stop treating it as a file being written
      ncqPreloadIndexes ncq
      atomically (modifyTVar ncqCurrentFiles (HS.delete fk))
      loop

  -- job runner for ncqSpawnJob
  spawnActivity $ forever (liftIO $ join $ atomically (readTQueue ncqJobQ))

  -- Read workers. Every request is answered exactly once ('Nothing' on a
  -- miss, so 'ncqLocate2' never blocks forever). The loop is a plain tail
  -- loop, so the stack does not grow and useVersion's release always runs.
  replicateM_ 2 $ spawnActivity $ forever do
    (h, answ) <- atomically $ readTQueue ncqReadReq
    lookupLocation h >>= atomically . putTMVar answ

  -- write workers, one per ncqWriteOps queue
  let shLast = V.length ncqWriteOps - 1
  spawnActivity $ pooledForConcurrentlyN_ (V.length ncqWriteOps) [0..shLast] $ \i -> do
    let q = ncqWriteOps ! i
    forever (liftIO $ join $ atomically (readTQueue q))

  spawnActivity measureWPS

  -- FIXME: bigger-period
  spawnActivity $ postponed ncqPostponeSweep $ forever $ (>> pause @'Seconds 120) $ do
    ema <- readTVarIO ncqWriteEMA
    n <- ncqListStateFiles ncq <&> List.length
    when (ema < ncqIdleThrsh * 1.5 && n > 0) $ withSem ncqMergeSem do
      debug $ yellow "run sweep routine"
      ncqSweepStates ncq
      ncqSweepFossils ncq

  spawnActivity $ postponed ncqPostponeMerge $ fix \again -> (>> again) do
    ema <- readTVarIO ncqWriteEMA
    mergeReq <- atomically $ stateTVar ncqMergeReq (,False)

    debug $ green "MERGE ATTEMPT" <+> pretty ema <+> "~" <+> pretty ncqIdleThrsh

    let notPending x = List.length [ k | (k,e,_) <- V.toList x, isNotPending e ]

    if ema > ncqIdleThrsh && not mergeReq
      then pause @'Seconds 10
      else do
        mq <- newEmptyTMVarIO

        spawnJob $ do
          merged <- ncqMergeStep ncq
          atomically $ putTMVar mq merged

        -- TODO: detect-dead-merge
        void $ race (pause @'Seconds 300) (atomically $ readTMVar mq) >>= \case
          Left{} -> warn $ yellow "MERGE FUCKING STALLED"
          Right True -> none
          Right False -> do
            debug "merge: all done, wait..."
            n0 <- ncqListTrackedFiles ncq <&> notPending

            -- FIXME: bigger-timeout
            void $ race (pause @'Seconds 60) do
              atomically do
                n <- ncqListTrackedFilesSTM ncq <&> notPending
                when (n == n0) STM.retry

  -- on exit, close files still waiting in closeQ
  ContT $ bracket none $ const $ liftIO do
    fhh <- atomically (STM.flushTQueue closeQ)
    for_ fhh ( closeFd . snd )

  -- writer state machine
  flip fix RunNew $ \loop -> \case

    RunFin -> do
      debug "wait finalizing"
      atomically $ pollSTM closer >>= maybe STM.retry (const none)
      debug "exit storage"

    RunNew -> do
      stop <- readTVarIO ncqStorageStopReq
      mt <- readTVarIO ncqWriteQ <&> Seq.null

      if stop && mt
        then loop RunFin
        else do
          (fk,fhx) <- openNewDataFile
          void $ liftIO (ncqStateUpdate ncq [P fk])
          debug $ "openNewDataFile" <+> pretty fk
          loop $ RunWrite (fk,fhx,0,0)

    RunSync (fk, fh, w, total, continue) -> do

      stop <- readTVarIO ncqStorageStopReq
      sync <- readTVarIO ncqStorageSyncReq

      let needClose = total >= ncqMinLog || stop

      rest <- if not (sync || needClose || w > ncqFsync)
                then pure w
                else do
                  appendTailSection fh >> liftIO (fileSynchronise fh)
                  -- FIXME: slow!
                  -- to make it appear in state, but to ignore until index is done
                  atomically do
                    writeTVar ncqStorageSyncReq False
                    modifyTVar ncqSyncNo succ
                  pure 0

      if | needClose && continue -> do
             atomically $ writeTQueue closeQ (fk, fh)
             loop RunNew

         | not continue -> do
             -- final sync done; this file is not handed to the closer, so its
             -- descriptor is released here
             liftIO (closeFd fh)
             loop RunFin

         | otherwise -> loop $ RunWrite (fk, fh, rest, total)

    RunWrite (fk, fh, w, total') -> do

      let timeoutMicro = 10_000_000

      chunk <- liftIO $ timeout timeoutMicro $ atomically do
        stop <- readTVar ncqStorageStopReq
        sy <- readTVar ncqStorageSyncReq
        chunk <- stateTVar ncqWriteQ (Seq.splitAt ncqWriteBlock)

        if | Seq.null chunk && stop -> pure $ Left ()
           | Seq.null chunk && not (stop || sy) -> STM.retry
           | otherwise -> pure $ Right chunk

      case chunk of
        Nothing -> do
          liftIO $ join $ readTVarIO ncqOnRunWriteIdle
          if w == 0
            then loop $ RunWrite (fk,fh,w,total')
            else do
              atomically $ writeTVar ncqStorageSyncReq True
              loop $ RunSync (fk, fh, w, total', True) -- exit ()

        Just (Left{}) -> loop $ RunSync (fk, fh, w, total', False) -- exit ()

        Just (Right chu) -> do
          ws <- for chu $ \h -> do
            atomically (ncqLookupEntrySTM ncq h) >>= \case
              Just (NCQEntry bs dumped) -> do
                atomically (writeTVar dumped (Just fk))
                lift (appendSection fh bs)

              _ -> pure 0

          let written = sum ws
          loop $ RunSync (fk, fh, w + written, total' + written, True)

  where

    emptyKey = BS.replicate ncqKeyLen 0

    -- Resolve a key for the read workers: memtable first, then tracked
    -- fossils in order (loading uncached indexes on demand); first hit wins.
    lookupLocation :: HashRef -> m (Maybe Location)
    lookupLocation h = ncqLookupEntry ncq h >>= \case
      Just e -> pure (Just (InMemory (ncqEntryData e)))
      Nothing -> useVersion ncq $ const do
        tracked <- readTVarIO ncqTrackedFiles
        let probe i
              | i >= V.length tracked = pure Nothing
              | otherwise = do
                  let tf = tracked ! i
                  mce <- readTVarIO (tfCached tf) >>= \case
                    Just ce -> pure (Just ce)
                    Nothing -> ncqLoadTrackedFile ncq tf >>= \case
                      Nothing -> do
                        err $ "unable to load index" <+> pretty (tfKey tf)
                        pure Nothing
                      Just ce -> pure (Just ce)
                  hit <- maybe (pure Nothing) (lookupCached h (tfKey tf)) mce
                  maybe (probe (i + 1)) (pure . Just) hit
        probe 0

    -- Look a key up in one cached index; pending entries are skipped.
    lookupCached h fk = \case
      PendingEntry{} -> pure Nothing
      CachedEntry{..} ->
        ncqLookupIndex h (cachedMmapedIdx, cachedNway)
          <&> fmap (\(!offset, !size) -> InFossil fk cachedMmapedData offset size)

    openNewDataFile :: forall mx . MonadIO mx => mx (FileKey, Fd)
    openNewDataFile = do
      fname <- ncqGetNewFossilName ncq
      atomically $ modifyTVar ncqCurrentFiles (HS.insert (fromString fname))
      touch fname
      let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 }
      (fromString fname,) <$> liftIO (PosixBase.openFd fname Posix.ReadWrite flags)

    spawnJob = ncqSpawnJob ncq

    postponed n m = liftIO (pause @'Seconds n) >> m

    spawnActivity m = do
      a <- ContT $ withAsync m
      link a
      pure a

    -- Exponential moving average of ncqWrites increments per second.
    measureWPS = void $ flip fix Nothing \loop -> \case
        Nothing -> do
          w <- readTVarIO ncqWrites
          t <- getTimeCoarse
          pause @'Seconds step >> loop (Just (w,t))

        Just (w0,t0) -> do
          w1 <- readTVarIO ncqWrites
          t1 <- getTimeCoarse
          -- realToFrac of the TimeSpec difference is divided by 1e9, i.e. it
          -- is treated as nanoseconds; the clamp applies to the interval in
          -- seconds, keeping dw/dt finite
          let dt = max 1e-9 (realToFrac @_ @Double (t1 - t0) / 1e9)
              dw = fromIntegral (w1 - w0)
          atomically $ modifyTVar' ncqWriteEMA \ema -> alpha * (dw / dt) + (1 - alpha) * ema
          pause @'Seconds step >> loop (Just (w1,t1))

      where
        alpha = 0.1
        step = 1.00
|
|
-- | Queue an IO job for the job runner started by 'ncqStorageRun2'.
ncqSpawnJob :: forall m . MonadIO m => NCQStorage2 -> IO () -> m ()
ncqSpawnJob ncq = atomically . writeTQueue (ncqJobQ ncq)
|
|
|
|
-- | Cheap integrity check of a data file.
--
-- The last 8 bytes, read with 'N.word64', must equal the file length.
-- Otherwise 'NCQFsckIssueExt' ('FsckInvalidFileSize') is thrown. A file
-- shorter than 8 bytes is rejected up front: reading the trailer from it
-- would make 'N.word64' read past the end of the mapping.
ncqFileFastCheck :: MonadUnliftIO m => FilePath -> m ()
ncqFileFastCheck fp = do

  -- debug $ "ncqFileFastCheck" <+> pretty fp

  mmaped <- liftIO $ mmapFileByteString fp Nothing
  let size = BS.length mmaped

  when (size < 8) do
    throwIO $ NCQFsckIssueExt (FsckInvalidFileSize (fromIntegral size))

  let s = BS.drop (size - 8) mmaped & N.word64

  unless ( size == fromIntegral s ) do
    throwIO $ NCQFsckIssueExt (FsckInvalidFileSize (fromIntegral s))
|
|
|
|
|
|
-- | Call @action offset len key payload@ for every complete record of a data
-- file (@fp'@ is resolved in the work directory).
--
-- A record is an 'ncqSLen'-byte size prefix followed by @len@ bytes: an
-- 'ncqKeyLen'-byte key, then the payload (truncated to
-- @ncqFullDataLen (NCQFullRecordLen len)@). The walk stops at the first
-- record that does not fit in the remaining bytes.
ncqStorageScanDataFile :: MonadIO m
                       => NCQStorage2
                       -> FilePath
                       -> ( Integer -> Integer -> HashRef -> ByteString -> m () )
                       -> m ()
ncqStorageScanDataFile ncq fp' action = do
  mmaped <- liftIO (mmapFileByteString (ncqGetFileName ncq fp') Nothing)
  walk 0 mmaped
  where
    walk offset bytes
      | BS.length bytes < ncqSLen       = pure ()
      | BS.length bytes < ncqSLen + len = pure ()
      | otherwise = do
          action offset (fromIntegral len) key payload
          walk (ncqSLen + offset + fromIntegral len) (BS.drop (len + ncqSLen) bytes)
      where
        -- evaluated only after the first guard confirms the prefix is present
        len     = fromIntegral (N.word32 (BS.take ncqSLen bytes))
        record  = BS.drop ncqSLen bytes
        key     = coerce @_ @HashRef (BS.take ncqKeyLen record)
        payload = BS.take (ncqFullDataLen (NCQFullRecordLen len)) (BS.drop ncqKeyLen record)
|
|
|
|
|
|
-- | Build the n-way index for a data file and return the index path.
--
-- Every non-'M' record is indexed under its key. The value is the 8-byte
-- record offset followed by the 4-byte full record size (payload length plus
-- the 'ncqSLen' prefix), so the block size can be answered without a fossil
-- search. The index is written to a temporary file next to the data file,
-- moved into place, and the file is recorded in the state with @F 0@.
ncqIndexFile :: MonadUnliftIO m => NCQStorage2 -> DataFile FileKey -> m FilePath
ncqIndexFile n@NCQStorage2{} fk = do
  let dataPath  = toFileName fk & ncqGetFileName n
      indexPath = toFileName (IndexFile (coerce @_ @FileKey fk)) & ncqGetFileName n

  debug $ "INDEX" <+> pretty dataPath <+> pretty indexPath

  entries <- S.toList_ $ ncqStorageScanDataFile n dataPath \offset len key payload ->
    case ncqIsMeta payload of
      Just M -> none
      _      -> S.yield (coerce key, encodeRecord offset len)

  let (dir, name) = splitFileName dataPath
      tmpName     = (dropExtension name <> "-") `addExtension` ".cq$"

  built <- nwayWriteBatch (nwayAllocDef 1.10 32 8 12) dir tmpName entries
  mv built indexPath

  void $ ncqStateUpdate n [F 0 (coerce fk)]

  pure indexPath

  where
    encodeRecord offset len =
      let offBytes  = N.bytestring64 (fromIntegral @_ @Word64 offset)
          sizeBytes = N.bytestring32 (fromIntegral @_ @Word32 (len + ncqSLen))
      in offBytes <> sizeBytes
|
|
|
|
|
|
-- | Start tracking data files that have an index on disk.
--
-- Files without an index are reported and skipped. Each accepted file is
-- stamped with its modification time.
ncqAddTrackedFiles :: MonadIO m => NCQStorage2 -> [DataFile FileKey] -> m ()
ncqAddTrackedFiles ncq files = do
  valid <- catMaybes <$> mapM probe files
  atomically $ ncqAddTrackedFilesSTM ncq valid
  where
    probe fkey = do
      let fname   = ncqGetFileName ncq (toFileName fkey)
          idxName = ncqGetFileName ncq (toFileName (IndexFile (coerce @_ @FileKey fkey)))
      idxHere <- doesFileExist idxName
      if not idxHere
        then do
          err $ "Index does not exist" <+> pretty (takeFileName idxName)
          pure Nothing
        else do
          stat <- liftIO $ PFS.getFileStatus fname
          let ts = posixToTimeSpec (PFS.modificationTimeHiRes stat)
          pure (Just (ts, fromString (takeFileName fname)))

-- | Add files not tracked yet and keep the vector sorted by 'tfTime'.
ncqAddTrackedFilesSTM :: NCQStorage2 -> [(TimeSpec, FileKey)] -> STM ()
ncqAddTrackedFilesSTM NCQStorage2{..} newFiles = do
  current <- V.toList <$> readTVar ncqTrackedFiles
  let known = HS.fromList (fmap tfKey current)
      fresh = [ (t, f) | (t, f) <- newFiles, not (f `HS.member` known) ]
  added <- for fresh \(t, f) -> TrackedFile (FilePrio (Down t)) f <$> newTVar Nothing
  writeTVar ncqTrackedFiles (V.fromList (List.sortOn tfTime (current <> added)))
{-# INLINE ncqAddTrackedFilesSTM #-}
|
|
|
|
-- | Drop the least recently used loaded indexes when over budget.
--
-- @excess = max 0 (cur + need - ncqMaxCached)@, where @cur@ is
-- 'ncqCachedEntries' and @need@ is @howMany@ (or @cur@ when 'Nothing').
-- The @excess@ loaded entries with the oldest 'cachedTs' are cleared and the
-- counter is decremented for each.
evictIfNeededSTM :: NCQStorage2 -> Maybe Int -> STM ()
evictIfNeededSTM me@NCQStorage2{..} howMany = do
  cur <- readTVar ncqCachedEntries
  let excess = max 0 (cur + fromMaybe cur howMany - ncqMaxCached)

  when (excess > 0) do
    files <- ncqListTrackedFilesSTM me
    stamped <- fmap catMaybes $ for (V.toList files) \case
      (k, Just ce@CachedEntry{}, slot) -> readTVar (cachedTs ce) <&> \ts -> Just (ts, k, slot)
      _ -> pure Nothing

    for_ (List.take excess (List.sortOn (\(ts, _, _) -> ts) stamped)) \(_, _, slot) -> do
      writeTVar slot Nothing
      modifyTVar ncqCachedEntries (subtract 1)
|
|
|
|
{- HLINT ignore "Functor law" -}
|
|
|
|
-- | Names (without directory) of the @fossil-*.data@ files in the work
-- directory, de-duplicated.
ncqListDirFossils :: MonadIO m => NCQStorage2 -> m [FilePath]
ncqListDirFossils ncq = do
  names <- fmap takeFileName <$> dirFiles (ncqGetWorkDir ncq)
  pure . HS.toList . HS.fromList $ List.filter isFossil names
  where
    isFossil f = "fossil-" `List.isPrefixOf` f && ".data" `List.isSuffixOf` f

-- | State files in the work directory with their modification time, newest
-- first. Files whose status cannot be read are left out.
ncqListStateFiles :: forall m . MonadIO m => NCQStorage2 -> m [(TimeSpec, StateFile)]
ncqListStateFiles ncq = do
  names <- List.filter (List.isPrefixOf "state-") . fmap takeBaseName
             <$> dirFiles (ncqGetWorkDir ncq)
  stamped <- mapM timespecOf names
  pure $ List.sortOn Down [ (ts, fromString n) | Right (ts, n) <- stamped ]

  where
    timespecOf x = liftIO @m $ try @_ @IOException do
      (,x) . posixToTimeSpec . modificationTimeHiRes <$> getFileStatus (ncqGetFileName ncq x)
|
|
|
|
-- | Recover the set of tracked data files at startup.
--
-- State files are tried newest first (see 'ncqListStateFiles'). The first one
-- whose data files all pass the checks in @tryLoadState@ is used and recorded
-- in 'ncqStateName'. A state file that fails is deleted. If no state file
-- works, every @fossil-*.data@ file in the work directory is used instead.
-- The chosen files are then tracked, and 'ncqStateUpdate' is called with no
-- operations.
ncqRepair :: MonadIO m => NCQStorage2 -> m ()
ncqRepair me@NCQStorage2{..} = do
  states <- ncqListStateFiles me <&> fmap snd

  fossils <- flip fix states $ \next -> \case
    [] -> do
      debug $ yellow "no valid state found; start from scratch"
      ncqListDirFossils me <&> fmap (DataFile . fromString)

    (s:ss) -> tryLoadState s >>= \case
      Just files -> do
        debug $ yellow "used state" <+> pretty s
        atomically $ writeTVar ncqStateName (Just s)
        pure files
      Nothing -> do
        warn $ red "inconsistent state" <+> pretty s
        rm (ncqGetFileName me $ toFileName s)
        next ss

  ncqAddTrackedFiles me fossils

  void $ liftIO (ncqStateUpdate me mempty)

  where

    readState path = ncqReadStateKeys me path <&> fmap DataFile

    -- Returns the state's data files if every one of them checks out.
    -- Per file:
    --   * data file missing        -> its index is removed; check fails;
    --   * fast check throws        -> data file renamed to *.broken, index
    --                                 removed; check fails;
    --   * index missing            -> file is indexed and re-checked; after two
    --                                 indexing attempts the check fails.
    -- All files are checked (no short-circuit), so every broken file is
    -- handled in one pass.
    tryLoadState (fk :: StateFile) = liftIO do

      debug $ "tryLoadState" <+> pretty fk

      state <- readState fk

      let checkFile fo = flip fix 0 $ \next i -> do
            let dataFile = ncqGetFileName me (toFileName fo)
            let indexFile = ncqGetFileName me (toFileName (IndexFile (coerce @_ @FileKey fo)))

            here <- doesFileExist dataFile

            if not here then do
              rm indexFile
              pure False

            else do

              try @_ @SomeException (ncqFileFastCheck dataFile) >>= \case

                Left e -> do
                  err (viaShow e)
                  here <- doesFileExist dataFile
                  when here do
                    let broken = dropExtension dataFile `addExtension` ".broken"
                    mv dataFile broken
                    rm indexFile
                    warn $ red "renamed" <+> pretty dataFile <+> pretty broken

                  pure False

                -- i counts indexing attempts already made for this file
                Right{} | i > 1 -> pure False

                Right{} -> do
                  exists <- doesFileExist indexFile
                  if exists then do
                    pure True
                  else do
                    debug $ "indexing" <+> pretty (toFileName fo)
                    r <- ncqIndexFile me fo
                    debug $ "indexed" <+> pretty r
                    next (succ i)

      results <- forM state checkFile
      pure $ if and results then Just state else Nothing
|
|
|
|
|
|
-- | Storage key of a reference: the hash of the reference bytes followed by
-- the storage salt.
ncqRefHash :: NCQStorage2 -> HashRef -> HashRef
ncqRefHash NCQStorage2{..} ref = HashRef (hashObject salted)
  where
    salted = coerce @_ @ByteString ref <> coerce ncqSalt
|
|
|
|
-- | Run @task@ while it is counted in 'ncqStorageTasks', whether or not a
-- stop has been requested.
--
-- The counter is incremented before the task and decremented afterwards,
-- also when the task throws. Using 'bracket' masks asynchronous exceptions
-- around the increment. This closes the window between the increment and
-- the installation of the cleanup handler, which would otherwise leave the
-- counter permanently raised and make 'ncqWaitTasks' block forever.
ncqRunTaskNoMatterWhat :: MonadUnliftIO m => NCQStorage2 -> m a -> m a
ncqRunTaskNoMatterWhat NCQStorage2{..} task =
  bracket enter (const leave) (const task)
  where
    enter = atomically (modifyTVar' ncqStorageTasks succ)
    leave = atomically (modifyTVar' ncqStorageTasks pred)
|
|
|
|
-- | Run @task@ as a counted storage task, or return @def@ without running
-- it when a stop has been requested ('ncqStorageStopReq').
--
-- The stop flag is checked and the task counter incremented in ONE STM
-- transaction. This way a task cannot start after 'ncqWaitTasks' has
-- already observed zero running tasks following a stop request. The
-- counter is decremented when an admitted task finishes or throws.
ncqRunTask :: MonadUnliftIO m => NCQStorage2 -> a -> m a -> m a
ncqRunTask NCQStorage2{..} def task = bracket admit release run
  where
    -- True when the task was admitted (and therefore counted)
    admit = atomically do
      stopping <- readTVar ncqStorageStopReq
      unless stopping (modifyTVar' ncqStorageTasks succ)
      pure (not stopping)

    release admitted =
      when admitted $ atomically (modifyTVar' ncqStorageTasks pred)

    run admitted = if admitted then task else pure def
|
|
|
|
-- | Block (retrying the STM transaction) until the counter
-- 'ncqStorageTasks' is no longer positive.
ncqWaitTasks :: MonadUnliftIO m => NCQStorage2 -> m ()
ncqWaitTasks NCQStorage2{..} =
  atomically $ readTVar ncqStorageTasks >>= \running -> STM.check (running <= 0)
|
|
|
|
-- | Increment the user count stored for the current state version
-- ('ncqStateVersion') in 'ncqStateUsage'. A version without an entry is
-- left untouched.
ncqStateUseSTM :: NCQStorage2 -> STM ()
ncqStateUseSTM NCQStorage2{..} = do
  version <- fromIntegral <$> readTVar ncqStateVersion
  modifyTVar ncqStateUsage (IntMap.update acquire version)
  where
    acquire ~(users, keys) = Just (succ users, keys)
{-# INLINE ncqStateUseSTM #-}
|
|
|
|
-- | Decrement the user count stored for the current state version
-- ('ncqStateVersion') in 'ncqStateUsage'. A version without an entry is
-- left untouched.
--
-- NOTE(review): the version is read at release time. If 'ncqStateVersion'
-- changed since the matching 'ncqStateUseSTM', a different entry gets
-- decremented; confirm that callers rule this out.
ncqStateUnuseSTM :: NCQStorage2 -> STM ()
ncqStateUnuseSTM NCQStorage2{..} = do
  version <- fromIntegral <$> readTVar ncqStateVersion
  -- TODO: remove when n <= 0
  modifyTVar ncqStateUsage (IntMap.update release version)
  where
    release ~(users, keys) = Just (pred users, keys)
{-# INLINE ncqStateUnuseSTM #-}
|
|
|
|
-- | Apply a batch of state operations to the tracked files under
-- 'ncqStateSem'. If the set of tracked keys changed, dump a new state file
-- and make it current ('ncqStateName').
--
-- Operations are applied left to right, each one to the result of the
-- previous ones:
--
--   * @D fk@   — stop tracking @fk@;
--   * @P fk@   — track @fk@ with a pending cache mark, unless already tracked;
--   * @F _ fk@ — track @fk@ (its timestamp is re-read from the data file's
--                modification time), or, if it is already tracked, clear a
--                pending cache mark.
--
-- Before anything is changed, every @F@ operation's data file must exist.
-- If one does not, an error is logged and 'False' is returned without
-- touching the state. Otherwise the result tells whether the tracked key
-- set changed.
ncqStateUpdate :: MonadUnliftIO m => NCQStorage2 -> [StateOP] -> m Bool
ncqStateUpdate me@NCQStorage2{..} ops' = withSem ncqStateSem $ flip runContT pure $ callCC \exit -> do

  debug $ "ncqStateUpdate" <+> viaShow ops'

  t1 <- FilePrio . Down <$> liftIO getTimeCoarse

  ops <- checkWithDisk $ \name -> do
    err $ "ncqStateUpdate invariant fail" <+> pretty name
    exit False

  changed <- atomically do
    current' <- readTVar ncqTrackedFiles <&> V.toList

    memStateVersionSTM (HS.fromList (fmap tfKey current'))

    let current = HM.fromList [ (tfKey, e) | e@TrackedFile{..} <- current' ]

    wtf <- flip fix (current, ops) $ \next (s, o) -> case o of
      [] -> pure s

      (D fk : rest) -> next (HM.delete fk s, rest)

      -- check the accumulated map, so earlier ops of this batch count
      (P fk : rest)
        | HM.member fk s -> next (s, rest)
        | otherwise -> do
            e <- TrackedFile t1 fk <$> newTVar (Just PendingEntry)
            next (HM.insert fk e s, rest)

      (F t fk : rest) -> case HM.lookup fk s of
        Nothing -> do
          new <- TrackedFile (FilePrio (Down t)) fk <$> newTVar Nothing
          next (HM.insert fk new s, rest)

        Just TrackedFile{..} -> do
          -- the file is finished: drop a pending mark, keep anything else
          pe <- readTVar tfCached
          unless (isNotPending pe) (writeTVar tfCached Nothing)
          next (s, rest)

    writeTVar ncqTrackedFiles (V.fromList $ List.sortOn tfTime (HM.elems wtf))

    pure (HM.keysSet current /= HM.keysSet wtf)

  when changed $ liftIO do
    name <- ncqDumpCurrentState me
    atomically $ writeTVar ncqStateName (Just name)
    debug $ green "switched state" <+> pretty name

  pure changed

  where

    -- Register the key set of the current state version in
    -- 'ncqStateUsage' with a zero user count, unless already registered.
    memStateVersionSTM currentKeys = do
      k0 <- readTVar ncqStateVersion <&> fromIntegral
      modifyTVar ncqStateUsage (IntMap.alter (maybe (Just (0, currentKeys)) Just) k0)

    -- Require every F-operation's data file to exist (calling @onFail@ with
    -- its path otherwise) and take its timestamp from the file's
    -- modification time.
    checkWithDisk onFail = for ops' $ \case
      F _ fk -> do
        let datFile = ncqGetFileName me (toFileName $ DataFile fk)
        e2 <- doesFileExist datFile
        unless e2 (onFail datFile)
        ts <- liftIO (getFileStatus datFile) <&>
                posixToTimeSpec . PFS.modificationTimeHiRes
        pure (F ts fk)

      d -> pure d
|
|
|
|
|
|
-- | Write the keys of all tracked files, one per line, to a new state file
-- (via 'writeBinaryFileDurableAtomic') and return that file's name.
ncqDumpCurrentState :: MonadUnliftIO m => NCQStorage2 -> m StateFile
ncqDumpCurrentState me@NCQStorage2{} = do
  tracked <- V.toList <$> ncqListTrackedFiles me
  path <- ncqGetNewStateName me
  let body = BS8.unlines (fmap (\(key, _, _) -> coerce key) tracked)
  writeBinaryFileDurableAtomic path body
  pure (fromString path)
|
|
|
|
-- | Repeat 'ncqMergeStep' until it reports that nothing was merged.
ncqMergeFull :: forall m . MonadUnliftIO m => NCQStorage2 -> m ()
ncqMergeFull me = loop
  where
    loop = do
      merged <- ncqMergeStep me
      when merged loop
|
|
|
|
-- FIXME: sometime-causes-no-such-file-or-directory

-- | Perform one merge step under 'ncqMergeSem'.
--
-- A pair of ADJACENT tracked files qualifies when both index files exist,
-- both data sizes are positive (pending and missing files get negative
-- sentinel sizes) and the combined size is below 'ncqMaxLog'. Among the
-- qualifying pairs, the one with the smallest combined size is chosen (the
-- first one on ties). It is merged into a new fossil:
--
--   * all non-meta records of A;
--   * then the non-meta records of B whose key is not in A's index;
--   * then a tail section.
--
-- The fossil gets A's modification time and is indexed. The state is
-- updated to drop A and B and track the fossil.
--
-- Returns 'True' if a pair was merged. Returns 'False' when none
-- qualifies, or when a stop was requested (see 'ncqRunTask').
ncqMergeStep :: MonadUnliftIO m => NCQStorage2 -> m Bool
ncqMergeStep ncq@NCQStorage2{..} = do
  withSem ncqMergeSem $ ncqRunTask ncq False do

    debug "ncqMergeStep"

    tracked <- ncqListTrackedFiles ncq

    files <- for tracked $ \(f,e,_) -> do

      let fn = ncqGetFileName ncq (toFileName $ DataFile f)
      let idx = ncqGetFileName ncq (toFileName $ IndexFile f)

      dataHere <- doesFileExist fn

      -- negative sizes exclude the file from merging
      sz <- case e of
        Just PendingEntry -> pure (-100)
        _ | dataHere -> liftIO (fileSize fn)
          | otherwise -> pure (-3)

      idxHere <- doesFileExist idx

      pure (f, sz, idxHere)

    case pickPair (V.toList files) of
      Just (a,b) -> mergeStep a b >> pure True
      Nothing -> do
        debug "merge: not found shit"
        pure False

  where

    pairSize (_, sz1, _) (_, sz2, _) = sz1 + sz2

    bothOk a@(_, sz1, here1) b@(_, sz2, here2) =
      here1 && here2
        && sz1 > 0 && sz2 > 0
        && pairSize a b < fromIntegral ncqMaxLog

    -- qualifying adjacent pair with the smallest combined size; minimumBy
    -- keeps the first of equal candidates
    pickPair xs =
      case [ (a, b) | (a, b) <- zip xs (drop 1 xs), bothOk a b ] of
        [] -> Nothing
        candidates -> Just (List.minimumBy (comparing (uncurry pairSize)) candidates)

    ncqGetNewMergeName :: MonadIO m => NCQStorage2 -> m FilePath
    ncqGetNewMergeName n@NCQStorage2{} = do
      let (p,tpl) = splitFileName (ncqGetFileName n "merge-.data")
      liftIO $ emptyTempFile p tpl

    mergeStep (a,_,_) (b,_,_) = do
      debug $ "merge" <+> pretty a <+> pretty b

      let fDataNameA = ncqGetFileName ncq $ toFileName (DataFile a)
      let fIndexNameA = ncqGetFileName ncq $ toFileName (IndexFile a)
      let fDataNameB = ncqGetFileName ncq $ toFileName (DataFile b)

      doesFileExist fDataNameA `orFail` ("not exist" <+> pretty fDataNameA)
      doesFileExist fDataNameB `orFail` ("not exist" <+> pretty fDataNameB)
      doesFileExist fIndexNameA `orFail` ("not exist" <+> pretty fIndexNameA)

      flip runContT pure do

        mfile <- ncqGetNewMergeName ncq

        -- remove the temporary merge file on exit (after a successful
        -- merge it has already been moved to the fossil name)
        ContT $ bracket none $ const do
          rm mfile

        liftIO $ withBinaryFileAtomic mfile WriteMode $ \fwh -> do

          debug $ "merge: okay, good to go" <+> pretty (takeFileName mfile)

          (mmIdx, nway) <- nwayHashMMapReadOnly fIndexNameA
            >>= orThrow (NCQMergeInvariantFailed (show $ "can't mmap" <+> pretty fIndexNameA))

          debug $ "SCAN FILE A" <+> pretty fDataNameA

          writeFiltered ncq fDataNameA fwh $ \_ _ _ v -> do
            let meta = Just M == ncqIsMeta v
            pure $ not meta

          debug $ "SCAN FILE B" <+> pretty fDataNameB

          writeFiltered ncq fDataNameB fwh $ \_ _ k v -> do
            let meta = Just M == ncqIsMeta v
            foundInA <- liftIO (nwayHashLookup nway mmIdx (coerce k)) <&> isJust
            pure $ not (foundInA || meta)

          -- NOTE(review): per the unix docs, handleToFd closes the Handle
          -- but not the descriptor; confirm the Fd gets closed somewhere.
          appendTailSection =<< handleToFd fwh

        liftIO do
          result <- fileSize mfile

          idx <- if result == 0
            then pure Nothing
            else do
              fossil <- ncqGetNewFossilName ncq
              mv mfile fossil
              -- the fossil inherits A's modification time
              statA <- getFileStatus fDataNameA
              let ts = modificationTimeHiRes statA
              setFileTimesHiRes fossil ts ts
              let fk = DataFile (fromString fossil)
              void $ ncqIndexFile ncq fk
              pure $ Just (ts,fk)

          for_ idx $ \(ts,DataFile fk) -> do
            void $ ncqStateUpdate ncq [D a, D b, F (posixToTimeSpec ts) fk]

    orFail what e = do
      r <- what
      unless r (throwIO (NCQMergeInvariantFailed (show e)))
|
|
|
|
|
|
-- ncqCompact :: MonadUnliftIO m => NCQStorage2 -> m ()
|
|
-- ncqCompact ncq@NCQStorage2{..} = withSem ncqMergeSem do
|
|
|
|
-- q <- newTVarIO ( mempty :: HashMap FileKey (HashSet HashRef) )
|
|
|
|
-- ncqLinearScanForCompact ncq $ \fk h -> atomically do
|
|
-- modifyTVar q (HM.insertWith (<>) fk (HS.singleton h))
|
|
|
|
-- state0 <- readTVarIO q
|
|
|
|
-- for_ (HM.toList state0) $ \(fk, es) -> do
|
|
-- trace $ "TO DELETE" <+> pretty fk <+> pretty (HS.size es)
|
|
|
|
-- let fDataNameA = ncqGetFileName ncq (toFileName $ DataFile fk)
|
|
|
|
-- flip runContT pure do
|
|
|
|
-- mfile <- ncqGetNewCompactName ncq
|
|
|
|
-- ContT $ bracket none $ const $ rm mfile
|
|
|
|
-- liftIO do
|
|
-- withBinaryFileAtomic mfile WriteMode $ \fwh -> do
|
|
-- writeFiltered ncq fDataNameA fwh $ \_ _ k v -> do
|
|
-- pure $ not $ HS.member k es
|
|
-- appendTailSection =<< handleToFd fwh
|
|
|
|
-- result <- fileSize mfile
|
|
|
|
-- if result == 0 then do
|
|
-- atomically $ modifyTVar ncqTrackedFiles (HPSQ.delete fk)
|
|
-- else do
|
|
|
|
-- fossil <- ncqGetNewFossilName ncq
|
|
-- mv mfile fossil
|
|
|
|
-- statA <- getFileStatus fDataNameA
|
|
|
|
-- let ts = modificationTimeHiRes statA
|
|
-- setFileTimesHiRes fossil ts ts
|
|
|
|
-- void $ ncqIndexFile ncq (DataFile (fromString fossil))
|
|
-- void $ ncqStateUpdate ncq [F (posixToTimeSpec ts) (fromString fossil)]
|
|
|
|
|
|
-- debug $ "compact done" <+> pretty (HM.size state0)
|
|
|
|
|
|
-- NOTE: incremental
--   This could be made incremental by limiting the number of tombs handled
--   per pass: first remove all dead entries, then call again to remove the
--   tombs, and so on. As it is, it should work for storages of up to about
--   10TB.

-- | Scan the indexes of all finished tracked files, in 'ncqTrackedFiles'
-- order, and report every entry that compaction may drop to @action@ as
-- (key of the tracked file holding it, entry key).
--
-- Two kinds of entries are reported:
--
--   * entries whose key has a ref/tomb record in an earlier-scanned file
--     with a later 'tfTime' (superseded entries);
--   * tombs that superseded nothing (their count in @tombUse@ stayed 0).
--
-- Pending files and files whose data or index file is missing are skipped.
-- Returns the number of bytes that dropping the reported entries would
-- reclaim.
ncqLinearScanForCompact :: MonadUnliftIO m
                        => NCQStorage2
                        -> ( FileKey -> HashRef -> m () )
                        -> m Int
ncqLinearScanForCompact ncq@NCQStorage2{..} action = flip runContT pure do

  ContT $ useVersion ncq

  tracked <- readTVarIO ncqTrackedFiles <&> V.toList

  -- key -> tfTime of the file holding the first-seen ref/tomb for that key
  let state0 = mempty :: HashMap HashRef TimeSpec

  -- Word64 and strictly updated: the entry sizes are Word32, and a Word32
  -- total would wrap at 4GiB
  profit <- newTVarIO (0 :: Word64)

  -- first-seen tomb per key: file holding it and number of entries it
  -- superseded
  tombUse <- newTVarIO (mempty :: HashMap HashRef (FileKey, Int))

  -- TODO: explicit-unmap-files

  flip fix (tracked, state0) $ \next -> \case
    ([], _) -> none

    ((TrackedFile{..}):rest, state) -> do
      e <- readTVarIO tfCached

      let cqFile = ncqGetFileName ncq (toFileName (IndexFile tfKey))
      let dataFile = ncqGetFileName ncq (toFileName (DataFile tfKey))

      c <- doesFileExist cqFile
      d <- doesFileExist dataFile
      let pending = not (isNotPending e)

      if not c || not d || pending then
        next (rest, state)
      else do

        (mmaped,meta@NWayHash{..}) <- liftIO $ nwayHashMMapReadOnly cqFile
          >>= orThrow (NWayHashInvalidMetaData cqFile)

        notice $ "SCAN" <+> pretty cqFile

        let emptyKey = BS.replicate nwayKeySize 0

        -- offsets of small (possibly ref/tomb) records; superseded entries
        -- are reported along the way
        found <- S.toList_ do
          nwayHashScanAll meta mmaped $ \_ k entryBs -> do
            unless (k == emptyKey) do

              let off = N.word64 (BS.take 8 entryBs)
              let sz = N.word32 (BS.take 4 (BS.drop 8 entryBs))

              -- fast-n-dirty-check-for-deleted
              when (sz <= ncqSLen + ncqKeyLen + ncqPrefixLen ) do
                debug $ red "FOUND EMPTY RECORD" <+> pretty sz
                S.yield off

              let kk = coerce k

              case HM.lookup kk state of
                Just ts | ts > timeSpecFromFilePrio tfTime -> do
                  notice $ pretty kk <+> pretty (sz + ncqSLen)
                  atomically do
                    modifyTVar' profit ( + fromIntegral (sz + ncqSLen) )
                    modifyTVar' tombUse (HM.adjust (\(f, n) -> (f,) $! succ n) kk)
                  -- report with the tracked key, like the useless tombs below
                  lift $ lift $ action tfKey kk

                _ -> none

        newEntries <- S.toList_ do
          unless (List.null found) do
            notice $ red "TRY" <+> pretty dataFile
            dataBs <- liftIO $ mmapFileByteString dataFile Nothing
            for_ found $ \o -> do
              let pre = BS.take (fromIntegral ncqPrefixLen) (BS.drop (ncqDataOffset o) dataBs)

              when (pre == ncqRefPrefix || pre == ncqTombPrefix) do
                let keyBs = BS.take ncqKeyLen (BS.drop (fromIntegral o + ncqSLen) dataBs)
                let key = coerce (BS.copy keyBs)

                -- Only the first-seen record of a key is registered. A
                -- later-scanned tomb for the same key was just reported as
                -- superseded. Registering it would overwrite the first
                -- tomb's counter and report it a second time as useless.
                unless (HM.member key state) do
                  S.yield (key, timeSpecFromFilePrio tfTime)
                  when ( pre == ncqTombPrefix ) do
                    atomically $ modifyTVar' tombUse (HM.insert key (tfKey,0))

        next (rest, state <> HM.fromList newEntries)

  use <- readTVarIO tombUse
  let useless = [ (f,h) | (h, (f,n)) <- HM.toList use, n == 0 ]

  -- tombs that superseded nothing can be dropped too
  for_ useless $ \(f,h) -> do
    atomically $ modifyTVar' profit (+ fromIntegral ncqFullTombLen)
    lift $ action f h

  readTVarIO profit <&> fromIntegral
|
|
|
|
-- | Read the file keys listed in a state file, one per line, skipping empty
-- lines. IO errors from reading the file are not caught here.
ncqReadStateKeys :: forall m . MonadUnliftIO m => NCQStorage2 -> StateFile -> m [FileKey]
ncqReadStateKeys me path = liftIO $ do
  contents <- BS8.readFile (ncqGetFileName me (toFileName path))
  pure [ coerce @_ @FileKey line | line <- BS8.lines contents, not (BS8.null line) ]
|
|
|
|
-- | Under 'ncqSweepSem', delete the fossil data files (and their index
-- files) returned by 'ncqListDirFossils' that are not in the live set.
--
-- The live set is the union of:
--
--   * 'ncqCurrentFiles';
--   * the tracked files;
--   * the keys listed in any state file on disk;
--   * the key sets of 'ncqStateUsage' entries with a positive user count.
--
-- A state file that cannot be read is logged and contributes no keys.
ncqSweepFossils :: forall m . MonadUnliftIO m => NCQStorage2 -> m ()
ncqSweepFossils me@NCQStorage2{..} = withSem ncqSweepSem do
  debug $ yellow "sweep fossils"

  -- better be safe than sorry
  current <- readTVarIO ncqCurrentFiles

  stateFiles <- ncqListStateFiles me
  debug $ "STATE FILES" <+> vcat (fmap pretty stateFiles)

  listed <- forM (fmap snd stateFiles) (safeRead . ncqReadStateKeys @m me)
  let mentioned = HS.fromList (concat listed)

  onDisk <- fmap fromString <$> ncqListDirFossils me

  (kicked, used) <- atomically do
    active <- HS.fromList . fmap (view _1) . V.toList <$> ncqListTrackedFilesSTM me
    usage <- IntMap.elems <$> readTVar ncqStateUsage
    let inUse = HS.unions [ keys | (n, keys) <- usage, n > 0 ]
        live = current <> active <> mentioned <> inUse
    pure (List.filter (not . (`HS.member` live)) onDisk, live)

  debug $ "KICK" <+> vcat (fmap pretty kicked)
  debug $ "LIVE SET" <+> vcat (fmap pretty (HS.toList used))

  for_ kicked $ \fo -> do
    debug $ "sweep fossil file" <+> pretty fo
    rm (ncqGetFileName me (toFileName (IndexFile fo)))
    rm (ncqGetFileName me (toFileName (DataFile fo)))

  where
    safeRead act = try @_ @IOException act >>= either onErr pure
    onErr e = err ("ncqSweepFossils" <+> viaShow e) >> pure mempty
|
|
|
|
-- | Under 'ncqSweepSem', delete every state file listed after the current
-- one ('ncqStateName') by 'ncqListStateFiles' (newest first, so these are
-- the older ones). Nothing is deleted when there is no current state or
-- it is not among the listed files.
ncqSweepStates :: MonadUnliftIO m => NCQStorage2 -> m ()
ncqSweepStates me@NCQStorage2{..} = withSem ncqSweepSem do
  debug $ yellow "remove unused states"
  readTVarIO ncqStateName >>= \case
    Nothing -> none
    Just current -> do
      debug $ yellow "CURRENT STATE NAME" <+> pretty current
      states <- ncqListStateFiles me <&> fmap snd
      let obsolete = drop 1 (dropWhile (/= current) states)
      for_ obsolete $ \x -> do
        debug $ "Remove obsolete state" <+> pretty x
        rm (ncqGetFileName me (toFileName x))
|
|
|
|
-- | Replace the IO action stored in 'ncqOnRunWriteIdle' with @io@.
ncqSetOnRunWriteIdle :: MonadUnliftIO m => NCQStorage2 -> IO () -> m ()
ncqSetOnRunWriteIdle storage io = case storage of
  NCQStorage2{..} -> atomically $ writeTVar ncqOnRunWriteIdle io
|
|
|
|
-- | Copy the records of data file @fn@ that satisfy @filt@ to handle @out@.
--
-- @filt@ receives the four values that 'ncqStorageScanDataFile' passes for
-- each record: two numbers (presumably offset and size — TODO confirm), the
-- key and the payload. A record is written when @filt@ returns 'True'.
--
-- Each written record is a 32-bit length ('N.bytestring32') followed by
-- the key bytes and the payload bytes; the length covers key + payload.
writeFiltered :: forall m . MonadIO m
              => NCQStorage2
              -> FilePath
              -> Handle
              -> ( Integer -> Integer -> HashRef -> ByteString -> m Bool)
              -> m ()

writeFiltered ncq fn out filt = do
  ncqStorageScanDataFile ncq fn $ \o s k v -> do
    keep <- filt o s k v
    when keep $ liftIO do
      BS.hPut out (makeEntry k v)

  where

    -- One strict pass. The previous version serialised every record twice
    -- through lazy builders, the first time only to learn its length.
    makeEntry h bs =
      let key = coerce @_ @ByteString h
          len = BS.length key + BS.length bs
      in BS.concat [N.bytestring32 (fromIntegral len), key, bs]
|
|
|
|
|
|
-- | A @B@ section whose payload is @0@ encoded with 'N.bytestring64' and
-- whose key is the hash of that payload.
zeroSyncEntry :: ByteString
zeroSyncEntry = ncqMakeSectionBS (Just B) payloadHash payload
  where
    payload = N.bytestring64 0
    payloadHash = HashRef (hashObject payload)
{-# INLINE zeroSyncEntry #-}
|
|
|
|
-- | Length in bytes of 'zeroSyncEntry'.
zeroSyncEntrySize :: Word64
zeroSyncEntrySize = fromIntegral $ BS.length zeroSyncEntry
{-# INLINE zeroSyncEntrySize #-}
|
|
|
|
-- File tail record:
--   1. it is an M-record;
--   2. its last w64be == fileSize;
--   3. its hash == hash (bytestring64be fileSize);
--   4. recovery-strategy: start-to-end, end-to-start.

-- | Build the tail section for @w@: an @M@ section whose payload is
-- @w + 'zeroSyncEntrySize'@ encoded with 'N.bytestring64' and whose key is
-- the 'HbSync' hash of that payload.
fileTailRecord :: Integral a => a -> ByteString
fileTailRecord w = ncqMakeSectionBS (Just M) key payload
  where
    -- on open: last w64be == fileSize
    payload = N.bytestring64 (fromIntegral w + zeroSyncEntrySize)
    key = coerce (hashObject @HbSync payload)
{-# INLINE fileTailRecord #-}
|
|
|
|
-- | Write all of @sect@ to @fh@ and return the number of bytes written.
--
-- 'Posix.fdWrite' may write fewer bytes than requested, so the remainder is
-- written again until the whole section is out. If a write reports that
-- nothing was written, the loop stops and returns the count written so far.
appendSection :: forall m . MonadUnliftIO m
              => Fd
              -> ByteString
              -> m Int -- (FOff, Int)

appendSection fh sect = liftIO (go sect 0)
  where
    go chunk written
      | BS.null chunk = pure written
      | otherwise = do
          n <- fromIntegral <$> Posix.fdWrite fh chunk
          if n <= 0
            then pure written
            else go (BS.drop n chunk) (written + n)
{-# INLINE appendSection #-}
|
|
|
|
-- | Append 'fileTailRecord' for the current size of the file behind @fh@
-- (as reported by 'Posix.getFdStatus').
appendTailSection :: MonadIO m => Fd -> m ()
appendTailSection fh = liftIO $ do
  size <- Posix.fileSize <$> Posix.getFdStatus fh
  _ <- appendSection fh (fileTailRecord size)
  pure ()
{-# INLINE appendTailSection #-}
|
|
|
|
|
|
-- | Run an action while holding one unit of @sem@. The unit is released
-- even if the action throws.
withSem :: forall a m . MonadUnliftIO m => TSem -> m a -> m a
withSem sem action = bracket acquire (const release) (const action)
  where
    acquire = atomically (waitTSem sem)
    release = atomically (signalTSem sem)
|
|
|
|
-- | 'False' exactly for @Just PendingEntry@; any other cached entry, or no
-- entry at all, is not pending.
isNotPending :: Maybe CachedEntry -> Bool
isNotPending entry = case entry of
  Just PendingEntry{} -> False
  _ -> True
|
|
|
|
|