This commit is contained in:
Dmitry Zuikov 2024-05-30 09:36:25 +03:00
parent 3d4ec7b3a4
commit 9d0fddd417
1 changed files with 205 additions and 30 deletions

View File

@ -1,6 +1,5 @@
module HBS2.Storage.Compact {-# Language PatternSynonyms #-}
( module HBS2.Storage.Compact where
) where
import Data.Word import Data.Word
@ -10,99 +9,271 @@ import Data.ByteString.Lazy qualified as LBS
import Data.ByteString.Builder as B import Data.ByteString.Builder as B
import Data.Binary.Get import Data.Binary.Get
import Data.Coerce import Data.Coerce
import Data.Function
import Data.List qualified as List
import Data.Maybe
import Data.Map (Map)
import Data.Map qualified as Map
import Data.Foldable
import Data.Traversable
import Codec.Serialise import Codec.Serialise
import GHC.Generics import GHC.Generics
-- import System.IO -- import System.IO
import Lens.Micro.Platform import Lens.Micro.Platform
import Control.Monad.Except import Control.Monad.Except
import Control.Monad.Trans.Maybe
import Control.Monad
import UnliftIO import UnliftIO
import Debug.Trace
-- Compact storage
-- for the off-tree data representation.
-- Maybe it will be faster than the Simple storage;
-- who knows.
-- | Byte offset inside the storage file; used as an absolute seek position
-- for values, index sections and headers.
newtype EntryOffset = EntryOffset Word64
  deriving newtype (Ord,Eq,Num,Enum,Real,Integral,Show)
  deriving stock Generic
-- | Length in bytes of a stored value.
newtype EntrySize = EntrySize Word64
  deriving newtype (Ord,Eq,Num,Enum,Real,Integral,Show)
  deriving stock Generic
-- | Number of index entries recorded in one index section.
newtype EntryNum = EntryNum Word64
  deriving newtype (Ord,Eq,Num,Enum,Real,Integral,Show)
  deriving stock Generic
-- | One record of an index section: tells where the value for a key lives
-- in the storage file.
data IndexEntry =
  IndexEntry
  { idxEntryOffset :: EntryOffset -- ^ file offset at which the value starts
  , idxEntrySize   :: EntrySize   -- ^ length of the value in bytes
  , idxEntrySeq    :: Word64      -- ^ sequence number; commit currently always writes 0
  , idxEntryTomb   :: Bool        -- ^ presumably a tombstone (deleted) marker;
                                  --   commit currently always writes False -- TODO confirm
  , idxEntryKey    :: ByteString  -- ^ the key this entry belongs to
  }
  deriving stock (Show,Generic)
-- Generic-derived 'Serialise' instances; 'IndexEntry' is what actually gets
-- serialised into the index sections of the storage file.
instance Serialise EntryOffset
instance Serialise EntrySize
instance Serialise EntryNum
instance Serialise IndexEntry
-- | Fixed-size header record appended to the file by every commit.
-- Headers form a chain from the newest one back to the oldest via 'hdrPrev'.
data Header =
  Header
  { hdrMagic        :: Word16      -- ^ magic number (see 'headerMagic')
  , hdrVersion      :: Word16      -- ^ format version (see 'headerVersion')
  , hdrIndexOffset  :: EntryOffset -- ^ file offset where this commit's index section starts
  , hdrIndexEntries :: EntryNum    -- ^ number of entries in that index section
  , hdrPrev         :: EntryOffset -- ^ offset of the previous header; 0 means there is none
  }
  deriving stock (Show,Generic)
-- | Handle to an opened compact storage.
data CompactStorage =
  CompactStorage
  { csHandle    :: MVar Handle
    -- ^ the storage file; the MVar serialises all access to the handle
  , csHeaderOff :: TVar EntryOffset
    -- ^ offset of the most recent header (0 while the file has none)
  , csKeys      :: TVar (Map ByteString (Either IndexEntry ByteString))
    -- ^ known keys: @Left@ points at a value already stored in the file,
    --   @Right@ holds a pending value not written by a commit yet
  }
-- | Constraint required of any monad that runs compact storage operations.
type ForCompactStorage m = MonadIO m
-- | Options accepted by 'compactStorageOpen'.
-- No options are defined yet, so the type has no constructors.
data CompactStorageOpenOpt
-- | Errors thrown while opening a storage file.
data CompactStorageOpenError =
    InvalidHeader -- ^ a header could not be read or parsed
  | BrokenIndex   -- ^ an index section holds fewer readable entries than its header declares
  deriving stock (Typeable,Show)

instance Exception CompactStorageOpenError
-- | Open a compact storage file in read/write mode and load its key index.
--
-- An empty file yields an empty storage. Otherwise the newest header is read
-- from the end of the file and the chain of previous headers is followed
-- (newest to oldest), loading every index section on the way.
--
-- The options argument is currently ignored.
--
-- Throws 'InvalidHeader' when a header cannot be read or when the header
-- chain does not move strictly backwards in the file, and 'BrokenIndex'
-- (from 'readIndex') when an index section is damaged. The file handle is
-- closed again if opening fails.
compactStorageOpen :: ForCompactStorage m
                   => [CompactStorageOpenOpt]
                   -> FilePath
                   -> m CompactStorage

compactStorageOpen _ fp = liftIO do
  -- bracketOnError: do not leak the handle when header/index reading throws
  bracketOnError (openFile fp ReadWriteMode) hClose $ \ha -> do
    sz <- hFileSize ha
    mha <- newMVar ha

    hoff0 <- newTVarIO 0
    keys0 <- newTVarIO mempty

    if sz == 0
      then pure $ CompactStorage mha hoff0 keys0
      else do
        (p,header) <- readHeader mha Nothing >>= maybe (throwIO InvalidHeader) pure

        traceM (show ("HEADER",header))

        hoff <- newTVarIO p

        let sto = CompactStorage mha hoff keys0

        readIndex sto (hdrIndexOffset header) (hdrIndexEntries header)

        -- walk the header chain; track the offset of the header we came from
        flip fix (p, hdrPrev header) $ \next -> \case
          (_, 0) -> pure ()
          (cur, off)
            -- every commit appends its header after the previous one, so a
            -- valid chain strictly decreases; anything else would loop forever
            | off >= cur -> throwIO InvalidHeader
            | otherwise  -> do
                (_,pHeader) <- readHeader mha (Just off) >>= maybe (throwIO InvalidHeader) pure
                traceM (show ("PHEADER",pHeader))
                readIndex sto (hdrIndexOffset pHeader) (hdrIndexEntries pHeader)
                next (off, hdrPrev pHeader)

        pure sto
-- | Load one index section into the in-memory key map.
--
-- Seeks to the given offset and reads the declared number of entries, each
-- stored as a big-endian 16-bit length followed by a serialised
-- 'IndexEntry'. Throws 'BrokenIndex' when fewer entries than declared can be
-- read. Keys that are already present in the map are left untouched.
readIndex :: ForCompactStorage m
          => CompactStorage
          -> EntryOffset
          -> EntryNum
          -> m ()

readIndex sto offset num = liftIO do
  withMVar (csHandle sto) $ \ha -> do
    hSeek ha AbsoluteSeek (fromIntegral offset)

    (count, entries) <- collect ha num [] 0

    when (count /= num) do
      throwIO BrokenIndex

    let fresh = Map.fromList [ (idxEntryKey e, Left e) | e <- entries ]

    atomically do
      -- indexes are read from newer to older,
      -- so whatever is already in the map wins
      modifyTVar (csKeys sto) (Map.unionWith (\_ known -> known) fresh)

  where
    -- read entries until none are left; any failure reports zero entries
    collect :: Handle -> EntryNum -> [IndexEntry] -> EntryNum -> IO (EntryNum, [IndexEntry])
    collect _  0    acc done = pure (done, acc)
    collect ha todo acc done =
      readEntry ha >>= \case
        Nothing -> pure (0, [])
        Just e  -> collect ha (pred todo) (e : acc) (succ done)

    -- one length-prefixed entry; Nothing on I/O or decoding failure
    readEntry :: Handle -> IO (Maybe IndexEntry)
    readEntry ha = runMaybeT do
      rawLen <- MaybeT $ either (const Nothing) Just
                           <$> try @_ @IOException (LBS.hGet ha 2)

      len <- MaybeT . pure $
               either (const Nothing) (Just . view _3) (runGetOrFail getWord16be rawLen)

      rawEntry <- liftIO (try @_ @IOException (LBS.hGet ha (fromIntegral len)))
                    >>= either (const mzero) pure

      either (const mzero) pure (deserialiseOrFail @IndexEntry rawEntry)
-- | Write all pending values to the end of the storage file.
--
-- Appends, in order: the pending values themselves, an index section (one
-- length-prefixed serialised 'IndexEntry' per value) and a new header that
-- points at the index section and at the previous header. Afterwards the
-- in-memory map refers to the on-disk copies. Does nothing when there are no
-- pending values.
compactStorageCommit :: ForCompactStorage m => CompactStorage -> m ()
compactStorageCommit sto = liftIO do
  withMVar (csHandle sto) $ \ha -> do
    hSeek ha SeekFromEnd 0

    kv <- readTVarIO (csKeys sto) <&> Map.toList

    -- only the values that have not been written yet
    let items = [ (k, v) | (k, Right v) <- kv ]

    unless (List.null items) do

      off0 <- hTell ha

      -- write the values, remembering each index entry with the value it describes
      written <- flip fix (off0, items, mempty) $ \next (off, what, acc) -> do
        case what of
          [] -> pure acc
          ((k,v):rest) -> do
            BS.hPut ha v
            let sz = fromIntegral $ BS.length v
            let e  = IndexEntry (fromIntegral off) (fromIntegral sz) 0 False k
            next (off + sz, rest, (e,v) : acc)

      offIdx0 <- hTell ha <&> fromIntegral

      for_ written $ \(e,_) -> do
        let lbs = serialise e
        -- NOTE: the length prefix is 16 bit, so a serialised entry
        -- must stay below 64K
        LBS.hPut ha (B.toLazyByteString $
                       word16BE (fromIntegral $ LBS.length lbs)
                       <> B.lazyByteString lbs)

      offPrev <- readTVarIO (csHeaderOff sto)

      -- FIXME: maybe-slow-length-calc
      appendHeader ha (Just offPrev) offIdx0 (fromIntegral $ length written)
      hFlush ha

      hSeek ha SeekFromEnd 0

      offLast <- hTell ha <&> fromIntegral

      atomically do
        writeTVar (csHeaderOff sto) (offLast - headerSize 1)
        for_ written $ \(e,v) -> do
          modifyTVar' (csKeys sto) (Map.adjust (markStored e v) (idxEntryKey e))

  where
    -- swap a pending value for its on-disk entry, but only if it is still the
    -- value that was written: a put that happened during the commit must
    -- stay pending instead of being silently dropped
    markStored e v = \case
      Right v' | v' == v -> Left e
      other              -> other
-- | Register a value for a key. The value is only kept in memory as a
-- pending entry; 'compactStorageCommit' writes it to the file.
compactStoragePut :: ForCompactStorage m => CompactStorage -> ByteString -> ByteString -> m ()
compactStoragePut sto k v =
  -- TODO: ASAP-do-not-write-value-if-not-changed
  atomically do
    modifyTVar (csKeys sto) (Map.insert k pending)
  where
    pending = Right v
-- | Look up the value of a key.
--
-- Returns 'Nothing' for an unknown key, the in-memory value for a pending
-- entry, and otherwise reads the value from the file at the location given
-- by its index entry. An 'IOException' raised while reading is rethrown.
compactStorageGet :: ForCompactStorage m => CompactStorage -> ByteString -> m (Maybe ByteString)
compactStorageGet sto key =
  readTVarIO (csKeys sto) <&> Map.lookup key >>= \case
    Nothing              -> pure Nothing
    Just (Right pending) -> pure (Just pending)
    Just (Left entry)    -> liftIO do
      outcome <- withMVar (csHandle sto) $ \ha ->
        try @_ @IOException do
          hSeek ha AbsoluteSeek (fromIntegral $ idxEntryOffset entry)
          BS.hGet ha (fromIntegral $ idxEntrySize entry)
      case outcome of
        Left err    -> throwIO err
        Right bytes -> pure (Just bytes)
-- | Commit pending values and close the storage file.
--
-- The handle is closed even when the commit throws (the exception is then
-- propagated). The handle stays inside the MVar, so using the storage after
-- closing fails with an I/O error on the closed handle instead of blocking
-- forever on an empty MVar.
compactStorageClose :: ForCompactStorage m => CompactStorage -> m ()
compactStorageClose sto = liftIO do
  compactStorageCommit sto
    `finally` withMVar (csHandle sto) hClose
-- | Write a header record at the current position of the handle.
--
-- Layout (big-endian, 32 bytes in total): magic (2), version (2),
-- index offset (8), number of index entries (8), previous header offset (8,
-- 0 when there is none), zero padding (4).
appendHeader :: ForCompactStorage m
             => Handle
             -> Maybe EntryOffset -- prev. header
             -> EntryOffset       -- start of the index section
             -> EntryNum          -- number of index entries
             -> m ()
appendHeader ha hoffset offset num = do
  let bs =   word16BE headerMagic
          <> word16BE headerVersion
          <> word64BE (coerce offset)
          <> word64BE (coerce num)
          <> word64BE (coerce (fromMaybe 0 hoffset))
          <> byteString (BS.replicate 4 0)
  liftIO $ LBS.hPut ha (B.toLazyByteString bs)
-- | Read and validate a header record.
--
-- With 'Nothing' the header is taken from the last 'headerSize' bytes of the
-- file, with @Just off@ from the given absolute offset. Returns the offset
-- the header was read from together with the parsed header.
--
-- Returns 'Nothing' when the file does not contain a complete header at that
-- position, when the bytes cannot be parsed, or when the magic number or
-- version do not match 'headerMagic' / 'headerVersion'.
readHeader :: ForCompactStorage m
           => MVar Handle
           -> Maybe EntryOffset
           -> m (Maybe (EntryOffset, Header))

readHeader mha moff = do
  raw <- liftIO $ withMVar mha $ \ha -> do
    size <- hFileSize ha
    let start = maybe (size - headerSize 1) fromIntegral moff
    -- a file shorter than one header would make a negative seek throw;
    -- report it as "no header" instead
    if start < 0 || start + headerSize 1 > size
      then pure Nothing
      else do
        hSeek ha AbsoluteSeek start
        Just . (fromIntegral start,) <$> LBS.hGet ha (headerSize 1)

  pure do
    (off,bs) <- raw
    hdr <- either (const Nothing) (Just . view _3) (runGetOrFail getHeader bs)
    -- reject data that merely happens to be long enough to parse
    guard (hdrMagic hdr == headerMagic && hdrVersion hdr == headerVersion)
    pure (off,hdr)

  where
    getHeader = Header <$> getWord16be
                       <*> getWord16be
                       <*> getOffset
                       <*> getNum
                       <*> getOffset
    getOffset = EntryOffset <$> getWord64be
    getNum = EntryNum <$> getWord64be
-- | Magic number stored in the first two bytes of every header.
headerMagic :: Word16
headerMagic = 32264
@ -110,4 +281,8 @@ headerMagic = 32264
-- | Header format version written by 'appendHeader'.
headerVersion :: Word16
headerVersion = 1
-- | Size in bytes of a header of the given format version.
-- Only version 1 is known; any other version is a programming error.
headerSize :: Integral a => Word16 -> a
headerSize version
  | version == 1 = fromIntegral (32 :: Integer)
  | otherwise    = error "unsupported header version"