mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
Compare commits
4 Commits
f7bf5aecc8
...
f351bf799f
Author | SHA1 | Date | |
---|---|---|---|
![]() |
f351bf799f | ||
![]() |
2947cb7ac6 | ||
![]() |
39efb721c5 | ||
![]() |
9fe5a2c18b |
@ -236,15 +236,19 @@ impl LogEntryFetcher {
|
|||||||
.filter;
|
.filter;
|
||||||
let mut stream = LogQuery::new(&provider, &filter, log_query_delay)
|
let mut stream = LogQuery::new(&provider, &filter, log_query_delay)
|
||||||
.with_page_size(log_page_size);
|
.with_page_size(log_page_size);
|
||||||
debug!(
|
info!(
|
||||||
"start_recover starts, start={} end={}",
|
"start_recover starts, start={} end={}",
|
||||||
start_block_number, end_block_number
|
start_block_number, end_block_number
|
||||||
);
|
);
|
||||||
|
let (mut block_hash_sent, mut block_number_sent) = (None, None);
|
||||||
while let Some(maybe_log) = stream.next().await {
|
while let Some(maybe_log) = stream.next().await {
|
||||||
match maybe_log {
|
match maybe_log {
|
||||||
Ok(log) => {
|
Ok(log) => {
|
||||||
let sync_progress =
|
let sync_progress =
|
||||||
if log.block_hash.is_some() && log.block_number.is_some() {
|
if log.block_hash.is_some() && log.block_number.is_some() {
|
||||||
|
if block_hash_sent != log.block_hash
|
||||||
|
|| block_number_sent != log.block_number
|
||||||
|
{
|
||||||
let synced_block = LogFetchProgress::SyncedBlock((
|
let synced_block = LogFetchProgress::SyncedBlock((
|
||||||
log.block_number.unwrap().as_u64(),
|
log.block_number.unwrap().as_u64(),
|
||||||
log.block_hash.unwrap(),
|
log.block_hash.unwrap(),
|
||||||
@ -254,6 +258,9 @@ impl LogEntryFetcher {
|
|||||||
Some(synced_block)
|
Some(synced_block)
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
};
|
};
|
||||||
debug!("recover: progress={:?}", sync_progress);
|
debug!("recover: progress={:?}", sync_progress);
|
||||||
|
|
||||||
@ -268,11 +275,17 @@ impl LogEntryFetcher {
|
|||||||
log.block_number.expect("block number exist").as_u64(),
|
log.block_number.expect("block number exist").as_u64(),
|
||||||
))
|
))
|
||||||
.and_then(|_| match sync_progress {
|
.and_then(|_| match sync_progress {
|
||||||
Some(b) => recover_tx.send(b),
|
Some(b) => {
|
||||||
|
recover_tx.send(b)?;
|
||||||
|
block_hash_sent = log.block_hash;
|
||||||
|
block_number_sent = log.block_number;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
None => Ok(()),
|
None => Ok(()),
|
||||||
})
|
})
|
||||||
{
|
{
|
||||||
error!("send error: e={:?}", e);
|
error!("send error: e={:?}", e);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@ -289,6 +302,8 @@ impl LogEntryFetcher {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
info!("log recover end");
|
||||||
},
|
},
|
||||||
"log recover",
|
"log recover",
|
||||||
);
|
);
|
||||||
|
@ -67,8 +67,8 @@ impl MinerConfig {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn make_provider(&self) -> Result<MineServiceMiddleware, String> {
|
pub(crate) fn make_provider(&self) -> Result<Arc<Provider<RetryClient<Http>>>, String> {
|
||||||
let provider = Arc::new(Provider::new(
|
Ok(Arc::new(Provider::new(
|
||||||
RetryClientBuilder::default()
|
RetryClientBuilder::default()
|
||||||
.rate_limit_retries(self.rate_limit_retries)
|
.rate_limit_retries(self.rate_limit_retries)
|
||||||
.timeout_retries(self.timeout_retries)
|
.timeout_retries(self.timeout_retries)
|
||||||
@ -78,7 +78,11 @@ impl MinerConfig {
|
|||||||
.map_err(|e| format!("Cannot parse blockchain endpoint: {:?}", e))?,
|
.map_err(|e| format!("Cannot parse blockchain endpoint: {:?}", e))?,
|
||||||
Box::new(HttpRateLimitRetryPolicy),
|
Box::new(HttpRateLimitRetryPolicy),
|
||||||
),
|
),
|
||||||
));
|
)))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn make_signing_provider(&self) -> Result<MineServiceMiddleware, String> {
|
||||||
|
let provider = self.make_provider()?;
|
||||||
let chain_id = provider
|
let chain_id = provider
|
||||||
.get_chainid()
|
.get_chainid()
|
||||||
.await
|
.await
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
use std::{collections::BTreeMap, sync::Arc};
|
use std::{collections::BTreeMap, sync::Arc};
|
||||||
|
|
||||||
use ethereum_types::H256;
|
use ethereum_types::H256;
|
||||||
|
use ethers::prelude::{Http, Provider, RetryClient};
|
||||||
use tokio::time::{sleep, Duration, Instant};
|
use tokio::time::{sleep, Duration, Instant};
|
||||||
|
|
||||||
use contract_interface::{EpochRangeWithContextDigest, ZgsFlow};
|
use contract_interface::{EpochRangeWithContextDigest, ZgsFlow};
|
||||||
@ -12,14 +13,14 @@ use storage_async::Store;
|
|||||||
use task_executor::TaskExecutor;
|
use task_executor::TaskExecutor;
|
||||||
use zgs_spec::SECTORS_PER_SEAL;
|
use zgs_spec::SECTORS_PER_SEAL;
|
||||||
|
|
||||||
use crate::config::{MineServiceMiddleware, MinerConfig};
|
use crate::config::MinerConfig;
|
||||||
|
|
||||||
const DB_QUERY_PERIOD_ON_NO_TASK: u64 = 1;
|
const DB_QUERY_PERIOD_ON_NO_TASK: u64 = 1;
|
||||||
const DB_QUERY_PERIOD_ON_ERROR: u64 = 5;
|
const DB_QUERY_PERIOD_ON_ERROR: u64 = 5;
|
||||||
const CHAIN_STATUS_QUERY_PERIOD: u64 = 5;
|
const CHAIN_STATUS_QUERY_PERIOD: u64 = 5;
|
||||||
|
|
||||||
pub struct Sealer {
|
pub struct Sealer {
|
||||||
flow_contract: ZgsFlow<MineServiceMiddleware>,
|
flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
|
||||||
store: Arc<Store>,
|
store: Arc<Store>,
|
||||||
context_cache: BTreeMap<u128, EpochRangeWithContextDigest>,
|
context_cache: BTreeMap<u128, EpochRangeWithContextDigest>,
|
||||||
last_context_flow_length: u64,
|
last_context_flow_length: u64,
|
||||||
@ -29,7 +30,7 @@ pub struct Sealer {
|
|||||||
impl Sealer {
|
impl Sealer {
|
||||||
pub fn spawn(
|
pub fn spawn(
|
||||||
executor: TaskExecutor,
|
executor: TaskExecutor,
|
||||||
provider: Arc<MineServiceMiddleware>,
|
provider: Arc<Provider<RetryClient<Http>>>,
|
||||||
store: Arc<Store>,
|
store: Arc<Store>,
|
||||||
config: &MinerConfig,
|
config: &MinerConfig,
|
||||||
miner_id: H256,
|
miner_id: H256,
|
||||||
|
@ -33,11 +33,13 @@ impl MineService {
|
|||||||
config: MinerConfig,
|
config: MinerConfig,
|
||||||
store: Arc<Store>,
|
store: Arc<Store>,
|
||||||
) -> Result<broadcast::Sender<MinerMessage>, String> {
|
) -> Result<broadcast::Sender<MinerMessage>, String> {
|
||||||
let provider = Arc::new(config.make_provider().await?);
|
let provider = config.make_provider()?;
|
||||||
|
let signing_provider = Arc::new(config.make_signing_provider().await?);
|
||||||
|
|
||||||
let (msg_send, msg_recv) = broadcast::channel(1024);
|
let (msg_send, msg_recv) = broadcast::channel(1024);
|
||||||
|
|
||||||
let miner_id = check_and_request_miner_id(&config, store.as_ref(), &provider).await?;
|
let miner_id =
|
||||||
|
check_and_request_miner_id(&config, store.as_ref(), &signing_provider).await?;
|
||||||
debug!("miner id setting complete.");
|
debug!("miner id setting complete.");
|
||||||
|
|
||||||
let mine_context_receiver = MineContextWatcher::spawn(
|
let mine_context_receiver = MineContextWatcher::spawn(
|
||||||
@ -61,6 +63,7 @@ impl MineService {
|
|||||||
mine_answer_receiver,
|
mine_answer_receiver,
|
||||||
mine_context_receiver,
|
mine_context_receiver,
|
||||||
provider.clone(),
|
provider.clone(),
|
||||||
|
signing_provider,
|
||||||
store.clone(),
|
store.clone(),
|
||||||
&config,
|
&config,
|
||||||
);
|
);
|
||||||
|
@ -2,6 +2,7 @@ use contract_interface::PoraAnswer;
|
|||||||
use contract_interface::{PoraMine, ZgsFlow};
|
use contract_interface::{PoraMine, ZgsFlow};
|
||||||
use ethereum_types::U256;
|
use ethereum_types::U256;
|
||||||
use ethers::contract::ContractCall;
|
use ethers::contract::ContractCall;
|
||||||
|
use ethers::prelude::{Http, Provider, RetryClient};
|
||||||
use ethers::providers::PendingTransaction;
|
use ethers::providers::PendingTransaction;
|
||||||
use hex::ToHex;
|
use hex::ToHex;
|
||||||
use shared_types::FlowRangeProof;
|
use shared_types::FlowRangeProof;
|
||||||
@ -24,7 +25,7 @@ pub struct Submitter {
|
|||||||
mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
|
mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
|
||||||
mine_context_receiver: broadcast::Receiver<MineContextMessage>,
|
mine_context_receiver: broadcast::Receiver<MineContextMessage>,
|
||||||
mine_contract: PoraMine<MineServiceMiddleware>,
|
mine_contract: PoraMine<MineServiceMiddleware>,
|
||||||
flow_contract: ZgsFlow<MineServiceMiddleware>,
|
flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
|
||||||
default_gas_limit: Option<U256>,
|
default_gas_limit: Option<U256>,
|
||||||
store: Arc<Store>,
|
store: Arc<Store>,
|
||||||
}
|
}
|
||||||
@ -34,11 +35,12 @@ impl Submitter {
|
|||||||
executor: TaskExecutor,
|
executor: TaskExecutor,
|
||||||
mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
|
mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
|
||||||
mine_context_receiver: broadcast::Receiver<MineContextMessage>,
|
mine_context_receiver: broadcast::Receiver<MineContextMessage>,
|
||||||
provider: Arc<MineServiceMiddleware>,
|
provider: Arc<Provider<RetryClient<Http>>>,
|
||||||
|
signing_provider: Arc<MineServiceMiddleware>,
|
||||||
store: Arc<Store>,
|
store: Arc<Store>,
|
||||||
config: &MinerConfig,
|
config: &MinerConfig,
|
||||||
) {
|
) {
|
||||||
let mine_contract = PoraMine::new(config.mine_address, provider.clone());
|
let mine_contract = PoraMine::new(config.mine_address, signing_provider);
|
||||||
let flow_contract = ZgsFlow::new(config.flow_address, provider);
|
let flow_contract = ZgsFlow::new(config.flow_address, provider);
|
||||||
let default_gas_limit = config.submission_gas;
|
let default_gas_limit = config.submission_gas;
|
||||||
|
|
||||||
|
@ -14,13 +14,13 @@ use tokio::{
|
|||||||
try_join,
|
try_join,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use crate::{config::MineServiceMiddleware, mine::PoraPuzzle, MinerConfig, MinerMessage};
|
||||||
|
use ethers::prelude::{Http, RetryClient};
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::{ops::DerefMut, str::FromStr};
|
use std::{ops::DerefMut, str::FromStr};
|
||||||
|
|
||||||
use crate::{config::MineServiceMiddleware, mine::PoraPuzzle, MinerConfig, MinerMessage};
|
|
||||||
|
|
||||||
pub type MineContextMessage = Option<PoraPuzzle>;
|
pub type MineContextMessage = Option<PoraPuzzle>;
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
@ -29,9 +29,9 @@ lazy_static! {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub struct MineContextWatcher {
|
pub struct MineContextWatcher {
|
||||||
provider: Arc<MineServiceMiddleware>,
|
provider: Arc<Provider<RetryClient<Http>>>,
|
||||||
flow_contract: ZgsFlow<MineServiceMiddleware>,
|
flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
|
||||||
mine_contract: PoraMine<MineServiceMiddleware>,
|
mine_contract: PoraMine<Provider<RetryClient<Http>>>,
|
||||||
|
|
||||||
mine_context_sender: broadcast::Sender<MineContextMessage>,
|
mine_context_sender: broadcast::Sender<MineContextMessage>,
|
||||||
last_report: MineContextMessage,
|
last_report: MineContextMessage,
|
||||||
@ -44,7 +44,7 @@ impl MineContextWatcher {
|
|||||||
pub fn spawn(
|
pub fn spawn(
|
||||||
executor: TaskExecutor,
|
executor: TaskExecutor,
|
||||||
msg_recv: broadcast::Receiver<MinerMessage>,
|
msg_recv: broadcast::Receiver<MinerMessage>,
|
||||||
provider: Arc<MineServiceMiddleware>,
|
provider: Arc<Provider<RetryClient<Http>>>,
|
||||||
config: &MinerConfig,
|
config: &MinerConfig,
|
||||||
) -> broadcast::Receiver<MineContextMessage> {
|
) -> broadcast::Receiver<MineContextMessage> {
|
||||||
let mine_contract = PoraMine::new(config.mine_address, provider.clone());
|
let mine_contract = PoraMine::new(config.mine_address, provider.clone());
|
||||||
|
Loading…
Reference in New Issue
Block a user