From ceb03a558a745c29d05cc0587501c9de2332b4d1 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Thu, 26 Jan 2023 15:41:36 +0300 Subject: [PATCH] wtf --- hbs2-core/lib/HBS2/Actors/ChunkWriter.hs | 96 +++++++++++++++--------- hbs2-tests/hbs2-tests.cabal | 1 + hbs2-tests/test/TestChunkWriter.hs | 12 +-- 3 files changed, 66 insertions(+), 43 deletions(-) diff --git a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs index a54186d1..bbaac0cc 100644 --- a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs +++ b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs @@ -25,6 +25,7 @@ import Data.Function import Control.Exception import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy qualified as B +import Data.ByteString qualified as BS -- import Data.Cache (Cache) -- import Data.Cache qualified as Cache import Data.Foldable @@ -42,6 +43,7 @@ import System.FileLock import Control.Concurrent.Async +import Control.Monad.Except import Control.Monad import Data.Cache (Cache) import Data.Cache qualified as Cache @@ -51,6 +53,8 @@ import Control.Concurrent.STM.TBQueue qualified as Q import Control.Concurrent.STM.TSem qualified as Sem import Control.Concurrent.STM.TSem (TSem) +import Control.Concurrent.MVar as MVar + import Control.Concurrent.STM.TQueue qualified as Q0 import Control.Concurrent @@ -70,12 +74,12 @@ data ChunkWriter h m = forall a . ( MonadIO m , Block ByteString ~ ByteString ) => ChunkWriter - { stopped :: TVar Bool - , pipeline :: Pipeline IO () - , dir :: FilePath - , storage :: a - , perBlock :: TVar (HashMap FilePath [Handle -> IO ()]) - , perBlockSem :: TVar (HashMap FilePath TSem) + { stopped :: TVar Bool + , pipeline :: Pipeline IO () + , dir :: FilePath + , storage :: a + , perBlock :: !(TVar (HashMap FilePath [Handle -> IO ()])) + , perBlockLock :: !(TVar (HashMap FilePath TSem)) } @@ -137,7 +141,7 @@ newChunkWriterIO s tmp = do , dir = d , storage = s , perBlock = mt - , perBlockSem = mts + , perBlockLock = mts } makeFileName :: (Hashable salt, Pretty (Hash h)) => ChunkWriter h m -> salt -> Hash h -> FilePath @@ -151,14 +155,20 @@ delBlock :: (Hashable salt, MonadIO m, Pretty (Hash h)) delBlock w salt h = liftIO do let cache = perBlock w - let se = perBlockSem w + let se = perBlockLock w - liftIO $ flush w fn + -- lock <- getLock w fn + + flush w fn + + -- atomically $ Sem.waitTSem lock + + void $ runExceptT $ liftIO $ removeFile fn liftIO $ atomically $ TV.modifyTVar' cache $ HashMap.delete fn liftIO $ atomically $ TV.modifyTVar' se $ HashMap.delete fn - void $ tryJust (guard . isDoesNotExistError) (removeFile fn) + -- atomically $ Sem.signalTSem lock where fn = makeFileName w salt h @@ -215,14 +225,15 @@ writeChunk2 :: (Hashable salt, MonadIO m, Pretty (Hash h), Hashable (Hash h), Eq -> Offset -> ByteString -> m () -writeChunk2 w salt h o bs = do +writeChunk2 w salt h o !bs = do let cache = perBlock w let action fh = do - -- withBinaryFile fn ReadWriteMode $ \fh -> do - hSeek fh AbsoluteSeek (fromIntegral o) - B.hPutStr fh bs + void $ runExceptT $ liftIO $ do + hSeek fh AbsoluteSeek (fromIntegral o) + B.hPutStr fh bs -- (BS.copy (B.toStrict bs)) + hFlush fh liftIO $ do atomically $ modifyTVar cache (HashMap.insertWith (<>) fn [action]) @@ -230,28 +241,39 @@ writeChunk2 w salt h o bs = do where fn = makeFileName w salt h +getLock w fn = do + _lock <- atomically $ Sem.newTSem 1 + let locks = perBlockLock w + atomically $ stateTVar locks $ \x -> + case HashMap.lookup fn x of + Nothing -> (_lock, HashMap.insert fn _lock x) + Just s -> (s, x) + flush :: ChunkWriter h IO -> FilePath -> IO () flush w fn = do let cache = perBlock w + let pip = pipeline w - liftIO $ do - actions <- atomically $ stateTVar cache (\v -> (HashMap.lookup fn v, HashMap.delete fn v)) + q <- liftIO $ Q.newTBQueueIO 1 - q <- liftIO $ Q.newTBQueueIO 1 + -- addJob pip $ do - addJob pip $ do + lock <- getLock w fn - as <- asyncBound $ do + race (pause (2 :: Timeout 'Seconds)) $ do + void $ runExceptT $ liftIO $ do + atomically $ Sem.waitTSem lock + mbactions <- atomically $ stateTVar cache (\v -> (HashMap.lookup fn v, HashMap.delete fn v)) + maybe1 mbactions (pure ()) $ \actions -> do withBinaryFile fn ReadWriteMode $ \h -> do - withFileLock fn Exclusive $ \_ -> do - for_ (fromMaybe mempty actions) $ \f -> f h - wait as + for_ actions $ \f -> f h - void $ liftIO $ atomically $ Q.writeTBQueue q () + atomically $ Sem.signalTSem lock + void $ liftIO $ atomically $ Q.writeTBQueue q () - liftIO $ atomically $ Q.readTBQueue q + void $ liftIO $ atomically $ Q.readTBQueue q -- Blocking! @@ -272,13 +294,14 @@ getHash2 :: forall salt h m . -> m (Maybe (Hash h)) getHash2 w salt h = do + flush w fn runMaybeT $ do - res <- liftIO $ tryJust (guard . isDoesNotExistError) - ( B.readFile fn >>= \s -> pure $ hashObject @h s ) + res <- liftIO $! runExceptT $ liftIO do + ( B.readFile fn >>= \s -> pure $ hashObject @h s ) - MaybeT $ pure $ either (const Nothing) Just res + MaybeT $! pure $! either (const Nothing) Just res where fn = makeFileName w salt h @@ -299,20 +322,19 @@ commitBlock2 :: forall salt h m . commitBlock2 w@(ChunkWriter {storage = stor}) salt h = do - print "FLUSHING" - flush w fn - print "FLUSHED" + exists <- doesFileExist fn - res <- liftIO $ tryJust (guard . isDoesNotExistError) - ( B.readFile fn ) + when exists $ do - case res of - Left _ -> pure () - Right s -> do - void $ putBlock stor s - delBlock w salt h + res <- liftIO $ runExceptT $! liftIO ( B.readFile fn ) + + case res of + Left _ -> pure () + Right s -> do + void $ putBlock stor s + delBlock w salt h where fn = makeFileName w salt h diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 0d2b7b3c..a4cb6e94 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -26,6 +26,7 @@ common common-deps , data-default , directory , filepath + , deepseq , hashable , microlens-platform , mtl diff --git a/hbs2-tests/test/TestChunkWriter.hs b/hbs2-tests/test/TestChunkWriter.hs index 9ccf861b..539f450a 100644 --- a/hbs2-tests/test/TestChunkWriter.hs +++ b/hbs2-tests/test/TestChunkWriter.hs @@ -3,9 +3,12 @@ module Main where import HBS2.Prelude import HBS2.Actors.ChunkWriter import HBS2.Hash +import HBS2.Clock import HBS2.Storage import HBS2.Storage.Simple +import Data.Maybe +import Control.Monad.Except import Control.Concurrent.Async import Control.Monad import Data.ByteString.Lazy (ByteString) @@ -19,6 +22,9 @@ import System.Random.MWC import System.Random.Shuffle import System.TimeIt +import Control.DeepSeq +import Control.Exception (evaluate) + import Data.List qualified as L import Prettyprinter @@ -70,17 +76,11 @@ main = do h2 <- getHash cw 1 hash -- commitBlock cw 1 hash - -- commitBlock cw 1 hash - print "JOPA" - commitBlock cw 1 hash - print "KITA" if Just hash /= h2 then do pure [1] else do - print "YAY!" commitBlock cw 1 hash - print "QQQ!" pure mempty mapM_ cancel $ w1 <> w2