diff --git a/hbs2-peer/app/Brains.hs b/hbs2-peer/app/Brains.hs index b9abdb33..0d86c793 100644 --- a/hbs2-peer/app/Brains.hs +++ b/hbs2-peer/app/Brains.hs @@ -315,6 +315,35 @@ instance ( Hashable (Peer e) r <- query @_ @(Only Int) conn [qc|select 1 from seen where hash = ? limit 1|] (Only h) pure $ not $ List.null r + brainsCacheBlockSize b pk ha s = do + updateOP b $ do + let conn = view brainsDb b + + let sql = [qc| + insert into blocksizecache (block,peer,size) + values(?,?,?) + on conflict (block,peer) do update set size = excluded.size + |] + + void $ execute conn sql (hash,peer,s) + + where + peer = show $ pretty (AsBase58 pk) + hash = show $ pretty ha + + brainsFindBlockSize brains pk ha = do + let conn = view brainsDb brains + let peer = show $ pretty (AsBase58 pk) + let hash = show $ pretty ha + liftIO do + result <- query @_ @(Only Integer) conn [qc| + select size + from blocksizecache + where block = ? and peer = ? + limit 1 + |] (hash, peer) + pure $ fromOnly <$> listToMaybe result + commitNow :: forall e m . MonadIO m => BasicBrains e -> Bool @@ -624,6 +653,7 @@ SAVEPOINT zzz1; DELETE FROM ancestors WHERE strftime('%s','now') - strftime('%s', ts) > 600; DELETE FROM seenby WHERE strftime('%s','now') - strftime('%s', ts) > 600; DELETE FROM blocksize WHERE strftime('%s','now') - strftime('%s', ts) > (86400*7); +DELETE FROM blocksizecache WHERE strftime('%s','now') - strftime('%s', ts) > (86400*7); DELETE FROM statedb.pexinfo where seen < datetime('now', '-7 days'); DELETE FROM seen where ts < datetime('now'); @@ -868,7 +898,7 @@ newBasicBrains cfg = liftIO do |] execute_ conn [qc| - create table if not exists blocksize2 + create table if not exists blocksizecache ( block text not null , peer text not null , size int diff --git a/hbs2-peer/app/RPC2.hs b/hbs2-peer/app/RPC2.hs index 622d2adc..5f87c2b9 100644 --- a/hbs2-peer/app/RPC2.hs +++ b/hbs2-peer/app/RPC2.hs @@ -27,6 +27,7 @@ import HBS2.Actors.Peer import HBS2.Peer.Proto.Peer import HBS2.Peer.Proto.BlockInfo import HBS2.Peer.Proto.BlockChunks +import HBS2.Peer.Brains import HBS2.Storage import HBS2.Clock import HBS2.Net.Auth.Schema @@ -105,6 +106,10 @@ instance BlockSizeCache e () where cacheBlockSize _ _ _ _ = pure () findBlockSize _ _ _ = pure Nothing +instance BlockSizeCache e (SomeBrains e) where + cacheBlockSize = brainsCacheBlockSize @e + findBlockSize = brainsFindBlockSize @e + queryBlockSizeFromPeer :: forall e cache m . ( e ~ L4Proto , MonadUnliftIO m , BlockSizeCache e cache @@ -119,23 +124,33 @@ queryBlockSizeFromPeer cache e h peer = do what <- try @_ @(DownloadError e) $ liftIO $ withPeerM e do - PeerData{..} <- find (KnownPeerKey peer) id - >>= orThrow (UnknownPeerError peer) + flip runContT pure $ callCC \exit -> do - sizeQ <- newTQueueIO + PeerData{..} <- lift $ find (KnownPeerKey peer) id + >>= orThrow (UnknownPeerError peer) - subscribe @e (BlockSizeEventKey peer) $ \case - BlockSizeEvent (that, hx, sz) | hx == h -> do - atomically $ writeTQueue sizeQ (Just sz) - cacheBlockSize @e cache _peerSignKey h sz + s <- lift $ findBlockSize @e cache _peerSignKey h - _ -> do - atomically $ writeTQueue sizeQ Nothing + debug $ "FOUND CACHED VALUE" <+> pretty h <+> pretty s - request peer (GetBlockSize @e h) + maybe none (exit . Just) s - race ( pause defBlockInfoTimeout ) (atomically $ readTQueue sizeQ ) - >>= orThrow (PeerRequestTimeout peer) + lift do + + sizeQ <- newTQueueIO + + subscribe @e (BlockSizeEventKey peer) $ \case + BlockSizeEvent (that, hx, sz) | hx == h -> do + atomically $ writeTQueue sizeQ (Just sz) + cacheBlockSize @e cache _peerSignKey h sz + + _ -> do + atomically $ writeTQueue sizeQ Nothing + + request peer (GetBlockSize @e h) + + race ( pause defBlockInfoTimeout ) (atomically $ readTQueue sizeQ ) + >>= orThrow (PeerRequestTimeout peer) case what of Left{} -> pure $ Left (PeerRequestTimeout peer) @@ -268,7 +283,7 @@ instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) => peer <- either (const $ exit (mkSym "error:invalid-address")) pure peer' - what <- lift $ downloadFromPeer defChunkWaitMax () rpcPeerEnv (coerce blk) peer + what <- lift $ downloadFromPeer defChunkWaitMax rpcBrains rpcPeerEnv (coerce blk) peer case what of Left e -> pure $ mkList @C [ mkSym "error" , mkStr (show e) ] @@ -285,7 +300,7 @@ instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) => peer <- either (const $ exit (mkSym "error:invalid-address")) pure peer' - sz <- lift $ queryBlockSizeFromPeer @e () rpcPeerEnv (coerce blk) peer + sz <- lift $ queryBlockSizeFromPeer @e rpcBrains rpcPeerEnv (coerce blk) peer case sz of Left e -> pure $ mkList @C [ mkSym "error", mkStr (show e) ] diff --git a/hbs2-peer/lib/HBS2/Peer/Brains.hs b/hbs2-peer/lib/HBS2/Peer/Brains.hs index 8c16d230..f17cdbcd 100644 --- a/hbs2-peer/lib/HBS2/Peer/Brains.hs +++ b/hbs2-peer/lib/HBS2/Peer/Brains.hs @@ -67,6 +67,23 @@ class HasBrains e a where onKnownPeers :: MonadIO m => a -> [Peer e] -> m () onKnownPeers _ _ = none + brainsCacheBlockSize :: MonadIO m + => a + -> PubKey 'Sign (Encryption e) + -> Hash HbSync + -> Integer + -> m () + + brainsCacheBlockSize _ _ _ _ = none + + brainsFindBlockSize :: MonadIO m + => a + -> PubKey 'Sign (Encryption e) + -> Hash HbSync + -> m (Maybe Integer) + + brainsFindBlockSize _ _ _ = pure Nothing + onBlockSize :: ( MonadIO m , IsPeerAddr e m ) @@ -197,3 +214,6 @@ instance HasBrains e (SomeBrains e) where setSeen (SomeBrains a) = setSeen @e a isSeen (SomeBrains a) = isSeen @e a + brainsCacheBlockSize (SomeBrains a) = brainsCacheBlockSize @e a + brainsFindBlockSize (SomeBrains a) = brainsFindBlockSize @e a +