faster writer

This commit is contained in:
Dmitry Zuikov 2023-01-27 09:18:50 +03:00
parent 8c5615d8ae
commit d046ae0bc7
3 changed files with 93 additions and 131 deletions

View File

@ -1,6 +1,6 @@
{-# Language UndecidableInstances #-}
module HBS2.Actors.ChunkWriter module HBS2.Actors.ChunkWriter
( ChunkWriter ( ChunkWriter
, ChunkId
, newChunkWriterIO , newChunkWriterIO
, runChunkWriter , runChunkWriter
, stopChunkWriter , stopChunkWriter
@ -17,6 +17,7 @@ import HBS2.Hash
import HBS2.Storage import HBS2.Storage
import HBS2.Defaults import HBS2.Defaults
import HBS2.Clock import HBS2.Clock
import HBS2.Net.Proto.Sessions
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Data.List qualified as L import Data.List qualified as L
@ -53,6 +54,7 @@ import Control.Concurrent.STM.TBQueue qualified as Q
import Control.Concurrent.STM.TSem qualified as Sem import Control.Concurrent.STM.TSem qualified as Sem
import Control.Concurrent.STM.TSem (TSem) import Control.Concurrent.STM.TSem (TSem)
import Data.Typeable
import Control.Concurrent.MVar as MVar import Control.Concurrent.MVar as MVar
import Control.Concurrent.STM.TQueue qualified as Q0 import Control.Concurrent.STM.TQueue qualified as Q0
@ -61,13 +63,62 @@ import Control.Concurrent
import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap import Data.HashMap.Strict qualified as HashMap
--
--
--TODO: cache file handles
newtype ChunkId = ChunkId FilePath class ( Eq salt
deriving newtype (IsString) , Eq (Hash h)
deriving stock (Eq,Ord,Show) , Hashable salt
, Hashable (Hash h)
, Typeable salt
, Typeable (Hash h)
, Hashed h ByteString
) => ChunkKey salt h
instance ( Hashable salt
, Typeable salt
, Eq salt
, Eq (Hash h)
, Hashable (Hash h)
, Typeable (Hash h)
, Hashed h ByteString
) => ChunkKey salt h
data Chunk h = P [(Offset, ByteString)]
| S (Hash h) ByteString
instance Hashed h ByteString => Monoid (Chunk h) where
mempty = P []
instance Hashed h ByteString => Semigroup (Chunk h) where
(<>) (P a) (P b) = P ( a <> b )
(<>) (S _ s1) (S _ s2) = S h3 s3
where
s3 = s1 <> s2
h3 = hashObject s3
(<>) p@(P{}) (S _ s) = S h3 s3
where
(S _ s1) = toS p
s3 = s1 <> s
h3 = hashObject s3
(<>) (S _ s) p@(P{}) = S h3 s3
where
(S _ s1) = toS p
s3 = s <> s1
h3 = hashObject s3
mkP :: Offset -> ByteString -> Chunk h
mkP o b = P [(o,b)]
toS :: Hashed h ByteString => Chunk h -> Chunk h
toS s@(S{}) = s
toS (P xs) = S h s
where
s = foldMap snd $ L.sortBy (compare `on` fst) xs
h = hashObject s
data ChunkWriter h m = forall a . ( MonadIO m data ChunkWriter h m = forall a . ( MonadIO m
, Storage a h ByteString m , Storage a h ByteString m
@ -78,8 +129,7 @@ data ChunkWriter h m = forall a . ( MonadIO m
, pipeline :: Pipeline IO () , pipeline :: Pipeline IO ()
, dir :: FilePath , dir :: FilePath
, storage :: a , storage :: a
, perBlock :: !(TVar (HashMap FilePath [Handle -> IO ()])) , perBlock :: !(TVar (HashMap SKey (Chunk h)))
, perBlockLock :: !(TVar (HashMap FilePath TSem))
} }
@ -130,7 +180,6 @@ newChunkWriterIO s tmp = do
let d = fromMaybe def tmp let d = fromMaybe def tmp
mt <- liftIO $ newTVarIO mempty mt <- liftIO $ newTVarIO mempty
mts <- liftIO $ newTVarIO mempty
running <- liftIO $ newTVarIO False running <- liftIO $ newTVarIO False
@ -141,42 +190,19 @@ newChunkWriterIO s tmp = do
, dir = d , dir = d
, storage = s , storage = s
, perBlock = mt , perBlock = mt
, perBlockLock = mts
} }
makeFileName :: (Hashable salt, Pretty (Hash h)) => ChunkWriter h m -> salt -> Hash h -> FilePath
makeFileName w salt h = dir w </> suff
where
suff = show $ pretty (fromIntegral (hash salt) :: Word32) <> "@" <> pretty h
delBlock :: (Hashable salt, MonadIO m, Pretty (Hash h)) delBlock :: (MonadIO m, Pretty (Hash h))
=> ChunkWriter h IO -> salt -> Hash h -> m () => ChunkWriter h IO -> SKey -> m ()
delBlock w salt h = liftIO do
delBlock w k = liftIO do
let cache = perBlock w let cache = perBlock w
let se = perBlockLock w liftIO $ atomically $ TV.modifyTVar' cache $ HashMap.delete k
-- lock <- getLock w fn writeChunk :: ( ChunkKey salt h
flush w fn
-- atomically $ Sem.waitTSem lock
void $ runExceptT $ liftIO $ removeFile fn
liftIO $ atomically $ TV.modifyTVar' cache $ HashMap.delete fn
liftIO $ atomically $ TV.modifyTVar' se $ HashMap.delete fn
-- atomically $ Sem.signalTSem lock
where
fn = makeFileName w salt h
writeChunk :: ( Hashable salt
, MonadIO m , MonadIO m
, Pretty (Hash h) , Pretty (Hash h)
, Hashable (Hash h), Eq (Hash h)
) )
=> ChunkWriter h m => ChunkWriter h m
-> salt -> salt
@ -188,8 +214,7 @@ writeChunk = writeChunk2
getHash :: forall salt h m . getHash :: forall salt h m .
( Hashable salt ( ChunkKey salt h
, Hashed h ByteString
, m ~ IO , m ~ IO
, Block ByteString ~ ByteString , Block ByteString ~ ByteString
, Pretty (Hash h) , Pretty (Hash h)
@ -204,12 +229,11 @@ getHash = getHash2
commitBlock :: forall salt h m . commitBlock :: forall salt h m .
( Hashable salt ( ChunkKey salt h
, Hashed h ByteString , Hashed h ByteString
, Block ByteString ~ ByteString , Block ByteString ~ ByteString
, m ~ IO , m ~ IO
, Pretty (Hash h) , Pretty (Hash h)
, Hashable (Hash h), Eq (Hash h)
) )
=> ChunkWriter h m => ChunkWriter h m
-> salt -> salt
@ -218,7 +242,7 @@ commitBlock :: forall salt h m .
commitBlock = commitBlock2 commitBlock = commitBlock2
writeChunk2 :: (Hashable salt, MonadIO m, Pretty (Hash h), Hashable (Hash h), Eq (Hash h)) writeChunk2 :: (ChunkKey salt h, MonadIO m, Pretty (Hash h))
=> ChunkWriter h m => ChunkWriter h m
-> salt -> salt
-> Hash h -> Hash h
@ -228,60 +252,17 @@ writeChunk2 :: (Hashable salt, MonadIO m, Pretty (Hash h), Hashable (Hash h), Eq
writeChunk2 w salt h o !bs = do writeChunk2 w salt h o !bs = do
let cache = perBlock w let cache = perBlock w
let k = newSKey (salt, h)
let action fh = do
void $ runExceptT $ liftIO $ do
hSeek fh AbsoluteSeek (fromIntegral o)
B.hPutStr fh bs -- (BS.copy (B.toStrict bs))
hFlush fh
liftIO $ do liftIO $ do
atomically $ modifyTVar cache (HashMap.insertWith (<>) fn [action]) atomically $ modifyTVar cache (HashMap.insertWith (<>) k (mkP o bs) )
where flush :: Hashed h ByteString => ChunkWriter h IO -> SKey -> IO ()
fn = makeFileName w salt h flush w k = do
getLock w fn = do
_lock <- atomically $ Sem.newTSem 1
let locks = perBlockLock w
atomically $ stateTVar locks $ \x ->
case HashMap.lookup fn x of
Nothing -> (_lock, HashMap.insert fn _lock x)
Just s -> (s, x)
flush :: ChunkWriter h IO -> FilePath -> IO ()
flush w fn = do
let cache = perBlock w let cache = perBlock w
void $ atomically $ modifyTVar cache (HashMap.adjust toS k)
let pip = pipeline w
q <- liftIO $ Q.newTBQueueIO 1
-- addJob pip $ do
lock <- getLock w fn
race (pause (2 :: Timeout 'Seconds)) $ do
void $ runExceptT $ liftIO $ do
atomically $ Sem.waitTSem lock
mbactions <- atomically $ stateTVar cache (\v -> (HashMap.lookup fn v, HashMap.delete fn v))
maybe1 mbactions (pure ()) $ \actions -> do
withBinaryFile fn ReadWriteMode $ \h -> do
for_ actions $ \f -> f h
atomically $ Sem.signalTSem lock
void $ liftIO $ atomically $ Q.writeTBQueue q ()
void $ liftIO $ atomically $ Q.readTBQueue q
-- Blocking!
-- we need to write last chunk before this will happen
-- FIXME: incremental calculation,
-- streaming, blah-blah
getHash2 :: forall salt h m . getHash2 :: forall salt h m .
( Hashable salt ( ChunkKey salt h
, Hashed h ByteString , Hashed h ByteString
, m ~ IO , m ~ IO
, Block ByteString ~ ByteString , Block ByteString ~ ByteString
@ -294,26 +275,19 @@ getHash2 :: forall salt h m .
-> m (Maybe (Hash h)) -> m (Maybe (Hash h))
getHash2 w salt h = do getHash2 w salt h = do
let k = newSKey (salt, h)
flush w fn chunk <- readTVarIO (perBlock w) <&> fmap toS . HashMap.lookup k
case chunk of
runMaybeT $ do Just (S h1 _) -> pure (Just h1)
res <- liftIO $! runExceptT $ liftIO do _ -> pure Nothing
( B.readFile fn >>= \s -> pure $ hashObject @h s )
MaybeT $! pure $! either (const Nothing) Just res
where
fn = makeFileName w salt h
commitBlock2 :: forall salt h m . commitBlock2 :: forall salt h m .
( Hashable salt ( ChunkKey salt h
, Hashed h ByteString , Hashed h ByteString
, Block ByteString ~ ByteString , Block ByteString ~ ByteString
, m ~ IO , m ~ IO
, Pretty (Hash h) , Pretty (Hash h)
, Hashable (Hash h), Eq (Hash h)
) )
=> ChunkWriter h m => ChunkWriter h m
-> salt -> salt
@ -321,22 +295,11 @@ commitBlock2 :: forall salt h m .
-> m () -> m ()
commitBlock2 w@(ChunkWriter {storage = stor}) salt h = do commitBlock2 w@(ChunkWriter {storage = stor}) salt h = do
let k = newSKey (salt, h)
chunk <- readTVarIO (perBlock w) <&> fmap toS . HashMap.lookup k
flush w fn case chunk of
Just (S _ s) -> void $ putBlock stor s >> delBlock w k
exists <- doesFileExist fn _ -> pure () -- FIXME: error
when exists $ do
res <- liftIO $ runExceptT $! liftIO ( B.readFile fn )
case res of
Left _ -> pure ()
Right s -> do
void $ putBlock stor s
delBlock w salt h
where
fn = makeFileName w salt h

View File

@ -1,4 +1,3 @@
{-# Language FunctionalDependencies #-}
module HBS2.Net.Proto.Sessions where module HBS2.Net.Proto.Sessions where
import HBS2.Net.Proto.Types import HBS2.Net.Proto.Types
@ -9,27 +8,26 @@ import Data.Hashable
import Type.Reflection import Type.Reflection
import Data.Kind import Data.Kind
data SKey = forall a . (Unkey a, Eq a, Hashable a) => SKey (Proxy a) SomeTypeRep Dynamic data SKey = forall a . (Unkey a, Eq a, Hashable a) => SKey !(Proxy a) !SomeTypeRep !Dynamic
class Typeable a => Unkey a where class Typeable a => Unkey a where
unKey :: Proxy a -> Dynamic -> Maybe a unKey :: Proxy a -> Dynamic -> Maybe a
instance Typeable a => Unkey a where instance Typeable a => Unkey a where
unKey _ = fromDynamic @a unKey _ = fromDynamic @a
{-# INLINE unKey #-}
newSKey :: forall a . (Eq a, Typeable a, Unkey a, Hashable a) => a -> SKey newSKey :: forall a . (Eq a, Typeable a, Unkey a, Hashable a) => a -> SKey
newSKey s = SKey (Proxy @a) (someTypeRep (Proxy @a)) (toDyn s) newSKey s = SKey (Proxy @a) (someTypeRep (Proxy @a)) (toDyn s)
{-# INLINE newSKey #-}
instance Hashable SKey where instance Hashable SKey where
hashWithSalt s (SKey p t d) = hashWithSalt s (p, t, unKey p d) hashWithSalt s (SKey p t d) = hashWithSalt s (t, unKey p d)
instance Eq SKey where instance Eq SKey where
(==) (SKey p1 ty1 a) (SKey p2 ty2 b) = (==) (SKey p1 ty1 a) (SKey p2 ty2 b) = ty1 == ty2 && unKey p1 a == unKey p1 b
ty1 == ty2
&& unKey p1 a == unKey p1 b
&& unKey p2 a == unKey p2 b
data family SessionKey e p :: Type data family SessionKey e p :: Type

View File

@ -52,7 +52,7 @@ main = do
w2 <- replicateM 8 $ async $ runChunkWriter cw w2 <- replicateM 8 $ async $ runChunkWriter cw
let times = 500 let times = 1000
let info = show $ "writing" <+> pretty (show (realToFrac size / (1024*1024) :: Fixed E2)) let info = show $ "writing" <+> pretty (show (realToFrac size / (1024*1024) :: Fixed E2))
<+> "mb" <+> "mb"
@ -67,9 +67,10 @@ main = do
let psz = calcChunks (fromIntegral size) (fromIntegral chu) let psz = calcChunks (fromIntegral size) (fromIntegral chu)
psz' <- shuffleM psz psz' <- pure psz
-- psz' <- shuffleM psz
forConcurrently_ psz' $ \(o,s) -> do forM_ psz' $ \(o,s) -> do
let t = B8.take s $ B8.drop o bytes let t = B8.take s $ B8.drop o bytes
writeChunk cw 1 hash (fromIntegral o) t writeChunk cw 1 hash (fromIntegral o) t