diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs index b78043de..8ea2f57f 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs @@ -546,16 +546,19 @@ refChanUpdateProto self pc adapter msg = do where proto = Proxy @(RefChanUpdate e) - checkACL :: RefChanHeadBlock e - -> PubKey 'Sign s - -> PubKey 'Sign s - -> Bool - checkACL theHead peerKey authorKey = match - where - pips = view refChanHeadPeers theHead - aus = view refChanHeadAuthors theHead - match = peerKey `HashMap.member` pips - && authorKey `HashSet.member` aus + +checkACL :: forall e s . (Encryption e ~ s, ForRefChans e) + => RefChanHeadBlock e + -> PubKey 'Sign s + -> PubKey 'Sign s + -> Bool + +checkACL theHead peerKey authorKey = match + where + pips = view refChanHeadPeers theHead + aus = view refChanHeadAuthors theHead + match = peerKey `HashMap.member` pips + && authorKey `HashSet.member` aus -- TODO: refchan-poll-proto -- Запрашиваем refchan у всех. diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index e892a179..fd7d1552 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -15,7 +15,7 @@ import HBS2.Prelude.Plated import HBS2.Actors.Peer import HBS2.Base58 -import HBS2.Hash +import HBS2.Merkle import HBS2.Clock import HBS2.Events import HBS2.Net.Proto.Peer @@ -367,10 +367,22 @@ refChanWorker env brains = do pure () +data MergeEnv e = + MergeEnv + { mergeSto :: AnyStorage + , mergeHeads :: TVar (HashMap HashRef (RefChanHeadBlock e) ) + } + + +-- FIXME: possible-performance-issues +-- Выглядит довольно медленно. Вероятно, +-- можно быстрее. +-- В частности, кэшировать уже обработанные логи logMergeProcess :: forall e s m . ( MonadUnliftIO m , MyPeer e , ForRefChans e , HasStorage m + , Signatures s , s ~ Encryption e ) => RefChanWorkerEnv e @@ -381,6 +393,8 @@ logMergeProcess _ q = do sto <- getStorage + menv <- MergeEnv sto <$> newTVarIO mempty + forever do -- FIXME: fix-hardcoded-timeout pause @'Seconds 1 @@ -389,13 +403,33 @@ logMergeProcess _ q = do let byChan = HashMap.fromListWith (<>) [ (x,[y]) | (x,y) <- logs ] & HashMap.toList -- FIXME: in-parallel - mapM_ (logMergeChan sto) byChan + mapM_ (logMergeChan menv sto) byChan where - logMergeChan sto (chan, logs) = runMaybeT do + getHead :: MergeEnv e + -> HashRef + -> m (Maybe (RefChanHeadBlock e)) - h <- MaybeT $ liftIO $ getRef sto (RefChanLogKey @s chan) + getHead e h = do + + let sto = mergeSto e + hd <- readTVarIO (mergeHeads e) <&> HashMap.lookup h + + case hd of + Just x -> pure (Just x) + Nothing -> runMaybeT do + hdblob <- MaybeT $ readBlobFromTree ( liftIO . getBlock sto ) h + (_, headblk) <- MaybeT $ pure $ unboxSignedBox @(RefChanHeadBlock e) @e hdblob + atomically $ modifyTVar (mergeHeads e) (HashMap.insert h headblk) + pure headblk + + + logMergeChan menv sto (chan, logs) = void $ runMaybeT do + + let chanKey = RefChanLogKey @s chan + + h <- MaybeT $ liftIO $ getRef sto chanKey current <- lift $ readLog sto (HashRef h) <&> HashSet.fromList @@ -403,8 +437,61 @@ logMergeProcess _ q = do trans <- filter (not . flip HashSet.member current) . mconcat <$> mapM (lift . readLog sto) logs - forM_ trans $ \t -> do + -- итак, тут приехал весь лог, который есть у пира + -- логично искать подтверждения только в нём. если + -- пир принял транзы без достаточного количества + -- подтверждений, то он сам лошара. + -- каждую транзу рассматриваем один раз, если + -- она смержена. + -- если она не смержена --- может быть, надо её + -- в какой-то reject список заносить - debug $ "ABOUT TO MERGE TRANS" <+> pretty t + -- распаковать, отсортировать по головам сначала + -- потом бежим по головам, достаём головы + -- проверяем acl-ы на соответствие историческим головам + -- потом связываем каждый accept с соответствующим propose + -- потом считаем количество accept для каждого propose + -- потом, если всё ок -- пишем accept-ы и propose-ы у которых + -- больше quorum подтверждений для актуальной головы + r <- forM trans $ \href -> runMaybeT do + blk <- MaybeT $ liftIO $ getBlock sto (fromHashRef href) + tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) blk + & either (const Nothing) Just + case tran of + Propose _ box -> do + (pk, ProposeTran headRef box) <- MaybeT $ pure $ unboxSignedBox0 box + (ak, _) <- MaybeT $ pure $ unboxSignedBox0 box + hd <- MaybeT $ lift $ getHead menv headRef + let quo = view refChanHeadQuorum hd & fromIntegral + guard $ checkACL hd pk ak + pure [(href, (quo,mempty))] + + Accept _ box -> do + (pk, AcceptTran headRef hashRef) <- MaybeT $ pure $ unboxSignedBox0 box + hd <- MaybeT $ lift $ getHead menv headRef + let quo = view refChanHeadQuorum hd & fromIntegral + guard $ HashMap.member pk (view refChanHeadPeers hd) + pure [(href, (quo,[hashRef]))] + + let merge1 (q1, hs1) (q2, hs2) = (max q1 q2, List.nub (hs1 <> hs2) ) + + let permitted = HashMap.fromListWith merge1 (mconcat (catMaybes r)) + & HashMap.toList + + new <- S.toList_ do + forM_ permitted $ \(prop, (qx, accs)) -> do + when (length accs >= qx) do + S.yield prop + S.each accs + + let merged = HashSet.union current (HashSet.fromList new) & HashSet.toList + + let pt = toPTree (MaxSize 256) (MaxNum 256) merged + + liftIO do + nref <- makeMerkle 0 pt $ \(_,_,bss) -> do + void $ putBlock sto bss + + updateRef sto chanKey nref