mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
Compare commits
1 Commits
8ee0b7ceeb
...
96212cbeac
Author | SHA1 | Date | |
---|---|---|---|
![]() |
96212cbeac |
@ -14,8 +14,6 @@ use thiserror::Error;
|
|||||||
pub(crate) type PinBoxFut<'a, T> =
|
pub(crate) type PinBoxFut<'a, T> =
|
||||||
Pin<Box<dyn Future<Output = Result<T, ProviderError>> + Send + 'a>>;
|
Pin<Box<dyn Future<Output = Result<T, ProviderError>> + Send + 'a>>;
|
||||||
|
|
||||||
const TOO_MANY_LOGS_ERROR_MSG: &str = "query returned more than";
|
|
||||||
|
|
||||||
/// A log query provides streaming access to historical logs via a paginated
|
/// A log query provides streaming access to historical logs via a paginated
|
||||||
/// request. For streaming access to future logs, use [`Middleware::watch`] or
|
/// request. For streaming access to future logs, use [`Middleware::watch`] or
|
||||||
/// [`Middleware::subscribe_logs`]
|
/// [`Middleware::subscribe_logs`]
|
||||||
@ -23,9 +21,6 @@ pub struct LogQuery<'a, P> {
|
|||||||
provider: &'a Provider<P>,
|
provider: &'a Provider<P>,
|
||||||
filter: Filter,
|
filter: Filter,
|
||||||
from_block: Option<U64>,
|
from_block: Option<U64>,
|
||||||
|
|
||||||
expected_page_size: u64,
|
|
||||||
/// It may be smaller than `expected_page_size` if the server cannot return all the logs.
|
|
||||||
page_size: u64,
|
page_size: u64,
|
||||||
current_logs: VecDeque<Log>,
|
current_logs: VecDeque<Log>,
|
||||||
last_block: Option<U64>,
|
last_block: Option<U64>,
|
||||||
@ -36,8 +31,7 @@ pub struct LogQuery<'a, P> {
|
|||||||
enum LogQueryState<'a> {
|
enum LogQueryState<'a> {
|
||||||
Initial,
|
Initial,
|
||||||
LoadLastBlock(PinBoxFut<'a, U64>),
|
LoadLastBlock(PinBoxFut<'a, U64>),
|
||||||
/// `(from_block, get_logs_fut)`. `from_block` is used to resume if the request fails.
|
LoadLogs(PinBoxFut<'a, Vec<Log>>),
|
||||||
LoadLogs((Option<U64>, PinBoxFut<'a, Vec<Log>>)),
|
|
||||||
Consume,
|
Consume,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -51,7 +45,6 @@ where
|
|||||||
provider,
|
provider,
|
||||||
filter: filter.clone(),
|
filter: filter.clone(),
|
||||||
from_block: filter.get_from_block(),
|
from_block: filter.get_from_block(),
|
||||||
expected_page_size: 10000,
|
|
||||||
page_size: 10000,
|
page_size: 10000,
|
||||||
current_logs: VecDeque::new(),
|
current_logs: VecDeque::new(),
|
||||||
last_block: None,
|
last_block: None,
|
||||||
@ -63,7 +56,6 @@ where
|
|||||||
/// set page size for pagination
|
/// set page size for pagination
|
||||||
pub fn with_page_size(mut self, page_size: u64) -> Self {
|
pub fn with_page_size(mut self, page_size: u64) -> Self {
|
||||||
self.page_size = page_size;
|
self.page_size = page_size;
|
||||||
self.expected_page_size = page_size;
|
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -106,7 +98,7 @@ where
|
|||||||
tokio::time::sleep(delay).await;
|
tokio::time::sleep(delay).await;
|
||||||
provider.get_logs(&filter).await
|
provider.get_logs(&filter).await
|
||||||
});
|
});
|
||||||
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs((None, fut)));
|
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
|
||||||
} else {
|
} else {
|
||||||
// if paginatable, load last block
|
// if paginatable, load last block
|
||||||
let fut = match self.filter.get_to_block() {
|
let fut = match self.filter.get_to_block() {
|
||||||
@ -142,33 +134,18 @@ where
|
|||||||
tokio::time::sleep(delay).await;
|
tokio::time::sleep(delay).await;
|
||||||
provider.get_logs(&filter).await
|
provider.get_logs(&filter).await
|
||||||
});
|
});
|
||||||
rewake_with_new_state!(
|
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
|
||||||
ctx,
|
|
||||||
self,
|
|
||||||
LogQueryState::LoadLogs((Some(from_block), fut))
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLastBlockError(err)))),
|
Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLastBlockError(err)))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LogQueryState::LoadLogs((from_block, fut)) => {
|
LogQueryState::LoadLogs(fut) => match futures_util::ready!(fut.as_mut().poll(ctx)) {
|
||||||
match futures_util::ready!(fut.as_mut().poll(ctx)) {
|
|
||||||
Ok(logs) => {
|
Ok(logs) => {
|
||||||
self.current_logs = VecDeque::from(logs);
|
self.current_logs = VecDeque::from(logs);
|
||||||
self.page_size = self.expected_page_size;
|
|
||||||
rewake_with_new_state!(ctx, self, LogQueryState::Consume);
|
rewake_with_new_state!(ctx, self, LogQueryState::Consume);
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLogsError(err)))),
|
||||||
if err.to_string().contains(TOO_MANY_LOGS_ERROR_MSG) {
|
},
|
||||||
self.from_block = *from_block;
|
|
||||||
self.page_size /= 2;
|
|
||||||
rewake_with_new_state!(ctx, self, LogQueryState::Consume);
|
|
||||||
} else {
|
|
||||||
Poll::Ready(Some(Err(LogQueryError::LoadLogsError(err))))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
LogQueryState::Consume => {
|
LogQueryState::Consume => {
|
||||||
let log = self.current_logs.pop_front();
|
let log = self.current_logs.pop_front();
|
||||||
if log.is_none() {
|
if log.is_none() {
|
||||||
@ -206,11 +183,7 @@ where
|
|||||||
provider.get_logs(&filter).await
|
provider.get_logs(&filter).await
|
||||||
});
|
});
|
||||||
|
|
||||||
rewake_with_new_state!(
|
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
|
||||||
ctx,
|
|
||||||
self,
|
|
||||||
LogQueryState::LoadLogs((Some(from_block), fut))
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Poll::Ready(log.map(Ok))
|
Poll::Ready(log.map(Ok))
|
||||||
|
@ -21,8 +21,7 @@ use tokio::sync::{oneshot, RwLock};
|
|||||||
const RETRY_WAIT_MS: u64 = 500;
|
const RETRY_WAIT_MS: u64 = 500;
|
||||||
|
|
||||||
// A RPC query can return at most 10000 entries.
|
// A RPC query can return at most 10000 entries.
|
||||||
// Each tx has less than 10KB, so the cache size should be acceptable.
|
const BROADCAST_CHANNEL_CAPACITY: usize = 10000;
|
||||||
const BROADCAST_CHANNEL_CAPACITY: usize = 25000;
|
|
||||||
const CATCH_UP_END_GAP: u64 = 10;
|
const CATCH_UP_END_GAP: u64 = 10;
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
|
Loading…
Reference in New Issue
Block a user