fixed respawn

This commit is contained in:
Dmitry Zuikov 2023-10-26 03:53:44 +03:00
parent e7460213d7
commit 27fd382cc3
1 changed files with 11 additions and 13 deletions

View File

@ -23,9 +23,7 @@ import HBS2.Net.Messaging.TCP
import HBS2.Net.Messaging.Unix import HBS2.Net.Messaging.Unix
import HBS2.Net.Messaging.Encrypted.ByPass import HBS2.Net.Messaging.Encrypted.ByPass
import HBS2.Net.PeerLocator import HBS2.Net.PeerLocator
import HBS2.Net.Proto as Proto
import HBS2.Net.Proto.Definition import HBS2.Net.Proto.Definition
import HBS2.Net.Proto.Event.PeerExpired
import HBS2.Net.Proto.Peer import HBS2.Net.Proto.Peer
import HBS2.Net.Proto.PeerAnnounce import HBS2.Net.Proto.PeerAnnounce
import HBS2.Net.Proto.PeerExchange import HBS2.Net.Proto.PeerExchange
@ -84,16 +82,13 @@ import Crypto.Saltine (sodiumInit)
import Data.ByteString.Lazy qualified as LBS import Data.ByteString.Lazy qualified as LBS
import Data.ByteString qualified as BS import Data.ByteString qualified as BS
import Data.Cache qualified as Cache import Data.Cache qualified as Cache
import Data.HashSet qualified as HashSet
import Data.List qualified as L import Data.List qualified as L
import Data.Map (Map) import Data.Map (Map)
import Data.Map qualified as Map import Data.Map qualified as Map
import Data.Maybe import Data.Maybe
import Data.Set qualified as Set import Data.Set qualified as Set
import Data.Set (Set) import Data.Set (Set)
import Data.Time.Clock (UTCTime, NominalDiffTime, diffUTCTime, getCurrentTime)
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import Data.Time.LocalTime
import Data.Time.Format import Data.Time.Format
import Lens.Micro.Platform as Lens import Lens.Micro.Platform as Lens
import Network.Socket import Network.Socket
@ -114,6 +109,11 @@ import UnliftIO.Async as U
import Control.Monad.Trans.Resource import Control.Monad.Trans.Resource
import Streaming.Prelude qualified as S import Streaming.Prelude qualified as S
data GoAgainException = GoAgainException
deriving (Eq,Ord,Show,Typeable)
instance Exception GoAgainException
-- TODO: write-workers-to-config -- TODO: write-workers-to-config
defStorageThreads :: Integral a => a defStorageThreads :: Integral a => a
defStorageThreads = 4 defStorageThreads = 4
@ -565,6 +565,8 @@ runPeer opts = U.handle (\e -> myException e
>> respawn opts >> respawn opts
) $ runResourceT do ) $ runResourceT do
myself <- liftIO myThreadId
metrics <- liftIO newStore metrics <- liftIO newStore
xdg <- liftIO $ getXdgDirectory XdgData defStorePath <&> fromString xdg <- liftIO $ getXdgDirectory XdgData defStorePath <&> fromString
@ -881,7 +883,6 @@ runPeer opts = U.handle (\e -> myException e
Just (e' :: AsyncCancelled) -> pure () Just (e' :: AsyncCancelled) -> pure ()
Nothing -> do Nothing -> do
err ("peerThread" <+> viaShow t <+> "Failed with" <+> viaShow e) err ("peerThread" <+> viaShow t <+> "Failed with" <+> viaShow e)
throwM e -- threadSelf (SomeException e)
debug $ "peerThread Finished:" <+> t debug $ "peerThread Finished:" <+> t
@ -924,7 +925,6 @@ runPeer opts = U.handle (\e -> myException e
, makeResponse (blockChunksProto adapter) , makeResponse (blockChunksProto adapter)
, makeResponse blockAnnounceProto , makeResponse blockAnnounceProto
, makeResponse (withCredentials @e pc . peerHandShakeProto hshakeAdapter penv) , makeResponse (withCredentials @e pc . peerHandShakeProto hshakeAdapter penv)
-- , makeResponse (withCredentials @e pc . encryptionHandshakeProto encryptionHshakeAdapter)
, makeResponse peerExchangeProto , makeResponse peerExchangeProto
, makeResponse refLogUpdateProto , makeResponse refLogUpdateProto
, makeResponse (refLogRequestProto reflogReqAdapter) , makeResponse (refLogRequestProto reflogReqAdapter)
@ -936,6 +936,7 @@ runPeer opts = U.handle (\e -> myException e
] ]
void $ liftIO $ waitAnyCancel workers void $ liftIO $ waitAnyCancel workers
liftIO $ throwTo myself GoAgainException
let refChanHeadPostAction href = do let refChanHeadPostAction href = do
void $ liftIO $ withPeerM penv $ do void $ liftIO $ withPeerM penv $ do
@ -1029,7 +1030,6 @@ runPeer opts = U.handle (\e -> myException e
} }
m1 <- async $ runMessagingUnix rpcmsg m1 <- async $ runMessagingUnix rpcmsg
link m1
rpcProto <- async $ flip runReaderT rpcctx do rpcProto <- async $ flip runReaderT rpcctx do
runProto @UNIX runProto @UNIX
@ -1039,10 +1039,6 @@ runPeer opts = U.handle (\e -> myException e
, makeResponse (makeServer @StorageAPI) , makeResponse (makeServer @StorageAPI)
] ]
link proxyThread
link rpcProto
link loop
void $ waitAnyCancel $ w <> [ udp void $ waitAnyCancel $ w <> [ udp
, loop , loop
, m1 , m1
@ -1054,8 +1050,10 @@ runPeer opts = U.handle (\e -> myException e
] ]
liftIO $ simpleStorageStop s liftIO $ simpleStorageStop s
pause @'Seconds 1
-- we want to clean up all resources
throwM GoAgainException
emitToPeer :: ( MonadIO m emitToPeer :: ( MonadIO m
, EventEmitter e a (PeerM e IO) , EventEmitter e a (PeerM e IO)