This commit is contained in:
Dmitry Zuikov 2024-05-30 13:18:39 +03:00
parent 82b6a8af57
commit 6c5ebfe38e
1 changed files with 32 additions and 26 deletions

View File

@ -77,9 +77,9 @@ data Header =
data CompactStorage = data CompactStorage =
CompactStorage CompactStorage
{ csHandle :: MVar Handle { csHandle :: MVar Handle
, csHandleSem :: TSem , csHeaderOff :: IORef EntryOffset
, csHeaderOff :: TVar EntryOffset , csSeq :: IORef Integer
, csKeys :: TVar (HashMap ByteString (Either IndexEntry ByteString)) , csKeys :: IORef (HashMap ByteString (Either (IndexEntry,Integer) (ByteString,Integer)))
} }
type ForCompactStorage m = MonadIO m type ForCompactStorage m = MonadIO m
@ -104,18 +104,17 @@ compactStorageOpen _ fp = do
sz <- hFileSize ha sz <- hFileSize ha
mha <- newMVar ha mha <- newMVar ha
hoff0 <- newTVarIO 0 hoff0 <- newIORef 0
keys0 <- newTVarIO mempty keys0 <- newIORef mempty
ss <- newIORef 0
sem <- atomically $ newTSem 1
if sz == 0 then if sz == 0 then
pure $ CompactStorage mha sem hoff0 keys0 pure $ CompactStorage mha hoff0 ss keys0
else do else do
(p,header) <- readHeader mha Nothing >>= maybe (throwIO InvalidHeader) pure (p,header) <- readHeader mha Nothing >>= maybe (throwIO InvalidHeader) pure
traceM (show ("HEADER",header)) traceM (show ("HEADER",header))
hoff <- newTVarIO p hoff <- newIORef p
let sto = CompactStorage mha sem hoff keys0 let sto = CompactStorage mha hoff ss keys0
readIndex sto (hdrIndexOffset header) (hdrIndexEntries header) readIndex sto (hdrIndexOffset header) (hdrIndexEntries header)
flip fix (hdrPrev header) $ \next -> \case flip fix (hdrPrev header) $ \next -> \case
@ -162,17 +161,16 @@ readIndex sto offset num = liftIO do
when (rn /= num) do when (rn /= num) do
throwIO BrokenIndex throwIO BrokenIndex
atomically do let new = HM.fromList [ (idxEntryKey e,Left (e,0)) | e <- entries ]
let new = HM.fromList [ (k,Left e) | e@(IndexEntry _ _ _ _ k) <- entries ]
-- readIndex from newer to older -- readIndex from newer to older
-- so we keep only the newer values in map -- so we keep only the newer values in map
modifyTVar (csKeys sto) (HM.unionWith (\_ b -> b) new) modifyIORef' (csKeys sto) (HM.unionWith (\_ b -> b) new)
compactStorageCommit :: ForCompactStorage m => CompactStorage -> m () compactStorageCommit :: ForCompactStorage m => CompactStorage -> m ()
compactStorageCommit sto = liftIO do compactStorageCommit sto = liftIO do
withMVar (csHandle sto) $ \ha -> do withMVar (csHandle sto) $ \ha -> do
hSeek ha SeekFromEnd 0 hSeek ha SeekFromEnd 0
kv <- readTVarIO (csKeys sto) <&> HM.toList kv <- readIORef (csKeys sto) <&> HM.toList
let items = [ (k, v) | (k, Right v) <- kv ] let items = [ (k, v) | (k, Right v) <- kv ]
@ -183,20 +181,20 @@ compactStorageCommit sto = liftIO do
idxEntries <- flip fix (off0, items, mempty) $ \next (off, what, idx) -> do idxEntries <- flip fix (off0, items, mempty) $ \next (off, what, idx) -> do
case what of case what of
[] -> pure idx [] -> pure idx
((k,v):rest) -> do ((k,(v,i)):rest) -> do
BS.hPut ha v BS.hPut ha v
let sz = fromIntegral $ BS.length v let sz = fromIntegral $ BS.length v
next (off + sz, rest, IndexEntry (fromIntegral off) (fromIntegral sz) 0 False k : idx) next (off + sz, rest, (IndexEntry (fromIntegral off) (fromIntegral sz) 0 False k,i) : idx)
offIdx0 <- hTell ha <&> fromIntegral offIdx0 <- hTell ha <&> fromIntegral
for_ idxEntries $ \e -> do for_ idxEntries $ \(e,_) -> do
let lbs = serialise e let lbs = serialise e
LBS.hPut ha (B.toLazyByteString $ LBS.hPut ha (B.toLazyByteString $
word16BE (fromIntegral $ LBS.length lbs) word16BE (fromIntegral $ LBS.length lbs)
<> B.lazyByteString lbs) <> B.lazyByteString lbs)
offPrev <- readTVarIO (csHeaderOff sto) offPrev <- readIORef (csHeaderOff sto)
-- FIXME: maybe-slow-length-calc -- FIXME: maybe-slow-length-calc
appendHeader ha (Just offPrev) offIdx0 (fromIntegral $ length idxEntries) appendHeader ha (Just offPrev) offIdx0 (fromIntegral $ length idxEntries)
@ -206,24 +204,32 @@ compactStorageCommit sto = liftIO do
offLast <- hTell ha <&> fromIntegral offLast <- hTell ha <&> fromIntegral
let es = HM.fromList [ (idxEntryKey e, Left e) | e <- idxEntries ] -- atomically do
atomicWriteIORef (csHeaderOff sto) (offLast - headerSize 1)
atomicModifyIORef' (csKeys sto) $ \m -> do
let new = foldl merge m idxEntries
(new, ())
atomically do where
writeTVar (csHeaderOff sto) (offLast - headerSize 1) merge m (el,i) = HM.insertWith mergeEl (idxEntryKey el) (Left (el,i)) m
modifyTVar (csKeys sto) (`mappend` es) mergeEl new old = if getSeq new >= getSeq old then new else old
getSeq = \case
Left (_,i) -> i
Right (_,i) -> i
compactStoragePut :: ForCompactStorage m => CompactStorage -> ByteString -> ByteString -> m () compactStoragePut :: ForCompactStorage m => CompactStorage -> ByteString -> ByteString -> m ()
compactStoragePut sto k v = do compactStoragePut sto k v = do
-- TODO: ASAP-do-not-write-value-if-not-changed -- TODO: ASAP-do-not-write-value-if-not-changed
atomically $ modifyTVar (csKeys sto) (HM.insert k (Right v)) c <- atomicModifyIORef' (csSeq sto) (\n -> (n+1,n))
atomicModifyIORef' (csKeys sto) (\m -> (HM.insert k (Right (v,c)) m, ()))
compactStorageGet :: ForCompactStorage m => CompactStorage -> ByteString -> m (Maybe ByteString) compactStorageGet :: ForCompactStorage m => CompactStorage -> ByteString -> m (Maybe ByteString)
compactStorageGet sto key = do compactStorageGet sto key = do
val <- readTVarIO (csKeys sto) <&> HM.lookup key val <- readIORef (csKeys sto) <&> HM.lookup key
case val of case val of
Nothing -> pure Nothing Nothing -> pure Nothing
Just (Right s) -> pure (Just s) Just (Right (s,_)) -> pure (Just s)
Just (Left e) -> liftIO do Just (Left (e,_)) -> liftIO do
r <- withMVar (csHandle sto) $ \ha -> do r <- withMVar (csHandle sto) $ \ha -> do
try @_ @IOException do try @_ @IOException do
hSeek ha AbsoluteSeek (fromIntegral $ idxEntryOffset e) hSeek ha AbsoluteSeek (fromIntegral $ idxEntryOffset e)