This commit is contained in:
Dmitry Zuikov 2023-07-15 07:48:32 +03:00
parent 2d5406cee0
commit 8e567c87d0
3 changed files with 45 additions and 11 deletions

View File

@ -61,6 +61,7 @@ type ForRefChans e = ( Serialise ( PubKey 'Sign (Encryption e))
, Pretty (AsBase58 (PubKey 'Sign (Encryption e))) , Pretty (AsBase58 (PubKey 'Sign (Encryption e)))
, FromStringMaybe (PubKey 'Sign (Encryption e)) , FromStringMaybe (PubKey 'Sign (Encryption e))
, Serialise (Signature (Encryption e)) , Serialise (Signature (Encryption e))
, Hashable (PubKey 'Sign (Encryption e))
) )
instance ForRefChans e => Serialise (RefChanHeadBlock e) instance ForRefChans e => Serialise (RefChanHeadBlock e)
@ -106,6 +107,7 @@ data RefChanHead e =
instance ForRefChans e => Serialise (RefChanHead e) instance ForRefChans e => Serialise (RefChanHead e)
data RefChanHeadAdapter e m = data RefChanHeadAdapter e m =
RefChanHeadAdapter RefChanHeadAdapter
{ refChanHeadOnHead :: RefChanId e -> RefChanHeadBlockTran e -> m () { refChanHeadOnHead :: RefChanId e -> RefChanHeadBlockTran e -> m ()

View File

@ -992,6 +992,7 @@ runPeer opts = U.handle (\e -> myException e
Left{} -> err $ "can't read head block" <+> pretty h Left{} -> err $ "can't read head block" <+> pretty h
Right (SignedBox k _ _) -> do Right (SignedBox k _ _) -> do
let msg = RefChanHead k (RefChanHeadBlockTran (HashRef h)) let msg = RefChanHead k (RefChanHeadBlockTran (HashRef h))
refChanNotifyOnUpdated rce k
runResponseM me $ refChanHeadProto @e True refChanHeadAdapter msg runResponseM me $ refChanHeadProto @e True refChanHeadAdapter msg
let refChanHeadGetAction puk = do let refChanHeadGetAction puk = do

View File

@ -3,10 +3,11 @@
module RefChan ( module RefChan (
RefChanWorkerEnv(..) RefChanWorkerEnv(..)
, refChanWorkerEnvHeadQ , refChanWorkerEnvHeadQ
, refChaWorkerEnvDownload , refChanWorkerEnvDownload
, refChanOnHead , refChanOnHead
, refChanWorker , refChanWorker
, refChanWorkerEnv , refChanWorkerEnv
, refChanNotifyOnUpdated
) where ) where
import HBS2.Prelude.Plated import HBS2.Prelude.Plated
@ -14,12 +15,15 @@ import HBS2.Prelude.Plated
import HBS2.Actors.Peer import HBS2.Actors.Peer
import HBS2.Base58 import HBS2.Base58
import HBS2.Clock import HBS2.Clock
import HBS2.Net.Proto.Peer
import HBS2.Net.Proto.Sessions
import HBS2.Data.Detect import HBS2.Data.Detect
import HBS2.Data.Types.Refs import HBS2.Data.Types.Refs
import HBS2.Net.Auth.Credentials import HBS2.Net.Auth.Credentials
import HBS2.Net.Proto import HBS2.Net.Proto
import HBS2.Net.Proto.RefChan import HBS2.Net.Proto.RefChan
import HBS2.Net.Proto.Types import HBS2.Net.Proto.Types
import HBS2.Net.Proto.Definition()
import HBS2.Storage import HBS2.Storage
import HBS2.System.Logger.Simple import HBS2.System.Logger.Simple
@ -51,26 +55,33 @@ instance Exception DataNotReady
data RefChanWorkerEnv e = data RefChanWorkerEnv e =
RefChanWorkerEnv RefChanWorkerEnv
{ _refChanWorkerEnvDownload :: DownloadEnv e { _refChanWorkerEnvDEnv :: DownloadEnv e
, _refChanWorkerEnvHeadQ :: TQueue (RefChanId e, RefChanHeadBlockTran e) , _refChanWorkerEnvHeadQ :: TQueue (RefChanId e, RefChanHeadBlockTran e)
, _refChaWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, TimeSpec)) , _refChanWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, TimeSpec))
, _refChanWorkerEnvNotify :: TVar (HashMap (RefChanId e) ())
} }
makeLenses 'RefChanWorkerEnv makeLenses 'RefChanWorkerEnv
refChanWorkerEnv :: forall m e . MonadIO m refChanWorkerEnv :: forall m e . (MonadIO m, ForRefChans e)
=> PeerConfig => PeerConfig
-> DownloadEnv e -> DownloadEnv e
-> m (RefChanWorkerEnv e) -> m (RefChanWorkerEnv e)
refChanWorkerEnv _ de = liftIO $ RefChanWorkerEnv @e de <$> newTQueueIO refChanWorkerEnv _ de = liftIO $ RefChanWorkerEnv @e de <$> newTQueueIO
<*> newTVarIO mempty <*> newTVarIO mempty
<*> newTVarIO mempty
refChanOnHead :: MonadIO m => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m () refChanOnHead :: MonadIO m => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m ()
refChanOnHead env chan tran = do refChanOnHead env chan tran = do
atomically $ writeTQueue (view refChanWorkerEnvHeadQ env) (chan, tran) atomically $ writeTQueue (view refChanWorkerEnvHeadQ env) (chan, tran)
-- FIXME: leak-when-block-never-really-updated
refChanNotifyOnUpdated :: (MonadIO m, ForRefChans e) => RefChanWorkerEnv e -> RefChanId e -> m ()
refChanNotifyOnUpdated env chan = do
atomically $ modifyTVar (_refChanWorkerEnvNotify env) (HashMap.insert chan ())
refChanAddDownload :: forall e m . ( m ~ PeerM e IO refChanAddDownload :: forall e m . ( m ~ PeerM e IO
, MyPeer e , MyPeer e
, Block ByteString ~ ByteString , Block ByteString ~ ByteString
@ -79,10 +90,10 @@ refChanAddDownload :: forall e m . ( m ~ PeerM e IO
refChanAddDownload env chan r = do refChanAddDownload env chan r = do
penv <- ask penv <- ask
t <- getTimeCoarse t <- getTimeCoarse
withPeerM penv $ withDownload (_refChanWorkerEnvDownload env) withPeerM penv $ withDownload (_refChanWorkerEnvDEnv env)
$ processBlock @e (fromHashRef r) $ processBlock @e (fromHashRef r)
atomically $ modifyTVar (view refChaWorkerEnvDownload env) (HashMap.insert r (chan,t)) atomically $ modifyTVar (view refChanWorkerEnvDownload env) (HashMap.insert r (chan,t))
checkDownloaded :: forall m . (MonadIO m, HasStorage m, Block ByteString ~ ByteString) => HashRef -> m Bool checkDownloaded :: forall m . (MonadIO m, HasStorage m, Block ByteString ~ ByteString) => HashRef -> m Bool
checkDownloaded hr = do checkDownloaded hr = do
@ -120,6 +131,9 @@ refChanWorker :: forall e s m . ( MonadIO m
, MonadUnliftIO m , MonadUnliftIO m
, MyPeer e , MyPeer e
, HasStorage m , HasStorage m
, Request e (RefChanHead e) m
, HasProtocol e (RefChanHead e)
, Sessions e (KnownPeer e) m
, Signatures s , Signatures s
, s ~ Encryption e , s ~ Encryption e
, IsRefPubKey s , IsRefPubKey s
@ -133,20 +147,24 @@ refChanWorker :: forall e s m . ( MonadIO m
refChanWorker env = do refChanWorker env = do
hw <- async refChanHeadMon penv <- ask
-- FIXME: resume-on-exception
hw <- async (refChanHeadMon penv)
downloads <- async monitorDownloads downloads <- async monitorDownloads
forever do forever do
pause @'Seconds 10 pause @'Seconds 10
debug "I'm refchan worker" debug "I'm refchan worker"
mapM_ wait [hw,downloads] mapM_ waitCatch [hw,downloads]
where where
monitorDownloads = forever do monitorDownloads = forever do
pause @'Seconds 2 pause @'Seconds 2
all <- atomically $ readTVar (view refChaWorkerEnvDownload env) <&> HashMap.toList all <- atomically $ readTVar (view refChanWorkerEnvDownload env) <&> HashMap.toList
now <- getTimeCoarse now <- getTimeCoarse
@ -161,10 +179,11 @@ refChanWorker env = do
let expired = realToFrac (toNanoSecs $ now - t) / 1e9 > 600 let expired = realToFrac (toNanoSecs $ now - t) / 1e9 > 600
if expired then pure mempty else pure [(r,item)] if expired then pure mempty else pure [(r,item)]
atomically $ writeTVar (view refChaWorkerEnvDownload env) (HashMap.fromList (mconcat rest)) atomically $ writeTVar (view refChanWorkerEnvDownload env) (HashMap.fromList (mconcat rest))
-- FIXME: in-parallel? -- FIXME: in-parallel?
refChanHeadMon = do refChanHeadMon pe = liftIO $ withPeerM pe do
forever do forever do
(chan, RefChanHeadBlockTran hr) <- atomically $ readTQueue (view refChanWorkerEnvHeadQ env) (chan, RefChanHeadBlockTran hr) <- atomically $ readTQueue (view refChanWorkerEnvHeadQ env)
@ -179,6 +198,11 @@ refChanWorker env = do
lbs <- readBlob hr <&> fromMaybe mempty lbs <- readBlob hr <&> fromMaybe mempty
let what = unboxSignedBox @(RefChanHeadBlock e) @e lbs let what = unboxSignedBox @(RefChanHeadBlock e) @e lbs
notify <- atomically $ do
no <- readTVar (_refChanWorkerEnvNotify env) <&> HashMap.member chan
modifyTVar (_refChanWorkerEnvNotify env) (HashMap.delete chan)
pure no
case what of case what of
Nothing -> err $ "malformed head block" <+> pretty hr Nothing -> err $ "malformed head block" <+> pretty hr
@ -206,6 +230,13 @@ refChanWorker env = do
if v1 > v0 then do if v1 > v0 then do
debug $ "UPDATING HEAD BLOCK" <+> pretty (v1, v0) debug $ "UPDATING HEAD BLOCK" <+> pretty (v1, v0)
liftIO $ updateRef sto rkey (fromHashRef hr) liftIO $ updateRef sto rkey (fromHashRef hr)
-- если это мы сами его обновили - то неплохо бы
-- всем разослать уведомление. А как?
when notify do
debug $ "NOTIFY-ALL-HEAD-UPDATED" <+> pretty (AsBase58 pk) <+> pretty hr
broadCastMessage (RefChanHead @e pk (RefChanHeadBlockTran hr))
pure ()
else do else do
debug $ "LEAVING HEAD BLOCK" <+> pretty (v1, v0) debug $ "LEAVING HEAD BLOCK" <+> pretty (v1, v0)