From cca14e246efdd84eded9fc045c65e7df132858e7 Mon Sep 17 00:00:00 2001 From: bruno-valante <140794260+bruno-valante@users.noreply.github.com> Date: Fri, 12 Jul 2024 17:31:53 +0800 Subject: [PATCH] Support multiple mine submission (#124) * Support multiple mine submission * Update --- 0g-storage-contracts | 2 +- Cargo.lock | 1 + node/miner/Cargo.toml | 1 + node/miner/src/lib.rs | 1 + node/miner/src/metrics.rs | 32 +++++++++++++++++++++++++ node/miner/src/mine.rs | 12 ++++++---- node/miner/src/pora.rs | 10 ++++---- node/miner/src/submitter.rs | 2 +- node/miner/src/watcher.rs | 10 ++++---- tests/mine_test.py | 8 +++---- tests/mine_with_market_test.py | 16 +++++++------ tests/test_framework/blockchain_node.py | 24 +++++++++++++++++-- tests/test_framework/contract_proxy.py | 3 +++ 13 files changed, 93 insertions(+), 29 deletions(-) create mode 100644 node/miner/src/metrics.rs diff --git a/0g-storage-contracts b/0g-storage-contracts index faee29c..25bc14a 160000 --- a/0g-storage-contracts +++ b/0g-storage-contracts @@ -1 +1 @@ -Subproject commit faee29c2343036e1172321194569639ebeed6e01 +Subproject commit 25bc14a27441e8fb26e4d42d7c8c885f92d6c74a diff --git a/Cargo.lock b/Cargo.lock index 38caad8..2771cff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4527,6 +4527,7 @@ dependencies = [ "ethers", "hex", "lazy_static", + "lighthouse_metrics", "network", "rand 0.8.5", "shared_types", diff --git a/node/miner/Cargo.toml b/node/miner/Cargo.toml index 7f07aa9..697376b 100644 --- a/node/miner/Cargo.toml +++ b/node/miner/Cargo.toml @@ -10,6 +10,7 @@ zgs_spec = { path = "../../common/spec" } zgs_seal = { path = "../../common/zgs_seal" } task_executor = { path = "../../common/task_executor" } contract-interface = { path = "../../common/contract-interface" } +lighthouse_metrics = { path = "../../common/lighthouse_metrics" } ethereum-types = "0.14" tokio = { version = "1.19.2", features = ["full"] } tracing = "0.1.35" diff --git a/node/miner/src/lib.rs b/node/miner/src/lib.rs index e815c59..0fc06f4 100644 --- a/node/miner/src/lib.rs +++ b/node/miner/src/lib.rs @@ -6,6 +6,7 @@ extern crate lazy_static; mod config; mod loader; +mod metrics; mod mine; mod miner_id; pub mod pora; diff --git a/node/miner/src/metrics.rs b/node/miner/src/metrics.rs new file mode 100644 index 0000000..168d04a --- /dev/null +++ b/node/miner/src/metrics.rs @@ -0,0 +1,32 @@ +use lighthouse_metrics::{try_create_int_counter, IntCounter, Result}; + +lazy_static! { + pub static ref SCRATCH_PAD_ITER_COUNT: Result = try_create_int_counter( + "miner_scratch_pad_iter", + "Number of scratch pad iterations for PoRA" + ); + pub static ref LOADING_COUNT: Result = try_create_int_counter( + "miner_loading_iter", + "Number of loading iterations for PoRA" + ); + pub static ref PAD_MIX_COUNT: Result = try_create_int_counter( + "miner_mix_iter", + "Number of mix sealed data with scratch pad iterations for PoRA" + ); + pub static ref HIT_COUNT: Result = + try_create_int_counter("miner_hit", "Number of hit for PoRA"); +} + +pub fn report() -> String { + let s = |counter: &Result| match counter { + Ok(x) => format!("{}", x.get()), + Err(_) => "n/a".to_string(), + }; + format!( + "scratch pad: {}, loading: {}, pad_mix: {}, hit: {}", + s(&SCRATCH_PAD_ITER_COUNT), + s(&LOADING_COUNT), + s(&PAD_MIX_COUNT), + s(&HIT_COUNT) + ) +} diff --git a/node/miner/src/mine.rs b/node/miner/src/mine.rs index 33df555..6f41dd2 100644 --- a/node/miner/src/mine.rs +++ b/node/miner/src/mine.rs @@ -6,15 +6,16 @@ use task_executor::TaskExecutor; use tokio::sync::{broadcast, mpsc}; use tokio::time::{sleep, Duration, Instant}; +use storage::config::ShardConfig; use zgs_spec::{SECTORS_PER_LOAD, SECTORS_PER_MAX_MINING_RANGE, SECTORS_PER_PRICING}; +use super::metrics; use crate::recall_range::RecallRange; use crate::{ pora::{AnswerWithoutProof, Miner}, watcher::MineContextMessage, MinerConfig, MinerMessage, PoraLoader, }; -use storage::config::ShardConfig; use std::sync::Arc; @@ -122,7 +123,6 @@ impl PoraService { let cpu_percent: u64 = self.cpu_percentage; let diastole = sleep(Duration::from_secs(0)); tokio::pin!(diastole); - // info!("CPU percent {}", cpu_percent); loop { tokio::select! { @@ -157,19 +157,21 @@ impl PoraService { } maybe_msg = self.mine_context_receiver.recv() => { - trace!("PoraService receives context={:?}", maybe_msg); if let Some(msg) = maybe_msg { - debug!("Update mine service: {:?}", msg); + info!("Update mine service: {:?}", msg); + info!("Mine iterations statistics: {}", metrics::report()); self.puzzle = msg.map(|(context, target_quality)| PoraPuzzle { context, target_quality }); + } else { + warn!("Mine context channel closed."); } } () = &mut diastole, if !diastole.is_elapsed() => { } - _ = async {}, if mining_enabled && cpu_percent > 0 && self.as_miner().is_some() && diastole.is_elapsed() => { + _ = async {}, if mining_enabled && cpu_percent > 0 && self.as_miner().map_or(false, |miner| miner.range.mining_length > 0) && diastole.is_elapsed() => { let nonce = H256(rand::thread_rng().gen()); let miner = self.as_miner().unwrap(); diff --git a/node/miner/src/pora.rs b/node/miner/src/pora.rs index 107a103..4d0b9f6 100644 --- a/node/miner/src/pora.rs +++ b/node/miner/src/pora.rs @@ -1,8 +1,10 @@ +use super::metrics::*; use crate::recall_range::RecallRange; use crate::{MineRangeConfig, PoraLoader}; use blake2::{Blake2b512, Digest}; use contract_interface::zgs_flow::MineContext; use ethereum_types::{H256, U256}; +use lighthouse_metrics::inc_counter; use storage::log_store::MineLoadChunk; use tiny_keccak::{Hasher, Keccak}; use zgs_spec::{BYTES_PER_SCRATCHPAD, BYTES_PER_SEAL, SECTORS_PER_LOAD, SECTORS_PER_SEAL}; @@ -58,16 +60,13 @@ impl<'a> Miner<'a> { } pub async fn iteration(&self, nonce: H256) -> Option { + inc_counter(&SCRATCH_PAD_ITER_COUNT); let ScratchPad { scratch_pad, recall_seed, pad_seed, } = self.make_scratch_pad(&nonce); - if self.range.mining_length == 0 { - return None; - } - let recall_position = self.range.load_position(recall_seed)?; if !self.mine_range_config.is_covered(recall_position).unwrap() { trace!( @@ -77,6 +76,7 @@ impl<'a> Miner<'a> { return None; } + inc_counter(&LOADING_COUNT); let MineLoadChunk { loaded_chunk, avalibilities, @@ -95,6 +95,7 @@ impl<'a> Miner<'a> { .zip(avalibilities.into_iter()) .filter_map(|(data, avaliable)| avaliable.then_some(data)) { + inc_counter(&PAD_MIX_COUNT); // Rust can optimize this loop well. for (x, y) in sealed_data.iter_mut().zip(scratch_pad.iter()) { *x ^= y; @@ -111,6 +112,7 @@ impl<'a> Miner<'a> { U256::MAX / self.target_quality, quality_scale ); + inc_counter(&HIT_COUNT); // Undo mix data when find a valid solition for (x, y) in sealed_data.iter_mut().zip(scratch_pad.iter()) { *x ^= y; diff --git a/node/miner/src/submitter.rs b/node/miner/src/submitter.rs index efa2ae0..398436c 100644 --- a/node/miner/src/submitter.rs +++ b/node/miner/src/submitter.rs @@ -143,7 +143,7 @@ impl Submitter { SUBMISSION_RETIES ))?; - info!("Submit PoRA sucess"); + info!("Submit PoRA success"); debug!("Receipt: {:?}", receipt); Ok(()) diff --git a/node/miner/src/watcher.rs b/node/miner/src/watcher.rs index 09667f7..c6573ff 100644 --- a/node/miner/src/watcher.rs +++ b/node/miner/src/watcher.rs @@ -106,13 +106,13 @@ impl MineContextWatcher { async fn query_recent_context(&mut self) -> Result<(), String> { let context_call = self.flow_contract.make_context_with_result(); - let epoch_call = self.mine_contract.last_mined_epoch(); - let quality_call = self.mine_contract.target_quality(); + let valid_call = self.mine_contract.can_submit(); + let quality_call = self.mine_contract.pora_target(); - let (context, epoch, quality) = - try_join!(context_call.call(), epoch_call.call(), quality_call.call()) + let (context, can_submit, quality) = + try_join!(context_call.call(), valid_call.call(), quality_call.call()) .map_err(|e| format!("Failed to query mining context: {:?}", e))?; - let report = if context.epoch > epoch && context.digest != EMPTY_HASH.0 { + let report = if can_submit && context.digest != EMPTY_HASH.0 { Some((context, quality)) } else { None diff --git a/tests/mine_test.py b/tests/mine_test.py index 40a6b67..db9ca53 100755 --- a/tests/mine_test.py +++ b/tests/mine_test.py @@ -35,7 +35,7 @@ class MineTest(TestFramework): self.log.info("flow address: %s", self.contract.address()) self.log.info("mine address: %s", self.mine_contract.address()) - quality = int(2**256 / 400 / estimate_st_performance()) + quality = int(2**256 / 100 / estimate_st_performance()) self.mine_contract.set_quality(quality) self.log.info("Submit the first data chunk") @@ -49,14 +49,14 @@ class MineTest(TestFramework): self.contract.update_context() self.log.info("Wait for the first mine answer") - wait_until(lambda: self.mine_contract.last_mined_epoch() == 1, timeout=180) + wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 1 and not self.mine_contract.can_submit(), timeout=180) self.log.info("Wait for the second mine context release") wait_until(lambda: int(blockchain.eth_blockNumber(), 16) >= start_epoch + 2, timeout=180) self.contract.update_context() self.log.info("Wait for the second mine answer") - wait_until(lambda: self.mine_contract.last_mined_epoch() == 2, timeout=180) + wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 2 and not self.mine_contract.can_submit(), timeout=180) self.nodes[0].miner_stop() self.log.info("Wait for the third mine context release") @@ -69,7 +69,7 @@ class MineTest(TestFramework): self.nodes[0].miner_start() self.log.info("Wait for the third mine answer") - wait_until(lambda: self.mine_contract.last_mined_epoch() == 3, timeout=180) + wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 3 and not self.mine_contract.can_submit(), timeout=180) if __name__ == "__main__": diff --git a/tests/mine_with_market_test.py b/tests/mine_with_market_test.py index 59d2c97..e5fa156 100755 --- a/tests/mine_with_market_test.py +++ b/tests/mine_with_market_test.py @@ -45,8 +45,8 @@ class MineTest(TestFramework): self.log.info("flow address: %s", self.contract.address()) self.log.info("mine address: %s", self.mine_contract.address()) - quality = int(2**256 / 5 / estimate_st_performance()) - self.mine_contract.set_quality(quality) + difficulty = int(2**256 / 5 / estimate_st_performance()) + self.mine_contract.set_quality(difficulty) SECTORS_PER_PRICING = int(8 * ( 2 ** 30 ) / 256) @@ -67,10 +67,10 @@ class MineTest(TestFramework): self.contract.update_context() self.log.info("Wait for mine answer") - wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 1, timeout=120) + wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 1 and not self.mine_contract.can_submit(), timeout=120) rewards = self.reward_contract.reward_distributes() - assert_equal(len(self.reward_contract.reward_distributes()), start_epoch + 1) + assert_equal(len(rewards), 2) firstReward = rewards[0].args.amount self.log.info("Received reward %d Gwei", firstReward / (10**9)) @@ -87,10 +87,12 @@ class MineTest(TestFramework): self.contract.update_context() self.log.info("Wait for mine answer") - wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 2) + wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 2 and not self.mine_contract.can_submit()) + assert_equal(self.contract.epoch(), start_epoch + 2) + rewards = self.reward_contract.reward_distributes() - assert_equal(len(self.reward_contract.reward_distributes()), start_epoch + 2) - secondReward = rewards[1].args.amount + assert_equal(len(rewards), 4) + secondReward = rewards[2].args.amount self.log.info("Received reward %d Gwei", secondReward / (10**9)) assert_greater_than(secondReward, 100 * firstReward / (start_epoch + 1)) diff --git a/tests/test_framework/blockchain_node.py b/tests/test_framework/blockchain_node.py index badb966..6f2f55e 100644 --- a/tests/test_framework/blockchain_node.py +++ b/tests/test_framework/blockchain_node.py @@ -285,18 +285,27 @@ class BlockchainNode(TestNode): def deploy_no_market(): self.log.debug("Start deploy contracts") + dummy_market_contract, _ = deploy_contract("DummyMarket", []) self.log.debug("DummyMarket deployed") + dummy_reward_contract, _ = deploy_contract("DummyReward", []) self.log.debug("DummyReward deployed") + flow_contract, _ = deploy_contract("Flow", [mine_period, 0]) self.log.debug("Flow deployed") + mine_contract, _ = deploy_contract("PoraMineTest", [0]) self.log.debug("Mine deployed") - mine_contract.functions.initialize(1, 1, flow_contract.address, dummy_reward_contract.address).transact(TX_PARAMS) + + mine_contract.functions.initialize(1, flow_contract.address, dummy_reward_contract.address).transact(TX_PARAMS) + mine_contract.functions.setDifficultyAdjustRatio(1).transact(TX_PARAMS) + mine_contract.functions.setTargetSubmissions(2).transact(TX_PARAMS) self.log.debug("Mine Initialized") + flow_initialize_hash = (flow_contract.get_function_by_signature('initialize(address)'))(dummy_market_contract.address).transact(TX_PARAMS) self.log.debug("Flow Initialized") + self.wait_for_transaction_receipt(w3, flow_initialize_hash) self.log.info("All contracts deployed") @@ -309,22 +318,33 @@ class BlockchainNode(TestNode): LIFETIME_MONTH = 1 self.log.debug("Start deploy contracts") + mine_contract, _ = deploy_contract("PoraMineTest", [0]) self.log.debug("Mine deployed") + market_contract, _ = deploy_contract("FixedPrice", []) self.log.debug("Market deployed") + reward_contract, _ =deploy_contract("OnePoolReward", [LIFETIME_MONTH]) self.log.debug("Reward deployed") + flow_contract, _ = deploy_contract("FixedPriceFlow", [mine_period, 0]) self.log.debug("Flow deployed") - mine_contract.functions.initialize(1, 1, flow_contract.address, reward_contract.address).transact(TX_PARAMS) + + mine_contract.functions.initialize(1, flow_contract.address, reward_contract.address).transact(TX_PARAMS) + mine_contract.functions.setDifficultyAdjustRatio(1).transact(TX_PARAMS) + mine_contract.functions.setTargetSubmissions(2).transact(TX_PARAMS) self.log.debug("Mine Initialized") + market_contract.functions.initialize(LIFETIME_MONTH, flow_contract.address, reward_contract.address).transact(TX_PARAMS) self.log.debug("Market Initialized") + reward_contract.functions.initialize(market_contract.address, mine_contract.address).transact(TX_PARAMS) self.log.debug("Reward Initialized") + flow_initialize_hash = (flow_contract.get_function_by_signature('initialize(address)'))(market_contract.address).transact(TX_PARAMS) self.log.debug("Flow Initialized") + self.wait_for_transaction_receipt(w3, flow_initialize_hash) self.log.info("All contracts deployed") diff --git a/tests/test_framework/contract_proxy.py b/tests/test_framework/contract_proxy.py index 272680e..aaeea94 100644 --- a/tests/test_framework/contract_proxy.py +++ b/tests/test_framework/contract_proxy.py @@ -90,6 +90,9 @@ class MineContractProxy(ContractProxy): def last_mined_epoch(self, node_idx=0): return self._call("lastMinedEpoch", node_idx) + def can_submit(self, node_idx=0): + return self._call("canSubmit", node_idx) + def set_quality(self, quality, node_idx=0): return self._send("setQuality", node_idx, _targetQuality=quality)