diff --git a/Cargo.lock b/Cargo.lock index 46baeeb..aaca1b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5824,6 +5824,9 @@ name = "pruner" version = "0.1.0" dependencies = [ "anyhow", + "contract-interface", + "ethereum-types 0.14.1", + "ethers", "miner", "rand 0.8.5", "storage", @@ -5831,6 +5834,7 @@ dependencies = [ "task_executor", "tokio", "tracing", + "zgs_spec", ] [[package]] diff --git a/common/contract-interface/src/lib.rs b/common/contract-interface/src/lib.rs index 70a1adb..5e3badb 100644 --- a/common/contract-interface/src/lib.rs +++ b/common/contract-interface/src/lib.rs @@ -8,6 +8,12 @@ abigen!(ZgsFlow, "../../storage-contracts-abis/Flow.json"); #[cfg(not(feature = "dev"))] abigen!(PoraMine, "../../storage-contracts-abis/PoraMine.json"); +#[cfg(not(feature = "dev"))] +abigen!( + ChunkLinearReward, + "../../storage-contracts-abis/ChunkLinearReward.json" +); + #[cfg(feature = "dev")] abigen!( ZgsFlow, @@ -19,3 +25,9 @@ abigen!( PoraMine, "../../0g-storage-contracts-dev/artifacts/contracts/miner/Mine.sol/PoraMine.json" ); + +#[cfg(feature = "dev")] +abigen!( + ChunkLinearReward, + "../../0g-storage-contracts-dev/artifacts/contracts/reward/ChunkLinearReward.sol/ChunkLinearReward.json" +); diff --git a/node/pruner/Cargo.toml b/node/pruner/Cargo.toml index 7fe0e2f..5a50b72 100644 --- a/node/pruner/Cargo.toml +++ b/node/pruner/Cargo.toml @@ -11,4 +11,8 @@ anyhow = "1.0.86" tokio = "1.37.0" rand = "0.8.5" task_executor = { path = "../../common/task_executor" } -tracing = "0.1.40" \ No newline at end of file +tracing = "0.1.40" +ethereum-types = "0.14.1" +contract-interface = { path = "../../common/contract-interface" } +ethers = "^2" +zgs_spec = { path = "../../common/spec" } \ No newline at end of file diff --git a/node/pruner/src/lib.rs b/node/pruner/src/lib.rs index fa4e5a9..fe4bb93 100644 --- a/node/pruner/src/lib.rs +++ b/node/pruner/src/lib.rs @@ -1,18 +1,28 @@ -use anyhow::Result; +use anyhow::{anyhow, bail, Result}; +use contract_interface::ChunkLinearReward; +use ethereum_types::Address; +use ethers::prelude::{Http, Provider}; use miner::MinerMessage; use rand::Rng; +use std::cmp::Ordering; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use storage::config::{ShardConfig, SHARD_CONFIG_KEY}; +use storage::log_store::log_manager::PORA_CHUNK_SIZE; use storage_async::Store; use task_executor::TaskExecutor; use tokio::sync::{broadcast, mpsc}; use tracing::{debug, info}; +use zgs_spec::SECTORS_PER_PRICING; // Start pruning when the db directory size exceeds 0.9 * limit. const PRUNE_THRESHOLD: f32 = 0.9; +const FIRST_REWARDABLE_CHUNK_KEY: &str = "first_rewardable_chunk"; + +const CHUNKS_PER_PRICING: u64 = (SECTORS_PER_PRICING / PORA_CHUNK_SIZE) as u64; + #[derive(Debug)] pub struct PrunerConfig { pub shard_config: ShardConfig, @@ -21,20 +31,32 @@ pub struct PrunerConfig { pub check_time: Duration, pub batch_size: usize, pub batch_wait_time: Duration, + + pub rpc_endpoint_url: String, + pub reward_address: Address, } impl PrunerConfig { fn start_prune_size(&self) -> u64 { (self.max_num_sectors as f32 * PRUNE_THRESHOLD) as u64 } + + fn make_provider(&self) -> Result> { + Provider::::try_from(&self.rpc_endpoint_url) + .map_err(|e| anyhow!("Can not parse blockchain endpoint: {:?}", e)) + } } pub struct Pruner { config: PrunerConfig, + first_rewardable_chunk: u64, + store: Arc, sender: mpsc::UnboundedSender, miner_sender: Option>, + + reward_contract: ChunkLinearReward>, } impl Pruner { @@ -47,12 +69,19 @@ impl Pruner { if let Some(shard_config) = get_shard_config(store.as_ref()).await? { config.shard_config = shard_config; } + let first_rewardable_chunk = get_first_rewardable_chunk(store.as_ref()) + .await? + .unwrap_or(0); + let reward_contract = + ChunkLinearReward::new(config.reward_address, Arc::new(config.make_provider()?)); let (tx, rx) = mpsc::unbounded_channel(); let pruner = Pruner { config, + first_rewardable_chunk, store, sender: tx, miner_sender, + reward_contract, }; pruner.put_shard_config().await?; executor.spawn( @@ -66,20 +95,27 @@ impl Pruner { pub async fn start(mut self) -> Result<()> { loop { + // Check shard config update and prune unneeded data. if let Some(delete_list) = self.maybe_update().await? { info!(new_config = ?self.config.shard_config, "new shard config"); self.put_shard_config().await?; - let mut batch = Vec::with_capacity(self.config.batch_size); - let mut iter = delete_list.peekable(); - while let Some(index) = iter.next() { - batch.push(index); - if batch.len() == self.config.batch_size || iter.peek().is_none() { - debug!(start = batch.first(), end = batch.last(), "prune batch"); - self.store.remove_chunks_batch(&batch).await?; - batch = Vec::with_capacity(self.config.batch_size); - tokio::time::sleep(self.config.batch_wait_time).await; - } - } + self.prune_in_batch(delete_list).await?; + } + + // Check no reward chunks and prune. + let new_first_rewardable = self.reward_contract.first_rewardable_chunk().call().await?; + if let Some(no_reward_list) = self + .maybe_forward_first_rewardable(new_first_rewardable) + .await? + { + info!( + ?new_first_rewardable, + "first rewardable chunk moves forward, start pruning" + ); + self.prune_in_batch(no_reward_list).await?; + self.put_first_rewardable_chunk_index(new_first_rewardable) + .await?; + self.first_rewardable_chunk = new_first_rewardable; } tokio::time::sleep(self.config.check_time).await; } @@ -92,7 +128,9 @@ impl Pruner { config = ?self.config.shard_config, "maybe_update" ); - if current_size >= self.config.start_prune_size() { + if current_size < self.config.start_prune_size() { + Ok(None) + } else { // Update config and generate delete list should be done in a single lock to ensure // the list is complete. let config = &mut self.config.shard_config; @@ -108,13 +146,52 @@ impl Pruner { config.num_shard *= 2; // Generate delete list - let flow_len = self.store.get_context().await?.1; + let flow_len = self + .store + .get_context() + .await? + .1 + .div_ceil(PORA_CHUNK_SIZE as u64); let start_index = old_shard_id + (!rand_bit) as usize * old_num_shard; - return Ok(Some(Box::new( + Ok(Some(Box::new( (start_index as u64..flow_len).step_by(config.num_shard), - ))); + ))) } - Ok(None) + } + + async fn maybe_forward_first_rewardable( + &mut self, + new_first_rewardable: u64, + ) -> Result>>> { + match self.first_rewardable_chunk.cmp(&new_first_rewardable) { + Ordering::Less => Ok(Some(Box::new( + self.first_rewardable_chunk * CHUNKS_PER_PRICING + ..new_first_rewardable * CHUNKS_PER_PRICING, + ))), + Ordering::Equal => Ok(None), + Ordering::Greater => { + bail!( + "Unexpected first_rewardable_chunk revert: old={} new={}", + self.first_rewardable_chunk, + new_first_rewardable + ); + } + } + } + + async fn prune_in_batch(&self, to_prune: Box>) -> Result<()> { + let mut batch = Vec::with_capacity(self.config.batch_size); + let mut iter = to_prune.peekable(); + while let Some(index) = iter.next() { + batch.push(index); + if batch.len() == self.config.batch_size || iter.peek().is_none() { + debug!(start = batch.first(), end = batch.last(), "prune batch"); + self.store.remove_chunks_batch(&batch).await?; + batch = Vec::with_capacity(self.config.batch_size); + tokio::time::sleep(self.config.batch_wait_time).await; + } + } + Ok(()) } async fn put_shard_config(&self) -> Result<()> { @@ -130,12 +207,25 @@ impl Pruner { .set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config) .await } + + async fn put_first_rewardable_chunk_index( + &self, + new_first_rewardable_chunk: u64, + ) -> Result<()> { + self.store + .set_config_encoded(&FIRST_REWARDABLE_CHUNK_KEY, &new_first_rewardable_chunk) + .await + } } async fn get_shard_config(store: &Store) -> Result> { store.get_config_decoded(&SHARD_CONFIG_KEY).await } +async fn get_first_rewardable_chunk(store: &Store) -> Result> { + store.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY).await +} + #[derive(Debug)] pub enum PrunerMessage { ChangeShardConfig(ShardConfig), diff --git a/node/src/config/convert.rs b/node/src/config/convert.rs index f33fdb5..eff984c 100644 --- a/node/src/config/convert.rs +++ b/node/src/config/convert.rs @@ -203,6 +203,10 @@ impl ZgsConfig { pub fn pruner_config(&self) -> Result, String> { if let Some(max_num_sectors) = self.db_max_num_sectors { let shard_config = self.shard_config()?; + let reward_address = self + .reward_contract_address + .parse::() + .map_err(|e| format!("Unable to parse reward_contract_address: {:?}", e))?; Ok(Some(PrunerConfig { shard_config, db_path: self.db_dir.clone().into(), @@ -210,6 +214,8 @@ impl ZgsConfig { check_time: Duration::from_secs(self.prune_check_time_s), batch_size: self.prune_batch_size, batch_wait_time: Duration::from_millis(self.prune_batch_wait_time_ms), + rpc_endpoint_url: self.blockchain_rpc_endpoint.clone(), + reward_address, })) } else { Ok(None) diff --git a/node/src/config/mod.rs b/node/src/config/mod.rs index 42ce4f1..715b976 100644 --- a/node/src/config/mod.rs +++ b/node/src/config/mod.rs @@ -65,7 +65,7 @@ build_config! { (db_dir, (String), "db".to_string()) (db_max_num_sectors, (Option), None) (prune_check_time_s, (u64), 60) - (prune_batch_size, (usize), 1024) + (prune_batch_size, (usize), 16 * 1024) (prune_batch_wait_time_ms, (u64), 1000) // misc @@ -79,6 +79,7 @@ build_config! { (miner_submission_gas, (Option), None) (miner_cpu_percentage, (u64), 100) (mine_iter_batch_size, (usize), 100) + (reward_contract_address, (String), "".to_string()) (shard_position, (Option), None) } diff --git a/tests/pruner_test.py b/tests/pruner_test.py index 395fc81..bac2a76 100755 --- a/tests/pruner_test.py +++ b/tests/pruner_test.py @@ -2,8 +2,10 @@ import time from test_framework.test_framework import TestFramework +from config.node_config import GENESIS_PRIV_KEY +from mine_with_market_test import PRICE_PER_SECTOR from utility.submission import create_submission, submit_data -from utility.utils import wait_until, assert_equal +from utility.utils import wait_until, assert_equal, estimate_st_performance class PrunerTest(TestFramework): @@ -13,34 +15,46 @@ class PrunerTest(TestFramework): self.num_nodes = 1 self.zgs_node_configs[0] = { "db_max_num_sectors": 16 * 1024, + # "db_max_num_sectors": 32 * 1024 * 1024, "prune_check_time_s": 1, - "prune_batch_wait_time_ms": 10, + "prune_batch_wait_time_ms": 1, } + self.enable_market = True + self.mine_period = int(45 / self.block_time) + self.lifetime_seconds = 240 + self.launch_wait_seconds = 15 + self.log.info("Contract Info: Est. block time %.2f, Mine period %d", self.block_time, self.mine_period) def run_test(self): client = self.nodes[0] chunk_data = b"\x02" * 16 * 256 * 1024 + # chunk_data = b"\x02" * 5 * 1024 * 1024 * 1024 submissions, data_root = create_submission(chunk_data) - self.contract.submit(submissions) + self.contract.submit(submissions, tx_prarams = {"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)}) wait_until(lambda: self.contract.num_submissions() == 1) wait_until(lambda: client.zgs_get_file_info(data_root) is not None) segment = submit_data(client, chunk_data) self.log.info("segment: %s", len(segment)) # Wait for 1 sec for the shard config to be updated - time.sleep(1) + time.sleep(2) shard_config = client.rpc.zgs_getShardConfig() shard_id = int(shard_config["shardId"]) num_shard = int(shard_config["numShard"]) + # wait_until(lambda: self.reward_contract.first_rewardable_chunk() != 0, timeout=180) + # first_rewardable = self.reward_contract.first_rewardable_chunk() * 32 * 1024 + # Wait for 1 sec for the no reward segments to be pruned. + time.sleep(1) + # Wait for chunks to be removed. for i in range(len(segment)): seg = client.zgs_download_segment(data_root, i * 1024, (i + 1) * 1024) - if i % num_shard == shard_id: - # base64 encoding size - assert_equal(len(seg), 349528) - else: + # if i < first_rewardable or i % num_shard != shard_id: + if i % num_shard != shard_id: assert_equal(seg, None) + else: + assert_equal(len(seg), 349528) if __name__ == "__main__": diff --git a/tests/shard_sync_test.py b/tests/shard_sync_test.py index d838fb5..6c1c468 100755 --- a/tests/shard_sync_test.py +++ b/tests/shard_sync_test.py @@ -23,6 +23,7 @@ class PrunerTest(TestFramework): "db_max_num_sectors": 2 ** 30, "shard_position": "1/4" } + self.enable_market = True def run_test(self): client = self.nodes[0] diff --git a/tests/test_framework/blockchain_node.py b/tests/test_framework/blockchain_node.py index 8b1b544..0fbe1a5 100644 --- a/tests/test_framework/blockchain_node.py +++ b/tests/test_framework/blockchain_node.py @@ -253,7 +253,7 @@ class BlockchainNode(TestNode): def wait_for_transaction_receipt(self, w3, tx_hash, timeout=120, parent_hash=None): return w3.eth.wait_for_transaction_receipt(tx_hash, timeout) - def setup_contract(self, enable_market, mine_period): + def setup_contract(self, enable_market, mine_period, lifetime_seconds): w3 = Web3(HTTPProvider(self.rpc_url)) account1 = w3.eth.account.from_key(GENESIS_PRIV_KEY) @@ -314,9 +314,7 @@ class BlockchainNode(TestNode): return flow_contract, flow_initialize_hash, mine_contract, dummy_reward_contract - def deploy_with_market(): - LIFETIME_MONTH = 1 - + def deploy_with_market(lifetime_seconds): self.log.debug("Start deploy contracts") mine_contract, _ = deploy_contract("PoraMineTest", [0]) @@ -325,7 +323,7 @@ class BlockchainNode(TestNode): market_contract, _ = deploy_contract("FixedPrice", []) self.log.debug("Market deployed") - reward_contract, _ =deploy_contract("ChunkLinearReward", [LIFETIME_MONTH * 31 * 86400]) + reward_contract, _ = deploy_contract("ChunkLinearReward", [lifetime_seconds]) self.log.debug("Reward deployed") flow_contract, _ = deploy_contract("FixedPriceFlow", [mine_period, 0]) @@ -336,8 +334,9 @@ class BlockchainNode(TestNode): mine_contract.functions.setTargetSubmissions(2).transact(TX_PARAMS) self.log.debug("Mine Initialized") - price_per_sector = int(LIFETIME_MONTH * 256 * 10 * 1_000_000_000_000_000_000 / 1024 / 1024 / 1024 / 12) - market_contract.functions.initialize(price_per_sector, flow_contract.address, reward_contract.address).transact(TX_PARAMS) + market_contract.functions.initialize(int(lifetime_seconds * 256 * 10 * 10 ** 18 / + 2 ** 30 / 12 / 31 / 86400), + flow_contract.address, reward_contract.address).transact(TX_PARAMS) self.log.debug("Market Initialized") reward_contract.functions.initialize(market_contract.address, mine_contract.address).transact(TX_PARAMS) @@ -353,7 +352,7 @@ class BlockchainNode(TestNode): return flow_contract, flow_initialize_hash, mine_contract, reward_contract if enable_market: - return deploy_with_market() + return deploy_with_market(lifetime_seconds) else: return deploy_no_market() diff --git a/tests/test_framework/contract_proxy.py b/tests/test_framework/contract_proxy.py index 8269b24..a9e05f1 100644 --- a/tests/test_framework/contract_proxy.py +++ b/tests/test_framework/contract_proxy.py @@ -17,11 +17,11 @@ class ContractProxy: else self.blockchain_nodes[node_idx].get_contract(self.contract_address) ) - def _call(self, fn_name, node_idx, **args): + def _call(self, fn_name, node_idx, *args): assert node_idx < len(self.blockchain_nodes) contract = self._get_contract(node_idx) - return getattr(contract.functions, fn_name)(**args).call() + return getattr(contract.functions, fn_name)(*args).call() def _send(self, fn_name, node_idx, **args): assert node_idx < len(self.blockchain_nodes) @@ -113,4 +113,10 @@ class RewardContractProxy(ContractProxy): return self._send_payable("donate", node_idx, value) def base_reward(self, node_idx = 0): - return self._call("baseReward", node_idx) \ No newline at end of file + return self._call("baseReward", node_idx) + + def first_rewardable_chunk(self, node_idx = 0): + return self._call("firstRewardableChunk", node_idx) + + def reward_deadline(self, node_idx = 0): + return self._call("rewardDeadline", node_idx, 0) diff --git a/tests/test_framework/test_framework.py b/tests/test_framework/test_framework.py index 17183b9..c7c4619 100644 --- a/tests/test_framework/test_framework.py +++ b/tests/test_framework/test_framework.py @@ -51,6 +51,7 @@ class TestFramework: self.block_time = blockchain_node_type.block_time() self.enable_market = False self.mine_period = 100 + self.lifetime_seconds = 3600 self.launch_wait_seconds = 1 # Set default binary path @@ -171,7 +172,7 @@ class TestFramework: wait_until(lambda: node.eth_blockNumber() is not None) wait_until(lambda: int(node.eth_blockNumber(), 16) > 0) - contract, tx_hash, mine_contract, reward_contract = self.blockchain_nodes[0].setup_contract(self.enable_market, self.mine_period) + contract, tx_hash, mine_contract, reward_contract = self.blockchain_nodes[0].setup_contract(self.enable_market, self.mine_period, self.lifetime_seconds) self.contract = FlowContractProxy(contract, self.blockchain_nodes) self.mine_contract = MineContractProxy(mine_contract, self.blockchain_nodes) self.reward_contract = RewardContractProxy(reward_contract, self.blockchain_nodes) @@ -197,6 +198,7 @@ class TestFramework: updated_config, self.contract.address(), self.mine_contract.address(), + self.reward_contract.address(), self.log, ) self.nodes.append(node) diff --git a/tests/test_framework/zgs_node.py b/tests/test_framework/zgs_node.py index 19a4256..06e3c0c 100644 --- a/tests/test_framework/zgs_node.py +++ b/tests/test_framework/zgs_node.py @@ -22,6 +22,7 @@ class ZgsNode(TestNode): updated_config, log_contract_address, mine_contract_address, + reward_contract_address, log, rpc_timeout=10, libp2p_nodes=None, @@ -43,6 +44,7 @@ class ZgsNode(TestNode): "network_libp2p_nodes": libp2p_nodes, "log_contract_address": log_contract_address, "mine_contract_address": mine_contract_address, + "reward_contract_address": reward_contract_address, "blockchain_rpc_endpoint": f"http://127.0.0.1:{blockchain_rpc_port(0)}", } # Set configs for this specific node.