wip, reworking git-related stuff

This commit is contained in:
Dmitry Zuikov 2024-04-11 05:58:54 +03:00
parent 9aafab745d
commit 5cdfe10274
15 changed files with 515 additions and 88 deletions

View File

@ -133,6 +133,21 @@
"type": "github"
}
},
"flake-utils_7": {
"locked": {
"lastModified": 1644229661,
"narHash": "sha256-1YdnJAsNy69bpcjuoKdOYQX0YxZBiCYZo4Twxerqv7k=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "3cecb5b042f7f209c56ffd8371b2711a290ec797",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"haskell-flake-utils": {
"inputs": {
"flake-utils": "flake-utils"
@ -229,6 +244,24 @@
"inputs": {
"flake-utils": "flake-utils_6"
},
"locked": {
"lastModified": 1698938553,
"narHash": "sha256-oXpTKXioqFbl2mhhvpJIAvgNd+wYyv4ekI+YnJHEJ6s=",
"owner": "ivanovs-4",
"repo": "haskell-flake-utils",
"rev": "19b273b5dc401a0a565e7f75cf50a593871b80c9",
"type": "github"
},
"original": {
"owner": "ivanovs-4",
"repo": "haskell-flake-utils",
"type": "github"
}
},
"haskell-flake-utils_7": {
"inputs": {
"flake-utils": "flake-utils_7"
},
"locked": {
"lastModified": 1672412555,
"narHash": "sha256-Kaa8F7nQFR3KuS6Y9WRUxeJeZlp6CCubyrRfmiEsW4k=",
@ -264,6 +297,27 @@
"type": "github"
}
},
"lsm": {
"inputs": {
"haskell-flake-utils": "haskell-flake-utils_6",
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1711033804,
"narHash": "sha256-z9cb5yuWfuZmGukxsZebXhc6KUZoPVT60oXxQ6j6ML8=",
"ref": "refs/heads/master",
"rev": "0e8286a43da5b9e54c4f3ecdb994173fe77351db",
"revCount": 26,
"type": "git",
"url": "https://git.hbs2.net/5BCaH95cWsVKBmWaDNLWQr2umxzzT5kqRRKNTm2J15Ls"
},
"original": {
"type": "git",
"url": "https://git.hbs2.net/5BCaH95cWsVKBmWaDNLWQr2umxzzT5kqRRKNTm2J15Ls"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1707451808,
@ -286,6 +340,7 @@
"fixme": "fixme",
"haskell-flake-utils": "haskell-flake-utils_4",
"hspup": "hspup",
"lsm": "lsm",
"nixpkgs": "nixpkgs",
"saltine": "saltine",
"suckless-conf": "suckless-conf_2"
@ -332,7 +387,7 @@
},
"suckless-conf_2": {
"inputs": {
"haskell-flake-utils": "haskell-flake-utils_6",
"haskell-flake-utils": "haskell-flake-utils_7",
"nixpkgs": [
"nixpkgs"
]

View File

@ -18,6 +18,9 @@ inputs = {
db-pipe.url = "git+https://git.hbs2.net/5xrwbTzzweS9yeJQnrrUY9gQJfhJf84pbyHhF2MMmSft";
db-pipe.inputs.nixpkgs.follows = "nixpkgs";
lsm.url = "git+https://git.hbs2.net/5BCaH95cWsVKBmWaDNLWQr2umxzzT5kqRRKNTm2J15Ls";
lsm.inputs.nixpkgs.follows = "nixpkgs";
saltine = {
url = "github:tel/saltine/3d3a54cf46f78b71b4b55653482fb6f4cee6b77d";
flake = false;

View File

@ -105,6 +105,7 @@ library
, HBS2.Net.Messaging.UDP
, HBS2.Net.Messaging.TCP
, HBS2.Net.Messaging.Unix
, HBS2.Net.Messaging.Pipe
, HBS2.Net.Messaging.Stream
, HBS2.Net.Messaging.Encrypted.RandomPrefix
, HBS2.Net.Messaging.Encrypted.ByPass
@ -196,6 +197,7 @@ library
, time
, transformers
, uniplate
, unix
, unordered-containers
, unliftio
, unliftio-core

View File

@ -0,0 +1,100 @@
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE NumericUnderscores #-}
module HBS2.Net.Messaging.Pipe where
import HBS2.Prelude.Plated
import HBS2.Net.Proto.Types
import HBS2.Actors.Peer.Types
import HBS2.Net.Messaging
import Control.Concurrent.STM qualified as STM
import Control.Monad.Reader
import Data.ByteString.Builder qualified as B
import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS
import Data.Hashable
import Network.ByteOrder hiding (ByteString)
import System.IO.Unsafe (unsafePerformIO)
import System.Posix.IO
import UnliftIO
-- define new transport protocol type
data PIPE = PIPE
deriving (Eq,Ord,Show,Generic)
-- address for the new protocol
newtype PipeAddr = PipeAddr Handle
deriving newtype (Eq,Show)
-- the protocol work data
data MessagingPipe =
MessagingPipe
{ pipeIn :: Handle
, pipeOut :: Handle
, inQ :: TQueue ByteString
}
remotePeer :: MessagingPipe -> Peer PIPE
remotePeer = PeerPIPE . PipeAddr . pipeOut
localPeer :: MessagingPipe -> Peer PIPE
localPeer = PeerPIPE . PipeAddr . pipeIn
newMessagingPipe :: MonadIO m => (Handle, Handle) -> m MessagingPipe
newMessagingPipe (pIn,pOut) = do
MessagingPipe pIn pOut
<$> newTQueueIO
instance Hashable PipeAddr where
hashWithSalt salt (PipeAddr pip) = hashWithSalt salt ("pipe-addr", fd)
where
fd = unsafePerformIO (handleToFd pip <&> fromIntegral @_ @Word)
instance HasPeer PIPE where
newtype instance Peer PIPE = PeerPIPE { _fromPeerPipe :: PipeAddr }
deriving stock (Eq,Show,Generic)
deriving newtype (Hashable)
instance Pretty (Peer PIPE) where
pretty (PeerPIPE p) = parens ("pipe" <+> viaShow p)
-- Messaging definition for protocol
instance Messaging MessagingPipe PIPE ByteString where
sendTo bus _ _ msg = liftIO do
LBS.hPutStr (pipeOut bus) (B.toLazyByteString frame <> msg)
hFlush (pipeOut bus)
where
frame = B.word32BE (fromIntegral $ LBS.length msg)
receive bus _ = do
msg <- liftIO $ atomically $ peekTQueue q >> STM.flushTQueue q
for msg $ \m -> pure (From (PeerPIPE (PipeAddr who)), m)
where
q = inQ bus
who = pipeIn bus
runMessagingPipe :: MonadIO m => MessagingPipe -> m ()
runMessagingPipe bus = liftIO do
fix \next -> do
frame <- LBS.hGet who 4 <&> word32 . LBS.toStrict
piece <- LBS.hGet who (fromIntegral frame)
atomically (writeTQueue (inQ bus) piece)
next
where
who = pipeIn bus
instance (MonadIO m, Messaging MessagingPipe PIPE (Encoded PIPE))
=> HasFabriq PIPE (ReaderT MessagingPipe m) where
getFabriq = asks Fabriq
instance MonadIO m => HasOwnPeer PIPE (ReaderT MessagingPipe m) where
ownPeer = asks localPeer

View File

@ -4,6 +4,7 @@ module HBS2.Net.Messaging.Unix
( module HBS2.Net.Messaging.Unix
, module HBS2.Net.Messaging
, module HBS2.Net.Proto.Types
, SocketClosedException
) where
import HBS2.Prelude.Plated
@ -220,12 +221,23 @@ runMessagingUnix env = do
atomically $ writeTVar seen now
next
clientLoop m = fix \next -> do
m
if not (MUDontRetry `elem` msgUnixOpts env) then do
debug "LOOP!"
next
else do
debug "LOOP EXIT"
handleClient | MUDontRetry `elem` msgUnixOpts env = \_ w -> handleAny throwStopped w
| otherwise = handleAny
throwStopped _ = throwIO UnixMessagingStopped
runClient = liftIO $ forever $ handleClient logAndRetry $ flip runContT pure $ do
runClient = liftIO $ clientLoop $ handleClient logAndRetry $ flip runContT pure $ do
debug "HERE WE GO AGAIN!"
let sa = SockAddrUnix (msgUnixSockPath env)
let p = msgUnixSockPath env
@ -335,6 +347,7 @@ runMessagingUnix env = do
pause (msgUnixRetryTime env)
logAndRetry :: SomeException -> IO ()
logAndRetry e = do
warn $ "MessagingUnix. runClient failed, probably server is gone. Retrying:" <+> pretty (msgUnixSelf env)

View File

@ -56,6 +56,8 @@ import System.Exit qualified as Exit
import Data.Cache qualified as Cache
import Data.Cache (Cache)
import System.Exit
{- HLINT ignore "Functor law" -}
@ -158,15 +160,18 @@ withApp cfgPath action = do
setLogging @WARN warnPrefix
setLogging @NOTICE noticePrefix
soname <- detectRPC
`orDie` "can't detect RPC"
fix \next -> do
flip runContT pure do
soname' <- lift detectRPC
soname <- ContT $ maybe1 soname' (pure ())
client <- lift $ race (pause @'Seconds 1) (newMessagingUnix False 1.0 soname)
>>= orThrowUser ("can't connect to" <+> pretty soname)
void $ ContT $ withAsync $ runMessagingUnix client
mess <- ContT $ withAsync $ runMessagingUnix client
peerAPI <- makeServiceCaller @PeerAPI (fromString soname)
refLogAPI <- makeServiceCaller @RefLogAPI (fromString soname)
@ -179,12 +184,13 @@ withApp cfgPath action = do
, Endpoint @UNIX storageAPI
]
void $ ContT $ withAsync $ liftIO $ runReaderT (runServiceClientMulti endpoints) client
mn <- ContT $ withAsync $ liftIO $ runReaderT (runServiceClientMulti endpoints) client
let o = [MUWatchdog 20, MUDontRetry]
let o = [MUWatchdog 20,MUDontRetry]
clientN <- newMessagingUnixOpts o False 1.0 soname
void $ ContT $ withAsync $ runMessagingUnix clientN
notif <- ContT $ withAsync (runMessagingUnix clientN)
sink <- newNotifySink
@ -192,7 +198,7 @@ withApp cfgPath action = do
debug $ red "notify restarted!"
runNotifyWorkerClient sink
void $ ContT $ withAsync $ flip runReaderT clientN $ do
p1 <- ContT $ withAsync $ flip runReaderT clientN $ do
runProto @UNIX
[ makeResponse (makeNotifyClient @(RefLogEvents L4Proto) sink)
]
@ -212,8 +218,17 @@ withApp cfgPath action = do
<*> newTVarIO mempty
<*> newTQueueIO
lift $ runReaderT (runFixerM $ withConfig cfgPath action) env
`finally` do
void $ ContT $ bracket (pure ()) $ \_ -> do
readTVarIO (_listeners env) <&> HM.elems >>= mapM_ cancel
p3 <- ContT $ withAsync $ runReaderT (runFixerM $ withConfig cfgPath action) env
void $ waitAnyCatchCancel [mess,mn,notif,p1,p3]
debug $ red "respawning..."
pause @'Seconds 5
next
setLoggingOff @DEBUG
setLoggingOff @INFO
setLoggingOff @ERROR
@ -232,16 +247,18 @@ data ConfWatch =
| ConfUpdate [Syntax C]
mainLoop :: FixerM IO ()
mainLoop = forever $ do
mainLoop = do
debug "hbs2-fixer. do stuff since 2024"
conf <- getConf
-- debug $ line <> vcat (fmap pretty conf)
flip runContT pure do
debug $ red "Reloading..."
lift $ updateFromConfig conf
void $ ContT $ withAsync $ do
p1 <- ContT $ withAsync $ do
cfg <- asks _configFile `orDie` "config file not specified"
flip fix ConfRead $ \next -> \case
@ -271,7 +288,7 @@ mainLoop = forever $ do
next ConfRead
-- poll reflogs
void $ ContT $ withAsync do
p2 <- ContT $ withAsync do
let w = asks _watchers
>>= readTVarIO
@ -292,15 +309,20 @@ mainLoop = forever $ do
pure ()
jobs <- asks _pipeline
void $ ContT $ withAsync $ forever do
liftIO $ E.try @SomeException (join $ atomically $ readTQueue jobs)
>>= \case
Left e -> err (viaShow e)
_ -> pure ()
p3 <- ContT $ withAsync $ fix \next -> do
r <- liftIO $ E.try @SomeException (join $ atomically $ readTQueue jobs)
case r of
Left e -> do
err (viaShow e)
let ee = fromException @AsyncCancelled e
forever $ pause @'Seconds 60
unless (isJust ee) do
next
_ -> next
void $ waitAnyCatchCancel [p1,p2,p3]
oneSec :: MonadUnliftIO m => m b -> m (Either () b)
oneSec = race (pause @'Seconds 1)

View File

@ -168,3 +168,5 @@ executable git-remote-hbs2
default-language: GHC2021

View File

@ -945,7 +945,7 @@ logMergeProcess penv env q = withPeerM penv do
hd <- MaybeT $ lift $ getHead menv headRef
let quo = view refChanHeadQuorum hd & fromIntegral
guard $ checkACL hd (Just pk) ak
guard $ checkACL ACLUpdate hd (Just pk) ak
pure [(href, (quo,mempty))]
Accept _ box -> do

View File

@ -79,7 +79,7 @@ refChanNotifyProto self adapter msg@(Notify rchan box) = do
let refchanKey = RefChanHeadKey @s rchan
headBlock <- MaybeT $ getActualRefChanHead @e refchanKey
guard $ checkACL headBlock Nothing authorKey
guard $ checkACL ACLNotify headBlock Nothing authorKey
-- FIXME: garbage-collection-required
liftIO $ putBlock sto (serialise msg)

View File

@ -295,7 +295,7 @@ refChanUpdateProto self pc adapter msg = do
let pips = view refChanHeadPeers headBlock
guard $ checkACL headBlock (Just peerKey) authorKey
guard $ checkACL ACLUpdate headBlock (Just peerKey) authorKey
debug $ "OMG!!! TRANS AUTHORIZED" <+> pretty (AsBase58 peerKey) <+> pretty (AsBase58 authorKey)
@ -453,7 +453,7 @@ refChanUpdateProto self pc adapter msg = do
(authorKey, _) <- MaybeT $ pure $ unboxSignedBox0 pbox
-- может, и не надо второй раз проверять
guard $ checkACL headBlock (Just peerKey) authorKey
guard $ checkACL ACLUpdate headBlock (Just peerKey) authorKey
debug $ "JUST GOT TRANSACTION FROM STORAGE! ABOUT TO CHECK IT" <+> pretty hashRef

View File

@ -46,6 +46,9 @@ type RefChanAuthor e = PubKey 'Sign (Encryption e)
type Weight = Integer
data ACLType = ACLUpdate | ACLNotify
deriving stock (Eq,Ord,Generic,Data,Show)
data RefChanHeadBlock e =
RefChanHeadBlockSmall
{ _refChanHeadVersion :: Integer
@ -63,6 +66,16 @@ data RefChanHeadBlock e =
, _refChanHeadReaders' :: HashSet (PubKey 'Encrypt (Encryption e))
, _refChanHeadExt :: ByteString
}
| RefChanHeadBlock2
{ _refChanHeadVersion :: Integer
, _refChanHeadQuorum :: Integer
, _refChanHeadWaitAccept :: Integer
, _refChanHeadPeers :: HashMap (PubKey 'Sign (Encryption e)) Weight
, _refChanHeadAuthors :: HashSet (PubKey 'Sign (Encryption e))
, _refChanHeadReaders' :: HashSet (PubKey 'Encrypt (Encryption e))
, _refChanHeadNotifiers' :: HashSet (PubKey 'Sign (Encryption e))
, _refChanHeadExt :: ByteString
}
deriving stock (Generic)
makeLenses ''RefChanHeadBlock
@ -126,7 +139,25 @@ refChanHeadReaders = lens g s
where
g (RefChanHeadBlockSmall{}) = mempty
g (RefChanHeadBlock1{..}) = _refChanHeadReaders'
g (RefChanHeadBlock2{..}) = _refChanHeadReaders'
s v@(RefChanHeadBlock1{}) x = v { _refChanHeadReaders' = x }
s v@(RefChanHeadBlock2{}) x = v { _refChanHeadReaders' = x }
s x _ = x
refChanHeadNotifiers :: ForRefChans e
=> Lens (RefChanHeadBlock e)
(RefChanHeadBlock e)
(HashSet (PubKey 'Sign (Encryption e)))
(HashSet (PubKey 'Sign (Encryption e)))
refChanHeadNotifiers = lens g s
where
g (RefChanHeadBlockSmall{}) = mempty
g (RefChanHeadBlock1{}) = mempty
g (RefChanHeadBlock2{..}) = _refChanHeadNotifiers'
s v@(RefChanHeadBlock2{}) x = v { _refChanHeadNotifiers' = x }
s x _ = x
instance ForRefChans e => Serialise (RefChanHeadBlock e)
@ -197,20 +228,15 @@ instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (RefChanLogKey s) where
instance ForRefChans e => FromStringMaybe (RefChanHeadBlock e) where
-- NOTE: we-dont-support-old-head-formats-anymore
fromStringMay str =
case readers of
[] -> RefChanHeadBlockSmall <$> version
RefChanHeadBlock2 <$> version
<*> quorum
<*> wait
<*> pure (HashMap.fromList peers)
<*> pure (HashSet.fromList authors)
rs -> RefChanHeadBlock1 <$> version
<*> quorum
<*> wait
<*> pure (HashMap.fromList peers)
<*> pure (HashSet.fromList authors)
<*> pure (HashSet.fromList rs)
<*> pure (HashSet.fromList readers)
<*> pure (HashSet.fromList notifiers)
<*> pure mempty
where
@ -231,6 +257,11 @@ instance ForRefChans e => FromStringMaybe (RefChanHeadBlock e) where
| (ListVal [SymbolVal "reader", LitStrVal s] ) <- parsed
]
notifiers = catMaybes [ fromStringMay @(PubKey 'Sign (Encryption e)) (Text.unpack s)
| (ListVal [SymbolVal "notifier", LitStrVal s] ) <- parsed
]
instance (ForRefChans e
, Pretty (AsBase58 (PubKey 'Sign (Encryption e)))
, Pretty (AsBase58 (PubKey 'Encrypt (Encryption e)))
@ -241,16 +272,22 @@ instance (ForRefChans e
<>
parens ("wait" <+> pretty (view refChanHeadWaitAccept blk)) <> line
<>
vcat (fmap peer (HashMap.toList $ view refChanHeadPeers blk)) <> line
lstOf peer (HashMap.toList $ view refChanHeadPeers blk)
<>
vcat (fmap author (HashSet.toList $ view refChanHeadAuthors blk)) <> line
lstOf author (HashSet.toList $ view refChanHeadAuthors blk)
<>
vcat (fmap reader (HashSet.toList $ view refChanHeadReaders blk)) <> line
lstOf reader (HashSet.toList $ view refChanHeadReaders blk)
<>
lstOf notifier (HashSet.toList $ view refChanHeadNotifiers blk)
where
peer (p,w) = parens ("peer" <+> dquotes (pretty (AsBase58 p)) <+> pretty w)
author p = parens ("author" <+> dquotes (pretty (AsBase58 p)))
reader p = parens ("reader" <+> dquotes (pretty (AsBase58 p)))
notifier p = parens ("notifier" <+> dquotes (pretty (AsBase58 p)))
lstOf f e | null e = mempty
| otherwise = vcat (fmap f e) <> line
-- блок головы может быть довольно большой.
@ -335,15 +372,19 @@ getRefChanHead sto k = runMaybeT do
checkACL :: forall e s . (Encryption e ~ s, ForRefChans e)
=> RefChanHeadBlock e
=> ACLType
-> RefChanHeadBlock e
-> Maybe (PubKey 'Sign s)
-> PubKey 'Sign s
-> Bool
checkACL theHead mbPeerKey authorKey = match
checkACL acl theHead mbPeerKey authorKey = match
where
pips = view refChanHeadPeers theHead
aus = view refChanHeadAuthors theHead
notifiers = view refChanHeadNotifiers theHead
match = maybe True (`HashMap.member` pips) mbPeerKey
&& authorKey `HashSet.member` aus
&& ( authorKey `HashSet.member` aus
|| acl == ACLNotify && authorKey `HashSet.member` notifiers
)

View File

@ -947,3 +947,51 @@ executable test-playground
, resourcet
, text-icu >= 0.8.0.3
executable test-pipe-mess
import: shared-properties
default-language: Haskell2010
-- other-extensions:
type: exitcode-stdio-1.0
hs-source-dirs: test
main-is: TestPipeMessaging.hs
build-depends:
base, hbs2-core
, async
, bytestring
, cache
, containers
, directory
, hashable
, microlens-platform
, mtl
, network-byte-order
, prettyprinter
, QuickCheck
, quickcheck-instances
, random
, safe
, serialise
, stm
, streaming
, tasty
, tasty-quickcheck
, tasty-hunit
, tasty-quickcheck
, transformers
, uniplate
, vector
, saltine
, simple-logger
, string-conversions
, filepath
, temporary
, unliftio
, unordered-containers
, unix
, timeit

View File

@ -60,9 +60,9 @@ type instance Output Method2 = ()
instance MonadIO m => HandleMethod m Method2 where
handleMethod _ = pure ()
instance (HasProtocol UNIX (ServiceProto api UNIX), MonadUnliftIO m)
=> HasDeferred UNIX (ServiceProto api UNIX) m where
deferred _ m = void (async m)
-- instance (HasProtocol UNIX (ServiceProto api UNIX), MonadUnliftIO m)
-- => HasDeferred UNIX (ServiceProto api UNIX) m where
-- deferred m = void (async m)
main :: IO ()
main = do

View File

@ -0,0 +1,114 @@
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE NumericUnderscores #-}
module Main where
import HBS2.Prelude.Plated
import HBS2.Net.Messaging
import HBS2.Net.Messaging.Pipe
import HBS2.Net.Proto.Service
import HBS2.Actors.Peer
import HBS2.System.Logger.Simple.ANSI
import Data.ByteString.Lazy (ByteString)
import System.Posix.IO
import UnliftIO
import Control.Monad.Trans.Cont
import Control.Monad.Reader
import Codec.Serialise
import Data.Fixed
import System.TimeIt
-- protocol's data
data Ping =
Ping Int
| Pong Int
deriving stock (Eq,Show,Generic)
instance Pretty Ping where
pretty = viaShow
instance Serialise Ping
-- API definition
type MyServiceMethods1 = '[ Ping ]
-- API endpoint definition
type instance Input Ping = Ping
type instance Output Ping = Maybe Ping
-- API handler
instance MonadIO m => HandleMethod m Ping where
handleMethod = \case
Ping n -> pure (Just (Pong n))
Pong _ -> pure Nothing
-- Codec for protocol
instance HasProtocol PIPE (ServiceProto MyServiceMethods1 PIPE) where
type instance ProtocolId (ServiceProto MyServiceMethods1 PIPE) = 0xDEADF00D1
type instance Encoded PIPE = ByteString
decode = either (const Nothing) Just . deserialiseOrFail
encode = serialise
-- Some "deferred" implementation for our monad
-- note -- plain asyncs may cause to resource leak
instance (MonadUnliftIO m, HasProtocol PIPE (ServiceProto api PIPE))
=> HasDeferred (ServiceProto api PIPE) PIPE m where
deferred m = void (async m)
mainLoop :: IO ()
mainLoop = do
flip runContT pure do
-- pipe for server
(i1,o1) <- liftIO $ createPipe
>>= \(i,o) -> (,) <$> fdToHandle i <*> fdToHandle o
-- pipe for client
(i2,o2) <- liftIO $ createPipe
>>= \(i,o) -> (,) <$> fdToHandle i <*> fdToHandle o
-- interwire client and server by pipes
server <- newMessagingPipe (i2,o1)
client <- newMessagingPipe (i1,o2)
-- run messaging workers
void $ ContT $ withAsync $ runMessagingPipe server
void $ ContT $ withAsync $ runMessagingPipe client
-- make server protocol responder
void $ ContT $ withAsync $ flip runReaderT server do
runProto @PIPE
[ makeResponse (makeServer @MyServiceMethods1)
]
-- make client's "caller"
caller <- lift $ makeServiceCaller @MyServiceMethods1 @PIPE (localPeer client)
-- make client's endpoint worker
void $ ContT $ withAsync $ runReaderT (runServiceClient caller) client
let n = 20_000
(a, _) <- timeItT do
for_ [1..n] $ \i -> do
void $ callService @Ping caller (Ping i)
debug $ "sent" <+> pretty n <+> "messages in" <+> pretty (realToFrac a :: Fixed E3) <> "sec"
<> line
<> "rps:" <+> pretty (realToFrac n / realToFrac a :: Fixed E2)
main :: IO ()
main = do
setLogging @DEBUG defLog
mainLoop
`finally` do
setLoggingOff @DEBUG

View File

@ -10,6 +10,10 @@ import Data.ByteString.Lazy qualified as LBS
import Data.ByteString qualified as BS
import Codec.Serialise
import Lens.Micro.Platform
import Control.Monad.Trans.Cont
import Control.Monad
import UnliftIO
-- желаемое поведение: добавить в новую версию A какое-нибудь поле так,
-- что бы предыдущие записи продолжали десериализоваться без этого поля,
@ -65,6 +69,29 @@ test w = case w of
A -> "Match A"
runWithAsync :: IO ()
runWithAsync = do
hSetBuffering stdout LineBuffering
flip runContT pure do
t1 <- ContT $ withAsync do
forever do
print "PIU"
pause @'Seconds 1
q <- ContT $ withAsync do
pause @'Seconds 10
print "FUCKIG QUIT"
pysh <- ContT $ withAsync $ forever do
pause @'Seconds 2
print "PYSHPYSH"
void $ waitAnyCatchCancel [t1,q,pysh]
main :: IO ()
main = do
print "1"