lwwref + lwwref/tree http streaming

This commit is contained in:
Dmitry Zuikov 2024-03-14 07:39:22 +03:00
parent 10e99e7cdc
commit 27dbc14c62
19 changed files with 752 additions and 58 deletions

View File

@ -103,6 +103,7 @@ outputs = { self, nixpkgs, haskell-flake-utils, ... }@inputs:
hoogle
htags
text-icu
magic
pkgs.icu72
pkgs.openssl
weeder

View File

@ -225,7 +225,7 @@ instance ( Hashable (Peer e)
where
sql = [qc|
insert into statedb.poll (ref,type,interval)
insert into {poll_table} (ref,type,interval)
values (?,?,?)
on conflict do update set interval = excluded.interval
|]
@ -236,7 +236,7 @@ instance ( Hashable (Peer e)
liftIO $ execute conn sql (Only (show $ pretty (AsBase58 r)))
where
sql = [qc|
delete from statedb.poll
delete from {poll_table}
where ref = ?
|]
@ -245,21 +245,21 @@ instance ( Hashable (Peer e)
let conn = view brainsDb brains
case mtp of
Nothing -> postprocess <$>
query_ conn [qc|select ref, type, interval from statedb.poll|]
query_ conn [qc|select ref, type, interval from {poll_table}|]
Just tp -> postprocess <$>
query conn [qc|select ref, type, interval from statedb.poll where type = ?|] (Only tp)
query conn [qc|select ref, type, interval from {poll_table} where type = ?|] (Only tp)
where
postprocess = mapMaybe (\(r,t,i) -> (,t,i) <$> fromStringMay r )
isPolledRef brains ref = do
isPolledRef brains tp ref = do
liftIO do
let conn = view brainsDb brains
query @_ @(Only Int) conn [qc|
select 1 from statedb.poll
where ref = ?
select 1 from {poll_table}
where ref = ? and type = ?
limit 1
|] ( Only ( show $ pretty (AsBase58 ref) ) )
|] ( show $ pretty (AsBase58 ref), tp )
<&> isJust . listToMaybe
setSeen brains w ts = do
@ -718,6 +718,8 @@ insertPexInfo br peers = liftIO do
|] (Only (show $ pretty p))
{- HLINT ignore "Functor law" -}
selectPexInfo :: forall e . (e ~ L4Proto)
=> BasicBrains e
-> IO [PeerAddr e]
@ -730,6 +732,18 @@ selectPexInfo br = liftIO do
|] <&> fmap (fromStringMay . fromOnly)
<&> catMaybes
tableExists :: Connection -> Maybe String -> String -> IO Bool
tableExists conn prefix' tableName = do
let sql = [qc|
SELECT name FROM {prefix}.sqlite_master WHERE type='table' AND name=?
|]
r <- query conn sql (Only tableName) :: IO [Only String]
pure $ not $ null r
where
prefix = fromMaybe "main" prefix'
-- FIXME: eventually-close-db
newBasicBrains :: forall e m . (Hashable (Peer e), MonadIO m)
=> PeerConfig
@ -836,14 +850,26 @@ newBasicBrains cfg = liftIO do
)
|]
execute_ conn [qc|
create table if not exists statedb.poll
( ref text not null
, type text not null
, interval int not null
, primary key (ref)
)
|]
poll_1 <- tableExists conn (Just "statedb") "poll_1"
poll_0 <- tableExists conn (Just "statedb") "poll"
unless poll_1 do
debug $ red "BRAINS: CREATE poll_1"
execute_ conn [qc|
create table if not exists statedb.poll_1
( ref text not null
, type text not null
, interval int not null
, primary key (ref,type)
)
|]
when poll_0 do
debug $ red "BRAINS: FILL poll_1"
execute_ conn [qc|
insert into statedb.poll_1 (ref,type,interval)
select ref,type,interval from statedb.poll;
|]
execute_ conn [qc|
create table if not exists peer_asymmkey
@ -879,6 +905,10 @@ data PeerDownloadsDelOnStart
instance Monad m => HasCfgKey PeerDownloadsDelOnStart b m where
key = "downloads-del-on-start"
{- HLINT ignore "Use camelCase" -}
poll_table :: String
poll_table = "statedb.poll_1"
runBasicBrains :: forall e m . ( e ~ L4Proto
, MonadUnliftIO m
, ForRefChans e
@ -945,7 +975,7 @@ runBasicBrains cfg brains = do
updateOP brains $ do
let conn = view brainsDb brains
liftIO $ execute conn [qc|
insert into statedb.poll (ref,type,interval)
insert into {poll_table} (ref,type,interval)
values (?,?,?)
on conflict do update set interval = excluded.interval
|] (show $ pretty (AsBase58 x), show $ pretty t, mi)

View File

@ -1,10 +1,12 @@
{-# Language TemplateHaskell #-}
module CLI.Common where
import HBS2.Prelude
import HBS2.Clock
import HBS2.Net.Messaging.Unix
import HBS2.Net.Proto
import HBS2.Net.Proto.Service
import HBS2.Net.Auth.Schema
import PeerConfig
@ -58,3 +60,6 @@ pRpcCommon :: Parser RPCOpt
pRpcCommon = do
RPCOpt <$> optional confOpt
<*> optional rpcOpt
pPubKey :: ReadM (PubKey 'Sign HBS2Basic)
pPubKey = maybeReader fromStringMay

View File

@ -0,0 +1,64 @@
module CLI.LWWRef where
import HBS2.Prelude.Plated
import HBS2.OrDie
import HBS2.Net.Proto.Service
import HBS2.Net.Auth.Credentials
import HBS2.Data.Types.SignedBox
import HBS2.Net.Auth.Schema
import HBS2.Peer.Proto.LWWRef
import HBS2.Peer.RPC.API.LWWRef
import HBS2.KeyMan.Keys.Direct
import CLI.Common
import RPC2()
import PeerLogger hiding (info)
import System.Exit
import Options.Applicative
import Data.Word
import Lens.Micro.Platform
pLwwRef :: Parser (IO ())
pLwwRef = hsubparser ( command "fetch" (info pLwwRefFetch (progDesc "fetch lwwref"))
<> command "get" (info pLwwRefGet (progDesc "get lwwref"))
<> command "update" (info pLwwRefUpdate (progDesc "update lwwref"))
)
pLwwRefFetch :: Parser (IO ())
pLwwRefFetch = do
rpc <- pRpcCommon
ref <- strArgument (metavar "LWWREF")
pure $ withMyRPC @LWWRefAPI rpc $ \caller -> do
callService @RpcLWWRefFetch caller ref >>= \case
Left e -> err (viaShow e) >> exitFailure
Right{} -> pure ()
lwwRef :: ReadM (LWWRefKey HBS2Basic)
lwwRef = maybeReader (fromStringMay @(LWWRefKey HBS2Basic))
pLwwRefGet :: Parser (IO ())
pLwwRefGet = do
rpc <- pRpcCommon
ref <- strArgument (metavar "LWWREF")
pure $ withMyRPC @LWWRefAPI rpc $ \caller -> do
callService @RpcLWWRefGet caller ref >>= \case
Left e -> err (viaShow e) >> exitFailure
Right r -> print $ pretty r
pLwwRefUpdate :: Parser (IO ())
pLwwRefUpdate = do
rpc <- pRpcCommon
puk <- argument pPubKey (metavar "LWWREF")
seq <- option @Word64 auto (short 's' <> long "seq" <> help "seqno" <>metavar "SEQ")
val <- option (maybeReader fromStringMay) (short 'v' <> long "value" <> help "value" <> metavar "VALUE")
pure $ withMyRPC @LWWRefAPI rpc $ \caller -> do
(sk,pk) <- liftIO $ runKeymanClient do
creds <- loadCredentials puk >>= orThrowUser "can't load credentials"
pure ( view peerSignSk creds, view peerSignPk creds )
let box = makeSignedBox @L4Proto pk sk (LWWRef @L4Proto seq val Nothing)
callService @RpcLWWRefUpdate caller box >>= \case
Left e -> err (viaShow e) >> exitFailure
Right r -> print $ pretty r

View File

@ -2,29 +2,56 @@
module HttpWorker where
import HBS2.Prelude
import HBS2.Hash
import HBS2.Actors.Peer
import HBS2.Storage
import HBS2.Data.Detect
import HBS2.Data.Types.Refs
import HBS2.Merkle (AnnMetaData)
import HBS2.Merkle
import HBS2.Peer.Proto
import HBS2.Peer.Proto.LWWRef
import HBS2.Net.Auth.Schema
import HBS2.Data.Types.SignedBox
import HBS2.Events
import HBS2.Storage.Operations.ByteString
import PeerTypes
import PeerConfig
import RefLog ( doRefLogBroadCast )
import Data.Config.Suckless
import Data.ByteString.Lazy qualified as LBS
import Network.HTTP.Types.Status
import Network.Wai.Middleware.RequestLogger
import Text.InterpolatedString.Perl6 (qc)
import Web.Scotty
import Data.ByteString.Builder (byteString, Builder)
import Data.Either
import Codec.Serialise (deserialiseOrFail)
import Data.Aeson (object, (.=))
import Data.ByteString.Lazy.Char8 qualified as LBS8
import Control.Monad.Reader
import Lens.Micro.Platform (view)
import System.FilePath
import Control.Monad.Except
import Control.Monad.Trans.Cont
{- HLINT ignore "Functor law" -}
-- TODO: introduce-http-of-off-feature
extractMetadataHash :: Hash HbSync -> LBS.ByteString -> Maybe (Hash HbSync)
extractMetadataHash what blob =
case tryDetect what blob of
MerkleAnn (MTreeAnn {_mtaMeta = AnnHashRef h, _mtaCrypt = NullEncryption}) -> Just h
_ -> Nothing
orElse :: m r -> Maybe a -> ContT r m a
orElse a mb = ContT $ maybe1 mb a
httpWorker :: forall e s m . ( MyPeer e
, MonadIO m
, HasStorage m
@ -32,6 +59,7 @@ httpWorker :: forall e s m . ( MyPeer e
, s ~ Encryption e
, m ~ PeerM e IO
, e ~ L4Proto
-- , ForLWWRefProto e
) => PeerConfig -> AnnMetaData -> DownloadEnv e -> m ()
httpWorker (PeerConfig syn) pmeta e = do
@ -53,6 +81,73 @@ httpWorker (PeerConfig syn) pmeta e = do
Just n -> do
json n
-- TODO: key-to-disable-tree-streaming
get "/ref/:key" do
void $ flip runContT pure do
what <- lift (param @String "key" <&> fromStringMay @(LWWRefKey HBS2Basic))
>>= orElse (status status404)
rv <- getRef sto what
>>= orElse (status status404)
>>= getBlock sto
>>= orElse (status status404)
<&> either (const Nothing) Just . deserialiseOrFail @(SignedBox (LWWRef e) e)
>>= orElse (status status404)
<&> unboxSignedBox0 @(LWWRef e)
>>= orElse (status status404)
<&> lwwValue . snd
lift $ redirect [qc|/tree/{pretty rv}|]
get "/tree/:hash" do
what <- param @String "hash" <&> fromString
void $ flip runContT pure do
callCC $ \exit -> do
blob <- liftIO (getBlock sto what)
>>= orElse (status status404)
mh <- orElse (status status404) (extractMetadataHash what blob)
meta <- lift (getBlock sto mh) >>= orElse (status status404)
<&> LBS8.unpack
<&> fromRight mempty . parseTop
let tp = headDef "application/octet-stream"
[ show (pretty w)
| ListVal [SymbolVal "mime-type:", LitStrVal w] <- meta
]
let fn = headMay
[ show (pretty w)
| ListVal [SymbolVal "file-name:", LitStrVal w] <- meta
]
-- liftIO $ print $ pretty meta
case fn of
Just x | takeExtension x == ".html" -> pure ()
| otherwise -> lift $ do
addHeader "content-disposition" [qc|attachment; filename="{x}"|]
_ -> pure ()
lift $ addHeader "content-type" (fromString tp)
elbs <- lift $ runExceptT $ readFromMerkle sto (SimpleKey what)
case elbs of
Left{} -> lift $ status status404
Right lbs -> lift do
stream $ \write flush -> do
for_ (LBS.toChunks lbs) $ \chunk -> do
write $ byteString chunk
flush
get "/cat/:hash" do
what <- param @String "hash" <&> fromString
blob <- liftIO $ getBlock sto what

52
hbs2-peer/app/LWWRef.hs Normal file
View File

@ -0,0 +1,52 @@
module LWWRef where
import HBS2.Prelude.Plated
import HBS2.Actors.Peer
import HBS2.Data.Types.Refs
import HBS2.Net.Proto
import HBS2.Base58
import HBS2.Storage
import HBS2.Storage.Operations.Missed
import HBS2.Hash
import HBS2.Peer.Proto
import HBS2.Peer.Proto.LWWRef
import HBS2.Net.Auth.Credentials
import HBS2.Misc.PrettyStuff
import Brains
import PeerConfig
import PeerTypes
import Control.Monad
import UnliftIO
import Lens.Micro.Platform
{- HLINT ignore "Functor law" -}
lwwRefWorker :: forall e s m . ( MonadIO m
, MonadUnliftIO m
, MyPeer e
, HasStorage m
, Sessions e (KnownPeer e) m
, HasGossip e (LWWRefProto e) m
, Signatures s
, s ~ Encryption e
, IsRefPubKey s
)
=> PeerConfig
-> SomeBrains e
-> m ()
lwwRefWorker conf brains = do
let listRefs = listPolledRefs @e brains (Just "lwwref")
<&> fmap (\(a,_,b) -> (a,b))
<&> fmap (over _2 ( (*60) . fromIntegral) )
polling (Polling 5 5) listRefs $ \ref -> do
debug $ yellow "POLLING LWWREF" <+> pretty (AsBase58 ref)
gossip (LWWRefProto1 @e (LWWProtoGet (LWWRefKey ref)))

View File

@ -16,6 +16,7 @@ import HBS2.Data.Types.Refs
import HBS2.Data.Types.SignedBox
import HBS2.Data.Types
import HBS2.Net.Auth.Credentials
import HBS2.Net.Auth.Schema()
import HBS2.Net.IP.Addr
import HBS2.Net.Messaging.UDP
import HBS2.Net.Messaging.TCP
@ -47,11 +48,13 @@ import Bootstrap
import CheckMetrics
import RefLog qualified
import RefLog (reflogWorker)
import LWWRef (lwwRefWorker)
import HttpWorker
import DispatchProxy
import PeerMeta
import CLI.Common
import CLI.RefChan
import CLI.LWWRef
import RefChan
import RefChanNotifyLog
import Fetch (fetchHash)
@ -65,9 +68,12 @@ import HBS2.Peer.RPC.API.Storage
import HBS2.Peer.RPC.API.Peer
import HBS2.Peer.RPC.API.RefLog
import HBS2.Peer.RPC.API.RefChan
import HBS2.Peer.RPC.API.LWWRef
import HBS2.Peer.Notify
import HBS2.Peer.RPC.Client.StorageClient
import HBS2.Peer.Proto.LWWRef.Internal
import RPC2(RPC2Context(..))
import Codec.Serialise as Serialise
@ -233,6 +239,7 @@ runCLI = do
<> command "fetch" (info pFetch (progDesc "fetch block"))
<> command "reflog" (info pRefLog (progDesc "reflog commands"))
<> command "refchan" (info pRefChan (progDesc "refchan commands"))
<> command "lwwref" (info pLwwRef (progDesc "lwwref commands"))
<> command "peers" (info pPeers (progDesc "show known peers"))
<> command "pexinfo" (info pPexInfo (progDesc "show pex"))
<> command "download" (info pDownload (progDesc "download management"))
@ -450,7 +457,6 @@ runCLI = do
<> command "del" (info pPollDel (progDesc "del poller" ))
)
pPollAdd = do
rpc <- pRpcCommon
r <- argument refP (metavar "REF")
@ -614,6 +620,8 @@ respawn opts =
runPeer :: forall e s . ( e ~ L4Proto
, FromStringMaybe (PeerAddr e)
, s ~ Encryption e
-- , ForLWWRefProto e
-- , Serialise (PubKey 'Sign (Encryption e))
, HasStorage (PeerM e IO)
)=> PeerOpts -> IO ()
@ -812,7 +820,7 @@ runPeer opts = Exception.handle (\e -> myException e
let refChanAdapter =
RefChanAdapter
{ refChanOnHead = refChanOnHeadFn rce
, refChanSubscribed = isPolledRef @e brains
, refChanSubscribed = isPolledRef @e brains "refchan"
, refChanWriteTran = refChanWriteTranFn rce
, refChanValidatePropose = refChanValidateTranFn @e rce
@ -996,6 +1004,10 @@ runPeer opts = Exception.handle (\e -> myException e
err $ red "Exception" <+> "in thread" <+> pretty t <+> viaShow e
liftIO $ throwTo myself GoAgainException
let lwwRefProtoA = lwwRefProto (LWWRefProtoAdapter { lwwFetchBlock = download })
where download h = withPeerM env $ withDownload denv (addDownload Nothing h)
flip runContT pure do
peerThread "local multicast" $ forever $ do
@ -1029,6 +1041,8 @@ runPeer opts = Exception.handle (\e -> myException e
peerThread "refChanNotifyLogWorker" (refChanNotifyLogWorker @e conf (SomeBrains brains))
peerThread "lwwRefWorker" (lwwRefWorker @e conf (SomeBrains brains))
liftIO $ withPeerM penv do
runProto @e
[ makeResponse (blockSizeProto blk (downloadOnBlockSize denv) onNoBlock)
@ -1043,6 +1057,8 @@ runPeer opts = Exception.handle (\e -> myException e
, makeResponse (refChanUpdateProto False pc refChanAdapter)
, makeResponse (refChanRequestProto False refChanAdapter)
, makeResponse (refChanNotifyProto False refChanAdapter)
-- TODO: change-all-to-authorized
, makeResponse ((authorized . subscribed (SomeBrains brains)) lwwRefProtoA)
]
@ -1152,6 +1168,7 @@ runPeer opts = Exception.handle (\e -> myException e
, makeResponse (makeServer @RefLogAPI)
, makeResponse (makeServer @RefChanAPI)
, makeResponse (makeServer @StorageAPI)
, makeResponse (makeServer @LWWRefAPI)
, makeResponse (makeNotifyServer @(RefChanEvents L4Proto) env)
, makeResponse (makeNotifyServer @(RefLogEvents L4Proto) envrl)
]

View File

@ -3,6 +3,7 @@
{-# Language UndecidableInstances #-}
{-# Language AllowAmbiguousTypes #-}
{-# Language MultiWayIf #-}
{-# Language FunctionalDependencies #-}
module PeerTypes
( module PeerTypes
, module PeerLogger
@ -13,6 +14,8 @@ module PeerTypes
import HBS2.Polling
import HBS2.Actors.Peer
import HBS2.Clock
import HBS2.Net.Auth.Schema
import HBS2.Net.Auth.Credentials
import HBS2.Data.Types.SignedBox
import HBS2.Data.Types.Peer
import HBS2.Data.Types.Refs
@ -24,6 +27,7 @@ import HBS2.Net.IP.Addr
import HBS2.Net.Proto
import HBS2.Peer.Proto.Peer
import HBS2.Peer.Proto.BlockInfo
import HBS2.Peer.Proto.LWWRef
import HBS2.Net.Proto.Sessions
import HBS2.Prelude.Plated
import HBS2.Storage
@ -481,4 +485,40 @@ simpleBlockAnnounce size h = do
pure $ BlockAnnounce @e no annInfo
class IsPolledKey e proto | proto -> e where
getPolledKey :: proto -> (String, PubKey 'Sign (Encryption e))
instance IsPolledKey e (LWWRefProto e) where
getPolledKey = \case
LWWRefProto1 (LWWProtoGet (LWWRefKey k)) -> (tp,k)
LWWRefProto1 (LWWProtoSet (LWWRefKey k) _) -> (tp,k)
where tp = "lwwref"
subscribed :: forall e proto m . ( MonadIO m
, IsPolledKey e proto
, Request e proto m
, Response e proto m
)
=> SomeBrains e
-> (proto -> m ())
-> proto
-> m ()
subscribed brains f req = do
let (tp,ref) = getPolledKey req
polled <- isPolledRef @e brains tp ref
when polled $ f req
authorized :: forall e proto m . ( MonadIO m
, Request e proto m
, Response e proto m
, Sessions e (KnownPeer e) m
)
=> (proto -> m ()) -> proto -> m ()
authorized f req = do
p <- thatPeer @proto
auth <- find (KnownPeerKey p) id <&> isJust
when auth (f req)

View File

@ -2,10 +2,12 @@ module RPC2
( module RPC2.Peer
, module RPC2.RefLog
, module RPC2.RefChan
, module RPC2.LWWRef
) where
import RPC2.Peer
import RPC2.RefLog
import RPC2.RefChan
import RPC2.LWWRef

View File

@ -0,0 +1,76 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# Language UndecidableInstances #-}
module RPC2.LWWRef where
import HBS2.Peer.Prelude
import HBS2.Actors.Peer
import HBS2.Data.Types.SignedBox
import HBS2.Peer.Proto
import HBS2.Peer.Proto.LWWRef
import HBS2.Peer.Proto.LWWRef.Internal
import HBS2.Storage
import HBS2.Net.Messaging.Unix
import PeerTypes
import HBS2.Peer.RPC.Internal.Types
import HBS2.Peer.RPC.API.LWWRef
import Lens.Micro.Platform
import Control.Monad.Reader
import Control.Monad.Trans.Maybe
type LWWRefContext m = (MonadIO m, HasRpcContext LWWRefAPI RPC2Context m)
instance (Monad m)
=> HasRpcContext LWWRefAPI RPC2Context (ResponseM UNIX (ReaderT RPC2Context m)) where
getRpcContext = lift ask
instance (LWWRefContext m) => HandleMethod m RpcLWWRefGet where
handleMethod key = do
co <- getRpcContext @LWWRefAPI
debug "rpc.LWWRefContext"
let penv = rpcPeerEnv co
liftIO $ withPeerM penv $ do
sto <- getStorage
runMaybeT do
rv <- getRef sto key >>= toMPlus
val <- getBlock sto rv >>= toMPlus
<&> unboxSignedBox @(LWWRef L4Proto) @L4Proto
>>= toMPlus
pure $ snd val
instance LWWRefContext m => HandleMethod m RpcLWWRefFetch where
handleMethod key = do
co <- getRpcContext @LWWRefAPI
debug $ "rpc.LWWRefFetch" <+> pretty key
let penv = rpcPeerEnv co
liftIO $ withPeerM penv $ do
gossip (LWWRefProto1 @L4Proto (LWWProtoGet key))
instance LWWRefContext m => HandleMethod m RpcLWWRefUpdate where
handleMethod box = do
co <- getRpcContext @LWWRefAPI
debug "rpc.LWWRefUpdate"
let penv = rpcPeerEnv co
let nada = LWWRefProtoAdapter dontHandle
void $ runMaybeT do
(puk, _) <- unboxSignedBox0 box & toMPlus
liftIO $ withPeerM penv do
me <- ownPeer @L4Proto
runResponseM me $ do
lwwRefProto nada (LWWRefProto1 (LWWProtoSet @L4Proto (LWWRefKey puk) box))

View File

@ -65,7 +65,7 @@ mkRefLogRequestAdapter :: forall e s m . ( MonadIO m
=> SomeBrains e -> m (RefLogRequestI e (ResponseM e m ))
mkRefLogRequestAdapter brains = do
sto <- getStorage
pure $ RefLogRequestI (doOnRefLogRequest brains sto) dontHandle (isPolledRef @e brains)
pure $ RefLogRequestI (doOnRefLogRequest brains sto) dontHandle (isPolledRef @e brains "reflog")
doOnRefLogRequest :: forall e s m . ( MonadIO m
, MyPeer e
@ -78,7 +78,7 @@ doOnRefLogRequest :: forall e s m . ( MonadIO m
-> m (Maybe (Hash HbSync))
doOnRefLogRequest brains sto (_,pk) = runMaybeT do
isPolledRef @e brains pk >>= guard
isPolledRef @e brains "reflog" pk >>= guard
ref <- liftIO $ getRef sto (RefLogKey @s pk)
when (isNothing ref) do
warn $ "missed reflog value" <+> pretty ref
@ -150,7 +150,7 @@ reflogWorker conf brains adapter = do
subscribe @e RefLogUpdateEvKey $ \(RefLogUpdateEvData (reflog,v, mpip)) -> do
trace $ "reflog worker.got refupdate" <+> pretty (AsBase58 reflog)
polled <- isPolledRef @e brains reflog
polled <- isPolledRef @e brains "reflog" reflog
buddy <- maybe1 mpip (pure False) $ \pip -> do
pa <- toPeerAddr @e pip
acceptAnnouncesFromPeer @e conf pa

View File

@ -69,6 +69,7 @@ common common-deps
, warp
, http-conduit
, http-types
, wai
, wai-extra
, unliftio
, unliftio-core
@ -157,11 +158,14 @@ library
HBS2.Peer.Proto.RefChan.RefChanNotify
HBS2.Peer.Proto.RefChan.RefChanUpdate
HBS2.Peer.Proto.AnyRef
HBS2.Peer.Proto.LWWRef
HBS2.Peer.Proto.LWWRef.Internal
HBS2.Peer.RPC.Class
HBS2.Peer.RPC.API.Peer
HBS2.Peer.RPC.API.RefLog
HBS2.Peer.RPC.API.RefChan
HBS2.Peer.RPC.API.LWWRef
HBS2.Peer.RPC.API.Storage
HBS2.Peer.RPC.Client.Unix
HBS2.Peer.RPC.Client.StorageClient
@ -255,18 +259,21 @@ executable hbs2-peer
, RPC2.Downloads
, RPC2.RefLog
, RPC2.RefChan
, RPC2.LWWRef
, PeerTypes
, PeerLogger
, PeerConfig
, RefLog
, RefChan
, RefChanNotifyLog
, LWWRef
, CheckMetrics
, HttpWorker
, Brains
, DispatchProxy
, CLI.Common
, CLI.RefChan
, CLI.LWWRef
, Paths_hbs2_peer

View File

@ -18,8 +18,8 @@ class HasBrains e a where
listPolledRefs :: MonadIO m => a -> Maybe String -> m [(PubKey 'Sign (Encryption e), String, Int)]
listPolledRefs _ _ = pure mempty
isPolledRef :: MonadIO m => a -> PubKey 'Sign (Encryption e) -> m Bool
isPolledRef _ _ = pure False
isPolledRef :: MonadIO m => a -> String -> PubKey 'Sign (Encryption e) -> m Bool
isPolledRef _ _ _ = pure False
delPolledRef :: MonadIO m => a -> PubKey 'Sign (Encryption e) -> m ()
delPolledRef _ _ = pure ()

View File

@ -1,4 +1,5 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# Language UndecidableInstances #-}
module HBS2.Peer.Proto
( module HBS2.Peer.Proto.PeerMeta
, module HBS2.Peer.Proto.BlockAnnounce
@ -27,6 +28,7 @@ import HBS2.Peer.Proto.PeerExchange
import HBS2.Peer.Proto.RefLog
import HBS2.Peer.Proto.RefChan hiding (Notify)
import HBS2.Peer.Proto.AnyRef
import HBS2.Peer.Proto.LWWRef
import HBS2.Actors.Peer.Types
import HBS2.Net.Messaging.Unix (UNIX)
@ -146,6 +148,12 @@ instance HasProtocol L4Proto (RefChanNotify L4Proto) where
-- возьмем пока 10 секунд
requestPeriodLim = NoLimit
instance ForLWWRefProto L4Proto => HasProtocol L4Proto (LWWRefProto L4Proto) where
type instance ProtocolId (LWWRefProto L4Proto) = 12001
type instance Encoded L4Proto = ByteString
decode = either (const Nothing) Just . deserialiseOrFail
encode = serialise
requestPeriodLim = ReqLimPerMessage 1
instance Serialise (RefChanValidate UNIX) => HasProtocol UNIX (RefChanValidate UNIX) where
type instance ProtocolId (RefChanValidate UNIX) = 0xFFFA0001

View File

@ -0,0 +1,85 @@
{-# Language UndecidableInstances #-}
{-# Language AllowAmbiguousTypes #-}
module HBS2.Peer.Proto.LWWRef where
import HBS2.Prelude.Plated
import HBS2.Base58
import HBS2.Hash
import HBS2.Data.Types.SignedBox
import HBS2.Data.Types.Refs
import HBS2.Net.Proto.Types
import HBS2.Net.Auth.Schema()
import Data.ByteString (ByteString)
import Data.Hashable hiding (Hashed)
import Data.Maybe
import Data.Word
data LWWRefProtoReq e =
LWWProtoGet (LWWRefKey (Encryption e))
| LWWProtoSet (LWWRefKey (Encryption e)) (SignedBox (LWWRef e) e)
deriving stock Generic
data LWWRefProto e =
LWWRefProto1 (LWWRefProtoReq e)
deriving stock (Generic)
data LWWRef e =
LWWRef
{ lwwSeq :: Word64
, lwwValue :: HashRef
, lwwProof :: Maybe HashRef
}
deriving stock (Generic)
type ForLWWRefProto e = (ForSignedBox e, Serialise (LWWRefKey (Encryption e)))
instance ForLWWRefProto e => Serialise (LWWRefProtoReq e)
instance ForLWWRefProto e => Serialise (LWWRefProto e)
instance ForLWWRefProto e => Serialise (LWWRef e)
newtype LWWRefKey s =
LWWRefKey
{ fromLwwRefKey :: PubKey 'Sign s
}
deriving stock (Generic)
instance RefMetaData (LWWRefKey s)
deriving stock instance IsRefPubKey s => Eq (LWWRefKey s)
instance IsRefPubKey e => Serialise (LWWRefKey e)
instance IsRefPubKey s => Hashable (LWWRefKey s) where
hashWithSalt s k = hashWithSalt s (hashObject @HbSync k)
instance IsRefPubKey s => Hashed HbSync (LWWRefKey s) where
hashObject (LWWRefKey pk) = hashObject ("lwwrefkey|" <> serialise pk)
instance IsRefPubKey s => FromStringMaybe (LWWRefKey s) where
fromStringMay s = LWWRefKey <$> fromStringMay s
instance IsRefPubKey s => IsString (LWWRefKey s) where
fromString s = fromMaybe (error "bad public key base58") (fromStringMay s)
instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (AsBase58 (LWWRefKey s)) where
pretty (AsBase58 (LWWRefKey k)) = pretty (AsBase58 k)
instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (LWWRefKey s) where
pretty (LWWRefKey k) = pretty (AsBase58 k)
instance Pretty (LWWRef e) where
pretty (LWWRef{..}) = parens ( "lwwref" <> line
<> indent 2 ( seqno <> line <> val <> line <> proof)
)
where
seqno = parens ( "seq" <+> pretty lwwSeq )
val = parens ( "value" <+> dquotes (pretty lwwValue) )
proof | isNothing lwwProof = mempty
| otherwise = parens ( "proof" <+> pretty lwwProof)

View File

@ -0,0 +1,112 @@
module HBS2.Peer.Proto.LWWRef.Internal
( module HBS2.Peer.Proto.LWWRef.Internal
, module HBS2.Peer.Proto.LWWRef
) where
import HBS2.Prelude.Plated
import HBS2.Peer.Proto.LWWRef
import HBS2.Data.Types.SignedBox
import HBS2.Storage
import HBS2.Hash
import HBS2.Clock
import HBS2.Net.Proto
import HBS2.Net.Auth.Credentials
import HBS2.Base58
import HBS2.Events
import HBS2.Actors.Peer.Types
import HBS2.Peer.Proto.Peer
import HBS2.Net.Proto.Sessions
import HBS2.Data.Types.Refs
import HBS2.Misc.PrettyStuff
import HBS2.System.Logger.Simple
import Codec.Serialise
import Control.Monad
import Control.Monad.Trans.Maybe
import Data.Maybe
{- HLINT ignore "Functor law" -}
data LWWRefProtoAdapter e m =
LWWRefProtoAdapter
{ lwwFetchBlock :: Hash HbSync -> m ()
}
lwwRefProto :: forall e s m proto . ( MonadIO m
, ForLWWRefProto e
, Request e proto m
, Response e proto m
, HasDeferred proto e m
, HasGossip e (LWWRefProto e) m
, HasStorage m
, IsPeerAddr e m
, Pretty (Peer e)
, Sessions e (KnownPeer e) m
, Signatures s
, Pretty (AsBase58 (PubKey 'Sign s))
, s ~ Encryption e
, proto ~ LWWRefProto e
)
=> LWWRefProtoAdapter e m
-> LWWRefProto e -> m ()
lwwRefProto adapter pkt@(LWWRefProto1 req) = do
debug $ yellow "lwwRefProto"
case req of
LWWProtoGet key -> deferred @proto $ void $ runMaybeT do
sto <- getStorage
ref <- getRef sto key >>= toMPlus
box <- getBlock sto ref
>>= toMPlus
<&> deserialiseOrFail
>>= toMPlus
lift $ response (LWWRefProto1 (LWWProtoSet @e key box))
LWWProtoSet key box -> void $ runMaybeT do
(puk, lww) <- MaybeT $ pure $ unboxSignedBox0 box
guard ( puk == fromLwwRefKey key )
deferred @proto do
sto <- getStorage
let bs = serialise box
let h0 = hashObject @HbSync bs
new <- hasBlock sto h0 <&> isNothing
when new do
lift $ gossip pkt
lift $ lwwFetchBlock adapter (fromHashRef (lwwValue lww))
getRef sto key >>= \case
Nothing -> do
h <- enqueueBlock sto bs >>= toMPlus
updateRef sto key h
Just rv -> do
blk' <- getBlock sto rv
maybe1 blk' (forcedUpdateLwwRef sto key bs) $ \blk -> do
let seq0 = deserialiseOrFail @(SignedBox (LWWRef e) e) blk
& either (const Nothing) Just
>>= unboxSignedBox0
<&> snd
<&> lwwSeq
when (Just (lwwSeq lww) > seq0) do
forcedUpdateLwwRef sto key (serialise box)
where
forcedUpdateLwwRef sto key bs = do
h' <- enqueueBlock sto bs
forM_ h' $ updateRef sto key

View File

@ -0,0 +1,40 @@
module HBS2.Peer.RPC.API.LWWRef where
import HBS2.Peer.Prelude
import HBS2.Peer.Proto.LWWRef
import HBS2.Data.Types.SignedBox
import HBS2.Net.Messaging.Unix
import HBS2.Data.Types.Refs (HashRef(..))
import HBS2.Net.Proto.Service
import HBS2.Peer.Proto.RefLog (RefLogUpdate)
import Data.ByteString.Lazy (ByteString)
import Codec.Serialise
data RpcLWWRefGet
data RpcLWWRefUpdate
data RpcLWWRefFetch
type LWWRefAPI = '[ RpcLWWRefGet -- may be done via storage
, RpcLWWRefUpdate --
, RpcLWWRefFetch --
]
instance HasProtocol UNIX (ServiceProto LWWRefAPI UNIX) where
type instance ProtocolId (ServiceProto LWWRefAPI UNIX) = 16267229472009458342
type instance Encoded UNIX = ByteString
decode = either (const Nothing) Just . deserialiseOrFail
encode = serialise
type instance Input RpcLWWRefGet = LWWRefKey HBS2Basic
type instance Output RpcLWWRefGet = Maybe (LWWRef L4Proto)
type instance Input RpcLWWRefFetch = LWWRefKey HBS2Basic
type instance Output RpcLWWRefFetch = ()
type instance Input RpcLWWRefUpdate = SignedBox (LWWRef L4Proto) L4Proto
type instance Output RpcLWWRefUpdate = ()

View File

@ -22,11 +22,12 @@ import HBS2.Storage.Simple.Extra
import HBS2.Data.Bundle
import HBS2.OrDie
import HBS2.Version
import HBS2.Misc.PrettyStuff
import Paths_hbs2 qualified as Pkg
import HBS2.KeyMan.Keys.Direct
import HBS2.System.Logger.Simple hiding (info)
import HBS2.System.Logger.Simple.ANSI hiding (info)
import Data.Config.Suckless
@ -36,11 +37,13 @@ import Control.Monad
import Control.Monad.Except
import Control.Monad.Trans.Maybe
import Control.Monad.Trans.Resource
import Control.Monad.Trans.Cont
import Crypto.Saltine.Core.Box qualified as Encrypt
import Data.Aeson qualified as Aeson
import Data.ByteString.Char8 qualified as BS8
import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS
import Data.ByteString.Lazy.Char8 qualified as LBS8
import Data.ByteString qualified as BS
import Data.ByteArray.Hash (SipHash(..), SipKey(..))
import Data.ByteArray.Hash qualified as BA
@ -54,9 +57,15 @@ import Options.Applicative
import Streaming.Prelude qualified as S
import Streaming.ByteString qualified as SB
import System.Directory
import System.FilePath
import System.Exit qualified as Exit
import System.IO qualified as IO
import System.IO.Temp (emptySystemTempFile)
import Magic.Data
import Magic.Init (magicLoadDefault,magicOpen)
import Magic.Operations (magicFile)
import UnliftIO
tracePrefix :: SetLoggerEntry
@ -75,6 +84,9 @@ noticePrefix :: SetLoggerEntry
noticePrefix = logPrefix "[notice] " . toStderr
data MetadataMethod = MetaDataAuto FilePath
deriving stock (Eq,Generic,Show)
newtype CommonOpts =
CommonOpts
{ _coPref :: Maybe StoragePrefix
@ -221,6 +233,11 @@ runCat opts ss = do
Left hx -> err $ "missed block" <+> pretty hx
Right hr -> print $ vcat (fmap pretty hr)
MerkleAnn (MTreeAnn {_mtaCrypt = NullEncryption }) -> do
bs <- runExceptT (readFromMerkle (AnyStorage ss) (SimpleKey mhash))
>>= orThrowUser "can't read/decode tree"
LBS.putStr bs
MerkleAnn ann@(MTreeAnn {_mtaCrypt = EncryptGroupNaClSymm gkh _}) -> do
keyring <- case uniLastMay @OptKeyringFile opts of
Just krf -> do
@ -310,38 +327,7 @@ runStore opts ss = runResourceT do
Nothing -> die "unknown or invalid group key"
Just (EncSymm gk) -> do
pk <- unOptEncPk <$> pure (uniLastMay @OptEncPubKey opts) `orDie` "public key not specified"
krf <- pure (uniLastMay @OptKeyringFile opts) `orDie` "keyring file not set"
s <- liftIO $ BS.readFile (unOptKeyringFile krf)
cred <- pure (parseCredentials @HBS2Basic (AsCredFile s)) `orDie` "bad keyring file"
sk <- pure (headMay [ (view krPk k, view krSk k)
| k <- view peerKeyring cred
, view krPk k == pk
]) `orDie` "secret key not found"
gks <- pure (Symm.lookupGroupKey (snd sk) pk gk) `orDie` ("can't find secret key for " <> show (pretty (AsBase58 (fst sk))))
void $ liftIO $ IO.withFile inputFile IO.ReadMode $ \fh -> do
let reader = readChunked fh (fromIntegral defBlockSize)
qqq <- S.toList_ $ reader
& S.map (BA.sipHash (SipKey 2716310006254639645 507093936407764973) . LBS.toStrict)
& S.map \(SipHash w) -> w
let (HbSyncHash nonce) = hashObject @HbSync (serialise qqq)
IO.hSeek fh IO.AbsoluteSeek 0
let segments = readChunked fh (fromIntegral defBlockSize)
let source = ToEncryptSymmBS gks (Right gk) nonce segments NoMetaData Nothing
r <- runExceptT $ writeAsMerkle ss source
case r of
Left e -> die (show e)
Right h -> hPrint stdout (pretty h)
die "symmetric group keys are deprecated"
Just (EncAsymm gk) -> liftIO $ IO.withFile inputFile IO.ReadMode $ \ha -> do
@ -487,6 +473,7 @@ main = join . customExecParser (prefs showHelpOnError) $
parser :: Parser (IO ())
parser = hsubparser ( command "store" (info pStore (progDesc "store block"))
<> command "cat" (info pCat (progDesc "cat block"))
<> command "metadata" (info pMetadata (progDesc "tree metadata manipulation"))
<> command "hash" (info pHash (progDesc "calculates hash"))
<> command "fsck" (info pFsck (progDesc "check storage constistency"))
<> command "deps" (info pDeps (progDesc "print dependencies"))
@ -535,6 +522,78 @@ main = join . customExecParser (prefs showHelpOnError) $
pure $ withStore o $ runCat
$ CatOpts hash (CatHashesOnly <$> onlyh) (OptKeyringFile <$> keyringFile) raw
pMetadata = hsubparser ( command "dump" (info pMetadataDump (progDesc "dump metadata"))
<> command "create" (info pMetadataCreate (progDesc "create tree with metadata"))
)
pMetadataDump = do
o <- common
h <- argument (maybeReader (fromStringMay @HashRef)) (metavar "HASH") <&> fromHashRef
pure $ flip runContT pure do
sto <- ContT (withStore o)
void $ runMaybeT do
bs <- getBlock sto h >>= toMPlus
case tryDetect h bs of
MerkleAnn (MTreeAnn { _mtaMeta = AnnHashRef mh } ) -> do
bs <- getBlock sto mh
`orDie` "cant' read metadata"
liftIO $ LBS.putStr bs
_ -> exitFailure
pMetadataCreate = do
o <- common
how <- MetaDataAuto <$> strOption ( long "auto" <> metavar "FILENAME" <> help "automatic metadata from file name")
dry <- flag False True (long "dry" <> short 'n' <> help "don't write to storage")
pure $ flip runContT pure do
sto <- ContT $ withStore o
void $ runMaybeT do
case how of
MetaDataAuto fn -> do
meta <- liftIO do
magic <- magicOpen [MagicMimeType,MagicMime,MagicMimeEncoding]
magicLoadDefault magic
mime <- magicFile magic fn
pure [ "file-name:" <+> dquotes (pretty $ takeFileName fn)
, "mime-type:" <+> dquotes (pretty mime)
]
let s = LBS8.pack $ show $ vcat meta
liftIO $ LBS8.putStr s
guard (not dry)
mth <- putBlock sto s >>= toMPlus
bs <- liftIO $ LBS.readFile fn
root <- writeAsMerkle sto bs
mt <- getBlock sto root `orDie` "can't read merkle tree just written"
<&> deserialiseOrFail @(MTree [HashRef])
>>= orThrowUser "corrupted merkle tree -- should never happen"
delBlock sto root
let mtann = MTreeAnn (AnnHashRef mth) NullEncryption mt
hnew <- putBlock sto (serialise mtann)
`orDie` "can't write merkle tree"
liftIO $ putStrLn ""
liftIO $ putStrLn ""
liftIO $ print $ pretty hnew
pGroupKey = pGroupKeySymm
pGroupKeySymm = hsubparser ( command "gen" (info pGroupKeySymmGen (progDesc "generate") )

View File

@ -79,6 +79,7 @@ executable hbs2
, filepath
, hashable
, interpolatedstring-perl6
, magic
, memory
, microlens-platform
, mtl