hbs2-git: lotsa stuff

This commit is contained in:
Dmitry Zuikov 2023-03-24 19:54:07 +03:00
parent 2648382ad9
commit 5f77b26520
10 changed files with 236 additions and 39 deletions

View File

@ -2,7 +2,7 @@
## 2023-03-24
проверка: wip101
проверка: wip109
TODO: storage-reliable-write
Надёжную процедуру записи блока.

View File

@ -6,6 +6,7 @@ import HBS2.Base58
import HBS2.OrDie
import HBS2.Git.Types
import HBS2.Git.Local.CLI
import HBS2.Clock
import HBS2.System.Logger.Simple
@ -72,7 +73,6 @@ parseRepoURL url' = either (const Nothing) Just (parseOnly p url)
capabilities :: BS.ByteString
capabilities = BS.unlines ["push","fetch"]
readHeadDef :: HasCatAPI m => DBEnv -> m LBS.ByteString
readHeadDef db =
withDB db stateGetHead >>=
@ -117,6 +117,8 @@ loop args = do
updateLocalState ref
hd <- readHeadDef db
hashes <- withDB db stateGetAllObjects
-- FIXME: asap-get-all-existing-objects-or-all-if-clone
@ -151,7 +153,7 @@ loop args = do
let cmd = BS.words str
-- trace $ pretty (fmap BS.unpack cmd)
hPrint stderr $ show $ pretty (fmap BS.unpack cmd)
-- hPrint stderr $ show $ pretty (fmap BS.unpack cmd)
--
isBatch <- liftIO $ readTVarIO batch
@ -161,8 +163,7 @@ loop args = do
liftIO $ atomically $ writeTVar batch False
sendEol
when isBatch next
unless isBatch do
updateLocalState ref
-- unless isBatch do
["capabilities"] -> do
trace $ "send capabilities" <+> pretty (BS.unpack capabilities)
@ -171,9 +172,6 @@ loop args = do
["list"] -> do
updateLocalState ref
hd <- readHeadDef db
hl <- liftIO $ readTVarIO jobNumT
pb <- newProgressMonitor "storing git objects" hl
@ -198,14 +196,14 @@ loop args = do
next
["list","for-push"] -> do
for_ (LBS.lines hdRefOld) (sendLn . LBS.toStrict)
for_ (LBS.lines hd) (sendLn . LBS.toStrict)
sendEol
next
["fetch", sha1, x] -> do
trace $ "fetch" <+> pretty (BS.unpack sha1) <+> pretty (BS.unpack x)
liftIO $ atomically $ writeTVar batch True
sendEol
-- sendEol
next
["push", rr] -> do
@ -214,12 +212,14 @@ loop args = do
liftIO $ atomically $ writeTVar batch True
pushed <- push ref pu
case pushed of
Nothing -> sendEol
Nothing -> hPrint stderr "fucked!" >> sendEol
Just re -> sendLn [qc|ok {pretty re}|]
next
other -> die $ show other
-- updateLocalState ref
where
fromString' "" = Nothing
fromString' x = Just $ fromString x
@ -247,6 +247,8 @@ main = do
env <- RemoteEnv <$> detectHBS2PeerCatAPI
<*> detectHBS2PeerSizeAPI
<*> detectHBS2PeerPutAPI
<*> detectHBS2PeerRefLogGetAPI
<*> liftIO (newTVarIO mempty)
runRemoteM env do

View File

@ -52,6 +52,8 @@ instance MonadIO m => HasConf (RunWithConfig (GitRemoteApp m)) where
instance MonadIO m => HasCatAPI (RunWithConfig (GitRemoteApp m)) where
getHttpCatAPI = lift getHttpCatAPI
getHttpSizeAPI = lift getHttpSizeAPI
getHttpPutAPI = lift getHttpPutAPI
getHttpRefLogGetAPI = lift getHttpRefLogGetAPI
instance MonadIO m => HasRefCredentials (RunWithConfig (GitRemoteApp m)) where
getCredentials = lift . getCredentials

View File

@ -18,6 +18,8 @@ data RemoteEnv =
RemoteEnv
{ _reHttpCat :: API
, _reHttpSize :: API
, _reHttpPut :: API
, _reHttpRefGet :: API
, _reCreds :: TVar (HashMap RepoRef (PeerCredentials Schema))
}
@ -38,6 +40,8 @@ runRemoteM env m = runReaderT (fromRemoteApp m) env
instance MonadIO m => HasCatAPI (GitRemoteApp m) where
getHttpCatAPI = view (asks reHttpCat)
getHttpSizeAPI = view (asks reHttpSize)
getHttpPutAPI = view (asks reHttpPut)
getHttpRefLogGetAPI = view (asks reHttpRefGet)
instance MonadIO m => HasRefCredentials (GitRemoteApp m) where

View File

@ -103,6 +103,7 @@ library
-- other-extensions:
build-depends: base
, terminal-progress-bar
, http-types
hs-source-dirs: lib
default-language: Haskell2010
@ -123,6 +124,7 @@ executable git-hbs2
build-depends:
base, hbs2-git
, optparse-applicative
, http-types
hs-source-dirs: git-hbs2
default-language: Haskell2010
@ -150,6 +152,7 @@ executable git-remote-hbs2
, unix
, unliftio
, terminal-progress-bar
, http-types
hs-source-dirs: git-hbs2
default-language: Haskell2010

View File

@ -18,6 +18,7 @@ import HBS2.Git.Types
import HBS2.Net.Proto.Definition()
import HBS2.Net.Auth.Credentials hiding (getCredentials)
import HBS2.Net.Proto.RefLog
import HBS2.Defaults (defBlockSize)
import HBS2Git.Types
import HBS2Git.Config as Config
@ -40,11 +41,18 @@ import System.FilePath
import System.Process.Typed
import Text.InterpolatedString.Perl6 (qc)
import Network.HTTP.Simple
import Network.HTTP.Types.Status
import Control.Concurrent.STM
import Codec.Serialise
import Data.HashMap.Strict qualified as HashMap
import Data.List qualified as List
import Data.Text qualified as Text
import System.IO ( stderr )
import Data.IORef
import System.IO.Unsafe (unsafePerformIO)
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Control.Concurrent.Async
instance MonadIO m => HasCfgKey ConfBranch (Set String) m where
key = "branch"
@ -89,6 +97,8 @@ data WithLog = NoLog | WithLog
instance MonadIO m => HasCatAPI (App m) where
getHttpCatAPI = asks (view appPeerHttpCat)
getHttpSizeAPI = asks (view appPeerHttpSize)
getHttpPutAPI = asks (view appPeerHttpPut)
getHttpRefLogGetAPI = asks (view appPeerHttpRefLogGet)
instance MonadIO m => HasRefCredentials (App m) where
setCredentials ref cred = do
@ -103,33 +113,57 @@ instance MonadIO m => HasRefCredentials (App m) where
withApp :: MonadIO m => AppEnv -> App m a -> m a
withApp env m = runReaderT (fromApp m) env
detectHBS2PeerCatAPI :: MonadIO m => m String
{-# NOINLINE hBS2PeerCatAPI #-}
hBS2PeerCatAPI :: IORef (Maybe API)
hBS2PeerCatAPI = unsafePerformIO (newIORef Nothing)
detectHBS2PeerCatAPI :: MonadIO m => m API
detectHBS2PeerCatAPI = do
-- FIXME: hardcoded-hbs2-peer
(_, o, _) <- readProcess (shell [qc|hbs2-peer poke|])
trace $ pretty (LBS.unpack o)
v <- liftIO $ readIORef hBS2PeerCatAPI
let dieMsg = "hbs2-peer is down or it's http is inactive"
case v of
Just x -> pure x
Nothing -> do
(_, o, _) <- readProcess (shell [qc|hbs2-peer poke|])
let answ = parseTop (LBS.unpack o) & fromRight mempty
let dieMsg = "hbs2-peer is down or it's http is inactive"
let po = headMay [ n | ListVal (Key "http-port:" [LitIntVal n]) <- answ ]
-- shutUp
let answ = parseTop (LBS.unpack o) & fromRight mempty
pnum <- pure po `orDie` dieMsg
let po = headMay [ n | ListVal (Key "http-port:" [LitIntVal n]) <- answ ]
-- shutUp
debug $ pretty "using http port" <+> pretty po
pnum <- pure po `orDie` dieMsg
pure [qc|http://localhost:{pnum}/cat|]
debug $ pretty "using http port" <+> pretty po
let api = [qc|http://localhost:{pnum}/cat|]
liftIO $ writeIORef hBS2PeerCatAPI (Just api)
pure api
detectHBS2PeerSizeAPI :: MonadIO m => m String
detectHBS2PeerSizeAPI :: MonadIO m => m API
detectHBS2PeerSizeAPI = do
api <- detectHBS2PeerCatAPI
let new = Text.replace "/cat" "/size" $ Text.pack api
pure $ Text.unpack new
detectHBS2PeerPutAPI :: MonadIO m => m API
detectHBS2PeerPutAPI = do
api <- detectHBS2PeerCatAPI
let new = Text.replace "/cat" "/" $ Text.pack api
pure $ Text.unpack new
detectHBS2PeerRefLogGetAPI :: MonadIO m => m API
detectHBS2PeerRefLogGetAPI = do
api <- detectHBS2PeerCatAPI
let new = Text.replace "/cat" "/reflog" $ Text.pack api
pure $ Text.unpack new
getAppStateDir :: forall m . MonadIO m => m FilePath
getAppStateDir = liftIO $ getXdgDirectory XdgData Config.appName
@ -159,10 +193,12 @@ runApp l m = do
reQ <- detectHBS2PeerCatAPI
szQ <- detectHBS2PeerSizeAPI
puQ <- detectHBS2PeerPutAPI
rlQ <- detectHBS2PeerRefLogGetAPI
mtCred <- liftIO $ newTVarIO mempty
let env = AppEnv pwd (pwd </> ".git") syn xdgstate reQ szQ mtCred
let env = AppEnv pwd (pwd </> ".git") syn xdgstate reQ szQ puQ rlQ mtCred
runReaderT (fromApp m) env
@ -174,13 +210,50 @@ runApp l m = do
setLoggingOff @TRACE
setLoggingOff @INFO
writeBlock :: forall m . (HasCatAPI m, MonadIO m) => ByteString -> m (Maybe (Hash HbSync))
writeBlock bs = do
req <- getHttpPutAPI
writeBlockIO req bs
writeBlockIO :: forall m . MonadIO m => API -> ByteString -> m (Maybe (Hash HbSync))
writeBlockIO api bs = do
req1 <- liftIO $ parseRequest api
let request = setRequestMethod "PUT"
$ setRequestHeader "Content-Type" ["application/octet-stream"]
$ setRequestBodyLBS bs req1
resp <- httpLBS request
case statusCode (getResponseStatus resp) of
200 -> pure $ getResponseBody resp & LBS.unpack & fromStringMay
_ -> pure Nothing
readBlock :: forall m . (HasCatAPI m, MonadIO m) => HashRef -> m (Maybe ByteString)
readBlock h = do
-- trace $ "readBlock" <+> pretty h
req1 <- getHttpCatAPI -- asks (view appPeerHttpCat)
req1 <- getHttpCatAPI
let reqs = req1 <> "/" <> show (pretty h)
req <- liftIO $ parseRequest reqs
httpLBS req <&> getResponseBody <&> Just
resp <- httpLBS req
case statusCode (getResponseStatus resp) of
200 -> pure $ Just (getResponseBody resp)
_ -> pure Nothing
readRefHttp :: forall m . (HasCatAPI m, MonadIO m) => RepoRef -> m (Maybe HashRef)
readRefHttp re = do
req0 <- getHttpRefLogGetAPI
let req = req0 <> "/" <> show (pretty re)
request <- liftIO $ parseRequest req
resp <- httpLBS request
case statusCode (getResponseStatus resp) of
200 -> pure $ getResponseBody resp & LBS.unpack & fromStringMay
_ -> pure Nothing
getBlockSize :: forall m . (HasCatAPI m, MonadIO m) => HashRef -> m (Maybe Integer)
getBlockSize h = do
@ -189,8 +262,22 @@ getBlockSize h = do
req <- liftIO $ parseRequest reqs
httpJSONEither req <&> getResponseBody <&> either (const Nothing) Just
readRef :: MonadIO m => RepoRef -> m (Maybe HashRef)
readRef r = do
readRef :: (HasCatAPI m, MonadIO m) => RepoRef -> m (Maybe HashRef)
readRef = readRefHttp
readHashesFromBlock :: (MonadIO m, HasCatAPI m) => HashRef -> m [HashRef]
readHashesFromBlock (HashRef h) = do
treeQ <- liftIO newTQueueIO
walkMerkle h (readBlock . HashRef) $ \hr -> do
case hr of
Left{} -> pure ()
Right (hrr :: [HashRef]) -> liftIO $ atomically $ writeTQueue treeQ hrr
re <- liftIO $ atomically $ flushTQueue treeQ
pure $ mconcat re
readRefCLI :: MonadIO m => RepoRef -> m (Maybe HashRef)
readRefCLI r = do
let k = pretty (AsBase58 r)
trace [qc|hbs2-peer reflog get {k}|]
let cmd = setStdin closed $ setStderr closed
@ -248,8 +335,55 @@ postRefUpdate ref seqno hash = do
trace $ "hbs2-peer exited with code" <+> viaShow code
storeObject :: (MonadIO m, HasConf m) => ByteString -> ByteString -> m (Maybe HashRef)
storeObject = storeObjectHBS2Store
storeObject :: (MonadIO m, HasCatAPI m, HasConf m)
=> ByteString -> ByteString -> m (Maybe HashRef)
-- storeObject = storeObjectHBS2Store
storeObject = storeObjectHttpPut
storeObjectHttpPut :: (MonadIO m, HasCatAPI m, HasConf m)
=> ByteString
-> ByteString
-> m (Maybe HashRef)
storeObjectHttpPut meta bs = do
-- TODO: разбить-на-блоки
-- TODO: сохранить-блоки-получить-хэши
-- TODO: записать-merkle-c-метадатой-и-хэшами
let chu = chunks (fromIntegral defBlockSize) bs
trace $ length chu
rt <- liftIO $ Cache.newCache Nothing
-- FIXME: run-concurrently
hashes <- forM chu $ \s -> do
h <- writeBlock s `orDie` "cant write block"
pure (HashRef h)
let pt = toPTree (MaxSize 1024) (MaxNum 1024) hashes -- FIXME: settings
trace $ viaShow pt
root <- makeMerkle 0 pt $ \(h,t,bss) -> do
liftIO $ Cache.insert rt h (t,bss)
-- void $ writeBlock bss
pieces' <- liftIO $ Cache.toList rt
let pieces = [ bss | (_, (_,bss), _) <- pieces' ]
api <- getHttpPutAPI
liftIO $ mapConcurrently (writeBlockIO api) pieces
mtree <- liftIO $ fst <$> Cache.lookup rt root `orDie` "cant find root block"
let txt = LBS.unpack meta & Text.pack
let ann = serialise (MTreeAnn (ShortMetadata txt) NullEncryption mtree)
writeBlock ann <&> fmap HashRef
-- FIXME: ASAP-store-calls-hbs2
-- Это может приводить к тому, что если пир и hbs2-peer

View File

@ -2,6 +2,7 @@
module HBS2Git.Export where
import HBS2.Prelude.Plated
import HBS2.Clock
import HBS2.Data.Types.Refs
import HBS2.OrDie
import HBS2.System.Logger.Simple
@ -26,17 +27,15 @@ import Data.List (sortBy)
import Control.Applicative
import Control.Monad.Reader
import Data.ByteString.Lazy.Char8 qualified as LBS
import Data.ByteString qualified as BS
import Data.Cache as Cache
import Data.Foldable (for_)
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.Maybe
import Data.Set qualified as Set
import Data.Set (Set)
import Lens.Micro.Platform
import Control.Concurrent.STM
import Control.Concurrent.STM.TVar
import Control.Concurrent.Async
data HashCache =
HashCache
@ -85,6 +84,7 @@ export h repoHead = do
liftIO $ gitGetTransitiveClosure cache mempty h <&> Set.toList
-- notice "store dependencies to state"
-- hashes <- readHashesFromBlock undefined
sz <- liftIO $ Cache.size (hCache cache)
mon1 <- newProgressMonitor "storing dependencies" sz
@ -110,14 +110,14 @@ export h repoHead = do
-- let gha = gitHashObject (GitObject Blob repoHead)
hh <- lift $ storeObject metaHead repoHeadStr `orDie` "cant save repo head"
mon3 <- newProgressMonitor "store all objects from repo" (length deps)
mon3 <- newProgressMonitor "export objects from repo" (length deps)
for_ deps $ \d -> do
here <- stateGetHash d <&> isJust
-- FIXME: asap-check-if-objects-is-in-hbs2
unless here do
lbs <- gitReadObject Nothing d
-- TODO: why-not-default-blob
-- anything is blob
tp <- gitGetObjectType d <&> fromMaybe Blob --
@ -127,7 +127,6 @@ export h repoHead = do
<> "type:" <+> pretty tp <+> pretty d
<> line
hr' <- lift $ storeObject metaO lbs
maybe1 hr' (pure ()) $ \hr -> do
@ -160,8 +159,24 @@ export h repoHead = do
seqno <- stateGetSequence <&> succ
-- FIXME: same-transaction-different-seqno
postRefUpdate h seqno (HashRef root)
let noRef = do
pause @'Seconds 20
shutUp
die $ show $ pretty "No reference appeared for" <+> pretty h
wmon <- newProgressMonitor "waiting for ref" 20
void $ liftIO $ race noRef $ do
runApp NoLog do
fix \next -> do
v <- readRefHttp h
updateProgress wmon 1
case v of
Nothing -> pause @'Seconds 1 >> next
Just{} -> pure ()
pure (HashRef root, hh)

View File

@ -36,6 +36,7 @@ import Data.Kind
type Schema = UDP
-- FIXME: introduce-API-type
type API = String
type DBEnv = Connection
@ -56,8 +57,10 @@ data AppEnv =
, _appGitDir :: FilePath
, _appConf :: [Syntax C]
, _appStateDir :: FilePath
, _appPeerHttpCat :: String
, _appPeerHttpCat :: API
, _appPeerHttpSize :: API
, _appPeerHttpPut :: API
, _appPeerHttpRefLogGet :: API
, _appRefCred :: TVar (HashMap RepoRef (PeerCredentials Schema))
}
@ -114,6 +117,8 @@ instance {-# OVERLAPPABLE #-} MonadIO m => HasProgress m where
class MonadIO m => HasCatAPI m where
getHttpCatAPI :: m API
getHttpSizeAPI :: m API
getHttpPutAPI :: m API
getHttpRefLogGetAPI :: m API
class MonadIO m => HasRefCredentials m where
getCredentials :: RepoRef -> m (PeerCredentials Schema)
@ -122,6 +127,8 @@ class MonadIO m => HasRefCredentials m where
instance (HasCatAPI m, MonadIO m) => HasCatAPI (MaybeT m) where
getHttpCatAPI = lift getHttpCatAPI
getHttpSizeAPI = lift getHttpSizeAPI
getHttpPutAPI = lift getHttpPutAPI
getHttpRefLogGetAPI = lift getHttpRefLogGetAPI
class Monad m => HasCfgKey a b m where
-- type family CfgValue a :: Type

View File

@ -3,6 +3,8 @@ module HttpWorker where
import HBS2.Prelude
import HBS2.Actors.Peer
import HBS2.Storage
import HBS2.Hash
import HBS2.Data.Types.Refs
import HBS2.System.Logger.Simple
@ -36,7 +38,7 @@ httpWorker conf e = do
maybe1 port' none $ \port -> liftIO do
scotty port $ do
middleware logStdoutDev
middleware logStdout
get "/size/:hash" do
what <- param @String "hash" <&> fromString
@ -56,5 +58,30 @@ httpWorker conf e = do
addHeader "content-length" [qc|{LBS.length lbs}|]
raw lbs
get "/reflog/:ref" do
re <- param @String "ref" <&> fromStringMay
case re of
Nothing -> status status404
Just ref -> do
va <- liftIO $ getRef sto (RefLogKey ref)
maybe1 va (status status404) $ \val -> do
text [qc|{pretty val}|]
put "/" do
-- FIXME: optional-header-based-authorization
-- signed nonce + peer key?
-- TODO: ddos-protection
-- FIXME: fix-max-size-hardcode
bs <- LBS.take 4194304 <$> body
-- let ha = hashObject @HbSync bs
-- here <- liftIO $ hasBlock sto ha <&> isJust
mbHash <- liftIO $ putBlock sto bs
case mbHash of
Nothing -> status status500
Just h -> text [qc|{pretty h}|]
pure ()

View File

@ -869,6 +869,7 @@ rpcClientMain opt action = do
withRPC :: RPCOpt -> RPC UDP -> IO ()
withRPC o cmd = rpcClientMain o $ do
hSetBuffering stdout LineBuffering
conf <- peerConfigRead (view rpcOptConf o)
@ -941,7 +942,8 @@ withRPC o cmd = rpcClientMain o $ do
void $ liftIO $ race onTimeout do
k <- liftIO $ atomically $ readTQueue pokeFQ
Log.info $ pretty k
print (pretty k)
hFlush stdout
exitSuccess
RPCPeers{} -> liftIO do
@ -965,6 +967,7 @@ withRPC o cmd = rpcClientMain o $ do
Nothing -> exitFailure
Just re -> do
print $ pretty re
hFlush stdout
exitSuccess
_ -> pure ()