mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
Half log_page_size when it queries too many logs.
This commit is contained in:
parent
891e00fa80
commit
ea92174096
@ -1,4 +1,6 @@
|
||||
use ethers::prelude::{Filter, JsonRpcClient, Log, Middleware, Provider, ProviderError, U64};
|
||||
use ethers::prelude::{
|
||||
Filter, JsonRpcClient, Log, Middleware, Provider, ProviderError, U64,
|
||||
};
|
||||
use futures_core::stream::Stream;
|
||||
use jsonrpsee::tracing::trace;
|
||||
use std::future::Future;
|
||||
@ -14,6 +16,8 @@ use thiserror::Error;
|
||||
pub(crate) type PinBoxFut<'a, T> =
|
||||
Pin<Box<dyn Future<Output = Result<T, ProviderError>> + Send + 'a>>;
|
||||
|
||||
const TOO_MANY_LOGS_ERROR_MSG: &str = "query returned more than";
|
||||
|
||||
/// A log query provides streaming access to historical logs via a paginated
|
||||
/// request. For streaming access to future logs, use [`Middleware::watch`] or
|
||||
/// [`Middleware::subscribe_logs`]
|
||||
@ -21,6 +25,9 @@ pub struct LogQuery<'a, P> {
|
||||
provider: &'a Provider<P>,
|
||||
filter: Filter,
|
||||
from_block: Option<U64>,
|
||||
|
||||
expected_page_size: u64,
|
||||
/// It may be smaller than `expected_page_size` if the server cannot return all the logs.
|
||||
page_size: u64,
|
||||
current_logs: VecDeque<Log>,
|
||||
last_block: Option<U64>,
|
||||
@ -31,7 +38,8 @@ pub struct LogQuery<'a, P> {
|
||||
enum LogQueryState<'a> {
|
||||
Initial,
|
||||
LoadLastBlock(PinBoxFut<'a, U64>),
|
||||
LoadLogs(PinBoxFut<'a, Vec<Log>>),
|
||||
/// `(from_block, get_logs_fut)`. `from_block` is used to resume if the request fails.
|
||||
LoadLogs((Option<U64>, PinBoxFut<'a, Vec<Log>>)),
|
||||
Consume,
|
||||
}
|
||||
|
||||
@ -45,6 +53,7 @@ where
|
||||
provider,
|
||||
filter: filter.clone(),
|
||||
from_block: filter.get_from_block(),
|
||||
expected_page_size: 10000,
|
||||
page_size: 10000,
|
||||
current_logs: VecDeque::new(),
|
||||
last_block: None,
|
||||
@ -56,6 +65,7 @@ where
|
||||
/// set page size for pagination
|
||||
pub fn with_page_size(mut self, page_size: u64) -> Self {
|
||||
self.page_size = page_size;
|
||||
self.expected_page_size = page_size;
|
||||
self
|
||||
}
|
||||
}
|
||||
@ -98,7 +108,7 @@ where
|
||||
tokio::time::sleep(delay).await;
|
||||
provider.get_logs(&filter).await
|
||||
});
|
||||
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
|
||||
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs((None, fut)));
|
||||
} else {
|
||||
// if paginatable, load last block
|
||||
let fut = match self.filter.get_to_block() {
|
||||
@ -134,18 +144,33 @@ where
|
||||
tokio::time::sleep(delay).await;
|
||||
provider.get_logs(&filter).await
|
||||
});
|
||||
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
|
||||
rewake_with_new_state!(
|
||||
ctx,
|
||||
self,
|
||||
LogQueryState::LoadLogs((Some(from_block), fut))
|
||||
);
|
||||
}
|
||||
Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLastBlockError(err)))),
|
||||
}
|
||||
}
|
||||
LogQueryState::LoadLogs(fut) => match futures_util::ready!(fut.as_mut().poll(ctx)) {
|
||||
Ok(logs) => {
|
||||
self.current_logs = VecDeque::from(logs);
|
||||
rewake_with_new_state!(ctx, self, LogQueryState::Consume);
|
||||
LogQueryState::LoadLogs((from_block, fut)) => {
|
||||
match futures_util::ready!(fut.as_mut().poll(ctx)) {
|
||||
Ok(logs) => {
|
||||
self.current_logs = VecDeque::from(logs);
|
||||
self.page_size = self.expected_page_size;
|
||||
rewake_with_new_state!(ctx, self, LogQueryState::Consume);
|
||||
}
|
||||
Err(err) => {
|
||||
if err.to_string().contains(TOO_MANY_LOGS_ERROR_MSG) {
|
||||
self.from_block = *from_block;
|
||||
self.page_size /= 2;
|
||||
rewake_with_new_state!(ctx, self, LogQueryState::Consume);
|
||||
} else {
|
||||
Poll::Ready(Some(Err(LogQueryError::LoadLogsError(err))))
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLogsError(err)))),
|
||||
},
|
||||
}
|
||||
LogQueryState::Consume => {
|
||||
let log = self.current_logs.pop_front();
|
||||
if log.is_none() {
|
||||
@ -183,7 +208,11 @@ where
|
||||
provider.get_logs(&filter).await
|
||||
});
|
||||
|
||||
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
|
||||
rewake_with_new_state!(
|
||||
ctx,
|
||||
self,
|
||||
LogQueryState::LoadLogs((Some(from_block), fut))
|
||||
);
|
||||
}
|
||||
} else {
|
||||
Poll::Ready(log.map(Ok))
|
||||
|
Loading…
Reference in New Issue
Block a user