diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index 87a55628..25d57b88 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -88,7 +88,7 @@ refChanWorkerEnv _ de = liftIO $ RefChanWorkerEnv @e de <$> newTQueueIO <*> newTQueueIO -refChanOnHeadFn :: MonadIO m => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m () +refChanOnHeadFn :: forall e m . (ForRefChans e, MonadIO m) => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m () refChanOnHeadFn env chan tran = do atomically $ writeTQueue (view refChanWorkerEnvHeadQ env) (chan, tran) @@ -439,7 +439,7 @@ logMergeProcess env q = do here <- liftIO $ hasBlock sto (fromHashRef h) <&> isJust unless here do - debug $ "head is missed:" <+> pretty h + warn $ "refchan. head is missed:" <+> pretty h pure () case hd of @@ -450,95 +450,114 @@ logMergeProcess env q = do atomically $ modifyTVar (mergeHeads e) (HashMap.insert h headblk) pure headblk - logMergeChan menv sto (chan, logs) = void $ runMaybeT do + downloadMissedHead :: AnyStorage -> RefChanId e -> HashRef -> m () + downloadMissedHead sto chan headRef = do + penv <- ask + here <- liftIO $ hasBlock sto (fromHashRef headRef) <&> isJust + unless here do + refChanAddDownload env chan headRef (withPeerM penv . refChanOnHeadFn env chan . RefChanHeadBlockTran) - let chanKey = RefChanLogKey @s chan + logMergeChan menv sto (chan, logs) = do - h <- MaybeT $ liftIO $ getRef sto chanKey + penv <- ask - current <- lift $ readLog sto (HashRef h) <&> HashSet.fromList + void $ runMaybeT do - -- trans <- filter (not . flip HashSet.member current) . mconcat <$> mapM (lift . readLog sto) logs - trans <- mconcat <$> mapM (lift . readLog sto) logs + let chanKey = RefChanLogKey @s chan - guard (not $ List.null trans) + h <- MaybeT $ liftIO $ getRef sto chanKey - -- итак, тут приехал весь лог, который есть у пира - -- логично искать подтверждения только в нём. если - -- пир принял транзы без достаточного количества - -- подтверждений, то он сам лошара. - -- каждую транзу рассматриваем один раз, если - -- она смержена. - -- если она не смержена --- может быть, надо её - -- в какой-то reject список заносить + current <- lift $ readLog sto (HashRef h) <&> HashSet.fromList - -- распаковать, отсортировать по головам сначала - -- потом бежим по головам, достаём головы - -- проверяем acl-ы на соответствие историческим головам - -- потом связываем каждый accept с соответствующим propose - -- потом считаем количество accept для каждого propose - -- потом, если всё ок -- пишем accept-ы и propose-ы у которых - -- больше quorum подтверждений для актуальной головы + -- trans <- filter (not . flip HashSet.member current) . mconcat <$> mapM (lift . readLog sto) logs + trans <- mconcat <$> mapM (lift . readLog sto) logs - let mergeSet = (HashSet.fromList trans <> current) & HashSet.toList + guard (not $ List.null trans) - -- если какие-то транзакции отсутствуют - пытаемся их скачать - -- и надеемся на лучшее (лог сойдется в следующий раз) - forM_ mergeSet $ \href -> do - here <- liftIO $ hasBlock sto (fromHashRef href) <&> isJust - unless here do - lift $ refChanAddDownload env chan href dontHandle + -- итак, тут приехал весь лог, который есть у пира + -- логично искать подтверждения только в нём. если + -- пир принял транзы без достаточного количества + -- подтверждений, то он сам лошара. + -- каждую транзу рассматриваем один раз, если + -- она смержена. + -- если она не смержена --- может быть, надо её + -- в какой-то reject список заносить - r <- forM mergeSet $ \href -> runMaybeT do + -- распаковать, отсортировать по головам сначала + -- потом бежим по головам, достаём головы + -- проверяем acl-ы на соответствие историческим головам + -- потом связываем каждый accept с соответствующим propose + -- потом считаем количество accept для каждого propose + -- потом, если всё ок -- пишем accept-ы и propose-ы у которых + -- больше quorum подтверждений для актуальной головы - blk <- MaybeT $ liftIO $ getBlock sto (fromHashRef href) + let mergeSet = (HashSet.fromList trans <> current) & HashSet.toList - tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) blk - & either (const Nothing) Just + -- если какие-то транзакции отсутствуют - пытаемся их скачать + -- и надеемся на лучшее (лог сойдется в следующий раз) + forM_ mergeSet $ \href -> do + mblk <- liftIO $ getBlock sto (fromHashRef href) + maybe1 mblk (lift $ refChanAddDownload env chan href dontHandle) dontHandle - case tran of - Propose _ box -> do - (pk, ProposeTran headRef box) <- MaybeT $ pure $ unboxSignedBox0 box - (ak, _) <- MaybeT $ pure $ unboxSignedBox0 box - hd <- MaybeT $ lift $ getHead menv headRef - let quo = view refChanHeadQuorum hd & fromIntegral - guard $ checkACL hd pk ak - pure [(href, (quo,mempty))] + r <- forM mergeSet $ \href -> runMaybeT do - Accept _ box -> do - (pk, AcceptTran headRef hashRef) <- MaybeT $ pure $ unboxSignedBox0 box - hd <- MaybeT $ lift $ getHead menv headRef - let quo = view refChanHeadQuorum hd & fromIntegral - guard $ HashMap.member pk (view refChanHeadPeers hd) - pure [(hashRef, (quo,[href]))] + blk <- MaybeT $ liftIO $ getBlock sto (fromHashRef href) - let merge1 (q1, hs1) (q2, hs2) = (max q1 q2, List.nub (hs1 <> hs2) ) + tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) blk + & either (const Nothing) Just - let permitted = HashMap.fromListWith merge1 (mconcat (catMaybes r)) - & HashMap.toList + case tran of + Propose _ box -> do + (pk, ProposeTran headRef box) <- MaybeT $ pure $ unboxSignedBox0 box + (ak, _) <- MaybeT $ pure $ unboxSignedBox0 box - new <- S.toList_ do - forM_ permitted $ \(prop, (qx, accs)) -> do - when (length accs >= qx) do - S.yield prop - S.each accs + lift $ lift $ downloadMissedHead sto chan headRef - debug $ "new trans to merge" <+> pretty (AsBase58 chan) <+> pretty (length new) + hd <- MaybeT $ lift $ getHead menv headRef - forM_ new $ \tnew -> do - debug $ "TRANS TO MERGE" <+> pretty tnew + let quo = view refChanHeadQuorum hd & fromIntegral + guard $ checkACL hd pk ak + pure [(href, (quo,mempty))] - let merged = HashSet.fromList new & HashSet.toList + Accept _ box -> do + (pk, AcceptTran headRef hashRef) <- MaybeT $ pure $ unboxSignedBox0 box - let pt = toPTree (MaxSize 256) (MaxNum 256) merged + lift $ lift $ downloadMissedHead sto chan headRef - unless (List.null merged) do - liftIO do - nref <- makeMerkle 0 pt $ \(_,_,bss) -> do - void $ putBlock sto bss + hd <- MaybeT $ lift $ getHead menv headRef + let quo = view refChanHeadQuorum hd & fromIntegral + guard $ HashMap.member pk (view refChanHeadPeers hd) + pure [(hashRef, (quo,[href]))] + + let merge1 (q1, hs1) (q2, hs2) = (max q1 q2, List.nub (hs1 <> hs2) ) + + let permitted = HashMap.fromListWith merge1 (mconcat (catMaybes r)) + & HashMap.toList + + new <- S.toList_ do + forM_ permitted $ \(prop, (qx, accs)) -> do + when (length accs >= qx) do + S.yield prop + S.each accs + + debug $ "new trans to merge" <+> pretty (AsBase58 chan) <+> pretty (length new) + + forM_ new $ \tnew -> do + debug $ "TRANS TO MERGE" <+> pretty tnew + + let merged = HashSet.fromList new & HashSet.toList + + let pt = toPTree (MaxSize 256) (MaxNum 256) merged + + unless (List.null merged) do + liftIO do + nref <- makeMerkle 0 pt $ \(_,_,bss) -> do + void $ putBlock sto bss + + debug $ "NEW REFCHAN" <+> pretty chanKey <+> pretty nref + + updateRef sto chanKey nref - debug $ "NEW REFCHAN" <+> pretty chanKey <+> pretty nref - updateRef sto chanKey nref