mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2024-12-24 23:35:18 +00:00
Set sync start index based on data in db. (#166)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
* Set sync start index based on data in db. * Fix test. * nit.
This commit is contained in:
parent
f14a1b5975
commit
1c72607fbc
@ -609,9 +609,9 @@ impl LogManager {
|
|||||||
.get_tx_by_seq_number(last_tx_seq)?
|
.get_tx_by_seq_number(last_tx_seq)?
|
||||||
.expect("tx missing");
|
.expect("tx missing");
|
||||||
let mut current_len = initial_data.leaves();
|
let mut current_len = initial_data.leaves();
|
||||||
let expected_len = (last_tx.start_entry_index + last_tx.num_entries() as u64)
|
let expected_len =
|
||||||
/ PORA_CHUNK_SIZE as u64;
|
sector_to_segment(last_tx.start_entry_index + last_tx.num_entries() as u64);
|
||||||
match expected_len.cmp(&(current_len as u64)) {
|
match expected_len.cmp(&(current_len)) {
|
||||||
Ordering::Less => {
|
Ordering::Less => {
|
||||||
bail!(
|
bail!(
|
||||||
"Unexpected DB: merkle tree larger than the known data size,\
|
"Unexpected DB: merkle tree larger than the known data size,\
|
||||||
@ -634,10 +634,9 @@ impl LogManager {
|
|||||||
let previous_tx = tx_store
|
let previous_tx = tx_store
|
||||||
.get_tx_by_seq_number(last_tx_seq - 1)?
|
.get_tx_by_seq_number(last_tx_seq - 1)?
|
||||||
.expect("tx missing");
|
.expect("tx missing");
|
||||||
let expected_len = ((previous_tx.start_entry_index
|
let expected_len = sector_to_segment(
|
||||||
+ previous_tx.num_entries() as u64)
|
previous_tx.start_entry_index + previous_tx.num_entries() as u64,
|
||||||
/ PORA_CHUNK_SIZE as u64)
|
);
|
||||||
as usize;
|
|
||||||
if current_len > expected_len {
|
if current_len > expected_len {
|
||||||
while let Some((subtree_depth, _)) = initial_data.subtree_list.pop()
|
while let Some((subtree_depth, _)) = initial_data.subtree_list.pop()
|
||||||
{
|
{
|
||||||
@ -737,13 +736,13 @@ impl LogManager {
|
|||||||
maybe_tx_seq: Option<u64>,
|
maybe_tx_seq: Option<u64>,
|
||||||
) -> Result<FlowProof> {
|
) -> Result<FlowProof> {
|
||||||
let merkle = self.merkle.read_recursive();
|
let merkle = self.merkle.read_recursive();
|
||||||
let chunk_index = flow_index / PORA_CHUNK_SIZE as u64;
|
let seg_index = sector_to_segment(flow_index);
|
||||||
let top_proof = match maybe_tx_seq {
|
let top_proof = match maybe_tx_seq {
|
||||||
None => merkle.pora_chunks_merkle.gen_proof(chunk_index as usize)?,
|
None => merkle.pora_chunks_merkle.gen_proof(seg_index)?,
|
||||||
Some(tx_seq) => merkle
|
Some(tx_seq) => merkle
|
||||||
.pora_chunks_merkle
|
.pora_chunks_merkle
|
||||||
.at_version(tx_seq)?
|
.at_version(tx_seq)?
|
||||||
.gen_proof(chunk_index as usize)?,
|
.gen_proof(seg_index)?,
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO(zz): Maybe we can decide that all proofs are at the PoRA chunk level, so
|
// TODO(zz): Maybe we can decide that all proofs are at the PoRA chunk level, so
|
||||||
@ -753,11 +752,11 @@ impl LogManager {
|
|||||||
// and `flow_index` must be within a complete PoRA chunk. For possible future usages,
|
// and `flow_index` must be within a complete PoRA chunk. For possible future usages,
|
||||||
// we'll need to find the flow length at the given root and load a partial chunk
|
// we'll need to find the flow length at the given root and load a partial chunk
|
||||||
// if `flow_index` is in the last chunk.
|
// if `flow_index` is in the last chunk.
|
||||||
let sub_proof = if chunk_index as usize != merkle.pora_chunks_merkle.leaves() - 1
|
let sub_proof = if seg_index != merkle.pora_chunks_merkle.leaves() - 1
|
||||||
|| merkle.last_chunk_merkle.leaves() == 0
|
|| merkle.last_chunk_merkle.leaves() == 0
|
||||||
{
|
{
|
||||||
self.flow_store
|
self.flow_store
|
||||||
.gen_proof_in_batch(chunk_index as usize, flow_index as usize % PORA_CHUNK_SIZE)?
|
.gen_proof_in_batch(seg_index, flow_index as usize % PORA_CHUNK_SIZE)?
|
||||||
} else {
|
} else {
|
||||||
match maybe_tx_seq {
|
match maybe_tx_seq {
|
||||||
None => merkle
|
None => merkle
|
||||||
@ -1236,3 +1235,11 @@ pub fn tx_subtree_root_list_padded(data: &[u8]) -> Vec<(usize, DataRoot)> {
|
|||||||
|
|
||||||
root_list
|
root_list
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn sector_to_segment(sector_index: u64) -> usize {
|
||||||
|
(sector_index / PORA_CHUNK_SIZE as u64) as usize
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn segment_to_sector(segment_index: usize) -> usize {
|
||||||
|
segment_index * PORA_CHUNK_SIZE
|
||||||
|
}
|
||||||
|
@ -18,10 +18,12 @@ pub struct FileSyncGoal {
|
|||||||
pub index_start: u64,
|
pub index_start: u64,
|
||||||
/// Chunk index to sync to (exclusive).
|
/// Chunk index to sync to (exclusive).
|
||||||
pub index_end: u64,
|
pub index_end: u64,
|
||||||
|
/// `true` if we are syncing all the needed data of this file.
|
||||||
|
pub all_chunks: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FileSyncGoal {
|
impl FileSyncGoal {
|
||||||
pub fn new(num_chunks: u64, index_start: u64, index_end: u64) -> Self {
|
pub fn new(num_chunks: u64, index_start: u64, index_end: u64, all_chunks: bool) -> Self {
|
||||||
assert!(
|
assert!(
|
||||||
index_start < index_end && index_end <= num_chunks,
|
index_start < index_end && index_end <= num_chunks,
|
||||||
"invalid index_end"
|
"invalid index_end"
|
||||||
@ -30,15 +32,16 @@ impl FileSyncGoal {
|
|||||||
num_chunks,
|
num_chunks,
|
||||||
index_start,
|
index_start,
|
||||||
index_end,
|
index_end,
|
||||||
|
all_chunks,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_file(num_chunks: u64) -> Self {
|
pub fn new_file(num_chunks: u64) -> Self {
|
||||||
Self::new(num_chunks, 0, num_chunks)
|
Self::new(num_chunks, 0, num_chunks, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_all_chunks(&self) -> bool {
|
pub fn is_all_chunks(&self) -> bool {
|
||||||
self.index_start == 0 && self.index_end == self.num_chunks
|
self.all_chunks
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -12,7 +12,7 @@ use network::{
|
|||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use shared_types::{timestamp_now, ChunkArrayWithProof, TxID, CHUNK_SIZE};
|
use shared_types::{timestamp_now, ChunkArrayWithProof, TxID, CHUNK_SIZE};
|
||||||
use std::{sync::Arc, time::Instant};
|
use std::{sync::Arc, time::Instant};
|
||||||
use storage::log_store::log_manager::PORA_CHUNK_SIZE;
|
use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE};
|
||||||
use storage_async::Store;
|
use storage_async::Store;
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||||
@ -139,7 +139,7 @@ impl SerialSyncController {
|
|||||||
if let Some((start, end)) = maybe_range {
|
if let Some((start, end)) = maybe_range {
|
||||||
// Sync new chunks regardless of previously downloaded file or chunks.
|
// Sync new chunks regardless of previously downloaded file or chunks.
|
||||||
// It's up to client to avoid duplicated chunks sync.
|
// It's up to client to avoid duplicated chunks sync.
|
||||||
self.goal = FileSyncGoal::new(self.goal.num_chunks, start, end);
|
self.goal = FileSyncGoal::new(self.goal.num_chunks, start, end, false);
|
||||||
self.next_chunk = start;
|
self.next_chunk = start;
|
||||||
} else if self.goal.is_all_chunks() {
|
} else if self.goal.is_all_chunks() {
|
||||||
// retry the failed file sync at break point
|
// retry the failed file sync at break point
|
||||||
@ -479,10 +479,10 @@ impl SerialSyncController {
|
|||||||
metrics::SERIAL_SYNC_SEGMENT_LATENCY.update_since(since.0);
|
metrics::SERIAL_SYNC_SEGMENT_LATENCY.update_since(since.0);
|
||||||
|
|
||||||
let shard_config = self.store.get_store().flow().get_shard_config();
|
let shard_config = self.store.get_store().flow().get_shard_config();
|
||||||
let next_chunk = shard_config.next_segment_index(
|
let next_chunk = segment_to_sector(shard_config.next_segment_index(
|
||||||
(from_chunk / PORA_CHUNK_SIZE as u64) as usize,
|
sector_to_segment(from_chunk),
|
||||||
(self.tx_start_chunk_in_flow / PORA_CHUNK_SIZE as u64) as usize,
|
sector_to_segment(self.tx_start_chunk_in_flow),
|
||||||
) * PORA_CHUNK_SIZE;
|
));
|
||||||
// store in db
|
// store in db
|
||||||
match self
|
match self
|
||||||
.store
|
.store
|
||||||
@ -576,12 +576,11 @@ impl SerialSyncController {
|
|||||||
|
|
||||||
/// Randomly select a `Connected` peer to sync chunks.
|
/// Randomly select a `Connected` peer to sync chunks.
|
||||||
fn select_peer_for_request(&self, request: &GetChunksRequest) -> Option<PeerId> {
|
fn select_peer_for_request(&self, request: &GetChunksRequest) -> Option<PeerId> {
|
||||||
let segment_index =
|
let segment_index = sector_to_segment(request.index_start + self.tx_start_chunk_in_flow);
|
||||||
(request.index_start + self.tx_start_chunk_in_flow) / PORA_CHUNK_SIZE as u64;
|
|
||||||
let mut peers = self.peers.filter_peers(vec![PeerState::Connected]);
|
let mut peers = self.peers.filter_peers(vec![PeerState::Connected]);
|
||||||
|
|
||||||
peers.retain(|peer_id| match self.peers.shard_config(peer_id) {
|
peers.retain(|peer_id| match self.peers.shard_config(peer_id) {
|
||||||
Some(v) => v.in_range(segment_index),
|
Some(v) => v.in_range(segment_index as u64),
|
||||||
None => false,
|
None => false,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -4,7 +4,7 @@ use crate::controllers::{
|
|||||||
FailureReason, FileSyncGoal, FileSyncInfo, SerialSyncController, SyncState,
|
FailureReason, FileSyncGoal, FileSyncInfo, SerialSyncController, SyncState,
|
||||||
};
|
};
|
||||||
use crate::{Config, SyncServiceState};
|
use crate::{Config, SyncServiceState};
|
||||||
use anyhow::{bail, Result};
|
use anyhow::{anyhow, bail, Result};
|
||||||
use file_location_cache::FileLocationCache;
|
use file_location_cache::FileLocationCache;
|
||||||
use libp2p::swarm::DialError;
|
use libp2p::swarm::DialError;
|
||||||
use log_entry_sync::LogSyncEvent;
|
use log_entry_sync::LogSyncEvent;
|
||||||
@ -14,14 +14,16 @@ use network::{
|
|||||||
rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, PeerId,
|
rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, PeerId,
|
||||||
PeerRequestId, SyncId as RequestId,
|
PeerRequestId, SyncId as RequestId,
|
||||||
};
|
};
|
||||||
use shared_types::{bytes_to_chunks, timestamp_now, ChunkArrayWithProof, TxID};
|
use shared_types::{bytes_to_chunks, timestamp_now, ChunkArrayWithProof, Transaction, TxID};
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::{
|
use std::{
|
||||||
|
cmp,
|
||||||
collections::{hash_map::Entry, HashMap},
|
collections::{hash_map::Entry, HashMap},
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
};
|
};
|
||||||
use storage::config::ShardConfig;
|
use storage::config::ShardConfig;
|
||||||
use storage::error::Result as StorageResult;
|
use storage::error::Result as StorageResult;
|
||||||
|
use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE};
|
||||||
use storage::log_store::Store as LogStore;
|
use storage::log_store::Store as LogStore;
|
||||||
use storage_async::Store;
|
use storage_async::Store;
|
||||||
use tokio::sync::{broadcast, mpsc, oneshot};
|
use tokio::sync::{broadcast, mpsc, oneshot};
|
||||||
@ -633,9 +635,19 @@ impl SyncService {
|
|||||||
bail!("File already exists");
|
bail!("File already exists");
|
||||||
}
|
}
|
||||||
|
|
||||||
let (index_start, index_end) = match maybe_range {
|
let (index_start, index_end, all_chunks) = match maybe_range {
|
||||||
Some((start, end)) => (start, end),
|
Some((start, end)) => (start, end, false),
|
||||||
None => (0, num_chunks),
|
None => {
|
||||||
|
let start = match Self::tx_sync_start_index(&self.store, &tx).await? {
|
||||||
|
Some(s) => s,
|
||||||
|
None => {
|
||||||
|
debug!(%tx.seq, "No more data needed");
|
||||||
|
self.store.finalize_tx_with_hash(tx.seq, tx.hash()).await?;
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
(start, num_chunks, true)
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if index_start >= index_end || index_end > num_chunks {
|
if index_start >= index_end || index_end > num_chunks {
|
||||||
@ -646,7 +658,7 @@ impl SyncService {
|
|||||||
self.config,
|
self.config,
|
||||||
tx.id(),
|
tx.id(),
|
||||||
tx.start_entry_index(),
|
tx.start_entry_index(),
|
||||||
FileSyncGoal::new(num_chunks, index_start, index_end),
|
FileSyncGoal::new(num_chunks, index_start, index_end, all_chunks),
|
||||||
self.ctx.clone(),
|
self.ctx.clone(),
|
||||||
self.store.clone(),
|
self.store.clone(),
|
||||||
self.file_location_cache.clone(),
|
self.file_location_cache.clone(),
|
||||||
@ -793,6 +805,35 @@ impl SyncService {
|
|||||||
self.controllers.remove(&tx_seq);
|
self.controllers.remove(&tx_seq);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn tx_sync_start_index(store: &Store, tx: &Transaction) -> Result<Option<u64>> {
|
||||||
|
let shard_config = store.get_store().flow().get_shard_config();
|
||||||
|
let start_segment = sector_to_segment(tx.start_entry_index());
|
||||||
|
let end =
|
||||||
|
bytes_to_chunks(usize::try_from(tx.size).map_err(|e| anyhow!("tx size e={}", e))?);
|
||||||
|
let mut start = if shard_config.in_range(start_segment as u64) {
|
||||||
|
0
|
||||||
|
} else {
|
||||||
|
segment_to_sector(shard_config.next_segment_index(0, start_segment))
|
||||||
|
};
|
||||||
|
while start < end {
|
||||||
|
if store
|
||||||
|
.get_chunks_by_tx_and_index_range(
|
||||||
|
tx.seq,
|
||||||
|
start,
|
||||||
|
cmp::min(start + PORA_CHUNK_SIZE, end),
|
||||||
|
)
|
||||||
|
.await?
|
||||||
|
.is_none()
|
||||||
|
{
|
||||||
|
return Ok(Some(start as u64));
|
||||||
|
}
|
||||||
|
start = segment_to_sector(
|
||||||
|
shard_config.next_segment_index(sector_to_segment(start as u64), start_segment),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -68,7 +68,7 @@ class ZgsNode(TestNode):
|
|||||||
os.mkdir(self.data_dir)
|
os.mkdir(self.data_dir)
|
||||||
log_config_path = os.path.join(self.data_dir, self.config["log_config_file"])
|
log_config_path = os.path.join(self.data_dir, self.config["log_config_file"])
|
||||||
with open(log_config_path, "w") as f:
|
with open(log_config_path, "w") as f:
|
||||||
f.write("debug,hyper=info,h2=info")
|
f.write("trace,hyper=info,h2=info")
|
||||||
|
|
||||||
initialize_toml_config(self.config_file, self.config)
|
initialize_toml_config(self.config_file, self.config)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user