add gas auto adjustment

This commit is contained in:
Peter Zhang 2025-02-13 20:58:07 +08:00
parent 7ad3f717b4
commit d77c6422bc
9 changed files with 259 additions and 117 deletions

12
Cargo.lock generated
View File

@ -1271,6 +1271,17 @@ dependencies = [
"serde_json",
]
[[package]]
name = "contract-wrapper"
version = "0.1.0"
dependencies = [
"ethers",
"serde",
"serde_json",
"tokio",
"tracing",
]
[[package]]
name = "convert_case"
version = "0.6.0"
@ -5017,6 +5028,7 @@ dependencies = [
"async-trait",
"blake2",
"contract-interface",
"contract-wrapper",
"ethereum-types 0.14.1",
"ethers",
"hex",

View File

@ -0,0 +1,17 @@
[package]
name = "contract-wrapper"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "1.28", features = ["macros"] }
ethers = "2.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tracing = "0.1.35"
# or `tracing` if you prefer
[features]
dev = []

View File

@ -0,0 +1,192 @@
use ethers::{
abi::Detokenize,
contract::ContractCall,
providers::{Middleware, ProviderError},
types::{TransactionReceipt, U256},
};
use std::{sync::Arc, time::Duration};
use tokio::time::sleep;
use tracing::{debug, info};
/// The result of a single submission attempt.
#[derive(Debug)]
pub enum SubmissionAction {
Success(TransactionReceipt),
/// Generic "retry" signal, but we still need to know if it's "mempool/timeout" or something else.
/// We'll parse the error string or have a separate reason in a real app.
Retry(String),
Error(String),
}
/// Configuration for submission retries, gas price, etc.
#[derive(Clone, Debug)]
pub struct SubmitConfig {
/// If `Some`, use this gas price for the first attempt.
/// If `None`, fetch the current network gas price.
pub initial_gas_price: Option<U256>,
/// If `Some`, clamp increased gas price to this limit.
/// If `None`, do not bump gas for mempool/timeout errors.
pub max_gas_price: Option<U256>,
/// Factor by which to multiply the gas price on each mempool/timeout error.
/// E.g. if factor=11 => a 10% bump => newGas = (gas * factor) / 10
pub gas_increase_factor: Option<u64>,
/// The maximum number of gas bumps (for mempool/timeout). If `max_gas_price` is set,
/// we typically rely on clamping. But you can still cap the number of bumps if you want.
pub max_retries: usize,
/// Seconds to wait between attempts.
pub interval_secs: u64,
/// A global timeout for the entire operation, in seconds. If 0, no global timeout.
pub total_timeout_secs: u64,
}
impl Default for SubmitConfig {
fn default() -> Self {
Self {
initial_gas_price: None,
max_gas_price: None,
gas_increase_factor: Some(11), // implies 10% bump if we do (gas*11)/10
max_retries: 5,
interval_secs: 2,
total_timeout_secs: 0, // no global timeout by default
}
}
}
/// A simple function to detect if the retry is from a mempool or timeout error.
/// Right now, we rely on `submit_once` returning `SubmissionAction::Retry` for ANY error
/// that is "retryable," so we must parse the error string from `submit_once`, or
/// store that string. Another approach is to return an enum with a reason from `submit_once`.
fn is_mempool_or_timeout_error(error_str: String) -> bool {
let lower = error_str.to_lowercase();
lower.contains("mempool") || lower.contains("timeout")
}
/// A function that performs a single submission attempt:
/// - Sends the transaction
/// - Awaits the receipt with limited internal retries
/// - Returns a `SubmissionAction` indicating success, retry, or error.
pub async fn submit_once<M, T>(call: ContractCall<M, T>) -> SubmissionAction
where
M: Middleware + 'static,
T: Detokenize,
{
let pending_tx = match call.send().await {
Ok(tx) => tx,
Err(e) => {
let msg = e.to_string().to_lowercase();
if msg.contains("mempool") || msg.contains("timeout") {
return SubmissionAction::Retry(format!("mempool/timeout: {:?}", e));
}
debug!("Error sending transaction: {:?}", msg);
return SubmissionAction::Error(format!("Transaction failed: {}", msg));
}
};
debug!("Signed tx hash: {:?}", pending_tx.tx_hash());
let receipt_result = pending_tx.await;
match receipt_result {
Ok(Some(receipt)) => {
info!("Transaction mined, receipt: {:?}", receipt);
SubmissionAction::Success(receipt)
}
Ok(None) => {
debug!("Transaction probably timed out; retrying");
SubmissionAction::Retry("timeout, receipt is none".to_string())
}
Err(ProviderError::HTTPError(e)) => {
debug!("HTTP error retrieving receipt: {:?}", e);
SubmissionAction::Retry(format!("http error: {:?}", e))
}
Err(e) => SubmissionAction::Error(format!("Transaction unrecoverable: {:?}", e)),
}
}
/// Increase gas price using integer arithmetic: (gp * factor_num) / factor_den
fn increase_gas_price_u256(gp: U256, factor_num: u64, factor_den: u64) -> U256 {
let num = U256::from(factor_num);
let den = U256::from(factor_den);
gp.checked_mul(num).unwrap_or(U256::MAX) / den
}
/// A higher-level function that wraps `submit_once` in a gas-priceadjustment loop,
/// plus a global timeout, plus distinct behavior for mempool/timeout vs other errors.
pub async fn submit_with_retry<M, T>(
mut call: ContractCall<M, T>,
config: &SubmitConfig,
middleware: Arc<M>,
) -> Result<TransactionReceipt, String>
where
M: Middleware + 'static,
T: Detokenize,
{
let mut gas_price = if let Some(gp) = config.initial_gas_price {
gp
} else {
middleware
.get_gas_price()
.await
.map_err(|e| format!("Failed to fetch gas price: {:?}", e))?
};
// If no factor is set, default to 11 => 10% bump
let factor_num = config.gas_increase_factor.unwrap_or(11);
let factor_den = 10u64;
// Two counters: one for gas bumps, one for non-gas retries
let mut non_gas_retries = 0;
loop {
// Set gas price on the call
call = call.gas_price(gas_price);
match submit_once(call.clone()).await {
SubmissionAction::Success(receipt) => {
return Ok(receipt);
}
SubmissionAction::Retry(error_str) => {
// We need to figure out if it's "mempool/timeout" or some other reason.
// Right now, we don't have the error string from `submit_once` easily,
// so let's assume we store it or we do a separate function that returns it.
// For simplicity, let's do a hack: let's define a placeholder "error_str" and parse it.
// In reality, you'd likely return `SubmissionAction::Retry(reason_str)` from `submit_once`.
if is_mempool_or_timeout_error(error_str) {
// Mempool/timeout error
if config.max_gas_price.is_none() {
// No maxGasPrice => we do NOT bump => fail
return Err(
"Mempool/timeout error, no maxGasPrice set => aborting".to_string()
);
} else {
// Bump the gas
let new_price = increase_gas_price_u256(gas_price, factor_num, factor_den);
if let Some(max_gp) = config.max_gas_price {
gas_price = std::cmp::min(new_price, max_gp);
} else {
// Shouldn't happen because we checked above
gas_price = new_price;
}
debug!("Bumping gas price to {}", gas_price);
}
} else {
// Non-gas error => increment nonGasRetries
non_gas_retries += 1;
if non_gas_retries > config.max_retries {
return Err(format!("Exceeded non-gas retries: {}", config.max_retries));
}
debug!(
"Non-gas retry #{} (same gas price: {})",
non_gas_retries, gas_price
);
}
}
SubmissionAction::Error(e) => {
return Err(e);
}
}
// Sleep between attempts
sleep(Duration::from_secs(config.interval_secs)).await;
}
}

View File

@ -10,6 +10,7 @@ zgs_spec = { path = "../../common/spec" }
zgs_seal = { path = "../../common/zgs_seal" }
task_executor = { path = "../../common/task_executor" }
contract-interface = { path = "../../common/contract-interface" }
contract-wrapper = { path = "../../common/contract-wrapper" }
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
ethereum-types = "0.14"
tokio = { version = "1.19.2", features = ["full"] }

View File

@ -29,6 +29,7 @@ pub struct MinerConfig {
pub(crate) rate_limit_retries: u32,
pub(crate) timeout_retries: u32,
pub(crate) initial_backoff: u64,
pub(crate) max_gas_price: Option<U256>,
}
pub type MineServiceMiddleware = SignerMiddleware<Arc<Provider<RetryClient<Http>>>, LocalWallet>;
@ -49,6 +50,7 @@ impl MinerConfig {
rate_limit_retries: u32,
timeout_retries: u32,
initial_backoff: u64,
max_gas_price: Option<U256>,
) -> Option<MinerConfig> {
miner_key.map(|miner_key| MinerConfig {
miner_id,
@ -64,6 +66,7 @@ impl MinerConfig {
rate_limit_retries,
timeout_retries,
initial_backoff,
max_gas_price,
})
}

View File

@ -57,7 +57,7 @@ pub(crate) async fn check_and_request_miner_id(
}
(None, None) => {
let beneficiary = provider.address();
let id = request_miner_id(&mine_contract, beneficiary).await?;
let id = request_miner_id(config, &mine_contract, beneficiary).await?;
set_miner_id(store, &id)
.await
.map_err(|e| format!("set miner id on db corrupt: {:?}", e))?;
@ -86,24 +86,30 @@ async fn check_miner_id(
}
async fn request_miner_id(
config: &MinerConfig,
mine_contract: &PoraMine<MineServiceMiddleware>,
beneficiary: Address,
) -> Result<H256, String> {
debug!("Requesting miner id on chain...");
let submission_call: ContractCall<_, _> =
let mut submission_call: ContractCall<_, _> =
mine_contract.request_miner_id(beneficiary, 0).legacy();
let pending_tx = submission_call
.send()
.await
.map_err(|e| format!("Fail to request miner id: {:?}", e))?;
if config.submission_gas.is_some() {
submission_call = submission_call.gas(config.submission_gas.unwrap());
}
let receipt = pending_tx
.retries(3)
.await
.map_err(|e| format!("Fail to execute mine answer transaction: {:?}", e))?
.ok_or("Request miner id transaction dropped after 3 retries")?;
let submission_config = contract_wrapper::SubmitConfig {
max_gas_price: config.max_gas_price,
..Default::default()
};
let receipt = contract_wrapper::submit_with_retry(
submission_call,
&submission_config,
mine_contract.client().clone(),
)
.await
.map_err(|e| format!("Fail to submit miner id request: {:?}", e))?;
let first_log = receipt
.logs

View File

@ -1,14 +1,11 @@
use contract_interface::PoraAnswer;
use contract_interface::{PoraMine, ZgsFlow};
use ethereum_types::U256;
use ethers::abi::Detokenize;
use ethers::contract::ContractCall;
use ethers::prelude::{Http, Provider, RetryClient};
use ethers::providers::{Middleware, ProviderError};
use hex::ToHex;
use shared_types::FlowRangeProof;
use std::sync::Arc;
use std::time::Duration;
use storage::H256;
use storage_async::Store;
use task_executor::TaskExecutor;
@ -20,23 +17,14 @@ use crate::watcher::MineContextMessage;
use zgs_spec::{BYTES_PER_SEAL, SECTORS_PER_SEAL};
const SUBMISSION_RETRIES: usize = 15;
const ADJUST_GAS_RETRIES: usize = 20;
pub struct Submitter {
mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
mine_context_receiver: broadcast::Receiver<MineContextMessage>,
mine_contract: PoraMine<MineServiceMiddleware>,
flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
default_gas_limit: Option<U256>,
default_gas_price: Option<U256>,
store: Arc<Store>,
provider: Arc<Provider<RetryClient<Http>>>,
}
enum SubmissionAction {
Retry,
Success,
Error(String),
}
impl Submitter {
@ -52,6 +40,7 @@ impl Submitter {
let mine_contract = PoraMine::new(config.mine_address, signing_provider);
let flow_contract = ZgsFlow::new(config.flow_address, provider.clone());
let default_gas_limit = config.submission_gas;
let default_gas_price = config.max_gas_price;
let submitter = Submitter {
mine_answer_receiver,
@ -60,7 +49,7 @@ impl Submitter {
flow_contract,
store,
default_gas_limit,
provider,
default_gas_price,
};
executor.spawn(
async move { Box::pin(submitter.start()).await },
@ -163,96 +152,19 @@ impl Submitter {
submission_call.estimate_gas().await
);
self.submit_with_retry(submission_call).await
}
async fn submit_with_retry<M: Middleware, T: Detokenize>(
&self,
mut submission_call: ContractCall<M, T>,
) -> Result<(), String> {
let mut gas_price = self
.provider
.get_gas_price()
.await
.map_err(|e| format!("Failed to get current gas price {:?}", e))?;
let mut n_retry = 0;
while n_retry < ADJUST_GAS_RETRIES {
n_retry += 1;
submission_call = submission_call.gas_price(gas_price);
match self.submit_once(submission_call.clone()).await {
SubmissionAction::Retry => {
gas_price = next_gas_price(gas_price);
}
SubmissionAction::Success => {
return Ok(());
}
SubmissionAction::Error(e) => {
return Err(e);
}
}
}
Err("Submission failed after retries".to_string())
}
async fn submit_once<M: Middleware, T: Detokenize>(
&self,
submission_call: ContractCall<M, T>,
) -> SubmissionAction {
let pending_transaction = match submission_call.send().await {
Ok(tx) => tx,
Err(e) => {
if e.to_string().contains("insufficient funds")
|| e.to_string().contains("out of gas")
{
return SubmissionAction::Error(format!(
"Fail to execute PoRA submission transaction: {:?}",
e
));
}
// Log the error and increase gas.
debug!("Error sending transaction: {:?}", e);
return SubmissionAction::Retry;
}
let submission_config = contract_wrapper::SubmitConfig {
max_gas_price: self.default_gas_price,
..Default::default()
};
contract_wrapper::submit_with_retry(
submission_call,
&submission_config,
self.mine_contract.client().clone(),
)
.await
.map_err(|e| format!("Failed to submit mine answer: {:?}", e))?;
debug!(
"Signed submission transaction hash: {:?}",
pending_transaction.tx_hash()
);
let receipt_result = pending_transaction
.retries(SUBMISSION_RETRIES)
.interval(Duration::from_secs(2))
.await;
match receipt_result {
Ok(Some(receipt)) => {
// Successfully executed the transaction.
info!("Submit PoRA success, receipt: {:?}", receipt);
SubmissionAction::Success
}
Ok(None) => {
// The transaction did not complete within the specified waiting time.
debug!(
"Transaction dropped after {} retries; increasing gas and retrying",
SUBMISSION_RETRIES
);
SubmissionAction::Retry
}
Err(ProviderError::HTTPError(e)) => {
// For HTTP errors, increase gas and retry.
debug!("HTTP error retrieving receipt: {:?}", e);
SubmissionAction::Retry
}
Err(e) => {
// For all other errors, return immediately.
SubmissionAction::Error(format!(
"Fail to execute PoRA submission transaction: {:?}",
e
))
}
}
Ok(())
}
}
@ -263,7 +175,3 @@ fn flow_proof_to_pora_merkle_proof(flow_proof: FlowRangeProof) -> Vec<[u8; 32]>
// Exclude `item`, the nodes in the sealed data subtree, and `root`.
full_proof[depth_in_sealed_data + 1..full_proof.len() - 1].to_vec()
}
fn next_gas_price(current_gas_price: U256) -> U256 {
current_gas_price * U256::from(11) / U256::from(10)
}

View File

@ -180,6 +180,7 @@ impl ZgsConfig {
None
};
let submission_gas = self.miner_submission_gas.map(U256::from);
let submission_gas_price = self.miner_submission_gas_price.map(U256::from);
let cpu_percentage = self.miner_cpu_percentage;
let iter_batch = self.mine_iter_batch_size;
let context_query_seconds = self.mine_context_query_seconds;
@ -200,6 +201,7 @@ impl ZgsConfig {
self.rate_limit_retries,
self.timeout_retries,
self.initial_backoff,
submission_gas_price,
))
}

View File

@ -75,6 +75,7 @@ build_config! {
(miner_id, (Option<String>), None)
(miner_key, (Option<String>), None)
(miner_submission_gas, (Option<u64>), None)
(miner_submission_gas_price, (Option<u64>), None)
(miner_cpu_percentage, (u64), 100)
(mine_iter_batch_size, (usize), 100)
(reward_contract_address, (String), "".to_string())