wip. now looks working, but still slow

This commit is contained in:
Dmitry Zuikov 2023-01-24 11:36:01 +03:00
parent 3f6e483299
commit eef460c439
4 changed files with 40 additions and 17 deletions

View File

@ -238,7 +238,7 @@ sweep = do
ex <- asks (view envExpireTimes) ex <- asks (view envExpireTimes)
sw <- asks (view envSweepers) sw <- asks (view envSweepers)
liftIO $ print "sweep" liftIO $ print $ pretty "sweep"
liftIO $ Cache.purgeExpired ex liftIO $ Cache.purgeExpired ex
toSweep <- HashMap.toList <$> liftIO (readTVarIO sw) toSweep <- HashMap.toList <$> liftIO (readTVarIO sw)

View File

@ -30,10 +30,10 @@ defProtoPipelineSize :: Int
defProtoPipelineSize = 65536*4 defProtoPipelineSize = 65536*4
defCookieTimeout :: TimeSpec defCookieTimeout :: TimeSpec
defCookieTimeout = toTimeSpec ( 1 :: Timeout 'Minutes) defCookieTimeout = toTimeSpec ( 60 :: Timeout 'Minutes)
defBlockInfoTimeout :: TimeSpec defBlockInfoTimeout :: TimeSpec
defBlockInfoTimeout = toTimeSpec ( 1 :: Timeout 'Minutes) defBlockInfoTimeout = toTimeSpec ( 60 :: Timeout 'Minutes)
defSweepTimeout :: Timeout 'Seconds defSweepTimeout :: Timeout 'Seconds
defSweepTimeout = 5 -- FIXME: only for debug! defSweepTimeout = 5 -- FIXME: only for debug!

View File

@ -113,3 +113,28 @@ executable test-peer-run
hs-source-dirs: test hs-source-dirs: test
main-is: Peer2Main.hs main-is: Peer2Main.hs
build-depends:
base ^>=4.15.1.0, hbs2-core, hbs2-storage-simple
, async
, bytestring
, cache
, containers
, directory
, filepath
, hashable
, microlens-platform
, mtl
, prettyprinter
, QuickCheck
, random
, safe
, serialise
, stm
, streaming
, tasty
, tasty-hunit
, transformers
, uniplate
, vector
, data-default
, mwc-random

View File

@ -51,7 +51,6 @@ import System.FilePath.Posix
import System.IO import System.IO
import System.Random.MWC import System.Random.MWC
import System.Random.Stateful
import qualified Data.Vector.Unboxed as U import qualified Data.Vector.Unboxed as U
debug :: (MonadIO m) => Doc ann -> m () debug :: (MonadIO m) => Doc ann -> m ()
@ -92,10 +91,10 @@ instance HasProtocol Fake (BlockInfo Fake) where
-- FIXME: 3 is for debug only! -- FIXME: 3 is for debug only!
instance Expires (EventKey Fake (BlockInfo Fake)) where instance Expires (EventKey Fake (BlockInfo Fake)) where
expiresIn _ = Just 3 expiresIn _ = Just 600
instance Expires (EventKey Fake (BlockChunks Fake)) where instance Expires (EventKey Fake (BlockChunks Fake)) where
expiresIn _ = Just 10 expiresIn _ = Just 600
instance Expires (EventKey Fake (BlockAnnounce Fake)) where instance Expires (EventKey Fake (BlockAnnounce Fake)) where
expiresIn _ = Nothing expiresIn _ = Nothing
@ -152,7 +151,7 @@ runTestPeer p zu = do
cww <- newChunkWriterIO stor (Just chDir) cww <- newChunkWriterIO stor (Just chDir)
sw <- liftIO $ replicateM 4 $ async $ simpleStorageWorker stor sw <- liftIO $ replicateM 4 $ async $ simpleStorageWorker stor
cw <- liftIO $ replicateM 32 $ async $ runChunkWriter cww cw <- liftIO $ replicateM 4 $ async $ runChunkWriter cww
zu stor cww zu stor cww
@ -206,13 +205,9 @@ blockDownloadLoop cw = do
stor <- getStorage stor <- getStorage
let blks = []
-- let blks = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt" let blks = [ "GTtQp6QjK7G9Sh5Aq4koGSpMX398WRWn3DV28NUAYARg"
-- , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" ]
-- , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"
-- , "GTtQp6QjK7G9Sh5Aq4koGSpMX398WRWn3DV28NUAYARg"
-- ]
blq <- liftIO $ Q.newTBQueueIO defBlockDownloadQ blq <- liftIO $ Q.newTBQueueIO defBlockDownloadQ
for_ blks $ \b -> liftIO $ atomically $ Q.writeTBQueue blq (DownloadTask b Nothing) for_ blks $ \b -> liftIO $ atomically $ Q.writeTBQueue blq (DownloadTask b Nothing)
@ -239,15 +234,18 @@ blockDownloadLoop cw = do
else do else do
case job of case job of
DownloadTask hx (Just (p,s)) -> do DownloadTask hx (Just (p,s)) -> do
initDownload False blq p hx s initDownload True blq p hx s
DownloadTask h Nothing -> do DownloadTask h Nothing -> do
peers <- getPeerLocator @e >>= knownPeers @e peers <- getPeerLocator @e >>= knownPeers @e
for_ peers $ \peer -> do for_ peers $ \peer -> do
subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p,hx,s)) -> do subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p,hx,s)) -> do
debug $ "got block size for" <+> pretty h
liftIO $ atomically $ Q.writeTBQueue blq (DownloadTask hx (Just (p,s))) liftIO $ atomically $ Q.writeTBQueue blq (DownloadTask hx (Just (p,s)))
debug $ "requesting size for" <+> pretty h
request @e peer (GetBlockSize @e h) request @e peer (GetBlockSize @e h)
next next
@ -405,7 +403,7 @@ main :: IO ()
main = do main = do
hSetBuffering stderr LineBuffering hSetBuffering stderr LineBuffering
void $ race (pause (30 :: Timeout 'Seconds)) $ do void $ race (pause (600 :: Timeout 'Seconds)) $ do
fake <- newFakeP2P True <&> Fabriq fake <- newFakeP2P True <&> Fabriq
@ -415,7 +413,7 @@ main = do
others <- forM ps $ \p -> async $ runTestPeer p $ \s cw -> do others <- forM ps $ \p -> async $ runTestPeer p $ \s cw -> do
let findBlk = hasBlock s let findBlk = hasBlock s
let size = 1024*1024*40 let size = 1024*1024*1
g <- initialize $ U.fromList [fromIntegral p, fromIntegral size] g <- initialize $ U.fromList [fromIntegral p, fromIntegral size]
bytes <- replicateM size $ uniformM g :: IO [Char] bytes <- replicateM size $ uniformM g :: IO [Char]
@ -465,7 +463,7 @@ main = do
liftIO $ cancel as liftIO $ cancel as
pause ( 29.9 :: Timeout 'Seconds ) pause ( 599.9 :: Timeout 'Seconds )
mapM_ cancel (our:others) mapM_ cancel (our:others)