This commit is contained in:
Dmitry Zuikov 2023-01-24 14:37:10 +03:00
parent 3d03d1ce0f
commit aff86cf2b3
6 changed files with 51 additions and 61 deletions

View File

@ -319,8 +319,6 @@ flush w fn = do
let scache = semFlush w let scache = semFlush w
liftIO $ do liftIO $ do
print "flush"
q <- Cache.fetchWithCache cache fn $ const Q0.newTQueueIO q <- Cache.fetchWithCache cache fn $ const Q0.newTQueueIO
s <- Cache.fetchWithCache scache fn $ const (atomically $ Sem.newTSem 100) s <- Cache.fetchWithCache scache fn $ const (atomically $ Sem.newTSem 100)

View File

@ -303,7 +303,7 @@ runPeerM s bus p f = do
<*> liftIO (newTVarIO mempty) <*> liftIO (newTVarIO mempty)
let de = view envDeferred env let de = view envDeferred env
as <- liftIO $ replicateM 2 $ async $ runPipeline de as <- liftIO $ replicateM 8 $ async $ runPipeline de
sw <- liftIO $ async $ forever $ withPeerM env $ do sw <- liftIO $ async $ forever $ withPeerM env $ do
pause defSweepTimeout pause defSweepTimeout

View File

@ -35,6 +35,13 @@ defCookieTimeout = toTimeSpec ( 60 :: Timeout 'Minutes)
defBlockInfoTimeout :: TimeSpec defBlockInfoTimeout :: TimeSpec
defBlockInfoTimeout = toTimeSpec ( 60 :: Timeout 'Minutes) defBlockInfoTimeout = toTimeSpec ( 60 :: Timeout 'Minutes)
-- how much time wait for block from peer?
defBlockWaitMax :: Timeout 'Seconds
defBlockWaitMax = 10 :: Timeout 'Seconds
defBlockWaitSleep :: Timeout 'Seconds
defBlockWaitSleep = 0.01 :: Timeout 'Seconds
defSweepTimeout :: Timeout 'Seconds defSweepTimeout :: Timeout 'Seconds
defSweepTimeout = 5 -- FIXME: only for debug! defSweepTimeout = 5 -- FIXME: only for debug!

View File

@ -86,8 +86,6 @@ blockChunksProto adapter (BlockChunks c p) =
case p of case p of
BlockGetAllChunks h size -> deferred proto do BlockGetAllChunks h size -> deferred proto do
liftIO $ print $ "BlockGetAllChunks" <+> pretty h
me <- ownPeer @e me <- ownPeer @e
who <- thatPeer proto who <- thatPeer proto

View File

@ -1,4 +0,0 @@
cradle:
cabal:
- path: "test/Peer2Main.hs"
component: "hbs2-tests:exe:test-peer-run"

View File

@ -49,6 +49,7 @@ import System.Directory
import System.Exit import System.Exit
import System.FilePath.Posix import System.FilePath.Posix
import System.IO import System.IO
import Safe
import System.Random.MWC import System.Random.MWC
import qualified Data.Vector.Unboxed as U import qualified Data.Vector.Unboxed as U
@ -65,13 +66,16 @@ data BlockDownload =
, _sBlockChunkSize :: ChunkSize , _sBlockChunkSize :: ChunkSize
, _sBlockOffset :: Offset , _sBlockOffset :: Offset
, _sBlockWritten :: Size , _sBlockWritten :: Size
, _sBlockWrittenT :: TVar Size
} }
deriving stock (Typeable) deriving stock (Typeable)
makeLenses 'BlockDownload makeLenses 'BlockDownload
newBlockDownload :: Hash HbSync -> BlockDownload newBlockDownload :: MonadIO m => Hash HbSync -> m BlockDownload
newBlockDownload h = BlockDownload h 0 0 0 0 newBlockDownload h = do
t <- liftIO $ newTVarIO 0
pure $ BlockDownload h 0 0 0 0 t
instance HasPeer Fake where instance HasPeer Fake where
newtype instance Peer Fake = FakePeer Word8 newtype instance Peer Fake = FakePeer Word8
@ -243,19 +247,20 @@ blockDownloadLoop cw = do
for_ peers $ \peer -> do for_ peers $ \peer -> do
subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p,hx,s)) -> do subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p,hx,s)) -> do
debug $ "got block size for" <+> pretty h
liftIO $ atomically $ Q.writeTBQueue blq (DownloadTask hx (Just (p,s))) liftIO $ atomically $ Q.writeTBQueue blq (DownloadTask hx (Just (p,s)))
debug $ "requesting size for" <+> pretty h -- debug $ "requesting size for" <+> pretty h
request @e peer (GetBlockSize @e h) request @e peer (GetBlockSize @e h)
next next
where where
initDownload anyway q p h s = do initDownload anyway q p h thisBkSize = do
debug $ "initDownload" <+> pretty h <+> pretty p <+> pretty s env <- ask
-- debug $ "initDownload" <+> pretty h <+> pretty p <+> pretty thisBkSize
sto <- getStorage sto <- getStorage
here <- liftIO $ hasBlock sto h <&> isJust here <- liftIO $ hasBlock sto h <&> isJust
@ -265,15 +270,35 @@ blockDownloadLoop cw = do
coo <- genCookie (p,h) coo <- genCookie (p,h)
let key = DownloadSessionKey (p, coo) let key = DownloadSessionKey (p, coo)
let chusz = defChunkSize let chusz = defChunkSize
dnwld <- newBlockDownload h
let new = set sBlockChunkSize chusz let new = set sBlockChunkSize chusz
. set sBlockSize (fromIntegral s) . set sBlockSize (fromIntegral thisBkSize)
$ newBlockDownload h $ dnwld
update @e new key id update @e new key id
subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do
processBlock q h processBlock q h
-- liftIO $ async $ do
-- void $ race (pause defBlockWaitMax) $ withPeerM env $ fix \next -> do
-- pause defBlockWaitSleep
-- wl <- find key (view sBlockWrittenL)
-- let w = sum (fromMaybe [] wl)
-- debug $ "WTF?" <+> pretty (w, thisBkSize)
-- -- maybe1 w (pure ()) $ \z -> do
-- if fromIntegral w >= thisBkSize then do
-- debug "THE BLOCK IS ABOUT TO BE READY"
-- -- write to disk and so on
-- else do
-- pause defBlockWaitSleep
-- next
-- FIXME: block is not downloaded, return it to the Q
request @e p (BlockChunks @e coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction request @e p (BlockChunks @e coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction
| anyway -> processBlock q h | anyway -> processBlock q h
@ -385,48 +410,14 @@ mkAdapter cww = do
. over sBlockWritten (+ bslen) . over sBlockWritten (+ bslen)
) )
wrt <- MaybeT $ find cKey (view sBlockWrittenT)
-- debug $ "gotShit" <+> pretty (B8.length bs) <+> pretty (writtenLast) <+> pretty (wr liftIO $ atomically $ modifyTVar wrt (+bslen)
-- debug $ "writtenLast" <+> pretty writtenLast
-- теперь, что бы нас дидосить -- можно wrActually <- liftIO $ readTVarIO wrt
-- после 2/3 пакетов посылать пакеты из этих двух третей
-- или вообще мусор, а мы тут будем считать хэши.
--
-- кстати, если посылать мусор - то мы еще и на диск
-- его будем бесконечно писать.
--
-- допустим, пришло 4 события. в обработчике каждого ---
-- мы не знаем, что там еще 3 события.
--
-- таким образом, ни про какое событие нельзя понять,
-- что оно последнее, если у нас 4 потока.
--
-- как же нам узнать, что пришёл последний блок и можно
-- его коммитить?
--
--
-- вариант N1. повесить "добивающий монитор".
-- Здесь только апдейтить счётчики, а вот монитор будет уже
-- смотреть что как и коммитить.
--
-- Монитор может быть протухающим.
-- Как это сделать?
-- this is updating concurrently, let mbDone = wrActually >= mbSize
-- so collect last data from all posible threads -- && (maxOffLast + fromIntegral mbChSize) > fromIntegral mbSize
writtenLast <- MaybeT $ find cKey (view sBlockWritten)
maxOffLast <- MaybeT $ find cKey (view sBlockOffset)
let mbDone = (maxOffLast + fromIntegral mbChSize) > fromIntegral mbSize
&& writtenLast >= ( (mbSize * 2) `div` 3 )
debug $ "blkAcceptChunk" <+> pretty (p,c)
<+> pretty maxOffLast
-- <+> pretty n
-- <+> pretty (B8.length bs)
-- <+> pretty
-- debug $ "written:" <+> pretty written <+> "/" <+> pretty mbSize
when mbDone $ lift do when mbDone $ lift do
@ -469,7 +460,7 @@ main = do
let findBlk = hasBlock s let findBlk = hasBlock s
-- let size = 1024*1024*1 -- let size = 1024*1024*1
let size = 256*1024 let size = 1024*1024*10
g <- initialize $ U.fromList [fromIntegral p, fromIntegral size] g <- initialize $ U.fromList [fromIntegral p, fromIntegral size]
bytes <- replicateM size $ uniformM g :: IO [Char] bytes <- replicateM size $ uniformM g :: IO [Char]
@ -502,10 +493,10 @@ main = do
our <- async $ runTestPeer p0 $ \s cw -> do our <- async $ runTestPeer p0 $ \s cw -> do
let blk = hasBlock s let blk = hasBlock s
void $ async $ forever $ do -- void $ async $ forever $ do
pause ( 1 :: Timeout 'Seconds ) -- pause ( 1 :: Timeout 'Seconds )
wip <- blocksInProcess cw -- wip <- blocksInProcess cw
debug $ "blocks wip:" <+> pretty wip -- debug $ "blocks wip:" <+> pretty wip
runPeerM (AnyStorage s) fake p0 $ do runPeerM (AnyStorage s) fake p0 $ do
adapter <- mkAdapter cw adapter <- mkAdapter cw