From 949462084a0108cb31b17b79b8be5911b5686ef9 Mon Sep 17 00:00:00 2001 From: MiniFrenchBread <103425574+MiniFrenchBread@users.noreply.github.com> Date: Mon, 30 Sep 2024 18:43:11 +0800 Subject: [PATCH] fix: finalize check on tx sync (#220) * fix: finalize * fix: check * fix: end_segment_index * fix: fmt --- node/log_entry_sync/src/sync_manager/mod.rs | 27 ++++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/node/log_entry_sync/src/sync_manager/mod.rs b/node/log_entry_sync/src/sync_manager/mod.rs index 39b0b3e..48b3558 100644 --- a/node/log_entry_sync/src/sync_manager/mod.rs +++ b/node/log_entry_sync/src/sync_manager/mod.rs @@ -12,7 +12,7 @@ use std::fmt::Debug; use std::future::Future; use std::sync::Arc; use std::time::{Duration, Instant}; -use storage::log_store::log_manager::sector_to_segment; +use storage::log_store::log_manager::PORA_CHUNK_SIZE; use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store}; use task_executor::{ShutdownReason, TaskExecutor}; use thiserror::Error; @@ -479,11 +479,30 @@ impl LogSyncManager { return false; } } else { + // check if current node need to save at least one segment let store = self.store.clone(); let shard_config = store.flow().get_shard_config(); - if sector_to_segment(bytes_to_chunks(tx.size as usize) as u64) - < shard_config.shard_id - { + let start_segment_index = tx.start_entry_index as usize / PORA_CHUNK_SIZE; + let sector_size = bytes_to_chunks(tx.size as usize); + let end_segment_index = start_segment_index + + ((sector_size + PORA_CHUNK_SIZE - 1) / PORA_CHUNK_SIZE) + - 1; + let mut can_finalize = false; + if end_segment_index < shard_config.shard_id { + can_finalize = true; + } else { + // check if there is a number N between [start_segment_index, end_segment_index] that satisfy: + // N % num_shard = shard_id + let min_n_gte_start = + (start_segment_index + shard_config.num_shard - 1 - shard_config.shard_id) + / shard_config.num_shard; + let max_n_lte_end = + (end_segment_index - shard_config.shard_id) / shard_config.num_shard; + if min_n_gte_start > max_n_lte_end { + can_finalize = true; + } + } + if can_finalize { if let Err(e) = store.finalize_tx_with_hash(tx.seq, tx.hash()) { error!("finalize file that does not need to store: e={:?}", e); return false;