mirror of https://github.com/voidlizard/hbs2
731 lines
21 KiB
Haskell
731 lines
21 KiB
Haskell
{-# LANGUAGE PatternSynonyms #-}
|
||
{-# Language ViewPatterns #-}
|
||
{-# Language UndecidableInstances #-}
|
||
module HBS2.Storage.Compact
|
||
( Storage(..)
|
||
, CompactStorageOpenError(..)
|
||
, CompactStorage
|
||
, CompactStorageOpenOpt(..)
|
||
, CompactStorageError(..)
|
||
, readonly
|
||
, compactStorageOpen
|
||
, compactStorageClose
|
||
, compactStorageCommit
|
||
, compactStoragePut
|
||
, compactStorageGet
|
||
, compactStorageDel
|
||
, compactStorageSize
|
||
, compactStorageFindLiveHeads
|
||
, compactStorageRun
|
||
, HBS2.Storage.Compact.keys
|
||
, HBS2.Storage.Compact.member
|
||
, HBS2.Storage.Compact.put
|
||
, HBS2.Storage.Compact.putVal
|
||
, HBS2.Storage.Compact.get
|
||
, HBS2.Storage.Compact.getValEither
|
||
, HBS2.Storage.Compact.del
|
||
, HBS2.Storage.Compact.commit
|
||
) where
|
||
|
||
import HBS2.Clock
|
||
import HBS2.Hash
|
||
import HBS2.Storage
|
||
|
||
import Data.Word
|
||
import Data.ByteString (ByteString)
|
||
import Data.ByteString qualified as BS
|
||
import Data.ByteString.Internal qualified as BS
|
||
import Data.ByteString.Lazy qualified as LBS
|
||
import Data.ByteString.Builder as B
|
||
import Data.Binary.Get
|
||
import Data.Coerce
|
||
import Data.Function
|
||
import Data.List qualified as List
|
||
import Data.Maybe
|
||
import Data.HashMap.Strict (HashMap)
|
||
import Data.HashMap.Strict qualified as HM
|
||
import Data.Foldable
|
||
import Data.Traversable
|
||
import Data.Vector (Vector,(!))
|
||
import Data.Vector qualified as V
|
||
import Codec.Serialise
|
||
import GHC.Generics
|
||
import Lens.Micro.Platform
|
||
import Control.Monad
|
||
import Control.Monad.Except
|
||
import Control.Monad.Trans
|
||
import Control.Monad.Trans.Maybe
|
||
import UnliftIO
|
||
|
||
import Foreign
|
||
import System.IO.MMap
|
||
|
||
import Debug.Trace
|
||
|
||
-- compact storage
|
||
-- for the off-tree data representation
|
||
-- may be it will be faster, than Simple storage
|
||
-- who knows
|
||
|
||
newtype EntryOffset = EntryOffset Word64
|
||
deriving newtype (Ord,Eq,Num,Enum,Real,Integral,Show)
|
||
deriving stock Generic
|
||
|
||
newtype FwdEntryOffset = FwdEntryOffset Word64
|
||
deriving newtype (Ord,Eq,Num,Enum,Real,Integral,Show)
|
||
deriving stock Generic
|
||
|
||
newtype EntrySize = EntrySize Word64
|
||
deriving newtype (Ord,Eq,Num,Enum,Real,Integral,Show)
|
||
deriving stock Generic
|
||
|
||
|
||
newtype EntryNum = EntryNum Word32
|
||
deriving newtype (Ord,Eq,Num,Enum,Real,Integral,Show)
|
||
deriving stock Generic
|
||
|
||
data IndexEntry =
|
||
IndexEntry
|
||
{ idxEntryOffset :: !EntryOffset
|
||
, idxEntrySize :: !EntrySize
|
||
, idxEntrySeq :: !Word64
|
||
, idxEntryTomb :: !Bool
|
||
, idxEntryKey :: !ByteString
|
||
}
|
||
deriving stock (Show,Generic)
|
||
|
||
|
||
instance Serialise EntryOffset
|
||
instance Serialise EntrySize
|
||
instance Serialise EntryNum
|
||
instance Serialise IndexEntry
|
||
|
||
data Header =
|
||
Header
|
||
{ hdrMagic :: Word16
|
||
, hdrVersion :: Word16
|
||
, hdrFwdOffset :: FwdEntryOffset
|
||
, hdrIndexOffset :: EntryOffset
|
||
, hdrIndexEntries :: EntryNum
|
||
, hdrPrev :: EntryOffset
|
||
}
|
||
deriving stock (Show,Generic)
|
||
|
||
data E = New ByteString
|
||
| Upd ByteString IndexEntry
|
||
| Off IndexEntry
|
||
| Del IndexEntry
|
||
|
||
data Entry = Entry Integer E
|
||
|
||
pattern Fresh :: Entry -> Entry
|
||
pattern Fresh e <- e@(Entry _ ( isFresh -> True ))
|
||
|
||
pattern Tomb :: Entry -> Entry
|
||
pattern Tomb e <- e@(Entry _ ( isTomb -> True ))
|
||
|
||
pattern Existed :: Entry -> IndexEntry -> Entry
|
||
pattern Existed e w <- e@(Entry _ (existed -> Just w))
|
||
|
||
-- {-# COMPLETE Existed #-}
|
||
|
||
isAlive :: Entry -> Bool
|
||
isAlive = \case
|
||
Entry _ New{} -> True
|
||
Entry _ Upd{} -> True
|
||
Entry _ e@(Off{}) -> not (isTomb e)
|
||
_ -> False
|
||
|
||
isTomb :: E -> Bool
|
||
isTomb (Off e) = idxEntryTomb e
|
||
isTomb _ = False
|
||
|
||
existed :: E -> Maybe IndexEntry
|
||
existed = \case
|
||
Off e -> Just e
|
||
Upd _ e -> Just e
|
||
Del e -> Just e
|
||
_ -> Nothing
|
||
|
||
isFresh :: E -> Bool
|
||
isFresh e = case e of
|
||
New{} -> True
|
||
Del{} -> True
|
||
Upd{} -> True
|
||
_ -> False
|
||
|
||
type Bucket = TVar (HashMap ByteString Entry)
|
||
|
||
type MMaped = (ForeignPtr Word8, Int, Int)
|
||
|
||
data CompactStorage k =
|
||
CompactStorage
|
||
{ csBuckets :: Int
|
||
, csFile :: FilePath
|
||
, csOpts :: CompactStorageOpenOpt
|
||
, csHandle :: MVar Handle
|
||
, csHeaderOff :: TVar EntryOffset
|
||
, csSeq :: TVar Integer
|
||
, csKeys :: Vector Bucket
|
||
, csUncommitted :: TVar Integer
|
||
, csMMapped :: TVar MMaped
|
||
}
|
||
|
||
type ForCompactStorage m = MonadIO m
|
||
|
||
data CompactStorageOpenOpt =
|
||
CompactStorageOpenOpt
|
||
{ csReadOnly :: Bool
|
||
}
|
||
|
||
|
||
instance Monoid CompactStorageOpenOpt where
|
||
mempty = CompactStorageOpenOpt False
|
||
|
||
instance Semigroup CompactStorageOpenOpt where
|
||
(<>) _ b = CompactStorageOpenOpt (csReadOnly b)
|
||
|
||
readonly :: CompactStorageOpenOpt
|
||
readonly = CompactStorageOpenOpt True
|
||
|
||
data CompactStorageOpenError =
|
||
InvalidHeader
|
||
| BrokenIndex
|
||
| InvalidFwdSection
|
||
deriving stock (Typeable,Show)
|
||
|
||
instance Exception CompactStorageOpenError
|
||
|
||
getBucket :: CompactStorage k -> ByteString -> Bucket
|
||
getBucket sto bs = do
|
||
let i = maybe 0 (fromIntegral.fst) (BS.uncons bs) `mod` csBuckets sto
|
||
csKeys sto ! i
|
||
{-# INLINE getBucket #-}
|
||
|
||
|
||
compactStorageOpen :: forall k m . (ForCompactStorage m)
|
||
=> CompactStorageOpenOpt
|
||
-> FilePath
|
||
-> m (CompactStorage k)
|
||
|
||
compactStorageOpen opt fp = do
|
||
|
||
let buck = 8
|
||
|
||
ha <- openFile fp ReadWriteMode
|
||
|
||
sz <- hFileSize ha
|
||
mha <- newMVar ha
|
||
|
||
hoff0 <- newTVarIO 0
|
||
|
||
keys0 <- replicateM buck (newTVarIO mempty) <&> V.fromList
|
||
uncommitted <- newTVarIO 0
|
||
|
||
ss <- newTVarIO 0
|
||
|
||
mmapped <- liftIO (mmapFileForeignPtr fp ReadOnly Nothing)
|
||
>>= newTVarIO
|
||
|
||
if sz == 0 then
|
||
pure $ CompactStorage buck fp opt mha hoff0 ss keys0 uncommitted mmapped
|
||
else do
|
||
(p,header) <- readHeader mha Nothing >>= maybe (throwIO InvalidHeader) pure
|
||
hoff <- newTVarIO p
|
||
let sto = CompactStorage buck fp opt mha hoff ss keys0 uncommitted mmapped
|
||
readIndex sto (hdrIndexOffset header) (hdrIndexEntries header)
|
||
|
||
flip fix (hdrPrev header) $ \next -> \case
|
||
0 -> pure ()
|
||
off -> do
|
||
(_,pHeader) <- readHeader mha (Just off) >>= maybe (throwIO InvalidHeader) pure
|
||
readIndex sto (hdrIndexOffset pHeader) (hdrIndexEntries pHeader)
|
||
next (hdrPrev pHeader)
|
||
|
||
pure sto
|
||
|
||
|
||
readIndex :: ForCompactStorage m
|
||
=> CompactStorage k
|
||
-> EntryOffset
|
||
-> EntryNum
|
||
-> m ()
|
||
readIndex sto offset num = liftIO do
|
||
withMVar (csHandle sto) $ \ha -> do
|
||
hSeek ha AbsoluteSeek (fromIntegral offset)
|
||
(rn,entries) <- flip fix (num, mempty, 0) $ \next left -> do
|
||
case left of
|
||
(0,acc,n) -> pure (n,acc)
|
||
(n,acc,rn) -> do
|
||
what <- runMaybeT do
|
||
|
||
slen <- liftIO (try @_ @IOException (BS.hGet ha 2))
|
||
<&> either (const Nothing) Just
|
||
& MaybeT
|
||
<&> LBS.fromStrict
|
||
|
||
len <- either (const Nothing) (Just . view _3) (runGetOrFail getWord16be slen)
|
||
& MaybeT . pure
|
||
|
||
sIdx <- liftIO (try @_ @IOException (BS.hGet ha (fromIntegral len)))
|
||
>>= either (const mzero) pure
|
||
<&> LBS.fromStrict
|
||
|
||
deserialiseOrFail @IndexEntry sIdx
|
||
& either (const mzero) pure
|
||
|
||
case what of
|
||
Nothing -> pure (0,mempty :: [IndexEntry])
|
||
Just idx -> next (pred n, idx : acc, succ rn)
|
||
|
||
when (rn /= num) do
|
||
throwIO BrokenIndex
|
||
|
||
let new = [ (idxEntryKey e,Entry 0 (Off e)) | e <- entries ]
|
||
-- readIndex from newer to older
|
||
-- so we keep only the newer values in map
|
||
atomically do
|
||
for_ new $ \(k,v) -> do
|
||
let tv = getBucket sto k
|
||
modifyTVar tv (HM.insertWith (\_ o -> o) k v)
|
||
|
||
compactStorageCommit :: ForCompactStorage m => CompactStorage k -> m ()
|
||
compactStorageCommit sto = liftIO do
|
||
withMVar (csHandle sto) $ \ha -> do
|
||
hSeek ha SeekFromEnd 0
|
||
|
||
mma <- readTVarIO (csMMapped sto)
|
||
|
||
kv <- atomically do
|
||
mapM readTVar (csKeys sto) <&> mconcat . V.toList . fmap HM.toList
|
||
|
||
let items = [ (k, e)
|
||
| (k, e@Fresh{}) <- kv
|
||
, reallyUpdated mma e
|
||
]
|
||
|
||
unless (List.null items) do
|
||
|
||
-- write fwd
|
||
offFwd <- hTell ha
|
||
BS.hPut ha (LBS.toStrict $ toLazyByteString $ word64BE 0)
|
||
|
||
let off0 = offFwd + 8
|
||
|
||
-- write data
|
||
idxEntries <- flip fix (off0, items, mempty) $ \next (off, what, idx) -> do
|
||
case what of
|
||
[] -> pure idx
|
||
|
||
((_,Entry i (Del e)):rest) | not (idxEntryTomb e) -> do
|
||
next (off + 0, rest, (e { idxEntryTomb = True },i) : idx)
|
||
|
||
((k,Entry i (Upd v e)):rest) -> do
|
||
BS.hPut ha v
|
||
let sz = fromIntegral $ BS.length v
|
||
next (off + sz, rest, (IndexEntry (fromIntegral off) (fromIntegral sz) 0 False k,i) : idx)
|
||
|
||
((k,Entry i (New v)):rest) -> do
|
||
BS.hPut ha v
|
||
let sz = fromIntegral $ BS.length v
|
||
next (off + sz, rest, (IndexEntry (fromIntegral off) (fromIntegral sz) 0 False k,i) : idx)
|
||
|
||
((_,Entry _ _):rest) -> do
|
||
next (off + 0, rest, idx)
|
||
|
||
|
||
offIdx0 <- hTell ha <&> fromIntegral
|
||
|
||
-- write index
|
||
for_ idxEntries $ \(e,_) -> do
|
||
let lbs = serialise e
|
||
BS.hPut ha $ LBS.toStrict (B.toLazyByteString $
|
||
word16BE (fromIntegral $ LBS.length lbs)
|
||
<> B.lazyByteString lbs)
|
||
|
||
offPrev <- readTVarIO (csHeaderOff sto)
|
||
|
||
offCommitHead <- hTell ha
|
||
|
||
-- FIXME: maybe-slow-length-calc
|
||
appendHeader ha (fromIntegral offFwd) (Just offPrev) offIdx0 (fromIntegral $ length idxEntries)
|
||
|
||
hSeek ha AbsoluteSeek offFwd
|
||
|
||
BS.hPut ha (LBS.toStrict $ toLazyByteString $ word64BE (fromIntegral offCommitHead))
|
||
|
||
hFlush ha
|
||
|
||
hSeek ha SeekFromEnd 0
|
||
|
||
offLast <- hTell ha <&> fromIntegral
|
||
|
||
remapFile
|
||
|
||
atomically do
|
||
writeTVar (csHeaderOff sto) (offLast - headerSize 1)
|
||
for_ idxEntries $ \(e,i) -> do
|
||
let k = idxEntryKey e
|
||
let tv = getBucket sto k
|
||
modifyTVar tv (HM.alter (doAlter (Entry i (Off e))) k)
|
||
resetUncommitedSTM sto
|
||
|
||
where
|
||
|
||
doAlter y@(Entry i (Off e)) v0 = case v0 of
|
||
-- deleted-during-commit
|
||
Nothing -> Just (Entry i (Del e))
|
||
|
||
Just x | getSeq x > getSeq y -> Just x
|
||
| otherwise -> Just y
|
||
|
||
doAlter _ v = v
|
||
|
||
getSeq = \case
|
||
Entry i _ -> i
|
||
|
||
remapFile :: ForCompactStorage m => m ()
|
||
remapFile = do
|
||
let fp = csFile sto
|
||
unmapFile sto
|
||
mmapped <- liftIO (mmapFileForeignPtr fp ReadOnly Nothing)
|
||
atomically (writeTVar (csMMapped sto) mmapped)
|
||
|
||
-- NOTE: this-might-be-slow
|
||
-- но это правильно, поскольку
|
||
-- у нас **compact** storage и мы не хотим,
|
||
-- что бы его раздувало одинаковыми значениями
|
||
-- Можно попробовать использовать siphash
|
||
-- при загрузке (?)... да ну нахрен, капец долго
|
||
-- будет. если только его не хранить (это можно)
|
||
reallyUpdated mma = \case
|
||
Entry _ (Upd v e) -> readValue mma e /= v
|
||
|
||
_ -> True
|
||
|
||
|
||
compactStorageDel :: ForCompactStorage m => CompactStorage k -> ByteString -> m ()
|
||
compactStorageDel sto key = do
|
||
|
||
let tvar = getBucket sto key
|
||
val <- readTVarIO tvar <&> HM.lookup key
|
||
|
||
case val of
|
||
Nothing -> pure ()
|
||
|
||
Just (Entry i (Del _)) -> pure ()
|
||
|
||
Just (Entry _ (New _)) -> do
|
||
-- FIXME: if-commit-in-progress-then-put-tomb
|
||
atomically do
|
||
modifyTVar tvar (HM.delete key)
|
||
succUncommitedSTM sto 1
|
||
|
||
Just (Existed e what) -> do
|
||
atomically do
|
||
j <- newSequenceSTM sto
|
||
modifyTVar tvar (HM.insert key (Entry j (Del what)))
|
||
succUncommitedSTM sto 1
|
||
|
||
-- FIXME: fix-incomplete-pattern-warning
|
||
_ -> pure ()
|
||
|
||
newSequenceSTM :: CompactStorage k -> STM Integer
|
||
newSequenceSTM sto = stateTVar (csSeq sto) (\n -> (n+1,n))
|
||
|
||
succUncommitedSTM :: CompactStorage k -> Integer -> STM ()
|
||
succUncommitedSTM sto k = modifyTVar (csUncommitted sto) (+k)
|
||
|
||
resetUncommitedSTM :: CompactStorage k -> STM ()
|
||
resetUncommitedSTM sto = writeTVar (csUncommitted sto) 0
|
||
|
||
compactStorageSize :: ForCompactStorage m => CompactStorage k -> m Integer
|
||
compactStorageSize sto = liftIO $ withMVar (csHandle sto) hFileSize
|
||
|
||
compactStoragePut :: ForCompactStorage m => CompactStorage k -> ByteString -> ByteString -> m ()
|
||
compactStoragePut sto k v = do
|
||
let tvar = getBucket sto k
|
||
|
||
atomically $ do
|
||
c <- newSequenceSTM sto
|
||
modifyTVar tvar (HM.insertWith check k (Entry c (New v)))
|
||
|
||
where
|
||
check (Entry i (New v1)) (Entry _ (Off e)) = Entry i (Upd v1 e)
|
||
check (Entry i (New v1)) (Entry _ (Upd v0 e)) = Entry i (Upd v1 e)
|
||
check x _ = x
|
||
|
||
readValue :: MMaped -> IndexEntry -> ByteString
|
||
readValue what e = do
|
||
let ptr = what & view _1
|
||
BS.fromForeignPtr ptr (fromIntegral $ idxEntryOffset e)
|
||
(fromIntegral $ idxEntrySize e)
|
||
{-# INLINE readValue #-}
|
||
|
||
compactStorageGet :: ForCompactStorage m => CompactStorage k -> ByteString -> m (Maybe ByteString)
|
||
compactStorageGet sto key = do
|
||
let tvar = getBucket sto key
|
||
val <- readTVarIO tvar <&> HM.lookup key
|
||
|
||
case val of
|
||
Nothing -> pure Nothing
|
||
Just (Tomb{}) -> pure Nothing
|
||
Just (Entry _ (Del _)) -> pure Nothing
|
||
Just (Entry _ (New s)) -> pure (Just s)
|
||
Just (Entry _ (Upd s _)) -> pure (Just s)
|
||
Just (Entry _ (Off e)) -> Just <$> (readTVarIO (csMMapped sto) <&> flip readValue e)
|
||
|
||
compactStorageExists :: ForCompactStorage m => CompactStorage k -> ByteString -> m (Maybe Integer)
|
||
compactStorageExists sto key = do
|
||
let tvar = getBucket sto key
|
||
val <- readTVarIO tvar <&> HM.lookup key
|
||
|
||
case val of
|
||
Just (Entry _ (New s)) -> pure (Just (fromIntegral (BS.length s)))
|
||
Just (Entry _ (Off e)) -> pure (Just (fromIntegral $ idxEntrySize e))
|
||
Just (Entry _ (Upd v e)) -> pure (Just (fromIntegral $ BS.length v))
|
||
_ -> pure Nothing
|
||
|
||
unmapFile :: ForCompactStorage m => CompactStorage sto -> m ()
|
||
unmapFile sto = do
|
||
mmapped <- readTVarIO (csMMapped sto)
|
||
liftIO $ finalizeForeignPtr (view _1 mmapped)
|
||
-- NOTE: mmapped-is-invalid-now
|
||
-- если теперь позвать что-то, что
|
||
-- читает из этого мапинга -- то всё грохнется
|
||
|
||
|
||
compactStorageClose :: ForCompactStorage m => CompactStorage k -> m ()
|
||
compactStorageClose sto = do
|
||
|
||
unless (csOpts sto & csReadOnly) do
|
||
compactStorageCommit sto
|
||
|
||
-- FIXME: hangs-forever-on-io-exception
|
||
liftIO $ do
|
||
unmapFile sto
|
||
withMVar (csHandle sto) hClose
|
||
|
||
compactStorageFindLiveHeads :: ForCompactStorage m
|
||
=> FilePath
|
||
-> m [(EntryOffset, Header)]
|
||
|
||
compactStorageFindLiveHeads path = liftIO do
|
||
withFile path ReadMode $ \ha -> do
|
||
|
||
mv <- newMVar ha
|
||
|
||
flip fix (mempty :: [(EntryOffset, Header)]) $ \next acc -> do
|
||
|
||
what <- runMaybeT do
|
||
|
||
fwdOff <- hTell ha
|
||
|
||
-- fwd section
|
||
fwd <- lift (LBS.fromStrict <$> BS.hGet ha 8)
|
||
<&> runGetOrFail getWord64be
|
||
>>= either (const mzero) pure
|
||
<&> view _3
|
||
|
||
h@(o,header) <- MaybeT $ readHeader mv (Just $ fromIntegral fwd)
|
||
|
||
let magicOk = hdrMagic header == headerMagic
|
||
let fwdOk = hdrFwdOffset header == fromIntegral fwdOff
|
||
|
||
if magicOk && fwdOk then
|
||
pure h
|
||
else
|
||
mzero
|
||
|
||
maybe (pure acc) (\h -> next ( h : acc) ) what
|
||
|
||
|
||
compactStorageRun :: ForCompactStorage m => m ()
|
||
compactStorageRun = forever do
|
||
pause @'Seconds 1
|
||
|
||
appendHeader :: ForCompactStorage m
|
||
=> Handle
|
||
-> FwdEntryOffset -- fwd section offset
|
||
-> Maybe EntryOffset -- prev. header
|
||
-> EntryOffset
|
||
-> EntryNum
|
||
-> m ()
|
||
appendHeader ha fwdOff poffset ioffset num = do
|
||
let bs = word16BE headerMagic -- 2
|
||
<> word16BE headerVersion -- 4
|
||
<> word64BE (coerce fwdOff) -- 12
|
||
<> word64BE (coerce ioffset) -- 20
|
||
<> word32BE (coerce num) -- 24
|
||
<> word64BE (coerce $ fromMaybe 0 poffset) -- 32
|
||
liftIO $ BS.hPut ha (LBS.toStrict $ B.toLazyByteString bs)
|
||
|
||
readHeader :: ForCompactStorage m
|
||
=> MVar Handle
|
||
-> Maybe EntryOffset
|
||
-> m (Maybe (EntryOffset, Header))
|
||
|
||
readHeader mha moff = do
|
||
(off,bs) <- liftIO $ withMVar mha $ \ha -> do
|
||
|
||
case moff of
|
||
Nothing -> do
|
||
hSeek ha SeekFromEnd (negate $ headerSize 1)
|
||
Just off -> do
|
||
hSeek ha AbsoluteSeek (fromIntegral off)
|
||
|
||
p <- hTell ha <&> fromIntegral
|
||
(p,) <$> BS.hGet ha (headerSize 1)
|
||
|
||
let what = flip runGetOrFail (LBS.fromStrict bs) do
|
||
Header <$> getWord16be
|
||
<*> getWord16be
|
||
<*> getFwdOffset
|
||
<*> getOffset
|
||
<*> getNum
|
||
<*> getOffset
|
||
|
||
pure $ either (const Nothing) (fmap (off,) . Just . view _3) what
|
||
|
||
where
|
||
getOffset = EntryOffset <$> getWord64be
|
||
getNum = EntryNum <$> getWord32be
|
||
getFwdOffset = FwdEntryOffset <$> getWord64be
|
||
|
||
headerMagic :: Word16
|
||
headerMagic = 32264
|
||
|
||
headerVersion :: Word16
|
||
headerVersion = 1
|
||
|
||
headerSize :: Integral a => Word16 -> a
|
||
headerSize 1 = fromIntegral (32 :: Integer)
|
||
headerSize _ = error "unsupported header version"
|
||
|
||
|
||
-- Map-like interface
|
||
|
||
keys :: ForCompactStorage m => CompactStorage k -> m [ ByteString ]
|
||
keys sto = do
|
||
what <- atomically $ mapM readTVar (csKeys sto)
|
||
let w = foldMap HM.toList (V.toList what)
|
||
pure [ k | (k,x) <- w, isAlive x ]
|
||
|
||
member :: ForCompactStorage m
|
||
=> CompactStorage k
|
||
-> ByteString
|
||
-> m Bool
|
||
member s k = isJust <$> compactStorageExists s k
|
||
|
||
put :: ForCompactStorage m
|
||
=> CompactStorage k
|
||
-> ByteString
|
||
-> ByteString
|
||
-> m ()
|
||
|
||
put = compactStoragePut
|
||
|
||
putVal :: forall k v m h . (ForCompactStorage m, Serialise k, Serialise v)
|
||
=> CompactStorage h
|
||
-> k
|
||
-> v
|
||
-> m ()
|
||
putVal sto k v = do
|
||
put sto (LBS.toStrict $ serialise k) (LBS.toStrict $ serialise v)
|
||
|
||
get :: ForCompactStorage m
|
||
=> CompactStorage k
|
||
-> ByteString
|
||
-> m (Maybe ByteString)
|
||
|
||
get = compactStorageGet
|
||
|
||
data CompactStorageError =
|
||
DeserealiseError
|
||
deriving (Typeable,Show)
|
||
|
||
instance Exception CompactStorageError
|
||
|
||
getValEither :: forall v k m h . ( ForCompactStorage m
|
||
, Serialise k, Serialise v
|
||
)
|
||
=> CompactStorage h
|
||
-> k
|
||
-> m (Either CompactStorageError (Maybe v))
|
||
|
||
getValEither sto k = do
|
||
bs <- get sto (LBS.toStrict (serialise k))
|
||
let v = fmap (deserialiseOrFail @v . LBS.fromStrict) bs
|
||
case v of
|
||
Just (Left _) -> pure $ Left DeserealiseError
|
||
Just (Right x) -> pure $ Right (Just x)
|
||
Nothing -> pure (Right Nothing)
|
||
|
||
del :: ForCompactStorage m
|
||
=> CompactStorage k
|
||
-> ByteString
|
||
-> m ()
|
||
|
||
del = compactStorageDel
|
||
|
||
commit :: ForCompactStorage m
|
||
=> CompactStorage sto
|
||
-> m ()
|
||
commit = compactStorageCommit
|
||
|
||
-- Storage instance
|
||
|
||
translateKey :: Coercible (Hash hash) ByteString
|
||
=> ByteString
|
||
-> Hash hash
|
||
-> ByteString
|
||
translateKey prefix hash = prefix <> coerce hash
|
||
|
||
{-# INLINE translateKey #-}
|
||
|
||
instance ( MonadIO m, IsKey hash
|
||
, Hashed hash LBS.ByteString
|
||
, Coercible (Hash hash) ByteString
|
||
, Serialise (Hash hash)
|
||
, Key hash ~ Hash hash
|
||
, Eq (Key hash)
|
||
)
|
||
=> Storage (CompactStorage hash) hash LBS.ByteString m where
|
||
|
||
putBlock = enqueueBlock
|
||
|
||
enqueueBlock s lbs = do
|
||
let hash = hashObject @hash lbs
|
||
compactStoragePut s (translateKey "V" hash) (LBS.toStrict lbs)
|
||
pure (Just hash)
|
||
|
||
getBlock s hash = do
|
||
compactStorageGet s (translateKey "V" hash) <&> fmap LBS.fromStrict
|
||
|
||
getChunk s hash off size = runMaybeT do
|
||
bs <- MaybeT $ compactStorageGet s (translateKey "V" hash)
|
||
pure $ LBS.fromStrict $ BS.take (fromIntegral size) $ BS.drop (fromIntegral off) bs
|
||
|
||
hasBlock sto k = do
|
||
compactStorageExists sto (translateKey "V" k)
|
||
|
||
updateRef sto ref v = do
|
||
let hash = hashObject @hash ref
|
||
-- TODO: figure-out-what-to-do-with-metadata
|
||
compactStoragePut sto (translateKey "R" hash) (LBS.toStrict (serialise v))
|
||
|
||
getRef sto ref = do
|
||
let hash = hashObject @hash ref
|
||
runMaybeT do
|
||
v <- MaybeT $ compactStorageGet sto (translateKey "R" hash)
|
||
deserialiseOrFail @(Hash hash) (LBS.fromStrict v)
|
||
& either (const mzero) pure
|
||
|
||
delBlock sto h = do
|
||
compactStorageDel sto (translateKey "V" h)
|
||
|
||
delRef sto ref = do
|
||
compactStorageDel sto (translateKey "R" (hashObject @hash ref))
|
||
|
||
|