diff --git a/.fixme/config b/.fixme/config index bacbca51..0f3328f0 100644 --- a/.fixme/config +++ b/.fixme/config @@ -11,6 +11,7 @@ fixme-files **/*.hs docs/devlog.md fixme-files docs/pep*.txt fixme-files docs/drafts/**/*.txt +fixme-files docs/notes/**/*.txt fixme-files docs/pr/**/*.txt fixme-files docs/todo/**/*.txt fixme-files docs/notes/**/*.txt diff --git a/docs/devlog.md b/docs/devlog.md index 87e98a23..8416977c 100644 --- a/docs/devlog.md +++ b/docs/devlog.md @@ -1,3 +1,11 @@ +## 2023-14-08 + +PR: hbs2-git-config-location + branch: hbs2-git-fastpok + commit: 82d5a30f4510b3f70bfbf2bc208b14df4ce2b17d + Изменено расположение файла конфигурации hbs2-git, + незначительный рефакторинг. + ## 2023-07-30 какие-то косяки @@ -1311,3 +1319,4 @@ PR: bus-crypt Шифрование протокола общения нод. Обмен асимметричными публичными ключами выполняется на стадии хэндшейка в ping/pong. Для шифрования данных создаётся симметричный ключ по diffie-hellman. + diff --git a/docs/notes/bundles.txt b/docs/notes/bundles.txt index 42eee39a..705cf5b3 100644 --- a/docs/notes/bundles.txt +++ b/docs/notes/bundles.txt @@ -1,3 +1,229 @@ +NOTE: bundle-exchange-protocol + + Проблема: + + Долгое скачивание множества маленьких объектов, особенно при + работе со структурами, которые определяются через множество + ссылок ([HashRef]). Такой список сам по себе разбит на блоки, + блоки на чанки, то, на что он ссылается так же разбито, каждый + блок это несколько запросов в pull-модели (узнать размер, + запросить чанки и т.д). + + Решения: + + Данная проблема уже встречалась в hbs2-git и была решена + переходом от модели "один объект git --- одно дерево merkle" к + модели "один журналов push git --- одно дерево merkle". Таким + образом, стейт тут является объединением множества журналов, + каждый из которых содержит объекты git, отсутствующие в + известных журналах (предыдущий стейт). + + Это радикально ускорило синхронизацию, однако ввело оверхед по + данным, и мы потеряли связь 1-1 между объектом git и объектом + (деревом?) hbs2. + + Так же данная проблема встретилась в реализации QBLF при + большом потоке транзакций и будет встречаться всегда, когда мы + выражаем состояние, как множество ссылок на мелкие объекты. + + Более общим решением видится введение некоего примитива bundle, + который будет собирать объекты в журнал, состоящий из секций, + каждая секция соответствует отдельному блоку. По факту, такой + журнал собирается из списка HashRef, HashRef указывает на + конкретный блоб. + + Данный подход имеет смысл, когда требуется передать множество + мелких объектов. Передача же крупных объектов таким образом так + же возможна, однако она даёт возможность для злоупотреблений + (блоки огромного размера вместо деревьев). В принципе, бороться + с этим можно, так как в каждой секции у нас есть размер, и + можно принудительно объекты большого размера сохранять, как + деревья (?), однако тогда мы больше не увидим хэша данного + объекта, который передан в секции. Который, однако, является + опциональным. + + Это ставит перед нами вопрос, как скачивать эти бандлы. + + Текущй алгоритм скачивания парсит блоки, и если блок + является деревом меркля списка ссылок (MTree [HashRef]), + то он попытается скачать и каждый блок, на который указывает + HashRef. + + Если **перед** этим мы узнаем, что для MTRee [HashRef] есть + бандл, мы можем выкачать и импортировать его. + + Если объектов много --- то это плюс. Но если их мало/средне, + то может возникнуть ситуация, что мы скачаем и объекты, и их + журнал. + + Как с этим бороться без очень сложных настроек/взаимоотношений + процессов --- пока непонятно. + + Частное решение для QBLF может быть таким: стейт есть хэш + конкретного Bundle. + + Тогда мы: + + Новое состояние: + - создаём bundle + - рассылаем анонс стейта как обычно + + Мерж: + - читаем все бандлы (состояния) -- + тут у нас есть и транзакции и их хэши + + - делаем новый бандл после мержа транзакций + из всех бандлов + + Остальное: + - как было + + Плюсы: + - мы не дублируем данные, если не хотим (можем не + импортировать объекты из бандлов) + + - можем импортировать, но только при переходе в + состояние + + - скачивание объектов будет быстрое + + - алгоритм мержа усложнится незначительно + + - не нужно сейчас решать, что делать с протоколом + обмена бандлами и как его дизайнить + + Минусы: + + - это опять, скорее, частное решение, но за счёт + этого мы избегаем оверхедов + + - переписывание всего журнала при переходе + к новому стейту? + + - возможен подъем всего стейта в память (а не по одной + транзакции, более того -- сейчас так и будет) + + + Более общее решение: + + - Создаём новый стейт, как [HashRef] + + - Для него --- создаём и анонсируем бандл + + - Бандл скачивается отдельно и самостоятельно + + - После импорта бандла его можно, в принципе, удалять + + - Можно подержать какое-то время, что бы раздать другим + пирам + + - Бандл, вероятно, скачается быстрее, чем будет рекурсивно + скачиваться меркл-дерево множества ссылок + + - Можно попробовать вообще не качать дерево, а качать только + его бандл, а само дерево из него реконструировать (???), + тогда уходим от оверхеда при скачивании + + - Получив анонс стейта, нам бы как-то узнать хэш его бандла, что бы + скачать бандл. + + - Анонсы бандлов можно, всё ж таки, рассылать через Notify, + это не заденет остальные части системы и не надо будет + делать новый протокол + + - Но этот протокол может был бы полезен в других + частях + + + Плюсы: + + - Стейт остаётся [HashRef], то есть не требует + подъема больших объектов в память + + - Алгоритм merge вообще не меняется + + - В других кейсах может пригодиться + + Минусы: + + - Явно сложнее + + - Со ссылками возникают вопросы византийского поведения: + например, ноды могут фальсифицировать ссылки или просто + возвращать мусор + + - Если в ключ ссылки включить публичный ключ пира + (актора?), то можно хотя бы убедиться, что данная ссылка + целостна. И тогда у того, кто её распространяет --- не + получится её фальсифицировать, только игнорировать. + Можно вставлять не в ключ, а в значение, и его + подписывать. Таким образом, это обычный SignedBox. + + - В контексте QBLF это повлияет только на скорость + распространения стейта, т.е если мы не достигаем + консенсуса по значению ссылки -- то рано или поздно + журнал выкачается и так + + - Если это ссылка неизвестно на что, то откуда + SomeRefProto узнает, что значение надо скачивать + и импортировать? + + Отвечаем: Это будет, допустим, (SignedBox Bundle) + и протокол, который качает, допустим, попробует + распарсить и если это Bundle - то качать его HashRef. + + - А как он, например, его будет импортировать, когда + Bundle скачается? + + Отвечаем: допустим, получили Bundle -- запускаем + DownloadMonitor с нужной периодичностью, время от + времени поллим, как поняли, что скачалось --- + запускаем импорт, т.к. процесс импорта передаём + в этот самый монитор (ой мама). Что-то подобное + в любом случае надо бы/надо было сделать + + + Другое более общее решение: + + У нас уже есть механизм BlockAnnounce. Если мы принимаем + BlockAnnounce, то там можно добавить блок типа "Значение ссылки: + ссылка на бандл". Тогда, если мы получили такой блок - мы можем + добавить его в Detect, и если там бандл - то качать этот самый + бандл. + + Это избавит нас от необходимости делать полную реализацию + протокола обмена ссылками, который довольно спорный пока что. + + Итак, что мы добавляем: новый тип, возможно, "Ссылка на bundle" + . тип этот помещаем в блок, например. + + Получив этот блок, например, даунлоадер начинает его скачивать, + а так же, допустим, вешает монитор - что бы импортировать + объекты по завершению процесса. + + Тогда мы решаем в рамках текущего BlockAnnounce, с применением + тех политик, что там используются, что не помешает нам + реализовать механизм обмена ссылками, буде до него дойдет. + + Возникает, кстати, проблема, что делать с шифрованием. + + Ответ -- или класть уже зашифрованные блоки (а откуда ключ + брать для расшифровки?) + + Или ввести обёртку EncryptedBundle или сразу сделать у + Bundle метаинформацию, и там ссылку на ключ передавать + + +TODO: bundle-announce + +TODO: bundle-value-block + +TODO: bundle-value-block-to-detect + +TODO: bundle-value-block-to-downlooad-bundle + +TODO: bundle-value-block-to-imoport-on-downloaded + Базовая реализация бандлов diff --git a/docs/refchan/howto.txt b/docs/refchan/howto.txt new file mode 100644 index 00000000..43fbf98f --- /dev/null +++ b/docs/refchan/howto.txt @@ -0,0 +1,83 @@ + +## Init refchan + + +1. Generate owner's keypair + +``` +hbs2 keyring-new > owner.key + +hbs2 keyrint-list owner.key +sign-key: Atg67E6CPMJWKvR9BvwZTTEjg3Hjz4CYCaEARGANepGP + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +`` + +2. Generate (or obtain) author's keys + +3. Create refchan head config, like: + +``` +cat > refchan.cfg + +(version 16) ;; version +(quorum 2) ;; required answers to set up a quorum +(wait 15) ;; time in seconds for round to complete + + ;; peer's key authoried to rely/subscribe messages +(peer "35gKUG1mwBTr3tQpjWwR2kBYEnDmHxesoJL5Lj7tMjq3" 1) + +;; same +(peer "5GnroAC8FXNRL8rcgJj6RTu9mt1AbuNd5MZVnDBcCKzb" 1) + +;; author (key, authorized to post messages) +(author "2cU7qBWpohfco4BcbHGPjF6ypGdqDwpKomp8ky6QAEBy") +(author "EoPuukyDLeaZm3vpN3CAuZfjhrYBh6fVyWXcXueCK4i8") + +``` + +4. Make the peer "listen" the refchan + +``` +cat >> ~/.config/hbs2-peer/config + +poll refchan 5 "5ZHZkatu1GeeHybdBms6xFFBWti1cqJtKAjiMmtDT6XQ" + +``` + +5. Set up the head block + +``` +hbs2-peer refchan gen ./refchan.cfg -k owner.key | hbs2 store + +merkle-root: FJ2Lj1kB4oFf8F3rL1xv3gaG5kzrPMmE2hPm5oQziLy5 + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +hbs2-peer refchan post FJ2Lj1kB4oFf8F3rL1xv3gaG5kzrPMmE2hPm5oQziLy5 + +``` + +6. Check the head block: + +``` +hbs2 keyring-list owner.key +sign-key: Atg67E6CPMJWKvR9BvwZTTEjg3Hjz4CYCaEARGANepGP + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +hbs2-peer refchan head get Atg67E6CPMJWKvR9BvwZTTEjg3Hjz4CYCaEARGANepGP +FJ2Lj1kB4oFf8F3rL1xv3gaG5kzrPMmE2hPm5oQziLy5 + +hbs2 cat FJ2Lj1kB4oFf8F3rL1xv3gaG5kzrPMmE2hPm5oQziLy5 | hbs2-peer refchan head dump + +(version 16) +(quorum 2) +(wait 15) +(peer "35gKUG1mwBTr3tQpjWwR2kBYEnDmHxesoJL5Lj7tMjq3" 1) +(peer "5GnroAC8FXNRL8rcgJj6RTu9mt1AbuNd5MZVnDBcCKzb" 1) +(author "EoPuukyDLeaZm3vpN3CAuZfjhrYBh6fVyWXcXueCK4i8") +(author "2cU7qBWpohfco4BcbHGPjF6ypGdqDwpKomp8ky6QAEBy") + + +``` + + + diff --git a/docs/refchan/qblf-notes.txt b/docs/refchan/qblf-notes.txt new file mode 100644 index 00000000..b7ec475f --- /dev/null +++ b/docs/refchan/qblf-notes.txt @@ -0,0 +1,22 @@ +TODO: refchan-qblf-slow-transaction-propagation + на большом количестве одновременных транзакций (порядка сотен), + начинает залипать скачивание. + Та же самая проблема: много маленьких транзакций. Кажется, + надо сразу делать более эффективный механизм (а какой?) + + +TODO: refchan-qblf-fantom-states + поскольку анонсы/мержи не прибиваются сразу, + на следующих раундах появляются фантомные состояния. + кажется, надо игнорировать стейты, в которых мы уже + были, и убивать таблицу этих состояний, когда этих + состояний не осталось. + + как проверить: запустить массовый флуд транзакциями, + должны исчезнуть "предыдущие" состояния. + +TODO: non-atomic-blocks-processing + недокачанный блок может частично обработаться, + и появляются всякие левые транзакции/состояния. + Удивительным образом потом они уходят, но засоряют + эфир в процессе diff --git a/examples/refchan-qblf/LICENSE b/examples/refchan-qblf/LICENSE new file mode 100644 index 00000000..e69de29b diff --git a/examples/refchan-qblf/app/RefChanQBLFMain.hs b/examples/refchan-qblf/app/RefChanQBLFMain.hs new file mode 100644 index 00000000..14706b84 --- /dev/null +++ b/examples/refchan-qblf/app/RefChanQBLFMain.hs @@ -0,0 +1,854 @@ +{-# Language TemplateHaskell #-} +{-# Language UndecidableInstances #-} +{-# Language AllowAmbiguousTypes #-} +module Main where + +import HBS2.Prelude +import HBS2.Defaults +import HBS2.Merkle +import HBS2.Hash +import HBS2.Clock +import HBS2.Base58 +import HBS2.OrDie +import HBS2.Data.Types.Refs +import HBS2.Net.Proto.Types +import HBS2.Actors.Peer +import HBS2.Net.Proto.RefChan +import HBS2.Net.Proto.AnyRef +import HBS2.Data.Types.SignedBox +import HBS2.Net.Messaging.Unix +import HBS2.Net.Proto.Definition +import HBS2.Data.Bundle +import HBS2.Net.Auth.Credentials +import HBS2.Data.Detect +import HBS2.Actors.Peer.Types() + +import HBS2.Storage.Simple + +import HBS2.System.Logger.Simple + +import QBLF.Proto + +import Demo.QBLF.Transactions +import Data.Config.Suckless +import Data.Config.Suckless.KeyValue + +import Data.Ord +import Control.Monad.Trans.Maybe +import Codec.Serialise +import Control.Monad.Reader +import Data.ByteString(ByteString) +import Data.ByteString.Char8 qualified as BS +import Data.ByteString.Lazy qualified as LBS +import Data.Functor +import Data.List qualified as List +import Lens.Micro.Platform hiding ((.=)) +import Options.Applicative hiding (info) +import Options.Applicative qualified as O +import System.Directory +import Data.HashSet (HashSet) +import Data.HashSet qualified as HashSet +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HashMap +import Data.Maybe +import Data.Word +import System.Random +import UnliftIO + +import Data.Time.Clock.POSIX (getPOSIXTime) + +import Data.Aeson hiding (json) +import Web.Scotty hiding (request,header) +import Web.Scotty qualified as Scotty +import Network.HTTP.Types.Status +import Data.Monoid (mconcat) +import Data.Cache (Cache) +import Data.Cache qualified as Cache + +import Control.Monad.Except + + +import Streaming.Prelude qualified as S + +{- HLINT ignore "Use newtype instead of data" -} + +-- TODO: config +-- сделать конфиг, а то слишком много уже параметров в CLI + +data HttpPortOpt +data RefChanOpt +data SocketOpt +data ActorOpt +data DefStateOpt +data StateRefOpt + +data QBLFRefKey +type MyRefKey = AnyRefKey QBLFRefKey HBS2Basic + +instance Monad m => HasCfgKey HttpPortOpt (Maybe Int) m where + key = "http" + + +instance {-# OVERLAPPING #-} (HasConf m, HasCfgKey HttpPortOpt (Maybe Int) m) => HasCfgValue HttpPortOpt (Maybe Int) m where + cfgValue = val <$> getConf + where + val syn = lastMay [ fromIntegral e + | ListVal @C (Key s [LitIntVal e]) <- syn, s == key @HttpPortOpt @(Maybe Int) @m + ] + +instance Monad m => HasCfgKey RefChanOpt (Maybe String) m where + key = "refchan" + +instance Monad m => HasCfgKey SocketOpt (Maybe String) m where + key = "socket" + +instance Monad m => HasCfgKey ActorOpt (Maybe String) m where + key = "actor" + +instance Monad m => HasCfgKey DefStateOpt (Maybe String) m where + key = "default-state" + +instance Monad m => HasCfgKey StateRefOpt (Maybe String) m where + key = "state-ref" + +class ToBalance e tx where + toBalance :: tx -> [(Account e, Amount)] + +tracePrefix :: SetLoggerEntry +tracePrefix = toStderr . logPrefix "[trace] " + +debugPrefix :: SetLoggerEntry +debugPrefix = toStderr . logPrefix "[debug] " + +errorPrefix :: SetLoggerEntry +errorPrefix = toStderr . logPrefix "[error] " + +warnPrefix :: SetLoggerEntry +warnPrefix = toStderr . logPrefix "[warn] " + +noticePrefix :: SetLoggerEntry +noticePrefix = toStderr . logPrefix "[notice] " + +infoPrefix :: SetLoggerEntry +infoPrefix = toStdout . logPrefix "" + +silently :: MonadIO m => m a -> m () +silently m = do + setLoggingOff @DEBUG + setLoggingOff @INFO + setLoggingOff @ERROR + setLoggingOff @WARN + setLoggingOff @NOTICE + void m + +withLogging :: MonadIO m => m a -> m () +withLogging m = do + + -- setLogging @TRACE tracePrefix + setLogging @DEBUG debugPrefix + setLogging @INFO infoPrefix + setLogging @ERROR errorPrefix + setLogging @WARN warnPrefix + setLogging @NOTICE noticePrefix + + m + + setLoggingOff @DEBUG + setLoggingOff @INFO + setLoggingOff @ERROR + setLoggingOff @WARN + setLoggingOff @NOTICE + +data MyEnv = + MyEnv + { mySelf :: Peer UNIX + , myFab :: Fabriq UNIX + , myChan :: RefChanId UNIX + , myRef :: MyRefKey + , mySto :: AnyStorage + , myCred :: PeerCredentials HBS2Basic + , myHttpPort :: Int + , myFetch :: Cache HashRef () + } + +newtype App m a = App { fromApp :: ReaderT MyEnv m a } + deriving newtype ( Functor + , Applicative + , Monad + , MonadIO + , MonadUnliftIO + , MonadReader MyEnv + , MonadTrans + ) + +runApp :: (MonadIO m, PeerMessaging UNIX) => MyEnv -> App m a -> m a +runApp env m = runReaderT (fromApp m) env + +instance Monad m => HasFabriq UNIX (App m) where + getFabriq = asks myFab + +instance Monad m => HasOwnPeer UNIX (App m) where + ownPeer = asks mySelf + +instance Monad m => HasStorage (App m) where + getStorage = asks mySto + +data ConsensusQBLF + +data StateQBLF = StateQBLF { fromStateQBLF :: HashRef } + +-- + +-- TODO: state-propagation +-- Как лучше сделать? +-- Нужно: +-- 1. сохранить дерево множества транзакций +-- +-- 1.1 Сейчас есть: http для сохранения одного блока +-- Надо: сохранить дерево целиком +-- Есть пример, можно скопипастить +-- Минус: может быть не быстро на больших блоках +-- +-- +-- 1.2 Сделать unix-socket ручку на добавление сразу дерева? +-- +-- (-1) Доработка hbs2-peer сразу новый worker +-- (+1) Быстрое добавление дерева целиком +-- (-1) Управление через UNIX сокеты выглядит сложнее +-- +-- 2. распространить блок (announce) +-- +-- + ручка API в hbs2-peer (announce) +-- + + +-- TODO: implementation-plan-transaction +-- определить транзакцию +-- допустим на шаге 1 - эмиссия: emit(acc, amount) +-- где acc - аккаунт, на который осуществляется эмиссия +-- amount - количество токенов (целое!) +-- proof - key(emit(acc, amount)) == key(refchan) +-- и подпись соответствует. +-- +-- теперь - как мы получаем эту транзакцию? +-- предусловия: приватный ключ не дожен экспонироваться +-- +-- как будет делаться транза IRL? +-- безопасно: +-- 1. сгенерили CBOR на клиенте +-- 2. подписали +-- 3. запостили +-- +-- реально: +-- 1. сгенерили CBOR на адаптере, ключ на адаптере +-- 2. подписали +-- 3. запостили +-- +-- что видим: адаптер = эмуляция "правильного" клиента +-- +-- нужен способ, который будет удобен и через CLI и +-- внешнему приложению (адаптеру). +-- +-- адаптер пока на хаскелле, просто делаем библиотечный код +-- и CLI обвязку. +-- +-- добавить обвязку для HTTP (scotty) +-- решить, сразу CBOR или JSON +-- (+ за json - просто отправлять +-- (- за json - подписывать-то как?) +-- +-- надо определить простой и единоообразный способ получения +-- и постинга подписанных транзакций. +-- +-- что приводит нас к дисцплине хранения ключа (кто хранит? +-- если никто - то транза должна быть всегда подписана, +-- следовательно, json неудобен) +-- +-- + +data MyError = + DeserializationError | SignatureError | TxUnsupported | SomeOtherError + deriving stock (Eq,Ord,Show) + +check :: MonadIO m => MyError -> Either e a -> ExceptT MyError m a +check w = \case + Right x -> ExceptT $ pure (Right x) + Left{} -> ExceptT $ pure (Left w) + +fiasco :: MonadIO m => MyError -> ExceptT MyError m a +fiasco x = ExceptT $ pure $ Left x + +ok :: MonadIO m => a -> ExceptT MyError m a +ok x = ExceptT $ pure $ Right x + + +type ForConsensus m = (MonadIO m, Serialise (QBLFMessage ConsensusQBLF)) + +instance Serialise (QBLFMerge ConsensusQBLF) +instance Serialise (QBLFMessage ConsensusQBLF) +instance Serialise (QBLFAnnounce ConsensusQBLF) +instance Serialise (QBLFCommit ConsensusQBLF) + +instance Monad m => HasTimeLimits UNIX (RefChanNotify UNIX) (App m) where + tryLockForPeriod _ _ = pure True + +instance (ForConsensus m) => IsQBLF ConsensusQBLF (App m) where + type QBLFActor ConsensusQBLF = Actor L4Proto + type QBLFTransaction ConsensusQBLF = QBLFDemoToken L4Proto + type QBLFState ConsensusQBLF = DAppState + + qblfMoveForward _ s1 = do + env <- ask + fetchMissed env s1 + pure True + + qblfNewState (DAppState h0) txs = do + sto <- asks mySto + chan <- asks myChan + self <- asks mySelf + creds <- asks myCred + let sk = view peerSignSk creds + let pk = view peerSignPk creds + + -- let block = serialise (HashSet.toList $ HashSet.fromList txs) + + hashes <- liftIO $ mapM (putBlock sto . serialise) txs <&> catMaybes + + -- пробуем разослать бандлы с транзакциями + runMaybeT do + ref <- MaybeT $ createBundle sto (fmap HashRef hashes) + let refval = makeBundleRefValue @L4Proto pk sk (BundleRefSimple ref) + r <- MaybeT $ liftIO $ putBlock sto (serialise refval) + lift $ request self (ActionRequest @UNIX chan (RefChanAnnounceBlock (HashRef r))) + + current <- readLog (getBlock sto) h0 + + -- основная проблема в том, что мы пересортировываем весь state + -- однако, если считать его уже отсортированным, то, может быть, + -- все будет не так уж плохо. + -- так-то мы можем вообще его на диске держать + let new = HashSet.fromList ( current <> fmap HashRef hashes ) + + let pt = toPTree (MaxSize 256) (MaxNum 256) (HashSet.toList new) + + root <- if List.null hashes then do + pure h0 + else do + r <- makeMerkle 0 pt $ \(hx,_,bs) -> do + th <- liftIO (enqueueBlock sto bs) + debug $ "WRITE TX" <+> pretty hx + + request self (ActionRequest @UNIX chan (RefChanAnnounceBlock (HashRef r))) + + pure (HashRef r) + + debug $ "PROPOSED NEW STATE:" <+> pretty root + pure $ DAppState root + + qblfCommit s0 s1 = do + debug $ "COMMIT:" <+> pretty s0 <+> pretty s1 + sto <- asks mySto + chan <- asks myChan + ref <- asks myRef + + debug $ "UPDATING REF" <+> pretty ref <+> pretty s1 + liftIO $ updateRef sto ref (fromHashRef (fromDAppState s1)) + pure () + + qblfBroadCast msg = do + self <- asks mySelf + creds <- asks myCred + chan <- asks myChan + + let sk = view peerSignSk creds + let pk = view peerSignPk creds + nonce <- randomIO @Word64 <&> serialise <&> LBS.toStrict + let box = makeSignedBox @UNIX pk sk (LBS.toStrict (serialise msg) <> nonce) + let notify = Notify @UNIX chan box + request self notify + + case msg of + QBLFMsgAnn _ (QBLFAnnounce _ _) -> do + -- TODO: maybe-announce-new-state-here + pure () + + _ -> none + + -- TODO: optimize-qblf-merge + -- будет нормально работать до десятков/сотен тысяч транз, + -- а потом помрёт. + -- варианты: + -- 1. перенести логику в БД + -- 2. кэшировать всё, что можно + qblfMerge s0 s1 = do + + chan <- asks myChan + + debug $ "MERGE. Proposed state:" <+> pretty s1 + + sto <- asks mySto + let readFn = liftIO . getBlock sto + + tx0 <- readLog readFn (fromDAppState s0) <&> HashSet.fromList + tx1 <- mapM (readLog readFn) (fmap fromDAppState s1) <&> mconcat + + debug $ "READ TXS" <+> pretty s1 <+> pretty (length tx1) + + r <- forM tx1 $ \t -> runMaybeT do + + -- игнорируем ранее добавленные транзакции + guard (not (HashSet.member t tx0)) + + bs <- MaybeT $ liftIO $ getBlock sto (fromHashRef t) + + tx <- MaybeT $ pure $ deserialiseOrFail @(QBLFDemoToken L4Proto) bs & either (const Nothing) Just + + case tx of + Emit box -> do + (pk, e@(EmitTx a q _)) <- MaybeT $ pure $ unboxSignedBox0 @(EmitTx L4Proto) box + guard ( chan == pk ) + debug $ "VALID EMIT TRANSACTION" <+> pretty t <+> pretty (AsBase58 a) <+> pretty q + pure ([(t,e)], mempty) + + (Move box) -> do + (_, m@(MoveTx _ _ qty _)) <- MaybeT $ pure $ unboxSignedBox0 @(MoveTx L4Proto) box + + guard (qty > 0) + debug $ "MOVE TRANSACTION" <+> pretty t + pure (mempty, [(t,m)]) + + let parsed = catMaybes r + + let emits = foldMap (view _1) parsed + + let moves = foldMap (view _2) parsed & List.sortOn fst + + bal0 <- balances (fromDAppState s0) + + -- баланс с учётом новых emit + let balE = foldMap (toBalance @L4Proto . snd) emits + & HashMap.fromListWith (+) + & HashMap.unionWith (+) bal0 + + let moves' = updBalances @L4Proto balE moves + + let merged = fmap fst emits <> fmap fst moves' + + let pt = toPTree (MaxSize 256) (MaxNum 256) (HashSet.toList (tx0 <> HashSet.fromList merged)) + + root <- makeMerkle 0 pt $ \(_,_,bs) -> do + void $ liftIO (putBlock sto bs) + + let new = DAppState (HashRef root) + + -- FIXME: garbage-collect-discarded-states + + debug $ "MERGED" <+> pretty new + + pure new + + +instance (HasConf (ReaderT Config IO)) where + getConf = ask + +instance HasStorage (ReaderT AnyStorage IO) where + getStorage = ask + + + +instance ToBalance e (EmitTx e) where + toBalance (EmitTx a qty _) = [(a, qty)] + +instance ToBalance e (MoveTx e) where + toBalance (MoveTx a1 a2 qty _) = [(a1, -qty), (a2, qty)] + +balances :: forall e m . ( e ~ L4Proto + , MonadIO m + , HasStorage m + , ToBalance L4Proto (EmitTx L4Proto) + , ToBalance L4Proto (MoveTx L4Proto) + ) + => HashRef + -> m (HashMap (Account e) Amount) + +balances root = do + sto <- getStorage + txs <- readLog (liftIO . getBlock sto) root + + r <- forM txs $ \h -> runMaybeT do + blk <- MaybeT $ liftIO $ getBlock sto (fromHashRef h) + tx <- MaybeT $ pure $ deserialiseOrFail @(QBLFDemoToken L4Proto) blk & either (const Nothing) Just + + case tx of + Emit box -> do + (_, emit) <- MaybeT $ pure $ unboxSignedBox0 @(EmitTx L4Proto) box + pure $ toBalance @e emit + + Move box -> do + (_, move) <- MaybeT $ pure $ unboxSignedBox0 @(MoveTx L4Proto) box + pure $ toBalance @e move + + pure $ catMaybes r & mconcat & HashMap.fromListWith (+) + + +-- TODO: optimize-upd-balances +-- можно сгруппировать по аккаунтам +-- и проверять только те транзакции, которые относятся +-- к связанной (транзакциями) группе аккаунтов. +-- то есть, разбить на кластеры, у которых отсутствуют пересечения по +-- аккаунтам и проверять независимо и параллельно, например +-- причем, прямо этой функцией +-- +-- updBalances :: HashMap (Account L4Proto) Amount +-- -> [(tx, b)] +-- -> [(tx, b)] + +updBalances :: forall e a tx . (ForRefChans e, ToBalance e tx) + => HashMap (Account e) Amount + -> [(a, tx)] + -> [(a, tx)] + +updBalances = go + where + go bal [] = empty + + go bal (t:rest) = + + if good then + t : go nb rest + else + go bal rest + + where + nb = HashMap.unionWith (+) bal (HashMap.fromList (toBalance @e (snd t))) + good = HashMap.filter (<0) nb & HashMap.null + + +fetchMissed :: forall e w m . ( MonadIO m + , Request e (RefChanNotify e) m + , e ~ UNIX + , w ~ ConsensusQBLF + ) + => MyEnv + -> QBLFState w + -> m () + +fetchMissed env s = do + let tube = mySelf env + let chan = myChan env + let cache = myFetch env + let sto = mySto env + + let href = fromDAppState s + + here <- liftIO $ hasBlock sto (fromHashRef href) <&> isJust + wip <- liftIO $ Cache.lookup cache href <&> isJust + + unless (here && not wip) do + debug $ "We might be need to fetch" <+> pretty s + request @UNIX tube (ActionRequest @UNIX chan (RefChanFetch (fromDAppState s))) + +runMe :: ForConsensus IO => Config -> IO () +runMe conf = withLogging $ flip runReaderT conf do + + debug $ "runMe" <+> pretty conf + + kr <- cfgValue @ActorOpt @(Maybe String) `orDie` "actor's key not set" + chan' <- cfgValue @RefChanOpt @(Maybe String) `orDie` "refchan not set" + sa <- cfgValue @SocketOpt @(Maybe String) `orDie` "socket not set" + pno <- cfgValue @HttpPortOpt @(Maybe Int) <&> fromMaybe 3011 + ds <- cfgValue @DefStateOpt @(Maybe String) + + ref <- ( cfgValue @StateRefOpt @(Maybe String) + <&> maybe Nothing fromStringMay + ) `orDie` "state-ref not set" + + sc <- liftIO $ BS.readFile kr + creds <- pure (parseCredentials @HBS2Basic (AsCredFile sc)) `orDie` "bad keyring file" + + chan <- pure (fromStringMay @(RefChanId L4Proto) chan') `orDie` "invalid REFCHAN" + + here <- liftIO $ doesFileExist sa + + when here do + liftIO $ removeFile sa + + server <- newMessagingUnix True 1.0 sa + + abus <- async $ runMessagingUnix server + + let tube = fromString sa + + -- FIXME: fix-default-storage + xdg <- liftIO $ getXdgDirectory XdgData defStorePath <&> fromString + sto' <- simpleStorageInit @HbSync [StoragePrefix xdg] + + let sto = AnyStorage sto' + sw <- liftIO $ replicateM 4 $ async $ simpleStorageWorker sto' + + -- FIXME: fix-hardcoded-timeout + fetches <- liftIO $ Cache.newCache (Just (toTimeSpec (TimeoutSec 30))) + + let myEnv = MyEnv tube + (Fabriq server) + chan + ref + sto + creds + pno + fetches + + let dss = ds >>= fromStringMay + + s0 <- readOrCreateStateRef dss sto ref + + debug $ "STATE0:" <+> pretty s0 + + -- получить голову + -- из головы получить акторов + + headBlk <- getRefChanHead @L4Proto sto (RefChanHeadKey chan) `orDie` "can't read head block" + + let self = view peerSignPk creds & Actor @L4Proto + + let actors = view refChanHeadAuthors headBlk + & HashSet.toList + & fmap (Actor @L4Proto) + + runApp myEnv do + + -- FIXME: timeout-hardcode + let w = realToFrac 5 + + -- FIXME: use-actors-asap + qblf <- qblfInit @ConsensusQBLF self actors (DAppState (HashRef s0)) w + + consensus <- async do + pause @'Seconds 0.5 + qblfRun qblf + + + -- FIXME: web-port-to-config + web <- async $ liftIO $ scotty (fromIntegral (myHttpPort myEnv)) $ do + post "/tx" $ do + + r <- runExceptT do + + bin <- lift body + let hBin = hashObject @HbSync bin + + debug $ "GOT TX" <+> pretty hBin + + tok <- check DeserializationError =<< pure (deserialiseOrFail @(QBLFDemoToken L4Proto) bin) + + tx <- case tok of + (Emit box) -> do + (sign, tx) <- maybe (ExceptT $ pure $ Left SignatureError) pure $ unboxSignedBox0 box + + if sign == chan then + pure hBin + else + fiasco SignatureError + + (Move box) -> do + (sign, tx) <- maybe (ExceptT $ pure $ Left SignatureError) pure $ unboxSignedBox0 box + pure hBin + + qblfEnqueue qblf tok + pure hBin + + case r of + Left SignatureError -> do + err $ viaShow SignatureError + status status401 + + Left e -> do + err $ viaShow e + status status400 + + Right tx -> do + debug $ "TX ENQUEUED OK" <+> pretty tx + status status200 + + link web + + runProto $ List.singleton $ makeResponse (myProto myEnv qblf chan) + + void $ waitAnyCatchCancel $ [abus] <> sw + + where + + myProto :: forall e m . ( MonadIO m + , Request e (RefChanNotify e) m + , e ~ UNIX + ) + => MyEnv + -> QBLF ConsensusQBLF + -> RefChanId e + -> RefChanNotify e + -> m () + + myProto _ qblf _ (ActionRequest{}) = do + pure () + + myProto env qblf chan (Notify _ msg) = do + + let sto = mySto env + let tube = mySelf env + + let coco = hashObject @HbSync $ serialise msg + void $ runMaybeT do + (_, wrapped) <- MaybeT $ pure $ unboxSignedBox0 @ByteString @UNIX msg + qbmess <- MaybeT $ pure $ deserialiseOrFail @(QBLFMessage ConsensusQBLF) (LBS.fromStrict wrapped) + & either (const Nothing) Just + + states <- case qbmess of + QBLFMsgAnn _ (QBLFAnnounce s0 s1) -> do + pure [s0, s1] + + QBLFMsgHeartBeat _ _ s0 _-> do + pure [s0] + + _ -> do + pure mempty + + -- FIXME: full-download-guarantee + lift $ forM_ states (fetchMissed env) + + qblfAcceptMessage qblf qbmess + -- debug $ "RefChanQBLFMain(3)" <+> "got message" <+> pretty (AsBase58 chan) <+> pretty coco + + readOrCreateStateRef mbDs sto ref = do + debug $ "MyRef:" <+> pretty (hashObject @HbSync ref) + fix \spin -> do + mbref <- liftIO $ getRef @_ @HbSync sto ref + case mbref of + Nothing -> do + debug "STATE is empty" + maybe1 mbDs none $ \ds -> do + debug $ "UPDATE REF" <+> pretty (hashObject @HbSync ref) <+> pretty (HashRef ds) + liftIO $ updateRef sto ref ds + + pause @'Seconds 0.25 + + spin + + Just val -> do + pure val + +type Config = [Syntax MegaParsec] + +main :: IO () +main = join . customExecParser (prefs showHelpOnError) $ + O.info (helper <*> globalOptions) + ( fullDesc + <> header "refchan-qblf-worker" + <> progDesc "for test and demo purposed" + ) + where + + globalOptions = applyConfig <$> commonOpts <*> cli + + applyConfig :: Maybe FilePath -> (Config -> IO ()) -> IO () + applyConfig config m = do + maybe1 config (m mempty) $ \conf -> do + top <- readFile conf <&> parseTop <&> either (pure mempty) id + m top + + commonOpts = optional $ strOption (long "config" <> short 'c' <> help "Config file") + + cli = hsubparser ( command "run" (O.info pRun (progDesc "run qblf servant" ) ) + <> command "gen" (O.info pGen (progDesc "generate transcation") ) + <> command "post" (O.info pPostTx (progDesc "post transaction") ) + <> command "check" (O.info pCheckTx (progDesc "check transaction") ) + <> command "balances" (O.info pBalances (progDesc "show balances") ) + ) + + pRun = do + pure runMe + + pGen = hsubparser + ( command "tx-emit" ( O.info pGenEmit (progDesc "generate emit") ) + <> command "tx-move" ( O.info pGenMove (progDesc "generate move") ) + ) + + pGenEmit = do + kr <- strOption ( long "keyring" <> short 'k' <> help "keyring file" ) + amnt <- option @Amount auto ( long "amount" <> short 'n' <> help "amount" ) + dest <- strArgument ( metavar "ADDRESS" ) + pure $ const $ silently do + sc <- BS.readFile kr + creds <- pure (parseCredentials @HBS2Basic (AsCredFile sc)) `orDie` "bad keyring file" + let pk = view peerSignPk creds + let sk = view peerSignSk creds + acc <- pure (fromStringMay @(RefChanId L4Proto) dest) `orDie` "bad account address" + tx <- makeEmitTx @L4Proto pk sk acc amnt + LBS.putStr $ serialise tx + + pGenMove = do + kr <- strOption ( long "wallet" <> short 'w' <> help "wallet (keyring) file" ) + amnt <- option @Amount auto ( long "amount" <> short 'n' <> help "amount" ) + dest <- strArgument ( metavar "ADDRESS" ) + pure $ const $ silently do + sc <- BS.readFile kr + creds <- pure (parseCredentials @HBS2Basic (AsCredFile sc)) `orDie` "bad keyring file" + let pk = view peerSignPk creds + let sk = view peerSignSk creds + acc <- pure (fromStringMay @(RefChanId L4Proto) dest) `orDie` "bad account address" + tx <- makeMoveTx @L4Proto pk sk acc amnt + LBS.putStr $ serialise tx + + pCheckTx = do + kr <- strOption ( long "keyring" <> short 'k' <> help "keyring file" ) + pure $ const do + sc <- BS.readFile kr + creds <- pure (parseCredentials @HBS2Basic (AsCredFile sc)) `orDie` "bad keyring file" + let pk = view peerSignPk creds + let sk = view peerSignSk creds + + tx <- LBS.getContents <&> deserialise @(QBLFDemoToken L4Proto) + + case tx of + Emit box -> do + void $ pure (unboxSignedBox0 @(EmitTx L4Proto) @L4Proto box) `orDie` "bad emit tx" + + Move box -> do + void $ pure (unboxSignedBox0 @(MoveTx L4Proto) @L4Proto box) `orDie` "bad move tx" + + pure () + + pPostTx = pure $ const do + error "not supported anymore / TODO via http" + -- rc <- strArgument ( metavar "REFCHAN" ) + -- sa <- strArgument ( metavar "UNIX-SOCKET" ) <&> fromString + -- pure $ withLogging do + -- rchan <- pure (fromStringMay @(RefChanId L4Proto) rc) `orDie` "bad refchan" + -- print "JOPA" + -- -- FIXME: wrap-client-boilerplate + -- inbox <- newMessagingUnix False 1.0 sa + -- wInbox <- async $ runMessagingUnix inbox + -- let env = MyEnv (fromString sa) (Fabriq inbox) rchan + -- msg <- (LBS.getContents <&> deserialiseOrFail) `orDie` "transaction decode error" + -- runApp env do + -- request (mySelf env) (msg :: QBLFDemoTran UNIX) + -- pause @'Seconds 0.1 + -- cancel wInbox + + pBalances = do + state <- strArgument ( metavar "STATE" ) + pure $ const $ withLogging do + + xdg <- liftIO $ getXdgDirectory XdgData defStorePath <&> fromString + sto' <- simpleStorageInit @HbSync [StoragePrefix xdg] + + let sto = AnyStorage sto' + sw <- liftIO $ replicateM 4 $ async $ simpleStorageWorker sto' + + root <- pure (fromStringMay @HashRef state) `orDie` "Bad STATE reference" + + flip runReaderT sto $ do + debug $ "calculating balances for" <+> pretty root + bal <- balances root + + forM_ (HashMap.toList bal) $ \(acc, qty) -> do + liftIO $ print $ pretty (AsBase58 acc) <+> pretty qty + diff --git a/examples/refchan-qblf/config/default b/examples/refchan-qblf/config/default new file mode 100644 index 00000000..a6fb2b26 --- /dev/null +++ b/examples/refchan-qblf/config/default @@ -0,0 +1,16 @@ + +;; +hbs2-peer-rpc "127.0.0.1:13334" + +http 3011 +refchan "Atg67E6CPMJWKvR9BvwZTTEjg3Hjz4CYCaEARGANepGP" +socket "/tmp/notify-Atg67E6CPMJWKvR9BvwZTTEjg3Hjz4CYCaEARGANepGP" +actor "test/refchan/author1.key" + +default-state "56JXzPzJQzaC9PuniQ7zHCPjedLhyc8vBjn3RtcwfXDt" + +state-ref "BQ4Y1235vCZkx2uG9mjq5QuZZ4aSxr4XGAeoibxaLWFt" + +;; storage? + + diff --git a/examples/refchan-qblf/config/second b/examples/refchan-qblf/config/second new file mode 100644 index 00000000..e864deec --- /dev/null +++ b/examples/refchan-qblf/config/second @@ -0,0 +1,16 @@ + +;; +hbs2-peer-rpc "127.0.0.1:13334" + +http 3012 +refchan "Atg67E6CPMJWKvR9BvwZTTEjg3Hjz4CYCaEARGANepGP" +socket "/tmp/notify-Atg67E6CPMJWKvR9BvwZTTEjg3Hjz4CYCaEARGANepGP-2" +actor "test/refchan/author2.key" + +default-state "56JXzPzJQzaC9PuniQ7zHCPjedLhyc8vBjn3RtcwfXDt" + +state-ref "J7ZJ8th1JbmnLiUvLe15sqKtgteQEanxHvWpcqgbS197" + +;; storage? + + diff --git a/examples/refchan-qblf/config/third b/examples/refchan-qblf/config/third new file mode 100644 index 00000000..2881d29e --- /dev/null +++ b/examples/refchan-qblf/config/third @@ -0,0 +1,16 @@ + +;; +hbs2-peer-rpc "127.0.0.1:13334" + +http 3011 +refchan "Atg67E6CPMJWKvR9BvwZTTEjg3Hjz4CYCaEARGANepGP" +socket "/tmp/notify-Atg67E6CPMJWKvR9BvwZTTEjg3Hjz4CYCaEARGANepGP" +actor "test/refchan/author3.key" + +default-state "56JXzPzJQzaC9PuniQ7zHCPjedLhyc8vBjn3RtcwfXDt" + +state-ref "62WTcQCgYU6CbNCeC68SyhyJ27wb24cR8onLM7G8Hsau" + +;; storage? + + diff --git a/examples/refchan-qblf/data/head.bin b/examples/refchan-qblf/data/head.bin new file mode 100644 index 00000000..f553fde2 Binary files /dev/null and b/examples/refchan-qblf/data/head.bin differ diff --git a/examples/refchan-qblf/data/head.dsl b/examples/refchan-qblf/data/head.dsl new file mode 100644 index 00000000..be51465a --- /dev/null +++ b/examples/refchan-qblf/data/head.dsl @@ -0,0 +1,9 @@ +(version 1) +(quorum 2) +(wait 15) +(peer "35gKUG1mwBTr3tQpjWwR2kBYEnDmHxesoJL5Lj7tMjq3" 1) +(peer "5GnroAC8FXNRL8rcgJj6RTu9mt1AbuNd5MZVnDBcCKzb" 1) +(author "23PFfUtRgDtASus54zGhdGvuGjyDVz6fcMwvroSS4ni6") +(author "2Lf9L88dmgp4ETKTpUvVsqn7rgtsjNzcDUVxCnx3yx5G") + + diff --git a/examples/refchan-qblf/data/owner.key b/examples/refchan-qblf/data/owner.key new file mode 100644 index 00000000..69afc32a --- /dev/null +++ b/examples/refchan-qblf/data/owner.key @@ -0,0 +1,6 @@ +# hbs2 credentials file +# keep it private + +rmbTHUgvAY9MAPCPXWMnKxxBdLYhwzpJvuXWrGG59nuTQQ89yH2sgvibnz8U +FHcqremPdtCunTkgGGFVvjBoCF72MiBGX4HtwKvX6umBoxLHnemgHQQF4z73 +cZfqDbqf8rXRB9yig1Uib3AXD9 diff --git a/examples/refchan-qblf/data/peer1.key b/examples/refchan-qblf/data/peer1.key new file mode 100644 index 00000000..6920881f --- /dev/null +++ b/examples/refchan-qblf/data/peer1.key @@ -0,0 +1,6 @@ +# hbs2 credentials file +# keep it private + +rmbTHUgvMRE3DbKUeSfReH5EvTRspfEeK8ogmTG7rs99Ha48KNnEmR9Yfjdq +rimiXGM8oK4WdK9PDqySsZBZxeVdaYiLwbr9yzjwpG8baXQ99k34Dkk7s662 +ZVm6kioTEG8yTw8twTbXk2b6Y7 diff --git a/examples/refchan-qblf/data/peer2.key b/examples/refchan-qblf/data/peer2.key new file mode 100644 index 00000000..ef37a67e --- /dev/null +++ b/examples/refchan-qblf/data/peer2.key @@ -0,0 +1,6 @@ +# hbs2 credentials file +# keep it private + +rmbTHUgvbLz9QzUGc4fzPfHKyyFuh8w2sEvGWPsP1GxrSHk7dh5Qs4A1GQMM +QgUJpB2HvAaQsQjVBUHjHgRkKbNUEPGmyG9iDCQ9Fmy4MCQFQ5YMmBVZthvP +QYia9gaD5WJuPS12rUAjZHeKvo diff --git a/examples/refchan-qblf/data/wallet1.key b/examples/refchan-qblf/data/wallet1.key new file mode 100644 index 00000000..a4c79a8c --- /dev/null +++ b/examples/refchan-qblf/data/wallet1.key @@ -0,0 +1,6 @@ +# hbs2 credentials file +# keep it private + +rmbTHUgve1Kak6KL7StWLyoPxzTy1Fs4DcYccQ4eU93gjN5s2ooSrVsD4C5e +cztUQSysK5Gj5B1d9jYwF9bk4vUCh2QdLdA7r8zAfUxKkto8Ri8oUyGn9eVk +J3JWTSTJE25umnFQ3D4uZmXunF diff --git a/examples/refchan-qblf/data/wallet2.key b/examples/refchan-qblf/data/wallet2.key new file mode 100644 index 00000000..bdf33f59 --- /dev/null +++ b/examples/refchan-qblf/data/wallet2.key @@ -0,0 +1,6 @@ +# hbs2 credentials file +# keep it private + +rmbTHUgvCGLNpN4YMRoeQs3erqzCVPqNL9cGaTHLToa5EX8DnxPoWHXrRRp8 +PXZ2SzLUQXPE16aAJyHsYJcaEbkrcm8W7nB3wKpBZz4Dm91G2uXrMkdv3UdX +jTABTKF3vWKc15LGbu6b7BoqPu diff --git a/examples/refchan-qblf/lib/Demo/QBLF/Transactions.hs b/examples/refchan-qblf/lib/Demo/QBLF/Transactions.hs new file mode 100644 index 00000000..fd634bd2 --- /dev/null +++ b/examples/refchan-qblf/lib/Demo/QBLF/Transactions.hs @@ -0,0 +1,134 @@ +{-# Language UndecidableInstances #-} +module Demo.QBLF.Transactions where + +import HBS2.Prelude.Plated +import HBS2.Hash +import HBS2.Base58 +import HBS2.Net.Proto.Types +import HBS2.Actors.Peer.Types +import HBS2.Net.Proto.RefChan +import HBS2.Data.Types.Refs (HashRef(..)) +import HBS2.Data.Types.SignedBox +import HBS2.Net.Auth.Credentials +import HBS2.Net.Messaging.Unix (UNIX) +import HBS2.Net.Proto.Definition() + +import Data.Hashable(Hashable(..)) +import Codec.Serialise +import Data.ByteString.Lazy (ByteString) +import Data.Word (Word64) +import System.Random + +newtype Actor e = + Actor { fromActor :: PubKey 'Sign (Encryption e) } + deriving stock (Generic) + +deriving stock instance Eq (PubKey 'Sign (Encryption e)) => Eq (Actor e) +deriving newtype instance Hashable (PubKey 'Sign (Encryption e)) => Hashable (Actor e) + +instance Pretty (AsBase58 (PubKey 'Sign (Encryption e))) => Pretty (Actor e) where + pretty (Actor a) = pretty (AsBase58 a) + +type Account e = PubKey 'Sign (Encryption e) + +newtype Amount = Amount Integer + deriving stock (Eq,Show,Ord,Data,Generic) + deriving newtype (Read,Enum,Num,Integral,Real,Pretty) + +newtype DAppState = DAppState { fromDAppState :: HashRef } + deriving stock (Eq,Show,Ord,Data,Generic) + deriving newtype (Hashable,Pretty) + +instance Hashed HbSync DAppState where + hashObject (DAppState (HashRef h)) = h + +data EmitTx e = EmitTx (Account e) Amount Word64 + deriving stock (Generic) + +data MoveTx e = MoveTx (Account e) (Account e) Amount Word64 + deriving stock (Generic) + +data QBLFDemoToken e = + Emit (SignedBox (EmitTx e) e) -- proof: owner's key + | Move (SignedBox (MoveTx e) e) -- proof: wallet's key + deriving stock (Generic) + +instance ForRefChans e => Serialise (Actor e) + +instance Serialise DAppState + +instance Serialise Amount + +instance Serialise (PubKey 'Sign (Encryption e)) => Serialise (EmitTx e) + +instance Serialise (PubKey 'Sign (Encryption e)) => Serialise (MoveTx e) + +instance (Serialise (Account e), ForRefChans e) => Serialise (QBLFDemoToken e) + +type ForQBLFDemoToken e = ( Eq (PubKey 'Sign (Encryption e)) + , Eq (Signature (Encryption e)) + , Pretty (AsBase58 (PubKey 'Sign (Encryption e))) + , ForSignedBox e + , FromStringMaybe (PubKey 'Sign (Encryption e)) + , Serialise (PubKey 'Sign (Encryption e)) + , Serialise (Signature (Encryption e)) + , Hashable (PubKey 'Sign (Encryption e)) + ) + +deriving stock instance (ForQBLFDemoToken e) => Eq (QBLFDemoToken e) + +instance ForQBLFDemoToken e => Hashable (QBLFDemoToken e) where + hashWithSalt salt = \case + Emit box -> hashWithSalt salt box + Move box -> hashWithSalt salt box + +newtype QBLFDemoTran e = + QBLFDemoTran (SignedBox (QBLFDemoToken e) e) + deriving stock Generic + +instance ForRefChans e => Serialise (QBLFDemoTran e) + +deriving newtype instance + (Eq (PubKey 'Sign (Encryption e)), Eq (Signature (Encryption e))) + => Eq (QBLFDemoTran e) + +deriving newtype instance + (Eq (Signature (Encryption e)), ForRefChans e) + => Hashable (QBLFDemoTran e) + +instance Serialise (QBLFDemoTran UNIX) => HasProtocol UNIX (QBLFDemoTran UNIX) where + type instance ProtocolId (QBLFDemoTran UNIX) = 0xFFFF0001 + type instance Encoded UNIX = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + +makeEmitTx :: forall e m . ( MonadIO m + , ForRefChans e + , Signatures (Encryption e) + ) + => PubKey 'Sign (Encryption e) + -> PrivKey 'Sign (Encryption e) + -> Account e + -> Amount + -> m (QBLFDemoToken e) + +makeEmitTx pk sk acc amount = do + nonce <- randomIO + let box = makeSignedBox @e pk sk (EmitTx @e acc amount nonce) + pure (Emit @e box) + +makeMoveTx :: forall e m . ( MonadIO m + , ForRefChans e + , Signatures (Encryption e) + ) + => PubKey 'Sign (Encryption e) -- from pk + -> PrivKey 'Sign (Encryption e) -- from sk + -> Account e + -> Amount -- amount + -> m (QBLFDemoToken e) + +makeMoveTx pk sk acc amount = do + nonce <- randomIO + let box = makeSignedBox @e pk sk (MoveTx @e pk acc amount nonce) + pure (Move @e box) + diff --git a/examples/refchan-qblf/lib/QBLF/Proto.hs b/examples/refchan-qblf/lib/QBLF/Proto.hs new file mode 100644 index 00000000..6c70d5d4 --- /dev/null +++ b/examples/refchan-qblf/lib/QBLF/Proto.hs @@ -0,0 +1,510 @@ +{-# Language TemplateHaskell #-} +{-# Language AllowAmbiguousTypes #-} +{-# Language UndecidableInstances #-} +{-# Language MultiWayIf #-} +module QBLF.Proto where + +import HBS2.Prelude.Plated +import HBS2.System.Logger.Simple +import HBS2.Clock +import HBS2.Hash + +import Control.Applicative +import Control.Concurrent.STM (flushTQueue) +import Control.Monad +import Control.Monad.Trans.Maybe +import Data.Either +import Data.Function +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HashMap +import Data.HashSet (HashSet) +import Data.HashSet qualified as HashSet +import Data.Kind +import Data.List qualified as List +import Data.Map qualified as Map +import Data.Maybe +import Data.Ord +import Data.Tuple (swap) +import Lens.Micro.Platform +import System.Random (randomRIO) +import Data.Time.Clock.POSIX +import Codec.Serialise() +import Data.Fixed +import UnliftIO + +{- HLINT ignore "Use newtype instead of data" -} + +newtype QBLFTimeStamp = + QBLFTimeStamp { fromQBLFTimeStamp :: Fixed E12 } + deriving stock (Generic) + deriving newtype (Eq,Ord,Num,Real,Fractional,Show) + +instance Serialise QBLFTimeStamp + +data QBLFMessage w = + QBLFMsgAnn (QBLFActor w) (QBLFAnnounce w) + | QBLFMsgMerge (QBLFActor w ) (QBLFMerge w) + | QBLFMsgCommit (QBLFActor w) (QBLFCommit w) + | QBLFMsgHeartBeat (QBLFActor w) QBLFStateN (QBLFState w) QBLFTimeStamp + deriving stock Generic + +data QBLFAnnounce w = + QBLFAnnounce (QBLFState w) (QBLFState w) + deriving stock (Generic) + + +data QBLFMerge w = + QBLFMerge (QBLFState w) (QBLFState w) + deriving stock Generic + +data QBLFCommit w = + QBLFCommit (QBLFState w) (QBLFState w) + deriving stock Generic + + +data QBLFStateN = + QWait + | QAnnounce + | QMerge + | QCommit + deriving stock (Eq,Ord,Enum,Show,Generic) + +instance Serialise QBLFStateN + +type ForQBLF w = ( Hashed HbSync (QBLFState w) + , Hashable (QBLFState w) + , Hashable (QBLFTransaction w) + , Hashable (QBLFActor w) + , Hashable (QBLFAnnounce w) + , Hashable (QBLFMerge w) + , Eq (QBLFState w) + ) + +deriving instance ForQBLF w => Eq (QBLFAnnounce w) +deriving instance ForQBLF w => Eq (QBLFMerge w) + +instance ForQBLF w => Hashable (QBLFAnnounce w) + +instance ForQBLF w => Hashable (QBLFMerge w) + +class (Monad m, ForQBLF w) => IsQBLF w m where + type QBLFActor w :: Type + type QBLFTransaction w :: Type + type QBLFState w :: Type + + qblfNewState :: QBLFState w -> [QBLFTransaction w] -> m (QBLFState w) + qblfBroadCast :: QBLFMessage w -> m () + qblfMerge :: QBLFState w -> [QBLFState w] -> m (QBLFState w) + qblfCommit :: QBLFState w -> QBLFState w -> m () + + qblfMoveForward :: QBLFState w -> QBLFState w -> m Bool + qblfMoveForward _ _ = pure True + +data QBLF w = + QBLF + { _qblfSelf :: QBLFActor w + , _qblfAllActors :: HashSet (QBLFActor w) + , _qblfState :: QBLFStateN + , _qblfCurrent :: QBLFState w + , _qblfWaitAnnounce :: Timeout 'Seconds + , _qblfCommitsFrom :: HashSet (QBLFActor w) + , _qblfTranQ :: TVar (HashSet (QBLFTransaction w)) + , _qblfAlive :: TVar (HashMap (QBLFActor w) (QBLFStateN, QBLFState w, TimeSpec)) + , _qblfStateTime :: TVar TimeSpec + , _qblfLastHeartBeat :: TVar TimeSpec + , _qblfAnnounces :: TVar (HashMap (QBLFActor w, QBLFAnnounce w) TimeSpec) + , _qblfMerges :: TVar (HashMap (QBLFActor w, QBLFMerge w) TimeSpec) + } + +makeLenses ''QBLF + + +qblfGetActor :: (ForQBLF w) => QBLFMessage w -> QBLFActor w +qblfGetActor = \case + QBLFMsgAnn a _ -> a + QBLFMsgMerge a _ -> a + QBLFMsgCommit a _ -> a + QBLFMsgHeartBeat a _ _ _ -> a + +qblfEnqueue :: (ForQBLF w, MonadIO m) => QBLF w -> QBLFTransaction w -> m () +qblfEnqueue me tran = do + -- synced <- qblfIsSynced me + -- when synced do + atomically $ modifyTVar (view qblfTranQ me) (HashSet.insert tran) + + + +qblfAcceptMessage :: (ForQBLF w, MonadIO m) => QBLF w -> QBLFMessage w -> m () +qblfAcceptMessage me msg = do + + -- FIXME: drop-premature-announces + + let actor = qblfGetActor msg + + when (actor `HashSet.member` view qblfAllActors me) do + now <- getTimeCoarse + + let mine = view qblfCurrent me + let add = const True -- not $ HashSet.member x (view qblfIgnore me) + + case msg of + QBLFMsgAnn a ann@(QBLFAnnounce s0 _) | add s0 -> do + atomically $ modifyTVar (view qblfAnnounces me) (HashMap.insert (a, ann) now) + + QBLFMsgMerge a m@(QBLFMerge s0 _) | add s0 -> do + atomically $ modifyTVar (view qblfMerges me) (HashMap.insert (a, m) now) + + QBLFMsgCommit a (QBLFCommit s0 s) | add s0 -> do + atomically $ modifyTVar (view qblfAlive me) (HashMap.insert a (QWait,s,now)) + + QBLFMsgHeartBeat a t s _ -> do + -- debug $ "heartbeat" <+> pretty (view qblfSelf me) <+> pretty (a, s) + atomically $ modifyTVar (view qblfAlive me) (HashMap.insert a (t,s,now)) + + _ -> pure () + +qblfQuorum :: forall w a m . (ForQBLF w, IsQBLF w m, MonadUnliftIO m, Integral a) + => QBLF w + -> m a + +qblfQuorum me = do + -- n <- qblfLastAlive me + -- pure $ fromIntegral $ 1 + (n `div` 2) + let aliveSz = view qblfAllActors me & List.length + pure $ max 1 $ round $ realToFrac (aliveSz + 1) / 2 + +qblfLastAlive :: (ForQBLF w, IsQBLF w m, MonadUnliftIO m) => QBLF w -> m Int +qblfLastAlive me = pure 0 + -- q <- qblfQuorum me + -- n <- atomically $ readTVar (view qblfAlive me) <&> HashMap.toList <&> length + -- if n > 0 then + -- pure n + -- else + -- pure q + + +qblfInit :: forall w m . (ForQBLF w, IsQBLF w m, MonadUnliftIO m) + => QBLFActor w -- ^ self + -> [QBLFActor w] -- ^ all actors + -> QBLFState w -- ^ initial state + -> Timeout 'Seconds -- ^ timeout + -> m (QBLF w) + +qblfInit self actors s0 w = + QBLF self + (HashSet.fromList actors) + QWait + s0 + w + mempty + <$> newTVarIO mempty + <*> newTVarIO mempty + <*> (newTVarIO =<< now) + <*> newTVarIO 0 + <*> newTVarIO mempty + <*> newTVarIO mempty + + where + now = getTimeCoarse + +qblfNextCommitTime :: (MonadIO f, Real a) => a -> f TimeSpec +qblfNextCommitTime ww = do + let wt = realToFrac ww + t0 <- getTimeCoarse + dt <- liftIO $ randomRIO (wt/2, wt) + pure $ fromNanoSecs $ toNanoSecs t0 + round (realToFrac dt * 1e9) + +-- qblfGetState :: (ForQBLF w, MonadUnliftIO m) => QBLF w -> m QBLFStateN +-- qblfGetState q = readTVarIO (view qblfState q) + + + +qblfRun :: forall w m . ( Pretty (QBLFActor w) + , Pretty (QBLFState w) + , ForQBLF w + , IsQBLF w m + , MonadUnliftIO m + ) => QBLF w -> m () +qblfRun me = do + + forever $ do + void $ qblfTo me QWait + warn "QUIT!!!" + + -- mapM_ wait [a1,hb] + -- mapM_ wait [a1] + + -- waitAnyCatchCancel [] + + where + + tAlive = 5 + + minHeartBeat = round 5e9 + + sendHeartBeat :: IsQBLF w m => QBLF w -> m () + sendHeartBeat s = do + ts <- liftIO getPOSIXTime <&> realToFrac + now <- getTimeCoarse + sent <- readTVarIO (_qblfLastHeartBeat s) + when (toNanoSecs (now - sent) > minHeartBeat) do + qblfBroadCast @w $ QBLFMsgHeartBeat (view qblfSelf s) (view qblfState s) (view qblfCurrent s) ts + atomically $ writeTVar (_qblfLastHeartBeat s) now + + sendAnnounce :: IsQBLF w m => QBLF w -> QBLFState w -> m () + sendAnnounce s sx = do + qblfBroadCast @w (QBLFMsgAnn self (QBLFAnnounce current sx)) + where + self = view qblfSelf s + current = view qblfCurrent s + + sendMerge :: IsQBLF w m => QBLF w -> QBLFState w -> m () + sendMerge s sx = do + qblfBroadCast @w (QBLFMsgMerge self (QBLFMerge current sx)) + where + self = view qblfSelf s + current = view qblfCurrent s + + sendCommit :: IsQBLF w m => QBLF w -> QBLFState w -> m () + sendCommit s sx = do + qblfBroadCast @w (QBLFMsgCommit self (QBLFCommit current sx)) + where + self = view qblfSelf s + current = view qblfCurrent s + + + nap = pause @'Seconds 0.25 + + getAlive :: IsQBLF w m => QBLF w -> m [(QBLFActor w, QBLFStateN, QBLFState w) ] + getAlive s = do + now <- getTimeCoarse + states <- readTVarIO (view qblfAlive s) <&> HashMap.toList + pure [ (a,n,sx) | (a, (n,sx,t)) <- states, toNanoSecs (now - t) < round (2 * tAlive * 1e9) ] + + sweepMerges :: IsQBLF w m => QBLF w -> QBLFState w -> m () + sweepMerges qblf s = do + atomically do + old <- readTVar (view qblfMerges qblf) <&> HashMap.toList + let new = [ ((a, m), t) | ((a, m@(QBLFMerge s0 _)), t) <- old, s0 /= s ] + writeTVar (view qblfMerges me) (HashMap.fromList new) + + sweepAnnounces :: IsQBLF w m => QBLF w -> m () + sweepAnnounces qblf = do + -- FIXME: fix-magic-number + let wnano = fromIntegral $ toNanoSeconds $ max 300 $ 10 * view qblfWaitAnnounce qblf + + now <- getTimeCoarse + + swept <- atomically do + old <- readTVar (view qblfAnnounces qblf) <&> HashMap.toList + let new = [ ((a, m), t) + | ((a, m@(QBLFAnnounce _ _)), t) <- old + , toNanoSecs (now - t) < wnano + ] + writeTVar (view qblfAnnounces me) (HashMap.fromList new) + pure $ length old - length new + + when (swept > 0) do + debug $ "sweepAnnounces" <+> pretty swept + + qblfTo s n = do + now <- getTimeCoarse + let ns = s { _qblfState = n } + atomically $ writeTVar (_qblfStateTime ns) now + case n of + QWait -> qblfToWait ns + QAnnounce -> qblfToAnnounce ns + QMerge -> qblfToMerge ns + QCommit -> qblfToCommit ns + + qblfToWait s = do + let w = view qblfWaitAnnounce s + q <- qblfQuorum s + + let trsh = max 2 (q `div` 2) + + newAnn <- newTVarIO mempty + + fix \next -> do + + sendHeartBeat s + + sweepAnnounces s + + now <- getTimeCoarse + alive <- getAlive s + let wn = sum [ 1 | (_, QWait, _) <- alive ] + -- debug $ logActor s <+> "wait" <+> pretty wn + + t0 <- readTVarIO (view qblfStateTime s) + let elapsed = toNanoSeconds $ TimeoutTS (now - t0) + + their <- selectState s (fmap (view _3) alive) + let mine = view qblfCurrent s + + ann0 <- atomically $ readTVar (view qblfAnnounces s) <&> HashMap.keys + + let alst = [ s1 | (_, QBLFAnnounce s0 s1) <- ann0, s0 == mine, s1 /= s0 ] + + old <- readTVarIO newAnn + let an = HashSet.size $ HashSet.fromList alst `HashSet.difference` old + + atomically $ writeTVar newAnn (HashSet.fromList alst) + + let g = if | their /= Just mine -> do + + case their of + Nothing -> pure $ nap >> next + Just th -> do + + -- FIXME: what-if-failed + forwarded <- qblfMoveForward @w mine th + + if forwarded then do + debug $ logActor s <+> "DO FAST-FORWARD" <+> pretty th + qblfCommit @w mine th + pure $ qblfTo (set qblfCurrent th s) QAnnounce + else do + pure $ nap >> next + + | wn >= q && (elapsed > toNanoSeconds w || an >= trsh) && Just mine == their -> do + let el = elapsed > toNanoSeconds w + let aa = an >= trsh + debug $ logActor s <+> "ready" <+> pretty their <+> pretty el <+> pretty aa <+> pretty an + pure $ qblfTo s QAnnounce + + | otherwise -> pure $ nap >> next + + join g + + qblfToAnnounce s = do + + let mine = view qblfCurrent s + q <- qblfQuorum s + let wa = view qblfWaitAnnounce s + + sweepMerges s mine + + -- TODO: extract-method + txs <- atomically do + tx <- readTVar (view qblfTranQ s) <&> HashSet.toList + writeTVar (view qblfTranQ s) mempty + pure tx + + hx <- qblfNewState @w mine txs + + sendAnnounce s hx + + pause (0.1 * wa) + + g <- race ( pause wa ) do + + fix \next -> do + + ann0 <- atomically $ readTVar (view qblfAnnounces s) <&> HashMap.keys + + let ann = [ s1 | (_, QBLFAnnounce s0 s1) <- ann0, s0 == mine ] + + if length ann >= q then do + debug $ logActor s <+> "announce/ready-to-merge" <+> pretty (view qblfCurrent s) <+> pretty (length ann) + pure $ qblfTo s QMerge + else do + trace $ logActor s <+> "announce/wait" <+> pretty (view qblfCurrent s) <+> pretty (length ann) + nap >> next + + case g of + Left{} -> qblfTo s QWait + Right n -> n + + qblfToMerge s = do + let mine = view qblfCurrent s + q <- qblfQuorum s + + let wa = view qblfWaitAnnounce s + -- pause @'Seconds wa + -- debug $ logActor s <+> "merge" + + ann0 <- atomically $ readTVar (view qblfAnnounces s) <&> HashMap.keys + let aann = [ (a, s1) | (a, QBLFAnnounce s0 s1) <- ann0, s0 == mine ] + + let ann = fmap snd aann + let actors = fmap fst aann & HashSet.fromList + + let g = case ann of + [] -> do + debug $ "MERGE: not found SHIT!" <+> pretty (length aann) + pure $ qblfTo s QWait + + (_:_) -> do + new <- qblfMerge @w mine ann + sendMerge s new + pure $ qblfTo (set qblfCommitsFrom actors s) QCommit + + join g + + qblfToCommit s = do + let mine = view qblfCurrent s + let authors = view qblfCommitsFrom s + + -- pause @'Seconds 2 + debug $ logActor s <+> "qblfToCommit" + + -- FIXME: timeout-and-rollback-to-wait + + let wa = view qblfWaitAnnounce s + + r <- race ( pause wa ) do + fix \next -> do + + merges0 <- atomically $ readTVar (view qblfMerges s) <&> HashMap.keys + let merges = [ (s1, a, m) + | (a, m@(QBLFMerge s0 s1)) <- merges0, s0 == mine + , a `HashSet.member` authors + ] + + mbNew <- selectState s (fmap (view _1) merges) + + trace $ "#### COMMIT NEW:" + <+> pretty mine + <+> pretty mbNew + -- <+> pretty (fmap (view _1) merges) + + case mbNew of + Just new -> do + when (new /= mine) do + debug $ logActor s <+> "commit: " <+> pretty new + sendCommit s new + qblfCommit @w mine new + -- sweepAnnounces s mine + + pure $ qblfTo ( set qblfCurrent new s + ) QWait + + Nothing -> do + -- debug $ logActor s <+> "commit: " <+> "fail" + nap >> next + + case r of + Left{} -> qblfTo s QWait + Right n -> n + + selectState :: IsQBLF w m => QBLF w -> [QBLFState w] -> m (Maybe (QBLFState w)) + selectState s sx = do + q <- qblfQuorum s + let mbs = fmap (,1) sx & HashMap.fromListWith (+) + & HashMap.toList + & fmap (over _2 List.singleton . swap) + & Map.fromListWith (<>) + & Map.toDescList + & headMay + runMaybeT do + ss <- MaybeT $ pure mbs + let sss = over _2 (List.sortOn (Down . hashObject @HbSync)) ss :: (Integer, [QBLFState w]) + + if fst sss >= q then do + MaybeT $ pure $ headMay (snd sss) + else + mzero + + logActor s = "ACTOR" <> parens (pretty (view qblfSelf s)) + diff --git a/examples/refchan-qblf/refchan-qblf.cabal b/examples/refchan-qblf/refchan-qblf.cabal new file mode 100644 index 00000000..bed8ea85 --- /dev/null +++ b/examples/refchan-qblf/refchan-qblf.cabal @@ -0,0 +1,243 @@ +cabal-version: 3.0 +name: refchan-qblf +version: 0.1.0.0 +-- synopsis: +-- description: +license: BSD-3-Clause +license-file: LICENSE +-- author: +-- maintainer: +-- copyright: +category: Network +build-type: Simple +extra-doc-files: CHANGELOG.md +-- extra-source-files: + +common warnings + ghc-options: -Wall + +common common-deps + build-depends: + base, hbs2-core, hbs2-storage-simple + , aeson + , async + , bytestring + , cache + , containers + , data-default + , deepseq + , directory + , filepath + , hashable + , http-types + , microlens-platform + , mtl + , mwc-random + , prettyprinter + , QuickCheck + , random + , random-shuffle + , resourcet + , safe + , serialise + , split + , stm + , streaming + , scotty + , suckless-conf + , tasty + , tasty-hunit + , temporary + , timeit + , transformers + , uniplate + , unordered-containers + , vector + , prettyprinter-ansi-terminal + , interpolatedstring-perl6 + , unliftio + +common shared-properties + ghc-options: + -Wall + -O2 + -fno-warn-type-defaults + -- -fno-warn-unused-matches + -- -fno-warn-unused-do-bind + -- -Werror=missing-methods + -- -Werror=incomplete-patterns + -- -fno-warn-unused-binds + -threaded + -rtsopts + "-with-rtsopts=-N4 -A256m -AL256m -I0" + + + default-language: Haskell2010 + + default-extensions: + ApplicativeDo + , BangPatterns + , BlockArguments + , ConstraintKinds + , DataKinds + , DeriveDataTypeable + , DeriveGeneric + , DerivingStrategies + , DerivingVia + , ExtendedDefaultRules + , FlexibleContexts + , FlexibleInstances + , GADTs + , GeneralizedNewtypeDeriving + , ImportQualifiedPost + , LambdaCase + , MultiParamTypeClasses + , OverloadedStrings + , QuasiQuotes + , ScopedTypeVariables + , StandaloneDeriving + , TupleSections + , TypeApplications + , TypeFamilies + +library refchan-qblf-core + import: shared-properties + import: common-deps + default-language: Haskell2010 + + exposed-modules: QBLF.Proto + + ghc-options: + -- -prof + -- -fprof-auto + + -- other-extensions: + + -- type: exitcode-stdio-1.0 + hs-source-dirs: lib + + build-depends: + base, hbs2-core + , async + , attoparsec + , bytestring + , cache + , clock + , containers + , data-default + , directory + , hashable + , microlens-platform + , mtl + , mwc-random + , network + , network-ip + , optparse-applicative + , prettyprinter + , QuickCheck + , random + , safe + , serialise + , stm + , streaming + , tasty + , tasty-hunit + , text + , time + , transformers + , uniplate + , vector + , unliftio + +executable refchan-qblf + import: shared-properties + import: common-deps + default-language: Haskell2010 + + ghc-options: + -- -prof + -- -fprof-auto + + other-modules: + Demo.QBLF.Transactions + + -- other-extensions: + + -- type: exitcode-stdio-1.0 + hs-source-dirs: app lib + main-is: RefChanQBLFMain.hs + + build-depends: + base, refchan-qblf-core, hbs2-core, hbs2-storage-simple + , async + , attoparsec + , bytestring + , cache + , clock + , containers + , data-default + , data-textual + , directory + , hashable + , microlens-platform + , mtl + , mwc-random + , network + , network-ip + , optparse-applicative + , prettyprinter + , QuickCheck + , random + , safe + , serialise + , stm + , streaming + , tasty + , tasty-hunit + , text + , time + , transformers + , uniplate + , vector + , unliftio + + +test-suite refchan-qblf-proto-test + import: shared-properties + default-language: Haskell2010 + + other-modules: + + type: exitcode-stdio-1.0 + hs-source-dirs: test + main-is: RefChanQBLFProtoTest.hs + + build-depends: + base, refchan-qblf-core, hbs2-core + , async + , bytestring + , cache + , cborg + , containers + , directory + , filepath + , hashable + , microlens-platform + , mtl + , optparse-applicative + , prettyprinter + , prettyprinter-ansi-terminal + , QuickCheck + , random + , safe + , serialise + , tasty + , tasty-hunit + , temporary + , timeit + , uniplate + , unliftio + , unordered-containers + , vector + + diff --git a/examples/refchan-qblf/test/RefChanQBLFProtoTest.hs b/examples/refchan-qblf/test/RefChanQBLFProtoTest.hs new file mode 100644 index 00000000..d34417a4 --- /dev/null +++ b/examples/refchan-qblf/test/RefChanQBLFProtoTest.hs @@ -0,0 +1,334 @@ +{-# Language AllowAmbiguousTypes #-} +module Main where + +import HBS2.Prelude.Plated +import HBS2.System.Logger.Simple +import HBS2.Clock +import HBS2.Hash +import HBS2.Base58 + +import QBLF.Proto + +import Data.Ord +import Data.Functor +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HashMap +import Data.List qualified as List +import Control.Monad.Reader +import System.Random +import Data.Hashable hiding (Hashed) +import Data.Word +import Lens.Micro.Platform +import Data.Fixed +import Options.Applicative as O +import Data.Cache (Cache) +import Data.Cache qualified as Cache +import Data.Maybe +import Data.Graph +import Data.HashSet (HashSet) +import Data.HashSet qualified as HashSet +import Prettyprinter.Render.Terminal +import GHC.TypeLits + +import Codec.Serialise + +import UnliftIO + +type MyHash = Hash HbSync + +{- HLINT ignore "Use newtype instead of data" -} + +newtype Actor = Actor { fromActor :: Int } + deriving stock (Eq,Ord,Show,Generic) + deriving newtype (Num,Enum,Hashable,Pretty) + +newtype Tran = OddEven { fromTran :: Integer } + deriving stock (Eq,Ord,Show,Generic) + deriving newtype (Hashable) + + +instance Serialise Tran + +newtype MyState = MyState { fromMyState :: MyHash } -- [Tran] + deriving stock (Eq,Ord,Show,Generic) + deriving newtype (Hashable,Pretty) + +instance Hashed HbSync MyState where + hashObject (MyState x) = x + +data MyConsensus = MyConsensus + +data MyEnv = + MyEnv + { mySelf :: Actor + , myActors :: HashMap Actor (TQueue (QBLFMessage MyConsensus)) + , myState :: TVar MyState + , myStorage :: TVar (HashMap MyHash (HashSet Tran)) + , myCommits :: TVar (HashSet (MyHash, MyHash)) + } + + + +newtype RunM m a = RunM { fromRunM :: ReaderT MyEnv m a } + deriving newtype ( Applicative + , Functor + , Monad + , MonadIO + , MonadUnliftIO + , MonadReader MyEnv + ) + +runM :: MyEnv -> RunM m a -> m a +runM env f = runReaderT (fromRunM f) env + + +instance MonadIO m => IsQBLF MyConsensus (RunM m) where + type QBLFActor MyConsensus = Actor + type QBLFTransaction MyConsensus = Tran + type QBLFState MyConsensus = MyState + + qblfNewState _ txs = do + let tx1 = HashSet.fromList txs + let h1 = hashObject (serialise tx1) + let ms1 = MyState h1 + ca <- asks myStorage + atomically $ modifyTVar ca $ HashMap.insert h1 tx1 + pure ms1 + + qblfCommit s0 s1 = do + mst <- asks myState + me <- asks mySelf + co <- asks myCommits + + atomically $ do + writeTVar mst s1 + modifyTVar co (HashSet.insert (fromMyState s1, fromMyState s0)) + + trace $ "COMMIT" <> braces (viaShow me) + <+> ":" + <+> pretty s0 + <+> "->" + <+> pretty s1 + + qblfBroadCast msg = do + actors <- ask <&> myActors <&> HashMap.toList + forM_ actors $ \(a,q) -> do + -- debug $ "qblfBroadCast to" <+> pretty a + atomically $ writeTQueue q msg + + qblfMerge m mss = do + + cache <- asks myStorage + + txs' <- forM (m:mss) $ \(MyState h) -> do + readTVarIO cache <&> HashMap.lookup h <&> fromMaybe mempty + + let txs = HashSet.unions txs' + + let tx3 = HashSet.filter evenOnly txs +-- -- List.nub $ List.filter evenOnly $ List.sort txs + let h3 = hashObject (serialise tx3) + + let ms3 = MyState h3 + atomically $ modifyTVar cache $ HashMap.insert h3 tx3 +-- + pure ms3 + + where + evenOnly (OddEven x) = even x + +tracePrefix :: SetLoggerEntry +tracePrefix = logPrefix "[trace] " + +debugPrefix :: SetLoggerEntry +debugPrefix = logPrefix "[debug] " + +errorPrefix :: SetLoggerEntry +errorPrefix = logPrefix "[error] " + +warnPrefix :: SetLoggerEntry +warnPrefix = logPrefix "[warn] " + +noticePrefix :: SetLoggerEntry +noticePrefix = logPrefix "[notice] " + +data Opts = + Opts + { cliActorsNum :: Int + , cliWaitAnnounceTime :: Double + , cliPd :: Double + , cliPb :: Double + , cliPt :: Double + , cliWt :: Double + , cliTrace :: Bool + } + +cliOptions :: Parser Opts +cliOptions = Opts + <$> option auto + ( long "actors-num" + <> short 'n' + <> help "number of actors" + <> showDefault + <> value 2 ) + <*> option auto + ( long "wait-announce-time" + <> short 'w' + <> help "wait-announce-time" + <> showDefault + <> value 10 ) + <*> option auto + ( long "pD" + <> help "p(disaster)" + <> showDefault + <> value 0.000001 ) + <*> option auto + ( long "pB" + <> help "p(Bad)" + <> showDefault + <> value 0.05 ) + <*> option auto + ( long "pT" + <> help "p(transaction)" + <> showDefault + <> value 0.10 ) + <*> option auto + ( long "wT" + <> help "Period(transaction)" + <> showDefault + <> value 1.00 ) + <*> option auto + ( long "trace" + <> help "enable trace" + <> showDefault + <> value False ) + + + +main :: IO () +main = do + + opts <- execParser $ O.info (cliOptions <**> helper) + ( fullDesc + <> progDesc "refchan-qblf-proto-test" + ) + + when (cliTrace opts) do + setLogging @TRACE tracePrefix + + setLogging @DEBUG debugPrefix + setLogging @INFO defLog + setLogging @ERROR errorPrefix + setLogging @WARN warnPrefix + setLogging @NOTICE noticePrefix + + cache <- newTVarIO mempty + let nada = hashObject (serialise (mempty :: HashSet Tran)) + let s0 = MyState nada + atomically $ modifyTVar cache $ HashMap.insert nada mempty + commits <- newTVarIO mempty + + let actors = [ 1 .. fromIntegral (cliActorsNum opts) ] :: [Actor] + + ee <- forM actors $ \a -> do + (a,) <$> newTQueueIO + + tstates <- newTVarIO (mempty :: HashMap Actor MyState) + + async $ forever $ do + let w = cliWaitAnnounceTime opts + let n = cliActorsNum opts + + let q = max 1 $ round $ realToFrac (n + 1) / 2 + + pause @'Seconds (2 * realToFrac w) + + rs <- atomically $ readTVar tstates <&> HashMap.elems + + let votes = List.sortOn (Down . snd) $ HashMap.toList $ HashMap.fromListWith (+) [ (n,1) | n <- rs ] + + let dv = sum [ 1 | (_,x) <- votes, x < q ] + + case headMay votes of + Just (s,l) | l >= q -> do + z <- readTVarIO cache <&> HashMap.lookup (fromMyState s) <&> maybe 0 length + notice $ annotate (color Green) $ "CONSENSUS" <+> pretty s <+> viaShow l <+> pretty z <+> viaShow dv + + _ -> pure () -- err "NO CONSENSUS!" + + pure () + + w1 <- forM actors $ \a -> async do + + mst <- newTVarIO s0 + + runM (MyEnv a (HashMap.fromList ee) mst cache commits) do + + let pBad = cliPb opts + let w = cliWaitAnnounceTime opts + let pFuckup = cliPd opts + let pT = cliPt opts + let wT = cliWt opts + + qblf <- qblfInit @MyConsensus a actors s0 (realToFrac w) + + r1 <- async $ do + bad <- randomRIO (0.0, 1.00) + + miniW <- randomRIO (0.001, 0.02) + pause @'Seconds (realToFrac miniW) + + when (bad > (1 - pBad)) do + dt <- randomRIO (w/5, w*5) + pause @'Seconds (realToFrac dt) + + qblfRun qblf + + a1 <- async $ forever do + pause @'Seconds (realToFrac wT) + + pt <- liftIO $ randomRIO (0.00, 1.00 :: Double) + + when (pt < pT) do + tran <- OddEven <$> liftIO randomIO + -- debug $ "I'm" <+> viaShow a <+> viaShow tran + qblfEnqueue qblf tran + + a2 <- async $ do + mbInbox <- asks myActors <&> HashMap.lookup a + maybe1 mbInbox (pure ()) $ \inbox -> forever do + atomically (readTQueue inbox) >>= qblfAcceptMessage qblf + + let n = realToFrac (length actors) + fuckup <- liftIO $ randomRIO (0.00, 1.00) <&> (*n) + + when (fuckup < pFuckup) do + debug $ "ACTOR" <> parens (pretty $ view qblfSelf qblf) <+> "DISASTER HAPPENED" + let wx = realToFrac $ view qblfWaitAnnounce qblf + dt <- liftIO $ randomRIO (wx*0.5, wx*3.5) + pause @'Seconds (realToFrac dt) + + a3 <- async $ forever do + pause @'Seconds (realToFrac (cliWaitAnnounceTime opts)) + me <- ask + st@(MyState xs) <- asks myState >>= readTVarIO + n <- asks myStorage >>= liftIO . readTVarIO <&> HashMap.lookup xs <&> fromMaybe mempty <&> length + + atomically $ modifyTVar tstates (HashMap.insert a st) + + -- n <- qblfGetState qblf + + -- -- h <- height + -- notice $ "ACTOR" <> parens (pretty (fromActor $ mySelf me)) + -- <> brackets (viaShow n) + -- <> braces (pretty xs) + + + mapM_ link [r1, a1, a2, a3] + + mapM_ wait [a1,a2,a3,r1] + + waitAnyCatchCancel w1 + + putStrLn "WAT?" + diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index 1b86abc0..4093e790 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -112,6 +112,7 @@ library , HBS2.Net.Proto.Sessions , HBS2.Net.Proto.RefLog , HBS2.Net.Proto.RefChan + , HBS2.Net.Proto.AnyRef , HBS2.Net.Proto.Types , HBS2.OrDie , HBS2.Prelude diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index bf82a7e7..36ae0f97 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -328,8 +328,7 @@ instance ( MonadIO m when allowed do sendTo pipe (To peer_e) (From me) (AnyMessage @(Encoded e) @e proto (encode msg)) - -- trace $ "REQUEST: after sendTo" <+> viaShow peer_e <+> viaShow msg - + trace $ "REQUEST: after sendTo" <+> viaShow peer_e instance ( Typeable (EventHandler e p (PeerM e IO)) @@ -491,7 +490,9 @@ runProto hh = do for_ messages $ \(From pip, AnyMessage n msg :: AnyMessage (Encoded e) e) -> do case Map.lookup n disp of - Nothing -> pure () -- FIXME: error counting! and statistics counting feature + Nothing -> do + err $ "PROTO not found" <+> pretty n <+> pretty (fmap fst resp) + pure () -- FIXME: error counting! and statistics counting feature Just (AnyProtocol { protoDecode = decoder , handle = h diff --git a/hbs2-core/lib/HBS2/Data/Detect.hs b/hbs2-core/lib/HBS2/Data/Detect.hs index a8fcc1f3..a85b0677 100644 --- a/hbs2-core/lib/HBS2/Data/Detect.hs +++ b/hbs2-core/lib/HBS2/Data/Detect.hs @@ -143,3 +143,16 @@ readBlobFromTree readBlock hr = do pure $ LBS.concat <$> sequence pieces + +readLog :: forall m . ( MonadIO m ) + => ( Hash HbSync -> IO (Maybe ByteString) ) + -> HashRef + -> m [HashRef] +readLog getBlock (HashRef h) = + S.toList_ $ do + walkMerkle h (liftIO . getBlock) $ \hr -> do + case hr of + Left{} -> pure () + Right (hrr :: [HashRef]) -> S.each hrr + + diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index bdcc6d9a..d75fea41 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -55,7 +55,7 @@ defBlockSizeCacheTime :: TimeSpec defBlockSizeCacheTime = toTimeSpec ( 30 :: Timeout 'Seconds ) defRequestLimitSec :: Timeout 'Seconds -defRequestLimitSec = 60 +defRequestLimitSec = 300 defBlockBanTime :: TimeSpec defBlockBanTime = toTimeSpec defBlockBanTimeSec diff --git a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs index 4cb28d6b..d8985af1 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs @@ -292,7 +292,7 @@ spawnConnection tp env so sa = liftIO do void $ waitAnyCatchCancel [rd,wr] - -- cleanupConn connId + -- lift $ cleanupConn connId -- gracefulClose so 1000 debug $ "spawnConnection exit" <+> pretty sa diff --git a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs index b1b948d8..dcf2e655 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs @@ -23,11 +23,13 @@ import Data.Set (Set) import Data.Set qualified as Set import UnliftIO -data UNIX +data UNIX = UNIX + deriving (Eq,Ord,Show,Generic) {- HLINT ignore "Use newtype instead of data" -} data MessagingUnixOpts = - MUWatchdog Int + MUWatchdog Int + | MUFork deriving (Eq,Ord,Show,Generic,Data) -- FIXME: use-bounded-queues @@ -118,6 +120,11 @@ runMessagingUnix env = do liftIO $ bind sock $ SockAddrUnix (msgUnixSockPath env) liftIO $ listen sock 1 + let doFork = Set.member MUFork (msgUnixOpts env) + + let withSession | doFork = void . async . runResourceT + | otherwise = void . runResourceT + watchdog <- async $ do let mwd = headMay [ n | MUWatchdog n <- Set.toList (msgUnixOpts env) ] @@ -144,26 +151,28 @@ runMessagingUnix env = do atomically $ modifyTVar (msgUnixAccepts env) succ - void $ allocate (pure so) close + withSession do - writer <- async $ forever do - msg <- liftIO . atomically $ readTQueue (msgUnixInbox env) - let len = fromIntegral $ LBS.length msg :: Int - liftIO $ sendAll so $ bytestring32 (fromIntegral len) - liftIO $ sendAll so $ LBS.toStrict msg + void $ allocate (pure so) close - void $ allocate (pure writer) cancel + writer <- async $ forever do + msg <- liftIO . atomically $ readTQueue (msgUnixInbox env) + let len = fromIntegral $ LBS.length msg :: Int + liftIO $ sendAll so $ bytestring32 (fromIntegral len) + liftIO $ sendAll so $ LBS.toStrict msg - link writer + void $ allocate (pure writer) cancel - fix \next -> do - -- FIXME: timeout-hardcode - frameLen <- liftIO $ recv so 4 <&> word32 <&> fromIntegral - frame <- liftIO $ recv so frameLen - atomically $ writeTQueue (msgUnixRecv env) (From (PeerUNIX sa), LBS.fromStrict frame) - now <- getTimeCoarse - atomically $ writeTVar (msgUnixLast env) now - next + link writer + + fix \next -> do + -- FIXME: timeout-hardcode + frameLen <- liftIO $ recv so 4 <&> word32 <&> fromIntegral + frame <- liftIO $ recv so frameLen + atomically $ writeTQueue (msgUnixRecv env) (From (PeerUNIX sa), LBS.fromStrict frame) + now <- getTimeCoarse + atomically $ writeTVar (msgUnixLast env) now + next (_, r) <- waitAnyCatchCancel [run, watchdog] diff --git a/hbs2-core/lib/HBS2/Net/Proto/AnyRef.hs b/hbs2-core/lib/HBS2/Net/Proto/AnyRef.hs new file mode 100644 index 00000000..82f8397c --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Proto/AnyRef.hs @@ -0,0 +1,35 @@ +{-# Language UndecidableInstances #-} +module HBS2.Net.Proto.AnyRef where + +import HBS2.Prelude +import HBS2.Hash +import HBS2.Base58 +import HBS2.Net.Proto.Types +import HBS2.Data.Types.Refs + +import Data.Maybe (fromMaybe) +import Data.Hashable hiding (Hashed) + +newtype AnyRefKey t s = AnyRefKey (PubKey 'Sign s) + +deriving stock instance IsRefPubKey s => Eq (AnyRefKey n s) + +instance (IsRefPubKey s) => Hashable (AnyRefKey t s) where + hashWithSalt s k = hashWithSalt s (hashObject @HbSync k) + +instance (IsRefPubKey s) => Hashed HbSync (AnyRefKey t s) where + hashObject (AnyRefKey pk) = hashObject ("anyref|" <> serialise pk) + +instance IsRefPubKey s => FromStringMaybe (AnyRefKey t s) where + fromStringMay s = AnyRefKey <$> fromStringMay s + +instance IsRefPubKey s => IsString (AnyRefKey t s) where + fromString s = fromMaybe (error "bad public key base58") (fromStringMay s) + +instance Pretty (AsBase58 (PubKey 'Sign s)) => Pretty (AsBase58 (AnyRefKey t s)) where + pretty (AsBase58 (AnyRefKey k)) = pretty (AsBase58 k) + +instance Pretty (AsBase58 (PubKey 'Sign s)) => Pretty (AnyRefKey n s) where + pretty (AnyRefKey k) = pretty (AsBase58 k) + + diff --git a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs index e1223184..c0db1b9a 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs @@ -181,7 +181,7 @@ instance HasProtocol L4Proto (RefChanNotify L4Proto) where -- но сообщения должны быть разные, -- тогда и минута нормально. -- возьмем пока 10 секунд - requestPeriodLim = ReqLimPerMessage 10 + requestPeriodLim = NoLimit instance HasProtocol L4Proto (DialReq L4Proto) where type instance ProtocolId (DialReq L4Proto) = 96000 @@ -202,6 +202,14 @@ instance Serialise (RefChanValidate UNIX) => HasProtocol UNIX (RefChanValidate U decode = either (const Nothing) Just . deserialiseOrFail encode = serialise + +instance Serialise (RefChanNotify UNIX) => HasProtocol UNIX (RefChanNotify UNIX) where + type instance ProtocolId (RefChanNotify UNIX) = 0xFFFB0001 + type instance Encoded UNIX = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + requestPeriodLim = NoLimit + instance MonadIO m => HasNonces (RefChanValidate UNIX) m where type instance Nonce (RefChanValidate UNIX) = BS.ByteString newNonce = do @@ -211,6 +219,9 @@ instance MonadIO m => HasNonces (RefChanValidate UNIX) m where instance HasTimeLimits UNIX (RefChanValidate UNIX) IO where tryLockForPeriod _ _ = pure True +instance HasTimeLimits UNIX (RefChanNotify UNIX) IO where + tryLockForPeriod _ _ = pure True + instance Expires (SessionKey L4Proto (BlockInfo L4Proto)) where expiresIn _ = Just defCookieTimeoutSec diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs index ab7fed9e..9dabb793 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs @@ -21,6 +21,7 @@ import HBS2.Data.Types.Refs import HBS2.Data.Types.SignedBox import HBS2.Actors.Peer.Types import HBS2.Data.Types.Peer +import HBS2.Net.Messaging.Unix (UNIX) import HBS2.Storage import Data.Config.Suckless @@ -67,7 +68,7 @@ data RefChanHeadBlock e = makeLenses 'RefChanHeadBlockSmall type ForRefChans e = ( Serialise ( PubKey 'Sign (Encryption e)) - , Pretty (AsBase58 (PubKey 'Sign (Encryption e))) + -- , Pretty (AsBase58 (PubKey 'Sign (Encryption e))) , FromStringMaybe (PubKey 'Sign (Encryption e)) , Signatures (Encryption e) , Serialise (Signature (Encryption e)) @@ -248,10 +249,23 @@ instance Typeable (RefChanRequest e) => Hashable (EventKey e (RefChanRequest e)) where p = Proxy @(RefChanRequest e) + +data RefChanActionRequest = + RefChanAnnounceBlock HashRef + | RefChanFetch HashRef + deriving stock (Generic) + +instance Serialise RefChanActionRequest + -- принимается, только если соответствует текущему HEAD -- не пишется в журнал + data RefChanNotify e = - Notify (RefChanId e) (SignedBox ByteString e) -- подписано ключом автора + Notify (RefChanId e) (SignedBox ByteString e) -- подписано ключом автора + -- довольно уместно будет добавить эти команды сюда - + -- они постоянно нужны, и это сильно упростит коммуникации + | ActionRequest (RefChanId e) RefChanActionRequest + deriving stock (Generic) instance ForRefChans e => Serialise (RefChanNotify e) @@ -279,7 +293,10 @@ instance ( Serialise (PubKey 'Sign (Encryption e)) , Serialise (Nonce (RefChanValidate e)) ) => Serialise (RefChanValidate e) -instance (ForRefChans e, Pretty (AsBase58 (Nonce (RefChanValidate e)))) => Pretty (RefChanValidate e) where +instance ( ForRefChans e + , Pretty (AsBase58 (Nonce (RefChanValidate e))) + , Pretty (AsBase58 (PubKey 'Sign (Encryption e))) + ) => Pretty (RefChanValidate e) where pretty (RefChanValidate n c d) = case d of Validate r -> pretty "validate" <+> pretty (AsBase58 n) <+> pretty (AsBase58 c) <+> pretty r Accepted r -> pretty "accepted" <+> pretty (AsBase58 n) <+> pretty (AsBase58 c) <+> pretty r @@ -293,6 +310,7 @@ data RefChanAdapter e m = , refChanSubscribed :: RefChanId e -> m Bool , refChanWriteTran :: HashRef -> m () , refChanValidatePropose :: RefChanId e -> HashRef -> m Bool + , refChanNotifyRely :: RefChanId e -> RefChanNotify e -> m () } class HasRefChanId e p | p -> e where @@ -311,6 +329,7 @@ instance HasRefChanId e (RefChanRequest e) where instance HasRefChanId e (RefChanNotify e) where getRefChanId = \case Notify c _ -> c + ActionRequest c _ -> c instance HasRefChanId e (RefChanValidate e) where getRefChanId = rcvChan @@ -747,6 +766,8 @@ refChanRequestProto self adapter msg = do proto = Proxy @(RefChanRequest e) +-- instance Coercible (SignedBox L4Proto + refChanNotifyProto :: forall e s m . ( MonadIO m , Request e (RefChanNotify e) m , Response e (RefChanNotify e) m @@ -769,13 +790,23 @@ refChanNotifyProto :: forall e s m . ( MonadIO m -> RefChanNotify e -> m () +refChanNotifyProto self adapter msg@(ActionRequest rchan a) = do + debug $ "RefChanNotify ACTION REQUEST" + pure () + refChanNotifyProto self adapter msg@(Notify rchan box) = do -- аутентифицируем -- проверяем ACL -- пересылаем всем + sto <- getStorage + peer <- thatPeer proto + debug $ "&&& refChanNotifyProto" <+> pretty self + + let h0 = hashObject @HbSync (serialise msg) + auth <- find (KnownPeerKey peer) id <&> isJust void $ runMaybeT do @@ -784,20 +815,31 @@ refChanNotifyProto self adapter msg@(Notify rchan box) = do guard (self || auth) - (authorKey, bs) <- MaybeT $ pure $ unboxSignedBox0 box + deferred proto do - let refchanKey = RefChanHeadKey @s rchan - headBlock <- MaybeT $ getActualRefChanHead @e refchanKey + guard =<< liftIO (hasBlock sto h0 <&> isNothing) - guard $ checkACL headBlock Nothing authorKey + (authorKey, bs) <- MaybeT $ pure $ unboxSignedBox0 box - -- теперь пересылаем по госсипу - lift $ gossip msg + let refchanKey = RefChanHeadKey @s rchan + headBlock <- MaybeT $ getActualRefChanHead @e refchanKey - trace $ "refChanNotifyProto" <+> pretty (BS.length bs) + guard $ checkACL headBlock Nothing authorKey - -- тут надо заслать во внешнее приложение, - -- равно как и в остальных refchan-протоколах + -- FIXME: garbage-collection-required + liftIO $ putBlock sto (serialise msg) + + -- теперь пересылаем по госсипу + lift $ gossip msg + + debug $ "^^^ refChanNotifyProto" <+> pretty peer <+> pretty h0 + + -- тут надо заслать во внешнее приложение, + -- равно как и в остальных refchan-протоколах + + unless self do + debug $ "^^^ CALL refChanNotifyRely" <+> pretty h0 + lift $ refChanNotifyRely adapter rchan msg where proto = Proxy @(RefChanNotify e) @@ -808,7 +850,7 @@ getActualRefChanHead :: forall e s m . ( MonadIO m , HasStorage m , Signatures s , IsRefPubKey s - , Pretty (AsBase58 (PubKey 'Sign s)) + -- , Pretty (AsBase58 (PubKey 'Sign s)) -- , Serialise (Signature s) , ForRefChans e , HasStorage m @@ -830,14 +872,25 @@ getActualRefChanHead key = do pure hd Nothing -> do - debug "ABOUT TO FIND HEAD" - h <- MaybeT $ liftIO $ getRef sto key - hdblob <- MaybeT $ readBlobFromTree ( getBlock sto ) (HashRef h) - (_, headblk) <- MaybeT $ pure $ unboxSignedBox @(RefChanHeadBlock e) @e hdblob - lift $ update headblk (RefChanHeadBlockKey key) id -- set found head + headblk <- MaybeT $ getRefChanHead sto key debug "HEAD FOUND" pure headblk +getRefChanHead :: forall e s m . ( MonadIO m + , s ~ Encryption e + , ForRefChans e + , Signatures s + ) + => AnyStorage + -> RefChanHeadKey s + -> m (Maybe (RefChanHeadBlock e)) + +getRefChanHead sto k = runMaybeT do + h <- MaybeT $ liftIO $ getRef sto k + hdblob <- MaybeT $ readBlobFromTree ( getBlock sto ) (HashRef h) + (_, headblk) <- MaybeT $ pure $ unboxSignedBox @(RefChanHeadBlock e) @e hdblob + pure headblk + makeProposeTran :: forall e s m . ( MonadIO m , ForRefChans e , Signatures (Encryption e) @@ -879,7 +932,7 @@ instance ForRefChans e => FromStringMaybe (RefChanHeadBlock e) where | (ListVal [SymbolVal "author", LitStrVal s] ) <- parsed ] -instance ForRefChans e => Pretty (RefChanHeadBlock e) where +instance (ForRefChans e, Pretty (AsBase58 (PubKey 'Sign (Encryption e)))) => Pretty (RefChanHeadBlock e) where pretty blk = parens ("version" <+> pretty (view refChanHeadVersion blk)) <> line <> parens ("quorum" <+> pretty (view refChanHeadQuorum blk)) <> line diff --git a/hbs2-core/lib/HBS2/OrDie.hs b/hbs2-core/lib/HBS2/OrDie.hs index 12376b91..8fd3beb6 100644 --- a/hbs2-core/lib/HBS2/OrDie.hs +++ b/hbs2-core/lib/HBS2/OrDie.hs @@ -17,6 +17,12 @@ instance MonadIO m => OrDie m (Maybe a) where Nothing -> liftIO $ die err Just x -> pure x +instance MonadIO m => OrDie m (Either a b) where + type instance OrDieResult (Either a b) = b + orDie mv err = mv >>= \case + Left{} -> liftIO $ die err + Right x -> pure x + instance MonadIO m => OrDie m ExitCode where type instance OrDieResult ExitCode = () orDie mv err = mv >>= \case diff --git a/hbs2-peer/app/Brains.hs b/hbs2-peer/app/Brains.hs index c4f60307..819dfe5c 100644 --- a/hbs2-peer/app/Brains.hs +++ b/hbs2-peer/app/Brains.hs @@ -203,6 +203,7 @@ cleanupPostponed b h = do instance ( Hashable (Peer e) , Pretty (Peer e), Pretty (PeerAddr e) + , Pretty (AsBase58 (PubKey 'Sign (Encryption e))) , e ~ L4Proto , ForRefChans e ) => HasBrains e (BasicBrains e) where @@ -803,6 +804,7 @@ newBasicBrains cfg = liftIO do runBasicBrains :: forall e m . ( e ~ L4Proto , MonadUnliftIO m , ForRefChans e + , Pretty (AsBase58 (PubKey 'Sign (Encryption L4Proto))) ) => PeerConfig -> BasicBrains e diff --git a/hbs2-peer/app/HttpWorker.hs b/hbs2-peer/app/HttpWorker.hs index 704de207..d3b0511a 100644 --- a/hbs2-peer/app/HttpWorker.hs +++ b/hbs2-peer/app/HttpWorker.hs @@ -11,7 +11,7 @@ import HBS2.Events import PeerTypes import PeerConfig -import RefLog ( doRefLogBroadCast ) +import RefLog ( doRefLogBroadCast ) import Data.Functor import Data.ByteString.Lazy qualified as LBS @@ -73,7 +73,7 @@ httpWorker conf pmeta e = do va <- liftIO $ getRef sto (RefLogKey @s ref) maybe1 va (status status404) $ \val -> do text [qc|{pretty val}|] - + post "/reflog" do bs <- LBS.take 4194304 <$> body let msg' = diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 2f3f3588..ac23eba9 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -616,15 +616,18 @@ runPeer opts = Exception.handle (\e -> myException e pause @'Seconds 600 liftIO $ Cache.purgeExpired nbcache - rce <- refChanWorkerEnv conf denv + rce <- refChanWorkerEnv conf penv denv let refChanAdapter = RefChanAdapter { refChanOnHead = refChanOnHeadFn rce , refChanSubscribed = isPolledRef @e brains , refChanWriteTran = refChanWriteTranFn rce , refChanValidatePropose = refChanValidateTranFn @e rce + , refChanNotifyRely = refChanNotifyRelyFn @e rce } + rcw <- async $ liftIO $ runRefChanRelyWorker rce refChanAdapter + let pexFilt pips = do tcpex <- listTCPPexCandidates @e brains <&> HashSet.fromList fset <- forM pips $ \p -> do diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index 48bdd439..8acc4bdc 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -525,9 +525,25 @@ instance (ForGossip e p (PeerM e IO)) => HasGossip e p (PeerM e IO) where broadCastMessage msg instance (ForGossip e p (ResponseM e m), HasGossip e p m) => HasGossip e p (ResponseM e m) where - gossip = lift . gossip + gossip msg = do + that <- thatPeer (Proxy @p) + forKnownPeers $ \pip _ -> do + unless (that == pip) do + request @e pip msg +simpleBlockAnnounce :: forall e m . ( Monad m + , HasPeerNonce e m + ) + => Integer + -> Hash HbSync + -> m (BlockAnnounce e) + +simpleBlockAnnounce size h = do + no <- peerNonce @e + let annInfo = BlockAnnounceInfo 0 NoBlockInfoMeta size h + pure $ BlockAnnounce @e no annInfo + data TRACE1 instance HasLogLevel TRACE1 where diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index 7b16fb52..5c32f61c 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -7,7 +7,9 @@ module RefChan ( , refChanOnHeadFn , refChanWriteTranFn , refChanValidateTranFn + , refChanNotifyRelyFn , refChanWorker + , runRefChanRelyWorker , refChanWorkerEnv , refChanNotifyOnUpdated ) where @@ -16,8 +18,10 @@ import HBS2.Prelude.Plated import HBS2.Actors.Peer import HBS2.Base58 +import HBS2.Hash import HBS2.Clock import HBS2.Data.Detect +import HBS2.Defaults import HBS2.Data.Types.Refs import HBS2.Data.Types.SignedBox import HBS2.Events @@ -38,27 +42,28 @@ import PeerConfig import BlockDownload import Brains +import Data.Dynamic import Codec.Serialise import Control.Concurrent.STM (flushTQueue) import Control.Exception () -import Control.Monad.Except (throwError, runExceptT) +import Control.Monad.Except () import Control.Monad.Reader import Control.Monad.Trans.Maybe -import Data.ByteString.Lazy (ByteString) -import Data.ByteString.Lazy qualified as LBS import Data.Cache (Cache) import Data.Cache qualified as Cache -import Data.Coerce +import Data.ByteString (ByteString) +import Data.Either import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap -import Data.HashSet (HashSet) import Data.HashSet qualified as HashSet import Data.Heap () +import Data.Coerce -- import Data.Heap qualified as Heap import Data.List qualified as List import Data.Maybe import Data.Text qualified as Text import Lens.Micro.Platform +import Data.Generics.Product import UnliftIO import Streaming.Prelude qualified as S @@ -78,30 +83,52 @@ data RefChanValidator = , rcvAsync :: Async () } + +data RefChanNotifier = + RefChanNotifier + { rcnPeer :: Peer UNIX + , rcnInbox :: TQueue (RefChanNotify UNIX) + , rcnAsync :: Async () + } + data RefChanWorkerEnv e = RefChanWorkerEnv - { _refChanWorkerConf :: PeerConfig - , _refChanWorkerEnvDEnv :: DownloadEnv e - , _refChanWorkerEnvHeadQ :: TQueue (RefChanId e, RefChanHeadBlockTran e) - , _refChanWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, (TimeSpec, OnDownloadComplete))) - , _refChanWorkerEnvNotify :: TVar (HashMap (RefChanId e) ()) - , _refChanWorkerEnvWriteQ :: TQueue HashRef - , _refChanWorkerValidators :: TVar (HashMap (RefChanId e) RefChanValidator) + { _refChanWorkerConf :: PeerConfig + , _refChanPeerEnv :: PeerEnv e + , _refChanWorkerEnvDEnv :: DownloadEnv e + , _refChanWorkerEnvHeadQ :: TQueue (RefChanId e, RefChanHeadBlockTran e) + , _refChanWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, (TimeSpec, OnDownloadComplete))) + , _refChanWorkerEnvNotify :: TVar (HashMap (RefChanId e) ()) + , _refChanWorkerEnvWriteQ :: TQueue HashRef + , _refChanWorkerValidators :: TVar (HashMap (RefChanId e) RefChanValidator) + -- FIXME: peer-addr-to-have-multiple-actors-on-single-box + -- нужно ключом держать Peer e (SockAddr) + -- что бы можно было завести несколько акторов на одном + -- боксе в целях отладки. + , _refChanWorkerNotifiers :: TVar (HashMap (RefChanId e) [RefChanNotifier]) + , _refChanWorkerNotifiersInbox :: TQueue (RefChanNotify e) -- ^ to rely messages from clients to gossip + , _refChanWorkerNotifiersDone :: Cache (Hash HbSync) () + , _refChanWorkerLocalRelyDone :: Cache (Peer UNIX, Hash HbSync) () } makeLenses 'RefChanWorkerEnv refChanWorkerEnv :: forall m e . (MonadIO m, ForRefChans e) => PeerConfig + -> PeerEnv e -> DownloadEnv e -> m (RefChanWorkerEnv e) -refChanWorkerEnv conf de = liftIO $ RefChanWorkerEnv @e conf de +refChanWorkerEnv conf pe de = liftIO $ RefChanWorkerEnv @e conf pe de <$> newTQueueIO <*> newTVarIO mempty <*> newTVarIO mempty <*> newTQueueIO <*> newTVarIO mempty + <*> newTVarIO mempty + <*> newTQueueIO + <*> Cache.newCache (Just defRequestLimit) + <*> Cache.newCache (Just defRequestLimit) refChanOnHeadFn :: forall e m . (ForRefChans e, MonadIO m) => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m () refChanOnHeadFn env chan tran = do @@ -145,6 +172,29 @@ refChanNotifyOnUpdated :: (MonadIO m, ForRefChans e) => RefChanWorkerEnv e -> Re refChanNotifyOnUpdated env chan = do atomically $ modifyTVar (_refChanWorkerEnvNotify env) (HashMap.insert chan ()) + +refChanNotifyRelyFn :: forall e m . ( MonadUnliftIO m + , ForRefChans e, e ~ L4Proto + ) + => RefChanWorkerEnv e + -> RefChanId e + -> RefChanNotify e + -> m () + +refChanNotifyRelyFn env chan msg@(Notify _ (SignedBox k box s)) = do + debug "refChanNotifyRelyFn" + -- FIXME: multiple-hash-object-msg-performance + let h0 = hashObject @HbSync (serialise msg) + void $ runMaybeT do + guard =<< liftIO (Cache.lookup (view refChanWorkerNotifiersDone env) h0 <&> isNothing) + liftIO $ Cache.insert (view refChanWorkerNotifiersDone env) h0 () + + -- RefChanNotifier q _ <- MaybeT $ liftIO (readTVarIO (view refChanWorkerNotifiers env) <&> HashMap.lookup chan) + notifiers <- MaybeT $ liftIO (readTVarIO (view refChanWorkerNotifiers env) <&> HashMap.lookup chan) + forM_ notifiers $ \(RefChanNotifier _ q _) -> do + atomically $ writeTQueue q (Notify @UNIX chan (SignedBox k box s)) + + refChanAddDownload :: forall e m . ( m ~ PeerM e IO , MyPeer e ) @@ -164,16 +214,143 @@ refChanAddDownload env chan r onComlete = do -readLog :: forall m . ( MonadUnliftIO m ) - => AnyStorage - -> HashRef - -> m [HashRef] -readLog sto (HashRef h) = - S.toList_ $ do - walkMerkle h (liftIO . getBlock sto) $ \hr -> do - case hr of - Left{} -> pure () - Right (hrr :: [HashRef]) -> S.each hrr +data NotifyEnv = + NotifyEnv + { _notifyClient :: Fabriq UNIX + , _notifySelf :: Peer UNIX + } + +newtype NotifyProtoM m a = NotifyProto { fromNotifyProto :: ReaderT NotifyEnv m a } + deriving newtype ( Functor + , Applicative + , Monad + , MonadIO + , MonadUnliftIO + , MonadReader NotifyEnv + , MonadTrans + ) + + +runNotifyProtoM :: (MonadIO m, PeerMessaging UNIX) => MessagingUnix -> NotifyProtoM m a -> m a +runNotifyProtoM bus m = runReaderT (fromNotifyProto m) (NotifyEnv (Fabriq bus) (msgUnixSelf bus)) + + +instance Monad m => HasFabriq UNIX (NotifyProtoM m) where + getFabriq = asks _notifyClient + +instance Monad m => HasOwnPeer UNIX (NotifyProtoM m) where + ownPeer = asks _notifySelf + +refChanNotifyRpcProto :: forall e m . ( MonadIO m + , Request e (RefChanNotify e) m + -- , HasPeerNonce UNIX m + , e ~ UNIX + -- , m ~ PeerM e IO + ) + => RefChanWorkerEnv L4Proto + -> RefChanNotify e + -> m () + +refChanNotifyRpcProto env msg@(ActionRequest chan action) = do + + let penv = view refChanPeerEnv env + + case action of + RefChanAnnounceBlock h -> do + debug $ "RefChanNotify: RefChanAnnounceBlock" <+> pretty h + + liftIO $ withPeerM penv $ do + sto <- getStorage + sz' <- liftIO $ hasBlock sto (fromHashRef h) + maybe1 sz' none $ \sz -> do + ann <- simpleBlockAnnounce @L4Proto sz (fromHashRef h) + gossip ann + + pure () + + RefChanFetch h -> do + debug $ "RefChanNotify: RefChanFetch" <+> pretty h + + liftIO $ withPeerM penv $ do + refChanAddDownload env chan h (const $ pure ()) + + where + proto = Proxy @(RefChanNotify e) + +refChanNotifyRpcProto env msg@(Notify chan (SignedBox pk box si)) = do + debug "GOT MESSAGE FROM CLIENT" + atomically $ writeTQueue (view refChanWorkerNotifiersInbox env) (Notify @L4Proto chan (SignedBox pk box si)) + -- тут мы должны переслать всем, кроме отправителя + + let h0 = hashObject @HbSync (serialise msg) + + -- FIXME: squash-this-copypaste + void $ runMaybeT do + notifiers <- MaybeT $ liftIO (readTVarIO (view refChanWorkerNotifiers env) <&> HashMap.lookup chan) + forM_ notifiers $ \(RefChanNotifier peer q _) -> do + let lkey = (peer, h0) + -- guard =<< liftIO (Cache.lookup (view refChanWorkerLocalRelyDone env) lkey <&> isNothing) + -- liftIO $ Cache.insert (view refChanWorkerLocalRelyDone env) lkey () + atomically $ writeTQueue q msg + +refChanWorkerInitNotifiers :: forall e m . ( MonadIO m + , MonadUnliftIO m + , MyPeer e + -- , ForRefChans e + -- , ForRefChans UNIX + -- , m ~ PeerM e IO + , e ~ L4Proto + ) + => RefChanWorkerEnv e + -> m () + + +refChanWorkerInitNotifiers env = do + debug "refChanWorkerInitNotifiers" + + let (PeerConfig syn) = view refChanWorkerConf env + + let notifiers = [ mkV rc x | ListVal [ SymbolVal "notify" + , SymbolVal "refchan" + , LitStrVal rc + , ListVal [ SymbolVal "socket", SymbolVal "unix", LitStrVal x ] + ] <- syn + ] & catMaybes + + forM_ notifiers $ \(rc, sa) -> do + debug $ "** NOTIFIER FOR" <+> pretty (AsBase58 rc, sa) + + q <- newTQueueIO + val <- async $ liftIO $ notifierThread rc sa q + + let rcn = RefChanNotifier (fromString sa) q val + + atomically $ modifyTVar (_refChanWorkerNotifiers env) (HashMap.insertWith (<>) rc [rcn]) + + where + mkV :: Text -> Text -> Maybe (RefChanId e, String) + mkV rc x = (,Text.unpack x) <$> fromStringMay @(RefChanId e) (Text.unpack rc) + + + notifierThread _ sa q = do + + debug $ ">>> NOTIFIER THREAD FOR" <+> pretty sa + + client <- newMessagingUnix False 1.0 sa + msg <- async $ runMessagingUnix client + + runNotifyProtoM client do + proto <- async $ runProto [ makeResponse (refChanNotifyRpcProto env) ] + + forever do + req <- atomically $ readTQueue q + debug "Rely notification request" + request @UNIX (fromString sa) req + + wait proto + + mapM_ wait [msg] + data ValidateEnv = ValidateEnv @@ -181,7 +358,7 @@ data ValidateEnv = , _validateSelf :: Peer UNIX } -newtype ValidateProtoM m a = PingPongM { fromValidateProto :: ReaderT ValidateEnv m a } +newtype ValidateProtoM m a = ValidateProto { fromValidateProto :: ReaderT ValidateEnv m a } deriving newtype ( Functor , Applicative , Monad @@ -226,7 +403,6 @@ refChanValidateProto waiters msg = do maybe1 mbAnsw none $ \answ -> do putMVar answ m - refChanWorkerInitValidators :: forall e m . ( MonadIO m , MonadUnliftIO m -- , MyPeer e @@ -276,6 +452,9 @@ refChanWorkerInitValidators env = do -- FIXME: hardcoded-timeout waiters <- Cache.newCache (Just (toTimeSpec (10 :: Timeout 'Seconds))) + -- FIXME: hardcoded-timeout + waiters <- Cache.newCache (Just (toTimeSpec (10 :: Timeout 'Seconds))) + runValidateProtoM client do poke <- async $ forever do @@ -302,6 +481,13 @@ refChanWorkerInitValidators env = do let pa = fromString sa request pa req + Poke{} -> do + debug "DO SEND POKE" + let pa = fromString sa + pure () + -- + -- request pa req + _ -> pure () @@ -311,6 +497,22 @@ refChanWorkerInitValidators env = do cancel msg warn $ "validatorThread is terminated for some reasons" <+> pretty (AsBase58 chan) +runRefChanRelyWorker :: forall e m . + ( MonadIO m + , m ~ PeerM e IO + , e ~ L4Proto + ) + => RefChanWorkerEnv e + -> RefChanAdapter e (ResponseM e m) + -> IO () + +runRefChanRelyWorker env adapter = liftIO $ forever do + withPeerM (view refChanPeerEnv env) do + me <- ownPeer @e + -- FIXME: use-bounded-queue-ASAP + mess <- atomically $ readTQueue (view refChanWorkerNotifiersInbox env) + runResponseM me $ do + refChanNotifyProto True adapter mess refChanWorker :: forall e s m . ( MonadIO m , MonadUnliftIO m @@ -361,6 +563,7 @@ refChanWorker env brains = do sto <- getStorage liftIO $ refChanWorkerInitValidators env + liftIO $ refChanWorkerInitNotifiers env subscribe @e RefChanRequestEventKey $ \(RefChanRequestEvent chan val) -> do debug $ "RefChanRequestEvent" <+> pretty (AsBase58 chan) <+> pretty val @@ -443,7 +646,7 @@ refChanWorker env brains = do forM_ (HashMap.toList byChan) $ \(c,new) -> do mbLog <- liftIO $ getRef sto c - hashes <- maybe1 mbLog (pure mempty) $ readLog sto . HashRef + hashes <- maybe1 mbLog (pure mempty) $ readLog (getBlock sto) . HashRef -- FIXME: might-be-problems-on-large-logs let hashesNew = HashSet.fromList (hashes <> new) & HashSet.toList @@ -575,6 +778,7 @@ logMergeProcess :: forall e s m . ( MonadUnliftIO m , ForRefChans e , HasStorage m , Signatures s + , Pretty (AsBase58 (PubKey 'Sign s)) , s ~ Encryption e , m ~ PeerM e IO ) @@ -637,18 +841,25 @@ logMergeProcess env q = do penv <- ask + let readFn = getBlock sto + void $ runMaybeT do let chanKey = RefChanLogKey @s chan - h <- MaybeT $ liftIO $ getRef sto chanKey + -- FIXME: wont-work-if-no-reference-yet + -- не сработает если ссылка новая + (mergeSet, merge) <- liftIO (getRef sto chanKey) >>= \case + Nothing -> do + new <- mconcat <$> mapM (lift . readLog readFn) logs + pure (HashSet.fromList new, not (List.null new)) - current <- lift $ readLog sto (HashRef h) <&> HashSet.fromList - - -- trans <- filter (not . flip HashSet.member current) . mconcat <$> mapM (lift . readLog sto) logs - trans <- mconcat <$> mapM (lift . readLog sto) (filter (/= HashRef h) logs) - - guard (not $ List.null trans) + Just h -> do + current <- lift $ readLog readFn (HashRef h) <&> HashSet.fromList + new <- mconcat <$> mapM (lift . readLog readFn) (filter (/= HashRef h) logs) + let mergeSet = HashSet.fromList new <> current + pure (mergeSet, not (List.null new)) + guard merge -- итак, тут приехал весь лог, который есть у пира -- логично искать подтверждения только в нём. если @@ -667,15 +878,15 @@ logMergeProcess env q = do -- потом, если всё ок -- пишем accept-ы и propose-ы у которых -- больше quorum подтверждений для актуальной головы - let mergeSet = (HashSet.fromList trans <> current) & HashSet.toList + let mergeList = HashSet.toList mergeSet -- если какие-то транзакции отсутствуют - пытаемся их скачать -- и надеемся на лучшее (лог сойдется в следующий раз) - forM_ mergeSet $ \href -> do + forM_ mergeList $ \href -> do mblk <- liftIO $ getBlock sto (fromHashRef href) maybe1 mblk (lift $ refChanAddDownload env chan href dontHandle) dontHandle - r <- forM mergeSet $ \href -> runMaybeT do + r <- forM mergeList $ \href -> runMaybeT do blk <- MaybeT $ liftIO $ getBlock sto (fromHashRef href) diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs index d701456c..210c456d 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs @@ -377,8 +377,8 @@ simpleReadLinkRaw ss hash = do let fn = simpleRefFileName ss hash rs <- spawnAndWait ss $ do -- FIXME: log-this-situation - (Just <$> LBS.readFile fn) `catchAny` \_ -> do - err $ "simpleReadLinkRaw" <+> pretty hash <+> pretty fn + (Just <$> LBS.readFile fn) `catchAny` \e -> do + err $ "simpleReadLinkRaw" <+> pretty hash <+> pretty fn <+> viaShow e pure Nothing pure $ fromMaybe Nothing rs @@ -396,8 +396,8 @@ simpleReadLinkVal :: ( IsKey h simpleReadLinkVal ss hash = do let fn = simpleRefFileName ss hash rs <- spawnAndWait ss $ do - (Just <$> BS.readFile fn) `catchAny` \_ -> do - err $ "simpleReadLinkVal" <+> pretty hash <+> pretty fn + (Just <$> BS.readFile fn) `catchAny` \e -> do + err $ "simpleReadLinkVal" <+> pretty hash <+> pretty fn <+> viaShow e pure Nothing runMaybeT do @@ -422,7 +422,7 @@ instance ( MonadIO m, IsKey hash updateRef ss ref v = do let refHash = hashObject @hash ref - -- liftIO $ print $ "updateRef:" <+> pretty refHash + debug $ "updateRef:" <+> pretty refHash void $ liftIO $ simpleWriteLinkRawRef ss refHash v getRef ss ref = do diff --git a/hbs2/Main.hs b/hbs2/Main.hs index 56b87dc1..656f17a5 100644 --- a/hbs2/Main.hs +++ b/hbs2/Main.hs @@ -10,6 +10,7 @@ import HBS2.Net.Auth.AccessKey import HBS2.Net.Auth.Credentials import HBS2.Net.Proto.Definition() import HBS2.Net.Proto.RefLog(RefLogKey(..)) +import HBS2.Net.Proto.AnyRef(AnyRefKey(..)) import HBS2.Prelude.Plated import HBS2.Storage.Simple import HBS2.Storage.Simple.Extra @@ -356,6 +357,13 @@ runRefLogGet s ss = do print $ pretty ref exitSuccess +runAnyRefGet :: forall s t . IsRefPubKey s => AnyRefKey t s -> SimpleStorage HbSync -> IO () +runAnyRefGet s ss = do + ref' <- getRef ss s + maybe1 ref' exitFailure $ \ref -> do + print $ pretty ref + exitSuccess + withStore :: Data opts => opts -> ( SimpleStorage HbSync -> IO () ) -> IO () withStore opts f = do @@ -473,6 +481,14 @@ main = join . customExecParser (prefs showHelpOnError) $ reflogs <- strArgument ( metavar "REFLOG" ) pure $ withStore o (runRefLogGet @HBS2Basic reflogs) + + pAnyRef = hsubparser ( command "get" (info pAnyRefGet (progDesc "get anyref value") ) ) + + pAnyRefGet = do + o <- common + anyref <- strArgument ( metavar "ANYREF" ) + pure $ withStore o (runAnyRefGet @HBS2Basic anyref) + pFsck = do o <- common pure $ withStore o $ \sto -> do diff --git a/test/refchan/author1.key b/test/refchan/author1.key new file mode 100644 index 00000000..d974d284 --- /dev/null +++ b/test/refchan/author1.key @@ -0,0 +1,6 @@ +# hbs2 credentials file +# keep it private + +rmbTHUgvHqhSEsxHBwheyJbnhxMpzrBoTYd7pPkwL5k3haMDRChR4e38FYqz +xmJPbJPV1UhxBszq91uSgp5vrSxaTWfVqGREE5f1Z3ySQJhBMoSr2wSGsCjF +vaAzkBK9c3GrRr4kQB5yqTAko1 diff --git a/test/refchan/author2.key b/test/refchan/author2.key new file mode 100644 index 00000000..71d249d9 --- /dev/null +++ b/test/refchan/author2.key @@ -0,0 +1,6 @@ +# hbs2 credentials file +# keep it private + +rmbTHUgvbsubKopKR5wWpsMFKpCPbtUQ1zcMoLa5xJfy6RgPfY8k5EVC1CXR +BjgaoXEqtFwcshGrrkXoBqsRpVwBLEgeZcLuMGxe23PxCzJoqZciovbeVCuH +45ApvdBFzeTcppYSZ9Q6GQqWFq diff --git a/test/refchan/author3.key b/test/refchan/author3.key new file mode 100644 index 00000000..d8036f90 --- /dev/null +++ b/test/refchan/author3.key @@ -0,0 +1,6 @@ +# hbs2 credentials file +# keep it private + +rmbTHUgvSXaWW2Zx1RtLbGq5qYFRAzbs1GFv5eCzcypKrAsBdPZMpZUgZ42C +e9kS7oUM2r82jfesa9B22MiAfjkfKzwMN8anEj4KAJfPMYVCvTM7DraqQhtV +CfLooy4smrB9scBEznvWfCGPSw diff --git a/test/refchan/owner.key b/test/refchan/owner.key new file mode 100644 index 00000000..40a6090a --- /dev/null +++ b/test/refchan/owner.key @@ -0,0 +1,6 @@ +# hbs2 credentials file +# keep it private + +rmbTHUgvShBTLfScwWwDj9c8MNF77Q8gwRvBTL9L3dBK57pzCsnD9FRyHnqa +sMHzNDVgrYqMqm1R9obVNSr7yxXs8uW6T2xw6dZ2n6fDACaswcQrUm46j5xQ +qTyGWUvLAQF2FoJcYcNPTZkpqD