mirror of https://github.com/voidlizard/hbs2
wip, storage audit log, disabled by default
This commit is contained in:
parent
dfead1d585
commit
65aa06bafc
|
@ -61,6 +61,9 @@ newtype IndexFile a = IndexFile a
|
|||
newtype StateFile a = StateFile a
|
||||
deriving newtype (IsString,Eq,Ord,Pretty)
|
||||
|
||||
data AuditFile = AuditFile
|
||||
deriving stock (Eq,Ord)
|
||||
|
||||
class ToFileName a where
|
||||
toFileName :: a -> FilePath
|
||||
|
||||
|
@ -85,6 +88,9 @@ instance ToFileName (IndexFile FilePath) where
|
|||
instance ToFileName (StateFile FileKey) where
|
||||
toFileName (StateFile fk) = toFileName fk
|
||||
|
||||
instance ToFileName AuditFile where
|
||||
toFileName = const "audit.log"
|
||||
|
||||
newtype FilePrio = FilePrio (Down TimeSpec)
|
||||
deriving newtype (Eq,Ord)
|
||||
deriving stock (Generic,Show)
|
||||
|
@ -197,6 +203,24 @@ ncqMakeSectionBS t h bs = do
|
|||
{-# INLINE ncqMakeSectionBS #-}
|
||||
|
||||
|
||||
ncqMakeAuditSectionBS :: HashRef
|
||||
-> ByteString
|
||||
-> NCQSectionType
|
||||
-> Maybe ByteString
|
||||
ncqMakeAuditSectionBS h bs = \case
|
||||
B -> Just $ section ncqBlockPrefix h ""
|
||||
T -> Just $ section ncqTombPrefix h ""
|
||||
R -> Just $ section ncqRefPrefix h (BS.take 32 bs)
|
||||
_ -> Nothing
|
||||
where
|
||||
section pref hash pl = do
|
||||
let slen = ncqKeyLen + fromIntegral (BS.length pref) + fromIntegral (BS.length bs)
|
||||
let ss = N.bytestring32 slen
|
||||
ss <> coerce h <> pref <> pl
|
||||
{-# INLINE section #-}
|
||||
|
||||
{-# INLINE ncqMakeAuditSectionBS #-}
|
||||
|
||||
data NCQFsckException =
|
||||
NCQFsckException | NCQFsckIssueExt NCQFsckIssueType
|
||||
deriving stock (Show,Typeable)
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
{-# Language RecordWildCards #-}
|
||||
{-# Language ViewPatterns #-}
|
||||
{-# Language MultiWayIf #-}
|
||||
module HBS2.Storage.NCQ3.Internal where
|
||||
|
||||
|
@ -12,6 +13,8 @@ import HBS2.Storage.NCQ3.Internal.Fossil
|
|||
import HBS2.Storage.NCQ3.Internal.Index
|
||||
import HBS2.Storage.NCQ3.Internal.MMapCache
|
||||
|
||||
import Data.Config.Suckless.Script
|
||||
|
||||
import Control.Monad.Trans.Cont
|
||||
import Data.HashPSQ qualified as HPSQ
|
||||
import Data.Vector qualified as V
|
||||
|
@ -24,11 +27,14 @@ import Data.ByteString.Lazy qualified as LBS
|
|||
import System.Posix.Files qualified as PFS
|
||||
import Control.Concurrent.STM.TSem
|
||||
import System.FileLock as FL
|
||||
import Lens.Micro.Platform
|
||||
|
||||
ncqStorageOpen :: MonadIO m => FilePath -> (NCQStorage -> NCQStorage) -> m NCQStorage
|
||||
ncqStorageOpen fp upd = do
|
||||
|
||||
let ncqRoot = fp
|
||||
let ncqGen = 0
|
||||
let ncqAuditEnabled = False
|
||||
-- let ncqFsync = 16 * megabytes
|
||||
let ncqFsync = 16 * megabytes
|
||||
let ncqWriteQLen = 1024 * 4
|
||||
|
@ -54,6 +60,7 @@ ncqStorageOpen fp upd = do
|
|||
let !ncqReadThreads = wopNum * 4
|
||||
|
||||
ncqWriteQ <- newTVarIO mempty
|
||||
ncqAuditQ <- newTQueueIO
|
||||
ncqMemTable <- V.fromList <$> replicateM shardNum (newTVarIO mempty)
|
||||
ncqMMapCachedIdx <- newTVarIO HPSQ.empty
|
||||
ncqMMapCachedData <- newTVarIO HPSQ.empty
|
||||
|
@ -80,7 +87,28 @@ ncqStorageOpen fp upd = do
|
|||
ncqCurrentFossils <- newTVarIO mempty
|
||||
ncqReplQueue <- newTVarIO mempty
|
||||
|
||||
let ncq = NCQStorage{..} & upd
|
||||
let ncq0 = NCQStorage{..} -- & upd
|
||||
|
||||
let confFile = fp </> "config"
|
||||
|
||||
touch confFile
|
||||
|
||||
conf <- liftIO (try @_ @SomeException (readFile confFile)) >>= \case
|
||||
Left e -> warn (viaShow e) >> pure mempty
|
||||
Right s -> either (\e -> warn (viaShow e) >> pure mempty) pure (parseTop s)
|
||||
|
||||
let auditFlag = \case
|
||||
ListVal [ SymbolVal "audit", StringLike "off" ] -> Just False
|
||||
ListVal [ SymbolVal "audit", StringLike "on" ] -> Just True
|
||||
ListVal [ SymbolVal "audit", LitBoolVal f ] -> Just f
|
||||
_ -> Nothing
|
||||
|
||||
let audit = lastDef ncqAuditEnabled [ f | (auditFlag -> Just f) <- conf ]
|
||||
let auditSet = set #ncqAuditEnabled audit
|
||||
|
||||
let applySettings x = foldl (flip ($)) x [auditSet]
|
||||
|
||||
let ncq = ncq0 & applySettings & upd
|
||||
|
||||
mkdir (ncqGetWorkDir ncq)
|
||||
|
||||
|
@ -201,6 +229,8 @@ ncqPutBS0 wait ncq@NCQStorage{..} mtp mhref bs' = ncqOperation ncq (pure $ fromM
|
|||
|
||||
when upd do
|
||||
modifyTVar ncqWriteQ (|> h)
|
||||
when ncqAuditEnabled do
|
||||
for_ (ncqMakeAuditSectionBS h bs' =<< mtp) $ \x -> writeTQueue ncqAuditQ x
|
||||
|
||||
putTMVar waiter h
|
||||
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
{-# Language ViewPatterns #-}
|
||||
{-# Language MultiWayIf #-}
|
||||
module HBS2.Storage.NCQ3.Internal.CLI where
|
||||
|
||||
import HBS2.Storage.NCQ3.Internal.Prelude
|
||||
|
@ -15,6 +16,7 @@ import HBS2.Net.Auth.Credentials
|
|||
|
||||
import Data.Config.Suckless.Script
|
||||
|
||||
import Control.Monad.Trans.Cont
|
||||
import Network.ByteOrder qualified as N
|
||||
import Data.Fixed
|
||||
import Data.Text.Encoding qualified as TE
|
||||
|
@ -24,6 +26,7 @@ import Data.ByteString (ByteString)
|
|||
import Data.ByteString qualified as BS
|
||||
import Data.ByteString.Lazy qualified as LBS
|
||||
import Data.HashMap.Strict qualified as HM
|
||||
import System.IO qualified as IO
|
||||
import Data.HashMap.Strict (HashMap)
|
||||
import System.Environment
|
||||
import UnliftIO
|
||||
|
@ -303,7 +306,7 @@ entries instances = do
|
|||
Just (bs,nw) -> do
|
||||
mval <- nwayHashLookup nw bs (coerce h)
|
||||
case mval of
|
||||
Nothing -> debug "fucking nothing!" >> pure ()
|
||||
Nothing -> pure ()
|
||||
Just entryBs -> do
|
||||
let IndexEntry fk off sz = unpackIndexEntry entryBs
|
||||
print $
|
||||
|
@ -475,6 +478,66 @@ entries instances = do
|
|||
e -> throwIO $ BadFormException (mkList e)
|
||||
|
||||
|
||||
entry $ bindMatch "ncq3:audit:scan" $ nil_ \case
|
||||
[ StringLike path ] -> flip runContT pure $ callCC \exit -> do
|
||||
sto <- ncqStorageOpen path id
|
||||
let fn = ncqGetFileName sto AuditFile
|
||||
here <- doesFileExist fn
|
||||
|
||||
liftIO $ print $ "WYF?" <+> pretty fn
|
||||
|
||||
let readField n bs = if BS.length bs < n then Left bs else Right (BS.take n bs, BS.drop n bs)
|
||||
|
||||
unless here do
|
||||
err $ pretty (toFileName AuditFile) <+> "not found"
|
||||
exit ()
|
||||
|
||||
mmaped <- liftIO $ mmapFileByteString fn Nothing
|
||||
|
||||
void $ flip runContT pure $ callCC \stop -> do
|
||||
flip fix mmaped $ \next bs -> do
|
||||
|
||||
(s, rest) <- case readField 4 bs of
|
||||
Left r -> stop (Left r)
|
||||
Right s -> pure s
|
||||
|
||||
(k, rest1) <- case readField ncqKeyLen rest of
|
||||
Left r -> stop (Left r)
|
||||
Right s -> pure s
|
||||
|
||||
(p, rest2) <- case readField ncqPrefixLen rest1 of
|
||||
Left r -> stop (Left r)
|
||||
Right s -> pure s
|
||||
|
||||
let t = if | p == ncqBlockPrefix -> Just B
|
||||
| p == ncqRefPrefix -> Just R
|
||||
| p == ncqTombPrefix -> Just T
|
||||
| otherwise -> Nothing
|
||||
|
||||
case t of
|
||||
Just B -> do
|
||||
liftIO $ IO.hPrint stdout ("B" <+> pretty (coerce @_ @HashRef k))
|
||||
next rest2
|
||||
|
||||
Just T -> do
|
||||
liftIO $ IO.hPrint stdout ("T" <+> pretty (coerce @_ @HashRef k))
|
||||
next rest2
|
||||
|
||||
Just R -> do
|
||||
|
||||
(v, rest3) <- case readField ncqKeyLen rest2 of
|
||||
Left r -> stop (Left r)
|
||||
Right s -> pure s
|
||||
|
||||
liftIO $ IO.hPrint stdout ("R" <+> pretty (coerce @_ @HashRef k) <+> pretty (coerce @_ @HashRef v))
|
||||
next rest3
|
||||
|
||||
_ -> do
|
||||
liftIO $ IO.hPrint stdout ("E" <+> "audit file damaged or incomplete")
|
||||
pure $ Left rest2
|
||||
|
||||
e -> throwIO $ BadFormException (mkList e)
|
||||
|
||||
|
||||
printDataEntry :: MonadUnliftIO m => NCQOffset -> NCQSize -> HashRef -> ByteString -> m ()
|
||||
printDataEntry offset size key val = do
|
||||
|
|
|
@ -4,12 +4,14 @@ module HBS2.Storage.NCQ3.Internal.Prelude
|
|||
, megabytes
|
||||
, gigabytes
|
||||
, ncqMakeSectionBS
|
||||
, ncqMakeAuditSectionBS
|
||||
, ncqSLen
|
||||
, ncqKeyLen
|
||||
, ncqPrefixLen
|
||||
, ncqRefPrefix
|
||||
, ncqBlockPrefix
|
||||
, ncqMetaPrefix
|
||||
, ncqTombPrefix
|
||||
, ncqIsMeta
|
||||
, ncqFullDataLen
|
||||
, ncqEntryPayloadSize
|
||||
|
@ -18,6 +20,7 @@ module HBS2.Storage.NCQ3.Internal.Prelude
|
|||
, IndexFile(..)
|
||||
, DataFile(..)
|
||||
, StateFile(..)
|
||||
, AuditFile(..)
|
||||
, FilePrio(..)
|
||||
, NCQStorageException(..)
|
||||
, NCQFsckException(..)
|
||||
|
|
|
@ -32,6 +32,8 @@ import System.Posix.IO.ByteString as Posix
|
|||
import System.Posix.Types as Posix
|
||||
import System.Posix.Unistd
|
||||
|
||||
import Data.ByteString qualified as BS
|
||||
|
||||
{- HLINT ignore "Eta reduce" -}
|
||||
|
||||
ncqStorageStop :: forall m . MonadUnliftIO m => NCQStorage -> m ()
|
||||
|
@ -109,7 +111,11 @@ ncqTryLoadState me@NCQStorage{..} = do
|
|||
|
||||
let best = if i < 1 then max s o else s
|
||||
|
||||
warn $ red "trim" <+> pretty s <+> pretty best <+> red (pretty (fromIntegral best - realSize)) <+> pretty (takeFileName path)
|
||||
warn $ red "trim"
|
||||
<+> pretty s
|
||||
<+> pretty best
|
||||
<+> red (pretty (fromIntegral best - realSize))
|
||||
<+> pretty (takeFileName path)
|
||||
|
||||
liftIO $ PFS.setFileSize path (fromIntegral best)
|
||||
|
||||
|
@ -235,6 +241,30 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do
|
|||
|
||||
spawnActivity (ncqStateUpdateLoop ncq)
|
||||
|
||||
spawnActivity $ flip runContT pure $ callCC \exit -> do
|
||||
|
||||
unless ncqAuditEnabled $ exit ()
|
||||
|
||||
let auditName = ncqGetFileName ncq AuditFile
|
||||
touch auditName
|
||||
let flags = defaultFileFlags { exclusive = False, append = True }
|
||||
fd <- liftIO (PosixBase.openFd auditName Posix.WriteOnly flags)
|
||||
|
||||
void $ ContT $ bracket (pure fd) $ \h -> liftIO do
|
||||
bss <- atomically $ STM.flushTQueue ncqAuditQ
|
||||
void $ Posix.fdWrite h (mconcat bss)
|
||||
closeFd h
|
||||
|
||||
forever do
|
||||
flip fix mempty $ \next bss -> do
|
||||
-- if BS.length bss >= 4096 then do
|
||||
if True then do
|
||||
liftIO (Posix.fdWrite fd bss >> fileSynchronisePortable fd)
|
||||
next mempty
|
||||
else do
|
||||
s <- atomically (readTQueue ncqAuditQ)
|
||||
next (bss <> s)
|
||||
|
||||
spawnActivity $ forever do
|
||||
pause @'Seconds 30
|
||||
ema <- readTVarIO ncqWriteEMA
|
||||
|
@ -406,7 +436,7 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do
|
|||
let fname = ncqGetFileName ncq (DataFile fk)
|
||||
-- touch fname
|
||||
let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 }
|
||||
(fk,) <$> liftIO (PosixBase.openFd fname Posix.ReadWrite flags)
|
||||
(fk,) <$> liftIO (PosixBase.openFd fname Posix.WriteOnly flags)
|
||||
|
||||
spawnActivity m = do
|
||||
a <- ContT $ withAsync m
|
||||
|
|
|
@ -85,6 +85,7 @@ data NCQState =
|
|||
data NCQStorage =
|
||||
NCQStorage
|
||||
{ ncqRoot :: FilePath
|
||||
, ncqAuditEnabled :: Bool
|
||||
, ncqGen :: Int
|
||||
, ncqSalt :: HashRef
|
||||
, ncqPostponeService :: Timeout 'Seconds
|
||||
|
@ -112,6 +113,7 @@ data NCQStorage =
|
|||
, ncqWrites :: TVar Int
|
||||
, ncqWriteEMA :: TVar Double -- for writes-per-seconds
|
||||
, ncqWriteQ :: TVar (Seq HashRef)
|
||||
, ncqAuditQ :: TQueue ByteString
|
||||
, ncqWriteOps :: Vector (TQueue (IO ()))
|
||||
, ncqSyncOps :: TQueue (IO ())
|
||||
, ncqReadReq :: TQueue (HashRef, TMVar (Maybe Location))
|
||||
|
|
Loading…
Reference in New Issue