{-# Language AllowAmbiguousTypes #-} {-# Language TemplateHaskell #-} module RefChan ( RefChanWorkerEnv(..) , refChanWorkerEnvHeadQ , refChanWorkerEnvDownload , refChanOnHeadFn , refChanWriteTranFn , refChanValidateTranFn , refChanNotifyRelyFn , refChanWorker , runRefChanRelyWorker , refChanWorkerEnv , refChanNotifyOnUpdated ) where import HBS2.Prelude.Plated import HBS2.Actors.Peer import HBS2.Base58 import HBS2.Hash import HBS2.Data.Detect import HBS2.Defaults import HBS2.Data.Types.Refs import HBS2.Data.Types.SignedBox import HBS2.Events import HBS2.Merkle import HBS2.Net.Auth.Credentials import HBS2.Net.Messaging.Unix import HBS2.Peer.Proto.Peer import HBS2.Peer.Proto.RefChan import HBS2.Peer.Proto.RefChan.Adapter import HBS2.Net.Proto.Notify (SomeNotifySource(..)) import HBS2.Peer.Notify import HBS2.Net.Proto.Sessions import HBS2.Storage import PeerTypes hiding (downloads) import PeerConfig import BlockDownload() import Brains import Control.Monad.Trans.Cont import Codec.Serialise import Control.Concurrent.STM (flushTQueue) import Control.Exception () import Control.Monad.Except () import Control.Monad.Reader import Control.Monad.Trans.Maybe import Data.Coerce import Data.Cache (Cache) import Data.Cache qualified as Cache import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap import Data.HashSet qualified as HashSet import Data.Heap () import Data.List qualified as List import Data.Maybe import Data.ByteString.Lazy qualified as LBS import Data.Text qualified as Text import Lens.Micro.Platform import UnliftIO import Streaming.Prelude qualified as S import Streaming() {- HLINT ignore "Use newtype instead of data" -} data DataNotReady = DataNotReady deriving (Show) instance Exception DataNotReady type OnDownloadComplete = HashRef -> IO () data RefChanValidator = RefChanValidator { rcvInbox :: TQueue (RefChanValidate UNIX, MVar (RefChanValidate UNIX)) , rcvAsync :: Async () } data RefChanNotifier = RefChanNotifier { rcnPeer :: Peer UNIX , rcnInbox :: TQueue (RefChanNotify UNIX) , rcnAsync :: Async () } data RefChanWorkerEnv e = RefChanWorkerEnv { _refChanWorkerConf :: PeerConfig , _refChanPeerEnv :: PeerEnv e , _refChanWorkerEnvDEnv :: DownloadEnv e , _refChanNotifySource :: SomeNotifySource (RefChanEvents e) , _refChanWorkerEnvHeadQ :: TQueue (RefChanId e, RefChanHeadBlockTran e) , _refChanWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, (TimeSpec, OnDownloadComplete))) , _refChanWorkerEnvNotify :: TVar (HashMap (RefChanId e) ()) , _refChanWorkerEnvWriteQ :: TQueue HashRef , _refChanWorkerValidators :: TVar (HashMap (RefChanId e) RefChanValidator) -- FIXME: peer-addr-to-have-multiple-actors-on-single-box -- нужно ключом держать Peer e (SockAddr) -- что бы можно было завести несколько акторов на одном -- боксе в целях отладки. , _refChanWorkerNotifiers :: TVar (HashMap (RefChanId e) [RefChanNotifier]) , _refChanWorkerNotifiersInbox :: TQueue (RefChanNotify e) -- ^ to rely messages from clients to gossip , _refChanWorkerNotifiersDone :: Cache (Hash HbSync) () , _refChanWorkerLocalRelyDone :: Cache (Peer UNIX, Hash HbSync) () } makeLenses 'RefChanWorkerEnv refChanWorkerEnv :: forall m e . (MonadIO m, ForRefChans e) => PeerConfig -> PeerEnv e -> DownloadEnv e -> SomeNotifySource (RefChanEvents e) -> m (RefChanWorkerEnv e) refChanWorkerEnv conf pe de nsource = liftIO $ RefChanWorkerEnv @e conf pe de nsource <$> newTQueueIO <*> newTVarIO mempty <*> newTVarIO mempty <*> newTQueueIO <*> newTVarIO mempty <*> newTVarIO mempty <*> newTQueueIO <*> Cache.newCache (Just defRequestLimit) <*> Cache.newCache (Just defRequestLimit) refChanOnHeadFn :: forall e m . (ForRefChans e, MonadIO m) => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m () refChanOnHeadFn env chan tran = do atomically $ writeTQueue (view refChanWorkerEnvHeadQ env) (chan, tran) refChanWriteTranFn :: MonadIO m => RefChanWorkerEnv e -> HashRef -> m () refChanWriteTranFn env href = do atomically $ writeTQueue (view refChanWorkerEnvWriteQ env) href refChanValidateTranFn :: forall e m . ( MonadUnliftIO m , ForRefChans e, e ~ L4Proto , HasNonces (RefChanValidate UNIX) m ) => RefChanWorkerEnv e -> RefChanId e -> HashRef -> m Bool refChanValidateTranFn env chan htran = do mbv <- readTVarIO (view refChanWorkerValidators env) <&> HashMap.lookup chan -- отправить запрос в соответствующий... что? -- ждать ответа debug $ "VALIDATE TRAN" <+> pretty (AsBase58 chan) <+> pretty htran r <- maybe1 mbv (pure True) $ \(RefChanValidator q _) -> do answ <- newEmptyMVar nonce <- newNonce @(RefChanValidate UNIX) atomically $ writeTQueue q (RefChanValidate nonce chan (Validate @UNIX htran), answ) withMVar answ $ \msg -> case rcvData msg of Accepted{} -> pure True _ -> pure False debug $ "TRANS VALIDATION RESULT: " <+> pretty htran <+> pretty r pure r -- FIXME: leak-when-block-never-really-updated refChanNotifyOnUpdated :: (MonadIO m, ForRefChans e) => RefChanWorkerEnv e -> RefChanId e -> m () refChanNotifyOnUpdated env chan = do atomically $ modifyTVar (_refChanWorkerEnvNotify env) (HashMap.insert chan ()) refChanNotifyRelyFn :: forall e m . ( MonadUnliftIO m , ForRefChans e, e ~ L4Proto ) => RefChanWorkerEnv e -> RefChanId e -> RefChanNotify e -> m () refChanNotifyRelyFn env chan msg@(Notify _ (SignedBox k box s)) = do debug "refChanNotifyRelyFn" -- FIXME: multiple-hash-object-msg-performance let h0 = hashObject @HbSync (serialise msg) void $ runMaybeT do guard =<< liftIO (Cache.lookup (view refChanWorkerNotifiersDone env) h0 <&> isNothing) liftIO $ Cache.insert (view refChanWorkerNotifiersDone env) h0 () -- RefChanNotifier q _ <- MaybeT $ liftIO (readTVarIO (view refChanWorkerNotifiers env) <&> HashMap.lookup chan) notifiers <- MaybeT $ liftIO (readTVarIO (view refChanWorkerNotifiers env) <&> HashMap.lookup chan) forM_ notifiers $ \(RefChanNotifier _ q _) -> do atomically $ writeTQueue q (Notify @UNIX chan (SignedBox k box s)) refChanNotifyRelyFn _ _ _ = pure () refChanAddDownload :: forall e m . ( m ~ PeerM e IO , MyPeer e ) => RefChanWorkerEnv e -> RefChanId e -> HashRef -> OnDownloadComplete -> m () refChanAddDownload env chan r onComlete = do penv <- ask t <- getTimeCoarse withPeerM penv $ withDownload (_refChanWorkerEnvDEnv env) $ addDownload @e Nothing (fromHashRef r) atomically $ modifyTVar (view refChanWorkerEnvDownload env) (HashMap.insert r (chan,(t, onComlete))) data NotifyEnv = NotifyEnv { _notifyClient :: Fabriq UNIX , _notifySelf :: Peer UNIX } newtype NotifyProtoM m a = NotifyProto { fromNotifyProto :: ReaderT NotifyEnv m a } deriving newtype ( Functor , Applicative , Monad , MonadIO , MonadUnliftIO , MonadReader NotifyEnv , MonadTrans ) runNotifyProtoM :: (MonadIO m, PeerMessaging UNIX) => MessagingUnix -> NotifyProtoM m a -> m a runNotifyProtoM bus m = runReaderT (fromNotifyProto m) (NotifyEnv (Fabriq bus) (msgUnixSelf bus)) instance Monad m => HasFabriq UNIX (NotifyProtoM m) where getFabriq = asks _notifyClient instance Monad m => HasOwnPeer UNIX (NotifyProtoM m) where ownPeer = asks _notifySelf refChanNotifyRpcProto :: forall e m . ( MonadIO m , Request e (RefChanNotify e) m -- , HasPeerNonce UNIX m , e ~ UNIX -- , m ~ PeerM e IO ) => RefChanWorkerEnv L4Proto -> RefChanNotify e -> m () refChanNotifyRpcProto env msg@(ActionRequest chan action) = do let penv = view refChanPeerEnv env case action of RefChanAnnounceBlock h -> do debug $ "RefChanNotify: RefChanAnnounceBlock" <+> pretty h liftIO $ withPeerM penv $ do sto <- getStorage sz' <- liftIO $ hasBlock sto (fromHashRef h) maybe1 sz' none $ \sz -> do ann <- simpleBlockAnnounce @L4Proto sz (fromHashRef h) gossip ann pure () RefChanFetch h -> do debug $ "RefChanNotify: RefChanFetch" <+> pretty h liftIO $ withPeerM penv $ do refChanAddDownload env chan h (const $ pure ()) where proto = Proxy @(RefChanNotify e) refChanNotifyRpcProto env msg@(Notify chan (SignedBox pk box si)) = do debug "GOT MESSAGE FROM CLIENT" atomically $ writeTQueue (view refChanWorkerNotifiersInbox env) (Notify @L4Proto chan (SignedBox pk box si)) -- тут мы должны переслать всем, кроме отправителя let h0 = hashObject @HbSync (serialise msg) -- FIXME: squash-this-copypaste void $ runMaybeT do notifiers <- MaybeT $ liftIO (readTVarIO (view refChanWorkerNotifiers env) <&> HashMap.lookup chan) forM_ notifiers $ \(RefChanNotifier peer q _) -> do let lkey = (peer, h0) -- guard =<< liftIO (Cache.lookup (view refChanWorkerLocalRelyDone env) lkey <&> isNothing) -- liftIO $ Cache.insert (view refChanWorkerLocalRelyDone env) lkey () atomically $ writeTQueue q msg refChanWorkerInitNotifiers :: forall e m . ( MonadIO m , MonadUnliftIO m , MyPeer e -- , ForRefChans e -- , ForRefChans UNIX , m ~ PeerM e IO , e ~ L4Proto ) => RefChanWorkerEnv e -> m () refChanWorkerInitNotifiers env = do penv <- ask debug "refChanWorkerInitNotifiers" let (PeerConfig syn) = view refChanWorkerConf env let notifiers = [ mkV rc x | ListVal [ SymbolVal "notify" , SymbolVal "refchan" , LitStrVal rc , ListVal [ SymbolVal "socket", SymbolVal "unix", LitStrVal x ] ] <- syn ] & catMaybes forM_ notifiers $ \(rc, sa) -> do q <- newTQueueIO -- FIXME: restart-notifiers -- сейчас если один нотификатор упал -- упадут все -- сделать так, что бы рестартовал только один нотификатор val <- asyncLinked $ withPeerM penv (notifierThread rc sa q) let rcn = RefChanNotifier (fromString sa) q val atomically $ modifyTVar (_refChanWorkerNotifiers env) (HashMap.insertWith (<>) rc [rcn]) where mkV :: Text -> Text -> Maybe (RefChanId e, String) mkV rc x = (,Text.unpack x) <$> fromStringMay @(RefChanId e) (Text.unpack rc) notifierThread rc sa q = flip runContT (either throwIO (const none)) do debug $ ">>> Notifier thread started" <+> pretty (AsBase58 rc, sa) client <- newMessagingUnix False 1.0 sa msg <- ContT $ withAsync $ runMessagingUnix client disp <- ContT $ withAsync $ runNotifyProtoM client do runProto [ makeResponse (refChanNotifyRpcProto env) ] rely <- ContT $ withAsync $ runNotifyProtoM client do forever do req <- atomically $ readTQueue q debug "Rely notification request" request @UNIX (fromString sa) req r <- waitAnyCatchCancel [msg, disp, rely] warn $ ">>> Notifier thread for" <+> pretty sa <+> "terminated" <+> viaShow (snd r) atomically $ modifyTVar (_refChanWorkerNotifiers env) (HashMap.delete rc) pure (snd r) data ValidateEnv = ValidateEnv { _validateClient :: Fabriq UNIX , _validateSelf :: Peer UNIX } newtype ValidateProtoM m a = ValidateProto { fromValidateProto :: ReaderT ValidateEnv m a } deriving newtype ( Functor , Applicative , Monad , MonadIO , MonadUnliftIO , MonadReader ValidateEnv , MonadTrans ) runValidateProtoM :: (MonadIO m, PeerMessaging UNIX) => MessagingUnix -> ValidateProtoM m a -> m a runValidateProtoM tran m = runReaderT (fromValidateProto m) (ValidateEnv (Fabriq tran) (msgUnixSelf tran)) instance Monad m => HasFabriq UNIX (ValidateProtoM m) where getFabriq = asks _validateClient instance Monad m => HasOwnPeer UNIX (ValidateProtoM m) where ownPeer = asks _validateSelf refChanValidateProto :: forall e m . ( MonadIO m , Request e (RefChanValidate e) m , Response e (RefChanValidate e) m , e ~ UNIX ) => Cache (RefChanValidateNonce e) (MVar (RefChanValidate e)) -> RefChanValidate e -> m () refChanValidateProto waiters msg = do debug $ "GOT ANSWER FROM VALIDATOR" <+> pretty msg case rcvData msg of Accepted h -> emitAnswer h msg Rejected h -> emitAnswer h msg _ -> none where emitAnswer h m = liftIO do debug $ "EMIT ANSWER" <+> pretty h mbAnsw <- Cache.lookup waiters (rcvNonce m) maybe1 mbAnsw none $ \answ -> do putMVar answ m refChanWorkerInitValidators :: forall e m . ( MonadIO m , MonadUnliftIO m -- , MyPeer e -- , ForRefChans e -- , ForRefChans UNIX -- , m ~ PeerM e IO , e ~ L4Proto ) => RefChanWorkerEnv e -> m () refChanWorkerInitValidators env = do debug "refChanWorkerInitValidators" let (PeerConfig syn) = view refChanWorkerConf env let validators = [ mkV rc x | ListVal [ SymbolVal "validate" , SymbolVal "refchan" , LitStrVal rc , ListVal [ SymbolVal "socket", SymbolVal "unix", LitStrVal x ] ] <- syn ] & catMaybes forM_ validators $ \(rc, sa) -> do debug $ "** VALIDATOR FOR" <+> pretty (AsBase58 rc, sa) here <- readTVarIO (_refChanWorkerValidators env) <&> HashMap.member rc unless here do q <- newTQueueIO val <- asyncLinked $ validatorThread rc sa q let rcv = RefChanValidator q val atomically $ modifyTVar (_refChanWorkerValidators env) (HashMap.insert rc rcv) where mkV :: Text -> Text -> Maybe (RefChanId e, String) mkV rc x = (,Text.unpack x) <$> fromStringMay @(RefChanId e) (Text.unpack rc) -- FIXME: make-thread-respawning validatorThread chan sa q = liftIO $ flip runContT (either throwIO (const none)) do client <- newMessagingUnix False 1.0 sa msg <- ContT $ withAsync $ runMessagingUnix client -- FIXME: hardcoded-timeout waiters <- liftIO $ Cache.newCache (Just (toTimeSpec (10 :: Timeout 'Seconds))) disp <- ContT $ withAsync $ runValidateProtoM client do runProto [ makeResponse (refChanValidateProto waiters) ] poke <- ContT $ withAsync $ runValidateProtoM client do pause @'Seconds 10 mv <- newEmptyMVar nonce <- newNonce @(RefChanValidate UNIX) atomically $ writeTQueue q (RefChanValidate @UNIX nonce chan Poke, mv) rely <- ContT $ withAsync $ runValidateProtoM client do (req, answ) <- atomically $ readTQueue q case rcvData req of Validate href -> do debug $ "DO REQUEST VALIDATE" <+> pretty href <+> pretty sa liftIO $ Cache.insert waiters (rcvNonce req) answ let pa = fromString sa request pa req Poke{} -> do debug "DO SEND POKE" let pa = fromString sa request pa req _ -> pure () r <- waitAnyCatchCancel [msg, disp, poke, rely] atomically $ modifyTVar (_refChanWorkerValidators env) (HashMap.delete chan) warn $ "*** RefChan validate thread:" <+> pretty sa <+> viaShow (snd r) pure (snd r) runRefChanRelyWorker :: forall e m . ( MonadIO m , m ~ PeerM e IO , e ~ L4Proto ) => RefChanWorkerEnv e -> RefChanAdapter e (ResponseM e m) -> IO () runRefChanRelyWorker env adapter = liftIO $ forever do withPeerM (view refChanPeerEnv env) do me <- ownPeer @e -- FIXME: use-bounded-queue-ASAP mess <- atomically $ readTQueue (view refChanWorkerNotifiersInbox env) runResponseM me $ do refChanNotifyProto True adapter mess {- HLINT ignore "Functor law" -} refChanWorker :: forall e s m . ( MonadIO m , MonadUnliftIO m , MyPeer e , HasStorage m , Request e (RefChanHead e) m , Request e (RefChanRequest e) m , Sessions e (KnownPeer e) m , Signatures s , s ~ Encryption e , IsRefPubKey s -- , Pretty (AsBase58 (PubKey 'Sign s)) , ForRefChans e , EventListener e (RefChanRound e) m , EventListener e (RefChanRequest e) m , Sessions e (RefChanRound e) m , m ~ PeerM e IO , e ~ L4Proto ) => RefChanWorkerEnv e -> SomeBrains e -> m () refChanWorker env@RefChanWorkerEnv{..} brains = do penv <- ask mergeQ <- newTQueueIO -- FIXME: resume-on-exception -- FIXME: insist-more-during-download -- что-то частая ситуация, когда блоки -- с трудом докачиваются. надо бы -- разобраться. возможно переделать -- механизм скачивания блоков -- -- всё это нужно вместе. соответственно, -- упало одно - отменяем всё и простреливаем -- наверх. -- соответственно - bracket на каждый поток flip runContT (either throwIO (const none) .snd) do hw <- ContT $ withAsync (refChanHeadMon penv) downloads <- ContT $ withAsync (monitorHeadDownloads penv) polls <- ContT $ withAsync (refChanPoll penv) wtrans <- ContT $ withAsync (liftIO $ withPeerM penv $ refChanWriter) cleanup1 <- ContT $ withAsync (liftIO (cleanupRounds penv)) merge <- ContT $ withAsync (liftIO $ logMergeProcess penv env mergeQ) sto <- lift getStorage liftIO $ refChanWorkerInitValidators env lift $ refChanWorkerInitNotifiers env liftIO $ withPeerM penv do subscribe @e RefChanRequestEventKey $ \(RefChanRequestEvent chan val) -> do debug $ "RefChanRequestEvent" <+> pretty (AsBase58 chan) <+> pretty val h <- liftIO $ getRef sto (RefChanLogKey @s chan) -- игнорируем, если синхронно unless ((HashRef <$> h) == Just val) do refChanAddDownload env chan val $ \href -> do debug $ "BLOCK DOWNLOADED" <+> pretty href atomically $ writeTQueue mergeQ (chan, href) atomically $ writeTQueue mergeQ (chan, val) bullshit <- ContT $ withAsync $ forever do pause @'Seconds 10 debug "I'm refchan worker" waitAnyCatchCancel [hw,downloads,polls,wtrans,merge,cleanup1,bullshit] where cleanupRounds penv = withPeerM penv do rounds <- newTVarIO HashSet.empty subscribe @e RefChanRoundEventKey $ \(RefChanRoundEvent rcrk) -> do atomically $ modifyTVar rounds (HashSet.insert rcrk) debug $ "ON ROUND STARTED" <+> pretty rcrk forever do -- FIXME: use-polling-function-and-respect-wait pause @'Seconds 10 now <- getTimeCoarse xs <- readTVarIO rounds <&> HashSet.toList forM_ xs $ \x -> do void $ runMaybeT do se <- MaybeT $ find @e x id closed <- readTVarIO (view refChanRoundClosed se) trans <- atomically $ readTVar (view refChanRoundTrans se) <&> HashSet.toList let ttl = view refChanRoundTTL se when (closed || ttl <= now) do lift $ expire x when closed do forM_ trans $ \t -> do debug $ "WRITING TRANS" <+> pretty t lift $ refChanWriteTranFn env t atomically $ modifyTVar rounds (HashSet.delete x) debug $ "CLEANUP ROUND" <+> pretty x refChanWriter = do sto <- getStorage forever do pause @'Seconds 1 _ <- atomically $ peekTQueue (view refChanWorkerEnvWriteQ env) htrans <- liftIO $ atomically $ flushTQueue (view refChanWorkerEnvWriteQ env) trans <- forM htrans $ \h -> runMaybeT do blk <- MaybeT $ liftIO (getBlock sto (fromHashRef h)) upd <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) blk & either (const Nothing) Just case upd of Propose chan box -> do lift $ tryDownloadContent env chan box pure (RefChanLogKey @(Encryption e) chan, h) Accept chan _ -> pure (RefChanLogKey @(Encryption e) chan, h) let byChan = HashMap.fromListWith (<>) [ (x, [y]) | (x,y) <- catMaybes trans ] -- FIXME: thread-num-hardcode-to-remove pooledForConcurrentlyN_ 4 (HashMap.toList byChan) $ \(c,new) -> do mbLog <- liftIO $ getRef sto c hashes <- maybe1 mbLog (pure mempty) $ readLog (getBlock sto) . HashRef -- FIXME: might-be-problems-on-large-logs let hashesNew = HashSet.fromList (hashes <> new) & HashSet.toList -- FIXME: remove-chunk-num-hardcode -- $class: hardcode let pt = toPTree (MaxSize 256) (MaxNum 256) hashesNew nref <- makeMerkle 0 pt $ \(_,_,bss) -> void $ liftIO $ putBlock sto bss -- TODO: ASAP-notify-on-refchan-update -- $workflow: wip updateRef sto c nref notifyOnRefChanUpdated env c nref refChanPoll penv = withPeerM penv do let listRefs = listPolledRefs @e brains (Just "refchan") <&> fmap (\(a,_,b) -> (a,b)) <&> fmap (over _2 ( (*60) . fromIntegral) ) polling (Polling 5 5) listRefs $ \ref -> do debug $ "POLLING REFCHAN" <+> pretty (AsBase58 ref) broadCastMessage (RefChanGetHead @e ref) broadCastMessage (RefChanRequest @e ref) monitorHeadDownloads penv = withPeerM penv $ forever do pause @'Seconds 1 all <- atomically $ readTVar (view refChanWorkerEnvDownload env) <&> HashMap.toList now <- getTimeCoarse -- FIXME: change-to-polling-functions -- FIXME: consider-timeouts-or-leak-is-possible rest <- forM all $ \(r,item@(chan,(t,onComplete))) -> do here <- checkDownloaded r if here then do liftIO $ onComplete r -- refChanOnHeadFn env chan (RefChanHeadBlockTran r) pure mempty else do -- FIXME: fix-timeout-hardcode let expired = realToFrac (toNanoSecs $ now - t) / 1e9 > 600 if expired then pure mempty else pure [(r,item)] atomically $ writeTVar (view refChanWorkerEnvDownload env) (HashMap.fromList (mconcat rest)) -- FIXME: in-parallel? refChanHeadMon pe = liftIO $ withPeerM pe do forever do (chan, RefChanHeadBlockTran hr) <- atomically $ readTQueue (view refChanWorkerEnvHeadQ env) here <- checkDownloaded hr if not here then do refChanAddDownload env chan hr (withPeerM pe . refChanOnHeadFn env chan . RefChanHeadBlockTran) trace $ "BLOCK IS NOT HERE" <+> pretty hr else do sto <- getStorage trace $ "BLOCK IS HERE" <+> pretty hr -- читаем блок lbs <- readBlobFromTree (getBlock sto) hr <&> fromMaybe mempty let what = unboxSignedBox @(RefChanHeadBlock e) @s lbs notify <- atomically $ do no <- readTVar _refChanWorkerEnvNotify <&> HashMap.member chan modifyTVar _refChanWorkerEnvNotify (HashMap.delete chan) pure no case what of Nothing -> err $ "malformed head block" <+> pretty hr Just (pk,blk) | pk == chan -> do let rkey = RefChanHeadKey @s pk debug $ "Good head block" <+> pretty hr <+> "processing..." ourVersion <- runMaybeT do cur <- MaybeT $ liftIO $ getRef sto rkey lbss <- MaybeT $ readBlobFromTree (getBlock sto) (HashRef cur) (_, blkOur) <- MaybeT $ pure $ unboxSignedBox @(RefChanHeadBlock e) @s lbss pure $ view refChanHeadVersion blkOur let v0 = fromMaybe 0 ourVersion let v1 = view refChanHeadVersion blk if v1 > v0 then do debug $ "UPDATING HEAD BLOCK" <+> pretty (v1, v0) liftIO $ updateRef sto rkey (fromHashRef hr) -- если это мы сами его обновили - то неплохо бы -- всем разослать уведомление. А как? -- -- TODO: update-acl-here forM_ (HashMap.keys $ view refChanHeadPeers blk) $ \pip -> do debug $ "ADD PEER ACL" <+> pretty (AsBase58 chan) <+> pretty(AsBase58 pip) forM_ (view refChanHeadAuthors blk) $ \au -> do debug $ "ADD AUTHOR ACL" <+> pretty (AsBase58 chan) <+> pretty(AsBase58 au) when notify do debug $ "NOTIFY-ALL-HEAD-UPDATED" <+> pretty (AsBase58 pk) <+> pretty hr broadCastMessage (RefChanHead @e pk (RefChanHeadBlockTran hr)) else do debug $ "LEAVING HEAD BLOCK" <+> pretty (v1, v0) _ -> debug "not subscribed to this refchan" pure () -- распаковываем блок -- вытаскиваем ключ из блока? pure () tryDownloadContent env chan box = runMaybeT do (_, ProposeTran _ pbox) <- MaybeT $ pure $ unboxSignedBox0 box (_, bss) <- MaybeT $ pure $ unboxSignedBox0 pbox let what = tryDetect (hashObject bss) (LBS.fromStrict bss) down <- case what of AnnRef (AnnotatedHashRef a b ) -> do pure $ b : maybeToList a SeqRef (SequentialRef _ (AnnotatedHashRef a b))-> do pure $ b : maybeToList a _ -> pure mempty for_ (List.nub down) $ \blk -> do lift $ refChanAddDownload env chan blk dontHandle data MergeEnv e = MergeEnv { mergeSto :: AnyStorage , mergeHeads :: TVar (HashMap HashRef (RefChanHeadBlock e) ) } -- FIXME: possible-performance-issues -- Выглядит довольно медленно. Вероятно, -- можно быстрее. -- В частности, кэшировать уже обработанные логи logMergeProcess :: forall e s m . ( MonadUnliftIO m , MyPeer e , ForRefChans e , HasStorage m , Signatures s , Pretty (AsBase58 (PubKey 'Sign s)) , s ~ Encryption e , m ~ PeerM e IO ) => PeerEnv e -> RefChanWorkerEnv e -> TQueue (RefChanId e, HashRef) -> IO () logMergeProcess penv env q = withPeerM penv do sto <- getStorage menv <- MergeEnv sto <$> newTVarIO mempty forever do -- FIXME: fix-hardcoded-timeout pause @'Seconds 1 _ <- atomically $ peekTQueue q logs <- liftIO $ atomically $ flushTQueue q let byChan = HashMap.fromListWith (<>) [ (x,[y]) | (x,y) <- logs ] & HashMap.toList & fmap (over _2 List.nub) -- FIXME: in-parallel mapM_ (logMergeChan menv sto) byChan where getHead :: MergeEnv e -> HashRef -> m (Maybe (RefChanHeadBlock e)) getHead e h = do let sto = mergeSto e hd <- readTVarIO (mergeHeads e) <&> HashMap.lookup h here <- liftIO $ hasBlock sto (fromHashRef h) <&> isJust unless here do warn $ "refchan. head is missed:" <+> pretty h pure () case hd of Just x -> pure (Just x) Nothing -> runMaybeT do hdblob <- MaybeT $ readBlobFromTree ( liftIO . getBlock sto ) h (_, headblk) <- MaybeT $ pure $ unboxSignedBox @_ @s hdblob atomically $ modifyTVar (mergeHeads e) (HashMap.insert h headblk) pure headblk downloadMissedHead :: AnyStorage -> RefChanId e -> HashRef -> m () downloadMissedHead sto chan headRef = do penv <- ask here <- liftIO $ hasBlock sto (fromHashRef headRef) <&> isJust unless here do refChanAddDownload env chan headRef (withPeerM penv . refChanOnHeadFn env chan . RefChanHeadBlockTran) logMergeChan menv sto (chan, logs) = do penv <- ask let readFn = getBlock sto void $ runMaybeT do let chanKey = RefChanLogKey @s chan -- FIXME: wont-work-if-no-reference-yet -- не сработает если ссылка новая (mergeSet, merge) <- liftIO (getRef sto chanKey) >>= \case Nothing -> do new <- mconcat <$> mapM (lift . readLog readFn) logs pure (HashSet.fromList new, not (List.null new)) Just h -> do current <- lift $ readLog readFn (HashRef h) <&> HashSet.fromList new <- mconcat <$> mapM (lift . readLog readFn) (filter (/= HashRef h) logs) let mergeSet = HashSet.fromList new <> current pure (mergeSet, not (List.null new)) guard merge -- итак, тут приехал весь лог, который есть у пира -- логично искать подтверждения только в нём. если -- пир принял транзы без достаточного количества -- подтверждений, то он сам лошара. -- каждую транзу рассматриваем один раз, если -- она смержена. -- если она не смержена --- может быть, надо её -- в какой-то reject список заносить -- распаковать, отсортировать по головам сначала -- потом бежим по головам, достаём головы -- проверяем acl-ы на соответствие историческим головам -- потом связываем каждый accept с соответствующим propose -- потом считаем количество accept для каждого propose -- потом, если всё ок -- пишем accept-ы и propose-ы у которых -- больше quorum подтверждений для актуальной головы let mergeList = HashSet.toList mergeSet downQ <- newTQueueIO -- если какие-то транзакции отсутствуют - пытаемся их скачать -- и надеемся на лучшее (лог сойдется в следующий раз) forM_ mergeList $ \href -> do mblk <- liftIO $ getBlock sto (fromHashRef href) maybe1 mblk (lift $ refChanAddDownload env chan href dontHandle) dontHandle r <- forM mergeList $ \href -> runMaybeT do blk <- MaybeT $ liftIO $ getBlock sto (fromHashRef href) tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) blk & either (const Nothing) Just case tran of Propose _ box -> do (pk, ProposeTran headRef pbox) <- MaybeT $ pure $ unboxSignedBox0 box liftIO $ withPeerM penv $ tryDownloadContent env chan box (ak, bss) <- MaybeT $ pure $ unboxSignedBox0 pbox lift $ lift $ downloadMissedHead sto chan headRef hd <- MaybeT $ lift $ getHead menv headRef let quo = view refChanHeadQuorum hd & fromIntegral guard $ checkACL ACLUpdate hd (Just pk) ak pure [(href, (quo,mempty))] Accept _ box -> do (pk, AcceptTran _ headRef hashRef) <- MaybeT $ pure $ unboxSignedBox0 box lift $ lift $ downloadMissedHead sto chan headRef hd <- MaybeT $ lift $ getHead menv headRef let quo = view refChanHeadQuorum hd & fromIntegral guard $ HashMap.member pk (view refChanHeadPeers hd) pure [(hashRef, (quo,[href]))] let merge1 (q1, hs1) (q2, hs2) = (max q1 q2, List.nub (hs1 <> hs2) ) let permitted = HashMap.fromListWith merge1 (mconcat (catMaybes r)) & HashMap.toList new <- S.toList_ do forM_ permitted $ \(prop, (qx, accs)) -> do when (length accs >= qx) do S.yield prop S.each accs debug $ "new trans to merge" <+> pretty (AsBase58 chan) <+> pretty (length new) let merged = HashSet.fromList new deps <- atomically $ flushTQueue downQ for_ deps $ \(parent, AnnotatedHashRef a r) -> do when (parent `HashSet.member` merged) do debug $ "#@#@#@ !!! RefChan.download deps" <+> pretty a <+> pretty r lift $ refChanAddDownload env chan r dontHandle maybe1 a none $ \ann -> do lift $ refChanAddDownload env chan ann dontHandle unless (HashSet.null merged) do -- FIXME: sub-optimal-partition -- убрать этот хардкод размеров -- он приводит к излишне мелким блокам let pt = toPTree (MaxSize 256) (MaxNum 256) (HashSet.toList merged) liftIO do nref <- makeMerkle 0 pt $ \(_,_,bss) -> do void $ putBlock sto bss updateRef sto chanKey nref notifyOnRefChanUpdated env chanKey nref notifyOnRefChanUpdated :: forall e s m . ( ForRefChans e , s ~ Encryption e , MonadUnliftIO m ) => RefChanWorkerEnv e -> RefChanLogKey s -> Hash HbSync -> m () notifyOnRefChanUpdated RefChanWorkerEnv{..} c nref = do emitNotify _refChanNotifySource notification debug $ "REFCHAN UPDATED:" <+> pretty c <+> pretty nref where notification = (RefChanNotifyKey (coerce c), RefChanUpdated (coerce c) (HashRef nref))