wip, log merging, debug-26

This commit is contained in:
Dmitry Zuikov 2023-07-20 08:51:25 +03:00
parent 7d55e9984e
commit 8ebcb91946
1 changed files with 87 additions and 68 deletions

View File

@ -88,7 +88,7 @@ refChanWorkerEnv _ de = liftIO $ RefChanWorkerEnv @e de <$> newTQueueIO
<*> newTQueueIO <*> newTQueueIO
refChanOnHeadFn :: MonadIO m => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m () refChanOnHeadFn :: forall e m . (ForRefChans e, MonadIO m) => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m ()
refChanOnHeadFn env chan tran = do refChanOnHeadFn env chan tran = do
atomically $ writeTQueue (view refChanWorkerEnvHeadQ env) (chan, tran) atomically $ writeTQueue (view refChanWorkerEnvHeadQ env) (chan, tran)
@ -439,7 +439,7 @@ logMergeProcess env q = do
here <- liftIO $ hasBlock sto (fromHashRef h) <&> isJust here <- liftIO $ hasBlock sto (fromHashRef h) <&> isJust
unless here do unless here do
debug $ "head is missed:" <+> pretty h warn $ "refchan. head is missed:" <+> pretty h
pure () pure ()
case hd of case hd of
@ -450,95 +450,114 @@ logMergeProcess env q = do
atomically $ modifyTVar (mergeHeads e) (HashMap.insert h headblk) atomically $ modifyTVar (mergeHeads e) (HashMap.insert h headblk)
pure headblk pure headblk
logMergeChan menv sto (chan, logs) = void $ runMaybeT do downloadMissedHead :: AnyStorage -> RefChanId e -> HashRef -> m ()
downloadMissedHead sto chan headRef = do
penv <- ask
here <- liftIO $ hasBlock sto (fromHashRef headRef) <&> isJust
unless here do
refChanAddDownload env chan headRef (withPeerM penv . refChanOnHeadFn env chan . RefChanHeadBlockTran)
let chanKey = RefChanLogKey @s chan logMergeChan menv sto (chan, logs) = do
h <- MaybeT $ liftIO $ getRef sto chanKey penv <- ask
current <- lift $ readLog sto (HashRef h) <&> HashSet.fromList void $ runMaybeT do
-- trans <- filter (not . flip HashSet.member current) . mconcat <$> mapM (lift . readLog sto) logs let chanKey = RefChanLogKey @s chan
trans <- mconcat <$> mapM (lift . readLog sto) logs
guard (not $ List.null trans) h <- MaybeT $ liftIO $ getRef sto chanKey
-- итак, тут приехал весь лог, который есть у пира current <- lift $ readLog sto (HashRef h) <&> HashSet.fromList
-- логично искать подтверждения только в нём. если
-- пир принял транзы без достаточного количества
-- подтверждений, то он сам лошара.
-- каждую транзу рассматриваем один раз, если
-- она смержена.
-- если она не смержена --- может быть, надо её
-- в какой-то reject список заносить
-- распаковать, отсортировать по головам сначала -- trans <- filter (not . flip HashSet.member current) . mconcat <$> mapM (lift . readLog sto) logs
-- потом бежим по головам, достаём головы trans <- mconcat <$> mapM (lift . readLog sto) logs
-- проверяем acl-ы на соответствие историческим головам
-- потом связываем каждый accept с соответствующим propose
-- потом считаем количество accept для каждого propose
-- потом, если всё ок -- пишем accept-ы и propose-ы у которых
-- больше quorum подтверждений для актуальной головы
let mergeSet = (HashSet.fromList trans <> current) & HashSet.toList guard (not $ List.null trans)
-- если какие-то транзакции отсутствуют - пытаемся их скачать -- итак, тут приехал весь лог, который есть у пира
-- и надеемся на лучшее (лог сойдется в следующий раз) -- логично искать подтверждения только в нём. если
forM_ mergeSet $ \href -> do -- пир принял транзы без достаточного количества
here <- liftIO $ hasBlock sto (fromHashRef href) <&> isJust -- подтверждений, то он сам лошара.
unless here do -- каждую транзу рассматриваем один раз, если
lift $ refChanAddDownload env chan href dontHandle -- она смержена.
-- если она не смержена --- может быть, надо её
-- в какой-то reject список заносить
r <- forM mergeSet $ \href -> runMaybeT do -- распаковать, отсортировать по головам сначала
-- потом бежим по головам, достаём головы
-- проверяем acl-ы на соответствие историческим головам
-- потом связываем каждый accept с соответствующим propose
-- потом считаем количество accept для каждого propose
-- потом, если всё ок -- пишем accept-ы и propose-ы у которых
-- больше quorum подтверждений для актуальной головы
blk <- MaybeT $ liftIO $ getBlock sto (fromHashRef href) let mergeSet = (HashSet.fromList trans <> current) & HashSet.toList
tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) blk -- если какие-то транзакции отсутствуют - пытаемся их скачать
& either (const Nothing) Just -- и надеемся на лучшее (лог сойдется в следующий раз)
forM_ mergeSet $ \href -> do
mblk <- liftIO $ getBlock sto (fromHashRef href)
maybe1 mblk (lift $ refChanAddDownload env chan href dontHandle) dontHandle
case tran of r <- forM mergeSet $ \href -> runMaybeT do
Propose _ box -> do
(pk, ProposeTran headRef box) <- MaybeT $ pure $ unboxSignedBox0 box
(ak, _) <- MaybeT $ pure $ unboxSignedBox0 box
hd <- MaybeT $ lift $ getHead menv headRef
let quo = view refChanHeadQuorum hd & fromIntegral
guard $ checkACL hd pk ak
pure [(href, (quo,mempty))]
Accept _ box -> do blk <- MaybeT $ liftIO $ getBlock sto (fromHashRef href)
(pk, AcceptTran headRef hashRef) <- MaybeT $ pure $ unboxSignedBox0 box
hd <- MaybeT $ lift $ getHead menv headRef
let quo = view refChanHeadQuorum hd & fromIntegral
guard $ HashMap.member pk (view refChanHeadPeers hd)
pure [(hashRef, (quo,[href]))]
let merge1 (q1, hs1) (q2, hs2) = (max q1 q2, List.nub (hs1 <> hs2) ) tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) blk
& either (const Nothing) Just
let permitted = HashMap.fromListWith merge1 (mconcat (catMaybes r)) case tran of
& HashMap.toList Propose _ box -> do
(pk, ProposeTran headRef box) <- MaybeT $ pure $ unboxSignedBox0 box
(ak, _) <- MaybeT $ pure $ unboxSignedBox0 box
new <- S.toList_ do lift $ lift $ downloadMissedHead sto chan headRef
forM_ permitted $ \(prop, (qx, accs)) -> do
when (length accs >= qx) do
S.yield prop
S.each accs
debug $ "new trans to merge" <+> pretty (AsBase58 chan) <+> pretty (length new) hd <- MaybeT $ lift $ getHead menv headRef
forM_ new $ \tnew -> do let quo = view refChanHeadQuorum hd & fromIntegral
debug $ "TRANS TO MERGE" <+> pretty tnew guard $ checkACL hd pk ak
pure [(href, (quo,mempty))]
let merged = HashSet.fromList new & HashSet.toList Accept _ box -> do
(pk, AcceptTran headRef hashRef) <- MaybeT $ pure $ unboxSignedBox0 box
let pt = toPTree (MaxSize 256) (MaxNum 256) merged lift $ lift $ downloadMissedHead sto chan headRef
unless (List.null merged) do hd <- MaybeT $ lift $ getHead menv headRef
liftIO do let quo = view refChanHeadQuorum hd & fromIntegral
nref <- makeMerkle 0 pt $ \(_,_,bss) -> do guard $ HashMap.member pk (view refChanHeadPeers hd)
void $ putBlock sto bss pure [(hashRef, (quo,[href]))]
let merge1 (q1, hs1) (q2, hs2) = (max q1 q2, List.nub (hs1 <> hs2) )
let permitted = HashMap.fromListWith merge1 (mconcat (catMaybes r))
& HashMap.toList
new <- S.toList_ do
forM_ permitted $ \(prop, (qx, accs)) -> do
when (length accs >= qx) do
S.yield prop
S.each accs
debug $ "new trans to merge" <+> pretty (AsBase58 chan) <+> pretty (length new)
forM_ new $ \tnew -> do
debug $ "TRANS TO MERGE" <+> pretty tnew
let merged = HashSet.fromList new & HashSet.toList
let pt = toPTree (MaxSize 256) (MaxNum 256) merged
unless (List.null merged) do
liftIO do
nref <- makeMerkle 0 pt $ \(_,_,bss) -> do
void $ putBlock sto bss
debug $ "NEW REFCHAN" <+> pretty chanKey <+> pretty nref
updateRef sto chanKey nref
debug $ "NEW REFCHAN" <+> pretty chanKey <+> pretty nref
updateRef sto chanKey nref