diff --git a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs index fdde51f6..513ffd1d 100644 --- a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs +++ b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs @@ -4,7 +4,6 @@ module HBS2.Actors.ChunkWriter , newChunkWriterIO , runChunkWriter , stopChunkWriter - , newBlock , delBlock , commitBlock , writeChunk @@ -50,6 +49,10 @@ import Control.Concurrent.STM.TSem (TSem) import Control.Concurrent.STM.TQueue qualified as Q0 import Control.Concurrent + +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HashMap + -- -- --TODO: cache file handles @@ -67,15 +70,16 @@ data ChunkWriter h m = forall a . ( MonadIO m , pipeline :: Pipeline m () , dir :: FilePath , storage :: a - , perBlock :: Cache FilePath (TQueue (IO ())) + , perBlock :: TVar (HashMap FilePath [IO ()]) , perBlockSem :: Cache FilePath TSem } blocksInProcess :: MonadIO m => ChunkWriter h m -> m Int -blocksInProcess cw = liftIO $ Cache.purgeExpired cache >> Cache.size cache - where - cache = perBlock cw +blocksInProcess cw = undefined + -- liftIO $ Cache.purgeExpired cache >> Cache.size cache + -- where + -- cache = perBlock cw runChunkWriter :: forall h m . ( Eq (Hash h) , Hashable (Hash h) @@ -84,15 +88,6 @@ runChunkWriter :: forall h m . ( Eq (Hash h) runChunkWriter = runChunkWriter2 -runChunkWriter1 :: forall h m . ( Eq (Hash h) - , Hashable (Hash h) - , MonadIO m ) - => ChunkWriter h m -> m () - -runChunkWriter1 w = do - liftIO $ createDirectoryIfMissing True ( dir w ) - runPipeline (pipeline w) - runChunkWriter2 :: forall h m . ( Eq (Hash h) , Hashable (Hash h) @@ -101,43 +96,13 @@ runChunkWriter2 :: forall h m . ( Eq (Hash h) runChunkWriter2 w = do liftIO $ createDirectoryIfMissing True ( dir w ) - let cache = perBlock w - fix \next -> do - -- kks <- liftIO $ take 1 <$> Cache.keys cache - -- liftIO $ for_ kks $ \h -> flush w h - - -- pause ( 1 :: Timeout 'Seconds ) - -- yield - -- next - - -- liftIO $ print "runChunkWriter2" - - stop <- liftIO $ readTVarIO (stopped w) - - if stop then do - ks <- liftIO $ Cache.keys cache - liftIO $ for_ ks $ \k -> flush w k - else do - ks <- liftIO $ Cache.keys cache - - amount <- for ks $ \k -> flush w k - - if (sum amount == 0) then do - -- pure () - pause ( 0.1 :: Timeout 'Seconds ) - else do - liftIO $ print ("flushed:" <+> pretty (sum amount)) + fix \next -> pause ( 1 :: Timeout 'Seconds) >> next stopChunkWriter :: MonadIO m => ChunkWriter h m -> m () stopChunkWriter w = do liftIO $ atomically $ writeTVar (stopped w) True -stopChunkWriter1 :: MonadIO m => ChunkWriter h m -> m () -stopChunkWriter1 w = do - let cache = perBlock w - stopPipeline ( pipeline w ) - newChunkWriterIO :: forall h a m . ( Key h ~ Hash h, h ~ HbSync , Storage a h ByteString m , Block ByteString ~ ByteString @@ -153,7 +118,7 @@ newChunkWriterIO s tmp = do def <- liftIO $ getXdgDirectory XdgData (defStorePath "temp-chunks") let d = fromMaybe def tmp - mt <- liftIO $ Cache.newCache Nothing + mt <- liftIO $ newTVarIO mempty mts <- liftIO $ Cache.newCache Nothing running <- liftIO $ newTVarIO False @@ -173,21 +138,6 @@ makeFileName w salt h = dir w suff where suff = show $ pretty (fromIntegral (hash salt) :: Word32) <> "@" <> pretty h --- TODO: check uniqueness -newBlock :: ( MonadIO m - , Hashable salt - , Pretty (Hash h) - ) - => ChunkWriter h m - -> salt - -> Hash h - -> Size -> m () - -newBlock w salt h size = liftIO do - withBinaryFile fn ReadWriteMode (`hSetFileSize` fromIntegral size) - where - fn = makeFileName w salt h - delBlock :: (Hashable salt, MonadIO m, Pretty (Hash h)) => ChunkWriter h m -> salt -> Hash h -> m () @@ -241,26 +191,6 @@ commitBlock :: forall salt h m . commitBlock = commitBlock2 - - -writeChunk1 :: (Hashable salt, MonadIO m, Pretty (Hash h)) - => ChunkWriter h m - -> salt - -> Hash h - -> Offset - -> ByteString -> m () - -writeChunk1 w salt h o bs = addJob (pipeline w) $ liftIO do --- writeChunk w salt h o bs = liftIO do - -- print $ "writeChunk:" <+> pretty fn - withBinaryFile fn ReadWriteMode $ \fh -> do - hSeek fh AbsoluteSeek (fromIntegral o) - B.hPutStr fh bs - hFlush fh - - where - fn = makeFileName w salt h - writeChunk2 :: (Hashable salt, MonadIO m, Pretty (Hash h), Hashable (Hash h), Eq (Hash h)) => ChunkWriter h m -> salt @@ -272,46 +202,13 @@ writeChunk2 w salt h o bs = do let cache = perBlock w - -- liftIO $ print $ "writeChunk" <+> pretty o <+> pretty (B.length bs) <+> pretty h + let action = do + withBinaryFile fn ReadWriteMode $ \fh -> do + hSeek fh AbsoluteSeek (fromIntegral o) + B.hPutStr fh bs liftIO $ do - q <- Cache.fetchWithCache cache fn $ const Q0.newTQueueIO - atomically $ Q0.writeTQueue q $ do - withBinaryFile fn ReadWriteMode $ \fh -> do - hSeek fh AbsoluteSeek (fromIntegral o) - B.hPutStr fh bs - -- hFlush fh - - where - fn = makeFileName w salt h - - --- Blocking! --- we need to write last chunk before this will happen --- FIXME: incremental calculation, --- streaming, blah-blah -getHash1 :: forall salt h m . - ( Hashable salt - , Hashed h ByteString - , MonadIO m - , Block ByteString ~ ByteString - , Pretty (Hash h) - , Hashable (Hash h), Eq (Hash h) - ) - => ChunkWriter h m - -> salt - -> Hash h - -> m (Hash h) - -getHash1 w salt h = liftIO do - - q <- Q.newTBQueueIO 1 - - addJob (pipeline w) $ liftIO do - h1 <- hashObject @h <$> B.readFile fn - atomically $ Q.writeTBQueue q h1 - - atomically $ Q.readTBQueue q + atomically $ modifyTVar cache (HashMap.insertWith (<>) fn [action]) where fn = makeFileName w salt h @@ -319,28 +216,10 @@ getHash1 w salt h = liftIO do flush w fn = do let cache = perBlock w - let scache = perBlockSem w + liftIO $ do - - q <- Cache.fetchWithCache cache fn $ const Q0.newTQueueIO - s <- Cache.fetchWithCache scache fn $ const $ atomically $ Sem.newTSem 1 - - atomically $ Sem.waitTSem s - - Cache.delete cache fn - - flushed <- atomically (Q0.flushTQueue q) - - liftIO $ do - - -- withBinaryFile fn ReadWriteMode $ \fh -> do - -- withBinaryFile fn ReadWriteMode $ \fh -> do - for_ flushed id - - atomically $ Sem.signalTSem s - - pure (length flushed) - + actions <- atomically $ stateTVar cache (\v -> (HashMap.lookup fn v, HashMap.delete fn v)) + sequence_ (fromMaybe mempty actions) -- Blocking! -- we need to write last chunk before this will happen @@ -361,41 +240,12 @@ getHash2 :: forall salt h m . getHash2 w salt h = do flush w fn - h1 <- liftIO $ hashObject @h <$> B.readFile fn - pure h1 + liftIO $ hashObject @h <$> B.readFile fn where fn = makeFileName w salt h -commitBlock1 :: forall salt h m . - ( Hashable salt - , Hashed h ByteString - , Block ByteString ~ ByteString - , MonadIO m - , Pretty (Hash h) - ) - => ChunkWriter h m - -> salt - -> Hash h - -> m () - -commitBlock1 w@(ChunkWriter {storage = stor}) salt h = do - q <- liftIO $ Q.newTBQueueIO 1 - - addJob (pipeline w) (liftIO $ B.readFile fn >>= atomically . Q.writeTBQueue q) - - s <- liftIO $ atomically $ Q.readTBQueue q - - void $ putBlock stor s - - delBlock w salt h - - where - fn = makeFileName w salt h - - - commitBlock2 :: forall salt h m . ( Hashable salt , Hashed h ByteString @@ -415,7 +265,6 @@ commitBlock2 w@(ChunkWriter {storage = stor}) salt h = do s <- liftIO $ B.readFile fn void $ putBlock stor s delBlock w salt h - liftIO $ Cache.delete cache fn where fn = makeFileName w salt h diff --git a/hbs2-tests/hie.yaml b/hbs2-tests/hie.yaml deleted file mode 100644 index a7da04bd..00000000 --- a/hbs2-tests/hie.yaml +++ /dev/null @@ -1,11 +0,0 @@ -cradle: - cabal: - - path: "test/Peer2Main.hs" - component: "hbs2-tests:exe:test-peer-run" - - - path: "test/TestSKey" - component: "hbs2-tests:test:test-skey" - - - path: "test/TestChunkWriter" - component: "hbs2-tests:test:test-cw" - diff --git a/hbs2-tests/test/TestChunkWriter.hs b/hbs2-tests/test/TestChunkWriter.hs index fe5e25e9..25640105 100644 --- a/hbs2-tests/test/TestChunkWriter.hs +++ b/hbs2-tests/test/TestChunkWriter.hs @@ -31,6 +31,17 @@ main = do withSystemTempDirectory "cww-test" $ \dir -> do + let opts = [ StoragePrefix (dir ".test-cww") + ] + + storage <- simpleStorageInit opts :: IO (SimpleStorage HbSync) + + w1 <- replicateM 1 $ async (simpleStorageWorker storage) + + cw <- newChunkWriterIO storage (Just (dir ".qqq")) + + w2 <- replicateM 1 $ async $ runChunkWriter cw + failed <- replicateM 100 $ do bytes <- B8.pack <$> (replicateM size $ uniformM g) @@ -39,35 +50,24 @@ main = do let psz = calcChunks (fromIntegral size) (fromIntegral chu) - let opts = [ StoragePrefix (dir ".test-cww") - ] - - storage <- simpleStorageInit opts :: IO (SimpleStorage HbSync) - - w1 <- replicateM 1 $ async (simpleStorageWorker storage) - - cw <- newChunkWriterIO storage (Just (dir ".qqq")) - - w2 <- replicateM 1 $ async $ runChunkWriter cw - psz' <- shuffleM psz -- psz' <- pure psz + -- forConcurrently_ psz' $ \(o,s) -> do forConcurrently_ psz' $ \(o,s) -> do - -- forM_ psz' $ \(o,s) -> do let t = B8.take s $ B8.drop o bytes writeChunk cw 1 hash (fromIntegral o) t h2 <- getHash cw 1 hash - -- h3 <- getHash cw 1 hash - - mapM_ cancel $ w1 <> w2 if hash /= h2 then do pure [1] - else + else do + commitBlock cw 1 hash pure mempty + mapM_ cancel $ w1 <> w2 + print $ "failed" <+> pretty (sum (mconcat failed)) pure () diff --git a/hie.yaml b/hie.yaml index 04cd2439..9271e42b 100644 --- a/hie.yaml +++ b/hie.yaml @@ -1,2 +1,17 @@ cradle: cabal: + - path: "hbs-tests/test/Peer2Main.hs" + component: "hbs2-tests:exe:test-peer-run" + + - path: "hbs2-tests/test/TestSKey" + component: "hbs2-tests:test:test-skey" + + - path: "hbs2-tests/test/TestChunkWriter" + component: "hbs2-tests:test:test-cw" + + - path: "hbs2-core/lib" + component: "hbs2-core:lib:hbs2-core" + + - path: "hbs2-storage-simple/lib" + component: "hbs2-storage-simple:lib:hbs2-storage-simple" +