From 5197c2fa2b293d3fe9e0bcbc8678da1d18486b8a Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Wed, 25 Jan 2023 10:15:00 +0300 Subject: [PATCH] file corruption during concurrent writes --- hbs2-core/lib/HBS2/Actors/Peer.hs | 2 +- hbs2-core/lib/HBS2/Defaults.hs | 6 ++--- hbs2-tests/test/Peer2Main.hs | 41 +++++++++++++++++-------------- 3 files changed, 27 insertions(+), 22 deletions(-) diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 2b4e9d00..fd419362 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -301,7 +301,7 @@ runPeerM s bus p f = do <*> liftIO (newTVarIO mempty) let de = view envDeferred env - as <- liftIO $ replicateM 8 $ async $ runPipeline de + as <- liftIO $ replicateM 4 $ async $ runPipeline de sw <- liftIO $ async $ forever $ withPeerM env $ do pause defSweepTimeout diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index ce81a479..8baeb891 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -8,16 +8,16 @@ defChunkSize :: Integral a => a defChunkSize = 500 defBlockSize :: Integer -defBlockSize = 1024 * 1024 +defBlockSize = 256 * 1024 defStorePath :: IsString a => a defStorePath = "hbs2" defPipelineSize :: Int -defPipelineSize = 2000 +defPipelineSize = 16000*4 defChunkWriterQ :: Integral a => a -defChunkWriterQ = 2000 +defChunkWriterQ = 32000 defBlockDownloadQ :: Integral a => a defBlockDownloadQ = 65536*128 diff --git a/hbs2-tests/test/Peer2Main.hs b/hbs2-tests/test/Peer2Main.hs index bc3cef53..7172b7e9 100644 --- a/hbs2-tests/test/Peer2Main.hs +++ b/hbs2-tests/test/Peer2Main.hs @@ -158,8 +158,8 @@ runTestPeer p zu = do stor <- simpleStorageInit opts cww <- newChunkWriterIO stor (Just chDir) - sw <- liftIO $ replicateM 4 $ async $ simpleStorageWorker stor - cw <- liftIO $ replicateM 8 $ async $ runChunkWriter cww + sw <- liftIO $ replicateM 4 $ async $ simpleStorageWorker stor + cw <- liftIO $ replicateM 4 $ async $ runChunkWriter cww zu stor cww @@ -295,6 +295,7 @@ blockDownloadLoop cw = do let blks = [ "GTtQp6QjK7G9Sh5Aq4koGSpMX398WRWn3DV28NUAYARg" , "5LoU2EVq7JSpiT9FmLEakVHxpsE989XnX6jE4gaUcLFA" , "CotHSTLrg8T5NrYxyhG1AeJrdz1s4A5PdtA95Fh96JX8" + , "ANHxB2dUcSFDB7W7JuuqkSjAUXWyekVKdQLqNBhFKGgj" ] blq <- liftIO $ Q.newTBQueueIO defBlockDownloadQ @@ -382,24 +383,28 @@ blockDownloadLoop cw = do subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do processBlock q h - -- liftIO $ async $ do - -- void $ race (pause defBlockWaitMax) $ withPeerM env $ fix \next -> do - -- pause defBlockWaitSleep - -- wl <- find key (view sBlockWrittenL) - - -- let w = sum (fromMaybe [] wl) - - -- debug $ "WTF?" <+> pretty (w, thisBkSize) - - -- -- maybe1 w (pure ()) $ \z -> do - -- if fromIntegral w >= thisBkSize then do - -- debug "THE BLOCK IS ABOUT TO BE READY" - -- -- write to disk and so on - -- else do - -- pause defBlockWaitSleep - -- next + let blockWtf = do + debug $ "WTF!" <+> pretty (p,coo) <+> pretty h + liftIO $ async $ do -- FIXME: block is not downloaded, return it to the Q + void $ race (pause defBlockWaitMax >> blockWtf) + $ withPeerM env $ fix \next -> do + w <- find @e key (view sBlockWrittenT) + + maybe1 w (pure ()) $ \z -> do + wrt <- liftIO $ readTVarIO z + + if fromIntegral wrt >= thisBkSize then do + debug $ "THE BLOCK IS ABOUT TO BE READY" <+> pretty h + h1 <- liftIO $ getHash cw key h + if h1 == h then do + liftIO $ commitBlock cw key h + expire @e key + else pause defBlockWaitSleep >> next + else do + pause defBlockWaitSleep + next request @e p (BlockChunks @e coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction