wip, check if sqlite blocks

This commit is contained in:
Dmitry Zuikov 2024-03-17 05:35:21 +03:00
parent c869bd58f2
commit 9546a440ea
1 changed files with 33 additions and 16 deletions

View File

@ -38,6 +38,8 @@ import Data.Cache qualified as Cache
import Data.Either import Data.Either
import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap import Data.HashMap.Strict qualified as HashMap
import Data.HashSet (HashSet)
import Data.HashSet qualified as HashSet
import Data.List qualified as List import Data.List qualified as List
import Data.Maybe import Data.Maybe
import Data.Text qualified as Text import Data.Text qualified as Text
@ -82,6 +84,7 @@ data BasicBrains e =
, _brainsCommit :: TQueue CommitCmd , _brainsCommit :: TQueue CommitCmd
, _brainsDelDownload :: TQueue (Hash HbSync) , _brainsDelDownload :: TQueue (Hash HbSync)
, _brainsSizeCache :: Cache (Peer e, Hash HbSync) Integer , _brainsSizeCache :: Cache (Peer e, Hash HbSync) Integer
, _brainsPolled :: TVar (HashSet (PubKey 'Sign (Encryption e), String))
} }
makeLenses 'BasicBrains makeLenses 'BasicBrains
@ -96,6 +99,7 @@ cleanupPostponed b h = do
instance ( Hashable (Peer e) instance ( Hashable (Peer e)
, Pretty (Peer e), Pretty (PeerAddr e) , Pretty (Peer e), Pretty (PeerAddr e)
, Pretty (AsBase58 (PubKey 'Sign (Encryption e))) , Pretty (AsBase58 (PubKey 'Sign (Encryption e)))
, Hashable (PubKey 'Sign (Encryption e))
, e ~ L4Proto , e ~ L4Proto
, ForRefChans e , ForRefChans e
) => HasBrains e (BasicBrains e) where ) => HasBrains e (BasicBrains e) where
@ -103,14 +107,14 @@ instance ( Hashable (Peer e)
onClientTCPConnected br pa@(L4Address proto _) ssid = do onClientTCPConnected br pa@(L4Address proto _) ssid = do
debug $ "BRAINS: onClientTCPConnected" <+> pretty proto <+> pretty pa <+> pretty ssid debug $ "BRAINS: onClientTCPConnected" <+> pretty proto <+> pretty pa <+> pretty ssid
updateOP br $ insertClientTCP br pa ssid updateOP br $ insertClientTCP br pa ssid
commitNow br True commitNow br False
getClientTCP br = liftIO (selectClientTCP br) getClientTCP br = liftIO (selectClientTCP br)
setActiveTCPSessions br ssids = do setActiveTCPSessions br ssids = do
trace $ "BRAINS: setActiveTCPSessions" <+> pretty ssids trace $ "BRAINS: setActiveTCPSessions" <+> pretty ssids
updateOP br $ updateTCPSessions br ssids updateOP br $ updateTCPSessions br ssids
commitNow br True commitNow br False
listTCPPexCandidates = liftIO . selectTCPPexCandidates listTCPPexCandidates = liftIO . selectTCPPexCandidates
@ -134,7 +138,7 @@ instance ( Hashable (Peer e)
forM_ ps $ \pip -> do forM_ ps $ \pip -> do
pa <- toPeerAddr pip pa <- toPeerAddr pip
insertKnownPeer br pa insertKnownPeer br pa
commitNow br True commitNow br False
onBlockSize b p h size = do onBlockSize b p h size = do
liftIO $ Cache.insert (_brainsSizeCache b) (p,h) size liftIO $ Cache.insert (_brainsSizeCache b) (p,h) size
@ -217,12 +221,12 @@ instance ( Hashable (Peer e)
addPolledRef brains r s i = do addPolledRef brains r s i = do
liftIO $ atomically $ modifyTVar (_brainsPolled brains) (HashSet.insert (r,s))
updateOP brains $ do updateOP brains $ do
let conn = view brainsDb brains let conn = view brainsDb brains
liftIO $ execute conn sql (show $ pretty (AsBase58 r), s, i) liftIO $ execute conn sql (show $ pretty (AsBase58 r), s, i)
commitNow brains True
where where
sql = [qc| sql = [qc|
insert into {poll_table} (ref,type,interval) insert into {poll_table} (ref,type,interval)
@ -253,14 +257,24 @@ instance ( Hashable (Peer e)
postprocess = mapMaybe (\(r,t,i) -> (,t,i) <$> fromStringMay r ) postprocess = mapMaybe (\(r,t,i) -> (,t,i) <$> fromStringMay r )
isPolledRef brains tp ref = do isPolledRef brains tp ref = do
liftIO do
let conn = view brainsDb brains cached <- liftIO $ readTVarIO (_brainsPolled brains) <&> HashSet.member (ref,tp)
query @_ @(Only Int) conn [qc|
select 1 from {poll_table} if cached then
where ref = ? and type = ? pure True
limit 1 else do
|] ( show $ pretty (AsBase58 ref), tp )
<&> isJust . listToMaybe r <- liftIO do
let conn = view brainsDb brains
query @_ @(Only Int) conn [qc|
select 1 from {poll_table}
where ref = ? and type = ?
limit 1
|] ( show $ pretty (AsBase58 ref), tp )
<&> isJust . listToMaybe
liftIO $ atomically $ modifyTVar (_brainsPolled brains) (HashSet.insert (ref,tp))
pure r
setSeen brains w ts = do setSeen brains w ts = do
utc <- liftIO getCurrentTime <&> addUTCTime ts utc <- liftIO getCurrentTime <&> addUTCTime ts
@ -745,7 +759,10 @@ tableExists conn prefix' tableName = do
-- FIXME: eventually-close-db -- FIXME: eventually-close-db
newBasicBrains :: forall e m . (Hashable (Peer e), MonadIO m) newBasicBrains :: forall e m . ( Hashable (Peer e)
, Hashable (PubKey 'Sign (Encryption e))
, MonadIO m
)
=> PeerConfig => PeerConfig
-> m (BasicBrains e) -> m (BasicBrains e)
@ -898,7 +915,7 @@ newBasicBrains cfg = liftIO do
<*> newTQueueIO <*> newTQueueIO
<*> newTQueueIO <*> newTQueueIO
<*> Cache.newCache (Just (toTimeSpec (1200:: Timeout 'Seconds))) <*> Cache.newCache (Just (toTimeSpec (1200:: Timeout 'Seconds)))
<*> newTVarIO mempty
data PeerDownloadsDelOnStart data PeerDownloadsDelOnStart
@ -961,7 +978,7 @@ runBasicBrains cfg brains = do
when (delDowns == FeatureOn ) do when (delDowns == FeatureOn ) do
debug $ yellow "CLEAN ALL DOWNLOADS" debug $ yellow "CLEAN ALL DOWNLOADS"
updateOP brains (delAllDownloads brains) updateOP brains (delAllDownloads brains)
commitNow brains True commitNow brains False
let polls = catMaybes ( let polls = catMaybes (
[ (tp,n,) <$> fromStringMay @(PubKey 'Sign (Encryption e)) (Text.unpack ref) [ (tp,n,) <$> fromStringMay @(PubKey 'Sign (Encryption e)) (Text.unpack ref)