This commit is contained in:
Dmitry Zuikov 2023-01-20 15:41:27 +03:00
parent 2b223644a4
commit ec417fe3ef
7 changed files with 236 additions and 25 deletions

View File

@ -25,9 +25,32 @@ import Lens.Micro.Platform
import System.Random qualified as Random
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Data.Dynamic
import Data.Maybe
import Codec.Serialise hiding (encode,decode)
data SKey = forall a . (Unkey a, Eq a, Hashable a) => SKey (Proxy a) Dynamic
class Typeable a => Unkey a where
unfuck :: Proxy a -> Dynamic -> Maybe a
instance Typeable a => Unkey a where
unfuck _ = fromDynamic @a
newSKey :: forall a . (Eq a, Typeable a, Unkey a, Hashable a, Show a) => a -> SKey
newSKey s = SKey (Proxy @a) (toDyn s)
instance Hashable SKey where
hashWithSalt s (SKey p d) = hashWithSalt s (unfuck p d)
instance Eq SKey where
(==) (SKey p1 a) (SKey p2 b) = unfuck p1 a == unfuck p1 b
data AnyMessage e = AnyMessage Integer (Encoded e)
deriving stock (Generic)
@ -40,7 +63,7 @@ data EngineEnv e = forall bus . ( Messaging bus e ByteString
EngineEnv
{ _peer :: Maybe (Peer e)
, _self :: Peer e
, _sessions :: ()
, _sessions :: Cache SKey Dynamic
, bus :: bus
, defer :: Pipeline IO ()
}
@ -113,24 +136,61 @@ instance (MonadIO m, HasProtocol e p) => Request e p (EngineM e m) where
liftIO $ sendTo b (To p) (From s) bs
instance ( MonadIO m
, HasProtocol e p
, Eq (SessionKey e p)
, Typeable (SessionKey e p)
, Typeable (SessionData e p)
, Hashable (SessionKey e p)
, Show (SessionData e p)
, Show (SessionKey e p)
) => Sessions e p (ResponseM e m) where
fetch i d k f = flip runEngineM (fetch i d k f) =<< asks (view engine)
update d k f = flip runEngineM (update d k f) =<< asks (view engine)
expire k = flip runEngineM (expire k) =<< asks (view engine)
instance ( MonadIO m
, HasProtocol e p
, Eq (SessionKey e p)
, Typeable (SessionKey e p)
, Typeable (SessionData e p)
, Hashable (SessionKey e p)
, Show (SessionData e p)
, Show (SessionKey e p)
) => Sessions e p (EngineM e m) where
fetch upd def k fn = undefined
-- se <- asks (view sessions)
-- w <- liftIO $ Cache.fetchWithCache se k (const $ pure def)
-- when upd (liftIO $ Cache.insert se k def)
-- pure (fn w)
update def k f = undefined
-- se <- asks (view sessions)
-- w <- liftIO $ Cache.fetchWithCache se k (const $ pure def)
-- liftIO $ Cache.insert se k (f w)
fetch upd def k fn = do
se <- asks (view sessions)
let sk = newSKey @(SessionKey e p) k
let ddef = toDyn def
expire k = undefined
-- se <- asks (view sessions)
-- liftIO $ Cache.delete se k
liftIO $ print ("fetch!", show k)
r <- liftIO $ Cache.lookup se sk
case r of
Just v -> pure $ fn $ fromMaybe def (fromDynamic @(SessionData e p) v )
Nothing -> do
when upd $ liftIO $ Cache.insert se sk ddef
pure (fn def)
update def k f = do
se <- asks (view sessions)
val <- fetch @e @p True def k id
liftIO $ print "UPDATE !!!!"
liftIO $ Cache.insert se (newSKey @(SessionKey e p) k) (toDyn (f val))
z <- liftIO $ Cache.lookup se (newSKey k)
liftIO $ print $ ("INSERTED SHIT", z)
expire k = do
se <- asks (view sessions)
liftIO $ Cache.delete se (newSKey @(SessionKey e p) k)
instance (HasProtocol e p, Serialise (Encoded e)) => Response e p (ResponseM e IO) where
@ -163,8 +223,7 @@ newEnv :: forall e bus m . ( Monad m
newEnv p pipe = do
de <- liftIO $ newPipeline defProtoPipelineSize
let se = ()
-- se <- liftIO $ Cache.newCache (Just defCookieTimeout) -- FIXME: some more clever for timeout, i.e. typeclass
se <- liftIO $ Cache.newCache (Just defCookieTimeout) -- FIXME: some more clever for timeout, i.e. typeclass
pure $ EngineEnv Nothing p se pipe de
runPeer :: forall e m a . ( MonadIO m
@ -198,12 +257,14 @@ runPeer env@(EngineEnv {bus = pipe, defer = d}) hh = do
local (set peer (Just pip)) do
ee <- ask
case Map.lookup n disp of
Nothing -> pure ()
Just (AnyProtocol { protoDecode = decoder
, handle = h
}) -> maybe (pure ()) (runResponseM env pip . h) (decoder msg)
}) -> maybe (pure ()) (runResponseM ee pip . h) (decoder msg)
-- FIXME: slow and dumb
instance {-# OVERLAPPABLE #-} (MonadIO m, Num (Cookie e)) => GenCookie e (EngineM e m) where

View File

@ -41,3 +41,8 @@ blockSizeProto getBlockSize evHasBlock =
evHasBlock ( that, h, Just sz )
newtype instance SessionKey e (BlockSize e) =
BlockSizeKey (Hash HbSync)
deriving stock (Typeable,Eq,Show)
deriving newtype (Hashable,IsString)

View File

@ -10,6 +10,7 @@ import GHC.TypeLits
import Data.Proxy
import Data.Hashable
import Control.Monad.IO.Class
import Data.Typeable
-- e -> Transport (like, UDP or TChan)
-- p -> L4 Protocol (like Ping/Pong)
@ -49,10 +50,16 @@ class Request e p (m :: Type -> Type) | p -> e where
-- So it is that it is.
data family SessionKey e p :: Type
data family SessionData e p :: Type
type family SessionData e p :: Type
class ( Monad m
, HasProtocol e p
, Eq (SessionKey e p)
, Hashable (SessionKey e p)
, Typeable (SessionData e p)
-- , Typeable e
-- , Typeable p
) => Sessions e p m | p -> e where

View File

@ -370,6 +370,7 @@ simpleReadLinkRaw ss hash = do
instance ( MonadIO m, IsKey hash
, Hashed hash LBS.ByteString
, Key hash ~ Hash hash
, Block LBS.ByteString ~ LBS.ByteString
)
=> Storage (SimpleStorage hash) hash LBS.ByteString m where

View File

@ -40,6 +40,7 @@ common common-deps
, transformers
, uniplate
, vector
, data-default
common shared-properties
ghc-options:
@ -98,6 +99,20 @@ test-suite test-peer
main-is: PeerMain.hs
test-suite test-hmap
import: shared-properties
import: common-deps
default-language: Haskell2010
other-modules:
build-depends: HMap
, data-default
-- other-extensions:
type: exitcode-stdio-1.0
hs-source-dirs: test
main-is: HmapMain.hs

View File

@ -0,0 +1,76 @@
{-# LANGUAGE ExistentialQuantification, RankNTypes #-}
module Main where
import Data.Typeable
import Data.Dynamic
import Data.Proxy
import Data.Kind
import Prettyprinter
data Key = forall a . (Unfuck a, Eq a) => Key (Proxy a) Dynamic
class Typeable a => Unfuck a where
unfuck :: Proxy a -> Dynamic -> Maybe a
instance Typeable a => Unfuck a where
unfuck _ = fromDynamic @a
newKey :: forall a . (Eq a, Typeable a, Unfuck a) => a -> Key
newKey s = Key (Proxy @a) (toDyn s)
instance Eq Key where
(==) (Key p1 a) (Key p2 b) = unfuck p1 a == unfuck p1 b
main :: IO ()
main = do
let k1 = newKey 22
let k2 = newKey 33
let k3 = newKey "JOPA"
print $ "k1 == k1:" <+> pretty (k1 == k1)
print $ "k2 == k2:" <+> pretty (k2 == k2)
print $ "k1 == k2:" <+> pretty (k1 == k2)
print $ "k3 == k3:" <+> pretty (k3 == k3)
print $ "k3 == k2:" <+> pretty (k3 == k2)
print $ "k3 == k1:" <+> pretty (k3 == k1)
-- _ <- race ( pause ( 60 :: Timeout 'Seconds) ) $ forever $ do
-- let gen = arbitrary @MyKey
-- let n = 100
-- keys <- replicateM 10 (sample' @MyKey gen) <&> mconcat
-- vals <- replicateM 100 (randomIO @Int)
-- let kv = zip keys vals
-- forM_ kv $ \(k,v) -> do
-- m <- readTVarIO tm
-- let z = withKey k id
-- undefined
-- atomically $ writeTVar tm z
-- atomically $ modifyTVar km (k:)
-- kl <- readTVarIO km
-- when (length kl > 1000) $ do
-- let (a,b) = L.splitAt 1000 kl
-- m1 <- readTVarIO tm
-- forM_ b $ \z3 -> do
-- let m2 = withKey z3 $ \z3 -> delete z3 m1
-- pure ()
-- atomically $ writeTVar km b
-- pure ()
pure ()

View File

@ -1,6 +1,7 @@
{-# Language RankNTypes #-}
{-# Language TemplateHaskell #-}
{-# Language AllowAmbiguousTypes #-}
{-# Language UndecidableInstances #-}
module Main where
import HBS2.Prelude.Plated
@ -44,6 +45,9 @@ import System.Exit
import System.FilePath.Posix
import System.IO
import Control.Concurrent
import Data.Default
import Control.Monad.Reader
import Data.Dynamic
import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue qualified as Q
@ -111,6 +115,22 @@ instance HasProtocol Fake (BlockChunks Fake) where
decode = either (const Nothing) Just . deserialiseOrFail
encode = serialise
type instance SessionData Fake (BlockSize Fake) = BlockSizeSession Fake
newtype BlockSizeSession e =
BlockSizeSession
{ _bsBlockSizes :: Map (Peer e) Size
}
makeLenses 'BlockSizeSession
instance Ord (Peer e) => Default (BlockSizeSession e) where
def = BlockSizeSession mempty
deriving stock instance Show (BlockSizeSession Fake)
main :: IO ()
main = do
hSetBuffering stderr LineBuffering
@ -174,9 +194,9 @@ runFakePeer :: forall e b . ( e ~ Fake
-> b
-> EngineM e IO ()
-> IO ()
runFakePeer p bus work = do
runFakePeer p0 bus work = do
env <- newEnv p bus
env <- newEnv p0 bus
se <- emptySessions @e
@ -191,7 +211,7 @@ runFakePeer p bus work = do
let opts = [ StoragePrefix dir
]
storage <- simpleStorageInit opts -- :: IO (SimpleStorage HbSync)
storage <- simpleStorageInit opts :: IO (SimpleStorage HbSync)
w <- liftIO $ async $ simpleStorageWorker storage
@ -213,12 +233,20 @@ runFakePeer p bus work = do
maybe1 sz' (pure ()) $ \sz -> do
let bsz = fromIntegral sz
z <- fetch @e False def (BlockSizeKey h) id
liftIO $ print ("QQQQQ", pretty p0, pretty p, z)
update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz))
-- here we cache block size information
updSession se mempty sBlockSizes h (Map.insert p bsz)
updSession se bsz sBlockSize h (const bsz)
debug $ pretty p <+> "has block" <+> pretty h <+> pretty sz'
z <- fetch @e False def (BlockSizeKey h) id
liftIO $ print ("BEBEBE", pretty p0, pretty p, z)
let adapter =
BlockChunksI
{ blkSize = hasBlock storage
@ -305,7 +333,7 @@ test1 = do
fake <- newFakeP2P True
void $ race (pause (2 :: Timeout 'Seconds)) $ do
void $ race (pause (10 :: Timeout 'Seconds)) $ do
let p0 = 0 :: Peer Fake
let p1 = 1 :: Peer Fake
@ -314,10 +342,26 @@ test1 = do
p0Thread <- async $ runFakePeer p0 fake $ do
request p1 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"))
request p1 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"))
let h1 = "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"
let h0 = "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"
let h = fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"
-- fetch @Fake @(BlockSize Fake) True def h id
-- update @Fake @(BlockSize Fake) def (fromString h1) (over bsBlockSizes (Map.insert p1 111))
update @Fake @(BlockSize Fake) def (BlockSizeKey (fromString h0)) (over bsBlockSizes (Map.insert p0 100))
-- request p1 (GetBlockSize @Fake (fromString h1))
request p0 (GetBlockSize @Fake (fromString h0))
se1 <- fetch @Fake @(BlockSize Fake) False def (BlockSizeKey (fromString h0)) id
-- se2 <- fetch @Fake @(BlockSize Fake) False def (fromString h1) id
jopa <- asks (view sessions)
wtf <- liftIO $ Cache.lookup jopa (newSKey @(SessionKey Fake (BlockSize Fake)) (BlockSizeKey (fromString h0)))
pause ( 2 :: Timeout 'Seconds)
liftIO $ print $ (p0, "AAAAAA", se1, fromDynamic @(SessionData Fake (BlockSize Fake)) (fromJust wtf))
-- updateSession cookie (id)
-- se <- getSession cookie (lens)
@ -383,6 +427,8 @@ test1 = do
-- pure ()
pause ( 5 :: Timeout 'Seconds)
mapM_ cancel peerz
(_, e) <- waitAnyCatchCancel peerz