session reworking wip

This commit is contained in:
Dmitry Zuikov 2023-01-19 07:36:54 +03:00
parent 66d62127a1
commit 42ed4573b2
2 changed files with 40 additions and 25 deletions

View File

@ -19,6 +19,9 @@ defPipelineSize = 100
defChunkWriterQ :: Integral a => a
defChunkWriterQ = 100
defBlockDownloadThreshold :: Integral a => a
defBlockDownloadThreshold = 2
-- typical block hash 530+ chunks * parallel wip blocks amount
defProtoPipelineSize :: Int
defProtoPipelineSize = 65536
@ -29,3 +32,5 @@ defCookieTimeout = toTimeSpec ( 10 :: Timeout 'Minutes)
defBlockInfoTimeout :: TimeSpec
defBlockInfoTimeout = toTimeSpec ( 10 :: Timeout 'Minutes)

View File

@ -67,14 +67,19 @@ data BlockDownload =
data Sessions e =
Sessions
{ _sBlockDownload :: Cache (Cookie e) BlockDownload
{ _sBlockDownload :: Cache (Peer e, Cookie e) BlockDownload
, _sBlockSizes :: Cache (Hash HbSync) (Map (Peer e) Size)
, _sBlockSize :: Cache (Hash HbSync) Size
}
makeLenses 'Sessions
makeLenses 'BlockDownload
newBlockDownload :: Hash HbSync -> BlockDownload
newBlockDownload h = BlockDownload h 0 0 0
type GetBlockChunk h m = Hash h -> Offset -> Size -> m (Maybe ByteString)
@ -82,7 +87,7 @@ data BlockChunksI e m =
BlockChunksI
{ blkSize :: GetBlockSize HbSync m
, blkChunk :: GetBlockChunk HbSync m
, blkGetHash :: Cookie e -> m (Maybe (Hash HbSync))
, blkGetHash :: (Peer e, Cookie e) -> m (Maybe (Hash HbSync))
, blkAcceptChunk :: Response e (BlockChunks e) m => (Cookie e, Peer e, Hash HbSync, ChunkNum, ByteString) -> m ()
}
@ -128,8 +133,8 @@ blockChunksProto adapter (BlockChunks c p) =
maybe (pure ()) (response_ . BlockChunk @e i) chunk
BlockChunk n bs -> do
h <- blkGetHash adapter c
who <- thatPeer proto
h <- blkGetHash adapter (who, c)
maybe1 h (response_ (BlockLost @e)) $ \hh -> do
blkAcceptChunk adapter (c, who, hh, n, bs)
@ -198,8 +203,6 @@ newSession se l k v = do
let cache = view l se
liftIO $ Cache.insert cache k v
withNewSession se l k v m = newSession se l k v >> m
getSession' se l k fn = do
let cache = view l se
liftIO $ Cache.lookup cache k <&> fmap fn
@ -211,6 +214,9 @@ updSession se def l k fn = liftIO do
v <- Cache.fetchWithCache cache k (const $ pure def)
Cache.insert cache k (fn v)
delSession se l k = liftIO do
Cache.delete (view l se) k
runFakePeer :: forall e . e ~ Fake => Sessions e -> EngineEnv e -> IO ()
runFakePeer se env = do
@ -246,6 +252,8 @@ runFakePeer se env = do
let handleBlockInfo (p, h, sz') = do
maybe1 sz' (pure ()) $ \sz -> do
let bsz = fromIntegral sz
-- here we cache block size information
updSession se mempty sBlockSizes h (Map.insert p bsz)
updSession se bsz sBlockSize h (const bsz)
@ -265,42 +273,42 @@ runFakePeer se env = do
-- УДАЛЯЕМ КУКУ?
, blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do
let def = BlockDownload h 0 0 0 -- FIXME: ASAP
let chuKey = (p,c)
let def = newBlockDownload h
let cKey = (p,c)
let bslen = fromIntegral $ B8.length bs
-- TODO: log this situation
mbSize <- MaybeT $ getSession' se sBlockSizes h (Map.lookup p) <&> fromMaybe Nothing
mbChSize <- MaybeT $ getSession' se sBlockDownload c (view sBlockChunkSize)
mbChSize <- MaybeT $ getSession' se sBlockDownload cKey (view sBlockChunkSize)
let offset = fromIntegral n * fromIntegral mbChSize :: Offset
updSession se def sBlockDownload c (over sBlockOffset (max offset))
updSession se def sBlockDownload cKey (over sBlockOffset (max offset))
liftIO $ do
-- newBlock cww (p,c) h mbSize
writeChunk cww chuKey h offset bs
updSession se def sBlockDownload c (over sBlockWritten (+bslen))
writeChunk cww cKey h offset bs
updSession se def sBlockDownload cKey (over sBlockWritten (+bslen))
maxOff <- MaybeT $ getSession' se sBlockDownload c (view sBlockOffset)
written <- MaybeT $ getSession' se sBlockDownload c (view sBlockWritten)
maxOff <- MaybeT $ getSession' se sBlockDownload cKey (view sBlockOffset)
written <- MaybeT $ getSession' se sBlockDownload cKey (view sBlockWritten)
let mbDone = (maxOff + fromIntegral mbChSize) > fromIntegral mbSize
&& written >= mbSize
when mbDone $ lift do
deferred (Proxy @(BlockChunks e)) $ do
debug "THIS BLOCK MAYBE DONE"
h1 <- liftIO $ getHash cww chuKey h
when ( h1 == h ) $ do
debug $ "THIS BLOCK IS DEFINITLY DONE" <+> pretty h1
liftIO $ commitBlock cww chuKey h
h1 <- liftIO $ getHash cww cKey h
-- ПОСЧИТАТЬ ХЭШ
-- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК
-- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ
when ( h1 == h ) $ do
debug $ "THIS BLOCK IS DEFINITELY DONE" <+> pretty h1
liftIO $ commitBlock cww cKey h
when (written > mbSize * defBlockDownloadThreshold) $ do
debug $ "SESSION DELETED BECAUSE THAT PEER IS JERK:" <+> pretty p
delSession se sBlockDownload cKey
-- ЕСЛИ ТУТ ВИСЕТЬ ДОЛГО, ТО НАС МОЖНО ДИДОСИТЬ,
-- ПОСЫЛАЯ НЕ ВСЕ БЛОКИ ЧАНКА ИЛИ ПОСЫЛАЯ ОТДЕЛЬНЫЕ
-- ЧАНКИ, ПО МНОГУ РАЗ. А МЫ БУДЕМ ХЭШИ СЧИТАТЬ.
@ -350,12 +358,14 @@ test1 = do
let h = fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"
-- TODO: generate unique cookie!!
let cookie = 0
--
-- FIXME: withAllCrap $ do ...
let s0 = (fst . head) ee
let cKey@(_, cookie) = (p1, 0) -- <<~~~ FIXME: generate a good session id!
let chsz = defChunkSize
let def = BlockDownload h 0 0 0 -- FIXME: ASAP!!
updSession s0 def sBlockDownload cookie (set sBlockChunkSize chsz)
let def = newBlockDownload h
updSession s0 def sBlockDownload cKey (set sBlockChunkSize chsz)
request p1 (BlockChunks @Fake cookie (BlockGetAllChunks h chsz))
pure ()