From 1046fed08842dc954a471ca757847ebde9647831 Mon Sep 17 00:00:00 2001 From: peilun-conflux <48905552+peilun-conflux@users.noreply.github.com> Date: Thu, 14 Nov 2024 02:58:59 +0800 Subject: [PATCH] Wait for tx to be processed in pruner. (#267) * Wait for tx to be processed in pruner. * Put FIRST_REWARDABLE_CHUNK_KEY in data db. --- node/pruner/src/lib.rs | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/node/pruner/src/lib.rs b/node/pruner/src/lib.rs index cbcc187..142c998 100644 --- a/node/pruner/src/lib.rs +++ b/node/pruner/src/lib.rs @@ -11,7 +11,7 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use storage::config::{ShardConfig, SHARD_CONFIG_KEY}; -use storage::log_store::log_manager::{DATA_DB_KEY, FLOW_DB_KEY, PORA_CHUNK_SIZE}; +use storage::log_store::log_manager::{DATA_DB_KEY, PORA_CHUNK_SIZE}; use storage_async::Store; use task_executor::TaskExecutor; use tokio::sync::{broadcast, mpsc}; @@ -223,21 +223,26 @@ impl Pruner { } async fn prune_tx(&mut self, start_sector: u64, end_sector: u64) -> Result<()> { - while let Some(tx) = self.store.get_tx_by_seq_number(self.first_tx_seq).await? { - // If a part of the tx data is pruned, we mark the tx as pruned. - if tx.start_entry_index() >= start_sector && tx.start_entry_index() < end_sector { - self.store.prune_tx(tx.seq).await?; - } else if tx.start_entry_index() >= end_sector { - break; + loop { + if let Some(tx) = self.store.get_tx_by_seq_number(self.first_tx_seq).await? { + // If a part of the tx data is pruned, we mark the tx as pruned. + if tx.start_entry_index() >= start_sector && tx.start_entry_index() < end_sector { + self.store.prune_tx(tx.seq).await?; + } else if tx.start_entry_index() >= end_sector { + break; + } else { + bail!( + "prune tx out of range: tx={:?}, start={} end={}", + tx, + start_sector, + end_sector + ); + } + self.first_tx_seq += 1; } else { - bail!( - "prune tx out of range: tx={:?}, start={} end={}", - tx, - start_sector, - end_sector - ); + // Wait for `first_tx_seq` to be processed. + tokio::time::sleep(Duration::from_secs(60)).await; } - self.first_tx_seq += 1; } Ok(()) } @@ -265,7 +270,7 @@ impl Pruner { .set_config_encoded( &FIRST_REWARDABLE_CHUNK_KEY, &(new_first_rewardable_chunk, new_first_tx_seq), - FLOW_DB_KEY, + DATA_DB_KEY, ) .await } @@ -279,7 +284,7 @@ async fn get_shard_config(store: &Store) -> Result> { async fn get_first_rewardable_chunk(store: &Store) -> Result> { store - .get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY, FLOW_DB_KEY) + .get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY, DATA_DB_KEY) .await }