Compare commits

..

2 Commits

Author SHA1 Message Date
0g-peterzhb
1092344598
Merge c8fbb13443 into 395aeabde7 2024-10-05 22:16:51 +08:00
peilun-conflux
395aeabde7
fix: do not finalize same-root tx with missing data. (#222)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
2024-10-05 22:16:33 +08:00

View File

@ -268,8 +268,7 @@ impl LogStoreWrite for LogManager {
if let Some(old_tx_seq) = maybe_same_data_tx_seq {
if self.check_tx_completed(old_tx_seq)? {
self.copy_tx_data(old_tx_seq, vec![tx.seq])?;
self.tx_store.finalize_tx(tx.seq)?;
self.copy_tx_and_finalize(old_tx_seq, vec![tx.seq])?;
}
}
Ok(())
@ -292,7 +291,7 @@ impl LogStoreWrite for LogManager {
.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
// Check if there are other same-root transaction not finalized.
if same_root_seq_list.first() == Some(&tx_seq) {
self.copy_tx_data(tx_seq, same_root_seq_list[1..].to_vec())?;
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
}
self.tx_store.finalize_tx(tx_seq)?;
Ok(())
@ -328,7 +327,7 @@ impl LogStoreWrite for LogManager {
.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
// Check if there are other same-root transaction not finalized.
if same_root_seq_list.first() == Some(&tx_seq) {
self.copy_tx_data(tx_seq, same_root_seq_list[1..].to_vec())?;
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
}
Ok(true)
} else {
@ -1144,8 +1143,9 @@ impl LogManager {
Ok(())
}
fn copy_tx_data(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
let mut merkle = self.merkle.write();
let shard_config = self.flow_store.get_shard_config();
// We have all the data need for this tx, so just copy them.
let old_tx = self
.get_tx_by_seq_number(from_tx_seq)?
@ -1159,6 +1159,12 @@ impl LogManager {
let tx = self
.get_tx_by_seq_number(seq)?
.ok_or_else(|| anyhow!("to tx missing"))?;
// Data for `tx` is not available due to sharding.
if sector_to_segment(tx.start_entry_index) % shard_config.num_shard
!= sector_to_segment(old_tx.start_entry_index) % shard_config.num_shard
{
continue;
}
to_tx_offset_list.push((tx.seq, tx.start_entry_index - old_tx.start_entry_index));
}
if to_tx_offset_list.is_empty() {
@ -1170,7 +1176,7 @@ impl LogManager {
old_tx.start_entry_index,
old_tx.start_entry_index + old_tx.num_entries() as u64,
PORA_CHUNK_SIZE,
self.flow_store.get_shard_config(),
shard_config,
) {
let batch_data = self
.get_chunk_by_flow_index(batch_start, batch_end - batch_start)?