From 538afb00e153f39a32d546f9af6f9e3ffa60b11c Mon Sep 17 00:00:00 2001 From: 0g-peterzhb <158457852+0g-peterzhb@users.noreply.github.com> Date: Tue, 18 Feb 2025 09:09:50 +0800 Subject: [PATCH] add gas auto adjustment (#330) * add gas auto adjustment * refactor config --- Cargo.lock | 13 ++ common/contract-wrapper/Cargo.toml | 17 +++ common/contract-wrapper/src/lib.rs | 204 +++++++++++++++++++++++++++++ node/Cargo.toml | 1 + node/miner/Cargo.toml | 1 + node/miner/src/config.rs | 9 +- node/miner/src/miner_id.rs | 20 ++- node/miner/src/submitter.rs | 128 ++---------------- node/src/config/convert.rs | 5 +- node/src/config/mod.rs | 4 +- 10 files changed, 267 insertions(+), 135 deletions(-) create mode 100644 common/contract-wrapper/Cargo.toml create mode 100644 common/contract-wrapper/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 54a2635..3a18b61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1271,6 +1271,17 @@ dependencies = [ "serde_json", ] +[[package]] +name = "contract-wrapper" +version = "0.1.0" +dependencies = [ + "ethers", + "serde", + "serde_json", + "tokio", + "tracing", +] + [[package]] name = "convert_case" version = "0.6.0" @@ -5017,6 +5028,7 @@ dependencies = [ "async-trait", "blake2", "contract-interface", + "contract-wrapper", "ethereum-types 0.14.1", "ethers", "hex", @@ -9257,6 +9269,7 @@ dependencies = [ "clap", "config", "console-subscriber", + "contract-wrapper", "ctrlc", "duration-str", "error-chain", diff --git a/common/contract-wrapper/Cargo.toml b/common/contract-wrapper/Cargo.toml new file mode 100644 index 0000000..fceba96 --- /dev/null +++ b/common/contract-wrapper/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "contract-wrapper" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tokio = { version = "1.28", features = ["macros"] } +ethers = "2.0" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tracing = "0.1.35" +# or `tracing` if you prefer + +[features] +dev = [] diff --git a/common/contract-wrapper/src/lib.rs b/common/contract-wrapper/src/lib.rs new file mode 100644 index 0000000..662910e --- /dev/null +++ b/common/contract-wrapper/src/lib.rs @@ -0,0 +1,204 @@
+use ethers::{
+    abi::Detokenize,
+    contract::ContractCall,
+    providers::{Middleware, ProviderError},
+    types::{TransactionReceipt, U256},
+};
+use serde::Deserialize;
+use std::{sync::Arc, time::Duration};
+use tokio::time::sleep;
+use tracing::{debug, info};
+
+/// The result of a single submission attempt.
+#[derive(Debug)]
+pub enum SubmissionAction {
+    /// The transaction was mined; carries its receipt.
+    Success(TransactionReceipt),
+    /// The attempt failed in a way the caller may retry. The message is the only carrier of the
+    /// reason: callers tell "mempool/timeout" retries from other ones by inspecting it.
+    Retry(String),
+    /// The attempt failed in a way that is not retried; carries the error message.
+    Error(String),
+}
+
+/// Configuration for submission retries, gas price, etc.
+///
+/// Every field is optional; `None` selects the fallback described on the field.
+#[derive(Clone, Copy, Debug, Deserialize)]
+pub struct SubmitConfig {
+    /// If `Some`, use this gas price for the first attempt.
+    /// If `None`, fetch the current network gas price.
+    pub(crate) initial_gas_price: Option<U256>,
+    /// If `Some`, clamp increased gas price to this limit.
+    /// If `None`, the gas price is never bumped and `submit_with_retry` aborts on the first
+    /// mempool/timeout error.
+    pub(crate) max_gas_price: Option<U256>,
+    /// Gas limit of the transaction. If `None`, no explicit limit is set on the call.
+    // NOTE(review): element type assumed to be `U256`, like the `submission_gas` option this
+    // field replaces; confirm against the config files in use.
+    pub(crate) max_gas: Option<U256>,
+    /// Numerator of the gas-price bump applied on each mempool/timeout error; the denominator
+    /// is fixed at 10. E.g. factor=11 => a 10% bump => newGas = (gas * 11) / 10.
+    /// If `None`, 11 is used.
+    pub(crate) gas_increase_factor: Option<u64>,
+    /// Maximum number of retries for retryable errors that are NOT mempool/timeout errors.
+    /// Mempool/timeout retries are not counted here; they stop once `max_gas_price` is reached.
+    /// If `None`, `DEFAULT_MAX_RETRIES` is used.
+    pub(crate) max_retries: Option<usize>,
+    /// Seconds to wait between attempts. If `None`, `DEFAULT_INTERVAL_SECS` is used.
+    pub(crate) interval_secs: Option<u64>,
+}
+
+const DEFAULT_INTERVAL_SECS: u64 = 2;
+const DEFAULT_MAX_RETRIES: usize = 5;
+
+impl Default for SubmitConfig {
+    fn default() -> Self {
+        Self {
+            initial_gas_price: None,
+            max_gas_price: None,
+            max_gas: None,
+            gas_increase_factor: Some(11), // implies 10% bump if we do (gas*11)/10
+            max_retries: Some(DEFAULT_MAX_RETRIES),
+            interval_secs: Some(DEFAULT_INTERVAL_SECS),
+        }
+    }
+}
+
+/// Returns `true` when `error_str` mentions "mempool" or "timeout" (case-insensitive).
+///
+/// The retry reason travels as free text inside `SubmissionAction::Retry`, so this substring
+/// check is what separates gas-related retries from all other retryable errors.
+/// Accepts anything string-like, so callers may pass either a `String` or a `&str`.
+fn is_mempool_or_timeout_error(error_str: impl AsRef<str>) -> bool {
+    let lower = error_str.as_ref().to_lowercase();
+    lower.contains("mempool") || lower.contains("timeout")
+}
+
+/// A function that performs a single submission attempt:
+/// - Sends the transaction
+/// - Awaits the receipt with limited internal retries
+/// - Returns a `SubmissionAction` indicating success, retry, or error.
+pub async fn submit_once<M, T>(call: ContractCall<M, T>) -> SubmissionAction
+where
+    M: Middleware + 'static,
+    T: Detokenize,
+{
+    // Phase 1: broadcast. A failure here is classified purely by its message text.
+    let pending_tx = match call.send().await {
+        Ok(tx) => tx,
+        Err(e) => {
+            let msg = e.to_string();
+            if is_mempool_or_timeout_error(msg.clone()) {
+                // The "mempool/timeout" prefix keeps the reason recognisable to the caller.
+                return SubmissionAction::Retry(format!("mempool/timeout: {:?}", e));
+            }
+
+            debug!("Error sending transaction: {:?}", msg);
+            return SubmissionAction::Error(format!("Transaction failed: {}", msg));
+        }
+    };
+
+    debug!("Signed tx hash: {:?}", pending_tx.tx_hash());
+
+    // Phase 2: wait for the receipt of the broadcast transaction.
+    let receipt_result = pending_tx.await;
+    match receipt_result {
+        Ok(Some(receipt)) => {
+            info!("Transaction mined, receipt: {:?}", receipt);
+            SubmissionAction::Success(receipt)
+        }
+        Ok(None) => {
+            // No receipt: reported with a "timeout" message so the caller treats it as
+            // gas-related.
+            debug!("Transaction probably timed out; retrying");
+            SubmissionAction::Retry("timeout, receipt is none".to_string())
+        }
+        Err(ProviderError::HTTPError(e)) => {
+            debug!("HTTP error retrieving receipt: {:?}", e);
+            SubmissionAction::Retry(format!("http error: {:?}", e))
+        }
+        Err(e) => SubmissionAction::Error(format!("Transaction unrecoverable: {:?}", e)),
+    }
+}
+
+/// Increase gas price using integer arithmetic: (gp * factor_num) / factor_den
+///
+/// Saturates at `U256::MAX` when the multiplication overflows or when `factor_den` is zero,
+/// so the result is never lower than `gp` because of an overflow and the function never
+/// panics on a zero denominator.
+fn increase_gas_price_u256(gp: U256, factor_num: u64, factor_den: u64) -> U256 {
+    gp.checked_mul(U256::from(factor_num))
+        .and_then(|scaled| scaled.checked_div(U256::from(factor_den)))
+        .unwrap_or(U256::MAX)
+}
+
+/// A higher-level function that wraps `submit_once` in a gas-price-adjustment loop,
+/// with distinct behavior for mempool/timeout vs other retryable errors.
+///
+/// Behaviour per attempt outcome:
+/// - `Success`: the receipt is returned.
+/// - `Error`: the message is returned as `Err` immediately.
+/// - `Retry` with a mempool/timeout message: the gas price is bumped (clamped to
+///   `config.max_gas_price`) and the call is re-sent; fails once the price has already reached
+///   that limit, or at once when no limit is configured.
+/// - any other `Retry`: re-sent at the same gas price, at most `config.max_retries` times.
+///
+/// No overall deadline is enforced here; the loop sleeps `config.interval_secs` between
+/// attempts.
+pub async fn submit_with_retry<M, T>(
+    mut call: ContractCall<M, T>,
+    config: &SubmitConfig,
+    middleware: Arc<M>,
+) -> Result<TransactionReceipt, String>
+where
+    M: Middleware + 'static,
+    T: Detokenize,
+{
+    if let Some(max_gas) = config.max_gas {
+        call = call.gas(max_gas);
+    }
+    let mut gas_price = match config.initial_gas_price {
+        Some(gp) => gp,
+        None => middleware
+            .get_gas_price()
+            .await
+            .map_err(|e| format!("Failed to fetch gas price: {:?}", e))?,
+    };
+
+    // If no factor is set, default to 11 => 10% bump
+    let factor_num = config.gas_increase_factor.unwrap_or(11);
+    let factor_den = 10u64;
+
+    // Only non-gas retries are counted; gas bumps are bounded by `max_gas_price` instead.
+    let mut non_gas_retries = 0;
+    let max_retries = config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES);
+    let interval = Duration::from_secs(config.interval_secs.unwrap_or(DEFAULT_INTERVAL_SECS));
+
+    loop {
+        // Set gas price on the call
+        call = call.gas_price(gas_price);
+
+        match submit_once(call.clone()).await {
+            SubmissionAction::Success(receipt) => {
+                return Ok(receipt);
+            }
+            SubmissionAction::Retry(error_str) => {
+                // The retry reason is encoded in the message returned by `submit_once`.
+                if is_mempool_or_timeout_error(error_str.clone()) {
+                    // Mempool/timeout error
+                    if let Some(max_gp) = config.max_gas_price {
+                        if gas_price >= max_gp {
+                            return Err(format!(
+                                "Exceeded max gas price: {}, with error msg: {}",
+                                max_gp, error_str
+                            ));
+                        }
+                        // Bump the gas. Integer division can leave the price unchanged (a zero
+                        // or tiny price, or a factor <= 10), which would retry forever without
+                        // ever reaching `max_gp`; force a step of at least one unit.
+                        // `gas_price < max_gp` holds here, so the increment cannot overflow.
+                        let scaled = increase_gas_price_u256(gas_price, factor_num, factor_den);
+                        let new_price = std::cmp::max(scaled, gas_price + U256::from(1u64));
+                        gas_price = std::cmp::min(new_price, max_gp);
+                        debug!("Bumping gas price to {}", gas_price);
+                    } else {
+                        // No maxGasPrice => we do NOT bump => fail
+                        return Err(
+                            "Mempool/timeout error, no maxGasPrice set => aborting".to_string()
+                        );
+                    }
+                } else {
+                    // Non-gas error => increment nonGasRetries
+                    non_gas_retries += 1;
+                    if non_gas_retries > max_retries {
+                        return Err(format!("Exceeded non-gas retries: {}", max_retries));
+                    }
+                    debug!(
+                        "Non-gas retry #{} (same gas price: {})",
+                        non_gas_retries, gas_price
+                    );
+                }
+            }
+            SubmissionAction::Error(e) => {
+                return Err(e);
+            }
+        }
+
+        // Sleep between attempts
+        sleep(interval).await;
+    }
+}
diff --git a/node/Cargo.toml b/node/Cargo.toml index 7178bf4..e75e173 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -43,6 +43,7 @@ rust-log = { package = "log", version = "0.4.22" } tracing-core = "0.1.32" tracing-log = "0.2.0" console-subscriber = { version = "0.4.1", optional = true } +contract-wrapper = { path = "../common/contract-wrapper" } [dependencies.libp2p] version = "0.45.1" diff --git a/node/miner/Cargo.toml b/node/miner/Cargo.toml index 697376b..e288ace 100644 --- a/node/miner/Cargo.toml +++ b/node/miner/Cargo.toml @@ -10,6 +10,7 @@ zgs_spec = { path = "../../common/spec" } zgs_seal = { path = "../../common/zgs_seal" } task_executor = { path = "../../common/task_executor" } contract-interface = { path = "../../common/contract-interface" } +contract-wrapper = { path = "../../common/contract-wrapper" } lighthouse_metrics = { path = "../../common/lighthouse_metrics" } ethereum-types = 
"0.14" tokio = { version = "1.19.2", features = ["full"] } diff --git a/node/miner/src/config.rs b/node/miner/src/config.rs index e9dcb71..63bd277 100644 --- a/node/miner/src/config.rs +++ b/node/miner/src/config.rs @@ -2,7 +2,8 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use ethereum_types::{Address, H256, U256}; +use contract_wrapper::SubmitConfig; +use ethereum_types::{Address, H256}; use ethers::core::k256::SecretKey; use ethers::middleware::SignerMiddleware; use ethers::providers::Http; @@ -21,7 +22,6 @@ pub struct MinerConfig { pub(crate) rpc_endpoint_url: String, pub(crate) mine_address: Address, pub(crate) flow_address: Address, - pub(crate) submission_gas: Option, pub(crate) cpu_percentage: u64, pub(crate) iter_batch: usize, pub(crate) shard_config: ShardConfig, @@ -29,6 +29,7 @@ pub struct MinerConfig { pub(crate) rate_limit_retries: u32, pub(crate) timeout_retries: u32, pub(crate) initial_backoff: u64, + pub(crate) submission_config: SubmitConfig, } pub type MineServiceMiddleware = SignerMiddleware>>, LocalWallet>; @@ -41,7 +42,6 @@ impl MinerConfig { rpc_endpoint_url: String, mine_address: Address, flow_address: Address, - submission_gas: Option, cpu_percentage: u64, iter_batch: usize, context_query_seconds: u64, @@ -49,6 +49,7 @@ impl MinerConfig { rate_limit_retries: u32, timeout_retries: u32, initial_backoff: u64, + submission_config: SubmitConfig, ) -> Option { miner_key.map(|miner_key| MinerConfig { miner_id, @@ -56,7 +57,6 @@ impl MinerConfig { rpc_endpoint_url, mine_address, flow_address, - submission_gas, cpu_percentage, iter_batch, shard_config, @@ -64,6 +64,7 @@ impl MinerConfig { rate_limit_retries, timeout_retries, initial_backoff, + submission_config, }) } diff --git a/node/miner/src/miner_id.rs b/node/miner/src/miner_id.rs index 16f692f..7c0454f 100644 --- a/node/miner/src/miner_id.rs +++ b/node/miner/src/miner_id.rs @@ -57,7 +57,7 @@ pub(crate) async fn check_and_request_miner_id( } (None, None) => { let 
beneficiary = provider.address(); - let id = request_miner_id(&mine_contract, beneficiary).await?; + let id = request_miner_id(config, &mine_contract, beneficiary).await?; set_miner_id(store, &id) .await .map_err(|e| format!("set miner id on db corrupt: {:?}", e))?; @@ -86,6 +86,7 @@ async fn check_miner_id( } async fn request_miner_id( + config: &MinerConfig, mine_contract: &PoraMine, beneficiary: Address, ) -> Result { @@ -94,16 +95,13 @@ async fn request_miner_id( let submission_call: ContractCall<_, _> = mine_contract.request_miner_id(beneficiary, 0).legacy(); - let pending_tx = submission_call - .send() - .await - .map_err(|e| format!("Fail to request miner id: {:?}", e))?; - - let receipt = pending_tx - .retries(3) - .await - .map_err(|e| format!("Fail to execute mine answer transaction: {:?}", e))? - .ok_or("Request miner id transaction dropped after 3 retries")?; + let receipt = contract_wrapper::submit_with_retry( + submission_call, + &config.submission_config, + mine_contract.client().clone(), + ) + .await + .map_err(|e| format!("Fail to submit miner id request: {:?}", e))?; let first_log = receipt .logs diff --git a/node/miner/src/submitter.rs b/node/miner/src/submitter.rs index e8eea0b..99251b1 100644 --- a/node/miner/src/submitter.rs +++ b/node/miner/src/submitter.rs @@ -1,14 +1,11 @@ use contract_interface::PoraAnswer; use contract_interface::{PoraMine, ZgsFlow}; -use ethereum_types::U256; -use ethers::abi::Detokenize; +use contract_wrapper::SubmitConfig; use ethers::contract::ContractCall; use ethers::prelude::{Http, Provider, RetryClient}; -use ethers::providers::{Middleware, ProviderError}; use hex::ToHex; use shared_types::FlowRangeProof; use std::sync::Arc; -use std::time::Duration; use storage::H256; use storage_async::Store; use task_executor::TaskExecutor; @@ -20,23 +17,13 @@ use crate::watcher::MineContextMessage; use zgs_spec::{BYTES_PER_SEAL, SECTORS_PER_SEAL}; -const SUBMISSION_RETRIES: usize = 15; -const ADJUST_GAS_RETRIES: usize = 20; - 
pub struct Submitter { mine_answer_receiver: mpsc::UnboundedReceiver, mine_context_receiver: broadcast::Receiver, mine_contract: PoraMine, flow_contract: ZgsFlow>>, - default_gas_limit: Option, store: Arc, - provider: Arc>>, -} - -enum SubmissionAction { - Retry, - Success, - Error(String), + config: SubmitConfig, } impl Submitter { @@ -51,7 +38,6 @@ impl Submitter { ) { let mine_contract = PoraMine::new(config.mine_address, signing_provider); let flow_contract = ZgsFlow::new(config.flow_address, provider.clone()); - let default_gas_limit = config.submission_gas; let submitter = Submitter { mine_answer_receiver, @@ -59,8 +45,7 @@ impl Submitter { mine_contract, flow_contract, store, - default_gas_limit, - provider, + config: config.submission_config, }; executor.spawn( async move { Box::pin(submitter.start()).await }, @@ -144,11 +129,7 @@ impl Submitter { }; trace!("submit_answer: answer={:?}", answer); - let mut submission_call: ContractCall<_, _> = self.mine_contract.submit(answer).legacy(); - - if let Some(gas_limit) = self.default_gas_limit { - submission_call = submission_call.gas(gas_limit); - } + let submission_call: ContractCall<_, _> = self.mine_contract.submit(answer).legacy(); if let Some(calldata) = submission_call.calldata() { debug!( @@ -163,96 +144,15 @@ impl Submitter { submission_call.estimate_gas().await ); - self.submit_with_retry(submission_call).await - } + contract_wrapper::submit_with_retry( + submission_call, + &self.config, + self.mine_contract.client().clone(), + ) + .await + .map_err(|e| format!("Failed to submit mine answer: {:?}", e))?; - async fn submit_with_retry( - &self, - mut submission_call: ContractCall, - ) -> Result<(), String> { - let mut gas_price = self - .provider - .get_gas_price() - .await - .map_err(|e| format!("Failed to get current gas price {:?}", e))?; - let mut n_retry = 0; - while n_retry < ADJUST_GAS_RETRIES { - n_retry += 1; - submission_call = submission_call.gas_price(gas_price); - match 
self.submit_once(submission_call.clone()).await { - SubmissionAction::Retry => { - gas_price = next_gas_price(gas_price); - } - SubmissionAction::Success => { - return Ok(()); - } - SubmissionAction::Error(e) => { - return Err(e); - } - } - } - - Err("Submission failed after retries".to_string()) - } - - async fn submit_once( - &self, - submission_call: ContractCall, - ) -> SubmissionAction { - let pending_transaction = match submission_call.send().await { - Ok(tx) => tx, - Err(e) => { - if e.to_string().contains("insufficient funds") - || e.to_string().contains("out of gas") - { - return SubmissionAction::Error(format!( - "Fail to execute PoRA submission transaction: {:?}", - e - )); - } - // Log the error and increase gas. - debug!("Error sending transaction: {:?}", e); - return SubmissionAction::Retry; - } - }; - - debug!( - "Signed submission transaction hash: {:?}", - pending_transaction.tx_hash() - ); - - let receipt_result = pending_transaction - .retries(SUBMISSION_RETRIES) - .interval(Duration::from_secs(2)) - .await; - - match receipt_result { - Ok(Some(receipt)) => { - // Successfully executed the transaction. - info!("Submit PoRA success, receipt: {:?}", receipt); - SubmissionAction::Success - } - Ok(None) => { - // The transaction did not complete within the specified waiting time. - debug!( - "Transaction dropped after {} retries; increasing gas and retrying", - SUBMISSION_RETRIES - ); - SubmissionAction::Retry - } - Err(ProviderError::HTTPError(e)) => { - // For HTTP errors, increase gas and retry. - debug!("HTTP error retrieving receipt: {:?}", e); - SubmissionAction::Retry - } - Err(e) => { - // For all other errors, return immediately. - SubmissionAction::Error(format!( - "Fail to execute PoRA submission transaction: {:?}", - e - )) - } - } + Ok(()) } } @@ -263,7 +163,3 @@ fn flow_proof_to_pora_merkle_proof(flow_proof: FlowRangeProof) -> Vec<[u8; 32]> // Exclude `item`, the nodes in the sealed data subtree, and `root`. 
full_proof[depth_in_sealed_data + 1..full_proof.len() - 1].to_vec() } - -fn next_gas_price(current_gas_price: U256) -> U256 { - current_gas_price * U256::from(11) / U256::from(10) -} diff --git a/node/src/config/convert.rs b/node/src/config/convert.rs index 5f76559..4c760e6 100644 --- a/node/src/config/convert.rs +++ b/node/src/config/convert.rs @@ -1,7 +1,7 @@ #![allow(clippy::field_reassign_with_default)] use crate::ZgsConfig; -use ethereum_types::{H256, U256}; +use ethereum_types::H256; use ethers::prelude::{Http, Middleware, Provider}; use log_entry_sync::{CacheConfig, ContractAddress, LogSyncConfig}; use miner::MinerConfig; @@ -179,7 +179,6 @@ impl ZgsConfig { } else { None }; - let submission_gas = self.miner_submission_gas.map(U256::from); let cpu_percentage = self.miner_cpu_percentage; let iter_batch = self.mine_iter_batch_size; let context_query_seconds = self.mine_context_query_seconds; @@ -192,7 +191,6 @@ impl ZgsConfig { self.blockchain_rpc_endpoint.clone(), mine_address, flow_address, - submission_gas, cpu_percentage, iter_batch, context_query_seconds, @@ -200,6 +198,7 @@ impl ZgsConfig { self.rate_limit_retries, self.timeout_retries, self.initial_backoff, + self.submission_config, )) } diff --git a/node/src/config/mod.rs b/node/src/config/mod.rs index fccc7f5..8861a69 100644 --- a/node/src/config/mod.rs +++ b/node/src/config/mod.rs @@ -74,7 +74,6 @@ build_config! { (mine_contract_address, (String), "".to_string()) (miner_id, (Option), None) (miner_key, (Option), None) - (miner_submission_gas, (Option), None) (miner_cpu_percentage, (u64), 100) (mine_iter_batch_size, (usize), 100) (reward_contract_address, (String), "".to_string()) @@ -106,6 +105,9 @@ pub struct ZgsConfig { // rpc config, configured by [rpc] section by `config` crate. pub rpc: rpc::RPCConfig, + // submission config, configured by [submission_config] section by `config` crate. 
+ pub submission_config: contract_wrapper::SubmitConfig, + // metrics config, configured by [metrics] section by `config` crate. pub metrics: metrics::MetricsConfiguration, }