From ada3542ec6724d480021d8f81740d37e29459b33 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Fri, 20 Jan 2023 21:05:57 +0300 Subject: [PATCH] wip --- hbs2-core/lib/HBS2/Actors/Peer.hs | 12 ++- hbs2-core/lib/HBS2/Net/Proto/Types.hs | 8 ++ hbs2-tests/test/PeerMain.hs | 147 ++++++++++++++++---------- 3 files changed, 109 insertions(+), 58 deletions(-) diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 3c0e7b08..8a1fe9f4 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -16,7 +16,7 @@ import Control.Monad.Reader import Control.Monad.Trans.Maybe import Data.ByteString.Lazy ( ByteString ) import Data.Digest.Murmur32 -import Data.Foldable +import Data.Foldable hiding (find) import Data.Hashable import Data.Kind import Data.Map qualified as Map @@ -145,6 +145,8 @@ instance ( MonadIO m , Hashable (SessionKey e p) ) => Sessions e p (ResponseM e m) where + find k f = flip runEngineM (find k f) =<< asks (view engine) + fetch i d k f = flip runEngineM (fetch i d k f) =<< asks (view engine) update d k f = flip runEngineM (update d k f) =<< asks (view engine) @@ -161,6 +163,14 @@ instance ( MonadIO m ) => Sessions e p (EngineM e m) where + find k f = do + se <- asks (view sessions) + let sk = newSKey @(SessionKey e p) k + r <- liftIO $ Cache.lookup se sk + case fromDynamic @(SessionData e p) <$> r of + Just v -> pure $ f <$> v + Nothing -> pure Nothing + fetch upd def k fn = do se <- asks (view sessions) let sk = newSKey @(SessionKey e p) k diff --git a/hbs2-core/lib/HBS2/Net/Proto/Types.hs b/hbs2-core/lib/HBS2/Net/Proto/Types.hs index 85bbe570..4039e874 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Types.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Types.hs @@ -66,6 +66,14 @@ class ( Monad m ) => Sessions e p m | p -> e where + + -- | Session fetch function. + -- | It will insert a new session, if default value is Just something. + + find :: SessionKey e p -- ^ session key + -> (SessionData e p -> a) -- ^ modification function, i.e. lens + -> m (Maybe a) + -- | Session fetch function. -- | It will insert a new session, if default value is Just something. diff --git a/hbs2-tests/test/PeerMain.hs b/hbs2-tests/test/PeerMain.hs index 11ce453b..205eacf9 100644 --- a/hbs2-tests/test/PeerMain.hs +++ b/hbs2-tests/test/PeerMain.hs @@ -33,7 +33,7 @@ import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy.Char8 qualified as B8 import Data.Cache (Cache) import Data.Cache qualified as Cache -import Data.Foldable +import Data.Foldable hiding (find) import Data.Hashable import Data.Map (Map) import Data.Map qualified as Map @@ -69,6 +69,7 @@ debug p = liftIO $ hPrint stderr p data BlockDownload = BlockDownload { _sBlockHash :: Hash HbSync + , _sBlockSize :: Size , _sBlockChunkSize :: ChunkSize , _sBlockOffset :: Offset , _sBlockWritten :: Size @@ -77,7 +78,7 @@ data BlockDownload = makeLenses 'BlockDownload newBlockDownload :: Hash HbSync -> BlockDownload -newBlockDownload h = BlockDownload h 0 0 0 +newBlockDownload h = BlockDownload h 0 0 0 0 data Fake @@ -137,13 +138,15 @@ main = do -- ] -newtype PeerEvents e (m :: Type -> Type) = +data PeerEvents e (m :: Type -> Type) = PeerEvents - { onBlockSize :: TVar (Map (Hash HbSync) [HasBlockEvent HbSync e m]) + { onBlockSize :: TVar (Map (Hash HbSync) [HasBlockEvent HbSync e m]) + , onBlockReady :: TVar (Map (Hash HbSync) [OnBlockReady HbSync m]) } newPeerEventsIO :: forall e m . MonadIO m => IO (PeerEvents e m) newPeerEventsIO = PeerEvents <$> newTVarIO mempty + <*> newTVarIO mempty addBlockSizeEventNotify :: forall e m . (MonadIO m) => PeerEvents e m @@ -154,6 +157,15 @@ addBlockSizeEventNotify :: forall e m . (MonadIO m) addBlockSizeEventNotify pe h e = do void $ liftIO $ atomically $ modifyTVar' (onBlockSize pe) (Map.insertWith (<>) h [e]) +addBlockReadyEventNotify :: forall e m . (MonadIO m) + => PeerEvents e m + -> Hash HbSync + -> OnBlockReady HbSync m + -> m () + +addBlockReadyEventNotify pe h e = do + void $ liftIO $ atomically $ modifyTVar' (onBlockReady pe) (Map.insertWith (<>) h [e]) + emitBlockSizeEvent :: forall e m . MonadIO m => PeerEvents e m -> Hash HbSync @@ -170,15 +182,30 @@ emitBlockSizeEvent pe h event = do in (mconcat (maybeToList ev), Map.delete h m) -runFakePeer :: forall e b m . ( e ~ Fake - -- , MonadIO m - , Messaging b e ByteString - -- , Sessions Fake (BlockSize Fake) - -- , m ~ ResponseM Fake IO - -- , MonadIO m - -- , Response e p m - -- , EngineM e m - ) +emitBlockReadyEvent :: forall e m . MonadIO m + => PeerEvents e m + -> Hash HbSync + -> m () + +emitBlockReadyEvent pe h = do + ev <- liftIO $ atomically $ stateTVar (onBlockReady pe) alter + for_ ev $ \e -> e h + + where + alter m = + let ev = Map.lookup h m + in (mconcat (maybeToList ev), Map.delete h m) + + +runFakePeer :: forall e b . ( e ~ Fake + -- , MonadIO m + , Messaging b e ByteString + -- , Sessions Fake (BlockSize Fake) + -- , m ~ ResponseM Fake IO + -- , MonadIO m + -- , Response e p m + -- , EngineM e m + ) => PeerEvents e (EngineM e IO) -> Peer e -> b @@ -227,9 +254,9 @@ runFakePeer ev p0 bus work = do let adapter = BlockChunksI - { blkSize = hasBlock storage - , blkChunk = getChunk storage - , blkGetHash = error "FUCK" -- FIXME! \c -> getSession' se sBlockDownload c (view sBlockHash) + { blkSize = hasBlock storage + , blkChunk = getChunk storage + , blkGetHash = \c -> find (DownloadSessionKey c) (view sBlockHash) -- КАК ТОЛЬКО ПРИНЯЛИ ВСЕ ЧАНКИ (ПРИШЁЛ ПОСЛЕДНИЙ ЧАНК): -- СЧИТАЕМ ХЭШ ТОГО, ЧТО ПОЛУЧИЛОСЬ @@ -242,55 +269,50 @@ runFakePeer ev p0 bus work = do let cKey = DownloadSessionKey (p,c) -- check if there is a session - -- FIXME - -- void $ MaybeT $ getSession' se sBlockDownload cKey id - - let de = newBlockDownload h + -- FIXME: + -- TODO: log situation when no session + dwnld <- MaybeT $ find cKey id let bslen = fromIntegral $ B8.length bs - -- TODO: log this situation - -- FIXME - -- mbSize <- MaybeT $ getSession' se sBlockSizes h (Map.lookup p) <&> fromMaybe Nothing - -- mbChSize <- MaybeT $ getSession' se sBlockDownload cKey (view sBlockChunkSize) - -- let offset = fromIntegral n * fromIntegral mbChSize :: Offset + let mbSize = view sBlockSize dwnld + let mbChSize = view sBlockChunkSize dwnld - -- updSession se de sBlockDownload cKey (over sBlockOffset (max offset)) + let offset0 = fromIntegral n * fromIntegral mbChSize :: Offset - -- liftIO $ do - -- writeChunk cww cKey h offset bs - -- updSession se de sBlockDownload cKey (over sBlockWritten (+bslen)) + liftIO $ do + writeChunk cww cKey h offset0 bs - -- dwnld <- MaybeT $ getSession' se sBlockDownload cKey id + let written = view sBlockWritten dwnld + bslen + let maxOff = max offset0 (view sBlockOffset dwnld) - -- let maxOff = view sBlockOffset dwnld - -- let written = view sBlockWritten dwnld - -- let notify = view sOnBlockReady dwnld + lift $ update dwnld cKey ( set sBlockOffset maxOff + . set sBlockWritten written + ) - -- let mbDone = (maxOff + fromIntegral mbChSize) > fromIntegral mbSize - -- && written >= mbSize + let mbDone = (maxOff + fromIntegral mbChSize) > fromIntegral mbSize + && written >= mbSize - -- when mbDone $ lift do - -- deferred (Proxy @(BlockChunks e)) $ do - -- h1 <- liftIO $ getHash cww cKey h + when mbDone $ lift do + deferred (Proxy @(BlockChunks e)) $ do + h1 <- liftIO $ getHash cww cKey h - -- -- ПОСЧИТАТЬ ХЭШ - -- -- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК - -- -- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ - -- when ( h1 == h ) $ do - -- lift $ commitBlock cww cKey h - -- lift $ notify h - -- delSession se sBlockDownload cKey + -- ПОСЧИТАТЬ ХЭШ + -- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК + -- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ + when ( h1 == h ) $ do + liftIO $ commitBlock cww cKey h + expire cKey + lift $ runEngineM env $ emitBlockReadyEvent ev h -- TODO: fix this crazy shit - -- when (written > mbSize * defBlockDownloadThreshold) $ do - -- debug $ "SESSION DELETED BECAUSE THAT PEER IS JERK:" <+> pretty p - -- delSession se sBlockDownload cKey + when (written > mbSize * defBlockDownloadThreshold) $ do + debug $ "SESSION DELETED BECAUSE THAT PEER IS JERK:" <+> pretty p + lift $ expire cKey -- ЕСЛИ ТУТ ВИСЕТЬ ДОЛГО, ТО НАС МОЖНО ДИДОСИТЬ, -- ПОСЫЛАЯ НЕ ВСЕ БЛОКИ ЧАНКА ИЛИ ПОСЫЛАЯ ОТДЕЛЬНЫЕ -- ЧАНКИ ПО МНОГУ РАЗ. А МЫ БУДЕМ ХЭШИ СЧИТАТЬ. -- ТАК НЕ ПОЙДЕТ -- ТАК ЧТО ТУТ ЖДЁМ, ДОПУСТИМ 2*mbSize и отваливаемся - pure () } peer <- async $ runPeer env @@ -333,33 +355,44 @@ test1 = do traverse_ (atomically . TBQ.writeTBQueue b) ini pure b - p0Thread <- async $ runFakePeer ev0 p0 fake $ do + -- TODO: random shuffle and take X let knownPeers = [p1] fix \next -> do - -- НА САМОМ ДЕЛЕ НАМ НЕ НАДО ЖДАТЬ БЛОКИНФЫ. - -- НАМ НАДО ОТПРАВЛЯТЬ КАЧАТЬ БЛОК, КАК ТОЛЬКО - -- ПО НЕМУ ПОЯВИЛАСЬ ИНФА - blkHash <- liftIO $ atomically $ TBQ.readTBQueue blkQ + -- TODO: check is this block is already here + -- maybe emit event to continue -> parse/seek for content + + addBlockReadyEventNotify ev0 blkHash $ \h -> do + debug $ "DOWNLOADED BLOCK" <+> pretty h <+> "NOW WHAT?" + -- TODO: надо трекать, может блок-то и найден -- либо по всем пирам спросить addBlockSizeEventNotify ev0 blkHash $ \case - (p, h, Just _) -> do + (p, h, Just size) -> do coo <- genCookie (p,blkHash) let key = DownloadSessionKey (p, coo) - let new = newBlockDownload blkHash + let chusz = defChunkSize + + let new = set sBlockChunkSize chusz + . set sBlockSize (fromIntegral size) + $ newBlockDownload blkHash + update @Fake new key id - request p (GetBlockSize @Fake blkHash) + request p (BlockChunks coo (BlockGetAllChunks @Fake blkHash chusz)) -- FIXME: nice construction liftIO $ print $ "DAVAI BLOCK!" <+> pretty h _ -> pure () + -- TODO: смотрим, может у нас уже есть block-size + -- тогда ловим случайного пира, у которого оно есть + -- и ставим на закачку + -- КТО ПЕРВЫЙ ВСТАЛ ТОГО И ТАПКИ for_ knownPeers $ \who -> request who (GetBlockSize @Fake blkHash)