This commit is contained in:
Dmitry Zuikov 2023-01-19 09:42:15 +03:00
parent a58193eb96
commit adcfbf5be2
1 changed files with 51 additions and 26 deletions

View File

@ -40,6 +40,9 @@ import Data.Map (Map)
import Data.Map qualified as Map
import Control.Monad.Trans.Maybe
import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue qualified as Q
debug :: (MonadIO m) => Doc ann -> m ()
debug p = liftIO $ hPrint stderr p
@ -57,27 +60,33 @@ newtype ChunkNum = ChunkNum Word16
-- therefore, key is ( p | cookie )
-- but client's cookie in protocol should be just ( cookie :: Word32 )
data BlockDownload =
type OnBlockReady h m = Hash h -> m ()
data BlockDownload m =
BlockDownload
{ _sBlockHash :: Hash HbSync
, _sBlockChunkSize :: ChunkSize
, _sBlockOffset :: Offset
, _sBlockWritten :: Size
, _sOnBlockReady :: OnBlockReady HbSync m
}
data Sessions e =
data Sessions e m =
Sessions
{ _sBlockDownload :: Cache (Peer e, Cookie e) BlockDownload
{ _sBlockDownload :: Cache (Peer e, Cookie e) (BlockDownload m)
, _sBlockSizes :: Cache (Hash HbSync) (Map (Peer e) Size)
, _sBlockSize :: Cache (Hash HbSync) Size
}
makeLenses 'Sessions
makeLenses 'BlockDownload
newBlockDownload :: Hash HbSync -> BlockDownload
newBlockDownload :: forall m . MonadIO m
=> Hash HbSync
-> OnBlockReady HbSync m
-> BlockDownload m
newBlockDownload h = BlockDownload h 0 0 0
type GetBlockChunk h m = Hash h -> Offset -> Size -> m (Maybe ByteString)
@ -191,7 +200,7 @@ main = do
-- ]
emptySessions :: forall e m . MonadIO m => m (Sessions e)
emptySessions :: forall e m . MonadIO m => m (Sessions e m)
emptySessions = liftIO $
Sessions <$> Cache.newCache (Just defCookieTimeout)
<*> Cache.newCache (Just defBlockInfoTimeout)
@ -225,7 +234,7 @@ delSession se l k = liftIO do
expireSession se l = liftIO do
Cache.purgeExpired (view l se)
runFakePeer :: forall e . e ~ Fake => Sessions e -> EngineEnv e -> IO ()
runFakePeer :: forall e m . (e ~ Fake, m ~ IO) => Sessions e m -> EngineEnv e -> m ()
runFakePeer se env = do
let pid = fromIntegral (hash (env ^. self)) :: Word8
@ -281,9 +290,13 @@ runFakePeer se env = do
-- УДАЛЯЕМ КУКУ?
, blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do
let def = newBlockDownload h
let cKey = (p,c)
-- check if there is a session
void $ MaybeT $ getSession' se sBlockDownload cKey id
let def = newBlockDownload h dontHandle
let bslen = fromIntegral $ B8.length bs
-- TODO: log this situation
mbSize <- MaybeT $ getSession' se sBlockSizes h (Map.lookup p) <&> fromMaybe Nothing
@ -297,8 +310,11 @@ runFakePeer se env = do
writeChunk cww cKey h offset bs
updSession se def sBlockDownload cKey (over sBlockWritten (+bslen))
maxOff <- MaybeT $ getSession' se sBlockDownload cKey (view sBlockOffset)
written <- MaybeT $ getSession' se sBlockDownload cKey (view sBlockWritten)
dwnld <- MaybeT $ getSession' se sBlockDownload cKey id
let maxOff = view sBlockOffset dwnld
let written = view sBlockWritten dwnld
let notify = view sOnBlockReady dwnld
let mbDone = (maxOff + fromIntegral mbChSize) > fromIntegral mbSize
&& written >= mbSize
@ -311,18 +327,16 @@ runFakePeer se env = do
-- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК
-- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ
when ( h1 == h ) $ do
debug $ "THIS BLOCK IS DEFINITELY DONE" <+> pretty h1
liftIO $ commitBlock cww cKey h
lift $ commitBlock cww cKey h
lift $ notify h
delSession se sBlockDownload cKey
-- TODO: #ASAP
-- NOTIFY BLOCK IS DONE
when (written > mbSize * defBlockDownloadThreshold) $ do
debug $ "SESSION DELETED BECAUSE THAT PEER IS JERK:" <+> pretty p
delSession se sBlockDownload cKey
-- ЕСЛИ ТУТ ВИСЕТЬ ДОЛГО, ТО НАС МОЖНО ДИДОСИТЬ,
-- ПОСЫЛАЯ НЕ ВСЕ БЛОКИ ЧАНКА ИЛИ ПОСЫЛАЯ ОТДЕЛЬНЫЕ
-- ЧАНКИ, ПО МНОГУ РАЗ. А МЫ БУДЕМ ХЭШИ СЧИТАТЬ.
-- ЧАНКИ ПО МНОГУ РАЗ. А МЫ БУДЕМ ХЭШИ СЧИТАТЬ.
-- ТАК НЕ ПОЙДЕТ
-- ТАК ЧТО ТУТ ЖДЁМ, ДОПУСТИМ 2*mbSize и отваливаемся
}
@ -336,8 +350,6 @@ runFakePeer se env = do
stopChunkWriter cww
pause ( 0.25 :: Timeout 'Seconds)
mapM_ cancel [w,cw]
@ -360,11 +372,6 @@ test1 = do
peerz <- mapM (async . uncurry runFakePeer) ee
runEngineM e0 $ do
request p1 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"))
request p1 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"))
request p0 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"))
request p0 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"))
let h = fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"
@ -374,18 +381,36 @@ test1 = do
let s0 = (fst . head) ee
let cKey@(_, cookie) = (p1, 0) -- <<~~~ FIXME: generate a good session id!
let chsz = defChunkSize
let def = newBlockDownload h
qblk <- liftIO Q.newTQueueIO
let onBlockReady bh = do
liftIO $ atomically $ Q.writeTQueue qblk bh
let def = newBlockDownload h onBlockReady
-- create sessions before sequesting anything
updSession s0 def sBlockDownload cKey (set sBlockChunkSize chsz)
request p1 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"))
request p1 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"))
request p0 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"))
request p0 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"))
-- TODO: #ASAP block ready notification
debug $ "REQUEST BLOCK:" <+> pretty h <+> "from" <+> pretty p1
request p1 (BlockChunks @Fake cookie (BlockGetAllChunks h chsz))
blk <- liftIO $ atomically $ Q.readTQueue qblk
debug $ "BLOCK READY:" <+> pretty blk
pure ()
pause ( 1 :: Timeout 'Seconds)
mapM_ cancel peerz
(_, e) <- waitAnyCatchCancel peerz