diff --git a/hbs2-peer/app/Brains.hs b/hbs2-peer/app/Brains.hs index 92807160..bc12c360 100644 --- a/hbs2-peer/app/Brains.hs +++ b/hbs2-peer/app/Brains.hs @@ -38,6 +38,8 @@ import Data.Cache qualified as Cache import Data.Either import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap +import Data.HashSet (HashSet) +import Data.HashSet qualified as HashSet import Data.List qualified as List import Data.Maybe import Data.Text qualified as Text @@ -82,6 +84,7 @@ data BasicBrains e = , _brainsCommit :: TQueue CommitCmd , _brainsDelDownload :: TQueue (Hash HbSync) , _brainsSizeCache :: Cache (Peer e, Hash HbSync) Integer + , _brainsPolled :: TVar (HashSet (PubKey 'Sign (Encryption e), String)) } makeLenses 'BasicBrains @@ -96,6 +99,7 @@ cleanupPostponed b h = do instance ( Hashable (Peer e) , Pretty (Peer e), Pretty (PeerAddr e) , Pretty (AsBase58 (PubKey 'Sign (Encryption e))) + , Hashable (PubKey 'Sign (Encryption e)) , e ~ L4Proto , ForRefChans e ) => HasBrains e (BasicBrains e) where @@ -103,14 +107,14 @@ instance ( Hashable (Peer e) onClientTCPConnected br pa@(L4Address proto _) ssid = do debug $ "BRAINS: onClientTCPConnected" <+> pretty proto <+> pretty pa <+> pretty ssid updateOP br $ insertClientTCP br pa ssid - commitNow br True + commitNow br False getClientTCP br = liftIO (selectClientTCP br) setActiveTCPSessions br ssids = do trace $ "BRAINS: setActiveTCPSessions" <+> pretty ssids updateOP br $ updateTCPSessions br ssids - commitNow br True + commitNow br False listTCPPexCandidates = liftIO . selectTCPPexCandidates @@ -134,7 +138,7 @@ instance ( Hashable (Peer e) forM_ ps $ \pip -> do pa <- toPeerAddr pip insertKnownPeer br pa - commitNow br True + commitNow br False onBlockSize b p h size = do liftIO $ Cache.insert (_brainsSizeCache b) (p,h) size @@ -217,12 +221,12 @@ instance ( Hashable (Peer e) addPolledRef brains r s i = do + liftIO $ atomically $ modifyTVar (_brainsPolled brains) (HashSet.insert (r,s)) + updateOP brains $ do let conn = view brainsDb brains liftIO $ execute conn sql (show $ pretty (AsBase58 r), s, i) - commitNow brains True - where sql = [qc| insert into {poll_table} (ref,type,interval) @@ -253,14 +257,24 @@ instance ( Hashable (Peer e) postprocess = mapMaybe (\(r,t,i) -> (,t,i) <$> fromStringMay r ) isPolledRef brains tp ref = do - liftIO do - let conn = view brainsDb brains - query @_ @(Only Int) conn [qc| - select 1 from {poll_table} - where ref = ? and type = ? - limit 1 - |] ( show $ pretty (AsBase58 ref), tp ) - <&> isJust . listToMaybe + + cached <- liftIO $ readTVarIO (_brainsPolled brains) <&> HashSet.member (ref,tp) + + if cached then + pure True + else do + + r <- liftIO do + let conn = view brainsDb brains + query @_ @(Only Int) conn [qc| + select 1 from {poll_table} + where ref = ? and type = ? + limit 1 + |] ( show $ pretty (AsBase58 ref), tp ) + <&> isJust . listToMaybe + + liftIO $ atomically $ modifyTVar (_brainsPolled brains) (HashSet.insert (ref,tp)) + pure r setSeen brains w ts = do utc <- liftIO getCurrentTime <&> addUTCTime ts @@ -745,7 +759,10 @@ tableExists conn prefix' tableName = do -- FIXME: eventually-close-db -newBasicBrains :: forall e m . (Hashable (Peer e), MonadIO m) +newBasicBrains :: forall e m . ( Hashable (Peer e) + , Hashable (PubKey 'Sign (Encryption e)) + , MonadIO m + ) => PeerConfig -> m (BasicBrains e) @@ -898,7 +915,7 @@ newBasicBrains cfg = liftIO do <*> newTQueueIO <*> newTQueueIO <*> Cache.newCache (Just (toTimeSpec (1200:: Timeout 'Seconds))) - + <*> newTVarIO mempty data PeerDownloadsDelOnStart @@ -961,7 +978,7 @@ runBasicBrains cfg brains = do when (delDowns == FeatureOn ) do debug $ yellow "CLEAN ALL DOWNLOADS" updateOP brains (delAllDownloads brains) - commitNow brains True + commitNow brains False let polls = catMaybes ( [ (tp,n,) <$> fromStringMay @(PubKey 'Sign (Encryption e)) (Text.unpack ref)