migrated poll table

This commit is contained in:
Dmitry Zuikov 2024-03-13 05:58:10 +03:00
parent a22c530952
commit b3de218453
7 changed files with 108 additions and 27 deletions

View File

@ -225,7 +225,7 @@ instance ( Hashable (Peer e)
where where
sql = [qc| sql = [qc|
insert into statedb.poll (ref,type,interval) insert into {poll_table} (ref,type,interval)
values (?,?,?) values (?,?,?)
on conflict do update set interval = excluded.interval on conflict do update set interval = excluded.interval
|] |]
@ -236,7 +236,7 @@ instance ( Hashable (Peer e)
liftIO $ execute conn sql (Only (show $ pretty (AsBase58 r))) liftIO $ execute conn sql (Only (show $ pretty (AsBase58 r)))
where where
sql = [qc| sql = [qc|
delete from statedb.poll delete from {poll_table}
where ref = ? where ref = ?
|] |]
@ -245,21 +245,21 @@ instance ( Hashable (Peer e)
let conn = view brainsDb brains let conn = view brainsDb brains
case mtp of case mtp of
Nothing -> postprocess <$> Nothing -> postprocess <$>
query_ conn [qc|select ref, type, interval from statedb.poll|] query_ conn [qc|select ref, type, interval from {poll_table}|]
Just tp -> postprocess <$> Just tp -> postprocess <$>
query conn [qc|select ref, type, interval from statedb.poll where type = ?|] (Only tp) query conn [qc|select ref, type, interval from statedb.poll where type = ?|] (Only tp)
where where
postprocess = mapMaybe (\(r,t,i) -> (,t,i) <$> fromStringMay r ) postprocess = mapMaybe (\(r,t,i) -> (,t,i) <$> fromStringMay r )
isPolledRef brains ref = do isPolledRef brains tp ref = do
liftIO do liftIO do
let conn = view brainsDb brains let conn = view brainsDb brains
query @_ @(Only Int) conn [qc| query @_ @(Only Int) conn [qc|
select 1 from statedb.poll select 1 from {poll_table}
where ref = ? where ref = ? and type = ?
limit 1 limit 1
|] ( Only ( show $ pretty (AsBase58 ref) ) ) |] ( show $ pretty (AsBase58 ref), tp )
<&> isJust . listToMaybe <&> isJust . listToMaybe
setSeen brains w ts = do setSeen brains w ts = do
@ -718,6 +718,8 @@ insertPexInfo br peers = liftIO do
|] (Only (show $ pretty p)) |] (Only (show $ pretty p))
{- HLINT ignore "Functor law" -}
selectPexInfo :: forall e . (e ~ L4Proto) selectPexInfo :: forall e . (e ~ L4Proto)
=> BasicBrains e => BasicBrains e
-> IO [PeerAddr e] -> IO [PeerAddr e]
@ -730,6 +732,18 @@ selectPexInfo br = liftIO do
|] <&> fmap (fromStringMay . fromOnly) |] <&> fmap (fromStringMay . fromOnly)
<&> catMaybes <&> catMaybes
tableExists :: Connection -> Maybe String -> String -> IO Bool
tableExists conn prefix' tableName = do
let sql = [qc|
SELECT name FROM {prefix}.sqlite_master WHERE type='table' AND name=?
|]
r <- query conn sql (Only tableName) :: IO [Only String]
pure $ not $ null r
where
prefix = fromMaybe "main" prefix'
-- FIXME: eventually-close-db -- FIXME: eventually-close-db
newBasicBrains :: forall e m . (Hashable (Peer e), MonadIO m) newBasicBrains :: forall e m . (Hashable (Peer e), MonadIO m)
=> PeerConfig => PeerConfig
@ -836,14 +850,26 @@ newBasicBrains cfg = liftIO do
) )
|] |]
execute_ conn [qc| poll_1 <- tableExists conn (Just "statedb") "poll_1"
create table if not exists statedb.poll poll_0 <- tableExists conn (Just "statedb") "poll"
( ref text not null
, type text not null unless poll_1 do
, interval int not null debug $ red "BRAINS: CREATE poll_1"
, primary key (ref) execute_ conn [qc|
) create table if not exists statedb.poll_1
|] ( ref text not null
, type text not null
, interval int not null
, primary key (ref,type)
)
|]
when poll_0 do
debug $ red "BRAINS: FILL poll_1"
execute_ conn [qc|
insert into statedb.poll_1 (ref,type,interval)
select ref,type,interval from statedb.poll;
|]
execute_ conn [qc| execute_ conn [qc|
create table if not exists peer_asymmkey create table if not exists peer_asymmkey
@ -879,6 +905,10 @@ data PeerDownloadsDelOnStart
instance Monad m => HasCfgKey PeerDownloadsDelOnStart b m where instance Monad m => HasCfgKey PeerDownloadsDelOnStart b m where
key = "downloads-del-on-start" key = "downloads-del-on-start"
{- HLINT ignore "Use camelCase" -}
poll_table :: String
poll_table = "statedb.poll_1"
runBasicBrains :: forall e m . ( e ~ L4Proto runBasicBrains :: forall e m . ( e ~ L4Proto
, MonadUnliftIO m , MonadUnliftIO m
, ForRefChans e , ForRefChans e
@ -945,7 +975,7 @@ runBasicBrains cfg brains = do
updateOP brains $ do updateOP brains $ do
let conn = view brainsDb brains let conn = view brainsDb brains
liftIO $ execute conn [qc| liftIO $ execute conn [qc|
insert into statedb.poll (ref,type,interval) insert into {poll_table} (ref,type,interval)
values (?,?,?) values (?,?,?)
on conflict do update set interval = excluded.interval on conflict do update set interval = excluded.interval
|] (show $ pretty (AsBase58 x), show $ pretty t, mi) |] (show $ pretty (AsBase58 x), show $ pretty t, mi)

46
hbs2-peer/app/LWWRef.hs Normal file
View File

@ -0,0 +1,46 @@
module LWWRef where
import HBS2.Prelude.Plated
import HBS2.Clock
import HBS2.Actors.Peer
import HBS2.Events
import HBS2.Data.Types.Refs
import HBS2.Data.Detect
import HBS2.Net.PeerLocator
import HBS2.Net.Proto
import HBS2.Base58
import HBS2.Storage
import HBS2.Storage.Operations.Missed
import HBS2.Hash
import HBS2.Peer.Proto
import HBS2.Net.Auth.Credentials
import HBS2.Merkle
import HBS2.Misc.PrettyStuff
import Brains
import PeerConfig
import PeerTypes
import Control.Monad
import UnliftIO
lwwRefWorker :: forall e s m . ( MonadIO m
, MonadUnliftIO m
, MyPeer e
, HasStorage m
, Sessions e (KnownPeer e) m
, Signatures s
, s ~ Encryption e
, IsRefPubKey s
)
=> PeerConfig
-> SomeBrains e
-> m ()
lwwRefWorker conf brains = do
forever do
debug $ yellow "lwwRefWorker"
pause @'Seconds 20

View File

@ -48,6 +48,7 @@ import Bootstrap
import CheckMetrics import CheckMetrics
import RefLog qualified import RefLog qualified
import RefLog (reflogWorker) import RefLog (reflogWorker)
import LWWRef (lwwRefWorker)
import HttpWorker import HttpWorker
import DispatchProxy import DispatchProxy
import PeerMeta import PeerMeta
@ -819,7 +820,7 @@ runPeer opts = Exception.handle (\e -> myException e
let refChanAdapter = let refChanAdapter =
RefChanAdapter RefChanAdapter
{ refChanOnHead = refChanOnHeadFn rce { refChanOnHead = refChanOnHeadFn rce
, refChanSubscribed = isPolledRef @e brains , refChanSubscribed = isPolledRef @e brains "refchan"
, refChanWriteTran = refChanWriteTranFn rce , refChanWriteTran = refChanWriteTranFn rce
, refChanValidatePropose = refChanValidateTranFn @e rce , refChanValidatePropose = refChanValidateTranFn @e rce
@ -1036,6 +1037,8 @@ runPeer opts = Exception.handle (\e -> myException e
peerThread "refChanNotifyLogWorker" (refChanNotifyLogWorker @e conf (SomeBrains brains)) peerThread "refChanNotifyLogWorker" (refChanNotifyLogWorker @e conf (SomeBrains brains))
peerThread "lwwRefWorker" (lwwRefWorker @e conf (SomeBrains brains))
liftIO $ withPeerM penv do liftIO $ withPeerM penv do
runProto @e runProto @e
[ makeResponse (blockSizeProto blk (downloadOnBlockSize denv) onNoBlock) [ makeResponse (blockSizeProto blk (downloadOnBlockSize denv) onNoBlock)

View File

@ -486,12 +486,13 @@ simpleBlockAnnounce size h = do
class IsPolledKey e proto | proto -> e where class IsPolledKey e proto | proto -> e where
getPolledKey :: proto -> PubKey 'Sign (Encryption e) getPolledKey :: proto -> (String, PubKey 'Sign (Encryption e))
instance IsPolledKey e (LWWRefProto e) where instance IsPolledKey e (LWWRefProto e) where
getPolledKey = \case getPolledKey = \case
LWWRefProto1 (LWWProtoGet (LWWRefKey k)) -> k LWWRefProto1 (LWWProtoGet (LWWRefKey k)) -> (tp,k)
LWWRefProto1 (LWWProtoSet (LWWRefKey k) _) -> k LWWRefProto1 (LWWProtoSet (LWWRefKey k) _) -> (tp,k)
where tp = "lwwref"
subscribed :: forall e proto m . ( MonadIO m subscribed :: forall e proto m . ( MonadIO m
, IsPolledKey e proto , IsPolledKey e proto
@ -505,8 +506,8 @@ subscribed :: forall e proto m . ( MonadIO m
-> m () -> m ()
subscribed brains f req = do subscribed brains f req = do
let ref = getPolledKey req let (tp,ref) = getPolledKey req
polled <- isPolledRef @e brains ref polled <- isPolledRef @e brains tp ref
when polled $ f req when polled $ f req
authorized :: forall e proto m . ( MonadIO m authorized :: forall e proto m . ( MonadIO m

View File

@ -65,7 +65,7 @@ mkRefLogRequestAdapter :: forall e s m . ( MonadIO m
=> SomeBrains e -> m (RefLogRequestI e (ResponseM e m )) => SomeBrains e -> m (RefLogRequestI e (ResponseM e m ))
mkRefLogRequestAdapter brains = do mkRefLogRequestAdapter brains = do
sto <- getStorage sto <- getStorage
pure $ RefLogRequestI (doOnRefLogRequest brains sto) dontHandle (isPolledRef @e brains) pure $ RefLogRequestI (doOnRefLogRequest brains sto) dontHandle (isPolledRef @e brains "reflog")
doOnRefLogRequest :: forall e s m . ( MonadIO m doOnRefLogRequest :: forall e s m . ( MonadIO m
, MyPeer e , MyPeer e
@ -78,7 +78,7 @@ doOnRefLogRequest :: forall e s m . ( MonadIO m
-> m (Maybe (Hash HbSync)) -> m (Maybe (Hash HbSync))
doOnRefLogRequest brains sto (_,pk) = runMaybeT do doOnRefLogRequest brains sto (_,pk) = runMaybeT do
isPolledRef @e brains pk >>= guard isPolledRef @e brains "reflog" pk >>= guard
ref <- liftIO $ getRef sto (RefLogKey @s pk) ref <- liftIO $ getRef sto (RefLogKey @s pk)
when (isNothing ref) do when (isNothing ref) do
warn $ "missed reflog value" <+> pretty ref warn $ "missed reflog value" <+> pretty ref
@ -150,7 +150,7 @@ reflogWorker conf brains adapter = do
subscribe @e RefLogUpdateEvKey $ \(RefLogUpdateEvData (reflog,v, mpip)) -> do subscribe @e RefLogUpdateEvKey $ \(RefLogUpdateEvData (reflog,v, mpip)) -> do
trace $ "reflog worker.got refupdate" <+> pretty (AsBase58 reflog) trace $ "reflog worker.got refupdate" <+> pretty (AsBase58 reflog)
polled <- isPolledRef @e brains reflog polled <- isPolledRef @e brains "reflog" reflog
buddy <- maybe1 mpip (pure False) $ \pip -> do buddy <- maybe1 mpip (pure False) $ \pip -> do
pa <- toPeerAddr @e pip pa <- toPeerAddr @e pip
acceptAnnouncesFromPeer @e conf pa acceptAnnouncesFromPeer @e conf pa

View File

@ -265,6 +265,7 @@ executable hbs2-peer
, RefLog , RefLog
, RefChan , RefChan
, RefChanNotifyLog , RefChanNotifyLog
, LWWRef
, CheckMetrics , CheckMetrics
, HttpWorker , HttpWorker
, Brains , Brains

View File

@ -18,8 +18,8 @@ class HasBrains e a where
listPolledRefs :: MonadIO m => a -> Maybe String -> m [(PubKey 'Sign (Encryption e), String, Int)] listPolledRefs :: MonadIO m => a -> Maybe String -> m [(PubKey 'Sign (Encryption e), String, Int)]
listPolledRefs _ _ = pure mempty listPolledRefs _ _ = pure mempty
isPolledRef :: MonadIO m => a -> PubKey 'Sign (Encryption e) -> m Bool isPolledRef :: MonadIO m => a -> String -> PubKey 'Sign (Encryption e) -> m Bool
isPolledRef _ _ = pure False isPolledRef _ _ _ = pure False
delPolledRef :: MonadIO m => a -> PubKey 'Sign (Encryption e) -> m () delPolledRef :: MonadIO m => a -> PubKey 'Sign (Encryption e) -> m ()
delPolledRef _ _ = pure () delPolledRef _ _ = pure ()