Compare commits

...

4 Commits

Author SHA1 Message Date
peilun-conflux
04f48e0617
Merge d318db5ea3 into 8f17a7ad72
2024-10-26 06:27:02 +00:00
Bo QIU
8f17a7ad72
Fix metrics config deser (#245)
2024-10-22 14:14:33 +08:00
Joel Liu
2947cb7ac6
Optimizing recover perf by reducing sync progress events (#244)
* Optimizing recover perf by reducing sync progress events

* add log

* add log
2024-10-21 21:24:50 +08:00
peilun-conflux
39efb721c5
Remove sender from contract call. (#242)
This allows the RPC services to cache the results.
2024-10-21 16:58:50 +08:00
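
Commit 39efb721c5 (#242) is the reason most of the miner code below switches from the signing middleware to a plain provider: a view call issued without a sender produces the same eth_call payload for every node, so the RPC service can cache the result. A minimal ethers-rs sketch of such a sender-free call; the contract name, ABI fragment and method here are hypothetical stand-ins, not the repository's actual ZgsFlow interface:

use ethers::prelude::*;
use std::sync::Arc;

// Hypothetical read-only binding; the real ZgsFlow ABI lives in contract_interface.
abigen!(
    FlowReader,
    r#"[
        function numSubmissions() external view returns (uint256)
    ]"#,
);

async fn query_flow(provider: Arc<Provider<Http>>, addr: Address) -> anyhow::Result<U256> {
    let flow = FlowReader::new(addr, provider);
    // No signer middleware and no explicit `from`, so every node issues the
    // same eth_call and an RPC-side cache can serve repeated queries.
    Ok(flow.num_submissions().call().await?)
}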
11 changed files with 56 additions and 31 deletions

Cargo.lock (generated)

@@ -4715,9 +4715,10 @@ dependencies = [
 [[package]]
 name = "metrics"
 version = "0.1.0"
-source = "git+https://github.com/Conflux-Chain/conflux-rust.git?rev=992ebc5483d937c8f6b883e266f8ed2a67a7fa9a#992ebc5483d937c8f6b883e266f8ed2a67a7fa9a"
+source = "git+https://github.com/Conflux-Chain/conflux-rust.git?rev=c4734e337c66d38e6396742cd5117b596e8d2603#c4734e337c66d38e6396742cd5117b596e8d2603"
 dependencies = [
  "chrono",
+ "duration-str",
  "futures",
  "influx_db_client",
  "lazy_static",


@@ -28,7 +28,7 @@ members = [
 resolver = "2"
 
 [workspace.dependencies]
-metrics = { git = "https://github.com/Conflux-Chain/conflux-rust.git", rev = "992ebc5483d937c8f6b883e266f8ed2a67a7fa9a" }
+metrics = { git = "https://github.com/Conflux-Chain/conflux-rust.git", rev = "c4734e337c66d38e6396742cd5117b596e8d2603" }
 
 [patch.crates-io]
 discv5 = { path = "version-meld/discv5" }


@@ -236,22 +236,29 @@ impl LogEntryFetcher {
                     .filter;
                 let mut stream = LogQuery::new(&provider, &filter, log_query_delay)
                     .with_page_size(log_page_size);
-                debug!(
+                info!(
                     "start_recover starts, start={} end={}",
                     start_block_number, end_block_number
                 );
+                let (mut block_hash_sent, mut block_number_sent) = (None, None);
                 while let Some(maybe_log) = stream.next().await {
                     match maybe_log {
                         Ok(log) => {
                             let sync_progress =
                                 if log.block_hash.is_some() && log.block_number.is_some() {
-                                    let synced_block = LogFetchProgress::SyncedBlock((
-                                        log.block_number.unwrap().as_u64(),
-                                        log.block_hash.unwrap(),
-                                        None,
-                                    ));
-                                    progress = log.block_number.unwrap().as_u64();
-                                    Some(synced_block)
+                                    if block_hash_sent != log.block_hash
+                                        || block_number_sent != log.block_number
+                                    {
+                                        let synced_block = LogFetchProgress::SyncedBlock((
+                                            log.block_number.unwrap().as_u64(),
+                                            log.block_hash.unwrap(),
+                                            None,
+                                        ));
+                                        progress = log.block_number.unwrap().as_u64();
+                                        Some(synced_block)
+                                    } else {
+                                        None
+                                    }
                                 } else {
                                     None
                                 };
@@ -268,7 +275,12 @@ impl LogEntryFetcher {
                                     log.block_number.expect("block number exist").as_u64(),
                                 ))
                                 .and_then(|_| match sync_progress {
-                                    Some(b) => recover_tx.send(b),
+                                    Some(b) => {
+                                        recover_tx.send(b)?;
+                                        block_hash_sent = log.block_hash;
+                                        block_number_sent = log.block_number;
+                                        Ok(())
+                                    }
                                     None => Ok(()),
                                 })
                             {
@@ -290,6 +302,8 @@ impl LogEntryFetcher {
                         }
                     }
                 }
+
+                info!("log recover end");
             },
             "log recover",
         );
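
The hunks above carry the #244 optimization: a recovered block can contain many matching logs, and previously every one of them emitted its own SyncedBlock progress event. Remembering the last (block hash, block number) pair that was actually sent and skipping duplicates cuts the event volume during recovery. A self-contained sketch of the same pattern with simplified stand-in types (the real code uses LogFetchProgress and the unbounded recover_tx channel):

use tokio::sync::mpsc;

// Simplified stand-in for LogFetchProgress::SyncedBlock.
#[derive(Clone, Debug)]
struct SyncedBlock {
    number: u64,
    hash: [u8; 32],
}

// Forwards a progress event only when the block changes, mirroring the
// block_hash_sent / block_number_sent bookkeeping added in this diff.
struct ProgressSender {
    tx: mpsc::UnboundedSender<SyncedBlock>,
    last_sent: Option<(u64, [u8; 32])>,
}

impl ProgressSender {
    fn send_if_new(
        &mut self,
        block: SyncedBlock,
    ) -> Result<(), mpsc::error::SendError<SyncedBlock>> {
        let key = (block.number, block.hash);
        if self.last_sent == Some(key) {
            // Same block as the last event: drop it instead of waking the receiver again.
            return Ok(());
        }
        self.tx.send(block)?;
        self.last_sent = Some(key);
        Ok(())
    }
}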


@@ -67,8 +67,8 @@ impl MinerConfig {
         })
     }
 
-    pub(crate) async fn make_provider(&self) -> Result<MineServiceMiddleware, String> {
-        let provider = Arc::new(Provider::new(
+    pub(crate) fn make_provider(&self) -> Result<Arc<Provider<RetryClient<Http>>>, String> {
+        Ok(Arc::new(Provider::new(
             RetryClientBuilder::default()
                 .rate_limit_retries(self.rate_limit_retries)
                 .timeout_retries(self.timeout_retries)
@@ -78,7 +78,11 @@ impl MinerConfig {
                         .map_err(|e| format!("Cannot parse blockchain endpoint: {:?}", e))?,
                     Box::new(HttpRateLimitRetryPolicy),
                 ),
-        ));
+        )))
+    }
+
+    pub(crate) async fn make_signing_provider(&self) -> Result<MineServiceMiddleware, String> {
+        let provider = self.make_provider()?;
         let chain_id = provider
             .get_chainid()
             .await
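
With this split, make_provider returns a plain retrying HTTP provider that read-only paths can share, while make_signing_provider builds the signing MineServiceMiddleware (presumably a signer middleware over the same provider) only for the paths that actually send transactions. A hedged sketch of the same idea in ethers-rs, using plain Http instead of RetryClient<Http> to stay short; the function names and the anyhow error handling are illustrative:

use ethers::prelude::*;
use std::sync::Arc;

type ReadProvider = Provider<Http>;
type SigningProvider = SignerMiddleware<Provider<Http>, LocalWallet>;

fn make_read_provider(url: &str) -> anyhow::Result<Arc<ReadProvider>> {
    // Read-only handle: good enough for eth_call, log queries and chain state.
    Ok(Arc::new(Provider::<Http>::try_from(url)?))
}

async fn make_signing_provider(url: &str, key: &str) -> anyhow::Result<Arc<SigningProvider>> {
    let provider = make_read_provider(url)?;
    // Only the transaction-sending paths need a wallet and a chain id.
    let chain_id = provider.get_chainid().await?;
    let wallet: LocalWallet = key.parse::<LocalWallet>()?.with_chain_id(chain_id.as_u64());
    Ok(Arc::new(SignerMiddleware::new((*provider).clone(), wallet)))
}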


@@ -1,6 +1,7 @@
 use std::{collections::BTreeMap, sync::Arc};
 
 use ethereum_types::H256;
+use ethers::prelude::{Http, Provider, RetryClient};
 use tokio::time::{sleep, Duration, Instant};
 
 use contract_interface::{EpochRangeWithContextDigest, ZgsFlow};
@@ -12,14 +13,14 @@ use storage_async::Store;
 use task_executor::TaskExecutor;
 use zgs_spec::SECTORS_PER_SEAL;
 
-use crate::config::{MineServiceMiddleware, MinerConfig};
+use crate::config::MinerConfig;
 
 const DB_QUERY_PERIOD_ON_NO_TASK: u64 = 1;
 const DB_QUERY_PERIOD_ON_ERROR: u64 = 5;
 const CHAIN_STATUS_QUERY_PERIOD: u64 = 5;
 
 pub struct Sealer {
-    flow_contract: ZgsFlow<MineServiceMiddleware>,
+    flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
     store: Arc<Store>,
     context_cache: BTreeMap<u128, EpochRangeWithContextDigest>,
     last_context_flow_length: u64,
@@ -29,7 +30,7 @@ pub struct Sealer {
 impl Sealer {
     pub fn spawn(
         executor: TaskExecutor,
-        provider: Arc<MineServiceMiddleware>,
+        provider: Arc<Provider<RetryClient<Http>>>,
         store: Arc<Store>,
         config: &MinerConfig,
         miner_id: H256,


@@ -33,11 +33,13 @@ impl MineService {
         config: MinerConfig,
         store: Arc<Store>,
     ) -> Result<broadcast::Sender<MinerMessage>, String> {
-        let provider = Arc::new(config.make_provider().await?);
+        let provider = config.make_provider()?;
+        let signing_provider = Arc::new(config.make_signing_provider().await?);
 
         let (msg_send, msg_recv) = broadcast::channel(1024);
 
-        let miner_id = check_and_request_miner_id(&config, store.as_ref(), &provider).await?;
+        let miner_id =
+            check_and_request_miner_id(&config, store.as_ref(), &signing_provider).await?;
         debug!("miner id setting complete.");
 
         let mine_context_receiver = MineContextWatcher::spawn(
@@ -61,6 +63,7 @@ impl MineService {
             mine_answer_receiver,
             mine_context_receiver,
             provider.clone(),
+            signing_provider,
             store.clone(),
             &config,
         );


@@ -2,6 +2,7 @@ use contract_interface::PoraAnswer;
 use contract_interface::{PoraMine, ZgsFlow};
 use ethereum_types::U256;
 use ethers::contract::ContractCall;
+use ethers::prelude::{Http, Provider, RetryClient};
 use ethers::providers::PendingTransaction;
 use hex::ToHex;
 use shared_types::FlowRangeProof;
@@ -24,7 +25,7 @@ pub struct Submitter {
     mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
     mine_context_receiver: broadcast::Receiver<MineContextMessage>,
     mine_contract: PoraMine<MineServiceMiddleware>,
-    flow_contract: ZgsFlow<MineServiceMiddleware>,
+    flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
     default_gas_limit: Option<U256>,
     store: Arc<Store>,
 }
@@ -34,11 +35,12 @@ impl Submitter {
         executor: TaskExecutor,
         mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
         mine_context_receiver: broadcast::Receiver<MineContextMessage>,
-        provider: Arc<MineServiceMiddleware>,
+        provider: Arc<Provider<RetryClient<Http>>>,
+        signing_provider: Arc<MineServiceMiddleware>,
         store: Arc<Store>,
         config: &MinerConfig,
     ) {
-        let mine_contract = PoraMine::new(config.mine_address, provider.clone());
+        let mine_contract = PoraMine::new(config.mine_address, signing_provider);
         let flow_contract = ZgsFlow::new(config.flow_address, provider);
 
         let default_gas_limit = config.submission_gas;


@@ -14,13 +14,13 @@ use tokio::{
     try_join,
 };
 
+use crate::{config::MineServiceMiddleware, mine::PoraPuzzle, MinerConfig, MinerMessage};
+use ethers::prelude::{Http, RetryClient};
 use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Duration;
 use std::{ops::DerefMut, str::FromStr};
 
-use crate::{config::MineServiceMiddleware, mine::PoraPuzzle, MinerConfig, MinerMessage};
-
 pub type MineContextMessage = Option<PoraPuzzle>;
 
 lazy_static! {
@@ -29,9 +29,9 @@ lazy_static! {
 }
 
 pub struct MineContextWatcher {
-    provider: Arc<MineServiceMiddleware>,
-    flow_contract: ZgsFlow<MineServiceMiddleware>,
-    mine_contract: PoraMine<MineServiceMiddleware>,
+    provider: Arc<Provider<RetryClient<Http>>>,
+    flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
+    mine_contract: PoraMine<Provider<RetryClient<Http>>>,
 
     mine_context_sender: broadcast::Sender<MineContextMessage>,
     last_report: MineContextMessage,
@@ -44,7 +44,7 @@ impl MineContextWatcher {
     pub fn spawn(
         executor: TaskExecutor,
         msg_recv: broadcast::Receiver<MinerMessage>,
-        provider: Arc<MineServiceMiddleware>,
+        provider: Arc<Provider<RetryClient<Http>>>,
         config: &MinerConfig,
     ) -> broadcast::Receiver<MineContextMessage> {
         let mine_contract = PoraMine::new(config.mine_address, provider.clone());


@@ -324,7 +324,7 @@ auto_sync_enabled = true
 # enabled = false
 
 # Interval to output metrics periodically, e.g. "10s", "30s" or "60s".
-# report_interval = ""
+# report_interval = "10s"
 
 # File name to output metrics periodically.
 # file_report_output = ""


@@ -336,7 +336,7 @@ auto_sync_enabled = true
 # enabled = false
 
 # Interval to output metrics periodically, e.g. "10s", "30s" or "60s".
-# report_interval = ""
+# report_interval = "10s"
 
 # File name to output metrics periodically.
 # file_report_output = ""


@@ -338,7 +338,7 @@
 # enabled = false
 
 # Interval to output metrics periodically, e.g. "10s", "30s" or "60s".
-# report_interval = ""
+# report_interval = "10s"
 
 # File name to output metrics periodically.
 # file_report_output = ""
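
The three config templates above change the commented report_interval example from "" to "10s", and the metrics crate bump in Cargo.lock pulls in duration-str, which suggests the interval is deserialized from a human-readable duration string (an empty string would not parse as a duration). A minimal serde sketch of such a field; the struct, field names and the hand-rolled parser are illustrative stand-ins rather than the actual metrics crate code:

use serde::{Deserialize, Deserializer};
use std::time::Duration;

// Accepts simple "<number><unit>" strings such as "10s", "30s" or "5m".
fn parse_interval<'de, D: Deserializer<'de>>(d: D) -> Result<Option<Duration>, D::Error> {
    let s = String::deserialize(d)?;
    if s.is_empty() {
        return Ok(None); // treat "" as "periodic reporting disabled"
    }
    let (num, unit) = s.split_at(s.len() - 1);
    let n: u64 = num.parse().map_err(serde::de::Error::custom)?;
    let secs = match unit {
        "s" => n,
        "m" => n * 60,
        "h" => n * 3_600,
        _ => return Err(serde::de::Error::custom(format!("bad duration: {s}"))),
    };
    Ok(Some(Duration::from_secs(secs)))
}

#[derive(Deserialize)]
struct MetricsConfig {
    #[serde(default)]
    enabled: bool,
    #[serde(default, deserialize_with = "parse_interval")]
    report_interval: Option<Duration>,
    #[serde(default)]
    file_report_output: String,
}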