From 69be4e14126921b0c051950c68ecfe9c039a98ad Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Tue, 23 Jul 2024 15:52:29 +0800 Subject: [PATCH 1/8] Prune no reward chunks. --- 0g-storage-contracts | 2 +- Cargo.lock | 3 + common/contract-interface/src/lib.rs | 12 +++ node/pruner/Cargo.toml | 5 +- node/pruner/src/lib.rs | 106 +++++++++++++++++++++++---- node/src/config/convert.rs | 6 ++ node/src/config/mod.rs | 1 + 7 files changed, 117 insertions(+), 18 deletions(-) diff --git a/0g-storage-contracts b/0g-storage-contracts index 25bc14a..8f8f906 160000 --- a/0g-storage-contracts +++ b/0g-storage-contracts @@ -1 +1 @@ -Subproject commit 25bc14a27441e8fb26e4d42d7c8c885f92d6c74a +Subproject commit 8f8f906224f966651de0b745738fb8aab4f492c4 diff --git a/Cargo.lock b/Cargo.lock index 46baeeb..05c4002 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5824,6 +5824,9 @@ name = "pruner" version = "0.1.0" dependencies = [ "anyhow", + "contract-interface", + "ethereum-types 0.14.1", + "ethers", "miner", "rand 0.8.5", "storage", diff --git a/common/contract-interface/src/lib.rs b/common/contract-interface/src/lib.rs index 10333c2..5ba6e18 100644 --- a/common/contract-interface/src/lib.rs +++ b/common/contract-interface/src/lib.rs @@ -14,6 +14,12 @@ abigen!( "../../0g-storage-contracts/artifacts/contracts/miner/Mine.sol/PoraMine.json" ); +#[cfg(not(feature = "dev"))] +abigen!( + ChunkLinearReward, + "../../0g-storage-contracts/artifacts/contracts/reward/ChunkLinearReward.sol/ChunkLinearReward.json" +); + #[cfg(feature = "dev")] abigen!( ZgsFlow, @@ -25,3 +31,9 @@ abigen!( PoraMine, "../../0g-storage-contracts-dev/artifacts/contracts/miner/Mine.sol/PoraMine.json" ); + +#[cfg(feature = "dev")] +abigen!( + ChunkLinearReward, + "../../0g-storage-contracts/artifacts/contracts/reward/ChunkLinearReward.sol/ChunkLinearReward.json" +); diff --git a/node/pruner/Cargo.toml b/node/pruner/Cargo.toml index 7fe0e2f..c9848a1 100644 --- a/node/pruner/Cargo.toml +++ b/node/pruner/Cargo.toml @@ -11,4 +11,7 @@ anyhow = "1.0.86" tokio = "1.37.0" rand = "0.8.5" task_executor = { path = "../../common/task_executor" } -tracing = "0.1.40" \ No newline at end of file +tracing = "0.1.40" +ethereum-types = "0.14.1" +contract-interface = { path = "../../common/contract-interface" } +ethers = "^2" \ No newline at end of file diff --git a/node/pruner/src/lib.rs b/node/pruner/src/lib.rs index 4ceab42..ebac666 100644 --- a/node/pruner/src/lib.rs +++ b/node/pruner/src/lib.rs @@ -1,4 +1,7 @@ -use anyhow::Result; +use anyhow::{anyhow, bail, Result}; +use contract_interface::ChunkLinearReward; +use ethereum_types::Address; +use ethers::prelude::{Http, Provider}; use miner::MinerMessage; use rand::Rng; use std::path::PathBuf; @@ -13,6 +16,8 @@ use tracing::{debug, info}; // Start pruning when the db directory size exceeds 0.9 * limit. const PRUNE_THRESHOLD: f32 = 0.9; +const FIRST_REWARDABLE_CHUNK_KEY: &str = "first_rewardable_chunk"; + #[derive(Debug)] pub struct PrunerConfig { pub shard_config: ShardConfig, @@ -21,20 +26,32 @@ pub struct PrunerConfig { pub check_time: Duration, pub batch_size: usize, pub batch_wait_time: Duration, + + pub rpc_endpoint_url: String, + pub reward_address: Address, } impl PrunerConfig { fn start_prune_size(&self) -> u64 { (self.max_num_chunks as f32 * PRUNE_THRESHOLD) as u64 } + + fn make_provider(&self) -> Result> { + Provider::::try_from(&self.rpc_endpoint_url) + .map_err(|e| anyhow!("Can not parse blockchain endpoint: {:?}", e)) + } } pub struct Pruner { config: PrunerConfig, + first_rewardable_chunk: u64, + store: Arc, sender: mpsc::UnboundedSender, miner_sender: Option>, + + reward_contract: ChunkLinearReward>, } impl Pruner { @@ -47,12 +64,19 @@ impl Pruner { if let Some(shard_config) = get_shard_config(store.as_ref()).await? { config.shard_config = shard_config; } + let first_rewardable_chunk = get_first_rewardable_chunk(store.as_ref()) + .await? + .unwrap_or(0); + let reward_contract = + ChunkLinearReward::new(config.reward_address, Arc::new(config.make_provider()?)); let (tx, rx) = mpsc::unbounded_channel(); let pruner = Pruner { config, + first_rewardable_chunk, store, sender: tx, miner_sender, + reward_contract, }; pruner.put_shard_config().await?; executor.spawn( @@ -66,20 +90,22 @@ impl Pruner { pub async fn start(mut self) -> Result<()> { loop { + // Check shard config update and prune unneeded data. if let Some(delete_list) = self.maybe_update().await? { info!(new_config = ?self.config.shard_config, "new shard config"); self.put_shard_config().await?; - let mut batch = Vec::with_capacity(self.config.batch_size); - let mut iter = delete_list.peekable(); - while let Some(index) = iter.next() { - batch.push(index); - if batch.len() == self.config.batch_size || iter.peek().is_none() { - debug!(start = batch.first(), end = batch.last(), "prune batch"); - self.store.remove_chunks_batch(&batch).await?; - batch = Vec::with_capacity(self.config.batch_size); - tokio::time::sleep(self.config.batch_wait_time).await; - } - } + self.prune_in_batch(delete_list).await?; + } + + // Check no reward chunks and prune. + let new_first_rewardable = self.reward_contract.first_rewardable_chunk().call().await?; + if let Some(no_reward_list) = self + .maybe_forward_first_rewardable(new_first_rewardable) + .await? + { + self.prune_in_batch(no_reward_list).await?; + self.put_first_rewardable_chunk_index(new_first_rewardable) + .await?; } tokio::time::sleep(self.config.check_time).await; } @@ -92,7 +118,9 @@ impl Pruner { config = ?self.config.shard_config, "maybe_update" ); - if current_size >= self.config.start_prune_size() { + if current_size < self.config.start_prune_size() { + Ok(None) + } else { // Update config and generate delete list should be done in a single lock to ensure // the list is complete. let config = &mut self.config.shard_config; @@ -110,11 +138,44 @@ impl Pruner { // Generate delete list let flow_len = self.store.get_context().await?.1; let start_index = old_shard_id + (!rand_bit) as usize * old_num_shard; - return Ok(Some(Box::new( + Ok(Some(Box::new( (start_index as u64..flow_len).step_by(config.num_shard), - ))); + ))) } - Ok(None) + } + + async fn maybe_forward_first_rewardable( + &mut self, + new_first_rewardable: u64, + ) -> Result>>> { + if self.first_rewardable_chunk == new_first_rewardable { + Ok(None) + } else if self.first_rewardable_chunk > new_first_rewardable { + bail!( + "Unexpected first_rewardable_chunk revert: old={} new={}", + self.first_rewardable_chunk, + new_first_rewardable + ); + } else { + Ok(Some(Box::new( + self.first_rewardable_chunk..new_first_rewardable, + ))) + } + } + + async fn prune_in_batch(&self, to_prune: Box>) -> Result<()> { + let mut batch = Vec::with_capacity(self.config.batch_size); + let mut iter = to_prune.peekable(); + while let Some(index) = iter.next() { + batch.push(index); + if batch.len() == self.config.batch_size || iter.peek().is_none() { + debug!(start = batch.first(), end = batch.last(), "prune batch"); + self.store.remove_chunks_batch(&batch).await?; + batch = Vec::with_capacity(self.config.batch_size); + tokio::time::sleep(self.config.batch_wait_time).await; + } + } + Ok(()) } async fn put_shard_config(&self) -> Result<()> { @@ -130,12 +191,25 @@ impl Pruner { .set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config) .await } + + async fn put_first_rewardable_chunk_index( + &self, + new_first_rewardable_chunk: u64, + ) -> Result<()> { + self.store + .set_config_encoded(&FIRST_REWARDABLE_CHUNK_KEY, &new_first_rewardable_chunk) + .await + } } async fn get_shard_config(store: &Store) -> Result> { store.get_config_decoded(&SHARD_CONFIG_KEY).await } +async fn get_first_rewardable_chunk(store: &Store) -> Result> { + store.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY).await +} + #[derive(Debug)] pub enum PrunerMessage { ChangeShardConfig(ShardConfig), diff --git a/node/src/config/convert.rs b/node/src/config/convert.rs index 3f9cb36..9c8b395 100644 --- a/node/src/config/convert.rs +++ b/node/src/config/convert.rs @@ -203,6 +203,10 @@ impl ZgsConfig { pub fn pruner_config(&self) -> Result, String> { if let Some(max_num_chunks) = self.db_max_num_chunks { let shard_config = self.shard_config()?; + let reward_address = self + .reward_contract_address + .parse::() + .map_err(|e| format!("Unable to parse reward_contract_address: {:?}", e))?; Ok(Some(PrunerConfig { shard_config, db_path: self.db_dir.clone().into(), @@ -210,6 +214,8 @@ impl ZgsConfig { check_time: Duration::from_secs(self.prune_check_time_s), batch_size: self.prune_batch_size, batch_wait_time: Duration::from_millis(self.prune_batch_wait_time_ms), + rpc_endpoint_url: self.blockchain_rpc_endpoint.clone(), + reward_address, })) } else { Ok(None) diff --git a/node/src/config/mod.rs b/node/src/config/mod.rs index 5f66515..d7701b9 100644 --- a/node/src/config/mod.rs +++ b/node/src/config/mod.rs @@ -79,6 +79,7 @@ build_config! { (miner_submission_gas, (Option), None) (miner_cpu_percentage, (u64), 100) (mine_iter_batch_size, (usize), 100) + (reward_contract_address, (String), "".to_string()) (shard_position, (Option), None) } From be95796f84c1a94f3c3de5f15f01ec642826af1e Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Mon, 29 Jul 2024 17:52:56 +0800 Subject: [PATCH 2/8] Add tests. --- node/pruner/src/lib.rs | 4 ++++ tests/pruner_test.py | 24 ++++++++++++++++++++++-- tests/test_framework/blockchain_node.py | 14 +++++++------- tests/test_framework/test_framework.py | 4 +++- tests/test_framework/zgs_node.py | 2 ++ 5 files changed, 38 insertions(+), 10 deletions(-) diff --git a/node/pruner/src/lib.rs b/node/pruner/src/lib.rs index ebac666..bd45f9c 100644 --- a/node/pruner/src/lib.rs +++ b/node/pruner/src/lib.rs @@ -103,6 +103,10 @@ impl Pruner { .maybe_forward_first_rewardable(new_first_rewardable) .await? { + info!( + ?new_first_rewardable, + "first rewardable chunk moves forward, start pruning" + ); self.prune_in_batch(no_reward_list).await?; self.put_first_rewardable_chunk_index(new_first_rewardable) .await?; diff --git a/tests/pruner_test.py b/tests/pruner_test.py index 81ee1e0..dd65738 100755 --- a/tests/pruner_test.py +++ b/tests/pruner_test.py @@ -2,8 +2,10 @@ import time from test_framework.test_framework import TestFramework +from config.node_config import GENESIS_PRIV_KEY +from mine_with_market_test import PRICE_PER_SECTOR from utility.submission import create_submission, submit_data -from utility.utils import wait_until, assert_equal +from utility.utils import wait_until, assert_equal, estimate_st_performance class PrunerTest(TestFramework): @@ -13,16 +15,25 @@ class PrunerTest(TestFramework): self.num_nodes = 1 self.zgs_node_configs[0] = { "db_max_num_chunks": 16 * 1024, + "miner_key": GENESIS_PRIV_KEY, "prune_check_time_s": 1, "prune_batch_wait_time_ms": 10, } + self.enable_market = True + self.mine_period = int(45 / self.block_time) + self.lifetime_seconds = 60 + self.launch_wait_seconds = 15 + self.log.info("Contract Info: Est. block time %.2f, Mine period %d", self.block_time, self.mine_period) def run_test(self): + difficulty = int(2**256 / 5 / estimate_st_performance()) + self.mine_contract.set_quality(difficulty) + client = self.nodes[0] chunk_data = b"\x02" * 16 * 256 * 1024 submissions, data_root = create_submission(chunk_data) - self.contract.submit(submissions) + self.contract.submit(submissions, tx_prarams = {"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)}) wait_until(lambda: self.contract.num_submissions() == 1) wait_until(lambda: client.zgs_get_file_info(data_root) is not None) @@ -42,6 +53,15 @@ class PrunerTest(TestFramework): else: assert_equal(seg, None) + wait_until(lambda: self.reward_contract.first_rewardable_chunk() != 0) + first_rewardable = self.reward_contract.first_rewardable_chunk() + for i in range(shard_id, len(segment), num_shard): + seg = client.zgs_download_segment(data_root, i * 1024, (i + 1) * 1024) + if i < first_rewardable: + assert_equal(seg, None) + else: + assert_equal(len(seg), 349528) + if __name__ == "__main__": PrunerTest().main() diff --git a/tests/test_framework/blockchain_node.py b/tests/test_framework/blockchain_node.py index 6f2f55e..799e156 100644 --- a/tests/test_framework/blockchain_node.py +++ b/tests/test_framework/blockchain_node.py @@ -253,7 +253,7 @@ class BlockchainNode(TestNode): def wait_for_transaction_receipt(self, w3, tx_hash, timeout=120, parent_hash=None): return w3.eth.wait_for_transaction_receipt(tx_hash, timeout) - def setup_contract(self, enable_market, mine_period): + def setup_contract(self, enable_market, mine_period, lifetime_seconds): w3 = Web3(HTTPProvider(self.rpc_url)) account1 = w3.eth.account.from_key(GENESIS_PRIV_KEY) @@ -314,9 +314,7 @@ class BlockchainNode(TestNode): return flow_contract, flow_initialize_hash, mine_contract, dummy_reward_contract - def deploy_with_market(): - LIFETIME_MONTH = 1 - + def deploy_with_market(lifetime_seconds): self.log.debug("Start deploy contracts") mine_contract, _ = deploy_contract("PoraMineTest", [0]) @@ -325,7 +323,7 @@ class BlockchainNode(TestNode): market_contract, _ = deploy_contract("FixedPrice", []) self.log.debug("Market deployed") - reward_contract, _ =deploy_contract("OnePoolReward", [LIFETIME_MONTH]) + reward_contract, _ = deploy_contract("ChunkLinearReward", [lifetime_seconds]) self.log.debug("Reward deployed") flow_contract, _ = deploy_contract("FixedPriceFlow", [mine_period, 0]) @@ -336,7 +334,9 @@ class BlockchainNode(TestNode): mine_contract.functions.setTargetSubmissions(2).transact(TX_PARAMS) self.log.debug("Mine Initialized") - market_contract.functions.initialize(LIFETIME_MONTH, flow_contract.address, reward_contract.address).transact(TX_PARAMS) + market_contract.functions.initialize(int(lifetime_seconds * 256 * 10 * 10 ** 18 / + 2 ** 30 / 12 / 31 / 86400), + flow_contract.address, reward_contract.address).transact(TX_PARAMS) self.log.debug("Market Initialized") reward_contract.functions.initialize(market_contract.address, mine_contract.address).transact(TX_PARAMS) @@ -351,7 +351,7 @@ class BlockchainNode(TestNode): return flow_contract, flow_initialize_hash, mine_contract, reward_contract if enable_market: - return deploy_with_market() + return deploy_with_market(lifetime_seconds) else: return deploy_no_market() diff --git a/tests/test_framework/test_framework.py b/tests/test_framework/test_framework.py index 05e53fc..f369ad1 100644 --- a/tests/test_framework/test_framework.py +++ b/tests/test_framework/test_framework.py @@ -51,6 +51,7 @@ class TestFramework: self.block_time = blockchain_node_type.block_time() self.enable_market = False self.mine_period = 100 + self.lifetime_seconds = 3600 self.launch_wait_seconds = 1 # Set default binary path @@ -171,7 +172,7 @@ class TestFramework: wait_until(lambda: node.eth_blockNumber() is not None) wait_until(lambda: int(node.eth_blockNumber(), 16) > 0) - contract, tx_hash, mine_contract, reward_contract = self.blockchain_nodes[0].setup_contract(self.enable_market, self.mine_period) + contract, tx_hash, mine_contract, reward_contract = self.blockchain_nodes[0].setup_contract(self.enable_market, self.mine_period, self.lifetime_seconds) self.contract = FlowContractProxy(contract, self.blockchain_nodes) self.mine_contract = MineContractProxy(mine_contract, self.blockchain_nodes) self.reward_contract = IRewardContractProxy(reward_contract, self.blockchain_nodes) @@ -197,6 +198,7 @@ class TestFramework: updated_config, self.contract.address(), self.mine_contract.address(), + self.reward_contract.address(), self.log, ) self.nodes.append(node) diff --git a/tests/test_framework/zgs_node.py b/tests/test_framework/zgs_node.py index 0fe7caf..f3b5f7c 100644 --- a/tests/test_framework/zgs_node.py +++ b/tests/test_framework/zgs_node.py @@ -22,6 +22,7 @@ class ZgsNode(TestNode): updated_config, log_contract_address, mine_contract_address, + reward_contract_address, log, rpc_timeout=10, libp2p_nodes=None, @@ -43,6 +44,7 @@ class ZgsNode(TestNode): "network_libp2p_nodes": libp2p_nodes, "log_contract_address": log_contract_address, "mine_contract_address": mine_contract_address, + "reward_contract_address": reward_contract_address, "blockchain_rpc_endpoint": f"http://127.0.0.1:{blockchain_rpc_port(0)}", } # Set configs for this specific node. From e0bb902194edd785b1d692286869ef0fcf598a8d Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Mon, 29 Jul 2024 23:35:41 +0800 Subject: [PATCH 3/8] Fix tests. --- Cargo.lock | 1 + common/contract-interface/src/lib.rs | 4 ++-- node/pruner/Cargo.toml | 3 ++- node/pruner/src/lib.rs | 15 +++++++++++-- node/src/config/mod.rs | 2 +- tests/pruner_test.py | 31 +++++++++++--------------- tests/test_framework/contract_proxy.py | 12 +++++++--- 7 files changed, 41 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 05c4002..aaca1b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5834,6 +5834,7 @@ dependencies = [ "task_executor", "tokio", "tracing", + "zgs_spec", ] [[package]] diff --git a/common/contract-interface/src/lib.rs b/common/contract-interface/src/lib.rs index 1417dc6..5e3badb 100644 --- a/common/contract-interface/src/lib.rs +++ b/common/contract-interface/src/lib.rs @@ -11,7 +11,7 @@ abigen!(PoraMine, "../../storage-contracts-abis/PoraMine.json"); #[cfg(not(feature = "dev"))] abigen!( ChunkLinearReward, - "../../0g-storage-contracts/artifacts/contracts/reward/ChunkLinearReward.sol/ChunkLinearReward.json" + "../../storage-contracts-abis/ChunkLinearReward.json" ); #[cfg(feature = "dev")] @@ -29,5 +29,5 @@ abigen!( #[cfg(feature = "dev")] abigen!( ChunkLinearReward, - "../../0g-storage-contracts/artifacts/contracts/reward/ChunkLinearReward.sol/ChunkLinearReward.json" + "../../0g-storage-contracts-dev/artifacts/contracts/reward/ChunkLinearReward.sol/ChunkLinearReward.json" ); diff --git a/node/pruner/Cargo.toml b/node/pruner/Cargo.toml index c9848a1..5a50b72 100644 --- a/node/pruner/Cargo.toml +++ b/node/pruner/Cargo.toml @@ -14,4 +14,5 @@ task_executor = { path = "../../common/task_executor" } tracing = "0.1.40" ethereum-types = "0.14.1" contract-interface = { path = "../../common/contract-interface" } -ethers = "^2" \ No newline at end of file +ethers = "^2" +zgs_spec = { path = "../../common/spec" } \ No newline at end of file diff --git a/node/pruner/src/lib.rs b/node/pruner/src/lib.rs index bd45f9c..fb126d1 100644 --- a/node/pruner/src/lib.rs +++ b/node/pruner/src/lib.rs @@ -8,16 +8,20 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use storage::config::{ShardConfig, SHARD_CONFIG_KEY}; +use storage::log_store::log_manager::PORA_CHUNK_SIZE; use storage_async::Store; use task_executor::TaskExecutor; use tokio::sync::{broadcast, mpsc}; use tracing::{debug, info}; +use zgs_spec::SECTORS_PER_PRICING; // Start pruning when the db directory size exceeds 0.9 * limit. const PRUNE_THRESHOLD: f32 = 0.9; const FIRST_REWARDABLE_CHUNK_KEY: &str = "first_rewardable_chunk"; +const CHUNKS_PER_PRICING: u64 = (SECTORS_PER_PRICING / PORA_CHUNK_SIZE) as u64; + #[derive(Debug)] pub struct PrunerConfig { pub shard_config: ShardConfig, @@ -110,6 +114,7 @@ impl Pruner { self.prune_in_batch(no_reward_list).await?; self.put_first_rewardable_chunk_index(new_first_rewardable) .await?; + self.first_rewardable_chunk = new_first_rewardable; } tokio::time::sleep(self.config.check_time).await; } @@ -140,7 +145,12 @@ impl Pruner { config.num_shard *= 2; // Generate delete list - let flow_len = self.store.get_context().await?.1; + let flow_len = self + .store + .get_context() + .await? + .1 + .div_ceil(PORA_CHUNK_SIZE as u64); let start_index = old_shard_id + (!rand_bit) as usize * old_num_shard; Ok(Some(Box::new( (start_index as u64..flow_len).step_by(config.num_shard), @@ -162,7 +172,8 @@ impl Pruner { ); } else { Ok(Some(Box::new( - self.first_rewardable_chunk..new_first_rewardable, + self.first_rewardable_chunk * CHUNKS_PER_PRICING + ..new_first_rewardable * CHUNKS_PER_PRICING, ))) } } diff --git a/node/src/config/mod.rs b/node/src/config/mod.rs index d7701b9..713fa4a 100644 --- a/node/src/config/mod.rs +++ b/node/src/config/mod.rs @@ -65,7 +65,7 @@ build_config! { (db_dir, (String), "db".to_string()) (db_max_num_chunks, (Option), None) (prune_check_time_s, (u64), 60) - (prune_batch_size, (usize), 1024) + (prune_batch_size, (usize), 16 * 1024) (prune_batch_wait_time_ms, (u64), 1000) // misc diff --git a/tests/pruner_test.py b/tests/pruner_test.py index dd65738..21a58cc 100755 --- a/tests/pruner_test.py +++ b/tests/pruner_test.py @@ -14,24 +14,24 @@ class PrunerTest(TestFramework): self.num_blockchain_nodes = 1 self.num_nodes = 1 self.zgs_node_configs[0] = { - "db_max_num_chunks": 16 * 1024, - "miner_key": GENESIS_PRIV_KEY, + "db_max_num_chunks": 32 * 1024 * 1024, + # "miner_key": GENESIS_PRIV_KEY, "prune_check_time_s": 1, - "prune_batch_wait_time_ms": 10, + "prune_batch_wait_time_ms": 1, } self.enable_market = True self.mine_period = int(45 / self.block_time) - self.lifetime_seconds = 60 + self.lifetime_seconds = 240 self.launch_wait_seconds = 15 self.log.info("Contract Info: Est. block time %.2f, Mine period %d", self.block_time, self.mine_period) def run_test(self): - difficulty = int(2**256 / 5 / estimate_st_performance()) - self.mine_contract.set_quality(difficulty) + # difficulty = int(2**256 / 5 / estimate_st_performance()) + # self.mine_contract.set_quality(difficulty) client = self.nodes[0] - chunk_data = b"\x02" * 16 * 256 * 1024 + chunk_data = b"\x02" * 5 * 1024 * 1024 * 1024 submissions, data_root = create_submission(chunk_data) self.contract.submit(submissions, tx_prarams = {"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)}) wait_until(lambda: self.contract.num_submissions() == 1) @@ -45,19 +45,14 @@ class PrunerTest(TestFramework): shard_id = int(shard_config["shardId"]) num_shard = int(shard_config["numShard"]) + wait_until(lambda: self.reward_contract.first_rewardable_chunk() != 0, timeout=180) + first_rewardable = self.reward_contract.first_rewardable_chunk() * 32 * 1024 + # Wait for 1 sec for the no reward segments to be pruned. + time.sleep(1) + # Wait for chunks to be removed. for i in range(len(segment)): seg = client.zgs_download_segment(data_root, i * 1024, (i + 1) * 1024) - if i % num_shard == shard_id: - # base64 encoding size - assert_equal(len(seg), 349528) - else: - assert_equal(seg, None) - - wait_until(lambda: self.reward_contract.first_rewardable_chunk() != 0) - first_rewardable = self.reward_contract.first_rewardable_chunk() - for i in range(shard_id, len(segment), num_shard): - seg = client.zgs_download_segment(data_root, i * 1024, (i + 1) * 1024) - if i < first_rewardable: + if i < first_rewardable or i % num_shard != shard_id: assert_equal(seg, None) else: assert_equal(len(seg), 349528) diff --git a/tests/test_framework/contract_proxy.py b/tests/test_framework/contract_proxy.py index 8269b24..a9e05f1 100644 --- a/tests/test_framework/contract_proxy.py +++ b/tests/test_framework/contract_proxy.py @@ -17,11 +17,11 @@ class ContractProxy: else self.blockchain_nodes[node_idx].get_contract(self.contract_address) ) - def _call(self, fn_name, node_idx, **args): + def _call(self, fn_name, node_idx, *args): assert node_idx < len(self.blockchain_nodes) contract = self._get_contract(node_idx) - return getattr(contract.functions, fn_name)(**args).call() + return getattr(contract.functions, fn_name)(*args).call() def _send(self, fn_name, node_idx, **args): assert node_idx < len(self.blockchain_nodes) @@ -113,4 +113,10 @@ class RewardContractProxy(ContractProxy): return self._send_payable("donate", node_idx, value) def base_reward(self, node_idx = 0): - return self._call("baseReward", node_idx) \ No newline at end of file + return self._call("baseReward", node_idx) + + def first_rewardable_chunk(self, node_idx = 0): + return self._call("firstRewardableChunk", node_idx) + + def reward_deadline(self, node_idx = 0): + return self._call("rewardDeadline", node_idx, 0) From 769b6969156ee7475243c03e9e60b6ede6a38439 Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Mon, 29 Jul 2024 23:57:51 +0800 Subject: [PATCH 4/8] Fix clippy. --- node/pruner/src/lib.rs | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/node/pruner/src/lib.rs b/node/pruner/src/lib.rs index cc20449..fe4bb93 100644 --- a/node/pruner/src/lib.rs +++ b/node/pruner/src/lib.rs @@ -4,6 +4,7 @@ use ethereum_types::Address; use ethers::prelude::{Http, Provider}; use miner::MinerMessage; use rand::Rng; +use std::cmp::Ordering; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -162,19 +163,19 @@ impl Pruner { &mut self, new_first_rewardable: u64, ) -> Result>>> { - if self.first_rewardable_chunk == new_first_rewardable { - Ok(None) - } else if self.first_rewardable_chunk > new_first_rewardable { - bail!( - "Unexpected first_rewardable_chunk revert: old={} new={}", - self.first_rewardable_chunk, - new_first_rewardable - ); - } else { - Ok(Some(Box::new( + match self.first_rewardable_chunk.cmp(&new_first_rewardable) { + Ordering::Less => Ok(Some(Box::new( self.first_rewardable_chunk * CHUNKS_PER_PRICING ..new_first_rewardable * CHUNKS_PER_PRICING, - ))) + ))), + Ordering::Equal => Ok(None), + Ordering::Greater => { + bail!( + "Unexpected first_rewardable_chunk revert: old={} new={}", + self.first_rewardable_chunk, + new_first_rewardable + ); + } } } From 827d31cacb60e5445fd58ca8f552848d08188546 Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Tue, 30 Jul 2024 00:49:43 +0800 Subject: [PATCH 5/8] Revert test. --- tests/pruner_test.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/tests/pruner_test.py b/tests/pruner_test.py index 6804b50..42bbd4d 100755 --- a/tests/pruner_test.py +++ b/tests/pruner_test.py @@ -14,8 +14,8 @@ class PrunerTest(TestFramework): self.num_blockchain_nodes = 1 self.num_nodes = 1 self.zgs_node_configs[0] = { - "db_max_num_sectors": 32 * 1024 * 1024, - # "miner_key": GENESIS_PRIV_KEY, + "db_max_num_sectors": 16 * 1024, + # "db_max_num_sectors": 32 * 1024 * 1024, "prune_check_time_s": 1, "prune_batch_wait_time_ms": 1, } @@ -26,12 +26,10 @@ class PrunerTest(TestFramework): self.log.info("Contract Info: Est. block time %.2f, Mine period %d", self.block_time, self.mine_period) def run_test(self): - # difficulty = int(2**256 / 5 / estimate_st_performance()) - # self.mine_contract.set_quality(difficulty) - client = self.nodes[0] - chunk_data = b"\x02" * 5 * 1024 * 1024 * 1024 + chunk_data = b"\x02" * 16 * 256 * 1024 + # chunk_data = b"\x02" * 5 * 1024 * 1024 * 1024 submissions, data_root = create_submission(chunk_data) self.contract.submit(submissions, tx_prarams = {"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)}) wait_until(lambda: self.contract.num_submissions() == 1) @@ -45,14 +43,15 @@ class PrunerTest(TestFramework): shard_id = int(shard_config["shardId"]) num_shard = int(shard_config["numShard"]) - wait_until(lambda: self.reward_contract.first_rewardable_chunk() != 0, timeout=180) - first_rewardable = self.reward_contract.first_rewardable_chunk() * 32 * 1024 + # wait_until(lambda: self.reward_contract.first_rewardable_chunk() != 0, timeout=180) + # first_rewardable = self.reward_contract.first_rewardable_chunk() * 32 * 1024 # Wait for 1 sec for the no reward segments to be pruned. time.sleep(1) # Wait for chunks to be removed. for i in range(len(segment)): seg = client.zgs_download_segment(data_root, i * 1024, (i + 1) * 1024) - if i < first_rewardable or i % num_shard != shard_id: + # if i < first_rewardable or i % num_shard != shard_id: + if i % num_shard != shard_id: assert_equal(seg, None) else: assert_equal(len(seg), 349528) From 41ba320bcc70285e3b5be1ccafa32d653f84b263 Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Tue, 30 Jul 2024 12:47:05 +0800 Subject: [PATCH 6/8] Enable market in shard_sync_test. --- tests/pruner_test.py | 2 +- tests/shard_sync_test.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/pruner_test.py b/tests/pruner_test.py index 42bbd4d..bac2a76 100755 --- a/tests/pruner_test.py +++ b/tests/pruner_test.py @@ -38,7 +38,7 @@ class PrunerTest(TestFramework): segment = submit_data(client, chunk_data) self.log.info("segment: %s", len(segment)) # Wait for 1 sec for the shard config to be updated - time.sleep(1) + time.sleep(2) shard_config = client.rpc.zgs_getShardConfig() shard_id = int(shard_config["shardId"]) num_shard = int(shard_config["numShard"]) diff --git a/tests/shard_sync_test.py b/tests/shard_sync_test.py index d838fb5..6c1c468 100755 --- a/tests/shard_sync_test.py +++ b/tests/shard_sync_test.py @@ -23,6 +23,7 @@ class PrunerTest(TestFramework): "db_max_num_sectors": 2 ** 30, "shard_position": "1/4" } + self.enable_market = True def run_test(self): client = self.nodes[0] From ce34ff8be63b2904ed755e0daa133ae21940d36d Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Mon, 5 Aug 2024 13:58:48 +0800 Subject: [PATCH 7/8] Add tx prune status. --- node/pruner/src/lib.rs | 47 ++++++++++++++++++++--- node/rpc/src/zgs/impl.rs | 4 ++ node/storage-async/src/lib.rs | 2 + node/storage/src/log_store/log_manager.rs | 8 ++++ node/storage/src/log_store/mod.rs | 4 ++ node/storage/src/log_store/tx_store.rs | 21 +++++++++- node/sync/src/auto_sync/batcher.rs | 4 +- node/sync/src/service.rs | 17 +++++++- 8 files changed, 96 insertions(+), 11 deletions(-) diff --git a/node/pruner/src/lib.rs b/node/pruner/src/lib.rs index fe4bb93..111b7d7 100644 --- a/node/pruner/src/lib.rs +++ b/node/pruner/src/lib.rs @@ -50,6 +50,7 @@ impl PrunerConfig { pub struct Pruner { config: PrunerConfig, first_rewardable_chunk: u64, + first_tx_seq: u64, store: Arc, @@ -69,15 +70,16 @@ impl Pruner { if let Some(shard_config) = get_shard_config(store.as_ref()).await? { config.shard_config = shard_config; } - let first_rewardable_chunk = get_first_rewardable_chunk(store.as_ref()) + let (first_rewardable_chunk, first_tx_seq) = get_first_rewardable_chunk(store.as_ref()) .await? - .unwrap_or(0); + .unwrap_or((0, 0)); let reward_contract = ChunkLinearReward::new(config.reward_address, Arc::new(config.make_provider()?)); let (tx, rx) = mpsc::unbounded_channel(); let pruner = Pruner { config, first_rewardable_chunk, + first_tx_seq, store, sender: tx, miner_sender, @@ -112,10 +114,19 @@ impl Pruner { ?new_first_rewardable, "first rewardable chunk moves forward, start pruning" ); + self.prune_tx( + self.first_rewardable_chunk * SECTORS_PER_PRICING as u64, + new_first_rewardable * SECTORS_PER_PRICING as u64, + ) + .await?; self.prune_in_batch(no_reward_list).await?; - self.put_first_rewardable_chunk_index(new_first_rewardable) - .await?; + self.first_rewardable_chunk = new_first_rewardable; + self.put_first_rewardable_chunk_index( + self.first_rewardable_chunk, + self.first_tx_seq, + ) + .await?; } tokio::time::sleep(self.config.check_time).await; } @@ -194,6 +205,26 @@ impl Pruner { Ok(()) } + async fn prune_tx(&mut self, start_sector: u64, end_sector: u64) -> Result<()> { + while let Some(tx) = self.store.get_tx_by_seq_number(self.first_tx_seq).await? { + // If a part of the tx data is pruned, we mark the tx as pruned. + if tx.start_entry_index() >= start_sector && tx.start_entry_index() < end_sector { + self.store.prune_tx(tx.seq).await?; + } else if tx.start_entry_index() >= end_sector { + break; + } else { + bail!( + "prune tx out of range: tx={:?}, start={} end={}", + tx, + start_sector, + end_sector + ); + } + self.first_tx_seq += 1; + } + Ok(()) + } + async fn put_shard_config(&self) -> Result<()> { if let Some(sender) = &self.miner_sender { sender.send(MinerMessage::SetShardConfig(self.config.shard_config))?; @@ -211,9 +242,13 @@ impl Pruner { async fn put_first_rewardable_chunk_index( &self, new_first_rewardable_chunk: u64, + new_first_tx_seq: u64, ) -> Result<()> { self.store - .set_config_encoded(&FIRST_REWARDABLE_CHUNK_KEY, &new_first_rewardable_chunk) + .set_config_encoded( + &FIRST_REWARDABLE_CHUNK_KEY, + &(new_first_rewardable_chunk, new_first_tx_seq), + ) .await } } @@ -222,7 +257,7 @@ async fn get_shard_config(store: &Store) -> Result> { store.get_config_decoded(&SHARD_CONFIG_KEY).await } -async fn get_first_rewardable_chunk(store: &Store) -> Result> { +async fn get_first_rewardable_chunk(store: &Store) -> Result> { store.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY).await } diff --git a/node/rpc/src/zgs/impl.rs b/node/rpc/src/zgs/impl.rs index 017cc2b..cd765bd 100644 --- a/node/rpc/src/zgs/impl.rs +++ b/node/rpc/src/zgs/impl.rs @@ -195,6 +195,10 @@ impl RpcServerImpl { )); } + if self.ctx.log_store.check_tx_pruned(tx.seq).await? { + return Err(error::invalid_params("root", "already pruned")); + } + Ok(false) } else { //Check whether file is small enough to cache in the system diff --git a/node/storage-async/src/lib.rs b/node/storage-async/src/lib.rs index 692a725..7a3da13 100644 --- a/node/storage-async/src/lib.rs +++ b/node/storage-async/src/lib.rs @@ -45,6 +45,7 @@ impl Store { } delegate!(fn check_tx_completed(tx_seq: u64) -> Result); + delegate!(fn check_tx_pruned(tx_seq: u64) -> Result); delegate!(fn get_chunk_by_tx_and_index(tx_seq: u64, index: usize) -> Result>); delegate!(fn get_chunks_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize) -> Result>); delegate!(fn get_chunks_with_proof_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize, merkle_tx_seq: Option) -> Result>); @@ -53,6 +54,7 @@ impl Store { delegate!(fn put_chunks_with_tx_hash(tx_seq: u64, tx_hash: H256, chunks: ChunkArray, maybe_file_proof: Option) -> Result); delegate!(fn get_chunk_by_flow_index(index: u64, length: u64) -> Result>); delegate!(fn finalize_tx(tx_seq: u64) -> Result<()>); + delegate!(fn prune_tx(tx_seq: u64) -> Result<()>); delegate!(fn finalize_tx_with_hash(tx_seq: u64, tx_hash: H256) -> Result); delegate!(fn get_proof_at_root(root: Option, index: u64, length: u64) -> Result); delegate!(fn get_context() -> Result<(DataRoot, u64)>); diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 245cb37..64ea1cf 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -337,6 +337,10 @@ impl LogStoreWrite for LogManager { } } + fn prune_tx(&self, tx_seq: u64) -> crate::error::Result<()> { + self.tx_store.prune_tx(tx_seq) + } + fn put_sync_progress(&self, progress: (u64, H256, Option>)) -> Result<()> { self.tx_store.put_progress(progress) } @@ -563,6 +567,10 @@ impl LogStoreRead for LogManager { merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64, )) } + + fn check_tx_pruned(&self, tx_seq: u64) -> crate::error::Result { + self.tx_store.check_tx_pruned(tx_seq) + } } impl LogManager { diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs index 8b0f00a..a4ffb94 100644 --- a/node/storage/src/log_store/mod.rs +++ b/node/storage/src/log_store/mod.rs @@ -53,6 +53,8 @@ pub trait LogStoreRead: LogStoreChunkRead { fn check_tx_completed(&self, tx_seq: u64) -> Result; + fn check_tx_pruned(&self, tx_seq: u64) -> Result; + fn next_tx_seq(&self) -> u64; fn get_sync_progress(&self) -> Result>; @@ -118,6 +120,8 @@ pub trait LogStoreWrite: LogStoreChunkWrite { /// the caller is supposed to track chunk statuses and call this after storing all the chunks. fn finalize_tx(&self, tx_seq: u64) -> Result<()>; fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> Result; + /// Mark the tx as pruned, meaning the data will not be stored. + fn prune_tx(&self, tx_seq: u64) -> Result<()>; /// Store the progress of synced block number and its hash. fn put_sync_progress(&self, progress: (u64, H256, Option>)) -> Result<()>; diff --git a/node/storage/src/log_store/tx_store.rs b/node/storage/src/log_store/tx_store.rs index 4074aa1..b638726 100644 --- a/node/storage/src/log_store/tx_store.rs +++ b/node/storage/src/log_store/tx_store.rs @@ -20,6 +20,9 @@ use tracing::{error, instrument}; const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress"; const NEXT_TX_KEY: &str = "next_tx_seq"; +const TX_STATUS_FINALIZED: u8 = 0; +const TX_STATUS_PRUNED: u8 = 1; + #[derive(Clone, Debug)] pub struct BlockHashAndSubmissionIndex { pub block_hash: H256, @@ -156,13 +159,27 @@ impl TransactionStore { #[instrument(skip(self))] pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> { + Ok(self.kvdb.put( + COL_TX_COMPLETED, + &tx_seq.to_be_bytes(), + &[TX_STATUS_FINALIZED], + )?) + } + + #[instrument(skip(self))] + pub fn prune_tx(&self, tx_seq: u64) -> Result<()> { Ok(self .kvdb - .put(COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[0])?) + .put(COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[TX_STATUS_PRUNED])?) } pub fn check_tx_completed(&self, tx_seq: u64) -> Result { - Ok(self.kvdb.has_key(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?) + Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())? + == Some(vec![TX_STATUS_FINALIZED])) + } + + pub fn check_tx_pruned(&self, tx_seq: u64) -> Result { + Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())? == Some(vec![TX_STATUS_PRUNED])) } pub fn next_tx_seq(&self) -> u64 { diff --git a/node/sync/src/auto_sync/batcher.rs b/node/sync/src/auto_sync/batcher.rs index 847048d..5414b1f 100644 --- a/node/sync/src/auto_sync/batcher.rs +++ b/node/sync/src/auto_sync/batcher.rs @@ -82,7 +82,9 @@ impl Batcher { async fn poll_tx(&self, tx_seq: u64) -> Result> { // file already exists - if self.store.check_tx_completed(tx_seq).await? { + if self.store.check_tx_completed(tx_seq).await? + || self.store.check_tx_pruned(tx_seq).await? + { // File may be finalized during file sync, e.g. user uploaded file via RPC. // In this case, just terminate the file sync. let num_terminated = self.terminate_file_sync(tx_seq, false).await; diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index dc917c1..3e3c24a 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -573,7 +573,9 @@ impl SyncService { async fn on_find_file(&mut self, tx_seq: u64) -> Result<()> { // file already exists - if self.store.check_tx_completed(tx_seq).await? { + if self.store.check_tx_completed(tx_seq).await? + || self.store.check_tx_pruned(tx_seq).await? + { return Ok(()); } // broadcast find file @@ -633,7 +635,9 @@ impl SyncService { }; // file already exists - if self.store.check_tx_completed(tx_seq).await? { + if self.store.check_tx_completed(tx_seq).await? + || self.store.check_tx_pruned(tx_seq).await? + { bail!("File already exists"); } @@ -705,6 +709,15 @@ impl SyncService { } } + match self.store.check_tx_pruned(tx_seq).await { + Ok(true) => return, + Ok(false) => {} + Err(err) => { + error!(%tx_seq, %err, "Failed to check if file pruned"); + return; + } + } + // Now, always sync files among all nodes if let Err(err) = self .on_start_sync_file(tx_seq, None, Some((peer_id, addr))) From bff760cdf76054e973671865ac37a01770383aa2 Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Mon, 5 Aug 2024 19:54:12 +0800 Subject: [PATCH 8/8] Fix tests. --- tests/shard_sync_test.py | 3 ++- tests/utility/simple_rpc_proxy.py | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/shard_sync_test.py b/tests/shard_sync_test.py index 6c1c468..bab9e8c 100755 --- a/tests/shard_sync_test.py +++ b/tests/shard_sync_test.py @@ -2,6 +2,7 @@ import time from test_framework.test_framework import TestFramework +from mine_with_market_test import PRICE_PER_SECTOR from utility.submission import create_submission, submit_data, data_to_segments from utility.utils import wait_until, assert_equal @@ -30,7 +31,7 @@ class PrunerTest(TestFramework): chunk_data = b"\x02" * 8 * 256 * 1024 submissions, data_root = create_submission(chunk_data) - self.contract.submit(submissions) + self.contract.submit(submissions, tx_prarams = {"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)}) wait_until(lambda: self.contract.num_submissions() == 1) wait_until(lambda: client.zgs_get_file_info(data_root) is not None) diff --git a/tests/utility/simple_rpc_proxy.py b/tests/utility/simple_rpc_proxy.py index 4694330..5b0628b 100644 --- a/tests/utility/simple_rpc_proxy.py +++ b/tests/utility/simple_rpc_proxy.py @@ -25,7 +25,7 @@ class RpcCaller: if isinstance(parsed, Ok): return parsed.result else: - print("Failed to call RPC, method = %s(%s), error = %s" % (self.method, str(*args), parsed)) + print("Failed to call RPC, method = %s(%s), error = %s" % (self.method, str(*args)[-1500:], parsed)) except Exception as ex: - print("Failed to call RPC, method = %s(%s), exception = %s" % (self.method, str(*args), ex)) + print("Failed to call RPC, method = %s(%s), exception = %s" % (self.method, str(*args)[-1500:], ex)) return None