diff --git a/.fixme/config b/.fixme/config index 0f3328f0..fc21f46a 100644 --- a/.fixme/config +++ b/.fixme/config @@ -9,6 +9,8 @@ fixme-prefix PR: pr fixme-files **/*.hs docs/devlog.md +fixme-files **/*.cabal + fixme-files docs/pep*.txt fixme-files docs/drafts/**/*.txt fixme-files docs/notes/**/*.txt diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index af08fc2b..f2614b19 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -99,6 +99,7 @@ library , HBS2.Net.Messaging.UDP , HBS2.Net.Messaging.TCP , HBS2.Net.Messaging.Unix + , HBS2.Net.Messaging.Stream , HBS2.Net.PeerLocator , HBS2.Net.PeerLocator.Static , HBS2.Net.Proto diff --git a/hbs2-core/lib/HBS2/Net/Messaging/Stream.hs b/hbs2-core/lib/HBS2/Net/Messaging/Stream.hs new file mode 100644 index 00000000..1c12537c --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Messaging/Stream.hs @@ -0,0 +1,63 @@ +module HBS2.Net.Messaging.Stream where + +import HBS2.Prelude.Plated + +import Data.Function +import Control.Exception (try,Exception,SomeException,throwIO) +import Data.ByteString.Lazy (ByteString) +import Data.ByteString.Lazy qualified as LBS +import Data.Typeable +import Network.Socket hiding (listen,connect) +import Streaming.Prelude qualified as S +import Data.ByteString qualified as BS +import Network.Simple.TCP + +data SocketClosedException = + SocketClosedException + deriving stock (Show, Typeable) + +instance Exception SocketClosedException + + +-- FIXME: why-streaming-then? +-- Ну и зачем тут вообще стриминг, +-- если чтение всё равно руками написал? +-- Если fromChunks - O(n), и reverse O(n) +-- то мы все равно пройдем все чанки, на +-- кой чёрт тогда вообще стриминг? бред +-- какой-то. +readFromSocket :: forall m . MonadIO m + => Socket + -> Int + -> m ByteString + +readFromSocket sock size = LBS.fromChunks <$> (go size & S.toList_) + where + go 0 = pure () + go n = do + r <- liftIO $ recv sock n + maybe1 r eos $ \bs -> do + let nread = BS.length bs + S.yield bs + go (max 0 (n - nread)) + + eos = do + liftIO $ throwIO SocketClosedException + +readFromSocket1 :: forall m . MonadIO m + => Socket + -> Int + -> m ByteString + +readFromSocket1 sock size = LBS.fromChunks <$> (go size & S.toList_) + where + go 0 = pure () + go n = do + r <- liftIO $ recv sock (min 65536 n) + maybe1 r eos $ \bs -> do + let nread = BS.length bs + S.yield bs + go (max 0 (n - nread)) + + eos = do + liftIO $ throwIO SocketClosedException diff --git a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs index d8985af1..2af86b9a 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs @@ -15,6 +15,8 @@ import HBS2.Net.Messaging import HBS2.Net.Proto.Types import HBS2.Prelude.Plated +import HBS2.Net.Messaging.Stream + import HBS2.System.Logger.Simple import Control.Concurrent.STM (flushTQueue) @@ -44,11 +46,6 @@ import UnliftIO.Async import UnliftIO.STM import UnliftIO.Exception qualified as U -data SocketClosedException = - SocketClosedException - deriving stock (Show, Typeable) - -instance Exception SocketClosedException -- FIXME: control-recv-capacity-to-avoid-leaks @@ -128,30 +125,6 @@ instance Messaging MessagingTCP L4Proto ByteString where forM ms $ \(p, msg) -> pure (From p, msg) --- FIXME: why-streaming-then? --- Ну и зачем тут вообще стриминг, --- если чтение всё равно руками написал? --- Если fromChunks - O(n), и reverse O(n) --- то мы все равно пройдем все чанки, на --- кой чёрт тогда вообще стриминг? бред --- какой-то. -readFromSocket :: forall m . MonadIO m - => Socket - -> Int - -> m ByteString - -readFromSocket sock size = LBS.fromChunks <$> (go size & S.toList_) - where - go 0 = pure () - go n = do - r <- liftIO $ recv sock n - maybe1 r eos $ \bs -> do - let nread = BS.length bs - S.yield bs - go (max 0 (n - nread)) - - eos = do - liftIO $ throwIO SocketClosedException connectionId :: Word32 -> Word32 -> Word64 connectionId a b = (fromIntegral hi `shiftL` 32) .|. fromIntegral low diff --git a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs index ab72d1ef..88ed7499 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs @@ -10,6 +10,7 @@ import HBS2.Prelude.Plated import HBS2.Net.Proto.Types import HBS2.Actors.Peer.Types import HBS2.Net.Messaging +import HBS2.Net.Messaging.Stream import HBS2.Clock import HBS2.System.Logger.Simple @@ -17,6 +18,7 @@ import HBS2.System.Logger.Simple import Control.Monad.Trans.Resource import Control.Monad import Control.Monad.Reader +import Data.ByteString qualified as BS import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy qualified as LBS import Data.Function @@ -27,12 +29,15 @@ import Data.HashMap.Strict (HashMap) import Network.ByteOrder hiding (ByteString) import Network.Socket import Network.Socket.ByteString +import Network.Socket.ByteString.Lazy qualified as SL import Control.Concurrent.STM.TQueue (flushTQueue) import Data.Set (Set) import Data.Set qualified as Set import Lens.Micro.Platform import UnliftIO +import Streaming.Prelude qualified as S + import Control.Concurrent (myThreadId) data UNIX = UNIX @@ -109,6 +114,7 @@ data ReadTimeoutException = ReadTimeoutException deriving (Show, Typeable) instance Exception ReadTimeoutException + runMessagingUnix :: MonadUnliftIO m => MessagingUnix -> m () runMessagingUnix env = do @@ -179,9 +185,16 @@ runMessagingUnix env = do maybe1 mq none $ \q -> do msg <- liftIO . atomically $ readTQueue q + + let len = fromIntegral $ LBS.length msg :: Int + let bs = bytestring32 (fromIntegral len) + liftIO $ sendAll so $ bytestring32 (fromIntegral len) - liftIO $ sendAll so $ LBS.toStrict msg + + -- debug $ "sendAll" <+> pretty len <+> pretty (LBS.length msg) <+> viaShow bs + + liftIO $ SL.sendAll so msg void $ allocate (pure writer) cancel @@ -192,13 +205,15 @@ runMessagingUnix env = do let mq = Just (msgUnixRecv env) - frameLen <- liftIO $ recv so 4 <&> word32 <&> fromIntegral - frame <- liftIO $ recv so frameLen + -- frameLen <- liftIO $ recv so 4 <&> word32 <&> fromIntegral + frameLen <- liftIO $ readFromSocket so 4 <&> LBS.toStrict <&> word32 <&> fromIntegral - let s = if msgUnixServer env then "S-" else "C-" + -- debug $ "frameLen" <+> pretty frameLen + + frame <- liftIO $ readFromSocket so frameLen -- <&> LBS.toStrict maybe1 mq none $ \q -> do - atomically $ writeTQueue q (From that, LBS.fromStrict frame) + atomically $ writeTQueue q (From that, frame) now <- getTimeCoarse atomically $ writeTVar (msgUnixLast env) now @@ -241,12 +256,12 @@ runMessagingUnix env = do let q = msgUnixRecv env -- Read response from server - frameLen <- liftIO $ recv sock 4 <&> word32 <&> fromIntegral - frame <- liftIO $ recv sock frameLen + frameLen <- liftIO $ readFromSocket sock 4 <&> LBS.toStrict <&> word32 <&> fromIntegral + frame <- liftIO $ readFromSocket sock frameLen -- сообщения кому? **МНЕ** -- сообщения от кого? от **КОГО-ТО** - atomically $ writeTQueue q (From who, LBS.fromStrict frame) + atomically $ writeTQueue q (From who, frame) forever do @@ -259,7 +274,7 @@ runMessagingUnix env = do msg <- liftIO . atomically $ readTQueue q let len = fromIntegral $ LBS.length msg :: Int liftIO $ sendAll sock $ bytestring32 (fromIntegral len) - liftIO $ sendAll sock $ LBS.toStrict msg + liftIO $ SL.sendAll sock msg void $ waitAnyCatchCancel [reader] @@ -283,7 +298,7 @@ runMessagingUnix env = do dropQueues :: MonadIO m => m () dropQueues = do - -- liftIO $ atomically $ modifyTVar (msgUnixRecvFrom env) mempty + void $ liftIO $ atomically $ flushTQueue (msgUnixRecv env) liftIO $ atomically $ modifyTVar (msgUnixSendTo env) mempty -- мы не дропаем обратную очередь (принятые сообщения), потому, -- что нет смысла. она живёт столько, сколько живёт клиент diff --git a/hbs2-core/lib/HBS2/Net/Proto/Service.hs b/hbs2-core/lib/HBS2/Net/Proto/Service.hs index 1fecd973..0da8c1ef 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Service.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Service.hs @@ -28,9 +28,15 @@ import Data.Word import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap + +type family Input a :: Type +type family Output a :: Type + +-- FIXME: wrap-those-instances +type instance Input () = () +type instance Output () = () + class (Monad m, Serialise (Output a), Serialise (Input a)) => HandleMethod m a where - type family Input a :: Type - type family Output a :: Type handleMethod :: Input a -> m (Output a) type family AllHandlers m (xs :: [Type]) :: Constraint where @@ -52,8 +58,6 @@ instance (Monad m, EnumAll xs (Int, SomeHandler m) m, HandleMethod m x) => EnumA shift = map (\(i, h) -> (i + 1, h)) instance Monad m => HandleMethod m () where - type Input () = () - type Output () = () handleMethod _ = pure () data ServiceError = @@ -176,6 +180,38 @@ runServiceClient caller = do wait proto +data Endpoint e m = forall (api :: [Type]) . ( HasProtocol e (ServiceProto api e) + , HasTimeLimits e (ServiceProto api e) m + , PeerMessaging e + , Pretty (Peer e) + ) + => Endpoint (ServiceCaller api e) + +runServiceClientMulti :: forall e m . ( MonadIO m + , MonadUnliftIO m + -- FIXME: remove-this-debug-shit + , Show (Peer e) + , Pretty (Peer e) + , PeerMessaging e + , HasOwnPeer e m + , HasFabriq e m + ) + => [ Endpoint e m ] + -> m () + +runServiceClientMulti endpoints = do + proto <- async $ runProto @e [ makeResponse @e (makeClient x) | (Endpoint x) <- endpoints ] + link proto + + waiters <- forM endpoints $ \(Endpoint caller) -> async $ forever do + req <- getRequest caller + request @e (callPeer caller) req + + mapM_ link waiters + + void $ UIO.waitAnyCatchCancel $ proto : waiters + + notifyServiceCaller :: forall api e m . MonadIO m => ServiceCaller api e -> ServiceProto api e @@ -235,7 +271,6 @@ makeClient :: forall api e m . ( MonadIO m makeClient = notifyServiceCaller - instance (HasProtocol e (ServiceProto api e)) => HasTimeLimits e (ServiceProto api e) IO where tryLockForPeriod _ _ = pure True diff --git a/hbs2-core/lib/HBS2/Storage/Operations/ByteString.hs b/hbs2-core/lib/HBS2/Storage/Operations/ByteString.hs index a00319e0..cac73f1d 100644 --- a/hbs2-core/lib/HBS2/Storage/Operations/ByteString.hs +++ b/hbs2-core/lib/HBS2/Storage/Operations/ByteString.hs @@ -11,10 +11,11 @@ import HBS2.Hash import HBS2.Storage import HBS2.Merkle import HBS2.Data.Types.Refs -import HBS2.Defaults import HBS2.Storage.Operations.Class +import HBS2.Defaults + import Streaming.Prelude qualified as S import Streaming qualified as S import Data.Function @@ -30,16 +31,18 @@ instance (MonadIO m, h ~ HbSync, Storage s h ByteString m) => MerkleWriter ByteS writeAsMerkle sto bs = do hashes <- S.each (LBS.unpack bs) - & S.chunksOf (fromIntegral defBlockSize) + & S.chunksOf (fromIntegral defBlockSize ) & S.mapped (fmap (first LBS.pack) . S.toList) & S.mapM (\blk -> enqueueBlock sto blk >> pure blk) + -- & S.mapM (\blk -> putBlock sto blk >> pure blk) & S.map (HashRef . hashObject) & S.toList_ -- FIXME: handle-hardcode let pt = toPTree (MaxSize 256) (MaxNum 256) hashes -- FIXME: settings - makeMerkle 0 pt $ \(_,_,bss) -> void $ putBlock sto bss + makeMerkle 0 pt $ \(_,_,bss) -> do + void $ putBlock sto bss instance ( MonadIO m , MonadError OperationError m diff --git a/hbs2-git/git-hbs2/GitRemoteMain.hs b/hbs2-git/git-hbs2/GitRemoteMain.hs index 418df693..9d5f66f7 100644 --- a/hbs2-git/git-hbs2/GitRemoteMain.hs +++ b/hbs2-git/git-hbs2/GitRemoteMain.hs @@ -8,7 +8,6 @@ import HBS2.Git.Types import HBS2.System.Logger.Simple -import HBS2Git.Types(traceTime) import HBS2Git.App import HBS2Git.State import HBS2Git.Import @@ -17,9 +16,11 @@ import HBS2.Git.Local.CLI import HBS2Git.Export (runExport) +import HBS2Git.Config as Config import GitRemoteTypes import GitRemotePush + import Control.Concurrent.STM import Control.Monad.Reader import Data.Attoparsec.Text hiding (try) @@ -79,6 +80,8 @@ loop :: forall m . ( MonadIO m , MonadUnliftIO m , MonadMask m , HasProgress (RunWithConfig (GitRemoteApp m)) + , HasStorage (RunWithConfig (GitRemoteApp m)) + , HasRPC (RunWithConfig (GitRemoteApp m)) ) => [String] -> GitRemoteApp m () loop args = do @@ -106,7 +109,7 @@ loop args = do warn $ "reference" <+> pretty ref <+> "missing" warn "trying to init reference --- may be it's ours" liftIO $ runApp WithLog (runExport Nothing ref) - + importRefLogNew True ref refsNew <- withDB db stateGetActualRefs let possibleHead = listToMaybe $ List.take 1 $ List.sortOn guessHead (fmap fst refsNew) @@ -219,14 +222,14 @@ main = do evolve - env <- RemoteEnv <$> detectHBS2PeerCatAPI - <*> detectHBS2PeerSizeAPI - <*> detectHBS2PeerPutAPI - <*> detectHBS2PeerRefLogGetAPI - <*> liftIO (newTVarIO mempty) + (_, syn) <- Config.configInit - runRemoteM env do - loop args + runWithRPC $ \rpc -> do + env <- RemoteEnv <$> liftIO (newTVarIO mempty) + <*> pure rpc + + runRemoteM env do + loop args shutUp diff --git a/hbs2-git/git-hbs2/GitRemotePush.hs b/hbs2-git/git-hbs2/GitRemotePush.hs index 917a1b75..0a75e64a 100644 --- a/hbs2-git/git-hbs2/GitRemotePush.hs +++ b/hbs2-git/git-hbs2/GitRemotePush.hs @@ -44,15 +44,15 @@ newtype RunWithConfig m a = runWithConfig :: MonadIO m => [Syntax C] -> RunWithConfig m a -> m a runWithConfig conf m = runReaderT (fromWithConf m) conf +instance (Monad m, HasStorage m) => HasStorage (RunWithConfig m) where + getStorage = lift getStorage + +instance (Monad m, HasRPC m) => HasRPC (RunWithConfig m) where + getRPC = lift getRPC + instance MonadIO m => HasConf (RunWithConfig (GitRemoteApp m)) where getConf = ask -instance MonadIO m => HasCatAPI (RunWithConfig (GitRemoteApp m)) where - getHttpCatAPI = lift getHttpCatAPI - getHttpSizeAPI = lift getHttpSizeAPI - getHttpPutAPI = lift getHttpPutAPI - getHttpRefLogGetAPI = lift getHttpRefLogGetAPI - instance MonadIO m => HasRefCredentials (RunWithConfig (GitRemoteApp m)) where getCredentials = lift . getCredentials setCredentials r c = lift $ setCredentials r c @@ -61,6 +61,7 @@ push :: forall m . ( MonadIO m , MonadCatch m , HasProgress (RunWithConfig (GitRemoteApp m)) , MonadMask (RunWithConfig (GitRemoteApp m)) + , HasStorage (RunWithConfig (GitRemoteApp m)) , MonadUnliftIO m , MonadMask m ) diff --git a/hbs2-git/git-hbs2/GitRemoteTypes.hs b/hbs2-git/git-hbs2/GitRemoteTypes.hs index edc03dd3..4e3a5bb8 100644 --- a/hbs2-git/git-hbs2/GitRemoteTypes.hs +++ b/hbs2-git/git-hbs2/GitRemoteTypes.hs @@ -6,6 +6,7 @@ import HBS2.Prelude import HBS2.OrDie import HBS2.Net.Auth.Credentials (PeerCredentials) import HBS2.Net.Proto.Definition() +import HBS2.Peer.RPC.Client.StorageClient import HBS2Git.Types import Control.Monad.Reader @@ -18,11 +19,8 @@ import Control.Monad.Trans.Resource data RemoteEnv = RemoteEnv - { _reHttpCat :: API - , _reHttpSize :: API - , _reHttpPut :: API - , _reHttpRefGet :: API - , _reCreds :: TVar (HashMap RepoRef (PeerCredentials Schema)) + { _reCreds :: TVar (HashMap RepoRef (PeerCredentials Schema)) + , _reRpc :: RPCEndpoints } makeLenses 'RemoteEnv @@ -41,15 +39,15 @@ newtype GitRemoteApp m a = , MonadTrans ) +instance Monad m => HasStorage (GitRemoteApp m) where + getStorage = asks (rpcStorage . view reRpc) <&> AnyStorage . StorageClient + +instance Monad m => HasRPC (GitRemoteApp m) where + getRPC = asks (view reRpc) + runRemoteM :: MonadIO m => RemoteEnv -> GitRemoteApp m a -> m a runRemoteM env m = runReaderT (fromRemoteApp m) env -instance MonadIO m => HasCatAPI (GitRemoteApp m) where - getHttpCatAPI = view (asks reHttpCat) - getHttpSizeAPI = view (asks reHttpSize) - getHttpPutAPI = view (asks reHttpPut) - getHttpRefLogGetAPI = view (asks reHttpRefGet) - instance MonadIO m => HasRefCredentials (GitRemoteApp m) where setCredentials ref cred = do diff --git a/hbs2-git/hbs2-git.cabal b/hbs2-git/hbs2-git.cabal index eb288375..680c7751 100644 --- a/hbs2-git/hbs2-git.cabal +++ b/hbs2-git/hbs2-git.cabal @@ -54,7 +54,7 @@ common shared-properties , TypeFamilies - build-depends: hbs2-core + build-depends: hbs2-core, hbs2-peer , aeson , async , base16-bytestring @@ -176,28 +176,29 @@ executable git-remote-hbs2 hs-source-dirs: git-hbs2 default-language: Haskell2010 -executable git-hbs2-http - import: shared-properties - main-is: GitHttpDumbMain.hs +-- FIXME: make-git-hbs2-http-work-again +-- executable git-hbs2-http +-- import: shared-properties +-- main-is: GitHttpDumbMain.hs - ghc-options: - -threaded - -rtsopts - "-with-rtsopts=-N4 -A64m -AL256m -I0" +-- ghc-options: +-- -threaded +-- -rtsopts +-- "-with-rtsopts=-N4 -A64m -AL256m -I0" - other-modules: +-- other-modules: - -- other-extensions: - build-depends: - base, hbs2-git - , http-types - , optparse-applicative - , scotty - , wai-extra - , warp - , zlib +-- -- other-extensions: +-- build-depends: +-- base, hbs2-git +-- , http-types +-- , optparse-applicative +-- , scotty +-- , wai-extra +-- , warp +-- , zlib - hs-source-dirs: git-hbs2-http - default-language: Haskell2010 +-- hs-source-dirs: git-hbs2-http +-- default-language: Haskell2010 diff --git a/hbs2-git/lib/HBS2Git/App.hs b/hbs2-git/lib/HBS2Git/App.hs index e6a8772a..c4829096 100644 --- a/hbs2-git/lib/HBS2Git/App.hs +++ b/hbs2-git/lib/HBS2Git/App.hs @@ -4,31 +4,44 @@ module HBS2Git.App ( module HBS2Git.App , module HBS2Git.Types + , HasStorage(..) ) where import HBS2.Prelude +import HBS2.Actors.Peer.Types import HBS2.Data.Types.Refs import HBS2.Base58 import HBS2.OrDie import HBS2.Hash +import HBS2.Clock +import HBS2.Storage +import HBS2.Storage.Operations.ByteString import HBS2.System.Logger.Simple import HBS2.Merkle import HBS2.Git.Types import HBS2.Net.Proto.Definition() +import HBS2.Peer.RPC.Client.StorageClient import HBS2.Net.Auth.Credentials hiding (getCredentials) import HBS2.Net.Proto.RefLog import HBS2.Defaults (defBlockSize) +import HBS2.Peer.RPC.Client.Unix +import HBS2.Peer.RPC.API.Peer +import HBS2.Peer.RPC.API.RefLog + import HBS2Git.Types import HBS2Git.Config as Config import HBS2Git.Evolve +import HBS2Git.PrettyStuff import Data.Maybe import Control.Monad.Trans.Maybe import Data.Foldable import Data.Either import Control.Monad.Reader +import Control.Monad.Trans.Resource +import Control.Monad.Except (runExceptT,throwError) import Crypto.Saltine.Core.Sign qualified as Sign import Data.ByteString.Lazy.Char8 (ByteString) import Data.ByteString.Char8 qualified as B8 @@ -43,20 +56,25 @@ import System.Process.Typed import Text.InterpolatedString.Perl6 (qc) import Network.HTTP.Simple import Network.HTTP.Types.Status -import Control.Concurrent.STM +import Control.Concurrent.STM (flushTQueue) import Codec.Serialise import Data.HashMap.Strict qualified as HashMap import Data.List qualified as List import Data.Text qualified as Text -import Data.IORef +-- import Data.IORef import System.IO.Unsafe (unsafePerformIO) import Data.Cache qualified as Cache -import Control.Concurrent.Async +-- import Control.Concurrent.Async import System.Environment -import System.IO + +import Prettyprinter.Render.Terminal import Streaming.Prelude qualified as S +import UnliftIO + +-- instance HasTimeLimits UNIX (ServiceProto PeerAPI UNIX) m where + instance MonadIO m => HasCfgKey ConfBranch (Set String) m where key = "branch" @@ -95,12 +113,6 @@ infoPrefix = toStderr data WithLog = NoLog | WithLog -instance MonadIO m => HasCatAPI (App m) where - getHttpCatAPI = asks (view appPeerHttpCat) - getHttpSizeAPI = asks (view appPeerHttpSize) - getHttpPutAPI = asks (view appPeerHttpPut) - getHttpRefLogGetAPI = asks (view appPeerHttpRefLogGet) - instance MonadIO m => HasRefCredentials (App m) where setCredentials ref cred = do asks (view appRefCred) >>= \t -> liftIO $ atomically $ @@ -110,6 +122,14 @@ instance MonadIO m => HasRefCredentials (App m) where hm <- asks (view appRefCred) >>= liftIO . readTVarIO pure (HashMap.lookup ref hm) `orDie` "keyring not set" +instance (Monad m, HasStorage m) => (HasStorage (ResourceT m)) where + getStorage = lift getStorage + +instance MonadIO m => HasStorage (App m) where + getStorage = asks (rpcStorage . view appRpc) <&> AnyStorage . StorageClient + +instance MonadIO m => HasRPC (App m) where + getRPC = asks (view appRpc) withApp :: MonadIO m => AppEnv -> App m a -> m a withApp env m = runReaderT (fromApp m) env @@ -165,10 +185,71 @@ detectHBS2PeerRefLogGetAPI = do let new = Text.replace "/cat" "/reflog" $ Text.pack api pure $ Text.unpack new + getAppStateDir :: forall m . MonadIO m => m FilePath getAppStateDir = liftIO $ getXdgDirectory XdgData Config.appName -runApp :: MonadIO m => WithLog -> App m () -> m () + + +runWithRPC :: forall m . MonadUnliftIO m => (RPCEndpoints -> m ()) -> m () +runWithRPC action = do + + (_, syn) <- configInit + + let soname' = lastMay [ Text.unpack n + | ListVal @C (Key "rpc" [SymbolVal "unix", LitStrVal n]) <- syn + ] + + soname <- race ( pause @'Seconds 1) (maybe detectRPC pure soname') `orDie` "hbs2-peer rpc timeout!" + + client <- race ( pause @'Seconds 1) (newMessagingUnix False 1.0 soname) `orDie` "hbs2-peer rpc timeout!" + + rpc <- RPCEndpoints <$> makeServiceCaller (fromString soname) + <*> makeServiceCaller (fromString soname) + <*> makeServiceCaller (fromString soname) + + messaging <- async $ runMessagingUnix client + link messaging + + let endpoints = [ Endpoint @UNIX (rpcPeer rpc) + , Endpoint @UNIX (rpcStorage rpc) + , Endpoint @UNIX (rpcRefLog rpc) + ] + + c1 <- async $ liftIO $ runReaderT (runServiceClientMulti endpoints) client + link c1 + + test <- race ( pause @'Seconds 1) (callService @RpcPoke (rpcPeer rpc) ()) `orDie` "hbs2-peer rpc timeout!" + + void $ pure test `orDie` "hbs2-peer rpc error!" + + debug $ "hbs2-peer RPC ok" <+> pretty soname + + action rpc + + cancel messaging + + void $ waitAnyCatchCancel [messaging, c1] + + where + + detectRPC = do + (_, o, _) <- readProcess (shell [qc|hbs2-peer poke|]) + + let answ = parseTop (LBS.unpack o) & fromRight mempty + + let so = headMay [ Text.unpack r | ListVal (Key "rpc:" [LitStrVal r]) <- answ ] + + -- FIXME: logger-to-support-colors + liftIO $ hPutDoc stderr $ yellow "rpc: found RPC" <+> pretty so + <> line <> + yellow "rpc: add option" <+> parens ("rpc unix" <+> dquotes (pretty so)) + <+> "to the config .hbs2/config" + <> line <> line + + pure so `orDie` "hbs2-peer rpc not detected" + +runApp :: MonadUnliftIO m => WithLog -> App m () -> m () runApp l m = do case l of @@ -192,25 +273,11 @@ runApp l m = do (pwd, syn) <- Config.configInit xdgstate <- getAppStateDir - -- let statePath = xdgstate makeRelative home pwd - -- let dbPath = statePath "state.db" - -- db <- dbEnv dbPath - -- trace $ "state" <+> pretty statePath - -- here <- liftIO $ doesDirectoryExist statePath - -- unless here do - -- liftIO $ createDirectoryIfMissing True statePath - -- withDB db stateInit - reQ <- detectHBS2PeerCatAPI - szQ <- detectHBS2PeerSizeAPI - puQ <- detectHBS2PeerPutAPI - rlQ <- detectHBS2PeerRefLogGetAPI - - mtCred <- liftIO $ newTVarIO mempty - - let env = AppEnv pwd (pwd ".git") syn xdgstate reQ szQ puQ rlQ mtCred - - runReaderT (fromApp m) env + runWithRPC $ \rpc -> do + mtCred <- liftIO $ newTVarIO mempty + let env = AppEnv pwd (pwd ".git") syn xdgstate mtCred rpc + runReaderT (fromApp m) (set appRpc rpc env) debug $ vcat (fmap pretty syn) @@ -220,67 +287,17 @@ runApp l m = do setLoggingOff @TRACE setLoggingOff @INFO - -writeBlock :: forall m . (HasCatAPI m, MonadIO m) => ByteString -> m (Maybe (Hash HbSync)) -writeBlock bs = do - req <- getHttpPutAPI - writeBlockIO req bs - -writeBlockIO :: forall m . MonadIO m => API -> ByteString -> m (Maybe (Hash HbSync)) -writeBlockIO api bs = do - req1 <- liftIO $ parseRequest api - let request = setRequestMethod "PUT" - $ setRequestHeader "Content-Type" ["application/octet-stream"] - $ setRequestBodyLBS bs req1 - - resp <- httpLBS request - - case statusCode (getResponseStatus resp) of - - 200 -> pure $ getResponseBody resp & LBS.unpack & fromStringMay - _ -> pure Nothing - - -readBlock :: forall m . (HasCatAPI m, MonadIO m) => HashRef -> m (Maybe ByteString) +readBlock :: forall m . (MonadIO m, HasStorage m) => HashRef -> m (Maybe ByteString) readBlock h = do - req1 <- getHttpCatAPI - readBlockFrom req1 h + sto <- getStorage + liftIO $ getBlock sto (fromHashRef h) -readBlockFrom :: forall m . (MonadIO m) => API -> HashRef -> m (Maybe ByteString) -readBlockFrom api h = do - let reqs = api <> "/" <> show (pretty h) - req <- liftIO $ parseRequest reqs - resp <- httpLBS req +readRef :: (HasStorage m, MonadIO m) => RepoRef -> m (Maybe HashRef) +readRef ref = do + sto <- getStorage + liftIO (getRef sto (refAlias ref)) <&> fmap HashRef - case statusCode (getResponseStatus resp) of - 200 -> pure $ Just (getResponseBody resp) - _ -> pure Nothing - - -readRefHttp :: forall m . (HasCatAPI m, MonadIO m) => RepoRef -> m (Maybe HashRef) -readRefHttp re = do - req0 <- getHttpRefLogGetAPI - let req = req0 <> "/" <> show (pretty re) - request <- liftIO $ parseRequest req - resp <- httpLBS request - - case statusCode (getResponseStatus resp) of - 200 -> pure $ getResponseBody resp & LBS.unpack & fromStringMay - _ -> pure Nothing - - -getBlockSize :: forall m . (HasCatAPI m, MonadIO m) => HashRef -> m (Maybe Integer) -getBlockSize h = do - req1 <- getHttpSizeAPI - let reqs = req1 <> "/" <> show (pretty h) - req <- liftIO $ parseRequest reqs - httpJSONEither req <&> getResponseBody <&> either (const Nothing) Just - -readRef :: (HasCatAPI m, MonadIO m) => RepoRef -> m (Maybe HashRef) -readRef = readRefHttp - - -readHashesFromBlock :: (MonadIO m, HasCatAPI m) => HashRef -> m [HashRef] +readHashesFromBlock :: (MonadIO m, HasStorage m) => HashRef -> m [HashRef] readHashesFromBlock (HashRef h) = do treeQ <- liftIO newTQueueIO walkMerkle h (readBlock . HashRef) $ \hr -> do @@ -290,25 +307,9 @@ readHashesFromBlock (HashRef h) = do re <- liftIO $ atomically $ flushTQueue treeQ pure $ mconcat re -readRefCLI :: MonadIO m => RepoRef -> m (Maybe HashRef) -readRefCLI r = do - let k = pretty (AsBase58 r) - trace [qc|hbs2-peer reflog get {k}|] - let cmd = setStdin closed $ setStderr closed - $ shell [qc|hbs2-peer reflog get {k}|] - (code, out, _) <- liftIO $ readProcess cmd - - trace $ viaShow out - - case code of - ExitFailure{} -> pure Nothing - _ -> do - let s = LBS.unpack <$> headMay (LBS.lines out) - pure $ s >>= fromStringMay - type ObjType = MTreeAnn [HashRef] -readObject :: forall m . (MonadIO m, HasCatAPI m) => HashRef -> m (Maybe ByteString) +readObject :: forall m . (MonadIO m, HasStorage m) => HashRef -> m (Maybe ByteString) readObject h = runMaybeT do q <- liftIO newTQueueIO @@ -329,7 +330,7 @@ readObject h = runMaybeT do mconcat <$> liftIO (atomically $ flushTQueue q) -calcRank :: forall m . (MonadIO m, HasCatAPI m) => HashRef -> m Int +calcRank :: forall m . (MonadIO m, HasStorage m) => HashRef -> m Int calcRank h = fromMaybe 0 <$> runMaybeT do blk <- MaybeT $ readBlock h @@ -347,6 +348,7 @@ calcRank h = fromMaybe 0 <$> runMaybeT do postRefUpdate :: ( MonadIO m , HasRefCredentials m + , HasRPC m , IsRefPubKey Schema ) => RepoRef @@ -355,7 +357,7 @@ postRefUpdate :: ( MonadIO m -> m () postRefUpdate ref seqno hash = do - trace $ "refPostUpdate" <+> pretty seqno <+> pretty hash + info $ "refPostUpdate" <+> pretty seqno <+> pretty hash cred <- getCredentials ref let pubk = view peerSignPk cred @@ -363,88 +365,35 @@ postRefUpdate ref seqno hash = do let tran = SequentialRef seqno (AnnotatedHashRef Nothing hash) let bs = serialise tran & LBS.toStrict - msg <- makeRefLogUpdate @HBS2L4Proto pubk privk bs <&> serialise + msg <- makeRefLogUpdate @HBS2L4Proto pubk privk bs - let input = byteStringInput msg - let cmd = setStdin input $ shell [qc|hbs2-peer reflog send-raw|] + rpc <- getRPC <&> rpcRefLog - (code, _, _) <- liftIO $ readProcess cmd + callService @RpcRefLogPost rpc msg + >>= either (err . viaShow) (const $ pure ()) - trace $ "hbs2-peer exited with code" <+> viaShow code -storeObject :: (MonadIO m, HasCatAPI m, HasConf m) +storeObject :: (MonadIO m, HasStorage m, HasConf m) => ByteString -> ByteString -> m (Maybe HashRef) --- storeObject = storeObjectHBS2Store -storeObject = storeObjectHttpPut +storeObject = storeObjectRPC -storeObjectHttpPut :: (MonadIO m, HasCatAPI m, HasConf m) - => ByteString - -> ByteString - -> m (Maybe HashRef) +storeObjectRPC :: (MonadIO m, HasStorage m) + => ByteString + -> ByteString + -> m (Maybe HashRef) +storeObjectRPC meta bs = do + sto <- getStorage + runMaybeT do + h <- liftIO $ writeAsMerkle sto bs + let txt = LBS.unpack meta & Text.pack + blk <- MaybeT $ liftIO $ getBlock sto h -storeObjectHttpPut meta bs = do + -- FIXME: fix-excess-data-roundtrip + mtree <- MaybeT $ deserialiseOrFail @(MTree [HashRef]) blk + & either (const $ pure Nothing) (pure . Just) - let chu = chunks (fromIntegral defBlockSize) bs - - rt <- liftIO $ Cache.newCache Nothing - - -- FIXME: run-concurrently - hashes <- forM chu $ \s -> do - h <- writeBlock s `orDie` "cant write block" - pure (HashRef h) - - let pt = toPTree (MaxSize 1024) (MaxNum 1024) hashes -- FIXME: settings - - -- trace $ viaShow pt - - root <- makeMerkle 0 pt $ \(h,t,bss) -> do - liftIO $ Cache.insert rt h (t,bss) - -- void $ writeBlock bss - - pieces' <- liftIO $ Cache.toList rt - let pieces = [ bss | (_, (_,bss), _) <- pieces' ] - - api <- getHttpPutAPI - - liftIO $ mapConcurrently (writeBlockIO api) pieces - - mtree <- liftIO $ fst <$> Cache.lookup rt root `orDie` "cant find root block" - - let txt = LBS.unpack meta & Text.pack - - let ann = serialise (MTreeAnn (ShortMetadata txt) NullEncryption mtree) - - writeBlock ann <&> fmap HashRef - --- FIXME: ASAP-store-calls-hbs2 --- Это может приводить к тому, что если пир и hbs2-peer --- смотрят на разные каталоги --- ошибки могут быть очень загадочны. --- Нужно починить. --- --- FIXME: support-another-apis-for-storage -storeObjectHBS2Store :: (MonadIO m, HasConf m) => ByteString -> ByteString -> m (Maybe HashRef) -storeObjectHBS2Store meta bs = do - - stor <- cfgValue @StoragePref @(Maybe FilePath) - - -- FIXME: fix-temporary-workaround-while-hbs2-is-used - -- Пока не избавились от hbs2 store для сохранения объектов - -- можно использовать ключ storage в конфиге hbs2-git - let pref = maybe "" (mappend "-p ") stor - - let meta58 = show $ pretty $ B8.unpack $ toBase58 (LBS.toStrict meta) - - -- trace $ "meta58" <+> pretty meta58 - - let input = byteStringInput bs - let cmd = setStdin input $ setStderr closed - $ shell [qc|hbs2 store --short-meta-base58={meta58} {pref}|] - - (_, out, _) <- liftIO $ readProcess cmd - - case LBS.words out of - ["merkle-root:", h] -> pure $ Just $ fromString (LBS.unpack h) - _ -> pure Nothing + let ann = serialise (MTreeAnn (ShortMetadata txt) NullEncryption mtree) + MaybeT $ liftIO $ putBlock sto ann <&> fmap HashRef makeDbPath :: MonadIO m => RepoRef -> m FilePath diff --git a/hbs2-git/lib/HBS2Git/Config.hs b/hbs2-git/lib/HBS2Git/Config.hs index 948f6a94..73813d7b 100644 --- a/hbs2-git/lib/HBS2Git/Config.hs +++ b/hbs2-git/lib/HBS2Git/Config.hs @@ -58,7 +58,6 @@ configPath _ = liftIO do git <- findGitDir pwd byEnv <- lookupEnv "GIT_DIR" path <- pure (git <|> byEnv) `orDie` "*** hbs2-git: .git directory not found" - debug $ "AAAAA " <+> pretty path pure (takeDirectory path ".hbs2") data ConfigPathInfo = ConfigPathInfo { diff --git a/hbs2-git/lib/HBS2Git/Evolve.hs b/hbs2-git/lib/HBS2Git/Evolve.hs index f910aa39..27a8a8c7 100644 --- a/hbs2-git/lib/HBS2Git/Evolve.hs +++ b/hbs2-git/lib/HBS2Git/Evolve.hs @@ -24,7 +24,6 @@ import UnliftIO evolve :: MonadIO m => m () evolve = void $ runMaybeT do - trace "DO EVOLVE MAZAFAKA!" here <- liftIO getCurrentDirectory diff --git a/hbs2-git/lib/HBS2Git/Export.hs b/hbs2-git/lib/HBS2Git/Export.hs index 9da6d7ce..f52c9590 100644 --- a/hbs2-git/lib/HBS2Git/Export.hs +++ b/hbs2-git/lib/HBS2Git/Export.hs @@ -75,10 +75,11 @@ exportRefDeleted :: forall o m . ( MonadIO m , MonadCatch m , MonadMask m , MonadUnliftIO m - , HasCatAPI m , HasConf m , HasRefCredentials m , HasProgress m + , HasStorage m + , HasRPC m , ExportRepoOps o ) => o @@ -158,7 +159,8 @@ withExportEnv :: MonadIO m => ExportEnv -> ExportT m a -> m a withExportEnv env f = runReaderT (fromExportT f) env writeLogSegments :: forall m . ( MonadIO m - , HasCatAPI m + , HasStorage m + , HasRPC m , MonadMask m , HasRefCredentials m , HasConf m @@ -246,10 +248,11 @@ exportRefOnly :: forall o m . ( MonadIO m , MonadCatch m , MonadMask m , MonadUnliftIO m - , HasCatAPI m , HasConf m , HasRefCredentials m , HasProgress m + , HasStorage m + , HasRPC m , ExportRepoOps o ) => o @@ -375,6 +378,8 @@ runExport :: forall m . ( MonadIO m , MonadCatch m , HasProgress (App m) , MonadMask (App m) + , HasStorage (App m) + , HasRPC (App m) ) => Maybe FilePath -> RepoRef -> App m () @@ -390,6 +395,8 @@ runExport' :: forall m . ( MonadIO m , MonadCatch m , HasProgress (App m) , MonadMask (App m) + , HasStorage (App m) + , HasRPC (App m) ) => FilePath -> App m () @@ -405,6 +412,8 @@ runExport'' :: forall m . ( MonadIO m , MonadCatch m , HasProgress (App m) , MonadMask (App m) + , HasStorage (App m) + , HasRPC (App m) ) => FilePath -> RepoRef -> App m () diff --git a/hbs2-git/lib/HBS2Git/Import.hs b/hbs2-git/lib/HBS2Git/Import.hs index 2abfae64..e48c31d1 100644 --- a/hbs2-git/lib/HBS2Git/Import.hs +++ b/hbs2-git/lib/HBS2Git/Import.hs @@ -53,14 +53,14 @@ makeLenses 'RunImportOpts isRunImportDry :: RunImportOpts -> Bool isRunImportDry o = view runImportDry o == Just True -walkHashes :: HasCatAPI m => TQueue HashRef -> Hash HbSync -> m () +walkHashes :: (MonadIO m, HasStorage m) => TQueue HashRef -> Hash HbSync -> m () walkHashes q h = walkMerkle h (readBlock . HashRef) $ \(hr :: Either (Hash HbSync) [HashRef]) -> do case hr of Left hx -> die $ show $ pretty "missed block:" <+> pretty hx Right (hrr :: [HashRef]) -> do forM_ hrr $ liftIO . atomically . Q.writeTQueue q -blockSource :: (MonadIO m, HasCatAPI m) => HashRef -> SB.ByteStream m Integer +blockSource :: (MonadIO m, HasStorage m) => HashRef -> SB.ByteStream m Integer blockSource h = do tsize <- liftIO $ newTVarIO 0 deepScan ScanDeep (const none) (fromHashRef h) (lift . readBlock . HashRef) $ \ha -> do @@ -109,42 +109,11 @@ instance HasImportOpts (Bool, Bool) where importForce = fst importDontWriteGit = snd --- FIXME: ASAP-will-work-only-for-one-repo --- сейчас все транзакции помечаются, как обработанные --- в глобальном стейте для ссылки. таким образом, --- если мы вызвали для одного репозитория, --- то import не будет работать для остальных, т.к. решит, --- что всё обработано. --- --- Решение: --- Вариант N1. Держать стейт локально в каждом --- каталоге git. --- Минусы: --- - большой оверхед по данным --- - мусор в каталоге git --- - например, git-hbs2-http вообще работает без "репозитория", --- как ему быть --- --- Вариант N2. сделать развязку через какой-то ID --- репозитория или путь к нему. --- Минусы: --- - выглядит хрупко --- - например, git-hbs2-http вообще работает без "репозитория", --- как ему быть --- --- Вариант N3. БД обновлять отдельно, объекты git - отдельно --- для каждого репозитория, запоминать (где?) проигранные для --- него логи. --- Минусы: --- - двойное сканирование файлов логов - получение, распаковка, --- сканирование и т.п. сначала для БД, потом для непосредственно --- репозитория --- importRefLogNew :: ( MonadIO m , MonadUnliftIO m , MonadCatch m , MonadMask m - , HasCatAPI m + , HasStorage m , HasImportOpts opts ) => opts -> RepoRef -> m () diff --git a/hbs2-git/lib/HBS2Git/ListRefs.hs b/hbs2-git/lib/HBS2Git/ListRefs.hs index 09b44168..d6720bb3 100644 --- a/hbs2-git/lib/HBS2Git/ListRefs.hs +++ b/hbs2-git/lib/HBS2Git/ListRefs.hs @@ -50,7 +50,7 @@ hbs2Prefix = "hbs2://" -- все известные ref-ы из стейта. -- Сейчас выводятся только локальные -runListRefs :: MonadIO m => App m () +runListRefs :: (MonadIO m, HasStorage (App m)) => App m () runListRefs = do refs <- gitGetRemotes <&> filter isHbs2 remoteEntries <- @@ -74,10 +74,10 @@ runListRefs = do where isHbs2 (_, b) = Text.isPrefixOf hbs2Prefix b -runToolsScan :: (MonadUnliftIO m,MonadCatch m,MonadMask m) => RepoRef -> App m () +runToolsScan :: (MonadUnliftIO m,MonadCatch m,MonadMask m,HasStorage (App m)) => RepoRef -> App m () runToolsScan ref = do trace $ "runToolsScan" <+> pretty ref - importRefLogNew False ref + importRefLogNew True ref shutUp pure () @@ -89,7 +89,7 @@ runToolsGetRefs ref = do hPrint stdout $ pretty (AsGitRefsFile rh) shutUp -getRefVal :: (MonadIO m, HasCatAPI m) => Text -> m (Maybe HashRef) +getRefVal :: (MonadIO m, HasStorage m) => Text -> m (Maybe HashRef) getRefVal url = case Text.stripPrefix hbs2Prefix url of Nothing -> do @@ -100,9 +100,10 @@ getRefVal url = liftIO $ print $ pretty "can't parse ref" <+> pretty refStr pure Nothing Just ref -> do - mRefVal <- readRefHttp ref + mRefVal <- readRef ref case mRefVal of Nothing -> do - liftIO $ print $ pretty "readRefHttp error" <+> pretty ref + liftIO $ print $ pretty "readRef error" <+> pretty ref pure Nothing Just v -> pure $ Just v + diff --git a/hbs2-git/lib/HBS2Git/State.hs b/hbs2-git/lib/HBS2Git/State.hs index e0332ffb..0ef9a7c0 100644 --- a/hbs2-git/lib/HBS2Git/State.hs +++ b/hbs2-git/lib/HBS2Git/State.hs @@ -31,8 +31,6 @@ import Control.Concurrent.STM import Data.Graph (graphFromEdges, topSort) import Lens.Micro.Platform -import System.IO (stderr) - -- FIXME: move-orphans-to-separate-module instance ToField Cookie where diff --git a/hbs2-git/lib/HBS2Git/Types.hs b/hbs2-git/lib/HBS2Git/Types.hs index 1b7f6d10..78a31518 100644 --- a/hbs2-git/lib/HBS2Git/Types.hs +++ b/hbs2-git/lib/HBS2Git/Types.hs @@ -5,6 +5,8 @@ module HBS2Git.Types ( module HBS2Git.Types , module Control.Monad.IO.Class + , HasStorage(..) + , AnyStorage(..) ) where @@ -12,10 +14,16 @@ import HBS2.Prelude.Plated import HBS2.Hash import HBS2.Base58 import HBS2.Git.Types +import HBS2.Actors.Peer.Types (HasStorage(..),AnyStorage(..)) +import HBS2.Peer.RPC.Client.Unix hiding (Cookie) import HBS2.Net.Proto.RefLog (RefLogKey(..)) import HBS2.Net.Proto.Types hiding (Cookie) import HBS2.Net.Auth.Credentials +import HBS2.Peer.RPC.API.Peer +import HBS2.Peer.RPC.API.RefLog +import HBS2.Peer.RPC.API.Storage + import HBS2.System.Logger.Simple import Data.Config.Suckless @@ -80,23 +88,30 @@ data KeyRingFile data KeyRingFiles data StoragePref +data RPCEndpoints = + RPCEndpoints + { rpcPeer :: ServiceCaller PeerAPI UNIX + , rpcStorage :: ServiceCaller StorageAPI UNIX + , rpcRefLog :: ServiceCaller RefLogAPI UNIX + } + data AppEnv = AppEnv { _appCurDir :: FilePath , _appGitDir :: FilePath , _appConf :: [Syntax C] , _appStateDir :: FilePath - , _appPeerHttpCat :: API - , _appPeerHttpSize :: API - , _appPeerHttpPut :: API - , _appPeerHttpRefLogGet :: API , _appRefCred :: TVar (HashMap RepoRef (PeerCredentials Schema)) + , _appRpc :: RPCEndpoints } makeLenses 'AppEnv newtype AsGitRefsFile a = AsGitRefsFile a +class HasRPC m where + getRPC :: m RPCEndpoints + data RepoHead = RepoHead { _repoHEAD :: Maybe GitRef @@ -165,35 +180,10 @@ instance {-# OVERLAPPABLE #-} MonadIO m => HasProgress m where , styleWidth = ConstantWidth 60 } -class MonadIO m => HasCatAPI m where - getHttpCatAPI :: m API - getHttpSizeAPI :: m API - getHttpPutAPI :: m API - getHttpRefLogGetAPI :: m API - class MonadIO m => HasRefCredentials m where getCredentials :: RepoRef -> m (PeerCredentials Schema) setCredentials :: RepoRef -> PeerCredentials Schema -> m () -instance (HasCatAPI m, MonadIO m) => HasCatAPI (MaybeT m) where - getHttpCatAPI = lift getHttpCatAPI - getHttpSizeAPI = lift getHttpSizeAPI - getHttpPutAPI = lift getHttpPutAPI - getHttpRefLogGetAPI = lift getHttpRefLogGetAPI - - -instance (HasCatAPI m, MonadIO m) => HasCatAPI (ResourceT m) where - getHttpCatAPI = lift getHttpCatAPI - getHttpSizeAPI = lift getHttpSizeAPI - getHttpPutAPI = lift getHttpPutAPI - getHttpRefLogGetAPI = lift getHttpRefLogGetAPI - --- instance (HasCatAPI (App m), MonadIO m) => HasCatAPI (ResourceT (App m)) where --- getHttpCatAPI = lift getHttpCatAPI --- getHttpSizeAPI = lift getHttpSizeAPI --- getHttpPutAPI = lift getHttpPutAPI --- getHttpRefLogGetAPI = lift getHttpRefLogGetAPI - newtype App m a = App { fromApp :: ReaderT AppEnv m a } deriving newtype ( Applicative diff --git a/hbs2-peer/app/PeerConfig.hs b/hbs2-peer/app/PeerConfig.hs index f2cd3730..3879cd0f 100644 --- a/hbs2-peer/app/PeerConfig.hs +++ b/hbs2-peer/app/PeerConfig.hs @@ -136,7 +136,7 @@ getRpcSocketNameM = do syn <- getConf let soname = lastDef rpcSoDef [ Text.unpack n - | ListVal @C (Key "rpc2" [SymbolVal "unix", LitStrVal n]) <- syn + | ListVal @C (Key "rpc" [SymbolVal "unix", LitStrVal n]) <- syn ] pure soname diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index bd09fad2..f4f78c49 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -281,10 +281,13 @@ runCLI = do pPoke = do rpc <- pRpcCommon pure $ withMyRPC @PeerAPI rpc $ \caller -> do - r <- callService @RpcPoke caller () - case r of - Left e -> err (viaShow e) - Right txt -> putStrLn txt + e <- race ( pause @'Seconds 0.25) do + r <- callService @RpcPoke caller () + case r of + Left e -> die (show e) + Right txt -> putStrLn txt >> exitSuccess + + liftIO $ either (const $ exitFailure) (const $ exitSuccess) e pAnnounce = do rpc <- pRpcCommon @@ -914,7 +917,7 @@ runPeer opts = Exception.handle (\e -> myException e let refChanHeadPostAction href = do void $ liftIO $ withPeerM penv $ do let h = fromHashRef href - debug $ "rpc2.refChanHeadPost" <+> pretty h + debug $ "rpc.refChanHeadPost" <+> pretty h me <- ownPeer @e sto <- getStorage @@ -935,14 +938,14 @@ runPeer opts = Exception.handle (\e -> myException e runResponseM me $ refChanHeadProto @e True refChanAdapter msg let refChanProposeAction (puk, box) = do - debug $ "rpc2.reChanPropose" <+> pretty (AsBase58 puk) + debug $ "rpc.reChanPropose" <+> pretty (AsBase58 puk) void $ liftIO $ withPeerM penv $ do me <- ownPeer @e runMaybeT do proposed <- MaybeT $ makeProposeTran @e pc puk box lift $ runResponseM me $ refChanUpdateProto @e True pc refChanAdapter (Propose @e puk proposed) - -- NOTE: moved-to-rpc2 + -- NOTE: moved-to-rpc let refChanNotifyAction (puk, box) = do void $ liftIO $ withPeerM penv $ do me <- ownPeer @e @@ -976,17 +979,20 @@ runPeer opts = Exception.handle (\e -> myException e Nothing -> mempty Just p -> "http-port:" <+> pretty p + let rpc = getRpcSocketName conf + let pokeAnsw = show $ vcat [ "peer-key:" <+> dquotes (pretty (AsBase58 k)) , "udp:" <+> dquotes (pretty (listenAddr mess)) , "local-multicast:" <+> dquotes (pretty localMulticast) + , "rpc:" <+> dquotes (pretty rpc) , http ] let rpcSa = getRpcSocketName conf - rpc2msg <- newMessagingUnixOpts [MUFork] True 1.0 rpcSa + rpcmsg <- newMessagingUnixOpts [MUFork] True 1.0 rpcSa - let rpc2ctx = RPC2Context { rpcConfig = fromPeerConfig conf - , rpcMessaging = rpc2msg + let rpcctx = RPC2Context { rpcConfig = fromPeerConfig conf + , rpcMessaging = rpcmsg , rpcPokeAnswer = pokeAnsw , rpcPeerEnv = penv , rpcLocalMultiCast = localMulticast @@ -997,10 +1003,10 @@ runPeer opts = Exception.handle (\e -> myException e , rpcDoRefChanNotify = refChanNotifyAction } - m1 <- async $ runMessagingUnix rpc2msg + m1 <- async $ runMessagingUnix rpcmsg link m1 - rpcProto <- async $ flip runReaderT rpc2ctx do + rpcProto <- async $ flip runReaderT rpcctx do runProto @UNIX [ makeResponse (makeServer @PeerAPI) , makeResponse (makeServer @RefLogAPI) diff --git a/hbs2-peer/app/RPC2/Announce.hs b/hbs2-peer/app/RPC2/Announce.hs index 6d439968..3bfbf8a4 100644 --- a/hbs2-peer/app/RPC2/Announce.hs +++ b/hbs2-peer/app/RPC2/Announce.hs @@ -15,12 +15,10 @@ import HBS2.Peer.RPC.API.Peer instance (MonadIO m,HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcAnnounce where - type instance Input RpcAnnounce = HashRef - type instance Output RpcAnnounce = () handleMethod href = do co <- getRpcContext @PeerAPI - debug $ "rpc2.announce:" <+> pretty href + debug $ "rpc.announce:" <+> pretty href sendBlockAnnounce (rpcPeerEnv co) (rpcLocalMultiCast co) (fromHashRef href) diff --git a/hbs2-peer/app/RPC2/Die.hs b/hbs2-peer/app/RPC2/Die.hs index 9f54cf7d..15d3a587 100644 --- a/hbs2-peer/app/RPC2/Die.hs +++ b/hbs2-peer/app/RPC2/Die.hs @@ -5,9 +5,7 @@ import HBS2.Clock import HBS2.Net.Proto.Service import HBS2.System.Logger.Simple -import Data.Config.Suckless.KeyValue -import HBS2.Peer.RPC.Internal.Types import HBS2.Peer.RPC.API.Peer import System.Exit qualified as Exit @@ -15,11 +13,9 @@ import Control.Concurrent.Async instance (MonadIO m) => HandleMethod m RpcDie where - type instance Input RpcDie = () - type instance Output RpcDie = () handleMethod _ = do - debug $ "rpc2.die: exiting" + debug $ "rpc.die: exiting" void $ liftIO $ do w <- async $ pause @'Seconds 0.5 >> Exit.exitSuccess link w diff --git a/hbs2-peer/app/RPC2/Fetch.hs b/hbs2-peer/app/RPC2/Fetch.hs index cc66d88e..b63e992b 100644 --- a/hbs2-peer/app/RPC2/Fetch.hs +++ b/hbs2-peer/app/RPC2/Fetch.hs @@ -3,7 +3,6 @@ module RPC2.Fetch where import HBS2.Prelude.Plated -import HBS2.Data.Types.Refs (HashRef(..)) import HBS2.Net.Proto.Service import HBS2.System.Logger.Simple @@ -12,12 +11,10 @@ import HBS2.Peer.RPC.Internal.Types import HBS2.Peer.RPC.API.Peer instance (MonadIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcFetch where - type instance Input RpcFetch = HashRef - type instance Output RpcFetch = () handleMethod href = do co <- getRpcContext @PeerAPI - debug $ "rpc2.fetch:" <+> pretty href + debug $ "rpc.fetch:" <+> pretty href liftIO $ rpcDoFetch co href diff --git a/hbs2-peer/app/RPC2/LogLevel.hs b/hbs2-peer/app/RPC2/LogLevel.hs index c0ab2bf4..494d83b1 100644 --- a/hbs2-peer/app/RPC2/LogLevel.hs +++ b/hbs2-peer/app/RPC2/LogLevel.hs @@ -9,15 +9,8 @@ import Log import HBS2.Peer.RPC.API.Peer import HBS2.System.Logger.Simple -import Codec.Serialise - - -instance Serialise SetLogging - instance (MonadIO m) => HandleMethod m RpcLogLevel where - type instance Input RpcLogLevel = SetLogging - type instance Output RpcLogLevel = () handleMethod = \case DebugOn True -> do diff --git a/hbs2-peer/app/RPC2/Peers.hs b/hbs2-peer/app/RPC2/Peers.hs index 225a93d5..e4202653 100644 --- a/hbs2-peer/app/RPC2/Peers.hs +++ b/hbs2-peer/app/RPC2/Peers.hs @@ -23,8 +23,6 @@ import Data.Maybe instance (MonadIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcPeers where - type instance Input RpcPeers = () - type instance Output RpcPeers = [(PubKey 'Sign HBS2Basic, PeerAddr L4Proto)] handleMethod _ = do co <- getRpcContext @PeerAPI diff --git a/hbs2-peer/app/RPC2/PexInfo.hs b/hbs2-peer/app/RPC2/PexInfo.hs index 16548e19..bace3711 100644 --- a/hbs2-peer/app/RPC2/PexInfo.hs +++ b/hbs2-peer/app/RPC2/PexInfo.hs @@ -3,7 +3,6 @@ module RPC2.PexInfo where import HBS2.Actors.Peer -import HBS2.Net.Proto.Types import HBS2.Net.Proto.Service import HBS2.Prelude.Plated import HBS2.Net.Proto.Definition() @@ -13,11 +12,12 @@ import HBS2.Net.Proto.PeerExchange import HBS2.Peer.RPC.Internal.Types import HBS2.Peer.RPC.API.Peer +import Codec.Serialise + instance ( MonadIO m , HasRpcContext PeerAPI RPC2Context m + , Serialise (Output RpcPexInfo) ) => HandleMethod m RpcPexInfo where - type instance Input RpcPexInfo = () - type instance Output RpcPexInfo = [PeerAddr L4Proto] handleMethod _ = do co <- getRpcContext @PeerAPI diff --git a/hbs2-peer/app/RPC2/Ping.hs b/hbs2-peer/app/RPC2/Ping.hs index 6c99528b..e0d3e3a3 100644 --- a/hbs2-peer/app/RPC2/Ping.hs +++ b/hbs2-peer/app/RPC2/Ping.hs @@ -4,8 +4,6 @@ module RPC2.Ping where import HBS2.Prelude.Plated import HBS2.Actors.Peer --- import HBS2.Actors.Peer.Types -import HBS2.Net.Proto.Types import HBS2.Net.Proto.Service import HBS2.System.Logger.Simple @@ -17,12 +15,10 @@ import HBS2.Peer.RPC.API.Peer instance (MonadIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcPing where - type instance Input RpcPing = PeerAddr L4Proto - type instance Output RpcPing = Bool handleMethod pa = do co <- getRpcContext @PeerAPI - debug $ "rpc2.ping:" <+> pretty pa + debug $ "rpc.ping:" <+> pretty pa liftIO $ withPeerM (rpcPeerEnv co) $ do pingPeerWait pa diff --git a/hbs2-peer/app/RPC2/Poke.hs b/hbs2-peer/app/RPC2/Poke.hs index 34f8b733..4758cea2 100644 --- a/hbs2-peer/app/RPC2/Poke.hs +++ b/hbs2-peer/app/RPC2/Poke.hs @@ -15,12 +15,10 @@ instance ( MonadIO m , HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcPoke where - type instance Input RpcPoke = () - type instance Output RpcPoke = String handleMethod n = do co <- getRpcContext @PeerAPI - debug $ "rpc2.poke: alive and kicking!" <+> pretty n + debug $ "rpc.poke: alive and kicking!" <+> pretty n pure $ rpcPokeAnswer co diff --git a/hbs2-peer/app/RPC2/RefChan.hs b/hbs2-peer/app/RPC2/RefChan.hs index 6426c2de..a50dfd8a 100644 --- a/hbs2-peer/app/RPC2/RefChan.hs +++ b/hbs2-peer/app/RPC2/RefChan.hs @@ -12,7 +12,6 @@ import HBS2.Base58 import HBS2.Data.Types.Refs (HashRef(..)) import HBS2.Net.Proto.Definition() import HBS2.Net.Proto.Service -import HBS2.Data.Types.SignedBox import HBS2.Net.Proto.RefChan import HBS2.Net.Messaging.Unix import HBS2.Storage @@ -23,7 +22,6 @@ import HBS2.Peer.RPC.Internal.Types import HBS2.System.Logger.Simple import PeerTypes -import Data.ByteString qualified as BS import Data.Functor import Control.Monad.Reader @@ -35,71 +33,57 @@ instance (Monad m) getRpcContext = lift ask instance RefChanContext m => HandleMethod m RpcRefChanHeadGet where - type instance Input RpcRefChanHeadGet = PubKey 'Sign HBS2Basic - type instance Output RpcRefChanHeadGet = Maybe HashRef handleMethod puk = do co <- getRpcContext @RefChanAPI let penv = rpcPeerEnv co - debug $ "rpc2.refchanHeadGet:" <+> pretty (AsBase58 puk) + debug $ "rpc.refchanHeadGet:" <+> pretty (AsBase58 puk) liftIO $ withPeerM penv $ do sto <- getStorage liftIO $ getRef sto (RefChanHeadKey @HBS2Basic puk) <&> fmap HashRef instance (RefChanContext m) => HandleMethod m RpcRefChanHeadFetch where - type instance Input RpcRefChanHeadFetch = PubKey 'Sign HBS2Basic - type instance Output RpcRefChanHeadFetch = () handleMethod puk = do - debug $ "rpc2.refchanHeadFetch:" <+> pretty (AsBase58 puk) + debug $ "rpc.refchanHeadFetch:" <+> pretty (AsBase58 puk) penv <- rpcPeerEnv <$> getRpcContext @RefChanAPI void $ liftIO $ withPeerM penv $ do broadCastMessage (RefChanGetHead @L4Proto puk) instance RefChanContext m => HandleMethod m RpcRefChanFetch where - type instance Input RpcRefChanFetch = PubKey 'Sign HBS2Basic - type instance Output RpcRefChanFetch = () handleMethod puk = do - debug $ "rpc2.refchanFetch:" <+> pretty (AsBase58 puk) + debug $ "rpc.refchanFetch:" <+> pretty (AsBase58 puk) penv <- rpcPeerEnv <$> getRpcContext @RefChanAPI void $ liftIO $ withPeerM penv $ do gossip (RefChanRequest @L4Proto puk) instance RefChanContext m => HandleMethod m RpcRefChanGet where - type instance Input RpcRefChanGet = PubKey 'Sign HBS2Basic - type instance Output RpcRefChanGet = Maybe HashRef handleMethod puk = do co <- getRpcContext @RefChanAPI let penv = rpcPeerEnv co - debug $ "rpc2.refchanGet:" <+> pretty (AsBase58 puk) + debug $ "rpc.refchanGet:" <+> pretty (AsBase58 puk) liftIO $ withPeerM penv $ do sto <- getStorage liftIO $ getRef sto (RefChanLogKey @HBS2Basic puk) <&> fmap HashRef instance RefChanContext m => HandleMethod m RpcRefChanPropose where - type instance Input RpcRefChanPropose = (PubKey 'Sign HBS2Basic, SignedBox BS.ByteString L4Proto) - type instance Output RpcRefChanPropose = () handleMethod (puk, box) = do co <- getRpcContext @RefChanAPI - debug $ "rpc2.refChanNotifyAction" <+> pretty (AsBase58 puk) + debug $ "rpc.refChanNotifyAction" <+> pretty (AsBase58 puk) liftIO $ rpcDoRefChanPropose co (puk, box) instance RefChanContext m => HandleMethod m RpcRefChanNotify where - type instance Input RpcRefChanNotify = (PubKey 'Sign HBS2Basic, SignedBox BS.ByteString L4Proto) - type instance Output RpcRefChanNotify = () handleMethod (puk, box) = do co <- getRpcContext @RefChanAPI - debug $ "rpc2.refChanNotifyAction" <+> pretty (AsBase58 puk) + debug $ "rpc.refChanNotifyAction" <+> pretty (AsBase58 puk) liftIO $ rpcDoRefChanNotify co (puk, box) instance RefChanContext m => HandleMethod m RpcRefChanHeadPost where - type instance Input RpcRefChanHeadPost = HashRef - type instance Output RpcRefChanHeadPost = () handleMethod href = do co <- getRpcContext @RefChanAPI diff --git a/hbs2-peer/app/RPC2/RefLog.hs b/hbs2-peer/app/RPC2/RefLog.hs index a157a7da..1cf7bfe2 100644 --- a/hbs2-peer/app/RPC2/RefLog.hs +++ b/hbs2-peer/app/RPC2/RefLog.hs @@ -37,12 +37,10 @@ instance (Monad m) getRpcContext = lift ask instance (RefLogContext m) => HandleMethod m RpcRefLogGet where - type instance Input RpcRefLogGet = PubKey 'Sign HBS2Basic - type instance Output RpcRefLogGet = Maybe HashRef handleMethod pk = do co <- getRpcContext @RefLogAPI - debug $ "rpc2.reflogGet:" <+> pretty (AsBase58 pk) + debug $ "rpc.reflogGet:" <+> pretty (AsBase58 pk) <+> pretty (hashObject @HbSync (RefLogKey @HBS2Basic pk)) liftIO $ withPeerM (rpcPeerEnv co) $ do @@ -50,24 +48,20 @@ instance (RefLogContext m) => HandleMethod m RpcRefLogGet where liftIO (getRef sto (RefLogKey @HBS2Basic pk)) <&> fmap HashRef instance (RefLogContext m) => HandleMethod m RpcRefLogFetch where - type instance Input RpcRefLogFetch = PubKey 'Sign HBS2Basic - type instance Output RpcRefLogFetch = () handleMethod pk = do co <- getRpcContext @RefLogAPI - debug $ "rpc2.reflogFetch:" <+> pretty (AsBase58 pk) + debug $ "rpc.reflogFetch:" <+> pretty (AsBase58 pk) liftIO $ withPeerM (rpcPeerEnv co) $ do broadCastMessage (RefLogRequest @L4Proto pk) instance (RefLogContext m) => HandleMethod m RpcRefLogPost where - type instance Input RpcRefLogPost = RefLogUpdate L4Proto - type instance Output RpcRefLogPost = () handleMethod msg = do co <- getRpcContext @RefLogAPI let pk = view refLogId msg - debug $ "rpc2.reflogPost:" <+> pretty (AsBase58 pk) + debug $ "rpc.reflogPost:" <+> pretty (AsBase58 pk) liftIO $ withPeerM (rpcPeerEnv co) $ do emit @L4Proto RefLogUpdateEvKey (RefLogUpdateEvData (pk, msg)) diff --git a/hbs2-peer/app/RefLog.hs b/hbs2-peer/app/RefLog.hs index a022c07c..b73edbfa 100644 --- a/hbs2-peer/app/RefLog.hs +++ b/hbs2-peer/app/RefLog.hs @@ -131,6 +131,7 @@ reflogWorker conf adapter = do let reflogUpdate reflog _ tran = do signed <- verifyRefLogUpdate tran + when signed do liftIO $ atomically $ writeTQueue pQ (reflog, [tran]) diff --git a/hbs2-peer/lib/HBS2/Peer/RPC/API/Peer.hs b/hbs2-peer/lib/HBS2/Peer/RPC/API/Peer.hs index c606cc1b..112770ac 100644 --- a/hbs2-peer/lib/HBS2/Peer/RPC/API/Peer.hs +++ b/hbs2-peer/lib/HBS2/Peer/RPC/API/Peer.hs @@ -1,8 +1,10 @@ +{-# Language UndecidableInstances #-} module HBS2.Peer.RPC.API.Peer where import HBS2.Prelude.Plated import HBS2.Net.Messaging.Unix import HBS2.Net.Proto.Service +import HBS2.Data.Types.Refs (HashRef(..)) import HBS2.Actors.Peer import HBS2.Peer.RPC.Internal.Types @@ -41,9 +43,34 @@ instance (Monad m) -- type instance RpcContext PeerAPI = RPC2Context getRpcContext = lift ask +type instance Input RpcDie = () +type instance Output RpcDie = () + +type instance Input RpcPoke = () +type instance Output RpcPoke = String + +type instance Input RpcPing = PeerAddr L4Proto +type instance Output RpcPing = Bool + +type instance Input RpcAnnounce = HashRef +type instance Output RpcAnnounce = () + +type instance Input RpcPexInfo = () +type instance Output RpcPexInfo = [PeerAddr L4Proto] + +type instance Input RpcPeers = () +type instance Output RpcPeers = [(PubKey 'Sign HBS2Basic, PeerAddr L4Proto)] + +type instance Input RpcFetch = HashRef +type instance Output RpcFetch = () + +type instance Input RpcLogLevel = SetLogging +type instance Output RpcLogLevel = () data SetLogging = DebugOn Bool | TraceOn Bool deriving (Generic,Eq,Show) +instance Serialise SetLogging + diff --git a/hbs2-peer/lib/HBS2/Peer/RPC/API/RefChan.hs b/hbs2-peer/lib/HBS2/Peer/RPC/API/RefChan.hs index 402acf77..a5e48156 100644 --- a/hbs2-peer/lib/HBS2/Peer/RPC/API/RefChan.hs +++ b/hbs2-peer/lib/HBS2/Peer/RPC/API/RefChan.hs @@ -1,9 +1,13 @@ +{-# Language UndecidableInstances #-} module HBS2.Peer.RPC.API.RefChan where import HBS2.Net.Proto.Service import HBS2.Net.Messaging.Unix (UNIX) +import HBS2.Data.Types.Refs (HashRef(..)) +import HBS2.Data.Types.SignedBox import Data.ByteString.Lazy ( ByteString ) +import Data.ByteString qualified as BS import Codec.Serialise -- NOTE: refchan-head-endpoints @@ -38,3 +42,24 @@ instance HasProtocol UNIX (ServiceProto RefChanAPI UNIX) where encode = serialise +type instance Input RpcRefChanHeadGet = PubKey 'Sign HBS2Basic +type instance Output RpcRefChanHeadGet = Maybe HashRef + +type instance Input RpcRefChanHeadFetch = PubKey 'Sign HBS2Basic +type instance Output RpcRefChanHeadFetch = () + +type instance Input RpcRefChanFetch = PubKey 'Sign HBS2Basic +type instance Output RpcRefChanFetch = () + +type instance Input RpcRefChanGet = PubKey 'Sign HBS2Basic +type instance Output RpcRefChanGet = Maybe HashRef + +type instance Input RpcRefChanPropose = (PubKey 'Sign HBS2Basic, SignedBox BS.ByteString L4Proto) +type instance Output RpcRefChanPropose = () + +type instance Input RpcRefChanNotify = (PubKey 'Sign HBS2Basic, SignedBox BS.ByteString L4Proto) +type instance Output RpcRefChanNotify = () + +type instance Input RpcRefChanHeadPost = HashRef +type instance Output RpcRefChanHeadPost = () + diff --git a/hbs2-peer/lib/HBS2/Peer/RPC/API/RefLog.hs b/hbs2-peer/lib/HBS2/Peer/RPC/API/RefLog.hs index 9964238a..31f44938 100644 --- a/hbs2-peer/lib/HBS2/Peer/RPC/API/RefLog.hs +++ b/hbs2-peer/lib/HBS2/Peer/RPC/API/RefLog.hs @@ -1,7 +1,10 @@ +{-# Language UndecidableInstances #-} module HBS2.Peer.RPC.API.RefLog where import HBS2.Net.Messaging.Unix +import HBS2.Data.Types.Refs (HashRef(..)) import HBS2.Net.Proto.Service +import HBS2.Net.Proto.RefLog (RefLogUpdate) import Data.ByteString.Lazy (ByteString) import Codec.Serialise @@ -23,3 +26,11 @@ instance HasProtocol UNIX (ServiceProto RefLogAPI UNIX) where decode = either (const Nothing) Just . deserialiseOrFail encode = serialise +type instance Input RpcRefLogGet = PubKey 'Sign HBS2Basic +type instance Output RpcRefLogGet = Maybe HashRef + +type instance Input RpcRefLogFetch = PubKey 'Sign HBS2Basic +type instance Output RpcRefLogFetch = () + +type instance Input RpcRefLogPost = RefLogUpdate L4Proto +type instance Output RpcRefLogPost = () diff --git a/hbs2-peer/lib/HBS2/Peer/RPC/API/Storage.hs b/hbs2-peer/lib/HBS2/Peer/RPC/API/Storage.hs index 2141582b..a66a0d33 100644 --- a/hbs2-peer/lib/HBS2/Peer/RPC/API/Storage.hs +++ b/hbs2-peer/lib/HBS2/Peer/RPC/API/Storage.hs @@ -5,6 +5,8 @@ import HBS2.Actors.Peer import HBS2.Net.Proto.Service import HBS2.Net.Messaging.Unix import HBS2.Peer.RPC.Internal.Types +import HBS2.Storage (Offset,Size) +import HBS2.Data.Types.Refs (HashRef(..),RefAlias(..)) import Control.Monad.Reader import Data.ByteString.Lazy (ByteString) @@ -45,3 +47,30 @@ instance (Monad m) instance Monad m => HasStorage (ReaderT RPC2Context m) where getStorage = asks rpcStorage +type instance Input RpcStorageHasBlock = HashRef +type instance Output RpcStorageHasBlock = Maybe Integer + +type instance Input RpcStorageGetBlock = HashRef +type instance Output RpcStorageGetBlock = Maybe ByteString + +type instance Input RpcStorageEnqueueBlock = ByteString +type instance Output RpcStorageEnqueueBlock = Maybe HashRef + +type instance Input RpcStoragePutBlock = ByteString +type instance Output RpcStoragePutBlock = Maybe HashRef + +type instance Input RpcStorageDelBlock = HashRef +type instance Output RpcStorageDelBlock = () + +type instance Input RpcStorageGetChunk = (HashRef, Offset, Size) +type instance Output RpcStorageGetChunk = Maybe ByteString + +type instance Input RpcStorageGetRef = RefAlias +type instance Output RpcStorageGetRef = Maybe HashRef + +type instance Input RpcStorageUpdateRef = (RefAlias, HashRef) +type instance Output RpcStorageUpdateRef = () + +type instance Input RpcStorageDelRef = RefAlias +type instance Output RpcStorageDelRef = () + diff --git a/hbs2-peer/lib/HBS2/Peer/RPC/Client/StorageClient.hs b/hbs2-peer/lib/HBS2/Peer/RPC/Client/StorageClient.hs index 8a36df29..b2863ded 100644 --- a/hbs2-peer/lib/HBS2/Peer/RPC/Client/StorageClient.hs +++ b/hbs2-peer/lib/HBS2/Peer/RPC/Client/StorageClient.hs @@ -20,6 +20,8 @@ import Data.Functor import Data.ByteString.Lazy (ByteString) import Data.Either +import HBS2.System.Logger.Simple + newtype StorageClient e = StorageClient { fromStorageClient :: ServiceCaller StorageAPI e } @@ -29,10 +31,12 @@ instance ( MonadIO m => Storage (StorageClient e) HbSync ByteString m where putBlock s lbs = liftIO do + debug $ "CLIENT: putBlock!" callService @RpcStoragePutBlock @StorageAPI (fromStorageClient s) lbs <&> either (const Nothing) (fmap fromHashRef) enqueueBlock s lbs = liftIO do + debug $ "CLIENT: enqueueBlock!" callService @RpcStorageEnqueueBlock @StorageAPI (fromStorageClient s) lbs <&> either (const Nothing) (fmap fromHashRef) diff --git a/hbs2-peer/lib/HBS2/Peer/RPC/Client/Unix.hs b/hbs2-peer/lib/HBS2/Peer/RPC/Client/Unix.hs index 43c4c5b0..bd6a8056 100644 --- a/hbs2-peer/lib/HBS2/Peer/RPC/Client/Unix.hs +++ b/hbs2-peer/lib/HBS2/Peer/RPC/Client/Unix.hs @@ -1,4 +1,8 @@ -module HBS2.Peer.RPC.Client.Unix where +module HBS2.Peer.RPC.Client.Unix + ( module HBS2.Peer.RPC.Client.Unix + , module HBS2.Net.Proto.Service + , module HBS2.Net.Messaging.Unix + ) where import HBS2.Prelude.Plated @@ -6,6 +10,7 @@ import HBS2.Clock import HBS2.Net.Messaging.Unix import HBS2.Net.Proto.Service +-- FIXME: to-remove-code import HBS2.Peer.RPC.API.Storage() import HBS2.System.Logger.Simple diff --git a/hbs2-peer/lib/HBS2/Peer/RPC/Internal/Storage.hs b/hbs2-peer/lib/HBS2/Peer/RPC/Internal/Storage.hs index c656fae4..9aca0762 100644 --- a/hbs2-peer/lib/HBS2/Peer/RPC/Internal/Storage.hs +++ b/hbs2-peer/lib/HBS2/Peer/RPC/Internal/Storage.hs @@ -9,89 +9,78 @@ module HBS2.Peer.RPC.Internal.Storage import HBS2.Prelude.Plated import HBS2.Actors.Peer.Types -import HBS2.Data.Types.Refs (HashRef(..),RefAlias(..)) +import HBS2.Data.Types.Refs (HashRef(..)) import HBS2.Storage import HBS2.Peer.RPC.Class import HBS2.Peer.RPC.API.Storage import HBS2.Net.Proto.Service - +import HBS2.System.Logger.Simple import Data.Functor -import Data.ByteString.Lazy ( ByteString ) -- type StorageContext m = (MonadIO m, HasStorage m) type StorageContext m = (MonadIO m, HasStorage m) instance (StorageContext m) => HandleMethod m RpcStorageHasBlock where - type instance Input RpcStorageHasBlock = HashRef - type instance Output RpcStorageHasBlock = Maybe Integer handleMethod href = do + debug $ "rpc.storage.hasBlock" <+> pretty href sto <- getStorage liftIO $ hasBlock sto (fromHashRef href) instance (StorageContext m) => HandleMethod m RpcStorageGetBlock where - type instance Input RpcStorageGetBlock = HashRef - type instance Output RpcStorageGetBlock = Maybe ByteString handleMethod href = do + debug $ "rpc.storage.getBlock" <+> pretty href sto <- getStorage liftIO $ getBlock sto (fromHashRef href) instance (StorageContext m) => HandleMethod m RpcStorageEnqueueBlock where - type instance Input RpcStorageEnqueueBlock = ByteString - type instance Output RpcStorageEnqueueBlock = Maybe HashRef handleMethod lbs = do + debug $ "rpc.storage.enqueueBlock" sto <- getStorage liftIO $ enqueueBlock sto lbs <&> fmap HashRef instance (StorageContext m) => HandleMethod m RpcStoragePutBlock where - type instance Input RpcStoragePutBlock = ByteString - type instance Output RpcStoragePutBlock = Maybe HashRef handleMethod lbs = do + debug $ "rpc.storage.putBlock" sto <- getStorage liftIO $ putBlock sto lbs <&> fmap HashRef instance (StorageContext m) => HandleMethod m RpcStorageDelBlock where - type instance Input RpcStorageDelBlock = HashRef - type instance Output RpcStorageDelBlock = () handleMethod href = do + debug $ "rpc.storage.delBlock" <+> pretty href sto <- getStorage liftIO $ delBlock sto (fromHashRef href) instance (StorageContext m) => HandleMethod m RpcStorageGetChunk where - type instance Input RpcStorageGetChunk = (HashRef, Offset, Size) - type instance Output RpcStorageGetChunk = Maybe ByteString handleMethod (h,o,s) = do sto <- getStorage liftIO $ getChunk sto (fromHashRef h) o s instance (StorageContext m) => HandleMethod m RpcStorageGetRef where - type instance Input RpcStorageGetRef = RefAlias - type instance Output RpcStorageGetRef = Maybe HashRef handleMethod ref = do + debug $ "rpc.storage.getRef" <+> pretty ref sto <- getStorage liftIO $ getRef sto ref <&> fmap HashRef instance (StorageContext m) => HandleMethod m RpcStorageUpdateRef where - type instance Input RpcStorageUpdateRef = (RefAlias, HashRef) - type instance Output RpcStorageUpdateRef = () handleMethod (ref, val) = do + debug $ "rpc.storage.updateRef" <+> pretty ref sto <- getStorage liftIO $ updateRef sto ref (fromHashRef val) instance (StorageContext m) => HandleMethod m RpcStorageDelRef where - type instance Input RpcStorageDelRef = RefAlias - type instance Output RpcStorageDelRef = () handleMethod ref = do + debug $ "rpc.storage.delRef" <+> pretty ref sto <- getStorage liftIO $ delRef sto ref diff --git a/hbs2-tests/test/StorageServiceTest.hs b/hbs2-tests/test/StorageServiceTest.hs index df3e5719..f6f513c2 100644 --- a/hbs2-tests/test/StorageServiceTest.hs +++ b/hbs2-tests/test/StorageServiceTest.hs @@ -9,6 +9,8 @@ import HBS2.Net.Messaging.Unix import HBS2.Net.Proto.Service import HBS2.Storage import HBS2.Storage.Simple (simpleStorageWorker,simpleStorageInit) +import HBS2.Storage.Operations.ByteString qualified as OP +import HBS2.Storage.Operations.ByteString (MerkleReader(..),TreeKey(..)) import HBS2.Peer.RPC.API.Storage import HBS2.Peer.RPC.Client.Unix @@ -21,12 +23,14 @@ import HBS2.OrDie import HBS2.System.Logger.Simple import Control.Monad.Reader +import Control.Monad.Except (Except, ExceptT(..), runExcept, runExceptT) import Data.Kind import System.FilePath import UnliftIO import Prettyprinter import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy qualified as LBS +import Data.ByteString.Lazy.Char8 qualified as LBS8 import Data.Maybe import Codec.Serialise @@ -169,6 +173,18 @@ main = do assertBool "ref-alias-works-3" (vjopa == Just h3) + let aaa = LBS8.replicate (256 * 1024 * 10) 'A' + + aaaHref <- OP.writeAsMerkle cto aaa + + info $ "writeAsMerkle" <+> pretty aaaHref + + aaaWat <- runExceptT (OP.readFromMerkle cto (SimpleKey aaaHref)) `orDie` "readFromMerkle failed" + + info $ "readFromMerkle" <+> pretty (LBS.length aaaWat) + + assertBool "read/write" (aaa == aaaWat) + pure ()