Mirror of https://github.com/0glabs/0g-storage-node.git, synced 2025-04-04 15:35:18 +00:00

Compare commits

No commits in common. "a153955246b70e1075ab647bf3bb4c7240b5e463" and "a9f5169c15fe0546d9f26fed08cd4f3b410087c7" have entirely different histories.

a153955246...a9f5169c15

@@ -145,15 +145,6 @@ impl LogEntryFetcher {
                         }
                     };

-                    let log_latest_block_number = match store.get_log_latest_block_number() {
-                        Ok(Some(b)) => b,
-                        Ok(None) => 0,
-                        Err(e) => {
-                            error!("get log latest block number error: e={:?}", e);
-                            0
-                        }
-                    };
-
                     if let Some(processed_block_number) = processed_block_number {
                         let finalized_block_number =
                             match provider.get_block(BlockNumber::Finalized).await {

@@ -177,24 +168,25 @@ impl LogEntryFetcher {
                             };

                         if let Some(finalized_block_number) = finalized_block_number {
-                            let safe_block_number = std::cmp::min(
-                                std::cmp::min(log_latest_block_number, finalized_block_number),
-                                processed_block_number,
-                            );
-                            let mut pending_keys = vec![];
-                            for (key, _) in block_hash_cache.read().await.iter() {
-                                if *key < safe_block_number {
-                                    pending_keys.push(*key);
-                                } else {
-                                    break;
-                                }
-                            }
-
-                            for key in pending_keys.into_iter() {
-                                if let Err(e) = store.delete_block_hash_by_number(key) {
-                                    error!("remove block tx for number {} error: e={:?}", key, e);
-                                } else {
-                                    block_hash_cache.write().await.remove(&key);
+                            if processed_block_number >= finalized_block_number {
+                                let mut pending_keys = vec![];
+                                for (key, _) in block_hash_cache.read().await.iter() {
+                                    if *key < finalized_block_number {
+                                        pending_keys.push(*key);
+                                    } else {
+                                        break;
+                                    }
+                                }
+
+                                for key in pending_keys.into_iter() {
+                                    if let Err(e) = store.delete_block_hash_by_number(key) {
+                                        error!(
+                                            "remove block tx for number {} error: e={:?}",
+                                            key, e
+                                        );
+                                    } else {
+                                        block_hash_cache.write().await.remove(&key);
+                                    }
                                 }
                             }
                         }
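
The two hunks above move the pruning watermark for block_hash_cache: one side clamps it to min(min(log_latest_block_number, finalized_block_number), processed_block_number), while the other prunes everything below finalized_block_number, but only once processed_block_number has caught up to it. A standalone sketch of the underlying prune-below-watermark pattern, with simplified types (only the names mirror the diff):

    use std::collections::BTreeMap;

    // Prune every cache entry strictly below `watermark`, mirroring the
    // pending_keys loop in the diff. BTreeMap iterates keys in ascending
    // order, so we can stop at the first key >= watermark, like the
    // `break` in the original loop.
    fn prune_below(cache: &mut BTreeMap<u64, String>, watermark: u64) {
        let pending_keys: Vec<u64> = cache
            .keys()
            .take_while(|k| **k < watermark)
            .copied()
            .collect();
        for key in pending_keys {
            cache.remove(&key);
        }
    }

    fn main() {
        let mut cache: BTreeMap<u64, String> =
            (1..=5).map(|n| (n, format!("hash{n}"))).collect();
        // The removed code used the most conservative of three watermarks:
        let (log_latest, finalized, processed) = (4u64, 3u64, 5u64);
        let safe = std::cmp::min(std::cmp::min(log_latest, finalized), processed);
        prune_below(&mut cache, safe);
        assert_eq!(cache.keys().copied().collect::<Vec<_>>(), vec![3, 4, 5]);
    }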

@@ -321,7 +313,6 @@ impl LogEntryFetcher {
                         &mut progress_reset_history,
                         watch_loop_wait_time_ms,
                         &block_hash_cache,
-                        provider.as_ref(),
                     )
                     .await;


@@ -393,10 +384,6 @@ impl LogEntryFetcher {
                             );
                         }

-                        if block.logs_bloom.is_none() {
-                            bail!("block {:?} logs bloom is none", block.number);
-                        }
-
                         if from_block_number > 0 && block.parent_hash != parent_block_hash {
                             // reorg happened
                             let (parent_block_number, block_hash) = revert_one_block(

@@ -425,22 +412,13 @@ impl LogEntryFetcher {
                                 block.number
                             );
                         }
-                        if parent_block_hash.is_none() || Some(block.parent_hash) != parent_block_hash {
+                        if Some(block.parent_hash) != parent_block_hash {
                             bail!(
                                 "parent block hash mismatch, expected {:?}, actual {}",
                                 parent_block_hash,
                                 block.parent_hash
                             );
                         }
-
-                        if block_number == to_block_number && block.hash.is_none() {
-                            bail!("block {:?} hash is none", block.number);
-                        }
-
-                        if block.logs_bloom.is_none() {
-                            bail!("block {:?} logs bloom is none", block.number);
-                        }
-
                         parent_block_hash = block.hash;
                         blocks.insert(block_number, block);
                     }
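
Both sides of the two hunks above detect a reorg the same way: each fetched block's parent_hash must equal the hash of the block processed just before it, and on mismatch revert_one_block walks the chain back. One side additionally rejects blocks whose hash or logs_bloom is None. A minimal sketch of the parent-hash chaining check, with a simplified header type standing in for ethers' Block:

    // Simplified header; the real code works on ethers' Block with Option fields.
    struct Header {
        number: u64,
        hash: [u8; 32],
        parent_hash: [u8; 32],
    }

    // Walk blocks in ascending order and report the first height whose
    // parent_hash does not match the previous block's hash (a reorg sign).
    fn first_mismatch(chain: &[Header]) -> Option<u64> {
        chain
            .windows(2)
            .find(|w| w[1].parent_hash != w[0].hash)
            .map(|w| w[1].number)
    }

    fn main() {
        let a = Header { number: 1, hash: [1; 32], parent_hash: [0; 32] };
        let b = Header { number: 2, hash: [2; 32], parent_hash: [1; 32] };
        let c = Header { number: 3, hash: [3; 32], parent_hash: [9; 32] }; // wrong parent
        assert_eq!(first_mismatch(&[a, b, c]), Some(3));
    }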

@@ -492,7 +470,7 @@ impl LogEntryFetcher {
                     }

                     let tx = txs_hm[&log.transaction_index];
-                    if log.transaction_hash.is_none() || log.transaction_hash != Some(tx.hash) {
+                    if log.transaction_hash != Some(tx.hash) {
                         warn!(
                             "log tx hash mismatch, log transaction {:?}, block transaction {:?}",
                             log.transaction_hash,

@@ -500,9 +478,7 @@ impl LogEntryFetcher {
                         );
                         return Ok(progress);
                     }
-                    if log.transaction_index.is_none()
-                        || log.transaction_index != tx.transaction_index
-                    {
+                    if log.transaction_index != tx.transaction_index {
                         warn!(
                             "log tx index mismatch, log tx index {:?}, block transaction index {:?}",
                             log.transaction_index,
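
The is_none() guards in these two hunks are not equally redundant. For transaction_hash, `log.transaction_hash != Some(tx.hash)` already rejects None, because None never equals Some. For transaction_index, however, both operands are Options, so two Nones compare equal and a log missing its index would slip past the bare inequality; the explicit guard closes that hole. A small demonstration of the difference:

    fn main() {
        // Option<T> equality: two Nones are equal, so an inequality check
        // alone cannot reject a log whose index is missing entirely.
        let log_tx_index: Option<u64> = None;
        let block_tx_index: Option<u64> = None;
        assert_eq!(log_tx_index, block_tx_index);
        assert!(!(log_tx_index != block_tx_index));

        // Comparing an Option against Some(value) rejects None on its own:
        let log_tx_hash: Option<u64> = None;
        let block_tx_hash: u64 = 42;
        assert!(log_tx_hash != Some(block_tx_hash));
    }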

@@ -589,7 +565,6 @@ async fn check_watch_process(
     progress_reset_history: &mut BTreeMap<u64, (Instant, usize)>,
     watch_loop_wait_time_ms: u64,
     block_hash_cache: &Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
-    provider: &Provider<RetryClient<Http>>,
 ) {
     let mut min_received_progress = None;
     while let Ok(v) = watch_progress_rx.try_recv() {

@@ -651,21 +626,7 @@ async fn check_watch_process(
                     tokio::time::sleep(Duration::from_secs(RETRY_WAIT_MS)).await;
                 }
             } else {
-                warn!(
-                    "get block hash for block {} from RPC, assume there is no org",
-                    *progress - 1
-                );
-                match provider.get_block(*progress - 1).await {
-                    Ok(Some(v)) => {
-                        v.hash.expect("parent block hash expect exist");
-                    }
-                    Ok(None) => {
-                        panic!("parent block {} expect exist", *progress - 1);
-                    }
-                    Err(e) => {
-                        panic!("parent block {} expect exist, error {}", *progress - 1, e);
-                    }
-                }
+                panic!("parent block {} expect exist", *progress - 1);
             }
         };
     }
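
Rather than panicking as soon as the parent block is absent from the cache, the longer branch above logs a warning, re-queries the provider for block *progress - 1, and only panics if the RPC cannot produce it either. The same cache-miss-then-remote-lookup pattern, reduced to a standalone sketch with a closure standing in for the RPC call (all names here are illustrative):

    use std::collections::BTreeMap;

    // Look up a block hash in the cache; on a miss, fall back to a remote
    // fetch (modeled as a closure) before giving up, as the longer branch does.
    fn parent_hash_or_panic<F>(cache: &BTreeMap<u64, u64>, number: u64, fetch: F) -> u64
    where
        F: Fn(u64) -> Option<u64>,
    {
        if let Some(hash) = cache.get(&number) {
            return *hash;
        }
        eprintln!("get block hash for block {number} from RPC, assume there is no reorg");
        fetch(number).unwrap_or_else(|| panic!("parent block {number} expect exist"))
    }

    fn main() {
        let cache = BTreeMap::from([(9u64, 0xabcd_u64)]);
        let rpc = |n: u64| if n == 10 { Some(0xbeef_u64) } else { None };
        assert_eq!(parent_hash_or_panic(&cache, 9, rpc), 0xabcd);  // cache hit
        assert_eq!(parent_hash_or_panic(&cache, 10, rpc), 0xbeef); // RPC fallback
    }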

@@ -14,7 +14,6 @@ use std::sync::Arc;
 use std::time::{Duration, Instant};
 use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
 use task_executor::{ShutdownReason, TaskExecutor};
-use thiserror::Error;
 use tokio::sync::broadcast;
 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
 use tokio::sync::{oneshot, RwLock};

@@ -26,17 +25,6 @@ const RETRY_WAIT_MS: u64 = 500;
 const BROADCAST_CHANNEL_CAPACITY: usize = 25000;
 const CATCH_UP_END_GAP: u64 = 10;

-/// Errors while handle data
-#[derive(Error, Debug)]
-pub enum HandleDataError {
-    /// Sequence Error
-    #[error("transaction seq is great than expected, expect block number {0}")]
-    SeqError(u64),
-    /// Other Errors
-    #[error("{0}")]
-    CommonError(#[from] anyhow::Error),
-}
-
 #[derive(Clone, Debug)]
 pub enum LogSyncEvent {
     /// Chain reorg detected without any operation yet.
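
This hunk is where HandleDataError lives or dies: the thiserror import goes in the previous hunk, and the Result signatures further down change between Result<(), HandleDataError> and Result<()> accordingly. The sketch below reproduces the enum verbatim (including its message strings) and shows what #[from] buys, assuming the thiserror and anyhow crates the file already imports:

    use thiserror::Error;

    // Reproduced from the diff: SeqError carries the block number to retry
    // from, and #[from] generates From<anyhow::Error>, so `?` converts
    // any anyhow error into CommonError automatically.
    #[derive(Error, Debug)]
    pub enum HandleDataError {
        #[error("transaction seq is great than expected, expect block number {0}")]
        SeqError(u64),
        #[error("{0}")]
        CommonError(#[from] anyhow::Error),
    }

    fn may_fail() -> anyhow::Result<()> {
        Err(anyhow::anyhow!("log sync write error"))
    }

    fn handle() -> Result<(), HandleDataError> {
        may_fail()?; // anyhow::Error -> HandleDataError::CommonError via #[from]
        Ok(())
    }

    fn main() {
        assert!(matches!(handle(), Err(HandleDataError::CommonError(_))));
    }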

@@ -201,51 +189,13 @@ impl LogSyncManager {
                 } else {
                     // Keep catching-up data until we are close to the latest height.
                     loop {
-                        // wait tx receipt is ready
-                        if let Ok(Some(block)) = log_sync_manager
-                            .log_fetcher
-                            .provider()
-                            .get_block_with_txs(finalized_block_number)
-                            .await
-                        {
-                            if let Some(tx) = block.transactions.first() {
-                                loop {
-                                    match log_sync_manager
-                                        .log_fetcher
-                                        .provider()
-                                        .get_transaction_receipt(tx.hash)
-                                        .await
-                                    {
-                                        Ok(Some(_)) => break,
-                                        _ => {
-                                            tokio::time::sleep(Duration::from_secs(1)).await;
-                                            continue;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        while let Err(e) = log_sync_manager
+                        log_sync_manager
                             .catch_up_data(
                                 executor_clone.clone(),
                                 start_block_number,
                                 finalized_block_number,
                             )
-                            .await
-                        {
-                            match e {
-                                HandleDataError::SeqError(block_number) => {
-                                    warn!("seq error occurred, retry from {}", block_number);
-                                    start_block_number = block_number;
-                                    tokio::time::sleep(Duration::from_secs(1)).await;
-                                }
-                                _ => {
-                                    return Err(e.into());
-                                }
-                            }
-                        }
-
+                            .await?;
                         start_block_number = finalized_block_number.saturating_add(1);

                         let new_finalized_block =
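
The longer side of this hunk waits for the chain to be receipt-ready before catching up: it fetches the finalized block with its transactions and polls get_transaction_receipt for the first one, sleeping a second between attempts, and its while-let retries catch_up_data whenever a HandleDataError::SeqError names a block to restart from. A standalone reduction of the poll-until-ready half, assuming tokio (already among the crate's dependencies); wait_until_some and the stand-in closure are illustrative:

    use std::time::Duration;

    // Generic form of the receipt-wait loop: poll an async lookup until it
    // yields a value, sleeping one second between attempts.
    async fn wait_until_some<T, F, Fut>(mut lookup: F) -> T
    where
        F: FnMut() -> Fut,
        Fut: std::future::Future<Output = Option<T>>,
    {
        loop {
            if let Some(v) = lookup().await {
                return v;
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }

    #[tokio::main]
    async fn main() {
        let mut attempts = 0;
        // Stand-in for provider.get_transaction_receipt(tx.hash): succeeds
        // on the third call, as a pending receipt eventually would.
        let receipt = wait_until_some(|| {
            attempts += 1;
            let ready = attempts >= 3;
            async move { if ready { Some("receipt") } else { None } }
        })
        .await;
        assert_eq!(receipt, "receipt");
    }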

@@ -264,18 +214,6 @@ impl LogSyncManager {
                     warn!("catch_up_end send fails, possibly auto_sync is not enabled");
                 }

-                log_sync_manager
-                    .log_fetcher
-                    .start_remove_finalized_block_task(
-                        &executor_clone,
-                        log_sync_manager.store.clone(),
-                        log_sync_manager.block_hash_cache.clone(),
-                        log_sync_manager.config.default_finalized_block_count,
-                        log_sync_manager
-                            .config
-                            .remove_finalized_block_interval_minutes,
-                    );
-
                 let (watch_progress_tx, watch_progress_rx) =
                     tokio::sync::mpsc::unbounded_channel();
                 let watch_rx = log_sync_manager.log_fetcher.start_watch(

@@ -358,7 +296,7 @@ impl LogSyncManager {
         &mut self,
         mut rx: UnboundedReceiver<LogFetchProgress>,
         watch_progress_tx: &Option<UnboundedSender<u64>>,
-    ) -> Result<(), HandleDataError> {
+    ) -> Result<()> {
         let mut log_latest_block_number =
             if let Some(block_number) = self.store.get_log_latest_block_number()? {
                 block_number

@@ -424,15 +362,13 @@ impl LogSyncManager {
                             } else {
                                 continue;
                             }
-                        } else {
-                            return Err(HandleDataError::SeqError(log_latest_block_number));
                         }
                     }
                 }

                 if stop {
                     // Unexpected error.
-                    return Err(anyhow!("log sync write error").into());
+                    bail!("log sync write error");
                 }
                 if let Err(e) = self.event_send.send(LogSyncEvent::TxSynced { tx }) {
                     // TODO: Do we need to wait until all receivers are initialized?
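
The same hunk also trades `return Err(anyhow!("log sync write error").into())` for `bail!("log sync write error")`: once the HandleDataError return type is gone there is nothing for .into() to convert into, and anyhow's bail! is shorthand for returning the anyhow! error directly. A quick equivalence check:

    use anyhow::{anyhow, bail, Result};

    fn explicit() -> Result<()> {
        return Err(anyhow!("log sync write error"));
    }

    // bail! expands to exactly the return statement above.
    fn shorthand() -> Result<()> {
        bail!("log sync write error");
    }

    fn main() {
        assert_eq!(
            explicit().unwrap_err().to_string(),
            shorthand().unwrap_err().to_string()
        );
    }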

@@ -511,7 +447,7 @@ impl LogSyncManager {
         executor_clone: TaskExecutor,
         start_block_number: u64,
         finalized_block_number: u64,
-    ) -> Result<(), HandleDataError> {
+    ) -> Result<()> {
         if start_block_number < finalized_block_number {
             let recover_rx = self.log_fetcher.start_recover(
                 start_block_number,

@@ -521,6 +457,14 @@ impl LogSyncManager {
             );
             self.handle_data(recover_rx, &None).await?;
         }
+
+        self.log_fetcher.start_remove_finalized_block_task(
+            &executor_clone,
+            self.store.clone(),
+            self.block_hash_cache.clone(),
+            self.config.default_finalized_block_count,
+            self.config.remove_finalized_block_interval_minutes,
+        );
         Ok(())
     }
 }
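
This addition is the counterpart of the removal at @@ -264,18 +214,6 @@: spawning the periodic finalized-block cleanup moves from the run loop into the tail of catch_up_data, so it starts exactly once, right after catch-up completes. A sketch of such an interval-driven background task in plain tokio; the real call goes through TaskExecutor and a minutes-based config value, so treat the shape, not the API, as the point:

    use std::time::Duration;

    // Spawn a background task that fires on a fixed interval, standing in
    // for start_remove_finalized_block_task's cleanup tick.
    fn start_cleanup_task(interval_minutes: u64) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(Duration::from_secs(interval_minutes * 60));
            loop {
                ticker.tick().await;
                // ... prune block hashes below the finalized watermark here ...
            }
        })
    }

    #[tokio::main]
    async fn main() {
        let handle = start_cleanup_task(30);
        // The caller keeps running; on shutdown, abort instead of joining.
        handle.abort();
    }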

@@ -545,10 +489,6 @@ async fn get_start_block_number_with_hash(
         .get(&block_number)
     {
         return Ok((block_number, val.block_hash));
-    } else {
-        warn!("get block hash for block {} from RPC", block_number);
-        let block_hash = log_sync_manager.get_block(block_number.into()).await?.1;
-        return Ok((block_number, block_hash));
     }
 }

@@ -29,7 +29,6 @@ pub struct Status {
     pub connected_peers: usize,
     pub log_sync_height: u64,
     pub log_sync_block: H256,
-    pub next_tx_seq: u64,
     pub network_identity: NetworkIdentity,
 }

@@ -26,13 +26,10 @@ impl RpcServer for RpcServerImpl {
             .get_sync_progress()?
             .unwrap_or_default();

-        let next_tx_seq = self.ctx.log_store.get_store().next_tx_seq();
-
         Ok(Status {
             connected_peers: self.ctx.network_globals.connected_peers(),
             log_sync_height: sync_progress.0,
             log_sync_block: sync_progress.1,
-            next_tx_seq,
             network_identity: self.ctx.network_globals.network_id(),
         })
     }