Compare commits

...

2 Commits

Author SHA1 Message Date
Peilun Li
bff760cdf7 Fix tests. 2024-08-05 19:54:12 +08:00
Peilun Li
ce34ff8be6 Add tx prune status. 2024-08-05 13:58:48 +08:00
10 changed files with 100 additions and 14 deletions

View File

@ -50,6 +50,7 @@ impl PrunerConfig {
pub struct Pruner {
config: PrunerConfig,
first_rewardable_chunk: u64,
first_tx_seq: u64,
store: Arc<Store>,
@ -69,15 +70,16 @@ impl Pruner {
if let Some(shard_config) = get_shard_config(store.as_ref()).await? {
config.shard_config = shard_config;
}
let first_rewardable_chunk = get_first_rewardable_chunk(store.as_ref())
let (first_rewardable_chunk, first_tx_seq) = get_first_rewardable_chunk(store.as_ref())
.await?
.unwrap_or(0);
.unwrap_or((0, 0));
let reward_contract =
ChunkLinearReward::new(config.reward_address, Arc::new(config.make_provider()?));
let (tx, rx) = mpsc::unbounded_channel();
let pruner = Pruner {
config,
first_rewardable_chunk,
first_tx_seq,
store,
sender: tx,
miner_sender,
@ -112,10 +114,19 @@ impl Pruner {
?new_first_rewardable,
"first rewardable chunk moves forward, start pruning"
);
self.prune_in_batch(no_reward_list).await?;
self.put_first_rewardable_chunk_index(new_first_rewardable)
self.prune_tx(
self.first_rewardable_chunk * SECTORS_PER_PRICING as u64,
new_first_rewardable * SECTORS_PER_PRICING as u64,
)
.await?;
self.prune_in_batch(no_reward_list).await?;
self.first_rewardable_chunk = new_first_rewardable;
self.put_first_rewardable_chunk_index(
self.first_rewardable_chunk,
self.first_tx_seq,
)
.await?;
}
tokio::time::sleep(self.config.check_time).await;
}
@ -194,6 +205,26 @@ impl Pruner {
Ok(())
}
async fn prune_tx(&mut self, start_sector: u64, end_sector: u64) -> Result<()> {
while let Some(tx) = self.store.get_tx_by_seq_number(self.first_tx_seq).await? {
// If a part of the tx data is pruned, we mark the tx as pruned.
if tx.start_entry_index() >= start_sector && tx.start_entry_index() < end_sector {
self.store.prune_tx(tx.seq).await?;
} else if tx.start_entry_index() >= end_sector {
break;
} else {
bail!(
"prune tx out of range: tx={:?}, start={} end={}",
tx,
start_sector,
end_sector
);
}
self.first_tx_seq += 1;
}
Ok(())
}
async fn put_shard_config(&self) -> Result<()> {
if let Some(sender) = &self.miner_sender {
sender.send(MinerMessage::SetShardConfig(self.config.shard_config))?;
@ -211,9 +242,13 @@ impl Pruner {
async fn put_first_rewardable_chunk_index(
&self,
new_first_rewardable_chunk: u64,
new_first_tx_seq: u64,
) -> Result<()> {
self.store
.set_config_encoded(&FIRST_REWARDABLE_CHUNK_KEY, &new_first_rewardable_chunk)
.set_config_encoded(
&FIRST_REWARDABLE_CHUNK_KEY,
&(new_first_rewardable_chunk, new_first_tx_seq),
)
.await
}
}
@ -222,7 +257,7 @@ async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> {
store.get_config_decoded(&SHARD_CONFIG_KEY).await
}
async fn get_first_rewardable_chunk(store: &Store) -> Result<Option<u64>> {
async fn get_first_rewardable_chunk(store: &Store) -> Result<Option<(u64, u64)>> {
store.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY).await
}

View File

@ -195,6 +195,10 @@ impl RpcServerImpl {
));
}
if self.ctx.log_store.check_tx_pruned(tx.seq).await? {
return Err(error::invalid_params("root", "already pruned"));
}
Ok(false)
} else {
//Check whether file is small enough to cache in the system

View File

@ -45,6 +45,7 @@ impl Store {
}
delegate!(fn check_tx_completed(tx_seq: u64) -> Result<bool>);
delegate!(fn check_tx_pruned(tx_seq: u64) -> Result<bool>);
delegate!(fn get_chunk_by_tx_and_index(tx_seq: u64, index: usize) -> Result<Option<Chunk>>);
delegate!(fn get_chunks_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize) -> Result<Option<ChunkArray>>);
delegate!(fn get_chunks_with_proof_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize, merkle_tx_seq: Option<u64>) -> Result<Option<ChunkArrayWithProof>>);
@ -53,6 +54,7 @@ impl Store {
delegate!(fn put_chunks_with_tx_hash(tx_seq: u64, tx_hash: H256, chunks: ChunkArray, maybe_file_proof: Option<FlowProof>) -> Result<bool>);
delegate!(fn get_chunk_by_flow_index(index: u64, length: u64) -> Result<Option<ChunkArray>>);
delegate!(fn finalize_tx(tx_seq: u64) -> Result<()>);
delegate!(fn prune_tx(tx_seq: u64) -> Result<()>);
delegate!(fn finalize_tx_with_hash(tx_seq: u64, tx_hash: H256) -> Result<bool>);
delegate!(fn get_proof_at_root(root: Option<DataRoot>, index: u64, length: u64) -> Result<FlowRangeProof>);
delegate!(fn get_context() -> Result<(DataRoot, u64)>);

View File

@ -337,6 +337,10 @@ impl LogStoreWrite for LogManager {
}
}
fn prune_tx(&self, tx_seq: u64) -> crate::error::Result<()> {
self.tx_store.prune_tx(tx_seq)
}
fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()> {
self.tx_store.put_progress(progress)
}
@ -563,6 +567,10 @@ impl LogStoreRead for LogManager {
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64,
))
}
fn check_tx_pruned(&self, tx_seq: u64) -> crate::error::Result<bool> {
self.tx_store.check_tx_pruned(tx_seq)
}
}
impl LogManager {

View File

@ -53,6 +53,8 @@ pub trait LogStoreRead: LogStoreChunkRead {
fn check_tx_completed(&self, tx_seq: u64) -> Result<bool>;
fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool>;
fn next_tx_seq(&self) -> u64;
fn get_sync_progress(&self) -> Result<Option<(u64, H256)>>;
@ -118,6 +120,8 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
/// the caller is supposed to track chunk statuses and call this after storing all the chunks.
fn finalize_tx(&self, tx_seq: u64) -> Result<()>;
fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> Result<bool>;
/// Mark the tx as pruned, meaning the data will not be stored.
fn prune_tx(&self, tx_seq: u64) -> Result<()>;
/// Store the progress of synced block number and its hash.
fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()>;

View File

@ -20,6 +20,9 @@ use tracing::{error, instrument};
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
const NEXT_TX_KEY: &str = "next_tx_seq";
const TX_STATUS_FINALIZED: u8 = 0;
const TX_STATUS_PRUNED: u8 = 1;
#[derive(Clone, Debug)]
pub struct BlockHashAndSubmissionIndex {
pub block_hash: H256,
@ -156,13 +159,27 @@ impl TransactionStore {
#[instrument(skip(self))]
pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> {
Ok(self.kvdb.put(
COL_TX_COMPLETED,
&tx_seq.to_be_bytes(),
&[TX_STATUS_FINALIZED],
)?)
}
#[instrument(skip(self))]
pub fn prune_tx(&self, tx_seq: u64) -> Result<()> {
Ok(self
.kvdb
.put(COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[0])?)
.put(COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[TX_STATUS_PRUNED])?)
}
pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> {
Ok(self.kvdb.has_key(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?)
Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
== Some(vec![TX_STATUS_FINALIZED]))
}
pub fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool> {
Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())? == Some(vec![TX_STATUS_PRUNED]))
}
pub fn next_tx_seq(&self) -> u64 {

View File

@ -82,7 +82,9 @@ impl Batcher {
async fn poll_tx(&self, tx_seq: u64) -> Result<Option<SyncResult>> {
// file already exists
if self.store.check_tx_completed(tx_seq).await? {
if self.store.check_tx_completed(tx_seq).await?
|| self.store.check_tx_pruned(tx_seq).await?
{
// File may be finalized during file sync, e.g. user uploaded file via RPC.
// In this case, just terminate the file sync.
let num_terminated = self.terminate_file_sync(tx_seq, false).await;

View File

@ -573,7 +573,9 @@ impl SyncService {
async fn on_find_file(&mut self, tx_seq: u64) -> Result<()> {
// file already exists
if self.store.check_tx_completed(tx_seq).await? {
if self.store.check_tx_completed(tx_seq).await?
|| self.store.check_tx_pruned(tx_seq).await?
{
return Ok(());
}
// broadcast find file
@ -633,7 +635,9 @@ impl SyncService {
};
// file already exists
if self.store.check_tx_completed(tx_seq).await? {
if self.store.check_tx_completed(tx_seq).await?
|| self.store.check_tx_pruned(tx_seq).await?
{
bail!("File already exists");
}
@ -705,6 +709,15 @@ impl SyncService {
}
}
match self.store.check_tx_pruned(tx_seq).await {
Ok(true) => return,
Ok(false) => {}
Err(err) => {
error!(%tx_seq, %err, "Failed to check if file pruned");
return;
}
}
// Now, always sync files among all nodes
if let Err(err) = self
.on_start_sync_file(tx_seq, None, Some((peer_id, addr)))

View File

@ -2,6 +2,7 @@
import time
from test_framework.test_framework import TestFramework
from mine_with_market_test import PRICE_PER_SECTOR
from utility.submission import create_submission, submit_data, data_to_segments
from utility.utils import wait_until, assert_equal
@ -30,7 +31,7 @@ class PrunerTest(TestFramework):
chunk_data = b"\x02" * 8 * 256 * 1024
submissions, data_root = create_submission(chunk_data)
self.contract.submit(submissions)
self.contract.submit(submissions, tx_prarams = {"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)})
wait_until(lambda: self.contract.num_submissions() == 1)
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)

View File

@ -25,7 +25,7 @@ class RpcCaller:
if isinstance(parsed, Ok):
return parsed.result
else:
print("Failed to call RPC, method = %s(%s), error = %s" % (self.method, str(*args), parsed))
print("Failed to call RPC, method = %s(%s), error = %s" % (self.method, str(*args)[-1500:], parsed))
except Exception as ex:
print("Failed to call RPC, method = %s(%s), exception = %s" % (self.method, str(*args), ex))
print("Failed to call RPC, method = %s(%s), exception = %s" % (self.method, str(*args)[-1500:], ex))
return None