file corruption during concurrent writes

This commit is contained in:
Dmitry Zuikov 2023-01-25 10:15:00 +03:00
parent 6133c1cb2f
commit 5197c2fa2b
3 changed files with 27 additions and 22 deletions

View File

@ -301,7 +301,7 @@ runPeerM s bus p f = do
<*> liftIO (newTVarIO mempty)
let de = view envDeferred env
as <- liftIO $ replicateM 8 $ async $ runPipeline de
as <- liftIO $ replicateM 4 $ async $ runPipeline de
sw <- liftIO $ async $ forever $ withPeerM env $ do
pause defSweepTimeout

View File

@ -8,16 +8,16 @@ defChunkSize :: Integral a => a
defChunkSize = 500
defBlockSize :: Integer
defBlockSize = 1024 * 1024
defBlockSize = 256 * 1024
defStorePath :: IsString a => a
defStorePath = "hbs2"
defPipelineSize :: Int
defPipelineSize = 2000
defPipelineSize = 16000*4
defChunkWriterQ :: Integral a => a
defChunkWriterQ = 2000
defChunkWriterQ = 32000
defBlockDownloadQ :: Integral a => a
defBlockDownloadQ = 65536*128

View File

@ -158,8 +158,8 @@ runTestPeer p zu = do
stor <- simpleStorageInit opts
cww <- newChunkWriterIO stor (Just chDir)
sw <- liftIO $ replicateM 4 $ async $ simpleStorageWorker stor
cw <- liftIO $ replicateM 8 $ async $ runChunkWriter cww
sw <- liftIO $ replicateM 4 $ async $ simpleStorageWorker stor
cw <- liftIO $ replicateM 4 $ async $ runChunkWriter cww
zu stor cww
@ -295,6 +295,7 @@ blockDownloadLoop cw = do
let blks = [ "GTtQp6QjK7G9Sh5Aq4koGSpMX398WRWn3DV28NUAYARg"
, "5LoU2EVq7JSpiT9FmLEakVHxpsE989XnX6jE4gaUcLFA"
, "CotHSTLrg8T5NrYxyhG1AeJrdz1s4A5PdtA95Fh96JX8"
, "ANHxB2dUcSFDB7W7JuuqkSjAUXWyekVKdQLqNBhFKGgj"
]
blq <- liftIO $ Q.newTBQueueIO defBlockDownloadQ
@ -382,24 +383,28 @@ blockDownloadLoop cw = do
subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do
processBlock q h
-- liftIO $ async $ do
-- void $ race (pause defBlockWaitMax) $ withPeerM env $ fix \next -> do
-- pause defBlockWaitSleep
-- wl <- find key (view sBlockWrittenL)
-- let w = sum (fromMaybe [] wl)
-- debug $ "WTF?" <+> pretty (w, thisBkSize)
-- -- maybe1 w (pure ()) $ \z -> do
-- if fromIntegral w >= thisBkSize then do
-- debug "THE BLOCK IS ABOUT TO BE READY"
-- -- write to disk and so on
-- else do
-- pause defBlockWaitSleep
-- next
let blockWtf = do
debug $ "WTF!" <+> pretty (p,coo) <+> pretty h
liftIO $ async $ do
-- FIXME: block is not downloaded, return it to the Q
void $ race (pause defBlockWaitMax >> blockWtf)
$ withPeerM env $ fix \next -> do
w <- find @e key (view sBlockWrittenT)
maybe1 w (pure ()) $ \z -> do
wrt <- liftIO $ readTVarIO z
if fromIntegral wrt >= thisBkSize then do
debug $ "THE BLOCK IS ABOUT TO BE READY" <+> pretty h
h1 <- liftIO $ getHash cw key h
if h1 == h then do
liftIO $ commitBlock cw key h
expire @e key
else pause defBlockWaitSleep >> next
else do
pause defBlockWaitSleep
next
request @e p (BlockChunks @e coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction