From 204de9afc6a08d87b5ba435745d8e0be4ba2530a Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Tue, 18 Jul 2023 16:59:36 +0300 Subject: [PATCH] wip --- hbs2-core/lib/HBS2/Net/Proto/RefChan.hs | 25 ++++++++++++- hbs2-peer/app/RefChan.hs | 50 ++++++++++++++++++++++--- 2 files changed, 68 insertions(+), 7 deletions(-) diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs index 924b6c28..06bbfdc6 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs @@ -102,6 +102,29 @@ instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (RefChanHeadKey s) where pretty (RefChanHeadKey k) = pretty (AsBase58 k) +newtype RefChanLogKey s = RefChanLogKey (PubKey 'Sign s) + +deriving stock instance IsRefPubKey s => Eq (RefChanLogKey s) + +instance IsRefPubKey s => Hashable (RefChanLogKey s) where + hashWithSalt s k = hashWithSalt s (hashObject @HbSync k) + +instance IsRefPubKey s => Hashed HbSync (RefChanLogKey s) where + hashObject (RefChanLogKey pk) = hashObject ("refchanlog|" <> serialise pk) + +instance IsRefPubKey s => FromStringMaybe (RefChanLogKey s) where + fromStringMay s = RefChanLogKey <$> fromStringMay s + +instance IsRefPubKey s => IsString (RefChanLogKey s) where + fromString s = fromMaybe (error "bad public key base58") (fromStringMay s) + +instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (AsBase58 (RefChanLogKey s)) where + pretty (AsBase58 (RefChanLogKey k)) = pretty (AsBase58 k) + +instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (RefChanLogKey s) where + pretty (RefChanLogKey k) = pretty (AsBase58 k) + + -- блок головы может быть довольно большой. -- поэтому посылаем его, как merkle tree newtype RefChanHeadBlockTran e = @@ -431,7 +454,7 @@ refChanUpdateProto self pc adapter msg = do here <- liftIO (hasBlock sto (fromHashRef hashRef)) <&> isJust unless here do - debug $ "NO PROPOSE TRANSACTION SAVED YET!" <+> pretty hashRef + warn $ "No propose transaction saved yet!" <+> pretty hashRef tranBs <- MaybeT $ liftIO $ getBlock sto (fromHashRef hashRef) diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index 5609ca0c..6e624b3d 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -25,6 +25,7 @@ import HBS2.Net.Auth.Credentials import HBS2.Net.Proto import HBS2.Net.Proto.RefChan import HBS2.Net.Proto.Definition() +import HBS2.Merkle import HBS2.Storage import HBS2.System.Logger.Simple @@ -50,6 +51,7 @@ import Data.Maybe import Lens.Micro.Platform import Data.Heap qualified as Heap import Data.Heap (Heap,Entry(..)) +import Codec.Serialise import UnliftIO import Streaming.Prelude qualified as S @@ -195,14 +197,50 @@ refChanWorker env brains = do atomically $ modifyTVar rounds (HashSet.delete x) debug $ "CLEANUP ROUND" <+> pretty x - refChanWriter = forever do - pause @'Seconds 1 - _ <- atomically $ peekTQueue (view refChanWorkerEnvWriteQ env) + refChanWriter = do + sto <- getStorage + forever do + pause @'Seconds 1 + _ <- atomically $ peekTQueue (view refChanWorkerEnvWriteQ env) - trans <- liftIO $ atomically $ flushTQueue (view refChanWorkerEnvWriteQ env) + htrans <- liftIO $ atomically $ flushTQueue (view refChanWorkerEnvWriteQ env) - forM_ trans $ \t -> do - debug $ "ABOUT TO WRITE TRANS" <+> pretty t + trans <- forM htrans $ \h -> runMaybeT do + blk <- MaybeT $ liftIO (getBlock sto (fromHashRef h)) + upd <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) blk & either (const Nothing) Just + + case upd of + Propose chan _ -> pure (RefChanLogKey chan, h) + Accept chan _ -> pure (RefChanLogKey chan, h) + + let byChan = HashMap.fromListWith (<>) [ (x, [y]) | (x,y) <- catMaybes trans ] + + -- FIXME: process-in-parallel + forM_ (HashMap.toList byChan) $ \(c,new) -> do + mbLog <- liftIO $ getRef sto (RefChanLogKey @(Encryption e) c) + + hashes <- maybe1 mbLog (pure mempty) $ \hlog -> do + S.toList_ $ do + walkMerkle hlog (liftIO . getBlock sto) $ \hr -> do + case hr of + Left{} -> pure () + Right (hrr :: [HashRef]) -> S.each hrr + + -- FIXME: might-be-problems-on-large-logs + let hashesNew = HashSet.fromList (hashes <> new) & HashSet.toList + + -- -- FIXME: remove-chunk-num-hardcode + let pt = toPTree (MaxSize 256) (MaxNum 256) hashesNew + + newRoot <- liftIO do + nref <- makeMerkle 0 pt $ \(_,_,bss) -> do + void $ putBlock sto bss + + -- updateRef sto c nref + pure () + + + pure () refChanHeadPoll = do