mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2024-12-25 07:45:17 +00:00
Compare commits
2 Commits
41ba320bcc
...
bff760cdf7
Author | SHA1 | Date | |
---|---|---|---|
|
bff760cdf7 | ||
|
ce34ff8be6 |
@ -50,6 +50,7 @@ impl PrunerConfig {
|
|||||||
pub struct Pruner {
|
pub struct Pruner {
|
||||||
config: PrunerConfig,
|
config: PrunerConfig,
|
||||||
first_rewardable_chunk: u64,
|
first_rewardable_chunk: u64,
|
||||||
|
first_tx_seq: u64,
|
||||||
|
|
||||||
store: Arc<Store>,
|
store: Arc<Store>,
|
||||||
|
|
||||||
@ -69,15 +70,16 @@ impl Pruner {
|
|||||||
if let Some(shard_config) = get_shard_config(store.as_ref()).await? {
|
if let Some(shard_config) = get_shard_config(store.as_ref()).await? {
|
||||||
config.shard_config = shard_config;
|
config.shard_config = shard_config;
|
||||||
}
|
}
|
||||||
let first_rewardable_chunk = get_first_rewardable_chunk(store.as_ref())
|
let (first_rewardable_chunk, first_tx_seq) = get_first_rewardable_chunk(store.as_ref())
|
||||||
.await?
|
.await?
|
||||||
.unwrap_or(0);
|
.unwrap_or((0, 0));
|
||||||
let reward_contract =
|
let reward_contract =
|
||||||
ChunkLinearReward::new(config.reward_address, Arc::new(config.make_provider()?));
|
ChunkLinearReward::new(config.reward_address, Arc::new(config.make_provider()?));
|
||||||
let (tx, rx) = mpsc::unbounded_channel();
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
let pruner = Pruner {
|
let pruner = Pruner {
|
||||||
config,
|
config,
|
||||||
first_rewardable_chunk,
|
first_rewardable_chunk,
|
||||||
|
first_tx_seq,
|
||||||
store,
|
store,
|
||||||
sender: tx,
|
sender: tx,
|
||||||
miner_sender,
|
miner_sender,
|
||||||
@ -112,10 +114,19 @@ impl Pruner {
|
|||||||
?new_first_rewardable,
|
?new_first_rewardable,
|
||||||
"first rewardable chunk moves forward, start pruning"
|
"first rewardable chunk moves forward, start pruning"
|
||||||
);
|
);
|
||||||
|
self.prune_tx(
|
||||||
|
self.first_rewardable_chunk * SECTORS_PER_PRICING as u64,
|
||||||
|
new_first_rewardable * SECTORS_PER_PRICING as u64,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
self.prune_in_batch(no_reward_list).await?;
|
self.prune_in_batch(no_reward_list).await?;
|
||||||
self.put_first_rewardable_chunk_index(new_first_rewardable)
|
|
||||||
.await?;
|
|
||||||
self.first_rewardable_chunk = new_first_rewardable;
|
self.first_rewardable_chunk = new_first_rewardable;
|
||||||
|
self.put_first_rewardable_chunk_index(
|
||||||
|
self.first_rewardable_chunk,
|
||||||
|
self.first_tx_seq,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
}
|
}
|
||||||
tokio::time::sleep(self.config.check_time).await;
|
tokio::time::sleep(self.config.check_time).await;
|
||||||
}
|
}
|
||||||
@ -194,6 +205,26 @@ impl Pruner {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn prune_tx(&mut self, start_sector: u64, end_sector: u64) -> Result<()> {
|
||||||
|
while let Some(tx) = self.store.get_tx_by_seq_number(self.first_tx_seq).await? {
|
||||||
|
// If a part of the tx data is pruned, we mark the tx as pruned.
|
||||||
|
if tx.start_entry_index() >= start_sector && tx.start_entry_index() < end_sector {
|
||||||
|
self.store.prune_tx(tx.seq).await?;
|
||||||
|
} else if tx.start_entry_index() >= end_sector {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
bail!(
|
||||||
|
"prune tx out of range: tx={:?}, start={} end={}",
|
||||||
|
tx,
|
||||||
|
start_sector,
|
||||||
|
end_sector
|
||||||
|
);
|
||||||
|
}
|
||||||
|
self.first_tx_seq += 1;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn put_shard_config(&self) -> Result<()> {
|
async fn put_shard_config(&self) -> Result<()> {
|
||||||
if let Some(sender) = &self.miner_sender {
|
if let Some(sender) = &self.miner_sender {
|
||||||
sender.send(MinerMessage::SetShardConfig(self.config.shard_config))?;
|
sender.send(MinerMessage::SetShardConfig(self.config.shard_config))?;
|
||||||
@ -211,9 +242,13 @@ impl Pruner {
|
|||||||
async fn put_first_rewardable_chunk_index(
|
async fn put_first_rewardable_chunk_index(
|
||||||
&self,
|
&self,
|
||||||
new_first_rewardable_chunk: u64,
|
new_first_rewardable_chunk: u64,
|
||||||
|
new_first_tx_seq: u64,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
self.store
|
self.store
|
||||||
.set_config_encoded(&FIRST_REWARDABLE_CHUNK_KEY, &new_first_rewardable_chunk)
|
.set_config_encoded(
|
||||||
|
&FIRST_REWARDABLE_CHUNK_KEY,
|
||||||
|
&(new_first_rewardable_chunk, new_first_tx_seq),
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -222,7 +257,7 @@ async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> {
|
|||||||
store.get_config_decoded(&SHARD_CONFIG_KEY).await
|
store.get_config_decoded(&SHARD_CONFIG_KEY).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_first_rewardable_chunk(store: &Store) -> Result<Option<u64>> {
|
async fn get_first_rewardable_chunk(store: &Store) -> Result<Option<(u64, u64)>> {
|
||||||
store.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY).await
|
store.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -195,6 +195,10 @@ impl RpcServerImpl {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if self.ctx.log_store.check_tx_pruned(tx.seq).await? {
|
||||||
|
return Err(error::invalid_params("root", "already pruned"));
|
||||||
|
}
|
||||||
|
|
||||||
Ok(false)
|
Ok(false)
|
||||||
} else {
|
} else {
|
||||||
//Check whether file is small enough to cache in the system
|
//Check whether file is small enough to cache in the system
|
||||||
|
@ -45,6 +45,7 @@ impl Store {
|
|||||||
}
|
}
|
||||||
|
|
||||||
delegate!(fn check_tx_completed(tx_seq: u64) -> Result<bool>);
|
delegate!(fn check_tx_completed(tx_seq: u64) -> Result<bool>);
|
||||||
|
delegate!(fn check_tx_pruned(tx_seq: u64) -> Result<bool>);
|
||||||
delegate!(fn get_chunk_by_tx_and_index(tx_seq: u64, index: usize) -> Result<Option<Chunk>>);
|
delegate!(fn get_chunk_by_tx_and_index(tx_seq: u64, index: usize) -> Result<Option<Chunk>>);
|
||||||
delegate!(fn get_chunks_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize) -> Result<Option<ChunkArray>>);
|
delegate!(fn get_chunks_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize) -> Result<Option<ChunkArray>>);
|
||||||
delegate!(fn get_chunks_with_proof_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize, merkle_tx_seq: Option<u64>) -> Result<Option<ChunkArrayWithProof>>);
|
delegate!(fn get_chunks_with_proof_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize, merkle_tx_seq: Option<u64>) -> Result<Option<ChunkArrayWithProof>>);
|
||||||
@ -53,6 +54,7 @@ impl Store {
|
|||||||
delegate!(fn put_chunks_with_tx_hash(tx_seq: u64, tx_hash: H256, chunks: ChunkArray, maybe_file_proof: Option<FlowProof>) -> Result<bool>);
|
delegate!(fn put_chunks_with_tx_hash(tx_seq: u64, tx_hash: H256, chunks: ChunkArray, maybe_file_proof: Option<FlowProof>) -> Result<bool>);
|
||||||
delegate!(fn get_chunk_by_flow_index(index: u64, length: u64) -> Result<Option<ChunkArray>>);
|
delegate!(fn get_chunk_by_flow_index(index: u64, length: u64) -> Result<Option<ChunkArray>>);
|
||||||
delegate!(fn finalize_tx(tx_seq: u64) -> Result<()>);
|
delegate!(fn finalize_tx(tx_seq: u64) -> Result<()>);
|
||||||
|
delegate!(fn prune_tx(tx_seq: u64) -> Result<()>);
|
||||||
delegate!(fn finalize_tx_with_hash(tx_seq: u64, tx_hash: H256) -> Result<bool>);
|
delegate!(fn finalize_tx_with_hash(tx_seq: u64, tx_hash: H256) -> Result<bool>);
|
||||||
delegate!(fn get_proof_at_root(root: Option<DataRoot>, index: u64, length: u64) -> Result<FlowRangeProof>);
|
delegate!(fn get_proof_at_root(root: Option<DataRoot>, index: u64, length: u64) -> Result<FlowRangeProof>);
|
||||||
delegate!(fn get_context() -> Result<(DataRoot, u64)>);
|
delegate!(fn get_context() -> Result<(DataRoot, u64)>);
|
||||||
|
@ -337,6 +337,10 @@ impl LogStoreWrite for LogManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn prune_tx(&self, tx_seq: u64) -> crate::error::Result<()> {
|
||||||
|
self.tx_store.prune_tx(tx_seq)
|
||||||
|
}
|
||||||
|
|
||||||
fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()> {
|
fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()> {
|
||||||
self.tx_store.put_progress(progress)
|
self.tx_store.put_progress(progress)
|
||||||
}
|
}
|
||||||
@ -563,6 +567,10 @@ impl LogStoreRead for LogManager {
|
|||||||
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64,
|
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64,
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn check_tx_pruned(&self, tx_seq: u64) -> crate::error::Result<bool> {
|
||||||
|
self.tx_store.check_tx_pruned(tx_seq)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LogManager {
|
impl LogManager {
|
||||||
|
@ -53,6 +53,8 @@ pub trait LogStoreRead: LogStoreChunkRead {
|
|||||||
|
|
||||||
fn check_tx_completed(&self, tx_seq: u64) -> Result<bool>;
|
fn check_tx_completed(&self, tx_seq: u64) -> Result<bool>;
|
||||||
|
|
||||||
|
fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool>;
|
||||||
|
|
||||||
fn next_tx_seq(&self) -> u64;
|
fn next_tx_seq(&self) -> u64;
|
||||||
|
|
||||||
fn get_sync_progress(&self) -> Result<Option<(u64, H256)>>;
|
fn get_sync_progress(&self) -> Result<Option<(u64, H256)>>;
|
||||||
@ -118,6 +120,8 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
|
|||||||
/// the caller is supposed to track chunk statuses and call this after storing all the chunks.
|
/// the caller is supposed to track chunk statuses and call this after storing all the chunks.
|
||||||
fn finalize_tx(&self, tx_seq: u64) -> Result<()>;
|
fn finalize_tx(&self, tx_seq: u64) -> Result<()>;
|
||||||
fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> Result<bool>;
|
fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> Result<bool>;
|
||||||
|
/// Mark the tx as pruned, meaning the data will not be stored.
|
||||||
|
fn prune_tx(&self, tx_seq: u64) -> Result<()>;
|
||||||
|
|
||||||
/// Store the progress of synced block number and its hash.
|
/// Store the progress of synced block number and its hash.
|
||||||
fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()>;
|
fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()>;
|
||||||
|
@ -20,6 +20,9 @@ use tracing::{error, instrument};
|
|||||||
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
|
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
|
||||||
const NEXT_TX_KEY: &str = "next_tx_seq";
|
const NEXT_TX_KEY: &str = "next_tx_seq";
|
||||||
|
|
||||||
|
const TX_STATUS_FINALIZED: u8 = 0;
|
||||||
|
const TX_STATUS_PRUNED: u8 = 1;
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct BlockHashAndSubmissionIndex {
|
pub struct BlockHashAndSubmissionIndex {
|
||||||
pub block_hash: H256,
|
pub block_hash: H256,
|
||||||
@ -156,13 +159,27 @@ impl TransactionStore {
|
|||||||
|
|
||||||
#[instrument(skip(self))]
|
#[instrument(skip(self))]
|
||||||
pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> {
|
pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> {
|
||||||
|
Ok(self.kvdb.put(
|
||||||
|
COL_TX_COMPLETED,
|
||||||
|
&tx_seq.to_be_bytes(),
|
||||||
|
&[TX_STATUS_FINALIZED],
|
||||||
|
)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(skip(self))]
|
||||||
|
pub fn prune_tx(&self, tx_seq: u64) -> Result<()> {
|
||||||
Ok(self
|
Ok(self
|
||||||
.kvdb
|
.kvdb
|
||||||
.put(COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[0])?)
|
.put(COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[TX_STATUS_PRUNED])?)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> {
|
pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> {
|
||||||
Ok(self.kvdb.has_key(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?)
|
Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
|
||||||
|
== Some(vec![TX_STATUS_FINALIZED]))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool> {
|
||||||
|
Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())? == Some(vec![TX_STATUS_PRUNED]))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn next_tx_seq(&self) -> u64 {
|
pub fn next_tx_seq(&self) -> u64 {
|
||||||
|
@ -82,7 +82,9 @@ impl Batcher {
|
|||||||
|
|
||||||
async fn poll_tx(&self, tx_seq: u64) -> Result<Option<SyncResult>> {
|
async fn poll_tx(&self, tx_seq: u64) -> Result<Option<SyncResult>> {
|
||||||
// file already exists
|
// file already exists
|
||||||
if self.store.check_tx_completed(tx_seq).await? {
|
if self.store.check_tx_completed(tx_seq).await?
|
||||||
|
|| self.store.check_tx_pruned(tx_seq).await?
|
||||||
|
{
|
||||||
// File may be finalized during file sync, e.g. user uploaded file via RPC.
|
// File may be finalized during file sync, e.g. user uploaded file via RPC.
|
||||||
// In this case, just terminate the file sync.
|
// In this case, just terminate the file sync.
|
||||||
let num_terminated = self.terminate_file_sync(tx_seq, false).await;
|
let num_terminated = self.terminate_file_sync(tx_seq, false).await;
|
||||||
|
@ -573,7 +573,9 @@ impl SyncService {
|
|||||||
|
|
||||||
async fn on_find_file(&mut self, tx_seq: u64) -> Result<()> {
|
async fn on_find_file(&mut self, tx_seq: u64) -> Result<()> {
|
||||||
// file already exists
|
// file already exists
|
||||||
if self.store.check_tx_completed(tx_seq).await? {
|
if self.store.check_tx_completed(tx_seq).await?
|
||||||
|
|| self.store.check_tx_pruned(tx_seq).await?
|
||||||
|
{
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
// broadcast find file
|
// broadcast find file
|
||||||
@ -633,7 +635,9 @@ impl SyncService {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// file already exists
|
// file already exists
|
||||||
if self.store.check_tx_completed(tx_seq).await? {
|
if self.store.check_tx_completed(tx_seq).await?
|
||||||
|
|| self.store.check_tx_pruned(tx_seq).await?
|
||||||
|
{
|
||||||
bail!("File already exists");
|
bail!("File already exists");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -705,6 +709,15 @@ impl SyncService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
match self.store.check_tx_pruned(tx_seq).await {
|
||||||
|
Ok(true) => return,
|
||||||
|
Ok(false) => {}
|
||||||
|
Err(err) => {
|
||||||
|
error!(%tx_seq, %err, "Failed to check if file pruned");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Now, always sync files among all nodes
|
// Now, always sync files among all nodes
|
||||||
if let Err(err) = self
|
if let Err(err) = self
|
||||||
.on_start_sync_file(tx_seq, None, Some((peer_id, addr)))
|
.on_start_sync_file(tx_seq, None, Some((peer_id, addr)))
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
from test_framework.test_framework import TestFramework
|
from test_framework.test_framework import TestFramework
|
||||||
|
from mine_with_market_test import PRICE_PER_SECTOR
|
||||||
from utility.submission import create_submission, submit_data, data_to_segments
|
from utility.submission import create_submission, submit_data, data_to_segments
|
||||||
from utility.utils import wait_until, assert_equal
|
from utility.utils import wait_until, assert_equal
|
||||||
|
|
||||||
@ -30,7 +31,7 @@ class PrunerTest(TestFramework):
|
|||||||
|
|
||||||
chunk_data = b"\x02" * 8 * 256 * 1024
|
chunk_data = b"\x02" * 8 * 256 * 1024
|
||||||
submissions, data_root = create_submission(chunk_data)
|
submissions, data_root = create_submission(chunk_data)
|
||||||
self.contract.submit(submissions)
|
self.contract.submit(submissions, tx_prarams = {"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)})
|
||||||
wait_until(lambda: self.contract.num_submissions() == 1)
|
wait_until(lambda: self.contract.num_submissions() == 1)
|
||||||
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
|
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
|
||||||
|
|
||||||
|
@ -25,7 +25,7 @@ class RpcCaller:
|
|||||||
if isinstance(parsed, Ok):
|
if isinstance(parsed, Ok):
|
||||||
return parsed.result
|
return parsed.result
|
||||||
else:
|
else:
|
||||||
print("Failed to call RPC, method = %s(%s), error = %s" % (self.method, str(*args), parsed))
|
print("Failed to call RPC, method = %s(%s), error = %s" % (self.method, str(*args)[-1500:], parsed))
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
print("Failed to call RPC, method = %s(%s), exception = %s" % (self.method, str(*args), ex))
|
print("Failed to call RPC, method = %s(%s), exception = %s" % (self.method, str(*args)[-1500:], ex))
|
||||||
return None
|
return None
|
||||||
|
Loading…
Reference in New Issue
Block a user