add gas auto adjustment (#330)

* add gas auto adjustment

* refactor config
This commit is contained in:
0g-peterzhb 2025-02-18 09:09:50 +08:00 committed by GitHub
parent 7ad3f717b4
commit 538afb00e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 267 additions and 135 deletions

13
Cargo.lock generated
View File

@ -1271,6 +1271,17 @@ dependencies = [
"serde_json",
]
[[package]]
name = "contract-wrapper"
version = "0.1.0"
dependencies = [
"ethers",
"serde",
"serde_json",
"tokio",
"tracing",
]
[[package]]
name = "convert_case"
version = "0.6.0"
@ -5017,6 +5028,7 @@ dependencies = [
"async-trait",
"blake2",
"contract-interface",
"contract-wrapper",
"ethereum-types 0.14.1",
"ethers",
"hex",
@ -9257,6 +9269,7 @@ dependencies = [
"clap",
"config",
"console-subscriber",
"contract-wrapper",
"ctrlc",
"duration-str",
"error-chain",

View File

@ -0,0 +1,17 @@
[package]
name = "contract-wrapper"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "1.28", features = ["macros"] }
ethers = "2.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tracing = "0.1.35"
# or `tracing` if you prefer
[features]
dev = []

View File

@ -0,0 +1,204 @@
use ethers::{
abi::Detokenize,
contract::ContractCall,
providers::{Middleware, ProviderError},
types::{TransactionReceipt, U256},
};
use serde::Deserialize;
use std::{sync::Arc, time::Duration};
use tokio::time::sleep;
use tracing::{debug, info};
/// The result of a single submission attempt.
#[derive(Debug)]
pub enum SubmissionAction {
Success(TransactionReceipt),
/// Generic "retry" signal, but we still need to know if it's "mempool/timeout" or something else.
/// We'll parse the error string or have a separate reason in a real app.
Retry(String),
Error(String),
}
/// Configuration for submission retries, gas price, etc.
#[derive(Clone, Copy, Debug, Deserialize)]
pub struct SubmitConfig {
/// If `Some`, use this gas price for the first attempt.
/// If `None`, fetch the current network gas price.
pub(crate) initial_gas_price: Option<U256>,
/// If `Some`, clamp increased gas price to this limit.
/// If `None`, do not bump gas for mempool/timeout errors.
pub(crate) max_gas_price: Option<U256>,
/// Gas limit of the transaction
pub(crate) max_gas: Option<U256>,
/// Factor by which to multiply the gas price on each mempool/timeout error.
/// E.g. if factor=11 => a 10% bump => newGas = (gas * factor) / 10
pub(crate) gas_increase_factor: Option<u64>,
/// The maximum number of gas bumps (for mempool/timeout). If `max_gas_price` is set,
/// we typically rely on clamping. But you can still cap the number of bumps if you want.
pub(crate) max_retries: Option<usize>,
/// Seconds to wait between attempts.
pub(crate) interval_secs: Option<u64>,
}
const DEFAULT_INTERVAL_SECS: u64 = 2;
const DEFAULT_MAX_RETRIES: usize = 5;
impl Default for SubmitConfig {
fn default() -> Self {
Self {
initial_gas_price: None,
max_gas_price: None,
max_gas: None,
gas_increase_factor: Some(11), // implies 10% bump if we do (gas*11)/10
max_retries: Some(DEFAULT_MAX_RETRIES),
interval_secs: Some(DEFAULT_INTERVAL_SECS),
}
}
}
/// A simple function to detect if the retry is from a mempool or timeout error.
/// Right now, we rely on `submit_once` returning `SubmissionAction::Retry` for ANY error
/// that is "retryable," so we must parse the error string from `submit_once`, or
/// store that string. Another approach is to return an enum with a reason from `submit_once`.
fn is_mempool_or_timeout_error(error_str: String) -> bool {
let lower = error_str.to_lowercase();
lower.contains("mempool") || lower.contains("timeout")
}
/// A function that performs a single submission attempt:
/// - Sends the transaction
/// - Awaits the receipt with limited internal retries
/// - Returns a `SubmissionAction` indicating success, retry, or error.
pub async fn submit_once<M, T>(call: ContractCall<M, T>) -> SubmissionAction
where
M: Middleware + 'static,
T: Detokenize,
{
let pending_tx = match call.send().await {
Ok(tx) => tx,
Err(e) => {
let msg = e.to_string();
if is_mempool_or_timeout_error(msg.clone()) {
return SubmissionAction::Retry(format!("mempool/timeout: {:?}", e));
}
debug!("Error sending transaction: {:?}", msg);
return SubmissionAction::Error(format!("Transaction failed: {}", msg));
}
};
debug!("Signed tx hash: {:?}", pending_tx.tx_hash());
let receipt_result = pending_tx.await;
match receipt_result {
Ok(Some(receipt)) => {
info!("Transaction mined, receipt: {:?}", receipt);
SubmissionAction::Success(receipt)
}
Ok(None) => {
debug!("Transaction probably timed out; retrying");
SubmissionAction::Retry("timeout, receipt is none".to_string())
}
Err(ProviderError::HTTPError(e)) => {
debug!("HTTP error retrieving receipt: {:?}", e);
SubmissionAction::Retry(format!("http error: {:?}", e))
}
Err(e) => SubmissionAction::Error(format!("Transaction unrecoverable: {:?}", e)),
}
}
/// Increase gas price using integer arithmetic: (gp * factor_num) / factor_den
fn increase_gas_price_u256(gp: U256, factor_num: u64, factor_den: u64) -> U256 {
let num = U256::from(factor_num);
let den = U256::from(factor_den);
gp.checked_mul(num).unwrap_or(U256::MAX) / den
}
/// A higher-level function that wraps `submit_once` in a gas-priceadjustment loop,
/// plus a global timeout, plus distinct behavior for mempool/timeout vs other errors.
pub async fn submit_with_retry<M, T>(
mut call: ContractCall<M, T>,
config: &SubmitConfig,
middleware: Arc<M>,
) -> Result<TransactionReceipt, String>
where
M: Middleware + 'static,
T: Detokenize,
{
if let Some(max_gas) = config.max_gas {
call = call.gas(max_gas);
}
let mut gas_price = if let Some(gp) = config.initial_gas_price {
gp
} else {
middleware
.get_gas_price()
.await
.map_err(|e| format!("Failed to fetch gas price: {:?}", e))?
};
// If no factor is set, default to 11 => 10% bump
let factor_num = config.gas_increase_factor.unwrap_or(11);
let factor_den = 10u64;
// Two counters: one for gas bumps, one for non-gas retries
let mut non_gas_retries = 0;
let max_retries = config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES);
loop {
// Set gas price on the call
call = call.gas_price(gas_price);
match submit_once(call.clone()).await {
SubmissionAction::Success(receipt) => {
return Ok(receipt);
}
SubmissionAction::Retry(error_str) => {
// We need to figure out if it's "mempool/timeout" or some other reason.
// Right now, we don't have the error string from `submit_once` easily,
// so let's assume we store it or we do a separate function that returns it.
// For simplicity, let's do a hack: let's define a placeholder "error_str" and parse it.
// In reality, you'd likely return `SubmissionAction::Retry(reason_str)` from `submit_once`.
if is_mempool_or_timeout_error(error_str.clone()) {
// Mempool/timeout error
if let Some(max_gp) = config.max_gas_price {
if gas_price >= max_gp {
return Err(format!(
"Exceeded max gas price: {}, with error msg: {}",
max_gp, error_str
));
}
// Bump the gas
let new_price = increase_gas_price_u256(gas_price, factor_num, factor_den);
gas_price = std::cmp::min(new_price, max_gp);
debug!("Bumping gas price to {}", gas_price);
} else {
// No maxGasPrice => we do NOT bump => fail
return Err(
"Mempool/timeout error, no maxGasPrice set => aborting".to_string()
);
}
} else {
// Non-gas error => increment nonGasRetries
non_gas_retries += 1;
if non_gas_retries > max_retries {
return Err(format!("Exceeded non-gas retries: {}", max_retries));
}
debug!(
"Non-gas retry #{} (same gas price: {})",
non_gas_retries, gas_price
);
}
}
SubmissionAction::Error(e) => {
return Err(e);
}
}
// Sleep between attempts
sleep(Duration::from_secs(
config.interval_secs.unwrap_or(DEFAULT_INTERVAL_SECS),
))
.await;
}
}

View File

@ -43,6 +43,7 @@ rust-log = { package = "log", version = "0.4.22" }
tracing-core = "0.1.32"
tracing-log = "0.2.0"
console-subscriber = { version = "0.4.1", optional = true }
contract-wrapper = { path = "../common/contract-wrapper" }
[dependencies.libp2p]
version = "0.45.1"

View File

@ -10,6 +10,7 @@ zgs_spec = { path = "../../common/spec" }
zgs_seal = { path = "../../common/zgs_seal" }
task_executor = { path = "../../common/task_executor" }
contract-interface = { path = "../../common/contract-interface" }
contract-wrapper = { path = "../../common/contract-wrapper" }
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
ethereum-types = "0.14"
tokio = { version = "1.19.2", features = ["full"] }

View File

@ -2,7 +2,8 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use ethereum_types::{Address, H256, U256};
use contract_wrapper::SubmitConfig;
use ethereum_types::{Address, H256};
use ethers::core::k256::SecretKey;
use ethers::middleware::SignerMiddleware;
use ethers::providers::Http;
@ -21,7 +22,6 @@ pub struct MinerConfig {
pub(crate) rpc_endpoint_url: String,
pub(crate) mine_address: Address,
pub(crate) flow_address: Address,
pub(crate) submission_gas: Option<U256>,
pub(crate) cpu_percentage: u64,
pub(crate) iter_batch: usize,
pub(crate) shard_config: ShardConfig,
@ -29,6 +29,7 @@ pub struct MinerConfig {
pub(crate) rate_limit_retries: u32,
pub(crate) timeout_retries: u32,
pub(crate) initial_backoff: u64,
pub(crate) submission_config: SubmitConfig,
}
pub type MineServiceMiddleware = SignerMiddleware<Arc<Provider<RetryClient<Http>>>, LocalWallet>;
@ -41,7 +42,6 @@ impl MinerConfig {
rpc_endpoint_url: String,
mine_address: Address,
flow_address: Address,
submission_gas: Option<U256>,
cpu_percentage: u64,
iter_batch: usize,
context_query_seconds: u64,
@ -49,6 +49,7 @@ impl MinerConfig {
rate_limit_retries: u32,
timeout_retries: u32,
initial_backoff: u64,
submission_config: SubmitConfig,
) -> Option<MinerConfig> {
miner_key.map(|miner_key| MinerConfig {
miner_id,
@ -56,7 +57,6 @@ impl MinerConfig {
rpc_endpoint_url,
mine_address,
flow_address,
submission_gas,
cpu_percentage,
iter_batch,
shard_config,
@ -64,6 +64,7 @@ impl MinerConfig {
rate_limit_retries,
timeout_retries,
initial_backoff,
submission_config,
})
}

View File

@ -57,7 +57,7 @@ pub(crate) async fn check_and_request_miner_id(
}
(None, None) => {
let beneficiary = provider.address();
let id = request_miner_id(&mine_contract, beneficiary).await?;
let id = request_miner_id(config, &mine_contract, beneficiary).await?;
set_miner_id(store, &id)
.await
.map_err(|e| format!("set miner id on db corrupt: {:?}", e))?;
@ -86,6 +86,7 @@ async fn check_miner_id(
}
async fn request_miner_id(
config: &MinerConfig,
mine_contract: &PoraMine<MineServiceMiddleware>,
beneficiary: Address,
) -> Result<H256, String> {
@ -94,16 +95,13 @@ async fn request_miner_id(
let submission_call: ContractCall<_, _> =
mine_contract.request_miner_id(beneficiary, 0).legacy();
let pending_tx = submission_call
.send()
.await
.map_err(|e| format!("Fail to request miner id: {:?}", e))?;
let receipt = pending_tx
.retries(3)
.await
.map_err(|e| format!("Fail to execute mine answer transaction: {:?}", e))?
.ok_or("Request miner id transaction dropped after 3 retries")?;
let receipt = contract_wrapper::submit_with_retry(
submission_call,
&config.submission_config,
mine_contract.client().clone(),
)
.await
.map_err(|e| format!("Fail to submit miner id request: {:?}", e))?;
let first_log = receipt
.logs

View File

@ -1,14 +1,11 @@
use contract_interface::PoraAnswer;
use contract_interface::{PoraMine, ZgsFlow};
use ethereum_types::U256;
use ethers::abi::Detokenize;
use contract_wrapper::SubmitConfig;
use ethers::contract::ContractCall;
use ethers::prelude::{Http, Provider, RetryClient};
use ethers::providers::{Middleware, ProviderError};
use hex::ToHex;
use shared_types::FlowRangeProof;
use std::sync::Arc;
use std::time::Duration;
use storage::H256;
use storage_async::Store;
use task_executor::TaskExecutor;
@ -20,23 +17,13 @@ use crate::watcher::MineContextMessage;
use zgs_spec::{BYTES_PER_SEAL, SECTORS_PER_SEAL};
const SUBMISSION_RETRIES: usize = 15;
const ADJUST_GAS_RETRIES: usize = 20;
pub struct Submitter {
mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
mine_context_receiver: broadcast::Receiver<MineContextMessage>,
mine_contract: PoraMine<MineServiceMiddleware>,
flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
default_gas_limit: Option<U256>,
store: Arc<Store>,
provider: Arc<Provider<RetryClient<Http>>>,
}
enum SubmissionAction {
Retry,
Success,
Error(String),
config: SubmitConfig,
}
impl Submitter {
@ -51,7 +38,6 @@ impl Submitter {
) {
let mine_contract = PoraMine::new(config.mine_address, signing_provider);
let flow_contract = ZgsFlow::new(config.flow_address, provider.clone());
let default_gas_limit = config.submission_gas;
let submitter = Submitter {
mine_answer_receiver,
@ -59,8 +45,7 @@ impl Submitter {
mine_contract,
flow_contract,
store,
default_gas_limit,
provider,
config: config.submission_config,
};
executor.spawn(
async move { Box::pin(submitter.start()).await },
@ -144,11 +129,7 @@ impl Submitter {
};
trace!("submit_answer: answer={:?}", answer);
let mut submission_call: ContractCall<_, _> = self.mine_contract.submit(answer).legacy();
if let Some(gas_limit) = self.default_gas_limit {
submission_call = submission_call.gas(gas_limit);
}
let submission_call: ContractCall<_, _> = self.mine_contract.submit(answer).legacy();
if let Some(calldata) = submission_call.calldata() {
debug!(
@ -163,96 +144,15 @@ impl Submitter {
submission_call.estimate_gas().await
);
self.submit_with_retry(submission_call).await
}
contract_wrapper::submit_with_retry(
submission_call,
&self.config,
self.mine_contract.client().clone(),
)
.await
.map_err(|e| format!("Failed to submit mine answer: {:?}", e))?;
async fn submit_with_retry<M: Middleware, T: Detokenize>(
&self,
mut submission_call: ContractCall<M, T>,
) -> Result<(), String> {
let mut gas_price = self
.provider
.get_gas_price()
.await
.map_err(|e| format!("Failed to get current gas price {:?}", e))?;
let mut n_retry = 0;
while n_retry < ADJUST_GAS_RETRIES {
n_retry += 1;
submission_call = submission_call.gas_price(gas_price);
match self.submit_once(submission_call.clone()).await {
SubmissionAction::Retry => {
gas_price = next_gas_price(gas_price);
}
SubmissionAction::Success => {
return Ok(());
}
SubmissionAction::Error(e) => {
return Err(e);
}
}
}
Err("Submission failed after retries".to_string())
}
async fn submit_once<M: Middleware, T: Detokenize>(
&self,
submission_call: ContractCall<M, T>,
) -> SubmissionAction {
let pending_transaction = match submission_call.send().await {
Ok(tx) => tx,
Err(e) => {
if e.to_string().contains("insufficient funds")
|| e.to_string().contains("out of gas")
{
return SubmissionAction::Error(format!(
"Fail to execute PoRA submission transaction: {:?}",
e
));
}
// Log the error and increase gas.
debug!("Error sending transaction: {:?}", e);
return SubmissionAction::Retry;
}
};
debug!(
"Signed submission transaction hash: {:?}",
pending_transaction.tx_hash()
);
let receipt_result = pending_transaction
.retries(SUBMISSION_RETRIES)
.interval(Duration::from_secs(2))
.await;
match receipt_result {
Ok(Some(receipt)) => {
// Successfully executed the transaction.
info!("Submit PoRA success, receipt: {:?}", receipt);
SubmissionAction::Success
}
Ok(None) => {
// The transaction did not complete within the specified waiting time.
debug!(
"Transaction dropped after {} retries; increasing gas and retrying",
SUBMISSION_RETRIES
);
SubmissionAction::Retry
}
Err(ProviderError::HTTPError(e)) => {
// For HTTP errors, increase gas and retry.
debug!("HTTP error retrieving receipt: {:?}", e);
SubmissionAction::Retry
}
Err(e) => {
// For all other errors, return immediately.
SubmissionAction::Error(format!(
"Fail to execute PoRA submission transaction: {:?}",
e
))
}
}
Ok(())
}
}
@ -263,7 +163,3 @@ fn flow_proof_to_pora_merkle_proof(flow_proof: FlowRangeProof) -> Vec<[u8; 32]>
// Exclude `item`, the nodes in the sealed data subtree, and `root`.
full_proof[depth_in_sealed_data + 1..full_proof.len() - 1].to_vec()
}
fn next_gas_price(current_gas_price: U256) -> U256 {
current_gas_price * U256::from(11) / U256::from(10)
}

View File

@ -1,7 +1,7 @@
#![allow(clippy::field_reassign_with_default)]
use crate::ZgsConfig;
use ethereum_types::{H256, U256};
use ethereum_types::H256;
use ethers::prelude::{Http, Middleware, Provider};
use log_entry_sync::{CacheConfig, ContractAddress, LogSyncConfig};
use miner::MinerConfig;
@ -179,7 +179,6 @@ impl ZgsConfig {
} else {
None
};
let submission_gas = self.miner_submission_gas.map(U256::from);
let cpu_percentage = self.miner_cpu_percentage;
let iter_batch = self.mine_iter_batch_size;
let context_query_seconds = self.mine_context_query_seconds;
@ -192,7 +191,6 @@ impl ZgsConfig {
self.blockchain_rpc_endpoint.clone(),
mine_address,
flow_address,
submission_gas,
cpu_percentage,
iter_batch,
context_query_seconds,
@ -200,6 +198,7 @@ impl ZgsConfig {
self.rate_limit_retries,
self.timeout_retries,
self.initial_backoff,
self.submission_config,
))
}

View File

@ -74,7 +74,6 @@ build_config! {
(mine_contract_address, (String), "".to_string())
(miner_id, (Option<String>), None)
(miner_key, (Option<String>), None)
(miner_submission_gas, (Option<u64>), None)
(miner_cpu_percentage, (u64), 100)
(mine_iter_batch_size, (usize), 100)
(reward_contract_address, (String), "".to_string())
@ -106,6 +105,9 @@ pub struct ZgsConfig {
// rpc config, configured by [rpc] section by `config` crate.
pub rpc: rpc::RPCConfig,
// submission config, configured by [submission_config] section by `config` crate.
pub submission_config: contract_wrapper::SubmitConfig,
// metrics config, configured by [metrics] section by `config` crate.
pub metrics: metrics::MetricsConfiguration,
}