fix: finalize check on tx sync (#220)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled

* fix: finalize

* fix: check

* fix: end_segment_index

* fix: fmt
This commit is contained in:
MiniFrenchBread 2024-09-30 18:43:11 +08:00 committed by GitHub
parent 07ac814e57
commit 949462084a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -12,7 +12,7 @@ use std::fmt::Debug;
use std::future::Future; use std::future::Future;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use storage::log_store::log_manager::sector_to_segment; use storage::log_store::log_manager::PORA_CHUNK_SIZE;
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store}; use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
use task_executor::{ShutdownReason, TaskExecutor}; use task_executor::{ShutdownReason, TaskExecutor};
use thiserror::Error; use thiserror::Error;
@ -479,11 +479,30 @@ impl LogSyncManager {
return false; return false;
} }
} else { } else {
// check if current node need to save at least one segment
let store = self.store.clone(); let store = self.store.clone();
let shard_config = store.flow().get_shard_config(); let shard_config = store.flow().get_shard_config();
if sector_to_segment(bytes_to_chunks(tx.size as usize) as u64) let start_segment_index = tx.start_entry_index as usize / PORA_CHUNK_SIZE;
< shard_config.shard_id let sector_size = bytes_to_chunks(tx.size as usize);
{ let end_segment_index = start_segment_index
+ ((sector_size + PORA_CHUNK_SIZE - 1) / PORA_CHUNK_SIZE)
- 1;
let mut can_finalize = false;
if end_segment_index < shard_config.shard_id {
can_finalize = true;
} else {
// check if there is a number N between [start_segment_index, end_segment_index] that satisfy:
// N % num_shard = shard_id
let min_n_gte_start =
(start_segment_index + shard_config.num_shard - 1 - shard_config.shard_id)
/ shard_config.num_shard;
let max_n_lte_end =
(end_segment_index - shard_config.shard_id) / shard_config.num_shard;
if min_n_gte_start > max_n_lte_end {
can_finalize = true;
}
}
if can_finalize {
if let Err(e) = store.finalize_tx_with_hash(tx.seq, tx.hash()) { if let Err(e) = store.finalize_tx_with_hash(tx.seq, tx.hash()) {
error!("finalize file that does not need to store: e={:?}", e); error!("finalize file that does not need to store: e={:?}", e);
return false; return false;