introducing resourcet

This commit is contained in:
Dmitry Zuikov 2023-05-24 12:31:41 +03:00
parent 453b7fd822
commit e9a552d78a
5 changed files with 122 additions and 31 deletions

View File

@ -16,9 +16,9 @@ import HBS2.Prelude.Plated
import HBS2.System.Logger.Simple
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
-- import Control.Concurrent.Async
import Control.Concurrent.STM (flushTQueue,stateTVar)
import Control.Exception (try,Exception,SomeException,throwIO)
import Control.Monad
import Data.Bits
import Data.ByteString.Lazy (ByteString)
@ -35,10 +35,14 @@ import Lens.Micro.Platform
import Network.ByteOrder hiding (ByteString)
import Network.Simple.TCP
import Network.Socket hiding (listen,connect)
import Network.Socket.ByteString.Lazy hiding (send,recv)
-- import Network.Socket.ByteString.Lazy hiding (send,recv)
import Streaming.Prelude qualified as S
import System.Random hiding (next)
import UnliftIO.Async
import UnliftIO.STM
import UnliftIO.Exception qualified as U
data SocketClosedException =
SocketClosedException
deriving stock (Show, Typeable)
@ -142,7 +146,6 @@ readFromSocket sock size = LBS.fromChunks <$> (go size & S.toList_)
go (max 0 (n - nread))
eos = do
debug "SOCKET FUCKING CLOSED!"
liftIO $ throwIO SocketClosedException
connectionId :: Word32 -> Word32 -> Word64
@ -346,14 +349,18 @@ connectPeerTCP env peer = liftIO do
-- REVIEW: так что в итоге? где-то здесь?
shutdown sock ShutdownBoth
-- FIXME: link-all-asyncs
runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m ()
runMessagingTCP env = liftIO do
own <- toPeerAddr $ view tcpOwnPeer env
let (L4Address _ (IPAddrPort (i,p))) = own
let defs = view tcpDefer env
void $ async $ forever do
mon <- async $ forever do
pause @'Seconds 30
now <- getTimeCoarse
@ -366,7 +373,7 @@ runMessagingTCP env = liftIO do
[] -> Nothing
xs -> Just xs
void $ async $ forever do
con <- async $ forever do
let ev = view tcpDeferEv env
@ -400,7 +407,7 @@ runMessagingTCP env = liftIO do
pure ()
void $ async $ forever do
stat <- async $ forever do
pause @'Seconds 120
ps <- readTVarIO $ view tcpConnPeer env
let peers = HashMap.toList ps
@ -411,28 +418,31 @@ runMessagingTCP env = liftIO do
<+> pretty c
<+> parens ("used:" <+> pretty used)
listen (Host (show i)) (show p) $ \(sock, sa) -> do
debug $ "Listening on" <+> pretty sa
mapM_ link [mon,con,stat]
forever do
void $ acceptFork sock $ \(so, remote) -> do
trace $ "GOT INCOMING CONNECTION FROM"
<+> brackets (pretty own)
<+> brackets (pretty sa)
<+> pretty remote
liftIO (
listen (Host (show i)) (show p) $ \(sock, sa) -> do
debug $ "Listening on" <+> pretty sa
void $ try @SomeException $ do
forever do
void $ acceptFork sock $ \(so, remote) -> do
trace $ "GOT INCOMING CONNECTION FROM"
<+> brackets (pretty own)
<+> brackets (pretty sa)
<+> pretty remote
spawnConnection Server env so remote
void $ try @SomeException $ do
-- gracefulClose so 1000
spawnConnection Server env so remote
-- TODO: probably-cleanup-peer
-- TODO: periodically-drop-inactive-connections
-- gracefulClose so 1000
debug $ "CLOSING CONNECTION" <+> pretty remote
shutdown so ShutdownBoth
close so
-- TODO: probably-cleanup-peer
-- TODO: periodically-drop-inactive-connections
debug $ "CLOSING CONNECTION" <+> pretty remote
shutdown so ShutdownBoth
close so ) `U.finally` mapM_ cancel [mon,con,stat]
traceCmd :: forall a ann b m . ( Pretty a

View File

@ -46,13 +46,15 @@ data MessagingUDP =
getOwnPeer :: MessagingUDP -> Peer L4Proto
getOwnPeer mess = PeerL4 UDP (listenAddr mess)
newMessagingUDPMulticast :: MonadIO m => String -> m (Maybe MessagingUDP)
newMessagingUDPMulticast :: MonadResource m => String -> m (Maybe MessagingUDP)
newMessagingUDPMulticast s = runMaybeT $ do
(host, port) <- MaybeT $ pure $ getHostPort (Text.pack s)
so <- liftIO $ multicastReceiver host port
_ <- register $ close so
liftIO $ setSocketOption so ReuseAddr 1
a <- liftIO $ getSocketName so
@ -62,7 +64,6 @@ newMessagingUDPMulticast s = runMaybeT $ do
<*> newTVarIO so
<*> pure True
close_ so = trace "closing fuckin socket!!" >> close so
newMessagingUDP :: (MonadIO m, MonadResource m) => Bool -> Maybe String -> m (Maybe MessagingUDP)
newMessagingUDP reuse saddr =
@ -74,7 +75,7 @@ newMessagingUDP reuse saddr =
let a = addrAddress l
so <- liftIO $ socket (addrFamily l) (addrSocketType l) (addrProtocol l)
_ <- register $ close_ so
_ <- register $ close so
when reuse $ do
liftIO $ setSocketOption so ReuseAddr 1
@ -88,7 +89,7 @@ newMessagingUDP reuse saddr =
Nothing -> do
so <- liftIO $ socket AF_INET Datagram defaultProtocol
_ <- register $ close_ so
_ <- register $ close so
sa <- liftIO $ getSocketName so
@ -129,7 +130,7 @@ udpWorker env tso = do
-- liftIO $ print $ "recv:" <+> pretty (BS.length msg)
-- atomically $ Q.writeTBQueue (sink env) (From (PeerUDP from), LBS.fromStrict msg)
mapM_ wait [rcvLoop,sndLoop]
void $ waitAnyCatchCancel [rcvLoop,sndLoop]
-- FIXME: stopping

View File

@ -133,7 +133,7 @@ peerPingLoop cfg = do
-- TODO: peer info loop
void $ liftIO $ async $ forever $ withPeerM e $ do
infoLoop <- liftIO $ async $ forever $ withPeerM e $ do
pause @'Seconds 10
pee <- knownPeers @e pl
@ -186,7 +186,7 @@ peerPingLoop cfg = do
expire (PeerInfoKey p)
expire (KnownPeerKey p)
liftIO $ link watch
liftIO $ mapM_ link [watch, infoLoop]
forever do

View File

@ -501,3 +501,47 @@ test-suite test-concurrent-write
, terminal-progress-bar
test-suite test-misc
import: shared-properties
import: common-deps
default-language: Haskell2010
ghc-options:
-threaded
-rtsopts
"-with-rtsopts=-N6 -A64m -AL256m -I0"
other-modules:
-- other-extensions:
type: exitcode-stdio-1.0
hs-source-dirs: test
main-is: TestMisc.hs
build-depends:
base, hbs2-storage-simple, hbs2-core
, async
, bytestring
, cborg
, containers
, directory
, filepath
, hashable
, microlens-platform
, mtl
, prettyprinter
, QuickCheck
, stm
, random
, safe
, serialise
, tasty
, tasty-hunit
, temporary
, timeit
, uniplate
, vector
, terminal-progress-bar

View File

@ -0,0 +1,36 @@
module Main where
import HBS2.Clock
import Control.Concurrent.Async
import UnliftIO.Exception
import System.IO
import Control.Monad
testOne :: IO ()
testOne = do
t1 <- async $ forever $ do
pause @'Seconds 1
print "ONE"
t2 <- async $ forever $ do
pause @'Seconds 2
print "TWO"
link t1
link t2
print "testOne DONE"
pause @'Seconds 10
main = do
hSetBuffering stdout LineBuffering
testOne
pause @'Seconds 30
print "WTF?"
pure ()