mirror of https://github.com/voidlizard/hbs2
wip, log merging, debug
This commit is contained in:
parent
f31846f848
commit
003fc43cc1
|
@ -546,16 +546,19 @@ refChanUpdateProto self pc adapter msg = do
|
||||||
where
|
where
|
||||||
proto = Proxy @(RefChanUpdate e)
|
proto = Proxy @(RefChanUpdate e)
|
||||||
|
|
||||||
checkACL :: RefChanHeadBlock e
|
|
||||||
-> PubKey 'Sign s
|
checkACL :: forall e s . (Encryption e ~ s, ForRefChans e)
|
||||||
-> PubKey 'Sign s
|
=> RefChanHeadBlock e
|
||||||
-> Bool
|
-> PubKey 'Sign s
|
||||||
checkACL theHead peerKey authorKey = match
|
-> PubKey 'Sign s
|
||||||
where
|
-> Bool
|
||||||
pips = view refChanHeadPeers theHead
|
|
||||||
aus = view refChanHeadAuthors theHead
|
checkACL theHead peerKey authorKey = match
|
||||||
match = peerKey `HashMap.member` pips
|
where
|
||||||
&& authorKey `HashSet.member` aus
|
pips = view refChanHeadPeers theHead
|
||||||
|
aus = view refChanHeadAuthors theHead
|
||||||
|
match = peerKey `HashMap.member` pips
|
||||||
|
&& authorKey `HashSet.member` aus
|
||||||
|
|
||||||
-- TODO: refchan-poll-proto
|
-- TODO: refchan-poll-proto
|
||||||
-- Запрашиваем refchan у всех.
|
-- Запрашиваем refchan у всех.
|
||||||
|
|
|
@ -15,7 +15,7 @@ import HBS2.Prelude.Plated
|
||||||
|
|
||||||
import HBS2.Actors.Peer
|
import HBS2.Actors.Peer
|
||||||
import HBS2.Base58
|
import HBS2.Base58
|
||||||
import HBS2.Hash
|
import HBS2.Merkle
|
||||||
import HBS2.Clock
|
import HBS2.Clock
|
||||||
import HBS2.Events
|
import HBS2.Events
|
||||||
import HBS2.Net.Proto.Peer
|
import HBS2.Net.Proto.Peer
|
||||||
|
@ -367,10 +367,22 @@ refChanWorker env brains = do
|
||||||
|
|
||||||
pure ()
|
pure ()
|
||||||
|
|
||||||
|
data MergeEnv e =
|
||||||
|
MergeEnv
|
||||||
|
{ mergeSto :: AnyStorage
|
||||||
|
, mergeHeads :: TVar (HashMap HashRef (RefChanHeadBlock e) )
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
-- FIXME: possible-performance-issues
|
||||||
|
-- Выглядит довольно медленно. Вероятно,
|
||||||
|
-- можно быстрее.
|
||||||
|
-- В частности, кэшировать уже обработанные логи
|
||||||
logMergeProcess :: forall e s m . ( MonadUnliftIO m
|
logMergeProcess :: forall e s m . ( MonadUnliftIO m
|
||||||
, MyPeer e
|
, MyPeer e
|
||||||
, ForRefChans e
|
, ForRefChans e
|
||||||
, HasStorage m
|
, HasStorage m
|
||||||
|
, Signatures s
|
||||||
, s ~ Encryption e
|
, s ~ Encryption e
|
||||||
)
|
)
|
||||||
=> RefChanWorkerEnv e
|
=> RefChanWorkerEnv e
|
||||||
|
@ -381,6 +393,8 @@ logMergeProcess _ q = do
|
||||||
|
|
||||||
sto <- getStorage
|
sto <- getStorage
|
||||||
|
|
||||||
|
menv <- MergeEnv sto <$> newTVarIO mempty
|
||||||
|
|
||||||
forever do
|
forever do
|
||||||
-- FIXME: fix-hardcoded-timeout
|
-- FIXME: fix-hardcoded-timeout
|
||||||
pause @'Seconds 1
|
pause @'Seconds 1
|
||||||
|
@ -389,13 +403,33 @@ logMergeProcess _ q = do
|
||||||
let byChan = HashMap.fromListWith (<>) [ (x,[y]) | (x,y) <- logs ] & HashMap.toList
|
let byChan = HashMap.fromListWith (<>) [ (x,[y]) | (x,y) <- logs ] & HashMap.toList
|
||||||
|
|
||||||
-- FIXME: in-parallel
|
-- FIXME: in-parallel
|
||||||
mapM_ (logMergeChan sto) byChan
|
mapM_ (logMergeChan menv sto) byChan
|
||||||
|
|
||||||
where
|
where
|
||||||
|
|
||||||
logMergeChan sto (chan, logs) = runMaybeT do
|
getHead :: MergeEnv e
|
||||||
|
-> HashRef
|
||||||
|
-> m (Maybe (RefChanHeadBlock e))
|
||||||
|
|
||||||
h <- MaybeT $ liftIO $ getRef sto (RefChanLogKey @s chan)
|
getHead e h = do
|
||||||
|
|
||||||
|
let sto = mergeSto e
|
||||||
|
hd <- readTVarIO (mergeHeads e) <&> HashMap.lookup h
|
||||||
|
|
||||||
|
case hd of
|
||||||
|
Just x -> pure (Just x)
|
||||||
|
Nothing -> runMaybeT do
|
||||||
|
hdblob <- MaybeT $ readBlobFromTree ( liftIO . getBlock sto ) h
|
||||||
|
(_, headblk) <- MaybeT $ pure $ unboxSignedBox @(RefChanHeadBlock e) @e hdblob
|
||||||
|
atomically $ modifyTVar (mergeHeads e) (HashMap.insert h headblk)
|
||||||
|
pure headblk
|
||||||
|
|
||||||
|
|
||||||
|
logMergeChan menv sto (chan, logs) = void $ runMaybeT do
|
||||||
|
|
||||||
|
let chanKey = RefChanLogKey @s chan
|
||||||
|
|
||||||
|
h <- MaybeT $ liftIO $ getRef sto chanKey
|
||||||
|
|
||||||
current <- lift $ readLog sto (HashRef h) <&> HashSet.fromList
|
current <- lift $ readLog sto (HashRef h) <&> HashSet.fromList
|
||||||
|
|
||||||
|
@ -403,8 +437,61 @@ logMergeProcess _ q = do
|
||||||
|
|
||||||
trans <- filter (not . flip HashSet.member current) . mconcat <$> mapM (lift . readLog sto) logs
|
trans <- filter (not . flip HashSet.member current) . mconcat <$> mapM (lift . readLog sto) logs
|
||||||
|
|
||||||
forM_ trans $ \t -> do
|
-- итак, тут приехал весь лог, который есть у пира
|
||||||
|
-- логично искать подтверждения только в нём. если
|
||||||
|
-- пир принял транзы без достаточного количества
|
||||||
|
-- подтверждений, то он сам лошара.
|
||||||
|
-- каждую транзу рассматриваем один раз, если
|
||||||
|
-- она смержена.
|
||||||
|
-- если она не смержена --- может быть, надо её
|
||||||
|
-- в какой-то reject список заносить
|
||||||
|
|
||||||
debug $ "ABOUT TO MERGE TRANS" <+> pretty t
|
-- распаковать, отсортировать по головам сначала
|
||||||
|
-- потом бежим по головам, достаём головы
|
||||||
|
-- проверяем acl-ы на соответствие историческим головам
|
||||||
|
-- потом связываем каждый accept с соответствующим propose
|
||||||
|
-- потом считаем количество accept для каждого propose
|
||||||
|
-- потом, если всё ок -- пишем accept-ы и propose-ы у которых
|
||||||
|
-- больше quorum подтверждений для актуальной головы
|
||||||
|
|
||||||
|
r <- forM trans $ \href -> runMaybeT do
|
||||||
|
blk <- MaybeT $ liftIO $ getBlock sto (fromHashRef href)
|
||||||
|
tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) blk
|
||||||
|
& either (const Nothing) Just
|
||||||
|
case tran of
|
||||||
|
Propose _ box -> do
|
||||||
|
(pk, ProposeTran headRef box) <- MaybeT $ pure $ unboxSignedBox0 box
|
||||||
|
(ak, _) <- MaybeT $ pure $ unboxSignedBox0 box
|
||||||
|
hd <- MaybeT $ lift $ getHead menv headRef
|
||||||
|
let quo = view refChanHeadQuorum hd & fromIntegral
|
||||||
|
guard $ checkACL hd pk ak
|
||||||
|
pure [(href, (quo,mempty))]
|
||||||
|
|
||||||
|
Accept _ box -> do
|
||||||
|
(pk, AcceptTran headRef hashRef) <- MaybeT $ pure $ unboxSignedBox0 box
|
||||||
|
hd <- MaybeT $ lift $ getHead menv headRef
|
||||||
|
let quo = view refChanHeadQuorum hd & fromIntegral
|
||||||
|
guard $ HashMap.member pk (view refChanHeadPeers hd)
|
||||||
|
pure [(href, (quo,[hashRef]))]
|
||||||
|
|
||||||
|
let merge1 (q1, hs1) (q2, hs2) = (max q1 q2, List.nub (hs1 <> hs2) )
|
||||||
|
|
||||||
|
let permitted = HashMap.fromListWith merge1 (mconcat (catMaybes r))
|
||||||
|
& HashMap.toList
|
||||||
|
|
||||||
|
new <- S.toList_ do
|
||||||
|
forM_ permitted $ \(prop, (qx, accs)) -> do
|
||||||
|
when (length accs >= qx) do
|
||||||
|
S.yield prop
|
||||||
|
S.each accs
|
||||||
|
|
||||||
|
let merged = HashSet.union current (HashSet.fromList new) & HashSet.toList
|
||||||
|
|
||||||
|
let pt = toPTree (MaxSize 256) (MaxNum 256) merged
|
||||||
|
|
||||||
|
liftIO do
|
||||||
|
nref <- makeMerkle 0 pt $ \(_,_,bss) -> do
|
||||||
|
void $ putBlock sto bss
|
||||||
|
|
||||||
|
updateRef sto chanKey nref
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue