This commit is contained in:
Dmitry Zuikov 2023-01-24 11:01:34 +03:00
parent eb452c06f5
commit 3f6e483299
3 changed files with 54 additions and 41 deletions

View File

@ -222,12 +222,12 @@ instance ( HasProtocol e p
liftIO $ atomically $ modifyTVar' ev (HashMap.insertWith (<>) sk [dyn]) liftIO $ atomically $ modifyTVar' ev (HashMap.insertWith (<>) sk [dyn])
-- FIXME: add a sweeping routine or else everything will be fucked! -- FIXME: add a sweeping routine or else everything will be fucked!
addSweeper (expiresIn (Proxy @(EventKey e p))) sk $ do addSweeper (expiresIn (Proxy @(EventKey e p))) sk $ do
liftIO $ print $ "sweep smth with key" <+> pretty (hash sk) -- liftIO $ print $ "sweep smth with key" <+> pretty (hash sk)
liftIO $ atomically $ modifyTVar' ev (HashMap.delete sk) liftIO $ atomically $ modifyTVar' ev (HashMap.delete sk)
addSweeper :: forall e . Maybe (Timeout 'Seconds) -> SKey -> PeerM e IO () -> PeerM e IO () addSweeper :: forall e . Maybe (Timeout 'Seconds) -> SKey -> PeerM e IO () -> PeerM e IO ()
addSweeper t k sweeper = do addSweeper t k sweeper = do
liftIO $ print $ "adding sweeper for key" <+> pretty (hash k) -- liftIO $ print $ "adding sweeper for key" <+> pretty (hash k)
ex <- asks (view envExpireTimes) ex <- asks (view envExpireTimes)
sw <- asks (view envSweepers) sw <- asks (view envSweepers)
liftIO $ Cache.insert' ex (toTimeSpec <$> t) k () liftIO $ Cache.insert' ex (toTimeSpec <$> t) k ()
@ -238,6 +238,8 @@ sweep = do
ex <- asks (view envExpireTimes) ex <- asks (view envExpireTimes)
sw <- asks (view envSweepers) sw <- asks (view envSweepers)
liftIO $ print "sweep"
liftIO $ Cache.purgeExpired ex liftIO $ Cache.purgeExpired ex
toSweep <- HashMap.toList <$> liftIO (readTVarIO sw) toSweep <- HashMap.toList <$> liftIO (readTVarIO sw)

View File

@ -30,13 +30,13 @@ defProtoPipelineSize :: Int
defProtoPipelineSize = 65536*4 defProtoPipelineSize = 65536*4
defCookieTimeout :: TimeSpec defCookieTimeout :: TimeSpec
defCookieTimeout = toTimeSpec ( 10 :: Timeout 'Minutes) defCookieTimeout = toTimeSpec ( 1 :: Timeout 'Minutes)
defBlockInfoTimeout :: TimeSpec defBlockInfoTimeout :: TimeSpec
defBlockInfoTimeout = toTimeSpec ( 10 :: Timeout 'Minutes) defBlockInfoTimeout = toTimeSpec ( 1 :: Timeout 'Minutes)
defSweepTimeout :: Timeout 'Seconds defSweepTimeout :: Timeout 'Seconds
defSweepTimeout = 600 -- FIXME: only for debug! defSweepTimeout = 5 -- FIXME: only for debug!

View File

@ -178,10 +178,16 @@ handleBlockInfo (p, h, sz') = do
let bsz = fromIntegral sz let bsz = fromIntegral sz
update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz)) update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz))
data DownloadTask e = DownloadTask (Hash HbSync) (Maybe (Peer e, Integer))
blockDownloadLoop :: forall e m . ( m ~ PeerM e IO blockDownloadLoop :: forall e m . ( m ~ PeerM e IO
-- , e ~ Fake
, Serialise (Encoded e)
, MonadIO m , MonadIO m
, Request e (BlockInfo e) m , Request e (BlockInfo e) m
, Request e (BlockChunks e) m , Request e (BlockAnnounce e) m
, HasProtocol e (BlockInfo e)
, HasProtocol e (BlockAnnounce e)
, EventListener e (BlockInfo e) m , EventListener e (BlockInfo e) m
, EventListener e (BlockChunks e) m , EventListener e (BlockChunks e) m
, EventListener e (BlockAnnounce e) m , EventListener e (BlockAnnounce e) m
@ -192,9 +198,11 @@ blockDownloadLoop :: forall e m . ( m ~ PeerM e IO
, HasStorage m , HasStorage m
, Num (Peer e) , Num (Peer e)
, Pretty (Peer e) , Pretty (Peer e)
, Block ByteString ~ ByteString
-- , Key HbSync ~ Hash HbSync -- , Key HbSync ~ Hash HbSync
) => m () )
blockDownloadLoop = do => ChunkWriter HbSync IO -> m ()
blockDownloadLoop cw = do
stor <- getStorage stor <- getStorage
@ -207,7 +215,7 @@ blockDownloadLoop = do
-- ] -- ]
blq <- liftIO $ Q.newTBQueueIO defBlockDownloadQ blq <- liftIO $ Q.newTBQueueIO defBlockDownloadQ
for_ blks $ \b -> liftIO $ atomically $ Q.writeTBQueue blq b for_ blks $ \b -> liftIO $ atomically $ Q.writeTBQueue blq (DownloadTask b Nothing)
subscribe @e BlockAnnounceInfoKey $ \(BlockAnnounceEvent p ann) -> do subscribe @e BlockAnnounceInfoKey $ \(BlockAnnounceEvent p ann) -> do
let h = view biHash ann let h = view biHash ann
@ -217,27 +225,30 @@ blockDownloadLoop = do
<+> pretty h <+> pretty h
<+> pretty (view biSize ann) <+> pretty (view biSize ann)
initDownload False blq p h s -- FIXME: don't trust everybody liftIO $ atomically $ Q.writeTBQueue blq (DownloadTask h (Just (p,s)))
fix \next -> do fix \next -> do
h <- liftIO $ atomically $ Q.readTBQueue blq job <- liftIO $ atomically $ Q.readTBQueue blq
wip <- liftIO $ blocksInProcess cw
here <- liftIO $ hasBlock stor h <&> isJust debug $ "WIP:" <+> pretty wip
if not here then do if wip > 10 then do
pause ( 1 :: Timeout 'Seconds )
subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p,hx,s)) -> do else do
initDownload True blq p hx s case job of
DownloadTask hx (Just (p,s)) -> do
initDownload False blq p hx s
DownloadTask h Nothing -> do
peers <- getPeerLocator @e >>= knownPeers @e peers <- getPeerLocator @e >>= knownPeers @e
for_ peers $ \p -> do for_ peers $ \peer -> do
debug $ "requesting block" <+> pretty h <+> "from" <+> pretty p subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p,hx,s)) -> do
request p (GetBlockSize @e h) liftIO $ atomically $ Q.writeTBQueue blq (DownloadTask hx (Just (p,s)))
else do request @e peer (GetBlockSize @e h)
processBlock blq h
next next
@ -261,7 +272,7 @@ blockDownloadLoop = do
subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do
processBlock q h processBlock q h
request p (BlockChunks coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction request @e p (BlockChunks @e coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction
| anyway -> processBlock q h | anyway -> processBlock q h
@ -302,7 +313,7 @@ blockDownloadLoop = do
else do else do
-- if block is missed, then -- if block is missed, then
-- block to download q -- block to download q
atomically $ Q.writeTBQueue q blk liftIO $ atomically $ Q.writeTBQueue q (DownloadTask blk Nothing)
Just (Blob{}) -> do Just (Blob{}) -> do
pure () pure ()
@ -404,7 +415,7 @@ main = do
others <- forM ps $ \p -> async $ runTestPeer p $ \s cw -> do others <- forM ps $ \p -> async $ runTestPeer p $ \s cw -> do
let findBlk = hasBlock s let findBlk = hasBlock s
let size = 1024*1024*10 let size = 1024*1024*40
g <- initialize $ U.fromList [fromIntegral p, fromIntegral size] g <- initialize $ U.fromList [fromIntegral p, fromIntegral size]
bytes <- replicateM size $ uniformM g :: IO [Char] bytes <- replicateM size $ uniformM g :: IO [Char]
@ -444,7 +455,7 @@ main = do
addPeers @Fake pl ps addPeers @Fake pl ps
as <- liftIO $ async $ withPeerM env blockDownloadLoop as <- liftIO $ async $ withPeerM env (blockDownloadLoop cw)
runProto @Fake runProto @Fake
[ makeResponse (blockSizeProto blk handleBlockInfo) [ makeResponse (blockSizeProto blk handleBlockInfo)