This commit is contained in:
Dmitry Zuikov 2023-07-18 16:59:36 +03:00
parent 6dc3fc91f5
commit 204de9afc6
2 changed files with 68 additions and 7 deletions

View File

@ -102,6 +102,29 @@ instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (RefChanHeadKey s) where
pretty (RefChanHeadKey k) = pretty (AsBase58 k)
newtype RefChanLogKey s = RefChanLogKey (PubKey 'Sign s)
deriving stock instance IsRefPubKey s => Eq (RefChanLogKey s)
instance IsRefPubKey s => Hashable (RefChanLogKey s) where
hashWithSalt s k = hashWithSalt s (hashObject @HbSync k)
instance IsRefPubKey s => Hashed HbSync (RefChanLogKey s) where
hashObject (RefChanLogKey pk) = hashObject ("refchanlog|" <> serialise pk)
instance IsRefPubKey s => FromStringMaybe (RefChanLogKey s) where
fromStringMay s = RefChanLogKey <$> fromStringMay s
instance IsRefPubKey s => IsString (RefChanLogKey s) where
fromString s = fromMaybe (error "bad public key base58") (fromStringMay s)
instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (AsBase58 (RefChanLogKey s)) where
pretty (AsBase58 (RefChanLogKey k)) = pretty (AsBase58 k)
instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (RefChanLogKey s) where
pretty (RefChanLogKey k) = pretty (AsBase58 k)
-- блок головы может быть довольно большой.
-- поэтому посылаем его, как merkle tree
newtype RefChanHeadBlockTran e =
@ -431,7 +454,7 @@ refChanUpdateProto self pc adapter msg = do
here <- liftIO (hasBlock sto (fromHashRef hashRef)) <&> isJust
unless here do
debug $ "NO PROPOSE TRANSACTION SAVED YET!" <+> pretty hashRef
warn $ "No propose transaction saved yet!" <+> pretty hashRef
tranBs <- MaybeT $ liftIO $ getBlock sto (fromHashRef hashRef)

View File

@ -25,6 +25,7 @@ import HBS2.Net.Auth.Credentials
import HBS2.Net.Proto
import HBS2.Net.Proto.RefChan
import HBS2.Net.Proto.Definition()
import HBS2.Merkle
import HBS2.Storage
import HBS2.System.Logger.Simple
@ -50,6 +51,7 @@ import Data.Maybe
import Lens.Micro.Platform
import Data.Heap qualified as Heap
import Data.Heap (Heap,Entry(..))
import Codec.Serialise
import UnliftIO
import Streaming.Prelude qualified as S
@ -195,14 +197,50 @@ refChanWorker env brains = do
atomically $ modifyTVar rounds (HashSet.delete x)
debug $ "CLEANUP ROUND" <+> pretty x
refChanWriter = forever do
refChanWriter = do
sto <- getStorage
forever do
pause @'Seconds 1
_ <- atomically $ peekTQueue (view refChanWorkerEnvWriteQ env)
trans <- liftIO $ atomically $ flushTQueue (view refChanWorkerEnvWriteQ env)
htrans <- liftIO $ atomically $ flushTQueue (view refChanWorkerEnvWriteQ env)
forM_ trans $ \t -> do
debug $ "ABOUT TO WRITE TRANS" <+> pretty t
trans <- forM htrans $ \h -> runMaybeT do
blk <- MaybeT $ liftIO (getBlock sto (fromHashRef h))
upd <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) blk & either (const Nothing) Just
case upd of
Propose chan _ -> pure (RefChanLogKey chan, h)
Accept chan _ -> pure (RefChanLogKey chan, h)
let byChan = HashMap.fromListWith (<>) [ (x, [y]) | (x,y) <- catMaybes trans ]
-- FIXME: process-in-parallel
forM_ (HashMap.toList byChan) $ \(c,new) -> do
mbLog <- liftIO $ getRef sto (RefChanLogKey @(Encryption e) c)
hashes <- maybe1 mbLog (pure mempty) $ \hlog -> do
S.toList_ $ do
walkMerkle hlog (liftIO . getBlock sto) $ \hr -> do
case hr of
Left{} -> pure ()
Right (hrr :: [HashRef]) -> S.each hrr
-- FIXME: might-be-problems-on-large-logs
let hashesNew = HashSet.fromList (hashes <> new) & HashSet.toList
-- -- FIXME: remove-chunk-num-hardcode
let pt = toPTree (MaxSize 256) (MaxNum 256) hashesNew
newRoot <- liftIO do
nref <- makeMerkle 0 pt $ \(_,_,bss) -> do
void $ putBlock sto bss
-- updateRef sto c nref
pure ()
pure ()
refChanHeadPoll = do