This commit is contained in:
Dmitry Zuikov 2024-06-02 08:01:55 +03:00
parent 065554ad04
commit 7660fff728
2 changed files with 165 additions and 45 deletions

View File

@ -88,6 +88,7 @@ library
, temporary
, filepattern
, unliftio
, unix
, vector

View File

@ -1,7 +1,12 @@
{-# LANGUAGE PatternSynonyms #-}
{-# Language ViewPatterns #-}
{-# Language UndecidableInstances #-}
module HBS2.Storage.Compact where
import HBS2.Clock
import HBS2.Hash
import HBS2.Storage
import Data.Word
import Data.ByteString (ByteString)
@ -32,6 +37,10 @@ import Control.Concurrent.STM.TSem
import Safe
import UnliftIO
-- import System.Posix.IO
-- import System.Posix.Fcntl
import System.Posix.Types
import Debug.Trace
-- compact storage
@ -105,12 +114,16 @@ isFresh e = case e of
Del{} -> True
_ -> False
data CompactStorage =
type Bucket = TVar (HashMap ByteString Entry)
data CompactStorage k =
CompactStorage
{ csHandle :: MVar Handle
, csHeaderOff :: IORef EntryOffset
, csSeq :: TVar Integer
, csKeys :: Vector (TVar (HashMap ByteString Entry))
{ csBuckets :: Int
, csHandle :: MVar Handle
, csHeaderOff :: TVar EntryOffset
, csSeq :: TVar Integer
, csKeys :: Vector Bucket
, csUncommitted :: TVar Integer
}
type ForCompactStorage m = MonadIO m
@ -125,37 +138,53 @@ data CompactStorageOpenError =
instance Exception CompactStorageOpenError
buckets :: Int
buckets = 8
getBucket :: CompactStorage k -> ByteString -> Bucket
getBucket sto bs = do
let i = maybe 0 (fromIntegral.fst) (BS.uncons bs) `mod` csBuckets sto
csKeys sto ! i
{-# INLINE getBucket #-}
-- FIXME: buckets-hardcode
getKeyPrefix :: ByteString -> Int
getKeyPrefix bs = maybe 0 (fromIntegral.fst) (BS.uncons bs) `mod` buckets
{-# INLINE getKeyPrefix #-}
-- openFileForReadWrite :: FilePath -> IO (Fd, Fd)
-- openFileForReadWrite fp = do
-- fdW <- openFd fp ReadWrite (Just ownerModes) defaultFileFlags { nonBlock = True }
-- fdR <- openFd fp ReadOnly Nothing defaultFileFlags { nonBlock = True }
-- return (fdW, fdR)
-- -- Преобразование Fd в Handle
-- fdToHandleReadWrite :: (Fd, Fd) -> IO (Handle, Handle)
-- fdToHandleReadWrite (fdW, fdR) = do
-- haW <- fdToHandle fdW
-- haR <- fdToHandle fdR
-- return (haW, haR)
compactStorageOpen :: ForCompactStorage m
=> [CompactStorageOpenOpt]
-> FilePath
-> m CompactStorage
-> m (CompactStorage k)
compactStorageOpen _ fp = do
let buck = 8
ha <- openFile fp ReadWriteMode
sz <- hFileSize ha
mha <- newMVar ha
hoff0 <- newIORef 0
hoff0 <- newTVarIO 0
keys0 <- replicateM buckets (newTVarIO mempty) <&> V.fromList
keys0 <- replicateM buck (newTVarIO mempty) <&> V.fromList
uncommitted <- newTVarIO 0
-- ss <- newIORef 0
ss <- newTVarIO 0
if sz == 0 then
pure $ CompactStorage mha hoff0 ss keys0
pure $ CompactStorage buck mha hoff0 ss keys0 uncommitted
else do
(p,header) <- readHeader mha Nothing >>= maybe (throwIO InvalidHeader) pure
hoff <- newIORef p
let sto = CompactStorage mha hoff ss keys0
hoff <- newTVarIO p
let sto = CompactStorage buck mha hoff ss keys0 uncommitted
readIndex sto (hdrIndexOffset header) (hdrIndexEntries header)
flip fix (hdrPrev header) $ \next -> \case
@ -169,7 +198,7 @@ compactStorageOpen _ fp = do
readIndex :: ForCompactStorage m
=> CompactStorage
=> CompactStorage k
-> EntryOffset
-> EntryNum
-> m ()
@ -182,15 +211,17 @@ readIndex sto offset num = liftIO do
(n,acc,rn) -> do
what <- runMaybeT do
slen <- liftIO (try @_ @IOException (LBS.hGet ha 2))
slen <- liftIO (try @_ @IOException (BS.hGet ha 2))
<&> either (const Nothing) Just
& MaybeT
<&> LBS.fromStrict
len <- either (const Nothing) (Just . view _3) (runGetOrFail getWord16be slen)
& MaybeT . pure
sIdx <- liftIO (try @_ @IOException (LBS.hGet ha (fromIntegral len)))
sIdx <- liftIO (try @_ @IOException (BS.hGet ha (fromIntegral len)))
>>= either (const mzero) pure
<&> LBS.fromStrict
deserialiseOrFail @IndexEntry sIdx
& either (const mzero) pure
@ -207,10 +238,10 @@ readIndex sto offset num = liftIO do
-- so we keep only the newer values in map
atomically do
for_ new $ \(k,v) -> do
let tv = csKeys sto ! getKeyPrefix k
let tv = getBucket sto k
modifyTVar tv (HM.insertWith (\_ o -> o) k v)
compactStorageCommit :: ForCompactStorage m => CompactStorage -> m ()
compactStorageCommit :: ForCompactStorage m => CompactStorage k -> m ()
compactStorageCommit sto = liftIO do
withMVar (csHandle sto) $ \ha -> do
hSeek ha SeekFromEnd 0
@ -224,7 +255,7 @@ compactStorageCommit sto = liftIO do
-- write fwd
offFwd <- hTell ha
LBS.hPut ha (toLazyByteString $ word64BE 0)
BS.hPut ha (LBS.toStrict $ toLazyByteString $ word64BE 0)
let off0 = offFwd + 8
@ -250,11 +281,11 @@ compactStorageCommit sto = liftIO do
-- write index
for_ idxEntries $ \(e,_) -> do
let lbs = serialise e
LBS.hPut ha (B.toLazyByteString $
BS.hPut ha $ LBS.toStrict (B.toLazyByteString $
word16BE (fromIntegral $ LBS.length lbs)
<> B.lazyByteString lbs)
offPrev <- readIORef (csHeaderOff sto)
offPrev <- readTVarIO (csHeaderOff sto)
offCommitHead <- hTell ha
@ -263,7 +294,7 @@ compactStorageCommit sto = liftIO do
hSeek ha AbsoluteSeek offFwd
LBS.hPut ha (toLazyByteString $ word64BE (fromIntegral offCommitHead))
BS.hPut ha (LBS.toStrict $ toLazyByteString $ word64BE (fromIntegral offCommitHead))
hFlush ha
@ -271,14 +302,13 @@ compactStorageCommit sto = liftIO do
offLast <- hTell ha <&> fromIntegral
-- atomically do
atomicWriteIORef (csHeaderOff sto) (offLast - headerSize 1)
atomically do
writeTVar (csHeaderOff sto) (offLast - headerSize 1)
for_ idxEntries $ \(e,i) -> do
let k = idxEntryKey e
let tv = csKeys sto ! getKeyPrefix k
let tv = getBucket sto k
modifyTVar tv (HM.alter (doAlter (Entry i (Off e))) k)
resetUncommitedSTM sto
where
@ -294,10 +324,10 @@ compactStorageCommit sto = liftIO do
getSeq = \case
Entry i _ -> i
compactStorageDel :: ForCompactStorage m => CompactStorage -> ByteString -> m ()
compactStorageDel :: ForCompactStorage m => CompactStorage k -> ByteString -> m ()
compactStorageDel sto key = do
let tvar = csKeys sto ! getKeyPrefix key
let tvar = getBucket sto key
val <- readTVarIO tvar <&> HM.lookup key
case val of
@ -308,27 +338,47 @@ compactStorageDel sto key = do
atomically do
j <- newSequenceSTM sto
modifyTVar tvar (HM.insert key (Entry j (Del e)))
succUncommitedSTM sto 1
Just (Entry i (New v)) -> do
-- FIXME: if-commit-in-progress-then-put-tomb
atomically $ modifyTVar tvar (HM.delete key)
atomically do
modifyTVar tvar (HM.delete key)
succUncommitedSTM sto 1
newSequenceSTM :: CompactStorage -> STM Integer
newSequenceSTM :: CompactStorage k -> STM Integer
newSequenceSTM sto = stateTVar (csSeq sto) (\n -> (n+1,n))
compactStoragePut :: ForCompactStorage m => CompactStorage -> ByteString -> ByteString -> m ()
succUncommitedSTM :: CompactStorage k -> Integer -> STM ()
succUncommitedSTM sto k = modifyTVar (csUncommitted sto) (+k)
resetUncommitedSTM :: CompactStorage k -> STM ()
resetUncommitedSTM sto = writeTVar (csUncommitted sto) 0
compactStoragePut :: ForCompactStorage m => CompactStorage k -> ByteString -> ByteString -> m ()
compactStoragePut sto k v = do
-- TODO: ASAP-do-not-write-value-if-not-changed
let tvar = csKeys sto ! getKeyPrefix k
let tvar = getBucket sto k
atomically $ do
c <- newSequenceSTM sto
modifyTVar tvar (HM.insert k (Entry c (New v)))
compactStorageGet :: ForCompactStorage m => CompactStorage -> ByteString -> m (Maybe ByteString)
-- TODO: slow-parallel-read-access
-- будет тормозить на конкурентном считывании уже
-- существующих (не новых), значений
-- так как будет в эксклюзивном режиме елозить
-- указателем по файлу.
-- Возможные варианты: маппить файл при доступе
-- на чтение (а что делать, если он растёт?)
-- Собирать операции чтения в батчи и читать батч
-- последовательно (будут некие задержки, и
-- чтение становится хитрожопой операцией, а не
-- просто из файла считать)
compactStorageGet :: ForCompactStorage m => CompactStorage k -> ByteString -> m (Maybe ByteString)
compactStorageGet sto key = do
let tvar = csKeys sto ! getKeyPrefix key
let tvar = getBucket sto key
val <- readTVarIO tvar <&> HM.lookup key
case val of
@ -343,7 +393,18 @@ compactStorageGet sto key = do
BS.hGet ha (fromIntegral $ idxEntrySize e)
either throwIO (pure . Just) r
compactStorageClose :: ForCompactStorage m => CompactStorage -> m ()
compactStorageExists :: ForCompactStorage m => CompactStorage k -> ByteString -> m (Maybe Integer)
compactStorageExists sto key = do
let tvar = getBucket sto key
val <- readTVarIO tvar <&> HM.lookup key
case val of
Just (Entry _ (New s)) -> pure (Just (fromIntegral (BS.length s)))
Just (Entry _ (Off e)) -> pure (Just (fromIntegral $ idxEntrySize e))
_ -> pure Nothing
compactStorageClose :: ForCompactStorage m => CompactStorage k -> m ()
compactStorageClose sto = do
compactStorageCommit sto
-- FIXME: hangs-forever-on-io-exception
@ -365,14 +426,13 @@ compactStorageFindLiveHeads path = liftIO do
fwdOff <- hTell ha
-- fwd section
fwd <- lift (LBS.hGet ha 8)
fwd <- lift (LBS.fromStrict <$> BS.hGet ha 8)
<&> runGetOrFail getWord64be
>>= either (const mzero) pure
<&> view _3
h@(o,header) <- MaybeT $ readHeader mv (Just $ fromIntegral fwd)
let magicOk = hdrMagic header == headerMagic
let fwdOk = hdrFwdOffset header == fromIntegral fwdOff
@ -384,6 +444,10 @@ compactStorageFindLiveHeads path = liftIO do
maybe (pure acc) (\h -> next ( h : acc) ) what
compactStorageRun :: ForCompactStorage m => m ()
compactStorageRun = forever do
pause @'Seconds 1
appendHeader :: ForCompactStorage m
=> Handle
-> FwdEntryOffset -- fwd section offset
@ -398,7 +462,7 @@ appendHeader ha fwdOff poffset ioffset num = do
<> word64BE (coerce ioffset) -- 20
<> word32BE (coerce num) -- 24
<> word64BE (coerce $ fromMaybe 0 poffset) -- 32
liftIO $ LBS.hPut ha (B.toLazyByteString bs)
liftIO $ BS.hPut ha (LBS.toStrict $ B.toLazyByteString bs)
readHeader :: ForCompactStorage m
=> MVar Handle
@ -415,9 +479,9 @@ readHeader mha moff = do
hSeek ha AbsoluteSeek (fromIntegral off)
p <- hTell ha <&> fromIntegral
(p,) <$> LBS.hGet ha (headerSize 1)
(p,) <$> BS.hGet ha (headerSize 1)
let what = flip runGetOrFail bs do
let what = flip runGetOrFail (LBS.fromStrict bs) do
Header <$> getWord16be
<*> getWord16be
<*> getFwdOffset
@ -443,3 +507,58 @@ headerSize 1 = fromIntegral (32 :: Integer)
headerSize _ = error "unsupported header version"
-- Storage instance
translateKey :: Coercible (Hash hash) LBS.ByteString
=> ByteString
-> Hash hash
-> ByteString
translateKey prefix hash = prefix <> LBS.toStrict (coerce hash)
{-# INLINE translateKey #-}
instance ( MonadIO m, IsKey hash
, Hashed hash LBS.ByteString
, Coercible (Hash hash) LBS.ByteString
, Serialise (Hash hash)
, Key hash ~ Hash hash
, Eq (Key hash)
)
=> Storage (CompactStorage hash) hash LBS.ByteString m where
putBlock = enqueueBlock
enqueueBlock s lbs = do
let hash = hashObject @hash lbs
compactStoragePut s (translateKey "V" hash) (LBS.toStrict lbs)
pure (Just hash)
getBlock s hash = do
compactStorageGet s (translateKey "V" hash) <&> fmap LBS.fromStrict
getChunk s k off size = do
undefined
-- liftIO $ simpleGetChunkLazy s k off size
hasBlock sto k = do
compactStorageExists sto (translateKey "V" k)
updateRef sto ref v = do
let hash = hashObject @hash ref
-- TODO: figure-out-what-to-do-with-metadata
compactStoragePut sto (translateKey "R" hash) (LBS.toStrict (serialise v))
getRef sto ref = do
let hash = hashObject @hash ref
runMaybeT do
v <- MaybeT $ compactStorageGet sto (translateKey "R" hash)
deserialiseOrFail @(Hash hash) (LBS.fromStrict v)
& either (const mzero) pure
delBlock sto h = do
compactStorageDel sto (translateKey "V" h)
delRef sto ref = do
compactStorageDel sto (translateKey "V" (hashObject @hash ref))