From 8e567c87d03699abb9cfc9dbeaf6c9d14f73f72a Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Sat, 15 Jul 2023 07:48:32 +0300 Subject: [PATCH] wip --- hbs2-core/lib/HBS2/Net/Proto/RefChan.hs | 2 + hbs2-peer/app/PeerMain.hs | 1 + hbs2-peer/app/RefChan.hs | 53 ++++++++++++++++++++----- 3 files changed, 45 insertions(+), 11 deletions(-) diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs index c187a7ed..5cc7394a 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs @@ -61,6 +61,7 @@ type ForRefChans e = ( Serialise ( PubKey 'Sign (Encryption e)) , Pretty (AsBase58 (PubKey 'Sign (Encryption e))) , FromStringMaybe (PubKey 'Sign (Encryption e)) , Serialise (Signature (Encryption e)) + , Hashable (PubKey 'Sign (Encryption e)) ) instance ForRefChans e => Serialise (RefChanHeadBlock e) @@ -106,6 +107,7 @@ data RefChanHead e = instance ForRefChans e => Serialise (RefChanHead e) + data RefChanHeadAdapter e m = RefChanHeadAdapter { refChanHeadOnHead :: RefChanId e -> RefChanHeadBlockTran e -> m () diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 46065663..ade69bf1 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -992,6 +992,7 @@ runPeer opts = U.handle (\e -> myException e Left{} -> err $ "can't read head block" <+> pretty h Right (SignedBox k _ _) -> do let msg = RefChanHead k (RefChanHeadBlockTran (HashRef h)) + refChanNotifyOnUpdated rce k runResponseM me $ refChanHeadProto @e True refChanHeadAdapter msg let refChanHeadGetAction puk = do diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index 30b07b44..b1b11ec9 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -3,10 +3,11 @@ module RefChan ( RefChanWorkerEnv(..) , refChanWorkerEnvHeadQ - , refChaWorkerEnvDownload + , refChanWorkerEnvDownload , refChanOnHead , refChanWorker , refChanWorkerEnv + , refChanNotifyOnUpdated ) where import HBS2.Prelude.Plated @@ -14,12 +15,15 @@ import HBS2.Prelude.Plated import HBS2.Actors.Peer import HBS2.Base58 import HBS2.Clock +import HBS2.Net.Proto.Peer +import HBS2.Net.Proto.Sessions import HBS2.Data.Detect import HBS2.Data.Types.Refs import HBS2.Net.Auth.Credentials import HBS2.Net.Proto import HBS2.Net.Proto.RefChan import HBS2.Net.Proto.Types +import HBS2.Net.Proto.Definition() import HBS2.Storage import HBS2.System.Logger.Simple @@ -51,26 +55,33 @@ instance Exception DataNotReady data RefChanWorkerEnv e = RefChanWorkerEnv - { _refChanWorkerEnvDownload :: DownloadEnv e + { _refChanWorkerEnvDEnv :: DownloadEnv e , _refChanWorkerEnvHeadQ :: TQueue (RefChanId e, RefChanHeadBlockTran e) - , _refChaWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, TimeSpec)) + , _refChanWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, TimeSpec)) + , _refChanWorkerEnvNotify :: TVar (HashMap (RefChanId e) ()) } makeLenses 'RefChanWorkerEnv -refChanWorkerEnv :: forall m e . MonadIO m +refChanWorkerEnv :: forall m e . (MonadIO m, ForRefChans e) => PeerConfig -> DownloadEnv e -> m (RefChanWorkerEnv e) refChanWorkerEnv _ de = liftIO $ RefChanWorkerEnv @e de <$> newTQueueIO <*> newTVarIO mempty + <*> newTVarIO mempty refChanOnHead :: MonadIO m => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m () refChanOnHead env chan tran = do atomically $ writeTQueue (view refChanWorkerEnvHeadQ env) (chan, tran) +-- FIXME: leak-when-block-never-really-updated +refChanNotifyOnUpdated :: (MonadIO m, ForRefChans e) => RefChanWorkerEnv e -> RefChanId e -> m () +refChanNotifyOnUpdated env chan = do + atomically $ modifyTVar (_refChanWorkerEnvNotify env) (HashMap.insert chan ()) + refChanAddDownload :: forall e m . ( m ~ PeerM e IO , MyPeer e , Block ByteString ~ ByteString @@ -79,10 +90,10 @@ refChanAddDownload :: forall e m . ( m ~ PeerM e IO refChanAddDownload env chan r = do penv <- ask t <- getTimeCoarse - withPeerM penv $ withDownload (_refChanWorkerEnvDownload env) + withPeerM penv $ withDownload (_refChanWorkerEnvDEnv env) $ processBlock @e (fromHashRef r) - atomically $ modifyTVar (view refChaWorkerEnvDownload env) (HashMap.insert r (chan,t)) + atomically $ modifyTVar (view refChanWorkerEnvDownload env) (HashMap.insert r (chan,t)) checkDownloaded :: forall m . (MonadIO m, HasStorage m, Block ByteString ~ ByteString) => HashRef -> m Bool checkDownloaded hr = do @@ -120,6 +131,9 @@ refChanWorker :: forall e s m . ( MonadIO m , MonadUnliftIO m , MyPeer e , HasStorage m + , Request e (RefChanHead e) m + , HasProtocol e (RefChanHead e) + , Sessions e (KnownPeer e) m , Signatures s , s ~ Encryption e , IsRefPubKey s @@ -133,20 +147,24 @@ refChanWorker :: forall e s m . ( MonadIO m refChanWorker env = do - hw <- async refChanHeadMon + penv <- ask + + -- FIXME: resume-on-exception + hw <- async (refChanHeadMon penv) + downloads <- async monitorDownloads forever do pause @'Seconds 10 debug "I'm refchan worker" - mapM_ wait [hw,downloads] + mapM_ waitCatch [hw,downloads] where monitorDownloads = forever do pause @'Seconds 2 - all <- atomically $ readTVar (view refChaWorkerEnvDownload env) <&> HashMap.toList + all <- atomically $ readTVar (view refChanWorkerEnvDownload env) <&> HashMap.toList now <- getTimeCoarse @@ -161,10 +179,11 @@ refChanWorker env = do let expired = realToFrac (toNanoSecs $ now - t) / 1e9 > 600 if expired then pure mempty else pure [(r,item)] - atomically $ writeTVar (view refChaWorkerEnvDownload env) (HashMap.fromList (mconcat rest)) + atomically $ writeTVar (view refChanWorkerEnvDownload env) (HashMap.fromList (mconcat rest)) -- FIXME: in-parallel? - refChanHeadMon = do + refChanHeadMon pe = liftIO $ withPeerM pe do + forever do (chan, RefChanHeadBlockTran hr) <- atomically $ readTQueue (view refChanWorkerEnvHeadQ env) @@ -179,6 +198,11 @@ refChanWorker env = do lbs <- readBlob hr <&> fromMaybe mempty let what = unboxSignedBox @(RefChanHeadBlock e) @e lbs + notify <- atomically $ do + no <- readTVar (_refChanWorkerEnvNotify env) <&> HashMap.member chan + modifyTVar (_refChanWorkerEnvNotify env) (HashMap.delete chan) + pure no + case what of Nothing -> err $ "malformed head block" <+> pretty hr @@ -206,6 +230,13 @@ refChanWorker env = do if v1 > v0 then do debug $ "UPDATING HEAD BLOCK" <+> pretty (v1, v0) liftIO $ updateRef sto rkey (fromHashRef hr) + -- если это мы сами его обновили - то неплохо бы + -- всем разослать уведомление. А как? + when notify do + debug $ "NOTIFY-ALL-HEAD-UPDATED" <+> pretty (AsBase58 pk) <+> pretty hr + broadCastMessage (RefChanHead @e pk (RefChanHeadBlockTran hr)) + pure () + else do debug $ "LEAVING HEAD BLOCK" <+> pretty (v1, v0)