diff --git a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs index bbaac0cc..55042e43 100644 --- a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs +++ b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs @@ -1,6 +1,6 @@ +{-# Language UndecidableInstances #-} module HBS2.Actors.ChunkWriter ( ChunkWriter - , ChunkId , newChunkWriterIO , runChunkWriter , stopChunkWriter @@ -17,6 +17,7 @@ import HBS2.Hash import HBS2.Storage import HBS2.Defaults import HBS2.Clock +import HBS2.Net.Proto.Sessions import Control.Monad.Trans.Maybe import Data.List qualified as L @@ -53,6 +54,7 @@ import Control.Concurrent.STM.TBQueue qualified as Q import Control.Concurrent.STM.TSem qualified as Sem import Control.Concurrent.STM.TSem (TSem) +import Data.Typeable import Control.Concurrent.MVar as MVar import Control.Concurrent.STM.TQueue qualified as Q0 @@ -61,13 +63,62 @@ import Control.Concurrent import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap --- --- ---TODO: cache file handles -newtype ChunkId = ChunkId FilePath - deriving newtype (IsString) - deriving stock (Eq,Ord,Show) +class ( Eq salt + , Eq (Hash h) + , Hashable salt + , Hashable (Hash h) + , Typeable salt + , Typeable (Hash h) + , Hashed h ByteString + ) => ChunkKey salt h + +instance ( Hashable salt + , Typeable salt + , Eq salt + , Eq (Hash h) + , Hashable (Hash h) + , Typeable (Hash h) + , Hashed h ByteString + ) => ChunkKey salt h + + +data Chunk h = P [(Offset, ByteString)] + | S (Hash h) ByteString + + +instance Hashed h ByteString => Monoid (Chunk h) where + mempty = P [] + +instance Hashed h ByteString => Semigroup (Chunk h) where + (<>) (P a) (P b) = P ( a <> b ) + + (<>) (S _ s1) (S _ s2) = S h3 s3 + where + s3 = s1 <> s2 + h3 = hashObject s3 + + (<>) p@(P{}) (S _ s) = S h3 s3 + where + (S _ s1) = toS p + s3 = s1 <> s + h3 = hashObject s3 + + (<>) (S _ s) p@(P{}) = S h3 s3 + where + (S _ s1) = toS p + s3 = s <> s1 + h3 = hashObject s3 + +mkP :: Offset -> ByteString -> Chunk h +mkP o b = P [(o,b)] + +toS :: Hashed h ByteString => Chunk h -> Chunk h +toS s@(S{}) = s +toS (P xs) = S h s + where + s = foldMap snd $ L.sortBy (compare `on` fst) xs + h = hashObject s data ChunkWriter h m = forall a . ( MonadIO m , Storage a h ByteString m @@ -78,8 +129,7 @@ data ChunkWriter h m = forall a . ( MonadIO m , pipeline :: Pipeline IO () , dir :: FilePath , storage :: a - , perBlock :: !(TVar (HashMap FilePath [Handle -> IO ()])) - , perBlockLock :: !(TVar (HashMap FilePath TSem)) + , perBlock :: !(TVar (HashMap SKey (Chunk h))) } @@ -130,7 +180,6 @@ newChunkWriterIO s tmp = do let d = fromMaybe def tmp mt <- liftIO $ newTVarIO mempty - mts <- liftIO $ newTVarIO mempty running <- liftIO $ newTVarIO False @@ -141,42 +190,19 @@ newChunkWriterIO s tmp = do , dir = d , storage = s , perBlock = mt - , perBlockLock = mts } -makeFileName :: (Hashable salt, Pretty (Hash h)) => ChunkWriter h m -> salt -> Hash h -> FilePath -makeFileName w salt h = dir w suff - where - suff = show $ pretty (fromIntegral (hash salt) :: Word32) <> "@" <> pretty h -delBlock :: (Hashable salt, MonadIO m, Pretty (Hash h)) - => ChunkWriter h IO -> salt -> Hash h -> m () - -delBlock w salt h = liftIO do +delBlock :: (MonadIO m, Pretty (Hash h)) + => ChunkWriter h IO -> SKey -> m () +delBlock w k = liftIO do let cache = perBlock w - let se = perBlockLock w + liftIO $ atomically $ TV.modifyTVar' cache $ HashMap.delete k - -- lock <- getLock w fn - - flush w fn - - -- atomically $ Sem.waitTSem lock - - void $ runExceptT $ liftIO $ removeFile fn - - liftIO $ atomically $ TV.modifyTVar' cache $ HashMap.delete fn - liftIO $ atomically $ TV.modifyTVar' se $ HashMap.delete fn - - -- atomically $ Sem.signalTSem lock - - where - fn = makeFileName w salt h - -writeChunk :: ( Hashable salt +writeChunk :: ( ChunkKey salt h , MonadIO m , Pretty (Hash h) - , Hashable (Hash h), Eq (Hash h) ) => ChunkWriter h m -> salt @@ -188,8 +214,7 @@ writeChunk = writeChunk2 getHash :: forall salt h m . - ( Hashable salt - , Hashed h ByteString + ( ChunkKey salt h , m ~ IO , Block ByteString ~ ByteString , Pretty (Hash h) @@ -204,12 +229,11 @@ getHash = getHash2 commitBlock :: forall salt h m . - ( Hashable salt + ( ChunkKey salt h , Hashed h ByteString , Block ByteString ~ ByteString , m ~ IO , Pretty (Hash h) - , Hashable (Hash h), Eq (Hash h) ) => ChunkWriter h m -> salt @@ -218,7 +242,7 @@ commitBlock :: forall salt h m . commitBlock = commitBlock2 -writeChunk2 :: (Hashable salt, MonadIO m, Pretty (Hash h), Hashable (Hash h), Eq (Hash h)) +writeChunk2 :: (ChunkKey salt h, MonadIO m, Pretty (Hash h)) => ChunkWriter h m -> salt -> Hash h @@ -228,60 +252,17 @@ writeChunk2 :: (Hashable salt, MonadIO m, Pretty (Hash h), Hashable (Hash h), Eq writeChunk2 w salt h o !bs = do let cache = perBlock w - - let action fh = do - void $ runExceptT $ liftIO $ do - hSeek fh AbsoluteSeek (fromIntegral o) - B.hPutStr fh bs -- (BS.copy (B.toStrict bs)) - hFlush fh - + let k = newSKey (salt, h) liftIO $ do - atomically $ modifyTVar cache (HashMap.insertWith (<>) fn [action]) + atomically $ modifyTVar cache (HashMap.insertWith (<>) k (mkP o bs) ) - where - fn = makeFileName w salt h - -getLock w fn = do - _lock <- atomically $ Sem.newTSem 1 - let locks = perBlockLock w - atomically $ stateTVar locks $ \x -> - case HashMap.lookup fn x of - Nothing -> (_lock, HashMap.insert fn _lock x) - Just s -> (s, x) - -flush :: ChunkWriter h IO -> FilePath -> IO () -flush w fn = do +flush :: Hashed h ByteString => ChunkWriter h IO -> SKey -> IO () +flush w k = do let cache = perBlock w + void $ atomically $ modifyTVar cache (HashMap.adjust toS k) - let pip = pipeline w - - - q <- liftIO $ Q.newTBQueueIO 1 - - -- addJob pip $ do - - lock <- getLock w fn - - race (pause (2 :: Timeout 'Seconds)) $ do - void $ runExceptT $ liftIO $ do - atomically $ Sem.waitTSem lock - mbactions <- atomically $ stateTVar cache (\v -> (HashMap.lookup fn v, HashMap.delete fn v)) - maybe1 mbactions (pure ()) $ \actions -> do - withBinaryFile fn ReadWriteMode $ \h -> do - for_ actions $ \f -> f h - - atomically $ Sem.signalTSem lock - void $ liftIO $ atomically $ Q.writeTBQueue q () - - void $ liftIO $ atomically $ Q.readTBQueue q - - --- Blocking! --- we need to write last chunk before this will happen --- FIXME: incremental calculation, --- streaming, blah-blah getHash2 :: forall salt h m . - ( Hashable salt + ( ChunkKey salt h , Hashed h ByteString , m ~ IO , Block ByteString ~ ByteString @@ -294,26 +275,19 @@ getHash2 :: forall salt h m . -> m (Maybe (Hash h)) getHash2 w salt h = do - - flush w fn - - runMaybeT $ do - res <- liftIO $! runExceptT $ liftIO do - ( B.readFile fn >>= \s -> pure $ hashObject @h s ) - - MaybeT $! pure $! either (const Nothing) Just res - - where - fn = makeFileName w salt h + let k = newSKey (salt, h) + chunk <- readTVarIO (perBlock w) <&> fmap toS . HashMap.lookup k + case chunk of + Just (S h1 _) -> pure (Just h1) + _ -> pure Nothing commitBlock2 :: forall salt h m . - ( Hashable salt + ( ChunkKey salt h , Hashed h ByteString , Block ByteString ~ ByteString , m ~ IO , Pretty (Hash h) - , Hashable (Hash h), Eq (Hash h) ) => ChunkWriter h m -> salt @@ -321,22 +295,11 @@ commitBlock2 :: forall salt h m . -> m () commitBlock2 w@(ChunkWriter {storage = stor}) salt h = do + let k = newSKey (salt, h) + chunk <- readTVarIO (perBlock w) <&> fmap toS . HashMap.lookup k - flush w fn - - exists <- doesFileExist fn - - when exists $ do - - res <- liftIO $ runExceptT $! liftIO ( B.readFile fn ) - - case res of - Left _ -> pure () - Right s -> do - void $ putBlock stor s - delBlock w salt h - - where - fn = makeFileName w salt h + case chunk of + Just (S _ s) -> void $ putBlock stor s >> delBlock w k + _ -> pure () -- FIXME: error diff --git a/hbs2-core/lib/HBS2/Net/Proto/Sessions.hs b/hbs2-core/lib/HBS2/Net/Proto/Sessions.hs index d40faa7f..5d35f6b1 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Sessions.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Sessions.hs @@ -1,4 +1,3 @@ -{-# Language FunctionalDependencies #-} module HBS2.Net.Proto.Sessions where import HBS2.Net.Proto.Types @@ -9,27 +8,26 @@ import Data.Hashable import Type.Reflection import Data.Kind -data SKey = forall a . (Unkey a, Eq a, Hashable a) => SKey (Proxy a) SomeTypeRep Dynamic +data SKey = forall a . (Unkey a, Eq a, Hashable a) => SKey !(Proxy a) !SomeTypeRep !Dynamic class Typeable a => Unkey a where unKey :: Proxy a -> Dynamic -> Maybe a instance Typeable a => Unkey a where unKey _ = fromDynamic @a + {-# INLINE unKey #-} newSKey :: forall a . (Eq a, Typeable a, Unkey a, Hashable a) => a -> SKey newSKey s = SKey (Proxy @a) (someTypeRep (Proxy @a)) (toDyn s) +{-# INLINE newSKey #-} instance Hashable SKey where - hashWithSalt s (SKey p t d) = hashWithSalt s (p, t, unKey p d) + hashWithSalt s (SKey p t d) = hashWithSalt s (t, unKey p d) instance Eq SKey where - (==) (SKey p1 ty1 a) (SKey p2 ty2 b) = - ty1 == ty2 - && unKey p1 a == unKey p1 b - && unKey p2 a == unKey p2 b + (==) (SKey p1 ty1 a) (SKey p2 ty2 b) = ty1 == ty2 && unKey p1 a == unKey p1 b data family SessionKey e p :: Type diff --git a/hbs2-tests/test/TestChunkWriter.hs b/hbs2-tests/test/TestChunkWriter.hs index b5f8f4fa..c1fe9220 100644 --- a/hbs2-tests/test/TestChunkWriter.hs +++ b/hbs2-tests/test/TestChunkWriter.hs @@ -52,7 +52,7 @@ main = do w2 <- replicateM 8 $ async $ runChunkWriter cw - let times = 500 + let times = 1000 let info = show $ "writing" <+> pretty (show (realToFrac size / (1024*1024) :: Fixed E2)) <+> "mb" @@ -67,9 +67,10 @@ main = do let psz = calcChunks (fromIntegral size) (fromIntegral chu) - psz' <- shuffleM psz + psz' <- pure psz + -- psz' <- shuffleM psz - forConcurrently_ psz' $ \(o,s) -> do + forM_ psz' $ \(o,s) -> do let t = B8.take s $ B8.drop o bytes writeChunk cw 1 hash (fromIntegral o) t