From 2d06149e259f060b205ce0472aae873d7db88eb7 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Mon, 23 Jan 2023 10:13:48 +0300 Subject: [PATCH] better --- hbs2-core/lib/HBS2/Actors/ChunkWriter.hs | 248 +++++++++++++++++++++-- hbs2-tests/test/Peer2Main.hs | 5 +- 2 files changed, 239 insertions(+), 14 deletions(-) diff --git a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs index b3e2148d..0c56a282 100644 --- a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs +++ b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs @@ -16,12 +16,17 @@ import HBS2.Actors import HBS2.Hash import HBS2.Storage import HBS2.Defaults +import HBS2.Clock +import Data.Functor +import Data.Function import Control.Exception import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy qualified as B -- import Data.Cache (Cache) -- import Data.Cache qualified as Cache +import Data.Foldable +import Data.Traversable import Data.Hashable (hash) import Data.Maybe import Data.Word @@ -34,10 +39,18 @@ import System.IO.Temp import Control.Concurrent.Async +import Data.Cache (Cache) +import Data.Cache qualified as Cache import Control.Concurrent.STM import Control.Concurrent.STM.TBQueue qualified as Q +import Control.Concurrent.STM.TSem qualified as Sem +import Control.Concurrent.STM.TSem (TSem) --- TODO: cache file handles +import Control.Concurrent.STM.TQueue qualified as Q0 +import Control.Concurrent +-- +-- +--TODO: cache file handles newtype ChunkId = ChunkId FilePath deriving newtype (IsString) @@ -48,20 +61,72 @@ data ChunkWriter h m = forall a . ( MonadIO m , Block ByteString ~ ByteString ) => ChunkWriter - { pipeline :: Pipeline m () + { stopped :: TVar Bool + , pipeline :: Pipeline m () , dir :: FilePath , storage :: a + , perBlock :: Cache FilePath (TQueue (Handle -> IO ())) + , semFlush :: Cache FilePath TSem } +runChunkWriter :: forall h m . ( Eq (Hash h) + , Hashable (Hash h) + , MonadIO m ) + => ChunkWriter h m -> m () -runChunkWriter :: MonadIO m => ChunkWriter h m -> m () -runChunkWriter w = do +runChunkWriter = runChunkWriter2 + +runChunkWriter1 :: forall h m . ( Eq (Hash h) + , Hashable (Hash h) + , MonadIO m ) + => ChunkWriter h m -> m () + +runChunkWriter1 w = do liftIO $ createDirectoryIfMissing True ( dir w ) runPipeline (pipeline w) + +runChunkWriter2 :: forall h m . ( Eq (Hash h) + , Hashable (Hash h) + , MonadIO m ) + => ChunkWriter h m -> m () + +runChunkWriter2 w = do + liftIO $ createDirectoryIfMissing True ( dir w ) + let cache = perBlock w + fix \next -> do + -- kks <- liftIO $ take 1 <$> Cache.keys cache + -- liftIO $ for_ kks $ \h -> flush w h + + -- pause ( 1 :: Timeout 'Seconds ) + -- yield + -- next + + stop <- liftIO $ readTVarIO (stopped w) + + if stop then do + ks <- liftIO $ take 20 <$> Cache.keys cache + for_ ks $ \k -> flush w k + else do + ks <- liftIO $ Cache.keys cache + + amount <- for ks $ \k -> flush w k + + if (sum amount == 0) then do + pause ( 0.5 :: Timeout 'Seconds ) + else do + liftIO $ print ("flushed:" <+> pretty (sum amount)) + + stopChunkWriter :: MonadIO m => ChunkWriter h m -> m () -stopChunkWriter w = stopPipeline ( pipeline w ) +stopChunkWriter w = do + liftIO $ atomically $ writeTVar (stopped w) True + +stopChunkWriter1 :: MonadIO m => ChunkWriter h m -> m () +stopChunkWriter1 w = do + let cache = perBlock w + stopPipeline ( pipeline w ) newChunkWriterIO :: forall h a m . ( Key h ~ Hash h, h ~ HbSync , Storage a h ByteString m @@ -78,11 +143,19 @@ newChunkWriterIO s tmp = do def <- liftIO $ getXdgDirectory XdgData (defStorePath "temp-chunks") let d = fromMaybe def tmp + mt <- liftIO $ Cache.newCache Nothing + sem <- liftIO $ Cache.newCache Nothing + + running <- liftIO $ newTVarIO False + pure $ ChunkWriter - { pipeline = pip + { stopped = running + , pipeline = pip , dir = d , storage = s + , perBlock = mt + , semFlush = sem } makeFileName :: (Hashable salt, Pretty (Hash h)) => ChunkWriter h m -> salt -> Hash h -> FilePath @@ -113,14 +186,63 @@ delBlock w salt h = liftIO do where fn = makeFileName w salt h -writeChunk :: (Hashable salt, MonadIO m, Pretty (Hash h)) +writeChunk :: ( Hashable salt + , MonadIO m + , Pretty (Hash h) + , Hashable (Hash h), Eq (Hash h) + ) => ChunkWriter h m -> salt -> Hash h -> Offset -> ByteString -> m () -writeChunk w salt h o bs = addJob (pipeline w) $ liftIO do +writeChunk = writeChunk2 + + +getHash :: forall salt h m . + ( Hashable salt + , Hashed h ByteString + , MonadIO m + , Block ByteString ~ ByteString + , Pretty (Hash h) + , Hashable (Hash h), Eq (Hash h) + ) + => ChunkWriter h m + -> salt + -> Hash h + -> m (Hash h) + +getHash = getHash2 + + +commitBlock :: forall salt h m . + ( Hashable salt + , Hashed h ByteString + , Block ByteString ~ ByteString + , MonadIO m + , Pretty (Hash h) + , Hashable (Hash h), Eq (Hash h) + ) + => ChunkWriter h m + -> salt + -> Hash h + -> m () + +commitBlock = commitBlock2 + + + +writeChunk1 :: (Hashable salt, MonadIO m, Pretty (Hash h)) + => ChunkWriter h m + -> salt + -> Hash h + -> Offset + -> ByteString -> m () + +writeChunk1 w salt h o bs = addJob (pipeline w) $ liftIO do +-- writeChunk w salt h o bs = liftIO do + -- print $ "writeChunk:" <+> pretty fn withBinaryFile fn ReadWriteMode $ \fh -> do hSeek fh AbsoluteSeek (fromIntegral o) B.hPutStr fh bs @@ -129,23 +251,47 @@ writeChunk w salt h o bs = addJob (pipeline w) $ liftIO do where fn = makeFileName w salt h +writeChunk2 :: (Hashable salt, MonadIO m, Pretty (Hash h), Hashable (Hash h), Eq (Hash h)) + => ChunkWriter h m + -> salt + -> Hash h + -> Offset + -> ByteString -> m () + +writeChunk2 w salt h o bs = do + + let cache = perBlock w + + liftIO $ do + q <- Cache.fetchWithCache cache fn $ const Q0.newTQueueIO + atomically $ Q0.writeTQueue q $ \fh -> do + -- withBinaryFile fn ReadWriteMode $ \fh -> do + hSeek fh AbsoluteSeek (fromIntegral o) + B.hPutStr fh bs + -- hFlush fh + + where + fn = makeFileName w salt h + + -- Blocking! -- we need to write last chunk before this will happen -- FIXME: incremental calculation, -- streaming, blah-blah -getHash :: forall salt h m . +getHash1 :: forall salt h m . ( Hashable salt , Hashed h ByteString , MonadIO m , Block ByteString ~ ByteString , Pretty (Hash h) + , Hashable (Hash h), Eq (Hash h) ) => ChunkWriter h m -> salt -> Hash h -> m (Hash h) -getHash w salt h = liftIO do +getHash1 w salt h = liftIO do q <- Q.newTBQueueIO 1 @@ -159,7 +305,57 @@ getHash w salt h = liftIO do fn = makeFileName w salt h -commitBlock :: forall salt h m . +flush w fn = do + let cache = perBlock w + let scache = semFlush w + liftIO $ do + q <- Cache.fetchWithCache cache fn $ const Q0.newTQueueIO + s <- Cache.fetchWithCache scache fn $ const (atomically $ Sem.newTSem 2) + + atomically $ Sem.waitTSem s + + Cache.delete cache fn + + flushed <- atomically (Q0.flushTQueue q) + + liftIO $ do + + -- withBinaryFile fn ReadWriteMode $ \fh -> do + withFile fn ReadWriteMode $ \fh -> do + for_ flushed $ \f -> f fh + + atomically $ Sem.signalTSem s + + pure (length flushed) + + +-- Blocking! +-- we need to write last chunk before this will happen +-- FIXME: incremental calculation, +-- streaming, blah-blah +getHash2 :: forall salt h m . + ( Hashable salt + , Hashed h ByteString + , MonadIO m + , Block ByteString ~ ByteString + , Pretty (Hash h) + , Hashable (Hash h), Eq (Hash h) + ) + => ChunkWriter h m + -> salt + -> Hash h + -> m (Hash h) + +getHash2 w salt h = do + flush w fn + h1 <- liftIO $ hashObject @h <$> B.readFile fn + pure h1 + + where + fn = makeFileName w salt h + + +commitBlock1 :: forall salt h m . ( Hashable salt , Hashed h ByteString , Block ByteString ~ ByteString @@ -171,7 +367,7 @@ commitBlock :: forall salt h m . -> Hash h -> m () -commitBlock w@(ChunkWriter {storage = stor}) salt h = do +commitBlock1 w@(ChunkWriter {storage = stor}) salt h = do q <- liftIO $ Q.newTBQueueIO 1 addJob (pipeline w) (liftIO $ B.readFile fn >>= atomically . Q.writeTBQueue q) @@ -186,3 +382,31 @@ commitBlock w@(ChunkWriter {storage = stor}) salt h = do fn = makeFileName w salt h + +commitBlock2 :: forall salt h m . + ( Hashable salt + , Hashed h ByteString + , Block ByteString ~ ByteString + , MonadIO m + , Pretty (Hash h) + , Hashable (Hash h), Eq (Hash h) + ) + => ChunkWriter h m + -> salt + -> Hash h + -> m () + +commitBlock2 w@(ChunkWriter {storage = stor}) salt h = do + let cache = perBlock w + let scache = semFlush w + flush w fn + s <- liftIO $ B.readFile fn + void $ putBlock stor s + delBlock w salt h + liftIO $ Cache.delete cache fn + liftIO $ Cache.delete scache fn + + where + fn = makeFileName w salt h + + diff --git a/hbs2-tests/test/Peer2Main.hs b/hbs2-tests/test/Peer2Main.hs index 6ef23bbc..13a4b243 100644 --- a/hbs2-tests/test/Peer2Main.hs +++ b/hbs2-tests/test/Peer2Main.hs @@ -147,7 +147,7 @@ runTestPeer p zu = do cww <- newChunkWriterIO stor (Just chDir) sw <- liftIO $ replicateM 8 $ async $ simpleStorageWorker stor - cw <- liftIO $ replicateM 1 $ async $ runChunkWriter cww + cw <- liftIO $ replicateM 16 $ async $ runChunkWriter cww zu stor cww @@ -260,7 +260,7 @@ blockDownloadLoop = do env <- ask pip <- asks (view envDeferred) - debug "process block!" + -- debug "process block!" liftIO $ addJob pip $ withPeerM env $ do -- void $ liftIO $ async $ withPeerM env $ do @@ -353,6 +353,7 @@ mkAdapter cww = do when ( h1 == h ) $ do liftIO $ commitBlock cww cKey h expire cKey + -- debug "hash matched!" emit @e (BlockChunksEventKey h) (BlockReady h) when (written > mbSize * defBlockDownloadThreshold) $ do