From b3de2184532bf01138ab00fa8dc24a8fb76c8ce3 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Wed, 13 Mar 2024 05:58:10 +0300 Subject: [PATCH] migrated poll table --- hbs2-peer/app/Brains.hs | 62 +++++++++++++++++++++++-------- hbs2-peer/app/LWWRef.hs | 46 +++++++++++++++++++++++ hbs2-peer/app/PeerMain.hs | 5 ++- hbs2-peer/app/PeerTypes.hs | 11 +++--- hbs2-peer/app/RefLog.hs | 6 +-- hbs2-peer/hbs2-peer.cabal | 1 + hbs2-peer/lib/HBS2/Peer/Brains.hs | 4 +- 7 files changed, 108 insertions(+), 27 deletions(-) create mode 100644 hbs2-peer/app/LWWRef.hs diff --git a/hbs2-peer/app/Brains.hs b/hbs2-peer/app/Brains.hs index d8f17202..739c8fc1 100644 --- a/hbs2-peer/app/Brains.hs +++ b/hbs2-peer/app/Brains.hs @@ -225,7 +225,7 @@ instance ( Hashable (Peer e) where sql = [qc| - insert into statedb.poll (ref,type,interval) + insert into {poll_table} (ref,type,interval) values (?,?,?) on conflict do update set interval = excluded.interval |] @@ -236,7 +236,7 @@ instance ( Hashable (Peer e) liftIO $ execute conn sql (Only (show $ pretty (AsBase58 r))) where sql = [qc| - delete from statedb.poll + delete from {poll_table} where ref = ? |] @@ -245,21 +245,21 @@ instance ( Hashable (Peer e) let conn = view brainsDb brains case mtp of Nothing -> postprocess <$> - query_ conn [qc|select ref, type, interval from statedb.poll|] + query_ conn [qc|select ref, type, interval from {poll_table}|] Just tp -> postprocess <$> query conn [qc|select ref, type, interval from statedb.poll where type = ?|] (Only tp) where postprocess = mapMaybe (\(r,t,i) -> (,t,i) <$> fromStringMay r ) - isPolledRef brains ref = do + isPolledRef brains tp ref = do liftIO do let conn = view brainsDb brains query @_ @(Only Int) conn [qc| - select 1 from statedb.poll - where ref = ? + select 1 from {poll_table} + where ref = ? and type = ? limit 1 - |] ( Only ( show $ pretty (AsBase58 ref) ) ) + |] ( show $ pretty (AsBase58 ref), tp ) <&> isJust . listToMaybe setSeen brains w ts = do @@ -718,6 +718,8 @@ insertPexInfo br peers = liftIO do |] (Only (show $ pretty p)) +{- HLINT ignore "Functor law" -} + selectPexInfo :: forall e . (e ~ L4Proto) => BasicBrains e -> IO [PeerAddr e] @@ -730,6 +732,18 @@ selectPexInfo br = liftIO do |] <&> fmap (fromStringMay . fromOnly) <&> catMaybes +tableExists :: Connection -> Maybe String -> String -> IO Bool +tableExists conn prefix' tableName = do + let sql = [qc| + SELECT name FROM {prefix}.sqlite_master WHERE type='table' AND name=? + |] + r <- query conn sql (Only tableName) :: IO [Only String] + pure $ not $ null r + + where + prefix = fromMaybe "main" prefix' + + -- FIXME: eventually-close-db newBasicBrains :: forall e m . (Hashable (Peer e), MonadIO m) => PeerConfig @@ -836,14 +850,26 @@ newBasicBrains cfg = liftIO do ) |] - execute_ conn [qc| - create table if not exists statedb.poll - ( ref text not null - , type text not null - , interval int not null - , primary key (ref) - ) - |] + poll_1 <- tableExists conn (Just "statedb") "poll_1" + poll_0 <- tableExists conn (Just "statedb") "poll" + + unless poll_1 do + debug $ red "BRAINS: CREATE poll_1" + execute_ conn [qc| + create table if not exists statedb.poll_1 + ( ref text not null + , type text not null + , interval int not null + , primary key (ref,type) + ) + |] + + when poll_0 do + debug $ red "BRAINS: FILL poll_1" + execute_ conn [qc| + insert into statedb.poll_1 (ref,type,interval) + select ref,type,interval from statedb.poll; + |] execute_ conn [qc| create table if not exists peer_asymmkey @@ -879,6 +905,10 @@ data PeerDownloadsDelOnStart instance Monad m => HasCfgKey PeerDownloadsDelOnStart b m where key = "downloads-del-on-start" +{- HLINT ignore "Use camelCase" -} +poll_table :: String +poll_table = "statedb.poll_1" + runBasicBrains :: forall e m . ( e ~ L4Proto , MonadUnliftIO m , ForRefChans e @@ -945,7 +975,7 @@ runBasicBrains cfg brains = do updateOP brains $ do let conn = view brainsDb brains liftIO $ execute conn [qc| - insert into statedb.poll (ref,type,interval) + insert into {poll_table} (ref,type,interval) values (?,?,?) on conflict do update set interval = excluded.interval |] (show $ pretty (AsBase58 x), show $ pretty t, mi) diff --git a/hbs2-peer/app/LWWRef.hs b/hbs2-peer/app/LWWRef.hs new file mode 100644 index 00000000..eaac7a3e --- /dev/null +++ b/hbs2-peer/app/LWWRef.hs @@ -0,0 +1,46 @@ +module LWWRef where + +import HBS2.Prelude.Plated +import HBS2.Clock +import HBS2.Actors.Peer +import HBS2.Events +import HBS2.Data.Types.Refs +import HBS2.Data.Detect +import HBS2.Net.PeerLocator +import HBS2.Net.Proto +import HBS2.Base58 +import HBS2.Storage +import HBS2.Storage.Operations.Missed +import HBS2.Hash +import HBS2.Peer.Proto +import HBS2.Net.Auth.Credentials +import HBS2.Merkle + +import HBS2.Misc.PrettyStuff + +import Brains +import PeerConfig +import PeerTypes + +import Control.Monad +import UnliftIO + +lwwRefWorker :: forall e s m . ( MonadIO m + , MonadUnliftIO m + , MyPeer e + , HasStorage m + , Sessions e (KnownPeer e) m + , Signatures s + , s ~ Encryption e + , IsRefPubKey s + ) + => PeerConfig + -> SomeBrains e + -> m () + +lwwRefWorker conf brains = do + forever do + debug $ yellow "lwwRefWorker" + pause @'Seconds 20 + + diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 646e86ad..fbed91b3 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -48,6 +48,7 @@ import Bootstrap import CheckMetrics import RefLog qualified import RefLog (reflogWorker) +import LWWRef (lwwRefWorker) import HttpWorker import DispatchProxy import PeerMeta @@ -819,7 +820,7 @@ runPeer opts = Exception.handle (\e -> myException e let refChanAdapter = RefChanAdapter { refChanOnHead = refChanOnHeadFn rce - , refChanSubscribed = isPolledRef @e brains + , refChanSubscribed = isPolledRef @e brains "refchan" , refChanWriteTran = refChanWriteTranFn rce , refChanValidatePropose = refChanValidateTranFn @e rce @@ -1036,6 +1037,8 @@ runPeer opts = Exception.handle (\e -> myException e peerThread "refChanNotifyLogWorker" (refChanNotifyLogWorker @e conf (SomeBrains brains)) + peerThread "lwwRefWorker" (lwwRefWorker @e conf (SomeBrains brains)) + liftIO $ withPeerM penv do runProto @e [ makeResponse (blockSizeProto blk (downloadOnBlockSize denv) onNoBlock) diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index 2bf08a35..f422fb5c 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -486,12 +486,13 @@ simpleBlockAnnounce size h = do class IsPolledKey e proto | proto -> e where - getPolledKey :: proto -> PubKey 'Sign (Encryption e) + getPolledKey :: proto -> (String, PubKey 'Sign (Encryption e)) instance IsPolledKey e (LWWRefProto e) where getPolledKey = \case - LWWRefProto1 (LWWProtoGet (LWWRefKey k)) -> k - LWWRefProto1 (LWWProtoSet (LWWRefKey k) _) -> k + LWWRefProto1 (LWWProtoGet (LWWRefKey k)) -> (tp,k) + LWWRefProto1 (LWWProtoSet (LWWRefKey k) _) -> (tp,k) + where tp = "lwwref" subscribed :: forall e proto m . ( MonadIO m , IsPolledKey e proto @@ -505,8 +506,8 @@ subscribed :: forall e proto m . ( MonadIO m -> m () subscribed brains f req = do - let ref = getPolledKey req - polled <- isPolledRef @e brains ref + let (tp,ref) = getPolledKey req + polled <- isPolledRef @e brains tp ref when polled $ f req authorized :: forall e proto m . ( MonadIO m diff --git a/hbs2-peer/app/RefLog.hs b/hbs2-peer/app/RefLog.hs index 2205e7e4..d26bd941 100644 --- a/hbs2-peer/app/RefLog.hs +++ b/hbs2-peer/app/RefLog.hs @@ -65,7 +65,7 @@ mkRefLogRequestAdapter :: forall e s m . ( MonadIO m => SomeBrains e -> m (RefLogRequestI e (ResponseM e m )) mkRefLogRequestAdapter brains = do sto <- getStorage - pure $ RefLogRequestI (doOnRefLogRequest brains sto) dontHandle (isPolledRef @e brains) + pure $ RefLogRequestI (doOnRefLogRequest brains sto) dontHandle (isPolledRef @e brains "reflog") doOnRefLogRequest :: forall e s m . ( MonadIO m , MyPeer e @@ -78,7 +78,7 @@ doOnRefLogRequest :: forall e s m . ( MonadIO m -> m (Maybe (Hash HbSync)) doOnRefLogRequest brains sto (_,pk) = runMaybeT do - isPolledRef @e brains pk >>= guard + isPolledRef @e brains "reflog" pk >>= guard ref <- liftIO $ getRef sto (RefLogKey @s pk) when (isNothing ref) do warn $ "missed reflog value" <+> pretty ref @@ -150,7 +150,7 @@ reflogWorker conf brains adapter = do subscribe @e RefLogUpdateEvKey $ \(RefLogUpdateEvData (reflog,v, mpip)) -> do trace $ "reflog worker.got refupdate" <+> pretty (AsBase58 reflog) - polled <- isPolledRef @e brains reflog + polled <- isPolledRef @e brains "reflog" reflog buddy <- maybe1 mpip (pure False) $ \pip -> do pa <- toPeerAddr @e pip acceptAnnouncesFromPeer @e conf pa diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index 47cc753c..1af60d57 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -265,6 +265,7 @@ executable hbs2-peer , RefLog , RefChan , RefChanNotifyLog + , LWWRef , CheckMetrics , HttpWorker , Brains diff --git a/hbs2-peer/lib/HBS2/Peer/Brains.hs b/hbs2-peer/lib/HBS2/Peer/Brains.hs index caf37474..49a1caa2 100644 --- a/hbs2-peer/lib/HBS2/Peer/Brains.hs +++ b/hbs2-peer/lib/HBS2/Peer/Brains.hs @@ -18,8 +18,8 @@ class HasBrains e a where listPolledRefs :: MonadIO m => a -> Maybe String -> m [(PubKey 'Sign (Encryption e), String, Int)] listPolledRefs _ _ = pure mempty - isPolledRef :: MonadIO m => a -> PubKey 'Sign (Encryption e) -> m Bool - isPolledRef _ _ = pure False + isPolledRef :: MonadIO m => a -> String -> PubKey 'Sign (Encryption e) -> m Bool + isPolledRef _ _ _ = pure False delPolledRef :: MonadIO m => a -> PubKey 'Sign (Encryption e) -> m () delPolledRef _ _ = pure ()