qblf merged

- Requires testing and bugfixing
- Needs high CPU usage investigation (polling/packet flood?)
- Bundles are downloading repeatedly, that is weird
This commit is contained in:
Dmitry Zuikov 2023-09-21 05:57:20 +03:00
parent a5628a19ed
commit a980569ce4
42 changed files with 2992 additions and 88 deletions

View File

@ -11,6 +11,7 @@ fixme-files **/*.hs docs/devlog.md
fixme-files docs/pep*.txt
fixme-files docs/drafts/**/*.txt
fixme-files docs/notes/**/*.txt
fixme-files docs/pr/**/*.txt
fixme-files docs/todo/**/*.txt
fixme-files docs/notes/**/*.txt

View File

@ -1,3 +1,11 @@
## 2023-14-08
PR: hbs2-git-config-location
branch: hbs2-git-fastpok
commit: 82d5a30f4510b3f70bfbf2bc208b14df4ce2b17d
Изменено расположение файла конфигурации hbs2-git,
незначительный рефакторинг.
## 2023-07-30
какие-то косяки
@ -1311,3 +1319,4 @@ PR: bus-crypt
Шифрование протокола общения нод.
Обмен асимметричными публичными ключами выполняется на стадии хэндшейка в ping/pong.
Для шифрования данных создаётся симметричный ключ по diffie-hellman.

View File

@ -1,3 +1,229 @@
NOTE: bundle-exchange-protocol
Проблема:
Долгое скачивание множества маленьких объектов, особенно при
работе со структурами, которые определяются через множество
ссылок ([HashRef]). Такой список сам по себе разбит на блоки,
блоки на чанки, то, на что он ссылается так же разбито, каждый
блок это несколько запросов в pull-модели (узнать размер,
запросить чанки и т.д).
Решения:
Данная проблема уже встречалась в hbs2-git и была решена
переходом от модели "один объект git --- одно дерево merkle" к
модели "один журналов push git --- одно дерево merkle". Таким
образом, стейт тут является объединением множества журналов,
каждый из которых содержит объекты git, отсутствующие в
известных журналах (предыдущий стейт).
Это радикально ускорило синхронизацию, однако ввело оверхед по
данным, и мы потеряли связь 1-1 между объектом git и объектом
(деревом?) hbs2.
Так же данная проблема встретилась в реализации QBLF при
большом потоке транзакций и будет встречаться всегда, когда мы
выражаем состояние, как множество ссылок на мелкие объекты.
Более общим решением видится введение некоего примитива bundle,
который будет собирать объекты в журнал, состоящий из секций,
каждая секция соответствует отдельному блоку. По факту, такой
журнал собирается из списка HashRef, HashRef указывает на
конкретный блоб.
Данный подход имеет смысл, когда требуется передать множество
мелких объектов. Передача же крупных объектов таким образом так
же возможна, однако она даёт возможность для злоупотреблений
(блоки огромного размера вместо деревьев). В принципе, бороться
с этим можно, так как в каждой секции у нас есть размер, и
можно принудительно объекты большого размера сохранять, как
деревья (?), однако тогда мы больше не увидим хэша данного
объекта, который передан в секции. Который, однако, является
опциональным.
Это ставит перед нами вопрос, как скачивать эти бандлы.
Текущй алгоритм скачивания парсит блоки, и если блок
является деревом меркля списка ссылок (MTree [HashRef]),
то он попытается скачать и каждый блок, на который указывает
HashRef.
Если **перед** этим мы узнаем, что для MTRee [HashRef] есть
бандл, мы можем выкачать и импортировать его.
Если объектов много --- то это плюс. Но если их мало/средне,
то может возникнуть ситуация, что мы скачаем и объекты, и их
журнал.
Как с этим бороться без очень сложных настроек/взаимоотношений
процессов --- пока непонятно.
Частное решение для QBLF может быть таким: стейт есть хэш
конкретного Bundle.
Тогда мы:
Новое состояние:
- создаём bundle
- рассылаем анонс стейта как обычно
Мерж:
- читаем все бандлы (состояния) --
тут у нас есть и транзакции и их хэши
- делаем новый бандл после мержа транзакций
из всех бандлов
Остальное:
- как было
Плюсы:
- мы не дублируем данные, если не хотим (можем не
импортировать объекты из бандлов)
- можем импортировать, но только при переходе в
состояние
- скачивание объектов будет быстрое
- алгоритм мержа усложнится незначительно
- не нужно сейчас решать, что делать с протоколом
обмена бандлами и как его дизайнить
Минусы:
- это опять, скорее, частное решение, но за счёт
этого мы избегаем оверхедов
- переписывание всего журнала при переходе
к новому стейту?
- возможен подъем всего стейта в память (а не по одной
транзакции, более того -- сейчас так и будет)
Более общее решение:
- Создаём новый стейт, как [HashRef]
- Для него --- создаём и анонсируем бандл
- Бандл скачивается отдельно и самостоятельно
- После импорта бандла его можно, в принципе, удалять
- Можно подержать какое-то время, что бы раздать другим
пирам
- Бандл, вероятно, скачается быстрее, чем будет рекурсивно
скачиваться меркл-дерево множества ссылок
- Можно попробовать вообще не качать дерево, а качать только
его бандл, а само дерево из него реконструировать (???),
тогда уходим от оверхеда при скачивании
- Получив анонс стейта, нам бы как-то узнать хэш его бандла, что бы
скачать бандл.
- Анонсы бандлов можно, всё ж таки, рассылать через Notify,
это не заденет остальные части системы и не надо будет
делать новый протокол
- Но этот протокол может был бы полезен в других
частях
Плюсы:
- Стейт остаётся [HashRef], то есть не требует
подъема больших объектов в память
- Алгоритм merge вообще не меняется
- В других кейсах может пригодиться
Минусы:
- Явно сложнее
- Со ссылками возникают вопросы византийского поведения:
например, ноды могут фальсифицировать ссылки или просто
возвращать мусор
- Если в ключ ссылки включить публичный ключ пира
(актора?), то можно хотя бы убедиться, что данная ссылка
целостна. И тогда у того, кто её распространяет --- не
получится её фальсифицировать, только игнорировать.
Можно вставлять не в ключ, а в значение, и его
подписывать. Таким образом, это обычный SignedBox.
- В контексте QBLF это повлияет только на скорость
распространения стейта, т.е если мы не достигаем
консенсуса по значению ссылки -- то рано или поздно
журнал выкачается и так
- Если это ссылка неизвестно на что, то откуда
SomeRefProto узнает, что значение надо скачивать
и импортировать?
Отвечаем: Это будет, допустим, (SignedBox Bundle)
и протокол, который качает, допустим, попробует
распарсить и если это Bundle - то качать его HashRef.
- А как он, например, его будет импортировать, когда
Bundle скачается?
Отвечаем: допустим, получили Bundle -- запускаем
DownloadMonitor с нужной периодичностью, время от
времени поллим, как поняли, что скачалось ---
запускаем импорт, т.к. процесс импорта передаём
в этот самый монитор (ой мама). Что-то подобное
в любом случае надо бы/надо было сделать
Другое более общее решение:
У нас уже есть механизм BlockAnnounce. Если мы принимаем
BlockAnnounce, то там можно добавить блок типа "Значение ссылки:
ссылка на бандл". Тогда, если мы получили такой блок - мы можем
добавить его в Detect, и если там бандл - то качать этот самый
бандл.
Это избавит нас от необходимости делать полную реализацию
протокола обмена ссылками, который довольно спорный пока что.
Итак, что мы добавляем: новый тип, возможно, "Ссылка на bundle"
. тип этот помещаем в блок, например.
Получив этот блок, например, даунлоадер начинает его скачивать,
а так же, допустим, вешает монитор - что бы импортировать
объекты по завершению процесса.
Тогда мы решаем в рамках текущего BlockAnnounce, с применением
тех политик, что там используются, что не помешает нам
реализовать механизм обмена ссылками, буде до него дойдет.
Возникает, кстати, проблема, что делать с шифрованием.
Ответ -- или класть уже зашифрованные блоки (а откуда ключ
брать для расшифровки?)
Или ввести обёртку EncryptedBundle или сразу сделать у
Bundle метаинформацию, и там ссылку на ключ передавать
TODO: bundle-announce
TODO: bundle-value-block
TODO: bundle-value-block-to-detect
TODO: bundle-value-block-to-downlooad-bundle
TODO: bundle-value-block-to-imoport-on-downloaded
Базовая реализация бандлов

83
docs/refchan/howto.txt Normal file
View File

@ -0,0 +1,83 @@
## Init refchan
1. Generate owner's keypair
```
hbs2 keyring-new > owner.key
hbs2 keyrint-list owner.key
sign-key: Atg67E6CPMJWKvR9BvwZTTEjg3Hjz4CYCaEARGANepGP
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
``
2. Generate (or obtain) author's keys
3. Create refchan head config, like:
```
cat > refchan.cfg
(version 16) ;; version
(quorum 2) ;; required answers to set up a quorum
(wait 15) ;; time in seconds for round to complete
;; peer's key authoried to rely/subscribe messages
(peer "35gKUG1mwBTr3tQpjWwR2kBYEnDmHxesoJL5Lj7tMjq3" 1)
;; same
(peer "5GnroAC8FXNRL8rcgJj6RTu9mt1AbuNd5MZVnDBcCKzb" 1)
;; author (key, authorized to post messages)
(author "2cU7qBWpohfco4BcbHGPjF6ypGdqDwpKomp8ky6QAEBy")
(author "EoPuukyDLeaZm3vpN3CAuZfjhrYBh6fVyWXcXueCK4i8")
```
4. Make the peer "listen" the refchan
```
cat >> ~/.config/hbs2-peer/config
poll refchan 5 "5ZHZkatu1GeeHybdBms6xFFBWti1cqJtKAjiMmtDT6XQ"
```
5. Set up the head block
```
hbs2-peer refchan gen ./refchan.cfg -k owner.key | hbs2 store
merkle-root: FJ2Lj1kB4oFf8F3rL1xv3gaG5kzrPMmE2hPm5oQziLy5
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
hbs2-peer refchan post FJ2Lj1kB4oFf8F3rL1xv3gaG5kzrPMmE2hPm5oQziLy5
```
6. Check the head block:
```
hbs2 keyring-list owner.key
sign-key: Atg67E6CPMJWKvR9BvwZTTEjg3Hjz4CYCaEARGANepGP
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
hbs2-peer refchan head get Atg67E6CPMJWKvR9BvwZTTEjg3Hjz4CYCaEARGANepGP
FJ2Lj1kB4oFf8F3rL1xv3gaG5kzrPMmE2hPm5oQziLy5
hbs2 cat FJ2Lj1kB4oFf8F3rL1xv3gaG5kzrPMmE2hPm5oQziLy5 | hbs2-peer refchan head dump
(version 16)
(quorum 2)
(wait 15)
(peer "35gKUG1mwBTr3tQpjWwR2kBYEnDmHxesoJL5Lj7tMjq3" 1)
(peer "5GnroAC8FXNRL8rcgJj6RTu9mt1AbuNd5MZVnDBcCKzb" 1)
(author "EoPuukyDLeaZm3vpN3CAuZfjhrYBh6fVyWXcXueCK4i8")
(author "2cU7qBWpohfco4BcbHGPjF6ypGdqDwpKomp8ky6QAEBy")
```

View File

@ -0,0 +1,22 @@
TODO: refchan-qblf-slow-transaction-propagation
на большом количестве одновременных транзакций (порядка сотен),
начинает залипать скачивание.
Та же самая проблема: много маленьких транзакций. Кажется,
надо сразу делать более эффективный механизм (а какой?)
TODO: refchan-qblf-fantom-states
поскольку анонсы/мержи не прибиваются сразу,
на следующих раундах появляются фантомные состояния.
кажется, надо игнорировать стейты, в которых мы уже
были, и убивать таблицу этих состояний, когда этих
состояний не осталось.
как проверить: запустить массовый флуд транзакциями,
должны исчезнуть "предыдущие" состояния.
TODO: non-atomic-blocks-processing
недокачанный блок может частично обработаться,
и появляются всякие левые транзакции/состояния.
Удивительным образом потом они уходят, но засоряют
эфир в процессе

View File

View File

@ -0,0 +1,854 @@
{-# Language TemplateHaskell #-}
{-# Language UndecidableInstances #-}
{-# Language AllowAmbiguousTypes #-}
module Main where
import HBS2.Prelude
import HBS2.Defaults
import HBS2.Merkle
import HBS2.Hash
import HBS2.Clock
import HBS2.Base58
import HBS2.OrDie
import HBS2.Data.Types.Refs
import HBS2.Net.Proto.Types
import HBS2.Actors.Peer
import HBS2.Net.Proto.RefChan
import HBS2.Net.Proto.AnyRef
import HBS2.Data.Types.SignedBox
import HBS2.Net.Messaging.Unix
import HBS2.Net.Proto.Definition
import HBS2.Data.Bundle
import HBS2.Net.Auth.Credentials
import HBS2.Data.Detect
import HBS2.Actors.Peer.Types()
import HBS2.Storage.Simple
import HBS2.System.Logger.Simple
import QBLF.Proto
import Demo.QBLF.Transactions
import Data.Config.Suckless
import Data.Config.Suckless.KeyValue
import Data.Ord
import Control.Monad.Trans.Maybe
import Codec.Serialise
import Control.Monad.Reader
import Data.ByteString(ByteString)
import Data.ByteString.Char8 qualified as BS
import Data.ByteString.Lazy qualified as LBS
import Data.Functor
import Data.List qualified as List
import Lens.Micro.Platform hiding ((.=))
import Options.Applicative hiding (info)
import Options.Applicative qualified as O
import System.Directory
import Data.HashSet (HashSet)
import Data.HashSet qualified as HashSet
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.Maybe
import Data.Word
import System.Random
import UnliftIO
import Data.Time.Clock.POSIX (getPOSIXTime)
import Data.Aeson hiding (json)
import Web.Scotty hiding (request,header)
import Web.Scotty qualified as Scotty
import Network.HTTP.Types.Status
import Data.Monoid (mconcat)
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Control.Monad.Except
import Streaming.Prelude qualified as S
{- HLINT ignore "Use newtype instead of data" -}
-- TODO: config
-- сделать конфиг, а то слишком много уже параметров в CLI
data HttpPortOpt
data RefChanOpt
data SocketOpt
data ActorOpt
data DefStateOpt
data StateRefOpt
data QBLFRefKey
type MyRefKey = AnyRefKey QBLFRefKey HBS2Basic
instance Monad m => HasCfgKey HttpPortOpt (Maybe Int) m where
key = "http"
instance {-# OVERLAPPING #-} (HasConf m, HasCfgKey HttpPortOpt (Maybe Int) m) => HasCfgValue HttpPortOpt (Maybe Int) m where
cfgValue = val <$> getConf
where
val syn = lastMay [ fromIntegral e
| ListVal @C (Key s [LitIntVal e]) <- syn, s == key @HttpPortOpt @(Maybe Int) @m
]
instance Monad m => HasCfgKey RefChanOpt (Maybe String) m where
key = "refchan"
instance Monad m => HasCfgKey SocketOpt (Maybe String) m where
key = "socket"
instance Monad m => HasCfgKey ActorOpt (Maybe String) m where
key = "actor"
instance Monad m => HasCfgKey DefStateOpt (Maybe String) m where
key = "default-state"
instance Monad m => HasCfgKey StateRefOpt (Maybe String) m where
key = "state-ref"
class ToBalance e tx where
toBalance :: tx -> [(Account e, Amount)]
tracePrefix :: SetLoggerEntry
tracePrefix = toStderr . logPrefix "[trace] "
debugPrefix :: SetLoggerEntry
debugPrefix = toStderr . logPrefix "[debug] "
errorPrefix :: SetLoggerEntry
errorPrefix = toStderr . logPrefix "[error] "
warnPrefix :: SetLoggerEntry
warnPrefix = toStderr . logPrefix "[warn] "
noticePrefix :: SetLoggerEntry
noticePrefix = toStderr . logPrefix "[notice] "
infoPrefix :: SetLoggerEntry
infoPrefix = toStdout . logPrefix ""
silently :: MonadIO m => m a -> m ()
silently m = do
setLoggingOff @DEBUG
setLoggingOff @INFO
setLoggingOff @ERROR
setLoggingOff @WARN
setLoggingOff @NOTICE
void m
withLogging :: MonadIO m => m a -> m ()
withLogging m = do
-- setLogging @TRACE tracePrefix
setLogging @DEBUG debugPrefix
setLogging @INFO infoPrefix
setLogging @ERROR errorPrefix
setLogging @WARN warnPrefix
setLogging @NOTICE noticePrefix
m
setLoggingOff @DEBUG
setLoggingOff @INFO
setLoggingOff @ERROR
setLoggingOff @WARN
setLoggingOff @NOTICE
data MyEnv =
MyEnv
{ mySelf :: Peer UNIX
, myFab :: Fabriq UNIX
, myChan :: RefChanId UNIX
, myRef :: MyRefKey
, mySto :: AnyStorage
, myCred :: PeerCredentials HBS2Basic
, myHttpPort :: Int
, myFetch :: Cache HashRef ()
}
newtype App m a = App { fromApp :: ReaderT MyEnv m a }
deriving newtype ( Functor
, Applicative
, Monad
, MonadIO
, MonadUnliftIO
, MonadReader MyEnv
, MonadTrans
)
runApp :: (MonadIO m, PeerMessaging UNIX) => MyEnv -> App m a -> m a
runApp env m = runReaderT (fromApp m) env
instance Monad m => HasFabriq UNIX (App m) where
getFabriq = asks myFab
instance Monad m => HasOwnPeer UNIX (App m) where
ownPeer = asks mySelf
instance Monad m => HasStorage (App m) where
getStorage = asks mySto
data ConsensusQBLF
data StateQBLF = StateQBLF { fromStateQBLF :: HashRef }
--
-- TODO: state-propagation
-- Как лучше сделать?
-- Нужно:
-- 1. сохранить дерево множества транзакций
--
-- 1.1 Сейчас есть: http для сохранения одного блока
-- Надо: сохранить дерево целиком
-- Есть пример, можно скопипастить
-- Минус: может быть не быстро на больших блоках
--
--
-- 1.2 Сделать unix-socket ручку на добавление сразу дерева?
--
-- (-1) Доработка hbs2-peer сразу новый worker
-- (+1) Быстрое добавление дерева целиком
-- (-1) Управление через UNIX сокеты выглядит сложнее
--
-- 2. распространить блок (announce)
--
-- + ручка API в hbs2-peer (announce)
-- +
-- TODO: implementation-plan-transaction
-- определить транзакцию
-- допустим на шаге 1 - эмиссия: emit(acc, amount)
-- где acc - аккаунт, на который осуществляется эмиссия
-- amount - количество токенов (целое!)
-- proof - key(emit(acc, amount)) == key(refchan)
-- и подпись соответствует.
--
-- теперь - как мы получаем эту транзакцию?
-- предусловия: приватный ключ не дожен экспонироваться
--
-- как будет делаться транза IRL?
-- безопасно:
-- 1. сгенерили CBOR на клиенте
-- 2. подписали
-- 3. запостили
--
-- реально:
-- 1. сгенерили CBOR на адаптере, ключ на адаптере
-- 2. подписали
-- 3. запостили
--
-- что видим: адаптер = эмуляция "правильного" клиента
--
-- нужен способ, который будет удобен и через CLI и
-- внешнему приложению (адаптеру).
--
-- адаптер пока на хаскелле, просто делаем библиотечный код
-- и CLI обвязку.
--
-- добавить обвязку для HTTP (scotty)
-- решить, сразу CBOR или JSON
-- (+ за json - просто отправлять
-- (- за json - подписывать-то как?)
--
-- надо определить простой и единоообразный способ получения
-- и постинга подписанных транзакций.
--
-- что приводит нас к дисцплине хранения ключа (кто хранит?
-- если никто - то транза должна быть всегда подписана,
-- следовательно, json неудобен)
--
--
data MyError =
DeserializationError | SignatureError | TxUnsupported | SomeOtherError
deriving stock (Eq,Ord,Show)
check :: MonadIO m => MyError -> Either e a -> ExceptT MyError m a
check w = \case
Right x -> ExceptT $ pure (Right x)
Left{} -> ExceptT $ pure (Left w)
fiasco :: MonadIO m => MyError -> ExceptT MyError m a
fiasco x = ExceptT $ pure $ Left x
ok :: MonadIO m => a -> ExceptT MyError m a
ok x = ExceptT $ pure $ Right x
type ForConsensus m = (MonadIO m, Serialise (QBLFMessage ConsensusQBLF))
instance Serialise (QBLFMerge ConsensusQBLF)
instance Serialise (QBLFMessage ConsensusQBLF)
instance Serialise (QBLFAnnounce ConsensusQBLF)
instance Serialise (QBLFCommit ConsensusQBLF)
instance Monad m => HasTimeLimits UNIX (RefChanNotify UNIX) (App m) where
tryLockForPeriod _ _ = pure True
instance (ForConsensus m) => IsQBLF ConsensusQBLF (App m) where
type QBLFActor ConsensusQBLF = Actor L4Proto
type QBLFTransaction ConsensusQBLF = QBLFDemoToken L4Proto
type QBLFState ConsensusQBLF = DAppState
qblfMoveForward _ s1 = do
env <- ask
fetchMissed env s1
pure True
qblfNewState (DAppState h0) txs = do
sto <- asks mySto
chan <- asks myChan
self <- asks mySelf
creds <- asks myCred
let sk = view peerSignSk creds
let pk = view peerSignPk creds
-- let block = serialise (HashSet.toList $ HashSet.fromList txs)
hashes <- liftIO $ mapM (putBlock sto . serialise) txs <&> catMaybes
-- пробуем разослать бандлы с транзакциями
runMaybeT do
ref <- MaybeT $ createBundle sto (fmap HashRef hashes)
let refval = makeBundleRefValue @L4Proto pk sk (BundleRefSimple ref)
r <- MaybeT $ liftIO $ putBlock sto (serialise refval)
lift $ request self (ActionRequest @UNIX chan (RefChanAnnounceBlock (HashRef r)))
current <- readLog (getBlock sto) h0
-- основная проблема в том, что мы пересортировываем весь state
-- однако, если считать его уже отсортированным, то, может быть,
-- все будет не так уж плохо.
-- так-то мы можем вообще его на диске держать
let new = HashSet.fromList ( current <> fmap HashRef hashes )
let pt = toPTree (MaxSize 256) (MaxNum 256) (HashSet.toList new)
root <- if List.null hashes then do
pure h0
else do
r <- makeMerkle 0 pt $ \(hx,_,bs) -> do
th <- liftIO (enqueueBlock sto bs)
debug $ "WRITE TX" <+> pretty hx
request self (ActionRequest @UNIX chan (RefChanAnnounceBlock (HashRef r)))
pure (HashRef r)
debug $ "PROPOSED NEW STATE:" <+> pretty root
pure $ DAppState root
qblfCommit s0 s1 = do
debug $ "COMMIT:" <+> pretty s0 <+> pretty s1
sto <- asks mySto
chan <- asks myChan
ref <- asks myRef
debug $ "UPDATING REF" <+> pretty ref <+> pretty s1
liftIO $ updateRef sto ref (fromHashRef (fromDAppState s1))
pure ()
qblfBroadCast msg = do
self <- asks mySelf
creds <- asks myCred
chan <- asks myChan
let sk = view peerSignSk creds
let pk = view peerSignPk creds
nonce <- randomIO @Word64 <&> serialise <&> LBS.toStrict
let box = makeSignedBox @UNIX pk sk (LBS.toStrict (serialise msg) <> nonce)
let notify = Notify @UNIX chan box
request self notify
case msg of
QBLFMsgAnn _ (QBLFAnnounce _ _) -> do
-- TODO: maybe-announce-new-state-here
pure ()
_ -> none
-- TODO: optimize-qblf-merge
-- будет нормально работать до десятков/сотен тысяч транз,
-- а потом помрёт.
-- варианты:
-- 1. перенести логику в БД
-- 2. кэшировать всё, что можно
qblfMerge s0 s1 = do
chan <- asks myChan
debug $ "MERGE. Proposed state:" <+> pretty s1
sto <- asks mySto
let readFn = liftIO . getBlock sto
tx0 <- readLog readFn (fromDAppState s0) <&> HashSet.fromList
tx1 <- mapM (readLog readFn) (fmap fromDAppState s1) <&> mconcat
debug $ "READ TXS" <+> pretty s1 <+> pretty (length tx1)
r <- forM tx1 $ \t -> runMaybeT do
-- игнорируем ранее добавленные транзакции
guard (not (HashSet.member t tx0))
bs <- MaybeT $ liftIO $ getBlock sto (fromHashRef t)
tx <- MaybeT $ pure $ deserialiseOrFail @(QBLFDemoToken L4Proto) bs & either (const Nothing) Just
case tx of
Emit box -> do
(pk, e@(EmitTx a q _)) <- MaybeT $ pure $ unboxSignedBox0 @(EmitTx L4Proto) box
guard ( chan == pk )
debug $ "VALID EMIT TRANSACTION" <+> pretty t <+> pretty (AsBase58 a) <+> pretty q
pure ([(t,e)], mempty)
(Move box) -> do
(_, m@(MoveTx _ _ qty _)) <- MaybeT $ pure $ unboxSignedBox0 @(MoveTx L4Proto) box
guard (qty > 0)
debug $ "MOVE TRANSACTION" <+> pretty t
pure (mempty, [(t,m)])
let parsed = catMaybes r
let emits = foldMap (view _1) parsed
let moves = foldMap (view _2) parsed & List.sortOn fst
bal0 <- balances (fromDAppState s0)
-- баланс с учётом новых emit
let balE = foldMap (toBalance @L4Proto . snd) emits
& HashMap.fromListWith (+)
& HashMap.unionWith (+) bal0
let moves' = updBalances @L4Proto balE moves
let merged = fmap fst emits <> fmap fst moves'
let pt = toPTree (MaxSize 256) (MaxNum 256) (HashSet.toList (tx0 <> HashSet.fromList merged))
root <- makeMerkle 0 pt $ \(_,_,bs) -> do
void $ liftIO (putBlock sto bs)
let new = DAppState (HashRef root)
-- FIXME: garbage-collect-discarded-states
debug $ "MERGED" <+> pretty new
pure new
instance (HasConf (ReaderT Config IO)) where
getConf = ask
instance HasStorage (ReaderT AnyStorage IO) where
getStorage = ask
instance ToBalance e (EmitTx e) where
toBalance (EmitTx a qty _) = [(a, qty)]
instance ToBalance e (MoveTx e) where
toBalance (MoveTx a1 a2 qty _) = [(a1, -qty), (a2, qty)]
balances :: forall e m . ( e ~ L4Proto
, MonadIO m
, HasStorage m
, ToBalance L4Proto (EmitTx L4Proto)
, ToBalance L4Proto (MoveTx L4Proto)
)
=> HashRef
-> m (HashMap (Account e) Amount)
balances root = do
sto <- getStorage
txs <- readLog (liftIO . getBlock sto) root
r <- forM txs $ \h -> runMaybeT do
blk <- MaybeT $ liftIO $ getBlock sto (fromHashRef h)
tx <- MaybeT $ pure $ deserialiseOrFail @(QBLFDemoToken L4Proto) blk & either (const Nothing) Just
case tx of
Emit box -> do
(_, emit) <- MaybeT $ pure $ unboxSignedBox0 @(EmitTx L4Proto) box
pure $ toBalance @e emit
Move box -> do
(_, move) <- MaybeT $ pure $ unboxSignedBox0 @(MoveTx L4Proto) box
pure $ toBalance @e move
pure $ catMaybes r & mconcat & HashMap.fromListWith (+)
-- TODO: optimize-upd-balances
-- можно сгруппировать по аккаунтам
-- и проверять только те транзакции, которые относятся
-- к связанной (транзакциями) группе аккаунтов.
-- то есть, разбить на кластеры, у которых отсутствуют пересечения по
-- аккаунтам и проверять независимо и параллельно, например
-- причем, прямо этой функцией
--
-- updBalances :: HashMap (Account L4Proto) Amount
-- -> [(tx, b)]
-- -> [(tx, b)]
updBalances :: forall e a tx . (ForRefChans e, ToBalance e tx)
=> HashMap (Account e) Amount
-> [(a, tx)]
-> [(a, tx)]
updBalances = go
where
go bal [] = empty
go bal (t:rest) =
if good then
t : go nb rest
else
go bal rest
where
nb = HashMap.unionWith (+) bal (HashMap.fromList (toBalance @e (snd t)))
good = HashMap.filter (<0) nb & HashMap.null
fetchMissed :: forall e w m . ( MonadIO m
, Request e (RefChanNotify e) m
, e ~ UNIX
, w ~ ConsensusQBLF
)
=> MyEnv
-> QBLFState w
-> m ()
fetchMissed env s = do
let tube = mySelf env
let chan = myChan env
let cache = myFetch env
let sto = mySto env
let href = fromDAppState s
here <- liftIO $ hasBlock sto (fromHashRef href) <&> isJust
wip <- liftIO $ Cache.lookup cache href <&> isJust
unless (here && not wip) do
debug $ "We might be need to fetch" <+> pretty s
request @UNIX tube (ActionRequest @UNIX chan (RefChanFetch (fromDAppState s)))
runMe :: ForConsensus IO => Config -> IO ()
runMe conf = withLogging $ flip runReaderT conf do
debug $ "runMe" <+> pretty conf
kr <- cfgValue @ActorOpt @(Maybe String) `orDie` "actor's key not set"
chan' <- cfgValue @RefChanOpt @(Maybe String) `orDie` "refchan not set"
sa <- cfgValue @SocketOpt @(Maybe String) `orDie` "socket not set"
pno <- cfgValue @HttpPortOpt @(Maybe Int) <&> fromMaybe 3011
ds <- cfgValue @DefStateOpt @(Maybe String)
ref <- ( cfgValue @StateRefOpt @(Maybe String)
<&> maybe Nothing fromStringMay
) `orDie` "state-ref not set"
sc <- liftIO $ BS.readFile kr
creds <- pure (parseCredentials @HBS2Basic (AsCredFile sc)) `orDie` "bad keyring file"
chan <- pure (fromStringMay @(RefChanId L4Proto) chan') `orDie` "invalid REFCHAN"
here <- liftIO $ doesFileExist sa
when here do
liftIO $ removeFile sa
server <- newMessagingUnix True 1.0 sa
abus <- async $ runMessagingUnix server
let tube = fromString sa
-- FIXME: fix-default-storage
xdg <- liftIO $ getXdgDirectory XdgData defStorePath <&> fromString
sto' <- simpleStorageInit @HbSync [StoragePrefix xdg]
let sto = AnyStorage sto'
sw <- liftIO $ replicateM 4 $ async $ simpleStorageWorker sto'
-- FIXME: fix-hardcoded-timeout
fetches <- liftIO $ Cache.newCache (Just (toTimeSpec (TimeoutSec 30)))
let myEnv = MyEnv tube
(Fabriq server)
chan
ref
sto
creds
pno
fetches
let dss = ds >>= fromStringMay
s0 <- readOrCreateStateRef dss sto ref
debug $ "STATE0:" <+> pretty s0
-- получить голову
-- из головы получить акторов
headBlk <- getRefChanHead @L4Proto sto (RefChanHeadKey chan) `orDie` "can't read head block"
let self = view peerSignPk creds & Actor @L4Proto
let actors = view refChanHeadAuthors headBlk
& HashSet.toList
& fmap (Actor @L4Proto)
runApp myEnv do
-- FIXME: timeout-hardcode
let w = realToFrac 5
-- FIXME: use-actors-asap
qblf <- qblfInit @ConsensusQBLF self actors (DAppState (HashRef s0)) w
consensus <- async do
pause @'Seconds 0.5
qblfRun qblf
-- FIXME: web-port-to-config
web <- async $ liftIO $ scotty (fromIntegral (myHttpPort myEnv)) $ do
post "/tx" $ do
r <- runExceptT do
bin <- lift body
let hBin = hashObject @HbSync bin
debug $ "GOT TX" <+> pretty hBin
tok <- check DeserializationError =<< pure (deserialiseOrFail @(QBLFDemoToken L4Proto) bin)
tx <- case tok of
(Emit box) -> do
(sign, tx) <- maybe (ExceptT $ pure $ Left SignatureError) pure $ unboxSignedBox0 box
if sign == chan then
pure hBin
else
fiasco SignatureError
(Move box) -> do
(sign, tx) <- maybe (ExceptT $ pure $ Left SignatureError) pure $ unboxSignedBox0 box
pure hBin
qblfEnqueue qblf tok
pure hBin
case r of
Left SignatureError -> do
err $ viaShow SignatureError
status status401
Left e -> do
err $ viaShow e
status status400
Right tx -> do
debug $ "TX ENQUEUED OK" <+> pretty tx
status status200
link web
runProto $ List.singleton $ makeResponse (myProto myEnv qblf chan)
void $ waitAnyCatchCancel $ [abus] <> sw
where
myProto :: forall e m . ( MonadIO m
, Request e (RefChanNotify e) m
, e ~ UNIX
)
=> MyEnv
-> QBLF ConsensusQBLF
-> RefChanId e
-> RefChanNotify e
-> m ()
myProto _ qblf _ (ActionRequest{}) = do
pure ()
myProto env qblf chan (Notify _ msg) = do
let sto = mySto env
let tube = mySelf env
let coco = hashObject @HbSync $ serialise msg
void $ runMaybeT do
(_, wrapped) <- MaybeT $ pure $ unboxSignedBox0 @ByteString @UNIX msg
qbmess <- MaybeT $ pure $ deserialiseOrFail @(QBLFMessage ConsensusQBLF) (LBS.fromStrict wrapped)
& either (const Nothing) Just
states <- case qbmess of
QBLFMsgAnn _ (QBLFAnnounce s0 s1) -> do
pure [s0, s1]
QBLFMsgHeartBeat _ _ s0 _-> do
pure [s0]
_ -> do
pure mempty
-- FIXME: full-download-guarantee
lift $ forM_ states (fetchMissed env)
qblfAcceptMessage qblf qbmess
-- debug $ "RefChanQBLFMain(3)" <+> "got message" <+> pretty (AsBase58 chan) <+> pretty coco
readOrCreateStateRef mbDs sto ref = do
debug $ "MyRef:" <+> pretty (hashObject @HbSync ref)
fix \spin -> do
mbref <- liftIO $ getRef @_ @HbSync sto ref
case mbref of
Nothing -> do
debug "STATE is empty"
maybe1 mbDs none $ \ds -> do
debug $ "UPDATE REF" <+> pretty (hashObject @HbSync ref) <+> pretty (HashRef ds)
liftIO $ updateRef sto ref ds
pause @'Seconds 0.25
spin
Just val -> do
pure val
type Config = [Syntax MegaParsec]
main :: IO ()
main = join . customExecParser (prefs showHelpOnError) $
O.info (helper <*> globalOptions)
( fullDesc
<> header "refchan-qblf-worker"
<> progDesc "for test and demo purposed"
)
where
globalOptions = applyConfig <$> commonOpts <*> cli
applyConfig :: Maybe FilePath -> (Config -> IO ()) -> IO ()
applyConfig config m = do
maybe1 config (m mempty) $ \conf -> do
top <- readFile conf <&> parseTop <&> either (pure mempty) id
m top
commonOpts = optional $ strOption (long "config" <> short 'c' <> help "Config file")
cli = hsubparser ( command "run" (O.info pRun (progDesc "run qblf servant" ) )
<> command "gen" (O.info pGen (progDesc "generate transcation") )
<> command "post" (O.info pPostTx (progDesc "post transaction") )
<> command "check" (O.info pCheckTx (progDesc "check transaction") )
<> command "balances" (O.info pBalances (progDesc "show balances") )
)
pRun = do
pure runMe
pGen = hsubparser
( command "tx-emit" ( O.info pGenEmit (progDesc "generate emit") )
<> command "tx-move" ( O.info pGenMove (progDesc "generate move") )
)
pGenEmit = do
kr <- strOption ( long "keyring" <> short 'k' <> help "keyring file" )
amnt <- option @Amount auto ( long "amount" <> short 'n' <> help "amount" )
dest <- strArgument ( metavar "ADDRESS" )
pure $ const $ silently do
sc <- BS.readFile kr
creds <- pure (parseCredentials @HBS2Basic (AsCredFile sc)) `orDie` "bad keyring file"
let pk = view peerSignPk creds
let sk = view peerSignSk creds
acc <- pure (fromStringMay @(RefChanId L4Proto) dest) `orDie` "bad account address"
tx <- makeEmitTx @L4Proto pk sk acc amnt
LBS.putStr $ serialise tx
pGenMove = do
kr <- strOption ( long "wallet" <> short 'w' <> help "wallet (keyring) file" )
amnt <- option @Amount auto ( long "amount" <> short 'n' <> help "amount" )
dest <- strArgument ( metavar "ADDRESS" )
pure $ const $ silently do
sc <- BS.readFile kr
creds <- pure (parseCredentials @HBS2Basic (AsCredFile sc)) `orDie` "bad keyring file"
let pk = view peerSignPk creds
let sk = view peerSignSk creds
acc <- pure (fromStringMay @(RefChanId L4Proto) dest) `orDie` "bad account address"
tx <- makeMoveTx @L4Proto pk sk acc amnt
LBS.putStr $ serialise tx
pCheckTx = do
kr <- strOption ( long "keyring" <> short 'k' <> help "keyring file" )
pure $ const do
sc <- BS.readFile kr
creds <- pure (parseCredentials @HBS2Basic (AsCredFile sc)) `orDie` "bad keyring file"
let pk = view peerSignPk creds
let sk = view peerSignSk creds
tx <- LBS.getContents <&> deserialise @(QBLFDemoToken L4Proto)
case tx of
Emit box -> do
void $ pure (unboxSignedBox0 @(EmitTx L4Proto) @L4Proto box) `orDie` "bad emit tx"
Move box -> do
void $ pure (unboxSignedBox0 @(MoveTx L4Proto) @L4Proto box) `orDie` "bad move tx"
pure ()
pPostTx = pure $ const do
error "not supported anymore / TODO via http"
-- rc <- strArgument ( metavar "REFCHAN" )
-- sa <- strArgument ( metavar "UNIX-SOCKET" ) <&> fromString
-- pure $ withLogging do
-- rchan <- pure (fromStringMay @(RefChanId L4Proto) rc) `orDie` "bad refchan"
-- print "JOPA"
-- -- FIXME: wrap-client-boilerplate
-- inbox <- newMessagingUnix False 1.0 sa
-- wInbox <- async $ runMessagingUnix inbox
-- let env = MyEnv (fromString sa) (Fabriq inbox) rchan
-- msg <- (LBS.getContents <&> deserialiseOrFail) `orDie` "transaction decode error"
-- runApp env do
-- request (mySelf env) (msg :: QBLFDemoTran UNIX)
-- pause @'Seconds 0.1
-- cancel wInbox
pBalances = do
state <- strArgument ( metavar "STATE" )
pure $ const $ withLogging do
xdg <- liftIO $ getXdgDirectory XdgData defStorePath <&> fromString
sto' <- simpleStorageInit @HbSync [StoragePrefix xdg]
let sto = AnyStorage sto'
sw <- liftIO $ replicateM 4 $ async $ simpleStorageWorker sto'
root <- pure (fromStringMay @HashRef state) `orDie` "Bad STATE reference"
flip runReaderT sto $ do
debug $ "calculating balances for" <+> pretty root
bal <- balances root
forM_ (HashMap.toList bal) $ \(acc, qty) -> do
liftIO $ print $ pretty (AsBase58 acc) <+> pretty qty

View File

@ -0,0 +1,16 @@
;;
hbs2-peer-rpc "127.0.0.1:13334"
http 3011
refchan "Atg67E6CPMJWKvR9BvwZTTEjg3Hjz4CYCaEARGANepGP"
socket "/tmp/notify-Atg67E6CPMJWKvR9BvwZTTEjg3Hjz4CYCaEARGANepGP"
actor "test/refchan/author1.key"
default-state "56JXzPzJQzaC9PuniQ7zHCPjedLhyc8vBjn3RtcwfXDt"
state-ref "BQ4Y1235vCZkx2uG9mjq5QuZZ4aSxr4XGAeoibxaLWFt"
;; storage?

View File

@ -0,0 +1,16 @@
;;
hbs2-peer-rpc "127.0.0.1:13334"
http 3012
refchan "Atg67E6CPMJWKvR9BvwZTTEjg3Hjz4CYCaEARGANepGP"
socket "/tmp/notify-Atg67E6CPMJWKvR9BvwZTTEjg3Hjz4CYCaEARGANepGP-2"
actor "test/refchan/author2.key"
default-state "56JXzPzJQzaC9PuniQ7zHCPjedLhyc8vBjn3RtcwfXDt"
state-ref "J7ZJ8th1JbmnLiUvLe15sqKtgteQEanxHvWpcqgbS197"
;; storage?

View File

@ -0,0 +1,16 @@
;;
hbs2-peer-rpc "127.0.0.1:13334"
http 3011
refchan "Atg67E6CPMJWKvR9BvwZTTEjg3Hjz4CYCaEARGANepGP"
socket "/tmp/notify-Atg67E6CPMJWKvR9BvwZTTEjg3Hjz4CYCaEARGANepGP"
actor "test/refchan/author3.key"
default-state "56JXzPzJQzaC9PuniQ7zHCPjedLhyc8vBjn3RtcwfXDt"
state-ref "62WTcQCgYU6CbNCeC68SyhyJ27wb24cR8onLM7G8Hsau"
;; storage?

Binary file not shown.

View File

@ -0,0 +1,9 @@
(version 1)
(quorum 2)
(wait 15)
(peer "35gKUG1mwBTr3tQpjWwR2kBYEnDmHxesoJL5Lj7tMjq3" 1)
(peer "5GnroAC8FXNRL8rcgJj6RTu9mt1AbuNd5MZVnDBcCKzb" 1)
(author "23PFfUtRgDtASus54zGhdGvuGjyDVz6fcMwvroSS4ni6")
(author "2Lf9L88dmgp4ETKTpUvVsqn7rgtsjNzcDUVxCnx3yx5G")

View File

@ -0,0 +1,6 @@
# hbs2 credentials file
# keep it private
rmbTHUgvAY9MAPCPXWMnKxxBdLYhwzpJvuXWrGG59nuTQQ89yH2sgvibnz8U
FHcqremPdtCunTkgGGFVvjBoCF72MiBGX4HtwKvX6umBoxLHnemgHQQF4z73
cZfqDbqf8rXRB9yig1Uib3AXD9

View File

@ -0,0 +1,6 @@
# hbs2 credentials file
# keep it private
rmbTHUgvMRE3DbKUeSfReH5EvTRspfEeK8ogmTG7rs99Ha48KNnEmR9Yfjdq
rimiXGM8oK4WdK9PDqySsZBZxeVdaYiLwbr9yzjwpG8baXQ99k34Dkk7s662
ZVm6kioTEG8yTw8twTbXk2b6Y7

View File

@ -0,0 +1,6 @@
# hbs2 credentials file
# keep it private
rmbTHUgvbLz9QzUGc4fzPfHKyyFuh8w2sEvGWPsP1GxrSHk7dh5Qs4A1GQMM
QgUJpB2HvAaQsQjVBUHjHgRkKbNUEPGmyG9iDCQ9Fmy4MCQFQ5YMmBVZthvP
QYia9gaD5WJuPS12rUAjZHeKvo

View File

@ -0,0 +1,6 @@
# hbs2 credentials file
# keep it private
rmbTHUgve1Kak6KL7StWLyoPxzTy1Fs4DcYccQ4eU93gjN5s2ooSrVsD4C5e
cztUQSysK5Gj5B1d9jYwF9bk4vUCh2QdLdA7r8zAfUxKkto8Ri8oUyGn9eVk
J3JWTSTJE25umnFQ3D4uZmXunF

View File

@ -0,0 +1,6 @@
# hbs2 credentials file
# keep it private
rmbTHUgvCGLNpN4YMRoeQs3erqzCVPqNL9cGaTHLToa5EX8DnxPoWHXrRRp8
PXZ2SzLUQXPE16aAJyHsYJcaEbkrcm8W7nB3wKpBZz4Dm91G2uXrMkdv3UdX
jTABTKF3vWKc15LGbu6b7BoqPu

View File

@ -0,0 +1,134 @@
{-# Language UndecidableInstances #-}
module Demo.QBLF.Transactions where
import HBS2.Prelude.Plated
import HBS2.Hash
import HBS2.Base58
import HBS2.Net.Proto.Types
import HBS2.Actors.Peer.Types
import HBS2.Net.Proto.RefChan
import HBS2.Data.Types.Refs (HashRef(..))
import HBS2.Data.Types.SignedBox
import HBS2.Net.Auth.Credentials
import HBS2.Net.Messaging.Unix (UNIX)
import HBS2.Net.Proto.Definition()
import Data.Hashable(Hashable(..))
import Codec.Serialise
import Data.ByteString.Lazy (ByteString)
import Data.Word (Word64)
import System.Random
newtype Actor e =
Actor { fromActor :: PubKey 'Sign (Encryption e) }
deriving stock (Generic)
deriving stock instance Eq (PubKey 'Sign (Encryption e)) => Eq (Actor e)
deriving newtype instance Hashable (PubKey 'Sign (Encryption e)) => Hashable (Actor e)
instance Pretty (AsBase58 (PubKey 'Sign (Encryption e))) => Pretty (Actor e) where
pretty (Actor a) = pretty (AsBase58 a)
type Account e = PubKey 'Sign (Encryption e)
newtype Amount = Amount Integer
deriving stock (Eq,Show,Ord,Data,Generic)
deriving newtype (Read,Enum,Num,Integral,Real,Pretty)
newtype DAppState = DAppState { fromDAppState :: HashRef }
deriving stock (Eq,Show,Ord,Data,Generic)
deriving newtype (Hashable,Pretty)
instance Hashed HbSync DAppState where
hashObject (DAppState (HashRef h)) = h
data EmitTx e = EmitTx (Account e) Amount Word64
deriving stock (Generic)
data MoveTx e = MoveTx (Account e) (Account e) Amount Word64
deriving stock (Generic)
data QBLFDemoToken e =
Emit (SignedBox (EmitTx e) e) -- proof: owner's key
| Move (SignedBox (MoveTx e) e) -- proof: wallet's key
deriving stock (Generic)
instance ForRefChans e => Serialise (Actor e)
instance Serialise DAppState
instance Serialise Amount
instance Serialise (PubKey 'Sign (Encryption e)) => Serialise (EmitTx e)
instance Serialise (PubKey 'Sign (Encryption e)) => Serialise (MoveTx e)
instance (Serialise (Account e), ForRefChans e) => Serialise (QBLFDemoToken e)
type ForQBLFDemoToken e = ( Eq (PubKey 'Sign (Encryption e))
, Eq (Signature (Encryption e))
, Pretty (AsBase58 (PubKey 'Sign (Encryption e)))
, ForSignedBox e
, FromStringMaybe (PubKey 'Sign (Encryption e))
, Serialise (PubKey 'Sign (Encryption e))
, Serialise (Signature (Encryption e))
, Hashable (PubKey 'Sign (Encryption e))
)
deriving stock instance (ForQBLFDemoToken e) => Eq (QBLFDemoToken e)
instance ForQBLFDemoToken e => Hashable (QBLFDemoToken e) where
hashWithSalt salt = \case
Emit box -> hashWithSalt salt box
Move box -> hashWithSalt salt box
newtype QBLFDemoTran e =
QBLFDemoTran (SignedBox (QBLFDemoToken e) e)
deriving stock Generic
instance ForRefChans e => Serialise (QBLFDemoTran e)
deriving newtype instance
(Eq (PubKey 'Sign (Encryption e)), Eq (Signature (Encryption e)))
=> Eq (QBLFDemoTran e)
deriving newtype instance
(Eq (Signature (Encryption e)), ForRefChans e)
=> Hashable (QBLFDemoTran e)
instance Serialise (QBLFDemoTran UNIX) => HasProtocol UNIX (QBLFDemoTran UNIX) where
type instance ProtocolId (QBLFDemoTran UNIX) = 0xFFFF0001
type instance Encoded UNIX = ByteString
decode = either (const Nothing) Just . deserialiseOrFail
encode = serialise
makeEmitTx :: forall e m . ( MonadIO m
, ForRefChans e
, Signatures (Encryption e)
)
=> PubKey 'Sign (Encryption e)
-> PrivKey 'Sign (Encryption e)
-> Account e
-> Amount
-> m (QBLFDemoToken e)
makeEmitTx pk sk acc amount = do
nonce <- randomIO
let box = makeSignedBox @e pk sk (EmitTx @e acc amount nonce)
pure (Emit @e box)
makeMoveTx :: forall e m . ( MonadIO m
, ForRefChans e
, Signatures (Encryption e)
)
=> PubKey 'Sign (Encryption e) -- from pk
-> PrivKey 'Sign (Encryption e) -- from sk
-> Account e
-> Amount -- amount
-> m (QBLFDemoToken e)
makeMoveTx pk sk acc amount = do
nonce <- randomIO
let box = makeSignedBox @e pk sk (MoveTx @e pk acc amount nonce)
pure (Move @e box)

View File

@ -0,0 +1,510 @@
{-# Language TemplateHaskell #-}
{-# Language AllowAmbiguousTypes #-}
{-# Language UndecidableInstances #-}
{-# Language MultiWayIf #-}
module QBLF.Proto where
import HBS2.Prelude.Plated
import HBS2.System.Logger.Simple
import HBS2.Clock
import HBS2.Hash
import Control.Applicative
import Control.Concurrent.STM (flushTQueue)
import Control.Monad
import Control.Monad.Trans.Maybe
import Data.Either
import Data.Function
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.HashSet (HashSet)
import Data.HashSet qualified as HashSet
import Data.Kind
import Data.List qualified as List
import Data.Map qualified as Map
import Data.Maybe
import Data.Ord
import Data.Tuple (swap)
import Lens.Micro.Platform
import System.Random (randomRIO)
import Data.Time.Clock.POSIX
import Codec.Serialise()
import Data.Fixed
import UnliftIO
{- HLINT ignore "Use newtype instead of data" -}
newtype QBLFTimeStamp =
QBLFTimeStamp { fromQBLFTimeStamp :: Fixed E12 }
deriving stock (Generic)
deriving newtype (Eq,Ord,Num,Real,Fractional,Show)
instance Serialise QBLFTimeStamp
data QBLFMessage w =
QBLFMsgAnn (QBLFActor w) (QBLFAnnounce w)
| QBLFMsgMerge (QBLFActor w ) (QBLFMerge w)
| QBLFMsgCommit (QBLFActor w) (QBLFCommit w)
| QBLFMsgHeartBeat (QBLFActor w) QBLFStateN (QBLFState w) QBLFTimeStamp
deriving stock Generic
data QBLFAnnounce w =
QBLFAnnounce (QBLFState w) (QBLFState w)
deriving stock (Generic)
data QBLFMerge w =
QBLFMerge (QBLFState w) (QBLFState w)
deriving stock Generic
data QBLFCommit w =
QBLFCommit (QBLFState w) (QBLFState w)
deriving stock Generic
data QBLFStateN =
QWait
| QAnnounce
| QMerge
| QCommit
deriving stock (Eq,Ord,Enum,Show,Generic)
instance Serialise QBLFStateN
type ForQBLF w = ( Hashed HbSync (QBLFState w)
, Hashable (QBLFState w)
, Hashable (QBLFTransaction w)
, Hashable (QBLFActor w)
, Hashable (QBLFAnnounce w)
, Hashable (QBLFMerge w)
, Eq (QBLFState w)
)
deriving instance ForQBLF w => Eq (QBLFAnnounce w)
deriving instance ForQBLF w => Eq (QBLFMerge w)
instance ForQBLF w => Hashable (QBLFAnnounce w)
instance ForQBLF w => Hashable (QBLFMerge w)
class (Monad m, ForQBLF w) => IsQBLF w m where
type QBLFActor w :: Type
type QBLFTransaction w :: Type
type QBLFState w :: Type
qblfNewState :: QBLFState w -> [QBLFTransaction w] -> m (QBLFState w)
qblfBroadCast :: QBLFMessage w -> m ()
qblfMerge :: QBLFState w -> [QBLFState w] -> m (QBLFState w)
qblfCommit :: QBLFState w -> QBLFState w -> m ()
qblfMoveForward :: QBLFState w -> QBLFState w -> m Bool
qblfMoveForward _ _ = pure True
data QBLF w =
QBLF
{ _qblfSelf :: QBLFActor w
, _qblfAllActors :: HashSet (QBLFActor w)
, _qblfState :: QBLFStateN
, _qblfCurrent :: QBLFState w
, _qblfWaitAnnounce :: Timeout 'Seconds
, _qblfCommitsFrom :: HashSet (QBLFActor w)
, _qblfTranQ :: TVar (HashSet (QBLFTransaction w))
, _qblfAlive :: TVar (HashMap (QBLFActor w) (QBLFStateN, QBLFState w, TimeSpec))
, _qblfStateTime :: TVar TimeSpec
, _qblfLastHeartBeat :: TVar TimeSpec
, _qblfAnnounces :: TVar (HashMap (QBLFActor w, QBLFAnnounce w) TimeSpec)
, _qblfMerges :: TVar (HashMap (QBLFActor w, QBLFMerge w) TimeSpec)
}
makeLenses ''QBLF
qblfGetActor :: (ForQBLF w) => QBLFMessage w -> QBLFActor w
qblfGetActor = \case
QBLFMsgAnn a _ -> a
QBLFMsgMerge a _ -> a
QBLFMsgCommit a _ -> a
QBLFMsgHeartBeat a _ _ _ -> a
qblfEnqueue :: (ForQBLF w, MonadIO m) => QBLF w -> QBLFTransaction w -> m ()
qblfEnqueue me tran = do
-- synced <- qblfIsSynced me
-- when synced do
atomically $ modifyTVar (view qblfTranQ me) (HashSet.insert tran)
qblfAcceptMessage :: (ForQBLF w, MonadIO m) => QBLF w -> QBLFMessage w -> m ()
qblfAcceptMessage me msg = do
-- FIXME: drop-premature-announces
let actor = qblfGetActor msg
when (actor `HashSet.member` view qblfAllActors me) do
now <- getTimeCoarse
let mine = view qblfCurrent me
let add = const True -- not $ HashSet.member x (view qblfIgnore me)
case msg of
QBLFMsgAnn a ann@(QBLFAnnounce s0 _) | add s0 -> do
atomically $ modifyTVar (view qblfAnnounces me) (HashMap.insert (a, ann) now)
QBLFMsgMerge a m@(QBLFMerge s0 _) | add s0 -> do
atomically $ modifyTVar (view qblfMerges me) (HashMap.insert (a, m) now)
QBLFMsgCommit a (QBLFCommit s0 s) | add s0 -> do
atomically $ modifyTVar (view qblfAlive me) (HashMap.insert a (QWait,s,now))
QBLFMsgHeartBeat a t s _ -> do
-- debug $ "heartbeat" <+> pretty (view qblfSelf me) <+> pretty (a, s)
atomically $ modifyTVar (view qblfAlive me) (HashMap.insert a (t,s,now))
_ -> pure ()
qblfQuorum :: forall w a m . (ForQBLF w, IsQBLF w m, MonadUnliftIO m, Integral a)
=> QBLF w
-> m a
qblfQuorum me = do
-- n <- qblfLastAlive me
-- pure $ fromIntegral $ 1 + (n `div` 2)
let aliveSz = view qblfAllActors me & List.length
pure $ max 1 $ round $ realToFrac (aliveSz + 1) / 2
qblfLastAlive :: (ForQBLF w, IsQBLF w m, MonadUnliftIO m) => QBLF w -> m Int
qblfLastAlive me = pure 0
-- q <- qblfQuorum me
-- n <- atomically $ readTVar (view qblfAlive me) <&> HashMap.toList <&> length
-- if n > 0 then
-- pure n
-- else
-- pure q
qblfInit :: forall w m . (ForQBLF w, IsQBLF w m, MonadUnliftIO m)
=> QBLFActor w -- ^ self
-> [QBLFActor w] -- ^ all actors
-> QBLFState w -- ^ initial state
-> Timeout 'Seconds -- ^ timeout
-> m (QBLF w)
qblfInit self actors s0 w =
QBLF self
(HashSet.fromList actors)
QWait
s0
w
mempty
<$> newTVarIO mempty
<*> newTVarIO mempty
<*> (newTVarIO =<< now)
<*> newTVarIO 0
<*> newTVarIO mempty
<*> newTVarIO mempty
where
now = getTimeCoarse
qblfNextCommitTime :: (MonadIO f, Real a) => a -> f TimeSpec
qblfNextCommitTime ww = do
let wt = realToFrac ww
t0 <- getTimeCoarse
dt <- liftIO $ randomRIO (wt/2, wt)
pure $ fromNanoSecs $ toNanoSecs t0 + round (realToFrac dt * 1e9)
-- qblfGetState :: (ForQBLF w, MonadUnliftIO m) => QBLF w -> m QBLFStateN
-- qblfGetState q = readTVarIO (view qblfState q)
qblfRun :: forall w m . ( Pretty (QBLFActor w)
, Pretty (QBLFState w)
, ForQBLF w
, IsQBLF w m
, MonadUnliftIO m
) => QBLF w -> m ()
qblfRun me = do
forever $ do
void $ qblfTo me QWait
warn "QUIT!!!"
-- mapM_ wait [a1,hb]
-- mapM_ wait [a1]
-- waitAnyCatchCancel []
where
tAlive = 5
minHeartBeat = round 5e9
sendHeartBeat :: IsQBLF w m => QBLF w -> m ()
sendHeartBeat s = do
ts <- liftIO getPOSIXTime <&> realToFrac
now <- getTimeCoarse
sent <- readTVarIO (_qblfLastHeartBeat s)
when (toNanoSecs (now - sent) > minHeartBeat) do
qblfBroadCast @w $ QBLFMsgHeartBeat (view qblfSelf s) (view qblfState s) (view qblfCurrent s) ts
atomically $ writeTVar (_qblfLastHeartBeat s) now
sendAnnounce :: IsQBLF w m => QBLF w -> QBLFState w -> m ()
sendAnnounce s sx = do
qblfBroadCast @w (QBLFMsgAnn self (QBLFAnnounce current sx))
where
self = view qblfSelf s
current = view qblfCurrent s
sendMerge :: IsQBLF w m => QBLF w -> QBLFState w -> m ()
sendMerge s sx = do
qblfBroadCast @w (QBLFMsgMerge self (QBLFMerge current sx))
where
self = view qblfSelf s
current = view qblfCurrent s
sendCommit :: IsQBLF w m => QBLF w -> QBLFState w -> m ()
sendCommit s sx = do
qblfBroadCast @w (QBLFMsgCommit self (QBLFCommit current sx))
where
self = view qblfSelf s
current = view qblfCurrent s
nap = pause @'Seconds 0.25
getAlive :: IsQBLF w m => QBLF w -> m [(QBLFActor w, QBLFStateN, QBLFState w) ]
getAlive s = do
now <- getTimeCoarse
states <- readTVarIO (view qblfAlive s) <&> HashMap.toList
pure [ (a,n,sx) | (a, (n,sx,t)) <- states, toNanoSecs (now - t) < round (2 * tAlive * 1e9) ]
sweepMerges :: IsQBLF w m => QBLF w -> QBLFState w -> m ()
sweepMerges qblf s = do
atomically do
old <- readTVar (view qblfMerges qblf) <&> HashMap.toList
let new = [ ((a, m), t) | ((a, m@(QBLFMerge s0 _)), t) <- old, s0 /= s ]
writeTVar (view qblfMerges me) (HashMap.fromList new)
sweepAnnounces :: IsQBLF w m => QBLF w -> m ()
sweepAnnounces qblf = do
-- FIXME: fix-magic-number
let wnano = fromIntegral $ toNanoSeconds $ max 300 $ 10 * view qblfWaitAnnounce qblf
now <- getTimeCoarse
swept <- atomically do
old <- readTVar (view qblfAnnounces qblf) <&> HashMap.toList
let new = [ ((a, m), t)
| ((a, m@(QBLFAnnounce _ _)), t) <- old
, toNanoSecs (now - t) < wnano
]
writeTVar (view qblfAnnounces me) (HashMap.fromList new)
pure $ length old - length new
when (swept > 0) do
debug $ "sweepAnnounces" <+> pretty swept
qblfTo s n = do
now <- getTimeCoarse
let ns = s { _qblfState = n }
atomically $ writeTVar (_qblfStateTime ns) now
case n of
QWait -> qblfToWait ns
QAnnounce -> qblfToAnnounce ns
QMerge -> qblfToMerge ns
QCommit -> qblfToCommit ns
qblfToWait s = do
let w = view qblfWaitAnnounce s
q <- qblfQuorum s
let trsh = max 2 (q `div` 2)
newAnn <- newTVarIO mempty
fix \next -> do
sendHeartBeat s
sweepAnnounces s
now <- getTimeCoarse
alive <- getAlive s
let wn = sum [ 1 | (_, QWait, _) <- alive ]
-- debug $ logActor s <+> "wait" <+> pretty wn
t0 <- readTVarIO (view qblfStateTime s)
let elapsed = toNanoSeconds $ TimeoutTS (now - t0)
their <- selectState s (fmap (view _3) alive)
let mine = view qblfCurrent s
ann0 <- atomically $ readTVar (view qblfAnnounces s) <&> HashMap.keys
let alst = [ s1 | (_, QBLFAnnounce s0 s1) <- ann0, s0 == mine, s1 /= s0 ]
old <- readTVarIO newAnn
let an = HashSet.size $ HashSet.fromList alst `HashSet.difference` old
atomically $ writeTVar newAnn (HashSet.fromList alst)
let g = if | their /= Just mine -> do
case their of
Nothing -> pure $ nap >> next
Just th -> do
-- FIXME: what-if-failed
forwarded <- qblfMoveForward @w mine th
if forwarded then do
debug $ logActor s <+> "DO FAST-FORWARD" <+> pretty th
qblfCommit @w mine th
pure $ qblfTo (set qblfCurrent th s) QAnnounce
else do
pure $ nap >> next
| wn >= q && (elapsed > toNanoSeconds w || an >= trsh) && Just mine == their -> do
let el = elapsed > toNanoSeconds w
let aa = an >= trsh
debug $ logActor s <+> "ready" <+> pretty their <+> pretty el <+> pretty aa <+> pretty an
pure $ qblfTo s QAnnounce
| otherwise -> pure $ nap >> next
join g
qblfToAnnounce s = do
let mine = view qblfCurrent s
q <- qblfQuorum s
let wa = view qblfWaitAnnounce s
sweepMerges s mine
-- TODO: extract-method
txs <- atomically do
tx <- readTVar (view qblfTranQ s) <&> HashSet.toList
writeTVar (view qblfTranQ s) mempty
pure tx
hx <- qblfNewState @w mine txs
sendAnnounce s hx
pause (0.1 * wa)
g <- race ( pause wa ) do
fix \next -> do
ann0 <- atomically $ readTVar (view qblfAnnounces s) <&> HashMap.keys
let ann = [ s1 | (_, QBLFAnnounce s0 s1) <- ann0, s0 == mine ]
if length ann >= q then do
debug $ logActor s <+> "announce/ready-to-merge" <+> pretty (view qblfCurrent s) <+> pretty (length ann)
pure $ qblfTo s QMerge
else do
trace $ logActor s <+> "announce/wait" <+> pretty (view qblfCurrent s) <+> pretty (length ann)
nap >> next
case g of
Left{} -> qblfTo s QWait
Right n -> n
qblfToMerge s = do
let mine = view qblfCurrent s
q <- qblfQuorum s
let wa = view qblfWaitAnnounce s
-- pause @'Seconds wa
-- debug $ logActor s <+> "merge"
ann0 <- atomically $ readTVar (view qblfAnnounces s) <&> HashMap.keys
let aann = [ (a, s1) | (a, QBLFAnnounce s0 s1) <- ann0, s0 == mine ]
let ann = fmap snd aann
let actors = fmap fst aann & HashSet.fromList
let g = case ann of
[] -> do
debug $ "MERGE: not found SHIT!" <+> pretty (length aann)
pure $ qblfTo s QWait
(_:_) -> do
new <- qblfMerge @w mine ann
sendMerge s new
pure $ qblfTo (set qblfCommitsFrom actors s) QCommit
join g
qblfToCommit s = do
let mine = view qblfCurrent s
let authors = view qblfCommitsFrom s
-- pause @'Seconds 2
debug $ logActor s <+> "qblfToCommit"
-- FIXME: timeout-and-rollback-to-wait
let wa = view qblfWaitAnnounce s
r <- race ( pause wa ) do
fix \next -> do
merges0 <- atomically $ readTVar (view qblfMerges s) <&> HashMap.keys
let merges = [ (s1, a, m)
| (a, m@(QBLFMerge s0 s1)) <- merges0, s0 == mine
, a `HashSet.member` authors
]
mbNew <- selectState s (fmap (view _1) merges)
trace $ "#### COMMIT NEW:"
<+> pretty mine
<+> pretty mbNew
-- <+> pretty (fmap (view _1) merges)
case mbNew of
Just new -> do
when (new /= mine) do
debug $ logActor s <+> "commit: " <+> pretty new
sendCommit s new
qblfCommit @w mine new
-- sweepAnnounces s mine
pure $ qblfTo ( set qblfCurrent new s
) QWait
Nothing -> do
-- debug $ logActor s <+> "commit: " <+> "fail"
nap >> next
case r of
Left{} -> qblfTo s QWait
Right n -> n
selectState :: IsQBLF w m => QBLF w -> [QBLFState w] -> m (Maybe (QBLFState w))
selectState s sx = do
q <- qblfQuorum s
let mbs = fmap (,1) sx & HashMap.fromListWith (+)
& HashMap.toList
& fmap (over _2 List.singleton . swap)
& Map.fromListWith (<>)
& Map.toDescList
& headMay
runMaybeT do
ss <- MaybeT $ pure mbs
let sss = over _2 (List.sortOn (Down . hashObject @HbSync)) ss :: (Integer, [QBLFState w])
if fst sss >= q then do
MaybeT $ pure $ headMay (snd sss)
else
mzero
logActor s = "ACTOR" <> parens (pretty (view qblfSelf s))

View File

@ -0,0 +1,243 @@
cabal-version: 3.0
name: refchan-qblf
version: 0.1.0.0
-- synopsis:
-- description:
license: BSD-3-Clause
license-file: LICENSE
-- author:
-- maintainer:
-- copyright:
category: Network
build-type: Simple
extra-doc-files: CHANGELOG.md
-- extra-source-files:
common warnings
ghc-options: -Wall
common common-deps
build-depends:
base, hbs2-core, hbs2-storage-simple
, aeson
, async
, bytestring
, cache
, containers
, data-default
, deepseq
, directory
, filepath
, hashable
, http-types
, microlens-platform
, mtl
, mwc-random
, prettyprinter
, QuickCheck
, random
, random-shuffle
, resourcet
, safe
, serialise
, split
, stm
, streaming
, scotty
, suckless-conf
, tasty
, tasty-hunit
, temporary
, timeit
, transformers
, uniplate
, unordered-containers
, vector
, prettyprinter-ansi-terminal
, interpolatedstring-perl6
, unliftio
common shared-properties
ghc-options:
-Wall
-O2
-fno-warn-type-defaults
-- -fno-warn-unused-matches
-- -fno-warn-unused-do-bind
-- -Werror=missing-methods
-- -Werror=incomplete-patterns
-- -fno-warn-unused-binds
-threaded
-rtsopts
"-with-rtsopts=-N4 -A256m -AL256m -I0"
default-language: Haskell2010
default-extensions:
ApplicativeDo
, BangPatterns
, BlockArguments
, ConstraintKinds
, DataKinds
, DeriveDataTypeable
, DeriveGeneric
, DerivingStrategies
, DerivingVia
, ExtendedDefaultRules
, FlexibleContexts
, FlexibleInstances
, GADTs
, GeneralizedNewtypeDeriving
, ImportQualifiedPost
, LambdaCase
, MultiParamTypeClasses
, OverloadedStrings
, QuasiQuotes
, ScopedTypeVariables
, StandaloneDeriving
, TupleSections
, TypeApplications
, TypeFamilies
library refchan-qblf-core
import: shared-properties
import: common-deps
default-language: Haskell2010
exposed-modules: QBLF.Proto
ghc-options:
-- -prof
-- -fprof-auto
-- other-extensions:
-- type: exitcode-stdio-1.0
hs-source-dirs: lib
build-depends:
base, hbs2-core
, async
, attoparsec
, bytestring
, cache
, clock
, containers
, data-default
, directory
, hashable
, microlens-platform
, mtl
, mwc-random
, network
, network-ip
, optparse-applicative
, prettyprinter
, QuickCheck
, random
, safe
, serialise
, stm
, streaming
, tasty
, tasty-hunit
, text
, time
, transformers
, uniplate
, vector
, unliftio
executable refchan-qblf
import: shared-properties
import: common-deps
default-language: Haskell2010
ghc-options:
-- -prof
-- -fprof-auto
other-modules:
Demo.QBLF.Transactions
-- other-extensions:
-- type: exitcode-stdio-1.0
hs-source-dirs: app lib
main-is: RefChanQBLFMain.hs
build-depends:
base, refchan-qblf-core, hbs2-core, hbs2-storage-simple
, async
, attoparsec
, bytestring
, cache
, clock
, containers
, data-default
, data-textual
, directory
, hashable
, microlens-platform
, mtl
, mwc-random
, network
, network-ip
, optparse-applicative
, prettyprinter
, QuickCheck
, random
, safe
, serialise
, stm
, streaming
, tasty
, tasty-hunit
, text
, time
, transformers
, uniplate
, vector
, unliftio
test-suite refchan-qblf-proto-test
import: shared-properties
default-language: Haskell2010
other-modules:
type: exitcode-stdio-1.0
hs-source-dirs: test
main-is: RefChanQBLFProtoTest.hs
build-depends:
base, refchan-qblf-core, hbs2-core
, async
, bytestring
, cache
, cborg
, containers
, directory
, filepath
, hashable
, microlens-platform
, mtl
, optparse-applicative
, prettyprinter
, prettyprinter-ansi-terminal
, QuickCheck
, random
, safe
, serialise
, tasty
, tasty-hunit
, temporary
, timeit
, uniplate
, unliftio
, unordered-containers
, vector

View File

@ -0,0 +1,334 @@
{-# Language AllowAmbiguousTypes #-}
module Main where
import HBS2.Prelude.Plated
import HBS2.System.Logger.Simple
import HBS2.Clock
import HBS2.Hash
import HBS2.Base58
import QBLF.Proto
import Data.Ord
import Data.Functor
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.List qualified as List
import Control.Monad.Reader
import System.Random
import Data.Hashable hiding (Hashed)
import Data.Word
import Lens.Micro.Platform
import Data.Fixed
import Options.Applicative as O
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Data.Maybe
import Data.Graph
import Data.HashSet (HashSet)
import Data.HashSet qualified as HashSet
import Prettyprinter.Render.Terminal
import GHC.TypeLits
import Codec.Serialise
import UnliftIO
type MyHash = Hash HbSync
{- HLINT ignore "Use newtype instead of data" -}
newtype Actor = Actor { fromActor :: Int }
deriving stock (Eq,Ord,Show,Generic)
deriving newtype (Num,Enum,Hashable,Pretty)
newtype Tran = OddEven { fromTran :: Integer }
deriving stock (Eq,Ord,Show,Generic)
deriving newtype (Hashable)
instance Serialise Tran
newtype MyState = MyState { fromMyState :: MyHash } -- [Tran]
deriving stock (Eq,Ord,Show,Generic)
deriving newtype (Hashable,Pretty)
instance Hashed HbSync MyState where
hashObject (MyState x) = x
data MyConsensus = MyConsensus
data MyEnv =
MyEnv
{ mySelf :: Actor
, myActors :: HashMap Actor (TQueue (QBLFMessage MyConsensus))
, myState :: TVar MyState
, myStorage :: TVar (HashMap MyHash (HashSet Tran))
, myCommits :: TVar (HashSet (MyHash, MyHash))
}
newtype RunM m a = RunM { fromRunM :: ReaderT MyEnv m a }
deriving newtype ( Applicative
, Functor
, Monad
, MonadIO
, MonadUnliftIO
, MonadReader MyEnv
)
runM :: MyEnv -> RunM m a -> m a
runM env f = runReaderT (fromRunM f) env
instance MonadIO m => IsQBLF MyConsensus (RunM m) where
type QBLFActor MyConsensus = Actor
type QBLFTransaction MyConsensus = Tran
type QBLFState MyConsensus = MyState
qblfNewState _ txs = do
let tx1 = HashSet.fromList txs
let h1 = hashObject (serialise tx1)
let ms1 = MyState h1
ca <- asks myStorage
atomically $ modifyTVar ca $ HashMap.insert h1 tx1
pure ms1
qblfCommit s0 s1 = do
mst <- asks myState
me <- asks mySelf
co <- asks myCommits
atomically $ do
writeTVar mst s1
modifyTVar co (HashSet.insert (fromMyState s1, fromMyState s0))
trace $ "COMMIT" <> braces (viaShow me)
<+> ":"
<+> pretty s0
<+> "->"
<+> pretty s1
qblfBroadCast msg = do
actors <- ask <&> myActors <&> HashMap.toList
forM_ actors $ \(a,q) -> do
-- debug $ "qblfBroadCast to" <+> pretty a
atomically $ writeTQueue q msg
qblfMerge m mss = do
cache <- asks myStorage
txs' <- forM (m:mss) $ \(MyState h) -> do
readTVarIO cache <&> HashMap.lookup h <&> fromMaybe mempty
let txs = HashSet.unions txs'
let tx3 = HashSet.filter evenOnly txs
-- -- List.nub $ List.filter evenOnly $ List.sort txs
let h3 = hashObject (serialise tx3)
let ms3 = MyState h3
atomically $ modifyTVar cache $ HashMap.insert h3 tx3
--
pure ms3
where
evenOnly (OddEven x) = even x
tracePrefix :: SetLoggerEntry
tracePrefix = logPrefix "[trace] "
debugPrefix :: SetLoggerEntry
debugPrefix = logPrefix "[debug] "
errorPrefix :: SetLoggerEntry
errorPrefix = logPrefix "[error] "
warnPrefix :: SetLoggerEntry
warnPrefix = logPrefix "[warn] "
noticePrefix :: SetLoggerEntry
noticePrefix = logPrefix "[notice] "
data Opts =
Opts
{ cliActorsNum :: Int
, cliWaitAnnounceTime :: Double
, cliPd :: Double
, cliPb :: Double
, cliPt :: Double
, cliWt :: Double
, cliTrace :: Bool
}
cliOptions :: Parser Opts
cliOptions = Opts
<$> option auto
( long "actors-num"
<> short 'n'
<> help "number of actors"
<> showDefault
<> value 2 )
<*> option auto
( long "wait-announce-time"
<> short 'w'
<> help "wait-announce-time"
<> showDefault
<> value 10 )
<*> option auto
( long "pD"
<> help "p(disaster)"
<> showDefault
<> value 0.000001 )
<*> option auto
( long "pB"
<> help "p(Bad)"
<> showDefault
<> value 0.05 )
<*> option auto
( long "pT"
<> help "p(transaction)"
<> showDefault
<> value 0.10 )
<*> option auto
( long "wT"
<> help "Period(transaction)"
<> showDefault
<> value 1.00 )
<*> option auto
( long "trace"
<> help "enable trace"
<> showDefault
<> value False )
main :: IO ()
main = do
opts <- execParser $ O.info (cliOptions <**> helper)
( fullDesc
<> progDesc "refchan-qblf-proto-test"
)
when (cliTrace opts) do
setLogging @TRACE tracePrefix
setLogging @DEBUG debugPrefix
setLogging @INFO defLog
setLogging @ERROR errorPrefix
setLogging @WARN warnPrefix
setLogging @NOTICE noticePrefix
cache <- newTVarIO mempty
let nada = hashObject (serialise (mempty :: HashSet Tran))
let s0 = MyState nada
atomically $ modifyTVar cache $ HashMap.insert nada mempty
commits <- newTVarIO mempty
let actors = [ 1 .. fromIntegral (cliActorsNum opts) ] :: [Actor]
ee <- forM actors $ \a -> do
(a,) <$> newTQueueIO
tstates <- newTVarIO (mempty :: HashMap Actor MyState)
async $ forever $ do
let w = cliWaitAnnounceTime opts
let n = cliActorsNum opts
let q = max 1 $ round $ realToFrac (n + 1) / 2
pause @'Seconds (2 * realToFrac w)
rs <- atomically $ readTVar tstates <&> HashMap.elems
let votes = List.sortOn (Down . snd) $ HashMap.toList $ HashMap.fromListWith (+) [ (n,1) | n <- rs ]
let dv = sum [ 1 | (_,x) <- votes, x < q ]
case headMay votes of
Just (s,l) | l >= q -> do
z <- readTVarIO cache <&> HashMap.lookup (fromMyState s) <&> maybe 0 length
notice $ annotate (color Green) $ "CONSENSUS" <+> pretty s <+> viaShow l <+> pretty z <+> viaShow dv
_ -> pure () -- err "NO CONSENSUS!"
pure ()
w1 <- forM actors $ \a -> async do
mst <- newTVarIO s0
runM (MyEnv a (HashMap.fromList ee) mst cache commits) do
let pBad = cliPb opts
let w = cliWaitAnnounceTime opts
let pFuckup = cliPd opts
let pT = cliPt opts
let wT = cliWt opts
qblf <- qblfInit @MyConsensus a actors s0 (realToFrac w)
r1 <- async $ do
bad <- randomRIO (0.0, 1.00)
miniW <- randomRIO (0.001, 0.02)
pause @'Seconds (realToFrac miniW)
when (bad > (1 - pBad)) do
dt <- randomRIO (w/5, w*5)
pause @'Seconds (realToFrac dt)
qblfRun qblf
a1 <- async $ forever do
pause @'Seconds (realToFrac wT)
pt <- liftIO $ randomRIO (0.00, 1.00 :: Double)
when (pt < pT) do
tran <- OddEven <$> liftIO randomIO
-- debug $ "I'm" <+> viaShow a <+> viaShow tran
qblfEnqueue qblf tran
a2 <- async $ do
mbInbox <- asks myActors <&> HashMap.lookup a
maybe1 mbInbox (pure ()) $ \inbox -> forever do
atomically (readTQueue inbox) >>= qblfAcceptMessage qblf
let n = realToFrac (length actors)
fuckup <- liftIO $ randomRIO (0.00, 1.00) <&> (*n)
when (fuckup < pFuckup) do
debug $ "ACTOR" <> parens (pretty $ view qblfSelf qblf) <+> "DISASTER HAPPENED"
let wx = realToFrac $ view qblfWaitAnnounce qblf
dt <- liftIO $ randomRIO (wx*0.5, wx*3.5)
pause @'Seconds (realToFrac dt)
a3 <- async $ forever do
pause @'Seconds (realToFrac (cliWaitAnnounceTime opts))
me <- ask
st@(MyState xs) <- asks myState >>= readTVarIO
n <- asks myStorage >>= liftIO . readTVarIO <&> HashMap.lookup xs <&> fromMaybe mempty <&> length
atomically $ modifyTVar tstates (HashMap.insert a st)
-- n <- qblfGetState qblf
-- -- h <- height
-- notice $ "ACTOR" <> parens (pretty (fromActor $ mySelf me))
-- <> brackets (viaShow n)
-- <> braces (pretty xs)
mapM_ link [r1, a1, a2, a3]
mapM_ wait [a1,a2,a3,r1]
waitAnyCatchCancel w1
putStrLn "WAT?"

View File

@ -112,6 +112,7 @@ library
, HBS2.Net.Proto.Sessions
, HBS2.Net.Proto.RefLog
, HBS2.Net.Proto.RefChan
, HBS2.Net.Proto.AnyRef
, HBS2.Net.Proto.Types
, HBS2.OrDie
, HBS2.Prelude

View File

@ -328,8 +328,7 @@ instance ( MonadIO m
when allowed do
sendTo pipe (To peer_e) (From me) (AnyMessage @(Encoded e) @e proto (encode msg))
-- trace $ "REQUEST: after sendTo" <+> viaShow peer_e <+> viaShow msg
trace $ "REQUEST: after sendTo" <+> viaShow peer_e
instance ( Typeable (EventHandler e p (PeerM e IO))
@ -491,7 +490,9 @@ runProto hh = do
for_ messages $ \(From pip, AnyMessage n msg :: AnyMessage (Encoded e) e) -> do
case Map.lookup n disp of
Nothing -> pure () -- FIXME: error counting! and statistics counting feature
Nothing -> do
err $ "PROTO not found" <+> pretty n <+> pretty (fmap fst resp)
pure () -- FIXME: error counting! and statistics counting feature
Just (AnyProtocol { protoDecode = decoder
, handle = h

View File

@ -143,3 +143,16 @@ readBlobFromTree readBlock hr = do
pure $ LBS.concat <$> sequence pieces
readLog :: forall m . ( MonadIO m )
=> ( Hash HbSync -> IO (Maybe ByteString) )
-> HashRef
-> m [HashRef]
readLog getBlock (HashRef h) =
S.toList_ $ do
walkMerkle h (liftIO . getBlock) $ \hr -> do
case hr of
Left{} -> pure ()
Right (hrr :: [HashRef]) -> S.each hrr

View File

@ -55,7 +55,7 @@ defBlockSizeCacheTime :: TimeSpec
defBlockSizeCacheTime = toTimeSpec ( 30 :: Timeout 'Seconds )
defRequestLimitSec :: Timeout 'Seconds
defRequestLimitSec = 60
defRequestLimitSec = 300
defBlockBanTime :: TimeSpec
defBlockBanTime = toTimeSpec defBlockBanTimeSec

View File

@ -292,7 +292,7 @@ spawnConnection tp env so sa = liftIO do
void $ waitAnyCatchCancel [rd,wr]
-- cleanupConn connId
-- lift $ cleanupConn connId
-- gracefulClose so 1000
debug $ "spawnConnection exit" <+> pretty sa

View File

@ -23,11 +23,13 @@ import Data.Set (Set)
import Data.Set qualified as Set
import UnliftIO
data UNIX
data UNIX = UNIX
deriving (Eq,Ord,Show,Generic)
{- HLINT ignore "Use newtype instead of data" -}
data MessagingUnixOpts =
MUWatchdog Int
MUWatchdog Int
| MUFork
deriving (Eq,Ord,Show,Generic,Data)
-- FIXME: use-bounded-queues
@ -118,6 +120,11 @@ runMessagingUnix env = do
liftIO $ bind sock $ SockAddrUnix (msgUnixSockPath env)
liftIO $ listen sock 1
let doFork = Set.member MUFork (msgUnixOpts env)
let withSession | doFork = void . async . runResourceT
| otherwise = void . runResourceT
watchdog <- async $ do
let mwd = headMay [ n | MUWatchdog n <- Set.toList (msgUnixOpts env) ]
@ -144,26 +151,28 @@ runMessagingUnix env = do
atomically $ modifyTVar (msgUnixAccepts env) succ
void $ allocate (pure so) close
withSession do
writer <- async $ forever do
msg <- liftIO . atomically $ readTQueue (msgUnixInbox env)
let len = fromIntegral $ LBS.length msg :: Int
liftIO $ sendAll so $ bytestring32 (fromIntegral len)
liftIO $ sendAll so $ LBS.toStrict msg
void $ allocate (pure so) close
void $ allocate (pure writer) cancel
writer <- async $ forever do
msg <- liftIO . atomically $ readTQueue (msgUnixInbox env)
let len = fromIntegral $ LBS.length msg :: Int
liftIO $ sendAll so $ bytestring32 (fromIntegral len)
liftIO $ sendAll so $ LBS.toStrict msg
link writer
void $ allocate (pure writer) cancel
fix \next -> do
-- FIXME: timeout-hardcode
frameLen <- liftIO $ recv so 4 <&> word32 <&> fromIntegral
frame <- liftIO $ recv so frameLen
atomically $ writeTQueue (msgUnixRecv env) (From (PeerUNIX sa), LBS.fromStrict frame)
now <- getTimeCoarse
atomically $ writeTVar (msgUnixLast env) now
next
link writer
fix \next -> do
-- FIXME: timeout-hardcode
frameLen <- liftIO $ recv so 4 <&> word32 <&> fromIntegral
frame <- liftIO $ recv so frameLen
atomically $ writeTQueue (msgUnixRecv env) (From (PeerUNIX sa), LBS.fromStrict frame)
now <- getTimeCoarse
atomically $ writeTVar (msgUnixLast env) now
next
(_, r) <- waitAnyCatchCancel [run, watchdog]

View File

@ -0,0 +1,35 @@
{-# Language UndecidableInstances #-}
module HBS2.Net.Proto.AnyRef where
import HBS2.Prelude
import HBS2.Hash
import HBS2.Base58
import HBS2.Net.Proto.Types
import HBS2.Data.Types.Refs
import Data.Maybe (fromMaybe)
import Data.Hashable hiding (Hashed)
newtype AnyRefKey t s = AnyRefKey (PubKey 'Sign s)
deriving stock instance IsRefPubKey s => Eq (AnyRefKey n s)
instance (IsRefPubKey s) => Hashable (AnyRefKey t s) where
hashWithSalt s k = hashWithSalt s (hashObject @HbSync k)
instance (IsRefPubKey s) => Hashed HbSync (AnyRefKey t s) where
hashObject (AnyRefKey pk) = hashObject ("anyref|" <> serialise pk)
instance IsRefPubKey s => FromStringMaybe (AnyRefKey t s) where
fromStringMay s = AnyRefKey <$> fromStringMay s
instance IsRefPubKey s => IsString (AnyRefKey t s) where
fromString s = fromMaybe (error "bad public key base58") (fromStringMay s)
instance Pretty (AsBase58 (PubKey 'Sign s)) => Pretty (AsBase58 (AnyRefKey t s)) where
pretty (AsBase58 (AnyRefKey k)) = pretty (AsBase58 k)
instance Pretty (AsBase58 (PubKey 'Sign s)) => Pretty (AnyRefKey n s) where
pretty (AnyRefKey k) = pretty (AsBase58 k)

View File

@ -181,7 +181,7 @@ instance HasProtocol L4Proto (RefChanNotify L4Proto) where
-- но сообщения должны быть разные,
-- тогда и минута нормально.
-- возьмем пока 10 секунд
requestPeriodLim = ReqLimPerMessage 10
requestPeriodLim = NoLimit
instance HasProtocol L4Proto (DialReq L4Proto) where
type instance ProtocolId (DialReq L4Proto) = 96000
@ -202,6 +202,14 @@ instance Serialise (RefChanValidate UNIX) => HasProtocol UNIX (RefChanValidate U
decode = either (const Nothing) Just . deserialiseOrFail
encode = serialise
instance Serialise (RefChanNotify UNIX) => HasProtocol UNIX (RefChanNotify UNIX) where
type instance ProtocolId (RefChanNotify UNIX) = 0xFFFB0001
type instance Encoded UNIX = ByteString
decode = either (const Nothing) Just . deserialiseOrFail
encode = serialise
requestPeriodLim = NoLimit
instance MonadIO m => HasNonces (RefChanValidate UNIX) m where
type instance Nonce (RefChanValidate UNIX) = BS.ByteString
newNonce = do
@ -211,6 +219,9 @@ instance MonadIO m => HasNonces (RefChanValidate UNIX) m where
instance HasTimeLimits UNIX (RefChanValidate UNIX) IO where
tryLockForPeriod _ _ = pure True
instance HasTimeLimits UNIX (RefChanNotify UNIX) IO where
tryLockForPeriod _ _ = pure True
instance Expires (SessionKey L4Proto (BlockInfo L4Proto)) where
expiresIn _ = Just defCookieTimeoutSec

View File

@ -21,6 +21,7 @@ import HBS2.Data.Types.Refs
import HBS2.Data.Types.SignedBox
import HBS2.Actors.Peer.Types
import HBS2.Data.Types.Peer
import HBS2.Net.Messaging.Unix (UNIX)
import HBS2.Storage
import Data.Config.Suckless
@ -67,7 +68,7 @@ data RefChanHeadBlock e =
makeLenses 'RefChanHeadBlockSmall
type ForRefChans e = ( Serialise ( PubKey 'Sign (Encryption e))
, Pretty (AsBase58 (PubKey 'Sign (Encryption e)))
-- , Pretty (AsBase58 (PubKey 'Sign (Encryption e)))
, FromStringMaybe (PubKey 'Sign (Encryption e))
, Signatures (Encryption e)
, Serialise (Signature (Encryption e))
@ -248,10 +249,23 @@ instance Typeable (RefChanRequest e) => Hashable (EventKey e (RefChanRequest e))
where
p = Proxy @(RefChanRequest e)
data RefChanActionRequest =
RefChanAnnounceBlock HashRef
| RefChanFetch HashRef
deriving stock (Generic)
instance Serialise RefChanActionRequest
-- принимается, только если соответствует текущему HEAD
-- не пишется в журнал
data RefChanNotify e =
Notify (RefChanId e) (SignedBox ByteString e) -- подписано ключом автора
Notify (RefChanId e) (SignedBox ByteString e) -- подписано ключом автора
-- довольно уместно будет добавить эти команды сюда -
-- они постоянно нужны, и это сильно упростит коммуникации
| ActionRequest (RefChanId e) RefChanActionRequest
deriving stock (Generic)
instance ForRefChans e => Serialise (RefChanNotify e)
@ -279,7 +293,10 @@ instance ( Serialise (PubKey 'Sign (Encryption e))
, Serialise (Nonce (RefChanValidate e)) )
=> Serialise (RefChanValidate e)
instance (ForRefChans e, Pretty (AsBase58 (Nonce (RefChanValidate e)))) => Pretty (RefChanValidate e) where
instance ( ForRefChans e
, Pretty (AsBase58 (Nonce (RefChanValidate e)))
, Pretty (AsBase58 (PubKey 'Sign (Encryption e)))
) => Pretty (RefChanValidate e) where
pretty (RefChanValidate n c d) = case d of
Validate r -> pretty "validate" <+> pretty (AsBase58 n) <+> pretty (AsBase58 c) <+> pretty r
Accepted r -> pretty "accepted" <+> pretty (AsBase58 n) <+> pretty (AsBase58 c) <+> pretty r
@ -293,6 +310,7 @@ data RefChanAdapter e m =
, refChanSubscribed :: RefChanId e -> m Bool
, refChanWriteTran :: HashRef -> m ()
, refChanValidatePropose :: RefChanId e -> HashRef -> m Bool
, refChanNotifyRely :: RefChanId e -> RefChanNotify e -> m ()
}
class HasRefChanId e p | p -> e where
@ -311,6 +329,7 @@ instance HasRefChanId e (RefChanRequest e) where
instance HasRefChanId e (RefChanNotify e) where
getRefChanId = \case
Notify c _ -> c
ActionRequest c _ -> c
instance HasRefChanId e (RefChanValidate e) where
getRefChanId = rcvChan
@ -747,6 +766,8 @@ refChanRequestProto self adapter msg = do
proto = Proxy @(RefChanRequest e)
-- instance Coercible (SignedBox L4Proto
refChanNotifyProto :: forall e s m . ( MonadIO m
, Request e (RefChanNotify e) m
, Response e (RefChanNotify e) m
@ -769,13 +790,23 @@ refChanNotifyProto :: forall e s m . ( MonadIO m
-> RefChanNotify e
-> m ()
refChanNotifyProto self adapter msg@(ActionRequest rchan a) = do
debug $ "RefChanNotify ACTION REQUEST"
pure ()
refChanNotifyProto self adapter msg@(Notify rchan box) = do
-- аутентифицируем
-- проверяем ACL
-- пересылаем всем
sto <- getStorage
peer <- thatPeer proto
debug $ "&&& refChanNotifyProto" <+> pretty self
let h0 = hashObject @HbSync (serialise msg)
auth <- find (KnownPeerKey peer) id <&> isJust
void $ runMaybeT do
@ -784,20 +815,31 @@ refChanNotifyProto self adapter msg@(Notify rchan box) = do
guard (self || auth)
(authorKey, bs) <- MaybeT $ pure $ unboxSignedBox0 box
deferred proto do
let refchanKey = RefChanHeadKey @s rchan
headBlock <- MaybeT $ getActualRefChanHead @e refchanKey
guard =<< liftIO (hasBlock sto h0 <&> isNothing)
guard $ checkACL headBlock Nothing authorKey
(authorKey, bs) <- MaybeT $ pure $ unboxSignedBox0 box
-- теперь пересылаем по госсипу
lift $ gossip msg
let refchanKey = RefChanHeadKey @s rchan
headBlock <- MaybeT $ getActualRefChanHead @e refchanKey
trace $ "refChanNotifyProto" <+> pretty (BS.length bs)
guard $ checkACL headBlock Nothing authorKey
-- тут надо заслать во внешнее приложение,
-- равно как и в остальных refchan-протоколах
-- FIXME: garbage-collection-required
liftIO $ putBlock sto (serialise msg)
-- теперь пересылаем по госсипу
lift $ gossip msg
debug $ "^^^ refChanNotifyProto" <+> pretty peer <+> pretty h0
-- тут надо заслать во внешнее приложение,
-- равно как и в остальных refchan-протоколах
unless self do
debug $ "^^^ CALL refChanNotifyRely" <+> pretty h0
lift $ refChanNotifyRely adapter rchan msg
where
proto = Proxy @(RefChanNotify e)
@ -808,7 +850,7 @@ getActualRefChanHead :: forall e s m . ( MonadIO m
, HasStorage m
, Signatures s
, IsRefPubKey s
, Pretty (AsBase58 (PubKey 'Sign s))
-- , Pretty (AsBase58 (PubKey 'Sign s))
-- , Serialise (Signature s)
, ForRefChans e
, HasStorage m
@ -830,14 +872,25 @@ getActualRefChanHead key = do
pure hd
Nothing -> do
debug "ABOUT TO FIND HEAD"
h <- MaybeT $ liftIO $ getRef sto key
hdblob <- MaybeT $ readBlobFromTree ( getBlock sto ) (HashRef h)
(_, headblk) <- MaybeT $ pure $ unboxSignedBox @(RefChanHeadBlock e) @e hdblob
lift $ update headblk (RefChanHeadBlockKey key) id -- set found head
headblk <- MaybeT $ getRefChanHead sto key
debug "HEAD FOUND"
pure headblk
getRefChanHead :: forall e s m . ( MonadIO m
, s ~ Encryption e
, ForRefChans e
, Signatures s
)
=> AnyStorage
-> RefChanHeadKey s
-> m (Maybe (RefChanHeadBlock e))
getRefChanHead sto k = runMaybeT do
h <- MaybeT $ liftIO $ getRef sto k
hdblob <- MaybeT $ readBlobFromTree ( getBlock sto ) (HashRef h)
(_, headblk) <- MaybeT $ pure $ unboxSignedBox @(RefChanHeadBlock e) @e hdblob
pure headblk
makeProposeTran :: forall e s m . ( MonadIO m
, ForRefChans e
, Signatures (Encryption e)
@ -879,7 +932,7 @@ instance ForRefChans e => FromStringMaybe (RefChanHeadBlock e) where
| (ListVal [SymbolVal "author", LitStrVal s] ) <- parsed
]
instance ForRefChans e => Pretty (RefChanHeadBlock e) where
instance (ForRefChans e, Pretty (AsBase58 (PubKey 'Sign (Encryption e)))) => Pretty (RefChanHeadBlock e) where
pretty blk = parens ("version" <+> pretty (view refChanHeadVersion blk)) <> line
<>
parens ("quorum" <+> pretty (view refChanHeadQuorum blk)) <> line

View File

@ -17,6 +17,12 @@ instance MonadIO m => OrDie m (Maybe a) where
Nothing -> liftIO $ die err
Just x -> pure x
instance MonadIO m => OrDie m (Either a b) where
type instance OrDieResult (Either a b) = b
orDie mv err = mv >>= \case
Left{} -> liftIO $ die err
Right x -> pure x
instance MonadIO m => OrDie m ExitCode where
type instance OrDieResult ExitCode = ()
orDie mv err = mv >>= \case

View File

@ -203,6 +203,7 @@ cleanupPostponed b h = do
instance ( Hashable (Peer e)
, Pretty (Peer e), Pretty (PeerAddr e)
, Pretty (AsBase58 (PubKey 'Sign (Encryption e)))
, e ~ L4Proto
, ForRefChans e
) => HasBrains e (BasicBrains e) where
@ -803,6 +804,7 @@ newBasicBrains cfg = liftIO do
runBasicBrains :: forall e m . ( e ~ L4Proto
, MonadUnliftIO m
, ForRefChans e
, Pretty (AsBase58 (PubKey 'Sign (Encryption L4Proto)))
)
=> PeerConfig
-> BasicBrains e

View File

@ -11,7 +11,7 @@ import HBS2.Events
import PeerTypes
import PeerConfig
import RefLog ( doRefLogBroadCast )
import RefLog ( doRefLogBroadCast )
import Data.Functor
import Data.ByteString.Lazy qualified as LBS
@ -73,7 +73,7 @@ httpWorker conf pmeta e = do
va <- liftIO $ getRef sto (RefLogKey @s ref)
maybe1 va (status status404) $ \val -> do
text [qc|{pretty val}|]
post "/reflog" do
bs <- LBS.take 4194304 <$> body
let msg' =

View File

@ -616,15 +616,18 @@ runPeer opts = Exception.handle (\e -> myException e
pause @'Seconds 600
liftIO $ Cache.purgeExpired nbcache
rce <- refChanWorkerEnv conf denv
rce <- refChanWorkerEnv conf penv denv
let refChanAdapter = RefChanAdapter
{ refChanOnHead = refChanOnHeadFn rce
, refChanSubscribed = isPolledRef @e brains
, refChanWriteTran = refChanWriteTranFn rce
, refChanValidatePropose = refChanValidateTranFn @e rce
, refChanNotifyRely = refChanNotifyRelyFn @e rce
}
rcw <- async $ liftIO $ runRefChanRelyWorker rce refChanAdapter
let pexFilt pips = do
tcpex <- listTCPPexCandidates @e brains <&> HashSet.fromList
fset <- forM pips $ \p -> do

View File

@ -525,9 +525,25 @@ instance (ForGossip e p (PeerM e IO)) => HasGossip e p (PeerM e IO) where
broadCastMessage msg
instance (ForGossip e p (ResponseM e m), HasGossip e p m) => HasGossip e p (ResponseM e m) where
gossip = lift . gossip
gossip msg = do
that <- thatPeer (Proxy @p)
forKnownPeers $ \pip _ -> do
unless (that == pip) do
request @e pip msg
simpleBlockAnnounce :: forall e m . ( Monad m
, HasPeerNonce e m
)
=> Integer
-> Hash HbSync
-> m (BlockAnnounce e)
simpleBlockAnnounce size h = do
no <- peerNonce @e
let annInfo = BlockAnnounceInfo 0 NoBlockInfoMeta size h
pure $ BlockAnnounce @e no annInfo
data TRACE1
instance HasLogLevel TRACE1 where

View File

@ -7,7 +7,9 @@ module RefChan (
, refChanOnHeadFn
, refChanWriteTranFn
, refChanValidateTranFn
, refChanNotifyRelyFn
, refChanWorker
, runRefChanRelyWorker
, refChanWorkerEnv
, refChanNotifyOnUpdated
) where
@ -16,8 +18,10 @@ import HBS2.Prelude.Plated
import HBS2.Actors.Peer
import HBS2.Base58
import HBS2.Hash
import HBS2.Clock
import HBS2.Data.Detect
import HBS2.Defaults
import HBS2.Data.Types.Refs
import HBS2.Data.Types.SignedBox
import HBS2.Events
@ -38,27 +42,28 @@ import PeerConfig
import BlockDownload
import Brains
import Data.Dynamic
import Codec.Serialise
import Control.Concurrent.STM (flushTQueue)
import Control.Exception ()
import Control.Monad.Except (throwError, runExceptT)
import Control.Monad.Except ()
import Control.Monad.Reader
import Control.Monad.Trans.Maybe
import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Data.Coerce
import Data.ByteString (ByteString)
import Data.Either
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.HashSet (HashSet)
import Data.HashSet qualified as HashSet
import Data.Heap ()
import Data.Coerce
-- import Data.Heap qualified as Heap
import Data.List qualified as List
import Data.Maybe
import Data.Text qualified as Text
import Lens.Micro.Platform
import Data.Generics.Product
import UnliftIO
import Streaming.Prelude qualified as S
@ -78,30 +83,52 @@ data RefChanValidator =
, rcvAsync :: Async ()
}
data RefChanNotifier =
RefChanNotifier
{ rcnPeer :: Peer UNIX
, rcnInbox :: TQueue (RefChanNotify UNIX)
, rcnAsync :: Async ()
}
data RefChanWorkerEnv e =
RefChanWorkerEnv
{ _refChanWorkerConf :: PeerConfig
, _refChanWorkerEnvDEnv :: DownloadEnv e
, _refChanWorkerEnvHeadQ :: TQueue (RefChanId e, RefChanHeadBlockTran e)
, _refChanWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, (TimeSpec, OnDownloadComplete)))
, _refChanWorkerEnvNotify :: TVar (HashMap (RefChanId e) ())
, _refChanWorkerEnvWriteQ :: TQueue HashRef
, _refChanWorkerValidators :: TVar (HashMap (RefChanId e) RefChanValidator)
{ _refChanWorkerConf :: PeerConfig
, _refChanPeerEnv :: PeerEnv e
, _refChanWorkerEnvDEnv :: DownloadEnv e
, _refChanWorkerEnvHeadQ :: TQueue (RefChanId e, RefChanHeadBlockTran e)
, _refChanWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, (TimeSpec, OnDownloadComplete)))
, _refChanWorkerEnvNotify :: TVar (HashMap (RefChanId e) ())
, _refChanWorkerEnvWriteQ :: TQueue HashRef
, _refChanWorkerValidators :: TVar (HashMap (RefChanId e) RefChanValidator)
-- FIXME: peer-addr-to-have-multiple-actors-on-single-box
-- нужно ключом держать Peer e (SockAddr)
-- что бы можно было завести несколько акторов на одном
-- боксе в целях отладки.
, _refChanWorkerNotifiers :: TVar (HashMap (RefChanId e) [RefChanNotifier])
, _refChanWorkerNotifiersInbox :: TQueue (RefChanNotify e) -- ^ to rely messages from clients to gossip
, _refChanWorkerNotifiersDone :: Cache (Hash HbSync) ()
, _refChanWorkerLocalRelyDone :: Cache (Peer UNIX, Hash HbSync) ()
}
makeLenses 'RefChanWorkerEnv
refChanWorkerEnv :: forall m e . (MonadIO m, ForRefChans e)
=> PeerConfig
-> PeerEnv e
-> DownloadEnv e
-> m (RefChanWorkerEnv e)
refChanWorkerEnv conf de = liftIO $ RefChanWorkerEnv @e conf de
refChanWorkerEnv conf pe de = liftIO $ RefChanWorkerEnv @e conf pe de
<$> newTQueueIO
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTQueueIO
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTQueueIO
<*> Cache.newCache (Just defRequestLimit)
<*> Cache.newCache (Just defRequestLimit)
refChanOnHeadFn :: forall e m . (ForRefChans e, MonadIO m) => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m ()
refChanOnHeadFn env chan tran = do
@ -145,6 +172,29 @@ refChanNotifyOnUpdated :: (MonadIO m, ForRefChans e) => RefChanWorkerEnv e -> Re
refChanNotifyOnUpdated env chan = do
atomically $ modifyTVar (_refChanWorkerEnvNotify env) (HashMap.insert chan ())
refChanNotifyRelyFn :: forall e m . ( MonadUnliftIO m
, ForRefChans e, e ~ L4Proto
)
=> RefChanWorkerEnv e
-> RefChanId e
-> RefChanNotify e
-> m ()
refChanNotifyRelyFn env chan msg@(Notify _ (SignedBox k box s)) = do
debug "refChanNotifyRelyFn"
-- FIXME: multiple-hash-object-msg-performance
let h0 = hashObject @HbSync (serialise msg)
void $ runMaybeT do
guard =<< liftIO (Cache.lookup (view refChanWorkerNotifiersDone env) h0 <&> isNothing)
liftIO $ Cache.insert (view refChanWorkerNotifiersDone env) h0 ()
-- RefChanNotifier q _ <- MaybeT $ liftIO (readTVarIO (view refChanWorkerNotifiers env) <&> HashMap.lookup chan)
notifiers <- MaybeT $ liftIO (readTVarIO (view refChanWorkerNotifiers env) <&> HashMap.lookup chan)
forM_ notifiers $ \(RefChanNotifier _ q _) -> do
atomically $ writeTQueue q (Notify @UNIX chan (SignedBox k box s))
refChanAddDownload :: forall e m . ( m ~ PeerM e IO
, MyPeer e
)
@ -164,16 +214,143 @@ refChanAddDownload env chan r onComlete = do
readLog :: forall m . ( MonadUnliftIO m )
=> AnyStorage
-> HashRef
-> m [HashRef]
readLog sto (HashRef h) =
S.toList_ $ do
walkMerkle h (liftIO . getBlock sto) $ \hr -> do
case hr of
Left{} -> pure ()
Right (hrr :: [HashRef]) -> S.each hrr
data NotifyEnv =
NotifyEnv
{ _notifyClient :: Fabriq UNIX
, _notifySelf :: Peer UNIX
}
newtype NotifyProtoM m a = NotifyProto { fromNotifyProto :: ReaderT NotifyEnv m a }
deriving newtype ( Functor
, Applicative
, Monad
, MonadIO
, MonadUnliftIO
, MonadReader NotifyEnv
, MonadTrans
)
runNotifyProtoM :: (MonadIO m, PeerMessaging UNIX) => MessagingUnix -> NotifyProtoM m a -> m a
runNotifyProtoM bus m = runReaderT (fromNotifyProto m) (NotifyEnv (Fabriq bus) (msgUnixSelf bus))
instance Monad m => HasFabriq UNIX (NotifyProtoM m) where
getFabriq = asks _notifyClient
instance Monad m => HasOwnPeer UNIX (NotifyProtoM m) where
ownPeer = asks _notifySelf
refChanNotifyRpcProto :: forall e m . ( MonadIO m
, Request e (RefChanNotify e) m
-- , HasPeerNonce UNIX m
, e ~ UNIX
-- , m ~ PeerM e IO
)
=> RefChanWorkerEnv L4Proto
-> RefChanNotify e
-> m ()
refChanNotifyRpcProto env msg@(ActionRequest chan action) = do
let penv = view refChanPeerEnv env
case action of
RefChanAnnounceBlock h -> do
debug $ "RefChanNotify: RefChanAnnounceBlock" <+> pretty h
liftIO $ withPeerM penv $ do
sto <- getStorage
sz' <- liftIO $ hasBlock sto (fromHashRef h)
maybe1 sz' none $ \sz -> do
ann <- simpleBlockAnnounce @L4Proto sz (fromHashRef h)
gossip ann
pure ()
RefChanFetch h -> do
debug $ "RefChanNotify: RefChanFetch" <+> pretty h
liftIO $ withPeerM penv $ do
refChanAddDownload env chan h (const $ pure ())
where
proto = Proxy @(RefChanNotify e)
refChanNotifyRpcProto env msg@(Notify chan (SignedBox pk box si)) = do
debug "GOT MESSAGE FROM CLIENT"
atomically $ writeTQueue (view refChanWorkerNotifiersInbox env) (Notify @L4Proto chan (SignedBox pk box si))
-- тут мы должны переслать всем, кроме отправителя
let h0 = hashObject @HbSync (serialise msg)
-- FIXME: squash-this-copypaste
void $ runMaybeT do
notifiers <- MaybeT $ liftIO (readTVarIO (view refChanWorkerNotifiers env) <&> HashMap.lookup chan)
forM_ notifiers $ \(RefChanNotifier peer q _) -> do
let lkey = (peer, h0)
-- guard =<< liftIO (Cache.lookup (view refChanWorkerLocalRelyDone env) lkey <&> isNothing)
-- liftIO $ Cache.insert (view refChanWorkerLocalRelyDone env) lkey ()
atomically $ writeTQueue q msg
refChanWorkerInitNotifiers :: forall e m . ( MonadIO m
, MonadUnliftIO m
, MyPeer e
-- , ForRefChans e
-- , ForRefChans UNIX
-- , m ~ PeerM e IO
, e ~ L4Proto
)
=> RefChanWorkerEnv e
-> m ()
refChanWorkerInitNotifiers env = do
debug "refChanWorkerInitNotifiers"
let (PeerConfig syn) = view refChanWorkerConf env
let notifiers = [ mkV rc x | ListVal [ SymbolVal "notify"
, SymbolVal "refchan"
, LitStrVal rc
, ListVal [ SymbolVal "socket", SymbolVal "unix", LitStrVal x ]
] <- syn
] & catMaybes
forM_ notifiers $ \(rc, sa) -> do
debug $ "** NOTIFIER FOR" <+> pretty (AsBase58 rc, sa)
q <- newTQueueIO
val <- async $ liftIO $ notifierThread rc sa q
let rcn = RefChanNotifier (fromString sa) q val
atomically $ modifyTVar (_refChanWorkerNotifiers env) (HashMap.insertWith (<>) rc [rcn])
where
mkV :: Text -> Text -> Maybe (RefChanId e, String)
mkV rc x = (,Text.unpack x) <$> fromStringMay @(RefChanId e) (Text.unpack rc)
notifierThread _ sa q = do
debug $ ">>> NOTIFIER THREAD FOR" <+> pretty sa
client <- newMessagingUnix False 1.0 sa
msg <- async $ runMessagingUnix client
runNotifyProtoM client do
proto <- async $ runProto [ makeResponse (refChanNotifyRpcProto env) ]
forever do
req <- atomically $ readTQueue q
debug "Rely notification request"
request @UNIX (fromString sa) req
wait proto
mapM_ wait [msg]
data ValidateEnv =
ValidateEnv
@ -181,7 +358,7 @@ data ValidateEnv =
, _validateSelf :: Peer UNIX
}
newtype ValidateProtoM m a = PingPongM { fromValidateProto :: ReaderT ValidateEnv m a }
newtype ValidateProtoM m a = ValidateProto { fromValidateProto :: ReaderT ValidateEnv m a }
deriving newtype ( Functor
, Applicative
, Monad
@ -226,7 +403,6 @@ refChanValidateProto waiters msg = do
maybe1 mbAnsw none $ \answ -> do
putMVar answ m
refChanWorkerInitValidators :: forall e m . ( MonadIO m
, MonadUnliftIO m
-- , MyPeer e
@ -276,6 +452,9 @@ refChanWorkerInitValidators env = do
-- FIXME: hardcoded-timeout
waiters <- Cache.newCache (Just (toTimeSpec (10 :: Timeout 'Seconds)))
-- FIXME: hardcoded-timeout
waiters <- Cache.newCache (Just (toTimeSpec (10 :: Timeout 'Seconds)))
runValidateProtoM client do
poke <- async $ forever do
@ -302,6 +481,13 @@ refChanWorkerInitValidators env = do
let pa = fromString sa
request pa req
Poke{} -> do
debug "DO SEND POKE"
let pa = fromString sa
pure ()
--
-- request pa req
_ -> pure ()
@ -311,6 +497,22 @@ refChanWorkerInitValidators env = do
cancel msg
warn $ "validatorThread is terminated for some reasons" <+> pretty (AsBase58 chan)
runRefChanRelyWorker :: forall e m .
( MonadIO m
, m ~ PeerM e IO
, e ~ L4Proto
)
=> RefChanWorkerEnv e
-> RefChanAdapter e (ResponseM e m)
-> IO ()
runRefChanRelyWorker env adapter = liftIO $ forever do
withPeerM (view refChanPeerEnv env) do
me <- ownPeer @e
-- FIXME: use-bounded-queue-ASAP
mess <- atomically $ readTQueue (view refChanWorkerNotifiersInbox env)
runResponseM me $ do
refChanNotifyProto True adapter mess
refChanWorker :: forall e s m . ( MonadIO m
, MonadUnliftIO m
@ -361,6 +563,7 @@ refChanWorker env brains = do
sto <- getStorage
liftIO $ refChanWorkerInitValidators env
liftIO $ refChanWorkerInitNotifiers env
subscribe @e RefChanRequestEventKey $ \(RefChanRequestEvent chan val) -> do
debug $ "RefChanRequestEvent" <+> pretty (AsBase58 chan) <+> pretty val
@ -443,7 +646,7 @@ refChanWorker env brains = do
forM_ (HashMap.toList byChan) $ \(c,new) -> do
mbLog <- liftIO $ getRef sto c
hashes <- maybe1 mbLog (pure mempty) $ readLog sto . HashRef
hashes <- maybe1 mbLog (pure mempty) $ readLog (getBlock sto) . HashRef
-- FIXME: might-be-problems-on-large-logs
let hashesNew = HashSet.fromList (hashes <> new) & HashSet.toList
@ -575,6 +778,7 @@ logMergeProcess :: forall e s m . ( MonadUnliftIO m
, ForRefChans e
, HasStorage m
, Signatures s
, Pretty (AsBase58 (PubKey 'Sign s))
, s ~ Encryption e
, m ~ PeerM e IO
)
@ -637,18 +841,25 @@ logMergeProcess env q = do
penv <- ask
let readFn = getBlock sto
void $ runMaybeT do
let chanKey = RefChanLogKey @s chan
h <- MaybeT $ liftIO $ getRef sto chanKey
-- FIXME: wont-work-if-no-reference-yet
-- не сработает если ссылка новая
(mergeSet, merge) <- liftIO (getRef sto chanKey) >>= \case
Nothing -> do
new <- mconcat <$> mapM (lift . readLog readFn) logs
pure (HashSet.fromList new, not (List.null new))
current <- lift $ readLog sto (HashRef h) <&> HashSet.fromList
-- trans <- filter (not . flip HashSet.member current) . mconcat <$> mapM (lift . readLog sto) logs
trans <- mconcat <$> mapM (lift . readLog sto) (filter (/= HashRef h) logs)
guard (not $ List.null trans)
Just h -> do
current <- lift $ readLog readFn (HashRef h) <&> HashSet.fromList
new <- mconcat <$> mapM (lift . readLog readFn) (filter (/= HashRef h) logs)
let mergeSet = HashSet.fromList new <> current
pure (mergeSet, not (List.null new))
guard merge
-- итак, тут приехал весь лог, который есть у пира
-- логично искать подтверждения только в нём. если
@ -667,15 +878,15 @@ logMergeProcess env q = do
-- потом, если всё ок -- пишем accept-ы и propose-ы у которых
-- больше quorum подтверждений для актуальной головы
let mergeSet = (HashSet.fromList trans <> current) & HashSet.toList
let mergeList = HashSet.toList mergeSet
-- если какие-то транзакции отсутствуют - пытаемся их скачать
-- и надеемся на лучшее (лог сойдется в следующий раз)
forM_ mergeSet $ \href -> do
forM_ mergeList $ \href -> do
mblk <- liftIO $ getBlock sto (fromHashRef href)
maybe1 mblk (lift $ refChanAddDownload env chan href dontHandle) dontHandle
r <- forM mergeSet $ \href -> runMaybeT do
r <- forM mergeList $ \href -> runMaybeT do
blk <- MaybeT $ liftIO $ getBlock sto (fromHashRef href)

View File

@ -377,8 +377,8 @@ simpleReadLinkRaw ss hash = do
let fn = simpleRefFileName ss hash
rs <- spawnAndWait ss $ do
-- FIXME: log-this-situation
(Just <$> LBS.readFile fn) `catchAny` \_ -> do
err $ "simpleReadLinkRaw" <+> pretty hash <+> pretty fn
(Just <$> LBS.readFile fn) `catchAny` \e -> do
err $ "simpleReadLinkRaw" <+> pretty hash <+> pretty fn <+> viaShow e
pure Nothing
pure $ fromMaybe Nothing rs
@ -396,8 +396,8 @@ simpleReadLinkVal :: ( IsKey h
simpleReadLinkVal ss hash = do
let fn = simpleRefFileName ss hash
rs <- spawnAndWait ss $ do
(Just <$> BS.readFile fn) `catchAny` \_ -> do
err $ "simpleReadLinkVal" <+> pretty hash <+> pretty fn
(Just <$> BS.readFile fn) `catchAny` \e -> do
err $ "simpleReadLinkVal" <+> pretty hash <+> pretty fn <+> viaShow e
pure Nothing
runMaybeT do
@ -422,7 +422,7 @@ instance ( MonadIO m, IsKey hash
updateRef ss ref v = do
let refHash = hashObject @hash ref
-- liftIO $ print $ "updateRef:" <+> pretty refHash
debug $ "updateRef:" <+> pretty refHash
void $ liftIO $ simpleWriteLinkRawRef ss refHash v
getRef ss ref = do

View File

@ -10,6 +10,7 @@ import HBS2.Net.Auth.AccessKey
import HBS2.Net.Auth.Credentials
import HBS2.Net.Proto.Definition()
import HBS2.Net.Proto.RefLog(RefLogKey(..))
import HBS2.Net.Proto.AnyRef(AnyRefKey(..))
import HBS2.Prelude.Plated
import HBS2.Storage.Simple
import HBS2.Storage.Simple.Extra
@ -356,6 +357,13 @@ runRefLogGet s ss = do
print $ pretty ref
exitSuccess
runAnyRefGet :: forall s t . IsRefPubKey s => AnyRefKey t s -> SimpleStorage HbSync -> IO ()
runAnyRefGet s ss = do
ref' <- getRef ss s
maybe1 ref' exitFailure $ \ref -> do
print $ pretty ref
exitSuccess
withStore :: Data opts => opts -> ( SimpleStorage HbSync -> IO () ) -> IO ()
withStore opts f = do
@ -473,6 +481,14 @@ main = join . customExecParser (prefs showHelpOnError) $
reflogs <- strArgument ( metavar "REFLOG" )
pure $ withStore o (runRefLogGet @HBS2Basic reflogs)
pAnyRef = hsubparser ( command "get" (info pAnyRefGet (progDesc "get anyref value") ) )
pAnyRefGet = do
o <- common
anyref <- strArgument ( metavar "ANYREF" )
pure $ withStore o (runAnyRefGet @HBS2Basic anyref)
pFsck = do
o <- common
pure $ withStore o $ \sto -> do

6
test/refchan/author1.key Normal file
View File

@ -0,0 +1,6 @@
# hbs2 credentials file
# keep it private
rmbTHUgvHqhSEsxHBwheyJbnhxMpzrBoTYd7pPkwL5k3haMDRChR4e38FYqz
xmJPbJPV1UhxBszq91uSgp5vrSxaTWfVqGREE5f1Z3ySQJhBMoSr2wSGsCjF
vaAzkBK9c3GrRr4kQB5yqTAko1

6
test/refchan/author2.key Normal file
View File

@ -0,0 +1,6 @@
# hbs2 credentials file
# keep it private
rmbTHUgvbsubKopKR5wWpsMFKpCPbtUQ1zcMoLa5xJfy6RgPfY8k5EVC1CXR
BjgaoXEqtFwcshGrrkXoBqsRpVwBLEgeZcLuMGxe23PxCzJoqZciovbeVCuH
45ApvdBFzeTcppYSZ9Q6GQqWFq

6
test/refchan/author3.key Normal file
View File

@ -0,0 +1,6 @@
# hbs2 credentials file
# keep it private
rmbTHUgvSXaWW2Zx1RtLbGq5qYFRAzbs1GFv5eCzcypKrAsBdPZMpZUgZ42C
e9kS7oUM2r82jfesa9B22MiAfjkfKzwMN8anEj4KAJfPMYVCvTM7DraqQhtV
CfLooy4smrB9scBEznvWfCGPSw

6
test/refchan/owner.key Normal file
View File

@ -0,0 +1,6 @@
# hbs2 credentials file
# keep it private
rmbTHUgvShBTLfScwWwDj9c8MNF77Q8gwRvBTL9L3dBK57pzCsnD9FRyHnqa
sMHzNDVgrYqMqm1R9obVNSr7yxXs8uW6T2xw6dZ2n6fDACaswcQrUm46j5xQ
qTyGWUvLAQF2FoJcYcNPTZkpqD