From ec417fe3ef6f909b5b771cebbec3528c18a029da Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Fri, 20 Jan 2023 15:41:27 +0300 Subject: [PATCH] wip --- hbs2-core/lib/HBS2/Actors/Peer.hs | 93 +++++++++++++++---- hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs | 5 + hbs2-core/lib/HBS2/Net/Proto/Types.hs | 9 +- .../lib/HBS2/Storage/Simple.hs | 1 + hbs2-tests/hbs2-tests.cabal | 17 +++- hbs2-tests/test/HmapMain.hs | 76 +++++++++++++++ hbs2-tests/test/PeerMain.hs | 60 ++++++++++-- 7 files changed, 236 insertions(+), 25 deletions(-) create mode 100644 hbs2-tests/test/HmapMain.hs diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 3b034032..16f512e2 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -25,9 +25,32 @@ import Lens.Micro.Platform import System.Random qualified as Random import Data.Cache (Cache) import Data.Cache qualified as Cache +import Data.Dynamic +import Data.Maybe import Codec.Serialise hiding (encode,decode) + +data SKey = forall a . (Unkey a, Eq a, Hashable a) => SKey (Proxy a) Dynamic + +class Typeable a => Unkey a where + unfuck :: Proxy a -> Dynamic -> Maybe a + +instance Typeable a => Unkey a where + unfuck _ = fromDynamic @a + +newSKey :: forall a . (Eq a, Typeable a, Unkey a, Hashable a, Show a) => a -> SKey +newSKey s = SKey (Proxy @a) (toDyn s) + + +instance Hashable SKey where + hashWithSalt s (SKey p d) = hashWithSalt s (unfuck p d) + + +instance Eq SKey where + (==) (SKey p1 a) (SKey p2 b) = unfuck p1 a == unfuck p1 b + + data AnyMessage e = AnyMessage Integer (Encoded e) deriving stock (Generic) @@ -40,7 +63,7 @@ data EngineEnv e = forall bus . ( Messaging bus e ByteString EngineEnv { _peer :: Maybe (Peer e) , _self :: Peer e - , _sessions :: () + , _sessions :: Cache SKey Dynamic , bus :: bus , defer :: Pipeline IO () } @@ -113,24 +136,61 @@ instance (MonadIO m, HasProtocol e p) => Request e p (EngineM e m) where liftIO $ sendTo b (To p) (From s) bs + instance ( MonadIO m , HasProtocol e p + , Eq (SessionKey e p) + , Typeable (SessionKey e p) + , Typeable (SessionData e p) + , Hashable (SessionKey e p) + , Show (SessionData e p) + , Show (SessionKey e p) + ) => Sessions e p (ResponseM e m) where + + fetch i d k f = flip runEngineM (fetch i d k f) =<< asks (view engine) + + update d k f = flip runEngineM (update d k f) =<< asks (view engine) + + expire k = flip runEngineM (expire k) =<< asks (view engine) + + +instance ( MonadIO m + , HasProtocol e p + , Eq (SessionKey e p) + , Typeable (SessionKey e p) + , Typeable (SessionData e p) + , Hashable (SessionKey e p) + , Show (SessionData e p) + , Show (SessionKey e p) ) => Sessions e p (EngineM e m) where - fetch upd def k fn = undefined - -- se <- asks (view sessions) - -- w <- liftIO $ Cache.fetchWithCache se k (const $ pure def) - -- when upd (liftIO $ Cache.insert se k def) - -- pure (fn w) - update def k f = undefined - -- se <- asks (view sessions) - -- w <- liftIO $ Cache.fetchWithCache se k (const $ pure def) - -- liftIO $ Cache.insert se k (f w) + fetch upd def k fn = do + se <- asks (view sessions) + let sk = newSKey @(SessionKey e p) k + let ddef = toDyn def - expire k = undefined - -- se <- asks (view sessions) - -- liftIO $ Cache.delete se k + liftIO $ print ("fetch!", show k) + + r <- liftIO $ Cache.lookup se sk + + case r of + Just v -> pure $ fn $ fromMaybe def (fromDynamic @(SessionData e p) v ) + Nothing -> do + when upd $ liftIO $ Cache.insert se sk ddef + pure (fn def) + + update def k f = do + se <- asks (view sessions) + val <- fetch @e @p True def k id + liftIO $ print "UPDATE !!!!" + liftIO $ Cache.insert se (newSKey @(SessionKey e p) k) (toDyn (f val)) + z <- liftIO $ Cache.lookup se (newSKey k) + liftIO $ print $ ("INSERTED SHIT", z) + + expire k = do + se <- asks (view sessions) + liftIO $ Cache.delete se (newSKey @(SessionKey e p) k) instance (HasProtocol e p, Serialise (Encoded e)) => Response e p (ResponseM e IO) where @@ -163,8 +223,7 @@ newEnv :: forall e bus m . ( Monad m newEnv p pipe = do de <- liftIO $ newPipeline defProtoPipelineSize - let se = () - -- se <- liftIO $ Cache.newCache (Just defCookieTimeout) -- FIXME: some more clever for timeout, i.e. typeclass + se <- liftIO $ Cache.newCache (Just defCookieTimeout) -- FIXME: some more clever for timeout, i.e. typeclass pure $ EngineEnv Nothing p se pipe de runPeer :: forall e m a . ( MonadIO m @@ -198,12 +257,14 @@ runPeer env@(EngineEnv {bus = pipe, defer = d}) hh = do local (set peer (Just pip)) do + ee <- ask + case Map.lookup n disp of Nothing -> pure () Just (AnyProtocol { protoDecode = decoder , handle = h - }) -> maybe (pure ()) (runResponseM env pip . h) (decoder msg) + }) -> maybe (pure ()) (runResponseM ee pip . h) (decoder msg) -- FIXME: slow and dumb instance {-# OVERLAPPABLE #-} (MonadIO m, Num (Cookie e)) => GenCookie e (EngineM e m) where diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs index 2b979ac7..09d081d8 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs @@ -41,3 +41,8 @@ blockSizeProto getBlockSize evHasBlock = evHasBlock ( that, h, Just sz ) +newtype instance SessionKey e (BlockSize e) = + BlockSizeKey (Hash HbSync) + deriving stock (Typeable,Eq,Show) + deriving newtype (Hashable,IsString) + diff --git a/hbs2-core/lib/HBS2/Net/Proto/Types.hs b/hbs2-core/lib/HBS2/Net/Proto/Types.hs index bc4838ad..fcf3078c 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Types.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Types.hs @@ -10,6 +10,7 @@ import GHC.TypeLits import Data.Proxy import Data.Hashable import Control.Monad.IO.Class +import Data.Typeable -- e -> Transport (like, UDP or TChan) -- p -> L4 Protocol (like Ping/Pong) @@ -49,10 +50,16 @@ class Request e p (m :: Type -> Type) | p -> e where -- So it is that it is. data family SessionKey e p :: Type -data family SessionData e p :: Type +type family SessionData e p :: Type + class ( Monad m , HasProtocol e p + , Eq (SessionKey e p) + , Hashable (SessionKey e p) + , Typeable (SessionData e p) + -- , Typeable e + -- , Typeable p ) => Sessions e p m | p -> e where diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs index 8536d0c9..197a3e59 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs @@ -370,6 +370,7 @@ simpleReadLinkRaw ss hash = do instance ( MonadIO m, IsKey hash , Hashed hash LBS.ByteString , Key hash ~ Hash hash + , Block LBS.ByteString ~ LBS.ByteString ) => Storage (SimpleStorage hash) hash LBS.ByteString m where diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index e2ec6398..f169b8f2 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -40,6 +40,7 @@ common common-deps , transformers , uniplate , vector + , data-default common shared-properties ghc-options: @@ -98,6 +99,20 @@ test-suite test-peer main-is: PeerMain.hs - +test-suite test-hmap + import: shared-properties + import: common-deps + default-language: Haskell2010 + + other-modules: + + build-depends: HMap + , data-default + + -- other-extensions: + + type: exitcode-stdio-1.0 + hs-source-dirs: test + main-is: HmapMain.hs diff --git a/hbs2-tests/test/HmapMain.hs b/hbs2-tests/test/HmapMain.hs new file mode 100644 index 00000000..f960e1f3 --- /dev/null +++ b/hbs2-tests/test/HmapMain.hs @@ -0,0 +1,76 @@ +{-# LANGUAGE ExistentialQuantification, RankNTypes #-} +module Main where + +import Data.Typeable +import Data.Dynamic +import Data.Proxy +import Data.Kind + +import Prettyprinter + + +data Key = forall a . (Unfuck a, Eq a) => Key (Proxy a) Dynamic + +class Typeable a => Unfuck a where + unfuck :: Proxy a -> Dynamic -> Maybe a + +instance Typeable a => Unfuck a where + unfuck _ = fromDynamic @a + +newKey :: forall a . (Eq a, Typeable a, Unfuck a) => a -> Key +newKey s = Key (Proxy @a) (toDyn s) + +instance Eq Key where + (==) (Key p1 a) (Key p2 b) = unfuck p1 a == unfuck p1 b + +main :: IO () +main = do + let k1 = newKey 22 + let k2 = newKey 33 + let k3 = newKey "JOPA" + + print $ "k1 == k1:" <+> pretty (k1 == k1) + print $ "k2 == k2:" <+> pretty (k2 == k2) + print $ "k1 == k2:" <+> pretty (k1 == k2) + print $ "k3 == k3:" <+> pretty (k3 == k3) + print $ "k3 == k2:" <+> pretty (k3 == k2) + print $ "k3 == k1:" <+> pretty (k3 == k1) + + + -- _ <- race ( pause ( 60 :: Timeout 'Seconds) ) $ forever $ do + -- let gen = arbitrary @MyKey + -- let n = 100 + -- keys <- replicateM 10 (sample' @MyKey gen) <&> mconcat + -- vals <- replicateM 100 (randomIO @Int) + + -- let kv = zip keys vals + + -- forM_ kv $ \(k,v) -> do + + -- m <- readTVarIO tm + + -- let z = withKey k id + + -- undefined + + -- atomically $ writeTVar tm z + -- atomically $ modifyTVar km (k:) + + -- kl <- readTVarIO km + + -- when (length kl > 1000) $ do + -- let (a,b) = L.splitAt 1000 kl + + -- m1 <- readTVarIO tm + -- forM_ b $ \z3 -> do + -- let m2 = withKey z3 $ \z3 -> delete z3 m1 + -- pure () + + -- atomically $ writeTVar km b + + -- pure () + + + + pure () + diff --git a/hbs2-tests/test/PeerMain.hs b/hbs2-tests/test/PeerMain.hs index bb777be5..3c6413b5 100644 --- a/hbs2-tests/test/PeerMain.hs +++ b/hbs2-tests/test/PeerMain.hs @@ -1,6 +1,7 @@ {-# Language RankNTypes #-} {-# Language TemplateHaskell #-} {-# Language AllowAmbiguousTypes #-} +{-# Language UndecidableInstances #-} module Main where import HBS2.Prelude.Plated @@ -44,6 +45,9 @@ import System.Exit import System.FilePath.Posix import System.IO import Control.Concurrent +import Data.Default +import Control.Monad.Reader +import Data.Dynamic import Control.Concurrent.STM import Control.Concurrent.STM.TQueue qualified as Q @@ -111,6 +115,22 @@ instance HasProtocol Fake (BlockChunks Fake) where decode = either (const Nothing) Just . deserialiseOrFail encode = serialise + +type instance SessionData Fake (BlockSize Fake) = BlockSizeSession Fake + +newtype BlockSizeSession e = + BlockSizeSession + { _bsBlockSizes :: Map (Peer e) Size + } + +makeLenses 'BlockSizeSession + +instance Ord (Peer e) => Default (BlockSizeSession e) where + def = BlockSizeSession mempty + +deriving stock instance Show (BlockSizeSession Fake) + + main :: IO () main = do hSetBuffering stderr LineBuffering @@ -174,9 +194,9 @@ runFakePeer :: forall e b . ( e ~ Fake -> b -> EngineM e IO () -> IO () -runFakePeer p bus work = do +runFakePeer p0 bus work = do - env <- newEnv p bus + env <- newEnv p0 bus se <- emptySessions @e @@ -191,7 +211,7 @@ runFakePeer p bus work = do let opts = [ StoragePrefix dir ] - storage <- simpleStorageInit opts -- :: IO (SimpleStorage HbSync) + storage <- simpleStorageInit opts :: IO (SimpleStorage HbSync) w <- liftIO $ async $ simpleStorageWorker storage @@ -213,12 +233,20 @@ runFakePeer p bus work = do maybe1 sz' (pure ()) $ \sz -> do let bsz = fromIntegral sz + z <- fetch @e False def (BlockSizeKey h) id + liftIO $ print ("QQQQQ", pretty p0, pretty p, z) + + update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz)) + -- here we cache block size information updSession se mempty sBlockSizes h (Map.insert p bsz) updSession se bsz sBlockSize h (const bsz) debug $ pretty p <+> "has block" <+> pretty h <+> pretty sz' + z <- fetch @e False def (BlockSizeKey h) id + liftIO $ print ("BEBEBE", pretty p0, pretty p, z) + let adapter = BlockChunksI { blkSize = hasBlock storage @@ -305,7 +333,7 @@ test1 = do fake <- newFakeP2P True - void $ race (pause (2 :: Timeout 'Seconds)) $ do + void $ race (pause (10 :: Timeout 'Seconds)) $ do let p0 = 0 :: Peer Fake let p1 = 1 :: Peer Fake @@ -314,10 +342,26 @@ test1 = do p0Thread <- async $ runFakePeer p0 fake $ do - request p1 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ")) - request p1 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt")) + let h1 = "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt" + let h0 = "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" - let h = fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" + -- fetch @Fake @(BlockSize Fake) True def h id + -- update @Fake @(BlockSize Fake) def (fromString h1) (over bsBlockSizes (Map.insert p1 111)) + update @Fake @(BlockSize Fake) def (BlockSizeKey (fromString h0)) (over bsBlockSizes (Map.insert p0 100)) + + -- request p1 (GetBlockSize @Fake (fromString h1)) + request p0 (GetBlockSize @Fake (fromString h0)) + + se1 <- fetch @Fake @(BlockSize Fake) False def (BlockSizeKey (fromString h0)) id + -- se2 <- fetch @Fake @(BlockSize Fake) False def (fromString h1) id + + jopa <- asks (view sessions) + + wtf <- liftIO $ Cache.lookup jopa (newSKey @(SessionKey Fake (BlockSize Fake)) (BlockSizeKey (fromString h0))) + + pause ( 2 :: Timeout 'Seconds) + + liftIO $ print $ (p0, "AAAAAA", se1, fromDynamic @(SessionData Fake (BlockSize Fake)) (fromJust wtf)) -- updateSession cookie (id) -- se <- getSession cookie (lens) @@ -383,6 +427,8 @@ test1 = do -- pure () + pause ( 5 :: Timeout 'Seconds) + mapM_ cancel peerz (_, e) <- waitAnyCatchCancel peerz