diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs index c2f0b19e..783d2695 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs @@ -105,7 +105,8 @@ instance ForRefChans e => Serialise (RefChanHead e) data RefChanHeadAdapter e m = RefChanHeadAdapter - { refChanHeadOnHead :: RefChanHeadBlockTran e -> m () + { refChanHeadOnHead :: RefChanId e -> RefChanHeadBlockTran e -> m () + , refChanHeadSubscribed :: RefChanId e -> m Bool } refChanHeadProto :: forall e s m . ( MonadIO m @@ -135,9 +136,10 @@ refChanHeadProto self adapter msg = do case msg of RefChanHead chan pkt -> do + guard =<< lift (refChanHeadSubscribed adapter chan) trace $ "RefChanHead" <+> pretty self <+> pretty (AsBase58 chan) -- FIXME: check-chan-is-listened - lift $ refChanHeadOnHead adapter pkt + lift $ refChanHeadOnHead adapter chan pkt RefChanGetHead _ -> do -- прочитать ссылку diff --git a/hbs2-peer/app/Brains.hs b/hbs2-peer/app/Brains.hs index 9f880c28..d28dbb65 100644 --- a/hbs2-peer/app/Brains.hs +++ b/hbs2-peer/app/Brains.hs @@ -5,33 +5,37 @@ module Brains where import HBS2.Prelude.Plated import HBS2.Clock +import HBS2.Data.Types.Refs import HBS2.Net.Proto import HBS2.Hash +import HBS2.Base58 import HBS2.Net.IP.Addr +import HBS2.Net.Auth.Credentials import HBS2.System.Logger.Simple import PeerConfig -import Data.Maybe -import Control.Monad -import Control.Exception import Control.Concurrent.STM -import Control.Concurrent.Async -import Lens.Micro.Platform -import Data.HashMap.Strict qualified as HashMap -import Data.HashMap.Strict (HashMap) -import Data.List qualified as List -import Data.Cache (Cache) -import Data.Cache qualified as Cache -import Text.InterpolatedString.Perl6 (qc) +import Control.Exception +import Control.Monad import Database.SQLite.Simple import Database.SQLite.Simple.FromField -import System.Random (randomRIO) -import Data.Word +import Data.Cache (Cache) +import Data.Cache qualified as Cache import Data.Either +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HashMap +import Data.List qualified as List +import Data.Maybe +import Data.Text qualified as Text +import Data.Word +import Lens.Micro.Platform import System.Directory import System.FilePath +import System.Random (randomRIO) +import Text.InterpolatedString.Perl6 (qc) +import UnliftIO (MonadUnliftIO(..),async,race) data PeerBrainsDb @@ -40,6 +44,12 @@ instance HasCfgKey PeerBrainsDb (Maybe String) where class HasBrains e a where + listPolledRefs :: MonadIO m => a -> String -> m [(PubKey 'Sign (Encryption e), Int)] + listPolledRefs _ _ = pure mempty + + isPolledRef :: MonadIO m => a -> PubKey 'Sign (Encryption e) -> m Bool + isPolledRef _ _ = pure False + onClientTCPConnected :: MonadIO m => a -> PeerAddr e -> Word64 -> m () onClientTCPConnected _ _ = const none @@ -148,6 +158,8 @@ instance Pretty (Peer e) => HasBrains e NoBrains where data SomeBrains e = forall a . HasBrains e a => SomeBrains a instance HasBrains e (SomeBrains e) where + listPolledRefs (SomeBrains a) = listPolledRefs @e a + isPolledRef (SomeBrains a) = isPolledRef @e a onClientTCPConnected (SomeBrains a) = onClientTCPConnected @e a getClientTCP (SomeBrains a) = getClientTCP @e a setActiveTCPSessions (SomeBrains a) = setActiveTCPSessions @e a @@ -279,6 +291,23 @@ instance ( Hashable (Peer e) setReflogProcessed b h = do updateOP b $ insertReflogProcessed b h + listPolledRefs brains tp = do + liftIO $ do + let conn = view brainsDb brains + query conn [qc|select ref, interval from poll where type = ?|] (Only tp) + <&> fmap (\(r,i) -> (,i) <$> fromStringMay r ) + <&> catMaybes + + isPolledRef brains ref = do + liftIO do + let conn = view brainsDb brains + query @_ @(Only Int) conn [qc| + select 1 from poll + where ref = ? + limit 1 + |] ( Only ( show $ pretty (AsBase58 ref) ) ) + <&> isJust . listToMaybe + commitNow :: forall e m . MonadIO m => BasicBrains e -> Bool @@ -690,6 +719,16 @@ newBasicBrains cfg = liftIO do ) |] + execute_ conn [qc| + create table if not exists poll + ( ref text not null + , type text not null + , interval int not null + , primary key (ref,type) + ) + |] + + BasicBrains <$> newTVarIO mempty <*> newTVarIO mempty <*> Cache.newCache (Just (toTimeSpec (30 :: Timeout 'Seconds))) @@ -697,8 +736,12 @@ newBasicBrains cfg = liftIO do <*> newTQueueIO <*> newTQueueIO -runBasicBrains :: MonadIO m => BasicBrains e -> m () -runBasicBrains brains = do +runBasicBrains :: forall e m . ( e ~ L4Proto, MonadUnliftIO m ) + => PeerConfig + -> BasicBrains e + -> m () + +runBasicBrains cfg brains = do let pip = view brainsPipeline brains let expire = view brainsExpire brains @@ -725,6 +768,27 @@ runBasicBrains brains = do pause @'Seconds 60 updateOP brains (cleanupHashes brains) + trace "runBasicBrains init" + + let (PeerConfig syn) = cfg + let polls = catMaybes ( + [ (tp,n,) <$> fromStringMay @(PubKey 'Sign (Encryption e)) (Text.unpack ref) + | ListVal @C (Key "poll" [SymbolVal tp, LitIntVal n, LitStrVal ref]) <- syn + ] ) + + void $ async $ do + pause @'Seconds 5 + forM_ polls $ \(t,mi,x) -> do + trace $ "BRAINS: poll" <+> pretty t <+> pretty (AsBase58 x) <+> pretty mi + updateOP brains $ do + let conn = view brainsDb brains + liftIO $ execute conn [qc| + insert into poll (ref,type,interval) + values (?,?,?) + on conflict do update set interval = excluded.interval + |] (show $ pretty (AsBase58 x), show $ pretty t, mi) + commitNow brains True + void $ forever do pause @'Seconds 15 ee <- liftIO $ Cache.toList expire diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 538a01ba..e1f1fd35 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -533,7 +533,7 @@ runPeer opts = U.handle (\e -> myException e brains <- newBasicBrains @e conf - brainsThread <- async $ runBasicBrains brains + brainsThread <- async $ runBasicBrains conf brains denv <- newDownloadEnv brains @@ -564,6 +564,7 @@ runPeer opts = U.handle (\e -> myException e let refChanHeadAdapter = RefChanHeadAdapter { refChanHeadOnHead = refChanOnHead rce + , refChanHeadSubscribed = isPolledRef @e brains } let pexFilt pips = do diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index a8c72470..10049415 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -52,8 +52,8 @@ instance Exception DataNotReady data RefChanWorkerEnv e = RefChanWorkerEnv { _refChanWorkerEnvDownload :: DownloadEnv e - , _refChanWorkerEnvHeadQ :: TQueue (RefChanHeadBlockTran e) - , _refChaWorkerEnvDownload :: TVar (HashMap HashRef TimeSpec) -- таймстемп можно + , _refChanWorkerEnvHeadQ :: TQueue (RefChanId e, RefChanHeadBlockTran e) + , _refChaWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, TimeSpec)) } makeLenses 'RefChanWorkerEnv @@ -67,22 +67,22 @@ refChanWorkerEnv _ de = liftIO $ RefChanWorkerEnv @e de <$> newTQueueIO <*> newTVarIO mempty -refChanOnHead :: MonadIO m => RefChanWorkerEnv e -> RefChanHeadBlockTran e -> m () -refChanOnHead env tran = do - atomically $ writeTQueue (view refChanWorkerEnvHeadQ env) tran +refChanOnHead :: MonadIO m => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m () +refChanOnHead env chan tran = do + atomically $ writeTQueue (view refChanWorkerEnvHeadQ env) (chan, tran) refChanAddDownload :: forall e m . ( m ~ PeerM e IO , MyPeer e , Block ByteString ~ ByteString ) - => RefChanWorkerEnv e -> HashRef -> m () -refChanAddDownload env r = do + => RefChanWorkerEnv e -> RefChanId e -> HashRef -> m () +refChanAddDownload env chan r = do penv <- ask t <- getTimeCoarse withPeerM penv $ withDownload (_refChanWorkerEnvDownload env) $ processBlock @e (fromHashRef r) - atomically $ modifyTVar (view refChaWorkerEnvDownload env) (HashMap.insert r t) + atomically $ modifyTVar (view refChaWorkerEnvDownload env) (HashMap.insert r (chan,t)) checkDownloaded :: forall m . (MonadIO m, HasStorage m, Block ByteString ~ ByteString) => HashRef -> m Bool checkDownloaded hr = do @@ -146,28 +146,27 @@ refChanWorker env = do now <- getTimeCoarse -- FIXME: consider-timeouts-or-leak-is-possible - rest <- forM all $ \(r,t) -> do + rest <- forM all $ \(r,item@(chan,t)) -> do here <- checkDownloaded r if here then do - refChanOnHead env (RefChanHeadBlockTran r) + refChanOnHead env chan (RefChanHeadBlockTran r) pure mempty else do -- FIXME: fix-timeout-hardcode let expired = realToFrac (toNanoSecs $ now - t) / 1e9 > 600 - if expired then pure mempty else pure [(r,t)] + if expired then pure mempty else pure [(r,item)] atomically $ writeTVar (view refChaWorkerEnvDownload env) (HashMap.fromList (mconcat rest)) -- FIXME: in-parallel? refChanHeadMon = do forever do - RefChanHeadBlockTran hr <- atomically $ readTQueue (view refChanWorkerEnvHeadQ env) - -- debug $ "DROP HEAD UPDATE" <+> pretty (fromRefChanHeadBlockTran tran) + (chan, RefChanHeadBlockTran hr) <- atomically $ readTQueue (view refChanWorkerEnvHeadQ env) here <- checkDownloaded hr if not here then do - refChanAddDownload env hr + refChanAddDownload env chan hr trace $ "BLOCK IS NOT HERE" <+> pretty hr else do trace $ "BLOCK IS HERE" <+> pretty hr @@ -178,7 +177,7 @@ refChanWorker env = do case what of Nothing -> err $ "malformed head block" <+> pretty hr - Just (pk,blk) -> do + Just (pk,blk) | pk == chan -> do let rkey = RefChanHeadKey @s pk sto <- getStorage @@ -205,6 +204,8 @@ refChanWorker env = do else do debug $ "LEAVING HEAD BLOCK" <+> pretty (v1, v0) + _ -> debug "not subscribed to this refchan" + pure () -- распаковываем блок -- вытаскиваем ключ из блока?