mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
Compare commits
No commits in common. "e20be63026aa0e3845b035ac840b2df858f3d41a" and "e3c199d3616632c1483372314da94502cd7feda1" have entirely different histories.
e20be63026
...
e3c199d361
@ -290,7 +290,6 @@ impl LogEntryFetcher {
|
|||||||
let contract = ZgsFlow::new(self.contract_address, self.provider.clone());
|
let contract = ZgsFlow::new(self.contract_address, self.provider.clone());
|
||||||
let provider = self.provider.clone();
|
let provider = self.provider.clone();
|
||||||
let confirmation_delay = self.confirmation_delay;
|
let confirmation_delay = self.confirmation_delay;
|
||||||
let log_page_size = self.log_page_size;
|
|
||||||
executor.spawn(
|
executor.spawn(
|
||||||
async move {
|
async move {
|
||||||
debug!("start_watch starts, start={}", start_block_number);
|
debug!("start_watch starts, start={}", start_block_number);
|
||||||
@ -306,7 +305,6 @@ impl LogEntryFetcher {
|
|||||||
confirmation_delay,
|
confirmation_delay,
|
||||||
&contract,
|
&contract,
|
||||||
&block_hash_cache,
|
&block_hash_cache,
|
||||||
log_page_size,
|
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
@ -342,7 +340,6 @@ impl LogEntryFetcher {
|
|||||||
confirmation_delay: u64,
|
confirmation_delay: u64,
|
||||||
contract: &ZgsFlow<Provider<RetryClient<Http>>>,
|
contract: &ZgsFlow<Provider<RetryClient<Http>>>,
|
||||||
block_hash_cache: &Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
|
block_hash_cache: &Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
|
||||||
log_page_size: u64,
|
|
||||||
) -> Result<Option<(u64, H256, Option<Option<u64>>)>> {
|
) -> Result<Option<(u64, H256, Option<Option<u64>>)>> {
|
||||||
let latest_block_number = provider.get_block_number().await?.as_u64();
|
let latest_block_number = provider.get_block_number().await?.as_u64();
|
||||||
debug!(
|
debug!(
|
||||||
@ -411,11 +408,8 @@ impl LogEntryFetcher {
|
|||||||
.to_block(to_block_number)
|
.to_block(to_block_number)
|
||||||
.address(contract.address().into())
|
.address(contract.address().into())
|
||||||
.filter;
|
.filter;
|
||||||
let mut stream = LogQuery::new(provider, &filter, Duration::from_millis(10))
|
|
||||||
.with_page_size(log_page_size);
|
|
||||||
let mut block_logs: BTreeMap<u64, Vec<Log>> = BTreeMap::new();
|
let mut block_logs: BTreeMap<u64, Vec<Log>> = BTreeMap::new();
|
||||||
while let Some(maybe_log) = stream.next().await {
|
for log in provider.get_logs(&filter).await? {
|
||||||
let log = maybe_log?;
|
|
||||||
let block_number = log
|
let block_number = log
|
||||||
.block_number
|
.block_number
|
||||||
.ok_or_else(|| anyhow!("block number missing"))?
|
.ok_or_else(|| anyhow!("block number missing"))?
|
||||||
@ -502,20 +496,20 @@ impl LogEntryFetcher {
|
|||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
for log in log_events.into_iter() {
|
|
||||||
if let Err(e) = watch_tx.send(log) {
|
|
||||||
warn!("send LogFetchProgress::Transaction failed: {:?}", e);
|
|
||||||
return Ok(progress);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if let Some(p) = &new_progress {
|
if let Some(p) = &new_progress {
|
||||||
if let Err(e) = watch_tx.send(LogFetchProgress::SyncedBlock(*p)) {
|
if let Err(e) = watch_tx.send(LogFetchProgress::SyncedBlock(*p)) {
|
||||||
warn!("send LogFetchProgress::SyncedBlock failed: {:?}", e);
|
warn!("send LogFetchProgress failed: {:?}", e);
|
||||||
return Ok(progress);
|
return Ok(progress);
|
||||||
} else {
|
} else {
|
||||||
block_hash_cache.write().await.insert(p.0, None);
|
block_hash_cache.write().await.insert(p.0, None);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for log in log_events.into_iter() {
|
||||||
|
if let Err(e) = watch_tx.send(log) {
|
||||||
|
warn!("send log failed: {:?}", e);
|
||||||
|
return Ok(progress);
|
||||||
|
}
|
||||||
|
}
|
||||||
progress = new_progress;
|
progress = new_progress;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -339,7 +339,8 @@ impl LogSyncManager {
|
|||||||
LogFetchProgress::Transaction(tx) => {
|
LogFetchProgress::Transaction(tx) => {
|
||||||
if !self.put_tx(tx.clone()).await {
|
if !self.put_tx(tx.clone()).await {
|
||||||
// Unexpected error.
|
// Unexpected error.
|
||||||
bail!("log sync write error");
|
error!("log sync write error");
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
if let Err(e) = self.event_send.send(LogSyncEvent::TxSynced { tx }) {
|
if let Err(e) = self.event_send.send(LogSyncEvent::TxSynced { tx }) {
|
||||||
// TODO: Do we need to wait until all receivers are initialized?
|
// TODO: Do we need to wait until all receivers are initialized?
|
||||||
|
@ -7,8 +7,7 @@ use merkle_tree::RawLeafSha3Algorithm;
|
|||||||
use network::Multiaddr;
|
use network::Multiaddr;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use shared_types::{
|
use shared_types::{
|
||||||
compute_padded_chunk_size, compute_segment_size, DataRoot, FileProof, NetworkIdentity,
|
compute_padded_chunk_size, compute_segment_size, DataRoot, FileProof, Transaction, CHUNK_SIZE,
|
||||||
Transaction, CHUNK_SIZE,
|
|
||||||
};
|
};
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::hash::Hasher;
|
use std::hash::Hasher;
|
||||||
@ -29,7 +28,6 @@ pub struct Status {
|
|||||||
pub connected_peers: usize,
|
pub connected_peers: usize,
|
||||||
pub log_sync_height: u64,
|
pub log_sync_height: u64,
|
||||||
pub log_sync_block: H256,
|
pub log_sync_block: H256,
|
||||||
pub network_identity: NetworkIdentity,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
|
@ -30,7 +30,6 @@ impl RpcServer for RpcServerImpl {
|
|||||||
connected_peers: self.ctx.network_globals.connected_peers(),
|
connected_peers: self.ctx.network_globals.connected_peers(),
|
||||||
log_sync_height: sync_progress.0,
|
log_sync_height: sync_progress.0,
|
||||||
log_sync_block: sync_progress.1,
|
log_sync_block: sync_progress.1,
|
||||||
network_identity: self.ctx.network_globals.network_id(),
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -370,7 +370,6 @@ impl TryFrom<FileProof> for FlowProof {
|
|||||||
#[derive(
|
#[derive(
|
||||||
DeriveEncode, DeriveDecode, Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize,
|
DeriveEncode, DeriveDecode, Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize,
|
||||||
)]
|
)]
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
pub struct NetworkIdentity {
|
pub struct NetworkIdentity {
|
||||||
/// The chain id of the blockchain network.
|
/// The chain id of the blockchain network.
|
||||||
pub chain_id: u64,
|
pub chain_id: u64,
|
||||||
@ -385,7 +384,6 @@ pub struct NetworkIdentity {
|
|||||||
#[derive(
|
#[derive(
|
||||||
DeriveEncode, DeriveDecode, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize,
|
DeriveEncode, DeriveDecode, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize,
|
||||||
)]
|
)]
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
pub struct ProtocolVersion {
|
pub struct ProtocolVersion {
|
||||||
pub major: u8,
|
pub major: u8,
|
||||||
pub minor: u8,
|
pub minor: u8,
|
||||||
|
Loading…
Reference in New Issue
Block a user