Compare commits

..

1 Commits

Author SHA1 Message Date
CryptoFennec
96212cbeac
Merge 5a25ec49dd into 9189cabbb2 2024-08-08 18:36:31 +08:00
2 changed files with 11 additions and 39 deletions

View File

@ -14,8 +14,6 @@ use thiserror::Error;
pub(crate) type PinBoxFut<'a, T> = pub(crate) type PinBoxFut<'a, T> =
Pin<Box<dyn Future<Output = Result<T, ProviderError>> + Send + 'a>>; Pin<Box<dyn Future<Output = Result<T, ProviderError>> + Send + 'a>>;
const TOO_MANY_LOGS_ERROR_MSG: &str = "query returned more than";
/// A log query provides streaming access to historical logs via a paginated /// A log query provides streaming access to historical logs via a paginated
/// request. For streaming access to future logs, use [`Middleware::watch`] or /// request. For streaming access to future logs, use [`Middleware::watch`] or
/// [`Middleware::subscribe_logs`] /// [`Middleware::subscribe_logs`]
@ -23,9 +21,6 @@ pub struct LogQuery<'a, P> {
provider: &'a Provider<P>, provider: &'a Provider<P>,
filter: Filter, filter: Filter,
from_block: Option<U64>, from_block: Option<U64>,
expected_page_size: u64,
/// It may be smaller than `expected_page_size` if the server cannot return all the logs.
page_size: u64, page_size: u64,
current_logs: VecDeque<Log>, current_logs: VecDeque<Log>,
last_block: Option<U64>, last_block: Option<U64>,
@ -36,8 +31,7 @@ pub struct LogQuery<'a, P> {
enum LogQueryState<'a> { enum LogQueryState<'a> {
Initial, Initial,
LoadLastBlock(PinBoxFut<'a, U64>), LoadLastBlock(PinBoxFut<'a, U64>),
/// `(from_block, get_logs_fut)`. `from_block` is used to resume if the request fails. LoadLogs(PinBoxFut<'a, Vec<Log>>),
LoadLogs((Option<U64>, PinBoxFut<'a, Vec<Log>>)),
Consume, Consume,
} }
@ -51,7 +45,6 @@ where
provider, provider,
filter: filter.clone(), filter: filter.clone(),
from_block: filter.get_from_block(), from_block: filter.get_from_block(),
expected_page_size: 10000,
page_size: 10000, page_size: 10000,
current_logs: VecDeque::new(), current_logs: VecDeque::new(),
last_block: None, last_block: None,
@ -63,7 +56,6 @@ where
/// set page size for pagination /// set page size for pagination
pub fn with_page_size(mut self, page_size: u64) -> Self { pub fn with_page_size(mut self, page_size: u64) -> Self {
self.page_size = page_size; self.page_size = page_size;
self.expected_page_size = page_size;
self self
} }
} }
@ -106,7 +98,7 @@ where
tokio::time::sleep(delay).await; tokio::time::sleep(delay).await;
provider.get_logs(&filter).await provider.get_logs(&filter).await
}); });
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs((None, fut))); rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
} else { } else {
// if paginatable, load last block // if paginatable, load last block
let fut = match self.filter.get_to_block() { let fut = match self.filter.get_to_block() {
@ -142,33 +134,18 @@ where
tokio::time::sleep(delay).await; tokio::time::sleep(delay).await;
provider.get_logs(&filter).await provider.get_logs(&filter).await
}); });
rewake_with_new_state!( rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
ctx,
self,
LogQueryState::LoadLogs((Some(from_block), fut))
);
} }
Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLastBlockError(err)))), Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLastBlockError(err)))),
} }
} }
LogQueryState::LoadLogs((from_block, fut)) => { LogQueryState::LoadLogs(fut) => match futures_util::ready!(fut.as_mut().poll(ctx)) {
match futures_util::ready!(fut.as_mut().poll(ctx)) {
Ok(logs) => { Ok(logs) => {
self.current_logs = VecDeque::from(logs); self.current_logs = VecDeque::from(logs);
self.page_size = self.expected_page_size;
rewake_with_new_state!(ctx, self, LogQueryState::Consume); rewake_with_new_state!(ctx, self, LogQueryState::Consume);
} }
Err(err) => { Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLogsError(err)))),
if err.to_string().contains(TOO_MANY_LOGS_ERROR_MSG) { },
self.from_block = *from_block;
self.page_size /= 2;
rewake_with_new_state!(ctx, self, LogQueryState::Consume);
} else {
Poll::Ready(Some(Err(LogQueryError::LoadLogsError(err))))
}
}
}
}
LogQueryState::Consume => { LogQueryState::Consume => {
let log = self.current_logs.pop_front(); let log = self.current_logs.pop_front();
if log.is_none() { if log.is_none() {
@ -206,11 +183,7 @@ where
provider.get_logs(&filter).await provider.get_logs(&filter).await
}); });
rewake_with_new_state!( rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
ctx,
self,
LogQueryState::LoadLogs((Some(from_block), fut))
);
} }
} else { } else {
Poll::Ready(log.map(Ok)) Poll::Ready(log.map(Ok))

View File

@ -21,8 +21,7 @@ use tokio::sync::{oneshot, RwLock};
const RETRY_WAIT_MS: u64 = 500; const RETRY_WAIT_MS: u64 = 500;
// A RPC query can return at most 10000 entries. // A RPC query can return at most 10000 entries.
// Each tx has less than 10KB, so the cache size should be acceptable. const BROADCAST_CHANNEL_CAPACITY: usize = 10000;
const BROADCAST_CHANNEL_CAPACITY: usize = 25000;
const CATCH_UP_END_GAP: u64 = 10; const CATCH_UP_END_GAP: u64 = 10;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]