diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index eec945b2..2aa5fb60 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -180,7 +180,7 @@ instance Monad m => HasPeerLocator e (PeerM e m) where instance Monad m => HasFabriq e (PeerM e m) where getFabriq = asks (view envFab) -instance Monad m => HasStorage (PeerM e m) where +instance (Block ByteString ~ ByteString, Monad m) => HasStorage (PeerM e m) where getStorage = asks (view envStorage) instance Monad m => HasPeerNonce e (PeerM e m) where diff --git a/hbs2-core/lib/HBS2/Actors/Peer/Types.hs b/hbs2-core/lib/HBS2/Actors/Peer/Types.hs index 1d959a5d..920481b1 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer/Types.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer/Types.hs @@ -24,7 +24,7 @@ data AnyStorage = forall zu . ( Block ByteString ~ ByteString , Storage zu HbSync ByteString IO ) => AnyStorage zu -class HasStorage m where +class Block ByteString ~ ByteString => HasStorage m where getStorage :: m AnyStorage diff --git a/hbs2-core/lib/HBS2/Data/Detect.hs b/hbs2-core/lib/HBS2/Data/Detect.hs index 00815636..a8fcc1f3 100644 --- a/hbs2-core/lib/HBS2/Data/Detect.hs +++ b/hbs2-core/lib/HBS2/Data/Detect.hs @@ -11,6 +11,7 @@ import Data.Foldable (for_) import Control.Monad.Trans.Maybe import Codec.Serialise (deserialiseOrFail) import Data.ByteString.Lazy (ByteString) +import Data.ByteString.Lazy qualified as LBS import Data.Either import Data.Function import Data.Functor @@ -20,6 +21,9 @@ import Control.Concurrent.STM import Data.HashMap.Strict qualified as HashMap import Data.HashMap.Strict (HashMap) +import Streaming.Prelude qualified as S +import Streaming qualified as S + data BlobType = Merkle (MTree [HashRef]) | MerkleAnn (MTreeAnn [HashRef]) | AnnRef AnnotatedHashRef @@ -125,3 +129,17 @@ deepScan l miss from reader sink = do walk h = walkMerkle h reader stepInside +readBlobFromTree :: forall m . ( MonadIO m ) + => ( Hash HbSync -> IO (Maybe ByteString) ) + -> HashRef + -> m (Maybe ByteString) + +readBlobFromTree readBlock hr = do + + pieces <- S.toList_ $ + deepScan ScanDeep (const $ S.yield Nothing) (fromHashRef hr) (liftIO . readBlock) $ \ha -> do + unless (fromHashRef hr == ha) do + liftIO (readBlock ha) >>= S.yield + + pure $ LBS.concat <$> sequence pieces + diff --git a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs index fb02e33e..431a8858 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs @@ -131,6 +131,12 @@ instance HasProtocol L4Proto (RefChanHead L4Proto) where -- TODO: find-out-optimal-max-frequency requestPeriodLim = ReqLimPerMessage 60 +instance HasProtocol L4Proto (RefChanUpdate L4Proto) where + type instance ProtocolId (RefChanUpdate L4Proto) = 11002 + type instance Encoded L4Proto = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + instance Expires (SessionKey L4Proto (BlockInfo L4Proto)) where expiresIn _ = Just defCookieTimeoutSec diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs index 8199653c..472a1f86 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs @@ -5,6 +5,7 @@ module HBS2.Net.Proto.RefChan where import HBS2.Prelude.Plated import HBS2.Hash +import HBS2.Data.Detect -- import HBS2.Clock import HBS2.Net.Proto import HBS2.Net.Auth.Credentials @@ -108,6 +109,26 @@ data RefChanHead e = instance ForRefChans e => Serialise (RefChanHead e) +data ProposeTran e = ProposeTran HashRef (SignedBox ByteString e) -- произвольная бинарная транзакция, + deriving stock (Generic) -- подписанная ключом **АВТОРА**, который её рассылает + +instance ForRefChans e => Serialise (ProposeTran e) + +-- TODO: find-out-sure-transaction-size +-- транзакция должна быть маленькая! +-- хочешь что-то большое просунуть -- шли хэши. +-- черт его знает, какой там останется пайлоад. +-- надо посмотреть. байт, небось, 400 +data RefChanUpdate e = + Propose (RefChanId e) (SignedBox (ProposeTran e) e) -- подписано ключом пира + deriving stock (Generic) + +instance ForRefChans e => Serialise (RefChanUpdate e) + +-- data RefChanNotifyMsg e = +-- Notify (SignedBox ByteString e) +-- deriving stock (Generic) + data RefChanHeadAdapter e m = RefChanHeadAdapter { refChanHeadOnHead :: RefChanId e -> RefChanHeadBlockTran e -> m () @@ -171,6 +192,95 @@ refChanHeadProto self adapter msg = do where proto = Proxy @(RefChanHead e) + + +refChanUpdateProto :: forall e s m . ( MonadIO m + , Request e (RefChanUpdate e) m + , Response e (RefChanUpdate e) m + , HasDeferred e (RefChanUpdate e) m + , IsPeerAddr e m + , Pretty (Peer e) + , Sessions e (KnownPeer e) m + , HasStorage m + , Signatures s + , IsRefPubKey s + , Pretty (AsBase58 (PubKey 'Sign s)) + -- , Serialise (Signature s) + , ForRefChans e + , s ~ Encryption e + ) + => Bool + -> RefChanHeadAdapter e m + -> RefChanUpdate e + -> m () + +refChanUpdateProto self adapter msg = do + -- авторизовать пира + peer <- thatPeer proto + + auth <- find (KnownPeerKey peer) id <&> isJust + + sto <- getStorage + + void $ runMaybeT do + + guard auth + + + case msg of + Propose chan box -> do + guard =<< lift (refChanHeadSubscribed adapter chan) + + -- TODO: implement-propose-reaction + -- достать голову если есть + -- если нет - увы. ничего не делать + -- достать голову, которую прислали, если она есть + -- если нет -- это может значить, что либо она левая, + -- либо у нас еще не обновилось. + -- ну короче, по факту можем поддержать разговор, + -- только если у нас такая же голова. + -- в любом другом случае ничего не делаем. + -- + -- короче. такое: + -- смотрим, что голова == нашей голове + -- если да, то достаём + -- смотрим, что пир вообще может сюда писать + -- если нет - то ничего не делаем. + -- смотрим, что автор вообще может сюда писать. + -- если нет - то ничего не делаем. + -- + -- если же пир может писать, автор может писать, + -- то рассылаем всем участникам Accept, свой Accept + -- тоже куда-то запоминаем (куда?) + -- ну или шлём его себе сами - просто вызываем эту + -- же функцию в этом же контексте с Accept + + debug "RefChanUpdate/Propose" + deferred proto do + + -- проверили подпись пира + (peerKey, (ProposeTran headRef bs)) <- MaybeT $ pure $ unboxSignedBox0 box + + -- итак, сначала достаём голову. как мы достаём голову? + h <- MaybeT $ liftIO $ getRef sto (RefChanHeadKey @s chan) + + -- FIXME: cache-this + hdblob <- MaybeT $ readBlobFromTree ( getBlock sto ) (HashRef h) + + -- смотрим, что у нас такая же голова. + -- если нет -- значит, кто-то рассинхронизировался. + -- может быть, потом попробуем головы запросить + guard (HashRef h == headRef) + + -- теперь достаём голову + + pure () + + + where + proto = Proxy @(RefChanUpdate e) + + makeSignedBox :: forall e p . (Serialise p, ForRefChans e, Signatures (Encryption e)) => PubKey 'Sign (Encryption e) -> PrivKey 'Sign (Encryption e) @@ -182,21 +292,26 @@ makeSignedBox pk sk msg = SignedBox @p @e pk bs sign bs = LBS.toStrict (serialise msg) sign = makeSign @(Encryption e) sk bs + +unboxSignedBox0 :: forall p e . (Serialise p, ForRefChans e, Signatures (Encryption e)) + => SignedBox p e + -> Maybe (PubKey 'Sign (Encryption e), p) + +unboxSignedBox0 (SignedBox pk bs sign) = runIdentity $ runMaybeT do + guard $ verifySign @(Encryption e) pk sign bs + p <- MaybeT $ pure $ deserialiseOrFail @p (LBS.fromStrict bs) & either (const Nothing) Just + pure (pk, p) + unboxSignedBox :: forall p e . (Serialise p, ForRefChans e, Signatures (Encryption e)) => LBS.ByteString -> Maybe (PubKey 'Sign (Encryption e), p) unboxSignedBox bs = runIdentity $ runMaybeT do - (SignedBox pk bs sign) <- MaybeT $ pure - $ deserialiseOrFail @(SignedBox p e) bs - & either (pure Nothing) Just + box <- MaybeT $ pure $ deserialiseOrFail @(SignedBox p e) bs + & either (pure Nothing) Just - guard $ verifySign @(Encryption e) pk sign bs - - p <- MaybeT $ pure $ deserialiseOrFail @p (LBS.fromStrict bs) & either (const Nothing) Just - - pure (pk, p) + MaybeT $ pure $ unboxSignedBox0 box instance ForRefChans e => FromStringMaybe (RefChanHeadBlock e) where fromStringMay str = RefChanHeadBlockSmall <$> version diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index d890a74b..0a8e1eaf 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -115,24 +115,6 @@ checkDownloaded hr = do pure $ maybe False (not . List.null) $ sequence result --- FIXME: move-to-library -readBlob :: forall m . ( MonadIO m - , HasStorage m - , Block ByteString ~ ByteString - ) - => HashRef - -> m (Maybe ByteString) - -readBlob hr = do - sto <- getStorage - let readBlock h = liftIO $ getBlock sto h - - chunks <- S.toList_ $ - deepScan ScanDeep (const $ S.yield Nothing) (fromHashRef hr) readBlock $ \ha -> do - unless (fromHashRef hr == ha) do - readBlock ha >>= S.yield - - pure $ LBS.concat <$> sequence chunks refChanWorker :: forall e s m . ( MonadIO m , MonadUnliftIO m @@ -212,9 +194,10 @@ refChanWorker env brains = do refChanAddDownload env chan hr trace $ "BLOCK IS NOT HERE" <+> pretty hr else do + sto <- getStorage trace $ "BLOCK IS HERE" <+> pretty hr -- читаем блок - lbs <- readBlob hr <&> fromMaybe mempty + lbs <- readBlobFromTree (getBlock sto) hr <&> fromMaybe mempty let what = unboxSignedBox @(RefChanHeadBlock e) @e lbs notify <- atomically $ do @@ -228,8 +211,6 @@ refChanWorker env brains = do Just (pk,blk) | pk == chan -> do let rkey = RefChanHeadKey @s pk - sto <- getStorage - debug $ "Good head block" <+> pretty hr <+> "processing..." ourVersion <- runMaybeT do @@ -237,7 +218,7 @@ refChanWorker env brains = do cur <- MaybeT $ liftIO $ getRef sto rkey - lbss <- MaybeT $ readBlob (HashRef cur) + lbss <- MaybeT $ readBlobFromTree (getBlock sto) (HashRef cur) (_, blkOur) <- MaybeT $ pure $ unboxSignedBox @(RefChanHeadBlock e) @e lbss