mirror of https://github.com/voidlizard/hbs2
waitRepo
This commit is contained in:
parent
4fb13f6d17
commit
bdc9fb80b3
|
@ -1,4 +1,9 @@
|
|||
module HBS2.Data.Detect where
|
||||
module HBS2.Data.Detect
|
||||
( module HBS2.Data.Detect
|
||||
, module HBS2.Merkle.Walk
|
||||
, module HBS2.Merkle
|
||||
)
|
||||
where
|
||||
|
||||
import HBS2.Prelude.Plated
|
||||
import HBS2.Hash
|
||||
|
|
|
@ -101,14 +101,13 @@ initRepo syn = do
|
|||
|
||||
repo <- getRepoManifest
|
||||
|
||||
reflog <- [ x | ListVal [SymbolVal "reflog", SignPubKeyLike x] <- repo ]
|
||||
& headMay & orThrowUser "malformed repo manifest"
|
||||
reflog <- getRefLog repo & orThrow GitRepoManifestMalformed
|
||||
|
||||
callRpcWaitMay @RpcPollAdd (TimeoutSec 1) peerAPI (reflog, "reflog", 17)
|
||||
>>= orThrowUser "rpc timeout"
|
||||
|
||||
-- FIXME: remove-this
|
||||
liftIO $ print $ pretty $ mkForm "manifest" repo
|
||||
liftIO $ print $ pretty $ mkForm "manifest" (coerce repo)
|
||||
|
||||
CreateRepoDefBlock pk -> do
|
||||
|
||||
|
|
|
@ -460,6 +460,11 @@ compression ; prints compression level
|
|||
-- for_ result $ \(n, (r, h, v)) -> do
|
||||
-- liftIO $ print $ "R" <+> pretty h <+> pretty r <+> pretty v <+> pretty n
|
||||
|
||||
entry $ bindMatch "reflog:wait" $ nil_ $ \syn -> lift $ connectedDo do
|
||||
let (_,argz) = splitOpts [] syn
|
||||
let t = headMay [ realToFrac x | LitIntVal x <- argz ]
|
||||
waitRepo t
|
||||
|
||||
entry $ bindMatch "reflog:imported" $ nil_ $ \syn -> lift $ connectedDo do
|
||||
p <- importedCheckpoint
|
||||
liftIO $ print $ pretty p
|
||||
|
@ -470,16 +475,15 @@ compression ; prints compression level
|
|||
brief "shows repo manifest" $
|
||||
entry $ bindMatch "repo:manifest" $ nil_ $ const $ lift $ connectedDo do
|
||||
manifest <- Repo.getRepoManifest
|
||||
liftIO $ print $ pretty $ mkForm "manifest" manifest
|
||||
liftIO $ print $ pretty $ mkForm "manifest" (coerce manifest)
|
||||
|
||||
brief "shows repo reflog" $
|
||||
entry $ bindMatch "repo:reflog" $ nil_ $ const $ lift $ connectedDo do
|
||||
repo <- Repo.getRepoManifest
|
||||
|
||||
reflog <- [ x | x@(ListVal [SymbolVal "reflog", SignPubKeyLike _]) <- repo ]
|
||||
& headMay & orThrow GitRepoManifestMalformed
|
||||
reflog <- getRefLog repo & orThrow GitRepoManifestMalformed
|
||||
|
||||
liftIO $ print $ pretty reflog
|
||||
liftIO $ print $ pretty (AsBase58 reflog)
|
||||
|
||||
entry $ bindMatch "repo:credentials" $ nil_ $ const $ lift $ connectedDo do
|
||||
(p,_) <- getRepoRefLogCredentials
|
||||
|
|
|
@ -8,20 +8,269 @@ module HBS2.Git3.State
|
|||
|
||||
import HBS2.Git3.Prelude
|
||||
|
||||
import HBS2.Merkle
|
||||
import HBS2.Git3.State.Internal.Types as Exported
|
||||
import HBS2.Git3.State.Internal.LWWBlock as Exported
|
||||
import HBS2.Git3.State.Internal.RefLog as Exported
|
||||
import HBS2.Git3.State.Internal.Segment as Exported
|
||||
import HBS2.Git3.State.Internal.Index as Exported
|
||||
|
||||
import HBS2.Storage.Operations.Missed
|
||||
import HBS2.Net.Auth.Credentials
|
||||
import HBS2.KeyMan.Keys.Direct
|
||||
import HBS2.Data.Detect
|
||||
import HBS2.CLI.Run.MetaData (getTreeContents)
|
||||
|
||||
import Data.Config.Suckless
|
||||
|
||||
import HBS2.Peer.RPC.API.Storage
|
||||
import HBS2.Peer.RPC.Client.StorageClient
|
||||
import HBS2.System.Dir
|
||||
import HBS2.Peer.CLI.Detect
|
||||
|
||||
import Data.ByteString.Lazy qualified as LBS
|
||||
import Data.HashSet (HashSet)
|
||||
import Data.HashSet qualified as HS
|
||||
import Data.Kind
|
||||
import Data.Maybe
|
||||
import Data.List qualified as L
|
||||
import Data.Text.Encoding.Error qualified as TE
|
||||
import Data.Text.Encoding qualified as TE
|
||||
import Data.Word
|
||||
import Lens.Micro.Platform
|
||||
|
||||
import Codec.Compression.Zstd (maxCLevel)
|
||||
|
||||
|
||||
newtype RepoManifest = RepoManifest [Syntax C]
|
||||
|
||||
|
||||
getRefLog :: RepoManifest -> Maybe GitRemoteKey
|
||||
getRefLog mf = lastMay [ x
|
||||
| ListVal [SymbolVal "reflog", SignPubKeyLike x] <- coerce mf
|
||||
]
|
||||
|
||||
updateRepoKey :: forall m . HBS2GitPerks m => GitRepoKey -> Git3 m ()
|
||||
updateRepoKey key = do
|
||||
|
||||
setGitRepoKey key
|
||||
|
||||
reflog <- getRepoManifest <&> getRefLog
|
||||
|
||||
ask >>= \case
|
||||
Git3Connected{..} -> atomically $ writeTVar gitRefLog reflog
|
||||
_ -> none
|
||||
|
||||
getRepoRefMaybe :: forall m . HBS2GitPerks m => Git3 m (Maybe (LWWRef 'HBS2Basic))
|
||||
getRepoRefMaybe = do
|
||||
lwwAPI <- getClientAPI @LWWRefAPI @UNIX
|
||||
|
||||
pk <- getGitRepoKey >>= orThrow GitRepoRefNotSet
|
||||
|
||||
callRpcWaitMay @RpcLWWRefGet (TimeoutSec 1) lwwAPI (LWWRefKey pk)
|
||||
>>= orThrow RpcTimeout
|
||||
|
||||
getRepoRefLogCredentials :: forall m . HBS2GitPerks m
|
||||
=> Git3 m (PubKey 'Sign 'HBS2Basic, PrivKey 'Sign HBS2Basic)
|
||||
|
||||
getRepoRefLogCredentials = do
|
||||
-- FIXME: memoize-this
|
||||
mf <- getRepoManifest
|
||||
rk <- getGitRepoKey >>= orThrow GitRepoRefNotSet
|
||||
|
||||
reflog <- getGitRemoteKey >>= orThrow Git3ReflogNotSet
|
||||
|
||||
creds <- runKeymanClientRO (loadCredentials rk)
|
||||
>>= orThrowUser ("not found credentials for" <+> pretty (AsBase58 rk))
|
||||
|
||||
seed <- [ x | ListVal [SymbolVal "seed", LitIntVal x ] <- coerce mf ]
|
||||
& lastMay & orThrow GitRepoManifestMalformed
|
||||
<&> fromIntegral @_ @Word64
|
||||
|
||||
let sk = view peerSignSk creds
|
||||
|
||||
(p,s) <- derivedKey @'HBS2Basic @'Sign seed sk
|
||||
|
||||
unless ( p == reflog ) do
|
||||
throwIO RefLogCredentialsNotMatched
|
||||
|
||||
pure (p,s)
|
||||
|
||||
getRepoManifest :: forall m . HBS2GitPerks m => Git3 m RepoManifest
|
||||
getRepoManifest = do
|
||||
|
||||
sto <- getStorage
|
||||
|
||||
LWWRef{..} <- getRepoRefMaybe >>= orThrow GitRepoRefEmpty
|
||||
|
||||
mfref <- readLogThrow (getBlock sto) lwwValue
|
||||
<&> headMay
|
||||
>>= orThrow GitRepoManifestMalformed
|
||||
|
||||
runExceptT (getTreeContents sto mfref)
|
||||
>>= orThrowPassIO
|
||||
<&> TE.decodeUtf8With TE.lenientDecode . LBS.toStrict
|
||||
<&> parseTop
|
||||
>>= orThrow GitRepoManifestMalformed
|
||||
<&> RepoManifest
|
||||
|
||||
nullGit3Env :: MonadIO m => m Git3Env
|
||||
nullGit3Env = Git3Disconnected
|
||||
<$> newTVarIO defSegmentSize
|
||||
<*> newTVarIO defCompressionLevel
|
||||
<*> newTVarIO defIndexBlockSize
|
||||
<*> newTVarIO Nothing
|
||||
|
||||
connectedDo :: (MonadIO m) => Git3 m a -> Git3 m a
|
||||
connectedDo what = do
|
||||
env <- ask
|
||||
debug $ red "connectedDo"
|
||||
case env of
|
||||
Git3Disconnected{} -> do
|
||||
throwIO Git3PeerNotConnected
|
||||
|
||||
_ -> what
|
||||
|
||||
withGit3Env :: Git3Perks m => Git3Env -> Git3 m a -> m a
|
||||
withGit3Env env a = runReaderT (fromGit3 a) env
|
||||
|
||||
runGit3 :: Git3Perks m => Git3Env -> Git3 m b -> m b
|
||||
runGit3 env action = withGit3Env env action
|
||||
|
||||
|
||||
recover :: Git3 IO a -> Git3 IO a
|
||||
recover m = fix \again -> do
|
||||
catch m $ \case
|
||||
Git3PeerNotConnected -> do
|
||||
|
||||
soname <- detectRPC
|
||||
`orDie` "can't locate hbs2-peer rpc"
|
||||
|
||||
flip runContT pure do
|
||||
|
||||
client <- lift $ race (pause @'Seconds 1) (newMessagingUnix False 1.0 soname)
|
||||
>>= orThrowUser ("can't connect to" <+> pretty soname)
|
||||
|
||||
void $ ContT $ withAsync $ runMessagingUnix client
|
||||
|
||||
peer <- makeServiceCaller @PeerAPI (fromString soname)
|
||||
refLogAPI <- makeServiceCaller @RefLogAPI (fromString soname)
|
||||
storageAPI <- makeServiceCaller @StorageAPI (fromString soname)
|
||||
lwwAPI <- makeServiceCaller @LWWRefAPI (fromString soname)
|
||||
|
||||
-- let sto = AnyStorage (StorageClient storageAPI)
|
||||
|
||||
let endpoints = [ Endpoint @UNIX peer
|
||||
, Endpoint @UNIX refLogAPI
|
||||
, Endpoint @UNIX lwwAPI
|
||||
, Endpoint @UNIX storageAPI
|
||||
]
|
||||
|
||||
void $ ContT $ withAsync $ liftIO $ runReaderT (runServiceClientMulti endpoints) client
|
||||
|
||||
let sto = AnyStorage (StorageClient storageAPI)
|
||||
|
||||
rk <- lift $ getGitRepoKey >>= orThrow GitRepoRefNotSet
|
||||
|
||||
notice $ yellow $ "REPOKEY" <+> pretty (AsBase58 rk)
|
||||
|
||||
connected <- Git3Connected soname sto peer refLogAPI lwwAPI
|
||||
<$> newTVarIO (Just rk)
|
||||
<*> newTVarIO Nothing
|
||||
<*> newTVarIO defSegmentSize
|
||||
<*> newTVarIO defCompressionLevel
|
||||
<*> newTVarIO defIndexBlockSize
|
||||
|
||||
|
||||
liftIO $ withGit3Env connected do
|
||||
|
||||
updateRepoKey rk
|
||||
|
||||
ref <- getGitRemoteKey >>= orThrow GitRepoManifestMalformed
|
||||
|
||||
state <- getStatePath (AsBase58 ref)
|
||||
mkdir state
|
||||
|
||||
again
|
||||
|
||||
e -> throwIO e
|
||||
|
||||
|
||||
data ReflogWaitTimeout =
|
||||
ReflogWaitTimeout
|
||||
deriving stock (Show,Typeable)
|
||||
|
||||
instance Exception ReflogWaitTimeout
|
||||
|
||||
|
||||
data CWRepo =
|
||||
CWaitLWW
|
||||
| CCheckManifest (LWWRef HBS2Basic)
|
||||
| CAborted
|
||||
|
||||
waitRepo :: forall m . HBS2GitPerks m => Maybe (Timeout 'Seconds) -> Git3 m ()
|
||||
waitRepo timeout = do
|
||||
repoKey <- getGitRepoKey >>= orThrow GitRepoRefNotSet
|
||||
|
||||
lwwAPI <- getClientAPI @LWWRefAPI @UNIX
|
||||
peerAPI <- getClientAPI @PeerAPI @UNIX
|
||||
reflogAPI <- getClientAPI @RefLogAPI @UNIX
|
||||
sto <- getStorage
|
||||
|
||||
env <- ask
|
||||
|
||||
flip runContT pure do
|
||||
|
||||
let wait w what x = pause @'Seconds w >> what x
|
||||
|
||||
callCC \forPeer -> do
|
||||
|
||||
notice "wait for peer"
|
||||
|
||||
lift (callRpcWaitMay @RpcPollAdd (TimeoutSec 1) peerAPI (repoKey, "lwwref", 31))
|
||||
>>= maybe (wait 1 forPeer ()) (const none)
|
||||
|
||||
pFetch <- ContT $ withAsync $ forever do
|
||||
void (callRpcWaitMay @RpcLWWRefFetch (TimeoutSec 1) lwwAPI (LWWRefKey repoKey))
|
||||
pause @'Seconds 10
|
||||
|
||||
lww <- flip fix () \next _ -> do
|
||||
notice $ "wait for" <+> pretty (AsBase58 repoKey)
|
||||
lift (callRpcWaitMay @RpcLWWRefGet (TimeoutSec 1) lwwAPI (LWWRefKey repoKey))
|
||||
>>= \case
|
||||
Just (Just x) -> pure x
|
||||
_ -> wait 2 next ()
|
||||
|
||||
mf <- flip fix () $ \next _ -> do
|
||||
notice $ "wait for manifest"
|
||||
lift (try @_ @WalkMerkleError getRepoManifest) >>= \case
|
||||
Left{} -> wait 1 next ()
|
||||
Right x -> pure x
|
||||
|
||||
reflog <- getRefLog mf & orThrow GitRepoManifestMalformed
|
||||
|
||||
lift $ setGitRepoKey reflog
|
||||
|
||||
rv <- flip fix () \next _ -> do
|
||||
notice $ "wait for data" <+> pretty (AsBase58 reflog)
|
||||
lift (callRpcWaitMay @RpcRefLogGet (TimeoutSec 1) reflogAPI reflog)
|
||||
>>= \case
|
||||
Just (Just x) -> pure x
|
||||
_ -> wait 2 next ()
|
||||
|
||||
okay <- newEmptyTMVarIO
|
||||
|
||||
flip fix () $ \next _ -> do
|
||||
notice $ "wait for data (2)" <+> pretty (AsBase58 reflog)
|
||||
missed <- findMissedBlocks sto rv
|
||||
unless (L.null missed) $ wait 2 next ()
|
||||
atomically $ writeTMVar okay True
|
||||
|
||||
pWait <- ContT $ withAsync $ race ( pause (fromMaybe 300 timeout) ) do
|
||||
void $ atomically $ takeTMVar okay
|
||||
|
||||
waitAnyCatchCancel [pWait, pFetch]
|
||||
|
||||
liftIO $ print $ "reflog" <+> pretty (AsBase58 reflog) <+> pretty rv
|
||||
|
||||
|
||||
|
|
|
@ -5,52 +5,13 @@ module HBS2.Git3.State.Internal.Types
|
|||
, pattern SignPubKeyLike
|
||||
) where
|
||||
|
||||
|
||||
import HBS2.Git3.Prelude
|
||||
import HBS2.Git3.Config.Local
|
||||
import HBS2.Net.Auth.Credentials
|
||||
import HBS2.KeyMan.Keys.Direct
|
||||
import HBS2.System.Dir
|
||||
import HBS2.Data.Detect (readLogThrow)
|
||||
import HBS2.CLI.Run.MetaData (getTreeContents)
|
||||
|
||||
import Data.Config.Suckless
|
||||
|
||||
import HBS2.Storage.Operations.Missed
|
||||
import HBS2.Defaults as Exported
|
||||
import HBS2.OrDie as Exported
|
||||
import HBS2.Data.Types.Refs as Exported
|
||||
import HBS2.Base58 as Exported
|
||||
import HBS2.Merkle as Exported
|
||||
import HBS2.Misc.PrettyStuff as Exported
|
||||
import HBS2.Net.Auth.Credentials
|
||||
import HBS2.Peer.Proto.LWWRef as Exported
|
||||
import HBS2.Peer.Proto.RefLog as Exported
|
||||
import HBS2.Peer.RPC.API.RefLog as Exported
|
||||
import HBS2.Peer.RPC.API.Peer as Exported
|
||||
import HBS2.Peer.RPC.API.LWWRef as Exported
|
||||
import HBS2.Peer.RPC.API.Storage as Exported
|
||||
import HBS2.Peer.RPC.Client hiding (encode,decode)
|
||||
import HBS2.Peer.RPC.Client.Unix hiding (encode,decode)
|
||||
import HBS2.Peer.RPC.Client.StorageClient
|
||||
import HBS2.Peer.CLI.Detect (detectRPC)
|
||||
import HBS2.Data.Types.SignedBox as Exported
|
||||
import HBS2.Storage as Exported
|
||||
import HBS2.Storage.Operations.Class as Exported
|
||||
import HBS2.System.Logger.Simple.ANSI as Exported
|
||||
|
||||
import Data.List qualified as L
|
||||
import Data.Text.Encoding qualified as TE
|
||||
import Data.Text.Encoding.Error qualified as TE
|
||||
import Data.ByteString.Lazy qualified as LBS
|
||||
import Data.Word
|
||||
|
||||
import Data.Kind
|
||||
import Data.HashSet (HashSet)
|
||||
import Data.HashSet qualified as HS
|
||||
import Lens.Micro.Platform
|
||||
|
||||
import System.FilePath
|
||||
|
||||
unit :: FilePath
|
||||
unit = "hbs2-git"
|
||||
|
@ -182,77 +143,6 @@ instance (MonadIO m) => HasGitRemoteKey (Git3 m) where
|
|||
e <- ask
|
||||
liftIO $ atomically $ writeTVar (gitRepoKey e) (Just k)
|
||||
|
||||
getStatePathM :: forall m . (HBS2GitPerks m, HasGitRemoteKey m) => m FilePath
|
||||
getStatePathM = do
|
||||
k <- getGitRemoteKey >>= orThrow RefLogNotSet
|
||||
getStatePath (AsBase58 k)
|
||||
|
||||
updateRepoKey :: forall m . HBS2GitPerks m => GitRepoKey -> Git3 m ()
|
||||
updateRepoKey key = do
|
||||
|
||||
setGitRepoKey key
|
||||
|
||||
mf <- getRepoManifest
|
||||
|
||||
let reflog = lastMay [ x
|
||||
| ListVal [SymbolVal "reflog", SignPubKeyLike x] <- mf
|
||||
]
|
||||
|
||||
ask >>= \case
|
||||
Git3Connected{..} -> atomically $ writeTVar gitRefLog reflog
|
||||
_ -> none
|
||||
|
||||
getRepoRefMaybe :: forall m . HBS2GitPerks m => Git3 m (Maybe (LWWRef 'HBS2Basic))
|
||||
getRepoRefMaybe = do
|
||||
lwwAPI <- getClientAPI @LWWRefAPI @UNIX
|
||||
|
||||
pk <- getGitRepoKey >>= orThrow GitRepoRefNotSet
|
||||
|
||||
callRpcWaitMay @RpcLWWRefGet (TimeoutSec 1) lwwAPI (LWWRefKey pk)
|
||||
>>= orThrow RpcTimeout
|
||||
|
||||
getRepoRefLogCredentials :: forall m . HBS2GitPerks m
|
||||
=> Git3 m (PubKey 'Sign 'HBS2Basic, PrivKey 'Sign HBS2Basic)
|
||||
|
||||
getRepoRefLogCredentials = do
|
||||
-- FIXME: memoize-this
|
||||
mf <- getRepoManifest
|
||||
rk <- getGitRepoKey >>= orThrow GitRepoRefNotSet
|
||||
|
||||
reflog <- getGitRemoteKey >>= orThrow Git3ReflogNotSet
|
||||
|
||||
creds <- runKeymanClientRO (loadCredentials rk)
|
||||
>>= orThrowUser ("not found credentials for" <+> pretty (AsBase58 rk))
|
||||
|
||||
seed <- [ x | ListVal [SymbolVal "seed", LitIntVal x ] <- mf ]
|
||||
& lastMay & orThrow GitRepoManifestMalformed
|
||||
<&> fromIntegral @_ @Word64
|
||||
|
||||
let sk = view peerSignSk creds
|
||||
|
||||
(p,s) <- derivedKey @'HBS2Basic @'Sign seed sk
|
||||
|
||||
unless ( p == reflog ) do
|
||||
throwIO RefLogCredentialsNotMatched
|
||||
|
||||
pure (p,s)
|
||||
|
||||
getRepoManifest :: forall m . HBS2GitPerks m => Git3 m [Syntax C]
|
||||
getRepoManifest = do
|
||||
|
||||
sto <- getStorage
|
||||
|
||||
LWWRef{..} <- getRepoRefMaybe >>= orThrow GitRepoRefEmpty
|
||||
|
||||
mfref <- readLogThrow (getBlock sto) lwwValue
|
||||
<&> headMay
|
||||
>>= orThrow GitRepoManifestMalformed
|
||||
|
||||
runExceptT (getTreeContents sto mfref)
|
||||
>>= orThrowPassIO
|
||||
<&> TE.decodeUtf8With TE.lenientDecode . LBS.toStrict
|
||||
<&> parseTop
|
||||
>>= orThrow GitRepoManifestMalformed
|
||||
|
||||
newtype Git3 (m :: Type -> Type) a = Git3M { fromGit3 :: ReaderT Git3Env m a }
|
||||
deriving newtype ( Applicative
|
||||
|
@ -288,186 +178,9 @@ instance (MonadUnliftIO m) => HasClientAPI LWWRefAPI UNIX (Git3 m) where
|
|||
Git3Disconnected{} -> throwIO Git3PeerNotConnected
|
||||
Git3Connected{..} -> pure lwwAPI
|
||||
|
||||
nullGit3Env :: MonadIO m => m Git3Env
|
||||
nullGit3Env = Git3Disconnected
|
||||
<$> newTVarIO defSegmentSize
|
||||
<*> newTVarIO defCompressionLevel
|
||||
<*> newTVarIO defIndexBlockSize
|
||||
<*> newTVarIO Nothing
|
||||
|
||||
connectedDo :: (MonadIO m) => Git3 m a -> Git3 m a
|
||||
connectedDo what = do
|
||||
env <- ask
|
||||
debug $ red "connectedDo"
|
||||
case env of
|
||||
Git3Disconnected{} -> do
|
||||
throwIO Git3PeerNotConnected
|
||||
|
||||
_ -> what
|
||||
|
||||
withGit3Env :: Git3Perks m => Git3Env -> Git3 m a -> m a
|
||||
withGit3Env env a = runReaderT (fromGit3 a) env
|
||||
|
||||
runGit3 :: Git3Perks m => Git3Env -> Git3 m b -> m b
|
||||
runGit3 env action = withGit3Env env action
|
||||
|
||||
|
||||
recover :: Git3 IO a -> Git3 IO a
|
||||
recover m = fix \again -> do
|
||||
catch m $ \case
|
||||
Git3PeerNotConnected -> do
|
||||
|
||||
soname <- detectRPC
|
||||
`orDie` "can't locate hbs2-peer rpc"
|
||||
|
||||
flip runContT pure do
|
||||
|
||||
client <- lift $ race (pause @'Seconds 1) (newMessagingUnix False 1.0 soname)
|
||||
>>= orThrowUser ("can't connect to" <+> pretty soname)
|
||||
|
||||
void $ ContT $ withAsync $ runMessagingUnix client
|
||||
|
||||
peer <- makeServiceCaller @PeerAPI (fromString soname)
|
||||
refLogAPI <- makeServiceCaller @RefLogAPI (fromString soname)
|
||||
storageAPI <- makeServiceCaller @StorageAPI (fromString soname)
|
||||
lwwAPI <- makeServiceCaller @LWWRefAPI (fromString soname)
|
||||
|
||||
-- let sto = AnyStorage (StorageClient storageAPI)
|
||||
|
||||
let endpoints = [ Endpoint @UNIX peer
|
||||
, Endpoint @UNIX refLogAPI
|
||||
, Endpoint @UNIX lwwAPI
|
||||
, Endpoint @UNIX storageAPI
|
||||
]
|
||||
|
||||
void $ ContT $ withAsync $ liftIO $ runReaderT (runServiceClientMulti endpoints) client
|
||||
|
||||
let sto = AnyStorage (StorageClient storageAPI)
|
||||
|
||||
rk <- lift $ getGitRepoKey >>= orThrow GitRepoRefNotSet
|
||||
|
||||
notice $ yellow $ "REPOKEY" <+> pretty (AsBase58 rk)
|
||||
|
||||
connected <- Git3Connected soname sto peer refLogAPI lwwAPI
|
||||
<$> newTVarIO (Just rk)
|
||||
<*> newTVarIO Nothing
|
||||
<*> newTVarIO defSegmentSize
|
||||
<*> newTVarIO defCompressionLevel
|
||||
<*> newTVarIO defIndexBlockSize
|
||||
|
||||
|
||||
liftIO $ withGit3Env connected do
|
||||
|
||||
waitRepo
|
||||
|
||||
updateRepoKey rk
|
||||
|
||||
ref <- getGitRemoteKey >>= orThrow GitRepoManifestMalformed
|
||||
|
||||
state <- getStatePath (AsBase58 ref)
|
||||
mkdir state
|
||||
|
||||
again
|
||||
|
||||
e -> throwIO e
|
||||
|
||||
data ReflogWaitTimeout =
|
||||
ReflogWaitTimeout
|
||||
deriving stock (Show,Typeable)
|
||||
|
||||
instance Exception ReflogWaitTimeout
|
||||
|
||||
|
||||
data CWRepo =
|
||||
CWaitLWW
|
||||
| CCheckManifest (LWWRef HBS2Basic)
|
||||
| CAborted
|
||||
|
||||
waitRepo :: forall m . HBS2GitPerks m => Git3 m ()
|
||||
waitRepo = do
|
||||
repoKey <- getGitRepoKey >>= orThrow GitRepoRefNotSet
|
||||
|
||||
lwwAPI <- getClientAPI @LWWRefAPI @UNIX
|
||||
peerAPI <- getClientAPI @PeerAPI @UNIX
|
||||
reflogAPI <- getClientAPI @RefLogAPI @UNIX
|
||||
sto <- getStorage
|
||||
|
||||
env <- ask
|
||||
|
||||
callRpcWaitMay @RpcPollAdd (TimeoutSec 1) peerAPI (repoKey, "lwwref", 31)
|
||||
>>= orThrow RpcTimeout
|
||||
|
||||
refLog1_ <- newEmptyTMVarIO
|
||||
refLog2_ <- newEmptyTMVarIO
|
||||
|
||||
void $ flip runContT pure do
|
||||
|
||||
void $ ContT $ withAsync $ forever do
|
||||
void $ callRpcWaitMay @RpcLWWRefFetch (TimeoutSec 1) lwwAPI (LWWRefKey repoKey)
|
||||
pause @'Seconds 10
|
||||
|
||||
p1 <- ContT $ withAsync $ do
|
||||
r <- atomically $ takeTMVar refLog1_
|
||||
forever do
|
||||
notice "FETCH REFLOG!"
|
||||
void $ callRpcWaitMay @RpcRefLogFetch (TimeoutSec 1) reflogAPI r
|
||||
pause @'Seconds 10
|
||||
|
||||
p2 <- ContT $ withAsync $ do
|
||||
r <- atomically $ takeTMVar refLog2_
|
||||
void $ fix \again -> do
|
||||
notice "AGAIN!"
|
||||
rv <- callRpcWaitMay @RpcRefLogGet (TimeoutSec 1) reflogAPI r
|
||||
>>= \case
|
||||
Nothing -> pause @'Seconds 3 >> again
|
||||
Just Nothing -> pause @'Seconds 1.24 >> again
|
||||
Just (Just x) -> pure x
|
||||
|
||||
missed <- findMissedBlocks sto rv
|
||||
|
||||
if L.null missed then do
|
||||
pure rv
|
||||
else do
|
||||
notice "missed blocks in reflog"
|
||||
pause @'Seconds 5
|
||||
again
|
||||
|
||||
liftIO $ withGit3Env env do
|
||||
|
||||
flip fix CWaitLWW $ \next -> \case
|
||||
CWaitLWW -> do
|
||||
notice $ "wait" <+> pretty (AsBase58 repoKey)
|
||||
getRepoRefMaybe >>= \case
|
||||
Nothing -> do
|
||||
pause @'Seconds 1
|
||||
next CWaitLWW
|
||||
|
||||
Just v -> next $ CCheckManifest v
|
||||
|
||||
CCheckManifest LWWRef{} -> do
|
||||
notice "check manifest"
|
||||
r <- try @_ @HBS2GitExcepion getRepoManifest
|
||||
case r of
|
||||
Left GitRepoRefEmpty -> next CWaitLWW
|
||||
|
||||
Left e -> next CAborted
|
||||
|
||||
Right mf -> do
|
||||
let reflog = lastMay [ x | ListVal [SymbolVal "reflog", SignPubKeyLike x] <- mf ]
|
||||
case reflog of
|
||||
Nothing -> next CAborted
|
||||
Just rf -> do
|
||||
|
||||
callRpcWaitMay @RpcPollAdd (TimeoutSec 1) peerAPI (rf, "reflog", 17)
|
||||
>>= orThrow RpcTimeout
|
||||
|
||||
atomically do
|
||||
writeTMVar refLog1_ rf
|
||||
writeTMVar refLog2_ rf
|
||||
|
||||
CAborted -> err "waitRepo aborted" >> none
|
||||
|
||||
|
||||
waitAnyCatchCancel [p1,p2]
|
||||
|
||||
getStatePathM :: forall m . (HBS2GitPerks m, HasGitRemoteKey m) => m FilePath
|
||||
getStatePathM = do
|
||||
k <- getGitRemoteKey >>= orThrow RefLogNotSet
|
||||
getStatePath (AsBase58 k)
|
||||
|
||||
|
|
Loading…
Reference in New Issue