Compare commits

..

No commits in common. "bff760cdf76054e973671865ac37a01770383aa2" and "41ba320bcc70285e3b5be1ccafa32d653f84b263" have entirely different histories.

10 changed files with 14 additions and 100 deletions

View File

@ -50,7 +50,6 @@ impl PrunerConfig {
pub struct Pruner { pub struct Pruner {
config: PrunerConfig, config: PrunerConfig,
first_rewardable_chunk: u64, first_rewardable_chunk: u64,
first_tx_seq: u64,
store: Arc<Store>, store: Arc<Store>,
@ -70,16 +69,15 @@ impl Pruner {
if let Some(shard_config) = get_shard_config(store.as_ref()).await? { if let Some(shard_config) = get_shard_config(store.as_ref()).await? {
config.shard_config = shard_config; config.shard_config = shard_config;
} }
let (first_rewardable_chunk, first_tx_seq) = get_first_rewardable_chunk(store.as_ref()) let first_rewardable_chunk = get_first_rewardable_chunk(store.as_ref())
.await? .await?
.unwrap_or((0, 0)); .unwrap_or(0);
let reward_contract = let reward_contract =
ChunkLinearReward::new(config.reward_address, Arc::new(config.make_provider()?)); ChunkLinearReward::new(config.reward_address, Arc::new(config.make_provider()?));
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();
let pruner = Pruner { let pruner = Pruner {
config, config,
first_rewardable_chunk, first_rewardable_chunk,
first_tx_seq,
store, store,
sender: tx, sender: tx,
miner_sender, miner_sender,
@ -114,19 +112,10 @@ impl Pruner {
?new_first_rewardable, ?new_first_rewardable,
"first rewardable chunk moves forward, start pruning" "first rewardable chunk moves forward, start pruning"
); );
self.prune_tx(
self.first_rewardable_chunk * SECTORS_PER_PRICING as u64,
new_first_rewardable * SECTORS_PER_PRICING as u64,
)
.await?;
self.prune_in_batch(no_reward_list).await?; self.prune_in_batch(no_reward_list).await?;
self.put_first_rewardable_chunk_index(new_first_rewardable)
.await?;
self.first_rewardable_chunk = new_first_rewardable; self.first_rewardable_chunk = new_first_rewardable;
self.put_first_rewardable_chunk_index(
self.first_rewardable_chunk,
self.first_tx_seq,
)
.await?;
} }
tokio::time::sleep(self.config.check_time).await; tokio::time::sleep(self.config.check_time).await;
} }
@ -205,26 +194,6 @@ impl Pruner {
Ok(()) Ok(())
} }
async fn prune_tx(&mut self, start_sector: u64, end_sector: u64) -> Result<()> {
while let Some(tx) = self.store.get_tx_by_seq_number(self.first_tx_seq).await? {
// If a part of the tx data is pruned, we mark the tx as pruned.
if tx.start_entry_index() >= start_sector && tx.start_entry_index() < end_sector {
self.store.prune_tx(tx.seq).await?;
} else if tx.start_entry_index() >= end_sector {
break;
} else {
bail!(
"prune tx out of range: tx={:?}, start={} end={}",
tx,
start_sector,
end_sector
);
}
self.first_tx_seq += 1;
}
Ok(())
}
async fn put_shard_config(&self) -> Result<()> { async fn put_shard_config(&self) -> Result<()> {
if let Some(sender) = &self.miner_sender { if let Some(sender) = &self.miner_sender {
sender.send(MinerMessage::SetShardConfig(self.config.shard_config))?; sender.send(MinerMessage::SetShardConfig(self.config.shard_config))?;
@ -242,13 +211,9 @@ impl Pruner {
async fn put_first_rewardable_chunk_index( async fn put_first_rewardable_chunk_index(
&self, &self,
new_first_rewardable_chunk: u64, new_first_rewardable_chunk: u64,
new_first_tx_seq: u64,
) -> Result<()> { ) -> Result<()> {
self.store self.store
.set_config_encoded( .set_config_encoded(&FIRST_REWARDABLE_CHUNK_KEY, &new_first_rewardable_chunk)
&FIRST_REWARDABLE_CHUNK_KEY,
&(new_first_rewardable_chunk, new_first_tx_seq),
)
.await .await
} }
} }
@ -257,7 +222,7 @@ async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> {
store.get_config_decoded(&SHARD_CONFIG_KEY).await store.get_config_decoded(&SHARD_CONFIG_KEY).await
} }
async fn get_first_rewardable_chunk(store: &Store) -> Result<Option<(u64, u64)>> { async fn get_first_rewardable_chunk(store: &Store) -> Result<Option<u64>> {
store.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY).await store.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY).await
} }

View File

@ -195,10 +195,6 @@ impl RpcServerImpl {
)); ));
} }
if self.ctx.log_store.check_tx_pruned(tx.seq).await? {
return Err(error::invalid_params("root", "already pruned"));
}
Ok(false) Ok(false)
} else { } else {
//Check whether file is small enough to cache in the system //Check whether file is small enough to cache in the system

View File

@ -45,7 +45,6 @@ impl Store {
} }
delegate!(fn check_tx_completed(tx_seq: u64) -> Result<bool>); delegate!(fn check_tx_completed(tx_seq: u64) -> Result<bool>);
delegate!(fn check_tx_pruned(tx_seq: u64) -> Result<bool>);
delegate!(fn get_chunk_by_tx_and_index(tx_seq: u64, index: usize) -> Result<Option<Chunk>>); delegate!(fn get_chunk_by_tx_and_index(tx_seq: u64, index: usize) -> Result<Option<Chunk>>);
delegate!(fn get_chunks_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize) -> Result<Option<ChunkArray>>); delegate!(fn get_chunks_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize) -> Result<Option<ChunkArray>>);
delegate!(fn get_chunks_with_proof_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize, merkle_tx_seq: Option<u64>) -> Result<Option<ChunkArrayWithProof>>); delegate!(fn get_chunks_with_proof_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize, merkle_tx_seq: Option<u64>) -> Result<Option<ChunkArrayWithProof>>);
@ -54,7 +53,6 @@ impl Store {
delegate!(fn put_chunks_with_tx_hash(tx_seq: u64, tx_hash: H256, chunks: ChunkArray, maybe_file_proof: Option<FlowProof>) -> Result<bool>); delegate!(fn put_chunks_with_tx_hash(tx_seq: u64, tx_hash: H256, chunks: ChunkArray, maybe_file_proof: Option<FlowProof>) -> Result<bool>);
delegate!(fn get_chunk_by_flow_index(index: u64, length: u64) -> Result<Option<ChunkArray>>); delegate!(fn get_chunk_by_flow_index(index: u64, length: u64) -> Result<Option<ChunkArray>>);
delegate!(fn finalize_tx(tx_seq: u64) -> Result<()>); delegate!(fn finalize_tx(tx_seq: u64) -> Result<()>);
delegate!(fn prune_tx(tx_seq: u64) -> Result<()>);
delegate!(fn finalize_tx_with_hash(tx_seq: u64, tx_hash: H256) -> Result<bool>); delegate!(fn finalize_tx_with_hash(tx_seq: u64, tx_hash: H256) -> Result<bool>);
delegate!(fn get_proof_at_root(root: Option<DataRoot>, index: u64, length: u64) -> Result<FlowRangeProof>); delegate!(fn get_proof_at_root(root: Option<DataRoot>, index: u64, length: u64) -> Result<FlowRangeProof>);
delegate!(fn get_context() -> Result<(DataRoot, u64)>); delegate!(fn get_context() -> Result<(DataRoot, u64)>);

View File

@ -337,10 +337,6 @@ impl LogStoreWrite for LogManager {
} }
} }
fn prune_tx(&self, tx_seq: u64) -> crate::error::Result<()> {
self.tx_store.prune_tx(tx_seq)
}
fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()> { fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()> {
self.tx_store.put_progress(progress) self.tx_store.put_progress(progress)
} }
@ -567,10 +563,6 @@ impl LogStoreRead for LogManager {
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64, merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64,
)) ))
} }
fn check_tx_pruned(&self, tx_seq: u64) -> crate::error::Result<bool> {
self.tx_store.check_tx_pruned(tx_seq)
}
} }
impl LogManager { impl LogManager {

View File

@ -53,8 +53,6 @@ pub trait LogStoreRead: LogStoreChunkRead {
fn check_tx_completed(&self, tx_seq: u64) -> Result<bool>; fn check_tx_completed(&self, tx_seq: u64) -> Result<bool>;
fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool>;
fn next_tx_seq(&self) -> u64; fn next_tx_seq(&self) -> u64;
fn get_sync_progress(&self) -> Result<Option<(u64, H256)>>; fn get_sync_progress(&self) -> Result<Option<(u64, H256)>>;
@ -120,8 +118,6 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
/// the caller is supposed to track chunk statuses and call this after storing all the chunks. /// the caller is supposed to track chunk statuses and call this after storing all the chunks.
fn finalize_tx(&self, tx_seq: u64) -> Result<()>; fn finalize_tx(&self, tx_seq: u64) -> Result<()>;
fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> Result<bool>; fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> Result<bool>;
/// Mark the tx as pruned, meaning the data will not be stored.
fn prune_tx(&self, tx_seq: u64) -> Result<()>;
/// Store the progress of synced block number and its hash. /// Store the progress of synced block number and its hash.
fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()>; fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()>;

View File

@ -20,9 +20,6 @@ use tracing::{error, instrument};
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress"; const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
const NEXT_TX_KEY: &str = "next_tx_seq"; const NEXT_TX_KEY: &str = "next_tx_seq";
const TX_STATUS_FINALIZED: u8 = 0;
const TX_STATUS_PRUNED: u8 = 1;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct BlockHashAndSubmissionIndex { pub struct BlockHashAndSubmissionIndex {
pub block_hash: H256, pub block_hash: H256,
@ -159,27 +156,13 @@ impl TransactionStore {
#[instrument(skip(self))] #[instrument(skip(self))]
pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> { pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> {
Ok(self.kvdb.put(
COL_TX_COMPLETED,
&tx_seq.to_be_bytes(),
&[TX_STATUS_FINALIZED],
)?)
}
#[instrument(skip(self))]
pub fn prune_tx(&self, tx_seq: u64) -> Result<()> {
Ok(self Ok(self
.kvdb .kvdb
.put(COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[TX_STATUS_PRUNED])?) .put(COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[0])?)
} }
pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> { pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> {
Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())? Ok(self.kvdb.has_key(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?)
== Some(vec![TX_STATUS_FINALIZED]))
}
pub fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool> {
Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())? == Some(vec![TX_STATUS_PRUNED]))
} }
pub fn next_tx_seq(&self) -> u64 { pub fn next_tx_seq(&self) -> u64 {

View File

@ -82,9 +82,7 @@ impl Batcher {
async fn poll_tx(&self, tx_seq: u64) -> Result<Option<SyncResult>> { async fn poll_tx(&self, tx_seq: u64) -> Result<Option<SyncResult>> {
// file already exists // file already exists
if self.store.check_tx_completed(tx_seq).await? if self.store.check_tx_completed(tx_seq).await? {
|| self.store.check_tx_pruned(tx_seq).await?
{
// File may be finalized during file sync, e.g. user uploaded file via RPC. // File may be finalized during file sync, e.g. user uploaded file via RPC.
// In this case, just terminate the file sync. // In this case, just terminate the file sync.
let num_terminated = self.terminate_file_sync(tx_seq, false).await; let num_terminated = self.terminate_file_sync(tx_seq, false).await;

View File

@ -573,9 +573,7 @@ impl SyncService {
async fn on_find_file(&mut self, tx_seq: u64) -> Result<()> { async fn on_find_file(&mut self, tx_seq: u64) -> Result<()> {
// file already exists // file already exists
if self.store.check_tx_completed(tx_seq).await? if self.store.check_tx_completed(tx_seq).await? {
|| self.store.check_tx_pruned(tx_seq).await?
{
return Ok(()); return Ok(());
} }
// broadcast find file // broadcast find file
@ -635,9 +633,7 @@ impl SyncService {
}; };
// file already exists // file already exists
if self.store.check_tx_completed(tx_seq).await? if self.store.check_tx_completed(tx_seq).await? {
|| self.store.check_tx_pruned(tx_seq).await?
{
bail!("File already exists"); bail!("File already exists");
} }
@ -709,15 +705,6 @@ impl SyncService {
} }
} }
match self.store.check_tx_pruned(tx_seq).await {
Ok(true) => return,
Ok(false) => {}
Err(err) => {
error!(%tx_seq, %err, "Failed to check if file pruned");
return;
}
}
// Now, always sync files among all nodes // Now, always sync files among all nodes
if let Err(err) = self if let Err(err) = self
.on_start_sync_file(tx_seq, None, Some((peer_id, addr))) .on_start_sync_file(tx_seq, None, Some((peer_id, addr)))

View File

@ -2,7 +2,6 @@
import time import time
from test_framework.test_framework import TestFramework from test_framework.test_framework import TestFramework
from mine_with_market_test import PRICE_PER_SECTOR
from utility.submission import create_submission, submit_data, data_to_segments from utility.submission import create_submission, submit_data, data_to_segments
from utility.utils import wait_until, assert_equal from utility.utils import wait_until, assert_equal
@ -31,7 +30,7 @@ class PrunerTest(TestFramework):
chunk_data = b"\x02" * 8 * 256 * 1024 chunk_data = b"\x02" * 8 * 256 * 1024
submissions, data_root = create_submission(chunk_data) submissions, data_root = create_submission(chunk_data)
self.contract.submit(submissions, tx_prarams = {"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)}) self.contract.submit(submissions)
wait_until(lambda: self.contract.num_submissions() == 1) wait_until(lambda: self.contract.num_submissions() == 1)
wait_until(lambda: client.zgs_get_file_info(data_root) is not None) wait_until(lambda: client.zgs_get_file_info(data_root) is not None)

View File

@ -25,7 +25,7 @@ class RpcCaller:
if isinstance(parsed, Ok): if isinstance(parsed, Ok):
return parsed.result return parsed.result
else: else:
print("Failed to call RPC, method = %s(%s), error = %s" % (self.method, str(*args)[-1500:], parsed)) print("Failed to call RPC, method = %s(%s), error = %s" % (self.method, str(*args), parsed))
except Exception as ex: except Exception as ex:
print("Failed to call RPC, method = %s(%s), exception = %s" % (self.method, str(*args)[-1500:], ex)) print("Failed to call RPC, method = %s(%s), exception = %s" % (self.method, str(*args), ex))
return None return None