wip, respect only polled refchans

This commit is contained in:
Dmitry Zuikov 2023-07-15 06:08:56 +03:00
parent 476ecddb6d
commit 42be590baa
4 changed files with 101 additions and 33 deletions

View File

@ -105,7 +105,8 @@ instance ForRefChans e => Serialise (RefChanHead e)
data RefChanHeadAdapter e m =
RefChanHeadAdapter
{ refChanHeadOnHead :: RefChanHeadBlockTran e -> m ()
{ refChanHeadOnHead :: RefChanId e -> RefChanHeadBlockTran e -> m ()
, refChanHeadSubscribed :: RefChanId e -> m Bool
}
refChanHeadProto :: forall e s m . ( MonadIO m
@ -135,9 +136,10 @@ refChanHeadProto self adapter msg = do
case msg of
RefChanHead chan pkt -> do
guard =<< lift (refChanHeadSubscribed adapter chan)
trace $ "RefChanHead" <+> pretty self <+> pretty (AsBase58 chan)
-- FIXME: check-chan-is-listened
lift $ refChanHeadOnHead adapter pkt
lift $ refChanHeadOnHead adapter chan pkt
RefChanGetHead _ -> do
-- прочитать ссылку

View File

@ -5,33 +5,37 @@ module Brains where
import HBS2.Prelude.Plated
import HBS2.Clock
import HBS2.Data.Types.Refs
import HBS2.Net.Proto
import HBS2.Hash
import HBS2.Base58
import HBS2.Net.IP.Addr
import HBS2.Net.Auth.Credentials
import HBS2.System.Logger.Simple
import PeerConfig
import Data.Maybe
import Control.Monad
import Control.Exception
import Control.Concurrent.STM
import Control.Concurrent.Async
import Lens.Micro.Platform
import Data.HashMap.Strict qualified as HashMap
import Data.HashMap.Strict (HashMap)
import Data.List qualified as List
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Text.InterpolatedString.Perl6 (qc)
import Control.Exception
import Control.Monad
import Database.SQLite.Simple
import Database.SQLite.Simple.FromField
import System.Random (randomRIO)
import Data.Word
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Data.Either
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.List qualified as List
import Data.Maybe
import Data.Text qualified as Text
import Data.Word
import Lens.Micro.Platform
import System.Directory
import System.FilePath
import System.Random (randomRIO)
import Text.InterpolatedString.Perl6 (qc)
import UnliftIO (MonadUnliftIO(..),async,race)
data PeerBrainsDb
@ -40,6 +44,12 @@ instance HasCfgKey PeerBrainsDb (Maybe String) where
class HasBrains e a where
listPolledRefs :: MonadIO m => a -> String -> m [(PubKey 'Sign (Encryption e), Int)]
listPolledRefs _ _ = pure mempty
isPolledRef :: MonadIO m => a -> PubKey 'Sign (Encryption e) -> m Bool
isPolledRef _ _ = pure False
onClientTCPConnected :: MonadIO m => a -> PeerAddr e -> Word64 -> m ()
onClientTCPConnected _ _ = const none
@ -148,6 +158,8 @@ instance Pretty (Peer e) => HasBrains e NoBrains where
data SomeBrains e = forall a . HasBrains e a => SomeBrains a
instance HasBrains e (SomeBrains e) where
listPolledRefs (SomeBrains a) = listPolledRefs @e a
isPolledRef (SomeBrains a) = isPolledRef @e a
onClientTCPConnected (SomeBrains a) = onClientTCPConnected @e a
getClientTCP (SomeBrains a) = getClientTCP @e a
setActiveTCPSessions (SomeBrains a) = setActiveTCPSessions @e a
@ -279,6 +291,23 @@ instance ( Hashable (Peer e)
setReflogProcessed b h = do
updateOP b $ insertReflogProcessed b h
listPolledRefs brains tp = do
liftIO $ do
let conn = view brainsDb brains
query conn [qc|select ref, interval from poll where type = ?|] (Only tp)
<&> fmap (\(r,i) -> (,i) <$> fromStringMay r )
<&> catMaybes
isPolledRef brains ref = do
liftIO do
let conn = view brainsDb brains
query @_ @(Only Int) conn [qc|
select 1 from poll
where ref = ?
limit 1
|] ( Only ( show $ pretty (AsBase58 ref) ) )
<&> isJust . listToMaybe
commitNow :: forall e m . MonadIO m
=> BasicBrains e
-> Bool
@ -690,6 +719,16 @@ newBasicBrains cfg = liftIO do
)
|]
execute_ conn [qc|
create table if not exists poll
( ref text not null
, type text not null
, interval int not null
, primary key (ref,type)
)
|]
BasicBrains <$> newTVarIO mempty
<*> newTVarIO mempty
<*> Cache.newCache (Just (toTimeSpec (30 :: Timeout 'Seconds)))
@ -697,8 +736,12 @@ newBasicBrains cfg = liftIO do
<*> newTQueueIO
<*> newTQueueIO
runBasicBrains :: MonadIO m => BasicBrains e -> m ()
runBasicBrains brains = do
runBasicBrains :: forall e m . ( e ~ L4Proto, MonadUnliftIO m )
=> PeerConfig
-> BasicBrains e
-> m ()
runBasicBrains cfg brains = do
let pip = view brainsPipeline brains
let expire = view brainsExpire brains
@ -725,6 +768,27 @@ runBasicBrains brains = do
pause @'Seconds 60
updateOP brains (cleanupHashes brains)
trace "runBasicBrains init"
let (PeerConfig syn) = cfg
let polls = catMaybes (
[ (tp,n,) <$> fromStringMay @(PubKey 'Sign (Encryption e)) (Text.unpack ref)
| ListVal @C (Key "poll" [SymbolVal tp, LitIntVal n, LitStrVal ref]) <- syn
] )
void $ async $ do
pause @'Seconds 5
forM_ polls $ \(t,mi,x) -> do
trace $ "BRAINS: poll" <+> pretty t <+> pretty (AsBase58 x) <+> pretty mi
updateOP brains $ do
let conn = view brainsDb brains
liftIO $ execute conn [qc|
insert into poll (ref,type,interval)
values (?,?,?)
on conflict do update set interval = excluded.interval
|] (show $ pretty (AsBase58 x), show $ pretty t, mi)
commitNow brains True
void $ forever do
pause @'Seconds 15
ee <- liftIO $ Cache.toList expire

View File

@ -533,7 +533,7 @@ runPeer opts = U.handle (\e -> myException e
brains <- newBasicBrains @e conf
brainsThread <- async $ runBasicBrains brains
brainsThread <- async $ runBasicBrains conf brains
denv <- newDownloadEnv brains
@ -564,6 +564,7 @@ runPeer opts = U.handle (\e -> myException e
let refChanHeadAdapter = RefChanHeadAdapter
{ refChanHeadOnHead = refChanOnHead rce
, refChanHeadSubscribed = isPolledRef @e brains
}
let pexFilt pips = do

View File

@ -52,8 +52,8 @@ instance Exception DataNotReady
data RefChanWorkerEnv e =
RefChanWorkerEnv
{ _refChanWorkerEnvDownload :: DownloadEnv e
, _refChanWorkerEnvHeadQ :: TQueue (RefChanHeadBlockTran e)
, _refChaWorkerEnvDownload :: TVar (HashMap HashRef TimeSpec) -- таймстемп можно
, _refChanWorkerEnvHeadQ :: TQueue (RefChanId e, RefChanHeadBlockTran e)
, _refChaWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, TimeSpec))
}
makeLenses 'RefChanWorkerEnv
@ -67,22 +67,22 @@ refChanWorkerEnv _ de = liftIO $ RefChanWorkerEnv @e de <$> newTQueueIO
<*> newTVarIO mempty
refChanOnHead :: MonadIO m => RefChanWorkerEnv e -> RefChanHeadBlockTran e -> m ()
refChanOnHead env tran = do
atomically $ writeTQueue (view refChanWorkerEnvHeadQ env) tran
refChanOnHead :: MonadIO m => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m ()
refChanOnHead env chan tran = do
atomically $ writeTQueue (view refChanWorkerEnvHeadQ env) (chan, tran)
refChanAddDownload :: forall e m . ( m ~ PeerM e IO
, MyPeer e
, Block ByteString ~ ByteString
)
=> RefChanWorkerEnv e -> HashRef -> m ()
refChanAddDownload env r = do
=> RefChanWorkerEnv e -> RefChanId e -> HashRef -> m ()
refChanAddDownload env chan r = do
penv <- ask
t <- getTimeCoarse
withPeerM penv $ withDownload (_refChanWorkerEnvDownload env)
$ processBlock @e (fromHashRef r)
atomically $ modifyTVar (view refChaWorkerEnvDownload env) (HashMap.insert r t)
atomically $ modifyTVar (view refChaWorkerEnvDownload env) (HashMap.insert r (chan,t))
checkDownloaded :: forall m . (MonadIO m, HasStorage m, Block ByteString ~ ByteString) => HashRef -> m Bool
checkDownloaded hr = do
@ -146,28 +146,27 @@ refChanWorker env = do
now <- getTimeCoarse
-- FIXME: consider-timeouts-or-leak-is-possible
rest <- forM all $ \(r,t) -> do
rest <- forM all $ \(r,item@(chan,t)) -> do
here <- checkDownloaded r
if here then do
refChanOnHead env (RefChanHeadBlockTran r)
refChanOnHead env chan (RefChanHeadBlockTran r)
pure mempty
else do
-- FIXME: fix-timeout-hardcode
let expired = realToFrac (toNanoSecs $ now - t) / 1e9 > 600
if expired then pure mempty else pure [(r,t)]
if expired then pure mempty else pure [(r,item)]
atomically $ writeTVar (view refChaWorkerEnvDownload env) (HashMap.fromList (mconcat rest))
-- FIXME: in-parallel?
refChanHeadMon = do
forever do
RefChanHeadBlockTran hr <- atomically $ readTQueue (view refChanWorkerEnvHeadQ env)
-- debug $ "DROP HEAD UPDATE" <+> pretty (fromRefChanHeadBlockTran tran)
(chan, RefChanHeadBlockTran hr) <- atomically $ readTQueue (view refChanWorkerEnvHeadQ env)
here <- checkDownloaded hr
if not here then do
refChanAddDownload env hr
refChanAddDownload env chan hr
trace $ "BLOCK IS NOT HERE" <+> pretty hr
else do
trace $ "BLOCK IS HERE" <+> pretty hr
@ -178,7 +177,7 @@ refChanWorker env = do
case what of
Nothing -> err $ "malformed head block" <+> pretty hr
Just (pk,blk) -> do
Just (pk,blk) | pk == chan -> do
let rkey = RefChanHeadKey @s pk
sto <- getStorage
@ -205,6 +204,8 @@ refChanWorker env = do
else do
debug $ "LEAVING HEAD BLOCK" <+> pretty (v1, v0)
_ -> debug "not subscribed to this refchan"
pure ()
-- распаковываем блок
-- вытаскиваем ключ из блока?