mirror of https://github.com/voidlizard/hbs2
cache block sizes
This commit is contained in:
parent
bb7ed502b4
commit
1aa2554d50
|
@ -315,6 +315,35 @@ instance ( Hashable (Peer e)
|
||||||
r <- query @_ @(Only Int) conn [qc|select 1 from seen where hash = ? limit 1|] (Only h)
|
r <- query @_ @(Only Int) conn [qc|select 1 from seen where hash = ? limit 1|] (Only h)
|
||||||
pure $ not $ List.null r
|
pure $ not $ List.null r
|
||||||
|
|
||||||
|
brainsCacheBlockSize b pk ha s = do
|
||||||
|
updateOP b $ do
|
||||||
|
let conn = view brainsDb b
|
||||||
|
|
||||||
|
let sql = [qc|
|
||||||
|
insert into blocksizecache (block,peer,size)
|
||||||
|
values(?,?,?)
|
||||||
|
on conflict (block,peer) do update set size = excluded.size
|
||||||
|
|]
|
||||||
|
|
||||||
|
void $ execute conn sql (hash,peer,s)
|
||||||
|
|
||||||
|
where
|
||||||
|
peer = show $ pretty (AsBase58 pk)
|
||||||
|
hash = show $ pretty ha
|
||||||
|
|
||||||
|
brainsFindBlockSize brains pk ha = do
|
||||||
|
let conn = view brainsDb brains
|
||||||
|
let peer = show $ pretty (AsBase58 pk)
|
||||||
|
let hash = show $ pretty ha
|
||||||
|
liftIO do
|
||||||
|
result <- query @_ @(Only Integer) conn [qc|
|
||||||
|
select size
|
||||||
|
from blocksizecache
|
||||||
|
where block = ? and peer = ?
|
||||||
|
limit 1
|
||||||
|
|] (hash, peer)
|
||||||
|
pure $ fromOnly <$> listToMaybe result
|
||||||
|
|
||||||
commitNow :: forall e m . MonadIO m
|
commitNow :: forall e m . MonadIO m
|
||||||
=> BasicBrains e
|
=> BasicBrains e
|
||||||
-> Bool
|
-> Bool
|
||||||
|
@ -624,6 +653,7 @@ SAVEPOINT zzz1;
|
||||||
DELETE FROM ancestors WHERE strftime('%s','now') - strftime('%s', ts) > 600;
|
DELETE FROM ancestors WHERE strftime('%s','now') - strftime('%s', ts) > 600;
|
||||||
DELETE FROM seenby WHERE strftime('%s','now') - strftime('%s', ts) > 600;
|
DELETE FROM seenby WHERE strftime('%s','now') - strftime('%s', ts) > 600;
|
||||||
DELETE FROM blocksize WHERE strftime('%s','now') - strftime('%s', ts) > (86400*7);
|
DELETE FROM blocksize WHERE strftime('%s','now') - strftime('%s', ts) > (86400*7);
|
||||||
|
DELETE FROM blocksizecache WHERE strftime('%s','now') - strftime('%s', ts) > (86400*7);
|
||||||
DELETE FROM statedb.pexinfo where seen < datetime('now', '-7 days');
|
DELETE FROM statedb.pexinfo where seen < datetime('now', '-7 days');
|
||||||
DELETE FROM seen where ts < datetime('now');
|
DELETE FROM seen where ts < datetime('now');
|
||||||
|
|
||||||
|
@ -868,7 +898,7 @@ newBasicBrains cfg = liftIO do
|
||||||
|]
|
|]
|
||||||
|
|
||||||
execute_ conn [qc|
|
execute_ conn [qc|
|
||||||
create table if not exists blocksize2
|
create table if not exists blocksizecache
|
||||||
( block text not null
|
( block text not null
|
||||||
, peer text not null
|
, peer text not null
|
||||||
, size int
|
, size int
|
||||||
|
|
|
@ -27,6 +27,7 @@ import HBS2.Actors.Peer
|
||||||
import HBS2.Peer.Proto.Peer
|
import HBS2.Peer.Proto.Peer
|
||||||
import HBS2.Peer.Proto.BlockInfo
|
import HBS2.Peer.Proto.BlockInfo
|
||||||
import HBS2.Peer.Proto.BlockChunks
|
import HBS2.Peer.Proto.BlockChunks
|
||||||
|
import HBS2.Peer.Brains
|
||||||
import HBS2.Storage
|
import HBS2.Storage
|
||||||
import HBS2.Clock
|
import HBS2.Clock
|
||||||
import HBS2.Net.Auth.Schema
|
import HBS2.Net.Auth.Schema
|
||||||
|
@ -105,6 +106,10 @@ instance BlockSizeCache e () where
|
||||||
cacheBlockSize _ _ _ _ = pure ()
|
cacheBlockSize _ _ _ _ = pure ()
|
||||||
findBlockSize _ _ _ = pure Nothing
|
findBlockSize _ _ _ = pure Nothing
|
||||||
|
|
||||||
|
instance BlockSizeCache e (SomeBrains e) where
|
||||||
|
cacheBlockSize = brainsCacheBlockSize @e
|
||||||
|
findBlockSize = brainsFindBlockSize @e
|
||||||
|
|
||||||
queryBlockSizeFromPeer :: forall e cache m . ( e ~ L4Proto
|
queryBlockSizeFromPeer :: forall e cache m . ( e ~ L4Proto
|
||||||
, MonadUnliftIO m
|
, MonadUnliftIO m
|
||||||
, BlockSizeCache e cache
|
, BlockSizeCache e cache
|
||||||
|
@ -119,23 +124,33 @@ queryBlockSizeFromPeer cache e h peer = do
|
||||||
|
|
||||||
what <- try @_ @(DownloadError e) $ liftIO $ withPeerM e do
|
what <- try @_ @(DownloadError e) $ liftIO $ withPeerM e do
|
||||||
|
|
||||||
PeerData{..} <- find (KnownPeerKey peer) id
|
flip runContT pure $ callCC \exit -> do
|
||||||
>>= orThrow (UnknownPeerError peer)
|
|
||||||
|
|
||||||
sizeQ <- newTQueueIO
|
PeerData{..} <- lift $ find (KnownPeerKey peer) id
|
||||||
|
>>= orThrow (UnknownPeerError peer)
|
||||||
|
|
||||||
subscribe @e (BlockSizeEventKey peer) $ \case
|
s <- lift $ findBlockSize @e cache _peerSignKey h
|
||||||
BlockSizeEvent (that, hx, sz) | hx == h -> do
|
|
||||||
atomically $ writeTQueue sizeQ (Just sz)
|
|
||||||
cacheBlockSize @e cache _peerSignKey h sz
|
|
||||||
|
|
||||||
_ -> do
|
debug $ "FOUND CACHED VALUE" <+> pretty h <+> pretty s
|
||||||
atomically $ writeTQueue sizeQ Nothing
|
|
||||||
|
|
||||||
request peer (GetBlockSize @e h)
|
maybe none (exit . Just) s
|
||||||
|
|
||||||
race ( pause defBlockInfoTimeout ) (atomically $ readTQueue sizeQ )
|
lift do
|
||||||
>>= orThrow (PeerRequestTimeout peer)
|
|
||||||
|
sizeQ <- newTQueueIO
|
||||||
|
|
||||||
|
subscribe @e (BlockSizeEventKey peer) $ \case
|
||||||
|
BlockSizeEvent (that, hx, sz) | hx == h -> do
|
||||||
|
atomically $ writeTQueue sizeQ (Just sz)
|
||||||
|
cacheBlockSize @e cache _peerSignKey h sz
|
||||||
|
|
||||||
|
_ -> do
|
||||||
|
atomically $ writeTQueue sizeQ Nothing
|
||||||
|
|
||||||
|
request peer (GetBlockSize @e h)
|
||||||
|
|
||||||
|
race ( pause defBlockInfoTimeout ) (atomically $ readTQueue sizeQ )
|
||||||
|
>>= orThrow (PeerRequestTimeout peer)
|
||||||
|
|
||||||
case what of
|
case what of
|
||||||
Left{} -> pure $ Left (PeerRequestTimeout peer)
|
Left{} -> pure $ Left (PeerRequestTimeout peer)
|
||||||
|
@ -268,7 +283,7 @@ instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) =>
|
||||||
|
|
||||||
peer <- either (const $ exit (mkSym "error:invalid-address")) pure peer'
|
peer <- either (const $ exit (mkSym "error:invalid-address")) pure peer'
|
||||||
|
|
||||||
what <- lift $ downloadFromPeer defChunkWaitMax () rpcPeerEnv (coerce blk) peer
|
what <- lift $ downloadFromPeer defChunkWaitMax rpcBrains rpcPeerEnv (coerce blk) peer
|
||||||
|
|
||||||
case what of
|
case what of
|
||||||
Left e -> pure $ mkList @C [ mkSym "error" , mkStr (show e) ]
|
Left e -> pure $ mkList @C [ mkSym "error" , mkStr (show e) ]
|
||||||
|
@ -285,7 +300,7 @@ instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) =>
|
||||||
|
|
||||||
peer <- either (const $ exit (mkSym "error:invalid-address")) pure peer'
|
peer <- either (const $ exit (mkSym "error:invalid-address")) pure peer'
|
||||||
|
|
||||||
sz <- lift $ queryBlockSizeFromPeer @e () rpcPeerEnv (coerce blk) peer
|
sz <- lift $ queryBlockSizeFromPeer @e rpcBrains rpcPeerEnv (coerce blk) peer
|
||||||
|
|
||||||
case sz of
|
case sz of
|
||||||
Left e -> pure $ mkList @C [ mkSym "error", mkStr (show e) ]
|
Left e -> pure $ mkList @C [ mkSym "error", mkStr (show e) ]
|
||||||
|
|
|
@ -67,6 +67,23 @@ class HasBrains e a where
|
||||||
onKnownPeers :: MonadIO m => a -> [Peer e] -> m ()
|
onKnownPeers :: MonadIO m => a -> [Peer e] -> m ()
|
||||||
onKnownPeers _ _ = none
|
onKnownPeers _ _ = none
|
||||||
|
|
||||||
|
brainsCacheBlockSize :: MonadIO m
|
||||||
|
=> a
|
||||||
|
-> PubKey 'Sign (Encryption e)
|
||||||
|
-> Hash HbSync
|
||||||
|
-> Integer
|
||||||
|
-> m ()
|
||||||
|
|
||||||
|
brainsCacheBlockSize _ _ _ _ = none
|
||||||
|
|
||||||
|
brainsFindBlockSize :: MonadIO m
|
||||||
|
=> a
|
||||||
|
-> PubKey 'Sign (Encryption e)
|
||||||
|
-> Hash HbSync
|
||||||
|
-> m (Maybe Integer)
|
||||||
|
|
||||||
|
brainsFindBlockSize _ _ _ = pure Nothing
|
||||||
|
|
||||||
onBlockSize :: ( MonadIO m
|
onBlockSize :: ( MonadIO m
|
||||||
, IsPeerAddr e m
|
, IsPeerAddr e m
|
||||||
)
|
)
|
||||||
|
@ -197,3 +214,6 @@ instance HasBrains e (SomeBrains e) where
|
||||||
setSeen (SomeBrains a) = setSeen @e a
|
setSeen (SomeBrains a) = setSeen @e a
|
||||||
isSeen (SomeBrains a) = isSeen @e a
|
isSeen (SomeBrains a) = isSeen @e a
|
||||||
|
|
||||||
|
brainsCacheBlockSize (SomeBrains a) = brainsCacheBlockSize @e a
|
||||||
|
brainsFindBlockSize (SomeBrains a) = brainsFindBlockSize @e a
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue