diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index 4879e7d9..2ead3b61 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -106,6 +106,7 @@ library , deepseq , directory , filepath + , filelock , hashable , interpolatedstring-perl6 , memory diff --git a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs index 3a310eb8..150dd01a 100644 --- a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs +++ b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs @@ -37,6 +37,7 @@ import System.FilePath import System.IO.Error import System.IO import System.IO.Temp +import System.FileLock import Control.Concurrent.Async @@ -62,11 +63,12 @@ data ChunkWriter h m = forall a . ( MonadIO m , Block ByteString ~ ByteString ) => ChunkWriter - { stopped :: TVar Bool - , pipeline :: Pipeline m () - , dir :: FilePath - , storage :: a - , perBlock :: Cache FilePath (TQueue (Handle -> IO ())) + { stopped :: TVar Bool + , pipeline :: Pipeline m () + , dir :: FilePath + , storage :: a + , perBlock :: Cache FilePath (TQueue (Handle -> IO ())) + , perBlockSem :: Cache FilePath TSem } @@ -152,6 +154,7 @@ newChunkWriterIO s tmp = do let d = fromMaybe def tmp mt <- liftIO $ Cache.newCache Nothing + mts <- liftIO $ Cache.newCache Nothing running <- liftIO $ newTVarIO False @@ -162,6 +165,7 @@ newChunkWriterIO s tmp = do , dir = d , storage = s , perBlock = mt + , perBlockSem = mts } makeFileName :: (Hashable salt, Pretty (Hash h)) => ChunkWriter h m -> salt -> Hash h -> FilePath @@ -268,6 +272,8 @@ writeChunk2 w salt h o bs = do let cache = perBlock w + -- liftIO $ print $ "writeChunk" <+> pretty o <+> pretty (B.length bs) <+> pretty h + liftIO $ do q <- Cache.fetchWithCache cache fn $ const Q0.newTQueueIO atomically $ Q0.writeTQueue q $ \fh -> do @@ -313,11 +319,13 @@ getHash1 w salt h = liftIO do flush w fn = do let cache = perBlock w + let scache = perBlockSem w liftIO $ do q <- Cache.fetchWithCache cache fn $ const Q0.newTQueueIO + s <- Cache.fetchWithCache scache fn $ const $ atomically $ Sem.newTSem 1 - -- atomically $ Sem.waitTSem s + atomically $ Sem.waitTSem s Cache.delete cache fn @@ -326,10 +334,10 @@ flush w fn = do liftIO $ do -- withBinaryFile fn ReadWriteMode $ \fh -> do - withFile fn ReadWriteMode $ \fh -> do + withBinaryFile fn ReadWriteMode $ \fh -> do for_ flushed $ \f -> f fh - -- atomically $ Sem.signalTSem s + atomically $ Sem.signalTSem s pure (length flushed) diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index 8baeb891..6fc7b163 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -40,7 +40,7 @@ defBlockWaitMax :: Timeout 'Seconds defBlockWaitMax = 10 :: Timeout 'Seconds defBlockWaitSleep :: Timeout 'Seconds -defBlockWaitSleep = 0.01 :: Timeout 'Seconds +defBlockWaitSleep = 0.1 :: Timeout 'Seconds defSweepTimeout :: Timeout 'Seconds defSweepTimeout = 5 -- FIXME: only for debug! diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index f7f12f4d..0d2b7b3c 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -23,27 +23,29 @@ common common-deps , bytestring , cache , containers + , data-default , directory , filepath , hashable , microlens-platform , mtl + , mwc-random , prettyprinter , QuickCheck , random + , random-shuffle , safe , serialise , stm , streaming , tasty , tasty-hunit + , temporary + , timeit , transformers , uniplate - , vector - , data-default - , mwc-random - , timeit , unordered-containers + , vector common shared-properties ghc-options: @@ -102,6 +104,20 @@ test-suite test-skey main-is: TestSKey.hs +test-suite test-cw + import: shared-properties + import: common-deps + default-language: Haskell2010 + + other-modules: + + -- other-extensions: + + type: exitcode-stdio-1.0 + hs-source-dirs: test + main-is: TestChunkWriter.hs + + executable test-peer-run import: shared-properties import: common-deps diff --git a/hbs2-tests/hie.yaml b/hbs2-tests/hie.yaml index 9e651b52..a7da04bd 100644 --- a/hbs2-tests/hie.yaml +++ b/hbs2-tests/hie.yaml @@ -6,5 +6,6 @@ cradle: - path: "test/TestSKey" component: "hbs2-tests:test:test-skey" - + - path: "test/TestChunkWriter" + component: "hbs2-tests:test:test-cw" diff --git a/hbs2-tests/test/Peer2Main.hs b/hbs2-tests/test/Peer2Main.hs index 7172b7e9..75f288f7 100644 --- a/hbs2-tests/test/Peer2Main.hs +++ b/hbs2-tests/test/Peer2Main.hs @@ -159,7 +159,7 @@ runTestPeer p zu = do cww <- newChunkWriterIO stor (Just chDir) sw <- liftIO $ replicateM 4 $ async $ simpleStorageWorker stor - cw <- liftIO $ replicateM 4 $ async $ runChunkWriter cww + cw <- liftIO $ replicateM 8 $ async $ runChunkWriter cww zu stor cww @@ -396,7 +396,7 @@ blockDownloadLoop cw = do wrt <- liftIO $ readTVarIO z if fromIntegral wrt >= thisBkSize then do - debug $ "THE BLOCK IS ABOUT TO BE READY" <+> pretty h + -- debug $ "THE BLOCK IS ABOUT TO BE READY" <+> pretty h h1 <- liftIO $ getHash cw key h if h1 == h then do liftIO $ commitBlock cw key h @@ -537,7 +537,7 @@ mkAdapter cww = do -- ПОСЧИТАТЬ ХЭШ -- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК -- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ - when ( h1 == h ) $ do + if ( h1 == h ) then do liftIO $ commitBlock cww cKey h updateStats @e False 1 @@ -545,6 +545,8 @@ mkAdapter cww = do expire cKey -- debug "hash matched!" emit @e (BlockChunksEventKey h) (BlockReady h) + else do + debug $ "FUCK FUCK!" <+> pretty h when (written > mbSize * defBlockDownloadThreshold) $ do debug $ "SESSION DELETED BECAUSE THAT PEER IS JERK:" <+> pretty p @@ -572,7 +574,7 @@ main = do let findBlk = hasBlock s -- let size = 1024*1024*1 - let size = 1024*1024*10 + let size = 1024*1024*30 g <- initialize $ U.fromList [fromIntegral p, fromIntegral size] bytes <- replicateM size $ uniformM g :: IO [Char] diff --git a/hbs2-tests/test/TestChunkWriter.hs b/hbs2-tests/test/TestChunkWriter.hs new file mode 100644 index 00000000..d1199547 --- /dev/null +++ b/hbs2-tests/test/TestChunkWriter.hs @@ -0,0 +1,74 @@ +module Main where + +import HBS2.Prelude +import HBS2.Actors.ChunkWriter +import HBS2.Hash +import HBS2.Storage +import HBS2.Storage.Simple + +import Control.Concurrent.Async +import Control.Monad +import Data.ByteString.Lazy (ByteString) +import Data.ByteString.Lazy.Char8 qualified as B8 +import Data.Functor +import qualified Data.Vector.Unboxed as U +import System.IO.Temp +import System.Random.MWC +import System.Random.Shuffle +import System.FilePath.Posix + +import Data.List qualified as L +import Prettyprinter + +main :: IO () +main = do + + -- let size = 1024*1024*1 + let size = 1024*1024 + let chu = 500 + + g <- initialize $ U.fromList [0xFAFA, 0xBEBE, 0xC0C0] + + withSystemTempDirectory "cww-test" $ \dir -> do + + failed <- replicateM 100 $ do + + bytes <- B8.pack <$> (replicateM size $ uniformM g) + + let hash = hashObject bytes + + let psz = calcChunks (fromIntegral size) (fromIntegral chu) + + let opts = [ StoragePrefix (dir ".test-cww") + ] + + storage <- simpleStorageInit opts :: IO (SimpleStorage HbSync) + + w1 <- replicateM 1 $ async (simpleStorageWorker storage) + + cw <- newChunkWriterIO storage (Just (dir ".qqq")) + + w2 <- replicateM 1 $ async $ runChunkWriter cw + + -- psz' <- shuffleM psz + psz' <- pure psz + + forConcurrently_ psz' $ \(o,s) -> do + let t = B8.take s $ B8.drop o bytes + writeChunk cw 1 hash (fromIntegral o) t + + h2 <- getHash cw 1 hash + -- h3 <- getHash cw 1 hash + + mapM_ cancel $ w1 <> w2 + + if hash /= h2 then do + pure [1] + else + pure mempty + + print $ "failed" <+> pretty (sum (mconcat failed)) + + pure () + + diff --git a/hbs2/Main.hs b/hbs2/Main.hs index a61e1aa5..fc32c43a 100644 --- a/hbs2/Main.hs +++ b/hbs2/Main.hs @@ -68,6 +68,11 @@ data CatOpts = } deriving stock (Data) +newtype HashOpts = + HashOpts + { hashFp :: FilePath + } + deriving stock (Data) newtype NewRefOpts = NewRefOpts @@ -75,6 +80,11 @@ newtype NewRefOpts = } deriving stock (Data) + +runHash :: HashOpts -> SimpleStorage HbSync -> IO () +runHash opts ss = do + pure () + runCat :: Data opts => opts -> SimpleStorage HbSync -> IO () runCat opts ss = do @@ -112,7 +122,7 @@ runCat opts ss = do maybe (error "empty ref") walk mbHead -runStore :: Data opts => opts -> SimpleStorage HbSync -> IO () +runStore ::(Data opts, Block ByteString ~ ByteString) => opts -> SimpleStorage HbSync -> IO () runStore opts ss | justInit = do putStrLn "initialized" @@ -169,6 +179,7 @@ main = join . customExecParser (prefs showHelpOnError) $ parser = hsubparser ( command "store" (info pStore (progDesc "store block")) <> command "new-ref" (info pNewRef (progDesc "creates reference")) <> command "cat" (info pCat (progDesc "cat block")) + <> command "hash" (info pHash (progDesc "calculates hash")) ) common = do @@ -193,3 +204,9 @@ main = join . customExecParser (prefs showHelpOnError) $ onlyh <- optional $ flag' True ( short 'H' <> long "hashes-only" <> help "list only block hashes" ) pure $ withStore o $ runCat $ CatOpts hash (CatHashesOnly <$> onlyh) + pHash = do + o <- common + hash <- strArgument ( metavar "HASH" ) + pure $ withStore o $ runHash $ HashOpts hash + +