From 9b7c22414bc9190b286aaad72103a6099196236b Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Tue, 28 Mar 2023 11:22:58 +0300 Subject: [PATCH] fixed BCXLsnhgWC reliable-storage-write --- .fixme/log | 3 +- hbs2-core/lib/HBS2/Actors/Peer.hs | 2 + hbs2-core/lib/HBS2/Defaults.hs | 2 +- hbs2-core/lib/HBS2/Storage.hs | 4 + hbs2-storage-simple/hbs2-storage-simple.cabal | 8 + .../lib/HBS2/Storage/Simple.hs | 39 ++++- .../lib/HBS2/Storage/Simple/Extra.hs | 38 +++++ hbs2-tests/hbs2-tests.cabal | 46 +++++ hbs2-tests/test/TestConcurrentWrite.hs | 159 ++++++++++++++++++ hbs2/Main.hs | 19 ++- hbs2/hbs2.cabal | 2 +- 11 files changed, 309 insertions(+), 13 deletions(-) create mode 100644 hbs2-tests/test/TestConcurrentWrite.hs diff --git a/.fixme/log b/.fixme/log index abca36b0..193e8f36 100644 --- a/.fixme/log +++ b/.fixme/log @@ -1,3 +1,2 @@ -(fixme-set "assigned" "9Y2v3fXdhz" "voidlizard") -(fixme-set "workflow" "wip" "9Y2v3fXdhz") \ No newline at end of file +(fixme-set "workflow" "test" "BCXLsnhgWC") \ No newline at end of file diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 08a364be..e0b0e475 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -55,6 +55,8 @@ instance (IsKey HbSync, Key HbSync ~ Hash HbSync, Block ByteString ~ ByteString) hasBlock (AnyStorage s) = hasBlock s updateRef (AnyStorage s) = updateRef s getRef (AnyStorage s) = getRef s + delBlock (AnyStorage s) = delBlock s + delRef (AnyStorage s) = delRef s data AnyMessage enc e = AnyMessage !Integer !(Encoded e) deriving stock (Generic) diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index 6a03421d..74f6f01c 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -77,7 +77,7 @@ defBlockWaitMax = 1 :: Timeout 'Seconds -- how much time wait for block from peer? defChunkWaitMax :: Timeout 'Seconds -defChunkWaitMax = 0.5 :: Timeout 'Seconds +defChunkWaitMax = 1 :: Timeout 'Seconds defSweepTimeout :: Timeout 'Seconds defSweepTimeout = 30 -- FIXME: only for debug! diff --git a/hbs2-core/lib/HBS2/Storage.hs b/hbs2-core/lib/HBS2/Storage.hs index c14fc4f9..e4d1e117 100644 --- a/hbs2-core/lib/HBS2/Storage.hs +++ b/hbs2-core/lib/HBS2/Storage.hs @@ -41,6 +41,8 @@ class ( Monad m getBlock :: a -> Key h -> m (Maybe (Block block)) + delBlock :: a -> Key h -> m () + getChunk :: a -> Key h -> Offset -> Size -> m (Maybe (Block block)) hasBlock :: a -> Key h -> m (Maybe Integer) @@ -49,6 +51,8 @@ class ( Monad m getRef :: Hashed h k => a -> k -> m (Maybe (Key h)) + delRef :: Hashed h k => a -> k -> m () + calcChunks :: forall a b . (Integral a, Integral b) => Integer -- | block size -> Integer -- | chunk size diff --git a/hbs2-storage-simple/hbs2-storage-simple.cabal b/hbs2-storage-simple/hbs2-storage-simple.cabal index feb9745c..0931ed13 100644 --- a/hbs2-storage-simple/hbs2-storage-simple.cabal +++ b/hbs2-storage-simple/hbs2-storage-simple.cabal @@ -62,6 +62,7 @@ library -- other-extensions: build-depends: base, hbs2-core , async + , atomic-write , bytestring , bytestring-mmap , cache @@ -77,6 +78,8 @@ library , transformers , uniplate , unordered-containers + , temporary + , filepattern hs-source-dirs: lib @@ -120,3 +123,8 @@ test-suite test , vector + + + + + diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs index 53f7c8d7..cf39b4f6 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs @@ -34,6 +34,9 @@ import System.Directory import System.FilePath.Posix import System.IO import System.IO.Error +import System.IO.Temp +import System.AtomicWrite.Writer.LazyByteString qualified as AwLBS +import System.AtomicWrite.Writer.ByteString qualified as AwBS import Data.HashMap.Strict qualified as HashMap import Data.HashMap.Strict (HashMap) @@ -88,6 +91,10 @@ storageBlocks = to f where f b = _storageDir b "blocks" +storageTemp :: SimpleGetter (SimpleStorage h) FilePath +storageTemp = to f + where + f b = _storageDir b "temp" storageRefs :: SimpleGetter (SimpleStorage h) FilePath storageRefs = to f @@ -131,6 +138,7 @@ simpleStorageInit opts = liftIO $ do <*> TV.newTVarIO mempty createDirectoryIfMissing True (stor ^. storageBlocks) + createDirectoryIfMissing True (stor ^. storageTemp) let alph = getAlphabet @@ -272,6 +280,8 @@ simplePutBlockLazy doWait s lbs = do let hash = hashObject lbs let fn = simpleBlockFileName s hash + let fntmp = takeFileName fn + let tmp = view storageTemp s stop <- atomically $ TV.readTVar ( s ^. storageStopWriting ) @@ -286,10 +296,13 @@ simplePutBlockLazy doWait s lbs = do let action | size > 0 = atomically $ TBQ.writeTBQueue waits True | otherwise = do - catch (LBS.writeFile fn lbs) - (\(_ :: IOError) -> atomically $ TBQ.writeTBQueue waits False) - - atomically $ TBQ.writeTBQueue waits True + handle (\(_ :: IOError) -> atomically $ TBQ.writeTBQueue waits False) + do + withTempFile tmp fntmp $ \tname h -> do + BS.hPut h (LBS.toStrict lbs) + hClose h + renameFile tname fn + atomically $ TBQ.writeTBQueue waits True simpleAddTask s action @@ -337,7 +350,7 @@ simpleWriteLinkRaw ss h lbs = do runMaybeT $ do r <- MaybeT $ putBlock ss lbs MaybeT $ liftIO $ spawnAndWait ss $ do - BS.writeFile fnr (toByteString (AsBase58 r)) + AwBS.atomicWriteFile fnr (toByteString (AsBase58 r)) `catchAny` \_ -> do err $ "simpleWriteLinkRaw" <+> pretty h <+> pretty fnr @@ -355,7 +368,7 @@ simpleWriteLinkRawRef :: forall h . ( IsSimpleStorageKey h simpleWriteLinkRawRef ss h ref = do let fnr = simpleRefFileName ss h void $ spawnAndWait ss $ do - BS.writeFile fnr (toByteString (AsBase58 ref)) + AwBS.atomicWriteFile fnr (toByteString (AsBase58 ref)) `catchAny` \_ -> do err $ "simpleWriteLinkRawRef" <+> pretty h <+> pretty ref <+> pretty fnr @@ -387,7 +400,6 @@ simpleReadLinkVal :: ( IsKey h simpleReadLinkVal ss hash = do let fn = simpleRefFileName ss hash rs <- spawnAndWait ss $ do - -- FIXME: log-this-situation (Just <$> BS.readFile fn) `catchAny` \_ -> do err $ "simpleReadLinkVal" <+> pretty hash <+> pretty fn pure Nothing @@ -426,3 +438,16 @@ instance ( MonadIO m, IsKey hash parsed <- MaybeT $ pure $ fromByteString bss pure $ unAsBase58 parsed + delBlock ss h = do + let fn = simpleBlockFileName ss h + void $ liftIO $ spawnAndWait ss do + exists <- doesFileExist fn + when exists (removeFile fn) + + delRef ss ref = do + let refHash = hashObject @hash ref + let fn = simpleRefFileName ss refHash + void $ liftIO $ spawnAndWait ss $ do + here <- doesFileExist fn + when here (removeFile fn) + diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs b/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs index b09b9204..0dd8c1c4 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs @@ -9,10 +9,20 @@ import HBS2.Storage.Simple import HBS2.Data.Types.Refs import HBS2.Defaults +import Data.Foldable (for_) import Data.Bifunctor import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy qualified as B import Data.Function +import Lens.Micro.Platform +import System.FilePattern.Directory +import Data.ByteString.Lazy.Char8 qualified as LBS +import Data.ByteString.Char8 qualified as BS +import System.FilePath +import Data.Maybe +import Control.Concurrent.STM +import Control.Concurrent.Async +import Control.Monad import Streaming.Prelude qualified as S import Streaming qualified as S @@ -80,3 +90,31 @@ instance Block ByteString ~ ByteString => SimpleStorageExtra ByteString where pure (MerkleHash root) + +simpleStorageFsck :: forall h . (IsSimpleStorageKey h, Hashed h ByteString) + => SimpleStorage h + -> IO [(Maybe (Hash HbSync), FilePath)] + +simpleStorageFsck sto = do + let fblock = view storageBlocks sto + + files <- getDirectoryFiles fblock ["**/*"] + + -- FIXME: thread-num-hardcode + bad <- forM files $ \f -> do + let fname = fblock f + let ha = splitDirectories f & mconcat & fromStringMay @(Hash HbSync) + case ha of + Just hash -> do + hr <- BS.readFile fname <&> hashObject @HbSync + if hr == hash then do + pure [] + else + pure [(Just hash, fname)] + + Nothing -> do + pure [(Nothing, fname)] + + pure $ mconcat bad + + diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 09b49919..40f3b3b2 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -354,3 +354,49 @@ executable test-walk-tree-meta -- , vector -- , fast-logger + +test-suite test-concurrent-write + import: shared-properties + import: common-deps + default-language: Haskell2010 + + + ghc-options: + -threaded + -rtsopts + "-with-rtsopts=-N6 -A64m -AL256m -I0" + + other-modules: + + -- other-extensions: + + type: exitcode-stdio-1.0 + hs-source-dirs: test + main-is: TestConcurrentWrite.hs + + build-depends: + base, hbs2-storage-simple, hbs2-core + , async + , bytestring + , cborg + , containers + , directory + , filepath + , hashable + , microlens-platform + , mtl + , prettyprinter + , QuickCheck + , stm + , random + , safe + , serialise + , tasty + , tasty-hunit + , temporary + , timeit + , uniplate + , vector + , terminal-progress-bar + + diff --git a/hbs2-tests/test/TestConcurrentWrite.hs b/hbs2-tests/test/TestConcurrentWrite.hs new file mode 100644 index 00000000..84df46aa --- /dev/null +++ b/hbs2-tests/test/TestConcurrentWrite.hs @@ -0,0 +1,159 @@ +module Main where + +import HBS2.Prelude.Plated +import HBS2.Defaults +import HBS2.Hash +import HBS2.Clock +import HBS2.Storage.Simple + +import HBS2.System.Logger.Simple + +import Test.QuickCheck +import Test.Tasty.HUnit + +import Control.Concurrent.Async +import Control.Monad +import Data.ByteString.Lazy.Char8 (ByteString) +import Data.ByteString.Lazy.Char8 qualified as LBS +import Lens.Micro.Platform +import System.Directory +import System.FilePath.Posix +import System.IO.Temp +import Control.Concurrent.STM +import System.ProgressBar +import Control.Concurrent +import System.IO + +randomByteString :: Int -> Gen ByteString +randomByteString n = vectorOf n arbitrary <&> LBS.pack +{-# NOINLINE randomByteString #-} + +{-# NOINLINE randomSizedByteString #-} +randomSizedByteString :: Gen ByteString +randomSizedByteString = do + let low = 0 + let high = 256 -- ceiling $ realToFrac defBlockSize * 1.5 + size <- choose (low, high) + randomByteString size + +waitTime :: Timeout 'Seconds +waitTime = 30 + +testSimpleStorageRandomReadWrite :: IO () +testSimpleStorageRandomReadWrite = do + + withTempDirectory "." "simpleStorageTest" $ \dir -> do + + let opts = [ StoragePrefix (dir ".storage") + ] + + storage <- simpleStorageInit [StoragePrefix (dir ".storage")] :: IO (SimpleStorage HbSync) + + exists <- doesDirectoryExist ( storage ^. storageBlocks ) + + assertBool "blocks directory exists" exists + + workers <- replicateM 8 $ async (simpleStorageWorker storage) + + blkQ <- newTQueueIO + err <- newTVarIO 0 + errHash <- newTVarIO 0 + done <- newTVarIO 0 + + let succErrIO v = atomically $ modifyTVar v succ + + let tot = toMicroSeconds waitTime + let st = defStyle { styleWidth = ConstantWidth 50 } + mon1 <- newProgressBar st 10 (Progress 0 tot ()) + + prog <- async $ forever do + let w = 1 + pause @'Seconds w + incProgress mon1 (toMicroSeconds w) + + producer <- async $ void $ race ( pause @'Seconds (waitTime + 0.25) ) $ do + replicateConcurrently 6 do + forever do + bs <- generate randomSizedByteString + times <- generate (elements [1,1,1,1,2]) + replicateConcurrently times $ do + ha <- putBlock storage bs + atomically $ writeTQueue blkQ ha + + checker <- async $ forever do + bh <- atomically $ readTQueue blkQ + + case bh of + Nothing -> do + succErrIO err + -- hPrint stderr "error 1" + + Just h -> do + blk <- getBlock storage h + case blk of + Nothing -> do + succErrIO err + -- hPrint stderr "error 2" + + Just s -> do + let hash = hashObject s + if hash /= h then do + succErrIO errHash + else do + succErrIO done + -- hPrint stderr "error 3" + + wait producer + + void $ waitAnyCatchCancel $ producer : prog : checker : workers + + e1 <- readTVarIO err + e2 <- readTVarIO errHash + ok <- readTVarIO done + + notice $ "errors:" <+> pretty e1 <+> pretty e2 + notice $ "blocks done:" <+> pretty ok + + assertEqual "errors1" e1 0 + assertEqual "errors2" e2 0 + +logPrefix s = set loggerTr (s <>) + +tracePrefix :: SetLoggerEntry +tracePrefix = logPrefix "[trace] " + +debugPrefix :: SetLoggerEntry +debugPrefix = logPrefix "[debug] " + +errorPrefix :: SetLoggerEntry +errorPrefix = logPrefix "[error] " + +warnPrefix :: SetLoggerEntry +warnPrefix = logPrefix "[warn] " + +noticePrefix :: SetLoggerEntry +noticePrefix = logPrefix "[notice] " + + +main :: IO () +main = do + -- hSetBuffering stdout LineBuffering + -- hSetBuffering stderr LineBuffering + + setLogging @DEBUG debugPrefix + setLogging @INFO defLog + setLogging @ERROR errorPrefix + setLogging @WARN warnPrefix + setLogging @NOTICE noticePrefix + setLoggingOff @TRACE + + testSimpleStorageRandomReadWrite + + + setLoggingOff @DEBUG + setLoggingOff @INFO + setLoggingOff @ERROR + setLoggingOff @WARN + setLoggingOff @NOTICE + setLoggingOff @TRACE + diff --git a/hbs2/Main.hs b/hbs2/Main.hs index e9faa5b5..da7cb6dc 100644 --- a/hbs2/Main.hs +++ b/hbs2/Main.hs @@ -46,6 +46,7 @@ import Lens.Micro.Platform -- import System.FilePath.Posix import System.IO import System.Exit +import System.ProgressBar import Codec.Serialise @@ -509,6 +510,8 @@ main = join . customExecParser (prefs showHelpOnError) $ parser = hsubparser ( command "store" (info pStore (progDesc "store block")) <> command "cat" (info pCat (progDesc "cat block")) <> command "hash" (info pHash (progDesc "calculates hash")) + <> command "fsck" (info pFsck (progDesc "check storage constistency")) + <> command "del" ( info pDel (progDesc "del block")) <> command "keyring-new" (info pNewKey (progDesc "generates a new keyring")) <> command "keyring-list" (info pKeyList (progDesc "list public keys from keyring")) <> command "keyring-key-add" (info pKeyAdd (progDesc "adds a new keypair into the keyring")) @@ -616,6 +619,18 @@ main = join . customExecParser (prefs showHelpOnError) $ reflogs <- strArgument ( metavar "REFLOG" ) pure $ withStore o (runRefLogGet reflogs) - -- o <- common - -- reflog <- strArgument ( metavar "REFLOG-HASH" ) + pFsck = do + o <- common + pure $ withStore o $ \sto -> do + rs <- simpleStorageFsck sto + forM_ rs $ \(h,f) -> do + print $ fill 24 (pretty f) <+> pretty h + + -- TODO: reflog-del-command + pDel = do + o <- common + h <- strArgument ( metavar "HASH" ) + pure $ withStore o $ \sto -> do + delBlock sto h + diff --git a/hbs2/hbs2.cabal b/hbs2/hbs2.cabal index 39b4e654..1fd3f006 100644 --- a/hbs2/hbs2.cabal +++ b/hbs2/hbs2.cabal @@ -89,7 +89,7 @@ executable hbs2 , transformers , uniplate , uuid - + , terminal-progress-bar hs-source-dirs: . default-language: Haskell2010