mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
Compare commits
1 Commits
8f8b6db72a
...
ab7bdf908a
Author | SHA1 | Date | |
---|---|---|---|
![]() |
ab7bdf908a |
@ -1,5 +1,5 @@
|
||||
use crate::sync_manager::log_query::LogQuery;
|
||||
use crate::sync_manager::{metrics, RETRY_WAIT_MS};
|
||||
use crate::sync_manager::RETRY_WAIT_MS;
|
||||
use crate::ContractAddress;
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use append_merkle::{Algorithm, Sha3Algorithm};
|
||||
@ -14,12 +14,15 @@ use shared_types::{DataRoot, Transaction};
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::time::Duration;
|
||||
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
|
||||
use task_executor::TaskExecutor;
|
||||
use tokio::sync::{
|
||||
mpsc::{UnboundedReceiver, UnboundedSender},
|
||||
RwLock,
|
||||
use tokio::{
|
||||
sync::{
|
||||
mpsc::{UnboundedReceiver, UnboundedSender},
|
||||
RwLock,
|
||||
},
|
||||
time::Instant,
|
||||
};
|
||||
|
||||
pub struct LogEntryFetcher {
|
||||
@ -239,7 +242,6 @@ impl LogEntryFetcher {
|
||||
);
|
||||
let (mut block_hash_sent, mut block_number_sent) = (None, None);
|
||||
while let Some(maybe_log) = stream.next().await {
|
||||
let start_time = Instant::now();
|
||||
match maybe_log {
|
||||
Ok(log) => {
|
||||
let sync_progress =
|
||||
@ -299,7 +301,6 @@ impl LogEntryFetcher {
|
||||
tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
|
||||
}
|
||||
}
|
||||
metrics::RECOVER_LOG.update_since(start_time);
|
||||
}
|
||||
|
||||
info!("log recover end");
|
||||
|
@ -9,5 +9,5 @@ lazy_static::lazy_static! {
|
||||
|
||||
pub static ref STORE_PUT_TX_SPEED_IN_BYTES: Arc<dyn Gauge<usize>> = GaugeUsize::register("log_entry_sync_manager_put_tx_speed_in_bytes");
|
||||
|
||||
pub static ref RECOVER_LOG: Arc<dyn Timer> = register_timer("log_entry_sync_manager_recover_log");
|
||||
pub static ref FlOW_CONTRACT_ROOT: Arc<dyn Timer> = register_timer("log_manager_flow_contract_root");
|
||||
}
|
||||
|
@ -26,7 +26,6 @@ const RETRY_WAIT_MS: u64 = 500;
|
||||
// Each tx has less than 10KB, so the cache size should be acceptable.
|
||||
const BROADCAST_CHANNEL_CAPACITY: usize = 25000;
|
||||
const CATCH_UP_END_GAP: u64 = 10;
|
||||
const CHECK_ROOT_INTERVAL: u64 = 500;
|
||||
|
||||
/// Errors while handle data
|
||||
#[derive(Error, Debug)]
|
||||
@ -516,43 +515,43 @@ impl LogSyncManager {
|
||||
|
||||
// Check if the computed data root matches on-chain state.
|
||||
// If the call fails, we won't check the root here and return `true` directly.
|
||||
if self.next_tx_seq % CHECK_ROOT_INTERVAL == 0 {
|
||||
let flow_contract = self.log_fetcher.flow_contract();
|
||||
let flow_contract = self.log_fetcher.flow_contract();
|
||||
|
||||
match flow_contract
|
||||
.get_flow_root_by_tx_seq(tx.seq.into())
|
||||
.call()
|
||||
.await
|
||||
{
|
||||
Ok(contract_root_bytes) => {
|
||||
let contract_root = H256::from_slice(&contract_root_bytes);
|
||||
// contract_root is zero for tx submitted before upgrading.
|
||||
if !contract_root.is_zero() {
|
||||
match self.store.get_context() {
|
||||
Ok((local_root, _)) => {
|
||||
if contract_root != local_root {
|
||||
error!(
|
||||
?contract_root,
|
||||
?local_root,
|
||||
"local flow root and on-chain flow root mismatch"
|
||||
);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(?e, "fail to read the local flow root");
|
||||
let flow_time = Instant::now();
|
||||
match flow_contract
|
||||
.get_flow_root_by_tx_seq(tx.seq.into())
|
||||
.call()
|
||||
.await
|
||||
{
|
||||
Ok(contract_root_bytes) => {
|
||||
let contract_root = H256::from_slice(&contract_root_bytes);
|
||||
// contract_root is zero for tx submitted before upgrading.
|
||||
if !contract_root.is_zero() {
|
||||
match self.store.get_context() {
|
||||
Ok((local_root, _)) => {
|
||||
if contract_root != local_root {
|
||||
error!(
|
||||
?contract_root,
|
||||
?local_root,
|
||||
"local flow root and on-chain flow root mismatch"
|
||||
);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(?e, "fail to read the local flow root");
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(?e, "fail to read the on-chain flow root");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(?e, "fail to read the on-chain flow root");
|
||||
}
|
||||
}
|
||||
metrics::FlOW_CONTRACT_ROOT.update_since(flow_time);
|
||||
|
||||
metrics::STORE_PUT_TX_SPEED_IN_BYTES
|
||||
.update((tx.size * 1000 / start_time.elapsed().as_micros() as u64) as usize);
|
||||
.update((tx.size / start_time.elapsed().as_millis() as u64) as usize);
|
||||
metrics::STORE_PUT_TX.update_since(start_time);
|
||||
|
||||
true
|
||||
|
Loading…
Reference in New Issue
Block a user