From 51597c58cbbe5db2391a8b702ab77ed1e65d582b Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Fri, 15 Sep 2023 19:40:03 +0300 Subject: [PATCH] Revert "merged" This reverts commit eeb2735c4dc10409ba996b37cf83ba2fb178d392. --- .fixme/log | 3 + .hlint.yaml | 6 - Makefile | 3 - cabal.project | 1 - examples/raft-algo/LICENSE | 0 examples/raft-algo/app/RaftAlgoMain.hs | 5 - examples/raft-algo/lib/RaftAlgo/Proto.hs | 836 ------------------ examples/raft-algo/raft-algo.cabal | 244 ----- examples/raft-algo/test/RaftAlgoProtoTest.hs | 33 - hbs2-core/hbs2-core.cabal | 11 +- hbs2-core/lib/{HBS2/Net => }/Dialog/Client.hs | 13 +- hbs2-core/lib/Dialog/Core.hs | 442 +++++++++ .../lib/{HBS2/Net => }/Dialog/Helpers/List.hs | 2 +- .../Net => }/Dialog/Helpers/Streaming.hs | 2 +- hbs2-core/lib/HBS2/Actors/Peer.hs | 32 +- hbs2-core/lib/HBS2/Data/Types/Refs.hs | 2 +- hbs2-core/lib/HBS2/Net/Auth/AccessKey.hs | 1 - hbs2-core/lib/HBS2/Net/Auth/Credentials.hs | 6 + hbs2-core/lib/HBS2/Net/Dialog/Core.hs | 831 ----------------- hbs2-core/lib/HBS2/Net/Proto/Dialog.hs | 62 +- .../lib/HBS2/Net/Proto/EncryptionHandshake.hs | 12 +- hbs2-core/lib/HBS2/Net/Proto/Peer.hs | 34 - hbs2-core/lib/HBS2/Net/Proto/Types.hs | 6 +- hbs2-core/test/DialogSpec.hs | 8 +- hbs2-peer/app/EncryptionKeys.hs | 11 +- hbs2-peer/app/PeerMain.hs | 82 +- hbs2-peer/app/PeerMain/Dialog/Server.hs | 178 ---- hbs2-peer/app/PeerMain/Dialog/Spec.hs | 35 - hbs2-peer/app/PeerMain/DialogCliCommand.hs | 106 ++- hbs2-peer/app/PeerMain/PeerDialog.hs | 39 + hbs2-peer/app/PeerTypes.hs | 6 + hbs2-peer/app/ProxyMessaging.hs | 40 +- hbs2-peer/hbs2-peer.cabal | 3 +- 33 files changed, 679 insertions(+), 2416 deletions(-) delete mode 100644 .hlint.yaml delete mode 100644 examples/raft-algo/LICENSE delete mode 100644 examples/raft-algo/app/RaftAlgoMain.hs delete mode 100644 examples/raft-algo/lib/RaftAlgo/Proto.hs delete mode 100644 examples/raft-algo/raft-algo.cabal delete mode 100644 examples/raft-algo/test/RaftAlgoProtoTest.hs rename hbs2-core/lib/{HBS2/Net => }/Dialog/Client.hs (93%) create mode 100644 hbs2-core/lib/Dialog/Core.hs rename hbs2-core/lib/{HBS2/Net => }/Dialog/Helpers/List.hs (91%) rename hbs2-core/lib/{HBS2/Net => }/Dialog/Helpers/Streaming.hs (97%) delete mode 100644 hbs2-core/lib/HBS2/Net/Dialog/Core.hs delete mode 100644 hbs2-peer/app/PeerMain/Dialog/Server.hs delete mode 100644 hbs2-peer/app/PeerMain/Dialog/Spec.hs create mode 100644 hbs2-peer/app/PeerMain/PeerDialog.hs diff --git a/.fixme/log b/.fixme/log index e69de29b..bf7ca57e 100644 --- a/.fixme/log +++ b/.fixme/log @@ -0,0 +1,3 @@ + +(fixme-set "assigned" "HPoqtobDAT" "voidlizard") +(fixme-set "workflow" "test" "HPoqtobDAT") \ No newline at end of file diff --git a/.hlint.yaml b/.hlint.yaml deleted file mode 100644 index 066e659c..00000000 --- a/.hlint.yaml +++ /dev/null @@ -1,6 +0,0 @@ -- ignore: {name: "Use infix"} # 2 hints -- ignore: {name: "Use let"} # 4 hints -- ignore: {name: "Use newtype instead of data"} # 4 hints -- ignore: {name: "Use print"} # 1 hint -- ignore: {name: "Redundant bracket Found"} # 1 hint -- ignore: {name: "Redundant $"} # 1 hint diff --git a/Makefile b/Makefile index b923cd36..b8d14084 100644 --- a/Makefile +++ b/Makefile @@ -18,6 +18,3 @@ build: test-core: > nix develop -c cabal run hbs2-core:test -.PHONY: test-raft -test-raft: -> nix develop -c ghcid -c 'cabal repl' raft-algo -T RaftAlgo.Proto.devTest diff --git a/cabal.project b/cabal.project index d37766c9..4ad57f5d 100644 --- a/cabal.project +++ b/cabal.project @@ -1,5 +1,4 @@ packages: **/*.cabal - examples/*/*.cabal -- allow-newer: all diff --git a/examples/raft-algo/LICENSE b/examples/raft-algo/LICENSE deleted file mode 100644 index e69de29b..00000000 diff --git a/examples/raft-algo/app/RaftAlgoMain.hs b/examples/raft-algo/app/RaftAlgoMain.hs deleted file mode 100644 index e73253c9..00000000 --- a/examples/raft-algo/app/RaftAlgoMain.hs +++ /dev/null @@ -1,5 +0,0 @@ -module Main where - -main :: IO () -main = do - pure () diff --git a/examples/raft-algo/lib/RaftAlgo/Proto.hs b/examples/raft-algo/lib/RaftAlgo/Proto.hs deleted file mode 100644 index 9da001fe..00000000 --- a/examples/raft-algo/lib/RaftAlgo/Proto.hs +++ /dev/null @@ -1,836 +0,0 @@ --- {-# Language AllowAmbiguousTypes #-} --- {-# LANGUAGE PolyKinds #-} -{-# LANGUAGE StrictData #-} -module RaftAlgo.Proto where - -import HBS2.Prelude.Plated - -import Control.Applicative -import Control.Arrow -import Control.Monad -import Control.Monad.Cont -import Control.Monad.Error.Class -import Control.Monad.State.Strict -import Control.Monad.Trans.Except -import Control.Monad.Trans.Maybe -import Control.Monad.Writer as W -import Data.Either -import Data.Foldable qualified as F -import Data.Function -import Data.Generics.Labels -import Data.Generics.Product -import Data.Generics.Sum -import Data.Heap (Heap) -import Data.Heap qualified as Heap -import Data.Kind -import Data.List qualified as List -import Data.Map (Map) -import Data.Map.Strict qualified as Map -import Data.Maybe -import Data.Ord -import Data.Sequence (Seq(..)) -import Data.Sequence qualified as Seq -import Data.Set (Set) -import Data.Set qualified as Set -import Data.Text qualified as T -import Data.Time -import Data.Time.Calendar.OrdinalDate -import Data.Void -import Lens.Micro.Platform as Lens -import Numeric.Natural -import Streaming -import Streaming.Prelude qualified as Streaming -import System.Random.MWC as MWC -import System.Random.Stateful - - -newtype Term = Term { unTerm :: Int } - deriving (Eq, Ord, Num, Enum, Show) - --- | `p` - identity of a node --- `a` - payload (reflog hash for example) -data LogEntry p a = LogEntry - { entryTerm :: Term - , entryStateMachineCommand :: StateMachineCommand p a - } - deriving (Generic, Show) - -data StateMachineCommand p a - = SMCLoad a - -- | SMAddNode p - -- | SMDropNode p - deriving (Generic, Show) - --- Должно сохраняться надёжно: --- currentTerm --- votedFor --- log[] -data PersMethods p a m = PersMethods - { getCurrentTerm :: m Term - , setCurrentTerm :: Term -> m () - -- - , getVotedFor :: m (Maybe p) - , setVotedFor :: Maybe p -> m () - -- - -- log[] .. - , getLogEntry :: Int -> m (Maybe (LogEntry p a)) - , setLogEntry :: Int -> LogEntry p a -> m () - , getLogEntriesFrom :: Int -> m [LogEntry p a] - , getLastLogIndex :: m Int - -- - , getOurID :: m p - } - -getLastLogIndexNTerm :: Monad m => PersMethods p a m -> m (Int, Term) -getLastLogIndexNTerm PersMethods{..} = do - lastLogIndex <- getLastLogIndex - (lastLogIndex, ) <$> - if lastLogIndex > 0 - then maybe 0 entryTerm <$> getLogEntry lastLogIndex - else pure 0 - -hoistMethods :: - (forall a. m a -> n a) - -> PersMethods p a m - -> PersMethods p a n -hoistMethods nt h = PersMethods - { getCurrentTerm = nt $ getCurrentTerm h - , setCurrentTerm = nt . setCurrentTerm h - -- - , getVotedFor = nt $ getVotedFor h - , setVotedFor = nt . setVotedFor h - -- - , getLogEntry = nt . getLogEntry h - , setLogEntry = \j le -> nt $ setLogEntry h j le - , getLogEntriesFrom = nt . getLogEntriesFrom h - , getLastLogIndex = nt $ getLastLogIndex h - -- - , getOurID = nt $ getOurID h - } - -data RaftState p = RaftState - { commitIndex :: Int - , lastAppliedIndex :: Int - , actorState :: ActorState p - , cluster :: Set p - } - deriving (Generic, Show) - -data ActorState p - = StateFollower - | StateCandidate - { votesCollected :: Set p - } - | StateLeader - { nextIndex :: Map p Int - , matchIndex :: Map p Int - } - deriving (Generic, Eq, Ord, Show) - ---- - -data RequestVote p = RequestVote - { requestVoteTerm :: Term - , requestVoteCandidateID :: p - , requestVoteCandidateLastLogIndex :: Int - , requestVoteCandidateLastLogTerm :: Term - } - deriving (Generic, Show) - -data AppendEntries p a = AppendEntries - { appendEntriesTerm :: Term - , appendEntriesLeaderID :: p - , appendEntriesPrevLogIndex :: Int - , appendEntriesPrevLogTerm :: Term - , appendEntriesES :: [LogEntry p a] - , appendEntriesLeaderCommitIndex :: Int - } - deriving (Generic, Show) - -data ProcCall p a - = CallRequestVote (RequestVote p) - | RequestVoteReply - { requestVoteReplyTerm :: Term - , requestVoteReplyGranted :: Bool - , requestVoteReplyFromID :: p - } - | CallAppendEntries (AppendEntries p a) - | AppendEntriesReply - { appendEntriesReplyTerm :: Term - , appendEntriesReplySuccess :: Bool - , appendEntriesReplyFromID :: p - } - deriving (Generic, Show) - - -data NodeEvent p a - = ElectionTimeoutElapses - | LeaderHeartbeat - | GotProcCall (ProcCall p a) - | GotClientCall (StateMachineCommand p a) - deriving (Show) - -data NodeAction p a - = ResetElectionTimer - | ResetLeaderHeartbeat - - | UpdateState (RaftState p -> RaftState p) - | SwitchRole (ActorState p) - - | LogMessage Text - | CallProc p (ProcCall p a) - | ReplyProc p (ProcCall p a) - - | ReplyToClientWhoIsLeader (Maybe p) - ---- --- Test cluster runner - -data TestLogState p a = TestLogState - { testLogStateCurrentTerm :: Term - , testLogStateVotedFor :: Maybe p - , testLogStateLog :: Seq (LogEntry p a) - , testLogStateRaftState :: RaftState p - , testLogStateNodeID :: p - } - deriving (Generic, Show) - -newtype NodeID a = NodeID a - deriving newtype (Show, Eq, Ord, Num, Enum) -type NodeIDInt = NodeID Int - -newtype TestData = TestData Text - deriving newtype (Show, Eq, Ord, IsString) - -data TestEvent t = TestEvent - { testEventTime :: t - , testEvent :: NodeEvent NodeIDInt TestData - } - deriving (Generic) - -data TestClusterState = TestClusterState - { testClusterNodes :: Map NodeIDInt (TestLogState NodeIDInt TestData) - , testClusterEvents :: Heap (Heap.Entry UTCTime ( - Either - (NodeEvent NodeIDInt TestData, [TestEvent UTCTime]) - (NodeIDInt, NodeEvent NodeIDInt TestData) - )) - , testClusterElectionTimeouts :: Map NodeIDInt Int - } - deriving (Generic) - -devTest :: IO () -devTest = do - evalTestCluster 5 commands $ Streaming.take 142 >>> Streaming.mapM display >>> Streaming.effects - where - display = List.uncons >>> mapM_ \(x,xs') -> liftIO do - putStrLn (T.unpack x) - forM_ xs' \x' -> do - putStr " " - putStrLn (T.unpack x') - putStrLn "" - - commands :: [TestEvent NominalDiffTime] - commands = [ - TestEvent 9 (GotClientCall (SMCLoad "Tx1")) - , TestEvent 1 (GotClientCall (SMCLoad "Tx2")) - , TestEvent 3 (GotClientCall (SMCLoad "Tx3")) - , TestEvent 2 (GotClientCall (SMCLoad "Tx4")) - ] - -evalTestCluster :: - ( Monad m - , m' ~ StateT TestClusterState (StateT StdGen m) - ) - => Int - -> [TestEvent NominalDiffTime] - -> (Stream (Of [Text]) m' () -> m' ()) - -> m () -evalTestCluster clusterSize cevs sf = - runStateGenT_ (mkStdGen randomSeed) \g -> do - - initialNodeTimeouts <- forM nodeIDs \p -> do - dt <- realToFrac @Double <$> MWC.uniformRM (minElectionTimeout, maxElectionTimeout) g - pure $ Heap.Entry (addUTCTime dt timeBegin) (Right (p, ElectionTimeoutElapses)) - - evalStateT ( - sf $ - fix \go -> - lift (clusterStep g) >>= either - (\e -> Streaming.each [[e]]) - (\es -> do - Streaming.each [es] - go - ) - ) - TestClusterState - { testClusterNodes = Map.fromList $ nodeIDs <&> (id &&& mkTestNodeState (Set.fromList nodeIDs)) - , testClusterEvents = Heap.fromList $ tcevs <> initialNodeTimeouts - , testClusterElectionTimeouts = mempty - } - - where - - randomSeed = 1 - - minElectionTimeout :: Double - minElectionTimeout = 5 - maxElectionTimeout = minElectionTimeout * 2 - - heartbeatPeriod :: NominalDiffTime - heartbeatPeriod = realToFrac (minElectionTimeout * 3 / 4) - - nodeIDs = fromIntegral <$> [1..clusterSize] - - timeBegin :: UTCTime - timeBegin = UTCTime (YearDay 2000 1) 0 - - tcevs = case integrateTestEventTimes timeBegin cevs of - [] -> [] - (TestEvent t1 ev1):clientEvents -> - [Heap.Entry t1 (Left (ev1, clientEvents))] - - integrateTestEventTimes :: UTCTime -> [TestEvent NominalDiffTime] -> [TestEvent UTCTime] - integrateTestEventTimes t0 = flip evalState t0 . mapM \ev -> do - upd <- addUTCTime (ev ^. #testEventTime) <$> get - put upd - pure $ ev & #testEventTime .~ upd - - mkTestNodeState :: Set NodeIDInt -> NodeIDInt -> TestLogState NodeIDInt TestData - mkTestNodeState allNodeIDs nodeID = TestLogState - { testLogStateCurrentTerm = 0 - , testLogStateVotedFor = Nothing - , testLogStateLog = mempty - , testLogStateRaftState = - RaftState - { commitIndex = 0 - , lastAppliedIndex = 0 - , actorState = StateFollower - , cluster = allNodeIDs - } - , testLogStateNodeID = nodeID - } - - clusterStep :: (Monad m, StatefulGen (StateGenM StdGen) m) - => StateGenM StdGen -> StateT TestClusterState m (Either Text [Text]) - clusterStep g = runExceptT do - (Heap.Entry tnow tev, heapRest) <- justOrThrowError "no events" - $ gets (Heap.uncons . view #testClusterEvents) - #testClusterEvents .= heapRest - - case tev of - -- Запрос от клиента - Left (ev, tevs) -> do - case tevs of - (TestEvent tnext evnext):tevs' -> do - #testClusterEvents %= mappend do - Heap.singleton (Heap.Entry tnext (Left (evnext, tevs'))) - [] -> pure () - clusterSize <- gets (Map.size . view #testClusterNodes) - targetNodeID <- lift . lift $ fromIntegral <$> MWC.uniformRM (1, clusterSize) g - runEvent tnow targetNodeID ev - - -- Событие от ноды для ноды nodeID - Right (nodeID, ev :: NodeEvent NodeIDInt TestData) -> - runEvent tnow nodeID ev - - - where - runEvent tnow nodeID ev = fromMaybe [] <$> runMaybeT do - - case ev of - ElectionTimeoutElapses -> do - - x <- #testClusterElectionTimeouts . Lens.at nodeID . non 1 <%= pred - when (x > 0) do - fail "old timeout droped" - - _ -> pure () - - lift do - nodeState :: TestLogState NodeIDInt TestData - <- justOrThrowError ("no state for node " <> showt nodeID) - $ gets $ preview (#testClusterNodes . ix nodeID) - - let testactions :: [TestNodeAction] - (testactions, nodeState') = flip runState nodeState (nodeTestStep ev) - - #testClusterNodes . ix nodeID .= nodeState' - - -- pure (Just nodeID, ev, mempty, testactions) - - -- [(NominalDiffTime, (NodeIDInt, NodeEvent NodeIDInt TestData))] - (newevents, log :: [Text]) <- W.runWriterT $ catMaybes <$> forM testactions \case - - TestResetElectionTimer -> do - dt <- lift . lift . lift $ realToFrac @Double - <$> MWC.uniformRM (minElectionTimeout, maxElectionTimeout) g - -- Как сбрасывать предыдущие таймеры? - -- В словарике testClusterElectionTimeouts по ключам - -- будем считать сколько добавлено новых - -- TestResetElectionTimer Событие кидать только если - -- оно соответствует единственной оставшейся записи. - x <- #testClusterElectionTimeouts . Lens.at nodeID . non 1 <<%= succ - -- W.tell [showt x] - pure $ Just $ - (dt, (nodeID, ElectionTimeoutElapses)) - - TestResetLeaderHeartbeat -> - pure $ Just $ - (heartbeatPeriod, (nodeID, LeaderHeartbeat)) - - TestApplyLogEntryToSM t -> pure Nothing - - TestLogMessage t -> pure Nothing - - TestCallProc nodeID pcall -> do - dt <- lift . lift . lift $ realToFrac @Double - <$> MWC.uniformRM (0.01, 0.1) g - pure $ Just $ - (dt, (nodeID, GotProcCall pcall)) - - TestReplyProc nodeID pcall -> do - dt <- lift . lift . lift $ realToFrac @Double - <$> MWC.uniformRM (0.01, 0.1) g - pure $ Just $ - (dt, (nodeID, GotProcCall pcall)) - - TestReplyToClientWhoIsLeader mnodeID -> forM mnodeID \nodeID -> do - dt <- lift . lift . lift $ realToFrac @Double - <$> MWC.uniformRM (0.3, 1) g - pure (dt, (nodeID, ev)) - - #testClusterEvents %= mappend do - Heap.fromList (newevents <&> \(t',ev') -> Heap.Entry (addUTCTime t' tnow) (Right ev')) - - nodeStates <- gets (view #testClusterNodes) - - -- pure [showt (tnow, nodeID, ev, "->", testactions ^.. traverse . _Ctor @"TestLogMessage")] - pure $ ([showt $ (tnow, nodeID, ev)] - <> (showt <$> (testactions ^.. traverse)) - <> log - <> ["states:"] - <> (showt <$> Map.elems (nodeStates)) - ) - -showt :: Show a => a -> Text -showt = T.pack . show - -data TestNodeAction - = TestResetElectionTimer - | TestResetLeaderHeartbeat - | TestApplyLogEntryToSM TestData - | TestLogMessage Text - | TestCallProc NodeIDInt (ProcCall NodeIDInt TestData) - | TestReplyProc NodeIDInt (ProcCall NodeIDInt TestData) - | TestReplyToClientWhoIsLeader (Maybe NodeIDInt) - deriving (Generic, Show) - -nodeTestStep :: forall m. (Monad m) - => NodeEvent NodeIDInt TestData - -> StateT (TestLogState NodeIDInt TestData) m [TestNodeAction] -nodeTestStep ev = do - - let h :: PersMethods NodeIDInt TestData (StateT (TestLogState NodeIDInt TestData) m) - h = PersMethods - { getCurrentTerm = gets (view #testLogStateCurrentTerm) - , setCurrentTerm = (#testLogStateCurrentTerm .=) - -- - , getVotedFor = gets (view #testLogStateVotedFor) - , setVotedFor = (#testLogStateVotedFor .=) - -- - , getLogEntry = \j -> gets $ Seq.lookup (j-1) . view #testLogStateLog - , setLogEntry = \j le -> #testLogStateLog %= \s -> do - when (Seq.length s < (j-1)) do - error "Raft algo error. Trying to set log element after missed elements" - Seq.take (j-1) s Seq.|> le - - , getLogEntriesFrom = \j -> gets (F.toList . Seq.drop (j-1) . (view #testLogStateLog)) - , getLastLogIndex = gets (Seq.length . view #testLogStateLog) - -- - , getOurID = gets (view #testLogStateNodeID) - } - - rstate <- gets (view #testLogStateRaftState) - (rstate', actions) <- do - flip runStateT mempty do - flip execStateT rstate do - flip runContT absurd do - actions :: [TestNodeAction] <- testNodeReact (hoistMethods (lift . lift) h) ev - lift . lift $ modify (actions <>) - -- lift . lift . modify . (<>) =<< testNodeReact (hoistMethods (lift . lift) h) ev - ContT \k -> pure () - #testLogStateRaftState .= rstate' - pure (reverse actions) - - where - seqLast :: Seq a -> a - seqLast = \case - (Seq.viewr -> _ Seq.:> x) -> x - _ -> error "no last element in empty sequence" - -testNodeReact :: (MonadState (RaftState p) m, p ~ NodeIDInt, a ~ TestData) - => PersMethods p a m -> NodeEvent p a -> ContT () m [TestNodeAction] -testNodeReact h ev = do - x <- nodeReact h ev =<< get - ContT \k -> do - case x of - ResetElectionTimer -> k [TestResetElectionTimer] - ResetLeaderHeartbeat -> k [TestResetLeaderHeartbeat] - - UpdateState f -> id %= f - SwitchRole st -> do - #actorState .= st - k [TestLogMessage ("Switch role to " <> showt st)] - - LogMessage msg -> k [TestLogMessage msg] - CallProc p proc -> k [TestCallProc p proc] - ReplyProc p proc -> k [TestReplyProc p proc] - - ReplyToClientWhoIsLeader mp -> k [TestReplyToClientWhoIsLeader mp] - - cix <- (gets (view #commitIndex)) - (gets (view #lastAppliedIndex)) >>= fix \go lastApplied -> do - when (cix > lastApplied) do - -- increment lastApplied, apply log[lastApplied] to stateMachine - lastApplied' <- #lastAppliedIndex <%= succ - (getLogEntry h) lastApplied' >>= mapM_ \le -> do - case entryStateMachineCommand le of - SMCLoad a -> do - k [TestApplyLogEntryToSM a] - -- | SMAddNode p - -- | SMDropNode p - go lastApplied' - -nodeReact :: forall m p a. (Monad m, Ord p, Show p, Show a) - => PersMethods p a m - -> NodeEvent p a - -> RaftState p - -> ContT () m (NodeAction p a) -nodeReact h@PersMethods{..} ev RaftState{..} = do - ourID <- lift getOurID - let otherNodes = cluster `Set.difference` Set.singleton ourID - ContT \k -> case actorState of - - StateFollower -> do - -- * respond to rpcs from candidates and leaders - -- * if election timeout elapses without receiving AppendEntries from - -- current leader or asking vote from candidates - -- then convert to candidate - case ev of - ElectionTimeoutElapses -> do - k (SwitchRole (StateCandidate (Set.singleton ourID))) - startNewElection k - - LeaderHeartbeat -> do - pure () - - GotProcCall proc -> case proc of - - CallRequestVote (req@RequestVote {..}) -> do - updateCurrentTerm_ k requestVoteTerm - granted <- replyAfterRequestVote req k - k (LogMessage $ "granted: " <> showt granted) - when granted do - k ResetElectionTimer - - CallAppendEntries req@AppendEntries {..} -> do - updateCurrentTerm_ k appendEntriesTerm - replyAfterAppendEntries req k do - k ResetElectionTimer - - RequestVoteReply {..} -> do - k (LogMessage "Follower Got RequestVoteReply. Why ???") - void $ updateCurrentTerm_ k requestVoteReplyTerm - - AppendEntriesReply {..} -> do - void $ updateCurrentTerm_ k appendEntriesReplyTerm - - GotClientCall {} -> do - votedFor <- getVotedFor - k (ReplyToClientWhoIsLeader votedFor) - - -- * if votes received from majority of servers: become leader - -- * if AppendEntries received from new leader: become follower - -- * if election timeout elapses: start new election - StateCandidate{..} -> do - case ev of - ElectionTimeoutElapses -> do - startNewElection k - - LeaderHeartbeat -> do - pure () - - GotProcCall proc -> case proc of - - CallRequestVote req@RequestVote {..} -> do - upd <- updateCurrentTerm k requestVoteTerm - granted <- replyAfterRequestVote req k - when (granted && not upd) do - k ResetElectionTimer - - CallAppendEntries req@AppendEntries {..} -> do - updateCurrentTerm_ k appendEntriesTerm - replyAfterAppendEntries req k do - k ResetElectionTimer - k (SwitchRole StateFollower) - - RequestVoteReply {..} -> do - upd <- updateCurrentTerm k requestVoteReplyTerm - when (requestVoteReplyGranted && not upd) do - let votesCollected' = Set.insert requestVoteReplyFromID votesCollected - k (UpdateState (#actorState . _Ctor @"StateCandidate" .~ votesCollected')) - k (LogMessage ("Votes collected " <> showt votesCollected')) - when (Set.size votesCollected' > Set.size cluster `div` 2) do - (lastLogIndex, lastLogEntryTerm) <- getLastLogIndexNTerm h - k (SwitchRole (StateLeader - { nextIndex = Map.fromList $ (Set.toList otherNodes) <&> (, succ lastLogIndex) - , matchIndex = Map.fromList $ (Set.toList otherNodes) <&> (, defMatchIndex) - })) - k ResetLeaderHeartbeat - forM_ otherNodes \p -> do - term <- getCurrentTerm - k (CallProc p (CallAppendEntries ( - AppendEntries - { appendEntriesTerm = term - , appendEntriesLeaderID = ourID - , appendEntriesPrevLogIndex = lastLogIndex - , appendEntriesPrevLogTerm = lastLogEntryTerm - , appendEntriesES = mempty - , appendEntriesLeaderCommitIndex = commitIndex - }))) - - AppendEntriesReply {..} -> do - void $ updateCurrentTerm k appendEntriesReplyTerm - - GotClientCall {} -> do - votedFor <- getVotedFor - k (ReplyToClientWhoIsLeader votedFor) - - StateLeader{..} -> do - let - leaderCallAppendEntries p = do - ourLastLogIndex <- getLastLogIndex - - let pPrevIndex = maybe ourLastLogIndex pred (nextIndex ^? ix p) - (pPrevEntryTerm, entries') <- - if pPrevIndex > 0 - then - maybe (error "Bug in algorithm") (over _1 entryTerm) . List.uncons - <$> getLogEntriesFrom pPrevIndex - else - (0 ,) <$> getLogEntriesFrom 1 - - term <- getCurrentTerm - k (CallProc p (CallAppendEntries ( - AppendEntries - { appendEntriesTerm = term - , appendEntriesLeaderID = ourID - , appendEntriesPrevLogIndex = pPrevIndex - , appendEntriesPrevLogTerm = pPrevEntryTerm - , appendEntriesES = entries' - , appendEntriesLeaderCommitIndex = commitIndex - }))) - - case ev of - - ElectionTimeoutElapses -> - pure () - - LeaderHeartbeat -> do - mapM_ leaderCallAppendEntries otherNodes - k ResetLeaderHeartbeat - - GotProcCall proc -> case proc of - - CallRequestVote req@RequestVote {..} -> do - updateCurrentTerm k requestVoteTerm - replyAfterRequestVote req k - pure () - - CallAppendEntries req@AppendEntries {..} -> do - updateCurrentTerm k appendEntriesTerm - replyAfterAppendEntries req k do - k ResetElectionTimer - k (SwitchRole StateFollower) - pure () - - RequestVoteReply {..} -> do - updateCurrentTerm k requestVoteReplyTerm - pure () - - AppendEntriesReply {..} -> do - -- Нужно ли здесь учитывать appendEntriesReplyTerm ? - currentTerm <- getCurrentTerm - if (appendEntriesReplyTerm == currentTerm) - then do - if appendEntriesReplySuccess - then do - ourLastLogIndex <- getLastLogIndex - let - -- updMatchIndex :: Map p Int -> Map p Int - updMatchIndex = Lens.at appendEntriesReplyFromID . non defMatchIndex .~ ourLastLogIndex - k (UpdateState (( - #actorState . _Ctor @"StateLeader" - . _1 -- . #nextIndex - . ix appendEntriesReplyFromID .~ succ ourLastLogIndex - ) . ( - #actorState . _Ctor @"StateLeader" - . _2 -- . #matchIndex - %~ updMatchIndex - ))) - let newCommitIndex = matchIndex & updMatchIndex & Map.elems - & Heap.fromList - & Heap.drop ((Set.size cluster `div` 2 - 1)) - & Heap.uncons - & maybe defMatchIndex fst - when (newCommitIndex > commitIndex) do - getLogEntry newCommitIndex >>= mapM_ do - entryTerm >>> \term -> - when (term == currentTerm) do - setCurrentTerm term - k (UpdateState (#commitIndex .~ newCommitIndex)) - else do - k (UpdateState (#actorState . _Ctor @"StateLeader" - . _1 -- . #nextIndex - . ix appendEntriesReplyFromID %~ pred - )) - leaderCallAppendEntries appendEntriesReplyFromID - else - void $ updateCurrentTerm k appendEntriesReplyTerm - - GotClientCall cmd -> do - j <- getLastLogIndex - term <- getCurrentTerm - setLogEntry (succ j) - (LogEntry - { entryTerm = term - , entryStateMachineCommand = cmd - }) - -- TODO "respond after entry applied to state machine" - - where - - defMatchIndex = 0 - - startNewElection k = do - - ourID <- getOurID - let otherNodes = cluster `Set.difference` Set.singleton ourID - - setCurrentTerm . succ =<< getCurrentTerm - setVotedFor . Just =<< getOurID - k ResetElectionTimer - - term <- getCurrentTerm - (lastLogIndex, lastLogEntryTerm) <- getLastLogIndexNTerm h - forM_ otherNodes \p -> do - k (CallProc p (CallRequestVote ( - RequestVote - { requestVoteTerm = term - , requestVoteCandidateID = ourID - , requestVoteCandidateLastLogIndex = lastLogIndex - , requestVoteCandidateLastLogTerm = lastLogEntryTerm - }))) - - updateCurrentTerm_ k term = do - updateCurrentTerm' k term (pure ()) - - updateCurrentTerm k term = do - updateCurrentTerm' k term do - k (SwitchRole StateFollower) - k ResetElectionTimer - - updateCurrentTerm' k term onUpdate = do - currentTerm <- getCurrentTerm - k (LogMessage ("updateCurrentTerm ? our:" <> showt currentTerm <> " -> new:" <> showt term)) - if (currentTerm < term) then do - setCurrentTerm term - onUpdate - setVotedFor Nothing - pure True - else pure False - - replyAfterRequestVote RequestVote {..} k = do - -- 1. if term < currentTerm: reply false - -- 2. else if voteFor is null or candidateID, - -- and candidate's log is at least up-to-date as our log: grant vote - currentTerm <- getCurrentTerm - granted <- (either (\e -> k (LogMessage e) >> pure False) (const (pure True))) =<< runExceptT do - - when (requestVoteTerm < currentTerm) do - throwError "requestVoteTerm < currentTerm" - - lift getVotedFor >>= mapM_ \ourVoteFor -> do - when (ourVoteFor /= requestVoteCandidateID) do - throwError "already voted for another candidate" - - setVotedFor (Just requestVoteCandidateID) - ourID <- getOurID - k $ ReplyProc requestVoteCandidateID - $ RequestVoteReply - { requestVoteReplyTerm = currentTerm - , requestVoteReplyGranted = granted - , requestVoteReplyFromID = ourID - } - pure granted - - replyAfterAppendEntries AppendEntries {..} k onSuccess = do - -- 1. if term < currentTerm: reply false - -- 2. reply false if log doesn't contain an entry at prevLogIndex whose - -- term matches prevLogTerm - -- 3. if an existing entry conflicts with a new one (same index but - -- different terms), delete the existing entry and all that follow it - -- 4. append any new entries not already in the log - -- 5. if leaderCommit > commitIndex, - -- set commitIndex = min(leaderCommit, index of last new entry) - - currentTerm <- getCurrentTerm - k (LogMessage ("replyAfterAppendEntries our:" <> showt currentTerm <> " showt appendEntriesTerm)) - success <- (either (\e -> k (LogMessage e) >> pure False) (const (pure True))) =<< runExceptT do - - when (appendEntriesTerm < currentTerm) do - throwError "appendEntriesTerm < currentTerm" - - when (appendEntriesPrevLogIndex > 0) do - le' <- justOrThrowError ("No log entry " <> showt appendEntriesPrevLogIndex) - $ getLogEntry appendEntriesPrevLogIndex - when (entryTerm le' /= appendEntriesPrevLogTerm) do - throwError "entryTerm at appendEntriesPrevLogIndex /= appendEntriesPrevLogTerm" - - justOrThrowError "?-?" $ flip fix (zip [succ appendEntriesPrevLogIndex..] appendEntriesES) $ - \go -> \case - [] -> pure (Just ()) - (j, le):es -> - getLogEntry j >>= \case - - Nothing -> do - setLogEntry j le - mapM_ (uncurry setLogEntry) es - pure (Just ()) - - Just cle -> do - if (entryTerm cle == entryTerm le) - then go es - else do - setLogEntry j le - mapM_ (uncurry setLogEntry) es - pure (Just ()) - - when (appendEntriesLeaderCommitIndex > commitIndex) do - lift do - j <- getLastLogIndex - k (UpdateState (#commitIndex .~ min appendEntriesLeaderCommitIndex j)) - - when success onSuccess - - ourID <- getOurID - k $ ReplyProc appendEntriesLeaderID - $ AppendEntriesReply - { appendEntriesReplyTerm = currentTerm - , appendEntriesReplySuccess = success - , appendEntriesReplyFromID = ourID - } - -justOrThrowError :: Functor m => e -> m (Maybe a) -> ExceptT e m a -justOrThrowError e = ExceptT . fmap (maybe (Left e) Right) diff --git a/examples/raft-algo/raft-algo.cabal b/examples/raft-algo/raft-algo.cabal deleted file mode 100644 index 6413302e..00000000 --- a/examples/raft-algo/raft-algo.cabal +++ /dev/null @@ -1,244 +0,0 @@ -cabal-version: 3.0 -name: raft-algo -version: 0.1.0.0 --- synopsis: --- description: -license: BSD-3-Clause -license-file: LICENSE --- author: --- maintainer: --- copyright: -category: Network -build-type: Simple -extra-doc-files: CHANGELOG.md --- extra-source-files: - -common warnings - ghc-options: -Wall - -common common-deps - build-depends: - base, hbs2-core, hbs2-storage-simple - , async - , bytestring - , cache - , containers - , data-default - , deepseq - , directory - , filepath - , generic-lens - , hashable - , heaps - , microlens-platform - , mtl - , mwc-random - , prettyprinter - , QuickCheck - , random - , random-shuffle - , resourcet - , safe - , serialise - , split - , stm - , streaming - , suckless-conf - , tasty - , tasty-hunit - , temporary - , timeit - , transformers - , uniplate - , unordered-containers - , vector - , prettyprinter-ansi-terminal - , interpolatedstring-perl6 - , unliftio - -common shared-properties - ghc-options: - -Wall - -O2 - -fno-warn-type-defaults - -- -fno-warn-unused-matches - -- -fno-warn-unused-do-bind - -- -Werror=missing-methods - -- -Werror=incomplete-patterns - -- -fno-warn-unused-binds - -threaded - -rtsopts - "-with-rtsopts=-N4 -A256m -AL256m -I0" - - - default-language: Haskell2010 - - default-extensions: - ApplicativeDo - , BangPatterns - , BlockArguments - , ConstraintKinds - , DataKinds - , DeriveDataTypeable - , DeriveGeneric - , DerivingStrategies - , DerivingVia - , ExtendedDefaultRules - , FlexibleContexts - , FlexibleInstances - , GADTs - , GeneralizedNewtypeDeriving - , ImportQualifiedPost - , LambdaCase - , MultiParamTypeClasses - , OverloadedLabels - , OverloadedStrings - , QuasiQuotes - , RankNTypes - , RecordWildCards - , ScopedTypeVariables - , StandaloneDeriving - , TupleSections - , TypeApplications - , TypeFamilies - , ViewPatterns - -library - import: shared-properties - import: common-deps - default-language: Haskell2010 - - exposed-modules: RaftAlgo.Proto - - ghc-options: - -- -prof - -- -fprof-auto - - -- other-extensions: - - -- type: exitcode-stdio-1.0 - hs-source-dirs: lib - - build-depends: - base, hbs2-core - , async - , attoparsec - , bytestring - , cache - , clock - , containers - , data-default - , directory - , hashable - , microlens-platform - , mtl - , mwc-random - , network - , network-ip - , optparse-applicative - , prettyprinter - , QuickCheck - , random - , safe - , serialise - , stm - , streaming - , tasty - , tasty-hunit - , text - , time - , transformers - , uniplate - , vector - , unliftio - -executable raft-algo-app - import: shared-properties - import: common-deps - default-language: Haskell2010 - - ghc-options: - -- -prof - -- -fprof-auto - - -- other-modules: - - -- other-extensions: - - -- type: exitcode-stdio-1.0 - hs-source-dirs: app lib - main-is: RaftAlgoMain.hs - - build-depends: - base, raft-algo, hbs2-core, hbs2-storage-simple - , async - , attoparsec - , bytestring - , cache - , clock - , containers - , data-default - , data-textual - , directory - , hashable - , microlens-platform - , mtl - , mwc-random - , network - , network-ip - , optparse-applicative - , prettyprinter - , QuickCheck - , random - , safe - , serialise - , stm - , streaming - , tasty - , tasty-hunit - , text - , time - , transformers - , uniplate - , vector - , unliftio - - -test-suite raft-algo-proto-test - import: shared-properties - default-language: Haskell2010 - - other-modules: - - type: exitcode-stdio-1.0 - hs-source-dirs: test - main-is: RaftAlgoProtoTest.hs - - build-depends: - base, raft-algo, hbs2-core - , async - , bytestring - , cache - , cborg - , containers - , directory - , filepath - , hashable - , microlens-platform - , mtl - , optparse-applicative - , prettyprinter - , QuickCheck - , random - , safe - , serialise - , tasty - , tasty-hunit - , temporary - , timeit - , uniplate - , unliftio - , unordered-containers - , vector - - diff --git a/examples/raft-algo/test/RaftAlgoProtoTest.hs b/examples/raft-algo/test/RaftAlgoProtoTest.hs deleted file mode 100644 index a652d1f5..00000000 --- a/examples/raft-algo/test/RaftAlgoProtoTest.hs +++ /dev/null @@ -1,33 +0,0 @@ -module Main where - -import HBS2.Prelude.Plated -import HBS2.System.Logger.Simple -import HBS2.Clock -import HBS2.Hash -import HBS2.Base58 - -import RaftAlgo.Proto - -import Data.Ord -import Data.Functor -import Data.HashMap.Strict (HashMap) -import Data.HashMap.Strict qualified as HashMap -import Data.List qualified as List -import Control.Monad.Reader -import System.Random -import Data.Hashable hiding (Hashed) -import Data.Word -import Lens.Micro.Platform -import Data.Fixed -import Options.Applicative as O -import Data.Cache (Cache) -import Data.Cache qualified as Cache -import Data.Maybe -import Data.Graph -import Data.HashSet (HashSet) -import Data.HashSet qualified as HashSet - -import Codec.Serialise - -import UnliftIO - diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index cb454686..1b86abc0 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -20,7 +20,6 @@ common shared-properties ghc-options: -Wall -fno-warn-type-defaults - -- -fprint-potential-instances -- -prof -fprof-auto -- -fno-warn-unused-matches -- -fno-warn-unused-do-bind @@ -41,7 +40,6 @@ common shared-properties , ConstraintKinds , DataKinds , DeriveDataTypeable - , DeriveFunctor , DeriveGeneric , DerivingStrategies , DerivingVia @@ -122,10 +120,10 @@ library , HBS2.Storage.Operations , HBS2.System.Logger.Simple , HBS2.System.Logger.Simple.Class - , HBS2.Net.Dialog.Core - , HBS2.Net.Dialog.Client - , HBS2.Net.Dialog.Helpers.List - , HBS2.Net.Dialog.Helpers.Streaming + , Dialog.Core + , Dialog.Client + , Dialog.Helpers.List + , Dialog.Helpers.Streaming -- other-modules: @@ -141,7 +139,6 @@ library , cache , cborg , clock - , constraints , containers , cryptonite , data-default diff --git a/hbs2-core/lib/HBS2/Net/Dialog/Client.hs b/hbs2-core/lib/Dialog/Client.hs similarity index 93% rename from hbs2-core/lib/HBS2/Net/Dialog/Client.hs rename to hbs2-core/lib/Dialog/Client.hs index dd4d2465..5d8a987d 100644 --- a/hbs2-core/lib/HBS2/Net/Dialog/Client.hs +++ b/hbs2-core/lib/Dialog/Client.hs @@ -1,7 +1,7 @@ {-# LANGUAGE OverloadedLabels #-} {-# LANGUAGE StrictData #-} {-# LANGUAGE ImpredicativeTypes #-} -module HBS2.Net.Dialog.Client where +module Dialog.Client where -- import System.Clock -- import System.Timeout @@ -34,8 +34,8 @@ import UnliftIO.Exception import UnliftIO.STM import UnliftIO.Timeout -import HBS2.Net.Dialog.Core -import HBS2.Net.Dialog.Helpers.Streaming +import Dialog.Core +import Dialog.Helpers.Streaming --- @@ -96,11 +96,10 @@ dQuery' par dcli peer rq go = processResponseHeader rhxs@(rh, xs) = case ((responseStatusCode . respStatus) rh) of Success200 -> Left (Just rhxs, RequestDone) - SuccessNoContent204 -> Left (Just rhxs, RequestDone) SuccessMore -> Right rhxs - BadRequest400 -> Left (Nothing, (RequestFailure (respStatus rh) xs)) - Forbidden403 -> Left (Nothing, (RequestFailure (respStatus rh) xs)) - NotFound404 -> Left (Nothing, (RequestFailure (respStatus rh) xs)) + r@BadRequest400 -> Left (Nothing, (RequestFailure r xs)) + r@Forbidden403 -> Left (Nothing, (RequestFailure r xs)) + r@NotFound404 -> Left (Nothing, (RequestFailure r xs)) rq' = rq & #unFrames %~ ([serialiseS routerSignature] <>) diff --git a/hbs2-core/lib/Dialog/Core.hs b/hbs2-core/lib/Dialog/Core.hs new file mode 100644 index 00000000..d259e89f --- /dev/null +++ b/hbs2-core/lib/Dialog/Core.hs @@ -0,0 +1,442 @@ +{-# LANGUAGE StrictData #-} +-- {-# LANGUAGE OverloadedLists #-} +-- {-# LANGUAGE UndecidableInstances #-} +module Dialog.Core where + +-- import Data.ByteString.Builder as Builder +-- import Data.ByteString.Builder.Internal as Builder +-- import GHC.IsList +import Codec.Serialise +import Control.Arrow +import Control.Monad +import Control.Monad.Error.Class +import Control.Monad.Except (Except(..), ExceptT(..), runExcept, runExceptT) +import Control.Monad.IO.Class +import Control.Monad.State.Class as State +import Control.Monad.State.Strict (evalStateT) +import Control.Monad.Trans.Class +import Control.Monad.Writer qualified as W +import Data.Binary.Get as Get +import Data.Binary.Put as Put +import Data.Bits +import Data.Bool +import Data.ByteArray (ByteArrayAccess) +import Data.ByteArray.Sized as BAS +import Data.ByteString (ByteString) +import Data.ByteString qualified as BS +import Data.ByteString.Char8 qualified as BS8 +import Data.ByteString.Lazy qualified as BSL +import Data.Foldable +import Data.Foldable as F +import Data.Function +import Data.Generics.Labels +import Data.Generics.Product.Fields +import Data.List qualified as List +import Data.List.NonEmpty qualified as NE +import Data.Map.Strict as Map +import Data.Maybe +import Data.String.Conversions as X (cs) +import Data.Text (Text) +import Data.Word +import GHC.Exts +import GHC.Generics (Generic) +import Lens.Micro.Platform +import Numeric.Natural +import System.Random.MWC +import UnliftIO.Exception +import UnliftIO.STM + +-- import Prettyprinter +-- import HBS2.Base58 +import Data.ByteString.Base16 qualified as B16 + +import Dialog.Helpers.List + +type Frames = Frames' ByteString +newtype Frames' a = Frames { unFrames :: [a] } + deriving stock (Generic,Eq) + deriving newtype (Functor, Foldable, Semigroup, Monoid + -- , IsList + ) + + +instance Show Frames where + show (Frames xs) = "Frames " <> show (BS.length <$> xs) + -- <> " " <> show (fmap B16.encode xs) + <> " " <> (show . fmap (limitSize 42)) xs + + where + limitSize n as = bool as (BS.take (n-3) as <> "...") (BS.length as > n) + +framesBodyPart :: Traversal' Frames [ByteString] +framesBodyPart = #unFrames . tailAfterP (== "") + +tailAfterP :: forall a . (a -> Bool) -> Traversal' [a] [a] +tailAfterP p focus = fix \go -> \case + x:xs -> (x :) <$> bool go focus (p x) xs + xs -> pure xs + +--- + +-- encodeFrames :: Frames -> ByteString +encodeFrames :: Foldable t => t ByteString -> ByteString +encodeFrames = F.toList >>> BSL.toStrict . runPut . \case + + [] -> pure () + + xss -> flip fix xss \go -> \case + [] -> pure () + bs:xs -> do + let (flip shiftR 1 -> n1, ns) = unfoldSizeBytes @Word64 . flip shiftL 1 . fromIntegral . BS.length $ bs + + putWord8 $ n1 + & (bool (sbit 7) id (List.null xs)) + & (bool (sbit 6) id (List.null ns)) + + forM_ (markMore ns) \(n, isMoreBytesInSize) -> do + putWord8 $ n & bool (zbit 7) (sbit 7) isMoreBytesInSize + + putByteString bs + + go xs + + where + + markMore as = zip as ((True <$ List.drop 1 as) <> [False]) + + unfoldSizeBytes :: (Bits n, Integral n) => n -> (Word8, [Word8]) + unfoldSizeBytes = (\(a NE.:| as) -> (a, as)) . NE.unfoldr \w -> + ( (flip shiftR 1 . flip shiftL 1 . fromIntegral) w + , let w' = shiftR w 7 + in bool Nothing (Just w') (w' > 0) + ) + +decodeFrames :: MonadError String m => ByteString -> m Frames +decodeFrames = \case + "" -> pure mempty + + bs' -> (bs' &) $ BSL.fromStrict >>> either (throwError . view _3) (pure . Frames . view _3) + <$> runGetOrFail do + + fix \go -> do + + j <- getWord8 + + size <- + flip fix (6, j) \fu (b, j') -> do + let n = (fromIntegral . clearLeftBits (8-b)) j' + if (tbit b j') + then (n +) . flip shiftL b <$> (fu . (7, ) =<< getWord8) + else pure n + + bs <- getByteString size + + let moreFrames = tbit 7 j + + if moreFrames + then (bs :) <$> go + else pure [bs] + + where + clearLeftBits n = flip shiftR n . flip shiftL n + tbit = flip testBit + + +devDialogCore :: IO () +devDialogCore = do + display (Frames []) + display (Frames [""]) + display (Frames [BS.replicate 32 0x55]) + display (Frames [BS.replicate 32 0x55, ""]) + display (Frames [BS.replicate 32 0x55, "\3\3"]) + display (Frames [BS.replicate 33 0x55, "\3\3"]) + display (Frames [BS.replicate 63 0x55]) + display (Frames [BS.replicate 64 0x55]) + -- display (Frames [BS.replicate 65 0x55]) + display (Frames ["\8\8\8","\4\4\4"]) + display (Frames ["","\1"]) + where + display a = do + putStrLn . cs . show . B16.encode . encodeFrames $ a + putStrLn "" + + + + +sbit :: (Bits n) => Int -> n -> n +sbit = flip setBit + +zbit :: (Bits n) => Int -> n -> n +zbit = flip clearBit + +--- + +decodeFramesFail :: (MonadFail m) => ByteString -> m Frames +decodeFramesFail = errorToFail . decodeFrames + +--- + +errorToFailT :: (MonadFail m) => ExceptT String m a -> m a +errorToFailT = either fail pure <=< runExceptT + +errorToFail :: MonadFail m => Except String a -> m a +errorToFail = either fail pure . runExcept + +errorShowToFail :: (MonadFail m, Show s) => Except s a -> m a +errorShowToFail = either (fail . show) pure . runExcept + +-- + +data CallerID = CallerID + { unCallerIDV :: Word8 + , unCallerID :: Word32 + } + deriving stock (Generic, Eq, Ord) + +instance Serialise CallerID + +newCallerID :: forall m. MonadIO m => m CallerID +newCallerID = liftIO $ withSystemRandomST \g -> + CallerID <$> (uniformM g) <*> (uniformM g) + +--- + +newtype CallerHandler m = CallerHandler + { unCallerHandler :: Frames -> m () + } + +newtype CallerEnv m = CallerEnv + { unCallerEnv :: TVar (Map CallerID (CallerHandler m)) } + +newCallerEnv :: MonadIO m => m (CallerEnv m') +newCallerEnv = CallerEnv <$> newTVarIO mempty + +--- + +newtype RequestResponseHandler m = RequestResponseHandler + { unRequestResponseHandler :: Frames -> m () + } + +newtype RequestResponseEnv m = RequestResponseEnv + { unRequestResponseEnv :: TVar (Map RequestID (RequestResponseHandler m)) + } + +newRequestResponseEnv :: MonadIO m => m (RequestResponseEnv m') +newRequestResponseEnv = + RequestResponseEnv <$> newTVarIO mempty + +--- + +data DClient m p i = DClient + { clientCallerEnv :: CallerEnv m + , clientSendProtoRequest :: p -> Frames -> m () + , clientGetKnownPeers :: m [(p, i)] + } + +--- + +newtype RequestID = RequestID { unRequestID :: Word32 } + deriving stock (Generic, Eq, Ord) + deriving newtype (Serialise, Num, Enum) + -- deriving via TODO_GenericVLQ Put Get + +data RequestResult + = RequestDone + -- | RequestSuccessIncomplete + | RequestTimeout + | RequestFailure ResponseStatusCode Frames + | RequestErrorBadResponse BadResponse Frames + deriving stock (Generic, Eq, Show) + +data BadResponse + = ResponseErrorNoResponseHeader + | ResponseInsufficientFrames + | ResponseParseError DeserialiseFailure + deriving stock (Generic, Eq, Show) + +--- + +setupCallerEnv :: MonadIO m => CallerEnv m' -> CallerID -> CallerHandler m' -> m () +setupCallerEnv env callerID repHandleEnv = + (atomically . modifyTVar' (unCallerEnv env)) + (at callerID .~ Just repHandleEnv) + +clearCallerEnv :: MonadIO m => CallerEnv m' -> CallerID -> m () +clearCallerEnv env callerID = + (atomically . modifyTVar' (unCallerEnv env)) + (at callerID .~ Nothing) + +findCallerHandler :: MonadIO m => CallerEnv m' -> CallerID -> m (Maybe (CallerHandler m')) +findCallerHandler CallerEnv{..} callerID = + (atomically (readTVar unCallerEnv)) <&> (preview (ix callerID)) + +--- + +setupRepHandler :: MonadIO m => RequestResponseEnv m' -> RequestID -> RequestResponseHandler m' -> m () +setupRepHandler RequestResponseEnv{..} requestID useResponse = + (atomically . modifyTVar' unRequestResponseEnv) + (at requestID .~ Just useResponse) + +clearRepHandler :: MonadIO m => RequestResponseEnv m' -> RequestID -> m () +clearRepHandler RequestResponseEnv{..} requestID = + (atomically . modifyTVar' unRequestResponseEnv) + (at requestID .~ Nothing) + +findRepHandler :: MonadIO m => RequestResponseEnv m' -> RequestID -> m (Maybe (RequestResponseHandler m')) +findRepHandler RequestResponseEnv{..} requestID = + (atomically (readTVar unRequestResponseEnv)) <&> (preview (ix requestID)) + +--- + +data DialogRequestEnv m p pd = DialogRequestEnv + { dreqEnvPeer :: p + , dreqEnvGetPeerData :: m pd + } + +-- data DialogRequestError +-- = DialogRequestFailure String +-- deriving stock (Show) +-- instance Exception DialogRequestError + +--- + +newtype DialogRequestRouter m = DialogRequestRouter + { unDialogRequestRouter :: + Map [ByteString] -- path + + -- handler :: Input -> m (Either ErrorMessage (HowToReply -> ResponseContinuation)) + (Frames -> Either Text ((Frames -> m ()) -> m ())) + } + + deriving (Semigroup, Monoid) + +dialogRequestRoutes + :: ListBuilder + ([ByteString], Frames -> Either Text ((Frames -> m ()) -> m ())) + -> DialogRequestRouter m +dialogRequestRoutes = DialogRequestRouter . Map.fromList . buildList + +hand :: Monad m => a -> b -> ListBuilderT m (a, b) +hand = curry li + +--- + +dpath :: Text -> [ByteString] -> Frames +dpath path = Frames . (cs path :) + +--- + +addEnvelope :: Monoid a => [a] -> Frames' a -> Frames' a +addEnvelope en = over #unFrames ((en <> [mempty]) <>) + +splitEnvelope :: (Monoid a, Eq a) => Frames' a -> ([a], Frames' a) +splitEnvelope = fmap (Frames . List.drop 1) . List.break (== mempty) . unFrames + +data ResponseHeader = ResponseHeader + { respStatus :: ResponseStatus + , respSeqNumber :: Int + } + deriving (Generic, Show, Eq) + +instance Serialise ResponseHeader + +data ResponseStatus = ResponseStatus + { responseStatusCode :: ResponseStatusCode + , responseStatusMessage :: Text + } + deriving (Generic, Show, Eq) + +instance Serialise ResponseStatus + +data ResponseStatusCode + = Success200 + | SuccessMore + | BadRequest400 + | Forbidden403 + | NotFound404 + deriving (Generic, Show, Eq) + +instance Serialise ResponseStatusCode + +routerSignature :: Word8 +routerSignature = 1 + +routeDialogRequest :: forall m p pd . + Monad m + => DialogRequestRouter m + -> DialogRequestEnv m p pd + -> (Frames -> m ()) + -> Frames + -> m () +routeDialogRequest router drenv rawReplyToPeer frames = do + erun <- pure $ runExcept $ flip evalStateT req do + + signature <- cutFrameDecode + (ResponseStatus BadRequest400 "No signature in request") + + when (signature /= routerSignature) $ throwError + (ResponseStatus BadRequest400 "Wrong signature in request") + + route <- cutFrameOr + (ResponseStatus BadRequest400 "No route in request") + + h <- fromJustThrowError + (ResponseStatus NotFound404 "Route not found") + (unDialogRequestRouter router ^? ix (BS8.split '/' route)) + + lift . ExceptT . pure + -- Если не может разобрать параметры запроса, + -- то самим ответить этому пиру '404' + . left (ResponseStatus BadRequest400) + . h + -- передать оставшуюся часть запроса в хэндлер + =<< get + + case erun of + Left rs -> replyToPeer (Frames [serialiseS (ResponseHeader rs 0)]) + Right run -> + -- передать хэндлеру продолжение чтобы ответить этому пиру + run replyToPeer + + where + (backPath, req) = splitEnvelope frames + + replyToPeer :: Frames -> m () + replyToPeer = rawReplyToPeer . over #unFrames (backPath <>) + +cutFrameDecode :: (Serialise b, MonadState Frames m, MonadError e m) => e -> m b +cutFrameDecode e = + State.gets unFrames >>= \case + x:xs -> + (either (const (throwError e)) pure . deserialiseOrFailS) x + <* State.put (Frames xs) + _ -> throwError e + +cutFrameDecode' + :: (Serialise b, MonadState Frames m, MonadError (Maybe DeserialiseFailure) m) + => m b +cutFrameDecode' = + State.gets unFrames >>= \case + x:xs -> + (either (throwError . Just) pure . deserialiseOrFailS) x + <* State.put (Frames xs) + _ -> throwError Nothing + +cutFrameOr :: (MonadState (Frames' b) m, MonadError e m) => e -> m b +cutFrameOr e = + State.gets unFrames >>= \case + x:xs -> x <$ State.put (Frames xs) + _ -> throwError e + +serialiseS :: Serialise a => a -> ByteString +serialiseS = BSL.toStrict . serialise + +deserialiseOrFailS :: Serialise a => ByteString -> Either DeserialiseFailure a +deserialiseOrFailS = deserialiseOrFail . BSL.fromStrict + +fromMaybeM :: Applicative m => m a -> Maybe a -> m a +fromMaybeM ma = maybe ma pure + +fromJustThrowError :: MonadError e m => e -> Maybe a -> m a +fromJustThrowError = fromMaybeM . throwError + diff --git a/hbs2-core/lib/HBS2/Net/Dialog/Helpers/List.hs b/hbs2-core/lib/Dialog/Helpers/List.hs similarity index 91% rename from hbs2-core/lib/HBS2/Net/Dialog/Helpers/List.hs rename to hbs2-core/lib/Dialog/Helpers/List.hs index 2460b993..b086b2e8 100644 --- a/hbs2-core/lib/HBS2/Net/Dialog/Helpers/List.hs +++ b/hbs2-core/lib/Dialog/Helpers/List.hs @@ -1,4 +1,4 @@ -module HBS2.Net.Dialog.Helpers.List where +module Dialog.Helpers.List where import Control.Monad.Trans.Writer.CPS qualified as W import Data.Functor.Identity diff --git a/hbs2-core/lib/HBS2/Net/Dialog/Helpers/Streaming.hs b/hbs2-core/lib/Dialog/Helpers/Streaming.hs similarity index 97% rename from hbs2-core/lib/HBS2/Net/Dialog/Helpers/Streaming.hs rename to hbs2-core/lib/Dialog/Helpers/Streaming.hs index 412928a4..a38be2b6 100644 --- a/hbs2-core/lib/HBS2/Net/Dialog/Helpers/Streaming.hs +++ b/hbs2-core/lib/Dialog/Helpers/Streaming.hs @@ -1,4 +1,4 @@ -module HBS2.Net.Dialog.Helpers.Streaming where +module Dialog.Helpers.Streaming where import Control.Monad.Fix import Data.ByteString qualified as BS diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index a574609a..bf82a7e7 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -11,7 +11,6 @@ module HBS2.Actors.Peer import HBS2.Actors import HBS2.Actors.Peer.Types import HBS2.Clock -import HBS2.Data.Types.Crypto import HBS2.Data.Types.Peer import HBS2.Defaults import HBS2.Events @@ -159,8 +158,30 @@ data PeerEnv e = , _envSweepers :: TVar (HashMap SKey [PeerM e IO ()]) , _envReqMsgLimit :: Cache (Peer e, Integer, Encoded e) () , _envReqProtoLimit :: Cache (Peer e, Integer) () + , _envAsymmetricKeyPair :: AsymmKeypair (Encryption e) + , _envEncryptionKeys :: TVar (HashMap (PeerData e) (CommonSecret (Encryption e))) } +setEncryptionKey :: + ( Hashable (PubKey 'Sign (Encryption L4Proto)) + , Hashable PeerNonce + , Show (PubKey 'Sign (Encryption L4Proto)) + , Show PeerNonce + , Show (CommonSecret (Encryption L4Proto)) + ) => PeerEnv L4Proto -> Peer L4Proto -> PeerData L4Proto -> Maybe (CommonSecret (Encryption L4Proto)) -> IO () +setEncryptionKey penv peer pd msecret = do + atomically $ modifyTVar' (_envEncryptionKeys penv) $ Lens.at pd .~ msecret + case msecret of + Nothing -> trace $ "ENCRYPTION delete key" <+> pretty peer <+> viaShow pd + Just k -> trace $ "ENCRYPTION store key" <+> pretty peer <+> viaShow pd <+> viaShow k + +getEncryptionKey :: + ( Hashable (PubKey 'Sign (Encryption L4Proto)) + , Hashable PeerNonce + ) => PeerEnv L4Proto -> PeerData L4Proto -> IO (Maybe (CommonSecret (Encryption L4Proto))) +getEncryptionKey penv pd = + readTVarIO (_envEncryptionKeys penv) <&> preview (Lens.ix pd) + newtype PeerM e m a = PeerM { fromPeerM :: ReaderT (PeerEnv e) m a } deriving newtype ( Functor , Applicative @@ -168,7 +189,6 @@ newtype PeerM e m a = PeerM { fromPeerM :: ReaderT (PeerEnv e) m a } , MonadReader (PeerEnv e) , MonadIO , MonadUnliftIO - , MonadTrans ) @@ -192,10 +212,10 @@ makeLenses 'PeerEnv makeLenses 'ResponseEnv -runResponseM :: forall e m a . (Monad m) +runResponseM :: forall e m . (Monad m) => Peer e - -> ResponseM e m a - -> m a + -> ResponseM e m () + -> m () runResponseM peer f = runReaderT (fromResponse f) (ResponseEnv peer) @@ -415,6 +435,8 @@ newPeerEnv s bus p = do _envSweepers <- liftIO (newTVarIO mempty) _envReqMsgLimit <- liftIO (Cache.newCache (Just defRequestLimit)) _envReqProtoLimit <- liftIO (Cache.newCache (Just defRequestLimit)) + _envAsymmetricKeyPair <- asymmNewKeypair @(Encryption e) + _envEncryptionKeys <- liftIO (newTVarIO mempty) pure PeerEnv {..} runPeerM :: forall e m . ( MonadIO m diff --git a/hbs2-core/lib/HBS2/Data/Types/Refs.hs b/hbs2-core/lib/HBS2/Data/Types/Refs.hs index 541c3434..241d4d10 100644 --- a/hbs2-core/lib/HBS2/Data/Types/Refs.hs +++ b/hbs2-core/lib/HBS2/Data/Types/Refs.hs @@ -8,7 +8,7 @@ module HBS2.Data.Types.Refs import HBS2.Base58 import HBS2.Hash import HBS2.Merkle -import HBS2.Net.Proto.Types +import HBS2.Net.Proto.Types (Encryption) import HBS2.Net.Auth.Credentials import HBS2.Prelude diff --git a/hbs2-core/lib/HBS2/Net/Auth/AccessKey.hs b/hbs2-core/lib/HBS2/Net/Auth/AccessKey.hs index 4d4c0b8e..abf84c17 100644 --- a/hbs2-core/lib/HBS2/Net/Auth/AccessKey.hs +++ b/hbs2-core/lib/HBS2/Net/Auth/AccessKey.hs @@ -10,7 +10,6 @@ import HBS2.Data.Types import HBS2.Merkle import HBS2.Net.Auth.Credentials import HBS2.Net.Proto.Definition -import HBS2.Net.Proto.Types import HBS2.Prelude.Plated import Codec.Serialise diff --git a/hbs2-core/lib/HBS2/Net/Auth/Credentials.hs b/hbs2-core/lib/HBS2/Net/Auth/Credentials.hs index 1a866395..0ae23baf 100644 --- a/hbs2-core/lib/HBS2/Net/Auth/Credentials.hs +++ b/hbs2-core/lib/HBS2/Net/Auth/Credentials.hs @@ -28,6 +28,11 @@ import Data.Kind type family EncryptPubKey e :: Type +data CryptoAction = Sign | Encrypt + +type family PubKey ( a :: CryptoAction) e :: Type +type family PrivKey ( a :: CryptoAction) e :: Type + class Signatures e where type family Signature e :: Type makeSign :: PrivKey 'Sign e -> ByteString -> Signature e @@ -202,3 +207,4 @@ instance IsEncoding (PubKey 'Encrypt e) => Pretty (KeyringEntry e) where pretty ke = fill 10 "pub-key:" <+> pretty (AsBase58 (Crypto.encode (view krPk ke))) + diff --git a/hbs2-core/lib/HBS2/Net/Dialog/Core.hs b/hbs2-core/lib/HBS2/Net/Dialog/Core.hs deleted file mode 100644 index 60aef062..00000000 --- a/hbs2-core/lib/HBS2/Net/Dialog/Core.hs +++ /dev/null @@ -1,831 +0,0 @@ -{-# Language AllowAmbiguousTypes #-} -{-# LANGUAGE DeriveDataTypeable #-} -{-# LANGUAGE InstanceSigs #-} -{-# LANGUAGE PolyKinds #-} -{-# LANGUAGE QuantifiedConstraints #-} -{-# LANGUAGE StrictData #-} -{-# LANGUAGE TypeOperators #-} -{-# LANGUAGE UndecidableInstances #-} - --- {-# LANGUAGE ConstraintKinds #-} --- {-# LANGUAGE OverloadedLists #-} - --- {-# LANGUAGE CPP #-} --- {-# LANGUAGE DataKinds #-} --- {-# LANGUAGE FlexibleContexts #-} --- {-# LANGUAGE FlexibleInstances #-} --- {-# LANGUAGE MultiParamTypeClasses #-} --- {-# LANGUAGE OverloadedStrings #-} --- {-# LANGUAGE RankNTypes #-} --- {-# LANGUAGE ScopedTypeVariables #-} --- {-# LANGUAGE TupleSections #-} --- {-# LANGUAGE TypeApplications #-} --- {-# LANGUAGE TypeFamilies #-} - - -module HBS2.Net.Dialog.Core where - -import Codec.Serialise -import Control.Arrow -import Control.Monad -import Control.Monad.Error.Class -import Control.Monad.Except (Except, ExceptT(..), runExcept, runExceptT) -import Control.Monad.IO.Class -import Control.Monad.State.Class as State -import Control.Monad.State.Strict as StateStrict (evalState, evalStateT, runStateT, StateT(..)) -import Control.Monad.Trans.Class -import Data.Binary.Get as Get -import Data.Binary.Put as Put -import Data.Bits -import Data.Bool -import Data.ByteString (ByteString) -import Data.ByteString qualified as BS -import Data.ByteString.Base16 qualified as B16 -import Data.ByteString.Char8 qualified as BS8 -import Data.ByteString.Lazy qualified as BSL -import Data.Constraint (Dict(..)) -import Data.Foldable as F -import Data.Function -import Data.Functor -import Data.Generics.Labels () -import Data.Generics.Product.Fields () -import Data.Generics.Sum.Constructors -import Data.Kind -import Data.List qualified as List -import Data.List.NonEmpty qualified as NE -import Data.Map.Strict as Map -import Data.Maybe -import Data.Proxy -import Data.String.Conversions as X (cs) -import Data.Text (Text) -import Data.Typeable -import Data.Word -import GHC.Generics ((:*:) (..), Generic (..), K1 (..), M1 (..)) -import GHC.Generics qualified as Generics -import GHC.TypeLits -import Lens.Micro.Platform -import Streaming -import System.Random.MWC -import UnliftIO.STM - -import HBS2.Net.Dialog.Helpers.List - -type Frames = Frames' ByteString -newtype Frames' a = Frames { unFrames :: [a] } - deriving stock (Generic,Eq) - deriving newtype (Functor, Foldable, Semigroup, Monoid - -- , IsList - ) - - -instance Show Frames where - show (Frames xs) = "Frames " <> show (BS.length <$> xs) - -- <> " " <> show (fmap B16.encode xs) - <> " " <> (show . fmap (limitSize 42)) xs - - where - limitSize n as = bool as (BS.take (n-3) as <> "...") (BS.length as > n) - -framesBodyPart :: Traversal' Frames [ByteString] -framesBodyPart = #unFrames . tailAfterP (== "") - -tailAfterP :: forall a . (a -> Bool) -> Traversal' [a] [a] -tailAfterP p focus = fix \go -> \case - x:xs -> (x :) <$> bool go focus (p x) xs - xs -> pure xs - ---- - -encodeFrames :: Frames -> ByteString --- encodeFrames :: Foldable t => t ByteString -> ByteString -encodeFrames = F.toList >>> BSL.toStrict . runPut . \case - - [] -> pure () - - xss -> flip fix xss \go -> \case - [] -> pure () - bs:xs -> do - let (flip shiftR 1 -> n1, ns) = unfoldSizeBytes @Word64 . flip shiftL 1 . fromIntegral . BS.length $ bs - - putWord8 $ n1 - & bool (sbit 7) id (List.null xs) - & bool (sbit 6) id (List.null ns) - - forM_ (markMore ns) \(n, isMoreBytesInSize) -> do - putWord8 $ n & bool (zbit 7) (sbit 7) isMoreBytesInSize - - putByteString bs - - go xs - - where - - markMore as = zip as ((True <$ List.drop 1 as) <> [False]) - - unfoldSizeBytes :: (Bits n, Integral n) => n -> (Word8, [Word8]) - unfoldSizeBytes = (\(a NE.:| as) -> (a, as)) . NE.unfoldr \w -> - ( (flip shiftR 1 . flip shiftL 1 . fromIntegral) w - , let w' = shiftR w 7 - in bool Nothing (Just w') (w' > 0) - ) - -decodeFrames :: MonadError String m => ByteString -> m Frames -decodeFrames = \case - "" -> pure mempty - - bs' -> (bs' &) $ BSL.fromStrict >>> either (throwError . view _3) (pure . Frames . view _3) - <$> runGetOrFail do - - fix \go -> do - - j <- getWord8 - - bsize <- - flip fix (6, j) \fu (b, j') -> do - let n = (fromIntegral . clearLeftBits (8-b)) j' - if tbit b j' - then (n +) . flip shiftL b <$> (fu . (7, ) =<< getWord8) - else pure n - - bs <- getByteString bsize - - let moreFrames = tbit 7 j - - if moreFrames - then (bs :) <$> go - else pure [bs] - - where - clearLeftBits n = flip shiftR n . flip shiftL n - tbit = flip testBit - - -devDialogCore :: IO () -devDialogCore = do - display (Frames []) - display (Frames [""]) - display (Frames [BS.replicate 32 0x55]) - display (Frames [BS.replicate 32 0x55, ""]) - display (Frames [BS.replicate 32 0x55, "\3\3"]) - display (Frames [BS.replicate 33 0x55, "\3\3"]) - display (Frames [BS.replicate 63 0x55]) - display (Frames [BS.replicate 64 0x55]) - -- display (Frames [BS.replicate 65 0x55]) - display (Frames ["\8\8\8","\4\4\4"]) - display (Frames ["","\1"]) - where - display a = do - putStrLn . cs . show . B16.encode . encodeFrames $ a - putStrLn "" - - - - -sbit :: (Bits n) => Int -> n -> n -sbit = flip setBit - -zbit :: (Bits n) => Int -> n -> n -zbit = flip clearBit - ---- - -decodeFramesFail :: (MonadFail m) => ByteString -> m Frames -decodeFramesFail = errorToFail . decodeFrames - ---- - -errorToFailT :: (MonadFail m) => ExceptT String m a -> m a -errorToFailT = either fail pure <=< runExceptT - -errorToFail :: MonadFail m => Except String a -> m a -errorToFail = either fail pure . runExcept - -errorShowToFail :: (MonadFail m, Show s) => Except s a -> m a -errorShowToFail = either (fail . show) pure . runExcept - --- - -data CallerID = CallerID - { unCallerIDV :: Word8 - , unCallerID :: Word32 - } - deriving stock (Generic, Eq, Ord) - -instance Serialise CallerID - -newCallerID :: forall m. MonadIO m => m CallerID -newCallerID = liftIO $ withSystemRandomST \g -> - CallerID <$> uniformM g <*> uniformM g - ---- - -newtype CallerHandler m = CallerHandler - { unCallerHandler :: Frames -> m () - } - -newtype CallerEnv m = CallerEnv - { unCallerEnv :: TVar (Map CallerID (CallerHandler m)) } - -newCallerEnv :: MonadIO m => m (CallerEnv m') -newCallerEnv = CallerEnv <$> newTVarIO mempty - ---- - -newtype RequestResponseHandler m = RequestResponseHandler - { unRequestResponseHandler :: Frames -> m () - } - -newtype RequestResponseEnv m = RequestResponseEnv - { unRequestResponseEnv :: TVar (Map RequestID (RequestResponseHandler m)) - } - -newRequestResponseEnv :: MonadIO m => m (RequestResponseEnv m') -newRequestResponseEnv = - RequestResponseEnv <$> newTVarIO mempty - ---- - -data DClient m p i = DClient - { clientCallerEnv :: CallerEnv m - , clientSendProtoRequest :: p -> Frames -> m () - , clientGetKnownPeers :: m [(p, i)] - } - ---- - -newtype RequestID = RequestID { unRequestID :: Word32 } - deriving stock (Generic, Eq, Ord) - deriving newtype (Serialise, Num, Enum) - -- deriving via TODO_GenericVLQ Put Get - -data RequestResult - = RequestDone - -- | RequestSuccessIncomplete - | RequestTimeout - | RequestFailure ResponseStatus Frames - | RequestErrorBadResponse BadResponse Frames - deriving stock (Generic, Eq, Show) - -data BadResponse - = ResponseErrorNoResponseHeader - | ResponseInsufficientFrames - | ResponseParseError DeserialiseFailure - deriving stock (Generic, Eq, Show) - ---- - -setupCallerEnv :: MonadIO m => CallerEnv m' -> CallerID -> CallerHandler m' -> m () -setupCallerEnv env callerID repHandleEnv = - (atomically . modifyTVar' (unCallerEnv env)) - (at callerID ?~ repHandleEnv) - -clearCallerEnv :: MonadIO m => CallerEnv m' -> CallerID -> m () -clearCallerEnv env callerID = - (atomically . modifyTVar' (unCallerEnv env)) - (at callerID .~ Nothing) - -findCallerHandler :: MonadIO m => CallerEnv m' -> CallerID -> m (Maybe (CallerHandler m')) -findCallerHandler CallerEnv{..} callerID = - readTVarIO unCallerEnv <&> preview (ix callerID) - ---- - -setupRepHandler :: MonadIO m => RequestResponseEnv m' -> RequestID -> RequestResponseHandler m' -> m () -setupRepHandler RequestResponseEnv{..} requestID useResponse = - (atomically . modifyTVar' unRequestResponseEnv) - (at requestID ?~ useResponse) - -clearRepHandler :: MonadIO m => RequestResponseEnv m' -> RequestID -> m () -clearRepHandler RequestResponseEnv{..} requestID = - (atomically . modifyTVar' unRequestResponseEnv) - (at requestID .~ Nothing) - -findRepHandler :: MonadIO m => RequestResponseEnv m' -> RequestID -> m (Maybe (RequestResponseHandler m')) -findRepHandler RequestResponseEnv{..} requestID = - readTVarIO unRequestResponseEnv <&> preview (ix requestID) - ---- - -data DialogRequestEnv m p pd = DialogRequestEnv - { dreqEnvPeer :: p - , dreqEnvGetPeerData :: m pd - } - --- data DialogRequestError --- = DialogRequestFailure String --- deriving stock (Show) --- instance Exception DialogRequestError - ---- - --- type Application = Request -> (Response -> IO ResponseReceived) -> IO ResponseReceived -type DApp m = Frames -> (Frames -> m ()) -> m () - -mkDApp :: - forall spec ctx m io. - ( Monad m - , Monad io - , HasHandler m (NamedSpec spec) ctx - , HasHandler io (NamedSpec spec) ctx - ) - => Proxy (NamedSpec spec) - -> Ctx ctx - -> (forall x. m x -> DialHandlerT io x) - -> spec (ModeServerT m) - -> DApp io -mkDApp p ctx ntToDialHandlerTn hd = routeDialogRequest rr - where - rr :: DialogRequestRouter io - rr = route p ctx - $ hoistDialogWithContext p (Proxy @ctx) ntToDialHandlerTn - hd - -type DialogReplyHandler m = (Frames -> m ()) -> m () - -type DialogRequestRouter (m :: Type -> Type) = - DialogRequestRoutes (DialogReplyHandler m) - -data DialogRequestRoutes (h :: Type) - = DialogRequestPaths (Map ByteString (DialogRequestRoutes h)) - | DialogRequestPreparse (Frames -> Either Text (DialogRequestRoutes h, Frames)) - | DialogRequestEndpoint h - deriving (Generic, Functor) - -instance Semigroup (DialogRequestRoutes h) where - (<>) a b = case (a, b) of - (DialogRequestPaths p1, DialogRequestPaths p2) -> - DialogRequestPaths (p1 <> p2) - _ -> b - --- instance Monoid (DialogRequestRoutes h) where --- mempty = DialogRequestPaths mempty - -dialogRequestRoutes - :: ListBuilder - ([ByteString], Frames -> Either Text ((Frames -> m ()) -> m (), Frames)) - -> DialogRequestRouter m -dialogRequestRoutes = List.foldl1' (<>) - . fmap toPaths - . over (traverse . _2) (DialogRequestPreparse . (fmap . fmap) (over _1 DialogRequestEndpoint)) - . buildList - where - toPaths :: ([ByteString], DialogRequestRoutes ((Frames -> m ()) -> m ())) - -> DialogRequestRoutes (DialogReplyHandler m) - toPaths = fix \go (ps, rr) -> case ps of - [] -> rr - [p] -> DialogRequestPaths (Map.singleton p rr) - p:px' -> DialogRequestPaths (Map.singleton p (go (px', rr))) - -hand :: Monad m => a -> b -> ListBuilderT m (a, b) -hand = curry li - -handconv :: (Monad m, Monad m', Serialise req, Serialise resp) - => a - -> Text - -> (req -> ExceptT ResponseStatus m resp) - -> ListBuilderT m' (a, Frames -> Either Text ((Frames -> m ()) -> m (), Frames)) -handconv path msg h = - hand path $ processReply msg h - ---- - -processReply :: forall m m' req resp . - ( Monad m - , Serialise req - , Serialise resp - , m' ~ ExceptT ResponseStatus m - ) - => Text - -> (req -> m' resp) - -> Frames - -> Either Text ((Frames -> m ()) -> m (), Frames) -processReply msg h = runExcept . runStateT do - flip runReply . h <$> cutFrameDecode msg - -runReply :: - ( Monad m - , Serialise a - ) - => (Frames -> m r) - -> ExceptT ResponseStatus m a - -> m r -runReply reply = - either - (\e -> reply (Frames [serialiseS (ResponseHeader e 0)])) - (\a -> reply (Frames [serialiseS (ResponseHeader (ResponseStatus Success200 "") 0) - , serialiseS a - ]) - ) - <=< runExceptT - ---- - -dpath :: Text -> [ByteString] -> Frames -dpath path = Frames . (cs path :) - ---- - -addEnvelope :: Monoid a => [a] -> Frames' a -> Frames' a -addEnvelope en = over #unFrames ((en <> [mempty]) <>) - -splitEnvelope :: (Monoid a, Eq a) => Frames' a -> ([a], Frames' a) -splitEnvelope = fmap (Frames . List.drop 1) . List.break (== mempty) . unFrames - -data ResponseHeader = ResponseHeader - { respStatus :: ResponseStatus - , respSeqNumber :: Int - } - deriving (Generic, Show, Eq) - -instance Serialise ResponseHeader - -data ResponseStatus = ResponseStatus - { responseStatusCode :: ResponseStatusCode - , responseStatusMessage :: Text - } - deriving (Generic, Show, Eq) - -instance Serialise ResponseStatus - -data ResponseStatusCode - = Success200 - | SuccessNoContent204 - | SuccessMore - | BadRequest400 - | Forbidden403 - | NotFound404 - deriving (Generic, Show, Eq) - -instance Serialise ResponseStatusCode - -routerSignature :: Word8 -routerSignature = 1 - -routeDialogRequest :: forall m . - Monad m - => DialogRequestRouter m - -> Frames - -> (Frames -> m ()) - -> m () -routeDialogRequest router frames rawReplyToPeer = do - -- error $ show router - erun <- pure $ runExcept $ flip evalStateT req do - - signature <- cutFrameDecode - (ResponseStatus BadRequest400 "No signature in request") - - when (signature /= routerSignature) $ throwError - (ResponseStatus BadRequest400 "Wrong signature in request") - - path <- cutFrameOr - (ResponseStatus BadRequest400 "No path in request") - - lift . ExceptT . pure - -- Если не может разобрать параметры запроса, - -- то самим ответить этому пиру '404' - -- . left (ResponseStatus BadRequest400) - . travel (BS8.split '/' path) router - -- передать оставшуюся часть запроса в хэндлер - =<< get - - case erun of - Left rs -> replyToPeer (Frames [serialiseS (ResponseHeader rs 0)]) - Right go -> - -- передать хэндлеру продолжение чтобы ответить этому пиру - go replyToPeer - - where - (backPath, req) = splitEnvelope frames - - replyToPeer :: Frames -> m () - replyToPeer = rawReplyToPeer . over #unFrames (backPath <>) - -travel :: () - => [ByteString] - -> DialogRequestRouter m - -> Frames - -> Either ResponseStatus ((Frames -> m ()) -> m ()) -travel path'' router'' = evalStateT $ flipfix2 path'' router'' - \go path -> \case - DialogRequestPaths kv -> case path of - step:path' -> - maybe - (throwError (ResponseStatus BadRequest400 "Path not found")) - (go path') - (Map.lookup step kv) - _ -> throwError (ResponseStatus BadRequest400 "Path not found (too long)") - DialogRequestPreparse hfx -> - go path =<< StateT (left (ResponseStatus BadRequest400) . hfx) - DialogRequestEndpoint ep -> pure ep - -flipfix2 :: a -> b -> ((a -> b -> c) -> (a -> b -> c)) -> c -flipfix2 a b f = fix f a b - -cutFrameDecode :: (Serialise b, MonadState Frames m, MonadError e m) => e -> m b -cutFrameDecode e = - State.gets unFrames >>= \case - x:xs -> - (either (const (throwError e)) pure . deserialiseOrFailS) x - <* State.put (Frames xs) - _ -> throwError e - -cutFrameDecode' - :: (Serialise b, MonadState Frames m, MonadError (Maybe DeserialiseFailure) m) - => m b -cutFrameDecode' = - State.gets unFrames >>= \case - x:xs -> - (either (throwError . Just) pure . deserialiseOrFailS) x - <* State.put (Frames xs) - _ -> throwError Nothing - -cutFrameOr :: (MonadState (Frames' b) m, MonadError e m) => e -> m b -cutFrameOr e = - State.gets unFrames >>= \case - x:xs -> x <$ State.put (Frames xs) - _ -> throwError e - -serialiseS :: Serialise a => a -> ByteString -serialiseS = BSL.toStrict . serialise - -deserialiseOrFailS :: Serialise a => ByteString -> Either DeserialiseFailure a -deserialiseOrFailS = deserialiseOrFail . BSL.fromStrict - -fromMaybeM :: Applicative m => m a -> Maybe a -> m a -fromMaybeM ma = maybe ma pure - -fromJustThrowError :: MonadError e m => e -> Maybe a -> m a -fromJustThrowError = fromMaybeM . throwError - - - ------------------------------------------- ---- Type-level specification ------------- ------------------------------------------- - - -data ReqCBOR (a :: Type) -data SingleAck -data SingleRespCBOR (a :: Type) -data StreamingRespCBOR (a :: Type) - -data NamedSpec (spec :: Type -> Type) - -class DialMode mode where - type mode &- spec :: Type -infixl 0 &- - -data (path :: k) &/ (a :: Type) - deriving (Typeable) -infixr 4 &/ - -type path &// a = path &/ NamedSpec a -infixr 4 &// - ---- - -data ModePlain -instance DialMode ModePlain where - type ModePlain &- spec = spec - ---- - -data ModeServerT (m :: Type -> Type) - -instance DialMode (ModeServerT m) where - type ModeServerT m &- spec = HandlerD spec m - -class HasHandler m spec ctx where - type HandlerD spec (m' :: Type -> Type) :: Type - - route :: - Proxy spec - -> Ctx ctx - -> HandlerD spec (DialHandlerT m) - -> DialogRequestRouter m - - hoistDialogWithContext - :: Proxy spec - -> Proxy ctx - -> (forall x. m x -> n x) - -> HandlerD spec m - -> HandlerD spec n - -data EmptyCX -- '[] -data Ctx ctx where - EmptyCtx :: Ctx EmptyCX - -- (:&.) :: x -> Ctx xs -> Ctx (x ': xs) --- infixr 5 :&. - --- hoistTRouter :: forall t m n . --- (MonadTrans t, Monad m, Monad n, m ~ t n) --- => (forall a . m a -> n a) --- -> DialogRequestRouter m --- -> DialogRequestRouter n --- hoistTRouter nt = fmap nt' --- where --- nt' :: ((x -> m y) -> m y) --- -> ((x -> n y) -> n y) --- nt' xtmy_tmy = nt . xtmy_tmy . fmap lift - -hoistTRouter :: forall m n . - (Monad m, Monad n) - => (forall a . m a -> n a) - -> (forall a . n a -> m a) - -> DialogRequestRouter m - -> DialogRequestRouter n -hoistTRouter ntf ntb = fmap nt' - where - nt' :: ((x -> m y) -> m y) - -> ((x -> n y) -> n y) - nt' xtmy_tmy = ntf . xtmy_tmy . fmap ntb - - -type DialHandlerIO a = DialHandlerT IO a -newtype DialHandlerT m a = DialHandlerT { runDialHandlerT :: ExceptT ResponseStatus m a } - deriving - ( Generic, Functor, Applicative, Monad - , MonadIO - , MonadTrans - , MonadError ResponseStatus - -- , MonadUnliftIO - -- , MonadThrow, MonadCatch, MonadMask - ) - ---- - -instance (KnownSymbol path, HasHandler m spec ctx) => HasHandler m (path &/ spec) ctx where - type HandlerD (path &/ spec) m = HandlerD spec m - - route _ ctx h = DialogRequestPaths $ - Map.singleton (cs (symbolVal (Proxy @path))) (route (Proxy @spec) ctx h) - - hoistDialogWithContext _ = hoistDialogWithContext (Proxy @spec) - ---- - -instance - ( Serialise a - , Typeable a - , HasHandler m spec ctx - ) => - HasHandler m (ReqCBOR a &/ spec) ctx where - type HandlerD (ReqCBOR a &/ spec) m = a -> HandlerD spec m - - route _ ctx (ha :: a -> HandlerD spec (DialHandlerT m)) = - DialogRequestPreparse \fx -> do - (a, fx') - <- runExcept - $ flip runStateT fx - $ cutFrameDecode ((cs . show . typeRep) (Proxy @a)) - pure (route (Proxy @spec) ctx (ha a), fx') - - hoistDialogWithContext _ pc nt s = hoistDialogWithContext (Proxy @spec) pc nt . s - ---- - -instance - ( Applicative m - ) => - HasHandler m SingleAck ctx where - type HandlerD SingleAck m = m () - - route _ _ctx _mx = - DialogRequestEndpoint \reply -> do - reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessNoContent204 "") 0)]) - - hoistDialogWithContext _ _ nt hdlM = nt hdlM - ---- - -instance - ( Monad m - , Serialise a - ) => - HasHandler m (SingleRespCBOR a) ctx where - type HandlerD (SingleRespCBOR a) m = m a - - route _ _ctx ma = - DialogRequestEndpoint \reply -> do - - ea <- runExceptT $ runDialHandlerT ma - - case ea of - Left e -> reply $ Frames [ serialiseS e ] - Right a -> reply $ Frames - [ serialiseS (ResponseHeader (ResponseStatus Success200 "") 0) - , serialiseS (a :: a) - ] - - hoistDialogWithContext _ _ nt hdlM = nt hdlM - ---- - -instance - ( Serialise a - ) => - HasHandler m (StreamingRespCBOR a) ctx where - type HandlerD (StreamingRespCBOR a) m = Stream (Of a) m () - - route = undefined - - -- hoistDialogWithContext = undefined - ---- - -type GServerConstraints spec m = - ( GToProduct (Rep (spec (ModeServerT m))) ~ HandlerD (GToProduct (Rep (spec ModePlain))) m - , GProduct (Rep (spec (ModeServerT m))) - ) - -class GServer (spec :: Type -> Type) (m :: Type -> Type) where - gServerProof :: Dict (GServerConstraints spec m) - -instance - ( GToProduct (Rep (spec (ModeServerT m))) ~ HandlerD (GToProduct (Rep (spec ModePlain))) m - , GProduct (Rep (spec (ModeServerT m))) - ) => GServer spec m where - gServerProof = Dict - - -instance - ( HasHandler m (GToProduct (Rep (spec ModePlain))) ctx - -- , HasHandler m (GToProduct (Rep (spec (ModeServerT m)))) ctx - -- , GProduct (Rep (spec ModePlain)) - , forall q . Generic (spec (ModeServerT q)) - , forall q . GServer spec q - ) => - HasHandler m (NamedSpec spec) ctx where - type HandlerD (NamedSpec spec) m = spec (ModeServerT m) - - route :: - Proxy (NamedSpec spec) - -> Ctx ctx - -> spec (ModeServerT (DialHandlerT m)) - -> DialogRequestRouter m - route _ ctx spec = - case gServerProof @spec @(DialHandlerT m) of - Dict -> route (Proxy @(GToProduct (Rep (spec ModePlain)))) ctx (toProduct spec) - - hoistDialogWithContext - :: forall n. Proxy (NamedSpec spec) - -> Proxy ctx - -> (forall x. m x -> n x) - -> spec (ModeServerT m) - -> spec (ModeServerT n) - hoistDialogWithContext _ pctx nat dl = - case (gServerProof @spec @m, gServerProof @spec @n) of - (Dict, Dict) -> - fromProduct dlN - where - dlM :: HandlerD (GToProduct (Rep (spec ModePlain))) m = - toProduct dl - dlN :: HandlerD (GToProduct (Rep (spec ModePlain))) n = - hoistDialogWithContext (Proxy @(GToProduct (Rep (spec ModePlain)))) pctx nat dlM - - -toProduct :: (Generic (spec mode), GProduct (Rep (spec mode))) - => spec mode -> GToProduct (Rep (spec mode)) -toProduct = gtoProduct . Generics.from - -fromProduct - :: (Generic (spec mode), GProduct (Rep (spec mode))) - => GToProduct (Rep (spec mode)) -> spec mode -fromProduct = Generics.to . gfromProduct - -instance - ( HasHandler m speca ctx - , HasHandler m specb ctx - ) => - HasHandler m (GP speca specb) ctx where - type HandlerD (GP speca specb) m = GP (HandlerD speca m) (HandlerD specb m) - route _ ctx (GP speca specb) = - route (Proxy @speca) ctx speca - <> route (Proxy @specb) ctx specb - - hoistDialogWithContext _ pc nt (GP speca specb) = - GP - (hoistDialogWithContext (Proxy @speca) pc nt speca) - (hoistDialogWithContext (Proxy @specb) pc nt specb) - -data GP a b = GP a b - -class GProduct f where - type GToProduct (f :: Type -> Type) - gtoProduct :: f p -> GToProduct f - gfromProduct :: GToProduct f -> f p - -instance (GProduct l, GProduct r) => GProduct (l :*: r) where - type GToProduct (l :*: r) = GP (GToProduct l) (GToProduct r) - gtoProduct (l :*: r) = GP (gtoProduct l) (gtoProduct r) - gfromProduct (GP l r) = gfromProduct l :*: gfromProduct r - -instance GProduct f => GProduct (M1 i c f) where - type GToProduct (M1 i c f) = GToProduct f - gtoProduct = gtoProduct . unM1 - gfromProduct = M1 . gfromProduct - -instance GProduct (K1 i c) where - type GToProduct (K1 i c) = c - gtoProduct = unK1 - gfromProduct = K1 diff --git a/hbs2-core/lib/HBS2/Net/Proto/Dialog.hs b/hbs2-core/lib/HBS2/Net/Proto/Dialog.hs index 925980bb..76523679 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Dialog.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Dialog.hs @@ -3,24 +3,39 @@ module HBS2.Net.Proto.Dialog ( module HBS2.Net.Proto.Dialog -, module HBS2.Net.Dialog.Core -, module HBS2.Net.Dialog.Client +, module Dialog.Core +, module Dialog.Client ) where +import HBS2.Actors.Peer +import HBS2.Clock +import HBS2.Data.Types +import HBS2.Net.Auth.Credentials +import HBS2.Net.Proto +import HBS2.Net.Proto.Peer +import HBS2.Net.Proto.Sessions +import HBS2.Prelude.Plated hiding (at) +import HBS2.System.Logger.Simple + import Codec.Serialise (deserialiseOrFail) import Control.Arrow import Control.Monad +import Control.Monad.Error.Class +import Control.Monad.IO.Unlift import Data.ByteString (ByteString) +import Data.ByteString qualified as BS +import Data.ByteString.Builder import Data.ByteString.Lazy qualified as BSL -import Data.Generics.Product.Fields () -import Data.Kind +import Data.List qualified as List +import Data.Map.Strict as Map import Lens.Micro.Platform +import Streaming as S +import Streaming.Prelude qualified as S +import UnliftIO.Exception +import UnliftIO.STM -import HBS2.Data.Types -import HBS2.Net.Dialog.Client -import HBS2.Net.Dialog.Core -import HBS2.Net.Proto -import HBS2.Prelude.Plated hiding (at) +import Dialog.Client +import Dialog.Core --- @@ -47,7 +62,7 @@ dialRespEncode = \case --- -newtype DialogProtoEnv m e = DialogProtoEnv +data DialogProtoEnv m e = DialogProtoEnv { dialogProtoEnvCallerEnv :: CallerEnv m } @@ -61,12 +76,12 @@ newDialogProtoEnv = do -- Adapters should share the same env -data DialReqProtoAdapter e (m :: Type -> Type) s = DialReqProtoAdapter - { dialReqProtoAdapterDApp :: DApp IO - , dialReqProtoAdapterNT :: Peer e -> forall a . m a -> IO a +data DialReqProtoAdapter e (m :: * -> *) s = DialReqProtoAdapter + { dialReqProtoAdapterRouter :: DialogRequestRouter m + -- , dialReqProtoAdapterEnv :: DialogProtoEnv e } -newtype DialRespProtoAdapter e (m :: Type -> Type) s = DialRespProtoAdapter +data DialRespProtoAdapter e (m :: * -> *) s = DialRespProtoAdapter { dialRespProtoAdapterEnv :: DialogProtoEnv m e } @@ -83,22 +98,19 @@ dialReqProto :: forall e s m . => DialReqProtoAdapter e m s -> DialReq e -> m () -dialReqProto adapter = unDialReq >>> \frames -> do +dialReqProto DialReqProtoAdapter{..} = unDialReq >>> \frames -> do peer <- thatPeer dialReqProtoProxy - -- let dialReqEnv :: DialogRequestEnv m (Peer e) (Maybe (PeerData e)) - -- dialReqEnv = DialogRequestEnv - -- { dreqEnvPeer = peer - -- , dreqEnvGetPeerData = pure Nothing -- undefined -- find (KnownPeerKey peer) id - -- } + let dialReqEnv :: DialogRequestEnv m (Peer e) (Maybe (PeerData e)) + dialReqEnv = DialogRequestEnv + { dreqEnvPeer = peer + , dreqEnvGetPeerData = pure Nothing -- undefined -- find (KnownPeerKey peer) id + } let replyToPeer :: Frames -> m () replyToPeer = request peer . DialResp @e - let replyToPeerIO :: Frames -> IO () - replyToPeerIO = dialReqProtoAdapterNT adapter peer <$> replyToPeer - - liftIO $ (dialReqProtoAdapterDApp adapter) frames replyToPeerIO + routeDialogRequest dialReqProtoAdapterRouter dialReqEnv replyToPeer frames where dialReqProtoProxy = Proxy @(DialReq e) @@ -115,7 +127,7 @@ dialRespProto :: forall e s m . -> DialResp e -> m () dialRespProto DialRespProtoAdapter{..} = unDialResp >>> unFrames >>> \case - "": _xs -> do + "": xs -> do -- Ответили прямо нам сюда. Нужно как-то отреагировать на xs pure () diff --git a/hbs2-core/lib/HBS2/Net/Proto/EncryptionHandshake.hs b/hbs2-core/lib/HBS2/Net/Proto/EncryptionHandshake.hs index 2532e2e2..37964767 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/EncryptionHandshake.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/EncryptionHandshake.hs @@ -15,7 +15,6 @@ import HBS2.System.Logger.Simple import Crypto.Saltine.Core.Box qualified as Encrypt import Data.ByteString qualified as BS -import Data.Hashable hiding (Hashed) import Data.String.Conversions (cs) import Lens.Micro.Platform @@ -69,10 +68,6 @@ sendBeginEncryptionExchange creds ourpubkey peer = do data EncryptionHandshakeAdapter e m s = EncryptionHandshakeAdapter { encHandshake_considerPeerAsymmKey :: Peer e -> Maybe Encrypt.PublicKey -> m () - - , encAsymmetricKeyPair :: AsymmKeypair (Encryption e) - - , encGetEncryptionKey :: EncryptionKeyIDKey e -> m (Maybe (CommonSecret (Encryption e))) } @@ -93,10 +88,11 @@ encryptionHandshakeProto :: forall e s m . , Show (Nonce ()) ) => EncryptionHandshakeAdapter e m s + -> PeerEnv e -> EncryptionHandshake e -> m () -encryptionHandshakeProto EncryptionHandshakeAdapter{..} = \case +encryptionHandshakeProto EncryptionHandshakeAdapter{..} penv = \case ResetEncryptionKeys -> do peer <- thatPeer proto @@ -108,7 +104,7 @@ encryptionHandshakeProto EncryptionHandshakeAdapter{..} = \case encHandshake_considerPeerAsymmKey peer Nothing creds <- getCredentials @s - ourpubkey <- pure $ pubKeyFromKeypair @s $ encAsymmetricKeyPair + let ourpubkey = pubKeyFromKeypair @s $ view envAsymmetricKeyPair penv sendBeginEncryptionExchange @e creds ourpubkey peer BeginEncryptionExchange theirsign theirpubkey -> do @@ -121,7 +117,7 @@ encryptionHandshakeProto EncryptionHandshakeAdapter{..} = \case -- взять свои ключи creds <- getCredentials @s - ourpubkey <- pure $ pubKeyFromKeypair @s $ encAsymmetricKeyPair + let ourpubkey = pubKeyFromKeypair @s $ view envAsymmetricKeyPair penv -- подписать нонс let sign = makeSign @s (view peerSignSk creds) ((cs . serialise) ourpubkey) diff --git a/hbs2-core/lib/HBS2/Net/Proto/Peer.hs b/hbs2-core/lib/HBS2/Net/Proto/Peer.hs index fd99f402..c9b6ac23 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Peer.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Peer.hs @@ -7,7 +7,6 @@ import HBS2.Actors.Peer import HBS2.Data.Types import HBS2.Events import HBS2.Net.Proto -import HBS2.Net.Proto.Types import HBS2.Clock import HBS2.Net.Proto.Sessions import HBS2.Prelude.Plated @@ -229,36 +228,3 @@ instance ( Serialise (PubKey 'Sign (Encryption e)) => Serialise (PeerHandshake e) - ---- - -data EncryptionKeyIDKey e = - EncryptionKeyIDKey - { ekeyIDPeerSignKey :: PubKey 'Sign (Encryption e) - , ekeyIDPeerNonce :: PeerNonce - } - deriving (Generic) - -deriving instance - ( Show (PubKey 'Sign (Encryption e)) - , Show (Nonce ()) - ) => Show (EncryptionKeyIDKey e) - -deriving instance - ( Eq (PubKey 'Sign (Encryption e)) - , Eq (Nonce ()) - ) => Eq (EncryptionKeyIDKey e) - -instance ( - Hashable (PubKey 'Sign (Encryption e)) - , Hashable (Nonce ()) - ) => Hashable (EncryptionKeyIDKey e) where - hashWithSalt s EncryptionKeyIDKey {..} = - hashWithSalt s (ekeyIDPeerSignKey, ekeyIDPeerNonce) - -encryptionKeyIDKeyFromPeerData :: PeerData e -> EncryptionKeyIDKey e -encryptionKeyIDKeyFromPeerData PeerData{..} = - EncryptionKeyIDKey - { ekeyIDPeerSignKey = _peerSignKey - , ekeyIDPeerNonce = _peerOwnNonce - } diff --git a/hbs2-core/lib/HBS2/Net/Proto/Types.hs b/hbs2-core/lib/HBS2/Net/Proto/Types.hs index 7f95a11b..f2680a08 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Types.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Types.hs @@ -28,11 +28,6 @@ import Control.Monad.Trans.Maybe -- e -> Transport (like, UDP or TChan) -- p -> L4 Protocol (like Ping/Pong) -data CryptoAction = Sign | Encrypt - -type family PubKey ( a :: CryptoAction) e :: Type -type family PrivKey ( a :: CryptoAction) e :: Type - type family Encryption e :: Type -- FIXME: move-to-a-crypto-definition-modules @@ -211,3 +206,4 @@ instance FromStringMaybe (PeerAddr L4Proto) where instance Serialise L4Proto instance Serialise (PeerAddr L4Proto) + diff --git a/hbs2-core/test/DialogSpec.hs b/hbs2-core/test/DialogSpec.hs index 59222f5e..1a8e8913 100644 --- a/hbs2-core/test/DialogSpec.hs +++ b/hbs2-core/test/DialogSpec.hs @@ -14,8 +14,8 @@ import GHC.Generics (Generic) import Lens.Micro.Platform import System.IO -import HBS2.Net.Dialog.Core -import HBS2.Net.Dialog.Helpers.List +import Dialog.Core +import Dialog.Helpers.List newtype BSA = BSA { unBSA :: ByteString } deriving (Generic, Show) @@ -57,7 +57,3 @@ testDialog = testGroup "dialog" $ buildList do property' "roundtrip encode Frames" \ xs -> (decodeFrames . encodeFrames) xs == Right xs - property' "encodeFrames is quasidistributive over mappend" \ (xs, ys) -> - BS.drop (BS.length (encodeFrames xs)) (encodeFrames (xs <> ys)) - == encodeFrames ys - diff --git a/hbs2-peer/app/EncryptionKeys.hs b/hbs2-peer/app/EncryptionKeys.hs index c06dd667..dd524d5d 100644 --- a/hbs2-peer/app/EncryptionKeys.hs +++ b/hbs2-peer/app/EncryptionKeys.hs @@ -56,14 +56,15 @@ encryptionHandshakeWorker :: forall e m s . -- , HasCredentials s m ) => PeerConfig + -> PeerEnv e -> PeerCredentials s -> EncryptionHandshakeAdapter e m s -> m () -encryptionHandshakeWorker pconf creds EncryptionHandshakeAdapter{..} = do +encryptionHandshakeWorker pconf penv creds EncryptionHandshakeAdapter{..} = do -- e :: PeerEnv e <- ask - ourpubkey <- pure $ pubKeyFromKeypair @s $ encAsymmetricKeyPair + let ourpubkey = pubKeyFromKeypair @s $ _envAsymmetricKeyPair penv pl <- getPeerLocator @e @@ -74,9 +75,9 @@ encryptionHandshakeWorker pconf creds EncryptionHandshakeAdapter{..} = do forM_ peers \peer -> do -- Только если ещё не знаем ключ ноды - mencKeyID <- (fmap . fmap) encryptionKeyIDKeyFromPeerData $ - find (KnownPeerKey peer) id - mkey <- join <$> mapM encGetEncryptionKey mencKeyID + mpeerData <- find (KnownPeerKey peer) id + mkey <- liftIO do + join <$> forM mpeerData \peerData -> getEncryptionKey penv peerData case mkey of Just _ -> pure () Nothing -> sendBeginEncryptionExchange @e creds ourpubkey peer diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 0d5b48f5..2f3f3588 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -57,8 +57,7 @@ import RefLog (reflogWorker) import HttpWorker import ProxyMessaging import PeerMain.DialogCliCommand -import PeerMain.Dialog.Server -import PeerMain.Dialog.Spec +import PeerMain.PeerDialog import PeerMeta import CLI.RefChan import RefChan @@ -449,7 +448,7 @@ runPeer :: forall e s . ( e ~ L4Proto , FromStringMaybe (PeerAddr e) , s ~ Encryption e , HasStorage (PeerM e IO) - )=> PeerOpts -> IO () + ) => PeerOpts -> IO () runPeer opts = Exception.handle (\e -> myException e >> performGC @@ -575,35 +574,32 @@ runPeer opts = Exception.handle (\e -> myException e pure $ Just tcpEnv (proxy, penv) <- mdo - proxy <- newProxyMessaging mess tcp >>= \proxy' -> pure proxy' + proxy <- newProxyMessaging mess tcp >>= \peer -> pure peer { _proxy_getEncryptionKey = \peer -> do - mencKeyID <- (fmap . fmap) encryptionKeyIDKeyFromPeerData $ - withPeerM penv $ find (KnownPeerKey peer) id - mkey <- join <$> forM mencKeyID \encKeyID -> - getEncryptionKey proxy encKeyID + mpeerData <- withPeerM penv $ find (KnownPeerKey peer) id + mkey <- join <$> forM mpeerData \peerData -> getEncryptionKey penv peerData case mkey of Nothing -> trace1 $ "ENCRYPTION empty getEncryptionKey" - <+> pretty peer <+> viaShow mencKeyID + <+> pretty peer <+> viaShow mpeerData Just k -> trace1 $ "ENCRYPTION success getEncryptionKey" - <+> pretty peer <+> viaShow mencKeyID <+> viaShow k + <+> pretty peer <+> viaShow mpeerData <+> viaShow k pure mkey , _proxy_clearEncryptionKey = \peer -> do - mencKeyID <- (fmap . fmap) encryptionKeyIDKeyFromPeerData $ - withPeerM penv $ find (KnownPeerKey peer) id - forM_ mencKeyID \encKeyID -> setEncryptionKey proxy peer encKeyID Nothing + mpeerData <- withPeerM penv $ find (KnownPeerKey peer) id + forM_ mpeerData \peerData -> setEncryptionKey penv peer peerData Nothing -- deletePeerAsymmKey brains peer - forM_ mencKeyID \encKeyID -> - deletePeerAsymmKey' brains (show encKeyID) + forM_ mpeerData \peerData -> + deletePeerAsymmKey' brains (show peerData) , _proxy_sendResetEncryptionKeys = \peer -> withPeerM penv do sendResetEncryptionKeys peer , _proxy_sendBeginEncryptionExchange = \peer -> withPeerM penv do sendBeginEncryptionExchange pc - ((pubKeyFromKeypair @s . _proxy_asymmetricKeyPair) proxy) + ((pubKeyFromKeypair @s . view envAsymmetricKeyPair) penv) peer } @@ -691,37 +687,32 @@ runPeer opts = Exception.handle (\e -> myException e ) => EncryptionHandshakeAdapter L4Proto m s encryptionHshakeAdapter = EncryptionHandshakeAdapter { encHandshake_considerPeerAsymmKey = \peer mpubkey -> withPeerM penv do - mencKeyID <- (fmap . fmap) encryptionKeyIDKeyFromPeerData $ - withPeerM penv $ find (KnownPeerKey peer) id + mpeerData <- withPeerM penv $ find (KnownPeerKey peer) id case mpubkey of Nothing -> do - -- trace $ "ENCRYPTION delete key" <+> pretty peer <+> viaShow mencKeyID + -- trace $ "ENCRYPTION delete key" <+> pretty peer <+> viaShow mpeerData -- deletePeerAsymmKey brains peer - forM_ mencKeyID \encKeyID -> - deletePeerAsymmKey' brains (show encKeyID) + forM_ mpeerData \peerData -> + deletePeerAsymmKey' brains (show peerData) Just pk -> do -- emit PeerAsymmInfoKey (PeerAsymmPubKey peer pk) let symmk = genCommonSecret @s - (privKeyFromKeypair @s (_proxy_asymmetricKeyPair proxy)) + (privKeyFromKeypair @s (view envAsymmetricKeyPair penv)) pk - case mencKeyID of + case mpeerData of Nothing -> do -- insertPeerAsymmKey brains peer pk symmk -- insertPeerAsymmKey' brains (show peer) pk symmk - trace $ "ENCRYPTION can not store key. No encKeyID" - <+> pretty peer <+> viaShow mencKeyID - Just encKeyID -> do - liftIO $ setEncryptionKey proxy peer encKeyID (Just symmk) - insertPeerAsymmKey' brains (show encKeyID) pk symmk - - , encAsymmetricKeyPair = _proxy_asymmetricKeyPair proxy - - , encGetEncryptionKey = liftIO . getEncryptionKey proxy + trace $ "ENCRYPTION can not store key. No peerData" + <+> pretty peer <+> viaShow mpeerData + Just peerData -> do + liftIO $ setEncryptionKey penv peer peerData (Just symmk) + insertPeerAsymmKey' brains (show peerData) pk symmk } -- dialReqProtoAdapter <- do - -- dialReqProtoAdapterDApp <- pure dialogRoutes + -- dialReqProtoAdapterRouter <- pure dialogRoutes -- pure DialReqProtoAdapter {..} env <- ask @@ -733,12 +724,11 @@ runPeer opts = Exception.handle (\e -> myException e addPeers @e pl ps subscribe @e PeerExpiredEventKey \(PeerExpiredEvent peer {-mpeerData-}) -> liftIO do - mencKeyID <- (fmap . fmap) encryptionKeyIDKeyFromPeerData $ - withPeerM penv $ find (KnownPeerKey peer) id - forM_ mencKeyID \encKeyID -> setEncryptionKey proxy peer encKeyID Nothing + mpeerData <- withPeerM penv $ find (KnownPeerKey peer) id + forM_ mpeerData \peerData -> setEncryptionKey penv peer peerData Nothing -- deletePeerAsymmKey brains peer - forM_ mencKeyID \encKeyID -> - deletePeerAsymmKey' brains (show encKeyID) + forM_ mpeerData \peerData -> + deletePeerAsymmKey' brains (show peerData) subscribe @e PeerAnnounceEventKey $ \(PeerAnnounceEvent pip nonce) -> do unless (nonce == pnonce) $ do @@ -881,7 +871,7 @@ runPeer opts = Exception.handle (\e -> myException e peerThread "blockDownloadLoop" (blockDownloadLoop denv) peerThread "encryptionHandshakeWorker" - (EncryptionKeys.encryptionHandshakeWorker @e conf pc encryptionHshakeAdapter) + (EncryptionKeys.encryptionHandshakeWorker @e conf penv pc encryptionHshakeAdapter) let tcpProbeWait :: Timeout 'Seconds tcpProbeWait = (fromInteger . fromMaybe 300) (cfgValue @PeerTcpProbeWaitKey conf) @@ -1001,7 +991,7 @@ runPeer opts = Exception.handle (\e -> myException e , makeResponse (blockChunksProto adapter) , makeResponse blockAnnounceProto , makeResponse (withCredentials @e pc . peerHandShakeProto hshakeAdapter penv) - , makeResponse (withCredentials @e pc . encryptionHandshakeProto encryptionHshakeAdapter) + , makeResponse (withCredentials @e pc . encryptionHandshakeProto encryptionHshakeAdapter penv) , makeResponse (peerExchangeProto pexFilt) , makeResponse (refLogUpdateProto reflogAdapter) , makeResponse (refLogRequestProto reflogReqAdapter) @@ -1206,17 +1196,7 @@ runPeer opts = Exception.handle (\e -> myException e } dialReqProtoAdapter <- do - let denv = DialEnv - - let dialReqProtoAdapterDApp = drpcFullDApp denv penv - - -- dialReqProtoAdapterNT :: ResponseM L4Proto (RpcM (ResourceT IO)) a -> IO a - dialReqProtoAdapterNT :: Peer e -> forall a . ResponseM L4Proto (RpcM (ResourceT IO)) a -> IO a - dialReqProtoAdapterNT = \peer -> - runResourceT - . runRPC udp1 - . runResponseM peer - + dialReqProtoAdapterRouter <- pure dialogRoutes pure DialReqProtoAdapter {..} rpc <- async $ runRPC udp1 do diff --git a/hbs2-peer/app/PeerMain/Dialog/Server.hs b/hbs2-peer/app/PeerMain/Dialog/Server.hs deleted file mode 100644 index f7c0bb5e..00000000 --- a/hbs2-peer/app/PeerMain/Dialog/Server.hs +++ /dev/null @@ -1,178 +0,0 @@ -{-# LANGUAGE PolyKinds #-} -{-# Language AllowAmbiguousTypes #-} -{-# LANGUAGE StrictData #-} -{-# LANGUAGE UndecidableInstances #-} -module PeerMain.Dialog.Server where - -import Codec.Serialise -import Control.Monad.Except -import Control.Monad.IO.Class () -import Control.Monad.Reader -import Lens.Micro.Platform - -import HBS2.Actors.Peer -import HBS2.Data.Types.Refs -import HBS2.Hash -import HBS2.Net.Dialog.Core -import HBS2.Net.Proto.RefLog -import HBS2.Net.Proto.Types -import HBS2.Prelude -import HBS2.Storage.Simple - -import PeerMain.Dialog.Spec - ---- - -data DialEnv = DialEnv - -newtype DialT m a = DialT { unDialT :: PeerM L4Proto (ReaderT DialEnv (DialHandlerT m)) a } - deriving - ( Generic, Functor, Applicative, Monad - , MonadIO - , MonadReader (PeerEnv L4Proto) - -- , MonadTrans - -- , MonadError ResponseStatus - -- , MonadThrow, MonadCatch, MonadMask - ) - --- instance Monad m => MonadReader DialEnv (DialT m) where --- ask = DialT . lift $ ask --- local f ma = undefined - -instance Monad m => HasStorage (DialT m) where - getStorage = asks (view envStorage) - -instance MonadTrans DialT where - lift = DialT . lift . lift . lift - -instance Monad m => - MonadError ResponseStatus (DialT m) where - -- {-# MINIMAL throwError, catchError #-} - -- throwError :: e -> m a - throwError = DialT . lift . throwError - -- catchError :: m a -> (e -> m a) -> m a - catchError = undefined - ---- - -runDialTtoDialHandlerT :: MonadIO m => DialEnv -> PeerEnv L4Proto -> DialT m a -> DialHandlerT m a -runDialTtoDialHandlerT denv penv = - flip runReaderT denv . withPeerM penv . unDialT - ---- - -dialogRoutes' :: forall m . - ( MonadIO m - , Serialise (PubKey 'Sign (Encryption L4Proto)) - , FromStringMaybe (PubKey 'Sign (Encryption L4Proto)) - , Hashable (PubKey 'Sign (Encryption L4Proto)) - ) - => PeerEnv L4Proto - -> DialogRequestRouter m -dialogRoutes' penv = dialogRequestRoutes do - - hand ["ping"] \req -> (, req) <$> Right \reply -> do - reply (Frames [serialiseS (ResponseHeader (ResponseStatus Success200 "") 0), "pong"]) - - hand ["spec"] \req -> (, req) <$> Right \reply -> do - undefined - -- let xs = Map.keys (unDialogRequestRouter (dialogRoutes @m penv)) - - -- forM_ (zip (zip [1..] xs) ((True <$ drop 1 xs) <> [False])) \((j,x),isMore) -> do - -- reply (Frames [serialiseS (ResponseHeader (ResponseStatus (bool Success200 SuccessMore isMore) "") j) - -- , BS.intercalate "/" x - -- ]) - - - hand ["debug", "no-response-header"] \req -> (, req) <$> Right \reply -> do - reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessMore "") 0), "one"]) - reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessMore "") 1), "two"]) - reply (Frames []) - - hand ["debug", "wrong-header"] \req -> (, req) <$> Right \reply -> do - reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessMore "") 0), "correct-header"]) - reply (Frames ["wrong-header"]) - - hand ["debug", "timeout"] \req -> (, req) <$> Right \reply -> do - reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessMore "") 0), "false more"]) - - - handconv ["reflog", "get"] "ReflogGetReq" \(ReflogGetReq ref) -> do - - sto <- withPeerM penv getStorage - - hash <- maybe (throwError (ResponseStatus NotFound404 "unknown reference")) pure - =<< liftIO do - getRef sto (RefLogKey @(Encryption L4Proto) ref) - - pure (ReflogGetResp hash) - -newtype ReflogGetReq = ReflogGetReq (PubKey 'Sign (Encryption L4Proto)) - deriving (Generic) - -instance Serialise (PubKey 'Sign (Encryption L4Proto)) - => Serialise ReflogGetReq - -newtype ReflogGetResp = ReflogGetResp (Hash HbSync) - deriving (Generic) - -instance Serialise (PubKey 'Sign (Encryption L4Proto)) - => Serialise ReflogGetResp - ---- - -drpcFullDApp :: forall m . - ( MonadIO m - , Unconstraints - ) - => DialEnv -> PeerEnv L4Proto -> DApp m -drpcFullDApp denv penv = - mkDApp - (Proxy @(NamedSpec DialogRPCSpec)) - EmptyCtx - (runDialTtoDialHandlerT denv penv) - -- (drpcFullH :: DialogRPCSpec (ModeServerT (DialT m))) - drpcFullH - -type ConstraintsH m = - ( MonadIO m - , MonadError ResponseStatus m - , HasStorage m - , Unconstraints - ) - -type Unconstraints = - ( Serialise (PubKey 'Sign (Encryption L4Proto)) - , Hashable (PubKey 'Sign (Encryption L4Proto)) - , Show (PubKey 'Sign (Encryption L4Proto)) - , Typeable (PubKey 'Sign (Encryption L4Proto)) - , FromStringMaybe (PubKey 'Sign (Encryption L4Proto)) - ) - -drpcFullH :: ( ConstraintsH m ) - => DialogRPCSpec (ModeServerT m) -drpcFullH = DialogRPCSpec - { drpcPing = pure "pong" - , drpcSpec = pure "tbd" - , drpcReflog = reflogH - } - -reflogH :: ( ConstraintsH m ) - => RPCReflogSpec (ModeServerT m) -reflogH = RPCReflogSpec {..} - where - - reflogGet ref = do - - sto <- getStorage - - hash <- maybe (throwError (ResponseStatus NotFound404 "unknown reference")) pure - =<< liftIO do - getRef sto (RefLogKey @(Encryption L4Proto) ref) - - pure hash - - reflogFetch pk = do - liftIO $ print pk - pure () - diff --git a/hbs2-peer/app/PeerMain/Dialog/Spec.hs b/hbs2-peer/app/PeerMain/Dialog/Spec.hs deleted file mode 100644 index d9dbb898..00000000 --- a/hbs2-peer/app/PeerMain/Dialog/Spec.hs +++ /dev/null @@ -1,35 +0,0 @@ -{-# LANGUAGE PolyKinds #-} -{-# LANGUAGE TypeOperators #-} -{-# LANGUAGE StrictData #-} - -module PeerMain.Dialog.Spec where - --- import Codec.Serialise --- import Streaming -import Data.Text (Text) -import GHC.Generics (Generic) - -import HBS2.Hash -import HBS2.Net.Dialog.Core -import HBS2.Net.Proto.Types - - -data DialogRPCSpec r = DialogRPCSpec - { drpcPing :: r &- "ping" &/ SingleRespCBOR Text - , drpcSpec :: r &- "spec" &/ SingleRespCBOR Text - , drpcReflog :: r &- "reflog" &// RPCReflogSpec - } - deriving (Generic) - -data RPCReflogSpec r = RPCReflogSpec - { reflogGet :: r &- "get" - &/ ReqCBOR (PubKey 'Sign (Encryption L4Proto)) - &/ SingleRespCBOR (Hash HbSync) - - , reflogFetch :: r &- "fetch" - &/ ReqCBOR (PubKey 'Sign (Encryption L4Proto)) - &/ SingleAck - - } - deriving (Generic) - diff --git a/hbs2-peer/app/PeerMain/DialogCliCommand.hs b/hbs2-peer/app/PeerMain/DialogCliCommand.hs index 18fd279a..ff6f27fe 100644 --- a/hbs2-peer/app/PeerMain/DialogCliCommand.hs +++ b/hbs2-peer/app/PeerMain/DialogCliCommand.hs @@ -2,66 +2,109 @@ module PeerMain.DialogCliCommand where -import Data.Generics.Labels -import Data.Generics.Product.Fields +-- import Data.Generics.Labels +-- import Data.Generics.Product.Fields import HBS2.Actors.Peer +import HBS2.Base58 +import HBS2.Clock +import HBS2.Net.Proto.RefLog (RefLogKey(..)) +import HBS2.Defaults +import HBS2.Events import HBS2.Hash +import HBS2.Merkle +import HBS2.Net.Auth.Credentials import HBS2.Net.IP.Addr +import HBS2.Net.Messaging +import HBS2.Net.Messaging.TCP import HBS2.Net.Messaging.UDP +import HBS2.Net.PeerLocator import HBS2.Net.Proto +import HBS2.Net.Proto.Definition import HBS2.Net.Proto.Dialog +import HBS2.Net.Proto.Peer +import HBS2.Net.Proto.PeerAnnounce +import HBS2.Net.Proto.PeerExchange +import HBS2.Net.Proto.PeerMeta +import HBS2.Net.Proto.RefLog +import HBS2.Net.Proto.Sessions +import HBS2.Net.Proto.Types import HBS2.OrDie import HBS2.Prelude +import HBS2.Prelude.Plated +import HBS2.Storage.Simple import HBS2.System.Logger.Simple hiding (info) +import HBS2.System.Logger.Simple qualified as Log +import BlockDownload +import BlockHttpDownload +import Bootstrap +import Brains +import CheckMetrics +import DownloadQ +import HttpWorker import PeerConfig -import RPC (PeerRpcKey, RpcM, runRPC) +import PeerInfo +import PeerMeta +import PeerTypes +import ProxyMessaging +import RefLog (reflogWorker) +import RefLog qualified +import RPC -import Control.Monad.Except (Except(..), ExceptT(..), runExcept, runExceptT) -import Control.Monad.State.Strict (evalStateT) +import Control.Monad +import Control.Monad.IO.Unlift +import Control.Monad.Reader import Control.Monad.Trans.Cont +import Control.Monad.Trans.Maybe +import Crypto.Saltine.Core.Box qualified as Encrypt +import Data.ByteString qualified as BS +import Data.ByteString.Char8 qualified as BS8 +import Data.ByteString.Lazy (ByteString) +import Data.ByteString.Lazy qualified as LBS import Data.Default import Data.Function import Data.Functor -import Data.Kind import Data.List qualified as List +import Data.Map.Strict qualified as Map +import Data.Maybe +import Data.Monoid qualified as Monoid +import Data.Set qualified as Set import Data.String.Conversions as X (cs) import Data.Void (absurd, Void) import Lens.Micro.Platform import Network.Socket import Options.Applicative +import Streaming as S import Streaming.Prelude qualified as S -import System.IO +import System.Directory import UnliftIO.Async import UnliftIO.Concurrent import UnliftIO.Exception as U import UnliftIO.Resource +-- import System.FilePath.Posix +import System.IO +import System.Exit + pDialog :: Parser (IO ()) pDialog = hsubparser $ mempty <> command "ping" (info pPing (progDesc "ping hbs2 node via dialog inteface") ) <> command "debug" (info pDebug (progDesc "debug call different dialog inteface routes") ) - <> command "reflog" (info pReflog (progDesc "reflog commands") ) - -pReflog :: Parser (IO ()) -pReflog = hsubparser $ mempty - <> command "get" (info pRefLogGet (progDesc "get own reflog from all" )) - <> command "fetch" (info pRefLogFetch (progDesc "fetch reflog from all" )) confOpt :: Parser FilePath confOpt = strOption ( long "config" <> short 'c' <> help "config" ) -newtype OptInitial (f :: Type -> Type) a b = OptInitial { unOptInitial :: f a } +data OptInitial (f :: * -> *) a b = OptInitial { unOptInitial :: f a } deriving (Generic, Show) -newtype OptResolved (f :: Type -> Type) a b = OptResolved { unOptResolved :: b } +data OptResolved (f :: * -> *) a b = OptResolved { unOptResolved :: b } deriving (Generic, Show) type DialOptInitial = DialOpt OptInitial type DialOptResolved = DialOpt OptResolved -data DialOpt (f :: (Type -> Type) -> Type -> Type -> Type) = DialOpt +data DialOpt (f :: (* -> *) -> * -> * -> *) = DialOpt { dialOptConf :: f Maybe FilePath PeerConfig , dialOptAddr :: f Maybe String (Peer L4Proto) } @@ -90,7 +133,7 @@ resolveDialOpt dopt = do `orDieM` "Dial endpoint not set" as <- parseAddrUDP (cs saddr) <&> fmap (fromSockAddr @'UDP . addrAddress) - peer <- headMay (List.sortBy (compare `on` addrPriority) as) + peer <- (headMay $ List.sortBy (compare `on` addrPriority) as) `orDieM` "Can't parse Dial endpoint" pure DialOpt @@ -106,35 +149,6 @@ pPing = do liftIO . print =<< do dQuery1 def cli peer (dpath "ping" []) -reflogKeyP :: ReadM (PubKey 'Sign (Encryption L4Proto)) -reflogKeyP = eitherReader $ - maybe (Left "invalid REFLOG-KEY") pure . fromStringMay - -pRefLogGet :: Parser (IO ()) -pRefLogGet = do - dopt <- pDialCommon - rkey <- argument reflogKeyP ( metavar "REFLOG-KEY" ) - pure do - withDial dopt \peer dclient -> - withClient dclient \cli -> do - xs <- dQuery1 def cli peer (dpath "reflog/get" [serialiseS rkey]) - - hash <- either (lift . lift . fail) pure $ runExcept $ flip evalStateT xs do - cutFrameDecode @(Hash HbSync) do - "No hash in response: " <> show xs - - liftIO . print $ pretty hash - -pRefLogFetch :: Parser (IO ()) -pRefLogFetch = do - dopt <- pDialCommon - rkey <- argument reflogKeyP ( metavar "REFLOG-KEY" ) - pure do - withDial dopt \peer dclient -> - withClient dclient \cli -> do - xs <- dQuery1 def cli peer (dpath "reflog/fetch" [serialiseS rkey]) - - liftIO . print $ "Response: " <> show xs pDebug :: Parser (IO ()) pDebug = do diff --git a/hbs2-peer/app/PeerMain/PeerDialog.hs b/hbs2-peer/app/PeerMain/PeerDialog.hs new file mode 100644 index 00000000..5da04012 --- /dev/null +++ b/hbs2-peer/app/PeerMain/PeerDialog.hs @@ -0,0 +1,39 @@ +module PeerMain.PeerDialog where + +import Control.Monad +import Control.Monad.IO.Class +import Data.Bool +import Data.ByteString qualified as BS +import Data.Map qualified as Map + +import Dialog.Core +import HBS2.Net.Proto.Types + + +dialogRoutes :: forall m . MonadIO m => DialogRequestRouter m +dialogRoutes = dialogRequestRoutes do + + hand ["ping"] \req -> Right \reply -> do + reply (Frames [serialiseS (ResponseHeader (ResponseStatus Success200 "") 0), "pong"]) + + hand ["spec"] \req -> Right \reply -> do + let xs = Map.keys (unDialogRequestRouter (dialogRoutes @m)) + + forM_ (zip (zip [1..] xs) ((True <$ drop 1 xs) <> [False])) \((j,x),isMore) -> do + reply (Frames [serialiseS (ResponseHeader (ResponseStatus (bool Success200 SuccessMore isMore) "") j) + , BS.intercalate "/" x + ]) + + + hand ["debug", "no-response-header"] \req -> Right \reply -> do + reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessMore "") 0), "one"]) + reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessMore "") 1), "two"]) + reply (Frames []) + + hand ["debug", "wrong-header"] \req -> Right \reply -> do + reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessMore "") 0), "correct-header"]) + reply (Frames ["wrong-header"]) + + hand ["debug", "timeout"] \req -> Right \reply -> do + reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessMore "") 0), "false more"]) + diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index 0ecd982b..e0251e31 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -415,9 +415,15 @@ mkPeerMeta conf penv = do . fromStringMay @(PeerAddr L4Proto) ) =<< cfgValue @PeerListenTCPKey conf + -- let useEncryption = True -- move to config annMetaFromPeerMeta . PeerMeta $ W.execWriter do mHttpPort `forM` \p -> elem "http-port" (TE.encodeUtf8 . Text.pack . show $ p) mTcpPort `forM` \p -> elem "listen-tcp" (TE.encodeUtf8 . Text.pack . show $ p) + -- when useEncryption do + -- elem "ekey" (TE.encodeUtf8 . Text.pack . show $ + -- (Encrypt.publicKey . _envAsymmetricKeyPair) penv + -- -- mayby sign this pubkey by node key ? + -- ) where elem k = W.tell . L.singleton . (k ,) diff --git a/hbs2-peer/app/ProxyMessaging.hs b/hbs2-peer/app/ProxyMessaging.hs index 69076054..e2cf8d52 100644 --- a/hbs2-peer/app/ProxyMessaging.hs +++ b/hbs2-peer/app/ProxyMessaging.hs @@ -4,9 +4,6 @@ module ProxyMessaging , newProxyMessaging , runProxyMessaging , sendToPlainProxyMessaging - , getEncryptionKey - , setEncryptionKey - , encryptionKeyIDKeyFromPeerData ) where import HBS2.Prelude.Plated @@ -37,7 +34,6 @@ import Control.Monad.Trans.Maybe import Data.ByteString (ByteString) import Data.ByteString qualified as BS import Data.ByteString.Lazy qualified as LBS -import Data.Hashable hiding (Hashed) import Data.Maybe import Data.String.Conversions (cs) import Data.List qualified as L @@ -45,10 +41,6 @@ import Data.Map (Map) import Data.Map qualified as Map import Lens.Micro.Platform as Lens import Control.Monad -import Data.HashMap.Strict (HashMap) -import Data.HashMap.Strict qualified as HashMap - -import HBS2.Data.Types.Peer -- TODO: protocol-encryption-goes-here @@ -62,9 +54,6 @@ data ProxyMessaging = , _proxy_clearEncryptionKey :: Peer L4Proto -> IO () , _proxy_sendResetEncryptionKeys :: Peer L4Proto -> IO () , _proxy_sendBeginEncryptionExchange :: Peer L4Proto -> IO () - - , _proxy_asymmetricKeyPair :: AsymmKeypair (Encryption L4Proto) - , _proxy_encryptionKeys :: TVar (HashMap (EncryptionKeyIDKey L4Proto) (CommonSecret (Encryption L4Proto))) } -- 1 нода X создаёт себе Encrypt.Keypair @@ -89,36 +78,8 @@ newProxyMessaging u t = liftIO do let _proxy_sendResetEncryptionKeys = const (pure ()) let _proxy_sendBeginEncryptionExchange = const (pure ()) - _proxy_asymmetricKeyPair <- asymmNewKeypair @(Encryption L4Proto) - _proxy_encryptionKeys <- liftIO (newTVarIO mempty) - pure ProxyMessaging {..} ---- - -setEncryptionKey :: - ( Hashable (PubKey 'Sign (Encryption L4Proto)) - , Hashable PeerNonce - , Show (PubKey 'Sign (Encryption L4Proto)) - , Show PeerNonce - , Show (CommonSecret (Encryption L4Proto)) - , Show (EncryptionKeyIDKey L4Proto) - ) => ProxyMessaging -> Peer L4Proto -> EncryptionKeyIDKey L4Proto -> Maybe (CommonSecret (Encryption L4Proto)) -> IO () -setEncryptionKey proxy peer pd msecret = do - atomically $ modifyTVar' (_proxy_encryptionKeys proxy) $ Lens.at pd .~ msecret - case msecret of - Nothing -> trace $ "ENCRYPTION delete key" <+> pretty peer <+> viaShow pd - Just k -> trace $ "ENCRYPTION store key" <+> pretty peer <+> viaShow pd <+> viaShow k - -getEncryptionKey :: - ( Hashable (PubKey 'Sign (Encryption L4Proto)) - , Hashable PeerNonce - ) => ProxyMessaging -> EncryptionKeyIDKey L4Proto -> IO (Maybe (CommonSecret (Encryption L4Proto))) -getEncryptionKey proxy pd = - readTVarIO (_proxy_encryptionKeys proxy) <&> preview (Lens.ix pd) - ---- - runProxyMessaging :: forall m . MonadIO m => ProxyMessaging -> m () @@ -142,6 +103,7 @@ runProxyMessaging env = liftIO do liftIO $ mapM_ waitCatch [u,t] + instance Messaging ProxyMessaging L4Proto LBS.ByteString where sendTo = sendToProxyMessaging diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index 626e9a9b..0f951bdf 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -136,8 +136,7 @@ executable hbs2-peer , Bootstrap , PeerInfo , PeerMain.DialogCliCommand - , PeerMain.Dialog.Server - , PeerMain.Dialog.Spec + , PeerMain.PeerDialog , PeerMeta , RPC , PeerTypes