introducing resourcet

This commit is contained in:
Dmitry Zuikov 2023-05-24 10:23:36 +03:00
parent 69abfcf7e5
commit 453b7fd822
6 changed files with 53 additions and 32 deletions

View File

@ -140,6 +140,7 @@ library
, prettyprinter , prettyprinter
, random , random
, random-shuffle , random-shuffle
, resourcet
, safe , safe
, saltine ^>=0.2.0.1 , saltine ^>=0.2.0.1
, serialise , serialise
@ -154,6 +155,7 @@ library
, transformers , transformers
, uniplate , uniplate
, unordered-containers , unordered-containers
, unliftio
hs-source-dirs: lib hs-source-dirs: lib

View File

@ -8,6 +8,8 @@ import HBS2.Net.Messaging
import HBS2.Net.Proto import HBS2.Net.Proto
import HBS2.Prelude.Plated import HBS2.Prelude.Plated
import HBS2.System.Logger.Simple
import Data.Function import Data.Function
import Control.Exception import Control.Exception
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
@ -27,6 +29,7 @@ import Network.Socket
import Network.Socket.ByteString import Network.Socket.ByteString
import Network.Multicast import Network.Multicast
import Control.Monad.Trans.Resource
-- One address - one peer - one messaging -- One address - one peer - one messaging
@ -59,7 +62,9 @@ newMessagingUDPMulticast s = runMaybeT $ do
<*> newTVarIO so <*> newTVarIO so
<*> pure True <*> pure True
newMessagingUDP :: MonadIO m => Bool -> Maybe String -> m (Maybe MessagingUDP) close_ so = trace "closing fuckin socket!!" >> close so
newMessagingUDP :: (MonadIO m, MonadResource m) => Bool -> Maybe String -> m (Maybe MessagingUDP)
newMessagingUDP reuse saddr = newMessagingUDP reuse saddr =
case saddr of case saddr of
Just s -> do Just s -> do
@ -69,6 +74,8 @@ newMessagingUDP reuse saddr =
let a = addrAddress l let a = addrAddress l
so <- liftIO $ socket (addrFamily l) (addrSocketType l) (addrProtocol l) so <- liftIO $ socket (addrFamily l) (addrSocketType l) (addrProtocol l)
_ <- register $ close_ so
when reuse $ do when reuse $ do
liftIO $ setSocketOption so ReuseAddr 1 liftIO $ setSocketOption so ReuseAddr 1
@ -80,6 +87,9 @@ newMessagingUDP reuse saddr =
Nothing -> do Nothing -> do
so <- liftIO $ socket AF_INET Datagram defaultProtocol so <- liftIO $ socket AF_INET Datagram defaultProtocol
_ <- register $ close_ so
sa <- liftIO $ getSocketName so sa <- liftIO $ getSocketName so
liftIO $ Just <$> ( MessagingUDP sa <$> Q0.newTQueueIO liftIO $ Just <$> ( MessagingUDP sa <$> Q0.newTQueueIO

View File

@ -50,7 +50,7 @@ import ProxyMessaging
import PeerMeta import PeerMeta
import Codec.Serialise import Codec.Serialise
import Control.Concurrent.Async -- import Control.Concurrent.Async
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Exception as Exception import Control.Exception as Exception
import Control.Monad.Reader import Control.Monad.Reader
@ -77,9 +77,14 @@ import Options.Applicative
import System.Directory import System.Directory
import System.Exit import System.Exit
import System.IO import System.IO
import System.Mem
import System.Metrics import System.Metrics
import Data.Cache qualified as Cache import Data.Cache qualified as Cache
import UnliftIO.Exception qualified as U import UnliftIO.Exception qualified as U
-- import UnliftIO.STM
import UnliftIO.Async as U
import Control.Monad.Trans.Resource
-- TODO: write-workers-to-config -- TODO: write-workers-to-config
@ -353,7 +358,7 @@ runCLI = join . customExecParser (prefs showHelpOnError) $
myException :: SomeException -> IO () myException :: SomeException -> IO ()
myException e = die ( show e ) >> exitFailure myException e = err ( show e ) >> notice "RESTARTING..."
newtype CredentialsM e s m a = newtype CredentialsM e s m a =
@ -418,17 +423,20 @@ runPeer :: forall e s . ( e ~ L4Proto
, s ~ Encryption e , s ~ Encryption e
) => PeerOpts -> IO () ) => PeerOpts -> IO ()
runPeer opts = Exception.handle myException $ do runPeer opts = fix \me -> U.handle (\e -> myException e
>> performGC
>> pause @'Seconds 10
>> me
) $ runResourceT do
metrics <- newStore metrics <- liftIO newStore
xdg <- liftIO $ getXdgDirectory XdgData defStorePath <&> fromString
xdg <- getXdgDirectory XdgData defStorePath <&> fromString
conf <- peerConfigRead (view peerConfig opts) conf <- peerConfigRead (view peerConfig opts)
-- let (PeerConfig syn) = conf -- let (PeerConfig syn) = conf
print $ pretty conf liftIO $ print $ pretty conf
let listenConf = cfgValue @PeerListenKey conf let listenConf = cfgValue @PeerListenKey conf
let rpcConf = cfgValue @PeerRpcKey conf let rpcConf = cfgValue @PeerRpcKey conf
@ -462,7 +470,7 @@ runPeer opts = Exception.handle myException $ do
let accptAnn = cfgValue @PeerAcceptAnnounceKey conf :: AcceptAnnounce let accptAnn = cfgValue @PeerAcceptAnnounceKey conf :: AcceptAnnounce
print $ pretty accptAnn liftIO $ print $ pretty accptAnn
-- FIXME: move-peerBanned-somewhere -- FIXME: move-peerBanned-somewhere
let peerBanned p d = do let peerBanned p d = do
@ -476,11 +484,11 @@ runPeer opts = Exception.handle myException $ do
AcceptAnnounceAll -> pure True AcceptAnnounceAll -> pure True
AcceptAnnounceFrom s -> pure $ view peerSignKey d `Set.member` s AcceptAnnounceFrom s -> pure $ view peerSignKey d `Set.member` s
rpcQ <- newTQueueIO @RPCCommand rpcQ <- liftIO $ newTQueueIO @RPCCommand
let ps = mempty let ps = mempty
pc' <- LBS.readFile credFile pc' <- liftIO $ LBS.readFile credFile
<&> parseCredentials @(Encryption e) . AsCredFile <&> parseCredentials @(Encryption e) . AsCredFile
. LBS.toStrict . LBS.toStrict
. LBS.take 4096 . LBS.take 4096
@ -493,9 +501,9 @@ runPeer opts = Exception.handle myException $ do
let blk = liftIO . hasBlock s let blk = liftIO . hasBlock s
w <- replicateM defStorageThreads $ async $ simpleStorageWorker s w <- replicateM defStorageThreads $ async $ liftIO $ simpleStorageWorker s
localMulticast <- (headMay <$> parseAddrUDP (fromString defLocalMulticast) localMulticast <- liftIO $ (headMay <$> parseAddrUDP (fromString defLocalMulticast)
<&> fmap (fromSockAddr @'UDP . addrAddress) ) <&> fmap (fromSockAddr @'UDP . addrAddress) )
`orDie` "assertion: localMulticastPeer not set" `orDie` "assertion: localMulticastPeer not set"
@ -506,19 +514,16 @@ runPeer opts = Exception.handle myException $ do
`orDie` "unable listen on the given addr" `orDie` "unable listen on the given addr"
udp <- async $ runMessagingUDP mess udp <- async $ runMessagingUDP mess
`catch` (\(e::SomeException) -> throwIO e )
udp1 <- newMessagingUDP False rpcSa udp1 <- newMessagingUDP False rpcSa
`orDie` "Can't start RPC listener" `orDie` "Can't start RPC listener"
mrpc <- async $ runMessagingUDP udp1 mrpc <- async $ runMessagingUDP udp1
`catch` (\(e::SomeException) -> throwIO e )
mcast <- newMessagingUDPMulticast defLocalMulticast mcast <- newMessagingUDPMulticast defLocalMulticast
`orDie` "Can't start RPC listener" `orDie` "Can't start RPC listener"
messMcast <- async $ runMessagingUDP mcast messMcast <- async $ runMessagingUDP mcast
`catch` (\(e::SomeException) -> throwIO e )
brains <- newBasicBrains @e conf brains <- newBasicBrains @e conf
@ -535,13 +540,11 @@ runPeer opts = Exception.handle myException $ do
tcpEnv <- newMessagingTCP addr tcpEnv <- newMessagingTCP addr
-- FIXME: handle-tcp-thread-somehow -- FIXME: handle-tcp-thread-somehow
void $ async $ runMessagingTCP tcpEnv void $ async $ runMessagingTCP tcpEnv
`catch` (\(e::SomeException) -> throwIO e )
pure $ Just tcpEnv pure $ Just tcpEnv
proxy <- newProxyMessaging mess tcp proxy <- newProxyMessaging mess tcp
proxyThread <- async $ runProxyMessaging proxy proxyThread <- async $ runProxyMessaging proxy
`catch` (\(e::SomeException) -> throwIO e )
penv <- newPeerEnv (AnyStorage s) (Fabriq proxy) (getOwnPeer mess) penv <- newPeerEnv (AnyStorage s) (Fabriq proxy) (getOwnPeer mess)
@ -562,7 +565,7 @@ runPeer opts = Exception.handle myException $ do
-- debug $ "onNoBlock" <+> pretty p <+> pretty h -- debug $ "onNoBlock" <+> pretty p <+> pretty h
withPeerM penv $ withDownload denv (addDownload mzero h) withPeerM penv $ withDownload denv (addDownload mzero h)
loop <- async do loop <- liftIO $ async do
runPeerM penv $ do runPeerM penv $ do
adapter <- mkAdapter adapter <- mkAdapter
@ -963,7 +966,7 @@ runPeer opts = Exception.handle myException $ do
menv <- newPeerEnv (AnyStorage s) (Fabriq mcast) (getOwnPeer mcast) menv <- newPeerEnv (AnyStorage s) (Fabriq mcast) (getOwnPeer mcast)
ann <- async $ runPeerM menv $ do ann <- liftIO $ async $ runPeerM menv $ do
self <- ownPeer @e self <- ownPeer @e
@ -983,7 +986,7 @@ runPeer opts = Exception.handle myException $ do
void $ waitAnyCancel $ w <> [udp,loop,rpc,mrpc,ann,messMcast,brainsThread] void $ waitAnyCancel $ w <> [udp,loop,rpc,mrpc,ann,messMcast,brainsThread]
simpleStorageStop s liftIO $ simpleStorageStop s
@ -1003,9 +1006,9 @@ rpcClientMain opt action = do
action action
withRPC :: FromStringMaybe (PeerAddr L4Proto) => RPCOpt -> RPC L4Proto -> IO () withRPC :: FromStringMaybe (PeerAddr L4Proto) => RPCOpt -> RPC L4Proto -> IO ()
withRPC o cmd = rpcClientMain o $ do withRPC o cmd = rpcClientMain o $ runResourceT do
hSetBuffering stdout LineBuffering liftIO $ hSetBuffering stdout LineBuffering
conf <- peerConfigRead (view rpcOptConf o) conf <- peerConfigRead (view rpcOptConf o)
@ -1013,7 +1016,7 @@ withRPC o cmd = rpcClientMain o $ do
saddr <- pure (view rpcOptAddr o <|> rpcConf) `orDie` "RPC endpoint not set" saddr <- pure (view rpcOptAddr o <|> rpcConf) `orDie` "RPC endpoint not set"
as <- parseAddrUDP (fromString saddr) <&> fmap (fromSockAddr @'UDP . addrAddress) as <- liftIO $ parseAddrUDP (fromString saddr) <&> fmap (fromSockAddr @'UDP . addrAddress)
let rpc' = headMay $ L.sortBy (compare `on` addrPriority) as let rpc' = headMay $ L.sortBy (compare `on` addrPriority) as
rpc <- pure rpc' `orDie` "Can't parse RPC endpoint" rpc <- pure rpc' `orDie` "Can't parse RPC endpoint"
@ -1022,13 +1025,13 @@ withRPC o cmd = rpcClientMain o $ do
mrpc <- async $ runMessagingUDP udp1 mrpc <- async $ runMessagingUDP udp1
pingQ <- newTQueueIO pingQ <- liftIO newTQueueIO
pokeQ <- newTQueueIO pokeQ <- liftIO newTQueueIO
pokeFQ <- newTQueueIO pokeFQ <- liftIO newTQueueIO
refQ <- newTQueueIO refQ <- liftIO newTQueueIO
let adapter = let adapter =
RpcAdapter dontHandle RpcAdapter dontHandle

View File

@ -38,6 +38,7 @@ common common-deps
, prettyprinter , prettyprinter
, random , random
, random-shuffle , random-shuffle
, resourcet
-- , resolv -- , resolv
, safe , safe
, saltine >=0.2.0.1 , saltine >=0.2.0.1

View File

@ -35,6 +35,7 @@ common common-deps
, QuickCheck , QuickCheck
, random , random
, random-shuffle , random-shuffle
, resourcet
, safe , safe
, serialise , serialise
, split , split
@ -51,6 +52,7 @@ common common-deps
, vector , vector
, prettyprinter-ansi-terminal , prettyprinter-ansi-terminal
, interpolatedstring-perl6 , interpolatedstring-perl6
, unliftio
common shared-properties common shared-properties
ghc-options: ghc-options:

View File

@ -14,7 +14,10 @@ import System.IO
import Lens.Micro.Platform import Lens.Micro.Platform
import Codec.Serialise import Codec.Serialise
import Control.Concurrent.Async -- import Control.Concurrent.Async
import Control.Monad.Trans.Resource
import UnliftIO.Async
type UDP = L4Proto type UDP = L4Proto
@ -81,9 +84,9 @@ instance HasTimeLimits UDP (PingPong UDP) IO where
tryLockForPeriod _ _ = pure True tryLockForPeriod _ _ = pure True
main :: IO () main :: IO ()
main = do main = runResourceT do
hSetBuffering stdout LineBuffering liftIO $ hSetBuffering stdout LineBuffering
hSetBuffering stderr LineBuffering liftIO $ hSetBuffering stderr LineBuffering
udp1 <- newMessagingUDP False (Just "127.0.0.1:10001") `orDie` "Can't start listener on 10001" udp1 <- newMessagingUDP False (Just "127.0.0.1:10001") `orDie` "Can't start listener on 10001"
udp2 <- newMessagingUDP False (Just "127.0.0.1:10002") `orDie` "Can't start listener on 10002" udp2 <- newMessagingUDP False (Just "127.0.0.1:10002") `orDie` "Can't start listener on 10002"