wip, implementing new block structure

This commit is contained in:
Dmitry Zuykov 2025-05-14 11:24:06 +03:00
parent a97685a74d
commit 38821dd138
1 changed files with 32 additions and 65 deletions

View File

@ -3,8 +3,10 @@ module HBS2.Storage.NCQ where
import HBS2.Prelude.Plated import HBS2.Prelude.Plated
import HBS2.Hash import HBS2.Hash
import HBS2.OrDie
import HBS2.Data.Types.Refs import HBS2.Data.Types.Refs
import HBS2.Base58 import HBS2.Base58
import HBS2.Net.Auth.Credentials
import HBS2.Storage import HBS2.Storage
import HBS2.Misc.PrettyStuff import HBS2.Misc.PrettyStuff
import HBS2.System.Logger.Simple.ANSI import HBS2.System.Logger.Simple.ANSI
@ -53,7 +55,12 @@ import System.Posix.IO as PosixBase
import System.Posix.Types as Posix import System.Posix.Types as Posix
import System.Posix.IO.ByteString as Posix import System.Posix.IO.ByteString as Posix
import System.Posix.Unistd import System.Posix.Unistd
import System.Posix.Files (getFileStatus, modificationTimeHiRes, getFdStatus, FileStatus(..)) import System.Posix.Files ( getFileStatus
, modificationTimeHiRes
, getFdStatus
, FileStatus(..)
, setFileMode
)
import System.Posix.Files qualified as PFS import System.Posix.Files qualified as PFS
import System.IO.Error (catchIOError) import System.IO.Error (catchIOError)
import System.IO.MMap as MMap import System.IO.MMap as MMap
@ -73,6 +80,7 @@ type NCQPerks m = MonadIO m
data NCQStorageException = data NCQStorageException =
NCQStorageAlreadyExist String NCQStorageAlreadyExist String
| NCQStorageSeedMissed
deriving stock (Show,Typeable) deriving stock (Show,Typeable)
instance Exception NCQStorageException instance Exception NCQStorageException
@ -117,8 +125,7 @@ data NCQStorage =
, ncqMinLog :: Int , ncqMinLog :: Int
, ncqMaxLog :: Int , ncqMaxLog :: Int
, ncqMaxCached :: Int , ncqMaxCached :: Int
, ncqRefsMem :: TVar (HashMap HashRef HashRef) , ncqSalt :: HashRef
, ncqRefsDirty :: TVar Int
, ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec WQItem) , ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec WQItem)
, ncqWaitIndex :: TVar (HashPSQ HashRef TimeSpec (Word64,Word64)) , ncqWaitIndex :: TVar (HashPSQ HashRef TimeSpec (Word64,Word64))
, ncqIndexNow :: TVar Int , ncqIndexNow :: TVar Int
@ -305,12 +312,11 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
debug "RUNNING STORAGE!" debug "RUNNING STORAGE!"
refsWriter <- makeRefsWriter
reader <- makeReader reader <- makeReader
writer <- makeWriter indexQ writer <- makeWriter indexQ
indexer <- makeIndexer writer indexQ indexer <- makeIndexer writer indexQ
mapM_ waitCatch [writer,indexer,refsWriter] mapM_ waitCatch [writer,indexer]
-- mapM_ waitCatch [writer,indexer,refsWriter] -- ,indexer,refsWriter] -- mapM_ waitCatch [writer,indexer,refsWriter] -- ,indexer,refsWriter]
mapM_ cancel [reader] mapM_ cancel [reader]
@ -376,40 +382,6 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
link writer link writer
pure writer pure writer
makeRefsWriter = do
refsWriter <- ContT $ withAsync do
myFlushQ <- newTQueueIO
atomically $ modifyTVar ncqFlushNow (myFlushQ:)
untilStopped do
-- FIXME: timeout-hardcode
void $ race (pause @'Seconds 1) $ atomically do
q <- tryPeekTQueue myFlushQ
s <- readTVar ncqStopped
if not (isJust q || s) then
STM.retry
else do
STM.flushTQueue myFlushQ
dirty <- readTVarIO ncqRefsDirty
when (dirty > 0) do
refs <- readTVarIO ncqRefsMem <&> HM.toList
withBinaryFileDurableAtomic (ncqGetRefsDataFileName ncq) WriteMode $ \fh -> do
for_ refs $ \(k,v) -> do
let ks = coerce @_ @ByteString k
let vs = coerce @_ @ByteString v
let w = 4 + BS.length ks + BS.length vs -- always 4+64, but okay
liftIO do
BS.hPutStr fh (N.bytestring32 (fromIntegral w))
BS.hPutStr fh ks
BS.hPutStr fh vs
atomically $ writeTVar ncqRefsDirty 0
link refsWriter
pure refsWriter
makeIndexer w indexQ = do makeIndexer w indexQ = do
indexer <- ContT $ withAsync $ fix \next -> do indexer <- ContT $ withAsync $ fix \next -> do
@ -589,9 +561,6 @@ ncqStoragePut_ check ncq@NCQStorage{..} lbs = flip runContT pure $ callCC \exit
ncqStoragePut :: MonadUnliftIO m => NCQStorage -> LBS.ByteString -> m (Maybe HashRef) ncqStoragePut :: MonadUnliftIO m => NCQStorage -> LBS.ByteString -> m (Maybe HashRef)
ncqStoragePut = ncqStoragePut_ True ncqStoragePut = ncqStoragePut_ True
ncqStoragePutFaster :: MonadUnliftIO m => NCQStorage -> LBS.ByteString -> m (Maybe HashRef)
ncqStoragePutFaster = ncqStoragePut_ False
ncqLocatedSize :: Location -> Integer ncqLocatedSize :: Location -> Integer
ncqLocatedSize = \case ncqLocatedSize = \case
InWriteQueue WQItem{..} -> fromIntegral $ maybe 0 LBS.length wqData InWriteQueue WQItem{..} -> fromIntegral $ maybe 0 LBS.length wqData
@ -768,19 +737,13 @@ ncqStorageGet ncq@NCQStorage{..} h = do
_ -> pure Nothing _ -> pure Nothing
ncqStorageGetRef :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe HashRef) ncqStorageGetRef :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe HashRef)
ncqStorageGetRef NCQStorage{..} ref = readTVarIO ncqRefsMem <&> HM.lookup ref ncqStorageGetRef NCQStorage{..} ref = error "not implemented"
ncqStorageSetRef :: MonadUnliftIO m => NCQStorage -> HashRef -> HashRef -> m () ncqStorageSetRef :: MonadUnliftIO m => NCQStorage -> HashRef -> HashRef -> m ()
ncqStorageSetRef NCQStorage{..} ref val = atomically do ncqStorageSetRef NCQStorage{..} ref val = error "not implemented"
stopped <- readTVar ncqStopped
unless stopped do
modifyTVar ncqRefsMem (HM.insert ref val)
modifyTVar ncqRefsDirty succ
ncqStorageDelRef :: MonadUnliftIO m => NCQStorage -> HashRef -> m () ncqStorageDelRef :: MonadUnliftIO m => NCQStorage -> HashRef -> m ()
ncqStorageDelRef NCQStorage{..} ref = atomically do ncqStorageDelRef NCQStorage{..} ref = error "not implemented"
modifyTVar ncqRefsMem (HM.delete ref)
modifyTVar ncqRefsDirty succ
ncqStorageDel :: MonadUnliftIO m => NCQStorage -> HashRef -> m () ncqStorageDel :: MonadUnliftIO m => NCQStorage -> HashRef -> m ()
ncqStorageDel ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do ncqStorageDel ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do
@ -869,21 +832,11 @@ ncqStorageOpen fp' = do
ncqFixIndexes ncq ncqFixIndexes ncq
ncqLoadIndexes ncq ncqLoadIndexes ncq
readCurrent ncq readCurrent ncq
readRefs ncq
atomically $ putTMVar ncqOpenDone True atomically $ putTMVar ncqOpenDone True
pure ncq pure ncq
where where
readRefs ncq@NCQStorage{..} = do
mmaped <- liftIO $ mmapFileByteString (ncqGetRefsDataFileName ncq) Nothing
kvs <- S.toList_ do
scanBS mmaped $ \bs -> do
let k = BS.copy (BS.take 32 bs) & coerce @_ @HashRef
let v = BS.copy (BS.take 32 (BS.drop 32 bs)) & coerce @_ @HashRef
S.yield (k,v)
atomically $ writeTVar ncqRefsMem (HM.fromList kvs)
readCurrent ncq@NCQStorage{..} = do readCurrent ncq@NCQStorage{..} = do
let fn = ncqGetCurrentName ncq let fn = ncqGetCurrentName ncq
-- liftIO $ print $ pretty "FILE" <+> pretty fn -- liftIO $ print $ pretty "FILE" <+> pretty fn
@ -926,6 +879,8 @@ ncqStorageInit_ check path = do
mkdir (path </> show ncqGen) mkdir (path </> show ncqGen)
let seedPath = path </> ".seed"
unless here do unless here do
now <- liftIO $ getPOSIXTime <&> round @_ @Int now <- liftIO $ getPOSIXTime <&> round @_ @Int
@ -934,10 +889,18 @@ ncqStorageInit_ check path = do
liftIO $ appendFile (path </> "metadata") metas liftIO $ appendFile (path </> "metadata") metas
let ncqRoot = path cred0 <- newCredentials @HBS2Basic
cred <- addKeyPair Nothing cred0
let seed = show $ "# storage seed file" <+> pretty now <> line
<> "# NEVER EVER MODIFY OR REMOVE THIS FILE" <> line
<> "# or references may be lost and recovery will be prolematic" <> line
<> pretty (AsCredFile $ AsBase58 cred)
ncqRefsMem <- newTVarIO mempty liftIO do
ncqRefsDirty <- newTVarIO 0 Prelude.writeFile seedPath seed
PFS.setFileMode seedPath 0o0444
let ncqRoot = path
let ncqSyncSize = 64 * (1024 ^ 2) let ncqSyncSize = 64 * (1024 ^ 2)
let ncqMinLog = 2 * (1024 ^ 3) let ncqMinLog = 2 * (1024 ^ 3)
@ -945,6 +908,10 @@ ncqStorageInit_ check path = do
let ncqMaxCached = 64 let ncqMaxCached = 64
ncqSalt <- try @_ @IOException (liftIO $ BS.readFile seedPath)
>>= orThrow NCQStorageSeedMissed
<&> HashRef . hashObject
ncqWriteQueue <- newTVarIO HPSQ.empty ncqWriteQueue <- newTVarIO HPSQ.empty
ncqNotWritten <- newTVarIO 0 ncqNotWritten <- newTVarIO 0