mirror of https://github.com/voidlizard/hbs2
wip
This commit is contained in:
parent
7365aa3813
commit
4b003fe2ec
|
@ -55,7 +55,7 @@ common shared-properties
|
||||||
, TypeFamilies
|
, TypeFamilies
|
||||||
, TypeOperators
|
, TypeOperators
|
||||||
, RecordWildCards
|
, RecordWildCards
|
||||||
|
, OverloadedLabels
|
||||||
|
|
||||||
library
|
library
|
||||||
import: shared-properties
|
import: shared-properties
|
||||||
|
@ -81,13 +81,16 @@ library
|
||||||
build-depends: base, hbs2-core, hbs2-log-structured, suckless-conf
|
build-depends: base, hbs2-core, hbs2-log-structured, suckless-conf
|
||||||
, async
|
, async
|
||||||
, binary
|
, binary
|
||||||
|
, bitvec
|
||||||
, bytestring
|
, bytestring
|
||||||
, bytestring-mmap
|
, bytestring-mmap
|
||||||
, bitvec
|
|
||||||
, containers
|
, containers
|
||||||
, directory
|
, directory
|
||||||
|
, filelock
|
||||||
, filepath
|
, filepath
|
||||||
, filepattern
|
, filepattern
|
||||||
|
, generic-lens
|
||||||
|
-- , generic-optics
|
||||||
, hashable
|
, hashable
|
||||||
, memory
|
, memory
|
||||||
, microlens-platform
|
, microlens-platform
|
||||||
|
@ -95,6 +98,8 @@ library
|
||||||
, mtl
|
, mtl
|
||||||
, mwc-random
|
, mwc-random
|
||||||
, network-byte-order
|
, network-byte-order
|
||||||
|
, optics-core
|
||||||
|
, optics
|
||||||
, prettyprinter
|
, prettyprinter
|
||||||
, psqueues
|
, psqueues
|
||||||
, random
|
, random
|
||||||
|
@ -104,8 +109,8 @@ library
|
||||||
, stm-chans
|
, stm-chans
|
||||||
, streaming
|
, streaming
|
||||||
, temporary
|
, temporary
|
||||||
, time
|
|
||||||
, text
|
, text
|
||||||
|
, time
|
||||||
, transformers
|
, transformers
|
||||||
, uniplate
|
, uniplate
|
||||||
, unix
|
, unix
|
||||||
|
@ -113,8 +118,6 @@ library
|
||||||
, unordered-containers
|
, unordered-containers
|
||||||
, vector
|
, vector
|
||||||
, zstd
|
, zstd
|
||||||
, filelock
|
|
||||||
|
|
||||||
|
|
||||||
hs-source-dirs: lib
|
hs-source-dirs: lib
|
||||||
default-language: Haskell2010
|
default-language: Haskell2010
|
||||||
|
|
|
@ -53,8 +53,10 @@ instance Pretty FileKey where
|
||||||
pretty (FileKey s) = pretty (BS8.unpack s)
|
pretty (FileKey s) = pretty (BS8.unpack s)
|
||||||
|
|
||||||
newtype DataFile a = DataFile a
|
newtype DataFile a = DataFile a
|
||||||
|
deriving newtype (IsString,Pretty)
|
||||||
|
|
||||||
newtype IndexFile a = IndexFile a
|
newtype IndexFile a = IndexFile a
|
||||||
|
deriving newtype (IsString,Pretty)
|
||||||
|
|
||||||
newtype StateFile a = StateFile a
|
newtype StateFile a = StateFile a
|
||||||
deriving newtype (IsString,Eq,Ord,Pretty)
|
deriving newtype (IsString,Eq,Ord,Pretty)
|
||||||
|
@ -62,6 +64,9 @@ newtype StateFile a = StateFile a
|
||||||
class ToFileName a where
|
class ToFileName a where
|
||||||
toFileName :: a -> FilePath
|
toFileName :: a -> FilePath
|
||||||
|
|
||||||
|
instance ToFileName FilePath where
|
||||||
|
toFileName = id
|
||||||
|
|
||||||
instance ToFileName FileKey where
|
instance ToFileName FileKey where
|
||||||
toFileName = BS8.unpack . coerce
|
toFileName = BS8.unpack . coerce
|
||||||
|
|
||||||
|
|
|
@ -7,12 +7,15 @@ import HBS2.Storage.NCQ3.Internal.State
|
||||||
import HBS2.Storage.NCQ3.Internal.Run
|
import HBS2.Storage.NCQ3.Internal.Run
|
||||||
import HBS2.Storage.NCQ3.Internal.Memtable
|
import HBS2.Storage.NCQ3.Internal.Memtable
|
||||||
import HBS2.Storage.NCQ3.Internal.Files
|
import HBS2.Storage.NCQ3.Internal.Files
|
||||||
|
import HBS2.Storage.NCQ3.Internal.Index
|
||||||
|
|
||||||
import Control.Monad.Trans.Cont
|
import Control.Monad.Trans.Cont
|
||||||
import Network.ByteOrder qualified as N
|
import Network.ByteOrder qualified as N
|
||||||
import Data.HashPSQ qualified as HPSQ
|
import Data.HashPSQ qualified as HPSQ
|
||||||
import Data.Vector qualified as V
|
import Data.Vector qualified as V
|
||||||
import Data.HashMap.Strict qualified as HM
|
import Data.HashMap.Strict qualified as HM
|
||||||
|
import Data.List qualified as List
|
||||||
|
import Data.Set qualified as Set
|
||||||
import Data.ByteString qualified as BS
|
import Data.ByteString qualified as BS
|
||||||
import Data.Sequence qualified as Seq
|
import Data.Sequence qualified as Seq
|
||||||
import System.FilePath.Posix
|
import System.FilePath.Posix
|
||||||
|
@ -56,12 +59,6 @@ ncqStorageOpen3 fp upd = do
|
||||||
ncqMemTable <- V.fromList <$> replicateM shardNum (newTVarIO mempty)
|
ncqMemTable <- V.fromList <$> replicateM shardNum (newTVarIO mempty)
|
||||||
ncqMMapCachedIdx <- newTVarIO HPSQ.empty
|
ncqMMapCachedIdx <- newTVarIO HPSQ.empty
|
||||||
ncqMMapCachedData <- newTVarIO HPSQ.empty
|
ncqMMapCachedData <- newTVarIO HPSQ.empty
|
||||||
ncqStateFiles <- newTVarIO mempty
|
|
||||||
ncqStateIndex <- newTVarIO mempty
|
|
||||||
ncqStateFileSeq <- newTVarIO 0
|
|
||||||
ncqStateVersion <- newTVarIO 0
|
|
||||||
ncqStateUsage <- newTVarIO mempty
|
|
||||||
ncqStateFacts <- newTVarIO mempty
|
|
||||||
ncqWrites <- newTVarIO 0
|
ncqWrites <- newTVarIO 0
|
||||||
ncqWriteEMA <- newTVarIO 0.0
|
ncqWriteEMA <- newTVarIO 0.0
|
||||||
ncqWriteOps <- V.fromList <$> replicateM wopNum newTQueueIO
|
ncqWriteOps <- V.fromList <$> replicateM wopNum newTQueueIO
|
||||||
|
@ -71,6 +68,7 @@ ncqStorageOpen3 fp upd = do
|
||||||
ncqSyncReq <- newTVarIO False
|
ncqSyncReq <- newTVarIO False
|
||||||
ncqOnRunWriteIdle <- newTVarIO none
|
ncqOnRunWriteIdle <- newTVarIO none
|
||||||
ncqSyncNo <- newTVarIO 0
|
ncqSyncNo <- newTVarIO 0
|
||||||
|
ncqState <- newTVarIO mempty
|
||||||
|
|
||||||
let ncq = NCQStorage3{..} & upd
|
let ncq = NCQStorage3{..} & upd
|
||||||
|
|
||||||
|
@ -134,3 +132,67 @@ ncqLocate me@NCQStorage3{..} href = ncqOperation me (pure Nothing) do
|
||||||
|
|
||||||
atomically $ takeTMVar answ
|
atomically $ takeTMVar answ
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
ncqTryLoadState :: forall m. MonadUnliftIO m
|
||||||
|
=> NCQStorage3
|
||||||
|
-> m ()
|
||||||
|
|
||||||
|
ncqTryLoadState me = do
|
||||||
|
|
||||||
|
stateFiles <- ncqListFilesBy me ( List.isPrefixOf "s-" )
|
||||||
|
|
||||||
|
r <- flip fix ([], ncqState0, stateFiles) $ \next -> \case
|
||||||
|
(r, s, []) -> pure (r,s,[])
|
||||||
|
(l, s0, (_,s):ss) -> do
|
||||||
|
|
||||||
|
readStateMay me s >>= \case
|
||||||
|
Nothing -> next (s : l, s0, ss)
|
||||||
|
Just ns -> do
|
||||||
|
ok <- checkState ns
|
||||||
|
if ok then
|
||||||
|
pure (l <> fmap snd ss, ns, ss)
|
||||||
|
else
|
||||||
|
next (s : l, s0, ss)
|
||||||
|
|
||||||
|
let (bad, NCQState{..}, rest) = r
|
||||||
|
|
||||||
|
for_ [ (d,s) | P (PData d s) <- Set.toList ncqStateFacts ] $ \(dataFile,s) -> do
|
||||||
|
let path = ncqGetFileName me dataFile
|
||||||
|
realSize <- fileSize path
|
||||||
|
|
||||||
|
let corrupted = realSize /= fromIntegral s
|
||||||
|
let color = if corrupted then red else id
|
||||||
|
|
||||||
|
debug $ yellow "indexing" <+> pretty dataFile <+> pretty s <+> color (pretty realSize)
|
||||||
|
|
||||||
|
when corrupted $ liftIO do
|
||||||
|
warn $ red "trim" <+> pretty s <+> pretty (takeFileName path)
|
||||||
|
PFS.setFileSize path (fromIntegral s)
|
||||||
|
|
||||||
|
ncqIndexFile me dataFile
|
||||||
|
|
||||||
|
for_ (bad <> drop 3 (fmap snd rest)) $ \f -> do
|
||||||
|
rm (ncqGetFileName me (StateFile f))
|
||||||
|
|
||||||
|
|
||||||
|
where
|
||||||
|
|
||||||
|
-- TODO: created-but-not-indexed-file?
|
||||||
|
|
||||||
|
checkState NCQState{..} = flip runContT pure $ callCC \exit -> do
|
||||||
|
|
||||||
|
for_ ncqStateFiles $ \fk -> do
|
||||||
|
|
||||||
|
let dataFile = ncqGetFileName me (DataFile fk)
|
||||||
|
here <- doesFileExist dataFile
|
||||||
|
|
||||||
|
unless here $ exit False
|
||||||
|
|
||||||
|
lift (try @_ @SomeException (ncqFileFastCheck dataFile)) >>= \case
|
||||||
|
Left e -> err (viaShow e) >> exit False
|
||||||
|
Right () -> none
|
||||||
|
|
||||||
|
pure True
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
{-# Language OverloadedRecordDot #-}
|
||||||
module HBS2.Storage.NCQ3.Internal.Files where
|
module HBS2.Storage.NCQ3.Internal.Files where
|
||||||
|
|
||||||
import HBS2.Storage.NCQ3.Internal.Prelude
|
import HBS2.Storage.NCQ3.Internal.Prelude
|
||||||
|
@ -6,8 +7,9 @@ import HBS2.Storage.NCQ3.Internal.Types
|
||||||
import System.Posix.Files qualified as PFS
|
import System.Posix.Files qualified as PFS
|
||||||
import Data.List qualified as List
|
import Data.List qualified as List
|
||||||
|
|
||||||
ncqGetFileName :: NCQStorage3 -> FilePath -> FilePath
|
|
||||||
ncqGetFileName ncq fp = ncqGetWorkDir ncq </> takeFileName fp
|
ncqGetFileName :: forall f . ToFileName f => NCQStorage3 -> f -> FilePath
|
||||||
|
ncqGetFileName ncq fp = ncqGetWorkDir ncq </> takeFileName (toFileName fp)
|
||||||
|
|
||||||
ncqGetWorkDir :: NCQStorage3 -> FilePath
|
ncqGetWorkDir :: NCQStorage3 -> FilePath
|
||||||
ncqGetWorkDir NCQStorage3{..} = ncqRoot </> show ncqGen
|
ncqGetWorkDir NCQStorage3{..} = ncqRoot </> show ncqGen
|
||||||
|
@ -20,10 +22,11 @@ ncqGetNewFileKey :: forall f m . (ToFileName f, MonadIO m)
|
||||||
-> ( FileKey -> f )
|
-> ( FileKey -> f )
|
||||||
-> m FileKey
|
-> m FileKey
|
||||||
ncqGetNewFileKey me@NCQStorage3{..} fnameOf = fix \next -> do
|
ncqGetNewFileKey me@NCQStorage3{..} fnameOf = fix \next -> do
|
||||||
n <- atomically $ stateTVar ncqStateFileSeq (\x -> (x, succ x))
|
n <- atomically $ stateTVar ncqState (\e -> (e.ncqStateFileSeq , succSeq e))
|
||||||
here <- doesFileExist (ncqGetFileName me (toFileName $ fnameOf n))
|
here <- doesFileExist (ncqGetFileName me (fnameOf n))
|
||||||
if here then next else pure n
|
if here then next else pure n
|
||||||
|
where
|
||||||
|
succSeq e = e { ncqStateFileSeq = succ e.ncqStateFileSeq }
|
||||||
|
|
||||||
ncqListFilesBy :: forall m . MonadUnliftIO m => NCQStorage3 -> (FilePath -> Bool) -> m [(POSIXTime, FileKey)]
|
ncqListFilesBy :: forall m . MonadUnliftIO m => NCQStorage3 -> (FilePath -> Bool) -> m [(POSIXTime, FileKey)]
|
||||||
ncqListFilesBy me@NCQStorage3{..} filt = do
|
ncqListFilesBy me@NCQStorage3{..} filt = do
|
||||||
|
|
|
@ -46,7 +46,7 @@ ncqIndexFile n@NCQStorage3{..} fk = runMaybeT do
|
||||||
let fp = toFileName fk & ncqGetFileName n
|
let fp = toFileName fk & ncqGetFileName n
|
||||||
fki <- ncqGetNewFileKey n IndexFile
|
fki <- ncqGetNewFileKey n IndexFile
|
||||||
|
|
||||||
let dest = ncqGetFileName n (toFileName (IndexFile fki))
|
let dest = ncqGetFileName n (IndexFile fki)
|
||||||
|
|
||||||
debug $ "INDEX" <+> pretty fp <+> pretty dest
|
debug $ "INDEX" <+> pretty fp <+> pretty dest
|
||||||
|
|
||||||
|
@ -81,6 +81,7 @@ ncqIndexFile n@NCQStorage3{..} fk = runMaybeT do
|
||||||
ncqStateAddIndexFile ts fki
|
ncqStateAddIndexFile ts fki
|
||||||
ncqStateAddDataFile (coerce fk)
|
ncqStateAddDataFile (coerce fk)
|
||||||
ncqStateAddFact (FI fk (IndexFile fki))
|
ncqStateAddFact (FI fk (IndexFile fki))
|
||||||
|
ncqStateDelFact (P (PData fk 0))
|
||||||
|
|
||||||
(bs,nw) <- toMPlus midx
|
(bs,nw) <- toMPlus midx
|
||||||
|
|
||||||
|
|
|
@ -38,7 +38,7 @@ ncqGetCachedData ncq@NCQStorage3{..} =
|
||||||
cacheLookupOrInsert ncqMaxCachedData load ncqMMapCachedData
|
cacheLookupOrInsert ncqMaxCachedData load ncqMMapCachedData
|
||||||
where
|
where
|
||||||
load fk = do
|
load fk = do
|
||||||
let path = ncqGetFileName ncq (toFileName (DataFile fk))
|
let path = ncqGetFileName ncq (DataFile fk)
|
||||||
bs <- liftIO (mmapFileByteString path Nothing)
|
bs <- liftIO (mmapFileByteString path Nothing)
|
||||||
pure (CachedData bs)
|
pure (CachedData bs)
|
||||||
|
|
||||||
|
@ -47,7 +47,7 @@ ncqGetCachedIndex ncq@NCQStorage3{..} =
|
||||||
cacheLookupOrInsert ncqMaxCachedIndex load ncqMMapCachedIdx
|
cacheLookupOrInsert ncqMaxCachedIndex load ncqMMapCachedIdx
|
||||||
where
|
where
|
||||||
load fk = do
|
load fk = do
|
||||||
let path = ncqGetFileName ncq (toFileName (IndexFile fk))
|
let path = ncqGetFileName ncq (IndexFile fk)
|
||||||
nwayHashMMapReadOnly path >>= \case
|
nwayHashMMapReadOnly path >>= \case
|
||||||
Nothing -> throwIO $ NCQStorageCantMapFile path
|
Nothing -> throwIO $ NCQStorageCantMapFile path
|
||||||
Just (bs, nway) -> pure (CachedIndex bs nway)
|
Just (bs, nway) -> pure (CachedIndex bs nway)
|
||||||
|
|
|
@ -7,6 +7,7 @@ import HBS2.Storage.NCQ3.Internal.Types
|
||||||
import HBS2.Storage.NCQ3.Internal.Files
|
import HBS2.Storage.NCQ3.Internal.Files
|
||||||
import HBS2.Storage.NCQ3.Internal.Memtable
|
import HBS2.Storage.NCQ3.Internal.Memtable
|
||||||
import HBS2.Storage.NCQ3.Internal.Index
|
import HBS2.Storage.NCQ3.Internal.Index
|
||||||
|
import HBS2.Storage.NCQ3.Internal.State
|
||||||
import HBS2.Storage.NCQ3.Internal.MMapCache
|
import HBS2.Storage.NCQ3.Internal.MMapCache
|
||||||
|
|
||||||
|
|
||||||
|
@ -74,9 +75,9 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do
|
||||||
Nothing -> none
|
Nothing -> none
|
||||||
Just e -> answer (Just (InMemory (ncqEntryData e))) >> next
|
Just e -> answer (Just (InMemory (ncqEntryData e))) >> next
|
||||||
|
|
||||||
tracked <- readTVarIO ncqStateIndex
|
NCQState{..} <- readTVarIO ncqState
|
||||||
|
|
||||||
for_ tracked $ \(_, fk) -> do
|
for_ ncqStateIndex $ \(_, fk) -> do
|
||||||
CachedIndex bs nw <- ncqGetCachedIndex ncq fk
|
CachedIndex bs nw <- ncqGetCachedIndex ncq fk
|
||||||
ncqLookupIndex h (bs, nw) >>= \case
|
ncqLookupIndex h (bs, nw) >>= \case
|
||||||
Just (IndexEntry fk o s) -> answer (Just (InFossil fk o s)) >> next
|
Just (IndexEntry fk o s) -> answer (Just (InFossil fk o s)) >> next
|
||||||
|
@ -112,6 +113,10 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do
|
||||||
pure w
|
pure w
|
||||||
else do
|
else do
|
||||||
appendTailSection fh >> liftIO (fileSynchronise fh)
|
appendTailSection fh >> liftIO (fileSynchronise fh)
|
||||||
|
ss <- liftIO (PFS.getFdStatus fh) <&> fromIntegral . PFS.fileSize
|
||||||
|
ncqStateUpdate ncq do
|
||||||
|
ncqStateAddFact (P (PData (DataFile fk) ss))
|
||||||
|
|
||||||
atomically do
|
atomically do
|
||||||
writeTVar ncqSyncReq False
|
writeTVar ncqSyncReq False
|
||||||
modifyTVar ncqSyncNo succ
|
modifyTVar ncqSyncNo succ
|
||||||
|
@ -173,7 +178,7 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do
|
||||||
openNewDataFile :: forall mx . MonadIO mx => mx (FileKey, Fd)
|
openNewDataFile :: forall mx . MonadIO mx => mx (FileKey, Fd)
|
||||||
openNewDataFile = do
|
openNewDataFile = do
|
||||||
fk <- ncqGetNewFileKey ncq DataFile
|
fk <- ncqGetNewFileKey ncq DataFile
|
||||||
let fname = ncqGetFileName ncq (toFileName (DataFile fk))
|
let fname = ncqGetFileName ncq (DataFile fk)
|
||||||
touch fname
|
touch fname
|
||||||
let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 }
|
let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 }
|
||||||
(fk,) <$> liftIO (PosixBase.openFd fname Posix.ReadWrite flags)
|
(fk,) <$> liftIO (PosixBase.openFd fname Posix.ReadWrite flags)
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
{-# Language ViewPatterns #-}
|
||||||
module HBS2.Storage.NCQ3.Internal.State where
|
module HBS2.Storage.NCQ3.Internal.State where
|
||||||
|
|
||||||
import HBS2.Storage.NCQ3.Internal.Prelude
|
import HBS2.Storage.NCQ3.Internal.Prelude
|
||||||
|
@ -6,8 +7,10 @@ import HBS2.Storage.NCQ3.Internal.Files
|
||||||
|
|
||||||
import Data.Config.Suckless.Script
|
import Data.Config.Suckless.Script
|
||||||
|
|
||||||
|
import Data.Generics.Product
|
||||||
import Data.List qualified as List
|
import Data.List qualified as List
|
||||||
import Control.Monad.Reader
|
import Control.Monad.Reader
|
||||||
|
import Control.Monad.Trans.Maybe
|
||||||
import Control.Monad.Trans.Cont
|
import Control.Monad.Trans.Cont
|
||||||
import Data.HashSet qualified as HS
|
import Data.HashSet qualified as HS
|
||||||
import Data.Set qualified as Set
|
import Data.Set qualified as Set
|
||||||
|
@ -16,6 +19,8 @@ import UnliftIO.IO.File
|
||||||
import Network.ByteOrder qualified as N
|
import Network.ByteOrder qualified as N
|
||||||
import UnliftIO.IO
|
import UnliftIO.IO
|
||||||
import System.IO qualified as IO
|
import System.IO qualified as IO
|
||||||
|
import Lens.Micro.Platform
|
||||||
|
import Streaming.Prelude qualified as S
|
||||||
|
|
||||||
newtype StateOP a =
|
newtype StateOP a =
|
||||||
StateOP { fromStateOp :: ReaderT NCQStorage3 STM a }
|
StateOP { fromStateOp :: ReaderT NCQStorage3 STM a }
|
||||||
|
@ -28,38 +33,34 @@ ncqStateUpdate :: MonadIO m
|
||||||
-> StateOP a
|
-> StateOP a
|
||||||
-> m ()
|
-> m ()
|
||||||
ncqStateUpdate ncq@NCQStorage3{..} action = do
|
ncqStateUpdate ncq@NCQStorage3{..} action = do
|
||||||
snkFile <- ncqGetNewFileKey ncq StateFile <&> ncqGetFileName ncq . toFileName . StateFile
|
s0 <- readTVarIO ncqState
|
||||||
(n,i,f,facts) <- atomically do
|
|
||||||
runReaderT (fromStateOp action) ncq
|
|
||||||
n <- readTVar ncqStateFileSeq
|
|
||||||
i <- readTVar ncqStateIndex
|
|
||||||
f <- readTVar ncqStateFiles
|
|
||||||
fa <- readTVar ncqStateFacts
|
|
||||||
pure (n,i,f,fa)
|
|
||||||
|
|
||||||
liftIO $ withBinaryFileDurableAtomic snkFile WriteMode $ \fh -> do
|
s1 <- atomically do
|
||||||
for_ i $ \(Down p, fk) -> do
|
void $ runReaderT (fromStateOp action) ncq
|
||||||
IO.hPrint fh $ "i" <+> pretty fk <+> pretty (round @_ @Word64 p)
|
readTVar ncqState
|
||||||
|
|
||||||
for_ f $ \fk -> do
|
unless (s1 == s0) do
|
||||||
IO.hPrint fh $ "f" <+> pretty fk
|
snkFile <- ncqGetNewFileKey ncq StateFile <&> ncqGetFileName ncq . StateFile
|
||||||
|
liftIO $ withBinaryFileDurableAtomic snkFile WriteMode $ \fh -> do
|
||||||
for_ facts $ \(FI (DataFile a) (IndexFile b)) -> do
|
IO.hPrint fh (pretty s1)
|
||||||
IO.hPrint fh $ "fi" <+> pretty a <+> pretty b
|
|
||||||
|
|
||||||
IO.hPrint fh $ "n" <+> pretty n
|
|
||||||
|
|
||||||
ncqStateAddDataFile :: FileKey -> StateOP ()
|
ncqStateAddDataFile :: FileKey -> StateOP ()
|
||||||
ncqStateAddDataFile fk = do
|
ncqStateAddDataFile fk = do
|
||||||
NCQStorage3{..} <- ask
|
NCQStorage3{..} <- ask
|
||||||
StateOP $ lift do
|
StateOP $ lift do
|
||||||
modifyTVar ncqStateFiles (HS.insert fk)
|
modifyTVar ncqState (over (field @"ncqStateFiles") (HS.insert fk))
|
||||||
|
|
||||||
ncqStateAddFact :: Fact -> StateOP ()
|
ncqStateAddFact :: Fact -> StateOP ()
|
||||||
ncqStateAddFact fact = do
|
ncqStateAddFact fact = do
|
||||||
NCQStorage3{..} <- ask
|
NCQStorage3{..} <- ask
|
||||||
StateOP $ lift do
|
StateOP $ lift do
|
||||||
modifyTVar ncqStateFacts (Set.insert fact)
|
modifyTVar ncqState (over (field @"ncqStateFacts") (Set.insert fact))
|
||||||
|
|
||||||
|
ncqStateDelFact :: Fact -> StateOP ()
|
||||||
|
ncqStateDelFact fact = do
|
||||||
|
NCQStorage3{..} <- ask
|
||||||
|
StateOP $ lift do
|
||||||
|
modifyTVar ncqState (over (field @"ncqStateFacts") (Set.delete fact))
|
||||||
|
|
||||||
ncqStateAddIndexFile :: POSIXTime
|
ncqStateAddIndexFile :: POSIXTime
|
||||||
-> FileKey
|
-> FileKey
|
||||||
|
@ -67,10 +68,10 @@ ncqStateAddIndexFile :: POSIXTime
|
||||||
|
|
||||||
ncqStateAddIndexFile ts fk = do
|
ncqStateAddIndexFile ts fk = do
|
||||||
NCQStorage3{..} <- ask
|
NCQStorage3{..} <- ask
|
||||||
StateOP $ lift do
|
StateOP $ lift $ modifyTVar' ncqState sortIndexes
|
||||||
modifyTVar' ncqStateIndex $ \xs ->
|
|
||||||
List.sortOn fst ((Down ts, fk) : xs)
|
|
||||||
|
|
||||||
|
sortIndexes :: NCQState -> NCQState
|
||||||
|
sortIndexes = over (field @"ncqStateIndex") (List.sortOn fst)
|
||||||
|
|
||||||
ncqFileFastCheck :: MonadUnliftIO m => FilePath -> m ()
|
ncqFileFastCheck :: MonadUnliftIO m => FilePath -> m ()
|
||||||
ncqFileFastCheck fp = do
|
ncqFileFastCheck fp = do
|
||||||
|
@ -85,73 +86,38 @@ ncqFileFastCheck fp = do
|
||||||
throwIO $ NCQFsckIssueExt (FsckInvalidFileSize (fromIntegral s))
|
throwIO $ NCQFsckIssueExt (FsckInvalidFileSize (fromIntegral s))
|
||||||
|
|
||||||
|
|
||||||
ncqTryLoadState :: forall m. MonadUnliftIO m
|
readStateMay :: forall m . MonadUnliftIO m
|
||||||
=> NCQStorage3
|
=> NCQStorage3
|
||||||
-> m ()
|
-> FileKey
|
||||||
|
-> m (Maybe NCQState)
|
||||||
|
readStateMay sto key = fmap sortIndexes <$> do
|
||||||
|
s <- liftIO (readFile (ncqGetFileName sto (StateFile key)))
|
||||||
|
runMaybeT do
|
||||||
|
sexps <- parseTop s & toMPlus
|
||||||
|
|
||||||
ncqTryLoadState me@NCQStorage3{..} = do
|
flip fix (ncqState0, sexps) $ \next -> \case
|
||||||
|
(acc, []) -> pure acc
|
||||||
|
(acc, e : ss) -> liftIO (print (pretty e)) >> next (acc <> entryOf e, ss)
|
||||||
|
|
||||||
stateFiles <- ncqListFilesBy me ( List.isPrefixOf "s-" )
|
where
|
||||||
|
|
||||||
flip runContT pure $ callCC \exit -> do
|
entryOf = \case
|
||||||
|
ListVal [SymbolVal "i", LitIntVal n, LitIntVal ts] ->
|
||||||
|
ncqState0 { ncqStateIndex = [ (fromIntegral ts, fromIntegral n) ] }
|
||||||
|
|
||||||
for stateFiles $ \(_,fn) -> do
|
ListVal [SymbolVal "f", LitIntVal n] ->
|
||||||
none
|
ncqState0 { ncqStateFiles = HS.singleton (fromIntegral n) }
|
||||||
|
|
||||||
none
|
ListVal [SymbolVal "fi", LitIntVal a, LitIntVal b] ->
|
||||||
|
ncqState0 { ncqStateFacts = Set.singleton (FI (DataFile (fromIntegral a)) (IndexFile (fromIntegral b))) }
|
||||||
|
|
||||||
-- for_ stateFiles $ \(d,f) -> do
|
ListVal [SymbolVal "fp", LitIntVal a, LitIntVal s] ->
|
||||||
-- notice $ "state-file" <+> pretty (toFileName (StateFile f))
|
ncqState0 { ncqStateFacts = Set.singleton (P (PData (DataFile $ fromIntegral a) (fromIntegral s))) }
|
||||||
|
|
||||||
-- tryLoadState :: forall m. MonadUnliftIO m
|
ListVal [SymbolVal "n", LitIntVal a] ->
|
||||||
-- => NCQStorage3
|
ncqState0 { ncqStateFileSeq = fromIntegral a }
|
||||||
-- -> StateFile FileKey
|
|
||||||
-- -> m (Maybe (HashSet FileKey, [(Down POSIXTime, FileKey)], FileKey))
|
|
||||||
-- tryLoadState me@NCQStorage3{..} fk = do
|
|
||||||
-- debug $ "tryLoadState" <+> pretty fk
|
|
||||||
|
|
||||||
-- (fset, idxList, n) <- liftIO (readState fk)
|
_ -> ncqState0
|
||||||
|
|
||||||
-- let checkFile :: DataFile FileKey -> m Bool
|
|
||||||
-- checkFile fo = flip fix 0 \next (i :: Int) -> do
|
|
||||||
-- let dataFile = ncqGetFileName me (toFileName fo)
|
|
||||||
-- let indexFile = ncqGetFileName me (toFileName (IndexFile (coerce fo)))
|
|
||||||
|
|
||||||
-- doesFileExist dataFile >>= \case
|
|
||||||
-- False -> do
|
|
||||||
-- rm indexFile
|
|
||||||
-- pure False
|
|
||||||
|
|
||||||
-- True -> do
|
|
||||||
-- try @_ @SomeException (ncqFileFastCheck dataFile) >>= \case
|
|
||||||
-- Left e -> do
|
|
||||||
-- err (viaShow e)
|
|
||||||
-- stillThere <- doesFileExist dataFile
|
|
||||||
-- when stillThere do
|
|
||||||
-- let broken = dropExtension dataFile `addExtension` ".broken"
|
|
||||||
-- mv dataFile broken
|
|
||||||
-- rm indexFile
|
|
||||||
-- warn $ red "renamed" <+> pretty dataFile <+> pretty broken
|
|
||||||
-- pure False
|
|
||||||
|
|
||||||
-- Right{} | i > 1 -> pure False
|
|
||||||
|
|
||||||
-- Right{} -> do
|
|
||||||
-- exists <- doesFileExist indexFile
|
|
||||||
-- if exists
|
|
||||||
-- then pure True
|
|
||||||
-- else do
|
|
||||||
-- debug $ "indexing" <+> pretty (toFileName fo)
|
|
||||||
-- _ <- ncqIndexFile me fo
|
|
||||||
-- debug $ "indexed" <+> pretty indexFile
|
|
||||||
-- next (i + 1)
|
|
||||||
|
|
||||||
-- results <- forM (HS.toList fset) (checkFile . DataFile)
|
|
||||||
|
|
||||||
-- pure $
|
|
||||||
-- if and results
|
|
||||||
-- then Just (fset, idxList, n)
|
|
||||||
-- else Nothing
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,16 @@
|
||||||
|
{-# LANGUAGE OverloadedLabels #-}
|
||||||
|
{-# LANGUAGE UndecidableInstances #-}
|
||||||
module HBS2.Storage.NCQ3.Internal.Types where
|
module HBS2.Storage.NCQ3.Internal.Types where
|
||||||
|
|
||||||
import HBS2.Storage.NCQ3.Internal.Prelude
|
import HBS2.Storage.NCQ3.Internal.Prelude
|
||||||
|
|
||||||
|
import Data.Generics.Product
|
||||||
import Numeric (readHex)
|
import Numeric (readHex)
|
||||||
|
import Data.Set qualified as Set
|
||||||
|
import Data.HashSet qualified as HS
|
||||||
import Text.Printf
|
import Text.Printf
|
||||||
|
-- import Lens.Micro.Platform
|
||||||
|
|
||||||
|
|
||||||
data CachedData = CachedData !ByteString
|
data CachedData = CachedData !ByteString
|
||||||
data CachedIndex = CachedIndex !ByteString !NWayHash
|
data CachedIndex = CachedIndex !ByteString !NWayHash
|
||||||
|
@ -16,25 +23,13 @@ type Shard = TVar (HashMap HashRef NCQEntry)
|
||||||
type StateVersion = Word64
|
type StateVersion = Word64
|
||||||
|
|
||||||
newtype FileKey = FileKey Word32
|
newtype FileKey = FileKey Word32
|
||||||
deriving newtype (Eq,Ord,Show,Num,Enum,Pretty,Hashable)
|
deriving newtype (Eq,Ord,Show,Num,Enum,Real,Integral,Pretty,Hashable)
|
||||||
|
|
||||||
deriving stock instance Eq (DataFile FileKey)
|
deriving stock instance Eq (DataFile FileKey)
|
||||||
deriving stock instance Ord (DataFile FileKey)
|
deriving stock instance Ord (DataFile FileKey)
|
||||||
deriving stock instance Eq (IndexFile FileKey)
|
deriving stock instance Eq (IndexFile FileKey)
|
||||||
deriving stock instance Ord (IndexFile FileKey)
|
deriving stock instance Ord (IndexFile FileKey)
|
||||||
|
|
||||||
instance IsString FileKey where
|
|
||||||
fromString = FileKey . maybe maxBound fst . headMay . readHex . drop 1 . dropWhile (/= '-') . takeBaseName
|
|
||||||
|
|
||||||
instance ToFileName (DataFile FileKey) where
|
|
||||||
toFileName (DataFile fk) = ncqMakeFossilName fk
|
|
||||||
|
|
||||||
instance ToFileName (IndexFile FileKey) where
|
|
||||||
toFileName (IndexFile fk) = printf "i-%08x.cq" (coerce @_ @Word32 fk)
|
|
||||||
|
|
||||||
instance ToFileName (StateFile FileKey) where
|
|
||||||
toFileName (StateFile fk) = printf "s-%08x" (coerce @_ @Word32 fk)
|
|
||||||
|
|
||||||
data NCQEntry =
|
data NCQEntry =
|
||||||
NCQEntry
|
NCQEntry
|
||||||
{ ncqEntryData :: !ByteString
|
{ ncqEntryData :: !ByteString
|
||||||
|
@ -48,15 +43,30 @@ data Location =
|
||||||
InFossil {-# UNPACK #-} !FileKey !NCQOffset !NCQSize
|
InFossil {-# UNPACK #-} !FileKey !NCQOffset !NCQSize
|
||||||
| InMemory {-# UNPACK #-} !ByteString
|
| InMemory {-# UNPACK #-} !ByteString
|
||||||
|
|
||||||
instance Pretty Location where
|
|
||||||
pretty = \case
|
|
||||||
InFossil k o s -> parens $ "in-fossil" <+> pretty k <+> pretty o <+> pretty s
|
|
||||||
InMemory _ -> "in-memory"
|
|
||||||
|
|
||||||
data Fact =
|
data Fact =
|
||||||
FI (DataFile FileKey) (IndexFile FileKey) -- file X has index Y
|
FI (DataFile FileKey) (IndexFile FileKey) -- file X has index Y
|
||||||
|
| P PData -- pending, not indexed
|
||||||
deriving stock (Eq,Ord)
|
deriving stock (Eq,Ord)
|
||||||
|
|
||||||
|
data PData = PData (DataFile FileKey) Word64
|
||||||
|
|
||||||
|
instance Ord PData where
|
||||||
|
compare (PData a _) (PData b _) = compare a b
|
||||||
|
|
||||||
|
instance Eq PData where
|
||||||
|
(==) (PData a _) (PData b _) = a == b
|
||||||
|
|
||||||
|
data NCQState =
|
||||||
|
NCQState
|
||||||
|
{ ncqStateFiles :: HashSet FileKey
|
||||||
|
, ncqStateIndex :: [(Down POSIXTime, FileKey)] -- backward timestamp order
|
||||||
|
, ncqStateFileSeq :: FileKey
|
||||||
|
, ncqStateVersion :: StateVersion
|
||||||
|
, ncqStateFacts :: Set Fact
|
||||||
|
}
|
||||||
|
deriving stock (Eq,Generic)
|
||||||
|
|
||||||
data NCQStorage3 =
|
data NCQStorage3 =
|
||||||
NCQStorage3
|
NCQStorage3
|
||||||
{ ncqRoot :: FilePath
|
{ ncqRoot :: FilePath
|
||||||
|
@ -74,13 +84,8 @@ data NCQStorage3 =
|
||||||
, ncqIdleThrsh :: Double
|
, ncqIdleThrsh :: Double
|
||||||
, ncqMMapCachedIdx :: TVar (HashPSQ FileKey CachePrio CachedIndex)
|
, ncqMMapCachedIdx :: TVar (HashPSQ FileKey CachePrio CachedIndex)
|
||||||
, ncqMMapCachedData :: TVar (HashPSQ FileKey CachePrio CachedData)
|
, ncqMMapCachedData :: TVar (HashPSQ FileKey CachePrio CachedData)
|
||||||
, ncqStateFiles :: TVar (HashSet FileKey)
|
|
||||||
, ncqStateIndex :: TVar [(Down POSIXTime, FileKey)] -- backward timestamp orde
|
|
||||||
, ncqStateFileSeq :: TVar FileKey
|
|
||||||
, ncqStateVersion :: TVar StateVersion
|
|
||||||
, ncqStateUsage :: TVar (IntMap (Int, HashSet FileKey))
|
|
||||||
, ncqStateFacts :: TVar (Set Fact)
|
|
||||||
, ncqMemTable :: Vector Shard
|
, ncqMemTable :: Vector Shard
|
||||||
|
, ncqState :: TVar NCQState
|
||||||
, ncqWrites :: TVar Int
|
, ncqWrites :: TVar Int
|
||||||
, ncqWriteEMA :: TVar Double -- for writes-per-seconds
|
, ncqWriteEMA :: TVar Double -- for writes-per-seconds
|
||||||
, ncqWriteQ :: TVar (Seq HashRef)
|
, ncqWriteQ :: TVar (Seq HashRef)
|
||||||
|
@ -94,6 +99,82 @@ data NCQStorage3 =
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
instance Monoid FileKey where
|
||||||
|
mempty = FileKey 0
|
||||||
|
|
||||||
|
instance Semigroup FileKey where
|
||||||
|
(<>) (FileKey a) (FileKey b) = FileKey (max a b)
|
||||||
|
|
||||||
|
instance IsString FileKey where
|
||||||
|
fromString = FileKey . maybe maxBound fst . headMay . readHex . drop 1 . dropWhile (/= '-') . takeBaseName
|
||||||
|
|
||||||
|
instance ToFileName (DataFile FileKey) where
|
||||||
|
toFileName (DataFile fk) = ncqMakeFossilName fk
|
||||||
|
|
||||||
|
instance ToFileName (IndexFile FileKey) where
|
||||||
|
toFileName (IndexFile fk) = printf "i-%08x.cq" (coerce @_ @Word32 fk)
|
||||||
|
|
||||||
|
instance ToFileName (StateFile FileKey) where
|
||||||
|
toFileName (StateFile fk) = printf "s-%08x" (coerce @_ @Word32 fk)
|
||||||
|
|
||||||
|
|
||||||
|
instance Monoid NCQState where
|
||||||
|
mempty = ncqState0
|
||||||
|
|
||||||
|
instance Semigroup NCQState where
|
||||||
|
(<>) a b = NCQState files index seqq version facts
|
||||||
|
where
|
||||||
|
files = ncqStateFiles a <> ncqStateFiles b
|
||||||
|
index = ncqStateIndex a <> ncqStateIndex b
|
||||||
|
seqq = max (ncqStateFileSeq a) (ncqStateFileSeq b)
|
||||||
|
version = max (ncqStateVersion a) (ncqStateVersion b)
|
||||||
|
facts = ncqStateFacts a <> ncqStateFacts b
|
||||||
|
|
||||||
|
|
||||||
|
instance Pretty Location where
|
||||||
|
pretty = \case
|
||||||
|
InFossil k o s -> parens $ "in-fossil" <+> pretty k <+> pretty o <+> pretty s
|
||||||
|
InMemory _ -> "in-memory"
|
||||||
|
|
||||||
ncqMakeFossilName :: FileKey -> FilePath
|
ncqMakeFossilName :: FileKey -> FilePath
|
||||||
ncqMakeFossilName = printf "f-%08x.data" . coerce @_ @Word32
|
ncqMakeFossilName = printf "f-%08x.data" . coerce @_ @Word32
|
||||||
|
|
||||||
|
ncqState0 :: NCQState
|
||||||
|
ncqState0 = NCQState{..}
|
||||||
|
where
|
||||||
|
ncqStateFiles = mempty
|
||||||
|
ncqStateIndex = mempty
|
||||||
|
ncqStateVersion = 0
|
||||||
|
ncqStateFacts = mempty
|
||||||
|
ncqStateFileSeq = 0
|
||||||
|
|
||||||
|
|
||||||
|
instance Pretty NCQState where
|
||||||
|
pretty NCQState{..} = vcat
|
||||||
|
[ prettyIndex
|
||||||
|
, prettyFiles
|
||||||
|
, prettyFacts
|
||||||
|
, prettySeq
|
||||||
|
]
|
||||||
|
where
|
||||||
|
prettySeq = "n" <+> pretty ncqStateFileSeq
|
||||||
|
|
||||||
|
prettyIndex = vcat
|
||||||
|
[ "i" <+> pretty fk <+> pretty (round @_ @Word64 p)
|
||||||
|
| (Down p, fk) <- ncqStateIndex
|
||||||
|
]
|
||||||
|
|
||||||
|
prettyFiles = vcat
|
||||||
|
[ "f" <+> pretty fk
|
||||||
|
| fk <- HS.toList ncqStateFiles
|
||||||
|
]
|
||||||
|
|
||||||
|
prettyFacts = vcat
|
||||||
|
[ pf f
|
||||||
|
| f <- Set.toList ncqStateFacts
|
||||||
|
]
|
||||||
|
|
||||||
|
pf (FI (DataFile a) (IndexFile b)) = "fi" <+> pretty a <+> pretty b
|
||||||
|
pf (P (PData (DataFile a) s)) = "fp" <+> pretty a <+> pretty s
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,7 @@ import HBS2.Storage
|
||||||
import HBS2.Storage.Simple
|
import HBS2.Storage.Simple
|
||||||
import HBS2.Storage.Operations.ByteString
|
import HBS2.Storage.Operations.ByteString
|
||||||
import HBS2.Storage.NCQ3
|
import HBS2.Storage.NCQ3
|
||||||
|
import HBS2.Storage.NCQ3.Internal.Files
|
||||||
|
|
||||||
import HBS2.System.Logger.Simple.ANSI
|
import HBS2.System.Logger.Simple.ANSI
|
||||||
|
|
||||||
|
@ -28,6 +29,9 @@ import Data.Config.Suckless.System
|
||||||
|
|
||||||
import NCQTestCommon
|
import NCQTestCommon
|
||||||
|
|
||||||
|
import Data.ByteString qualified as BS
|
||||||
|
import Data.Ord
|
||||||
|
import Data.Set qualified as Set
|
||||||
import System.Random.MWC as MWC
|
import System.Random.MWC as MWC
|
||||||
import UnliftIO
|
import UnliftIO
|
||||||
|
|
||||||
|
@ -52,13 +56,24 @@ ncq3Tests = do
|
||||||
g <- liftIO MWC.createSystemRandom
|
g <- liftIO MWC.createSystemRandom
|
||||||
runTest $ \TestEnv{..} -> do
|
runTest $ \TestEnv{..} -> do
|
||||||
|
|
||||||
ncqWithStorage3 testEnvDir $ \sto -> do
|
pending <- ncqWithStorage3 testEnvDir $ \sto -> do
|
||||||
notice $ "write" <+> pretty num <+> "blocks"
|
notice $ "write" <+> pretty num <+> "blocks"
|
||||||
replicateM_ num do
|
replicateM_ num do
|
||||||
n <- liftIO $ uniformRM (1024, 256*1024) g
|
n <- liftIO $ uniformRM (1024, 256*1024) g
|
||||||
bs <- liftIO $ genRandomBS g n
|
bs <- liftIO $ genRandomBS g n
|
||||||
ncqPutBS sto (Just B) Nothing bs
|
ncqPutBS sto (Just B) Nothing bs
|
||||||
|
|
||||||
|
fa <- readTVarIO (ncqState sto) <&> ncqStateFacts
|
||||||
|
|
||||||
|
pure $ [ (ncqGetFileName sto (toFileName k),s) | P (PData k s) <- Set.toList fa ]
|
||||||
|
& maximumByMay (comparing snd)
|
||||||
|
|
||||||
|
for_ pending $ \(dataFile,_) -> do
|
||||||
|
n <- liftIO $ uniformRM (1, 16*1024) g
|
||||||
|
bss <- liftIO $ genRandomBS g n
|
||||||
|
notice $ "CORRUPTING PENDING FILE" <+> pretty n <+> pretty dataFile
|
||||||
|
liftIO $ BS.appendFile dataFile bss
|
||||||
|
|
||||||
notice $ "reopen"
|
notice $ "reopen"
|
||||||
ncqWithStorage3 testEnvDir $ \sto -> do
|
ncqWithStorage3 testEnvDir $ \sto -> do
|
||||||
pause @'Seconds 2
|
pause @'Seconds 2
|
||||||
|
|
Loading…
Reference in New Issue