diff --git a/hbs2-storage-simple/hbs2-storage-simple.cabal b/hbs2-storage-simple/hbs2-storage-simple.cabal index 4e72cc19..da62c866 100644 --- a/hbs2-storage-simple/hbs2-storage-simple.cabal +++ b/hbs2-storage-simple/hbs2-storage-simple.cabal @@ -76,6 +76,7 @@ library , mtl , prettyprinter , random + , safe , stm , stm-chans , streaming diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Compact.hs b/hbs2-storage-simple/lib/HBS2/Storage/Compact.hs index 376e5e96..7e968a54 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Compact.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Compact.hs @@ -26,6 +26,7 @@ import Control.Monad.Except import Control.Monad.Trans.Maybe import Control.Monad import Control.Concurrent.STM.TSem +import Safe import UnliftIO import Debug.Trace @@ -75,7 +76,6 @@ data Header = , hdrFwdOffset :: FwdEntryOffset , hdrIndexOffset :: EntryOffset , hdrIndexEntries :: EntryNum - , hdrGen :: Word32 , hdrPrev :: EntryOffset } deriving stock (Show,Generic) @@ -84,7 +84,6 @@ data CompactStorage = CompactStorage { csHandle :: MVar Handle , csHeaderOff :: IORef EntryOffset - , csHeaderGen :: IORef Word32 , csSeq :: IORef Integer , csKeys :: IORef (HashMap ByteString (Either (IndexEntry,Integer) (ByteString,Integer))) } @@ -96,6 +95,7 @@ data CompactStorageOpenOpt data CompactStorageOpenError = InvalidHeader | BrokenIndex + | InvalidFwdSection deriving stock (Typeable,Show) instance Exception CompactStorageOpenError @@ -114,37 +114,26 @@ compactStorageOpen _ fp = do hoff0 <- newIORef 0 keys0 <- newIORef mempty ss <- newIORef 0 - gen0 <- newIORef 0 if sz == 0 then - pure $ CompactStorage mha hoff0 gen0 ss keys0 + pure $ CompactStorage mha hoff0 ss keys0 else do (p,header) <- readHeader mha Nothing >>= maybe (throwIO InvalidHeader) pure traceM (show ("HEADER",header)) hoff <- newIORef p - let sto = CompactStorage mha hoff gen0 ss keys0 - updateHeaderGen sto header + let sto = CompactStorage mha hoff ss keys0 readIndex sto (hdrIndexOffset header) (hdrIndexEntries header) flip fix (hdrPrev header) $ \next -> \case 0 -> pure () off -> do (_,pHeader) <- readHeader mha (Just off) >>= maybe (throwIO InvalidHeader) pure - updateHeaderGen sto pHeader traceM (show ("PHEADER",pHeader)) readIndex sto (hdrIndexOffset pHeader) (hdrIndexEntries pHeader) next (hdrPrev pHeader) pure sto - where - - updateHeaderGen :: ForCompactStorage m - => CompactStorage - -> Header - -> m () - updateHeaderGen sto hdr = do - writeIORef (csHeaderGen sto) (hdrGen hdr) readIndex :: ForCompactStorage m => CompactStorage @@ -199,7 +188,7 @@ compactStorageCommit sto = liftIO do offFwd <- hTell ha LBS.hPut ha (toLazyByteString $ word64BE 0) - off0 <- hTell ha + let off0 = offFwd + 8 -- write data idxEntries <- flip fix (off0, items, mempty) $ \next (off, what, idx) -> do @@ -221,15 +210,14 @@ compactStorageCommit sto = liftIO do offPrev <- readIORef (csHeaderOff sto) - pver <- atomicModifyIORef' (csHeaderGen sto) (\v -> (v+1, v)) - -- FIXME: maybe-slow-length-calc - appendHeader ha (fromIntegral offFwd) (Just offPrev) pver offIdx0 (fromIntegral $ length idxEntries) - offCommitHead <- hTell ha + -- FIXME: maybe-slow-length-calc + appendHeader ha (fromIntegral offFwd) (Just offPrev) offIdx0 (fromIntegral $ length idxEntries) + hSeek ha AbsoluteSeek offFwd - LBS.hPut ha (toLazyByteString $ word16BE (fromIntegral offCommitHead)) + LBS.hPut ha (toLazyByteString $ word64BE (fromIntegral offCommitHead)) hFlush ha @@ -275,23 +263,57 @@ compactStorageClose sto = do -- FIXME: hangs-forever-on-io-exception liftIO $ withMVar (csHandle sto) hClose +compactStorageFindLiveHeads :: ForCompactStorage m + => FilePath + -> m [(EntryOffset, Header)] + +compactStorageFindLiveHeads path = liftIO do + withFile path ReadMode $ \ha -> do + + mv <- newMVar ha + + flip fix (mempty :: [(EntryOffset, Header)]) $ \next acc -> do + + what <- runMaybeT do + + fwdOff <- hTell ha + + -- fwd section + fwd <- lift (LBS.hGet ha 8) + <&> runGetOrFail getWord64be + >>= either (const mzero) pure + <&> view _3 + + traceM $ show ("JOPA1", fwdOff, fwd) + + h@(o,header) <- MaybeT $ readHeader mv (Just $ fromIntegral fwd) + + + let magicOk = hdrMagic header == headerMagic + let fwdOk = hdrFwdOffset header == fromIntegral fwdOff + + if magicOk && fwdOk then + pure h + else + mzero + + maybe (pure acc) (\h -> next ( h : acc) ) what + + appendHeader :: ForCompactStorage m => Handle -> FwdEntryOffset -- fwd section offset -> Maybe EntryOffset -- prev. header - -> Word32 -- prev. header version -> EntryOffset -> EntryNum -> m () -appendHeader ha fwdOff poffset v ioffset num = do +appendHeader ha fwdOff poffset ioffset num = do let bs = word16BE headerMagic -- 2 <> word16BE headerVersion -- 4 <> word64BE (coerce fwdOff) -- 12 <> word64BE (coerce ioffset) -- 20 <> word32BE (coerce num) -- 24 - <> word32BE v -- 28 - <> word64BE (coerce $ fromMaybe 0 poffset) -- 36 - <> byteString (BS.replicate 4 0) -- 40 + <> word64BE (coerce $ fromMaybe 0 poffset) -- 32 liftIO $ LBS.hPut ha (B.toLazyByteString bs) readHeader :: ForCompactStorage m @@ -317,7 +339,6 @@ readHeader mha moff = do <*> getFwdOffset <*> getOffset <*> getNum - <*> getWord32be <*> getOffset pure $ either (const Nothing) (fmap (off,) . Just . view _3) what @@ -334,7 +355,7 @@ headerVersion :: Word16 headerVersion = 1 headerSize :: Integral a => Word16 -> a -headerSize 1 = fromIntegral (40 :: Integer) +headerSize 1 = fromIntegral (32 :: Integer) headerSize _ = error "unsupported header version"