Sync and finalize sharded data. (#79)

* Save test logs.

* Prepare to handle shard config in chunk pool.

* Allow files to be finalized with only sharded data.

* Handle shard config change.

* Only sync needed data in shard config.

* Fix clippy and tests.
This commit is contained in:
peilun-conflux 2024-06-08 02:50:36 +08:00 committed by GitHub
parent c2c6e2d5fb
commit 0f52325f67
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 203 additions and 59 deletions

1
Cargo.lock generated
View File

@ -6163,6 +6163,7 @@ version = "0.1.0"
dependencies = [
"channel",
"chrono",
"chunk_pool",
"error-chain",
"file_location_cache",
"futures",

View File

@ -4,13 +4,13 @@ use anyhow::Result;
use network::NetworkMessage;
use shared_types::{ChunkArray, FileProof};
use std::{sync::Arc, time::SystemTime};
use storage_async::Store;
use storage_async::{ShardConfig, Store};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
/// Handle the cached file when uploaded completely and verified from blockchain.
/// Generally, the file will be persisted into log store.
pub struct ChunkPoolHandler {
receiver: UnboundedReceiver<FileID>,
receiver: UnboundedReceiver<ChunkPoolMessage>,
mem_pool: Arc<MemoryChunkPool>,
log_store: Store,
sender: UnboundedSender<NetworkMessage>,
@ -18,7 +18,7 @@ pub struct ChunkPoolHandler {
impl ChunkPoolHandler {
pub(crate) fn new(
receiver: UnboundedReceiver<FileID>,
receiver: UnboundedReceiver<ChunkPoolMessage>,
mem_pool: Arc<MemoryChunkPool>,
log_store: Store,
sender: UnboundedSender<NetworkMessage>,
@ -31,14 +31,20 @@ impl ChunkPoolHandler {
}
}
async fn handle(&mut self) -> Result<bool> {
match self.receiver.recv().await {
Some(ChunkPoolMessage::FinalizeFile(file_id)) => self.handle_file_id(file_id).await,
Some(ChunkPoolMessage::ChangeShardConfig(shard_config)) => {
self.handle_change_shard_config(shard_config).await;
Ok(true)
}
None => Ok(false),
}
}
/// Writes memory cached chunks into store and finalize transaction.
/// Note, a separate thread should be spawned to call this method.
pub async fn handle(&mut self) -> Result<bool> {
let id = match self.receiver.recv().await {
Some(id) => id,
None => return Ok(false),
};
async fn handle_file_id(&mut self, id: FileID) -> Result<bool> {
debug!(?id, "Received task to finalize transaction");
// TODO(qhz): remove from memory pool after transaction finalized,
@ -88,6 +94,10 @@ impl ChunkPoolHandler {
Ok(true)
}
async fn handle_change_shard_config(&self, shard_config: ShardConfig) {
self.mem_pool.set_shard_config(shard_config).await
}
pub async fn run(mut self) {
info!("Worker started to finalize transactions");
@ -98,3 +108,8 @@ impl ChunkPoolHandler {
}
}
}
pub enum ChunkPoolMessage {
FinalizeFile(FileID),
ChangeShardConfig(ShardConfig),
}

View File

@ -4,11 +4,12 @@ extern crate tracing;
mod handler;
mod mem_pool;
pub use handler::ChunkPoolHandler;
pub use handler::{ChunkPoolHandler, ChunkPoolMessage};
pub use mem_pool::{FileID, MemoryChunkPool, SegmentInfo};
use std::sync::Arc;
use std::time::Duration;
use storage_async::ShardConfig;
#[derive(Clone, Copy, Debug)]
pub struct Config {
@ -16,6 +17,7 @@ pub struct Config {
pub max_cached_chunks_all: usize,
pub max_writings: usize,
pub expiration_time_secs: u64,
pub shard_config: ShardConfig,
}
impl Config {

View File

@ -1,15 +1,16 @@
use super::chunk_cache::{ChunkPoolCache, MemoryCachedFile};
use super::chunk_write_control::ChunkPoolWriteCtrl;
use super::FileID;
use crate::handler::ChunkPoolMessage;
use crate::Config;
use anyhow::{bail, Result};
use anyhow::{anyhow, bail, Result};
use async_lock::Mutex;
use log_entry_sync::LogSyncEvent;
use shared_types::{
bytes_to_chunks, compute_segment_size, ChunkArray, DataRoot, FileProof, Transaction, CHUNK_SIZE,
};
use std::sync::Arc;
use storage_async::Store;
use storage_async::{ShardConfig, Store};
use tokio::sync::broadcast::{error::RecvError, Receiver};
use tokio::sync::mpsc::UnboundedSender;
@ -82,11 +83,15 @@ impl From<SegmentInfo> for (ChunkArray, FileProof) {
pub struct MemoryChunkPool {
inner: Mutex<Inner>,
log_store: Store,
sender: UnboundedSender<FileID>,
sender: UnboundedSender<ChunkPoolMessage>,
}
impl MemoryChunkPool {
pub(crate) fn new(config: Config, log_store: Store, sender: UnboundedSender<FileID>) -> Self {
pub(crate) fn new(
config: Config,
log_store: Store,
sender: UnboundedSender<ChunkPoolMessage>,
) -> Self {
MemoryChunkPool {
inner: Mutex::new(Inner::new(config)),
log_store,
@ -142,10 +147,18 @@ impl MemoryChunkPool {
//Write the segment in window
let (total_segments, _) = compute_segment_size(total_chunks, seg_info.chunks_per_segment);
let tx_start_index = self
.log_store
.get_tx_by_seq_number(file_id.tx_id.seq)
.await?
.ok_or(anyhow!("unexpected tx missing"))?
.start_entry_index()
/ seg_info.chunks_per_segment as u64;
self.inner.lock().await.write_control.write_segment(
file_id,
seg_info.seg_index,
total_segments,
tx_start_index as usize,
)?;
// Write memory cached segments into store.
@ -201,10 +214,7 @@ impl MemoryChunkPool {
// Notify to finalize transaction asynchronously.
if all_uploaded {
if let Err(e) = self.sender.send(file_id) {
// Channel receiver will not be dropped until program exit.
bail!("channel send error: {}", e);
}
self.send_finalize_file(file_id).await?;
debug!("Queue to finalize transaction for file {}", seg_info.root);
}
@ -328,10 +338,7 @@ impl MemoryChunkPool {
self.inner.lock().await.after_flush_cache();
if let Err(e) = self.sender.send(file) {
// Channel receiver will not be dropped until program exit.
bail!("channel send error: {}", e);
}
self.send_finalize_file(file).await?;
Ok(())
}
@ -348,4 +355,24 @@ impl MemoryChunkPool {
.map(|file| (file.uploaded_seg_num(), false))
}
}
async fn send_finalize_file(&self, file_id: FileID) -> Result<()> {
if let Err(e) = self.sender.send(ChunkPoolMessage::FinalizeFile(file_id)) {
// Channel receiver will not be dropped until program exit.
bail!("channel send error: {}", e);
}
Ok(())
}
pub fn sender(&self) -> UnboundedSender<ChunkPoolMessage> {
self.sender.clone()
}
pub async fn set_shard_config(&self, shard_config: ShardConfig) {
let mut inner = self.inner.lock().await;
if inner.config.shard_config != shard_config {
inner.config.shard_config = shard_config;
inner.write_control.update_shard_config(shard_config);
}
}
}

View File

@ -3,6 +3,7 @@ use crate::Config;
use anyhow::{bail, Result};
use shared_types::DataRoot;
use std::collections::HashMap;
use storage_async::ShardConfig;
/// The segment status in sliding window
#[derive(PartialEq, Eq, Debug)]
@ -18,15 +19,20 @@ enum SlotStatus {
struct CtrlWindow {
#[allow(unused)]
size: usize,
tx_start_index: usize,
shard_config: ShardConfig,
left_boundary: usize,
slots: HashMap<usize, SlotStatus>,
}
impl CtrlWindow {
fn new(size: usize) -> Self {
fn new(size: usize, shard_config: ShardConfig, tx_start_index: usize) -> Self {
CtrlWindow {
size,
left_boundary: 0,
tx_start_index,
shard_config,
left_boundary: shard_config.next_segment_index(0, tx_start_index)
% shard_config.num_shard,
slots: HashMap::default(),
}
}
@ -70,7 +76,10 @@ impl CtrlWindow {
let mut left_boundary = self.left_boundary;
while let Some(&SlotStatus::Finished) = self.slots.get(&left_boundary) {
self.slots.remove(&left_boundary);
left_boundary += 1;
// Handle shard_config change.
left_boundary = self
.shard_config
.next_segment_index(left_boundary, self.tx_start_index);
}
self.left_boundary = left_boundary;
@ -85,11 +94,17 @@ pub struct FileWriteCtrl {
}
impl FileWriteCtrl {
fn new(id: FileID, total_segments: usize, window_size: usize) -> Self {
fn new(
id: FileID,
total_segments: usize,
window_size: usize,
shard_config: ShardConfig,
tx_start_index: usize,
) -> Self {
FileWriteCtrl {
id,
total_segments,
window: CtrlWindow::new(window_size),
window: CtrlWindow::new(window_size, shard_config, tx_start_index),
}
}
@ -130,9 +145,16 @@ impl ChunkPoolWriteCtrl {
id: FileID,
seg_index: usize,
total_segments: usize,
tx_start_index: usize,
) -> Result<()> {
let file_ctrl = self.files.entry(id.root).or_insert_with(|| {
FileWriteCtrl::new(id, total_segments, self.config.write_window_size)
FileWriteCtrl::new(
id,
total_segments,
self.config.write_window_size,
self.config.shard_config,
tx_start_index,
)
});
// ensure the tx_id not changed during file uploading
@ -183,7 +205,8 @@ impl ChunkPoolWriteCtrl {
);
// All chunks of file written into store.
file_ctrl.window.left_boundary >= file_ctrl.total_segments
file_ctrl.window.left_boundary + self.config.shard_config.num_shard
> file_ctrl.total_segments
}
pub fn on_write_failed(&mut self, root: &DataRoot, seg_index: usize) {
@ -198,4 +221,11 @@ impl ChunkPoolWriteCtrl {
assert!(self.total_writings > 0);
self.total_writings -= 1;
}
pub fn update_shard_config(&mut self, shard_config: ShardConfig) {
self.config.shard_config = shard_config;
for file_ctrl in self.files.values_mut() {
file_ctrl.window.shard_config = shard_config;
}
}
}

View File

@ -17,6 +17,7 @@ storage-async = { path = "../storage-async" }
sync = { path = "../sync" }
task_executor = { path = "../../common/task_executor" }
pruner = { path = "../pruner" }
chunk_pool = { path = "../chunk_pool" }
tokio = { version = "1.19.2", features = ["full"] }
tracing = "0.1.35"
rand = "0.8.5"

View File

@ -1,5 +1,6 @@
use std::{ops::Neg, sync::Arc};
use chunk_pool::ChunkPoolMessage;
use file_location_cache::FileLocationCache;
use network::types::{AnnounceShardConfig, SignedAnnounceShardConfig};
use network::{
@ -15,6 +16,7 @@ use shared_types::{bytes_to_chunks, timestamp_now, TxID};
use storage::config::ShardConfig;
use storage_async::Store;
use sync::{SyncMessage, SyncSender};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{mpsc, RwLock};
use crate::peer_manager::PeerManager;
@ -69,6 +71,8 @@ pub struct Libp2pEventHandler {
network_send: mpsc::UnboundedSender<NetworkMessage>,
/// A channel to the syncing service.
sync_send: SyncSender,
/// A channel to the RPC chunk pool service.
chunk_pool_send: mpsc::UnboundedSender<ChunkPoolMessage>,
/// Node keypair for signing messages.
local_keypair: Keypair,
/// Log and transaction storage.
@ -80,10 +84,12 @@ pub struct Libp2pEventHandler {
}
impl Libp2pEventHandler {
#[allow(clippy::too_many_arguments)]
pub fn new(
network_globals: Arc<NetworkGlobals>,
network_send: mpsc::UnboundedSender<NetworkMessage>,
sync_send: SyncSender,
chunk_pool_send: UnboundedSender<ChunkPoolMessage>,
local_keypair: Keypair,
store: Store,
file_location_cache: Arc<FileLocationCache>,
@ -93,6 +99,7 @@ impl Libp2pEventHandler {
network_globals,
network_send,
sync_send,
chunk_pool_send,
local_keypair,
store,
file_location_cache,
@ -112,6 +119,12 @@ impl Libp2pEventHandler {
});
}
pub fn send_to_chunk_pool(&self, message: ChunkPoolMessage) {
self.chunk_pool_send.send(message).unwrap_or_else(|err| {
warn!(%err, "Could not send message to the chunk pool service");
});
}
pub fn publish(&self, msg: PubsubMessage) {
self.send_to_network(NetworkMessage::Publish {
messages: vec![msg],
@ -583,6 +596,8 @@ mod tests {
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
sync_send: SyncSender,
sync_recv: SyncReceiver,
chunk_pool_send: mpsc::UnboundedSender<ChunkPoolMessage>,
chunk_pool_recv: mpsc::UnboundedReceiver<ChunkPoolMessage>,
store: Arc<RwLock<dyn Store>>,
file_location_cache: Arc<FileLocationCache>,
peers: Arc<RwLock<PeerManager>>,
@ -594,6 +609,7 @@ mod tests {
let (network_globals, keypair) = Context::new_network_globals();
let (network_send, network_recv) = mpsc::unbounded_channel();
let (sync_send, sync_recv) = channel::Channel::unbounded();
let (chunk_pool_send, chunk_pool_recv) = mpsc::unbounded_channel();
let store = LogManager::memorydb(LogConfig::default()).unwrap();
Self {
runtime,
@ -603,6 +619,8 @@ mod tests {
network_recv,
sync_send,
sync_recv,
chunk_pool_send,
chunk_pool_recv,
store: Arc::new(RwLock::new(store)),
file_location_cache: Arc::new(FileLocationCache::default()),
peers: Arc::new(RwLock::new(PeerManager::new(Config::default()))),
@ -616,6 +634,7 @@ mod tests {
self.network_globals.clone(),
self.network_send.clone(),
self.sync_send.clone(),
self.chunk_pool_send.clone(),
self.keypair.clone(),
storage_async::Store::new(self.store.clone(), self.runtime.task_executor.clone()),
self.file_location_cache.clone(),

View File

@ -1,5 +1,6 @@
use crate::Config;
use crate::{libp2p_event_handler::Libp2pEventHandler, peer_manager::PeerManager};
use chunk_pool::ChunkPoolMessage;
use file_location_cache::FileLocationCache;
use futures::{channel::mpsc::Sender, prelude::*};
use miner::MinerMessage;
@ -14,6 +15,7 @@ use storage::log_store::Store as LogStore;
use storage_async::Store;
use sync::{SyncMessage, SyncSender};
use task_executor::ShutdownReason;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{broadcast, mpsc, RwLock};
use tokio::time::interval;
@ -54,6 +56,7 @@ impl RouterService {
network_send: mpsc::UnboundedSender<NetworkMessage>,
sync_send: SyncSender,
_miner_send: Option<broadcast::Sender<MinerMessage>>,
chunk_pool_send: UnboundedSender<ChunkPoolMessage>,
pruner_recv: Option<mpsc::UnboundedReceiver<PrunerMessage>>,
store: Arc<RwLock<dyn LogStore>>,
file_location_cache: Arc<FileLocationCache>,
@ -75,6 +78,7 @@ impl RouterService {
network_globals,
network_send,
sync_send,
chunk_pool_send,
local_keypair,
store,
file_location_cache,
@ -341,6 +345,8 @@ impl RouterService {
async fn on_pruner_msg(&mut self, msg: PrunerMessage) {
match msg {
PrunerMessage::ChangeShardConfig(shard_config) => {
self.libp2p_event_handler
.send_to_chunk_pool(ChunkPoolMessage::ChangeShardConfig(shard_config));
if let Some(msg) = self
.libp2p_event_handler
.construct_announce_shard_config_message(shard_config)

View File

@ -1,5 +1,5 @@
use super::{Client, RuntimeContext};
use chunk_pool::{Config as ChunkPoolConfig, MemoryChunkPool};
use chunk_pool::{ChunkPoolMessage, Config as ChunkPoolConfig, MemoryChunkPool};
use file_location_cache::FileLocationCache;
use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager};
use miner::{MineService, MinerConfig, MinerMessage};
@ -55,6 +55,10 @@ struct PrunerComponents {
owned: Option<mpsc::UnboundedReceiver<PrunerMessage>>,
}
struct ChunkPoolComponents {
send: mpsc::UnboundedSender<ChunkPoolMessage>,
}
/// Builds a `Client` instance.
///
/// ## Notes
@ -72,6 +76,7 @@ pub struct ClientBuilder {
miner: Option<MinerComponents>,
log_sync: Option<LogSyncComponents>,
pruner: Option<PrunerComponents>,
chunk_pool: Option<ChunkPoolComponents>,
}
impl ClientBuilder {
@ -199,6 +204,7 @@ impl ClientBuilder {
let executor = require!("router", self, runtime_context).clone().executor;
let sync_send = require!("router", self, sync).send.clone(); // note: we can make this optional in the future
let miner_send = self.miner.as_ref().map(|x| x.send.clone());
let chunk_pool_send = require!("router", self, chunk_pool).send.clone();
let store = require!("router", self, store).clone();
let file_location_cache = require!("router", self, file_location_cache).clone();
@ -217,6 +223,7 @@ impl ClientBuilder {
network.send.clone(),
sync_send,
miner_send,
chunk_pool_send,
pruner_recv,
store,
file_location_cache,
@ -228,7 +235,7 @@ impl ClientBuilder {
}
pub async fn with_rpc(
self,
mut self,
rpc_config: RPCConfig,
chunk_pool_config: ChunkPoolConfig,
) -> Result<Self, String> {
@ -244,6 +251,9 @@ impl ClientBuilder {
let (chunk_pool, chunk_pool_handler) =
chunk_pool::unbounded(chunk_pool_config, async_store.clone(), network_send.clone());
let chunk_pool_components = ChunkPoolComponents {
send: chunk_pool.sender(),
};
let chunk_pool_clone = chunk_pool.clone();
let ctx = rpc::Context {
@ -271,6 +281,8 @@ impl ClientBuilder {
"chunk_pool_log_monitor",
);
self.chunk_pool = Some(chunk_pool_components);
Ok(self)
}

View File

@ -146,7 +146,7 @@ impl ZgsConfig {
let cpu_percentage = self.miner_cpu_percentage;
let iter_batch = self.mine_iter_batch_size;
let shard_config = ShardConfig::new(&self.shard_position)?;
let shard_config = self.shard_config()?;
Ok(MinerConfig::new(
miner_id,
@ -161,13 +161,14 @@ impl ZgsConfig {
))
}
pub fn chunk_pool_config(&self) -> chunk_pool::Config {
chunk_pool::Config {
pub fn chunk_pool_config(&self) -> Result<chunk_pool::Config, String> {
Ok(chunk_pool::Config {
write_window_size: self.chunk_pool_write_window_size,
max_cached_chunks_all: self.chunk_pool_max_cached_chunks_all,
max_writings: self.chunk_pool_max_writings,
expiration_time_secs: self.chunk_pool_expiration_time_secs,
}
shard_config: self.shard_config()?,
})
}
pub fn router_config(&self, network_config: &NetworkConfig) -> Result<router::Config, String> {
@ -178,7 +179,7 @@ impl ZgsConfig {
pub fn pruner_config(&self) -> Result<Option<PrunerConfig>, String> {
if let Some(max_num_chunks) = self.db_max_num_chunks {
let shard_config = ShardConfig::new(&self.shard_position)?;
let shard_config = self.shard_config()?;
Ok(Some(PrunerConfig {
shard_config,
db_path: self.db_dir.clone().into(),
@ -191,4 +192,8 @@ impl ZgsConfig {
Ok(None)
}
}
fn shard_config(&self) -> Result<ShardConfig, String> {
ShardConfig::new(&self.shard_position)
}
}

View File

@ -33,9 +33,9 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
.await?
.with_pruner(pruner_config)
.await?
.with_router(router_config)?
.with_rpc(rpc_config, config.chunk_pool_config())
.with_rpc(rpc_config, config.chunk_pool_config()?)
.await?
.with_router(router_config)?
.build()
}

View File

@ -8,6 +8,8 @@ use storage::{error, error::Result, log_store::Store as LogStore, H256};
use task_executor::TaskExecutor;
use tokio::sync::{oneshot, RwLock};
pub use storage::config::ShardConfig;
/// The name of the worker tokio tasks.
const WORKER_TASK_NAME: &str = "async_storage_worker";

View File

@ -80,4 +80,10 @@ impl ShardConfig {
Ok((numerator, denominator))
}
pub fn next_segment_index(&self, current: usize, start_index: usize) -> usize {
// `shift` should be 0 if `current` was returned by the same config.
let shift = (start_index + current + self.num_shard - self.shard_id) % self.num_shard;
current + self.num_shard - shift
}
}

View File

@ -245,16 +245,7 @@ impl SerialSyncController {
fn try_request_next(&mut self) {
// request next chunk array
let from_chunk = self.next_chunk;
// let to_chunk = std::cmp::min(from_chunk + MAX_CHUNKS_TO_REQUEST, self.goal.index_end);
let to_chunk =
if from_chunk == 0 && self.tx_start_chunk_in_flow % PORA_CHUNK_SIZE as u64 != 0 {
// Align the first request with segments.
PORA_CHUNK_SIZE as u64 - self.tx_start_chunk_in_flow % PORA_CHUNK_SIZE as u64
} else {
from_chunk + PORA_CHUNK_SIZE as u64
};
let to_chunk = std::cmp::min(to_chunk, self.goal.index_end);
let to_chunk = std::cmp::min(from_chunk + PORA_CHUNK_SIZE as u64, self.goal.index_end);
let request_id = network::RequestId::Sync(RequestId::SerialSync { tx_id: self.tx_id });
let request = GetChunksRequest {
tx_id: self.tx_id,
@ -456,13 +447,24 @@ impl SerialSyncController {
self.failures = 0;
let shard_config = self
.store
.get_store()
.read()
.await
.flow()
.get_shard_config();
let next_chunk = shard_config.next_segment_index(
(from_chunk / PORA_CHUNK_SIZE as u64) as usize,
(self.tx_start_chunk_in_flow / PORA_CHUNK_SIZE as u64) as usize,
) * PORA_CHUNK_SIZE;
// store in db
match self
.store
.put_chunks_with_tx_hash(self.tx_id.seq, self.tx_id.hash, response.chunks, None)
.await
{
Ok(true) => self.next_chunk = to_chunk,
Ok(true) => self.next_chunk = next_chunk as u64,
Ok(false) => {
warn!(?self.tx_id, "Transaction reverted while storing chunks");
self.state = SyncState::Failed {
@ -1379,7 +1381,7 @@ mod tests {
let peer_id = identity::Keypair::generate_ed25519().public().to_peer_id();
let tx_seq = 0;
let chunk_count = 2049;
let chunk_count = 1025;
let (store, peer_store, txs, _) = create_2_store(vec![chunk_count]);
let runtime = TestRuntime::default();
@ -1395,19 +1397,19 @@ mod tests {
let chunks = peer_store
.read()
.await
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, 2048)
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, 1024)
.unwrap()
.unwrap();
controller.state = SyncState::Downloading {
peer_id,
from_chunk: 0,
to_chunk: 2048,
to_chunk: 1024,
since: Instant::now(),
};
controller.goal.num_chunks = 2048;
controller.goal.index_end = 2048;
controller.goal.num_chunks = 1024;
controller.goal.index_end = 1024;
controller.on_response(peer_id, chunks).await;
match controller.get_status() {

View File

@ -2,7 +2,7 @@
import time
from test_framework.test_framework import TestFramework
from utility.submission import create_submission, submit_data
from utility.submission import create_submission, submit_data, data_to_segments
from utility.utils import wait_until, assert_equal
@ -10,7 +10,7 @@ class PrunerTest(TestFramework):
def setup_params(self):
self.num_blockchain_nodes = 1
self.num_nodes = 3
self.num_nodes = 4
self.zgs_node_configs[0] = {
"db_max_num_chunks": 2 ** 30,
"shard_position": "0/2"
@ -19,6 +19,10 @@ class PrunerTest(TestFramework):
"db_max_num_chunks": 2 ** 30,
"shard_position": "1/2"
}
self.zgs_node_configs[3] = {
"db_max_num_chunks": 2 ** 30,
"shard_position": "1/4"
}
def run_test(self):
client = self.nodes[0]
@ -30,24 +34,36 @@ class PrunerTest(TestFramework):
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
# Submit data to two nodes with different shards.
segment = submit_data(client, chunk_data)
submit_data(self.nodes[1], chunk_data)
segments = data_to_segments(chunk_data)
for i in range(len(segments)):
client_index = i % 2
self.nodes[client_index].zgs_upload_segment(segments[i])
wait_until(lambda: self.nodes[0].zgs_get_file_info(data_root)["finalized"])
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root)["finalized"])
self.nodes[2].admin_start_sync_file(0)
self.nodes[3].admin_start_sync_file(0)
wait_until(lambda: self.nodes[2].sync_status_is_completed_or_unknown(0))
wait_until(lambda: self.nodes[2].zgs_get_file_info(data_root)["finalized"])
wait_until(lambda: self.nodes[3].sync_status_is_completed_or_unknown(0))
wait_until(lambda: self.nodes[3].zgs_get_file_info(data_root)["finalized"])
for i in range(len(segment)):
for i in range(len(segments)):
index_store = i % 2
index_empty = 1 - i % 2
seg0 = self.nodes[index_store].zgs_download_segment(data_root, i * 1024, (i + 1) * 1024)
seg1 = self.nodes[index_empty].zgs_download_segment(data_root, i * 1024, (i + 1) * 1024)
seg2 = self.nodes[2].zgs_download_segment(data_root, i * 1024, (i + 1) * 1024)
seg3 = self.nodes[3].zgs_download_segment(data_root, i * 1024, (i + 1) * 1024)
# base64 encoding size
assert_equal(len(seg0), 349528)
assert_equal(seg1, None)
# node 2 should save all data
assert_equal(len(seg2), 349528)
if i % 4 == 1:
assert_equal(len(seg3), 349528)
else:
assert_equal(seg3, None)
if __name__ == "__main__":