From 5f77b26520bea20f8afed47aaf039a4cb10066f5 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Fri, 24 Mar 2023 19:54:07 +0300 Subject: [PATCH] hbs2-git: lotsa stuff --- docs/devlog.md | 2 +- hbs2-git/git-hbs2/GitRemoteMain.hs | 22 ++-- hbs2-git/git-hbs2/GitRemotePush.hs | 2 + hbs2-git/git-hbs2/GitRemoteTypes.hs | 4 + hbs2-git/hbs2-git.cabal | 3 + hbs2-git/lib/HBS2Git/App.hs | 172 +++++++++++++++++++++++++--- hbs2-git/lib/HBS2Git/Export.hs | 27 ++++- hbs2-git/lib/HBS2Git/Types.hs | 9 +- hbs2-peer/app/HttpWorker.hs | 29 ++++- hbs2-peer/app/PeerMain.hs | 5 +- 10 files changed, 236 insertions(+), 39 deletions(-) diff --git a/docs/devlog.md b/docs/devlog.md index 8ea8e21b..bb3e05cb 100644 --- a/docs/devlog.md +++ b/docs/devlog.md @@ -2,7 +2,7 @@ ## 2023-03-24 -проверка: wip101 +проверка: wip109 TODO: storage-reliable-write Надёжную процедуру записи блока. diff --git a/hbs2-git/git-hbs2/GitRemoteMain.hs b/hbs2-git/git-hbs2/GitRemoteMain.hs index 0d347a31..ec125147 100644 --- a/hbs2-git/git-hbs2/GitRemoteMain.hs +++ b/hbs2-git/git-hbs2/GitRemoteMain.hs @@ -6,6 +6,7 @@ import HBS2.Base58 import HBS2.OrDie import HBS2.Git.Types import HBS2.Git.Local.CLI +import HBS2.Clock import HBS2.System.Logger.Simple @@ -72,7 +73,6 @@ parseRepoURL url' = either (const Nothing) Just (parseOnly p url) capabilities :: BS.ByteString capabilities = BS.unlines ["push","fetch"] - readHeadDef :: HasCatAPI m => DBEnv -> m LBS.ByteString readHeadDef db = withDB db stateGetHead >>= @@ -117,6 +117,8 @@ loop args = do updateLocalState ref + hd <- readHeadDef db + hashes <- withDB db stateGetAllObjects -- FIXME: asap-get-all-existing-objects-or-all-if-clone @@ -151,7 +153,7 @@ loop args = do let cmd = BS.words str -- trace $ pretty (fmap BS.unpack cmd) - hPrint stderr $ show $ pretty (fmap BS.unpack cmd) + -- hPrint stderr $ show $ pretty (fmap BS.unpack cmd) -- isBatch <- liftIO $ readTVarIO batch @@ -161,8 +163,7 @@ loop args = do liftIO $ atomically $ writeTVar batch False sendEol when isBatch next - unless isBatch do - updateLocalState ref + -- unless isBatch do ["capabilities"] -> do trace $ "send capabilities" <+> pretty (BS.unpack capabilities) @@ -171,9 +172,6 @@ loop args = do ["list"] -> do - updateLocalState ref - hd <- readHeadDef db - hl <- liftIO $ readTVarIO jobNumT pb <- newProgressMonitor "storing git objects" hl @@ -198,14 +196,14 @@ loop args = do next ["list","for-push"] -> do - for_ (LBS.lines hdRefOld) (sendLn . LBS.toStrict) + for_ (LBS.lines hd) (sendLn . LBS.toStrict) sendEol next ["fetch", sha1, x] -> do trace $ "fetch" <+> pretty (BS.unpack sha1) <+> pretty (BS.unpack x) liftIO $ atomically $ writeTVar batch True - sendEol + -- sendEol next ["push", rr] -> do @@ -214,12 +212,14 @@ loop args = do liftIO $ atomically $ writeTVar batch True pushed <- push ref pu case pushed of - Nothing -> sendEol + Nothing -> hPrint stderr "fucked!" >> sendEol Just re -> sendLn [qc|ok {pretty re}|] next other -> die $ show other + -- updateLocalState ref + where fromString' "" = Nothing fromString' x = Just $ fromString x @@ -247,6 +247,8 @@ main = do env <- RemoteEnv <$> detectHBS2PeerCatAPI <*> detectHBS2PeerSizeAPI + <*> detectHBS2PeerPutAPI + <*> detectHBS2PeerRefLogGetAPI <*> liftIO (newTVarIO mempty) runRemoteM env do diff --git a/hbs2-git/git-hbs2/GitRemotePush.hs b/hbs2-git/git-hbs2/GitRemotePush.hs index fffb6c82..0fb97674 100644 --- a/hbs2-git/git-hbs2/GitRemotePush.hs +++ b/hbs2-git/git-hbs2/GitRemotePush.hs @@ -52,6 +52,8 @@ instance MonadIO m => HasConf (RunWithConfig (GitRemoteApp m)) where instance MonadIO m => HasCatAPI (RunWithConfig (GitRemoteApp m)) where getHttpCatAPI = lift getHttpCatAPI getHttpSizeAPI = lift getHttpSizeAPI + getHttpPutAPI = lift getHttpPutAPI + getHttpRefLogGetAPI = lift getHttpRefLogGetAPI instance MonadIO m => HasRefCredentials (RunWithConfig (GitRemoteApp m)) where getCredentials = lift . getCredentials diff --git a/hbs2-git/git-hbs2/GitRemoteTypes.hs b/hbs2-git/git-hbs2/GitRemoteTypes.hs index f034f7bc..af231358 100644 --- a/hbs2-git/git-hbs2/GitRemoteTypes.hs +++ b/hbs2-git/git-hbs2/GitRemoteTypes.hs @@ -18,6 +18,8 @@ data RemoteEnv = RemoteEnv { _reHttpCat :: API , _reHttpSize :: API + , _reHttpPut :: API + , _reHttpRefGet :: API , _reCreds :: TVar (HashMap RepoRef (PeerCredentials Schema)) } @@ -38,6 +40,8 @@ runRemoteM env m = runReaderT (fromRemoteApp m) env instance MonadIO m => HasCatAPI (GitRemoteApp m) where getHttpCatAPI = view (asks reHttpCat) getHttpSizeAPI = view (asks reHttpSize) + getHttpPutAPI = view (asks reHttpPut) + getHttpRefLogGetAPI = view (asks reHttpRefGet) instance MonadIO m => HasRefCredentials (GitRemoteApp m) where diff --git a/hbs2-git/hbs2-git.cabal b/hbs2-git/hbs2-git.cabal index 33d58123..fc97466e 100644 --- a/hbs2-git/hbs2-git.cabal +++ b/hbs2-git/hbs2-git.cabal @@ -103,6 +103,7 @@ library -- other-extensions: build-depends: base , terminal-progress-bar + , http-types hs-source-dirs: lib default-language: Haskell2010 @@ -123,6 +124,7 @@ executable git-hbs2 build-depends: base, hbs2-git , optparse-applicative + , http-types hs-source-dirs: git-hbs2 default-language: Haskell2010 @@ -150,6 +152,7 @@ executable git-remote-hbs2 , unix , unliftio , terminal-progress-bar + , http-types hs-source-dirs: git-hbs2 default-language: Haskell2010 diff --git a/hbs2-git/lib/HBS2Git/App.hs b/hbs2-git/lib/HBS2Git/App.hs index 8f535657..fa55536a 100644 --- a/hbs2-git/lib/HBS2Git/App.hs +++ b/hbs2-git/lib/HBS2Git/App.hs @@ -18,6 +18,7 @@ import HBS2.Git.Types import HBS2.Net.Proto.Definition() import HBS2.Net.Auth.Credentials hiding (getCredentials) import HBS2.Net.Proto.RefLog +import HBS2.Defaults (defBlockSize) import HBS2Git.Types import HBS2Git.Config as Config @@ -40,11 +41,18 @@ import System.FilePath import System.Process.Typed import Text.InterpolatedString.Perl6 (qc) import Network.HTTP.Simple +import Network.HTTP.Types.Status import Control.Concurrent.STM import Codec.Serialise import Data.HashMap.Strict qualified as HashMap import Data.List qualified as List import Data.Text qualified as Text +import System.IO ( stderr ) +import Data.IORef +import System.IO.Unsafe (unsafePerformIO) +import Data.Cache (Cache) +import Data.Cache qualified as Cache +import Control.Concurrent.Async instance MonadIO m => HasCfgKey ConfBranch (Set String) m where key = "branch" @@ -89,6 +97,8 @@ data WithLog = NoLog | WithLog instance MonadIO m => HasCatAPI (App m) where getHttpCatAPI = asks (view appPeerHttpCat) getHttpSizeAPI = asks (view appPeerHttpSize) + getHttpPutAPI = asks (view appPeerHttpPut) + getHttpRefLogGetAPI = asks (view appPeerHttpRefLogGet) instance MonadIO m => HasRefCredentials (App m) where setCredentials ref cred = do @@ -103,33 +113,57 @@ instance MonadIO m => HasRefCredentials (App m) where withApp :: MonadIO m => AppEnv -> App m a -> m a withApp env m = runReaderT (fromApp m) env -detectHBS2PeerCatAPI :: MonadIO m => m String +{-# NOINLINE hBS2PeerCatAPI #-} +hBS2PeerCatAPI :: IORef (Maybe API) +hBS2PeerCatAPI = unsafePerformIO (newIORef Nothing) + +detectHBS2PeerCatAPI :: MonadIO m => m API detectHBS2PeerCatAPI = do -- FIXME: hardcoded-hbs2-peer - (_, o, _) <- readProcess (shell [qc|hbs2-peer poke|]) - trace $ pretty (LBS.unpack o) + v <- liftIO $ readIORef hBS2PeerCatAPI - let dieMsg = "hbs2-peer is down or it's http is inactive" + case v of + Just x -> pure x + Nothing -> do + (_, o, _) <- readProcess (shell [qc|hbs2-peer poke|]) - let answ = parseTop (LBS.unpack o) & fromRight mempty + let dieMsg = "hbs2-peer is down or it's http is inactive" - let po = headMay [ n | ListVal (Key "http-port:" [LitIntVal n]) <- answ ] - -- shutUp + let answ = parseTop (LBS.unpack o) & fromRight mempty - pnum <- pure po `orDie` dieMsg + let po = headMay [ n | ListVal (Key "http-port:" [LitIntVal n]) <- answ ] + -- shutUp - debug $ pretty "using http port" <+> pretty po + pnum <- pure po `orDie` dieMsg - pure [qc|http://localhost:{pnum}/cat|] + debug $ pretty "using http port" <+> pretty po + + let api = [qc|http://localhost:{pnum}/cat|] + + liftIO $ writeIORef hBS2PeerCatAPI (Just api) + + pure api -detectHBS2PeerSizeAPI :: MonadIO m => m String +detectHBS2PeerSizeAPI :: MonadIO m => m API detectHBS2PeerSizeAPI = do api <- detectHBS2PeerCatAPI let new = Text.replace "/cat" "/size" $ Text.pack api pure $ Text.unpack new +detectHBS2PeerPutAPI :: MonadIO m => m API +detectHBS2PeerPutAPI = do + api <- detectHBS2PeerCatAPI + let new = Text.replace "/cat" "/" $ Text.pack api + pure $ Text.unpack new + +detectHBS2PeerRefLogGetAPI :: MonadIO m => m API +detectHBS2PeerRefLogGetAPI = do + api <- detectHBS2PeerCatAPI + let new = Text.replace "/cat" "/reflog" $ Text.pack api + pure $ Text.unpack new + getAppStateDir :: forall m . MonadIO m => m FilePath getAppStateDir = liftIO $ getXdgDirectory XdgData Config.appName @@ -159,10 +193,12 @@ runApp l m = do reQ <- detectHBS2PeerCatAPI szQ <- detectHBS2PeerSizeAPI + puQ <- detectHBS2PeerPutAPI + rlQ <- detectHBS2PeerRefLogGetAPI mtCred <- liftIO $ newTVarIO mempty - let env = AppEnv pwd (pwd ".git") syn xdgstate reQ szQ mtCred + let env = AppEnv pwd (pwd ".git") syn xdgstate reQ szQ puQ rlQ mtCred runReaderT (fromApp m) env @@ -174,13 +210,50 @@ runApp l m = do setLoggingOff @TRACE setLoggingOff @INFO + +writeBlock :: forall m . (HasCatAPI m, MonadIO m) => ByteString -> m (Maybe (Hash HbSync)) +writeBlock bs = do + req <- getHttpPutAPI + writeBlockIO req bs + +writeBlockIO :: forall m . MonadIO m => API -> ByteString -> m (Maybe (Hash HbSync)) +writeBlockIO api bs = do + req1 <- liftIO $ parseRequest api + let request = setRequestMethod "PUT" + $ setRequestHeader "Content-Type" ["application/octet-stream"] + $ setRequestBodyLBS bs req1 + + resp <- httpLBS request + + case statusCode (getResponseStatus resp) of + + 200 -> pure $ getResponseBody resp & LBS.unpack & fromStringMay + _ -> pure Nothing + + readBlock :: forall m . (HasCatAPI m, MonadIO m) => HashRef -> m (Maybe ByteString) readBlock h = do - -- trace $ "readBlock" <+> pretty h - req1 <- getHttpCatAPI -- asks (view appPeerHttpCat) + req1 <- getHttpCatAPI let reqs = req1 <> "/" <> show (pretty h) req <- liftIO $ parseRequest reqs - httpLBS req <&> getResponseBody <&> Just + resp <- httpLBS req + + case statusCode (getResponseStatus resp) of + 200 -> pure $ Just (getResponseBody resp) + _ -> pure Nothing + + +readRefHttp :: forall m . (HasCatAPI m, MonadIO m) => RepoRef -> m (Maybe HashRef) +readRefHttp re = do + req0 <- getHttpRefLogGetAPI + let req = req0 <> "/" <> show (pretty re) + request <- liftIO $ parseRequest req + resp <- httpLBS request + + case statusCode (getResponseStatus resp) of + 200 -> pure $ getResponseBody resp & LBS.unpack & fromStringMay + _ -> pure Nothing + getBlockSize :: forall m . (HasCatAPI m, MonadIO m) => HashRef -> m (Maybe Integer) getBlockSize h = do @@ -189,8 +262,22 @@ getBlockSize h = do req <- liftIO $ parseRequest reqs httpJSONEither req <&> getResponseBody <&> either (const Nothing) Just -readRef :: MonadIO m => RepoRef -> m (Maybe HashRef) -readRef r = do +readRef :: (HasCatAPI m, MonadIO m) => RepoRef -> m (Maybe HashRef) +readRef = readRefHttp + + +readHashesFromBlock :: (MonadIO m, HasCatAPI m) => HashRef -> m [HashRef] +readHashesFromBlock (HashRef h) = do + treeQ <- liftIO newTQueueIO + walkMerkle h (readBlock . HashRef) $ \hr -> do + case hr of + Left{} -> pure () + Right (hrr :: [HashRef]) -> liftIO $ atomically $ writeTQueue treeQ hrr + re <- liftIO $ atomically $ flushTQueue treeQ + pure $ mconcat re + +readRefCLI :: MonadIO m => RepoRef -> m (Maybe HashRef) +readRefCLI r = do let k = pretty (AsBase58 r) trace [qc|hbs2-peer reflog get {k}|] let cmd = setStdin closed $ setStderr closed @@ -248,8 +335,55 @@ postRefUpdate ref seqno hash = do trace $ "hbs2-peer exited with code" <+> viaShow code -storeObject :: (MonadIO m, HasConf m) => ByteString -> ByteString -> m (Maybe HashRef) -storeObject = storeObjectHBS2Store +storeObject :: (MonadIO m, HasCatAPI m, HasConf m) + => ByteString -> ByteString -> m (Maybe HashRef) +-- storeObject = storeObjectHBS2Store +storeObject = storeObjectHttpPut + +storeObjectHttpPut :: (MonadIO m, HasCatAPI m, HasConf m) + => ByteString + -> ByteString + -> m (Maybe HashRef) + +storeObjectHttpPut meta bs = do + + -- TODO: разбить-на-блоки + -- TODO: сохранить-блоки-получить-хэши + -- TODO: записать-merkle-c-метадатой-и-хэшами + + let chu = chunks (fromIntegral defBlockSize) bs + + trace $ length chu + + rt <- liftIO $ Cache.newCache Nothing + + -- FIXME: run-concurrently + hashes <- forM chu $ \s -> do + h <- writeBlock s `orDie` "cant write block" + pure (HashRef h) + + let pt = toPTree (MaxSize 1024) (MaxNum 1024) hashes -- FIXME: settings + + trace $ viaShow pt + + root <- makeMerkle 0 pt $ \(h,t,bss) -> do + liftIO $ Cache.insert rt h (t,bss) + -- void $ writeBlock bss + + pieces' <- liftIO $ Cache.toList rt + let pieces = [ bss | (_, (_,bss), _) <- pieces' ] + + api <- getHttpPutAPI + + liftIO $ mapConcurrently (writeBlockIO api) pieces + + mtree <- liftIO $ fst <$> Cache.lookup rt root `orDie` "cant find root block" + + let txt = LBS.unpack meta & Text.pack + + let ann = serialise (MTreeAnn (ShortMetadata txt) NullEncryption mtree) + + writeBlock ann <&> fmap HashRef -- FIXME: ASAP-store-calls-hbs2 -- Это может приводить к тому, что если пир и hbs2-peer diff --git a/hbs2-git/lib/HBS2Git/Export.hs b/hbs2-git/lib/HBS2Git/Export.hs index 35ddc9f8..7521099d 100644 --- a/hbs2-git/lib/HBS2Git/Export.hs +++ b/hbs2-git/lib/HBS2Git/Export.hs @@ -2,6 +2,7 @@ module HBS2Git.Export where import HBS2.Prelude.Plated +import HBS2.Clock import HBS2.Data.Types.Refs import HBS2.OrDie import HBS2.System.Logger.Simple @@ -26,17 +27,15 @@ import Data.List (sortBy) import Control.Applicative import Control.Monad.Reader import Data.ByteString.Lazy.Char8 qualified as LBS -import Data.ByteString qualified as BS import Data.Cache as Cache import Data.Foldable (for_) -import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap import Data.Maybe import Data.Set qualified as Set import Data.Set (Set) import Lens.Micro.Platform import Control.Concurrent.STM -import Control.Concurrent.STM.TVar +import Control.Concurrent.Async data HashCache = HashCache @@ -85,6 +84,7 @@ export h repoHead = do liftIO $ gitGetTransitiveClosure cache mempty h <&> Set.toList -- notice "store dependencies to state" + -- hashes <- readHashesFromBlock undefined sz <- liftIO $ Cache.size (hCache cache) mon1 <- newProgressMonitor "storing dependencies" sz @@ -110,14 +110,14 @@ export h repoHead = do -- let gha = gitHashObject (GitObject Blob repoHead) hh <- lift $ storeObject metaHead repoHeadStr `orDie` "cant save repo head" - - mon3 <- newProgressMonitor "store all objects from repo" (length deps) + mon3 <- newProgressMonitor "export objects from repo" (length deps) for_ deps $ \d -> do here <- stateGetHash d <&> isJust -- FIXME: asap-check-if-objects-is-in-hbs2 unless here do lbs <- gitReadObject Nothing d + -- TODO: why-not-default-blob -- anything is blob tp <- gitGetObjectType d <&> fromMaybe Blob -- @@ -127,7 +127,6 @@ export h repoHead = do <> "type:" <+> pretty tp <+> pretty d <> line - hr' <- lift $ storeObject metaO lbs maybe1 hr' (pure ()) $ \hr -> do @@ -160,8 +159,24 @@ export h repoHead = do seqno <- stateGetSequence <&> succ -- FIXME: same-transaction-different-seqno + postRefUpdate h seqno (HashRef root) + let noRef = do + pause @'Seconds 20 + shutUp + die $ show $ pretty "No reference appeared for" <+> pretty h + + wmon <- newProgressMonitor "waiting for ref" 20 + void $ liftIO $ race noRef $ do + runApp NoLog do + fix \next -> do + v <- readRefHttp h + updateProgress wmon 1 + case v of + Nothing -> pause @'Seconds 1 >> next + Just{} -> pure () + pure (HashRef root, hh) diff --git a/hbs2-git/lib/HBS2Git/Types.hs b/hbs2-git/lib/HBS2Git/Types.hs index 1a577e6c..97740dd8 100644 --- a/hbs2-git/lib/HBS2Git/Types.hs +++ b/hbs2-git/lib/HBS2Git/Types.hs @@ -36,6 +36,7 @@ import Data.Kind type Schema = UDP +-- FIXME: introduce-API-type type API = String type DBEnv = Connection @@ -56,8 +57,10 @@ data AppEnv = , _appGitDir :: FilePath , _appConf :: [Syntax C] , _appStateDir :: FilePath - , _appPeerHttpCat :: String + , _appPeerHttpCat :: API , _appPeerHttpSize :: API + , _appPeerHttpPut :: API + , _appPeerHttpRefLogGet :: API , _appRefCred :: TVar (HashMap RepoRef (PeerCredentials Schema)) } @@ -114,6 +117,8 @@ instance {-# OVERLAPPABLE #-} MonadIO m => HasProgress m where class MonadIO m => HasCatAPI m where getHttpCatAPI :: m API getHttpSizeAPI :: m API + getHttpPutAPI :: m API + getHttpRefLogGetAPI :: m API class MonadIO m => HasRefCredentials m where getCredentials :: RepoRef -> m (PeerCredentials Schema) @@ -122,6 +127,8 @@ class MonadIO m => HasRefCredentials m where instance (HasCatAPI m, MonadIO m) => HasCatAPI (MaybeT m) where getHttpCatAPI = lift getHttpCatAPI getHttpSizeAPI = lift getHttpSizeAPI + getHttpPutAPI = lift getHttpPutAPI + getHttpRefLogGetAPI = lift getHttpRefLogGetAPI class Monad m => HasCfgKey a b m where -- type family CfgValue a :: Type diff --git a/hbs2-peer/app/HttpWorker.hs b/hbs2-peer/app/HttpWorker.hs index cb072e94..b9147751 100644 --- a/hbs2-peer/app/HttpWorker.hs +++ b/hbs2-peer/app/HttpWorker.hs @@ -3,6 +3,8 @@ module HttpWorker where import HBS2.Prelude import HBS2.Actors.Peer import HBS2.Storage +import HBS2.Hash +import HBS2.Data.Types.Refs import HBS2.System.Logger.Simple @@ -36,7 +38,7 @@ httpWorker conf e = do maybe1 port' none $ \port -> liftIO do scotty port $ do - middleware logStdoutDev + middleware logStdout get "/size/:hash" do what <- param @String "hash" <&> fromString @@ -56,5 +58,30 @@ httpWorker conf e = do addHeader "content-length" [qc|{LBS.length lbs}|] raw lbs + get "/reflog/:ref" do + re <- param @String "ref" <&> fromStringMay + case re of + Nothing -> status status404 + Just ref -> do + va <- liftIO $ getRef sto (RefLogKey ref) + maybe1 va (status status404) $ \val -> do + text [qc|{pretty val}|] + + put "/" do + -- FIXME: optional-header-based-authorization + -- signed nonce + peer key? + + -- TODO: ddos-protection + -- FIXME: fix-max-size-hardcode + bs <- LBS.take 4194304 <$> body + -- let ha = hashObject @HbSync bs + -- here <- liftIO $ hasBlock sto ha <&> isJust + + mbHash <- liftIO $ putBlock sto bs + + case mbHash of + Nothing -> status status500 + Just h -> text [qc|{pretty h}|] + pure () diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 17392efd..483a4447 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -869,6 +869,7 @@ rpcClientMain opt action = do withRPC :: RPCOpt -> RPC UDP -> IO () withRPC o cmd = rpcClientMain o $ do + hSetBuffering stdout LineBuffering conf <- peerConfigRead (view rpcOptConf o) @@ -941,7 +942,8 @@ withRPC o cmd = rpcClientMain o $ do void $ liftIO $ race onTimeout do k <- liftIO $ atomically $ readTQueue pokeFQ - Log.info $ pretty k + print (pretty k) + hFlush stdout exitSuccess RPCPeers{} -> liftIO do @@ -965,6 +967,7 @@ withRPC o cmd = rpcClientMain o $ do Nothing -> exitFailure Just re -> do print $ pretty re + hFlush stdout exitSuccess _ -> pure ()