This commit is contained in:
Dmitry Zuikov 2023-01-24 13:15:32 +03:00
parent 6a4503e4a3
commit 07937ce32b
5 changed files with 84 additions and 23 deletions

View File

@ -18,6 +18,7 @@ import Control.Concurrent.Async
import Data.Function import Data.Function
import Data.Functor import Data.Functor
import Data.Kind import Data.Kind
import Control.Concurrent
data Pipeline m a = data Pipeline m a =
Pipeline Pipeline
@ -37,7 +38,7 @@ runPipeline pip = fix \next -> do
case mbJob of case mbJob of
Nothing -> pure () Nothing -> pure ()
Just job -> void job >> next Just job -> void (liftIO yield >> job) >> next
stopPipeline :: MonadIO m => Pipeline m a -> m () stopPipeline :: MonadIO m => Pipeline m a -> m ()
stopPipeline pip = liftIO $ do stopPipeline pip = liftIO $ do

View File

@ -109,10 +109,12 @@ runChunkWriter2 w = do
-- yield -- yield
-- next -- next
-- liftIO $ print "runChunkWriter2"
stop <- liftIO $ readTVarIO (stopped w) stop <- liftIO $ readTVarIO (stopped w)
if stop then do if stop then do
ks <- liftIO $ take 100 <$> Cache.keys cache ks <- liftIO $ Cache.keys cache
liftIO $ for_ ks $ \k -> flush w k liftIO $ for_ ks $ \k -> flush w k
else do else do
ks <- liftIO $ Cache.keys cache ks <- liftIO $ Cache.keys cache
@ -120,7 +122,8 @@ runChunkWriter2 w = do
amount <- for ks $ \k -> flush w k amount <- for ks $ \k -> flush w k
if (sum amount == 0) then do if (sum amount == 0) then do
pause ( 0.5 :: Timeout 'Seconds ) pure ()
-- pause ( 0.5 :: Timeout 'Seconds )
else do else do
liftIO $ print ("flushed:" <+> pretty (sum amount)) liftIO $ print ("flushed:" <+> pretty (sum amount))
@ -315,10 +318,13 @@ flush w fn = do
let cache = perBlock w let cache = perBlock w
let scache = semFlush w let scache = semFlush w
liftIO $ do liftIO $ do
q <- Cache.fetchWithCache cache fn $ const Q0.newTQueueIO
s <- Cache.fetchWithCache scache fn $ const (atomically $ Sem.newTSem 2)
atomically $ Sem.waitTSem s print "flush"
q <- Cache.fetchWithCache cache fn $ const Q0.newTQueueIO
s <- Cache.fetchWithCache scache fn $ const (atomically $ Sem.newTSem 100)
-- atomically $ Sem.waitTSem s
Cache.delete cache fn Cache.delete cache fn
@ -330,7 +336,7 @@ flush w fn = do
withFile fn ReadWriteMode $ \fh -> do withFile fn ReadWriteMode $ \fh -> do
for_ flushed $ \f -> f fh for_ flushed $ \f -> f fh
atomically $ Sem.signalTSem s -- atomically $ Sem.signalTSem s
pure (length flushed) pure (length flushed)

View File

@ -303,7 +303,7 @@ runPeerM s bus p f = do
<*> liftIO (newTVarIO mempty) <*> liftIO (newTVarIO mempty)
let de = view envDeferred env let de = view envDeferred env
as <- liftIO $ replicateM 1 $ async $ runPipeline de as <- liftIO $ replicateM 2 $ async $ runPipeline de
sw <- liftIO $ async $ forever $ withPeerM env $ do sw <- liftIO $ async $ forever $ withPeerM env $ do
pause defSweepTimeout pause defSweepTimeout
@ -313,7 +313,7 @@ runPeerM s bus p f = do
void $ runReaderT (fromPeerM f) env void $ runReaderT (fromPeerM f) env
void $ liftIO $ stopPipeline de void $ liftIO $ stopPipeline de
liftIO $ mapM_ cancel as liftIO $ mapM_ cancel (as <> [sw])
withPeerM :: MonadIO m => PeerEnv e -> PeerM e m a -> m () withPeerM :: MonadIO m => PeerEnv e -> PeerM e m a -> m ()
withPeerM env action = void $ runReaderT (fromPeerM action) env withPeerM env action = void $ runReaderT (fromPeerM action) env
@ -378,7 +378,6 @@ instance ( HasProtocol e p
sendTo fab (To who) (From self) bs sendTo fab (To who) (From self) bs
instance ( MonadIO m instance ( MonadIO m
, HasProtocol e p , HasProtocol e p
, Sessions e p m , Sessions e p m

View File

@ -86,6 +86,8 @@ blockChunksProto adapter (BlockChunks c p) =
case p of case p of
BlockGetAllChunks h size -> deferred proto do BlockGetAllChunks h size -> deferred proto do
liftIO $ print $ "BlockGetAllChunks" <+> pretty h
me <- ownPeer @e me <- ownPeer @e
who <- thatPeer proto who <- thatPeer proto

View File

@ -151,7 +151,7 @@ runTestPeer p zu = do
cww <- newChunkWriterIO stor (Just chDir) cww <- newChunkWriterIO stor (Just chDir)
sw <- liftIO $ replicateM 4 $ async $ simpleStorageWorker stor sw <- liftIO $ replicateM 4 $ async $ simpleStorageWorker stor
cw <- liftIO $ replicateM 4 $ async $ runChunkWriter cww cw <- liftIO $ replicateM 8 $ async $ runChunkWriter cww
zu stor cww zu stor cww
@ -230,7 +230,7 @@ blockDownloadLoop cw = do
wip <- liftIO $ blocksInProcess cw wip <- liftIO $ blocksInProcess cw
if wip > 10 then do if wip > 200 then do
pause ( 1 :: Timeout 'Seconds ) pause ( 1 :: Timeout 'Seconds )
else do else do
case job of case job of
@ -254,6 +254,9 @@ blockDownloadLoop cw = do
where where
initDownload anyway q p h s = do initDownload anyway q p h s = do
debug $ "initDownload" <+> pretty h <+> pretty p <+> pretty s
sto <- getStorage sto <- getStorage
here <- liftIO $ hasBlock sto h <&> isJust here <- liftIO $ hasBlock sto h <&> isJust
@ -352,12 +355,23 @@ mkAdapter cww = do
-- check if there is a session -- check if there is a session
-- FIXME: -- FIXME:
-- TODO: log situation when no session -- TODO: log situation when no session
ddd <- lift $ find cKey id
when (isNothing ddd) $ do
debug "SESSION NOT FOUND!"
dwnld <- MaybeT $ find cKey id dwnld <- MaybeT $ find cKey id
-- dwnld <- maybe1 dwnld' (debug "AAAA") $ pure
-- debug "session found!"
let bslen = fromIntegral $ B8.length bs let bslen = fromIntegral $ B8.length bs
let mbSize = view sBlockSize dwnld
let mbChSize = view sBlockChunkSize dwnld let mbChSize = view sBlockChunkSize dwnld
let mbSize = view sBlockSize dwnld
let offset0 = fromIntegral n * fromIntegral mbChSize :: Offset let offset0 = fromIntegral n * fromIntegral mbChSize :: Offset
@ -367,12 +381,47 @@ mkAdapter cww = do
let written = view sBlockWritten dwnld + bslen let written = view sBlockWritten dwnld + bslen
let maxOff = max offset0 (view sBlockOffset dwnld) let maxOff = max offset0 (view sBlockOffset dwnld)
lift $ update dwnld cKey ( set sBlockOffset maxOff lift $ update dwnld cKey ( over sBlockOffset (max maxOff)
. set sBlockWritten written . over sBlockWritten (+ bslen)
) )
let mbDone = (maxOff + fromIntegral mbChSize) > fromIntegral mbSize -- this is updating concurrently,
&& written >= mbSize -- so collect last data from all posible threads
writtenLast <- MaybeT $ find cKey (view sBlockWritten)
maxOffLast <- MaybeT $ find cKey (view sBlockOffset)
-- debug $ "gotShit" <+> pretty (B8.length bs) <+> pretty (writtenLast) <+> pretty (wr
-- debug $ "writtenLast" <+> pretty writtenLast
-- теперь, что бы нас дидосить -- можно
-- после 2/3 пакетов посылать пакеты из этих двух третей
-- или вообще мусор, а мы тут будем считать хэши.
--
-- кстати, если посылать мусор - то мы еще и на диск
-- его будем бесконечно писать.
--
-- допустим, пришло 4 события. в обработчике каждого ---
-- мы не знаем, что там еще 3 события.
--
-- таким образом, ни про какое событие нельзя понять,
-- что оно последнее
--
-- как же нам узнать, что пришёл последний блок и можно
-- его коммитить?
--
let mbDone = (maxOffLast + fromIntegral mbChSize) > fromIntegral mbSize
&& writtenLast >= ( (mbSize * 2) `div` 3 )
debug $ "blkAcceptChunk" <+> pretty (p,c)
<+> pretty maxOffLast
-- <+> pretty n
-- <+> pretty (B8.length bs)
-- <+> pretty
-- debug $ "written:" <+> pretty written <+> "/" <+> pretty mbSize
when mbDone $ lift do when mbDone $ lift do
@ -414,7 +463,8 @@ main = do
others <- forM ps $ \p -> async $ runTestPeer p $ \s cw -> do others <- forM ps $ \p -> async $ runTestPeer p $ \s cw -> do
let findBlk = hasBlock s let findBlk = hasBlock s
let size = 1024*1024*1 -- let size = 1024*1024*1
let size = 256*1024
g <- initialize $ U.fromList [fromIntegral p, fromIntegral size] g <- initialize $ U.fromList [fromIntegral p, fromIntegral size]
bytes <- replicateM size $ uniformM g :: IO [Char] bytes <- replicateM size $ uniformM g :: IO [Char]
@ -462,11 +512,14 @@ main = do
as <- liftIO $ async $ withPeerM env (blockDownloadLoop cw) as <- liftIO $ async $ withPeerM env (blockDownloadLoop cw)
runProto @Fake me <- liftIO $ replicateM 1 $ async $ liftIO $ withPeerM env $ do
[ makeResponse (blockSizeProto blk handleBlockInfo) runProto @Fake
, makeResponse (blockChunksProto adapter) [ makeResponse (blockSizeProto blk handleBlockInfo)
, makeResponse blockAnnounceProto , makeResponse (blockChunksProto adapter)
] , makeResponse blockAnnounceProto
]
liftIO $ mapM_ wait me
liftIO $ cancel as liftIO $ cancel as