mirror of https://github.com/voidlizard/hbs2
feed-up-with-block-bytestring
This commit is contained in:
parent
5691e20f6e
commit
e4c759a49d
|
@ -180,7 +180,7 @@ instance Monad m => HasPeerLocator e (PeerM e m) where
|
||||||
instance Monad m => HasFabriq e (PeerM e m) where
|
instance Monad m => HasFabriq e (PeerM e m) where
|
||||||
getFabriq = asks (view envFab)
|
getFabriq = asks (view envFab)
|
||||||
|
|
||||||
instance Monad m => HasStorage (PeerM e m) where
|
instance (Block ByteString ~ ByteString, Monad m) => HasStorage (PeerM e m) where
|
||||||
getStorage = asks (view envStorage)
|
getStorage = asks (view envStorage)
|
||||||
|
|
||||||
instance Monad m => HasPeerNonce e (PeerM e m) where
|
instance Monad m => HasPeerNonce e (PeerM e m) where
|
||||||
|
|
|
@ -24,7 +24,7 @@ data AnyStorage = forall zu . ( Block ByteString ~ ByteString
|
||||||
, Storage zu HbSync ByteString IO
|
, Storage zu HbSync ByteString IO
|
||||||
) => AnyStorage zu
|
) => AnyStorage zu
|
||||||
|
|
||||||
class HasStorage m where
|
class Block ByteString ~ ByteString => HasStorage m where
|
||||||
getStorage :: m AnyStorage
|
getStorage :: m AnyStorage
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -11,6 +11,7 @@ import Data.Foldable (for_)
|
||||||
import Control.Monad.Trans.Maybe
|
import Control.Monad.Trans.Maybe
|
||||||
import Codec.Serialise (deserialiseOrFail)
|
import Codec.Serialise (deserialiseOrFail)
|
||||||
import Data.ByteString.Lazy (ByteString)
|
import Data.ByteString.Lazy (ByteString)
|
||||||
|
import Data.ByteString.Lazy qualified as LBS
|
||||||
import Data.Either
|
import Data.Either
|
||||||
import Data.Function
|
import Data.Function
|
||||||
import Data.Functor
|
import Data.Functor
|
||||||
|
@ -20,6 +21,9 @@ import Control.Concurrent.STM
|
||||||
import Data.HashMap.Strict qualified as HashMap
|
import Data.HashMap.Strict qualified as HashMap
|
||||||
import Data.HashMap.Strict (HashMap)
|
import Data.HashMap.Strict (HashMap)
|
||||||
|
|
||||||
|
import Streaming.Prelude qualified as S
|
||||||
|
import Streaming qualified as S
|
||||||
|
|
||||||
data BlobType = Merkle (MTree [HashRef])
|
data BlobType = Merkle (MTree [HashRef])
|
||||||
| MerkleAnn (MTreeAnn [HashRef])
|
| MerkleAnn (MTreeAnn [HashRef])
|
||||||
| AnnRef AnnotatedHashRef
|
| AnnRef AnnotatedHashRef
|
||||||
|
@ -125,3 +129,17 @@ deepScan l miss from reader sink = do
|
||||||
|
|
||||||
walk h = walkMerkle h reader stepInside
|
walk h = walkMerkle h reader stepInside
|
||||||
|
|
||||||
|
readBlobFromTree :: forall m . ( MonadIO m )
|
||||||
|
=> ( Hash HbSync -> IO (Maybe ByteString) )
|
||||||
|
-> HashRef
|
||||||
|
-> m (Maybe ByteString)
|
||||||
|
|
||||||
|
readBlobFromTree readBlock hr = do
|
||||||
|
|
||||||
|
pieces <- S.toList_ $
|
||||||
|
deepScan ScanDeep (const $ S.yield Nothing) (fromHashRef hr) (liftIO . readBlock) $ \ha -> do
|
||||||
|
unless (fromHashRef hr == ha) do
|
||||||
|
liftIO (readBlock ha) >>= S.yield
|
||||||
|
|
||||||
|
pure $ LBS.concat <$> sequence pieces
|
||||||
|
|
||||||
|
|
|
@ -131,6 +131,12 @@ instance HasProtocol L4Proto (RefChanHead L4Proto) where
|
||||||
-- TODO: find-out-optimal-max-frequency
|
-- TODO: find-out-optimal-max-frequency
|
||||||
requestPeriodLim = ReqLimPerMessage 60
|
requestPeriodLim = ReqLimPerMessage 60
|
||||||
|
|
||||||
|
instance HasProtocol L4Proto (RefChanUpdate L4Proto) where
|
||||||
|
type instance ProtocolId (RefChanUpdate L4Proto) = 11002
|
||||||
|
type instance Encoded L4Proto = ByteString
|
||||||
|
decode = either (const Nothing) Just . deserialiseOrFail
|
||||||
|
encode = serialise
|
||||||
|
|
||||||
|
|
||||||
instance Expires (SessionKey L4Proto (BlockInfo L4Proto)) where
|
instance Expires (SessionKey L4Proto (BlockInfo L4Proto)) where
|
||||||
expiresIn _ = Just defCookieTimeoutSec
|
expiresIn _ = Just defCookieTimeoutSec
|
||||||
|
|
|
@ -5,6 +5,7 @@ module HBS2.Net.Proto.RefChan where
|
||||||
|
|
||||||
import HBS2.Prelude.Plated
|
import HBS2.Prelude.Plated
|
||||||
import HBS2.Hash
|
import HBS2.Hash
|
||||||
|
import HBS2.Data.Detect
|
||||||
-- import HBS2.Clock
|
-- import HBS2.Clock
|
||||||
import HBS2.Net.Proto
|
import HBS2.Net.Proto
|
||||||
import HBS2.Net.Auth.Credentials
|
import HBS2.Net.Auth.Credentials
|
||||||
|
@ -108,6 +109,26 @@ data RefChanHead e =
|
||||||
instance ForRefChans e => Serialise (RefChanHead e)
|
instance ForRefChans e => Serialise (RefChanHead e)
|
||||||
|
|
||||||
|
|
||||||
|
data ProposeTran e = ProposeTran HashRef (SignedBox ByteString e) -- произвольная бинарная транзакция,
|
||||||
|
deriving stock (Generic) -- подписанная ключом **АВТОРА**, который её рассылает
|
||||||
|
|
||||||
|
instance ForRefChans e => Serialise (ProposeTran e)
|
||||||
|
|
||||||
|
-- TODO: find-out-sure-transaction-size
|
||||||
|
-- транзакция должна быть маленькая!
|
||||||
|
-- хочешь что-то большое просунуть -- шли хэши.
|
||||||
|
-- черт его знает, какой там останется пайлоад.
|
||||||
|
-- надо посмотреть. байт, небось, 400
|
||||||
|
data RefChanUpdate e =
|
||||||
|
Propose (RefChanId e) (SignedBox (ProposeTran e) e) -- подписано ключом пира
|
||||||
|
deriving stock (Generic)
|
||||||
|
|
||||||
|
instance ForRefChans e => Serialise (RefChanUpdate e)
|
||||||
|
|
||||||
|
-- data RefChanNotifyMsg e =
|
||||||
|
-- Notify (SignedBox ByteString e)
|
||||||
|
-- deriving stock (Generic)
|
||||||
|
|
||||||
data RefChanHeadAdapter e m =
|
data RefChanHeadAdapter e m =
|
||||||
RefChanHeadAdapter
|
RefChanHeadAdapter
|
||||||
{ refChanHeadOnHead :: RefChanId e -> RefChanHeadBlockTran e -> m ()
|
{ refChanHeadOnHead :: RefChanId e -> RefChanHeadBlockTran e -> m ()
|
||||||
|
@ -171,6 +192,95 @@ refChanHeadProto self adapter msg = do
|
||||||
where
|
where
|
||||||
proto = Proxy @(RefChanHead e)
|
proto = Proxy @(RefChanHead e)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
refChanUpdateProto :: forall e s m . ( MonadIO m
|
||||||
|
, Request e (RefChanUpdate e) m
|
||||||
|
, Response e (RefChanUpdate e) m
|
||||||
|
, HasDeferred e (RefChanUpdate e) m
|
||||||
|
, IsPeerAddr e m
|
||||||
|
, Pretty (Peer e)
|
||||||
|
, Sessions e (KnownPeer e) m
|
||||||
|
, HasStorage m
|
||||||
|
, Signatures s
|
||||||
|
, IsRefPubKey s
|
||||||
|
, Pretty (AsBase58 (PubKey 'Sign s))
|
||||||
|
-- , Serialise (Signature s)
|
||||||
|
, ForRefChans e
|
||||||
|
, s ~ Encryption e
|
||||||
|
)
|
||||||
|
=> Bool
|
||||||
|
-> RefChanHeadAdapter e m
|
||||||
|
-> RefChanUpdate e
|
||||||
|
-> m ()
|
||||||
|
|
||||||
|
refChanUpdateProto self adapter msg = do
|
||||||
|
-- авторизовать пира
|
||||||
|
peer <- thatPeer proto
|
||||||
|
|
||||||
|
auth <- find (KnownPeerKey peer) id <&> isJust
|
||||||
|
|
||||||
|
sto <- getStorage
|
||||||
|
|
||||||
|
void $ runMaybeT do
|
||||||
|
|
||||||
|
guard auth
|
||||||
|
|
||||||
|
|
||||||
|
case msg of
|
||||||
|
Propose chan box -> do
|
||||||
|
guard =<< lift (refChanHeadSubscribed adapter chan)
|
||||||
|
|
||||||
|
-- TODO: implement-propose-reaction
|
||||||
|
-- достать голову если есть
|
||||||
|
-- если нет - увы. ничего не делать
|
||||||
|
-- достать голову, которую прислали, если она есть
|
||||||
|
-- если нет -- это может значить, что либо она левая,
|
||||||
|
-- либо у нас еще не обновилось.
|
||||||
|
-- ну короче, по факту можем поддержать разговор,
|
||||||
|
-- только если у нас такая же голова.
|
||||||
|
-- в любом другом случае ничего не делаем.
|
||||||
|
--
|
||||||
|
-- короче. такое:
|
||||||
|
-- смотрим, что голова == нашей голове
|
||||||
|
-- если да, то достаём
|
||||||
|
-- смотрим, что пир вообще может сюда писать
|
||||||
|
-- если нет - то ничего не делаем.
|
||||||
|
-- смотрим, что автор вообще может сюда писать.
|
||||||
|
-- если нет - то ничего не делаем.
|
||||||
|
--
|
||||||
|
-- если же пир может писать, автор может писать,
|
||||||
|
-- то рассылаем всем участникам Accept, свой Accept
|
||||||
|
-- тоже куда-то запоминаем (куда?)
|
||||||
|
-- ну или шлём его себе сами - просто вызываем эту
|
||||||
|
-- же функцию в этом же контексте с Accept
|
||||||
|
|
||||||
|
debug "RefChanUpdate/Propose"
|
||||||
|
deferred proto do
|
||||||
|
|
||||||
|
-- проверили подпись пира
|
||||||
|
(peerKey, (ProposeTran headRef bs)) <- MaybeT $ pure $ unboxSignedBox0 box
|
||||||
|
|
||||||
|
-- итак, сначала достаём голову. как мы достаём голову?
|
||||||
|
h <- MaybeT $ liftIO $ getRef sto (RefChanHeadKey @s chan)
|
||||||
|
|
||||||
|
-- FIXME: cache-this
|
||||||
|
hdblob <- MaybeT $ readBlobFromTree ( getBlock sto ) (HashRef h)
|
||||||
|
|
||||||
|
-- смотрим, что у нас такая же голова.
|
||||||
|
-- если нет -- значит, кто-то рассинхронизировался.
|
||||||
|
-- может быть, потом попробуем головы запросить
|
||||||
|
guard (HashRef h == headRef)
|
||||||
|
|
||||||
|
-- теперь достаём голову
|
||||||
|
|
||||||
|
pure ()
|
||||||
|
|
||||||
|
|
||||||
|
where
|
||||||
|
proto = Proxy @(RefChanUpdate e)
|
||||||
|
|
||||||
|
|
||||||
makeSignedBox :: forall e p . (Serialise p, ForRefChans e, Signatures (Encryption e))
|
makeSignedBox :: forall e p . (Serialise p, ForRefChans e, Signatures (Encryption e))
|
||||||
=> PubKey 'Sign (Encryption e)
|
=> PubKey 'Sign (Encryption e)
|
||||||
-> PrivKey 'Sign (Encryption e)
|
-> PrivKey 'Sign (Encryption e)
|
||||||
|
@ -182,21 +292,26 @@ makeSignedBox pk sk msg = SignedBox @p @e pk bs sign
|
||||||
bs = LBS.toStrict (serialise msg)
|
bs = LBS.toStrict (serialise msg)
|
||||||
sign = makeSign @(Encryption e) sk bs
|
sign = makeSign @(Encryption e) sk bs
|
||||||
|
|
||||||
|
|
||||||
|
unboxSignedBox0 :: forall p e . (Serialise p, ForRefChans e, Signatures (Encryption e))
|
||||||
|
=> SignedBox p e
|
||||||
|
-> Maybe (PubKey 'Sign (Encryption e), p)
|
||||||
|
|
||||||
|
unboxSignedBox0 (SignedBox pk bs sign) = runIdentity $ runMaybeT do
|
||||||
|
guard $ verifySign @(Encryption e) pk sign bs
|
||||||
|
p <- MaybeT $ pure $ deserialiseOrFail @p (LBS.fromStrict bs) & either (const Nothing) Just
|
||||||
|
pure (pk, p)
|
||||||
|
|
||||||
unboxSignedBox :: forall p e . (Serialise p, ForRefChans e, Signatures (Encryption e))
|
unboxSignedBox :: forall p e . (Serialise p, ForRefChans e, Signatures (Encryption e))
|
||||||
=> LBS.ByteString
|
=> LBS.ByteString
|
||||||
-> Maybe (PubKey 'Sign (Encryption e), p)
|
-> Maybe (PubKey 'Sign (Encryption e), p)
|
||||||
|
|
||||||
unboxSignedBox bs = runIdentity $ runMaybeT do
|
unboxSignedBox bs = runIdentity $ runMaybeT do
|
||||||
|
|
||||||
(SignedBox pk bs sign) <- MaybeT $ pure
|
box <- MaybeT $ pure $ deserialiseOrFail @(SignedBox p e) bs
|
||||||
$ deserialiseOrFail @(SignedBox p e) bs
|
& either (pure Nothing) Just
|
||||||
& either (pure Nothing) Just
|
|
||||||
|
|
||||||
guard $ verifySign @(Encryption e) pk sign bs
|
MaybeT $ pure $ unboxSignedBox0 box
|
||||||
|
|
||||||
p <- MaybeT $ pure $ deserialiseOrFail @p (LBS.fromStrict bs) & either (const Nothing) Just
|
|
||||||
|
|
||||||
pure (pk, p)
|
|
||||||
|
|
||||||
instance ForRefChans e => FromStringMaybe (RefChanHeadBlock e) where
|
instance ForRefChans e => FromStringMaybe (RefChanHeadBlock e) where
|
||||||
fromStringMay str = RefChanHeadBlockSmall <$> version
|
fromStringMay str = RefChanHeadBlockSmall <$> version
|
||||||
|
|
|
@ -115,24 +115,6 @@ checkDownloaded hr = do
|
||||||
|
|
||||||
pure $ maybe False (not . List.null) $ sequence result
|
pure $ maybe False (not . List.null) $ sequence result
|
||||||
|
|
||||||
-- FIXME: move-to-library
|
|
||||||
readBlob :: forall m . ( MonadIO m
|
|
||||||
, HasStorage m
|
|
||||||
, Block ByteString ~ ByteString
|
|
||||||
)
|
|
||||||
=> HashRef
|
|
||||||
-> m (Maybe ByteString)
|
|
||||||
|
|
||||||
readBlob hr = do
|
|
||||||
sto <- getStorage
|
|
||||||
let readBlock h = liftIO $ getBlock sto h
|
|
||||||
|
|
||||||
chunks <- S.toList_ $
|
|
||||||
deepScan ScanDeep (const $ S.yield Nothing) (fromHashRef hr) readBlock $ \ha -> do
|
|
||||||
unless (fromHashRef hr == ha) do
|
|
||||||
readBlock ha >>= S.yield
|
|
||||||
|
|
||||||
pure $ LBS.concat <$> sequence chunks
|
|
||||||
|
|
||||||
refChanWorker :: forall e s m . ( MonadIO m
|
refChanWorker :: forall e s m . ( MonadIO m
|
||||||
, MonadUnliftIO m
|
, MonadUnliftIO m
|
||||||
|
@ -212,9 +194,10 @@ refChanWorker env brains = do
|
||||||
refChanAddDownload env chan hr
|
refChanAddDownload env chan hr
|
||||||
trace $ "BLOCK IS NOT HERE" <+> pretty hr
|
trace $ "BLOCK IS NOT HERE" <+> pretty hr
|
||||||
else do
|
else do
|
||||||
|
sto <- getStorage
|
||||||
trace $ "BLOCK IS HERE" <+> pretty hr
|
trace $ "BLOCK IS HERE" <+> pretty hr
|
||||||
-- читаем блок
|
-- читаем блок
|
||||||
lbs <- readBlob hr <&> fromMaybe mempty
|
lbs <- readBlobFromTree (getBlock sto) hr <&> fromMaybe mempty
|
||||||
let what = unboxSignedBox @(RefChanHeadBlock e) @e lbs
|
let what = unboxSignedBox @(RefChanHeadBlock e) @e lbs
|
||||||
|
|
||||||
notify <- atomically $ do
|
notify <- atomically $ do
|
||||||
|
@ -228,8 +211,6 @@ refChanWorker env brains = do
|
||||||
Just (pk,blk) | pk == chan -> do
|
Just (pk,blk) | pk == chan -> do
|
||||||
let rkey = RefChanHeadKey @s pk
|
let rkey = RefChanHeadKey @s pk
|
||||||
|
|
||||||
sto <- getStorage
|
|
||||||
|
|
||||||
debug $ "Good head block" <+> pretty hr <+> "processing..."
|
debug $ "Good head block" <+> pretty hr <+> "processing..."
|
||||||
|
|
||||||
ourVersion <- runMaybeT do
|
ourVersion <- runMaybeT do
|
||||||
|
@ -237,7 +218,7 @@ refChanWorker env brains = do
|
||||||
|
|
||||||
cur <- MaybeT $ liftIO $ getRef sto rkey
|
cur <- MaybeT $ liftIO $ getRef sto rkey
|
||||||
|
|
||||||
lbss <- MaybeT $ readBlob (HashRef cur)
|
lbss <- MaybeT $ readBlobFromTree (getBlock sto) (HashRef cur)
|
||||||
|
|
||||||
(_, blkOur) <- MaybeT $ pure $ unboxSignedBox @(RefChanHeadBlock e) @e lbss
|
(_, blkOur) <- MaybeT $ pure $ unboxSignedBox @(RefChanHeadBlock e) @e lbss
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue