diff --git a/Cargo.lock b/Cargo.lock index 46baeeb..aaca1b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5824,6 +5824,9 @@ name = "pruner" version = "0.1.0" dependencies = [ "anyhow", + "contract-interface", + "ethereum-types 0.14.1", + "ethers", "miner", "rand 0.8.5", "storage", @@ -5831,6 +5834,7 @@ dependencies = [ "task_executor", "tokio", "tracing", + "zgs_spec", ] [[package]] diff --git a/common/contract-interface/src/lib.rs b/common/contract-interface/src/lib.rs index 70a1adb..5e3badb 100644 --- a/common/contract-interface/src/lib.rs +++ b/common/contract-interface/src/lib.rs @@ -8,6 +8,12 @@ abigen!(ZgsFlow, "../../storage-contracts-abis/Flow.json"); #[cfg(not(feature = "dev"))] abigen!(PoraMine, "../../storage-contracts-abis/PoraMine.json"); +#[cfg(not(feature = "dev"))] +abigen!( + ChunkLinearReward, + "../../storage-contracts-abis/ChunkLinearReward.json" +); + #[cfg(feature = "dev")] abigen!( ZgsFlow, @@ -19,3 +25,9 @@ abigen!( PoraMine, "../../0g-storage-contracts-dev/artifacts/contracts/miner/Mine.sol/PoraMine.json" ); + +#[cfg(feature = "dev")] +abigen!( + ChunkLinearReward, + "../../0g-storage-contracts-dev/artifacts/contracts/reward/ChunkLinearReward.sol/ChunkLinearReward.json" +); diff --git a/node/pruner/Cargo.toml b/node/pruner/Cargo.toml index 7fe0e2f..5a50b72 100644 --- a/node/pruner/Cargo.toml +++ b/node/pruner/Cargo.toml @@ -11,4 +11,8 @@ anyhow = "1.0.86" tokio = "1.37.0" rand = "0.8.5" task_executor = { path = "../../common/task_executor" } -tracing = "0.1.40" \ No newline at end of file +tracing = "0.1.40" +ethereum-types = "0.14.1" +contract-interface = { path = "../../common/contract-interface" } +ethers = "^2" +zgs_spec = { path = "../../common/spec" } \ No newline at end of file diff --git a/node/pruner/src/lib.rs b/node/pruner/src/lib.rs index fa4e5a9..111b7d7 100644 --- a/node/pruner/src/lib.rs +++ b/node/pruner/src/lib.rs @@ -1,18 +1,28 @@ -use anyhow::Result; +use anyhow::{anyhow, bail, Result}; +use contract_interface::ChunkLinearReward; +use ethereum_types::Address; +use ethers::prelude::{Http, Provider}; use miner::MinerMessage; use rand::Rng; +use std::cmp::Ordering; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use storage::config::{ShardConfig, SHARD_CONFIG_KEY}; +use storage::log_store::log_manager::PORA_CHUNK_SIZE; use storage_async::Store; use task_executor::TaskExecutor; use tokio::sync::{broadcast, mpsc}; use tracing::{debug, info}; +use zgs_spec::SECTORS_PER_PRICING; // Start pruning when the db directory size exceeds 0.9 * limit. const PRUNE_THRESHOLD: f32 = 0.9; +const FIRST_REWARDABLE_CHUNK_KEY: &str = "first_rewardable_chunk"; + +const CHUNKS_PER_PRICING: u64 = (SECTORS_PER_PRICING / PORA_CHUNK_SIZE) as u64; + #[derive(Debug)] pub struct PrunerConfig { pub shard_config: ShardConfig, @@ -21,20 +31,33 @@ pub struct PrunerConfig { pub check_time: Duration, pub batch_size: usize, pub batch_wait_time: Duration, + + pub rpc_endpoint_url: String, + pub reward_address: Address, } impl PrunerConfig { fn start_prune_size(&self) -> u64 { (self.max_num_sectors as f32 * PRUNE_THRESHOLD) as u64 } + + fn make_provider(&self) -> Result> { + Provider::::try_from(&self.rpc_endpoint_url) + .map_err(|e| anyhow!("Can not parse blockchain endpoint: {:?}", e)) + } } pub struct Pruner { config: PrunerConfig, + first_rewardable_chunk: u64, + first_tx_seq: u64, + store: Arc, sender: mpsc::UnboundedSender, miner_sender: Option>, + + reward_contract: ChunkLinearReward>, } impl Pruner { @@ -47,12 +70,20 @@ impl Pruner { if let Some(shard_config) = get_shard_config(store.as_ref()).await? { config.shard_config = shard_config; } + let (first_rewardable_chunk, first_tx_seq) = get_first_rewardable_chunk(store.as_ref()) + .await? + .unwrap_or((0, 0)); + let reward_contract = + ChunkLinearReward::new(config.reward_address, Arc::new(config.make_provider()?)); let (tx, rx) = mpsc::unbounded_channel(); let pruner = Pruner { config, + first_rewardable_chunk, + first_tx_seq, store, sender: tx, miner_sender, + reward_contract, }; pruner.put_shard_config().await?; executor.spawn( @@ -66,20 +97,36 @@ impl Pruner { pub async fn start(mut self) -> Result<()> { loop { + // Check shard config update and prune unneeded data. if let Some(delete_list) = self.maybe_update().await? { info!(new_config = ?self.config.shard_config, "new shard config"); self.put_shard_config().await?; - let mut batch = Vec::with_capacity(self.config.batch_size); - let mut iter = delete_list.peekable(); - while let Some(index) = iter.next() { - batch.push(index); - if batch.len() == self.config.batch_size || iter.peek().is_none() { - debug!(start = batch.first(), end = batch.last(), "prune batch"); - self.store.remove_chunks_batch(&batch).await?; - batch = Vec::with_capacity(self.config.batch_size); - tokio::time::sleep(self.config.batch_wait_time).await; - } - } + self.prune_in_batch(delete_list).await?; + } + + // Check no reward chunks and prune. + let new_first_rewardable = self.reward_contract.first_rewardable_chunk().call().await?; + if let Some(no_reward_list) = self + .maybe_forward_first_rewardable(new_first_rewardable) + .await? + { + info!( + ?new_first_rewardable, + "first rewardable chunk moves forward, start pruning" + ); + self.prune_tx( + self.first_rewardable_chunk * SECTORS_PER_PRICING as u64, + new_first_rewardable * SECTORS_PER_PRICING as u64, + ) + .await?; + self.prune_in_batch(no_reward_list).await?; + + self.first_rewardable_chunk = new_first_rewardable; + self.put_first_rewardable_chunk_index( + self.first_rewardable_chunk, + self.first_tx_seq, + ) + .await?; } tokio::time::sleep(self.config.check_time).await; } @@ -92,7 +139,9 @@ impl Pruner { config = ?self.config.shard_config, "maybe_update" ); - if current_size >= self.config.start_prune_size() { + if current_size < self.config.start_prune_size() { + Ok(None) + } else { // Update config and generate delete list should be done in a single lock to ensure // the list is complete. let config = &mut self.config.shard_config; @@ -108,13 +157,72 @@ impl Pruner { config.num_shard *= 2; // Generate delete list - let flow_len = self.store.get_context().await?.1; + let flow_len = self + .store + .get_context() + .await? + .1 + .div_ceil(PORA_CHUNK_SIZE as u64); let start_index = old_shard_id + (!rand_bit) as usize * old_num_shard; - return Ok(Some(Box::new( + Ok(Some(Box::new( (start_index as u64..flow_len).step_by(config.num_shard), - ))); + ))) } - Ok(None) + } + + async fn maybe_forward_first_rewardable( + &mut self, + new_first_rewardable: u64, + ) -> Result>>> { + match self.first_rewardable_chunk.cmp(&new_first_rewardable) { + Ordering::Less => Ok(Some(Box::new( + self.first_rewardable_chunk * CHUNKS_PER_PRICING + ..new_first_rewardable * CHUNKS_PER_PRICING, + ))), + Ordering::Equal => Ok(None), + Ordering::Greater => { + bail!( + "Unexpected first_rewardable_chunk revert: old={} new={}", + self.first_rewardable_chunk, + new_first_rewardable + ); + } + } + } + + async fn prune_in_batch(&self, to_prune: Box>) -> Result<()> { + let mut batch = Vec::with_capacity(self.config.batch_size); + let mut iter = to_prune.peekable(); + while let Some(index) = iter.next() { + batch.push(index); + if batch.len() == self.config.batch_size || iter.peek().is_none() { + debug!(start = batch.first(), end = batch.last(), "prune batch"); + self.store.remove_chunks_batch(&batch).await?; + batch = Vec::with_capacity(self.config.batch_size); + tokio::time::sleep(self.config.batch_wait_time).await; + } + } + Ok(()) + } + + async fn prune_tx(&mut self, start_sector: u64, end_sector: u64) -> Result<()> { + while let Some(tx) = self.store.get_tx_by_seq_number(self.first_tx_seq).await? { + // If a part of the tx data is pruned, we mark the tx as pruned. + if tx.start_entry_index() >= start_sector && tx.start_entry_index() < end_sector { + self.store.prune_tx(tx.seq).await?; + } else if tx.start_entry_index() >= end_sector { + break; + } else { + bail!( + "prune tx out of range: tx={:?}, start={} end={}", + tx, + start_sector, + end_sector + ); + } + self.first_tx_seq += 1; + } + Ok(()) } async fn put_shard_config(&self) -> Result<()> { @@ -130,12 +238,29 @@ impl Pruner { .set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config) .await } + + async fn put_first_rewardable_chunk_index( + &self, + new_first_rewardable_chunk: u64, + new_first_tx_seq: u64, + ) -> Result<()> { + self.store + .set_config_encoded( + &FIRST_REWARDABLE_CHUNK_KEY, + &(new_first_rewardable_chunk, new_first_tx_seq), + ) + .await + } } async fn get_shard_config(store: &Store) -> Result> { store.get_config_decoded(&SHARD_CONFIG_KEY).await } +async fn get_first_rewardable_chunk(store: &Store) -> Result> { + store.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY).await +} + #[derive(Debug)] pub enum PrunerMessage { ChangeShardConfig(ShardConfig), diff --git a/node/rpc/src/zgs/impl.rs b/node/rpc/src/zgs/impl.rs index 017cc2b..cd765bd 100644 --- a/node/rpc/src/zgs/impl.rs +++ b/node/rpc/src/zgs/impl.rs @@ -195,6 +195,10 @@ impl RpcServerImpl { )); } + if self.ctx.log_store.check_tx_pruned(tx.seq).await? { + return Err(error::invalid_params("root", "already pruned")); + } + Ok(false) } else { //Check whether file is small enough to cache in the system diff --git a/node/src/config/convert.rs b/node/src/config/convert.rs index f33fdb5..eff984c 100644 --- a/node/src/config/convert.rs +++ b/node/src/config/convert.rs @@ -203,6 +203,10 @@ impl ZgsConfig { pub fn pruner_config(&self) -> Result, String> { if let Some(max_num_sectors) = self.db_max_num_sectors { let shard_config = self.shard_config()?; + let reward_address = self + .reward_contract_address + .parse::() + .map_err(|e| format!("Unable to parse reward_contract_address: {:?}", e))?; Ok(Some(PrunerConfig { shard_config, db_path: self.db_dir.clone().into(), @@ -210,6 +214,8 @@ impl ZgsConfig { check_time: Duration::from_secs(self.prune_check_time_s), batch_size: self.prune_batch_size, batch_wait_time: Duration::from_millis(self.prune_batch_wait_time_ms), + rpc_endpoint_url: self.blockchain_rpc_endpoint.clone(), + reward_address, })) } else { Ok(None) diff --git a/node/src/config/mod.rs b/node/src/config/mod.rs index 42ce4f1..715b976 100644 --- a/node/src/config/mod.rs +++ b/node/src/config/mod.rs @@ -65,7 +65,7 @@ build_config! { (db_dir, (String), "db".to_string()) (db_max_num_sectors, (Option), None) (prune_check_time_s, (u64), 60) - (prune_batch_size, (usize), 1024) + (prune_batch_size, (usize), 16 * 1024) (prune_batch_wait_time_ms, (u64), 1000) // misc @@ -79,6 +79,7 @@ build_config! { (miner_submission_gas, (Option), None) (miner_cpu_percentage, (u64), 100) (mine_iter_batch_size, (usize), 100) + (reward_contract_address, (String), "".to_string()) (shard_position, (Option), None) } diff --git a/node/storage-async/src/lib.rs b/node/storage-async/src/lib.rs index 692a725..7a3da13 100644 --- a/node/storage-async/src/lib.rs +++ b/node/storage-async/src/lib.rs @@ -45,6 +45,7 @@ impl Store { } delegate!(fn check_tx_completed(tx_seq: u64) -> Result); + delegate!(fn check_tx_pruned(tx_seq: u64) -> Result); delegate!(fn get_chunk_by_tx_and_index(tx_seq: u64, index: usize) -> Result>); delegate!(fn get_chunks_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize) -> Result>); delegate!(fn get_chunks_with_proof_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize, merkle_tx_seq: Option) -> Result>); @@ -53,6 +54,7 @@ impl Store { delegate!(fn put_chunks_with_tx_hash(tx_seq: u64, tx_hash: H256, chunks: ChunkArray, maybe_file_proof: Option) -> Result); delegate!(fn get_chunk_by_flow_index(index: u64, length: u64) -> Result>); delegate!(fn finalize_tx(tx_seq: u64) -> Result<()>); + delegate!(fn prune_tx(tx_seq: u64) -> Result<()>); delegate!(fn finalize_tx_with_hash(tx_seq: u64, tx_hash: H256) -> Result); delegate!(fn get_proof_at_root(root: Option, index: u64, length: u64) -> Result); delegate!(fn get_context() -> Result<(DataRoot, u64)>); diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 245cb37..64ea1cf 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -337,6 +337,10 @@ impl LogStoreWrite for LogManager { } } + fn prune_tx(&self, tx_seq: u64) -> crate::error::Result<()> { + self.tx_store.prune_tx(tx_seq) + } + fn put_sync_progress(&self, progress: (u64, H256, Option>)) -> Result<()> { self.tx_store.put_progress(progress) } @@ -563,6 +567,10 @@ impl LogStoreRead for LogManager { merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64, )) } + + fn check_tx_pruned(&self, tx_seq: u64) -> crate::error::Result { + self.tx_store.check_tx_pruned(tx_seq) + } } impl LogManager { diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs index 8b0f00a..a4ffb94 100644 --- a/node/storage/src/log_store/mod.rs +++ b/node/storage/src/log_store/mod.rs @@ -53,6 +53,8 @@ pub trait LogStoreRead: LogStoreChunkRead { fn check_tx_completed(&self, tx_seq: u64) -> Result; + fn check_tx_pruned(&self, tx_seq: u64) -> Result; + fn next_tx_seq(&self) -> u64; fn get_sync_progress(&self) -> Result>; @@ -118,6 +120,8 @@ pub trait LogStoreWrite: LogStoreChunkWrite { /// the caller is supposed to track chunk statuses and call this after storing all the chunks. fn finalize_tx(&self, tx_seq: u64) -> Result<()>; fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> Result; + /// Mark the tx as pruned, meaning the data will not be stored. + fn prune_tx(&self, tx_seq: u64) -> Result<()>; /// Store the progress of synced block number and its hash. fn put_sync_progress(&self, progress: (u64, H256, Option>)) -> Result<()>; diff --git a/node/storage/src/log_store/tx_store.rs b/node/storage/src/log_store/tx_store.rs index 4074aa1..b638726 100644 --- a/node/storage/src/log_store/tx_store.rs +++ b/node/storage/src/log_store/tx_store.rs @@ -20,6 +20,9 @@ use tracing::{error, instrument}; const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress"; const NEXT_TX_KEY: &str = "next_tx_seq"; +const TX_STATUS_FINALIZED: u8 = 0; +const TX_STATUS_PRUNED: u8 = 1; + #[derive(Clone, Debug)] pub struct BlockHashAndSubmissionIndex { pub block_hash: H256, @@ -156,13 +159,27 @@ impl TransactionStore { #[instrument(skip(self))] pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> { + Ok(self.kvdb.put( + COL_TX_COMPLETED, + &tx_seq.to_be_bytes(), + &[TX_STATUS_FINALIZED], + )?) + } + + #[instrument(skip(self))] + pub fn prune_tx(&self, tx_seq: u64) -> Result<()> { Ok(self .kvdb - .put(COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[0])?) + .put(COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[TX_STATUS_PRUNED])?) } pub fn check_tx_completed(&self, tx_seq: u64) -> Result { - Ok(self.kvdb.has_key(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?) + Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())? + == Some(vec![TX_STATUS_FINALIZED])) + } + + pub fn check_tx_pruned(&self, tx_seq: u64) -> Result { + Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())? == Some(vec![TX_STATUS_PRUNED])) } pub fn next_tx_seq(&self) -> u64 { diff --git a/node/sync/src/auto_sync/batcher.rs b/node/sync/src/auto_sync/batcher.rs index 0fcea06..e8c482f 100644 --- a/node/sync/src/auto_sync/batcher.rs +++ b/node/sync/src/auto_sync/batcher.rs @@ -87,7 +87,9 @@ impl Batcher { async fn poll_tx(&self, tx_seq: u64) -> Result> { // file already exists - if self.store.check_tx_completed(tx_seq).await? { + if self.store.check_tx_completed(tx_seq).await? + || self.store.check_tx_pruned(tx_seq).await? + { // File may be finalized during file sync, e.g. user uploaded file via RPC. // In this case, just terminate the file sync. let num_terminated = self.terminate_file_sync(tx_seq, false).await; diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index f38c827..2daeab0 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -571,7 +571,9 @@ impl SyncService { async fn on_find_file(&mut self, tx_seq: u64) -> Result<()> { // file already exists - if self.store.check_tx_completed(tx_seq).await? { + if self.store.check_tx_completed(tx_seq).await? + || self.store.check_tx_pruned(tx_seq).await? + { return Ok(()); } // broadcast find file @@ -631,7 +633,9 @@ impl SyncService { }; // file already exists - if self.store.check_tx_completed(tx_seq).await? { + if self.store.check_tx_completed(tx_seq).await? + || self.store.check_tx_pruned(tx_seq).await? + { bail!("File already exists"); } @@ -703,6 +707,15 @@ impl SyncService { } } + match self.store.check_tx_pruned(tx_seq).await { + Ok(true) => return, + Ok(false) => {} + Err(err) => { + error!(%tx_seq, %err, "Failed to check if file pruned"); + return; + } + } + // Now, always sync files among all nodes if let Err(err) = self .on_start_sync_file(tx_seq, None, Some((peer_id, addr))) diff --git a/tests/pruner_test.py b/tests/pruner_test.py index 395fc81..bac2a76 100755 --- a/tests/pruner_test.py +++ b/tests/pruner_test.py @@ -2,8 +2,10 @@ import time from test_framework.test_framework import TestFramework +from config.node_config import GENESIS_PRIV_KEY +from mine_with_market_test import PRICE_PER_SECTOR from utility.submission import create_submission, submit_data -from utility.utils import wait_until, assert_equal +from utility.utils import wait_until, assert_equal, estimate_st_performance class PrunerTest(TestFramework): @@ -13,34 +15,46 @@ class PrunerTest(TestFramework): self.num_nodes = 1 self.zgs_node_configs[0] = { "db_max_num_sectors": 16 * 1024, + # "db_max_num_sectors": 32 * 1024 * 1024, "prune_check_time_s": 1, - "prune_batch_wait_time_ms": 10, + "prune_batch_wait_time_ms": 1, } + self.enable_market = True + self.mine_period = int(45 / self.block_time) + self.lifetime_seconds = 240 + self.launch_wait_seconds = 15 + self.log.info("Contract Info: Est. block time %.2f, Mine period %d", self.block_time, self.mine_period) def run_test(self): client = self.nodes[0] chunk_data = b"\x02" * 16 * 256 * 1024 + # chunk_data = b"\x02" * 5 * 1024 * 1024 * 1024 submissions, data_root = create_submission(chunk_data) - self.contract.submit(submissions) + self.contract.submit(submissions, tx_prarams = {"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)}) wait_until(lambda: self.contract.num_submissions() == 1) wait_until(lambda: client.zgs_get_file_info(data_root) is not None) segment = submit_data(client, chunk_data) self.log.info("segment: %s", len(segment)) # Wait for 1 sec for the shard config to be updated - time.sleep(1) + time.sleep(2) shard_config = client.rpc.zgs_getShardConfig() shard_id = int(shard_config["shardId"]) num_shard = int(shard_config["numShard"]) + # wait_until(lambda: self.reward_contract.first_rewardable_chunk() != 0, timeout=180) + # first_rewardable = self.reward_contract.first_rewardable_chunk() * 32 * 1024 + # Wait for 1 sec for the no reward segments to be pruned. + time.sleep(1) + # Wait for chunks to be removed. for i in range(len(segment)): seg = client.zgs_download_segment(data_root, i * 1024, (i + 1) * 1024) - if i % num_shard == shard_id: - # base64 encoding size - assert_equal(len(seg), 349528) - else: + # if i < first_rewardable or i % num_shard != shard_id: + if i % num_shard != shard_id: assert_equal(seg, None) + else: + assert_equal(len(seg), 349528) if __name__ == "__main__": diff --git a/tests/shard_sync_test.py b/tests/shard_sync_test.py index d838fb5..bab9e8c 100755 --- a/tests/shard_sync_test.py +++ b/tests/shard_sync_test.py @@ -2,6 +2,7 @@ import time from test_framework.test_framework import TestFramework +from mine_with_market_test import PRICE_PER_SECTOR from utility.submission import create_submission, submit_data, data_to_segments from utility.utils import wait_until, assert_equal @@ -23,13 +24,14 @@ class PrunerTest(TestFramework): "db_max_num_sectors": 2 ** 30, "shard_position": "1/4" } + self.enable_market = True def run_test(self): client = self.nodes[0] chunk_data = b"\x02" * 8 * 256 * 1024 submissions, data_root = create_submission(chunk_data) - self.contract.submit(submissions) + self.contract.submit(submissions, tx_prarams = {"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)}) wait_until(lambda: self.contract.num_submissions() == 1) wait_until(lambda: client.zgs_get_file_info(data_root) is not None) diff --git a/tests/test_framework/blockchain_node.py b/tests/test_framework/blockchain_node.py index 8b1b544..0fbe1a5 100644 --- a/tests/test_framework/blockchain_node.py +++ b/tests/test_framework/blockchain_node.py @@ -253,7 +253,7 @@ class BlockchainNode(TestNode): def wait_for_transaction_receipt(self, w3, tx_hash, timeout=120, parent_hash=None): return w3.eth.wait_for_transaction_receipt(tx_hash, timeout) - def setup_contract(self, enable_market, mine_period): + def setup_contract(self, enable_market, mine_period, lifetime_seconds): w3 = Web3(HTTPProvider(self.rpc_url)) account1 = w3.eth.account.from_key(GENESIS_PRIV_KEY) @@ -314,9 +314,7 @@ class BlockchainNode(TestNode): return flow_contract, flow_initialize_hash, mine_contract, dummy_reward_contract - def deploy_with_market(): - LIFETIME_MONTH = 1 - + def deploy_with_market(lifetime_seconds): self.log.debug("Start deploy contracts") mine_contract, _ = deploy_contract("PoraMineTest", [0]) @@ -325,7 +323,7 @@ class BlockchainNode(TestNode): market_contract, _ = deploy_contract("FixedPrice", []) self.log.debug("Market deployed") - reward_contract, _ =deploy_contract("ChunkLinearReward", [LIFETIME_MONTH * 31 * 86400]) + reward_contract, _ = deploy_contract("ChunkLinearReward", [lifetime_seconds]) self.log.debug("Reward deployed") flow_contract, _ = deploy_contract("FixedPriceFlow", [mine_period, 0]) @@ -336,8 +334,9 @@ class BlockchainNode(TestNode): mine_contract.functions.setTargetSubmissions(2).transact(TX_PARAMS) self.log.debug("Mine Initialized") - price_per_sector = int(LIFETIME_MONTH * 256 * 10 * 1_000_000_000_000_000_000 / 1024 / 1024 / 1024 / 12) - market_contract.functions.initialize(price_per_sector, flow_contract.address, reward_contract.address).transact(TX_PARAMS) + market_contract.functions.initialize(int(lifetime_seconds * 256 * 10 * 10 ** 18 / + 2 ** 30 / 12 / 31 / 86400), + flow_contract.address, reward_contract.address).transact(TX_PARAMS) self.log.debug("Market Initialized") reward_contract.functions.initialize(market_contract.address, mine_contract.address).transact(TX_PARAMS) @@ -353,7 +352,7 @@ class BlockchainNode(TestNode): return flow_contract, flow_initialize_hash, mine_contract, reward_contract if enable_market: - return deploy_with_market() + return deploy_with_market(lifetime_seconds) else: return deploy_no_market() diff --git a/tests/test_framework/contract_proxy.py b/tests/test_framework/contract_proxy.py index 8269b24..a9e05f1 100644 --- a/tests/test_framework/contract_proxy.py +++ b/tests/test_framework/contract_proxy.py @@ -17,11 +17,11 @@ class ContractProxy: else self.blockchain_nodes[node_idx].get_contract(self.contract_address) ) - def _call(self, fn_name, node_idx, **args): + def _call(self, fn_name, node_idx, *args): assert node_idx < len(self.blockchain_nodes) contract = self._get_contract(node_idx) - return getattr(contract.functions, fn_name)(**args).call() + return getattr(contract.functions, fn_name)(*args).call() def _send(self, fn_name, node_idx, **args): assert node_idx < len(self.blockchain_nodes) @@ -113,4 +113,10 @@ class RewardContractProxy(ContractProxy): return self._send_payable("donate", node_idx, value) def base_reward(self, node_idx = 0): - return self._call("baseReward", node_idx) \ No newline at end of file + return self._call("baseReward", node_idx) + + def first_rewardable_chunk(self, node_idx = 0): + return self._call("firstRewardableChunk", node_idx) + + def reward_deadline(self, node_idx = 0): + return self._call("rewardDeadline", node_idx, 0) diff --git a/tests/test_framework/test_framework.py b/tests/test_framework/test_framework.py index 17183b9..c7c4619 100644 --- a/tests/test_framework/test_framework.py +++ b/tests/test_framework/test_framework.py @@ -51,6 +51,7 @@ class TestFramework: self.block_time = blockchain_node_type.block_time() self.enable_market = False self.mine_period = 100 + self.lifetime_seconds = 3600 self.launch_wait_seconds = 1 # Set default binary path @@ -171,7 +172,7 @@ class TestFramework: wait_until(lambda: node.eth_blockNumber() is not None) wait_until(lambda: int(node.eth_blockNumber(), 16) > 0) - contract, tx_hash, mine_contract, reward_contract = self.blockchain_nodes[0].setup_contract(self.enable_market, self.mine_period) + contract, tx_hash, mine_contract, reward_contract = self.blockchain_nodes[0].setup_contract(self.enable_market, self.mine_period, self.lifetime_seconds) self.contract = FlowContractProxy(contract, self.blockchain_nodes) self.mine_contract = MineContractProxy(mine_contract, self.blockchain_nodes) self.reward_contract = RewardContractProxy(reward_contract, self.blockchain_nodes) @@ -197,6 +198,7 @@ class TestFramework: updated_config, self.contract.address(), self.mine_contract.address(), + self.reward_contract.address(), self.log, ) self.nodes.append(node) diff --git a/tests/test_framework/zgs_node.py b/tests/test_framework/zgs_node.py index 19a4256..06e3c0c 100644 --- a/tests/test_framework/zgs_node.py +++ b/tests/test_framework/zgs_node.py @@ -22,6 +22,7 @@ class ZgsNode(TestNode): updated_config, log_contract_address, mine_contract_address, + reward_contract_address, log, rpc_timeout=10, libp2p_nodes=None, @@ -43,6 +44,7 @@ class ZgsNode(TestNode): "network_libp2p_nodes": libp2p_nodes, "log_contract_address": log_contract_address, "mine_contract_address": mine_contract_address, + "reward_contract_address": reward_contract_address, "blockchain_rpc_endpoint": f"http://127.0.0.1:{blockchain_rpc_port(0)}", } # Set configs for this specific node. diff --git a/tests/utility/simple_rpc_proxy.py b/tests/utility/simple_rpc_proxy.py index 4694330..5b0628b 100644 --- a/tests/utility/simple_rpc_proxy.py +++ b/tests/utility/simple_rpc_proxy.py @@ -25,7 +25,7 @@ class RpcCaller: if isinstance(parsed, Ok): return parsed.result else: - print("Failed to call RPC, method = %s(%s), error = %s" % (self.method, str(*args), parsed)) + print("Failed to call RPC, method = %s(%s), error = %s" % (self.method, str(*args)[-1500:], parsed)) except Exception as ex: - print("Failed to call RPC, method = %s(%s), exception = %s" % (self.method, str(*args), ex)) + print("Failed to call RPC, method = %s(%s), exception = %s" % (self.method, str(*args)[-1500:], ex)) return None