diff --git a/hbs2-core/lib/HBS2/Actors.hs b/hbs2-core/lib/HBS2/Actors.hs index 90222ce4..0c303c7a 100644 --- a/hbs2-core/lib/HBS2/Actors.hs +++ b/hbs2-core/lib/HBS2/Actors.hs @@ -18,6 +18,7 @@ import Control.Concurrent.Async import Data.Function import Data.Functor import Data.Kind +import Control.Concurrent data Pipeline m a = Pipeline @@ -37,7 +38,7 @@ runPipeline pip = fix \next -> do case mbJob of Nothing -> pure () - Just job -> void job >> next + Just job -> void (liftIO yield >> job) >> next stopPipeline :: MonadIO m => Pipeline m a -> m () stopPipeline pip = liftIO $ do diff --git a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs index 3882b5b3..c1888fdc 100644 --- a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs +++ b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs @@ -109,10 +109,12 @@ runChunkWriter2 w = do -- yield -- next + -- liftIO $ print "runChunkWriter2" + stop <- liftIO $ readTVarIO (stopped w) if stop then do - ks <- liftIO $ take 100 <$> Cache.keys cache + ks <- liftIO $ Cache.keys cache liftIO $ for_ ks $ \k -> flush w k else do ks <- liftIO $ Cache.keys cache @@ -120,7 +122,8 @@ runChunkWriter2 w = do amount <- for ks $ \k -> flush w k if (sum amount == 0) then do - pause ( 0.5 :: Timeout 'Seconds ) + pure () + -- pause ( 0.5 :: Timeout 'Seconds ) else do liftIO $ print ("flushed:" <+> pretty (sum amount)) @@ -315,10 +318,13 @@ flush w fn = do let cache = perBlock w let scache = semFlush w liftIO $ do - q <- Cache.fetchWithCache cache fn $ const Q0.newTQueueIO - s <- Cache.fetchWithCache scache fn $ const (atomically $ Sem.newTSem 2) - atomically $ Sem.waitTSem s + print "flush" + + q <- Cache.fetchWithCache cache fn $ const Q0.newTQueueIO + s <- Cache.fetchWithCache scache fn $ const (atomically $ Sem.newTSem 100) + + -- atomically $ Sem.waitTSem s Cache.delete cache fn @@ -330,7 +336,7 @@ flush w fn = do withFile fn ReadWriteMode $ \fh -> do for_ flushed $ \f -> f fh - atomically $ Sem.signalTSem s + -- atomically $ Sem.signalTSem s pure (length flushed) diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index a86ebcbe..e502896c 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -303,7 +303,7 @@ runPeerM s bus p f = do <*> liftIO (newTVarIO mempty) let de = view envDeferred env - as <- liftIO $ replicateM 1 $ async $ runPipeline de + as <- liftIO $ replicateM 2 $ async $ runPipeline de sw <- liftIO $ async $ forever $ withPeerM env $ do pause defSweepTimeout @@ -313,7 +313,7 @@ runPeerM s bus p f = do void $ runReaderT (fromPeerM f) env void $ liftIO $ stopPipeline de - liftIO $ mapM_ cancel as + liftIO $ mapM_ cancel (as <> [sw]) withPeerM :: MonadIO m => PeerEnv e -> PeerM e m a -> m () withPeerM env action = void $ runReaderT (fromPeerM action) env @@ -378,7 +378,6 @@ instance ( HasProtocol e p sendTo fab (To who) (From self) bs - instance ( MonadIO m , HasProtocol e p , Sessions e p m diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs index a1c4a1fe..e304c1c7 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs @@ -86,6 +86,8 @@ blockChunksProto adapter (BlockChunks c p) = case p of BlockGetAllChunks h size -> deferred proto do + liftIO $ print $ "BlockGetAllChunks" <+> pretty h + me <- ownPeer @e who <- thatPeer proto diff --git a/hbs2-tests/test/Peer2Main.hs b/hbs2-tests/test/Peer2Main.hs index 0e0492d2..b4b927db 100644 --- a/hbs2-tests/test/Peer2Main.hs +++ b/hbs2-tests/test/Peer2Main.hs @@ -151,7 +151,7 @@ runTestPeer p zu = do cww <- newChunkWriterIO stor (Just chDir) sw <- liftIO $ replicateM 4 $ async $ simpleStorageWorker stor - cw <- liftIO $ replicateM 4 $ async $ runChunkWriter cww + cw <- liftIO $ replicateM 8 $ async $ runChunkWriter cww zu stor cww @@ -230,7 +230,7 @@ blockDownloadLoop cw = do wip <- liftIO $ blocksInProcess cw - if wip > 10 then do + if wip > 200 then do pause ( 1 :: Timeout 'Seconds ) else do case job of @@ -254,6 +254,9 @@ blockDownloadLoop cw = do where initDownload anyway q p h s = do + + debug $ "initDownload" <+> pretty h <+> pretty p <+> pretty s + sto <- getStorage here <- liftIO $ hasBlock sto h <&> isJust @@ -352,12 +355,23 @@ mkAdapter cww = do -- check if there is a session -- FIXME: -- TODO: log situation when no session + + ddd <- lift $ find cKey id + + when (isNothing ddd) $ do + debug "SESSION NOT FOUND!" + + dwnld <- MaybeT $ find cKey id + -- dwnld <- maybe1 dwnld' (debug "AAAA") $ pure + + -- debug "session found!" + let bslen = fromIntegral $ B8.length bs - let mbSize = view sBlockSize dwnld let mbChSize = view sBlockChunkSize dwnld + let mbSize = view sBlockSize dwnld let offset0 = fromIntegral n * fromIntegral mbChSize :: Offset @@ -367,12 +381,47 @@ mkAdapter cww = do let written = view sBlockWritten dwnld + bslen let maxOff = max offset0 (view sBlockOffset dwnld) - lift $ update dwnld cKey ( set sBlockOffset maxOff - . set sBlockWritten written + lift $ update dwnld cKey ( over sBlockOffset (max maxOff) + . over sBlockWritten (+ bslen) ) - let mbDone = (maxOff + fromIntegral mbChSize) > fromIntegral mbSize - && written >= mbSize + -- this is updating concurrently, + -- so collect last data from all posible threads + writtenLast <- MaybeT $ find cKey (view sBlockWritten) + maxOffLast <- MaybeT $ find cKey (view sBlockOffset) + + -- debug $ "gotShit" <+> pretty (B8.length bs) <+> pretty (writtenLast) <+> pretty (wr + -- debug $ "writtenLast" <+> pretty writtenLast + + -- теперь, что бы нас дидосить -- можно + -- после 2/3 пакетов посылать пакеты из этих двух третей + -- или вообще мусор, а мы тут будем считать хэши. + -- + -- кстати, если посылать мусор - то мы еще и на диск + -- его будем бесконечно писать. + -- + -- допустим, пришло 4 события. в обработчике каждого --- + -- мы не знаем, что там еще 3 события. + -- + -- таким образом, ни про какое событие нельзя понять, + -- что оно последнее + -- + -- как же нам узнать, что пришёл последний блок и можно + -- его коммитить? + -- + + let mbDone = (maxOffLast + fromIntegral mbChSize) > fromIntegral mbSize + && writtenLast >= ( (mbSize * 2) `div` 3 ) + + debug $ "blkAcceptChunk" <+> pretty (p,c) + <+> pretty maxOffLast + -- <+> pretty n + -- <+> pretty (B8.length bs) + -- <+> pretty + + + -- debug $ "written:" <+> pretty written <+> "/" <+> pretty mbSize + when mbDone $ lift do @@ -414,7 +463,8 @@ main = do others <- forM ps $ \p -> async $ runTestPeer p $ \s cw -> do let findBlk = hasBlock s - let size = 1024*1024*1 + -- let size = 1024*1024*1 + let size = 256*1024 g <- initialize $ U.fromList [fromIntegral p, fromIntegral size] bytes <- replicateM size $ uniformM g :: IO [Char] @@ -462,11 +512,14 @@ main = do as <- liftIO $ async $ withPeerM env (blockDownloadLoop cw) - runProto @Fake - [ makeResponse (blockSizeProto blk handleBlockInfo) - , makeResponse (blockChunksProto adapter) - , makeResponse blockAnnounceProto - ] + me <- liftIO $ replicateM 1 $ async $ liftIO $ withPeerM env $ do + runProto @Fake + [ makeResponse (blockSizeProto blk handleBlockInfo) + , makeResponse (blockChunksProto adapter) + , makeResponse blockAnnounceProto + ] + + liftIO $ mapM_ wait me liftIO $ cancel as