mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
Compare commits
5 Commits
c7af2fcbc9
...
fd6285fef2
Author | SHA1 | Date | |
---|---|---|---|
![]() |
fd6285fef2 | ||
![]() |
6a26c336e7 | ||
![]() |
a915766840 | ||
![]() |
3d9aa8c940 | ||
![]() |
538afb00e1 |
13
Cargo.lock
generated
13
Cargo.lock
generated
@ -1271,6 +1271,17 @@ dependencies = [
|
|||||||
"serde_json",
|
"serde_json",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "contract-wrapper"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"ethers",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"tokio",
|
||||||
|
"tracing",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "convert_case"
|
name = "convert_case"
|
||||||
version = "0.6.0"
|
version = "0.6.0"
|
||||||
@ -5017,6 +5028,7 @@ dependencies = [
|
|||||||
"async-trait",
|
"async-trait",
|
||||||
"blake2",
|
"blake2",
|
||||||
"contract-interface",
|
"contract-interface",
|
||||||
|
"contract-wrapper",
|
||||||
"ethereum-types 0.14.1",
|
"ethereum-types 0.14.1",
|
||||||
"ethers",
|
"ethers",
|
||||||
"hex",
|
"hex",
|
||||||
@ -9257,6 +9269,7 @@ dependencies = [
|
|||||||
"clap",
|
"clap",
|
||||||
"config",
|
"config",
|
||||||
"console-subscriber",
|
"console-subscriber",
|
||||||
|
"contract-wrapper",
|
||||||
"ctrlc",
|
"ctrlc",
|
||||||
"duration-str",
|
"duration-str",
|
||||||
"error-chain",
|
"error-chain",
|
||||||
|
17
common/contract-wrapper/Cargo.toml
Normal file
17
common/contract-wrapper/Cargo.toml
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
[package]
|
||||||
|
name = "contract-wrapper"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
tokio = { version = "1.28", features = ["macros"] }
|
||||||
|
ethers = "2.0"
|
||||||
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
serde_json = "1.0"
|
||||||
|
tracing = "0.1.35"
|
||||||
|
# or `tracing` if you prefer
|
||||||
|
|
||||||
|
[features]
|
||||||
|
dev = []
|
204
common/contract-wrapper/src/lib.rs
Normal file
204
common/contract-wrapper/src/lib.rs
Normal file
@ -0,0 +1,204 @@
|
|||||||
|
use ethers::{
|
||||||
|
abi::Detokenize,
|
||||||
|
contract::ContractCall,
|
||||||
|
providers::{Middleware, ProviderError},
|
||||||
|
types::{TransactionReceipt, U256},
|
||||||
|
};
|
||||||
|
use serde::Deserialize;
|
||||||
|
use std::{sync::Arc, time::Duration};
|
||||||
|
use tokio::time::sleep;
|
||||||
|
use tracing::{debug, info};
|
||||||
|
|
||||||
|
/// The result of a single submission attempt.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum SubmissionAction {
|
||||||
|
Success(TransactionReceipt),
|
||||||
|
/// Generic "retry" signal, but we still need to know if it's "mempool/timeout" or something else.
|
||||||
|
/// We'll parse the error string or have a separate reason in a real app.
|
||||||
|
Retry(String),
|
||||||
|
Error(String),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Configuration for submission retries, gas price, etc.
|
||||||
|
#[derive(Clone, Copy, Debug, Deserialize)]
|
||||||
|
pub struct SubmitConfig {
|
||||||
|
/// If `Some`, use this gas price for the first attempt.
|
||||||
|
/// If `None`, fetch the current network gas price.
|
||||||
|
pub(crate) initial_gas_price: Option<U256>,
|
||||||
|
/// If `Some`, clamp increased gas price to this limit.
|
||||||
|
/// If `None`, do not bump gas for mempool/timeout errors.
|
||||||
|
pub(crate) max_gas_price: Option<U256>,
|
||||||
|
/// Gas limit of the transaction
|
||||||
|
pub(crate) max_gas: Option<U256>,
|
||||||
|
/// Factor by which to multiply the gas price on each mempool/timeout error.
|
||||||
|
/// E.g. if factor=11 => a 10% bump => newGas = (gas * factor) / 10
|
||||||
|
pub(crate) gas_increase_factor: Option<u64>,
|
||||||
|
/// The maximum number of gas bumps (for mempool/timeout). If `max_gas_price` is set,
|
||||||
|
/// we typically rely on clamping. But you can still cap the number of bumps if you want.
|
||||||
|
pub(crate) max_retries: Option<usize>,
|
||||||
|
/// Seconds to wait between attempts.
|
||||||
|
pub(crate) interval_secs: Option<u64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
const DEFAULT_INTERVAL_SECS: u64 = 2;
|
||||||
|
const DEFAULT_MAX_RETRIES: usize = 5;
|
||||||
|
|
||||||
|
impl Default for SubmitConfig {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
initial_gas_price: None,
|
||||||
|
max_gas_price: None,
|
||||||
|
max_gas: None,
|
||||||
|
gas_increase_factor: Some(11), // implies 10% bump if we do (gas*11)/10
|
||||||
|
max_retries: Some(DEFAULT_MAX_RETRIES),
|
||||||
|
interval_secs: Some(DEFAULT_INTERVAL_SECS),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A simple function to detect if the retry is from a mempool or timeout error.
|
||||||
|
/// Right now, we rely on `submit_once` returning `SubmissionAction::Retry` for ANY error
|
||||||
|
/// that is "retryable," so we must parse the error string from `submit_once`, or
|
||||||
|
/// store that string. Another approach is to return an enum with a reason from `submit_once`.
|
||||||
|
fn is_mempool_or_timeout_error(error_str: String) -> bool {
|
||||||
|
let lower = error_str.to_lowercase();
|
||||||
|
lower.contains("mempool") || lower.contains("timeout")
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A function that performs a single submission attempt:
|
||||||
|
/// - Sends the transaction
|
||||||
|
/// - Awaits the receipt with limited internal retries
|
||||||
|
/// - Returns a `SubmissionAction` indicating success, retry, or error.
|
||||||
|
pub async fn submit_once<M, T>(call: ContractCall<M, T>) -> SubmissionAction
|
||||||
|
where
|
||||||
|
M: Middleware + 'static,
|
||||||
|
T: Detokenize,
|
||||||
|
{
|
||||||
|
let pending_tx = match call.send().await {
|
||||||
|
Ok(tx) => tx,
|
||||||
|
Err(e) => {
|
||||||
|
let msg = e.to_string();
|
||||||
|
if is_mempool_or_timeout_error(msg.clone()) {
|
||||||
|
return SubmissionAction::Retry(format!("mempool/timeout: {:?}", e));
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!("Error sending transaction: {:?}", msg);
|
||||||
|
return SubmissionAction::Error(format!("Transaction failed: {}", msg));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
debug!("Signed tx hash: {:?}", pending_tx.tx_hash());
|
||||||
|
|
||||||
|
let receipt_result = pending_tx.await;
|
||||||
|
match receipt_result {
|
||||||
|
Ok(Some(receipt)) => {
|
||||||
|
info!("Transaction mined, receipt: {:?}", receipt);
|
||||||
|
SubmissionAction::Success(receipt)
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
debug!("Transaction probably timed out; retrying");
|
||||||
|
SubmissionAction::Retry("timeout, receipt is none".to_string())
|
||||||
|
}
|
||||||
|
Err(ProviderError::HTTPError(e)) => {
|
||||||
|
debug!("HTTP error retrieving receipt: {:?}", e);
|
||||||
|
SubmissionAction::Retry(format!("http error: {:?}", e))
|
||||||
|
}
|
||||||
|
Err(e) => SubmissionAction::Error(format!("Transaction unrecoverable: {:?}", e)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Increase gas price using integer arithmetic: (gp * factor_num) / factor_den
|
||||||
|
fn increase_gas_price_u256(gp: U256, factor_num: u64, factor_den: u64) -> U256 {
|
||||||
|
let num = U256::from(factor_num);
|
||||||
|
let den = U256::from(factor_den);
|
||||||
|
gp.checked_mul(num).unwrap_or(U256::MAX) / den
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A higher-level function that wraps `submit_once` in a gas-price–adjustment loop,
|
||||||
|
/// plus a global timeout, plus distinct behavior for mempool/timeout vs other errors.
|
||||||
|
pub async fn submit_with_retry<M, T>(
|
||||||
|
mut call: ContractCall<M, T>,
|
||||||
|
config: &SubmitConfig,
|
||||||
|
middleware: Arc<M>,
|
||||||
|
) -> Result<TransactionReceipt, String>
|
||||||
|
where
|
||||||
|
M: Middleware + 'static,
|
||||||
|
T: Detokenize,
|
||||||
|
{
|
||||||
|
if let Some(max_gas) = config.max_gas {
|
||||||
|
call = call.gas(max_gas);
|
||||||
|
}
|
||||||
|
let mut gas_price = if let Some(gp) = config.initial_gas_price {
|
||||||
|
gp
|
||||||
|
} else {
|
||||||
|
middleware
|
||||||
|
.get_gas_price()
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("Failed to fetch gas price: {:?}", e))?
|
||||||
|
};
|
||||||
|
|
||||||
|
// If no factor is set, default to 11 => 10% bump
|
||||||
|
let factor_num = config.gas_increase_factor.unwrap_or(11);
|
||||||
|
let factor_den = 10u64;
|
||||||
|
|
||||||
|
// Two counters: one for gas bumps, one for non-gas retries
|
||||||
|
let mut non_gas_retries = 0;
|
||||||
|
let max_retries = config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
// Set gas price on the call
|
||||||
|
call = call.gas_price(gas_price);
|
||||||
|
|
||||||
|
match submit_once(call.clone()).await {
|
||||||
|
SubmissionAction::Success(receipt) => {
|
||||||
|
return Ok(receipt);
|
||||||
|
}
|
||||||
|
SubmissionAction::Retry(error_str) => {
|
||||||
|
// We need to figure out if it's "mempool/timeout" or some other reason.
|
||||||
|
// Right now, we don't have the error string from `submit_once` easily,
|
||||||
|
// so let's assume we store it or we do a separate function that returns it.
|
||||||
|
// For simplicity, let's do a hack: let's define a placeholder "error_str" and parse it.
|
||||||
|
// In reality, you'd likely return `SubmissionAction::Retry(reason_str)` from `submit_once`.
|
||||||
|
if is_mempool_or_timeout_error(error_str.clone()) {
|
||||||
|
// Mempool/timeout error
|
||||||
|
if let Some(max_gp) = config.max_gas_price {
|
||||||
|
if gas_price >= max_gp {
|
||||||
|
return Err(format!(
|
||||||
|
"Exceeded max gas price: {}, with error msg: {}",
|
||||||
|
max_gp, error_str
|
||||||
|
));
|
||||||
|
}
|
||||||
|
// Bump the gas
|
||||||
|
let new_price = increase_gas_price_u256(gas_price, factor_num, factor_den);
|
||||||
|
gas_price = std::cmp::min(new_price, max_gp);
|
||||||
|
debug!("Bumping gas price to {}", gas_price);
|
||||||
|
} else {
|
||||||
|
// No maxGasPrice => we do NOT bump => fail
|
||||||
|
return Err(
|
||||||
|
"Mempool/timeout error, no maxGasPrice set => aborting".to_string()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Non-gas error => increment nonGasRetries
|
||||||
|
non_gas_retries += 1;
|
||||||
|
if non_gas_retries > max_retries {
|
||||||
|
return Err(format!("Exceeded non-gas retries: {}", max_retries));
|
||||||
|
}
|
||||||
|
debug!(
|
||||||
|
"Non-gas retry #{} (same gas price: {})",
|
||||||
|
non_gas_retries, gas_price
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SubmissionAction::Error(e) => {
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sleep between attempts
|
||||||
|
sleep(Duration::from_secs(
|
||||||
|
config.interval_secs.unwrap_or(DEFAULT_INTERVAL_SECS),
|
||||||
|
))
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
@ -6,13 +6,13 @@ ZeroGravity system consists of a data availability layer (0G DA) on top of a dec
|
|||||||
|
|
||||||
Figure 1 illustrates the architecture of the 0G system. When a data block enters the 0G DA, it is first erasure coded and organized into multiple consecutive chunks through erasure coding. The merkle root as a commitment of the encoded data block is then submitted to the consensus layer to keep the order of the data entering the system. The chunks are then dispersed to different storage nodes in 0G Storage where the data may be further replicated to other nodes depending on the storage fee that the user pays. The storage nodes periodically participate in the mining process by interacting with the consensus network to accrue rewards from the system.
|
Figure 1 illustrates the architecture of the 0G system. When a data block enters the 0G DA, it is first erasure coded and organized into multiple consecutive chunks through erasure coding. The merkle root as a commitment of the encoded data block is then submitted to the consensus layer to keep the order of the data entering the system. The chunks are then dispersed to different storage nodes in 0G Storage where the data may be further replicated to other nodes depending on the storage fee that the user pays. The storage nodes periodically participate in the mining process by interacting with the consensus network to accrue rewards from the system.
|
||||||
|
|
||||||
<figure><img src="../../.gitbook/assets/zg-storage-architecture.png" alt=""><figcaption><p>Figure 1. The Architecture of 0G System</p></figcaption></figure>
|
<figure><img src="../.gitbook/assets/zg-storage-architecture.png" alt=""><figcaption><p>Figure 1. The Architecture of 0G System</p></figcaption></figure>
|
||||||
|
|
||||||
## 0G Storage
|
## 0G Storage
|
||||||
|
|
||||||
0G Storage employs layered design targeting to support different types of decentralized applications. Figure 2 shows the overview of the full stack layers of 0G Storage.
|
0G Storage employs layered design targeting to support different types of decentralized applications. Figure 2 shows the overview of the full stack layers of 0G Storage.
|
||||||
|
|
||||||
<figure><img src="../../.gitbook/assets/zg-storage-layer.png" alt=""><figcaption><p>Figure 2. Full Stack Solution of 0G Storage</p></figcaption></figure>
|
<figure><img src="../.gitbook/assets/zg-storage-layer.png" alt=""><figcaption><p>Figure 2. Full Stack Solution of 0G Storage</p></figcaption></figure>
|
||||||
|
|
||||||
The lowest is a log layer which is a decentralized system. It consists of multiple storage nodes to form a storage network. The network has built-in incentive mechanism to reward the data storage. The ordering of the uploaded data is guaranteed by a sequencing mechanism to provide a log-based semantics and abstraction. This layer is used to store unstructured raw data for permanent persistency.
|
The lowest is a log layer which is a decentralized system. It consists of multiple storage nodes to form a storage network. The network has built-in incentive mechanism to reward the data storage. The ordering of the uploaded data is guaranteed by a sequencing mechanism to provide a log-based semantics and abstraction. This layer is used to store unstructured raw data for permanent persistency.
|
||||||
|
|
||||||
|
@ -30,4 +30,4 @@ Precisely, the mining process has the following steps:
|
|||||||
6. For each piece $$\overrightarrow{v}$$, compute the Blake2b hash of the tuple ($$\mathsf{miner\_id}$$, $$\mathsf{nonce}$$, $$\mathsf{context\_digest}$$, $$\mathsf{start\_position}$$, $$\mathsf{mine\_length}$$, $$\overrightarrow{v}$$).
|
6. For each piece $$\overrightarrow{v}$$, compute the Blake2b hash of the tuple ($$\mathsf{miner\_id}$$, $$\mathsf{nonce}$$, $$\mathsf{context\_digest}$$, $$\mathsf{start\_position}$$, $$\mathsf{mine\_length}$$, $$\overrightarrow{v}$$).
|
||||||
7. If one of Blake2b hash output is smaller than a target value, the miner finds a legitimate PoRA solution.
|
7. If one of Blake2b hash output is smaller than a target value, the miner finds a legitimate PoRA solution.
|
||||||
|
|
||||||
<figure><img src="../../../.gitbook/assets/zg-storage-algorithm.png" alt=""><figcaption><p>Figure 1. Recall Position and Scratchpad Computation</p></figcaption></figure>
|
<figure><img src="../../.gitbook/assets/zg-storage-algorithm.png" alt=""><figcaption><p>Figure 1. Recall Position and Scratchpad Computation</p></figcaption></figure>
|
||||||
|
@ -6,4 +6,4 @@ A user-defined function will be used to deserialize the raw content in the log e
|
|||||||
|
|
||||||
When a new key-value node just joins the network, it connects to the log layer and plays the log entries from head to tail to construct the latest state of the key-value store. During the log entry playing, an application-specific key-value node can skip irrelevant log entries which do not contain stream IDs that it cares.
|
When a new key-value node just joins the network, it connects to the log layer and plays the log entries from head to tail to construct the latest state of the key-value store. During the log entry playing, an application-specific key-value node can skip irrelevant log entries which do not contain stream IDs that it cares.
|
||||||
|
|
||||||
<figure><img src="../../.gitbook/assets/zg-storage-log.png" alt=""><figcaption><p>Figure 1. Decentralized K-V Store</p></figcaption></figure>
|
<figure><img src="../.gitbook/assets/zg-storage-log.png" alt=""><figcaption><p>Figure 1. Decentralized K-V Store</p></figcaption></figure>
|
||||||
|
@ -8,7 +8,7 @@ When an application server linking with the 0G Storage key-value runtime starts
|
|||||||
|
|
||||||
When an application server with the key-value runtime encounters the commit record during playing the log, it identifies a conflict window consisting of all the log entries between the start log position of the transaction and the position of the commit record. The log entries in the conflict window therefore contain the key-value operations concurrent with the transaction submitting the commit record. The runtime further detects whether these concurrent operations contain the updates on the keys belonging to the read set of the transaction. If yes, the transaction is aborted, otherwise committed successfully.
|
When an application server with the key-value runtime encounters the commit record during playing the log, it identifies a conflict window consisting of all the log entries between the start log position of the transaction and the position of the commit record. The log entries in the conflict window therefore contain the key-value operations concurrent with the transaction submitting the commit record. The runtime further detects whether these concurrent operations contain the updates on the keys belonging to the read set of the transaction. If yes, the transaction is aborted, otherwise committed successfully.
|
||||||
|
|
||||||
<figure><img src="../../.gitbook/assets/zg-storage-transaction.png" alt=""><figcaption><p>Figure 1. Transaction Processing on 0G K-V Store</p></figcaption></figure>
|
<figure><img src="../.gitbook/assets/zg-storage-transaction.png" alt=""><figcaption><p>Figure 1. Transaction Processing on 0G K-V Store</p></figcaption></figure>
|
||||||
|
|
||||||
## Concurrent Assumption
|
## Concurrent Assumption
|
||||||
|
|
||||||
|
@ -43,6 +43,7 @@ rust-log = { package = "log", version = "0.4.22" }
|
|||||||
tracing-core = "0.1.32"
|
tracing-core = "0.1.32"
|
||||||
tracing-log = "0.2.0"
|
tracing-log = "0.2.0"
|
||||||
console-subscriber = { version = "0.4.1", optional = true }
|
console-subscriber = { version = "0.4.1", optional = true }
|
||||||
|
contract-wrapper = { path = "../common/contract-wrapper" }
|
||||||
|
|
||||||
[dependencies.libp2p]
|
[dependencies.libp2p]
|
||||||
version = "0.45.1"
|
version = "0.45.1"
|
||||||
|
@ -20,7 +20,7 @@ pub struct LogSyncConfig {
|
|||||||
// blockchain provider retry params
|
// blockchain provider retry params
|
||||||
// the number of retries after a connection times out
|
// the number of retries after a connection times out
|
||||||
pub rate_limit_retries: u32,
|
pub rate_limit_retries: u32,
|
||||||
// the nubmer of retries for rate limited responses
|
// the number of retries for rate limited responses
|
||||||
pub timeout_retries: u32,
|
pub timeout_retries: u32,
|
||||||
// the duration to wait before retry, in ms
|
// the duration to wait before retry, in ms
|
||||||
pub initial_backoff: u64,
|
pub initial_backoff: u64,
|
||||||
|
@ -10,6 +10,7 @@ zgs_spec = { path = "../../common/spec" }
|
|||||||
zgs_seal = { path = "../../common/zgs_seal" }
|
zgs_seal = { path = "../../common/zgs_seal" }
|
||||||
task_executor = { path = "../../common/task_executor" }
|
task_executor = { path = "../../common/task_executor" }
|
||||||
contract-interface = { path = "../../common/contract-interface" }
|
contract-interface = { path = "../../common/contract-interface" }
|
||||||
|
contract-wrapper = { path = "../../common/contract-wrapper" }
|
||||||
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
|
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
|
||||||
ethereum-types = "0.14"
|
ethereum-types = "0.14"
|
||||||
tokio = { version = "1.19.2", features = ["full"] }
|
tokio = { version = "1.19.2", features = ["full"] }
|
||||||
|
@ -2,7 +2,8 @@ use std::str::FromStr;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use ethereum_types::{Address, H256, U256};
|
use contract_wrapper::SubmitConfig;
|
||||||
|
use ethereum_types::{Address, H256};
|
||||||
use ethers::core::k256::SecretKey;
|
use ethers::core::k256::SecretKey;
|
||||||
use ethers::middleware::SignerMiddleware;
|
use ethers::middleware::SignerMiddleware;
|
||||||
use ethers::providers::Http;
|
use ethers::providers::Http;
|
||||||
@ -21,7 +22,6 @@ pub struct MinerConfig {
|
|||||||
pub(crate) rpc_endpoint_url: String,
|
pub(crate) rpc_endpoint_url: String,
|
||||||
pub(crate) mine_address: Address,
|
pub(crate) mine_address: Address,
|
||||||
pub(crate) flow_address: Address,
|
pub(crate) flow_address: Address,
|
||||||
pub(crate) submission_gas: Option<U256>,
|
|
||||||
pub(crate) cpu_percentage: u64,
|
pub(crate) cpu_percentage: u64,
|
||||||
pub(crate) iter_batch: usize,
|
pub(crate) iter_batch: usize,
|
||||||
pub(crate) shard_config: ShardConfig,
|
pub(crate) shard_config: ShardConfig,
|
||||||
@ -29,6 +29,7 @@ pub struct MinerConfig {
|
|||||||
pub(crate) rate_limit_retries: u32,
|
pub(crate) rate_limit_retries: u32,
|
||||||
pub(crate) timeout_retries: u32,
|
pub(crate) timeout_retries: u32,
|
||||||
pub(crate) initial_backoff: u64,
|
pub(crate) initial_backoff: u64,
|
||||||
|
pub(crate) submission_config: SubmitConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type MineServiceMiddleware = SignerMiddleware<Arc<Provider<RetryClient<Http>>>, LocalWallet>;
|
pub type MineServiceMiddleware = SignerMiddleware<Arc<Provider<RetryClient<Http>>>, LocalWallet>;
|
||||||
@ -41,7 +42,6 @@ impl MinerConfig {
|
|||||||
rpc_endpoint_url: String,
|
rpc_endpoint_url: String,
|
||||||
mine_address: Address,
|
mine_address: Address,
|
||||||
flow_address: Address,
|
flow_address: Address,
|
||||||
submission_gas: Option<U256>,
|
|
||||||
cpu_percentage: u64,
|
cpu_percentage: u64,
|
||||||
iter_batch: usize,
|
iter_batch: usize,
|
||||||
context_query_seconds: u64,
|
context_query_seconds: u64,
|
||||||
@ -49,6 +49,7 @@ impl MinerConfig {
|
|||||||
rate_limit_retries: u32,
|
rate_limit_retries: u32,
|
||||||
timeout_retries: u32,
|
timeout_retries: u32,
|
||||||
initial_backoff: u64,
|
initial_backoff: u64,
|
||||||
|
submission_config: SubmitConfig,
|
||||||
) -> Option<MinerConfig> {
|
) -> Option<MinerConfig> {
|
||||||
miner_key.map(|miner_key| MinerConfig {
|
miner_key.map(|miner_key| MinerConfig {
|
||||||
miner_id,
|
miner_id,
|
||||||
@ -56,7 +57,6 @@ impl MinerConfig {
|
|||||||
rpc_endpoint_url,
|
rpc_endpoint_url,
|
||||||
mine_address,
|
mine_address,
|
||||||
flow_address,
|
flow_address,
|
||||||
submission_gas,
|
|
||||||
cpu_percentage,
|
cpu_percentage,
|
||||||
iter_batch,
|
iter_batch,
|
||||||
shard_config,
|
shard_config,
|
||||||
@ -64,6 +64,7 @@ impl MinerConfig {
|
|||||||
rate_limit_retries,
|
rate_limit_retries,
|
||||||
timeout_retries,
|
timeout_retries,
|
||||||
initial_backoff,
|
initial_backoff,
|
||||||
|
submission_config,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -57,7 +57,7 @@ pub(crate) async fn check_and_request_miner_id(
|
|||||||
}
|
}
|
||||||
(None, None) => {
|
(None, None) => {
|
||||||
let beneficiary = provider.address();
|
let beneficiary = provider.address();
|
||||||
let id = request_miner_id(&mine_contract, beneficiary).await?;
|
let id = request_miner_id(config, &mine_contract, beneficiary).await?;
|
||||||
set_miner_id(store, &id)
|
set_miner_id(store, &id)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| format!("set miner id on db corrupt: {:?}", e))?;
|
.map_err(|e| format!("set miner id on db corrupt: {:?}", e))?;
|
||||||
@ -86,6 +86,7 @@ async fn check_miner_id(
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn request_miner_id(
|
async fn request_miner_id(
|
||||||
|
config: &MinerConfig,
|
||||||
mine_contract: &PoraMine<MineServiceMiddleware>,
|
mine_contract: &PoraMine<MineServiceMiddleware>,
|
||||||
beneficiary: Address,
|
beneficiary: Address,
|
||||||
) -> Result<H256, String> {
|
) -> Result<H256, String> {
|
||||||
@ -94,16 +95,13 @@ async fn request_miner_id(
|
|||||||
let submission_call: ContractCall<_, _> =
|
let submission_call: ContractCall<_, _> =
|
||||||
mine_contract.request_miner_id(beneficiary, 0).legacy();
|
mine_contract.request_miner_id(beneficiary, 0).legacy();
|
||||||
|
|
||||||
let pending_tx = submission_call
|
let receipt = contract_wrapper::submit_with_retry(
|
||||||
.send()
|
submission_call,
|
||||||
.await
|
&config.submission_config,
|
||||||
.map_err(|e| format!("Fail to request miner id: {:?}", e))?;
|
mine_contract.client().clone(),
|
||||||
|
)
|
||||||
let receipt = pending_tx
|
.await
|
||||||
.retries(3)
|
.map_err(|e| format!("Fail to submit miner id request: {:?}", e))?;
|
||||||
.await
|
|
||||||
.map_err(|e| format!("Fail to execute mine answer transaction: {:?}", e))?
|
|
||||||
.ok_or("Request miner id transaction dropped after 3 retries")?;
|
|
||||||
|
|
||||||
let first_log = receipt
|
let first_log = receipt
|
||||||
.logs
|
.logs
|
||||||
|
@ -1,14 +1,11 @@
|
|||||||
use contract_interface::PoraAnswer;
|
use contract_interface::PoraAnswer;
|
||||||
use contract_interface::{PoraMine, ZgsFlow};
|
use contract_interface::{PoraMine, ZgsFlow};
|
||||||
use ethereum_types::U256;
|
use contract_wrapper::SubmitConfig;
|
||||||
use ethers::abi::Detokenize;
|
|
||||||
use ethers::contract::ContractCall;
|
use ethers::contract::ContractCall;
|
||||||
use ethers::prelude::{Http, Provider, RetryClient};
|
use ethers::prelude::{Http, Provider, RetryClient};
|
||||||
use ethers::providers::{Middleware, ProviderError};
|
|
||||||
use hex::ToHex;
|
use hex::ToHex;
|
||||||
use shared_types::FlowRangeProof;
|
use shared_types::FlowRangeProof;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
|
||||||
use storage::H256;
|
use storage::H256;
|
||||||
use storage_async::Store;
|
use storage_async::Store;
|
||||||
use task_executor::TaskExecutor;
|
use task_executor::TaskExecutor;
|
||||||
@ -20,23 +17,13 @@ use crate::watcher::MineContextMessage;
|
|||||||
|
|
||||||
use zgs_spec::{BYTES_PER_SEAL, SECTORS_PER_SEAL};
|
use zgs_spec::{BYTES_PER_SEAL, SECTORS_PER_SEAL};
|
||||||
|
|
||||||
const SUBMISSION_RETRIES: usize = 15;
|
|
||||||
const ADJUST_GAS_RETRIES: usize = 20;
|
|
||||||
|
|
||||||
pub struct Submitter {
|
pub struct Submitter {
|
||||||
mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
|
mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
|
||||||
mine_context_receiver: broadcast::Receiver<MineContextMessage>,
|
mine_context_receiver: broadcast::Receiver<MineContextMessage>,
|
||||||
mine_contract: PoraMine<MineServiceMiddleware>,
|
mine_contract: PoraMine<MineServiceMiddleware>,
|
||||||
flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
|
flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
|
||||||
default_gas_limit: Option<U256>,
|
|
||||||
store: Arc<Store>,
|
store: Arc<Store>,
|
||||||
provider: Arc<Provider<RetryClient<Http>>>,
|
config: SubmitConfig,
|
||||||
}
|
|
||||||
|
|
||||||
enum SubmissionAction {
|
|
||||||
Retry,
|
|
||||||
Success,
|
|
||||||
Error(String),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Submitter {
|
impl Submitter {
|
||||||
@ -51,7 +38,6 @@ impl Submitter {
|
|||||||
) {
|
) {
|
||||||
let mine_contract = PoraMine::new(config.mine_address, signing_provider);
|
let mine_contract = PoraMine::new(config.mine_address, signing_provider);
|
||||||
let flow_contract = ZgsFlow::new(config.flow_address, provider.clone());
|
let flow_contract = ZgsFlow::new(config.flow_address, provider.clone());
|
||||||
let default_gas_limit = config.submission_gas;
|
|
||||||
|
|
||||||
let submitter = Submitter {
|
let submitter = Submitter {
|
||||||
mine_answer_receiver,
|
mine_answer_receiver,
|
||||||
@ -59,8 +45,7 @@ impl Submitter {
|
|||||||
mine_contract,
|
mine_contract,
|
||||||
flow_contract,
|
flow_contract,
|
||||||
store,
|
store,
|
||||||
default_gas_limit,
|
config: config.submission_config,
|
||||||
provider,
|
|
||||||
};
|
};
|
||||||
executor.spawn(
|
executor.spawn(
|
||||||
async move { Box::pin(submitter.start()).await },
|
async move { Box::pin(submitter.start()).await },
|
||||||
@ -144,11 +129,7 @@ impl Submitter {
|
|||||||
};
|
};
|
||||||
trace!("submit_answer: answer={:?}", answer);
|
trace!("submit_answer: answer={:?}", answer);
|
||||||
|
|
||||||
let mut submission_call: ContractCall<_, _> = self.mine_contract.submit(answer).legacy();
|
let submission_call: ContractCall<_, _> = self.mine_contract.submit(answer).legacy();
|
||||||
|
|
||||||
if let Some(gas_limit) = self.default_gas_limit {
|
|
||||||
submission_call = submission_call.gas(gas_limit);
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(calldata) = submission_call.calldata() {
|
if let Some(calldata) = submission_call.calldata() {
|
||||||
debug!(
|
debug!(
|
||||||
@ -163,96 +144,15 @@ impl Submitter {
|
|||||||
submission_call.estimate_gas().await
|
submission_call.estimate_gas().await
|
||||||
);
|
);
|
||||||
|
|
||||||
self.submit_with_retry(submission_call).await
|
contract_wrapper::submit_with_retry(
|
||||||
}
|
submission_call,
|
||||||
|
&self.config,
|
||||||
|
self.mine_contract.client().clone(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("Failed to submit mine answer: {:?}", e))?;
|
||||||
|
|
||||||
async fn submit_with_retry<M: Middleware, T: Detokenize>(
|
Ok(())
|
||||||
&self,
|
|
||||||
mut submission_call: ContractCall<M, T>,
|
|
||||||
) -> Result<(), String> {
|
|
||||||
let mut gas_price = self
|
|
||||||
.provider
|
|
||||||
.get_gas_price()
|
|
||||||
.await
|
|
||||||
.map_err(|e| format!("Failed to get current gas price {:?}", e))?;
|
|
||||||
let mut n_retry = 0;
|
|
||||||
while n_retry < ADJUST_GAS_RETRIES {
|
|
||||||
n_retry += 1;
|
|
||||||
submission_call = submission_call.gas_price(gas_price);
|
|
||||||
match self.submit_once(submission_call.clone()).await {
|
|
||||||
SubmissionAction::Retry => {
|
|
||||||
gas_price = next_gas_price(gas_price);
|
|
||||||
}
|
|
||||||
SubmissionAction::Success => {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
SubmissionAction::Error(e) => {
|
|
||||||
return Err(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Err("Submission failed after retries".to_string())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn submit_once<M: Middleware, T: Detokenize>(
|
|
||||||
&self,
|
|
||||||
submission_call: ContractCall<M, T>,
|
|
||||||
) -> SubmissionAction {
|
|
||||||
let pending_transaction = match submission_call.send().await {
|
|
||||||
Ok(tx) => tx,
|
|
||||||
Err(e) => {
|
|
||||||
if e.to_string().contains("insufficient funds")
|
|
||||||
|| e.to_string().contains("out of gas")
|
|
||||||
{
|
|
||||||
return SubmissionAction::Error(format!(
|
|
||||||
"Fail to execute PoRA submission transaction: {:?}",
|
|
||||||
e
|
|
||||||
));
|
|
||||||
}
|
|
||||||
// Log the error and increase gas.
|
|
||||||
debug!("Error sending transaction: {:?}", e);
|
|
||||||
return SubmissionAction::Retry;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
debug!(
|
|
||||||
"Signed submission transaction hash: {:?}",
|
|
||||||
pending_transaction.tx_hash()
|
|
||||||
);
|
|
||||||
|
|
||||||
let receipt_result = pending_transaction
|
|
||||||
.retries(SUBMISSION_RETRIES)
|
|
||||||
.interval(Duration::from_secs(2))
|
|
||||||
.await;
|
|
||||||
|
|
||||||
match receipt_result {
|
|
||||||
Ok(Some(receipt)) => {
|
|
||||||
// Successfully executed the transaction.
|
|
||||||
info!("Submit PoRA success, receipt: {:?}", receipt);
|
|
||||||
SubmissionAction::Success
|
|
||||||
}
|
|
||||||
Ok(None) => {
|
|
||||||
// The transaction did not complete within the specified waiting time.
|
|
||||||
debug!(
|
|
||||||
"Transaction dropped after {} retries; increasing gas and retrying",
|
|
||||||
SUBMISSION_RETRIES
|
|
||||||
);
|
|
||||||
SubmissionAction::Retry
|
|
||||||
}
|
|
||||||
Err(ProviderError::HTTPError(e)) => {
|
|
||||||
// For HTTP errors, increase gas and retry.
|
|
||||||
debug!("HTTP error retrieving receipt: {:?}", e);
|
|
||||||
SubmissionAction::Retry
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
// For all other errors, return immediately.
|
|
||||||
SubmissionAction::Error(format!(
|
|
||||||
"Fail to execute PoRA submission transaction: {:?}",
|
|
||||||
e
|
|
||||||
))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -263,7 +163,3 @@ fn flow_proof_to_pora_merkle_proof(flow_proof: FlowRangeProof) -> Vec<[u8; 32]>
|
|||||||
// Exclude `item`, the nodes in the sealed data subtree, and `root`.
|
// Exclude `item`, the nodes in the sealed data subtree, and `root`.
|
||||||
full_proof[depth_in_sealed_data + 1..full_proof.len() - 1].to_vec()
|
full_proof[depth_in_sealed_data + 1..full_proof.len() - 1].to_vec()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn next_gas_price(current_gas_price: U256) -> U256 {
|
|
||||||
current_gas_price * U256::from(11) / U256::from(10)
|
|
||||||
}
|
|
||||||
|
@ -103,7 +103,7 @@ pub struct Config {
|
|||||||
/// Subscribe to all subnets for the duration of the runtime.
|
/// Subscribe to all subnets for the duration of the runtime.
|
||||||
pub subscribe_all_subnets: bool,
|
pub subscribe_all_subnets: bool,
|
||||||
|
|
||||||
/// Import/aggregate all attestations recieved on subscribed subnets for the duration of the
|
/// Import/aggregate all attestations received on subscribed subnets for the duration of the
|
||||||
/// runtime.
|
/// runtime.
|
||||||
pub import_all_attestations: bool,
|
pub import_all_attestations: bool,
|
||||||
|
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
#![allow(clippy::field_reassign_with_default)]
|
#![allow(clippy::field_reassign_with_default)]
|
||||||
|
|
||||||
use crate::ZgsConfig;
|
use crate::ZgsConfig;
|
||||||
use ethereum_types::{H256, U256};
|
use ethereum_types::H256;
|
||||||
use ethers::prelude::{Http, Middleware, Provider};
|
use ethers::prelude::{Http, Middleware, Provider};
|
||||||
use log_entry_sync::{CacheConfig, ContractAddress, LogSyncConfig};
|
use log_entry_sync::{CacheConfig, ContractAddress, LogSyncConfig};
|
||||||
use miner::MinerConfig;
|
use miner::MinerConfig;
|
||||||
@ -179,7 +179,6 @@ impl ZgsConfig {
|
|||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
let submission_gas = self.miner_submission_gas.map(U256::from);
|
|
||||||
let cpu_percentage = self.miner_cpu_percentage;
|
let cpu_percentage = self.miner_cpu_percentage;
|
||||||
let iter_batch = self.mine_iter_batch_size;
|
let iter_batch = self.mine_iter_batch_size;
|
||||||
let context_query_seconds = self.mine_context_query_seconds;
|
let context_query_seconds = self.mine_context_query_seconds;
|
||||||
@ -192,7 +191,6 @@ impl ZgsConfig {
|
|||||||
self.blockchain_rpc_endpoint.clone(),
|
self.blockchain_rpc_endpoint.clone(),
|
||||||
mine_address,
|
mine_address,
|
||||||
flow_address,
|
flow_address,
|
||||||
submission_gas,
|
|
||||||
cpu_percentage,
|
cpu_percentage,
|
||||||
iter_batch,
|
iter_batch,
|
||||||
context_query_seconds,
|
context_query_seconds,
|
||||||
@ -200,6 +198,7 @@ impl ZgsConfig {
|
|||||||
self.rate_limit_retries,
|
self.rate_limit_retries,
|
||||||
self.timeout_retries,
|
self.timeout_retries,
|
||||||
self.initial_backoff,
|
self.initial_backoff,
|
||||||
|
self.submission_config,
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,7 +74,6 @@ build_config! {
|
|||||||
(mine_contract_address, (String), "".to_string())
|
(mine_contract_address, (String), "".to_string())
|
||||||
(miner_id, (Option<String>), None)
|
(miner_id, (Option<String>), None)
|
||||||
(miner_key, (Option<String>), None)
|
(miner_key, (Option<String>), None)
|
||||||
(miner_submission_gas, (Option<u64>), None)
|
|
||||||
(miner_cpu_percentage, (u64), 100)
|
(miner_cpu_percentage, (u64), 100)
|
||||||
(mine_iter_batch_size, (usize), 100)
|
(mine_iter_batch_size, (usize), 100)
|
||||||
(reward_contract_address, (String), "".to_string())
|
(reward_contract_address, (String), "".to_string())
|
||||||
@ -106,6 +105,9 @@ pub struct ZgsConfig {
|
|||||||
// rpc config, configured by [rpc] section by `config` crate.
|
// rpc config, configured by [rpc] section by `config` crate.
|
||||||
pub rpc: rpc::RPCConfig,
|
pub rpc: rpc::RPCConfig,
|
||||||
|
|
||||||
|
// submission config, configured by [submission_config] section by `config` crate.
|
||||||
|
pub submission_config: contract_wrapper::SubmitConfig,
|
||||||
|
|
||||||
// metrics config, configured by [metrics] section by `config` crate.
|
// metrics config, configured by [metrics] section by `config` crate.
|
||||||
pub metrics: metrics::MetricsConfiguration,
|
pub metrics: metrics::MetricsConfiguration,
|
||||||
}
|
}
|
||||||
|
@ -42,9 +42,9 @@ pub trait EnrKey: Send + Sync + Unpin + 'static {
|
|||||||
/// Returns the public key associated with current key pair.
|
/// Returns the public key associated with current key pair.
|
||||||
fn public(&self) -> Self::PublicKey;
|
fn public(&self) -> Self::PublicKey;
|
||||||
|
|
||||||
/// Provides a method to decode a raw public key from an ENR `BTreeMap` to a useable public key.
|
/// Provides a method to decode a raw public key from an ENR `BTreeMap` to a usable public key.
|
||||||
///
|
///
|
||||||
/// This method allows a key type to decode the raw bytes in an ENR to a useable
|
/// This method allows a key type to decode the raw bytes in an ENR to a usable
|
||||||
/// `EnrPublicKey`. It takes the ENR's `BTreeMap` and returns a public key.
|
/// `EnrPublicKey`. It takes the ENR's `BTreeMap` and returns a public key.
|
||||||
///
|
///
|
||||||
/// Note: This specifies the supported key schemes for an ENR.
|
/// Note: This specifies the supported key schemes for an ENR.
|
||||||
|
@ -1195,7 +1195,7 @@ mod tests {
|
|||||||
assert_eq!(enr.tcp4(), Some(tcp));
|
assert_eq!(enr.tcp4(), Some(tcp));
|
||||||
assert!(enr.verify());
|
assert!(enr.verify());
|
||||||
|
|
||||||
// Compare the encoding as the key itself can be differnet
|
// Compare the encoding as the key itself can be different
|
||||||
assert_eq!(enr.public_key().encode(), key.public().encode(),);
|
assert_eq!(enr.public_key().encode(), key.public().encode(),);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user