Half log_page_size when it queries too many logs. (#152)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled

* Half log_page_size when it queries too many logs.

* fmt.

* Increase log broadcast channel size.
This commit is contained in:
peilun-conflux 2024-08-09 17:13:20 +08:00 committed by GitHub
parent 9189cabbb2
commit 53449e1faa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 39 additions and 11 deletions

View File

@ -14,6 +14,8 @@ use thiserror::Error;
pub(crate) type PinBoxFut<'a, T> =
Pin<Box<dyn Future<Output = Result<T, ProviderError>> + Send + 'a>>;
const TOO_MANY_LOGS_ERROR_MSG: &str = "query returned more than";
/// A log query provides streaming access to historical logs via a paginated
/// request. For streaming access to future logs, use [`Middleware::watch`] or
/// [`Middleware::subscribe_logs`]
@ -21,6 +23,9 @@ pub struct LogQuery<'a, P> {
provider: &'a Provider<P>,
filter: Filter,
from_block: Option<U64>,
expected_page_size: u64,
/// It may be smaller than `expected_page_size` if the server cannot return all the logs.
page_size: u64,
current_logs: VecDeque<Log>,
last_block: Option<U64>,
@ -31,7 +36,8 @@ pub struct LogQuery<'a, P> {
enum LogQueryState<'a> {
Initial,
LoadLastBlock(PinBoxFut<'a, U64>),
LoadLogs(PinBoxFut<'a, Vec<Log>>),
/// `(from_block, get_logs_fut)`. `from_block` is used to resume if the request fails.
LoadLogs((Option<U64>, PinBoxFut<'a, Vec<Log>>)),
Consume,
}
@ -45,6 +51,7 @@ where
provider,
filter: filter.clone(),
from_block: filter.get_from_block(),
expected_page_size: 10000,
page_size: 10000,
current_logs: VecDeque::new(),
last_block: None,
@ -56,6 +63,7 @@ where
/// set page size for pagination
pub fn with_page_size(mut self, page_size: u64) -> Self {
self.page_size = page_size;
self.expected_page_size = page_size;
self
}
}
@ -98,7 +106,7 @@ where
tokio::time::sleep(delay).await;
provider.get_logs(&filter).await
});
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs((None, fut)));
} else {
// if paginatable, load last block
let fut = match self.filter.get_to_block() {
@ -134,18 +142,33 @@ where
tokio::time::sleep(delay).await;
provider.get_logs(&filter).await
});
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
rewake_with_new_state!(
ctx,
self,
LogQueryState::LoadLogs((Some(from_block), fut))
);
}
Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLastBlockError(err)))),
}
}
LogQueryState::LoadLogs(fut) => match futures_util::ready!(fut.as_mut().poll(ctx)) {
Ok(logs) => {
self.current_logs = VecDeque::from(logs);
rewake_with_new_state!(ctx, self, LogQueryState::Consume);
LogQueryState::LoadLogs((from_block, fut)) => {
match futures_util::ready!(fut.as_mut().poll(ctx)) {
Ok(logs) => {
self.current_logs = VecDeque::from(logs);
self.page_size = self.expected_page_size;
rewake_with_new_state!(ctx, self, LogQueryState::Consume);
}
Err(err) => {
if err.to_string().contains(TOO_MANY_LOGS_ERROR_MSG) {
self.from_block = *from_block;
self.page_size /= 2;
rewake_with_new_state!(ctx, self, LogQueryState::Consume);
} else {
Poll::Ready(Some(Err(LogQueryError::LoadLogsError(err))))
}
}
}
Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLogsError(err)))),
},
}
LogQueryState::Consume => {
let log = self.current_logs.pop_front();
if log.is_none() {
@ -183,7 +206,11 @@ where
provider.get_logs(&filter).await
});
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
rewake_with_new_state!(
ctx,
self,
LogQueryState::LoadLogs((Some(from_block), fut))
);
}
} else {
Poll::Ready(log.map(Ok))

View File

@ -21,7 +21,8 @@ use tokio::sync::{oneshot, RwLock};
const RETRY_WAIT_MS: u64 = 500;
// A RPC query can return at most 10000 entries.
const BROADCAST_CHANNEL_CAPACITY: usize = 10000;
// Each tx has less than 10KB, so the cache size should be acceptable.
const BROADCAST_CHANNEL_CAPACITY: usize = 25000;
const CATCH_UP_END_GAP: u64 = 10;
#[derive(Clone, Debug)]