refactor config

This commit is contained in:
Peter Zhang 2025-02-14 22:12:38 +08:00
parent 01997ad670
commit 5fee8b9d63
8 changed files with 44 additions and 55 deletions

1
Cargo.lock generated
View File

@ -9269,6 +9269,7 @@ dependencies = [
"clap", "clap",
"config", "config",
"console-subscriber", "console-subscriber",
"contract-wrapper",
"ctrlc", "ctrlc",
"duration-str", "duration-str",
"error-chain", "error-chain",

View File

@ -4,6 +4,7 @@ use ethers::{
providers::{Middleware, ProviderError}, providers::{Middleware, ProviderError},
types::{TransactionReceipt, U256}, types::{TransactionReceipt, U256},
}; };
use serde::Deserialize;
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use tokio::time::sleep; use tokio::time::sleep;
use tracing::{debug, info}; use tracing::{debug, info};
@ -19,35 +20,38 @@ pub enum SubmissionAction {
} }
/// Configuration for submission retries, gas price, etc. /// Configuration for submission retries, gas price, etc.
#[derive(Clone, Debug)] #[derive(Clone, Copy, Debug, Deserialize)]
pub struct SubmitConfig { pub struct SubmitConfig {
/// If `Some`, use this gas price for the first attempt. /// If `Some`, use this gas price for the first attempt.
/// If `None`, fetch the current network gas price. /// If `None`, fetch the current network gas price.
pub initial_gas_price: Option<U256>, pub(crate) initial_gas_price: Option<U256>,
/// If `Some`, clamp increased gas price to this limit. /// If `Some`, clamp increased gas price to this limit.
/// If `None`, do not bump gas for mempool/timeout errors. /// If `None`, do not bump gas for mempool/timeout errors.
pub max_gas_price: Option<U256>, pub(crate) max_gas_price: Option<U256>,
/// Gas limit of the transaction
pub(crate) max_gas: Option<U256>,
/// Factor by which to multiply the gas price on each mempool/timeout error. /// Factor by which to multiply the gas price on each mempool/timeout error.
/// E.g. if factor=11 => a 10% bump => newGas = (gas * factor) / 10 /// E.g. if factor=11 => a 10% bump => newGas = (gas * factor) / 10
pub gas_increase_factor: Option<u64>, pub(crate) gas_increase_factor: Option<u64>,
/// The maximum number of gas bumps (for mempool/timeout). If `max_gas_price` is set, /// The maximum number of gas bumps (for mempool/timeout). If `max_gas_price` is set,
/// we typically rely on clamping. But you can still cap the number of bumps if you want. /// we typically rely on clamping. But you can still cap the number of bumps if you want.
pub max_retries: usize, pub(crate) max_retries: Option<usize>,
/// Seconds to wait between attempts. /// Seconds to wait between attempts.
pub interval_secs: u64, pub(crate) interval_secs: Option<u64>,
/// A global timeout for the entire operation, in seconds. If 0, no global timeout.
pub total_timeout_secs: u64,
} }
const DEFAULT_INTERVAL_SECS: u64 = 2;
const DEFAULT_MAX_RETRIES: usize = 5;
impl Default for SubmitConfig { impl Default for SubmitConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
initial_gas_price: None, initial_gas_price: None,
max_gas_price: None, max_gas_price: None,
max_gas: None,
gas_increase_factor: Some(11), // implies 10% bump if we do (gas*11)/10 gas_increase_factor: Some(11), // implies 10% bump if we do (gas*11)/10
max_retries: 5, max_retries: Some(DEFAULT_MAX_RETRIES),
interval_secs: 2, interval_secs: Some(DEFAULT_INTERVAL_SECS),
total_timeout_secs: 0, // no global timeout by default
} }
} }
} }
@ -121,6 +125,9 @@ where
M: Middleware + 'static, M: Middleware + 'static,
T: Detokenize, T: Detokenize,
{ {
if let Some(max_gas) = config.max_gas {
call = call.gas(max_gas);
}
let mut gas_price = if let Some(gp) = config.initial_gas_price { let mut gas_price = if let Some(gp) = config.initial_gas_price {
gp gp
} else { } else {
@ -136,6 +143,7 @@ where
// Two counters: one for gas bumps, one for non-gas retries // Two counters: one for gas bumps, one for non-gas retries
let mut non_gas_retries = 0; let mut non_gas_retries = 0;
let max_retries = config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES);
loop { loop {
// Set gas price on the call // Set gas price on the call
@ -173,8 +181,8 @@ where
} else { } else {
// Non-gas error => increment nonGasRetries // Non-gas error => increment nonGasRetries
non_gas_retries += 1; non_gas_retries += 1;
if non_gas_retries > config.max_retries { if non_gas_retries > max_retries {
return Err(format!("Exceeded non-gas retries: {}", config.max_retries)); return Err(format!("Exceeded non-gas retries: {}", max_retries));
} }
debug!( debug!(
"Non-gas retry #{} (same gas price: {})", "Non-gas retry #{} (same gas price: {})",
@ -188,6 +196,9 @@ where
} }
// Sleep between attempts // Sleep between attempts
sleep(Duration::from_secs(config.interval_secs)).await; sleep(Duration::from_secs(
config.interval_secs.unwrap_or(DEFAULT_INTERVAL_SECS),
))
.await;
} }
} }

View File

@ -43,6 +43,7 @@ rust-log = { package = "log", version = "0.4.22" }
tracing-core = "0.1.32" tracing-core = "0.1.32"
tracing-log = "0.2.0" tracing-log = "0.2.0"
console-subscriber = { version = "0.4.1", optional = true } console-subscriber = { version = "0.4.1", optional = true }
contract-wrapper = { path = "../common/contract-wrapper" }
[dependencies.libp2p] [dependencies.libp2p]
version = "0.45.1" version = "0.45.1"

View File

@ -2,7 +2,8 @@ use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use ethereum_types::{Address, H256, U256}; use contract_wrapper::SubmitConfig;
use ethereum_types::{Address, H256};
use ethers::core::k256::SecretKey; use ethers::core::k256::SecretKey;
use ethers::middleware::SignerMiddleware; use ethers::middleware::SignerMiddleware;
use ethers::providers::Http; use ethers::providers::Http;
@ -21,7 +22,6 @@ pub struct MinerConfig {
pub(crate) rpc_endpoint_url: String, pub(crate) rpc_endpoint_url: String,
pub(crate) mine_address: Address, pub(crate) mine_address: Address,
pub(crate) flow_address: Address, pub(crate) flow_address: Address,
pub(crate) submission_gas: Option<U256>,
pub(crate) cpu_percentage: u64, pub(crate) cpu_percentage: u64,
pub(crate) iter_batch: usize, pub(crate) iter_batch: usize,
pub(crate) shard_config: ShardConfig, pub(crate) shard_config: ShardConfig,
@ -29,7 +29,7 @@ pub struct MinerConfig {
pub(crate) rate_limit_retries: u32, pub(crate) rate_limit_retries: u32,
pub(crate) timeout_retries: u32, pub(crate) timeout_retries: u32,
pub(crate) initial_backoff: u64, pub(crate) initial_backoff: u64,
pub(crate) max_gas_price: Option<U256>, pub(crate) submission_config: SubmitConfig,
} }
pub type MineServiceMiddleware = SignerMiddleware<Arc<Provider<RetryClient<Http>>>, LocalWallet>; pub type MineServiceMiddleware = SignerMiddleware<Arc<Provider<RetryClient<Http>>>, LocalWallet>;
@ -42,7 +42,6 @@ impl MinerConfig {
rpc_endpoint_url: String, rpc_endpoint_url: String,
mine_address: Address, mine_address: Address,
flow_address: Address, flow_address: Address,
submission_gas: Option<U256>,
cpu_percentage: u64, cpu_percentage: u64,
iter_batch: usize, iter_batch: usize,
context_query_seconds: u64, context_query_seconds: u64,
@ -50,7 +49,7 @@ impl MinerConfig {
rate_limit_retries: u32, rate_limit_retries: u32,
timeout_retries: u32, timeout_retries: u32,
initial_backoff: u64, initial_backoff: u64,
max_gas_price: Option<U256>, submission_config: SubmitConfig,
) -> Option<MinerConfig> { ) -> Option<MinerConfig> {
miner_key.map(|miner_key| MinerConfig { miner_key.map(|miner_key| MinerConfig {
miner_id, miner_id,
@ -58,7 +57,6 @@ impl MinerConfig {
rpc_endpoint_url, rpc_endpoint_url,
mine_address, mine_address,
flow_address, flow_address,
submission_gas,
cpu_percentage, cpu_percentage,
iter_batch, iter_batch,
shard_config, shard_config,
@ -66,7 +64,7 @@ impl MinerConfig {
rate_limit_retries, rate_limit_retries,
timeout_retries, timeout_retries,
initial_backoff, initial_backoff,
max_gas_price, submission_config,
}) })
} }

View File

@ -92,20 +92,12 @@ async fn request_miner_id(
) -> Result<H256, String> { ) -> Result<H256, String> {
debug!("Requesting miner id on chain..."); debug!("Requesting miner id on chain...");
let mut submission_call: ContractCall<_, _> = let submission_call: ContractCall<_, _> =
mine_contract.request_miner_id(beneficiary, 0).legacy(); mine_contract.request_miner_id(beneficiary, 0).legacy();
if config.submission_gas.is_some() {
submission_call = submission_call.gas(config.submission_gas.unwrap());
}
let submission_config = contract_wrapper::SubmitConfig {
max_gas_price: config.max_gas_price,
..Default::default()
};
let receipt = contract_wrapper::submit_with_retry( let receipt = contract_wrapper::submit_with_retry(
submission_call, submission_call,
&submission_config, &config.submission_config,
mine_contract.client().clone(), mine_contract.client().clone(),
) )
.await .await

View File

@ -1,6 +1,6 @@
use contract_interface::PoraAnswer; use contract_interface::PoraAnswer;
use contract_interface::{PoraMine, ZgsFlow}; use contract_interface::{PoraMine, ZgsFlow};
use ethereum_types::U256; use contract_wrapper::SubmitConfig;
use ethers::contract::ContractCall; use ethers::contract::ContractCall;
use ethers::prelude::{Http, Provider, RetryClient}; use ethers::prelude::{Http, Provider, RetryClient};
use hex::ToHex; use hex::ToHex;
@ -22,9 +22,8 @@ pub struct Submitter {
mine_context_receiver: broadcast::Receiver<MineContextMessage>, mine_context_receiver: broadcast::Receiver<MineContextMessage>,
mine_contract: PoraMine<MineServiceMiddleware>, mine_contract: PoraMine<MineServiceMiddleware>,
flow_contract: ZgsFlow<Provider<RetryClient<Http>>>, flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
default_gas_limit: Option<U256>,
default_gas_price: Option<U256>,
store: Arc<Store>, store: Arc<Store>,
config: SubmitConfig,
} }
impl Submitter { impl Submitter {
@ -39,8 +38,6 @@ impl Submitter {
) { ) {
let mine_contract = PoraMine::new(config.mine_address, signing_provider); let mine_contract = PoraMine::new(config.mine_address, signing_provider);
let flow_contract = ZgsFlow::new(config.flow_address, provider.clone()); let flow_contract = ZgsFlow::new(config.flow_address, provider.clone());
let default_gas_limit = config.submission_gas;
let default_gas_price = config.max_gas_price;
let submitter = Submitter { let submitter = Submitter {
mine_answer_receiver, mine_answer_receiver,
@ -48,8 +45,7 @@ impl Submitter {
mine_contract, mine_contract,
flow_contract, flow_contract,
store, store,
default_gas_limit, config: config.submission_config,
default_gas_price,
}; };
executor.spawn( executor.spawn(
async move { Box::pin(submitter.start()).await }, async move { Box::pin(submitter.start()).await },
@ -133,11 +129,7 @@ impl Submitter {
}; };
trace!("submit_answer: answer={:?}", answer); trace!("submit_answer: answer={:?}", answer);
let mut submission_call: ContractCall<_, _> = self.mine_contract.submit(answer).legacy(); let submission_call: ContractCall<_, _> = self.mine_contract.submit(answer).legacy();
if let Some(gas_limit) = self.default_gas_limit {
submission_call = submission_call.gas(gas_limit);
}
if let Some(calldata) = submission_call.calldata() { if let Some(calldata) = submission_call.calldata() {
debug!( debug!(
@ -152,13 +144,9 @@ impl Submitter {
submission_call.estimate_gas().await submission_call.estimate_gas().await
); );
let submission_config = contract_wrapper::SubmitConfig {
max_gas_price: self.default_gas_price,
..Default::default()
};
contract_wrapper::submit_with_retry( contract_wrapper::submit_with_retry(
submission_call, submission_call,
&submission_config, &self.config,
self.mine_contract.client().clone(), self.mine_contract.client().clone(),
) )
.await .await

View File

@ -1,7 +1,7 @@
#![allow(clippy::field_reassign_with_default)] #![allow(clippy::field_reassign_with_default)]
use crate::ZgsConfig; use crate::ZgsConfig;
use ethereum_types::{H256, U256}; use ethereum_types::H256;
use ethers::prelude::{Http, Middleware, Provider}; use ethers::prelude::{Http, Middleware, Provider};
use log_entry_sync::{CacheConfig, ContractAddress, LogSyncConfig}; use log_entry_sync::{CacheConfig, ContractAddress, LogSyncConfig};
use miner::MinerConfig; use miner::MinerConfig;
@ -179,8 +179,6 @@ impl ZgsConfig {
} else { } else {
None None
}; };
let submission_gas = self.miner_submission_gas.map(U256::from);
let submission_gas_price = self.miner_submission_max_gas_price.map(U256::from);
let cpu_percentage = self.miner_cpu_percentage; let cpu_percentage = self.miner_cpu_percentage;
let iter_batch = self.mine_iter_batch_size; let iter_batch = self.mine_iter_batch_size;
let context_query_seconds = self.mine_context_query_seconds; let context_query_seconds = self.mine_context_query_seconds;
@ -193,7 +191,6 @@ impl ZgsConfig {
self.blockchain_rpc_endpoint.clone(), self.blockchain_rpc_endpoint.clone(),
mine_address, mine_address,
flow_address, flow_address,
submission_gas,
cpu_percentage, cpu_percentage,
iter_batch, iter_batch,
context_query_seconds, context_query_seconds,
@ -201,7 +198,7 @@ impl ZgsConfig {
self.rate_limit_retries, self.rate_limit_retries,
self.timeout_retries, self.timeout_retries,
self.initial_backoff, self.initial_backoff,
submission_gas_price, self.submission_config,
)) ))
} }

View File

@ -74,8 +74,6 @@ build_config! {
(mine_contract_address, (String), "".to_string()) (mine_contract_address, (String), "".to_string())
(miner_id, (Option<String>), None) (miner_id, (Option<String>), None)
(miner_key, (Option<String>), None) (miner_key, (Option<String>), None)
(miner_submission_gas, (Option<u64>), None)
(miner_submission_max_gas_price, (Option<u64>), None)
(miner_cpu_percentage, (u64), 100) (miner_cpu_percentage, (u64), 100)
(mine_iter_batch_size, (usize), 100) (mine_iter_batch_size, (usize), 100)
(reward_contract_address, (String), "".to_string()) (reward_contract_address, (String), "".to_string())
@ -107,6 +105,9 @@ pub struct ZgsConfig {
// rpc config, configured by [rpc] section by `config` crate. // rpc config, configured by [rpc] section by `config` crate.
pub rpc: rpc::RPCConfig, pub rpc: rpc::RPCConfig,
// submission config, configured by [submission_config] section by `config` crate.
pub submission_config: contract_wrapper::SubmitConfig,
// metrics config, configured by [metrics] section by `config` crate. // metrics config, configured by [metrics] section by `config` crate.
pub metrics: metrics::MetricsConfiguration, pub metrics: metrics::MetricsConfiguration,
} }