This commit is contained in:
Dmitry Zuikov 2024-05-31 10:40:41 +03:00
parent ce87e43829
commit 7ae3c1e529
2 changed files with 50 additions and 28 deletions

View File

@ -76,6 +76,7 @@ library
, mtl , mtl
, prettyprinter , prettyprinter
, random , random
, safe
, stm , stm
, stm-chans , stm-chans
, streaming , streaming

View File

@ -26,6 +26,7 @@ import Control.Monad.Except
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Control.Monad import Control.Monad
import Control.Concurrent.STM.TSem import Control.Concurrent.STM.TSem
import Safe
import UnliftIO import UnliftIO
import Debug.Trace import Debug.Trace
@ -75,7 +76,6 @@ data Header =
, hdrFwdOffset :: FwdEntryOffset , hdrFwdOffset :: FwdEntryOffset
, hdrIndexOffset :: EntryOffset , hdrIndexOffset :: EntryOffset
, hdrIndexEntries :: EntryNum , hdrIndexEntries :: EntryNum
, hdrGen :: Word32
, hdrPrev :: EntryOffset , hdrPrev :: EntryOffset
} }
deriving stock (Show,Generic) deriving stock (Show,Generic)
@ -84,7 +84,6 @@ data CompactStorage =
CompactStorage CompactStorage
{ csHandle :: MVar Handle { csHandle :: MVar Handle
, csHeaderOff :: IORef EntryOffset , csHeaderOff :: IORef EntryOffset
, csHeaderGen :: IORef Word32
, csSeq :: IORef Integer , csSeq :: IORef Integer
, csKeys :: IORef (HashMap ByteString (Either (IndexEntry,Integer) (ByteString,Integer))) , csKeys :: IORef (HashMap ByteString (Either (IndexEntry,Integer) (ByteString,Integer)))
} }
@ -96,6 +95,7 @@ data CompactStorageOpenOpt
data CompactStorageOpenError = data CompactStorageOpenError =
InvalidHeader InvalidHeader
| BrokenIndex | BrokenIndex
| InvalidFwdSection
deriving stock (Typeable,Show) deriving stock (Typeable,Show)
instance Exception CompactStorageOpenError instance Exception CompactStorageOpenError
@ -114,37 +114,26 @@ compactStorageOpen _ fp = do
hoff0 <- newIORef 0 hoff0 <- newIORef 0
keys0 <- newIORef mempty keys0 <- newIORef mempty
ss <- newIORef 0 ss <- newIORef 0
gen0 <- newIORef 0
if sz == 0 then if sz == 0 then
pure $ CompactStorage mha hoff0 gen0 ss keys0 pure $ CompactStorage mha hoff0 ss keys0
else do else do
(p,header) <- readHeader mha Nothing >>= maybe (throwIO InvalidHeader) pure (p,header) <- readHeader mha Nothing >>= maybe (throwIO InvalidHeader) pure
traceM (show ("HEADER",header)) traceM (show ("HEADER",header))
hoff <- newIORef p hoff <- newIORef p
let sto = CompactStorage mha hoff gen0 ss keys0 let sto = CompactStorage mha hoff ss keys0
updateHeaderGen sto header
readIndex sto (hdrIndexOffset header) (hdrIndexEntries header) readIndex sto (hdrIndexOffset header) (hdrIndexEntries header)
flip fix (hdrPrev header) $ \next -> \case flip fix (hdrPrev header) $ \next -> \case
0 -> pure () 0 -> pure ()
off -> do off -> do
(_,pHeader) <- readHeader mha (Just off) >>= maybe (throwIO InvalidHeader) pure (_,pHeader) <- readHeader mha (Just off) >>= maybe (throwIO InvalidHeader) pure
updateHeaderGen sto pHeader
traceM (show ("PHEADER",pHeader)) traceM (show ("PHEADER",pHeader))
readIndex sto (hdrIndexOffset pHeader) (hdrIndexEntries pHeader) readIndex sto (hdrIndexOffset pHeader) (hdrIndexEntries pHeader)
next (hdrPrev pHeader) next (hdrPrev pHeader)
pure sto pure sto
where
updateHeaderGen :: ForCompactStorage m
=> CompactStorage
-> Header
-> m ()
updateHeaderGen sto hdr = do
writeIORef (csHeaderGen sto) (hdrGen hdr)
readIndex :: ForCompactStorage m readIndex :: ForCompactStorage m
=> CompactStorage => CompactStorage
@ -199,7 +188,7 @@ compactStorageCommit sto = liftIO do
offFwd <- hTell ha offFwd <- hTell ha
LBS.hPut ha (toLazyByteString $ word64BE 0) LBS.hPut ha (toLazyByteString $ word64BE 0)
off0 <- hTell ha let off0 = offFwd + 8
-- write data -- write data
idxEntries <- flip fix (off0, items, mempty) $ \next (off, what, idx) -> do idxEntries <- flip fix (off0, items, mempty) $ \next (off, what, idx) -> do
@ -221,15 +210,14 @@ compactStorageCommit sto = liftIO do
offPrev <- readIORef (csHeaderOff sto) offPrev <- readIORef (csHeaderOff sto)
pver <- atomicModifyIORef' (csHeaderGen sto) (\v -> (v+1, v))
-- FIXME: maybe-slow-length-calc
appendHeader ha (fromIntegral offFwd) (Just offPrev) pver offIdx0 (fromIntegral $ length idxEntries)
offCommitHead <- hTell ha offCommitHead <- hTell ha
-- FIXME: maybe-slow-length-calc
appendHeader ha (fromIntegral offFwd) (Just offPrev) offIdx0 (fromIntegral $ length idxEntries)
hSeek ha AbsoluteSeek offFwd hSeek ha AbsoluteSeek offFwd
LBS.hPut ha (toLazyByteString $ word16BE (fromIntegral offCommitHead)) LBS.hPut ha (toLazyByteString $ word64BE (fromIntegral offCommitHead))
hFlush ha hFlush ha
@ -275,23 +263,57 @@ compactStorageClose sto = do
-- FIXME: hangs-forever-on-io-exception -- FIXME: hangs-forever-on-io-exception
liftIO $ withMVar (csHandle sto) hClose liftIO $ withMVar (csHandle sto) hClose
compactStorageFindLiveHeads :: ForCompactStorage m
=> FilePath
-> m [(EntryOffset, Header)]
compactStorageFindLiveHeads path = liftIO do
withFile path ReadMode $ \ha -> do
mv <- newMVar ha
flip fix (mempty :: [(EntryOffset, Header)]) $ \next acc -> do
what <- runMaybeT do
fwdOff <- hTell ha
-- fwd section
fwd <- lift (LBS.hGet ha 8)
<&> runGetOrFail getWord64be
>>= either (const mzero) pure
<&> view _3
traceM $ show ("JOPA1", fwdOff, fwd)
h@(o,header) <- MaybeT $ readHeader mv (Just $ fromIntegral fwd)
let magicOk = hdrMagic header == headerMagic
let fwdOk = hdrFwdOffset header == fromIntegral fwdOff
if magicOk && fwdOk then
pure h
else
mzero
maybe (pure acc) (\h -> next ( h : acc) ) what
appendHeader :: ForCompactStorage m appendHeader :: ForCompactStorage m
=> Handle => Handle
-> FwdEntryOffset -- fwd section offset -> FwdEntryOffset -- fwd section offset
-> Maybe EntryOffset -- prev. header -> Maybe EntryOffset -- prev. header
-> Word32 -- prev. header version
-> EntryOffset -> EntryOffset
-> EntryNum -> EntryNum
-> m () -> m ()
appendHeader ha fwdOff poffset v ioffset num = do appendHeader ha fwdOff poffset ioffset num = do
let bs = word16BE headerMagic -- 2 let bs = word16BE headerMagic -- 2
<> word16BE headerVersion -- 4 <> word16BE headerVersion -- 4
<> word64BE (coerce fwdOff) -- 12 <> word64BE (coerce fwdOff) -- 12
<> word64BE (coerce ioffset) -- 20 <> word64BE (coerce ioffset) -- 20
<> word32BE (coerce num) -- 24 <> word32BE (coerce num) -- 24
<> word32BE v -- 28 <> word64BE (coerce $ fromMaybe 0 poffset) -- 32
<> word64BE (coerce $ fromMaybe 0 poffset) -- 36
<> byteString (BS.replicate 4 0) -- 40
liftIO $ LBS.hPut ha (B.toLazyByteString bs) liftIO $ LBS.hPut ha (B.toLazyByteString bs)
readHeader :: ForCompactStorage m readHeader :: ForCompactStorage m
@ -317,7 +339,6 @@ readHeader mha moff = do
<*> getFwdOffset <*> getFwdOffset
<*> getOffset <*> getOffset
<*> getNum <*> getNum
<*> getWord32be
<*> getOffset <*> getOffset
pure $ either (const Nothing) (fmap (off,) . Just . view _3) what pure $ either (const Nothing) (fmap (off,) . Just . view _3) what
@ -334,7 +355,7 @@ headerVersion :: Word16
headerVersion = 1 headerVersion = 1
headerSize :: Integral a => Word16 -> a headerSize :: Integral a => Word16 -> a
headerSize 1 = fromIntegral (40 :: Integer) headerSize 1 = fromIntegral (32 :: Integer)
headerSize _ = error "unsupported header version" headerSize _ = error "unsupported header version"