From f9748ed1fc3207d52d99d80a12851f4e5b553d28 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Sun, 22 Jan 2023 09:20:47 +0300 Subject: [PATCH] wip --- hbs2-core/hbs2-core.cabal | 1 + hbs2-core/lib/HBS2/Actors/Peer.hs | 82 ++++++++++++++++++----- hbs2-core/lib/HBS2/Clock.hs | 8 ++- hbs2-core/lib/HBS2/Data/Types.hs | 2 + hbs2-core/lib/HBS2/Defaults.hs | 3 + hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs | 2 +- hbs2-tests/test/Peer2Main.hs | 25 +++++-- 7 files changed, 100 insertions(+), 23 deletions(-) diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index b23d3258..6da2184c 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -119,6 +119,7 @@ library , text , transformers , uniplate + , unordered-containers hs-source-dirs: lib default-language: Haskell2010 diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 134c14d6..29781766 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -5,6 +5,7 @@ module HBS2.Actors.Peer where import HBS2.Prelude.Plated import HBS2.Hash +import HBS2.Clock import HBS2.Actors import HBS2.Storage import HBS2.Net.Proto @@ -26,6 +27,13 @@ import Data.Map qualified as Map import Data.Maybe import GHC.TypeLits import Lens.Micro.Platform +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HashMap +import Control.Concurrent.STM.TVar +import Control.Concurrent.STM +import Data.Hashable (hash) + +import Prettyprinter hiding (pipe) data AnyStorage = forall zu . Storage zu HbSync ByteString IO => AnyStorage zu @@ -82,12 +90,14 @@ makeResponse h = AnyProtocol { myProtoId = natVal (Proxy @(ProtocolId p)) data PeerEnv e = PeerEnv - { _envSelf :: Peer e - , _envFab :: Fabriq e - , _envStorage :: AnyStorage - , _envDeferred :: Pipeline IO () - , _envSessions :: Cache SKey Dynamic - , _envEvents :: Cache SKey Dynamic + { _envSelf :: Peer e + , _envFab :: Fabriq e + , _envStorage :: AnyStorage + , _envDeferred :: Pipeline IO () + , _envSessions :: Cache SKey Dynamic + , _envEvents :: TVar (HashMap SKey [Dynamic]) + , _envExpireTimes :: Cache SKey () + , _envSweepers :: TVar (HashMap SKey [PeerM e IO ()]) } newtype PeerM e m a = PeerM { fromPeerM :: ReaderT (PeerEnv e) m a } @@ -187,20 +197,51 @@ instance ( MonadIO m let bs = serialise (AnyMessage @e proto (encode msg)) sendTo pipe (To p) (From me) bs - - instance ( HasProtocol e p , Typeable (EventHandler e p (PeerM e IO)) , Typeable (EventKey e p) , Typeable (Event e p) , Hashable (EventKey e p) + , Expires (EventKey e p) , Eq (EventKey e p) ) => EventListener e p (PeerM e IO) where subscribe k h = do ev <- asks (view envEvents) - liftIO $ Cache.insert ev (newSKey @(EventKey e p) k) (toDyn h) - pure () + let sk = newSKey @(EventKey e p) k + let dyn = toDyn h + liftIO $ atomically $ modifyTVar' ev (HashMap.insertWith (<>) sk [dyn]) + -- FIXME: add a sweeping routine or else everything will be fucked! + addSweeper (expiresIn (Proxy @(EventKey e p))) sk $ do + liftIO $ print $ "sweep smth with key" <+> pretty (hash sk) + liftIO $ atomically $ modifyTVar' ev (HashMap.delete sk) + +addSweeper :: forall e . Timeout 'Seconds -> SKey -> PeerM e IO () -> PeerM e IO () +addSweeper t k sweeper = do + liftIO $ print $ "adding sweeper for key" <+> pretty (hash k) + ex <- asks (view envExpireTimes) + sw <- asks (view envSweepers) + liftIO $ Cache.insert' ex (Just (toTimeSpec t)) k () + liftIO $ atomically $ modifyTVar' sw (HashMap.insertWith (<>) k [sweeper]) + +sweep :: PeerM e IO () +sweep = do + ex <- asks (view envExpireTimes) + sw <- asks (view envSweepers) + + liftIO $ Cache.purgeExpired ex + toSweep <- HashMap.toList <$> liftIO (readTVarIO sw) + + alive <- forM toSweep $ \(s, actions) -> do + here <- liftIO $ Cache.lookup' ex s <&> isJust + + if here then + pure [(s, actions)] + else do + sequence_ actions + pure [] + + liftIO $ atomically $ modifyTVar' sw (<> HashMap.fromList (mconcat alive)) instance ( HasProtocol e p , Typeable (EventKey e p) @@ -215,20 +256,30 @@ instance ( HasProtocol e p let sk = newSKey @(EventKey e p) k void $ runMaybeT $ do - r <- MaybeT $ liftIO $ Cache.lookup se sk - ev <- MaybeT $ pure $ fromDynamic @(EventHandler e p (PeerM e IO)) r - lift $ ev d + subs <- MaybeT $ liftIO $ atomically $ readTVar se <&> HashMap.lookup sk + void $ liftIO $ atomically $ modifyTVar' se (HashMap.delete sk) + for_ subs $ \r -> do + ev <- MaybeT $ pure $ fromDynamic @(EventHandler e p (PeerM e IO)) r + lift $ ev d - -runPeerM :: MonadIO m => AnyStorage -> Fabriq e -> Peer e -> PeerM e m a -> m () +runPeerM :: (MonadIO m, Pretty (Peer e)) => AnyStorage -> Fabriq e -> Peer e -> PeerM e m a -> m () runPeerM s bus p f = do env <- PeerEnv p bus s <$> newPipeline defProtoPipelineSize <*> liftIO (Cache.newCache (Just defCookieTimeout)) + <*> liftIO (newTVarIO mempty) <*> liftIO (Cache.newCache (Just defCookieTimeout)) + <*> liftIO (newTVarIO mempty) let de = view envDeferred env as <- liftIO $ async $ runPipeline de + + sw <- liftIO $ async $ forever $ withPeerM env $ do + pause defSweepTimeout + se <- asks (view envSessions) + liftIO $ Cache.purgeExpired se + sweep + void $ runReaderT (fromPeerM f) env void $ liftIO $ stopPipeline de liftIO $ cancel as @@ -276,6 +327,7 @@ instance ( HasProtocol e p , Serialise (Encoded e) , MonadTrans (ResponseM e) , HasStorage (PeerM e IO) + , Pretty (Peer e) ) => Response e p (ResponseM e (PeerM e IO)) where thatPeer _ = asks (view answTo) diff --git a/hbs2-core/lib/HBS2/Clock.hs b/hbs2-core/lib/HBS2/Clock.hs index 717c70ea..a29b6d27 100644 --- a/hbs2-core/lib/HBS2/Clock.hs +++ b/hbs2-core/lib/HBS2/Clock.hs @@ -1,14 +1,16 @@ +{-# Language FunctionalDependencies #-} module HBS2.Clock ( module HBS2.Clock , module System.Clock )where +import Control.Concurrent (threadDelay) import Control.Monad.IO.Class import Data.Fixed import Data.Int (Int64) +import Data.Proxy import Prettyprinter import System.Clock -import Control.Concurrent (threadDelay) data TimeoutKind = MilliSeconds | Seconds | Minutes @@ -63,3 +65,7 @@ instance IsTimeout 'Seconds where instance IsTimeout 'Minutes where toNanoSeconds (TimeoutMin x) = round (x * 60 * 1e9) +class Expires a where + expiresIn :: Proxy a -> Timeout 'Seconds + + diff --git a/hbs2-core/lib/HBS2/Data/Types.hs b/hbs2-core/lib/HBS2/Data/Types.hs index 627218bf..0b494977 100644 --- a/hbs2-core/lib/HBS2/Data/Types.hs +++ b/hbs2-core/lib/HBS2/Data/Types.hs @@ -7,3 +7,5 @@ module HBS2.Data.Types import HBS2.Hash import HBS2.Data.Types.Refs + + diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index fd376b19..dfc121d0 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -35,5 +35,8 @@ defCookieTimeout = toTimeSpec ( 10 :: Timeout 'Minutes) defBlockInfoTimeout :: TimeSpec defBlockInfoTimeout = toTimeSpec ( 10 :: Timeout 'Minutes) +defSweepTimeout :: Timeout 'Seconds +defSweepTimeout = 5 -- FIXME: only for debug! + diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs index 50eb615b..c4cf1d51 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs @@ -50,7 +50,7 @@ newtype instance SessionKey e (BlockSize e) = newtype instance EventKey e (BlockSize e) = - BlockSizeEventKey () + BlockSizeEventKey (Hash HbSync) deriving stock (Typeable, Eq) deriving newtype (Hashable) diff --git a/hbs2-tests/test/Peer2Main.hs b/hbs2-tests/test/Peer2Main.hs index 243e3d5e..e498ad50 100644 --- a/hbs2-tests/test/Peer2Main.hs +++ b/hbs2-tests/test/Peer2Main.hs @@ -27,6 +27,7 @@ import Control.Monad.Reader import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy.Char8 qualified as B8 import Data.Default +import Data.Foldable import Data.Map (Map) import Data.Map qualified as Map import Data.Word @@ -70,6 +71,10 @@ instance HasProtocol Fake (BlockSize Fake) where decode = either (const Nothing) Just . deserialiseOrFail encode = serialise +-- FIXME: 3 is for debug only! +instance Expires (EventKey Fake (BlockSize Fake)) where + expiresIn _ = 3 + instance HasProtocol Fake (BlockChunks Fake) where type instance ProtocolId (BlockChunks Fake) = 2 type instance Encoded Fake = ByteString @@ -142,7 +147,7 @@ handleBlockInfo (p, h, sz') = do let bsz = fromIntegral sz update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz)) liftIO $ debug $ "got block:" <+> pretty (p, h, sz) - emit @e (BlockSizeEventKey ()) BlockSizeEvent + emit @e (BlockSizeEventKey h) BlockSizeEvent -- FIXME: turn back on event notification -- lift $ runEngineM env $ emitBlockSizeEvent ev h (p, h, Just sz) -- TODO: fix this crazy shit @@ -154,11 +159,19 @@ blockDownloadLoop :: forall e . ( HasProtocol e (BlockSize e) ) => PeerM e IO () blockDownloadLoop = do - w <- subscribe @e @(BlockSize e) (BlockSizeEventKey ()) $ \_ -> do - debug "can't believe this shit works" + let blks = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt" + , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" + , "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" + ] - request 1 (GetBlockSize @e "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt") - request 1 (GetBlockSize @e "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ") + for_ blks $ \h -> do + + debug $ "subscribing to" <+> pretty h + + subscribe @e @(BlockSize e) (BlockSizeEventKey h) $ \_ -> do + debug $ "can't believe this shit works" <+> pretty h + + request 1 (GetBlockSize @e h) fix \next -> do liftIO $ print "piu!" @@ -210,7 +223,7 @@ main = do liftIO $ cancel as - pause ( 5 :: Timeout 'Seconds) + pause ( 8 :: Timeout 'Seconds) mapM_ cancel (our:others)