From 53449e1faa14c7d1e71591131abd9ef08177002b Mon Sep 17 00:00:00 2001 From: peilun-conflux <48905552+peilun-conflux@users.noreply.github.com> Date: Fri, 9 Aug 2024 17:13:20 +0800 Subject: [PATCH] Half log_page_size when it queries too many logs. (#152) * Half log_page_size when it queries too many logs. * fmt. * Increase log broadcast channel size. --- .../src/sync_manager/log_query.rs | 47 +++++++++++++++---- node/log_entry_sync/src/sync_manager/mod.rs | 3 +- 2 files changed, 39 insertions(+), 11 deletions(-) diff --git a/node/log_entry_sync/src/sync_manager/log_query.rs b/node/log_entry_sync/src/sync_manager/log_query.rs index 37e77fb..b711b7d 100644 --- a/node/log_entry_sync/src/sync_manager/log_query.rs +++ b/node/log_entry_sync/src/sync_manager/log_query.rs @@ -14,6 +14,8 @@ use thiserror::Error; pub(crate) type PinBoxFut<'a, T> = Pin> + Send + 'a>>; +const TOO_MANY_LOGS_ERROR_MSG: &str = "query returned more than"; + /// A log query provides streaming access to historical logs via a paginated /// request. For streaming access to future logs, use [`Middleware::watch`] or /// [`Middleware::subscribe_logs`] @@ -21,6 +23,9 @@ pub struct LogQuery<'a, P> { provider: &'a Provider

, filter: Filter, from_block: Option, + + expected_page_size: u64, + /// It may be smaller than `expected_page_size` if the server cannot return all the logs. page_size: u64, current_logs: VecDeque, last_block: Option, @@ -31,7 +36,8 @@ pub struct LogQuery<'a, P> { enum LogQueryState<'a> { Initial, LoadLastBlock(PinBoxFut<'a, U64>), - LoadLogs(PinBoxFut<'a, Vec>), + /// `(from_block, get_logs_fut)`. `from_block` is used to resume if the request fails. + LoadLogs((Option, PinBoxFut<'a, Vec>)), Consume, } @@ -45,6 +51,7 @@ where provider, filter: filter.clone(), from_block: filter.get_from_block(), + expected_page_size: 10000, page_size: 10000, current_logs: VecDeque::new(), last_block: None, @@ -56,6 +63,7 @@ where /// set page size for pagination pub fn with_page_size(mut self, page_size: u64) -> Self { self.page_size = page_size; + self.expected_page_size = page_size; self } } @@ -98,7 +106,7 @@ where tokio::time::sleep(delay).await; provider.get_logs(&filter).await }); - rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut)); + rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs((None, fut))); } else { // if paginatable, load last block let fut = match self.filter.get_to_block() { @@ -134,18 +142,33 @@ where tokio::time::sleep(delay).await; provider.get_logs(&filter).await }); - rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut)); + rewake_with_new_state!( + ctx, + self, + LogQueryState::LoadLogs((Some(from_block), fut)) + ); } Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLastBlockError(err)))), } } - LogQueryState::LoadLogs(fut) => match futures_util::ready!(fut.as_mut().poll(ctx)) { - Ok(logs) => { - self.current_logs = VecDeque::from(logs); - rewake_with_new_state!(ctx, self, LogQueryState::Consume); + LogQueryState::LoadLogs((from_block, fut)) => { + match futures_util::ready!(fut.as_mut().poll(ctx)) { + Ok(logs) => { + self.current_logs = VecDeque::from(logs); + self.page_size = self.expected_page_size; + rewake_with_new_state!(ctx, self, LogQueryState::Consume); + } + Err(err) => { + if err.to_string().contains(TOO_MANY_LOGS_ERROR_MSG) { + self.from_block = *from_block; + self.page_size /= 2; + rewake_with_new_state!(ctx, self, LogQueryState::Consume); + } else { + Poll::Ready(Some(Err(LogQueryError::LoadLogsError(err)))) + } + } } - Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLogsError(err)))), - }, + } LogQueryState::Consume => { let log = self.current_logs.pop_front(); if log.is_none() { @@ -183,7 +206,11 @@ where provider.get_logs(&filter).await }); - rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut)); + rewake_with_new_state!( + ctx, + self, + LogQueryState::LoadLogs((Some(from_block), fut)) + ); } } else { Poll::Ready(log.map(Ok)) diff --git a/node/log_entry_sync/src/sync_manager/mod.rs b/node/log_entry_sync/src/sync_manager/mod.rs index 9cc487a..0a07f40 100644 --- a/node/log_entry_sync/src/sync_manager/mod.rs +++ b/node/log_entry_sync/src/sync_manager/mod.rs @@ -21,7 +21,8 @@ use tokio::sync::{oneshot, RwLock}; const RETRY_WAIT_MS: u64 = 500; // A RPC query can return at most 10000 entries. -const BROADCAST_CHANNEL_CAPACITY: usize = 10000; +// Each tx has less than 10KB, so the cache size should be acceptable. +const BROADCAST_CHANNEL_CAPACITY: usize = 25000; const CATCH_UP_END_GAP: u64 = 10; #[derive(Clone, Debug)]