diff --git a/0g-storage-contracts b/0g-storage-contracts index 25bc14a..8f8f906 160000 --- a/0g-storage-contracts +++ b/0g-storage-contracts @@ -1 +1 @@ -Subproject commit 25bc14a27441e8fb26e4d42d7c8c885f92d6c74a +Subproject commit 8f8f906224f966651de0b745738fb8aab4f492c4 diff --git a/Cargo.lock b/Cargo.lock index 46baeeb..05c4002 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5824,6 +5824,9 @@ name = "pruner" version = "0.1.0" dependencies = [ "anyhow", + "contract-interface", + "ethereum-types 0.14.1", + "ethers", "miner", "rand 0.8.5", "storage", diff --git a/common/contract-interface/src/lib.rs b/common/contract-interface/src/lib.rs index 10333c2..5ba6e18 100644 --- a/common/contract-interface/src/lib.rs +++ b/common/contract-interface/src/lib.rs @@ -14,6 +14,12 @@ abigen!( "../../0g-storage-contracts/artifacts/contracts/miner/Mine.sol/PoraMine.json" ); +#[cfg(not(feature = "dev"))] +abigen!( + ChunkLinearReward, + "../../0g-storage-contracts/artifacts/contracts/reward/ChunkLinearReward.sol/ChunkLinearReward.json" +); + #[cfg(feature = "dev")] abigen!( ZgsFlow, @@ -25,3 +31,9 @@ abigen!( PoraMine, "../../0g-storage-contracts-dev/artifacts/contracts/miner/Mine.sol/PoraMine.json" ); + +#[cfg(feature = "dev")] +abigen!( + ChunkLinearReward, + "../../0g-storage-contracts/artifacts/contracts/reward/ChunkLinearReward.sol/ChunkLinearReward.json" +); diff --git a/node/pruner/Cargo.toml b/node/pruner/Cargo.toml index 7fe0e2f..c9848a1 100644 --- a/node/pruner/Cargo.toml +++ b/node/pruner/Cargo.toml @@ -11,4 +11,7 @@ anyhow = "1.0.86" tokio = "1.37.0" rand = "0.8.5" task_executor = { path = "../../common/task_executor" } -tracing = "0.1.40" \ No newline at end of file +tracing = "0.1.40" +ethereum-types = "0.14.1" +contract-interface = { path = "../../common/contract-interface" } +ethers = "^2" \ No newline at end of file diff --git a/node/pruner/src/lib.rs b/node/pruner/src/lib.rs index 4ceab42..ebac666 100644 --- a/node/pruner/src/lib.rs +++ b/node/pruner/src/lib.rs @@ -1,4 +1,7 @@ -use anyhow::Result; +use anyhow::{anyhow, bail, Result}; +use contract_interface::ChunkLinearReward; +use ethereum_types::Address; +use ethers::prelude::{Http, Provider}; use miner::MinerMessage; use rand::Rng; use std::path::PathBuf; @@ -13,6 +16,8 @@ use tracing::{debug, info}; // Start pruning when the db directory size exceeds 0.9 * limit. const PRUNE_THRESHOLD: f32 = 0.9; +const FIRST_REWARDABLE_CHUNK_KEY: &str = "first_rewardable_chunk"; + #[derive(Debug)] pub struct PrunerConfig { pub shard_config: ShardConfig, @@ -21,20 +26,32 @@ pub struct PrunerConfig { pub check_time: Duration, pub batch_size: usize, pub batch_wait_time: Duration, + + pub rpc_endpoint_url: String, + pub reward_address: Address, } impl PrunerConfig { fn start_prune_size(&self) -> u64 { (self.max_num_chunks as f32 * PRUNE_THRESHOLD) as u64 } + + fn make_provider(&self) -> Result> { + Provider::::try_from(&self.rpc_endpoint_url) + .map_err(|e| anyhow!("Can not parse blockchain endpoint: {:?}", e)) + } } pub struct Pruner { config: PrunerConfig, + first_rewardable_chunk: u64, + store: Arc, sender: mpsc::UnboundedSender, miner_sender: Option>, + + reward_contract: ChunkLinearReward>, } impl Pruner { @@ -47,12 +64,19 @@ impl Pruner { if let Some(shard_config) = get_shard_config(store.as_ref()).await? { config.shard_config = shard_config; } + let first_rewardable_chunk = get_first_rewardable_chunk(store.as_ref()) + .await? + .unwrap_or(0); + let reward_contract = + ChunkLinearReward::new(config.reward_address, Arc::new(config.make_provider()?)); let (tx, rx) = mpsc::unbounded_channel(); let pruner = Pruner { config, + first_rewardable_chunk, store, sender: tx, miner_sender, + reward_contract, }; pruner.put_shard_config().await?; executor.spawn( @@ -66,20 +90,22 @@ impl Pruner { pub async fn start(mut self) -> Result<()> { loop { + // Check shard config update and prune unneeded data. if let Some(delete_list) = self.maybe_update().await? { info!(new_config = ?self.config.shard_config, "new shard config"); self.put_shard_config().await?; - let mut batch = Vec::with_capacity(self.config.batch_size); - let mut iter = delete_list.peekable(); - while let Some(index) = iter.next() { - batch.push(index); - if batch.len() == self.config.batch_size || iter.peek().is_none() { - debug!(start = batch.first(), end = batch.last(), "prune batch"); - self.store.remove_chunks_batch(&batch).await?; - batch = Vec::with_capacity(self.config.batch_size); - tokio::time::sleep(self.config.batch_wait_time).await; - } - } + self.prune_in_batch(delete_list).await?; + } + + // Check no reward chunks and prune. + let new_first_rewardable = self.reward_contract.first_rewardable_chunk().call().await?; + if let Some(no_reward_list) = self + .maybe_forward_first_rewardable(new_first_rewardable) + .await? + { + self.prune_in_batch(no_reward_list).await?; + self.put_first_rewardable_chunk_index(new_first_rewardable) + .await?; } tokio::time::sleep(self.config.check_time).await; } @@ -92,7 +118,9 @@ impl Pruner { config = ?self.config.shard_config, "maybe_update" ); - if current_size >= self.config.start_prune_size() { + if current_size < self.config.start_prune_size() { + Ok(None) + } else { // Update config and generate delete list should be done in a single lock to ensure // the list is complete. let config = &mut self.config.shard_config; @@ -110,11 +138,44 @@ impl Pruner { // Generate delete list let flow_len = self.store.get_context().await?.1; let start_index = old_shard_id + (!rand_bit) as usize * old_num_shard; - return Ok(Some(Box::new( + Ok(Some(Box::new( (start_index as u64..flow_len).step_by(config.num_shard), - ))); + ))) } - Ok(None) + } + + async fn maybe_forward_first_rewardable( + &mut self, + new_first_rewardable: u64, + ) -> Result>>> { + if self.first_rewardable_chunk == new_first_rewardable { + Ok(None) + } else if self.first_rewardable_chunk > new_first_rewardable { + bail!( + "Unexpected first_rewardable_chunk revert: old={} new={}", + self.first_rewardable_chunk, + new_first_rewardable + ); + } else { + Ok(Some(Box::new( + self.first_rewardable_chunk..new_first_rewardable, + ))) + } + } + + async fn prune_in_batch(&self, to_prune: Box>) -> Result<()> { + let mut batch = Vec::with_capacity(self.config.batch_size); + let mut iter = to_prune.peekable(); + while let Some(index) = iter.next() { + batch.push(index); + if batch.len() == self.config.batch_size || iter.peek().is_none() { + debug!(start = batch.first(), end = batch.last(), "prune batch"); + self.store.remove_chunks_batch(&batch).await?; + batch = Vec::with_capacity(self.config.batch_size); + tokio::time::sleep(self.config.batch_wait_time).await; + } + } + Ok(()) } async fn put_shard_config(&self) -> Result<()> { @@ -130,12 +191,25 @@ impl Pruner { .set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config) .await } + + async fn put_first_rewardable_chunk_index( + &self, + new_first_rewardable_chunk: u64, + ) -> Result<()> { + self.store + .set_config_encoded(&FIRST_REWARDABLE_CHUNK_KEY, &new_first_rewardable_chunk) + .await + } } async fn get_shard_config(store: &Store) -> Result> { store.get_config_decoded(&SHARD_CONFIG_KEY).await } +async fn get_first_rewardable_chunk(store: &Store) -> Result> { + store.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY).await +} + #[derive(Debug)] pub enum PrunerMessage { ChangeShardConfig(ShardConfig), diff --git a/node/src/config/convert.rs b/node/src/config/convert.rs index 3f9cb36..9c8b395 100644 --- a/node/src/config/convert.rs +++ b/node/src/config/convert.rs @@ -203,6 +203,10 @@ impl ZgsConfig { pub fn pruner_config(&self) -> Result, String> { if let Some(max_num_chunks) = self.db_max_num_chunks { let shard_config = self.shard_config()?; + let reward_address = self + .reward_contract_address + .parse::() + .map_err(|e| format!("Unable to parse reward_contract_address: {:?}", e))?; Ok(Some(PrunerConfig { shard_config, db_path: self.db_dir.clone().into(), @@ -210,6 +214,8 @@ impl ZgsConfig { check_time: Duration::from_secs(self.prune_check_time_s), batch_size: self.prune_batch_size, batch_wait_time: Duration::from_millis(self.prune_batch_wait_time_ms), + rpc_endpoint_url: self.blockchain_rpc_endpoint.clone(), + reward_address, })) } else { Ok(None) diff --git a/node/src/config/mod.rs b/node/src/config/mod.rs index 5f66515..d7701b9 100644 --- a/node/src/config/mod.rs +++ b/node/src/config/mod.rs @@ -79,6 +79,7 @@ build_config! { (miner_submission_gas, (Option), None) (miner_cpu_percentage, (u64), 100) (mine_iter_batch_size, (usize), 100) + (reward_contract_address, (String), "".to_string()) (shard_position, (Option), None) }