mirror of https://github.com/voidlizard/hbs2
wip
This commit is contained in:
parent
7ae3c1e529
commit
6de1949e56
|
@ -30,6 +30,7 @@ import Data.HashMap.Strict (HashMap)
|
|||
import Data.HashMap.Strict qualified as HashMap
|
||||
import Data.Map (Map)
|
||||
import Data.Map qualified as Map
|
||||
import Data.List.Split (chunksOf)
|
||||
|
||||
import Streaming.Prelude (Of,Stream)
|
||||
import Streaming.Prelude qualified as S
|
||||
|
@ -66,10 +67,7 @@ main = do
|
|||
let s = readDef @Int 256 ss
|
||||
let p = pref
|
||||
|
||||
let bss = randomByteStrings n s
|
||||
let bss2 = randomByteStrings n s
|
||||
let bss3 = randomByteStrings n s
|
||||
let bss4 = randomByteStrings n s
|
||||
bss <- S.toList_ $ randomByteStrings n s
|
||||
-- let bss41 = randomByteStrings (n `div` 2) s
|
||||
-- let bss42 = randomByteStrings (n`div` 2) s
|
||||
-- let bss43 = randomByteStrings (n`div`4) s
|
||||
|
@ -95,27 +93,28 @@ main = do
|
|||
|
||||
print $ "preparing to write" <+> pretty n <+> "chunks"
|
||||
|
||||
timeItNamed "write chunks to simple storage" do
|
||||
S.mapM_ (enqueueBlock storage) bss
|
||||
|
||||
timeItNamed "write chunks to sqlite test" do
|
||||
withDB env $ transactional do
|
||||
flip S.mapM_ bss2 $ \bs -> do
|
||||
let h = hashObject @HbSync bs & pretty & show
|
||||
insert [qc|insert into wtf (hash,val) values(?,?)|] (h,bs)
|
||||
|
||||
timeItNamed "write chunks to log" do
|
||||
fh <- openFile (path </> "lsm") AppendMode
|
||||
flip S.mapM_ bss3 $ \bs -> do
|
||||
forM_ bss $ \bs -> do
|
||||
let h = hashObject @HbSync bs & pretty & show
|
||||
LBS.hPut fh (serialise (h,bs))
|
||||
hClose fh
|
||||
|
||||
timeItNamed "write chunks to simple storage" do
|
||||
mapM_ (enqueueBlock storage) bss
|
||||
|
||||
timeItNamed "write chunks to sqlite test" do
|
||||
withDB env $ transactional do
|
||||
forM_ bss $ \bs -> do
|
||||
let h = hashObject @HbSync bs & pretty & show
|
||||
insert [qc|insert into wtf (hash,val) values(?,?)|] (h,bs)
|
||||
|
||||
|
||||
timeItNamed "write chunks to log 2" do
|
||||
buf <- newIORef (mempty, 0 :: Int)
|
||||
fh <- openFile (path </> "lsm2") AppendMode
|
||||
|
||||
flip S.mapM_ bss3 $ \bs -> do
|
||||
forM_ bss $ \bs -> do
|
||||
let h = hashObject @HbSync bs & pretty & show
|
||||
num <- atomicModifyIORef buf (\(chunks,sz) -> ((serialise (h,bs) : chunks,sz+1),sz+1))
|
||||
|
||||
|
@ -127,16 +126,23 @@ main = do
|
|||
LBS.hPut fh (mconcat w)
|
||||
hClose fh
|
||||
|
||||
|
||||
let cn = length bss `div` 2
|
||||
let chu = chunksOf cn bss
|
||||
|
||||
timeItNamed "write chunks to compact-storage" do
|
||||
|
||||
temp <- liftIO $ emptyTempFile "." "compact-storage"
|
||||
|
||||
sto <- compactStorageOpen mempty temp
|
||||
|
||||
flip S.mapM_ bss4 $ \bs -> do
|
||||
w <- for chu $ \css -> do
|
||||
async do
|
||||
for_ css $ \bs -> do
|
||||
let h = hashObject @HbSync bs
|
||||
compactStoragePut sto (coerce h) (LBS.toStrict bs)
|
||||
|
||||
mapM_ wait w
|
||||
compactStorageClose sto
|
||||
|
||||
timeItNamed "write chunks to LSM-mock" do
|
||||
|
|
|
@ -88,6 +88,7 @@ library
|
|||
, temporary
|
||||
, filepattern
|
||||
, unliftio
|
||||
, vector
|
||||
|
||||
|
||||
hs-source-dirs: lib
|
||||
|
@ -172,6 +173,7 @@ executable hbs2-storage-simple-benchmarks
|
|||
, safe
|
||||
, serialise
|
||||
, streaming
|
||||
, split
|
||||
, text
|
||||
, temporary
|
||||
, transformers
|
||||
|
|
|
@ -18,6 +18,8 @@ import Data.HashMap.Strict (HashMap)
|
|||
import Data.HashMap.Strict qualified as HM
|
||||
import Data.Foldable
|
||||
import Data.Traversable
|
||||
import Data.Vector (Vector,(!))
|
||||
import Data.Vector qualified as V
|
||||
import Codec.Serialise
|
||||
import GHC.Generics
|
||||
-- import System.IO
|
||||
|
@ -84,8 +86,9 @@ data CompactStorage =
|
|||
CompactStorage
|
||||
{ csHandle :: MVar Handle
|
||||
, csHeaderOff :: IORef EntryOffset
|
||||
, csSeq :: IORef Integer
|
||||
, csKeys :: IORef (HashMap ByteString (Either (IndexEntry,Integer) (ByteString,Integer)))
|
||||
-- , csSeq :: IORef Integer
|
||||
, csSeq :: TVar Integer
|
||||
, csKeys :: Vector (TVar (HashMap ByteString (Either (IndexEntry,Integer) (ByteString,Integer))))
|
||||
}
|
||||
|
||||
type ForCompactStorage m = MonadIO m
|
||||
|
@ -101,6 +104,15 @@ data CompactStorageOpenError =
|
|||
instance Exception CompactStorageOpenError
|
||||
|
||||
|
||||
|
||||
buckets :: Int
|
||||
buckets = 8
|
||||
|
||||
-- FIXME: buckets-hardcode
|
||||
getKeyPrefix :: ByteString -> Int
|
||||
getKeyPrefix bs = maybe 0 (fromIntegral.fst) (BS.uncons bs) `mod` buckets
|
||||
{-# INLINE getKeyPrefix #-}
|
||||
|
||||
compactStorageOpen :: ForCompactStorage m
|
||||
=> [CompactStorageOpenOpt]
|
||||
-> FilePath
|
||||
|
@ -112,8 +124,11 @@ compactStorageOpen _ fp = do
|
|||
mha <- newMVar ha
|
||||
|
||||
hoff0 <- newIORef 0
|
||||
keys0 <- newIORef mempty
|
||||
ss <- newIORef 0
|
||||
|
||||
keys0 <- replicateM buckets (newTVarIO mempty) <&> V.fromList
|
||||
|
||||
-- ss <- newIORef 0
|
||||
ss <- newTVarIO 0
|
||||
|
||||
if sz == 0 then
|
||||
pure $ CompactStorage mha hoff0 ss keys0
|
||||
|
@ -169,16 +184,21 @@ readIndex sto offset num = liftIO do
|
|||
when (rn /= num) do
|
||||
throwIO BrokenIndex
|
||||
|
||||
let new = HM.fromList [ (idxEntryKey e,Left (e,0)) | e <- entries ]
|
||||
let new = [ (idxEntryKey e,Left (e,0)) | e <- entries ]
|
||||
-- readIndex from newer to older
|
||||
-- so we keep only the newer values in map
|
||||
modifyIORef' (csKeys sto) (HM.unionWith (\_ b -> b) new)
|
||||
atomically do
|
||||
for_ new $ \(k,v) -> do
|
||||
let tv = csKeys sto ! getKeyPrefix k
|
||||
modifyTVar tv (HM.insertWith (\_ o -> o) k v)
|
||||
|
||||
compactStorageCommit :: ForCompactStorage m => CompactStorage -> m ()
|
||||
compactStorageCommit sto = liftIO do
|
||||
withMVar (csHandle sto) $ \ha -> do
|
||||
hSeek ha SeekFromEnd 0
|
||||
kv <- readIORef (csKeys sto) <&> HM.toList
|
||||
|
||||
kv <- atomically do
|
||||
mapM readTVar (csKeys sto) <&> mconcat . V.toList . fmap HM.toList
|
||||
|
||||
let items = [ (k, v) | (k, Right v) <- kv ]
|
||||
|
||||
|
@ -227,26 +247,36 @@ compactStorageCommit sto = liftIO do
|
|||
|
||||
-- atomically do
|
||||
atomicWriteIORef (csHeaderOff sto) (offLast - headerSize 1)
|
||||
atomicModifyIORef' (csKeys sto) $ \m -> do
|
||||
let new = foldl merge m idxEntries
|
||||
(new, ())
|
||||
|
||||
atomically do
|
||||
for_ idxEntries $ \(e,i) -> do
|
||||
let k = idxEntryKey e
|
||||
let tv = csKeys sto ! getKeyPrefix k
|
||||
modifyTVar tv (HM.insertWith merge k (Left (e, i)))
|
||||
|
||||
where
|
||||
merge m (el,i) = HM.insertWith mergeEl (idxEntryKey el) (Left (el,i)) m
|
||||
mergeEl new old = if getSeq new >= getSeq old then new else old
|
||||
merge new old = if getSeq new >= getSeq old then new else old
|
||||
getSeq = \case
|
||||
Left (_,i) -> i
|
||||
Right (_,i) -> i
|
||||
|
||||
|
||||
compactStoragePut :: ForCompactStorage m => CompactStorage -> ByteString -> ByteString -> m ()
|
||||
compactStoragePut sto k v = do
|
||||
-- TODO: ASAP-do-not-write-value-if-not-changed
|
||||
c <- atomicModifyIORef' (csSeq sto) (\n -> (n+1,n))
|
||||
atomicModifyIORef' (csKeys sto) (\m -> (HM.insert k (Right (v,c)) m, ()))
|
||||
|
||||
let tvar = csKeys sto ! getKeyPrefix k
|
||||
|
||||
atomically $ do
|
||||
c <- stateTVar (csSeq sto) (\n -> (n+1,n))
|
||||
modifyTVar tvar (HM.insert k (Right (v,c)))
|
||||
|
||||
compactStorageGet :: ForCompactStorage m => CompactStorage -> ByteString -> m (Maybe ByteString)
|
||||
compactStorageGet sto key = do
|
||||
val <- readIORef (csKeys sto) <&> HM.lookup key
|
||||
let tvar = csKeys sto ! getKeyPrefix key
|
||||
|
||||
val <- readTVarIO tvar <&> HM.lookup key
|
||||
|
||||
case val of
|
||||
Nothing -> pure Nothing
|
||||
Just (Right (s,_)) -> pure (Just s)
|
||||
|
|
Loading…
Reference in New Issue