From 5d3d60778d8c2d0e3e390f0324f89307340cbc99 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Fri, 20 Jan 2023 19:09:11 +0300 Subject: [PATCH] gonna-fuckup --- hbs2-core/lib/HBS2/Actors.hs | 2 +- hbs2-core/lib/HBS2/Actors/Peer.hs | 16 +- hbs2-core/lib/HBS2/Defaults.hs | 3 + hbs2-core/lib/HBS2/Net/Proto/Types.hs | 10 + hbs2-tests/test/PeerMain.hs | 251 +++++++++++++------------- 5 files changed, 138 insertions(+), 144 deletions(-) diff --git a/hbs2-core/lib/HBS2/Actors.hs b/hbs2-core/lib/HBS2/Actors.hs index e27ffdc1..de151fde 100644 --- a/hbs2-core/lib/HBS2/Actors.hs +++ b/hbs2-core/lib/HBS2/Actors.hs @@ -24,7 +24,7 @@ data Pipeline m a = , toQueue :: TBMQueue ( m a ) } -newPipeline :: forall a (m :: Type -> Type) . MonadIO m => Int -> m (Pipeline m a) +newPipeline :: forall a (m1 :: Type -> Type) (m :: Type -> Type) . (MonadIO m, MonadIO m1) => Int -> m (Pipeline m1 a) newPipeline size = do tv <- liftIO $ TVar.newTVarIO False liftIO $ TBMQ.newTBMQueueIO size <&> Pipeline tv diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 16f512e2..3c0e7b08 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -39,7 +39,7 @@ class Typeable a => Unkey a where instance Typeable a => Unkey a where unfuck _ = fromDynamic @a -newSKey :: forall a . (Eq a, Typeable a, Unkey a, Hashable a, Show a) => a -> SKey +newSKey :: forall a . (Eq a, Typeable a, Unkey a, Hashable a) => a -> SKey newSKey s = SKey (Proxy @a) (toDyn s) @@ -143,8 +143,6 @@ instance ( MonadIO m , Typeable (SessionKey e p) , Typeable (SessionData e p) , Hashable (SessionKey e p) - , Show (SessionData e p) - , Show (SessionKey e p) ) => Sessions e p (ResponseM e m) where fetch i d k f = flip runEngineM (fetch i d k f) =<< asks (view engine) @@ -160,8 +158,6 @@ instance ( MonadIO m , Typeable (SessionKey e p) , Typeable (SessionData e p) , Hashable (SessionKey e p) - , Show (SessionData e p) - , Show (SessionKey e p) ) => Sessions e p (EngineM e m) where @@ -170,8 +166,6 @@ instance ( MonadIO m let sk = newSKey @(SessionKey e p) k let ddef = toDyn def - liftIO $ print ("fetch!", show k) - r <- liftIO $ Cache.lookup se sk case r of @@ -183,10 +177,7 @@ instance ( MonadIO m update def k f = do se <- asks (view sessions) val <- fetch @e @p True def k id - liftIO $ print "UPDATE !!!!" liftIO $ Cache.insert se (newSKey @(SessionKey e p) k) (toDyn (f val)) - z <- liftIO $ Cache.lookup se (newSKey k) - liftIO $ print $ ("INSERTED SHIT", z) expire k = do se <- asks (view sessions) @@ -266,10 +257,5 @@ runPeer env@(EngineEnv {bus = pipe, defer = d}) hh = do , handle = h }) -> maybe (pure ()) (runResponseM ee pip . h) (decoder msg) --- FIXME: slow and dumb -instance {-# OVERLAPPABLE #-} (MonadIO m, Num (Cookie e)) => GenCookie e (EngineM e m) where - genCookie salt = do - r <- liftIO $ Random.randomIO @Int - pure $ fromInteger $ fromIntegral $ asWord32 $ hash32 (hash salt + r) diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index 8e5ade1e..fd376b19 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -19,6 +19,9 @@ defPipelineSize = 100 defChunkWriterQ :: Integral a => a defChunkWriterQ = 100 +defBlockDownloadQ :: Integral a => a +defBlockDownloadQ = 100 + defBlockDownloadThreshold :: Integral a => a defBlockDownloadThreshold = 2 diff --git a/hbs2-core/lib/HBS2/Net/Proto/Types.hs b/hbs2-core/lib/HBS2/Net/Proto/Types.hs index fcf3078c..85bbe570 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Types.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Types.hs @@ -1,6 +1,7 @@ {-# Language TypeFamilyDependencies #-} {-# Language FunctionalDependencies #-} {-# Language AllowAmbiguousTypes #-} +{-# Language UndecidableInstances #-} module HBS2.Net.Proto.Types ( module HBS2.Net.Proto.Types ) where @@ -11,6 +12,8 @@ import Data.Proxy import Data.Hashable import Control.Monad.IO.Class import Data.Typeable +import System.Random qualified as Random +import Data.Digest.Murmur32 -- e -> Transport (like, UDP or TChan) -- p -> L4 Protocol (like Ping/Pong) @@ -93,3 +96,10 @@ class (KnownNat (ProtocolId p), HasPeer e) => HasProtocol e p | p -> e where decode :: Encoded e -> Maybe p encode :: p -> Encoded e + +-- FIXME: slow and dumb +instance {-# OVERLAPPABLE #-} (MonadIO m, Num (Cookie e)) => GenCookie e m where + genCookie salt = do + r <- liftIO $ Random.randomIO @Int + pure $ fromInteger $ fromIntegral $ asWord32 $ hash32 (hash salt + r) + diff --git a/hbs2-tests/test/PeerMain.hs b/hbs2-tests/test/PeerMain.hs index 3c6413b5..250e647d 100644 --- a/hbs2-tests/test/PeerMain.hs +++ b/hbs2-tests/test/PeerMain.hs @@ -7,6 +7,7 @@ module Main where import HBS2.Prelude.Plated import HBS2.Clock import HBS2.Hash +import HBS2.Actors -- import HBS2.Net.Messaging import HBS2.Net.Proto import HBS2.Net.Proto.BlockInfo @@ -48,9 +49,12 @@ import Control.Concurrent import Data.Default import Control.Monad.Reader import Data.Dynamic +import Data.Kind import Control.Concurrent.STM import Control.Concurrent.STM.TQueue qualified as Q +import Control.Concurrent.STM.TBQueue qualified as TBQ +import Control.Concurrent.STM.TBQueue (TBQueue) debug :: (MonadIO m) => Doc ann -> m () debug p = liftIO $ hPrint stderr p @@ -62,35 +66,19 @@ debug p = liftIO $ hPrint stderr p -- but client's cookie in protocol should be just ( cookie :: Word32 ) -data BlockDownload m = +data BlockDownload = BlockDownload { _sBlockHash :: Hash HbSync , _sBlockChunkSize :: ChunkSize , _sBlockOffset :: Offset , _sBlockWritten :: Size - , _sOnBlockReady :: OnBlockReady HbSync m } -data MySessions e m = - Sessions - { _sBlockDownload :: Cache (Peer e, Cookie e) (BlockDownload m) - , _sBlockSizes :: Cache (Hash HbSync) (Map (Peer e) Size) - , _sBlockSize :: Cache (Hash HbSync) Size - } - - -makeLenses 'Sessions makeLenses 'BlockDownload -newBlockDownload :: forall m . MonadIO m - => Hash HbSync - -> OnBlockReady HbSync m - -> BlockDownload m - +newBlockDownload :: Hash HbSync -> BlockDownload newBlockDownload h = BlockDownload h 0 0 0 - - data Fake instance HasPeer Fake where @@ -117,6 +105,12 @@ instance HasProtocol Fake (BlockChunks Fake) where type instance SessionData Fake (BlockSize Fake) = BlockSizeSession Fake +type instance SessionData Fake (BlockChunks Fake) = BlockDownload + +newtype instance SessionKey Fake (BlockChunks Fake) = + DownloadSessionKey (Peer Fake, Cookie Fake) + deriving newtype (Eq, Hashable) + deriving stock (Generic) newtype BlockSizeSession e = BlockSizeSession @@ -143,63 +137,56 @@ main = do -- ] -emptySessions :: forall e m . MonadIO m => m (MySessions e m) -emptySessions = liftIO $ - Sessions <$> Cache.newCache (Just defCookieTimeout) - <*> Cache.newCache (Just defBlockInfoTimeout) - <*> Cache.newCache (Just defBlockInfoTimeout) +newtype PeerEvents e (m :: Type -> Type) = + PeerEvents + { onBlockSize :: TVar (Map (Hash HbSync) [HasBlockEvent HbSync e m]) + } -newSession :: (Eq k, Hashable k,MonadIO m) - => s - -> Getting (Cache k v) s (Cache k v) - -> k - -> v - -> m () +newPeerEventsIO :: forall e m . MonadIO m => IO (PeerEvents e m) +newPeerEventsIO = PeerEvents <$> newTVarIO mempty -newSession se l k v = do - let cache = view l se - liftIO $ Cache.insert cache k v +addBlockSizeEventNotify :: forall e m . (MonadIO m) + => PeerEvents e m + -> Hash HbSync + -> HasBlockEvent HbSync e m + -> m () -getSession' se l k fn = do - let cache = view l se - liftIO $ Cache.lookup cache k <&> fmap fn +addBlockSizeEventNotify pe h e = do + void $ liftIO $ atomically $ modifyTVar' (onBlockSize pe) (Map.insertWith (<>) h [e]) -getSession se l k = getSession' se l k id +emitBlockSizeEvent :: MonadIO m + => PeerEvents e m + -> Hash HbSync + -> (Peer e, Hash HbSync, Maybe Integer) + -> m () -updSession se def l k fn = liftIO do - let cache = view l se - v <- Cache.fetchWithCache cache k (const $ pure def) - Cache.insert cache k (fn v) +emitBlockSizeEvent pe h event = do + ev <- liftIO $ atomically $ stateTVar (onBlockSize pe) alter + for_ ev $ \e -> e event -delSession se l k = liftIO do - Cache.delete (view l se) k - -expireSession se l = liftIO do - Cache.purgeExpired (view l se) - --- A questionable FIX to avoid "orphans" complains -data Adapted e = Adapted + where + alter m = + let ev = Map.lookup h m + in (mconcat (maybeToList ev), Map.delete h m) --- newtype FullPeerM m a = RealPeerM { fromRealPeerM :: ReaderT } - -runFakePeer :: forall e b . ( e ~ Fake - -- , m ~ IO - , Messaging b e ByteString - -- , MonadIO m - -- , Response e p m - -- , EngineM e m - ) - => Peer e +runFakePeer :: forall e b m . ( e ~ Fake + , MonadIO m + , Messaging b e ByteString + -- , MonadIO m + -- , Response e p m + -- , EngineM e m + ) + => PeerEvents e m + -> Peer e -> b - -> EngineM e IO () - -> IO () -runFakePeer p0 bus work = do + -> EngineM e m () + -> IO () + +runFakePeer ev p0 bus work = do env <- newEnv p0 bus - se <- emptySessions @e - let pid = fromIntegral (hash (env ^. self)) :: Word8 dir <- liftIO $ canonicalizePath ( ".peers" show pid) @@ -233,25 +220,14 @@ runFakePeer p0 bus work = do maybe1 sz' (pure ()) $ \sz -> do let bsz = fromIntegral sz - z <- fetch @e False def (BlockSizeKey h) id - liftIO $ print ("QQQQQ", pretty p0, pretty p, z) - update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz)) - - -- here we cache block size information - updSession se mempty sBlockSizes h (Map.insert p bsz) - updSession se bsz sBlockSize h (const bsz) - - debug $ pretty p <+> "has block" <+> pretty h <+> pretty sz' - - z <- fetch @e False def (BlockSizeKey h) id - liftIO $ print ("BEBEBE", pretty p0, pretty p, z) + emitBlockSizeEvent ev h (p, h, Just sz) let adapter = BlockChunksI { blkSize = hasBlock storage , blkChunk = getChunk storage - , blkGetHash = \c -> getSession' se sBlockDownload c (view sBlockHash) + , blkGetHash = error "FUCK" -- FIXME! \c -> getSession' se sBlockDownload c (view sBlockHash) -- КАК ТОЛЬКО ПРИНЯЛИ ВСЕ ЧАНКИ (ПРИШЁЛ ПОСЛЕДНИЙ ЧАНК): -- СЧИТАЕМ ХЭШ ТОГО, ЧТО ПОЛУЧИЛОСЬ @@ -261,55 +237,58 @@ runFakePeer p0 bus work = do -- УДАЛЯЕМ КУКУ? , blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do - let cKey = (p,c) + let cKey = DownloadSessionKey (p,c) -- check if there is a session - void $ MaybeT $ getSession' se sBlockDownload cKey id + -- FIXME + -- void $ MaybeT $ getSession' se sBlockDownload cKey id - let def = newBlockDownload h dontHandle + let de = newBlockDownload h let bslen = fromIntegral $ B8.length bs -- TODO: log this situation - mbSize <- MaybeT $ getSession' se sBlockSizes h (Map.lookup p) <&> fromMaybe Nothing - mbChSize <- MaybeT $ getSession' se sBlockDownload cKey (view sBlockChunkSize) + -- FIXME + -- mbSize <- MaybeT $ getSession' se sBlockSizes h (Map.lookup p) <&> fromMaybe Nothing + -- mbChSize <- MaybeT $ getSession' se sBlockDownload cKey (view sBlockChunkSize) - let offset = fromIntegral n * fromIntegral mbChSize :: Offset + -- let offset = fromIntegral n * fromIntegral mbChSize :: Offset - updSession se def sBlockDownload cKey (over sBlockOffset (max offset)) + -- updSession se de sBlockDownload cKey (over sBlockOffset (max offset)) - liftIO $ do - writeChunk cww cKey h offset bs - updSession se def sBlockDownload cKey (over sBlockWritten (+bslen)) + -- liftIO $ do + -- writeChunk cww cKey h offset bs + -- updSession se de sBlockDownload cKey (over sBlockWritten (+bslen)) - dwnld <- MaybeT $ getSession' se sBlockDownload cKey id + -- dwnld <- MaybeT $ getSession' se sBlockDownload cKey id - let maxOff = view sBlockOffset dwnld - let written = view sBlockWritten dwnld - let notify = view sOnBlockReady dwnld + -- let maxOff = view sBlockOffset dwnld + -- let written = view sBlockWritten dwnld + -- let notify = view sOnBlockReady dwnld - let mbDone = (maxOff + fromIntegral mbChSize) > fromIntegral mbSize - && written >= mbSize + -- let mbDone = (maxOff + fromIntegral mbChSize) > fromIntegral mbSize + -- && written >= mbSize - when mbDone $ lift do - deferred (Proxy @(BlockChunks e)) $ do - h1 <- liftIO $ getHash cww cKey h + -- when mbDone $ lift do + -- deferred (Proxy @(BlockChunks e)) $ do + -- h1 <- liftIO $ getHash cww cKey h - -- ПОСЧИТАТЬ ХЭШ - -- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК - -- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ - when ( h1 == h ) $ do - lift $ commitBlock cww cKey h - lift $ notify h - delSession se sBlockDownload cKey + -- -- ПОСЧИТАТЬ ХЭШ + -- -- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК + -- -- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ + -- when ( h1 == h ) $ do + -- lift $ commitBlock cww cKey h + -- lift $ notify h + -- delSession se sBlockDownload cKey - when (written > mbSize * defBlockDownloadThreshold) $ do - debug $ "SESSION DELETED BECAUSE THAT PEER IS JERK:" <+> pretty p - delSession se sBlockDownload cKey + -- when (written > mbSize * defBlockDownloadThreshold) $ do + -- debug $ "SESSION DELETED BECAUSE THAT PEER IS JERK:" <+> pretty p + -- delSession se sBlockDownload cKey -- ЕСЛИ ТУТ ВИСЕТЬ ДОЛГО, ТО НАС МОЖНО ДИДОСИТЬ, -- ПОСЫЛАЯ НЕ ВСЕ БЛОКИ ЧАНКА ИЛИ ПОСЫЛАЯ ОТДЕЛЬНЫЕ -- ЧАНКИ ПО МНОГУ РАЗ. А МЫ БУДЕМ ХЭШИ СЧИТАТЬ. -- ТАК НЕ ПОЙДЕТ -- ТАК ЧТО ТУТ ЖДЁМ, ДОПУСТИМ 2*mbSize и отваливаемся + pure () } peer <- async $ runPeer env @@ -333,44 +312,60 @@ test1 = do fake <- newFakeP2P True - void $ race (pause (10 :: Timeout 'Seconds)) $ do + void $ race (pause (2 :: Timeout 'Seconds)) $ do let p0 = 0 :: Peer Fake let p1 = 1 :: Peer Fake - p1Thread <- async $ runFakePeer p1 fake (liftIO $ forever yield) + ev1 <- liftIO newPeerEventsIO + ev0 <- liftIO newPeerEventsIO - p0Thread <- async $ runFakePeer p0 fake $ do + p1Thread <- async $ runFakePeer ev1 p1 fake (liftIO $ forever yield) - let h1 = "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt" - let h0 = "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" + let ini = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt" + , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" + ] - -- fetch @Fake @(BlockSize Fake) True def h id - -- update @Fake @(BlockSize Fake) def (fromString h1) (over bsBlockSizes (Map.insert p1 111)) - update @Fake @(BlockSize Fake) def (BlockSizeKey (fromString h0)) (over bsBlockSizes (Map.insert p0 100)) + blkQ <- liftIO $ do + b <- newTBQueueIO defBlockDownloadQ + traverse_ (atomically . TBQ.writeTBQueue b) ini + pure b - -- request p1 (GetBlockSize @Fake (fromString h1)) - request p0 (GetBlockSize @Fake (fromString h0)) + p0Thread <- async $ runFakePeer ev0 p0 fake $ do - se1 <- fetch @Fake @(BlockSize Fake) False def (BlockSizeKey (fromString h0)) id - -- se2 <- fetch @Fake @(BlockSize Fake) False def (fromString h1) id + let knownPeers = [p1] - jopa <- asks (view sessions) + fix \next -> do - wtf <- liftIO $ Cache.lookup jopa (newSKey @(SessionKey Fake (BlockSize Fake)) (BlockSizeKey (fromString h0))) + -- НА САМОМ ДЕЛЕ НАМ НЕ НАДО ЖДАТЬ БЛОКИНФЫ. + -- НАМ НАДО ОТПРАВЛЯТЬ КАЧАТЬ БЛОК, КАК ТОЛЬКО + -- ПО НЕМУ ПОЯВИЛАСЬ ИНФА - pause ( 2 :: Timeout 'Seconds) + blkHash <- liftIO $ atomically $ TBQ.readTBQueue blkQ - liftIO $ print $ (p0, "AAAAAA", se1, fromDynamic @(SessionData Fake (BlockSize Fake)) (fromJust wtf)) + -- TODO: надо трекать, может блок-то и найден + -- либо по всем пирам спросить - -- updateSession cookie (id) - -- se <- getSession cookie (lens) - -- cookie <- newSession ??? + addBlockSizeEventNotify ev0 blkHash $ \case + (p, h, Just _) -> do + -- coo <- genCookie (p,blkHash) + -- let key = DownloadSessionKey (p, coo) + -- let new = newBlockDownload blkHash + -- update @Fake new key id + -- (over bsBlockSizes (Map.insert p bsz)) + request p (GetBlockSize @Fake blkHash) + -- liftIO $ print $ "DAVAI BLOCK!" <+> pretty h + -- update + -- let q = pure () + pure () - -- newCookie <- genCookie @Fake (p1, h) -- <<~~~ FIXME: generate a good session id! - -- let cKey@(_, cookie) = (p1, newCookie) + _ -> pure () - pure () + -- КТО ПЕРВЫЙ ВСТАЛ ТОГО И ТАПКИ + for_ knownPeers $ \who -> + request who (GetBlockSize @Fake blkHash) + + next let peerz = p0Thread : [p1Thread] @@ -427,7 +422,7 @@ test1 = do -- pure () - pause ( 5 :: Timeout 'Seconds) + pause ( 1 :: Timeout 'Seconds) mapM_ cancel peerz