From 09e070e38f8a0346d1bd857edb8d1964001cf93a Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Fri, 15 Sep 2023 19:38:32 +0300 Subject: [PATCH] merged --- .hlint.yaml | 6 + Makefile | 3 + cabal.project | 1 + examples/raft-algo/LICENSE | 0 examples/raft-algo/app/RaftAlgoMain.hs | 5 + examples/raft-algo/lib/RaftAlgo/Proto.hs | 836 ++++++++++++++++++ examples/raft-algo/raft-algo.cabal | 244 +++++ examples/raft-algo/test/RaftAlgoProtoTest.hs | 33 + hbs2-core/hbs2-core.cabal | 11 +- hbs2-core/lib/Dialog/Core.hs | 442 --------- hbs2-core/lib/HBS2/Actors/Peer.hs | 32 +- hbs2-core/lib/HBS2/Data/Types/Refs.hs | 2 +- hbs2-core/lib/HBS2/Net/Auth/AccessKey.hs | 1 + hbs2-core/lib/HBS2/Net/Auth/Credentials.hs | 6 - hbs2-core/lib/{ => HBS2/Net}/Dialog/Client.hs | 13 +- hbs2-core/lib/HBS2/Net/Dialog/Core.hs | 831 +++++++++++++++++ .../lib/{ => HBS2/Net}/Dialog/Helpers/List.hs | 2 +- .../Net}/Dialog/Helpers/Streaming.hs | 2 +- hbs2-core/lib/HBS2/Net/Proto/Dialog.hs | 62 +- .../lib/HBS2/Net/Proto/EncryptionHandshake.hs | 12 +- hbs2-core/lib/HBS2/Net/Proto/Peer.hs | 34 + hbs2-core/lib/HBS2/Net/Proto/Types.hs | 6 +- hbs2-core/test/DialogSpec.hs | 8 +- hbs2-peer/app/EncryptionKeys.hs | 11 +- hbs2-peer/app/PeerMain.hs | 82 +- hbs2-peer/app/PeerMain/Dialog/Server.hs | 178 ++++ hbs2-peer/app/PeerMain/Dialog/Spec.hs | 35 + hbs2-peer/app/PeerMain/DialogCliCommand.hs | 106 +-- hbs2-peer/app/PeerMain/PeerDialog.hs | 39 - hbs2-peer/app/PeerTypes.hs | 6 - hbs2-peer/app/ProxyMessaging.hs | 40 +- hbs2-peer/hbs2-peer.cabal | 3 +- 32 files changed, 2416 insertions(+), 676 deletions(-) create mode 100644 .hlint.yaml create mode 100644 examples/raft-algo/LICENSE create mode 100644 examples/raft-algo/app/RaftAlgoMain.hs create mode 100644 examples/raft-algo/lib/RaftAlgo/Proto.hs create mode 100644 examples/raft-algo/raft-algo.cabal create mode 100644 examples/raft-algo/test/RaftAlgoProtoTest.hs delete mode 100644 hbs2-core/lib/Dialog/Core.hs rename hbs2-core/lib/{ => HBS2/Net}/Dialog/Client.hs (93%) create mode 100644 hbs2-core/lib/HBS2/Net/Dialog/Core.hs rename hbs2-core/lib/{ => HBS2/Net}/Dialog/Helpers/List.hs (91%) rename hbs2-core/lib/{ => HBS2/Net}/Dialog/Helpers/Streaming.hs (97%) create mode 100644 hbs2-peer/app/PeerMain/Dialog/Server.hs create mode 100644 hbs2-peer/app/PeerMain/Dialog/Spec.hs delete mode 100644 hbs2-peer/app/PeerMain/PeerDialog.hs diff --git a/.hlint.yaml b/.hlint.yaml new file mode 100644 index 00000000..066e659c --- /dev/null +++ b/.hlint.yaml @@ -0,0 +1,6 @@ +- ignore: {name: "Use infix"} # 2 hints +- ignore: {name: "Use let"} # 4 hints +- ignore: {name: "Use newtype instead of data"} # 4 hints +- ignore: {name: "Use print"} # 1 hint +- ignore: {name: "Redundant bracket Found"} # 1 hint +- ignore: {name: "Redundant $"} # 1 hint diff --git a/Makefile b/Makefile index b8d14084..b923cd36 100644 --- a/Makefile +++ b/Makefile @@ -18,3 +18,6 @@ build: test-core: > nix develop -c cabal run hbs2-core:test +.PHONY: test-raft +test-raft: +> nix develop -c ghcid -c 'cabal repl' raft-algo -T RaftAlgo.Proto.devTest diff --git a/cabal.project b/cabal.project index 4ad57f5d..d37766c9 100644 --- a/cabal.project +++ b/cabal.project @@ -1,4 +1,5 @@ packages: **/*.cabal + examples/*/*.cabal -- allow-newer: all diff --git a/examples/raft-algo/LICENSE b/examples/raft-algo/LICENSE new file mode 100644 index 00000000..e69de29b diff --git a/examples/raft-algo/app/RaftAlgoMain.hs b/examples/raft-algo/app/RaftAlgoMain.hs new file mode 100644 index 00000000..e73253c9 --- /dev/null +++ b/examples/raft-algo/app/RaftAlgoMain.hs @@ -0,0 +1,5 @@ +module Main where + +main :: IO () +main = do + pure () diff --git a/examples/raft-algo/lib/RaftAlgo/Proto.hs b/examples/raft-algo/lib/RaftAlgo/Proto.hs new file mode 100644 index 00000000..9da001fe --- /dev/null +++ b/examples/raft-algo/lib/RaftAlgo/Proto.hs @@ -0,0 +1,836 @@ +-- {-# Language AllowAmbiguousTypes #-} +-- {-# LANGUAGE PolyKinds #-} +{-# LANGUAGE StrictData #-} +module RaftAlgo.Proto where + +import HBS2.Prelude.Plated + +import Control.Applicative +import Control.Arrow +import Control.Monad +import Control.Monad.Cont +import Control.Monad.Error.Class +import Control.Monad.State.Strict +import Control.Monad.Trans.Except +import Control.Monad.Trans.Maybe +import Control.Monad.Writer as W +import Data.Either +import Data.Foldable qualified as F +import Data.Function +import Data.Generics.Labels +import Data.Generics.Product +import Data.Generics.Sum +import Data.Heap (Heap) +import Data.Heap qualified as Heap +import Data.Kind +import Data.List qualified as List +import Data.Map (Map) +import Data.Map.Strict qualified as Map +import Data.Maybe +import Data.Ord +import Data.Sequence (Seq(..)) +import Data.Sequence qualified as Seq +import Data.Set (Set) +import Data.Set qualified as Set +import Data.Text qualified as T +import Data.Time +import Data.Time.Calendar.OrdinalDate +import Data.Void +import Lens.Micro.Platform as Lens +import Numeric.Natural +import Streaming +import Streaming.Prelude qualified as Streaming +import System.Random.MWC as MWC +import System.Random.Stateful + + +newtype Term = Term { unTerm :: Int } + deriving (Eq, Ord, Num, Enum, Show) + +-- | `p` - identity of a node +-- `a` - payload (reflog hash for example) +data LogEntry p a = LogEntry + { entryTerm :: Term + , entryStateMachineCommand :: StateMachineCommand p a + } + deriving (Generic, Show) + +data StateMachineCommand p a + = SMCLoad a + -- | SMAddNode p + -- | SMDropNode p + deriving (Generic, Show) + +-- Должно сохраняться надёжно: +-- currentTerm +-- votedFor +-- log[] +data PersMethods p a m = PersMethods + { getCurrentTerm :: m Term + , setCurrentTerm :: Term -> m () + -- + , getVotedFor :: m (Maybe p) + , setVotedFor :: Maybe p -> m () + -- + -- log[] .. + , getLogEntry :: Int -> m (Maybe (LogEntry p a)) + , setLogEntry :: Int -> LogEntry p a -> m () + , getLogEntriesFrom :: Int -> m [LogEntry p a] + , getLastLogIndex :: m Int + -- + , getOurID :: m p + } + +getLastLogIndexNTerm :: Monad m => PersMethods p a m -> m (Int, Term) +getLastLogIndexNTerm PersMethods{..} = do + lastLogIndex <- getLastLogIndex + (lastLogIndex, ) <$> + if lastLogIndex > 0 + then maybe 0 entryTerm <$> getLogEntry lastLogIndex + else pure 0 + +hoistMethods :: + (forall a. m a -> n a) + -> PersMethods p a m + -> PersMethods p a n +hoistMethods nt h = PersMethods + { getCurrentTerm = nt $ getCurrentTerm h + , setCurrentTerm = nt . setCurrentTerm h + -- + , getVotedFor = nt $ getVotedFor h + , setVotedFor = nt . setVotedFor h + -- + , getLogEntry = nt . getLogEntry h + , setLogEntry = \j le -> nt $ setLogEntry h j le + , getLogEntriesFrom = nt . getLogEntriesFrom h + , getLastLogIndex = nt $ getLastLogIndex h + -- + , getOurID = nt $ getOurID h + } + +data RaftState p = RaftState + { commitIndex :: Int + , lastAppliedIndex :: Int + , actorState :: ActorState p + , cluster :: Set p + } + deriving (Generic, Show) + +data ActorState p + = StateFollower + | StateCandidate + { votesCollected :: Set p + } + | StateLeader + { nextIndex :: Map p Int + , matchIndex :: Map p Int + } + deriving (Generic, Eq, Ord, Show) + +--- + +data RequestVote p = RequestVote + { requestVoteTerm :: Term + , requestVoteCandidateID :: p + , requestVoteCandidateLastLogIndex :: Int + , requestVoteCandidateLastLogTerm :: Term + } + deriving (Generic, Show) + +data AppendEntries p a = AppendEntries + { appendEntriesTerm :: Term + , appendEntriesLeaderID :: p + , appendEntriesPrevLogIndex :: Int + , appendEntriesPrevLogTerm :: Term + , appendEntriesES :: [LogEntry p a] + , appendEntriesLeaderCommitIndex :: Int + } + deriving (Generic, Show) + +data ProcCall p a + = CallRequestVote (RequestVote p) + | RequestVoteReply + { requestVoteReplyTerm :: Term + , requestVoteReplyGranted :: Bool + , requestVoteReplyFromID :: p + } + | CallAppendEntries (AppendEntries p a) + | AppendEntriesReply + { appendEntriesReplyTerm :: Term + , appendEntriesReplySuccess :: Bool + , appendEntriesReplyFromID :: p + } + deriving (Generic, Show) + + +data NodeEvent p a + = ElectionTimeoutElapses + | LeaderHeartbeat + | GotProcCall (ProcCall p a) + | GotClientCall (StateMachineCommand p a) + deriving (Show) + +data NodeAction p a + = ResetElectionTimer + | ResetLeaderHeartbeat + + | UpdateState (RaftState p -> RaftState p) + | SwitchRole (ActorState p) + + | LogMessage Text + | CallProc p (ProcCall p a) + | ReplyProc p (ProcCall p a) + + | ReplyToClientWhoIsLeader (Maybe p) + +--- +-- Test cluster runner + +data TestLogState p a = TestLogState + { testLogStateCurrentTerm :: Term + , testLogStateVotedFor :: Maybe p + , testLogStateLog :: Seq (LogEntry p a) + , testLogStateRaftState :: RaftState p + , testLogStateNodeID :: p + } + deriving (Generic, Show) + +newtype NodeID a = NodeID a + deriving newtype (Show, Eq, Ord, Num, Enum) +type NodeIDInt = NodeID Int + +newtype TestData = TestData Text + deriving newtype (Show, Eq, Ord, IsString) + +data TestEvent t = TestEvent + { testEventTime :: t + , testEvent :: NodeEvent NodeIDInt TestData + } + deriving (Generic) + +data TestClusterState = TestClusterState + { testClusterNodes :: Map NodeIDInt (TestLogState NodeIDInt TestData) + , testClusterEvents :: Heap (Heap.Entry UTCTime ( + Either + (NodeEvent NodeIDInt TestData, [TestEvent UTCTime]) + (NodeIDInt, NodeEvent NodeIDInt TestData) + )) + , testClusterElectionTimeouts :: Map NodeIDInt Int + } + deriving (Generic) + +devTest :: IO () +devTest = do + evalTestCluster 5 commands $ Streaming.take 142 >>> Streaming.mapM display >>> Streaming.effects + where + display = List.uncons >>> mapM_ \(x,xs') -> liftIO do + putStrLn (T.unpack x) + forM_ xs' \x' -> do + putStr " " + putStrLn (T.unpack x') + putStrLn "" + + commands :: [TestEvent NominalDiffTime] + commands = [ + TestEvent 9 (GotClientCall (SMCLoad "Tx1")) + , TestEvent 1 (GotClientCall (SMCLoad "Tx2")) + , TestEvent 3 (GotClientCall (SMCLoad "Tx3")) + , TestEvent 2 (GotClientCall (SMCLoad "Tx4")) + ] + +evalTestCluster :: + ( Monad m + , m' ~ StateT TestClusterState (StateT StdGen m) + ) + => Int + -> [TestEvent NominalDiffTime] + -> (Stream (Of [Text]) m' () -> m' ()) + -> m () +evalTestCluster clusterSize cevs sf = + runStateGenT_ (mkStdGen randomSeed) \g -> do + + initialNodeTimeouts <- forM nodeIDs \p -> do + dt <- realToFrac @Double <$> MWC.uniformRM (minElectionTimeout, maxElectionTimeout) g + pure $ Heap.Entry (addUTCTime dt timeBegin) (Right (p, ElectionTimeoutElapses)) + + evalStateT ( + sf $ + fix \go -> + lift (clusterStep g) >>= either + (\e -> Streaming.each [[e]]) + (\es -> do + Streaming.each [es] + go + ) + ) + TestClusterState + { testClusterNodes = Map.fromList $ nodeIDs <&> (id &&& mkTestNodeState (Set.fromList nodeIDs)) + , testClusterEvents = Heap.fromList $ tcevs <> initialNodeTimeouts + , testClusterElectionTimeouts = mempty + } + + where + + randomSeed = 1 + + minElectionTimeout :: Double + minElectionTimeout = 5 + maxElectionTimeout = minElectionTimeout * 2 + + heartbeatPeriod :: NominalDiffTime + heartbeatPeriod = realToFrac (minElectionTimeout * 3 / 4) + + nodeIDs = fromIntegral <$> [1..clusterSize] + + timeBegin :: UTCTime + timeBegin = UTCTime (YearDay 2000 1) 0 + + tcevs = case integrateTestEventTimes timeBegin cevs of + [] -> [] + (TestEvent t1 ev1):clientEvents -> + [Heap.Entry t1 (Left (ev1, clientEvents))] + + integrateTestEventTimes :: UTCTime -> [TestEvent NominalDiffTime] -> [TestEvent UTCTime] + integrateTestEventTimes t0 = flip evalState t0 . mapM \ev -> do + upd <- addUTCTime (ev ^. #testEventTime) <$> get + put upd + pure $ ev & #testEventTime .~ upd + + mkTestNodeState :: Set NodeIDInt -> NodeIDInt -> TestLogState NodeIDInt TestData + mkTestNodeState allNodeIDs nodeID = TestLogState + { testLogStateCurrentTerm = 0 + , testLogStateVotedFor = Nothing + , testLogStateLog = mempty + , testLogStateRaftState = + RaftState + { commitIndex = 0 + , lastAppliedIndex = 0 + , actorState = StateFollower + , cluster = allNodeIDs + } + , testLogStateNodeID = nodeID + } + + clusterStep :: (Monad m, StatefulGen (StateGenM StdGen) m) + => StateGenM StdGen -> StateT TestClusterState m (Either Text [Text]) + clusterStep g = runExceptT do + (Heap.Entry tnow tev, heapRest) <- justOrThrowError "no events" + $ gets (Heap.uncons . view #testClusterEvents) + #testClusterEvents .= heapRest + + case tev of + -- Запрос от клиента + Left (ev, tevs) -> do + case tevs of + (TestEvent tnext evnext):tevs' -> do + #testClusterEvents %= mappend do + Heap.singleton (Heap.Entry tnext (Left (evnext, tevs'))) + [] -> pure () + clusterSize <- gets (Map.size . view #testClusterNodes) + targetNodeID <- lift . lift $ fromIntegral <$> MWC.uniformRM (1, clusterSize) g + runEvent tnow targetNodeID ev + + -- Событие от ноды для ноды nodeID + Right (nodeID, ev :: NodeEvent NodeIDInt TestData) -> + runEvent tnow nodeID ev + + + where + runEvent tnow nodeID ev = fromMaybe [] <$> runMaybeT do + + case ev of + ElectionTimeoutElapses -> do + + x <- #testClusterElectionTimeouts . Lens.at nodeID . non 1 <%= pred + when (x > 0) do + fail "old timeout droped" + + _ -> pure () + + lift do + nodeState :: TestLogState NodeIDInt TestData + <- justOrThrowError ("no state for node " <> showt nodeID) + $ gets $ preview (#testClusterNodes . ix nodeID) + + let testactions :: [TestNodeAction] + (testactions, nodeState') = flip runState nodeState (nodeTestStep ev) + + #testClusterNodes . ix nodeID .= nodeState' + + -- pure (Just nodeID, ev, mempty, testactions) + + -- [(NominalDiffTime, (NodeIDInt, NodeEvent NodeIDInt TestData))] + (newevents, log :: [Text]) <- W.runWriterT $ catMaybes <$> forM testactions \case + + TestResetElectionTimer -> do + dt <- lift . lift . lift $ realToFrac @Double + <$> MWC.uniformRM (minElectionTimeout, maxElectionTimeout) g + -- Как сбрасывать предыдущие таймеры? + -- В словарике testClusterElectionTimeouts по ключам + -- будем считать сколько добавлено новых + -- TestResetElectionTimer Событие кидать только если + -- оно соответствует единственной оставшейся записи. + x <- #testClusterElectionTimeouts . Lens.at nodeID . non 1 <<%= succ + -- W.tell [showt x] + pure $ Just $ + (dt, (nodeID, ElectionTimeoutElapses)) + + TestResetLeaderHeartbeat -> + pure $ Just $ + (heartbeatPeriod, (nodeID, LeaderHeartbeat)) + + TestApplyLogEntryToSM t -> pure Nothing + + TestLogMessage t -> pure Nothing + + TestCallProc nodeID pcall -> do + dt <- lift . lift . lift $ realToFrac @Double + <$> MWC.uniformRM (0.01, 0.1) g + pure $ Just $ + (dt, (nodeID, GotProcCall pcall)) + + TestReplyProc nodeID pcall -> do + dt <- lift . lift . lift $ realToFrac @Double + <$> MWC.uniformRM (0.01, 0.1) g + pure $ Just $ + (dt, (nodeID, GotProcCall pcall)) + + TestReplyToClientWhoIsLeader mnodeID -> forM mnodeID \nodeID -> do + dt <- lift . lift . lift $ realToFrac @Double + <$> MWC.uniformRM (0.3, 1) g + pure (dt, (nodeID, ev)) + + #testClusterEvents %= mappend do + Heap.fromList (newevents <&> \(t',ev') -> Heap.Entry (addUTCTime t' tnow) (Right ev')) + + nodeStates <- gets (view #testClusterNodes) + + -- pure [showt (tnow, nodeID, ev, "->", testactions ^.. traverse . _Ctor @"TestLogMessage")] + pure $ ([showt $ (tnow, nodeID, ev)] + <> (showt <$> (testactions ^.. traverse)) + <> log + <> ["states:"] + <> (showt <$> Map.elems (nodeStates)) + ) + +showt :: Show a => a -> Text +showt = T.pack . show + +data TestNodeAction + = TestResetElectionTimer + | TestResetLeaderHeartbeat + | TestApplyLogEntryToSM TestData + | TestLogMessage Text + | TestCallProc NodeIDInt (ProcCall NodeIDInt TestData) + | TestReplyProc NodeIDInt (ProcCall NodeIDInt TestData) + | TestReplyToClientWhoIsLeader (Maybe NodeIDInt) + deriving (Generic, Show) + +nodeTestStep :: forall m. (Monad m) + => NodeEvent NodeIDInt TestData + -> StateT (TestLogState NodeIDInt TestData) m [TestNodeAction] +nodeTestStep ev = do + + let h :: PersMethods NodeIDInt TestData (StateT (TestLogState NodeIDInt TestData) m) + h = PersMethods + { getCurrentTerm = gets (view #testLogStateCurrentTerm) + , setCurrentTerm = (#testLogStateCurrentTerm .=) + -- + , getVotedFor = gets (view #testLogStateVotedFor) + , setVotedFor = (#testLogStateVotedFor .=) + -- + , getLogEntry = \j -> gets $ Seq.lookup (j-1) . view #testLogStateLog + , setLogEntry = \j le -> #testLogStateLog %= \s -> do + when (Seq.length s < (j-1)) do + error "Raft algo error. Trying to set log element after missed elements" + Seq.take (j-1) s Seq.|> le + + , getLogEntriesFrom = \j -> gets (F.toList . Seq.drop (j-1) . (view #testLogStateLog)) + , getLastLogIndex = gets (Seq.length . view #testLogStateLog) + -- + , getOurID = gets (view #testLogStateNodeID) + } + + rstate <- gets (view #testLogStateRaftState) + (rstate', actions) <- do + flip runStateT mempty do + flip execStateT rstate do + flip runContT absurd do + actions :: [TestNodeAction] <- testNodeReact (hoistMethods (lift . lift) h) ev + lift . lift $ modify (actions <>) + -- lift . lift . modify . (<>) =<< testNodeReact (hoistMethods (lift . lift) h) ev + ContT \k -> pure () + #testLogStateRaftState .= rstate' + pure (reverse actions) + + where + seqLast :: Seq a -> a + seqLast = \case + (Seq.viewr -> _ Seq.:> x) -> x + _ -> error "no last element in empty sequence" + +testNodeReact :: (MonadState (RaftState p) m, p ~ NodeIDInt, a ~ TestData) + => PersMethods p a m -> NodeEvent p a -> ContT () m [TestNodeAction] +testNodeReact h ev = do + x <- nodeReact h ev =<< get + ContT \k -> do + case x of + ResetElectionTimer -> k [TestResetElectionTimer] + ResetLeaderHeartbeat -> k [TestResetLeaderHeartbeat] + + UpdateState f -> id %= f + SwitchRole st -> do + #actorState .= st + k [TestLogMessage ("Switch role to " <> showt st)] + + LogMessage msg -> k [TestLogMessage msg] + CallProc p proc -> k [TestCallProc p proc] + ReplyProc p proc -> k [TestReplyProc p proc] + + ReplyToClientWhoIsLeader mp -> k [TestReplyToClientWhoIsLeader mp] + + cix <- (gets (view #commitIndex)) + (gets (view #lastAppliedIndex)) >>= fix \go lastApplied -> do + when (cix > lastApplied) do + -- increment lastApplied, apply log[lastApplied] to stateMachine + lastApplied' <- #lastAppliedIndex <%= succ + (getLogEntry h) lastApplied' >>= mapM_ \le -> do + case entryStateMachineCommand le of + SMCLoad a -> do + k [TestApplyLogEntryToSM a] + -- | SMAddNode p + -- | SMDropNode p + go lastApplied' + +nodeReact :: forall m p a. (Monad m, Ord p, Show p, Show a) + => PersMethods p a m + -> NodeEvent p a + -> RaftState p + -> ContT () m (NodeAction p a) +nodeReact h@PersMethods{..} ev RaftState{..} = do + ourID <- lift getOurID + let otherNodes = cluster `Set.difference` Set.singleton ourID + ContT \k -> case actorState of + + StateFollower -> do + -- * respond to rpcs from candidates and leaders + -- * if election timeout elapses without receiving AppendEntries from + -- current leader or asking vote from candidates + -- then convert to candidate + case ev of + ElectionTimeoutElapses -> do + k (SwitchRole (StateCandidate (Set.singleton ourID))) + startNewElection k + + LeaderHeartbeat -> do + pure () + + GotProcCall proc -> case proc of + + CallRequestVote (req@RequestVote {..}) -> do + updateCurrentTerm_ k requestVoteTerm + granted <- replyAfterRequestVote req k + k (LogMessage $ "granted: " <> showt granted) + when granted do + k ResetElectionTimer + + CallAppendEntries req@AppendEntries {..} -> do + updateCurrentTerm_ k appendEntriesTerm + replyAfterAppendEntries req k do + k ResetElectionTimer + + RequestVoteReply {..} -> do + k (LogMessage "Follower Got RequestVoteReply. Why ???") + void $ updateCurrentTerm_ k requestVoteReplyTerm + + AppendEntriesReply {..} -> do + void $ updateCurrentTerm_ k appendEntriesReplyTerm + + GotClientCall {} -> do + votedFor <- getVotedFor + k (ReplyToClientWhoIsLeader votedFor) + + -- * if votes received from majority of servers: become leader + -- * if AppendEntries received from new leader: become follower + -- * if election timeout elapses: start new election + StateCandidate{..} -> do + case ev of + ElectionTimeoutElapses -> do + startNewElection k + + LeaderHeartbeat -> do + pure () + + GotProcCall proc -> case proc of + + CallRequestVote req@RequestVote {..} -> do + upd <- updateCurrentTerm k requestVoteTerm + granted <- replyAfterRequestVote req k + when (granted && not upd) do + k ResetElectionTimer + + CallAppendEntries req@AppendEntries {..} -> do + updateCurrentTerm_ k appendEntriesTerm + replyAfterAppendEntries req k do + k ResetElectionTimer + k (SwitchRole StateFollower) + + RequestVoteReply {..} -> do + upd <- updateCurrentTerm k requestVoteReplyTerm + when (requestVoteReplyGranted && not upd) do + let votesCollected' = Set.insert requestVoteReplyFromID votesCollected + k (UpdateState (#actorState . _Ctor @"StateCandidate" .~ votesCollected')) + k (LogMessage ("Votes collected " <> showt votesCollected')) + when (Set.size votesCollected' > Set.size cluster `div` 2) do + (lastLogIndex, lastLogEntryTerm) <- getLastLogIndexNTerm h + k (SwitchRole (StateLeader + { nextIndex = Map.fromList $ (Set.toList otherNodes) <&> (, succ lastLogIndex) + , matchIndex = Map.fromList $ (Set.toList otherNodes) <&> (, defMatchIndex) + })) + k ResetLeaderHeartbeat + forM_ otherNodes \p -> do + term <- getCurrentTerm + k (CallProc p (CallAppendEntries ( + AppendEntries + { appendEntriesTerm = term + , appendEntriesLeaderID = ourID + , appendEntriesPrevLogIndex = lastLogIndex + , appendEntriesPrevLogTerm = lastLogEntryTerm + , appendEntriesES = mempty + , appendEntriesLeaderCommitIndex = commitIndex + }))) + + AppendEntriesReply {..} -> do + void $ updateCurrentTerm k appendEntriesReplyTerm + + GotClientCall {} -> do + votedFor <- getVotedFor + k (ReplyToClientWhoIsLeader votedFor) + + StateLeader{..} -> do + let + leaderCallAppendEntries p = do + ourLastLogIndex <- getLastLogIndex + + let pPrevIndex = maybe ourLastLogIndex pred (nextIndex ^? ix p) + (pPrevEntryTerm, entries') <- + if pPrevIndex > 0 + then + maybe (error "Bug in algorithm") (over _1 entryTerm) . List.uncons + <$> getLogEntriesFrom pPrevIndex + else + (0 ,) <$> getLogEntriesFrom 1 + + term <- getCurrentTerm + k (CallProc p (CallAppendEntries ( + AppendEntries + { appendEntriesTerm = term + , appendEntriesLeaderID = ourID + , appendEntriesPrevLogIndex = pPrevIndex + , appendEntriesPrevLogTerm = pPrevEntryTerm + , appendEntriesES = entries' + , appendEntriesLeaderCommitIndex = commitIndex + }))) + + case ev of + + ElectionTimeoutElapses -> + pure () + + LeaderHeartbeat -> do + mapM_ leaderCallAppendEntries otherNodes + k ResetLeaderHeartbeat + + GotProcCall proc -> case proc of + + CallRequestVote req@RequestVote {..} -> do + updateCurrentTerm k requestVoteTerm + replyAfterRequestVote req k + pure () + + CallAppendEntries req@AppendEntries {..} -> do + updateCurrentTerm k appendEntriesTerm + replyAfterAppendEntries req k do + k ResetElectionTimer + k (SwitchRole StateFollower) + pure () + + RequestVoteReply {..} -> do + updateCurrentTerm k requestVoteReplyTerm + pure () + + AppendEntriesReply {..} -> do + -- Нужно ли здесь учитывать appendEntriesReplyTerm ? + currentTerm <- getCurrentTerm + if (appendEntriesReplyTerm == currentTerm) + then do + if appendEntriesReplySuccess + then do + ourLastLogIndex <- getLastLogIndex + let + -- updMatchIndex :: Map p Int -> Map p Int + updMatchIndex = Lens.at appendEntriesReplyFromID . non defMatchIndex .~ ourLastLogIndex + k (UpdateState (( + #actorState . _Ctor @"StateLeader" + . _1 -- . #nextIndex + . ix appendEntriesReplyFromID .~ succ ourLastLogIndex + ) . ( + #actorState . _Ctor @"StateLeader" + . _2 -- . #matchIndex + %~ updMatchIndex + ))) + let newCommitIndex = matchIndex & updMatchIndex & Map.elems + & Heap.fromList + & Heap.drop ((Set.size cluster `div` 2 - 1)) + & Heap.uncons + & maybe defMatchIndex fst + when (newCommitIndex > commitIndex) do + getLogEntry newCommitIndex >>= mapM_ do + entryTerm >>> \term -> + when (term == currentTerm) do + setCurrentTerm term + k (UpdateState (#commitIndex .~ newCommitIndex)) + else do + k (UpdateState (#actorState . _Ctor @"StateLeader" + . _1 -- . #nextIndex + . ix appendEntriesReplyFromID %~ pred + )) + leaderCallAppendEntries appendEntriesReplyFromID + else + void $ updateCurrentTerm k appendEntriesReplyTerm + + GotClientCall cmd -> do + j <- getLastLogIndex + term <- getCurrentTerm + setLogEntry (succ j) + (LogEntry + { entryTerm = term + , entryStateMachineCommand = cmd + }) + -- TODO "respond after entry applied to state machine" + + where + + defMatchIndex = 0 + + startNewElection k = do + + ourID <- getOurID + let otherNodes = cluster `Set.difference` Set.singleton ourID + + setCurrentTerm . succ =<< getCurrentTerm + setVotedFor . Just =<< getOurID + k ResetElectionTimer + + term <- getCurrentTerm + (lastLogIndex, lastLogEntryTerm) <- getLastLogIndexNTerm h + forM_ otherNodes \p -> do + k (CallProc p (CallRequestVote ( + RequestVote + { requestVoteTerm = term + , requestVoteCandidateID = ourID + , requestVoteCandidateLastLogIndex = lastLogIndex + , requestVoteCandidateLastLogTerm = lastLogEntryTerm + }))) + + updateCurrentTerm_ k term = do + updateCurrentTerm' k term (pure ()) + + updateCurrentTerm k term = do + updateCurrentTerm' k term do + k (SwitchRole StateFollower) + k ResetElectionTimer + + updateCurrentTerm' k term onUpdate = do + currentTerm <- getCurrentTerm + k (LogMessage ("updateCurrentTerm ? our:" <> showt currentTerm <> " -> new:" <> showt term)) + if (currentTerm < term) then do + setCurrentTerm term + onUpdate + setVotedFor Nothing + pure True + else pure False + + replyAfterRequestVote RequestVote {..} k = do + -- 1. if term < currentTerm: reply false + -- 2. else if voteFor is null or candidateID, + -- and candidate's log is at least up-to-date as our log: grant vote + currentTerm <- getCurrentTerm + granted <- (either (\e -> k (LogMessage e) >> pure False) (const (pure True))) =<< runExceptT do + + when (requestVoteTerm < currentTerm) do + throwError "requestVoteTerm < currentTerm" + + lift getVotedFor >>= mapM_ \ourVoteFor -> do + when (ourVoteFor /= requestVoteCandidateID) do + throwError "already voted for another candidate" + + setVotedFor (Just requestVoteCandidateID) + ourID <- getOurID + k $ ReplyProc requestVoteCandidateID + $ RequestVoteReply + { requestVoteReplyTerm = currentTerm + , requestVoteReplyGranted = granted + , requestVoteReplyFromID = ourID + } + pure granted + + replyAfterAppendEntries AppendEntries {..} k onSuccess = do + -- 1. if term < currentTerm: reply false + -- 2. reply false if log doesn't contain an entry at prevLogIndex whose + -- term matches prevLogTerm + -- 3. if an existing entry conflicts with a new one (same index but + -- different terms), delete the existing entry and all that follow it + -- 4. append any new entries not already in the log + -- 5. if leaderCommit > commitIndex, + -- set commitIndex = min(leaderCommit, index of last new entry) + + currentTerm <- getCurrentTerm + k (LogMessage ("replyAfterAppendEntries our:" <> showt currentTerm <> " showt appendEntriesTerm)) + success <- (either (\e -> k (LogMessage e) >> pure False) (const (pure True))) =<< runExceptT do + + when (appendEntriesTerm < currentTerm) do + throwError "appendEntriesTerm < currentTerm" + + when (appendEntriesPrevLogIndex > 0) do + le' <- justOrThrowError ("No log entry " <> showt appendEntriesPrevLogIndex) + $ getLogEntry appendEntriesPrevLogIndex + when (entryTerm le' /= appendEntriesPrevLogTerm) do + throwError "entryTerm at appendEntriesPrevLogIndex /= appendEntriesPrevLogTerm" + + justOrThrowError "?-?" $ flip fix (zip [succ appendEntriesPrevLogIndex..] appendEntriesES) $ + \go -> \case + [] -> pure (Just ()) + (j, le):es -> + getLogEntry j >>= \case + + Nothing -> do + setLogEntry j le + mapM_ (uncurry setLogEntry) es + pure (Just ()) + + Just cle -> do + if (entryTerm cle == entryTerm le) + then go es + else do + setLogEntry j le + mapM_ (uncurry setLogEntry) es + pure (Just ()) + + when (appendEntriesLeaderCommitIndex > commitIndex) do + lift do + j <- getLastLogIndex + k (UpdateState (#commitIndex .~ min appendEntriesLeaderCommitIndex j)) + + when success onSuccess + + ourID <- getOurID + k $ ReplyProc appendEntriesLeaderID + $ AppendEntriesReply + { appendEntriesReplyTerm = currentTerm + , appendEntriesReplySuccess = success + , appendEntriesReplyFromID = ourID + } + +justOrThrowError :: Functor m => e -> m (Maybe a) -> ExceptT e m a +justOrThrowError e = ExceptT . fmap (maybe (Left e) Right) diff --git a/examples/raft-algo/raft-algo.cabal b/examples/raft-algo/raft-algo.cabal new file mode 100644 index 00000000..6413302e --- /dev/null +++ b/examples/raft-algo/raft-algo.cabal @@ -0,0 +1,244 @@ +cabal-version: 3.0 +name: raft-algo +version: 0.1.0.0 +-- synopsis: +-- description: +license: BSD-3-Clause +license-file: LICENSE +-- author: +-- maintainer: +-- copyright: +category: Network +build-type: Simple +extra-doc-files: CHANGELOG.md +-- extra-source-files: + +common warnings + ghc-options: -Wall + +common common-deps + build-depends: + base, hbs2-core, hbs2-storage-simple + , async + , bytestring + , cache + , containers + , data-default + , deepseq + , directory + , filepath + , generic-lens + , hashable + , heaps + , microlens-platform + , mtl + , mwc-random + , prettyprinter + , QuickCheck + , random + , random-shuffle + , resourcet + , safe + , serialise + , split + , stm + , streaming + , suckless-conf + , tasty + , tasty-hunit + , temporary + , timeit + , transformers + , uniplate + , unordered-containers + , vector + , prettyprinter-ansi-terminal + , interpolatedstring-perl6 + , unliftio + +common shared-properties + ghc-options: + -Wall + -O2 + -fno-warn-type-defaults + -- -fno-warn-unused-matches + -- -fno-warn-unused-do-bind + -- -Werror=missing-methods + -- -Werror=incomplete-patterns + -- -fno-warn-unused-binds + -threaded + -rtsopts + "-with-rtsopts=-N4 -A256m -AL256m -I0" + + + default-language: Haskell2010 + + default-extensions: + ApplicativeDo + , BangPatterns + , BlockArguments + , ConstraintKinds + , DataKinds + , DeriveDataTypeable + , DeriveGeneric + , DerivingStrategies + , DerivingVia + , ExtendedDefaultRules + , FlexibleContexts + , FlexibleInstances + , GADTs + , GeneralizedNewtypeDeriving + , ImportQualifiedPost + , LambdaCase + , MultiParamTypeClasses + , OverloadedLabels + , OverloadedStrings + , QuasiQuotes + , RankNTypes + , RecordWildCards + , ScopedTypeVariables + , StandaloneDeriving + , TupleSections + , TypeApplications + , TypeFamilies + , ViewPatterns + +library + import: shared-properties + import: common-deps + default-language: Haskell2010 + + exposed-modules: RaftAlgo.Proto + + ghc-options: + -- -prof + -- -fprof-auto + + -- other-extensions: + + -- type: exitcode-stdio-1.0 + hs-source-dirs: lib + + build-depends: + base, hbs2-core + , async + , attoparsec + , bytestring + , cache + , clock + , containers + , data-default + , directory + , hashable + , microlens-platform + , mtl + , mwc-random + , network + , network-ip + , optparse-applicative + , prettyprinter + , QuickCheck + , random + , safe + , serialise + , stm + , streaming + , tasty + , tasty-hunit + , text + , time + , transformers + , uniplate + , vector + , unliftio + +executable raft-algo-app + import: shared-properties + import: common-deps + default-language: Haskell2010 + + ghc-options: + -- -prof + -- -fprof-auto + + -- other-modules: + + -- other-extensions: + + -- type: exitcode-stdio-1.0 + hs-source-dirs: app lib + main-is: RaftAlgoMain.hs + + build-depends: + base, raft-algo, hbs2-core, hbs2-storage-simple + , async + , attoparsec + , bytestring + , cache + , clock + , containers + , data-default + , data-textual + , directory + , hashable + , microlens-platform + , mtl + , mwc-random + , network + , network-ip + , optparse-applicative + , prettyprinter + , QuickCheck + , random + , safe + , serialise + , stm + , streaming + , tasty + , tasty-hunit + , text + , time + , transformers + , uniplate + , vector + , unliftio + + +test-suite raft-algo-proto-test + import: shared-properties + default-language: Haskell2010 + + other-modules: + + type: exitcode-stdio-1.0 + hs-source-dirs: test + main-is: RaftAlgoProtoTest.hs + + build-depends: + base, raft-algo, hbs2-core + , async + , bytestring + , cache + , cborg + , containers + , directory + , filepath + , hashable + , microlens-platform + , mtl + , optparse-applicative + , prettyprinter + , QuickCheck + , random + , safe + , serialise + , tasty + , tasty-hunit + , temporary + , timeit + , uniplate + , unliftio + , unordered-containers + , vector + + diff --git a/examples/raft-algo/test/RaftAlgoProtoTest.hs b/examples/raft-algo/test/RaftAlgoProtoTest.hs new file mode 100644 index 00000000..a652d1f5 --- /dev/null +++ b/examples/raft-algo/test/RaftAlgoProtoTest.hs @@ -0,0 +1,33 @@ +module Main where + +import HBS2.Prelude.Plated +import HBS2.System.Logger.Simple +import HBS2.Clock +import HBS2.Hash +import HBS2.Base58 + +import RaftAlgo.Proto + +import Data.Ord +import Data.Functor +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HashMap +import Data.List qualified as List +import Control.Monad.Reader +import System.Random +import Data.Hashable hiding (Hashed) +import Data.Word +import Lens.Micro.Platform +import Data.Fixed +import Options.Applicative as O +import Data.Cache (Cache) +import Data.Cache qualified as Cache +import Data.Maybe +import Data.Graph +import Data.HashSet (HashSet) +import Data.HashSet qualified as HashSet + +import Codec.Serialise + +import UnliftIO + diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index 4093e790..9db89d3a 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -20,6 +20,7 @@ common shared-properties ghc-options: -Wall -fno-warn-type-defaults + -- -fprint-potential-instances -- -prof -fprof-auto -- -fno-warn-unused-matches -- -fno-warn-unused-do-bind @@ -40,6 +41,7 @@ common shared-properties , ConstraintKinds , DataKinds , DeriveDataTypeable + , DeriveFunctor , DeriveGeneric , DerivingStrategies , DerivingVia @@ -121,10 +123,10 @@ library , HBS2.Storage.Operations , HBS2.System.Logger.Simple , HBS2.System.Logger.Simple.Class - , Dialog.Core - , Dialog.Client - , Dialog.Helpers.List - , Dialog.Helpers.Streaming + , HBS2.Net.Dialog.Core + , HBS2.Net.Dialog.Client + , HBS2.Net.Dialog.Helpers.List + , HBS2.Net.Dialog.Helpers.Streaming -- other-modules: @@ -140,6 +142,7 @@ library , cache , cborg , clock + , constraints , containers , cryptonite , data-default diff --git a/hbs2-core/lib/Dialog/Core.hs b/hbs2-core/lib/Dialog/Core.hs deleted file mode 100644 index d259e89f..00000000 --- a/hbs2-core/lib/Dialog/Core.hs +++ /dev/null @@ -1,442 +0,0 @@ -{-# LANGUAGE StrictData #-} --- {-# LANGUAGE OverloadedLists #-} --- {-# LANGUAGE UndecidableInstances #-} -module Dialog.Core where - --- import Data.ByteString.Builder as Builder --- import Data.ByteString.Builder.Internal as Builder --- import GHC.IsList -import Codec.Serialise -import Control.Arrow -import Control.Monad -import Control.Monad.Error.Class -import Control.Monad.Except (Except(..), ExceptT(..), runExcept, runExceptT) -import Control.Monad.IO.Class -import Control.Monad.State.Class as State -import Control.Monad.State.Strict (evalStateT) -import Control.Monad.Trans.Class -import Control.Monad.Writer qualified as W -import Data.Binary.Get as Get -import Data.Binary.Put as Put -import Data.Bits -import Data.Bool -import Data.ByteArray (ByteArrayAccess) -import Data.ByteArray.Sized as BAS -import Data.ByteString (ByteString) -import Data.ByteString qualified as BS -import Data.ByteString.Char8 qualified as BS8 -import Data.ByteString.Lazy qualified as BSL -import Data.Foldable -import Data.Foldable as F -import Data.Function -import Data.Generics.Labels -import Data.Generics.Product.Fields -import Data.List qualified as List -import Data.List.NonEmpty qualified as NE -import Data.Map.Strict as Map -import Data.Maybe -import Data.String.Conversions as X (cs) -import Data.Text (Text) -import Data.Word -import GHC.Exts -import GHC.Generics (Generic) -import Lens.Micro.Platform -import Numeric.Natural -import System.Random.MWC -import UnliftIO.Exception -import UnliftIO.STM - --- import Prettyprinter --- import HBS2.Base58 -import Data.ByteString.Base16 qualified as B16 - -import Dialog.Helpers.List - -type Frames = Frames' ByteString -newtype Frames' a = Frames { unFrames :: [a] } - deriving stock (Generic,Eq) - deriving newtype (Functor, Foldable, Semigroup, Monoid - -- , IsList - ) - - -instance Show Frames where - show (Frames xs) = "Frames " <> show (BS.length <$> xs) - -- <> " " <> show (fmap B16.encode xs) - <> " " <> (show . fmap (limitSize 42)) xs - - where - limitSize n as = bool as (BS.take (n-3) as <> "...") (BS.length as > n) - -framesBodyPart :: Traversal' Frames [ByteString] -framesBodyPart = #unFrames . tailAfterP (== "") - -tailAfterP :: forall a . (a -> Bool) -> Traversal' [a] [a] -tailAfterP p focus = fix \go -> \case - x:xs -> (x :) <$> bool go focus (p x) xs - xs -> pure xs - ---- - --- encodeFrames :: Frames -> ByteString -encodeFrames :: Foldable t => t ByteString -> ByteString -encodeFrames = F.toList >>> BSL.toStrict . runPut . \case - - [] -> pure () - - xss -> flip fix xss \go -> \case - [] -> pure () - bs:xs -> do - let (flip shiftR 1 -> n1, ns) = unfoldSizeBytes @Word64 . flip shiftL 1 . fromIntegral . BS.length $ bs - - putWord8 $ n1 - & (bool (sbit 7) id (List.null xs)) - & (bool (sbit 6) id (List.null ns)) - - forM_ (markMore ns) \(n, isMoreBytesInSize) -> do - putWord8 $ n & bool (zbit 7) (sbit 7) isMoreBytesInSize - - putByteString bs - - go xs - - where - - markMore as = zip as ((True <$ List.drop 1 as) <> [False]) - - unfoldSizeBytes :: (Bits n, Integral n) => n -> (Word8, [Word8]) - unfoldSizeBytes = (\(a NE.:| as) -> (a, as)) . NE.unfoldr \w -> - ( (flip shiftR 1 . flip shiftL 1 . fromIntegral) w - , let w' = shiftR w 7 - in bool Nothing (Just w') (w' > 0) - ) - -decodeFrames :: MonadError String m => ByteString -> m Frames -decodeFrames = \case - "" -> pure mempty - - bs' -> (bs' &) $ BSL.fromStrict >>> either (throwError . view _3) (pure . Frames . view _3) - <$> runGetOrFail do - - fix \go -> do - - j <- getWord8 - - size <- - flip fix (6, j) \fu (b, j') -> do - let n = (fromIntegral . clearLeftBits (8-b)) j' - if (tbit b j') - then (n +) . flip shiftL b <$> (fu . (7, ) =<< getWord8) - else pure n - - bs <- getByteString size - - let moreFrames = tbit 7 j - - if moreFrames - then (bs :) <$> go - else pure [bs] - - where - clearLeftBits n = flip shiftR n . flip shiftL n - tbit = flip testBit - - -devDialogCore :: IO () -devDialogCore = do - display (Frames []) - display (Frames [""]) - display (Frames [BS.replicate 32 0x55]) - display (Frames [BS.replicate 32 0x55, ""]) - display (Frames [BS.replicate 32 0x55, "\3\3"]) - display (Frames [BS.replicate 33 0x55, "\3\3"]) - display (Frames [BS.replicate 63 0x55]) - display (Frames [BS.replicate 64 0x55]) - -- display (Frames [BS.replicate 65 0x55]) - display (Frames ["\8\8\8","\4\4\4"]) - display (Frames ["","\1"]) - where - display a = do - putStrLn . cs . show . B16.encode . encodeFrames $ a - putStrLn "" - - - - -sbit :: (Bits n) => Int -> n -> n -sbit = flip setBit - -zbit :: (Bits n) => Int -> n -> n -zbit = flip clearBit - ---- - -decodeFramesFail :: (MonadFail m) => ByteString -> m Frames -decodeFramesFail = errorToFail . decodeFrames - ---- - -errorToFailT :: (MonadFail m) => ExceptT String m a -> m a -errorToFailT = either fail pure <=< runExceptT - -errorToFail :: MonadFail m => Except String a -> m a -errorToFail = either fail pure . runExcept - -errorShowToFail :: (MonadFail m, Show s) => Except s a -> m a -errorShowToFail = either (fail . show) pure . runExcept - --- - -data CallerID = CallerID - { unCallerIDV :: Word8 - , unCallerID :: Word32 - } - deriving stock (Generic, Eq, Ord) - -instance Serialise CallerID - -newCallerID :: forall m. MonadIO m => m CallerID -newCallerID = liftIO $ withSystemRandomST \g -> - CallerID <$> (uniformM g) <*> (uniformM g) - ---- - -newtype CallerHandler m = CallerHandler - { unCallerHandler :: Frames -> m () - } - -newtype CallerEnv m = CallerEnv - { unCallerEnv :: TVar (Map CallerID (CallerHandler m)) } - -newCallerEnv :: MonadIO m => m (CallerEnv m') -newCallerEnv = CallerEnv <$> newTVarIO mempty - ---- - -newtype RequestResponseHandler m = RequestResponseHandler - { unRequestResponseHandler :: Frames -> m () - } - -newtype RequestResponseEnv m = RequestResponseEnv - { unRequestResponseEnv :: TVar (Map RequestID (RequestResponseHandler m)) - } - -newRequestResponseEnv :: MonadIO m => m (RequestResponseEnv m') -newRequestResponseEnv = - RequestResponseEnv <$> newTVarIO mempty - ---- - -data DClient m p i = DClient - { clientCallerEnv :: CallerEnv m - , clientSendProtoRequest :: p -> Frames -> m () - , clientGetKnownPeers :: m [(p, i)] - } - ---- - -newtype RequestID = RequestID { unRequestID :: Word32 } - deriving stock (Generic, Eq, Ord) - deriving newtype (Serialise, Num, Enum) - -- deriving via TODO_GenericVLQ Put Get - -data RequestResult - = RequestDone - -- | RequestSuccessIncomplete - | RequestTimeout - | RequestFailure ResponseStatusCode Frames - | RequestErrorBadResponse BadResponse Frames - deriving stock (Generic, Eq, Show) - -data BadResponse - = ResponseErrorNoResponseHeader - | ResponseInsufficientFrames - | ResponseParseError DeserialiseFailure - deriving stock (Generic, Eq, Show) - ---- - -setupCallerEnv :: MonadIO m => CallerEnv m' -> CallerID -> CallerHandler m' -> m () -setupCallerEnv env callerID repHandleEnv = - (atomically . modifyTVar' (unCallerEnv env)) - (at callerID .~ Just repHandleEnv) - -clearCallerEnv :: MonadIO m => CallerEnv m' -> CallerID -> m () -clearCallerEnv env callerID = - (atomically . modifyTVar' (unCallerEnv env)) - (at callerID .~ Nothing) - -findCallerHandler :: MonadIO m => CallerEnv m' -> CallerID -> m (Maybe (CallerHandler m')) -findCallerHandler CallerEnv{..} callerID = - (atomically (readTVar unCallerEnv)) <&> (preview (ix callerID)) - ---- - -setupRepHandler :: MonadIO m => RequestResponseEnv m' -> RequestID -> RequestResponseHandler m' -> m () -setupRepHandler RequestResponseEnv{..} requestID useResponse = - (atomically . modifyTVar' unRequestResponseEnv) - (at requestID .~ Just useResponse) - -clearRepHandler :: MonadIO m => RequestResponseEnv m' -> RequestID -> m () -clearRepHandler RequestResponseEnv{..} requestID = - (atomically . modifyTVar' unRequestResponseEnv) - (at requestID .~ Nothing) - -findRepHandler :: MonadIO m => RequestResponseEnv m' -> RequestID -> m (Maybe (RequestResponseHandler m')) -findRepHandler RequestResponseEnv{..} requestID = - (atomically (readTVar unRequestResponseEnv)) <&> (preview (ix requestID)) - ---- - -data DialogRequestEnv m p pd = DialogRequestEnv - { dreqEnvPeer :: p - , dreqEnvGetPeerData :: m pd - } - --- data DialogRequestError --- = DialogRequestFailure String --- deriving stock (Show) --- instance Exception DialogRequestError - ---- - -newtype DialogRequestRouter m = DialogRequestRouter - { unDialogRequestRouter :: - Map [ByteString] -- path - - -- handler :: Input -> m (Either ErrorMessage (HowToReply -> ResponseContinuation)) - (Frames -> Either Text ((Frames -> m ()) -> m ())) - } - - deriving (Semigroup, Monoid) - -dialogRequestRoutes - :: ListBuilder - ([ByteString], Frames -> Either Text ((Frames -> m ()) -> m ())) - -> DialogRequestRouter m -dialogRequestRoutes = DialogRequestRouter . Map.fromList . buildList - -hand :: Monad m => a -> b -> ListBuilderT m (a, b) -hand = curry li - ---- - -dpath :: Text -> [ByteString] -> Frames -dpath path = Frames . (cs path :) - ---- - -addEnvelope :: Monoid a => [a] -> Frames' a -> Frames' a -addEnvelope en = over #unFrames ((en <> [mempty]) <>) - -splitEnvelope :: (Monoid a, Eq a) => Frames' a -> ([a], Frames' a) -splitEnvelope = fmap (Frames . List.drop 1) . List.break (== mempty) . unFrames - -data ResponseHeader = ResponseHeader - { respStatus :: ResponseStatus - , respSeqNumber :: Int - } - deriving (Generic, Show, Eq) - -instance Serialise ResponseHeader - -data ResponseStatus = ResponseStatus - { responseStatusCode :: ResponseStatusCode - , responseStatusMessage :: Text - } - deriving (Generic, Show, Eq) - -instance Serialise ResponseStatus - -data ResponseStatusCode - = Success200 - | SuccessMore - | BadRequest400 - | Forbidden403 - | NotFound404 - deriving (Generic, Show, Eq) - -instance Serialise ResponseStatusCode - -routerSignature :: Word8 -routerSignature = 1 - -routeDialogRequest :: forall m p pd . - Monad m - => DialogRequestRouter m - -> DialogRequestEnv m p pd - -> (Frames -> m ()) - -> Frames - -> m () -routeDialogRequest router drenv rawReplyToPeer frames = do - erun <- pure $ runExcept $ flip evalStateT req do - - signature <- cutFrameDecode - (ResponseStatus BadRequest400 "No signature in request") - - when (signature /= routerSignature) $ throwError - (ResponseStatus BadRequest400 "Wrong signature in request") - - route <- cutFrameOr - (ResponseStatus BadRequest400 "No route in request") - - h <- fromJustThrowError - (ResponseStatus NotFound404 "Route not found") - (unDialogRequestRouter router ^? ix (BS8.split '/' route)) - - lift . ExceptT . pure - -- Если не может разобрать параметры запроса, - -- то самим ответить этому пиру '404' - . left (ResponseStatus BadRequest400) - . h - -- передать оставшуюся часть запроса в хэндлер - =<< get - - case erun of - Left rs -> replyToPeer (Frames [serialiseS (ResponseHeader rs 0)]) - Right run -> - -- передать хэндлеру продолжение чтобы ответить этому пиру - run replyToPeer - - where - (backPath, req) = splitEnvelope frames - - replyToPeer :: Frames -> m () - replyToPeer = rawReplyToPeer . over #unFrames (backPath <>) - -cutFrameDecode :: (Serialise b, MonadState Frames m, MonadError e m) => e -> m b -cutFrameDecode e = - State.gets unFrames >>= \case - x:xs -> - (either (const (throwError e)) pure . deserialiseOrFailS) x - <* State.put (Frames xs) - _ -> throwError e - -cutFrameDecode' - :: (Serialise b, MonadState Frames m, MonadError (Maybe DeserialiseFailure) m) - => m b -cutFrameDecode' = - State.gets unFrames >>= \case - x:xs -> - (either (throwError . Just) pure . deserialiseOrFailS) x - <* State.put (Frames xs) - _ -> throwError Nothing - -cutFrameOr :: (MonadState (Frames' b) m, MonadError e m) => e -> m b -cutFrameOr e = - State.gets unFrames >>= \case - x:xs -> x <$ State.put (Frames xs) - _ -> throwError e - -serialiseS :: Serialise a => a -> ByteString -serialiseS = BSL.toStrict . serialise - -deserialiseOrFailS :: Serialise a => ByteString -> Either DeserialiseFailure a -deserialiseOrFailS = deserialiseOrFail . BSL.fromStrict - -fromMaybeM :: Applicative m => m a -> Maybe a -> m a -fromMaybeM ma = maybe ma pure - -fromJustThrowError :: MonadError e m => e -> Maybe a -> m a -fromJustThrowError = fromMaybeM . throwError - diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 36ae0f97..ac81b2c2 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -11,6 +11,7 @@ module HBS2.Actors.Peer import HBS2.Actors import HBS2.Actors.Peer.Types import HBS2.Clock +import HBS2.Data.Types.Crypto import HBS2.Data.Types.Peer import HBS2.Defaults import HBS2.Events @@ -158,30 +159,8 @@ data PeerEnv e = , _envSweepers :: TVar (HashMap SKey [PeerM e IO ()]) , _envReqMsgLimit :: Cache (Peer e, Integer, Encoded e) () , _envReqProtoLimit :: Cache (Peer e, Integer) () - , _envAsymmetricKeyPair :: AsymmKeypair (Encryption e) - , _envEncryptionKeys :: TVar (HashMap (PeerData e) (CommonSecret (Encryption e))) } -setEncryptionKey :: - ( Hashable (PubKey 'Sign (Encryption L4Proto)) - , Hashable PeerNonce - , Show (PubKey 'Sign (Encryption L4Proto)) - , Show PeerNonce - , Show (CommonSecret (Encryption L4Proto)) - ) => PeerEnv L4Proto -> Peer L4Proto -> PeerData L4Proto -> Maybe (CommonSecret (Encryption L4Proto)) -> IO () -setEncryptionKey penv peer pd msecret = do - atomically $ modifyTVar' (_envEncryptionKeys penv) $ Lens.at pd .~ msecret - case msecret of - Nothing -> trace $ "ENCRYPTION delete key" <+> pretty peer <+> viaShow pd - Just k -> trace $ "ENCRYPTION store key" <+> pretty peer <+> viaShow pd <+> viaShow k - -getEncryptionKey :: - ( Hashable (PubKey 'Sign (Encryption L4Proto)) - , Hashable PeerNonce - ) => PeerEnv L4Proto -> PeerData L4Proto -> IO (Maybe (CommonSecret (Encryption L4Proto))) -getEncryptionKey penv pd = - readTVarIO (_envEncryptionKeys penv) <&> preview (Lens.ix pd) - newtype PeerM e m a = PeerM { fromPeerM :: ReaderT (PeerEnv e) m a } deriving newtype ( Functor , Applicative @@ -189,6 +168,7 @@ newtype PeerM e m a = PeerM { fromPeerM :: ReaderT (PeerEnv e) m a } , MonadReader (PeerEnv e) , MonadIO , MonadUnliftIO + , MonadTrans ) @@ -212,10 +192,10 @@ makeLenses 'PeerEnv makeLenses 'ResponseEnv -runResponseM :: forall e m . (Monad m) +runResponseM :: forall e m a . (Monad m) => Peer e - -> ResponseM e m () - -> m () + -> ResponseM e m a + -> m a runResponseM peer f = runReaderT (fromResponse f) (ResponseEnv peer) @@ -434,8 +414,6 @@ newPeerEnv s bus p = do _envSweepers <- liftIO (newTVarIO mempty) _envReqMsgLimit <- liftIO (Cache.newCache (Just defRequestLimit)) _envReqProtoLimit <- liftIO (Cache.newCache (Just defRequestLimit)) - _envAsymmetricKeyPair <- asymmNewKeypair @(Encryption e) - _envEncryptionKeys <- liftIO (newTVarIO mempty) pure PeerEnv {..} runPeerM :: forall e m . ( MonadIO m diff --git a/hbs2-core/lib/HBS2/Data/Types/Refs.hs b/hbs2-core/lib/HBS2/Data/Types/Refs.hs index 241d4d10..541c3434 100644 --- a/hbs2-core/lib/HBS2/Data/Types/Refs.hs +++ b/hbs2-core/lib/HBS2/Data/Types/Refs.hs @@ -8,7 +8,7 @@ module HBS2.Data.Types.Refs import HBS2.Base58 import HBS2.Hash import HBS2.Merkle -import HBS2.Net.Proto.Types (Encryption) +import HBS2.Net.Proto.Types import HBS2.Net.Auth.Credentials import HBS2.Prelude diff --git a/hbs2-core/lib/HBS2/Net/Auth/AccessKey.hs b/hbs2-core/lib/HBS2/Net/Auth/AccessKey.hs index abf84c17..4d4c0b8e 100644 --- a/hbs2-core/lib/HBS2/Net/Auth/AccessKey.hs +++ b/hbs2-core/lib/HBS2/Net/Auth/AccessKey.hs @@ -10,6 +10,7 @@ import HBS2.Data.Types import HBS2.Merkle import HBS2.Net.Auth.Credentials import HBS2.Net.Proto.Definition +import HBS2.Net.Proto.Types import HBS2.Prelude.Plated import Codec.Serialise diff --git a/hbs2-core/lib/HBS2/Net/Auth/Credentials.hs b/hbs2-core/lib/HBS2/Net/Auth/Credentials.hs index 0ae23baf..1a866395 100644 --- a/hbs2-core/lib/HBS2/Net/Auth/Credentials.hs +++ b/hbs2-core/lib/HBS2/Net/Auth/Credentials.hs @@ -28,11 +28,6 @@ import Data.Kind type family EncryptPubKey e :: Type -data CryptoAction = Sign | Encrypt - -type family PubKey ( a :: CryptoAction) e :: Type -type family PrivKey ( a :: CryptoAction) e :: Type - class Signatures e where type family Signature e :: Type makeSign :: PrivKey 'Sign e -> ByteString -> Signature e @@ -207,4 +202,3 @@ instance IsEncoding (PubKey 'Encrypt e) => Pretty (KeyringEntry e) where pretty ke = fill 10 "pub-key:" <+> pretty (AsBase58 (Crypto.encode (view krPk ke))) - diff --git a/hbs2-core/lib/Dialog/Client.hs b/hbs2-core/lib/HBS2/Net/Dialog/Client.hs similarity index 93% rename from hbs2-core/lib/Dialog/Client.hs rename to hbs2-core/lib/HBS2/Net/Dialog/Client.hs index 5d8a987d..dd4d2465 100644 --- a/hbs2-core/lib/Dialog/Client.hs +++ b/hbs2-core/lib/HBS2/Net/Dialog/Client.hs @@ -1,7 +1,7 @@ {-# LANGUAGE OverloadedLabels #-} {-# LANGUAGE StrictData #-} {-# LANGUAGE ImpredicativeTypes #-} -module Dialog.Client where +module HBS2.Net.Dialog.Client where -- import System.Clock -- import System.Timeout @@ -34,8 +34,8 @@ import UnliftIO.Exception import UnliftIO.STM import UnliftIO.Timeout -import Dialog.Core -import Dialog.Helpers.Streaming +import HBS2.Net.Dialog.Core +import HBS2.Net.Dialog.Helpers.Streaming --- @@ -96,10 +96,11 @@ dQuery' par dcli peer rq go = processResponseHeader rhxs@(rh, xs) = case ((responseStatusCode . respStatus) rh) of Success200 -> Left (Just rhxs, RequestDone) + SuccessNoContent204 -> Left (Just rhxs, RequestDone) SuccessMore -> Right rhxs - r@BadRequest400 -> Left (Nothing, (RequestFailure r xs)) - r@Forbidden403 -> Left (Nothing, (RequestFailure r xs)) - r@NotFound404 -> Left (Nothing, (RequestFailure r xs)) + BadRequest400 -> Left (Nothing, (RequestFailure (respStatus rh) xs)) + Forbidden403 -> Left (Nothing, (RequestFailure (respStatus rh) xs)) + NotFound404 -> Left (Nothing, (RequestFailure (respStatus rh) xs)) rq' = rq & #unFrames %~ ([serialiseS routerSignature] <>) diff --git a/hbs2-core/lib/HBS2/Net/Dialog/Core.hs b/hbs2-core/lib/HBS2/Net/Dialog/Core.hs new file mode 100644 index 00000000..60aef062 --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Dialog/Core.hs @@ -0,0 +1,831 @@ +{-# Language AllowAmbiguousTypes #-} +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE InstanceSigs #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE QuantifiedConstraints #-} +{-# LANGUAGE StrictData #-} +{-# LANGUAGE TypeOperators #-} +{-# LANGUAGE UndecidableInstances #-} + +-- {-# LANGUAGE ConstraintKinds #-} +-- {-# LANGUAGE OverloadedLists #-} + +-- {-# LANGUAGE CPP #-} +-- {-# LANGUAGE DataKinds #-} +-- {-# LANGUAGE FlexibleContexts #-} +-- {-# LANGUAGE FlexibleInstances #-} +-- {-# LANGUAGE MultiParamTypeClasses #-} +-- {-# LANGUAGE OverloadedStrings #-} +-- {-# LANGUAGE RankNTypes #-} +-- {-# LANGUAGE ScopedTypeVariables #-} +-- {-# LANGUAGE TupleSections #-} +-- {-# LANGUAGE TypeApplications #-} +-- {-# LANGUAGE TypeFamilies #-} + + +module HBS2.Net.Dialog.Core where + +import Codec.Serialise +import Control.Arrow +import Control.Monad +import Control.Monad.Error.Class +import Control.Monad.Except (Except, ExceptT(..), runExcept, runExceptT) +import Control.Monad.IO.Class +import Control.Monad.State.Class as State +import Control.Monad.State.Strict as StateStrict (evalState, evalStateT, runStateT, StateT(..)) +import Control.Monad.Trans.Class +import Data.Binary.Get as Get +import Data.Binary.Put as Put +import Data.Bits +import Data.Bool +import Data.ByteString (ByteString) +import Data.ByteString qualified as BS +import Data.ByteString.Base16 qualified as B16 +import Data.ByteString.Char8 qualified as BS8 +import Data.ByteString.Lazy qualified as BSL +import Data.Constraint (Dict(..)) +import Data.Foldable as F +import Data.Function +import Data.Functor +import Data.Generics.Labels () +import Data.Generics.Product.Fields () +import Data.Generics.Sum.Constructors +import Data.Kind +import Data.List qualified as List +import Data.List.NonEmpty qualified as NE +import Data.Map.Strict as Map +import Data.Maybe +import Data.Proxy +import Data.String.Conversions as X (cs) +import Data.Text (Text) +import Data.Typeable +import Data.Word +import GHC.Generics ((:*:) (..), Generic (..), K1 (..), M1 (..)) +import GHC.Generics qualified as Generics +import GHC.TypeLits +import Lens.Micro.Platform +import Streaming +import System.Random.MWC +import UnliftIO.STM + +import HBS2.Net.Dialog.Helpers.List + +type Frames = Frames' ByteString +newtype Frames' a = Frames { unFrames :: [a] } + deriving stock (Generic,Eq) + deriving newtype (Functor, Foldable, Semigroup, Monoid + -- , IsList + ) + + +instance Show Frames where + show (Frames xs) = "Frames " <> show (BS.length <$> xs) + -- <> " " <> show (fmap B16.encode xs) + <> " " <> (show . fmap (limitSize 42)) xs + + where + limitSize n as = bool as (BS.take (n-3) as <> "...") (BS.length as > n) + +framesBodyPart :: Traversal' Frames [ByteString] +framesBodyPart = #unFrames . tailAfterP (== "") + +tailAfterP :: forall a . (a -> Bool) -> Traversal' [a] [a] +tailAfterP p focus = fix \go -> \case + x:xs -> (x :) <$> bool go focus (p x) xs + xs -> pure xs + +--- + +encodeFrames :: Frames -> ByteString +-- encodeFrames :: Foldable t => t ByteString -> ByteString +encodeFrames = F.toList >>> BSL.toStrict . runPut . \case + + [] -> pure () + + xss -> flip fix xss \go -> \case + [] -> pure () + bs:xs -> do + let (flip shiftR 1 -> n1, ns) = unfoldSizeBytes @Word64 . flip shiftL 1 . fromIntegral . BS.length $ bs + + putWord8 $ n1 + & bool (sbit 7) id (List.null xs) + & bool (sbit 6) id (List.null ns) + + forM_ (markMore ns) \(n, isMoreBytesInSize) -> do + putWord8 $ n & bool (zbit 7) (sbit 7) isMoreBytesInSize + + putByteString bs + + go xs + + where + + markMore as = zip as ((True <$ List.drop 1 as) <> [False]) + + unfoldSizeBytes :: (Bits n, Integral n) => n -> (Word8, [Word8]) + unfoldSizeBytes = (\(a NE.:| as) -> (a, as)) . NE.unfoldr \w -> + ( (flip shiftR 1 . flip shiftL 1 . fromIntegral) w + , let w' = shiftR w 7 + in bool Nothing (Just w') (w' > 0) + ) + +decodeFrames :: MonadError String m => ByteString -> m Frames +decodeFrames = \case + "" -> pure mempty + + bs' -> (bs' &) $ BSL.fromStrict >>> either (throwError . view _3) (pure . Frames . view _3) + <$> runGetOrFail do + + fix \go -> do + + j <- getWord8 + + bsize <- + flip fix (6, j) \fu (b, j') -> do + let n = (fromIntegral . clearLeftBits (8-b)) j' + if tbit b j' + then (n +) . flip shiftL b <$> (fu . (7, ) =<< getWord8) + else pure n + + bs <- getByteString bsize + + let moreFrames = tbit 7 j + + if moreFrames + then (bs :) <$> go + else pure [bs] + + where + clearLeftBits n = flip shiftR n . flip shiftL n + tbit = flip testBit + + +devDialogCore :: IO () +devDialogCore = do + display (Frames []) + display (Frames [""]) + display (Frames [BS.replicate 32 0x55]) + display (Frames [BS.replicate 32 0x55, ""]) + display (Frames [BS.replicate 32 0x55, "\3\3"]) + display (Frames [BS.replicate 33 0x55, "\3\3"]) + display (Frames [BS.replicate 63 0x55]) + display (Frames [BS.replicate 64 0x55]) + -- display (Frames [BS.replicate 65 0x55]) + display (Frames ["\8\8\8","\4\4\4"]) + display (Frames ["","\1"]) + where + display a = do + putStrLn . cs . show . B16.encode . encodeFrames $ a + putStrLn "" + + + + +sbit :: (Bits n) => Int -> n -> n +sbit = flip setBit + +zbit :: (Bits n) => Int -> n -> n +zbit = flip clearBit + +--- + +decodeFramesFail :: (MonadFail m) => ByteString -> m Frames +decodeFramesFail = errorToFail . decodeFrames + +--- + +errorToFailT :: (MonadFail m) => ExceptT String m a -> m a +errorToFailT = either fail pure <=< runExceptT + +errorToFail :: MonadFail m => Except String a -> m a +errorToFail = either fail pure . runExcept + +errorShowToFail :: (MonadFail m, Show s) => Except s a -> m a +errorShowToFail = either (fail . show) pure . runExcept + +-- + +data CallerID = CallerID + { unCallerIDV :: Word8 + , unCallerID :: Word32 + } + deriving stock (Generic, Eq, Ord) + +instance Serialise CallerID + +newCallerID :: forall m. MonadIO m => m CallerID +newCallerID = liftIO $ withSystemRandomST \g -> + CallerID <$> uniformM g <*> uniformM g + +--- + +newtype CallerHandler m = CallerHandler + { unCallerHandler :: Frames -> m () + } + +newtype CallerEnv m = CallerEnv + { unCallerEnv :: TVar (Map CallerID (CallerHandler m)) } + +newCallerEnv :: MonadIO m => m (CallerEnv m') +newCallerEnv = CallerEnv <$> newTVarIO mempty + +--- + +newtype RequestResponseHandler m = RequestResponseHandler + { unRequestResponseHandler :: Frames -> m () + } + +newtype RequestResponseEnv m = RequestResponseEnv + { unRequestResponseEnv :: TVar (Map RequestID (RequestResponseHandler m)) + } + +newRequestResponseEnv :: MonadIO m => m (RequestResponseEnv m') +newRequestResponseEnv = + RequestResponseEnv <$> newTVarIO mempty + +--- + +data DClient m p i = DClient + { clientCallerEnv :: CallerEnv m + , clientSendProtoRequest :: p -> Frames -> m () + , clientGetKnownPeers :: m [(p, i)] + } + +--- + +newtype RequestID = RequestID { unRequestID :: Word32 } + deriving stock (Generic, Eq, Ord) + deriving newtype (Serialise, Num, Enum) + -- deriving via TODO_GenericVLQ Put Get + +data RequestResult + = RequestDone + -- | RequestSuccessIncomplete + | RequestTimeout + | RequestFailure ResponseStatus Frames + | RequestErrorBadResponse BadResponse Frames + deriving stock (Generic, Eq, Show) + +data BadResponse + = ResponseErrorNoResponseHeader + | ResponseInsufficientFrames + | ResponseParseError DeserialiseFailure + deriving stock (Generic, Eq, Show) + +--- + +setupCallerEnv :: MonadIO m => CallerEnv m' -> CallerID -> CallerHandler m' -> m () +setupCallerEnv env callerID repHandleEnv = + (atomically . modifyTVar' (unCallerEnv env)) + (at callerID ?~ repHandleEnv) + +clearCallerEnv :: MonadIO m => CallerEnv m' -> CallerID -> m () +clearCallerEnv env callerID = + (atomically . modifyTVar' (unCallerEnv env)) + (at callerID .~ Nothing) + +findCallerHandler :: MonadIO m => CallerEnv m' -> CallerID -> m (Maybe (CallerHandler m')) +findCallerHandler CallerEnv{..} callerID = + readTVarIO unCallerEnv <&> preview (ix callerID) + +--- + +setupRepHandler :: MonadIO m => RequestResponseEnv m' -> RequestID -> RequestResponseHandler m' -> m () +setupRepHandler RequestResponseEnv{..} requestID useResponse = + (atomically . modifyTVar' unRequestResponseEnv) + (at requestID ?~ useResponse) + +clearRepHandler :: MonadIO m => RequestResponseEnv m' -> RequestID -> m () +clearRepHandler RequestResponseEnv{..} requestID = + (atomically . modifyTVar' unRequestResponseEnv) + (at requestID .~ Nothing) + +findRepHandler :: MonadIO m => RequestResponseEnv m' -> RequestID -> m (Maybe (RequestResponseHandler m')) +findRepHandler RequestResponseEnv{..} requestID = + readTVarIO unRequestResponseEnv <&> preview (ix requestID) + +--- + +data DialogRequestEnv m p pd = DialogRequestEnv + { dreqEnvPeer :: p + , dreqEnvGetPeerData :: m pd + } + +-- data DialogRequestError +-- = DialogRequestFailure String +-- deriving stock (Show) +-- instance Exception DialogRequestError + +--- + +-- type Application = Request -> (Response -> IO ResponseReceived) -> IO ResponseReceived +type DApp m = Frames -> (Frames -> m ()) -> m () + +mkDApp :: + forall spec ctx m io. + ( Monad m + , Monad io + , HasHandler m (NamedSpec spec) ctx + , HasHandler io (NamedSpec spec) ctx + ) + => Proxy (NamedSpec spec) + -> Ctx ctx + -> (forall x. m x -> DialHandlerT io x) + -> spec (ModeServerT m) + -> DApp io +mkDApp p ctx ntToDialHandlerTn hd = routeDialogRequest rr + where + rr :: DialogRequestRouter io + rr = route p ctx + $ hoistDialogWithContext p (Proxy @ctx) ntToDialHandlerTn + hd + +type DialogReplyHandler m = (Frames -> m ()) -> m () + +type DialogRequestRouter (m :: Type -> Type) = + DialogRequestRoutes (DialogReplyHandler m) + +data DialogRequestRoutes (h :: Type) + = DialogRequestPaths (Map ByteString (DialogRequestRoutes h)) + | DialogRequestPreparse (Frames -> Either Text (DialogRequestRoutes h, Frames)) + | DialogRequestEndpoint h + deriving (Generic, Functor) + +instance Semigroup (DialogRequestRoutes h) where + (<>) a b = case (a, b) of + (DialogRequestPaths p1, DialogRequestPaths p2) -> + DialogRequestPaths (p1 <> p2) + _ -> b + +-- instance Monoid (DialogRequestRoutes h) where +-- mempty = DialogRequestPaths mempty + +dialogRequestRoutes + :: ListBuilder + ([ByteString], Frames -> Either Text ((Frames -> m ()) -> m (), Frames)) + -> DialogRequestRouter m +dialogRequestRoutes = List.foldl1' (<>) + . fmap toPaths + . over (traverse . _2) (DialogRequestPreparse . (fmap . fmap) (over _1 DialogRequestEndpoint)) + . buildList + where + toPaths :: ([ByteString], DialogRequestRoutes ((Frames -> m ()) -> m ())) + -> DialogRequestRoutes (DialogReplyHandler m) + toPaths = fix \go (ps, rr) -> case ps of + [] -> rr + [p] -> DialogRequestPaths (Map.singleton p rr) + p:px' -> DialogRequestPaths (Map.singleton p (go (px', rr))) + +hand :: Monad m => a -> b -> ListBuilderT m (a, b) +hand = curry li + +handconv :: (Monad m, Monad m', Serialise req, Serialise resp) + => a + -> Text + -> (req -> ExceptT ResponseStatus m resp) + -> ListBuilderT m' (a, Frames -> Either Text ((Frames -> m ()) -> m (), Frames)) +handconv path msg h = + hand path $ processReply msg h + +--- + +processReply :: forall m m' req resp . + ( Monad m + , Serialise req + , Serialise resp + , m' ~ ExceptT ResponseStatus m + ) + => Text + -> (req -> m' resp) + -> Frames + -> Either Text ((Frames -> m ()) -> m (), Frames) +processReply msg h = runExcept . runStateT do + flip runReply . h <$> cutFrameDecode msg + +runReply :: + ( Monad m + , Serialise a + ) + => (Frames -> m r) + -> ExceptT ResponseStatus m a + -> m r +runReply reply = + either + (\e -> reply (Frames [serialiseS (ResponseHeader e 0)])) + (\a -> reply (Frames [serialiseS (ResponseHeader (ResponseStatus Success200 "") 0) + , serialiseS a + ]) + ) + <=< runExceptT + +--- + +dpath :: Text -> [ByteString] -> Frames +dpath path = Frames . (cs path :) + +--- + +addEnvelope :: Monoid a => [a] -> Frames' a -> Frames' a +addEnvelope en = over #unFrames ((en <> [mempty]) <>) + +splitEnvelope :: (Monoid a, Eq a) => Frames' a -> ([a], Frames' a) +splitEnvelope = fmap (Frames . List.drop 1) . List.break (== mempty) . unFrames + +data ResponseHeader = ResponseHeader + { respStatus :: ResponseStatus + , respSeqNumber :: Int + } + deriving (Generic, Show, Eq) + +instance Serialise ResponseHeader + +data ResponseStatus = ResponseStatus + { responseStatusCode :: ResponseStatusCode + , responseStatusMessage :: Text + } + deriving (Generic, Show, Eq) + +instance Serialise ResponseStatus + +data ResponseStatusCode + = Success200 + | SuccessNoContent204 + | SuccessMore + | BadRequest400 + | Forbidden403 + | NotFound404 + deriving (Generic, Show, Eq) + +instance Serialise ResponseStatusCode + +routerSignature :: Word8 +routerSignature = 1 + +routeDialogRequest :: forall m . + Monad m + => DialogRequestRouter m + -> Frames + -> (Frames -> m ()) + -> m () +routeDialogRequest router frames rawReplyToPeer = do + -- error $ show router + erun <- pure $ runExcept $ flip evalStateT req do + + signature <- cutFrameDecode + (ResponseStatus BadRequest400 "No signature in request") + + when (signature /= routerSignature) $ throwError + (ResponseStatus BadRequest400 "Wrong signature in request") + + path <- cutFrameOr + (ResponseStatus BadRequest400 "No path in request") + + lift . ExceptT . pure + -- Если не может разобрать параметры запроса, + -- то самим ответить этому пиру '404' + -- . left (ResponseStatus BadRequest400) + . travel (BS8.split '/' path) router + -- передать оставшуюся часть запроса в хэндлер + =<< get + + case erun of + Left rs -> replyToPeer (Frames [serialiseS (ResponseHeader rs 0)]) + Right go -> + -- передать хэндлеру продолжение чтобы ответить этому пиру + go replyToPeer + + where + (backPath, req) = splitEnvelope frames + + replyToPeer :: Frames -> m () + replyToPeer = rawReplyToPeer . over #unFrames (backPath <>) + +travel :: () + => [ByteString] + -> DialogRequestRouter m + -> Frames + -> Either ResponseStatus ((Frames -> m ()) -> m ()) +travel path'' router'' = evalStateT $ flipfix2 path'' router'' + \go path -> \case + DialogRequestPaths kv -> case path of + step:path' -> + maybe + (throwError (ResponseStatus BadRequest400 "Path not found")) + (go path') + (Map.lookup step kv) + _ -> throwError (ResponseStatus BadRequest400 "Path not found (too long)") + DialogRequestPreparse hfx -> + go path =<< StateT (left (ResponseStatus BadRequest400) . hfx) + DialogRequestEndpoint ep -> pure ep + +flipfix2 :: a -> b -> ((a -> b -> c) -> (a -> b -> c)) -> c +flipfix2 a b f = fix f a b + +cutFrameDecode :: (Serialise b, MonadState Frames m, MonadError e m) => e -> m b +cutFrameDecode e = + State.gets unFrames >>= \case + x:xs -> + (either (const (throwError e)) pure . deserialiseOrFailS) x + <* State.put (Frames xs) + _ -> throwError e + +cutFrameDecode' + :: (Serialise b, MonadState Frames m, MonadError (Maybe DeserialiseFailure) m) + => m b +cutFrameDecode' = + State.gets unFrames >>= \case + x:xs -> + (either (throwError . Just) pure . deserialiseOrFailS) x + <* State.put (Frames xs) + _ -> throwError Nothing + +cutFrameOr :: (MonadState (Frames' b) m, MonadError e m) => e -> m b +cutFrameOr e = + State.gets unFrames >>= \case + x:xs -> x <$ State.put (Frames xs) + _ -> throwError e + +serialiseS :: Serialise a => a -> ByteString +serialiseS = BSL.toStrict . serialise + +deserialiseOrFailS :: Serialise a => ByteString -> Either DeserialiseFailure a +deserialiseOrFailS = deserialiseOrFail . BSL.fromStrict + +fromMaybeM :: Applicative m => m a -> Maybe a -> m a +fromMaybeM ma = maybe ma pure + +fromJustThrowError :: MonadError e m => e -> Maybe a -> m a +fromJustThrowError = fromMaybeM . throwError + + + +------------------------------------------ +--- Type-level specification ------------- +------------------------------------------ + + +data ReqCBOR (a :: Type) +data SingleAck +data SingleRespCBOR (a :: Type) +data StreamingRespCBOR (a :: Type) + +data NamedSpec (spec :: Type -> Type) + +class DialMode mode where + type mode &- spec :: Type +infixl 0 &- + +data (path :: k) &/ (a :: Type) + deriving (Typeable) +infixr 4 &/ + +type path &// a = path &/ NamedSpec a +infixr 4 &// + +--- + +data ModePlain +instance DialMode ModePlain where + type ModePlain &- spec = spec + +--- + +data ModeServerT (m :: Type -> Type) + +instance DialMode (ModeServerT m) where + type ModeServerT m &- spec = HandlerD spec m + +class HasHandler m spec ctx where + type HandlerD spec (m' :: Type -> Type) :: Type + + route :: + Proxy spec + -> Ctx ctx + -> HandlerD spec (DialHandlerT m) + -> DialogRequestRouter m + + hoistDialogWithContext + :: Proxy spec + -> Proxy ctx + -> (forall x. m x -> n x) + -> HandlerD spec m + -> HandlerD spec n + +data EmptyCX -- '[] +data Ctx ctx where + EmptyCtx :: Ctx EmptyCX + -- (:&.) :: x -> Ctx xs -> Ctx (x ': xs) +-- infixr 5 :&. + +-- hoistTRouter :: forall t m n . +-- (MonadTrans t, Monad m, Monad n, m ~ t n) +-- => (forall a . m a -> n a) +-- -> DialogRequestRouter m +-- -> DialogRequestRouter n +-- hoistTRouter nt = fmap nt' +-- where +-- nt' :: ((x -> m y) -> m y) +-- -> ((x -> n y) -> n y) +-- nt' xtmy_tmy = nt . xtmy_tmy . fmap lift + +hoistTRouter :: forall m n . + (Monad m, Monad n) + => (forall a . m a -> n a) + -> (forall a . n a -> m a) + -> DialogRequestRouter m + -> DialogRequestRouter n +hoistTRouter ntf ntb = fmap nt' + where + nt' :: ((x -> m y) -> m y) + -> ((x -> n y) -> n y) + nt' xtmy_tmy = ntf . xtmy_tmy . fmap ntb + + +type DialHandlerIO a = DialHandlerT IO a +newtype DialHandlerT m a = DialHandlerT { runDialHandlerT :: ExceptT ResponseStatus m a } + deriving + ( Generic, Functor, Applicative, Monad + , MonadIO + , MonadTrans + , MonadError ResponseStatus + -- , MonadUnliftIO + -- , MonadThrow, MonadCatch, MonadMask + ) + +--- + +instance (KnownSymbol path, HasHandler m spec ctx) => HasHandler m (path &/ spec) ctx where + type HandlerD (path &/ spec) m = HandlerD spec m + + route _ ctx h = DialogRequestPaths $ + Map.singleton (cs (symbolVal (Proxy @path))) (route (Proxy @spec) ctx h) + + hoistDialogWithContext _ = hoistDialogWithContext (Proxy @spec) + +--- + +instance + ( Serialise a + , Typeable a + , HasHandler m spec ctx + ) => + HasHandler m (ReqCBOR a &/ spec) ctx where + type HandlerD (ReqCBOR a &/ spec) m = a -> HandlerD spec m + + route _ ctx (ha :: a -> HandlerD spec (DialHandlerT m)) = + DialogRequestPreparse \fx -> do + (a, fx') + <- runExcept + $ flip runStateT fx + $ cutFrameDecode ((cs . show . typeRep) (Proxy @a)) + pure (route (Proxy @spec) ctx (ha a), fx') + + hoistDialogWithContext _ pc nt s = hoistDialogWithContext (Proxy @spec) pc nt . s + +--- + +instance + ( Applicative m + ) => + HasHandler m SingleAck ctx where + type HandlerD SingleAck m = m () + + route _ _ctx _mx = + DialogRequestEndpoint \reply -> do + reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessNoContent204 "") 0)]) + + hoistDialogWithContext _ _ nt hdlM = nt hdlM + +--- + +instance + ( Monad m + , Serialise a + ) => + HasHandler m (SingleRespCBOR a) ctx where + type HandlerD (SingleRespCBOR a) m = m a + + route _ _ctx ma = + DialogRequestEndpoint \reply -> do + + ea <- runExceptT $ runDialHandlerT ma + + case ea of + Left e -> reply $ Frames [ serialiseS e ] + Right a -> reply $ Frames + [ serialiseS (ResponseHeader (ResponseStatus Success200 "") 0) + , serialiseS (a :: a) + ] + + hoistDialogWithContext _ _ nt hdlM = nt hdlM + +--- + +instance + ( Serialise a + ) => + HasHandler m (StreamingRespCBOR a) ctx where + type HandlerD (StreamingRespCBOR a) m = Stream (Of a) m () + + route = undefined + + -- hoistDialogWithContext = undefined + +--- + +type GServerConstraints spec m = + ( GToProduct (Rep (spec (ModeServerT m))) ~ HandlerD (GToProduct (Rep (spec ModePlain))) m + , GProduct (Rep (spec (ModeServerT m))) + ) + +class GServer (spec :: Type -> Type) (m :: Type -> Type) where + gServerProof :: Dict (GServerConstraints spec m) + +instance + ( GToProduct (Rep (spec (ModeServerT m))) ~ HandlerD (GToProduct (Rep (spec ModePlain))) m + , GProduct (Rep (spec (ModeServerT m))) + ) => GServer spec m where + gServerProof = Dict + + +instance + ( HasHandler m (GToProduct (Rep (spec ModePlain))) ctx + -- , HasHandler m (GToProduct (Rep (spec (ModeServerT m)))) ctx + -- , GProduct (Rep (spec ModePlain)) + , forall q . Generic (spec (ModeServerT q)) + , forall q . GServer spec q + ) => + HasHandler m (NamedSpec spec) ctx where + type HandlerD (NamedSpec spec) m = spec (ModeServerT m) + + route :: + Proxy (NamedSpec spec) + -> Ctx ctx + -> spec (ModeServerT (DialHandlerT m)) + -> DialogRequestRouter m + route _ ctx spec = + case gServerProof @spec @(DialHandlerT m) of + Dict -> route (Proxy @(GToProduct (Rep (spec ModePlain)))) ctx (toProduct spec) + + hoistDialogWithContext + :: forall n. Proxy (NamedSpec spec) + -> Proxy ctx + -> (forall x. m x -> n x) + -> spec (ModeServerT m) + -> spec (ModeServerT n) + hoistDialogWithContext _ pctx nat dl = + case (gServerProof @spec @m, gServerProof @spec @n) of + (Dict, Dict) -> + fromProduct dlN + where + dlM :: HandlerD (GToProduct (Rep (spec ModePlain))) m = + toProduct dl + dlN :: HandlerD (GToProduct (Rep (spec ModePlain))) n = + hoistDialogWithContext (Proxy @(GToProduct (Rep (spec ModePlain)))) pctx nat dlM + + +toProduct :: (Generic (spec mode), GProduct (Rep (spec mode))) + => spec mode -> GToProduct (Rep (spec mode)) +toProduct = gtoProduct . Generics.from + +fromProduct + :: (Generic (spec mode), GProduct (Rep (spec mode))) + => GToProduct (Rep (spec mode)) -> spec mode +fromProduct = Generics.to . gfromProduct + +instance + ( HasHandler m speca ctx + , HasHandler m specb ctx + ) => + HasHandler m (GP speca specb) ctx where + type HandlerD (GP speca specb) m = GP (HandlerD speca m) (HandlerD specb m) + route _ ctx (GP speca specb) = + route (Proxy @speca) ctx speca + <> route (Proxy @specb) ctx specb + + hoistDialogWithContext _ pc nt (GP speca specb) = + GP + (hoistDialogWithContext (Proxy @speca) pc nt speca) + (hoistDialogWithContext (Proxy @specb) pc nt specb) + +data GP a b = GP a b + +class GProduct f where + type GToProduct (f :: Type -> Type) + gtoProduct :: f p -> GToProduct f + gfromProduct :: GToProduct f -> f p + +instance (GProduct l, GProduct r) => GProduct (l :*: r) where + type GToProduct (l :*: r) = GP (GToProduct l) (GToProduct r) + gtoProduct (l :*: r) = GP (gtoProduct l) (gtoProduct r) + gfromProduct (GP l r) = gfromProduct l :*: gfromProduct r + +instance GProduct f => GProduct (M1 i c f) where + type GToProduct (M1 i c f) = GToProduct f + gtoProduct = gtoProduct . unM1 + gfromProduct = M1 . gfromProduct + +instance GProduct (K1 i c) where + type GToProduct (K1 i c) = c + gtoProduct = unK1 + gfromProduct = K1 diff --git a/hbs2-core/lib/Dialog/Helpers/List.hs b/hbs2-core/lib/HBS2/Net/Dialog/Helpers/List.hs similarity index 91% rename from hbs2-core/lib/Dialog/Helpers/List.hs rename to hbs2-core/lib/HBS2/Net/Dialog/Helpers/List.hs index b086b2e8..2460b993 100644 --- a/hbs2-core/lib/Dialog/Helpers/List.hs +++ b/hbs2-core/lib/HBS2/Net/Dialog/Helpers/List.hs @@ -1,4 +1,4 @@ -module Dialog.Helpers.List where +module HBS2.Net.Dialog.Helpers.List where import Control.Monad.Trans.Writer.CPS qualified as W import Data.Functor.Identity diff --git a/hbs2-core/lib/Dialog/Helpers/Streaming.hs b/hbs2-core/lib/HBS2/Net/Dialog/Helpers/Streaming.hs similarity index 97% rename from hbs2-core/lib/Dialog/Helpers/Streaming.hs rename to hbs2-core/lib/HBS2/Net/Dialog/Helpers/Streaming.hs index a38be2b6..412928a4 100644 --- a/hbs2-core/lib/Dialog/Helpers/Streaming.hs +++ b/hbs2-core/lib/HBS2/Net/Dialog/Helpers/Streaming.hs @@ -1,4 +1,4 @@ -module Dialog.Helpers.Streaming where +module HBS2.Net.Dialog.Helpers.Streaming where import Control.Monad.Fix import Data.ByteString qualified as BS diff --git a/hbs2-core/lib/HBS2/Net/Proto/Dialog.hs b/hbs2-core/lib/HBS2/Net/Proto/Dialog.hs index 76523679..925980bb 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Dialog.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Dialog.hs @@ -3,39 +3,24 @@ module HBS2.Net.Proto.Dialog ( module HBS2.Net.Proto.Dialog -, module Dialog.Core -, module Dialog.Client +, module HBS2.Net.Dialog.Core +, module HBS2.Net.Dialog.Client ) where -import HBS2.Actors.Peer -import HBS2.Clock -import HBS2.Data.Types -import HBS2.Net.Auth.Credentials -import HBS2.Net.Proto -import HBS2.Net.Proto.Peer -import HBS2.Net.Proto.Sessions -import HBS2.Prelude.Plated hiding (at) -import HBS2.System.Logger.Simple - import Codec.Serialise (deserialiseOrFail) import Control.Arrow import Control.Monad -import Control.Monad.Error.Class -import Control.Monad.IO.Unlift import Data.ByteString (ByteString) -import Data.ByteString qualified as BS -import Data.ByteString.Builder import Data.ByteString.Lazy qualified as BSL -import Data.List qualified as List -import Data.Map.Strict as Map +import Data.Generics.Product.Fields () +import Data.Kind import Lens.Micro.Platform -import Streaming as S -import Streaming.Prelude qualified as S -import UnliftIO.Exception -import UnliftIO.STM -import Dialog.Client -import Dialog.Core +import HBS2.Data.Types +import HBS2.Net.Dialog.Client +import HBS2.Net.Dialog.Core +import HBS2.Net.Proto +import HBS2.Prelude.Plated hiding (at) --- @@ -62,7 +47,7 @@ dialRespEncode = \case --- -data DialogProtoEnv m e = DialogProtoEnv +newtype DialogProtoEnv m e = DialogProtoEnv { dialogProtoEnvCallerEnv :: CallerEnv m } @@ -76,12 +61,12 @@ newDialogProtoEnv = do -- Adapters should share the same env -data DialReqProtoAdapter e (m :: * -> *) s = DialReqProtoAdapter - { dialReqProtoAdapterRouter :: DialogRequestRouter m - -- , dialReqProtoAdapterEnv :: DialogProtoEnv e +data DialReqProtoAdapter e (m :: Type -> Type) s = DialReqProtoAdapter + { dialReqProtoAdapterDApp :: DApp IO + , dialReqProtoAdapterNT :: Peer e -> forall a . m a -> IO a } -data DialRespProtoAdapter e (m :: * -> *) s = DialRespProtoAdapter +newtype DialRespProtoAdapter e (m :: Type -> Type) s = DialRespProtoAdapter { dialRespProtoAdapterEnv :: DialogProtoEnv m e } @@ -98,19 +83,22 @@ dialReqProto :: forall e s m . => DialReqProtoAdapter e m s -> DialReq e -> m () -dialReqProto DialReqProtoAdapter{..} = unDialReq >>> \frames -> do +dialReqProto adapter = unDialReq >>> \frames -> do peer <- thatPeer dialReqProtoProxy - let dialReqEnv :: DialogRequestEnv m (Peer e) (Maybe (PeerData e)) - dialReqEnv = DialogRequestEnv - { dreqEnvPeer = peer - , dreqEnvGetPeerData = pure Nothing -- undefined -- find (KnownPeerKey peer) id - } + -- let dialReqEnv :: DialogRequestEnv m (Peer e) (Maybe (PeerData e)) + -- dialReqEnv = DialogRequestEnv + -- { dreqEnvPeer = peer + -- , dreqEnvGetPeerData = pure Nothing -- undefined -- find (KnownPeerKey peer) id + -- } let replyToPeer :: Frames -> m () replyToPeer = request peer . DialResp @e - routeDialogRequest dialReqProtoAdapterRouter dialReqEnv replyToPeer frames + let replyToPeerIO :: Frames -> IO () + replyToPeerIO = dialReqProtoAdapterNT adapter peer <$> replyToPeer + + liftIO $ (dialReqProtoAdapterDApp adapter) frames replyToPeerIO where dialReqProtoProxy = Proxy @(DialReq e) @@ -127,7 +115,7 @@ dialRespProto :: forall e s m . -> DialResp e -> m () dialRespProto DialRespProtoAdapter{..} = unDialResp >>> unFrames >>> \case - "": xs -> do + "": _xs -> do -- Ответили прямо нам сюда. Нужно как-то отреагировать на xs pure () diff --git a/hbs2-core/lib/HBS2/Net/Proto/EncryptionHandshake.hs b/hbs2-core/lib/HBS2/Net/Proto/EncryptionHandshake.hs index 37964767..2532e2e2 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/EncryptionHandshake.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/EncryptionHandshake.hs @@ -15,6 +15,7 @@ import HBS2.System.Logger.Simple import Crypto.Saltine.Core.Box qualified as Encrypt import Data.ByteString qualified as BS +import Data.Hashable hiding (Hashed) import Data.String.Conversions (cs) import Lens.Micro.Platform @@ -68,6 +69,10 @@ sendBeginEncryptionExchange creds ourpubkey peer = do data EncryptionHandshakeAdapter e m s = EncryptionHandshakeAdapter { encHandshake_considerPeerAsymmKey :: Peer e -> Maybe Encrypt.PublicKey -> m () + + , encAsymmetricKeyPair :: AsymmKeypair (Encryption e) + + , encGetEncryptionKey :: EncryptionKeyIDKey e -> m (Maybe (CommonSecret (Encryption e))) } @@ -88,11 +93,10 @@ encryptionHandshakeProto :: forall e s m . , Show (Nonce ()) ) => EncryptionHandshakeAdapter e m s - -> PeerEnv e -> EncryptionHandshake e -> m () -encryptionHandshakeProto EncryptionHandshakeAdapter{..} penv = \case +encryptionHandshakeProto EncryptionHandshakeAdapter{..} = \case ResetEncryptionKeys -> do peer <- thatPeer proto @@ -104,7 +108,7 @@ encryptionHandshakeProto EncryptionHandshakeAdapter{..} penv = \case encHandshake_considerPeerAsymmKey peer Nothing creds <- getCredentials @s - let ourpubkey = pubKeyFromKeypair @s $ view envAsymmetricKeyPair penv + ourpubkey <- pure $ pubKeyFromKeypair @s $ encAsymmetricKeyPair sendBeginEncryptionExchange @e creds ourpubkey peer BeginEncryptionExchange theirsign theirpubkey -> do @@ -117,7 +121,7 @@ encryptionHandshakeProto EncryptionHandshakeAdapter{..} penv = \case -- взять свои ключи creds <- getCredentials @s - let ourpubkey = pubKeyFromKeypair @s $ view envAsymmetricKeyPair penv + ourpubkey <- pure $ pubKeyFromKeypair @s $ encAsymmetricKeyPair -- подписать нонс let sign = makeSign @s (view peerSignSk creds) ((cs . serialise) ourpubkey) diff --git a/hbs2-core/lib/HBS2/Net/Proto/Peer.hs b/hbs2-core/lib/HBS2/Net/Proto/Peer.hs index c9b6ac23..fd99f402 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Peer.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Peer.hs @@ -7,6 +7,7 @@ import HBS2.Actors.Peer import HBS2.Data.Types import HBS2.Events import HBS2.Net.Proto +import HBS2.Net.Proto.Types import HBS2.Clock import HBS2.Net.Proto.Sessions import HBS2.Prelude.Plated @@ -228,3 +229,36 @@ instance ( Serialise (PubKey 'Sign (Encryption e)) => Serialise (PeerHandshake e) + +--- + +data EncryptionKeyIDKey e = + EncryptionKeyIDKey + { ekeyIDPeerSignKey :: PubKey 'Sign (Encryption e) + , ekeyIDPeerNonce :: PeerNonce + } + deriving (Generic) + +deriving instance + ( Show (PubKey 'Sign (Encryption e)) + , Show (Nonce ()) + ) => Show (EncryptionKeyIDKey e) + +deriving instance + ( Eq (PubKey 'Sign (Encryption e)) + , Eq (Nonce ()) + ) => Eq (EncryptionKeyIDKey e) + +instance ( + Hashable (PubKey 'Sign (Encryption e)) + , Hashable (Nonce ()) + ) => Hashable (EncryptionKeyIDKey e) where + hashWithSalt s EncryptionKeyIDKey {..} = + hashWithSalt s (ekeyIDPeerSignKey, ekeyIDPeerNonce) + +encryptionKeyIDKeyFromPeerData :: PeerData e -> EncryptionKeyIDKey e +encryptionKeyIDKeyFromPeerData PeerData{..} = + EncryptionKeyIDKey + { ekeyIDPeerSignKey = _peerSignKey + , ekeyIDPeerNonce = _peerOwnNonce + } diff --git a/hbs2-core/lib/HBS2/Net/Proto/Types.hs b/hbs2-core/lib/HBS2/Net/Proto/Types.hs index f2680a08..7f95a11b 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Types.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Types.hs @@ -28,6 +28,11 @@ import Control.Monad.Trans.Maybe -- e -> Transport (like, UDP or TChan) -- p -> L4 Protocol (like Ping/Pong) +data CryptoAction = Sign | Encrypt + +type family PubKey ( a :: CryptoAction) e :: Type +type family PrivKey ( a :: CryptoAction) e :: Type + type family Encryption e :: Type -- FIXME: move-to-a-crypto-definition-modules @@ -206,4 +211,3 @@ instance FromStringMaybe (PeerAddr L4Proto) where instance Serialise L4Proto instance Serialise (PeerAddr L4Proto) - diff --git a/hbs2-core/test/DialogSpec.hs b/hbs2-core/test/DialogSpec.hs index 1a8e8913..59222f5e 100644 --- a/hbs2-core/test/DialogSpec.hs +++ b/hbs2-core/test/DialogSpec.hs @@ -14,8 +14,8 @@ import GHC.Generics (Generic) import Lens.Micro.Platform import System.IO -import Dialog.Core -import Dialog.Helpers.List +import HBS2.Net.Dialog.Core +import HBS2.Net.Dialog.Helpers.List newtype BSA = BSA { unBSA :: ByteString } deriving (Generic, Show) @@ -57,3 +57,7 @@ testDialog = testGroup "dialog" $ buildList do property' "roundtrip encode Frames" \ xs -> (decodeFrames . encodeFrames) xs == Right xs + property' "encodeFrames is quasidistributive over mappend" \ (xs, ys) -> + BS.drop (BS.length (encodeFrames xs)) (encodeFrames (xs <> ys)) + == encodeFrames ys + diff --git a/hbs2-peer/app/EncryptionKeys.hs b/hbs2-peer/app/EncryptionKeys.hs index dd524d5d..c06dd667 100644 --- a/hbs2-peer/app/EncryptionKeys.hs +++ b/hbs2-peer/app/EncryptionKeys.hs @@ -56,15 +56,14 @@ encryptionHandshakeWorker :: forall e m s . -- , HasCredentials s m ) => PeerConfig - -> PeerEnv e -> PeerCredentials s -> EncryptionHandshakeAdapter e m s -> m () -encryptionHandshakeWorker pconf penv creds EncryptionHandshakeAdapter{..} = do +encryptionHandshakeWorker pconf creds EncryptionHandshakeAdapter{..} = do -- e :: PeerEnv e <- ask - let ourpubkey = pubKeyFromKeypair @s $ _envAsymmetricKeyPair penv + ourpubkey <- pure $ pubKeyFromKeypair @s $ encAsymmetricKeyPair pl <- getPeerLocator @e @@ -75,9 +74,9 @@ encryptionHandshakeWorker pconf penv creds EncryptionHandshakeAdapter{..} = do forM_ peers \peer -> do -- Только если ещё не знаем ключ ноды - mpeerData <- find (KnownPeerKey peer) id - mkey <- liftIO do - join <$> forM mpeerData \peerData -> getEncryptionKey penv peerData + mencKeyID <- (fmap . fmap) encryptionKeyIDKeyFromPeerData $ + find (KnownPeerKey peer) id + mkey <- join <$> mapM encGetEncryptionKey mencKeyID case mkey of Just _ -> pure () Nothing -> sendBeginEncryptionExchange @e creds ourpubkey peer diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index ac23eba9..52a7e9f5 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -57,7 +57,8 @@ import RefLog (reflogWorker) import HttpWorker import ProxyMessaging import PeerMain.DialogCliCommand -import PeerMain.PeerDialog +import PeerMain.Dialog.Server +import PeerMain.Dialog.Spec import PeerMeta import CLI.RefChan import RefChan @@ -448,7 +449,7 @@ runPeer :: forall e s . ( e ~ L4Proto , FromStringMaybe (PeerAddr e) , s ~ Encryption e , HasStorage (PeerM e IO) - ) => PeerOpts -> IO () + )=> PeerOpts -> IO () runPeer opts = Exception.handle (\e -> myException e >> performGC @@ -574,32 +575,35 @@ runPeer opts = Exception.handle (\e -> myException e pure $ Just tcpEnv (proxy, penv) <- mdo - proxy <- newProxyMessaging mess tcp >>= \peer -> pure peer + proxy <- newProxyMessaging mess tcp >>= \proxy' -> pure proxy' { _proxy_getEncryptionKey = \peer -> do - mpeerData <- withPeerM penv $ find (KnownPeerKey peer) id - mkey <- join <$> forM mpeerData \peerData -> getEncryptionKey penv peerData + mencKeyID <- (fmap . fmap) encryptionKeyIDKeyFromPeerData $ + withPeerM penv $ find (KnownPeerKey peer) id + mkey <- join <$> forM mencKeyID \encKeyID -> + getEncryptionKey proxy encKeyID case mkey of Nothing -> trace1 $ "ENCRYPTION empty getEncryptionKey" - <+> pretty peer <+> viaShow mpeerData + <+> pretty peer <+> viaShow mencKeyID Just k -> trace1 $ "ENCRYPTION success getEncryptionKey" - <+> pretty peer <+> viaShow mpeerData <+> viaShow k + <+> pretty peer <+> viaShow mencKeyID <+> viaShow k pure mkey , _proxy_clearEncryptionKey = \peer -> do - mpeerData <- withPeerM penv $ find (KnownPeerKey peer) id - forM_ mpeerData \peerData -> setEncryptionKey penv peer peerData Nothing + mencKeyID <- (fmap . fmap) encryptionKeyIDKeyFromPeerData $ + withPeerM penv $ find (KnownPeerKey peer) id + forM_ mencKeyID \encKeyID -> setEncryptionKey proxy peer encKeyID Nothing -- deletePeerAsymmKey brains peer - forM_ mpeerData \peerData -> - deletePeerAsymmKey' brains (show peerData) + forM_ mencKeyID \encKeyID -> + deletePeerAsymmKey' brains (show encKeyID) , _proxy_sendResetEncryptionKeys = \peer -> withPeerM penv do sendResetEncryptionKeys peer , _proxy_sendBeginEncryptionExchange = \peer -> withPeerM penv do sendBeginEncryptionExchange pc - ((pubKeyFromKeypair @s . view envAsymmetricKeyPair) penv) + ((pubKeyFromKeypair @s . _proxy_asymmetricKeyPair) proxy) peer } @@ -690,32 +694,37 @@ runPeer opts = Exception.handle (\e -> myException e ) => EncryptionHandshakeAdapter L4Proto m s encryptionHshakeAdapter = EncryptionHandshakeAdapter { encHandshake_considerPeerAsymmKey = \peer mpubkey -> withPeerM penv do - mpeerData <- withPeerM penv $ find (KnownPeerKey peer) id + mencKeyID <- (fmap . fmap) encryptionKeyIDKeyFromPeerData $ + withPeerM penv $ find (KnownPeerKey peer) id case mpubkey of Nothing -> do - -- trace $ "ENCRYPTION delete key" <+> pretty peer <+> viaShow mpeerData + -- trace $ "ENCRYPTION delete key" <+> pretty peer <+> viaShow mencKeyID -- deletePeerAsymmKey brains peer - forM_ mpeerData \peerData -> - deletePeerAsymmKey' brains (show peerData) + forM_ mencKeyID \encKeyID -> + deletePeerAsymmKey' brains (show encKeyID) Just pk -> do -- emit PeerAsymmInfoKey (PeerAsymmPubKey peer pk) let symmk = genCommonSecret @s - (privKeyFromKeypair @s (view envAsymmetricKeyPair penv)) + (privKeyFromKeypair @s (_proxy_asymmetricKeyPair proxy)) pk - case mpeerData of + case mencKeyID of Nothing -> do -- insertPeerAsymmKey brains peer pk symmk -- insertPeerAsymmKey' brains (show peer) pk symmk - trace $ "ENCRYPTION can not store key. No peerData" - <+> pretty peer <+> viaShow mpeerData - Just peerData -> do - liftIO $ setEncryptionKey penv peer peerData (Just symmk) - insertPeerAsymmKey' brains (show peerData) pk symmk + trace $ "ENCRYPTION can not store key. No encKeyID" + <+> pretty peer <+> viaShow mencKeyID + Just encKeyID -> do + liftIO $ setEncryptionKey proxy peer encKeyID (Just symmk) + insertPeerAsymmKey' brains (show encKeyID) pk symmk + + , encAsymmetricKeyPair = _proxy_asymmetricKeyPair proxy + + , encGetEncryptionKey = liftIO . getEncryptionKey proxy } -- dialReqProtoAdapter <- do - -- dialReqProtoAdapterRouter <- pure dialogRoutes + -- dialReqProtoAdapterDApp <- pure dialogRoutes -- pure DialReqProtoAdapter {..} env <- ask @@ -727,11 +736,12 @@ runPeer opts = Exception.handle (\e -> myException e addPeers @e pl ps subscribe @e PeerExpiredEventKey \(PeerExpiredEvent peer {-mpeerData-}) -> liftIO do - mpeerData <- withPeerM penv $ find (KnownPeerKey peer) id - forM_ mpeerData \peerData -> setEncryptionKey penv peer peerData Nothing + mencKeyID <- (fmap . fmap) encryptionKeyIDKeyFromPeerData $ + withPeerM penv $ find (KnownPeerKey peer) id + forM_ mencKeyID \encKeyID -> setEncryptionKey proxy peer encKeyID Nothing -- deletePeerAsymmKey brains peer - forM_ mpeerData \peerData -> - deletePeerAsymmKey' brains (show peerData) + forM_ mencKeyID \encKeyID -> + deletePeerAsymmKey' brains (show encKeyID) subscribe @e PeerAnnounceEventKey $ \(PeerAnnounceEvent pip nonce) -> do unless (nonce == pnonce) $ do @@ -874,7 +884,7 @@ runPeer opts = Exception.handle (\e -> myException e peerThread "blockDownloadLoop" (blockDownloadLoop denv) peerThread "encryptionHandshakeWorker" - (EncryptionKeys.encryptionHandshakeWorker @e conf penv pc encryptionHshakeAdapter) + (EncryptionKeys.encryptionHandshakeWorker @e conf pc encryptionHshakeAdapter) let tcpProbeWait :: Timeout 'Seconds tcpProbeWait = (fromInteger . fromMaybe 300) (cfgValue @PeerTcpProbeWaitKey conf) @@ -994,7 +1004,7 @@ runPeer opts = Exception.handle (\e -> myException e , makeResponse (blockChunksProto adapter) , makeResponse blockAnnounceProto , makeResponse (withCredentials @e pc . peerHandShakeProto hshakeAdapter penv) - , makeResponse (withCredentials @e pc . encryptionHandshakeProto encryptionHshakeAdapter penv) + , makeResponse (withCredentials @e pc . encryptionHandshakeProto encryptionHshakeAdapter) , makeResponse (peerExchangeProto pexFilt) , makeResponse (refLogUpdateProto reflogAdapter) , makeResponse (refLogRequestProto reflogReqAdapter) @@ -1199,7 +1209,17 @@ runPeer opts = Exception.handle (\e -> myException e } dialReqProtoAdapter <- do - dialReqProtoAdapterRouter <- pure dialogRoutes + let denv = DialEnv + + let dialReqProtoAdapterDApp = drpcFullDApp denv penv + + -- dialReqProtoAdapterNT :: ResponseM L4Proto (RpcM (ResourceT IO)) a -> IO a + dialReqProtoAdapterNT :: Peer e -> forall a . ResponseM L4Proto (RpcM (ResourceT IO)) a -> IO a + dialReqProtoAdapterNT = \peer -> + runResourceT + . runRPC udp1 + . runResponseM peer + pure DialReqProtoAdapter {..} rpc <- async $ runRPC udp1 do diff --git a/hbs2-peer/app/PeerMain/Dialog/Server.hs b/hbs2-peer/app/PeerMain/Dialog/Server.hs new file mode 100644 index 00000000..f7c0bb5e --- /dev/null +++ b/hbs2-peer/app/PeerMain/Dialog/Server.hs @@ -0,0 +1,178 @@ +{-# LANGUAGE PolyKinds #-} +{-# Language AllowAmbiguousTypes #-} +{-# LANGUAGE StrictData #-} +{-# LANGUAGE UndecidableInstances #-} +module PeerMain.Dialog.Server where + +import Codec.Serialise +import Control.Monad.Except +import Control.Monad.IO.Class () +import Control.Monad.Reader +import Lens.Micro.Platform + +import HBS2.Actors.Peer +import HBS2.Data.Types.Refs +import HBS2.Hash +import HBS2.Net.Dialog.Core +import HBS2.Net.Proto.RefLog +import HBS2.Net.Proto.Types +import HBS2.Prelude +import HBS2.Storage.Simple + +import PeerMain.Dialog.Spec + +--- + +data DialEnv = DialEnv + +newtype DialT m a = DialT { unDialT :: PeerM L4Proto (ReaderT DialEnv (DialHandlerT m)) a } + deriving + ( Generic, Functor, Applicative, Monad + , MonadIO + , MonadReader (PeerEnv L4Proto) + -- , MonadTrans + -- , MonadError ResponseStatus + -- , MonadThrow, MonadCatch, MonadMask + ) + +-- instance Monad m => MonadReader DialEnv (DialT m) where +-- ask = DialT . lift $ ask +-- local f ma = undefined + +instance Monad m => HasStorage (DialT m) where + getStorage = asks (view envStorage) + +instance MonadTrans DialT where + lift = DialT . lift . lift . lift + +instance Monad m => + MonadError ResponseStatus (DialT m) where + -- {-# MINIMAL throwError, catchError #-} + -- throwError :: e -> m a + throwError = DialT . lift . throwError + -- catchError :: m a -> (e -> m a) -> m a + catchError = undefined + +--- + +runDialTtoDialHandlerT :: MonadIO m => DialEnv -> PeerEnv L4Proto -> DialT m a -> DialHandlerT m a +runDialTtoDialHandlerT denv penv = + flip runReaderT denv . withPeerM penv . unDialT + +--- + +dialogRoutes' :: forall m . + ( MonadIO m + , Serialise (PubKey 'Sign (Encryption L4Proto)) + , FromStringMaybe (PubKey 'Sign (Encryption L4Proto)) + , Hashable (PubKey 'Sign (Encryption L4Proto)) + ) + => PeerEnv L4Proto + -> DialogRequestRouter m +dialogRoutes' penv = dialogRequestRoutes do + + hand ["ping"] \req -> (, req) <$> Right \reply -> do + reply (Frames [serialiseS (ResponseHeader (ResponseStatus Success200 "") 0), "pong"]) + + hand ["spec"] \req -> (, req) <$> Right \reply -> do + undefined + -- let xs = Map.keys (unDialogRequestRouter (dialogRoutes @m penv)) + + -- forM_ (zip (zip [1..] xs) ((True <$ drop 1 xs) <> [False])) \((j,x),isMore) -> do + -- reply (Frames [serialiseS (ResponseHeader (ResponseStatus (bool Success200 SuccessMore isMore) "") j) + -- , BS.intercalate "/" x + -- ]) + + + hand ["debug", "no-response-header"] \req -> (, req) <$> Right \reply -> do + reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessMore "") 0), "one"]) + reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessMore "") 1), "two"]) + reply (Frames []) + + hand ["debug", "wrong-header"] \req -> (, req) <$> Right \reply -> do + reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessMore "") 0), "correct-header"]) + reply (Frames ["wrong-header"]) + + hand ["debug", "timeout"] \req -> (, req) <$> Right \reply -> do + reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessMore "") 0), "false more"]) + + + handconv ["reflog", "get"] "ReflogGetReq" \(ReflogGetReq ref) -> do + + sto <- withPeerM penv getStorage + + hash <- maybe (throwError (ResponseStatus NotFound404 "unknown reference")) pure + =<< liftIO do + getRef sto (RefLogKey @(Encryption L4Proto) ref) + + pure (ReflogGetResp hash) + +newtype ReflogGetReq = ReflogGetReq (PubKey 'Sign (Encryption L4Proto)) + deriving (Generic) + +instance Serialise (PubKey 'Sign (Encryption L4Proto)) + => Serialise ReflogGetReq + +newtype ReflogGetResp = ReflogGetResp (Hash HbSync) + deriving (Generic) + +instance Serialise (PubKey 'Sign (Encryption L4Proto)) + => Serialise ReflogGetResp + +--- + +drpcFullDApp :: forall m . + ( MonadIO m + , Unconstraints + ) + => DialEnv -> PeerEnv L4Proto -> DApp m +drpcFullDApp denv penv = + mkDApp + (Proxy @(NamedSpec DialogRPCSpec)) + EmptyCtx + (runDialTtoDialHandlerT denv penv) + -- (drpcFullH :: DialogRPCSpec (ModeServerT (DialT m))) + drpcFullH + +type ConstraintsH m = + ( MonadIO m + , MonadError ResponseStatus m + , HasStorage m + , Unconstraints + ) + +type Unconstraints = + ( Serialise (PubKey 'Sign (Encryption L4Proto)) + , Hashable (PubKey 'Sign (Encryption L4Proto)) + , Show (PubKey 'Sign (Encryption L4Proto)) + , Typeable (PubKey 'Sign (Encryption L4Proto)) + , FromStringMaybe (PubKey 'Sign (Encryption L4Proto)) + ) + +drpcFullH :: ( ConstraintsH m ) + => DialogRPCSpec (ModeServerT m) +drpcFullH = DialogRPCSpec + { drpcPing = pure "pong" + , drpcSpec = pure "tbd" + , drpcReflog = reflogH + } + +reflogH :: ( ConstraintsH m ) + => RPCReflogSpec (ModeServerT m) +reflogH = RPCReflogSpec {..} + where + + reflogGet ref = do + + sto <- getStorage + + hash <- maybe (throwError (ResponseStatus NotFound404 "unknown reference")) pure + =<< liftIO do + getRef sto (RefLogKey @(Encryption L4Proto) ref) + + pure hash + + reflogFetch pk = do + liftIO $ print pk + pure () + diff --git a/hbs2-peer/app/PeerMain/Dialog/Spec.hs b/hbs2-peer/app/PeerMain/Dialog/Spec.hs new file mode 100644 index 00000000..d9dbb898 --- /dev/null +++ b/hbs2-peer/app/PeerMain/Dialog/Spec.hs @@ -0,0 +1,35 @@ +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE TypeOperators #-} +{-# LANGUAGE StrictData #-} + +module PeerMain.Dialog.Spec where + +-- import Codec.Serialise +-- import Streaming +import Data.Text (Text) +import GHC.Generics (Generic) + +import HBS2.Hash +import HBS2.Net.Dialog.Core +import HBS2.Net.Proto.Types + + +data DialogRPCSpec r = DialogRPCSpec + { drpcPing :: r &- "ping" &/ SingleRespCBOR Text + , drpcSpec :: r &- "spec" &/ SingleRespCBOR Text + , drpcReflog :: r &- "reflog" &// RPCReflogSpec + } + deriving (Generic) + +data RPCReflogSpec r = RPCReflogSpec + { reflogGet :: r &- "get" + &/ ReqCBOR (PubKey 'Sign (Encryption L4Proto)) + &/ SingleRespCBOR (Hash HbSync) + + , reflogFetch :: r &- "fetch" + &/ ReqCBOR (PubKey 'Sign (Encryption L4Proto)) + &/ SingleAck + + } + deriving (Generic) + diff --git a/hbs2-peer/app/PeerMain/DialogCliCommand.hs b/hbs2-peer/app/PeerMain/DialogCliCommand.hs index ff6f27fe..18fd279a 100644 --- a/hbs2-peer/app/PeerMain/DialogCliCommand.hs +++ b/hbs2-peer/app/PeerMain/DialogCliCommand.hs @@ -2,109 +2,66 @@ module PeerMain.DialogCliCommand where --- import Data.Generics.Labels --- import Data.Generics.Product.Fields +import Data.Generics.Labels +import Data.Generics.Product.Fields import HBS2.Actors.Peer -import HBS2.Base58 -import HBS2.Clock -import HBS2.Net.Proto.RefLog (RefLogKey(..)) -import HBS2.Defaults -import HBS2.Events import HBS2.Hash -import HBS2.Merkle -import HBS2.Net.Auth.Credentials import HBS2.Net.IP.Addr -import HBS2.Net.Messaging -import HBS2.Net.Messaging.TCP import HBS2.Net.Messaging.UDP -import HBS2.Net.PeerLocator import HBS2.Net.Proto -import HBS2.Net.Proto.Definition import HBS2.Net.Proto.Dialog -import HBS2.Net.Proto.Peer -import HBS2.Net.Proto.PeerAnnounce -import HBS2.Net.Proto.PeerExchange -import HBS2.Net.Proto.PeerMeta -import HBS2.Net.Proto.RefLog -import HBS2.Net.Proto.Sessions -import HBS2.Net.Proto.Types import HBS2.OrDie import HBS2.Prelude -import HBS2.Prelude.Plated -import HBS2.Storage.Simple import HBS2.System.Logger.Simple hiding (info) -import HBS2.System.Logger.Simple qualified as Log -import BlockDownload -import BlockHttpDownload -import Bootstrap -import Brains -import CheckMetrics -import DownloadQ -import HttpWorker import PeerConfig -import PeerInfo -import PeerMeta -import PeerTypes -import ProxyMessaging -import RefLog (reflogWorker) -import RefLog qualified -import RPC +import RPC (PeerRpcKey, RpcM, runRPC) -import Control.Monad -import Control.Monad.IO.Unlift -import Control.Monad.Reader +import Control.Monad.Except (Except(..), ExceptT(..), runExcept, runExceptT) +import Control.Monad.State.Strict (evalStateT) import Control.Monad.Trans.Cont -import Control.Monad.Trans.Maybe -import Crypto.Saltine.Core.Box qualified as Encrypt -import Data.ByteString qualified as BS -import Data.ByteString.Char8 qualified as BS8 -import Data.ByteString.Lazy (ByteString) -import Data.ByteString.Lazy qualified as LBS import Data.Default import Data.Function import Data.Functor +import Data.Kind import Data.List qualified as List -import Data.Map.Strict qualified as Map -import Data.Maybe -import Data.Monoid qualified as Monoid -import Data.Set qualified as Set import Data.String.Conversions as X (cs) import Data.Void (absurd, Void) import Lens.Micro.Platform import Network.Socket import Options.Applicative -import Streaming as S import Streaming.Prelude qualified as S -import System.Directory +import System.IO import UnliftIO.Async import UnliftIO.Concurrent import UnliftIO.Exception as U import UnliftIO.Resource --- import System.FilePath.Posix -import System.IO -import System.Exit - pDialog :: Parser (IO ()) pDialog = hsubparser $ mempty <> command "ping" (info pPing (progDesc "ping hbs2 node via dialog inteface") ) <> command "debug" (info pDebug (progDesc "debug call different dialog inteface routes") ) + <> command "reflog" (info pReflog (progDesc "reflog commands") ) + +pReflog :: Parser (IO ()) +pReflog = hsubparser $ mempty + <> command "get" (info pRefLogGet (progDesc "get own reflog from all" )) + <> command "fetch" (info pRefLogFetch (progDesc "fetch reflog from all" )) confOpt :: Parser FilePath confOpt = strOption ( long "config" <> short 'c' <> help "config" ) -data OptInitial (f :: * -> *) a b = OptInitial { unOptInitial :: f a } +newtype OptInitial (f :: Type -> Type) a b = OptInitial { unOptInitial :: f a } deriving (Generic, Show) -data OptResolved (f :: * -> *) a b = OptResolved { unOptResolved :: b } +newtype OptResolved (f :: Type -> Type) a b = OptResolved { unOptResolved :: b } deriving (Generic, Show) type DialOptInitial = DialOpt OptInitial type DialOptResolved = DialOpt OptResolved -data DialOpt (f :: (* -> *) -> * -> * -> *) = DialOpt +data DialOpt (f :: (Type -> Type) -> Type -> Type -> Type) = DialOpt { dialOptConf :: f Maybe FilePath PeerConfig , dialOptAddr :: f Maybe String (Peer L4Proto) } @@ -133,7 +90,7 @@ resolveDialOpt dopt = do `orDieM` "Dial endpoint not set" as <- parseAddrUDP (cs saddr) <&> fmap (fromSockAddr @'UDP . addrAddress) - peer <- (headMay $ List.sortBy (compare `on` addrPriority) as) + peer <- headMay (List.sortBy (compare `on` addrPriority) as) `orDieM` "Can't parse Dial endpoint" pure DialOpt @@ -149,6 +106,35 @@ pPing = do liftIO . print =<< do dQuery1 def cli peer (dpath "ping" []) +reflogKeyP :: ReadM (PubKey 'Sign (Encryption L4Proto)) +reflogKeyP = eitherReader $ + maybe (Left "invalid REFLOG-KEY") pure . fromStringMay + +pRefLogGet :: Parser (IO ()) +pRefLogGet = do + dopt <- pDialCommon + rkey <- argument reflogKeyP ( metavar "REFLOG-KEY" ) + pure do + withDial dopt \peer dclient -> + withClient dclient \cli -> do + xs <- dQuery1 def cli peer (dpath "reflog/get" [serialiseS rkey]) + + hash <- either (lift . lift . fail) pure $ runExcept $ flip evalStateT xs do + cutFrameDecode @(Hash HbSync) do + "No hash in response: " <> show xs + + liftIO . print $ pretty hash + +pRefLogFetch :: Parser (IO ()) +pRefLogFetch = do + dopt <- pDialCommon + rkey <- argument reflogKeyP ( metavar "REFLOG-KEY" ) + pure do + withDial dopt \peer dclient -> + withClient dclient \cli -> do + xs <- dQuery1 def cli peer (dpath "reflog/fetch" [serialiseS rkey]) + + liftIO . print $ "Response: " <> show xs pDebug :: Parser (IO ()) pDebug = do diff --git a/hbs2-peer/app/PeerMain/PeerDialog.hs b/hbs2-peer/app/PeerMain/PeerDialog.hs deleted file mode 100644 index 5da04012..00000000 --- a/hbs2-peer/app/PeerMain/PeerDialog.hs +++ /dev/null @@ -1,39 +0,0 @@ -module PeerMain.PeerDialog where - -import Control.Monad -import Control.Monad.IO.Class -import Data.Bool -import Data.ByteString qualified as BS -import Data.Map qualified as Map - -import Dialog.Core -import HBS2.Net.Proto.Types - - -dialogRoutes :: forall m . MonadIO m => DialogRequestRouter m -dialogRoutes = dialogRequestRoutes do - - hand ["ping"] \req -> Right \reply -> do - reply (Frames [serialiseS (ResponseHeader (ResponseStatus Success200 "") 0), "pong"]) - - hand ["spec"] \req -> Right \reply -> do - let xs = Map.keys (unDialogRequestRouter (dialogRoutes @m)) - - forM_ (zip (zip [1..] xs) ((True <$ drop 1 xs) <> [False])) \((j,x),isMore) -> do - reply (Frames [serialiseS (ResponseHeader (ResponseStatus (bool Success200 SuccessMore isMore) "") j) - , BS.intercalate "/" x - ]) - - - hand ["debug", "no-response-header"] \req -> Right \reply -> do - reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessMore "") 0), "one"]) - reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessMore "") 1), "two"]) - reply (Frames []) - - hand ["debug", "wrong-header"] \req -> Right \reply -> do - reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessMore "") 0), "correct-header"]) - reply (Frames ["wrong-header"]) - - hand ["debug", "timeout"] \req -> Right \reply -> do - reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessMore "") 0), "false more"]) - diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index 8acc4bdc..f6f6237c 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -443,15 +443,9 @@ mkPeerMeta conf penv = do . fromStringMay @(PeerAddr L4Proto) ) =<< cfgValue @PeerListenTCPKey conf - -- let useEncryption = True -- move to config annMetaFromPeerMeta . PeerMeta $ W.execWriter do mHttpPort `forM` \p -> elem "http-port" (TE.encodeUtf8 . Text.pack . show $ p) mTcpPort `forM` \p -> elem "listen-tcp" (TE.encodeUtf8 . Text.pack . show $ p) - -- when useEncryption do - -- elem "ekey" (TE.encodeUtf8 . Text.pack . show $ - -- (Encrypt.publicKey . _envAsymmetricKeyPair) penv - -- -- mayby sign this pubkey by node key ? - -- ) where elem k = W.tell . L.singleton . (k ,) diff --git a/hbs2-peer/app/ProxyMessaging.hs b/hbs2-peer/app/ProxyMessaging.hs index e2cf8d52..69076054 100644 --- a/hbs2-peer/app/ProxyMessaging.hs +++ b/hbs2-peer/app/ProxyMessaging.hs @@ -4,6 +4,9 @@ module ProxyMessaging , newProxyMessaging , runProxyMessaging , sendToPlainProxyMessaging + , getEncryptionKey + , setEncryptionKey + , encryptionKeyIDKeyFromPeerData ) where import HBS2.Prelude.Plated @@ -34,6 +37,7 @@ import Control.Monad.Trans.Maybe import Data.ByteString (ByteString) import Data.ByteString qualified as BS import Data.ByteString.Lazy qualified as LBS +import Data.Hashable hiding (Hashed) import Data.Maybe import Data.String.Conversions (cs) import Data.List qualified as L @@ -41,6 +45,10 @@ import Data.Map (Map) import Data.Map qualified as Map import Lens.Micro.Platform as Lens import Control.Monad +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HashMap + +import HBS2.Data.Types.Peer -- TODO: protocol-encryption-goes-here @@ -54,6 +62,9 @@ data ProxyMessaging = , _proxy_clearEncryptionKey :: Peer L4Proto -> IO () , _proxy_sendResetEncryptionKeys :: Peer L4Proto -> IO () , _proxy_sendBeginEncryptionExchange :: Peer L4Proto -> IO () + + , _proxy_asymmetricKeyPair :: AsymmKeypair (Encryption L4Proto) + , _proxy_encryptionKeys :: TVar (HashMap (EncryptionKeyIDKey L4Proto) (CommonSecret (Encryption L4Proto))) } -- 1 нода X создаёт себе Encrypt.Keypair @@ -78,8 +89,36 @@ newProxyMessaging u t = liftIO do let _proxy_sendResetEncryptionKeys = const (pure ()) let _proxy_sendBeginEncryptionExchange = const (pure ()) + _proxy_asymmetricKeyPair <- asymmNewKeypair @(Encryption L4Proto) + _proxy_encryptionKeys <- liftIO (newTVarIO mempty) + pure ProxyMessaging {..} +--- + +setEncryptionKey :: + ( Hashable (PubKey 'Sign (Encryption L4Proto)) + , Hashable PeerNonce + , Show (PubKey 'Sign (Encryption L4Proto)) + , Show PeerNonce + , Show (CommonSecret (Encryption L4Proto)) + , Show (EncryptionKeyIDKey L4Proto) + ) => ProxyMessaging -> Peer L4Proto -> EncryptionKeyIDKey L4Proto -> Maybe (CommonSecret (Encryption L4Proto)) -> IO () +setEncryptionKey proxy peer pd msecret = do + atomically $ modifyTVar' (_proxy_encryptionKeys proxy) $ Lens.at pd .~ msecret + case msecret of + Nothing -> trace $ "ENCRYPTION delete key" <+> pretty peer <+> viaShow pd + Just k -> trace $ "ENCRYPTION store key" <+> pretty peer <+> viaShow pd <+> viaShow k + +getEncryptionKey :: + ( Hashable (PubKey 'Sign (Encryption L4Proto)) + , Hashable PeerNonce + ) => ProxyMessaging -> EncryptionKeyIDKey L4Proto -> IO (Maybe (CommonSecret (Encryption L4Proto))) +getEncryptionKey proxy pd = + readTVarIO (_proxy_encryptionKeys proxy) <&> preview (Lens.ix pd) + +--- + runProxyMessaging :: forall m . MonadIO m => ProxyMessaging -> m () @@ -103,7 +142,6 @@ runProxyMessaging env = liftIO do liftIO $ mapM_ waitCatch [u,t] - instance Messaging ProxyMessaging L4Proto LBS.ByteString where sendTo = sendToProxyMessaging diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index c0cf535d..63a2b925 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -137,7 +137,8 @@ executable hbs2-peer , Bootstrap , PeerInfo , PeerMain.DialogCliCommand - , PeerMain.PeerDialog + , PeerMain.Dialog.Server + , PeerMain.Dialog.Spec , PeerMeta , RPC , PeerTypes