{-# Language AllowAmbiguousTypes #-}
{-# Language TemplateHaskell #-}
-- | Background worker for reference channels (refchans): tracks head blocks,
-- monitors their download, appends accepted transactions to the channel log,
-- merges logs received from other peers and talks to external validators
-- over unix sockets.
module RefChan (
    RefChanWorkerEnv(..)
  , refChanWorkerEnvHeadQ
  , refChanWorkerEnvDownload
  , refChanOnHeadFn
  , refChanWriteTranFn
  , refChanValidateTranFn
  , refChanWorker
  , refChanWorkerEnv
  , refChanNotifyOnUpdated
  ) where

import HBS2.Prelude.Plated

import HBS2.Actors.Peer
import HBS2.Base58
import HBS2.Clock
import HBS2.Data.Detect
import HBS2.Data.Types.Refs
import HBS2.Events
import HBS2.Merkle
import HBS2.Net.Auth.Credentials
import HBS2.Net.Messaging.Unix
import HBS2.Net.Proto
import HBS2.Net.Proto.Definition()
import HBS2.Net.Proto.Peer
import HBS2.Net.Proto.RefChan
import HBS2.Net.Proto.Sessions
import HBS2.Storage

import HBS2.System.Logger.Simple

import PeerTypes
import PeerConfig
import BlockDownload
import Brains

import Codec.Serialise
import Control.Concurrent.STM (flushTQueue)
import Control.Exception ()
import Control.Monad.Except (throwError, runExceptT)
import Control.Monad.Reader
import Control.Monad.Trans.Maybe
import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Data.Coerce
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.HashSet (HashSet)
import Data.HashSet qualified as HashSet
import Data.Heap ()
-- import Data.Heap qualified as Heap
import Data.List qualified as List
import Data.Maybe
import Data.Text qualified as Text
import Lens.Micro.Platform
import UnliftIO

import Streaming.Prelude qualified as S
import Streaming()

{- HLINT ignore "Use newtype instead of data" -}

-- | How long a validation request may stay unanswered. The same value is
-- used as the lifetime of the waiter entry in the validator thread and as
-- the upper bound of the wait in 'refChanValidateTranFn'.
--
-- FIXME: hardcoded-timeout
refChanValidatorTimeout :: Timeout 'Seconds
refChanValidatorTimeout = 10

data DataNotReady = DataNotReady deriving (Show)

instance Exception DataNotReady

-- | Callback invoked by the download monitor once the block tree is complete.
type OnDownloadComplete = HashRef -> IO ()

-- | Handle of a running validator client for a single refchan.
data RefChanValidator =
  RefChanValidator
  { rcvInbox :: TQueue (RefChanValidate UNIX, MVar (RefChanValidate UNIX))
    -- ^ requests paired with the box the answer has to be put into
  , rcvAsync :: Async ()
    -- ^ the validator thread itself
  }

-- | Shared state of the refchan worker.
data RefChanWorkerEnv e =
  RefChanWorkerEnv
  { _refChanWorkerConf        :: PeerConfig
  , _refChanWorkerEnvDEnv     :: DownloadEnv e
  , _refChanWorkerEnvHeadQ    :: TQueue (RefChanId e, RefChanHeadBlockTran e)
    -- ^ incoming head blocks to be examined by the head monitor
  , _refChanWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, (TimeSpec, OnDownloadComplete)))
    -- ^ downloads in progress: start time and completion callback
  , _refChanWorkerEnvNotify   :: TVar (HashMap (RefChanId e) ())
    -- ^ channels whose next accepted head update has to be broadcast
  , _refChanWorkerEnvWriteQ   :: TQueue HashRef
    -- ^ transactions to be appended to the channel logs
  , _refChanWorkerValidators  :: TVar (HashMap (RefChanId e) RefChanValidator)
    -- ^ validators by channel
  }

makeLenses 'RefChanWorkerEnv

-- | Creates an empty worker environment.
refChanWorkerEnv :: forall m e . (MonadIO m, ForRefChans e)
                 => PeerConfig
                 -> DownloadEnv e
                 -> m (RefChanWorkerEnv e)

refChanWorkerEnv conf de = liftIO $ RefChanWorkerEnv @e conf de <$> newTQueueIO
                                                                <*> newTVarIO mempty
                                                                <*> newTVarIO mempty
                                                                <*> newTQueueIO
                                                                <*> newTVarIO mempty

-- | Queues a head block announcement for processing by the head monitor.
refChanOnHeadFn :: forall e m . (ForRefChans e, MonadIO m)
                => RefChanWorkerEnv e
                -> RefChanId e
                -> RefChanHeadBlockTran e
                -> m ()

refChanOnHeadFn env chan tran = do
  atomically $ writeTQueue (view refChanWorkerEnvHeadQ env) (chan, tran)

-- | Queues a transaction to be written into the log of its channel.
refChanWriteTranFn :: MonadIO m => RefChanWorkerEnv e -> HashRef -> m ()
refChanWriteTranFn env href = do
  atomically $ writeTQueue (view refChanWorkerEnvWriteQ env) href

-- | Asks the validator of the channel (if any) whether the transaction is
-- acceptable.
--
-- Returns 'True' when no validator is registered for the channel or when the
-- validator answers @Accepted@. Returns 'False' on any other answer and
-- when no answer arrives within 'refChanValidatorTimeout'.
refChanValidateTranFn :: forall e m . ( MonadUnliftIO m
                                      , ForRefChans e, e ~ L4Proto
                                      , HasNonces (RefChanValidate UNIX) m
                                      )
                      => RefChanWorkerEnv e
                      -> RefChanId e
                      -> HashRef
                      -> m Bool

refChanValidateTranFn env chan htran = do
  mbv <- readTVarIO (view refChanWorkerValidators env) <&> HashMap.lookup chan
  -- send the request to the corresponding... what?
  -- wait for the answer
  debug $ "VALIDATE TRAN" <+> pretty (AsBase58 chan) <+> pretty htran

  r <- maybe1 mbv (pure True) $ \(RefChanValidator q _) -> do
    answ <- newEmptyMVar
    nonce <- newNonce @(RefChanValidate UNIX)
    atomically $ writeTQueue q (RefChanValidate nonce chan (Validate @UNIX htran), answ)

    -- The waiter entry expires in the validator thread, after which nobody
    -- is able to fill the box: an unbounded wait here would hang forever
    -- if the validator stays silent.
    res <- race (pause refChanValidatorTimeout) (readMVar answ)

    case res of
      Left{} -> do
        warn $ "validator did not answer in time:" <+> pretty (AsBase58 chan) <+> pretty htran
        pure False

      Right msg -> case rcvData msg of
        Accepted{} -> pure True
        _          -> pure False

  debug $ "TRANS VALIDATION RESULT: " <+> pretty htran <+> pretty r
  pure r

-- | Marks the channel so that the next accepted head update is broadcast.
--
-- FIXME: leak-when-block-never-really-updated
refChanNotifyOnUpdated :: (MonadIO m, ForRefChans e) => RefChanWorkerEnv e -> RefChanId e -> m ()
refChanNotifyOnUpdated env chan = do
  atomically $ modifyTVar' (_refChanWorkerEnvNotify env) (HashMap.insert chan ())

-- | Starts downloading the block and registers the callback to be run by
-- the download monitor once the whole tree is present.
refChanAddDownload :: forall e m . ( m ~ PeerM e IO
                                   , MyPeer e
                                   )
                   => RefChanWorkerEnv e
                   -> RefChanId e
                   -> HashRef
                   -> OnDownloadComplete
                   -> m ()
refChanAddDownload env chan r onComlete = do
  penv <- ask
  t <- getTimeCoarse
  withPeerM penv $ withDownload (_refChanWorkerEnvDEnv env)
                 $ processBlock @e (fromHashRef r)

  atomically $ modifyTVar' (view refChanWorkerEnvDownload env) (HashMap.insert r (chan,(t, onComlete)))

-- | Scans the tree under the hash and reports whether every block reported
-- by the scan (the root itself excluded) is present in the storage.
--
-- The result is 'False' when the scan reports a failure, when any block is
-- missing and also when the scan yields no blocks besides the root.
--
-- FIXME: slow-deep-scan-exception-seems-not-working
checkDownloaded :: forall m . (MonadIO m, HasStorage m) => HashRef -> m Bool
checkDownloaded hr = do
  sto <- getStorage
  let readBlock h = liftIO $ getBlock sto h

  result <- S.toList_ $
              deepScan ScanDeep (const $ S.yield Nothing) (fromHashRef hr) readBlock $ \ha -> do
                unless (fromHashRef hr == ha) do
                  here <- liftIO $ hasBlock sto ha
                  S.yield here

  pure $ maybe False (not . List.null) $ sequence result

-- | Reads all hash references stored in the merkle tree of a log.
-- Parts of the tree that can not be read are skipped.
readLog :: forall m . ( MonadUnliftIO m )
        => AnyStorage
        -> HashRef
        -> m [HashRef]

readLog sto (HashRef h)  =
  S.toList_ $ do
    walkMerkle h (liftIO . getBlock sto) $ \hr -> do
      case hr of
        Left{} -> pure ()
        Right (hrr :: [HashRef]) -> S.each hrr

-- | Environment of the validator protocol client.
data ValidateEnv =
  ValidateEnv
  { _validateClient :: Fabriq UNIX
  , _validateSelf   :: Peer UNIX
  }

newtype ValidateProtoM m a = PingPongM { fromValidateProto :: ReaderT ValidateEnv m a }
                             deriving newtype ( Functor
                                              , Applicative
                                              , Monad
                                              , MonadIO
                                              , MonadUnliftIO
                                              , MonadReader ValidateEnv
                                              , MonadTrans
                                              )

-- | Runs a validator protocol action on top of the given unix messaging.
runValidateProtoM :: (MonadIO m, PeerMessaging UNIX) => MessagingUnix -> ValidateProtoM m a -> m a
runValidateProtoM tran m = runReaderT (fromValidateProto m) (ValidateEnv (Fabriq tran) (msgUnixSelf tran))

instance Monad m => HasFabriq UNIX (ValidateProtoM m) where
  getFabriq = asks _validateClient

instance Monad m => HasOwnPeer UNIX (ValidateProtoM m) where
  ownPeer = asks _validateSelf

-- | Handles answers of the validator: hands an @Accepted@ / @Rejected@
-- message over to the waiter registered under the nonce of the message.
-- Other messages and answers without a registered waiter are ignored.
refChanValidateProto :: forall e m . ( MonadIO m
                                     , Request e (RefChanValidate e) m
                                     , Response e (RefChanValidate e) m
                                     , e ~ UNIX
                                     )
                     => Cache (RefChanValidateNonce e) (MVar (RefChanValidate e))
                     -> RefChanValidate e
                     -> m ()

refChanValidateProto waiters msg = do
  debug $ "GOT ANSWER FROM VALIDATOR" <+> pretty msg
  case rcvData msg of
    Accepted h -> emitAnswer h msg
    Rejected h -> emitAnswer h msg
    _          -> none

  where
    emitAnswer h m = liftIO do
      debug $ "EMIT ANSWER" <+> pretty h
      mbAnsw <- Cache.lookup waiters (rcvNonce m)
      maybe1 mbAnsw none $ \answ -> do
        -- The box stays full after the first answer, so a plain putMVar
        -- would block this handler forever on a duplicate answer.
        void $ tryPutMVar answ m

-- | Starts a validator thread for every
-- @(validate refchan "CHAN" (socket unix "PATH"))@ entry of the config
-- whose channel has no validator registered yet.
refChanWorkerInitValidators :: forall e m . ( MonadIO m
                                            , MonadUnliftIO m
                                            -- , MyPeer e
                                            -- , ForRefChans e
                                            -- , ForRefChans UNIX
                                            -- , m ~ PeerM e IO
                                            , e ~ L4Proto
                                            )
                            => RefChanWorkerEnv e
                            -> m ()

refChanWorkerInitValidators env = do
  debug "refChanWorkerInitValidators"

  let (PeerConfig syn) = view refChanWorkerConf env

  let validators = [ mkV rc x | ListVal [ SymbolVal "validate"
                                        , SymbolVal "refchan"
                                        , LitStrVal rc
                                        , ListVal [ SymbolVal "socket", SymbolVal "unix", LitStrVal x ]
                                        ] <- syn
                   ] & catMaybes

  forM_ validators $ \(rc, sa) -> do
    debug $ "** VALIDATOR FOR" <+> pretty (AsBase58 rc, sa)

    here <- readTVarIO (_refChanWorkerValidators env) <&> HashMap.member rc

    unless here do
      q <- newTQueueIO
      val <- async $ validatorThread rc sa q
      let rcv = RefChanValidator q val
      atomically $ modifyTVar' (_refChanWorkerValidators env) (HashMap.insert rc rcv)

  where

    -- entries whose channel id can not be parsed are dropped
    mkV :: Text -> Text -> Maybe (RefChanId e, String)
    mkV rc x = (,Text.unpack x) <$> fromStringMay @(RefChanId e) (Text.unpack rc)

    -- Client side of the validator protocol for one channel: forwards the
    -- requests from the queue to the socket, pokes the validator
    -- periodically and dispatches its answers to the waiters.
    --
    -- FIXME: make-thread-respawning
    validatorThread chan sa q = liftIO do
      client <- newMessagingUnix False 1.0 sa
      msg <- async $ runMessagingUnix client

      -- FIXME: hardcoded-timeout
      waiters <- Cache.newCache (Just (toTimeSpec refChanValidatorTimeout))

      runValidateProtoM client do

        poke <- async $ forever do
          pause @'Seconds 10
          mv <- newEmptyMVar
          nonce <- newNonce @(RefChanValidate UNIX)
          atomically $ writeTQueue q (RefChanValidate @UNIX nonce chan Poke, mv)

        z <- async $ runProto
               [ makeResponse (refChanValidateProto waiters)
               ]

        -- The request loop runs as a supervised worker too: placed inline
        -- as a bare 'forever' it made the supervision code below
        -- unreachable.
        reqs <- async $ forever do
          (req, answ) <- atomically $ readTQueue q
          case rcvData req of
            Validate href -> do
              debug $ "DO REQUEST VALIDATE" <+> pretty href <+> pretty sa
              liftIO $ Cache.insert waiters (rcvNonce req) answ
              let pa = fromString sa
              request pa req

            Poke{} -> do
              debug "DO SEND POKE"
              let pa = fromString sa
              request pa req

            _ -> pure ()

        (_, r) <- waitAnyCatch [z,poke,reqs]
        debug $ "SOMETHING WRONG:" <+> viaShow r

        -- stop the remaining workers and the transport
        mapM_ cancel [z,poke,reqs]
        cancel msg
        warn $ "validatorThread is terminated for some reasons" <+> pretty (AsBase58 chan)

-- | Main refchan worker: starts the head monitor, the download monitor,
-- the poller, the log writer, the round cleanup and the log merge process,
-- initialises validators and subscribes to refchan request events.
refChanWorker :: forall e s m . ( MonadIO m
                                , MonadUnliftIO m
                                , MyPeer e
                                , HasStorage m
                                , Request e (RefChanHead e) m
                                , Request e (RefChanRequest e) m
                                , Sessions e (KnownPeer e) m
                                , Signatures s
                                , s ~ Encryption e
                                , IsRefPubKey s
                                -- , Pretty (AsBase58 (PubKey 'Sign s))
                                , ForRefChans e
                                , EventListener e (RefChanRound e) m
                                , EventListener e (RefChanRequest e) m
                                , Sessions e (RefChanRound e) m
                                , m ~ PeerM e IO
                                , e ~ L4Proto
                                )
             => RefChanWorkerEnv e
             -> SomeBrains e
             -> m ()

refChanWorker env brains = do

  penv <- ask

  mergeQ <- newTQueueIO

  -- FIXME: resume-on-exception
  hw <- async (refChanHeadMon penv)

  -- FIXME: insist-more-during-download
  --   it happens rather often that blocks
  --   are downloaded with difficulty.
  --   needs investigation. possibly rework
  --   the block download mechanism
  downloads <- async monitorHeadDownloads

  polls <- async refChanPoll

  wtrans <- async refChanWriter

  cleanup1 <- async cleanupRounds

  merge <- async (logMergeProcess env mergeQ)

  sto <- getStorage

  liftIO $ refChanWorkerInitValidators env

  subscribe @e RefChanRequestEventKey $ \(RefChanRequestEvent chan val) -> do
    debug $ "RefChanRequestEvent" <+> pretty (AsBase58 chan) <+> pretty val

    h <- liftIO $ getRef sto (RefChanLogKey @s chan)

    -- ignore if already in sync
    unless ((HashRef <$> h) == Just val) do

      refChanAddDownload env chan val $ \href -> do
        debug $ "BLOCK DOWNLOADED" <+> pretty href
        atomically $ writeTQueue mergeQ (chan, href)

      atomically $ writeTQueue mergeQ (chan, val)

  forever do
   pause @'Seconds 10
   debug "I'm refchan worker"

  -- NOTE(review): never reached because of the 'forever' above -- confirm
  --   whether the worker is meant to supervise its threads here.
  mapM_ waitCatch [hw,downloads,polls,wtrans,merge,cleanup1]

  where

    -- Remembers started rounds and every 10 seconds expires the ones that
    -- are closed or past their TTL; transactions of closed rounds are
    -- queued for writing.
    cleanupRounds = do

      rounds <- newTVarIO HashSet.empty

      subscribe @e RefChanRoundEventKey $ \(RefChanRoundEvent rcrk) -> do
        atomically $ modifyTVar' rounds (HashSet.insert rcrk)
        debug $ "ON ROUND STARTED" <+> pretty rcrk

      forever do
        -- FIXME: use-polling-function-and-respect-wait
        pause @'Seconds 10

        now <- getTimeCoarse
        xs <- readTVarIO rounds <&> HashSet.toList

        forM_ xs $ \x -> do

          void $ runMaybeT do
            se <- MaybeT $ find @e x id

            closed <- readTVarIO (view refChanRoundClosed se)
            trans <- atomically $ readTVar (view refChanRoundTrans se) <&> HashSet.toList

            let ttl = view refChanRoundTTL se

            when (closed || ttl <= now) do
              lift $ expire x

              when closed do
                forM_ trans $ \t -> do
                  debug $ "WRITING TRANS" <+> pretty t
                  lift $ refChanWriteTranFn env t

              atomically $ modifyTVar' rounds (HashSet.delete x)
              debug $ "CLEANUP ROUND" <+> pretty x

    -- Drains the write queue once a second, groups the transactions by
    -- channel and stores a new log (old entries plus new ones, without
    -- duplicates) under the log reference of each channel.
    refChanWriter = do
      sto <- getStorage
      forever do
        pause @'Seconds 1

        -- blocks until there is something to write
        _ <- atomically $ peekTQueue (view refChanWorkerEnvWriteQ env)

        htrans <- liftIO $ atomically $ flushTQueue (view refChanWorkerEnvWriteQ env)

        -- transactions that are missing or can not be decoded are dropped
        trans <- forM htrans $ \h -> runMaybeT do
          blk <- MaybeT $ liftIO (getBlock sto (fromHashRef h))
          upd <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) blk & either (const Nothing) Just

          case upd of
            Propose chan _ -> pure (RefChanLogKey @(Encryption e) chan, h)
            Accept chan _  -> pure (RefChanLogKey @(Encryption e) chan, h)

        let byChan = HashMap.fromListWith (<>) [ (x, [y]) | (x,y) <- catMaybes trans ]

        -- FIXME: process-in-parallel
        forM_ (HashMap.toList byChan) $ \(c,new) -> do
          mbLog <- liftIO $ getRef sto c

          hashes <- maybe1 mbLog (pure mempty) $ readLog sto . HashRef

          -- FIXME: might-be-problems-on-large-logs
          let hashesNew = HashSet.fromList (hashes <> new) & HashSet.toList

          -- -- FIXME: remove-chunk-num-hardcode
          let pt = toPTree (MaxSize 256) (MaxNum 256) hashesNew

          nref <- makeMerkle 0 pt $ \(_,_,bss) -> void $ liftIO $ putBlock sto bss

          liftIO $ updateRef sto c nref

          debug $ "REFCHANLOG UPDATED:" <+> pretty c <+> pretty nref

    -- Periodically broadcasts head and log requests for the polled refchans.
    refChanPoll = do

      let listRefs = listPolledRefs @e brains "refchan" <&> fmap (over _2 ( (*60) . fromIntegral) )

      polling (Polling 5 5) listRefs $ \ref -> do
        debug $ "POLLING REFCHAN" <+> pretty (AsBase58 ref)
        broadCastMessage (RefChanGetHead @e ref)
        broadCastMessage (RefChanRequest @e ref)

    -- Once a second checks the registered downloads: runs the callback of
    -- the completed ones and forgets completed and expired entries.
    monitorHeadDownloads = forever do
      pause @'Seconds 1
      items <- readTVarIO (view refChanWorkerEnvDownload env) <&> HashMap.toList

      now <- getTimeCoarse

      -- FIXME: change-to-polling-functions
      -- FIXME: consider-timeouts-or-leak-is-possible
      finished <- forM items $ \(r,(_,(t,onComplete))) -> do
        here <- checkDownloaded r
        if here then do
          liftIO $ onComplete r
          -- refChanOnHeadFn env chan (RefChanHeadBlockTran r)
          pure [r]
        else do
          -- FIXME: fix-timeout-hardcode
          let expired = realToFrac (toNanoSecs $ now - t) / 1e9 > 600
          pure [ r | expired ]

      -- Remove only the entries we are done with: overwriting the map with
      -- the filtered snapshot would drop downloads registered while the
      -- snapshot was being processed.
      atomically $ modifyTVar' (view refChanWorkerEnvDownload env) $ \m ->
        List.foldl' (flip HashMap.delete) m (mconcat finished)

    -- Processes announced head blocks: schedules a download when the block
    -- is not complete yet, otherwise verifies it and updates the head
    -- reference if its version is newer than ours.
    --
    -- FIXME: in-parallel?
    refChanHeadMon pe = liftIO $ withPeerM pe do

      forever do
        (chan, RefChanHeadBlockTran hr) <- atomically $ readTQueue (view refChanWorkerEnvHeadQ env)

        here <- checkDownloaded hr

        if not here then do
          -- the head is queued again once the download completes
          refChanAddDownload env chan hr (withPeerM pe . refChanOnHeadFn env chan . RefChanHeadBlockTran)
          trace $ "BLOCK IS NOT HERE" <+> pretty hr
        else do
          sto <- getStorage
          trace $ "BLOCK IS HERE" <+> pretty hr
          -- read the block
          lbs <- readBlobFromTree (getBlock sto) hr <&> fromMaybe mempty
          let what = unboxSignedBox @(RefChanHeadBlock e) @e lbs

          -- the notification mark is consumed regardless of the outcome
          notify <- atomically $ do
                      no <- readTVar (_refChanWorkerEnvNotify env) <&> HashMap.member chan
                      modifyTVar' (_refChanWorkerEnvNotify env) (HashMap.delete chan)
                      pure no

          case what of
            Nothing  -> err $ "malformed head block" <+> pretty hr

            Just (pk,blk) | pk == chan -> do
              let rkey = RefChanHeadKey @s pk

              debug $ "Good head block" <+> pretty hr <+> "processing..."

              ourVersion <- runMaybeT do
                cur <- MaybeT $ liftIO $ getRef sto rkey
                lbss <- MaybeT $ readBlobFromTree (getBlock sto) (HashRef cur)
                (_, blkOur) <- MaybeT $ pure $ unboxSignedBox @(RefChanHeadBlock e) @e lbss
                pure $ view refChanHeadVersion blkOur

              let v0 = fromMaybe 0 ourVersion
              let v1 = view refChanHeadVersion blk

              if v1 > v0 then do
                debug $ "UPDATING HEAD BLOCK" <+> pretty (v1, v0)
                liftIO $ updateRef sto rkey (fromHashRef hr)
                -- if it was us who updated it, it would be good
                -- to notify everyone. But how?
                --
                -- TODO: update-acl-here

                forM_ (HashMap.keys $ view refChanHeadPeers blk) $ \pip -> do
                  debug $ "ADD PEER ACL" <+> pretty (AsBase58 chan) <+> pretty(AsBase58 pip)

                forM_ (view refChanHeadAuthors blk) $ \au -> do
                  debug $ "ADD AUTHOR ACL" <+> pretty (AsBase58 chan) <+> pretty(AsBase58 au)

                when notify do
                  debug $ "NOTIFY-ALL-HEAD-UPDATED" <+> pretty (AsBase58 pk) <+> pretty hr
                  broadCastMessage (RefChanHead @e pk (RefChanHeadBlockTran hr))

              else do
                debug $ "LEAVING HEAD BLOCK" <+> pretty (v1, v0)

            _ -> debug "not subscribed to this refchan"

          pure ()
          -- unpack the block
          -- extract the key from the block?

        pure ()

-- | State of the log merge process.
data MergeEnv e =
  MergeEnv
  { mergeSto   :: AnyStorage
  , mergeHeads :: TVar (HashMap HashRef (RefChanHeadBlock e) )
    -- ^ cache of head blocks that have already been unpacked
  }

-- | Once a second drains the queue of logs received from other peers and
-- merges them, channel by channel, into our own logs.
--
-- FIXME: possible-performance-issues
--   Looks rather slow. Probably can be done
--   faster. In particular, by caching the logs
--   that have already been processed
logMergeProcess :: forall e s m . ( MonadUnliftIO m
                                  , MyPeer e
                                  , ForRefChans e
                                  , HasStorage m
                                  , Signatures s
                                  , s ~ Encryption e
                                  , m ~ PeerM e IO
                                  )
                => RefChanWorkerEnv e
                -> TQueue (RefChanId e, HashRef)
                -> m ()

logMergeProcess env q = do

  sto <- getStorage

  menv <- MergeEnv sto <$> newTVarIO mempty

  forever do
    -- FIXME: fix-hardcoded-timeout
    pause @'Seconds 1

    -- blocks until there is something to merge
    _ <- atomically $ peekTQueue q
    logs <- liftIO $ atomically $ flushTQueue q

    let byChan = HashMap.fromListWith (<>) [ (x,[y]) | (x,y) <- logs ]
                   & HashMap.toList
                   & fmap (over _2 List.nub)

    -- FIXME: in-parallel
    mapM_ (logMergeChan menv sto) byChan

  where

    -- Looks the head block up in the cache; on a miss reads it from the
    -- storage, unpacks it and caches the result.
    getHead :: MergeEnv e
            -> HashRef
            -> m (Maybe (RefChanHeadBlock e))

    getHead e h = do

      let sto = mergeSto e
      hd <- readTVarIO (mergeHeads e) <&> HashMap.lookup h

      here <- liftIO $ hasBlock sto (fromHashRef h) <&> isJust

      unless here do
        warn $ "refchan. head is missed:" <+> pretty h
        pure ()

      case hd of
        Just x -> pure (Just x)
        Nothing -> runMaybeT do
          hdblob <- MaybeT $ readBlobFromTree ( liftIO . getBlock sto ) h
          (_, headblk) <- MaybeT $ pure $ unboxSignedBox @(RefChanHeadBlock e) @e hdblob
          atomically $ modifyTVar' (mergeHeads e) (HashMap.insert h headblk)
          pure headblk

    -- Schedules the download of a head block that is absent in the storage.
    downloadMissedHead :: AnyStorage -> RefChanId e -> HashRef -> m ()
    downloadMissedHead sto chan headRef = do
      penv <- ask
      here <- liftIO $ hasBlock sto (fromHashRef headRef) <&> isJust
      unless here do
        refChanAddDownload env chan headRef (withPeerM penv . refChanOnHeadFn env chan . RefChanHeadBlockTran)

    -- Merges the given remote logs of a channel into our log. Does nothing
    -- when we have no log reference for the channel or when the remote
    -- logs contain no transactions.
    logMergeChan menv sto (chan, logs) = do

      penv <- ask

      void $ runMaybeT do

        let chanKey = RefChanLogKey @s chan
        h <- MaybeT $ liftIO $ getRef sto chanKey

        current <- lift $ readLog sto (HashRef h) <&> HashSet.fromList

        -- trans <- filter (not . flip HashSet.member current) . mconcat <$> mapM (lift . readLog sto) logs
        trans <- mconcat <$> mapM (lift . readLog sto) (filter (/= HashRef h) logs)

        guard (not $ List.null trans)

        -- so, the whole log that the peer has arrived here;
        -- it is logical to look for confirmations only within it.
        -- if the peer accepted transactions without enough
        -- confirmations, that is the peer's own problem.
        -- each transaction is considered once, if
        -- it is merged.
        -- if it is not merged --- perhaps it should be
        -- put into some kind of reject list

        -- unpack, first sort by heads
        -- then walk through the heads, fetch the heads
        -- check the ACLs against the historical heads
        -- then link each accept with the corresponding propose
        -- then count the accepts for each propose
        -- then, if everything is ok -- write the accepts and proposes
        -- that have at least quorum confirmations for the actual head

        let mergeSet = (HashSet.fromList trans <> current) & HashSet.toList

        -- if some transactions are missing, try to download them
        -- and hope for the best (the log will converge next time)

        forM_ mergeSet $ \href -> do
          mblk <- liftIO $ getBlock sto (fromHashRef href)
          maybe1 mblk (lift $ refChanAddDownload env chan href dontHandle) dontHandle

        -- for each readable transaction: (propose hash, (quorum, accepts))
        r <- forM mergeSet $ \href -> runMaybeT do

          blk <- MaybeT $ liftIO $ getBlock sto (fromHashRef href)

          tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) blk
                                     & either (const Nothing) Just

          case tran of
            Propose _ box -> do
              (pk, ProposeTran headRef box) <- MaybeT $ pure $ unboxSignedBox0 box
              (ak, _) <- MaybeT $ pure $ unboxSignedBox0 box

              lift $ lift $ downloadMissedHead sto chan headRef

              hd <- MaybeT $ lift $ getHead menv headRef

              let quo = view refChanHeadQuorum hd & fromIntegral
              guard $ checkACL hd (Just pk) ak
              pure [(href, (quo,mempty))]

            Accept _ box -> do
              (pk, AcceptTran headRef hashRef) <- MaybeT $ pure $ unboxSignedBox0 box

              lift $ lift $ downloadMissedHead sto chan headRef

              hd <- MaybeT $ lift $ getHead menv headRef

              let quo = view refChanHeadQuorum hd & fromIntegral

              guard $ HashMap.member pk (view refChanHeadPeers hd)

              pure [(hashRef, (quo,[href]))]

        let merge1 (q1, hs1) (q2, hs2) = (max q1 q2, List.nub (hs1 <> hs2) )

        let permitted = HashMap.fromListWith merge1 (mconcat (catMaybes r))
                          & HashMap.toList

        -- keep the proposes that reached the quorum, with their accepts
        new <- S.toList_ do
                  forM_ permitted $ \(prop, (qx, accs)) -> do
                    when (length accs >= qx) do
                      S.yield prop
                      S.each accs

        debug $ "new trans to merge" <+> pretty (AsBase58 chan) <+> pretty (length new)

        forM_ new $ \tnew -> do
          debug $ "TRANS TO MERGE" <+> pretty tnew

        let merged = HashSet.fromList new & HashSet.toList

        let pt = toPTree (MaxSize 256) (MaxNum 256) merged

        unless (List.null merged) do
          liftIO do
            nref <- makeMerkle 0 pt $ \(_,_,bss) -> do
              void $ putBlock sto bss

            debug $ "NEW REFCHAN" <+> pretty chanKey <+> pretty nref
            updateRef sto chanKey nref