diff --git a/node/pruner/src/lib.rs b/node/pruner/src/lib.rs
index fe4bb93..111b7d7 100644
--- a/node/pruner/src/lib.rs
+++ b/node/pruner/src/lib.rs
@@ -50,6 +50,7 @@ impl PrunerConfig {
 pub struct Pruner {
     config: PrunerConfig,
     first_rewardable_chunk: u64,
+    first_tx_seq: u64,
 
     store: Arc<Store>,
 
@@ -69,15 +70,16 @@ impl Pruner {
         if let Some(shard_config) = get_shard_config(store.as_ref()).await? {
             config.shard_config = shard_config;
         }
-        let first_rewardable_chunk = get_first_rewardable_chunk(store.as_ref())
+        let (first_rewardable_chunk, first_tx_seq) = get_first_rewardable_chunk(store.as_ref())
             .await?
-            .unwrap_or(0);
+            .unwrap_or((0, 0));
         let reward_contract =
             ChunkLinearReward::new(config.reward_address, Arc::new(config.make_provider()?));
         let (tx, rx) = mpsc::unbounded_channel();
         let pruner = Pruner {
             config,
             first_rewardable_chunk,
+            first_tx_seq,
             store,
             sender: tx,
             miner_sender,
@@ -112,10 +114,19 @@ impl Pruner {
                     ?new_first_rewardable,
                     "first rewardable chunk moves forward, start pruning"
                 );
 
+                self.prune_tx(
+                    self.first_rewardable_chunk * SECTORS_PER_PRICING as u64,
+                    new_first_rewardable * SECTORS_PER_PRICING as u64,
+                )
+                .await?;
                 self.prune_in_batch(no_reward_list).await?;
-                self.put_first_rewardable_chunk_index(new_first_rewardable)
-                    .await?;
+
+                self.first_rewardable_chunk = new_first_rewardable;
+                self.put_first_rewardable_chunk_index(
+                    self.first_rewardable_chunk,
+                    self.first_tx_seq,
+                )
+                .await?;
             }
             tokio::time::sleep(self.config.check_time).await;
         }
@@ -194,6 +205,26 @@ impl Pruner {
         Ok(())
     }
 
+    async fn prune_tx(&mut self, start_sector: u64, end_sector: u64) -> Result<()> {
+        while let Some(tx) = self.store.get_tx_by_seq_number(self.first_tx_seq).await? {
+            // If a part of the tx data is pruned, we mark the tx as pruned.
+            if tx.start_entry_index() >= start_sector && tx.start_entry_index() < end_sector {
+                self.store.prune_tx(tx.seq).await?;
+            } else if tx.start_entry_index() >= end_sector {
+                break;
+            } else {
+                bail!(
+                    "prune tx out of range: tx={:?}, start={} end={}",
+                    tx,
+                    start_sector,
+                    end_sector
+                );
+            }
+            self.first_tx_seq += 1;
+        }
+        Ok(())
+    }
+
     async fn put_shard_config(&self) -> Result<()> {
         if let Some(sender) = &self.miner_sender {
             sender.send(MinerMessage::SetShardConfig(self.config.shard_config))?;
@@ -211,9 +242,13 @@ impl Pruner {
     async fn put_first_rewardable_chunk_index(
         &self,
         new_first_rewardable_chunk: u64,
+        new_first_tx_seq: u64,
     ) -> Result<()> {
         self.store
-            .set_config_encoded(&FIRST_REWARDABLE_CHUNK_KEY, &new_first_rewardable_chunk)
+            .set_config_encoded(
+                &FIRST_REWARDABLE_CHUNK_KEY,
+                &(new_first_rewardable_chunk, new_first_tx_seq),
+            )
             .await
     }
 }
@@ -222,7 +257,7 @@ async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> {
     store.get_config_decoded(&SHARD_CONFIG_KEY).await
 }
 
-async fn get_first_rewardable_chunk(store: &Store) -> Result<Option<u64>> {
+async fn get_first_rewardable_chunk(store: &Store) -> Result<Option<(u64, u64)>> {
     store.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY).await
 }
 
diff --git a/node/rpc/src/zgs/impl.rs b/node/rpc/src/zgs/impl.rs
index 017cc2b..cd765bd 100644
--- a/node/rpc/src/zgs/impl.rs
+++ b/node/rpc/src/zgs/impl.rs
@@ -195,6 +195,10 @@ impl RpcServerImpl {
                 ));
             }
 
+            if self.ctx.log_store.check_tx_pruned(tx.seq).await? {
+                return Err(error::invalid_params("root", "already pruned"));
+            }
+
             Ok(false)
         } else {
             //Check whether file is small enough to cache in the system
diff --git a/node/storage-async/src/lib.rs b/node/storage-async/src/lib.rs
index 692a725..7a3da13 100644
--- a/node/storage-async/src/lib.rs
+++ b/node/storage-async/src/lib.rs
@@ -45,6 +45,7 @@ impl Store {
     }
 
     delegate!(fn check_tx_completed(tx_seq: u64) -> Result<bool>);
+    delegate!(fn check_tx_pruned(tx_seq: u64) -> Result<bool>);
     delegate!(fn get_chunk_by_tx_and_index(tx_seq: u64, index: usize) -> Result<Option<Chunk>>);
     delegate!(fn get_chunks_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize) -> Result<Option<ChunkArray>>);
     delegate!(fn get_chunks_with_proof_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize, merkle_tx_seq: Option<u64>) -> Result<Option<ChunkArrayWithProof>>);
@@ -53,6 +54,7 @@ impl Store {
     delegate!(fn put_chunks_with_tx_hash(tx_seq: u64, tx_hash: H256, chunks: ChunkArray, maybe_file_proof: Option<FlowProof>) -> Result<bool>);
     delegate!(fn get_chunk_by_flow_index(index: u64, length: u64) -> Result<Option<ChunkArray>>);
     delegate!(fn finalize_tx(tx_seq: u64) -> Result<()>);
+    delegate!(fn prune_tx(tx_seq: u64) -> Result<()>);
     delegate!(fn finalize_tx_with_hash(tx_seq: u64, tx_hash: H256) -> Result<bool>);
     delegate!(fn get_proof_at_root(root: Option<DataRoot>, index: u64, length: u64) -> Result<FlowProof>);
     delegate!(fn get_context() -> Result<(DataRoot, u64)>);
diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs
index 245cb37..64ea1cf 100644
--- a/node/storage/src/log_store/log_manager.rs
+++ b/node/storage/src/log_store/log_manager.rs
@@ -337,6 +337,10 @@ impl LogStoreWrite for LogManager {
         }
     }
 
+    fn prune_tx(&self, tx_seq: u64) -> crate::error::Result<()> {
+        self.tx_store.prune_tx(tx_seq)
+    }
+
     fn put_sync_progress(&self, progress: (u64, H256, Option<u64>)) -> Result<()> {
         self.tx_store.put_progress(progress)
     }
@@ -563,6 +567,10 @@ impl LogStoreRead for LogManager {
             merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64,
         ))
     }
+
+    fn check_tx_pruned(&self, tx_seq: u64) -> crate::error::Result<bool> {
+        self.tx_store.check_tx_pruned(tx_seq)
+    }
 }
 
 impl LogManager {
diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs
index 8b0f00a..a4ffb94 100644
--- a/node/storage/src/log_store/mod.rs
+++ b/node/storage/src/log_store/mod.rs
@@ -53,6 +53,8 @@ pub trait LogStoreRead: LogStoreChunkRead {
 
     fn check_tx_completed(&self, tx_seq: u64) -> Result<bool>;
 
+    fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool>;
+
     fn next_tx_seq(&self) -> u64;
 
     fn get_sync_progress(&self) -> Result<Option<(u64, H256)>>;
@@ -118,6 +120,8 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
     /// the caller is supposed to track chunk statuses and call this after storing all the chunks.
     fn finalize_tx(&self, tx_seq: u64) -> Result<()>;
     fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> Result<bool>;
+    /// Mark the tx as pruned, meaning the data will not be stored.
+    fn prune_tx(&self, tx_seq: u64) -> Result<()>;
 
     /// Store the progress of synced block number and its hash.
     fn put_sync_progress(&self, progress: (u64, H256, Option<u64>)) -> Result<()>;
diff --git a/node/storage/src/log_store/tx_store.rs b/node/storage/src/log_store/tx_store.rs
index 4074aa1..b638726 100644
--- a/node/storage/src/log_store/tx_store.rs
+++ b/node/storage/src/log_store/tx_store.rs
@@ -20,6 +20,9 @@ use tracing::{error, instrument};
 const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
 const NEXT_TX_KEY: &str = "next_tx_seq";
 
+const TX_STATUS_FINALIZED: u8 = 0;
+const TX_STATUS_PRUNED: u8 = 1;
+
 #[derive(Clone, Debug)]
 pub struct BlockHashAndSubmissionIndex {
     pub block_hash: H256,
@@ -156,13 +159,27 @@ impl TransactionStore {
 
     #[instrument(skip(self))]
     pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> {
+        Ok(self.kvdb.put(
+            COL_TX_COMPLETED,
+            &tx_seq.to_be_bytes(),
+            &[TX_STATUS_FINALIZED],
+        )?)
+    }
+
+    #[instrument(skip(self))]
+    pub fn prune_tx(&self, tx_seq: u64) -> Result<()> {
         Ok(self
             .kvdb
-            .put(COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[0])?)
+            .put(COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[TX_STATUS_PRUNED])?)
     }
 
     pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> {
-        Ok(self.kvdb.has_key(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?)
+        Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
+            == Some(vec![TX_STATUS_FINALIZED]))
+    }
+
+    pub fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool> {
+        Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())? == Some(vec![TX_STATUS_PRUNED]))
     }
 
     pub fn next_tx_seq(&self) -> u64 {
diff --git a/node/sync/src/auto_sync/batcher.rs b/node/sync/src/auto_sync/batcher.rs
index 847048d..5414b1f 100644
--- a/node/sync/src/auto_sync/batcher.rs
+++ b/node/sync/src/auto_sync/batcher.rs
@@ -82,7 +82,9 @@ impl Batcher {
 
     async fn poll_tx(&self, tx_seq: u64) -> Result<Option<SyncResult>> {
         // file already exists
-        if self.store.check_tx_completed(tx_seq).await? {
+        if self.store.check_tx_completed(tx_seq).await?
+            || self.store.check_tx_pruned(tx_seq).await?
+        {
             // File may be finalized during file sync, e.g. user uploaded file via RPC.
             // In this case, just terminate the file sync.
             let num_terminated = self.terminate_file_sync(tx_seq, false).await;
diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs
index dc917c1..3e3c24a 100644
--- a/node/sync/src/service.rs
+++ b/node/sync/src/service.rs
@@ -573,7 +573,9 @@ impl SyncService {
 
     async fn on_find_file(&mut self, tx_seq: u64) -> Result<()> {
         // file already exists
-        if self.store.check_tx_completed(tx_seq).await? {
+        if self.store.check_tx_completed(tx_seq).await?
+            || self.store.check_tx_pruned(tx_seq).await?
+        {
             return Ok(());
         }
         // broadcast find file
@@ -633,7 +635,9 @@ impl SyncService {
         };
 
         // file already exists
-        if self.store.check_tx_completed(tx_seq).await? {
+        if self.store.check_tx_completed(tx_seq).await?
+            || self.store.check_tx_pruned(tx_seq).await?
+        {
             bail!("File already exists");
         }
 
@@ -705,6 +709,15 @@ impl SyncService {
             }
         }
 
+        match self.store.check_tx_pruned(tx_seq).await {
+            Ok(true) => return,
+            Ok(false) => {}
+            Err(err) => {
+                error!(%tx_seq, %err, "Failed to check if file pruned");
+                return;
+            }
+        }
+
         // Now, always sync files among all nodes
         if let Err(err) = self
             .on_start_sync_file(tx_seq, None, Some((peer_id, addr)))