From 8a2d1539145bde5c7bf708a2d357166594b855cf Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Thu, 26 Jan 2023 13:26:37 +0300 Subject: [PATCH] fucking fuck --- hbs2-core/lib/HBS2/Actors/ChunkWriter.hs | 31 ++++++++--- hbs2-core/lib/HBS2/Defaults.hs | 4 +- hbs2-tests/test/Peer2Main.hs | 65 ++++++++++++++---------- hbs2-tests/test/TestChunkWriter.hs | 12 ++++- 4 files changed, 73 insertions(+), 39 deletions(-) diff --git a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs index de6db0e8..a54186d1 100644 --- a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs +++ b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs @@ -18,6 +18,7 @@ import HBS2.Storage import HBS2.Defaults import HBS2.Clock +import Control.Monad.Trans.Maybe import Data.List qualified as L import Data.Functor import Data.Function @@ -41,6 +42,7 @@ import System.FileLock import Control.Concurrent.Async +import Control.Monad import Data.Cache (Cache) import Data.Cache qualified as Cache import Control.Concurrent.STM @@ -186,7 +188,7 @@ getHash :: forall salt h m . => ChunkWriter h m -> salt -> Hash h - -> m (Hash h) + -> m (Maybe (Hash h)) getHash = getHash2 @@ -231,7 +233,6 @@ writeChunk2 w salt h o bs = do flush :: ChunkWriter h IO -> FilePath -> IO () flush w fn = do let cache = perBlock w - let sems = perBlockSem w let pip = pipeline w liftIO $ do @@ -268,11 +269,16 @@ getHash2 :: forall salt h m . => ChunkWriter h IO -> salt -> Hash h - -> m (Hash h) + -> m (Maybe (Hash h)) getHash2 w salt h = do flush w fn - liftIO $ hashObject @h <$> B.readFile fn + + runMaybeT $ do + res <- liftIO $ tryJust (guard . isDoesNotExistError) + ( B.readFile fn >>= \s -> pure $ hashObject @h s ) + + MaybeT $ pure $ either (const Nothing) Just res where fn = makeFileName w salt h @@ -292,10 +298,21 @@ commitBlock2 :: forall salt h m . -> m () commitBlock2 w@(ChunkWriter {storage = stor}) salt h = do + + print "FLUSHING" + flush w fn - s <- liftIO $ B.readFile fn - void $! putBlock stor s - delBlock w salt h + + print "FLUSHED" + + res <- liftIO $ tryJust (guard . isDoesNotExistError) + ( B.readFile fn ) + + case res of + Left _ -> pure () + Right s -> do + void $ putBlock stor s + delBlock w salt h where fn = makeFileName w salt h diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index 088d0259..dfc83c1d 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -37,10 +37,10 @@ defBlockInfoTimeout = toTimeSpec ( 60 :: Timeout 'Minutes) -- how much time wait for block from peer? defBlockWaitMax :: Timeout 'Seconds -defBlockWaitMax = 120 :: Timeout 'Seconds +defBlockWaitMax = 20 :: Timeout 'Seconds defBlockWaitSleep :: Timeout 'Seconds -defBlockWaitSleep = 0.1 :: Timeout 'Seconds +defBlockWaitSleep = 1 :: Timeout 'Seconds defSweepTimeout :: Timeout 'Seconds defSweepTimeout = 5 -- FIXME: only for debug! diff --git a/hbs2-tests/test/Peer2Main.hs b/hbs2-tests/test/Peer2Main.hs index 1e36c87b..827b48af 100644 --- a/hbs2-tests/test/Peer2Main.hs +++ b/hbs2-tests/test/Peer2Main.hs @@ -383,28 +383,33 @@ blockDownloadLoop cw = do subscribe @e (BlockChunksEventKey (coo,h)) $ \(BlockReady _) -> do processBlock q h - -- let blockWtf = do - -- debug $ "WTF!" <+> pretty (p,coo) <+> pretty h + let blockWtf = do + debug $ "WTF!" <+> pretty (p,coo) <+> pretty h - -- liftIO $ async $ do - -- -- FIXME: block is not downloaded, return it to the Q - -- void $ race (pause defBlockWaitMax >> blockWtf) - -- $ withPeerM env $ fix \next -> do - -- w <- find @e key (view sBlockWrittenT) + liftIO $ async $ do + -- FIXME: block is not downloaded, return it to the Q + void $ race (pause defBlockWaitMax >> blockWtf) + $ withPeerM env $ fix \next -> do + w <- find @e key (view sBlockWrittenT) - -- maybe1 w (pure ()) $ \z -> do - -- wrt <- liftIO $ readTVarIO z + maybe1 w (pure ()) $ \z -> do + wrt <- liftIO $ readTVarIO z - -- if fromIntegral wrt >= thisBkSize then do - -- -- debug $ "THE BLOCK IS ABOUT TO BE READY" <+> pretty h - -- h1 <- liftIO $ getHash cw key h - -- if h1 == h then do - -- liftIO $ commitBlock cw key h - -- -- expire @e key - -- else pause defBlockWaitSleep >> next - -- else do - -- pause defBlockWaitSleep - -- next + if fromIntegral wrt >= thisBkSize then do + h1 <- liftIO $ getHash cw key h + if | h1 == Just h -> do + liftIO $ commitBlock cw key h + expire @e key + + | h1 /= Just h -> do + debug "block fucked" + pause defBlockWaitMax -- + + | otherwise -> pure () + + else do + pause defBlockWaitSleep + next request @e p (BlockChunks @e coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction @@ -538,17 +543,21 @@ mkAdapter cww = do -- ПОСЧИТАТЬ ХЭШ -- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК -- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ - if ( h1 == h ) then do - liftIO $ commitBlock cww cKey h - -- debug "GOT BLOCK!" + if | h1 == Just h -> do + liftIO $ commitBlock cww cKey h + + updateStats @e False 1 + + expire cKey + -- debug "hash matched!" + emit @e (BlockChunksEventKey (c,h)) (BlockReady h) + + | h1 /= Just h -> do + debug "chunk receiving failed" + + | otherwise -> pure () - updateStats @e False 1 - expire cKey - -- debug "hash matched!" - emit @e (BlockChunksEventKey (c,h)) (BlockReady h) - else do - debug $ "FUCK FUCK!" <+> pretty h when (written > mbSize * defBlockDownloadThreshold) $ do debug $ "SESSION DELETED BECAUSE THAT PEER IS JERK:" <+> pretty p diff --git a/hbs2-tests/test/TestChunkWriter.hs b/hbs2-tests/test/TestChunkWriter.hs index 61093003..9ccf861b 100644 --- a/hbs2-tests/test/TestChunkWriter.hs +++ b/hbs2-tests/test/TestChunkWriter.hs @@ -42,7 +42,7 @@ main = do w1 <- replicateM 2 $ async (simpleStorageWorker storage) - cw <- newChunkWriterIO storage (Just (dir ".qqq")) + cw <- newChunkWriterIO @HbSync storage (Just (dir ".qqq")) w2 <- replicateM 2 $ async $ runChunkWriter cw @@ -69,10 +69,18 @@ main = do h2 <- getHash cw 1 hash - if hash /= h2 then do + -- commitBlock cw 1 hash + -- commitBlock cw 1 hash + print "JOPA" + commitBlock cw 1 hash + print "KITA" + + if Just hash /= h2 then do pure [1] else do + print "YAY!" commitBlock cw 1 hash + print "QQQ!" pure mempty mapM_ cancel $ w1 <> w2