Compare commits

..

4 Commits

Author SHA1 Message Date
0g-peterzhb
30f40d1bde
Merge f456773b72 into 77d1b84974 2024-08-06 18:47:17 +08:00
rickiey
77d1b84974
fix:log config : invalid filter directive (#146)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
2024-08-06 18:46:53 +08:00
peilun-conflux
d80e7e22ca
Prune no reward chunks. (#145)
* Prune no reward chunks.

* Add tests.

* Fix tests.

* Fix clippy.

* Revert test.

* Enable market in shard_sync_test.

* Add tx prune status.

* Fix tests.
2024-08-06 15:06:15 +08:00
Bo QIU
6ade66c086
Add admin rpc to return sync service state (#151) 2024-08-06 14:01:57 +08:00
24 changed files with 333 additions and 55 deletions

4
Cargo.lock generated
View File

@ -5824,6 +5824,9 @@ name = "pruner"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"contract-interface",
"ethereum-types 0.14.1",
"ethers",
"miner", "miner",
"rand 0.8.5", "rand 0.8.5",
"storage", "storage",
@ -5831,6 +5834,7 @@ dependencies = [
"task_executor", "task_executor",
"tokio", "tokio",
"tracing", "tracing",
"zgs_spec",
] ]
[[package]] [[package]]

View File

@ -8,6 +8,12 @@ abigen!(ZgsFlow, "../../storage-contracts-abis/Flow.json");
#[cfg(not(feature = "dev"))] #[cfg(not(feature = "dev"))]
abigen!(PoraMine, "../../storage-contracts-abis/PoraMine.json"); abigen!(PoraMine, "../../storage-contracts-abis/PoraMine.json");
#[cfg(not(feature = "dev"))]
abigen!(
ChunkLinearReward,
"../../storage-contracts-abis/ChunkLinearReward.json"
);
#[cfg(feature = "dev")] #[cfg(feature = "dev")]
abigen!( abigen!(
ZgsFlow, ZgsFlow,
@ -19,3 +25,9 @@ abigen!(
PoraMine, PoraMine,
"../../0g-storage-contracts-dev/artifacts/contracts/miner/Mine.sol/PoraMine.json" "../../0g-storage-contracts-dev/artifacts/contracts/miner/Mine.sol/PoraMine.json"
); );
#[cfg(feature = "dev")]
abigen!(
ChunkLinearReward,
"../../0g-storage-contracts-dev/artifacts/contracts/reward/ChunkLinearReward.sol/ChunkLinearReward.json"
);

View File

@ -11,4 +11,8 @@ anyhow = "1.0.86"
tokio = "1.37.0" tokio = "1.37.0"
rand = "0.8.5" rand = "0.8.5"
task_executor = { path = "../../common/task_executor" } task_executor = { path = "../../common/task_executor" }
tracing = "0.1.40" tracing = "0.1.40"
ethereum-types = "0.14.1"
contract-interface = { path = "../../common/contract-interface" }
ethers = "^2"
zgs_spec = { path = "../../common/spec" }

View File

@ -1,18 +1,28 @@
use anyhow::Result; use anyhow::{anyhow, bail, Result};
use contract_interface::ChunkLinearReward;
use ethereum_types::Address;
use ethers::prelude::{Http, Provider};
use miner::MinerMessage; use miner::MinerMessage;
use rand::Rng; use rand::Rng;
use std::cmp::Ordering;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use storage::config::{ShardConfig, SHARD_CONFIG_KEY}; use storage::config::{ShardConfig, SHARD_CONFIG_KEY};
use storage::log_store::log_manager::PORA_CHUNK_SIZE;
use storage_async::Store; use storage_async::Store;
use task_executor::TaskExecutor; use task_executor::TaskExecutor;
use tokio::sync::{broadcast, mpsc}; use tokio::sync::{broadcast, mpsc};
use tracing::{debug, info}; use tracing::{debug, info};
use zgs_spec::SECTORS_PER_PRICING;
// Start pruning when the db directory size exceeds 0.9 * limit. // Start pruning when the db directory size exceeds 0.9 * limit.
const PRUNE_THRESHOLD: f32 = 0.9; const PRUNE_THRESHOLD: f32 = 0.9;
const FIRST_REWARDABLE_CHUNK_KEY: &str = "first_rewardable_chunk";
const CHUNKS_PER_PRICING: u64 = (SECTORS_PER_PRICING / PORA_CHUNK_SIZE) as u64;
#[derive(Debug)] #[derive(Debug)]
pub struct PrunerConfig { pub struct PrunerConfig {
pub shard_config: ShardConfig, pub shard_config: ShardConfig,
@ -21,20 +31,33 @@ pub struct PrunerConfig {
pub check_time: Duration, pub check_time: Duration,
pub batch_size: usize, pub batch_size: usize,
pub batch_wait_time: Duration, pub batch_wait_time: Duration,
pub rpc_endpoint_url: String,
pub reward_address: Address,
} }
impl PrunerConfig { impl PrunerConfig {
fn start_prune_size(&self) -> u64 { fn start_prune_size(&self) -> u64 {
(self.max_num_sectors as f32 * PRUNE_THRESHOLD) as u64 (self.max_num_sectors as f32 * PRUNE_THRESHOLD) as u64
} }
fn make_provider(&self) -> Result<Provider<Http>> {
Provider::<Http>::try_from(&self.rpc_endpoint_url)
.map_err(|e| anyhow!("Can not parse blockchain endpoint: {:?}", e))
}
} }
pub struct Pruner { pub struct Pruner {
config: PrunerConfig, config: PrunerConfig,
first_rewardable_chunk: u64,
first_tx_seq: u64,
store: Arc<Store>, store: Arc<Store>,
sender: mpsc::UnboundedSender<PrunerMessage>, sender: mpsc::UnboundedSender<PrunerMessage>,
miner_sender: Option<broadcast::Sender<MinerMessage>>, miner_sender: Option<broadcast::Sender<MinerMessage>>,
reward_contract: ChunkLinearReward<Provider<Http>>,
} }
impl Pruner { impl Pruner {
@ -47,12 +70,20 @@ impl Pruner {
if let Some(shard_config) = get_shard_config(store.as_ref()).await? { if let Some(shard_config) = get_shard_config(store.as_ref()).await? {
config.shard_config = shard_config; config.shard_config = shard_config;
} }
let (first_rewardable_chunk, first_tx_seq) = get_first_rewardable_chunk(store.as_ref())
.await?
.unwrap_or((0, 0));
let reward_contract =
ChunkLinearReward::new(config.reward_address, Arc::new(config.make_provider()?));
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();
let pruner = Pruner { let pruner = Pruner {
config, config,
first_rewardable_chunk,
first_tx_seq,
store, store,
sender: tx, sender: tx,
miner_sender, miner_sender,
reward_contract,
}; };
pruner.put_shard_config().await?; pruner.put_shard_config().await?;
executor.spawn( executor.spawn(
@ -66,20 +97,36 @@ impl Pruner {
pub async fn start(mut self) -> Result<()> { pub async fn start(mut self) -> Result<()> {
loop { loop {
// Check shard config update and prune unneeded data.
if let Some(delete_list) = self.maybe_update().await? { if let Some(delete_list) = self.maybe_update().await? {
info!(new_config = ?self.config.shard_config, "new shard config"); info!(new_config = ?self.config.shard_config, "new shard config");
self.put_shard_config().await?; self.put_shard_config().await?;
let mut batch = Vec::with_capacity(self.config.batch_size); self.prune_in_batch(delete_list).await?;
let mut iter = delete_list.peekable(); }
while let Some(index) = iter.next() {
batch.push(index); // Check no reward chunks and prune.
if batch.len() == self.config.batch_size || iter.peek().is_none() { let new_first_rewardable = self.reward_contract.first_rewardable_chunk().call().await?;
debug!(start = batch.first(), end = batch.last(), "prune batch"); if let Some(no_reward_list) = self
self.store.remove_chunks_batch(&batch).await?; .maybe_forward_first_rewardable(new_first_rewardable)
batch = Vec::with_capacity(self.config.batch_size); .await?
tokio::time::sleep(self.config.batch_wait_time).await; {
} info!(
} ?new_first_rewardable,
"first rewardable chunk moves forward, start pruning"
);
self.prune_tx(
self.first_rewardable_chunk * SECTORS_PER_PRICING as u64,
new_first_rewardable * SECTORS_PER_PRICING as u64,
)
.await?;
self.prune_in_batch(no_reward_list).await?;
self.first_rewardable_chunk = new_first_rewardable;
self.put_first_rewardable_chunk_index(
self.first_rewardable_chunk,
self.first_tx_seq,
)
.await?;
} }
tokio::time::sleep(self.config.check_time).await; tokio::time::sleep(self.config.check_time).await;
} }
@ -92,7 +139,9 @@ impl Pruner {
config = ?self.config.shard_config, config = ?self.config.shard_config,
"maybe_update" "maybe_update"
); );
if current_size >= self.config.start_prune_size() { if current_size < self.config.start_prune_size() {
Ok(None)
} else {
// Update config and generate delete list should be done in a single lock to ensure // Update config and generate delete list should be done in a single lock to ensure
// the list is complete. // the list is complete.
let config = &mut self.config.shard_config; let config = &mut self.config.shard_config;
@ -108,13 +157,72 @@ impl Pruner {
config.num_shard *= 2; config.num_shard *= 2;
// Generate delete list // Generate delete list
let flow_len = self.store.get_context().await?.1; let flow_len = self
.store
.get_context()
.await?
.1
.div_ceil(PORA_CHUNK_SIZE as u64);
let start_index = old_shard_id + (!rand_bit) as usize * old_num_shard; let start_index = old_shard_id + (!rand_bit) as usize * old_num_shard;
return Ok(Some(Box::new( Ok(Some(Box::new(
(start_index as u64..flow_len).step_by(config.num_shard), (start_index as u64..flow_len).step_by(config.num_shard),
))); )))
} }
Ok(None) }
async fn maybe_forward_first_rewardable(
&mut self,
new_first_rewardable: u64,
) -> Result<Option<Box<dyn Send + Iterator<Item = u64>>>> {
match self.first_rewardable_chunk.cmp(&new_first_rewardable) {
Ordering::Less => Ok(Some(Box::new(
self.first_rewardable_chunk * CHUNKS_PER_PRICING
..new_first_rewardable * CHUNKS_PER_PRICING,
))),
Ordering::Equal => Ok(None),
Ordering::Greater => {
bail!(
"Unexpected first_rewardable_chunk revert: old={} new={}",
self.first_rewardable_chunk,
new_first_rewardable
);
}
}
}
async fn prune_in_batch(&self, to_prune: Box<dyn Send + Iterator<Item = u64>>) -> Result<()> {
let mut batch = Vec::with_capacity(self.config.batch_size);
let mut iter = to_prune.peekable();
while let Some(index) = iter.next() {
batch.push(index);
if batch.len() == self.config.batch_size || iter.peek().is_none() {
debug!(start = batch.first(), end = batch.last(), "prune batch");
self.store.remove_chunks_batch(&batch).await?;
batch = Vec::with_capacity(self.config.batch_size);
tokio::time::sleep(self.config.batch_wait_time).await;
}
}
Ok(())
}
async fn prune_tx(&mut self, start_sector: u64, end_sector: u64) -> Result<()> {
while let Some(tx) = self.store.get_tx_by_seq_number(self.first_tx_seq).await? {
// If a part of the tx data is pruned, we mark the tx as pruned.
if tx.start_entry_index() >= start_sector && tx.start_entry_index() < end_sector {
self.store.prune_tx(tx.seq).await?;
} else if tx.start_entry_index() >= end_sector {
break;
} else {
bail!(
"prune tx out of range: tx={:?}, start={} end={}",
tx,
start_sector,
end_sector
);
}
self.first_tx_seq += 1;
}
Ok(())
} }
async fn put_shard_config(&self) -> Result<()> { async fn put_shard_config(&self) -> Result<()> {
@ -130,12 +238,29 @@ impl Pruner {
.set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config) .set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config)
.await .await
} }
async fn put_first_rewardable_chunk_index(
&self,
new_first_rewardable_chunk: u64,
new_first_tx_seq: u64,
) -> Result<()> {
self.store
.set_config_encoded(
&FIRST_REWARDABLE_CHUNK_KEY,
&(new_first_rewardable_chunk, new_first_tx_seq),
)
.await
}
} }
async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> { async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> {
store.get_config_decoded(&SHARD_CONFIG_KEY).await store.get_config_decoded(&SHARD_CONFIG_KEY).await
} }
async fn get_first_rewardable_chunk(store: &Store) -> Result<Option<(u64, u64)>> {
store.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY).await
}
#[derive(Debug)] #[derive(Debug)]
pub enum PrunerMessage { pub enum PrunerMessage {
ChangeShardConfig(ShardConfig), ChangeShardConfig(ShardConfig),

View File

@ -2,7 +2,7 @@ use crate::types::{LocationInfo, NetworkInfo, PeerInfo};
use jsonrpsee::core::RpcResult; use jsonrpsee::core::RpcResult;
use jsonrpsee::proc_macros::rpc; use jsonrpsee::proc_macros::rpc;
use std::collections::HashMap; use std::collections::HashMap;
use sync::FileSyncInfo; use sync::{FileSyncInfo, SyncServiceState};
#[rpc(server, client, namespace = "admin")] #[rpc(server, client, namespace = "admin")]
pub trait Rpc { pub trait Rpc {
@ -27,6 +27,9 @@ pub trait Rpc {
#[method(name = "terminateSync")] #[method(name = "terminateSync")]
async fn terminate_sync(&self, tx_seq: u64) -> RpcResult<bool>; async fn terminate_sync(&self, tx_seq: u64) -> RpcResult<bool>;
#[method(name = "getSyncServiceState")]
async fn get_sync_service_state(&self) -> RpcResult<SyncServiceState>;
#[method(name = "getSyncStatus")] #[method(name = "getSyncStatus")]
async fn get_sync_status(&self, tx_seq: u64) -> RpcResult<String>; async fn get_sync_status(&self, tx_seq: u64) -> RpcResult<String>;

View File

@ -8,7 +8,7 @@ use network::{multiaddr::Protocol, Multiaddr};
use std::collections::HashMap; use std::collections::HashMap;
use std::net::IpAddr; use std::net::IpAddr;
use storage::config::all_shards_available; use storage::config::all_shards_available;
use sync::{FileSyncInfo, SyncRequest, SyncResponse}; use sync::{FileSyncInfo, SyncRequest, SyncResponse, SyncServiceState};
use task_executor::ShutdownReason; use task_executor::ShutdownReason;
pub struct RpcServerImpl { pub struct RpcServerImpl {
@ -119,6 +119,17 @@ impl RpcServer for RpcServerImpl {
} }
} }
async fn get_sync_service_state(&self) -> RpcResult<SyncServiceState> {
info!("admin_getSyncServiceState()");
let response = self.ctx.request_sync(SyncRequest::SyncState).await?;
match response {
SyncResponse::SyncState { state } => Ok(state),
_ => Err(error::internal_error("unexpected response type")),
}
}
#[tracing::instrument(skip(self), err)] #[tracing::instrument(skip(self), err)]
async fn get_sync_status(&self, tx_seq: u64) -> RpcResult<String> { async fn get_sync_status(&self, tx_seq: u64) -> RpcResult<String> {
info!("admin_getSyncStatus({tx_seq})"); info!("admin_getSyncStatus({tx_seq})");

View File

@ -195,6 +195,10 @@ impl RpcServerImpl {
)); ));
} }
if self.ctx.log_store.check_tx_pruned(tx.seq).await? {
return Err(error::invalid_params("root", "already pruned"));
}
Ok(false) Ok(false)
} else { } else {
//Check whether file is small enough to cache in the system //Check whether file is small enough to cache in the system

View File

@ -203,6 +203,10 @@ impl ZgsConfig {
pub fn pruner_config(&self) -> Result<Option<PrunerConfig>, String> { pub fn pruner_config(&self) -> Result<Option<PrunerConfig>, String> {
if let Some(max_num_sectors) = self.db_max_num_sectors { if let Some(max_num_sectors) = self.db_max_num_sectors {
let shard_config = self.shard_config()?; let shard_config = self.shard_config()?;
let reward_address = self
.reward_contract_address
.parse::<ContractAddress>()
.map_err(|e| format!("Unable to parse reward_contract_address: {:?}", e))?;
Ok(Some(PrunerConfig { Ok(Some(PrunerConfig {
shard_config, shard_config,
db_path: self.db_dir.clone().into(), db_path: self.db_dir.clone().into(),
@ -210,6 +214,8 @@ impl ZgsConfig {
check_time: Duration::from_secs(self.prune_check_time_s), check_time: Duration::from_secs(self.prune_check_time_s),
batch_size: self.prune_batch_size, batch_size: self.prune_batch_size,
batch_wait_time: Duration::from_millis(self.prune_batch_wait_time_ms), batch_wait_time: Duration::from_millis(self.prune_batch_wait_time_ms),
rpc_endpoint_url: self.blockchain_rpc_endpoint.clone(),
reward_address,
})) }))
} else { } else {
Ok(None) Ok(None)

View File

@ -65,7 +65,7 @@ build_config! {
(db_dir, (String), "db".to_string()) (db_dir, (String), "db".to_string())
(db_max_num_sectors, (Option<usize>), None) (db_max_num_sectors, (Option<usize>), None)
(prune_check_time_s, (u64), 60) (prune_check_time_s, (u64), 60)
(prune_batch_size, (usize), 1024) (prune_batch_size, (usize), 16 * 1024)
(prune_batch_wait_time_ms, (u64), 1000) (prune_batch_wait_time_ms, (u64), 1000)
// misc // misc
@ -79,6 +79,7 @@ build_config! {
(miner_submission_gas, (Option<u64>), None) (miner_submission_gas, (Option<u64>), None)
(miner_cpu_percentage, (u64), 100) (miner_cpu_percentage, (u64), 100)
(mine_iter_batch_size, (usize), 100) (mine_iter_batch_size, (usize), 100)
(reward_contract_address, (String), "".to_string())
(shard_position, (Option<String>), None) (shard_position, (Option<String>), None)
} }

View File

@ -20,10 +20,13 @@ pub fn configure(log_level_file: &str, log_directory: &str, executor: TaskExecut
let handle = builder.reload_handle(); let handle = builder.reload_handle();
builder.init(); builder.init();
let level_file = log_level_file.to_string(); let level_file = log_level_file.trim_end().to_string();
// load config synchronously // load config synchronously
let mut config = std::fs::read_to_string(&level_file).unwrap_or_default(); let mut config = std::fs::read_to_string(&level_file)
.unwrap_or_default()
.trim_end()
.to_string();
let _ = handle.reload(&config); let _ = handle.reload(&config);
// periodically check for config changes // periodically check for config changes
@ -38,8 +41,14 @@ pub fn configure(log_level_file: &str, log_directory: &str, executor: TaskExecut
interval.tick().await; interval.tick().await;
let new_config = match tokio::fs::read_to_string(&level_file).await { let new_config = match tokio::fs::read_to_string(&level_file).await {
Ok(c) if c == config => continue, Ok(c) => {
Ok(c) => c, let nc = c.trim_end().to_string();
if nc == config {
continue;
} else {
nc
}
}
Err(e) => { Err(e) => {
println!("Unable to read log file {}: {:?}", level_file, e); println!("Unable to read log file {}: {:?}", level_file, e);
continue; continue;

View File

@ -45,6 +45,7 @@ impl Store {
} }
delegate!(fn check_tx_completed(tx_seq: u64) -> Result<bool>); delegate!(fn check_tx_completed(tx_seq: u64) -> Result<bool>);
delegate!(fn check_tx_pruned(tx_seq: u64) -> Result<bool>);
delegate!(fn get_chunk_by_tx_and_index(tx_seq: u64, index: usize) -> Result<Option<Chunk>>); delegate!(fn get_chunk_by_tx_and_index(tx_seq: u64, index: usize) -> Result<Option<Chunk>>);
delegate!(fn get_chunks_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize) -> Result<Option<ChunkArray>>); delegate!(fn get_chunks_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize) -> Result<Option<ChunkArray>>);
delegate!(fn get_chunks_with_proof_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize, merkle_tx_seq: Option<u64>) -> Result<Option<ChunkArrayWithProof>>); delegate!(fn get_chunks_with_proof_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize, merkle_tx_seq: Option<u64>) -> Result<Option<ChunkArrayWithProof>>);
@ -53,6 +54,7 @@ impl Store {
delegate!(fn put_chunks_with_tx_hash(tx_seq: u64, tx_hash: H256, chunks: ChunkArray, maybe_file_proof: Option<FlowProof>) -> Result<bool>); delegate!(fn put_chunks_with_tx_hash(tx_seq: u64, tx_hash: H256, chunks: ChunkArray, maybe_file_proof: Option<FlowProof>) -> Result<bool>);
delegate!(fn get_chunk_by_flow_index(index: u64, length: u64) -> Result<Option<ChunkArray>>); delegate!(fn get_chunk_by_flow_index(index: u64, length: u64) -> Result<Option<ChunkArray>>);
delegate!(fn finalize_tx(tx_seq: u64) -> Result<()>); delegate!(fn finalize_tx(tx_seq: u64) -> Result<()>);
delegate!(fn prune_tx(tx_seq: u64) -> Result<()>);
delegate!(fn finalize_tx_with_hash(tx_seq: u64, tx_hash: H256) -> Result<bool>); delegate!(fn finalize_tx_with_hash(tx_seq: u64, tx_hash: H256) -> Result<bool>);
delegate!(fn get_proof_at_root(root: Option<DataRoot>, index: u64, length: u64) -> Result<FlowRangeProof>); delegate!(fn get_proof_at_root(root: Option<DataRoot>, index: u64, length: u64) -> Result<FlowRangeProof>);
delegate!(fn get_context() -> Result<(DataRoot, u64)>); delegate!(fn get_context() -> Result<(DataRoot, u64)>);

View File

@ -337,6 +337,10 @@ impl LogStoreWrite for LogManager {
} }
} }
fn prune_tx(&self, tx_seq: u64) -> crate::error::Result<()> {
self.tx_store.prune_tx(tx_seq)
}
fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()> { fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()> {
self.tx_store.put_progress(progress) self.tx_store.put_progress(progress)
} }
@ -563,6 +567,10 @@ impl LogStoreRead for LogManager {
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64, merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64,
)) ))
} }
fn check_tx_pruned(&self, tx_seq: u64) -> crate::error::Result<bool> {
self.tx_store.check_tx_pruned(tx_seq)
}
} }
impl LogManager { impl LogManager {

View File

@ -53,6 +53,8 @@ pub trait LogStoreRead: LogStoreChunkRead {
fn check_tx_completed(&self, tx_seq: u64) -> Result<bool>; fn check_tx_completed(&self, tx_seq: u64) -> Result<bool>;
fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool>;
fn next_tx_seq(&self) -> u64; fn next_tx_seq(&self) -> u64;
fn get_sync_progress(&self) -> Result<Option<(u64, H256)>>; fn get_sync_progress(&self) -> Result<Option<(u64, H256)>>;
@ -118,6 +120,8 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
/// the caller is supposed to track chunk statuses and call this after storing all the chunks. /// the caller is supposed to track chunk statuses and call this after storing all the chunks.
fn finalize_tx(&self, tx_seq: u64) -> Result<()>; fn finalize_tx(&self, tx_seq: u64) -> Result<()>;
fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> Result<bool>; fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> Result<bool>;
/// Mark the tx as pruned, meaning the data will not be stored.
fn prune_tx(&self, tx_seq: u64) -> Result<()>;
/// Store the progress of synced block number and its hash. /// Store the progress of synced block number and its hash.
fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()>; fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()>;

View File

@ -20,6 +20,9 @@ use tracing::{error, instrument};
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress"; const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
const NEXT_TX_KEY: &str = "next_tx_seq"; const NEXT_TX_KEY: &str = "next_tx_seq";
const TX_STATUS_FINALIZED: u8 = 0;
const TX_STATUS_PRUNED: u8 = 1;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct BlockHashAndSubmissionIndex { pub struct BlockHashAndSubmissionIndex {
pub block_hash: H256, pub block_hash: H256,
@ -156,13 +159,27 @@ impl TransactionStore {
#[instrument(skip(self))] #[instrument(skip(self))]
pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> { pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> {
Ok(self.kvdb.put(
COL_TX_COMPLETED,
&tx_seq.to_be_bytes(),
&[TX_STATUS_FINALIZED],
)?)
}
#[instrument(skip(self))]
pub fn prune_tx(&self, tx_seq: u64) -> Result<()> {
Ok(self Ok(self
.kvdb .kvdb
.put(COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[0])?) .put(COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[TX_STATUS_PRUNED])?)
} }
pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> { pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> {
Ok(self.kvdb.has_key(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?) Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
== Some(vec![TX_STATUS_FINALIZED]))
}
pub fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool> {
Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())? == Some(vec![TX_STATUS_PRUNED]))
} }
pub fn next_tx_seq(&self) -> u64 { pub fn next_tx_seq(&self) -> u64 {

View File

@ -87,7 +87,9 @@ impl Batcher {
async fn poll_tx(&self, tx_seq: u64) -> Result<Option<SyncResult>> { async fn poll_tx(&self, tx_seq: u64) -> Result<Option<SyncResult>> {
// file already exists // file already exists
if self.store.check_tx_completed(tx_seq).await? { if self.store.check_tx_completed(tx_seq).await?
|| self.store.check_tx_pruned(tx_seq).await?
{
// File may be finalized during file sync, e.g. user uploaded file via RPC. // File may be finalized during file sync, e.g. user uploaded file via RPC.
// In this case, just terminate the file sync. // In this case, just terminate the file sync.
let num_terminated = self.terminate_file_sync(tx_seq, false).await; let num_terminated = self.terminate_file_sync(tx_seq, false).await;

View File

@ -7,9 +7,10 @@ mod controllers;
mod service; mod service;
pub mod test_util; pub mod test_util;
use auto_sync::{batcher_random::RandomBatcherState, batcher_serial::SerialBatcherState};
pub use controllers::FileSyncInfo; pub use controllers::FileSyncInfo;
use duration_str::deserialize_duration; use duration_str::deserialize_duration;
use serde::Deserialize; use serde::{Deserialize, Serialize};
pub use service::{SyncMessage, SyncReceiver, SyncRequest, SyncResponse, SyncSender, SyncService}; pub use service::{SyncMessage, SyncReceiver, SyncRequest, SyncResponse, SyncSender, SyncService};
use std::{ use std::{
fmt::Debug, fmt::Debug,
@ -64,3 +65,11 @@ impl InstantWrapper {
self.0.elapsed() self.0.elapsed()
} }
} }
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SyncServiceState {
pub num_syncing: usize,
pub auto_sync_serial: Option<SerialBatcherState>,
pub auto_sync_random: Option<RandomBatcherState>,
}

View File

@ -4,7 +4,7 @@ use crate::controllers::{
FailureReason, FileSyncGoal, FileSyncInfo, SerialSyncController, SyncState, FailureReason, FileSyncGoal, FileSyncInfo, SerialSyncController, SyncState,
MAX_CHUNKS_TO_REQUEST, MAX_CHUNKS_TO_REQUEST,
}; };
use crate::Config; use crate::{Config, SyncServiceState};
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use file_location_cache::FileLocationCache; use file_location_cache::FileLocationCache;
use libp2p::swarm::DialError; use libp2p::swarm::DialError;
@ -74,6 +74,7 @@ pub enum SyncMessage {
#[derive(Debug)] #[derive(Debug)]
pub enum SyncRequest { pub enum SyncRequest {
SyncState,
SyncStatus { SyncStatus {
tx_seq: u64, tx_seq: u64,
}, },
@ -99,6 +100,7 @@ pub enum SyncRequest {
#[derive(Debug)] #[derive(Debug)]
pub enum SyncResponse { pub enum SyncResponse {
SyncState { state: SyncServiceState },
SyncStatus { status: Option<SyncState> }, SyncStatus { status: Option<SyncState> },
SyncFile { err: String }, SyncFile { err: String },
FileSyncInfo { result: HashMap<u64, FileSyncInfo> }, FileSyncInfo { result: HashMap<u64, FileSyncInfo> },
@ -278,6 +280,23 @@ impl SyncService {
sender: channel::ResponseSender<SyncResponse>, sender: channel::ResponseSender<SyncResponse>,
) { ) {
match req { match req {
SyncRequest::SyncState => {
let state = match &self.auto_sync_manager {
Some(manager) => SyncServiceState {
num_syncing: self.controllers.len(),
auto_sync_serial: Some(manager.serial.get_state().await),
auto_sync_random: manager.random.get_state().await.ok(),
},
None => SyncServiceState {
num_syncing: self.controllers.len(),
auto_sync_serial: None,
auto_sync_random: None,
},
};
let _ = sender.send(SyncResponse::SyncState { state });
}
SyncRequest::SyncStatus { tx_seq } => { SyncRequest::SyncStatus { tx_seq } => {
let status = self let status = self
.controllers .controllers
@ -552,7 +571,9 @@ impl SyncService {
async fn on_find_file(&mut self, tx_seq: u64) -> Result<()> { async fn on_find_file(&mut self, tx_seq: u64) -> Result<()> {
// file already exists // file already exists
if self.store.check_tx_completed(tx_seq).await? { if self.store.check_tx_completed(tx_seq).await?
|| self.store.check_tx_pruned(tx_seq).await?
{
return Ok(()); return Ok(());
} }
// broadcast find file // broadcast find file
@ -612,7 +633,9 @@ impl SyncService {
}; };
// file already exists // file already exists
if self.store.check_tx_completed(tx_seq).await? { if self.store.check_tx_completed(tx_seq).await?
|| self.store.check_tx_pruned(tx_seq).await?
{
bail!("File already exists"); bail!("File already exists");
} }
@ -684,6 +707,15 @@ impl SyncService {
} }
} }
match self.store.check_tx_pruned(tx_seq).await {
Ok(true) => return,
Ok(false) => {}
Err(err) => {
error!(%tx_seq, %err, "Failed to check if file pruned");
return;
}
}
// Now, always sync files among all nodes // Now, always sync files among all nodes
if let Err(err) = self if let Err(err) = self
.on_start_sync_file(tx_seq, None, Some((peer_id, addr))) .on_start_sync_file(tx_seq, None, Some((peer_id, addr)))

View File

@ -2,8 +2,10 @@
import time import time
from test_framework.test_framework import TestFramework from test_framework.test_framework import TestFramework
from config.node_config import GENESIS_PRIV_KEY
from mine_with_market_test import PRICE_PER_SECTOR
from utility.submission import create_submission, submit_data from utility.submission import create_submission, submit_data
from utility.utils import wait_until, assert_equal from utility.utils import wait_until, assert_equal, estimate_st_performance
class PrunerTest(TestFramework): class PrunerTest(TestFramework):
@ -13,34 +15,46 @@ class PrunerTest(TestFramework):
self.num_nodes = 1 self.num_nodes = 1
self.zgs_node_configs[0] = { self.zgs_node_configs[0] = {
"db_max_num_sectors": 16 * 1024, "db_max_num_sectors": 16 * 1024,
# "db_max_num_sectors": 32 * 1024 * 1024,
"prune_check_time_s": 1, "prune_check_time_s": 1,
"prune_batch_wait_time_ms": 10, "prune_batch_wait_time_ms": 1,
} }
self.enable_market = True
self.mine_period = int(45 / self.block_time)
self.lifetime_seconds = 240
self.launch_wait_seconds = 15
self.log.info("Contract Info: Est. block time %.2f, Mine period %d", self.block_time, self.mine_period)
def run_test(self): def run_test(self):
client = self.nodes[0] client = self.nodes[0]
chunk_data = b"\x02" * 16 * 256 * 1024 chunk_data = b"\x02" * 16 * 256 * 1024
# chunk_data = b"\x02" * 5 * 1024 * 1024 * 1024
submissions, data_root = create_submission(chunk_data) submissions, data_root = create_submission(chunk_data)
self.contract.submit(submissions) self.contract.submit(submissions, tx_prarams = {"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)})
wait_until(lambda: self.contract.num_submissions() == 1) wait_until(lambda: self.contract.num_submissions() == 1)
wait_until(lambda: client.zgs_get_file_info(data_root) is not None) wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
segment = submit_data(client, chunk_data) segment = submit_data(client, chunk_data)
self.log.info("segment: %s", len(segment)) self.log.info("segment: %s", len(segment))
# Wait for 1 sec for the shard config to be updated # Wait for 1 sec for the shard config to be updated
time.sleep(1) time.sleep(2)
shard_config = client.rpc.zgs_getShardConfig() shard_config = client.rpc.zgs_getShardConfig()
shard_id = int(shard_config["shardId"]) shard_id = int(shard_config["shardId"])
num_shard = int(shard_config["numShard"]) num_shard = int(shard_config["numShard"])
# wait_until(lambda: self.reward_contract.first_rewardable_chunk() != 0, timeout=180)
# first_rewardable = self.reward_contract.first_rewardable_chunk() * 32 * 1024
# Wait for 1 sec for the no reward segments to be pruned.
time.sleep(1)
# Wait for chunks to be removed.
for i in range(len(segment)): for i in range(len(segment)):
seg = client.zgs_download_segment(data_root, i * 1024, (i + 1) * 1024) seg = client.zgs_download_segment(data_root, i * 1024, (i + 1) * 1024)
if i % num_shard == shard_id: # if i < first_rewardable or i % num_shard != shard_id:
# base64 encoding size if i % num_shard != shard_id:
assert_equal(len(seg), 349528)
else:
assert_equal(seg, None) assert_equal(seg, None)
else:
assert_equal(len(seg), 349528)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -2,6 +2,7 @@
import time import time
from test_framework.test_framework import TestFramework from test_framework.test_framework import TestFramework
from mine_with_market_test import PRICE_PER_SECTOR
from utility.submission import create_submission, submit_data, data_to_segments from utility.submission import create_submission, submit_data, data_to_segments
from utility.utils import wait_until, assert_equal from utility.utils import wait_until, assert_equal
@ -23,13 +24,14 @@ class PrunerTest(TestFramework):
"db_max_num_sectors": 2 ** 30, "db_max_num_sectors": 2 ** 30,
"shard_position": "1/4" "shard_position": "1/4"
} }
self.enable_market = True
def run_test(self): def run_test(self):
client = self.nodes[0] client = self.nodes[0]
chunk_data = b"\x02" * 8 * 256 * 1024 chunk_data = b"\x02" * 8 * 256 * 1024
submissions, data_root = create_submission(chunk_data) submissions, data_root = create_submission(chunk_data)
self.contract.submit(submissions) self.contract.submit(submissions, tx_prarams = {"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)})
wait_until(lambda: self.contract.num_submissions() == 1) wait_until(lambda: self.contract.num_submissions() == 1)
wait_until(lambda: client.zgs_get_file_info(data_root) is not None) wait_until(lambda: client.zgs_get_file_info(data_root) is not None)

View File

@ -253,7 +253,7 @@ class BlockchainNode(TestNode):
def wait_for_transaction_receipt(self, w3, tx_hash, timeout=120, parent_hash=None): def wait_for_transaction_receipt(self, w3, tx_hash, timeout=120, parent_hash=None):
return w3.eth.wait_for_transaction_receipt(tx_hash, timeout) return w3.eth.wait_for_transaction_receipt(tx_hash, timeout)
def setup_contract(self, enable_market, mine_period): def setup_contract(self, enable_market, mine_period, lifetime_seconds):
w3 = Web3(HTTPProvider(self.rpc_url)) w3 = Web3(HTTPProvider(self.rpc_url))
account1 = w3.eth.account.from_key(GENESIS_PRIV_KEY) account1 = w3.eth.account.from_key(GENESIS_PRIV_KEY)
@ -314,9 +314,7 @@ class BlockchainNode(TestNode):
return flow_contract, flow_initialize_hash, mine_contract, dummy_reward_contract return flow_contract, flow_initialize_hash, mine_contract, dummy_reward_contract
def deploy_with_market(): def deploy_with_market(lifetime_seconds):
LIFETIME_MONTH = 1
self.log.debug("Start deploy contracts") self.log.debug("Start deploy contracts")
mine_contract, _ = deploy_contract("PoraMineTest", [0]) mine_contract, _ = deploy_contract("PoraMineTest", [0])
@ -325,7 +323,7 @@ class BlockchainNode(TestNode):
market_contract, _ = deploy_contract("FixedPrice", []) market_contract, _ = deploy_contract("FixedPrice", [])
self.log.debug("Market deployed") self.log.debug("Market deployed")
reward_contract, _ =deploy_contract("ChunkLinearReward", [LIFETIME_MONTH * 31 * 86400]) reward_contract, _ = deploy_contract("ChunkLinearReward", [lifetime_seconds])
self.log.debug("Reward deployed") self.log.debug("Reward deployed")
flow_contract, _ = deploy_contract("FixedPriceFlow", [mine_period, 0]) flow_contract, _ = deploy_contract("FixedPriceFlow", [mine_period, 0])
@ -336,8 +334,9 @@ class BlockchainNode(TestNode):
mine_contract.functions.setTargetSubmissions(2).transact(TX_PARAMS) mine_contract.functions.setTargetSubmissions(2).transact(TX_PARAMS)
self.log.debug("Mine Initialized") self.log.debug("Mine Initialized")
price_per_sector = int(LIFETIME_MONTH * 256 * 10 * 1_000_000_000_000_000_000 / 1024 / 1024 / 1024 / 12) market_contract.functions.initialize(int(lifetime_seconds * 256 * 10 * 10 ** 18 /
market_contract.functions.initialize(price_per_sector, flow_contract.address, reward_contract.address).transact(TX_PARAMS) 2 ** 30 / 12 / 31 / 86400),
flow_contract.address, reward_contract.address).transact(TX_PARAMS)
self.log.debug("Market Initialized") self.log.debug("Market Initialized")
reward_contract.functions.initialize(market_contract.address, mine_contract.address).transact(TX_PARAMS) reward_contract.functions.initialize(market_contract.address, mine_contract.address).transact(TX_PARAMS)
@ -353,7 +352,7 @@ class BlockchainNode(TestNode):
return flow_contract, flow_initialize_hash, mine_contract, reward_contract return flow_contract, flow_initialize_hash, mine_contract, reward_contract
if enable_market: if enable_market:
return deploy_with_market() return deploy_with_market(lifetime_seconds)
else: else:
return deploy_no_market() return deploy_no_market()

View File

@ -17,11 +17,11 @@ class ContractProxy:
else self.blockchain_nodes[node_idx].get_contract(self.contract_address) else self.blockchain_nodes[node_idx].get_contract(self.contract_address)
) )
def _call(self, fn_name, node_idx, **args): def _call(self, fn_name, node_idx, *args):
assert node_idx < len(self.blockchain_nodes) assert node_idx < len(self.blockchain_nodes)
contract = self._get_contract(node_idx) contract = self._get_contract(node_idx)
return getattr(contract.functions, fn_name)(**args).call() return getattr(contract.functions, fn_name)(*args).call()
def _send(self, fn_name, node_idx, **args): def _send(self, fn_name, node_idx, **args):
assert node_idx < len(self.blockchain_nodes) assert node_idx < len(self.blockchain_nodes)
@ -113,4 +113,10 @@ class RewardContractProxy(ContractProxy):
return self._send_payable("donate", node_idx, value) return self._send_payable("donate", node_idx, value)
def base_reward(self, node_idx = 0): def base_reward(self, node_idx = 0):
return self._call("baseReward", node_idx) return self._call("baseReward", node_idx)
def first_rewardable_chunk(self, node_idx = 0):
return self._call("firstRewardableChunk", node_idx)
def reward_deadline(self, node_idx = 0):
return self._call("rewardDeadline", node_idx, 0)

View File

@ -51,6 +51,7 @@ class TestFramework:
self.block_time = blockchain_node_type.block_time() self.block_time = blockchain_node_type.block_time()
self.enable_market = False self.enable_market = False
self.mine_period = 100 self.mine_period = 100
self.lifetime_seconds = 3600
self.launch_wait_seconds = 1 self.launch_wait_seconds = 1
# Set default binary path # Set default binary path
@ -171,7 +172,7 @@ class TestFramework:
wait_until(lambda: node.eth_blockNumber() is not None) wait_until(lambda: node.eth_blockNumber() is not None)
wait_until(lambda: int(node.eth_blockNumber(), 16) > 0) wait_until(lambda: int(node.eth_blockNumber(), 16) > 0)
contract, tx_hash, mine_contract, reward_contract = self.blockchain_nodes[0].setup_contract(self.enable_market, self.mine_period) contract, tx_hash, mine_contract, reward_contract = self.blockchain_nodes[0].setup_contract(self.enable_market, self.mine_period, self.lifetime_seconds)
self.contract = FlowContractProxy(contract, self.blockchain_nodes) self.contract = FlowContractProxy(contract, self.blockchain_nodes)
self.mine_contract = MineContractProxy(mine_contract, self.blockchain_nodes) self.mine_contract = MineContractProxy(mine_contract, self.blockchain_nodes)
self.reward_contract = RewardContractProxy(reward_contract, self.blockchain_nodes) self.reward_contract = RewardContractProxy(reward_contract, self.blockchain_nodes)
@ -197,6 +198,7 @@ class TestFramework:
updated_config, updated_config,
self.contract.address(), self.contract.address(),
self.mine_contract.address(), self.mine_contract.address(),
self.reward_contract.address(),
self.log, self.log,
) )
self.nodes.append(node) self.nodes.append(node)

View File

@ -22,6 +22,7 @@ class ZgsNode(TestNode):
updated_config, updated_config,
log_contract_address, log_contract_address,
mine_contract_address, mine_contract_address,
reward_contract_address,
log, log,
rpc_timeout=10, rpc_timeout=10,
libp2p_nodes=None, libp2p_nodes=None,
@ -43,6 +44,7 @@ class ZgsNode(TestNode):
"network_libp2p_nodes": libp2p_nodes, "network_libp2p_nodes": libp2p_nodes,
"log_contract_address": log_contract_address, "log_contract_address": log_contract_address,
"mine_contract_address": mine_contract_address, "mine_contract_address": mine_contract_address,
"reward_contract_address": reward_contract_address,
"blockchain_rpc_endpoint": f"http://127.0.0.1:{blockchain_rpc_port(0)}", "blockchain_rpc_endpoint": f"http://127.0.0.1:{blockchain_rpc_port(0)}",
} }
# Set configs for this specific node. # Set configs for this specific node.

View File

@ -25,7 +25,7 @@ class RpcCaller:
if isinstance(parsed, Ok): if isinstance(parsed, Ok):
return parsed.result return parsed.result
else: else:
print("Failed to call RPC, method = %s(%s), error = %s" % (self.method, str(*args), parsed)) print("Failed to call RPC, method = %s(%s), error = %s" % (self.method, str(*args)[-1500:], parsed))
except Exception as ex: except Exception as ex:
print("Failed to call RPC, method = %s(%s), exception = %s" % (self.method, str(*args), ex)) print("Failed to call RPC, method = %s(%s), exception = %s" % (self.method, str(*args)[-1500:], ex))
return None return None