From a3a1f3f865433a12a5f915ef818cbd37b163a591 Mon Sep 17 00:00:00 2001 From: Peter Zhang Date: Wed, 20 Nov 2024 16:02:11 +0800 Subject: [PATCH] fix tx store --- node/storage/src/log_store/log_manager.rs | 2 +- node/storage/src/log_store/tx_store.rs | 59 +++++++++++++---------- 2 files changed, 35 insertions(+), 26 deletions(-) diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 1c1e27a..516092e 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -700,7 +700,7 @@ impl LogManager { data_db_source: Arc, config: LogConfig, ) -> Result { - let tx_store = TransactionStore::new(data_db_source.clone())?; + let tx_store = TransactionStore::new(flow_db_source.clone(), data_db_source.clone())?; let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone())); let data_db = Arc::new(FlowDBStore::new(data_db_source.clone())); let flow_store = Arc::new(FlowStore::new( diff --git a/node/storage/src/log_store/tx_store.rs b/node/storage/src/log_store/tx_store.rs index 93fd701..45cb7b5 100644 --- a/node/storage/src/log_store/tx_store.rs +++ b/node/storage/src/log_store/tx_store.rs @@ -57,19 +57,24 @@ pub struct BlockHashAndSubmissionIndex { } pub struct TransactionStore { - kvdb: Arc, + flow_kvdb: Arc, + data_kvdb: Arc, /// This is always updated before writing the database to ensure no intermediate states. next_tx_seq: AtomicU64, } impl TransactionStore { - pub fn new(kvdb: Arc) -> Result { - let next_tx_seq = kvdb + pub fn new( + flow_kvdb: Arc, + data_kvdb: Arc, + ) -> Result { + let next_tx_seq = flow_kvdb .get(COL_TX, NEXT_TX_KEY.as_bytes())? .map(|a| decode_tx_seq(&a)) .unwrap_or(Ok(0))?; Ok(Self { - kvdb, + flow_kvdb, + data_kvdb, next_tx_seq: AtomicU64::new(next_tx_seq), }) } @@ -86,7 +91,7 @@ impl TransactionStore { return Ok(old_tx_seq_list); } - let mut db_tx = self.kvdb.transaction(); + let mut db_tx = self.flow_kvdb.transaction(); if !tx.data.is_empty() { tx.size = tx.data.len() as u64; let mut padded_data = tx.data.clone(); @@ -113,7 +118,7 @@ impl TransactionStore { &new_tx_seq_list.as_ssz_bytes(), ); self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst); - self.kvdb.write(db_tx)?; + self.flow_kvdb.write(db_tx)?; metrics::TX_STORE_PUT.update_since(start_time); Ok(old_tx_seq_list) } @@ -123,7 +128,7 @@ impl TransactionStore { if seq >= self.next_tx_seq() { return Ok(None); } - let value = try_option!(self.kvdb.get(COL_TX, &seq.to_be_bytes())?); + let value = try_option!(self.flow_kvdb.get(COL_TX, &seq.to_be_bytes())?); let tx = Transaction::from_ssz_bytes(&value).map_err(Error::from)?; metrics::TX_BY_SEQ_NUMBER.update_since(start_time); Ok(Some(tx)) @@ -132,15 +137,16 @@ impl TransactionStore { pub fn remove_tx_after(&self, min_seq: u64) -> Result> { let mut removed_txs = Vec::new(); let max_seq = self.next_tx_seq(); - let mut db_tx = self.kvdb.transaction(); + let mut flow_db_tx = self.flow_kvdb.transaction(); + let mut data_db_tx = self.data_kvdb.transaction(); let mut modified_merkle_root_map = HashMap::new(); for seq in min_seq..max_seq { let Some(tx) = self.get_tx_by_seq_number(seq)? else { error!(?seq, ?max_seq, "Transaction missing before the end"); break; }; - db_tx.delete(COL_TX, &seq.to_be_bytes()); - db_tx.delete(COL_TX_COMPLETED, &seq.to_be_bytes()); + flow_db_tx.delete(COL_TX, &seq.to_be_bytes()); + data_db_tx.delete(COL_TX_COMPLETED, &seq.to_be_bytes()); // We only remove tx when the blockchain reorgs. // If a tx is reverted, all data after it will also be reverted, so we call remove // all indices after it. @@ -155,24 +161,25 @@ impl TransactionStore { } for (merkle_root, tx_seq_list) in modified_merkle_root_map { if tx_seq_list.is_empty() { - db_tx.delete(COL_TX_DATA_ROOT_INDEX, merkle_root.as_bytes()); + flow_db_tx.delete(COL_TX_DATA_ROOT_INDEX, merkle_root.as_bytes()); } else { - db_tx.put( + flow_db_tx.put( COL_TX_DATA_ROOT_INDEX, merkle_root.as_bytes(), &tx_seq_list.as_ssz_bytes(), ); } } - db_tx.put(COL_TX, NEXT_TX_KEY.as_bytes(), &min_seq.to_be_bytes()); + flow_db_tx.put(COL_TX, NEXT_TX_KEY.as_bytes(), &min_seq.to_be_bytes()); self.next_tx_seq.store(min_seq, Ordering::SeqCst); - self.kvdb.write(db_tx)?; + self.data_kvdb.write(data_db_tx)?; + self.flow_kvdb.write(flow_db_tx)?; Ok(removed_txs) } pub fn get_tx_seq_list_by_data_root(&self, data_root: &DataRoot) -> Result> { let value = match self - .kvdb + .flow_kvdb .get(COL_TX_DATA_ROOT_INDEX, data_root.as_bytes())? { Some(v) => v, @@ -183,7 +190,7 @@ impl TransactionStore { #[instrument(skip(self))] pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> { - Ok(self.kvdb.put( + Ok(self.data_kvdb.put( COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[TxStatus::Finalized.into()], @@ -192,7 +199,7 @@ impl TransactionStore { #[instrument(skip(self))] pub fn prune_tx(&self, tx_seq: u64) -> Result<()> { - Ok(self.kvdb.put( + Ok(self.data_kvdb.put( COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[TxStatus::Pruned.into()], @@ -200,7 +207,9 @@ impl TransactionStore { } pub fn get_tx_status(&self, tx_seq: u64) -> Result> { - let value = try_option!(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?); + let value = try_option!(self + .data_kvdb + .get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?); match value.first() { Some(v) => Ok(Some(TxStatus::try_from(*v)?)), None => Ok(None), @@ -239,14 +248,14 @@ impl TransactionStore { (progress.1, p).as_ssz_bytes(), )); } - Ok(self.kvdb.puts(items)?) + Ok(self.flow_kvdb.puts(items)?) } #[instrument(skip(self))] pub fn get_progress(&self) -> Result> { Ok(Some( <(u64, H256)>::from_ssz_bytes(&try_option!(self - .kvdb + .flow_kvdb .get(COL_MISC, LOG_SYNC_PROGRESS_KEY.as_bytes())?)) .map_err(Error::from)?, )) @@ -254,7 +263,7 @@ impl TransactionStore { #[instrument(skip(self))] pub fn put_log_latest_block_number(&self, block_number: u64) -> Result<()> { - Ok(self.kvdb.put( + Ok(self.flow_kvdb.put( COL_MISC, LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes(), &block_number.as_ssz_bytes(), @@ -265,7 +274,7 @@ impl TransactionStore { pub fn get_log_latest_block_number(&self) -> Result> { Ok(Some( ::from_ssz_bytes(&try_option!(self - .kvdb + .flow_kvdb .get(COL_MISC, LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes())?)) .map_err(Error::from)?, )) @@ -277,7 +286,7 @@ impl TransactionStore { ) -> Result)>> { Ok(Some( <(H256, Option)>::from_ssz_bytes(&try_option!(self - .kvdb + .flow_kvdb .get(COL_BLOCK_PROGRESS, &block_number.to_be_bytes())?)) .map_err(Error::from)?, )) @@ -285,7 +294,7 @@ impl TransactionStore { pub fn get_block_hashes(&self) -> Result> { let mut block_numbers = vec![]; - for r in self.kvdb.iter(COL_BLOCK_PROGRESS) { + for r in self.flow_kvdb.iter(COL_BLOCK_PROGRESS) { let (key, val) = r?; let block_number = u64::from_be_bytes(key.as_ref().try_into().map_err(|e| anyhow!("{:?}", e))?); @@ -305,7 +314,7 @@ impl TransactionStore { pub fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()> { Ok(self - .kvdb + .flow_kvdb .delete(COL_BLOCK_PROGRESS, &block_number.to_be_bytes())?) }