diff --git a/hbs2-storage-simple/hbs2-storage-simple.cabal b/hbs2-storage-simple/hbs2-storage-simple.cabal index 3833f571..418731c5 100644 --- a/hbs2-storage-simple/hbs2-storage-simple.cabal +++ b/hbs2-storage-simple/hbs2-storage-simple.cabal @@ -88,6 +88,7 @@ library , temporary , filepattern , unliftio + , unix , vector diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Compact.hs b/hbs2-storage-simple/lib/HBS2/Storage/Compact.hs index d0393d85..0100e362 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Compact.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Compact.hs @@ -1,7 +1,12 @@ {-# LANGUAGE PatternSynonyms #-} {-# Language ViewPatterns #-} +{-# Language UndecidableInstances #-} module HBS2.Storage.Compact where +import HBS2.Clock +import HBS2.Hash +import HBS2.Storage + import Data.Word import Data.ByteString (ByteString) @@ -32,6 +37,10 @@ import Control.Concurrent.STM.TSem import Safe import UnliftIO +-- import System.Posix.IO +-- import System.Posix.Fcntl +import System.Posix.Types + import Debug.Trace -- compact storage @@ -105,12 +114,16 @@ isFresh e = case e of Del{} -> True _ -> False -data CompactStorage = +type Bucket = TVar (HashMap ByteString Entry) + +data CompactStorage k = CompactStorage - { csHandle :: MVar Handle - , csHeaderOff :: IORef EntryOffset - , csSeq :: TVar Integer - , csKeys :: Vector (TVar (HashMap ByteString Entry)) + { csBuckets :: Int + , csHandle :: MVar Handle + , csHeaderOff :: TVar EntryOffset + , csSeq :: TVar Integer + , csKeys :: Vector Bucket + , csUncommitted :: TVar Integer } type ForCompactStorage m = MonadIO m @@ -125,37 +138,53 @@ data CompactStorageOpenError = instance Exception CompactStorageOpenError -buckets :: Int -buckets = 8 +getBucket :: CompactStorage k -> ByteString -> Bucket +getBucket sto bs = do + let i = maybe 0 (fromIntegral.fst) (BS.uncons bs) `mod` csBuckets sto + csKeys sto ! 
i +{-# INLINE getBucket #-} --- FIXME: buckets-hardcode -getKeyPrefix :: ByteString -> Int -getKeyPrefix bs = maybe 0 (fromIntegral.fst) (BS.uncons bs) `mod` buckets -{-# INLINE getKeyPrefix #-} + +-- openFileForReadWrite :: FilePath -> IO (Fd, Fd) +-- openFileForReadWrite fp = do +-- fdW <- openFd fp ReadWrite (Just ownerModes) defaultFileFlags { nonBlock = True } +-- fdR <- openFd fp ReadOnly Nothing defaultFileFlags { nonBlock = True } +-- return (fdW, fdR) + +-- -- Преобразование Fd в Handle +-- fdToHandleReadWrite :: (Fd, Fd) -> IO (Handle, Handle) +-- fdToHandleReadWrite (fdW, fdR) = do +-- haW <- fdToHandle fdW +-- haR <- fdToHandle fdR +-- return (haW, haR) compactStorageOpen :: ForCompactStorage m => [CompactStorageOpenOpt] -> FilePath - -> m CompactStorage + -> m (CompactStorage k) compactStorageOpen _ fp = do + + let buck = 8 + ha <- openFile fp ReadWriteMode + sz <- hFileSize ha mha <- newMVar ha - hoff0 <- newIORef 0 + hoff0 <- newTVarIO 0 - keys0 <- replicateM buckets (newTVarIO mempty) <&> V.fromList + keys0 <- replicateM buck (newTVarIO mempty) <&> V.fromList + uncommitted <- newTVarIO 0 - -- ss <- newIORef 0 ss <- newTVarIO 0 if sz == 0 then - pure $ CompactStorage mha hoff0 ss keys0 + pure $ CompactStorage buck mha hoff0 ss keys0 uncommitted else do (p,header) <- readHeader mha Nothing >>= maybe (throwIO InvalidHeader) pure - hoff <- newIORef p - let sto = CompactStorage mha hoff ss keys0 + hoff <- newTVarIO p + let sto = CompactStorage buck mha hoff ss keys0 uncommitted readIndex sto (hdrIndexOffset header) (hdrIndexEntries header) flip fix (hdrPrev header) $ \next -> \case @@ -169,7 +198,7 @@ compactStorageOpen _ fp = do readIndex :: ForCompactStorage m - => CompactStorage + => CompactStorage k -> EntryOffset -> EntryNum -> m () @@ -182,15 +211,17 @@ readIndex sto offset num = liftIO do (n,acc,rn) -> do what <- runMaybeT do - slen <- liftIO (try @_ @IOException (LBS.hGet ha 2)) + slen <- liftIO (try @_ @IOException (BS.hGet ha 2)) <&> either 
(const Nothing) Just & MaybeT + <&> LBS.fromStrict len <- either (const Nothing) (Just . view _3) (runGetOrFail getWord16be slen) & MaybeT . pure - sIdx <- liftIO (try @_ @IOException (LBS.hGet ha (fromIntegral len))) + sIdx <- liftIO (try @_ @IOException (BS.hGet ha (fromIntegral len))) >>= either (const mzero) pure + <&> LBS.fromStrict deserialiseOrFail @IndexEntry sIdx & either (const mzero) pure @@ -207,10 +238,10 @@ readIndex sto offset num = liftIO do -- so we keep only the newer values in map atomically do for_ new $ \(k,v) -> do - let tv = csKeys sto ! getKeyPrefix k + let tv = getBucket sto k modifyTVar tv (HM.insertWith (\_ o -> o) k v) -compactStorageCommit :: ForCompactStorage m => CompactStorage -> m () +compactStorageCommit :: ForCompactStorage m => CompactStorage k -> m () compactStorageCommit sto = liftIO do withMVar (csHandle sto) $ \ha -> do hSeek ha SeekFromEnd 0 @@ -224,7 +255,7 @@ compactStorageCommit sto = liftIO do -- write fwd offFwd <- hTell ha - LBS.hPut ha (toLazyByteString $ word64BE 0) + BS.hPut ha (LBS.toStrict $ toLazyByteString $ word64BE 0) let off0 = offFwd + 8 @@ -250,11 +281,11 @@ compactStorageCommit sto = liftIO do -- write index for_ idxEntries $ \(e,_) -> do let lbs = serialise e - LBS.hPut ha (B.toLazyByteString $ + BS.hPut ha $ LBS.toStrict (B.toLazyByteString $ word16BE (fromIntegral $ LBS.length lbs) <> B.lazyByteString lbs) - offPrev <- readIORef (csHeaderOff sto) + offPrev <- readTVarIO (csHeaderOff sto) offCommitHead <- hTell ha @@ -263,7 +294,7 @@ compactStorageCommit sto = liftIO do hSeek ha AbsoluteSeek offFwd - LBS.hPut ha (toLazyByteString $ word64BE (fromIntegral offCommitHead)) + BS.hPut ha (LBS.toStrict $ toLazyByteString $ word64BE (fromIntegral offCommitHead)) hFlush ha @@ -271,14 +302,13 @@ compactStorageCommit sto = liftIO do offLast <- hTell ha <&> fromIntegral - -- atomically do - atomicWriteIORef (csHeaderOff sto) (offLast - headerSize 1) - atomically do + writeTVar (csHeaderOff sto) (offLast - 
headerSize 1) for_ idxEntries $ \(e,i) -> do let k = idxEntryKey e - let tv = csKeys sto ! getKeyPrefix k + let tv = getBucket sto k modifyTVar tv (HM.alter (doAlter (Entry i (Off e))) k) + resetUncommitedSTM sto where @@ -294,10 +324,10 @@ compactStorageCommit sto = liftIO do getSeq = \case Entry i _ -> i -compactStorageDel :: ForCompactStorage m => CompactStorage -> ByteString -> m () +compactStorageDel :: ForCompactStorage m => CompactStorage k -> ByteString -> m () compactStorageDel sto key = do - let tvar = csKeys sto ! getKeyPrefix key + let tvar = getBucket sto key val <- readTVarIO tvar <&> HM.lookup key case val of @@ -308,27 +338,47 @@ compactStorageDel sto key = do atomically do j <- newSequenceSTM sto modifyTVar tvar (HM.insert key (Entry j (Del e))) + succUncommitedSTM sto 1 Just (Entry i (New v)) -> do -- FIXME: if-commit-in-progress-then-put-tomb - atomically $ modifyTVar tvar (HM.delete key) + atomically do + modifyTVar tvar (HM.delete key) + succUncommitedSTM sto 1 -newSequenceSTM :: CompactStorage -> STM Integer +newSequenceSTM :: CompactStorage k -> STM Integer newSequenceSTM sto = stateTVar (csSeq sto) (\n -> (n+1,n)) -compactStoragePut :: ForCompactStorage m => CompactStorage -> ByteString -> ByteString -> m () +succUncommitedSTM :: CompactStorage k -> Integer -> STM () +succUncommitedSTM sto k = modifyTVar (csUncommitted sto) (+k) + +resetUncommitedSTM :: CompactStorage k -> STM () +resetUncommitedSTM sto = writeTVar (csUncommitted sto) 0 + +compactStoragePut :: ForCompactStorage m => CompactStorage k -> ByteString -> ByteString -> m () compactStoragePut sto k v = do -- TODO: ASAP-do-not-write-value-if-not-changed - let tvar = csKeys sto ! 
getKeyPrefix k + let tvar = getBucket sto k atomically $ do c <- newSequenceSTM sto modifyTVar tvar (HM.insert k (Entry c (New v))) -compactStorageGet :: ForCompactStorage m => CompactStorage -> ByteString -> m (Maybe ByteString) +-- TODO: slow-parallel-read-access +-- будет тормозить на конкурентном считывании уже +-- существующих (не новых), значений +-- так как будет в эксклюзивном режиме елозить +-- указателем по файлу. +-- Возможные варианты: маппить файл при доступе +-- на чтение (а что делать, если он растёт?) +-- Собирать операции чтения в батчи и читать батч +-- последовательно (будут некие задержки, и +-- чтение становится хитрожопой операцией, а не +-- просто из файла считать) +compactStorageGet :: ForCompactStorage m => CompactStorage k -> ByteString -> m (Maybe ByteString) compactStorageGet sto key = do - let tvar = csKeys sto ! getKeyPrefix key + let tvar = getBucket sto key val <- readTVarIO tvar <&> HM.lookup key case val of @@ -343,7 +393,18 @@ compactStorageGet sto key = do BS.hGet ha (fromIntegral $ idxEntrySize e) either throwIO (pure . 
Just) r -compactStorageClose :: ForCompactStorage m => CompactStorage -> m () + +compactStorageExists :: ForCompactStorage m => CompactStorage k -> ByteString -> m (Maybe Integer) +compactStorageExists sto key = do + let tvar = getBucket sto key + val <- readTVarIO tvar <&> HM.lookup key + + case val of + Just (Entry _ (New s)) -> pure (Just (fromIntegral (BS.length s))) + Just (Entry _ (Off e)) -> pure (Just (fromIntegral $ idxEntrySize e)) + _ -> pure Nothing + +compactStorageClose :: ForCompactStorage m => CompactStorage k -> m () compactStorageClose sto = do compactStorageCommit sto -- FIXME: hangs-forever-on-io-exception @@ -365,14 +426,13 @@ compactStorageFindLiveHeads path = liftIO do fwdOff <- hTell ha -- fwd section - fwd <- lift (LBS.hGet ha 8) + fwd <- lift (LBS.fromStrict <$> BS.hGet ha 8) <&> runGetOrFail getWord64be >>= either (const mzero) pure <&> view _3 h@(o,header) <- MaybeT $ readHeader mv (Just $ fromIntegral fwd) - let magicOk = hdrMagic header == headerMagic let fwdOk = hdrFwdOffset header == fromIntegral fwdOff @@ -384,6 +444,10 @@ compactStorageFindLiveHeads path = liftIO do maybe (pure acc) (\h -> next ( h : acc) ) what +compactStorageRun :: ForCompactStorage m => m () +compactStorageRun = forever do + pause @'Seconds 1 + appendHeader :: ForCompactStorage m => Handle -> FwdEntryOffset -- fwd section offset @@ -398,7 +462,7 @@ appendHeader ha fwdOff poffset ioffset num = do <> word64BE (coerce ioffset) -- 20 <> word32BE (coerce num) -- 24 <> word64BE (coerce $ fromMaybe 0 poffset) -- 32 - liftIO $ LBS.hPut ha (B.toLazyByteString bs) + liftIO $ BS.hPut ha (LBS.toStrict $ B.toLazyByteString bs) readHeader :: ForCompactStorage m => MVar Handle @@ -415,9 +479,9 @@ readHeader mha moff = do hSeek ha AbsoluteSeek (fromIntegral off) p <- hTell ha <&> fromIntegral - (p,) <$> LBS.hGet ha (headerSize 1) + (p,) <$> BS.hGet ha (headerSize 1) - let what = flip runGetOrFail bs do + let what = flip runGetOrFail (LBS.fromStrict bs) do Header <$> getWord16be 
<*> getWord16be <*> getFwdOffset @@ -443,3 +507,58 @@ headerSize 1 = fromIntegral (32 :: Integer) headerSize _ = error "unsupported header version" +-- Storage instance + +translateKey :: Coercible (Hash hash) LBS.ByteString + => ByteString + -> Hash hash + -> ByteString +translateKey prefix hash = prefix <> LBS.toStrict (coerce hash) + +{-# INLINE translateKey #-} + +instance ( MonadIO m, IsKey hash + , Hashed hash LBS.ByteString + , Coercible (Hash hash) LBS.ByteString + , Serialise (Hash hash) + , Key hash ~ Hash hash + , Eq (Key hash) + ) + => Storage (CompactStorage hash) hash LBS.ByteString m where + + putBlock = enqueueBlock + + enqueueBlock s lbs = do + let hash = hashObject @hash lbs + compactStoragePut s (translateKey "V" hash) (LBS.toStrict lbs) + pure (Just hash) + + getBlock s hash = do + compactStorageGet s (translateKey "V" hash) <&> fmap LBS.fromStrict + + getChunk s k off size = do + undefined + -- liftIO $ simpleGetChunkLazy s k off size + + hasBlock sto k = do + compactStorageExists sto (translateKey "V" k) + + updateRef sto ref v = do + let hash = hashObject @hash ref + -- TODO: figure-out-what-to-do-with-metadata + compactStoragePut sto (translateKey "R" hash) (LBS.toStrict (serialise v)) + + getRef sto ref = do + let hash = hashObject @hash ref + runMaybeT do + v <- MaybeT $ compactStorageGet sto (translateKey "R" hash) + deserialiseOrFail @(Hash hash) (LBS.fromStrict v) + & either (const mzero) pure + + delBlock sto h = do + compactStorageDel sto (translateKey "V" h) + + delRef sto ref = do + compactStorageDel sto (translateKey "R" (hashObject @hash ref)) -- refs are keyed under the "R" prefix (same key updateRef/getRef use), not the "V" block prefix + +