Compare commits

...

4 Commits

Author SHA1 Message Date
0g-peterzhb
f351bf799f
Merge b352184dc6 into 2947cb7ac6 2024-10-21 21:25:14 +08:00
Joel Liu
2947cb7ac6
Optimizing recover perf by reducing sync progress events (#244)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
* Optimizing recover perf by reducing sync progress events

* add log

* add log
2024-10-21 21:24:50 +08:00
peilun-conflux
39efb721c5
Remove sender from contract call. (#242)
This allows the RPC services to cache the results.
2024-10-21 16:58:50 +08:00
Joel Liu
9fe5a2c18b
Stop recovery when sending via the channel fails (#240)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
2024-10-18 16:08:35 +08:00
6 changed files with 51 additions and 26 deletions

View File

@ -236,22 +236,29 @@ impl LogEntryFetcher {
.filter;
let mut stream = LogQuery::new(&provider, &filter, log_query_delay)
.with_page_size(log_page_size);
debug!(
info!(
"start_recover starts, start={} end={}",
start_block_number, end_block_number
);
let (mut block_hash_sent, mut block_number_sent) = (None, None);
while let Some(maybe_log) = stream.next().await {
match maybe_log {
Ok(log) => {
let sync_progress =
if log.block_hash.is_some() && log.block_number.is_some() {
let synced_block = LogFetchProgress::SyncedBlock((
log.block_number.unwrap().as_u64(),
log.block_hash.unwrap(),
None,
));
progress = log.block_number.unwrap().as_u64();
Some(synced_block)
if block_hash_sent != log.block_hash
|| block_number_sent != log.block_number
{
let synced_block = LogFetchProgress::SyncedBlock((
log.block_number.unwrap().as_u64(),
log.block_hash.unwrap(),
None,
));
progress = log.block_number.unwrap().as_u64();
Some(synced_block)
} else {
None
}
} else {
None
};
@ -268,11 +275,17 @@ impl LogEntryFetcher {
log.block_number.expect("block number exist").as_u64(),
))
.and_then(|_| match sync_progress {
Some(b) => recover_tx.send(b),
Some(b) => {
recover_tx.send(b)?;
block_hash_sent = log.block_hash;
block_number_sent = log.block_number;
Ok(())
}
None => Ok(()),
})
{
error!("send error: e={:?}", e);
break;
}
}
Err(e) => {
@ -289,6 +302,8 @@ impl LogEntryFetcher {
}
}
}
info!("log recover end");
},
"log recover",
);

View File

@ -67,8 +67,8 @@ impl MinerConfig {
})
}
pub(crate) async fn make_provider(&self) -> Result<MineServiceMiddleware, String> {
let provider = Arc::new(Provider::new(
pub(crate) fn make_provider(&self) -> Result<Arc<Provider<RetryClient<Http>>>, String> {
Ok(Arc::new(Provider::new(
RetryClientBuilder::default()
.rate_limit_retries(self.rate_limit_retries)
.timeout_retries(self.timeout_retries)
@ -78,7 +78,11 @@ impl MinerConfig {
.map_err(|e| format!("Cannot parse blockchain endpoint: {:?}", e))?,
Box::new(HttpRateLimitRetryPolicy),
),
));
)))
}
pub(crate) async fn make_signing_provider(&self) -> Result<MineServiceMiddleware, String> {
let provider = self.make_provider()?;
let chain_id = provider
.get_chainid()
.await

View File

@ -1,6 +1,7 @@
use std::{collections::BTreeMap, sync::Arc};
use ethereum_types::H256;
use ethers::prelude::{Http, Provider, RetryClient};
use tokio::time::{sleep, Duration, Instant};
use contract_interface::{EpochRangeWithContextDigest, ZgsFlow};
@ -12,14 +13,14 @@ use storage_async::Store;
use task_executor::TaskExecutor;
use zgs_spec::SECTORS_PER_SEAL;
use crate::config::{MineServiceMiddleware, MinerConfig};
use crate::config::MinerConfig;
const DB_QUERY_PERIOD_ON_NO_TASK: u64 = 1;
const DB_QUERY_PERIOD_ON_ERROR: u64 = 5;
const CHAIN_STATUS_QUERY_PERIOD: u64 = 5;
pub struct Sealer {
flow_contract: ZgsFlow<MineServiceMiddleware>,
flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
store: Arc<Store>,
context_cache: BTreeMap<u128, EpochRangeWithContextDigest>,
last_context_flow_length: u64,
@ -29,7 +30,7 @@ pub struct Sealer {
impl Sealer {
pub fn spawn(
executor: TaskExecutor,
provider: Arc<MineServiceMiddleware>,
provider: Arc<Provider<RetryClient<Http>>>,
store: Arc<Store>,
config: &MinerConfig,
miner_id: H256,

View File

@ -33,11 +33,13 @@ impl MineService {
config: MinerConfig,
store: Arc<Store>,
) -> Result<broadcast::Sender<MinerMessage>, String> {
let provider = Arc::new(config.make_provider().await?);
let provider = config.make_provider()?;
let signing_provider = Arc::new(config.make_signing_provider().await?);
let (msg_send, msg_recv) = broadcast::channel(1024);
let miner_id = check_and_request_miner_id(&config, store.as_ref(), &provider).await?;
let miner_id =
check_and_request_miner_id(&config, store.as_ref(), &signing_provider).await?;
debug!("miner id setting complete.");
let mine_context_receiver = MineContextWatcher::spawn(
@ -61,6 +63,7 @@ impl MineService {
mine_answer_receiver,
mine_context_receiver,
provider.clone(),
signing_provider,
store.clone(),
&config,
);

View File

@ -2,6 +2,7 @@ use contract_interface::PoraAnswer;
use contract_interface::{PoraMine, ZgsFlow};
use ethereum_types::U256;
use ethers::contract::ContractCall;
use ethers::prelude::{Http, Provider, RetryClient};
use ethers::providers::PendingTransaction;
use hex::ToHex;
use shared_types::FlowRangeProof;
@ -24,7 +25,7 @@ pub struct Submitter {
mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
mine_context_receiver: broadcast::Receiver<MineContextMessage>,
mine_contract: PoraMine<MineServiceMiddleware>,
flow_contract: ZgsFlow<MineServiceMiddleware>,
flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
default_gas_limit: Option<U256>,
store: Arc<Store>,
}
@ -34,11 +35,12 @@ impl Submitter {
executor: TaskExecutor,
mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
mine_context_receiver: broadcast::Receiver<MineContextMessage>,
provider: Arc<MineServiceMiddleware>,
provider: Arc<Provider<RetryClient<Http>>>,
signing_provider: Arc<MineServiceMiddleware>,
store: Arc<Store>,
config: &MinerConfig,
) {
let mine_contract = PoraMine::new(config.mine_address, provider.clone());
let mine_contract = PoraMine::new(config.mine_address, signing_provider);
let flow_contract = ZgsFlow::new(config.flow_address, provider);
let default_gas_limit = config.submission_gas;

View File

@ -14,13 +14,13 @@ use tokio::{
try_join,
};
use crate::{config::MineServiceMiddleware, mine::PoraPuzzle, MinerConfig, MinerMessage};
use ethers::prelude::{Http, RetryClient};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use std::{ops::DerefMut, str::FromStr};
use crate::{config::MineServiceMiddleware, mine::PoraPuzzle, MinerConfig, MinerMessage};
pub type MineContextMessage = Option<PoraPuzzle>;
lazy_static! {
@ -29,9 +29,9 @@ lazy_static! {
}
pub struct MineContextWatcher {
provider: Arc<MineServiceMiddleware>,
flow_contract: ZgsFlow<MineServiceMiddleware>,
mine_contract: PoraMine<MineServiceMiddleware>,
provider: Arc<Provider<RetryClient<Http>>>,
flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
mine_contract: PoraMine<Provider<RetryClient<Http>>>,
mine_context_sender: broadcast::Sender<MineContextMessage>,
last_report: MineContextMessage,
@ -44,7 +44,7 @@ impl MineContextWatcher {
pub fn spawn(
executor: TaskExecutor,
msg_recv: broadcast::Receiver<MinerMessage>,
provider: Arc<MineServiceMiddleware>,
provider: Arc<Provider<RetryClient<Http>>>,
config: &MinerConfig,
) -> broadcast::Receiver<MineContextMessage> {
let mine_contract = PoraMine::new(config.mine_address, provider.clone());