fix tx store

This commit is contained in:
Peter Zhang 2024-11-20 16:02:11 +08:00
parent 40104de891
commit a3a1f3f865
2 changed files with 35 additions and 26 deletions

View File

@ -700,7 +700,7 @@ impl LogManager {
data_db_source: Arc<dyn ZgsKeyValueDB>, data_db_source: Arc<dyn ZgsKeyValueDB>,
config: LogConfig, config: LogConfig,
) -> Result<Self> { ) -> Result<Self> {
let tx_store = TransactionStore::new(data_db_source.clone())?; let tx_store = TransactionStore::new(flow_db_source.clone(), data_db_source.clone())?;
let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone())); let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone()));
let data_db = Arc::new(FlowDBStore::new(data_db_source.clone())); let data_db = Arc::new(FlowDBStore::new(data_db_source.clone()));
let flow_store = Arc::new(FlowStore::new( let flow_store = Arc::new(FlowStore::new(

View File

@ -57,19 +57,24 @@ pub struct BlockHashAndSubmissionIndex {
} }
pub struct TransactionStore { pub struct TransactionStore {
kvdb: Arc<dyn ZgsKeyValueDB>, flow_kvdb: Arc<dyn ZgsKeyValueDB>,
data_kvdb: Arc<dyn ZgsKeyValueDB>,
/// This is always updated before writing the database to ensure no intermediate states. /// This is always updated before writing the database to ensure no intermediate states.
next_tx_seq: AtomicU64, next_tx_seq: AtomicU64,
} }
impl TransactionStore { impl TransactionStore {
pub fn new(kvdb: Arc<dyn ZgsKeyValueDB>) -> Result<Self> { pub fn new(
let next_tx_seq = kvdb flow_kvdb: Arc<dyn ZgsKeyValueDB>,
data_kvdb: Arc<dyn ZgsKeyValueDB>,
) -> Result<Self> {
let next_tx_seq = flow_kvdb
.get(COL_TX, NEXT_TX_KEY.as_bytes())? .get(COL_TX, NEXT_TX_KEY.as_bytes())?
.map(|a| decode_tx_seq(&a)) .map(|a| decode_tx_seq(&a))
.unwrap_or(Ok(0))?; .unwrap_or(Ok(0))?;
Ok(Self { Ok(Self {
kvdb, flow_kvdb,
data_kvdb,
next_tx_seq: AtomicU64::new(next_tx_seq), next_tx_seq: AtomicU64::new(next_tx_seq),
}) })
} }
@ -86,7 +91,7 @@ impl TransactionStore {
return Ok(old_tx_seq_list); return Ok(old_tx_seq_list);
} }
let mut db_tx = self.kvdb.transaction(); let mut db_tx = self.flow_kvdb.transaction();
if !tx.data.is_empty() { if !tx.data.is_empty() {
tx.size = tx.data.len() as u64; tx.size = tx.data.len() as u64;
let mut padded_data = tx.data.clone(); let mut padded_data = tx.data.clone();
@ -113,7 +118,7 @@ impl TransactionStore {
&new_tx_seq_list.as_ssz_bytes(), &new_tx_seq_list.as_ssz_bytes(),
); );
self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst); self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst);
self.kvdb.write(db_tx)?; self.flow_kvdb.write(db_tx)?;
metrics::TX_STORE_PUT.update_since(start_time); metrics::TX_STORE_PUT.update_since(start_time);
Ok(old_tx_seq_list) Ok(old_tx_seq_list)
} }
@ -123,7 +128,7 @@ impl TransactionStore {
if seq >= self.next_tx_seq() { if seq >= self.next_tx_seq() {
return Ok(None); return Ok(None);
} }
let value = try_option!(self.kvdb.get(COL_TX, &seq.to_be_bytes())?); let value = try_option!(self.flow_kvdb.get(COL_TX, &seq.to_be_bytes())?);
let tx = Transaction::from_ssz_bytes(&value).map_err(Error::from)?; let tx = Transaction::from_ssz_bytes(&value).map_err(Error::from)?;
metrics::TX_BY_SEQ_NUMBER.update_since(start_time); metrics::TX_BY_SEQ_NUMBER.update_since(start_time);
Ok(Some(tx)) Ok(Some(tx))
@ -132,15 +137,16 @@ impl TransactionStore {
pub fn remove_tx_after(&self, min_seq: u64) -> Result<Vec<Transaction>> { pub fn remove_tx_after(&self, min_seq: u64) -> Result<Vec<Transaction>> {
let mut removed_txs = Vec::new(); let mut removed_txs = Vec::new();
let max_seq = self.next_tx_seq(); let max_seq = self.next_tx_seq();
let mut db_tx = self.kvdb.transaction(); let mut flow_db_tx = self.flow_kvdb.transaction();
let mut data_db_tx = self.data_kvdb.transaction();
let mut modified_merkle_root_map = HashMap::new(); let mut modified_merkle_root_map = HashMap::new();
for seq in min_seq..max_seq { for seq in min_seq..max_seq {
let Some(tx) = self.get_tx_by_seq_number(seq)? else { let Some(tx) = self.get_tx_by_seq_number(seq)? else {
error!(?seq, ?max_seq, "Transaction missing before the end"); error!(?seq, ?max_seq, "Transaction missing before the end");
break; break;
}; };
db_tx.delete(COL_TX, &seq.to_be_bytes()); flow_db_tx.delete(COL_TX, &seq.to_be_bytes());
db_tx.delete(COL_TX_COMPLETED, &seq.to_be_bytes()); data_db_tx.delete(COL_TX_COMPLETED, &seq.to_be_bytes());
// We only remove tx when the blockchain reorgs. // We only remove tx when the blockchain reorgs.
// If a tx is reverted, all data after it will also be reverted, so we call remove // If a tx is reverted, all data after it will also be reverted, so we call remove
// all indices after it. // all indices after it.
@ -155,24 +161,25 @@ impl TransactionStore {
} }
for (merkle_root, tx_seq_list) in modified_merkle_root_map { for (merkle_root, tx_seq_list) in modified_merkle_root_map {
if tx_seq_list.is_empty() { if tx_seq_list.is_empty() {
db_tx.delete(COL_TX_DATA_ROOT_INDEX, merkle_root.as_bytes()); flow_db_tx.delete(COL_TX_DATA_ROOT_INDEX, merkle_root.as_bytes());
} else { } else {
db_tx.put( flow_db_tx.put(
COL_TX_DATA_ROOT_INDEX, COL_TX_DATA_ROOT_INDEX,
merkle_root.as_bytes(), merkle_root.as_bytes(),
&tx_seq_list.as_ssz_bytes(), &tx_seq_list.as_ssz_bytes(),
); );
} }
} }
db_tx.put(COL_TX, NEXT_TX_KEY.as_bytes(), &min_seq.to_be_bytes()); flow_db_tx.put(COL_TX, NEXT_TX_KEY.as_bytes(), &min_seq.to_be_bytes());
self.next_tx_seq.store(min_seq, Ordering::SeqCst); self.next_tx_seq.store(min_seq, Ordering::SeqCst);
self.kvdb.write(db_tx)?; self.data_kvdb.write(data_db_tx)?;
self.flow_kvdb.write(flow_db_tx)?;
Ok(removed_txs) Ok(removed_txs)
} }
pub fn get_tx_seq_list_by_data_root(&self, data_root: &DataRoot) -> Result<Vec<u64>> { pub fn get_tx_seq_list_by_data_root(&self, data_root: &DataRoot) -> Result<Vec<u64>> {
let value = match self let value = match self
.kvdb .flow_kvdb
.get(COL_TX_DATA_ROOT_INDEX, data_root.as_bytes())? .get(COL_TX_DATA_ROOT_INDEX, data_root.as_bytes())?
{ {
Some(v) => v, Some(v) => v,
@ -183,7 +190,7 @@ impl TransactionStore {
#[instrument(skip(self))] #[instrument(skip(self))]
pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> { pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> {
Ok(self.kvdb.put( Ok(self.data_kvdb.put(
COL_TX_COMPLETED, COL_TX_COMPLETED,
&tx_seq.to_be_bytes(), &tx_seq.to_be_bytes(),
&[TxStatus::Finalized.into()], &[TxStatus::Finalized.into()],
@ -192,7 +199,7 @@ impl TransactionStore {
#[instrument(skip(self))] #[instrument(skip(self))]
pub fn prune_tx(&self, tx_seq: u64) -> Result<()> { pub fn prune_tx(&self, tx_seq: u64) -> Result<()> {
Ok(self.kvdb.put( Ok(self.data_kvdb.put(
COL_TX_COMPLETED, COL_TX_COMPLETED,
&tx_seq.to_be_bytes(), &tx_seq.to_be_bytes(),
&[TxStatus::Pruned.into()], &[TxStatus::Pruned.into()],
@ -200,7 +207,9 @@ impl TransactionStore {
} }
pub fn get_tx_status(&self, tx_seq: u64) -> Result<Option<TxStatus>> { pub fn get_tx_status(&self, tx_seq: u64) -> Result<Option<TxStatus>> {
let value = try_option!(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?); let value = try_option!(self
.data_kvdb
.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?);
match value.first() { match value.first() {
Some(v) => Ok(Some(TxStatus::try_from(*v)?)), Some(v) => Ok(Some(TxStatus::try_from(*v)?)),
None => Ok(None), None => Ok(None),
@ -239,14 +248,14 @@ impl TransactionStore {
(progress.1, p).as_ssz_bytes(), (progress.1, p).as_ssz_bytes(),
)); ));
} }
Ok(self.kvdb.puts(items)?) Ok(self.flow_kvdb.puts(items)?)
} }
#[instrument(skip(self))] #[instrument(skip(self))]
pub fn get_progress(&self) -> Result<Option<(u64, H256)>> { pub fn get_progress(&self) -> Result<Option<(u64, H256)>> {
Ok(Some( Ok(Some(
<(u64, H256)>::from_ssz_bytes(&try_option!(self <(u64, H256)>::from_ssz_bytes(&try_option!(self
.kvdb .flow_kvdb
.get(COL_MISC, LOG_SYNC_PROGRESS_KEY.as_bytes())?)) .get(COL_MISC, LOG_SYNC_PROGRESS_KEY.as_bytes())?))
.map_err(Error::from)?, .map_err(Error::from)?,
)) ))
@ -254,7 +263,7 @@ impl TransactionStore {
#[instrument(skip(self))] #[instrument(skip(self))]
pub fn put_log_latest_block_number(&self, block_number: u64) -> Result<()> { pub fn put_log_latest_block_number(&self, block_number: u64) -> Result<()> {
Ok(self.kvdb.put( Ok(self.flow_kvdb.put(
COL_MISC, COL_MISC,
LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes(), LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes(),
&block_number.as_ssz_bytes(), &block_number.as_ssz_bytes(),
@ -265,7 +274,7 @@ impl TransactionStore {
pub fn get_log_latest_block_number(&self) -> Result<Option<u64>> { pub fn get_log_latest_block_number(&self) -> Result<Option<u64>> {
Ok(Some( Ok(Some(
<u64>::from_ssz_bytes(&try_option!(self <u64>::from_ssz_bytes(&try_option!(self
.kvdb .flow_kvdb
.get(COL_MISC, LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes())?)) .get(COL_MISC, LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes())?))
.map_err(Error::from)?, .map_err(Error::from)?,
)) ))
@ -277,7 +286,7 @@ impl TransactionStore {
) -> Result<Option<(H256, Option<u64>)>> { ) -> Result<Option<(H256, Option<u64>)>> {
Ok(Some( Ok(Some(
<(H256, Option<u64>)>::from_ssz_bytes(&try_option!(self <(H256, Option<u64>)>::from_ssz_bytes(&try_option!(self
.kvdb .flow_kvdb
.get(COL_BLOCK_PROGRESS, &block_number.to_be_bytes())?)) .get(COL_BLOCK_PROGRESS, &block_number.to_be_bytes())?))
.map_err(Error::from)?, .map_err(Error::from)?,
)) ))
@ -285,7 +294,7 @@ impl TransactionStore {
pub fn get_block_hashes(&self) -> Result<Vec<(u64, BlockHashAndSubmissionIndex)>> { pub fn get_block_hashes(&self) -> Result<Vec<(u64, BlockHashAndSubmissionIndex)>> {
let mut block_numbers = vec![]; let mut block_numbers = vec![];
for r in self.kvdb.iter(COL_BLOCK_PROGRESS) { for r in self.flow_kvdb.iter(COL_BLOCK_PROGRESS) {
let (key, val) = r?; let (key, val) = r?;
let block_number = let block_number =
u64::from_be_bytes(key.as_ref().try_into().map_err(|e| anyhow!("{:?}", e))?); u64::from_be_bytes(key.as_ref().try_into().map_err(|e| anyhow!("{:?}", e))?);
@ -305,7 +314,7 @@ impl TransactionStore {
pub fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()> { pub fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()> {
Ok(self Ok(self
.kvdb .flow_kvdb
.delete(COL_BLOCK_PROGRESS, &block_number.to_be_bytes())?) .delete(COL_BLOCK_PROGRESS, &block_number.to_be_bytes())?)
} }