mirror of https://github.com/voidlizard/hbs2
Fail fast on errors in peerThread, do not eat exceptions
This commit is contained in:
parent
382cb2c9fc
commit
956ca8638c
|
@ -78,6 +78,7 @@ import System.Exit
|
||||||
import System.IO
|
import System.IO
|
||||||
import System.Metrics
|
import System.Metrics
|
||||||
import Data.Cache qualified as Cache
|
import Data.Cache qualified as Cache
|
||||||
|
import UnliftIO.Exception qualified as U
|
||||||
|
|
||||||
|
|
||||||
-- TODO: write-workers-to-config
|
-- TODO: write-workers-to-config
|
||||||
|
@ -713,45 +714,49 @@ runPeer opts = Exception.handle myException $ do
|
||||||
debug "sending first peer announce"
|
debug "sending first peer announce"
|
||||||
request localMulticast (PeerAnnounce @e pnonce)
|
request localMulticast (PeerAnnounce @e pnonce)
|
||||||
|
|
||||||
let peerThread = W.tell . L.singleton <=< liftIO . async . withPeerM env
|
let peerThread t mx = W.tell . L.singleton =<< (liftIO . async) do
|
||||||
|
withPeerM env mx
|
||||||
|
`U.withException` \e -> case (fromException e) of
|
||||||
|
Just (e' :: AsyncCancelled) -> pure ()
|
||||||
|
Nothing -> err ("peerThread" <+> viaShow t <+> "Failed with" <+> viaShow e)
|
||||||
|
debug $ "peerThread Finished:" <+> t
|
||||||
workers <- W.execWriterT do
|
workers <- W.execWriterT do
|
||||||
|
|
||||||
peerThread $ forever $ do
|
peerThread "local multicast" $ forever $ do
|
||||||
pause defPeerAnnounceTime -- FIXME: setting!
|
pause defPeerAnnounceTime -- FIXME: setting!
|
||||||
debug "sending local peer announce"
|
debug "sending local peer announce"
|
||||||
request localMulticast (PeerAnnounce @e pnonce)
|
request localMulticast (PeerAnnounce @e pnonce)
|
||||||
|
|
||||||
-- peerThread (tcpWorker conf)
|
-- peerThread "tcpWorker" (tcpWorker conf)
|
||||||
|
|
||||||
peerThread (httpWorker conf denv)
|
peerThread "httpWorker " (httpWorker conf denv)
|
||||||
|
|
||||||
peerThread (checkMetrics metrics)
|
peerThread "checkMetrics " (checkMetrics metrics)
|
||||||
|
|
||||||
peerThread (peerPingLoop @e conf)
|
peerThread "peerPingLoop " (peerPingLoop @e conf)
|
||||||
|
|
||||||
peerThread (knownPeersPingLoop @e conf)
|
peerThread "knownPeersPingLoop " (knownPeersPingLoop @e conf)
|
||||||
|
|
||||||
peerThread (bootstrapDnsLoop @e conf)
|
peerThread "bootstrapDnsLoop " (bootstrapDnsLoop @e conf)
|
||||||
|
|
||||||
peerThread (pexLoop @e)
|
peerThread "pexLoop " (pexLoop @e)
|
||||||
|
|
||||||
peerThread (blockDownloadLoop denv)
|
peerThread "blockDownloadLoop " (blockDownloadLoop denv)
|
||||||
|
|
||||||
peerThread fillPeerMeta
|
peerThread "fillPeerMeta" (fillPeerMeta)
|
||||||
|
|
||||||
-- FIXME: clumsy-code
|
-- FIXME: clumsy-code
|
||||||
-- Is it better now ?
|
-- Is it better now ?
|
||||||
when useHttpDownload do
|
when useHttpDownload do
|
||||||
peerThread (blockHttpDownloadLoop denv)
|
peerThread "blockHttpDownloadLoop " (blockHttpDownloadLoop denv)
|
||||||
|
|
||||||
peerThread (postponedLoop denv)
|
peerThread "postponedLoop" (postponedLoop denv)
|
||||||
|
|
||||||
peerThread (downloadQueue conf denv)
|
peerThread "downloadQueue" (downloadQueue conf denv)
|
||||||
|
|
||||||
peerThread (reflogWorker @e conf rwa)
|
peerThread "reflogWorker " (reflogWorker @e conf rwa)
|
||||||
|
|
||||||
peerThread $ forever $ do
|
peerThread "ping pong" $ forever $ do
|
||||||
cmd <- liftIO $ atomically $ readTQueue rpcQ
|
cmd <- liftIO $ atomically $ readTQueue rpcQ
|
||||||
case cmd of
|
case cmd of
|
||||||
POKE -> debug "on poke: alive and kicking!"
|
POKE -> debug "on poke: alive and kicking!"
|
||||||
|
@ -842,7 +847,7 @@ runPeer opts = Exception.handle myException $ do
|
||||||
_ -> pure ()
|
_ -> pure ()
|
||||||
|
|
||||||
|
|
||||||
peerThread do
|
peerThread "all protos" do
|
||||||
runProto @e
|
runProto @e
|
||||||
[ makeResponse (blockSizeProto blk dontHandle onNoBlock)
|
[ makeResponse (blockSizeProto blk dontHandle onNoBlock)
|
||||||
, makeResponse (blockChunksProto adapter)
|
, makeResponse (blockChunksProto adapter)
|
||||||
|
@ -854,7 +859,7 @@ runPeer opts = Exception.handle myException $ do
|
||||||
, makeResponse (peerMetaProto (mkPeerMeta conf))
|
, makeResponse (peerMetaProto (mkPeerMeta conf))
|
||||||
]
|
]
|
||||||
|
|
||||||
void $ liftIO $ waitAnyCatchCancel workers
|
void $ liftIO $ waitAnyCancel workers
|
||||||
|
|
||||||
|
|
||||||
let pokeAction _ = do
|
let pokeAction _ = do
|
||||||
|
@ -973,7 +978,7 @@ runPeer opts = Exception.handle myException $ do
|
||||||
, makeResponse peerAnnounceProto
|
, makeResponse peerAnnounceProto
|
||||||
]
|
]
|
||||||
|
|
||||||
void $ waitAnyCatchCancel $ w <> [udp,loop,rpc,mrpc,ann,messMcast,brainsThread]
|
void $ waitAnyCancel $ w <> [udp,loop,rpc,mrpc,ann,messMcast,brainsThread]
|
||||||
|
|
||||||
simpleStorageStop s
|
simpleStorageStop s
|
||||||
|
|
||||||
|
@ -1100,9 +1105,9 @@ withRPC o cmd = rpcClientMain o $ do
|
||||||
|
|
||||||
_ -> pure ()
|
_ -> pure ()
|
||||||
|
|
||||||
void $ liftIO $ waitAnyCatchCancel [proto]
|
void $ liftIO $ waitAnyCancel [proto]
|
||||||
|
|
||||||
void $ waitAnyCatchCancel [mrpc, prpc]
|
void $ waitAnyCancel [mrpc, prpc]
|
||||||
|
|
||||||
runRpcCommand :: FromStringMaybe (IPAddrPort L4Proto) => RPCOpt -> RPCCommand -> IO ()
|
runRpcCommand :: FromStringMaybe (IPAddrPort L4Proto) => RPCOpt -> RPCCommand -> IO ()
|
||||||
runRpcCommand opt = \case
|
runRpcCommand opt = \case
|
||||||
|
|
|
@ -62,6 +62,7 @@ common common-deps
|
||||||
, http-conduit
|
, http-conduit
|
||||||
, http-types
|
, http-types
|
||||||
, wai-extra
|
, wai-extra
|
||||||
|
, unliftio
|
||||||
|
|
||||||
common shared-properties
|
common shared-properties
|
||||||
ghc-options:
|
ghc-options:
|
||||||
|
|
Loading…
Reference in New Issue