This commit is contained in:
Dmitry Zuikov 2024-05-30 12:07:55 +03:00
parent 00c265c7ac
commit 82b6a8af57
1 changed files with 20 additions and 14 deletions

View File

@ -14,6 +14,8 @@ import Data.List qualified as List
import Data.Maybe
import Data.Map (Map)
import Data.Map qualified as Map
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HM
import Data.Foldable
import Data.Traversable
import Codec.Serialise
@ -23,6 +25,7 @@ import Lens.Micro.Platform
import Control.Monad.Except
import Control.Monad.Trans.Maybe
import Control.Monad
import Control.Concurrent.STM.TSem
import UnliftIO
import Debug.Trace
@ -73,9 +76,10 @@ data Header =
data CompactStorage =
CompactStorage
{ csHandle :: MVar Handle
, csHeaderOff :: TVar EntryOffset
, csKeys :: TVar (Map ByteString (Either IndexEntry ByteString))
{ csHandle :: MVar Handle
, csHandleSem :: TSem
, csHeaderOff :: TVar EntryOffset
, csKeys :: TVar (HashMap ByteString (Either IndexEntry ByteString))
}
type ForCompactStorage m = MonadIO m
@ -103,13 +107,15 @@ compactStorageOpen _ fp = do
hoff0 <- newTVarIO 0
keys0 <- newTVarIO mempty
sem <- atomically $ newTSem 1
if sz == 0 then
pure $ CompactStorage mha hoff0 keys0
pure $ CompactStorage mha sem hoff0 keys0
else do
(p,header) <- readHeader mha Nothing >>= maybe (throwIO InvalidHeader) pure
traceM (show ("HEADER",header))
hoff <- newTVarIO p
let sto = CompactStorage mha hoff keys0
let sto = CompactStorage mha sem hoff keys0
readIndex sto (hdrIndexOffset header) (hdrIndexEntries header)
flip fix (hdrPrev header) $ \next -> \case
@ -157,16 +163,16 @@ readIndex sto offset num = liftIO do
throwIO BrokenIndex
atomically do
let new = Map.fromList [ (k,Left e) | e@(IndexEntry _ _ _ _ k) <- entries ]
let new = HM.fromList [ (k,Left e) | e@(IndexEntry _ _ _ _ k) <- entries ]
-- readIndex from newer to older
-- so we keep only the newer values in map
modifyTVar (csKeys sto) (Map.unionWith (\_ b -> b) new)
modifyTVar (csKeys sto) (HM.unionWith (\_ b -> b) new)
compactStorageCommit :: ForCompactStorage m => CompactStorage -> m ()
compactStorageCommit sto = liftIO do
withMVar (csHandle sto) $ \ha -> do
hSeek ha SeekFromEnd 0
kv <- readTVarIO (csKeys sto) <&> Map.toList
kv <- readTVarIO (csKeys sto) <&> HM.toList
let items = [ (k, v) | (k, Right v) <- kv ]
@ -200,19 +206,20 @@ compactStorageCommit sto = liftIO do
offLast <- hTell ha <&> fromIntegral
let es = HM.fromList [ (idxEntryKey e, Left e) | e <- idxEntries ]
atomically do
writeTVar (csHeaderOff sto) (offLast - headerSize 1)
for_ idxEntries $ \e -> do
modifyTVar (csKeys sto) (Map.insert (idxEntryKey e) (Left e))
modifyTVar (csKeys sto) (`mappend` es)
compactStoragePut :: ForCompactStorage m => CompactStorage -> ByteString -> ByteString -> m ()
compactStoragePut sto k v = do
-- TODO: ASAP-do-not-write-value-if-not-changed
atomically $ modifyTVar (csKeys sto) (Map.insert k (Right v))
atomically $ modifyTVar (csKeys sto) (HM.insert k (Right v))
compactStorageGet :: ForCompactStorage m => CompactStorage -> ByteString -> m (Maybe ByteString)
compactStorageGet sto key = do
val <- readTVarIO (csKeys sto) <&> Map.lookup key
val <- readTVarIO (csKeys sto) <&> HM.lookup key
case val of
Nothing -> pure Nothing
Just (Right s) -> pure (Just s)
@ -227,8 +234,7 @@ compactStorageClose :: ForCompactStorage m => CompactStorage -> m ()
compactStorageClose sto = do
compactStorageCommit sto
-- FIXME: hangs-forever-on-io-exception
w <- takeMVar (csHandle sto)
hClose w
liftIO $ withMVar (csHandle sto) hClose
appendHeader :: ForCompactStorage m
=> Handle