mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2024-11-20 15:05:19 +00:00
update next tx seq
This commit is contained in:
parent
b82183bf09
commit
4127333297
@ -1,6 +1,6 @@
|
|||||||
use ethers::prelude::{Filter, JsonRpcClient, Log, Middleware, Provider, ProviderError, U64};
|
use ethers::prelude::{Filter, JsonRpcClient, Log, Middleware, Provider, ProviderError, U64};
|
||||||
use futures_core::stream::Stream;
|
use futures_core::stream::Stream;
|
||||||
use jsonrpsee::tracing::{error, trace};
|
use jsonrpsee::tracing::{debug, error, trace};
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::{
|
use std::{
|
||||||
@ -152,6 +152,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
LogQueryState::LoadLogs((from_block, fut)) => {
|
LogQueryState::LoadLogs((from_block, fut)) => {
|
||||||
|
debug!("LoadLogs: loading logs from block={:?}", from_block);
|
||||||
match futures_util::ready!(fut.as_mut().poll(ctx)) {
|
match futures_util::ready!(fut.as_mut().poll(ctx)) {
|
||||||
Ok(logs) => {
|
Ok(logs) => {
|
||||||
self.current_logs = VecDeque::from(logs);
|
self.current_logs = VecDeque::from(logs);
|
||||||
|
@ -5,7 +5,7 @@ use anyhow::{anyhow, bail, Result};
|
|||||||
use ethereum_types::H256;
|
use ethereum_types::H256;
|
||||||
use ethers::{prelude::Middleware, types::BlockNumber};
|
use ethers::{prelude::Middleware, types::BlockNumber};
|
||||||
use futures::FutureExt;
|
use futures::FutureExt;
|
||||||
use jsonrpsee::tracing::{debug, error, trace, warn};
|
use jsonrpsee::tracing::{debug, error, warn};
|
||||||
use shared_types::{ChunkArray, Transaction};
|
use shared_types::{ChunkArray, Transaction};
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
@ -56,7 +56,7 @@ impl LogSyncManager {
|
|||||||
store: Arc<dyn Store>,
|
store: Arc<dyn Store>,
|
||||||
) -> Result<(broadcast::Sender<LogSyncEvent>, oneshot::Receiver<()>)> {
|
) -> Result<(broadcast::Sender<LogSyncEvent>, oneshot::Receiver<()>)> {
|
||||||
let next_tx_seq = store.next_tx_seq();
|
let next_tx_seq = store.next_tx_seq();
|
||||||
|
debug!("LogSyncManager spawn next_tx_seq: {}", next_tx_seq);
|
||||||
let executor_clone = executor.clone();
|
let executor_clone = executor.clone();
|
||||||
let mut shutdown_sender = executor.shutdown_sender();
|
let mut shutdown_sender = executor.shutdown_sender();
|
||||||
|
|
||||||
@ -298,7 +298,7 @@ impl LogSyncManager {
|
|||||||
|
|
||||||
async fn handle_data(&mut self, mut rx: UnboundedReceiver<LogFetchProgress>) -> Result<()> {
|
async fn handle_data(&mut self, mut rx: UnboundedReceiver<LogFetchProgress>) -> Result<()> {
|
||||||
while let Some(data) = rx.recv().await {
|
while let Some(data) = rx.recv().await {
|
||||||
trace!("handle_data: data={:?}", data);
|
debug!("handle_data: data={:?}", data);
|
||||||
match data {
|
match data {
|
||||||
LogFetchProgress::SyncedBlock((
|
LogFetchProgress::SyncedBlock((
|
||||||
block_number,
|
block_number,
|
||||||
@ -387,7 +387,7 @@ impl LogSyncManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.data_cache.garbage_collect(self.next_tx_seq);
|
self.data_cache.garbage_collect(self.next_tx_seq);
|
||||||
self.next_tx_seq += 1;
|
self.next_tx_seq = self.store.next_tx_seq();
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user