Prune no reward chunks.

This commit is contained in:
Peilun Li 2024-07-23 15:52:29 +08:00
parent 085c34beb0
commit 69be4e1412
7 changed files with 117 additions and 18 deletions

@ -1 +1 @@
Subproject commit 25bc14a27441e8fb26e4d42d7c8c885f92d6c74a Subproject commit 8f8f906224f966651de0b745738fb8aab4f492c4

3
Cargo.lock generated
View File

@ -5824,6 +5824,9 @@ name = "pruner"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"contract-interface",
"ethereum-types 0.14.1",
"ethers",
"miner", "miner",
"rand 0.8.5", "rand 0.8.5",
"storage", "storage",

View File

@ -14,6 +14,12 @@ abigen!(
"../../0g-storage-contracts/artifacts/contracts/miner/Mine.sol/PoraMine.json" "../../0g-storage-contracts/artifacts/contracts/miner/Mine.sol/PoraMine.json"
); );
#[cfg(not(feature = "dev"))]
abigen!(
ChunkLinearReward,
"../../0g-storage-contracts/artifacts/contracts/reward/ChunkLinearReward.sol/ChunkLinearReward.json"
);
#[cfg(feature = "dev")] #[cfg(feature = "dev")]
abigen!( abigen!(
ZgsFlow, ZgsFlow,
@ -25,3 +31,9 @@ abigen!(
PoraMine, PoraMine,
"../../0g-storage-contracts-dev/artifacts/contracts/miner/Mine.sol/PoraMine.json" "../../0g-storage-contracts-dev/artifacts/contracts/miner/Mine.sol/PoraMine.json"
); );
#[cfg(feature = "dev")]
abigen!(
ChunkLinearReward,
"../../0g-storage-contracts/artifacts/contracts/reward/ChunkLinearReward.sol/ChunkLinearReward.json"
);

View File

@ -12,3 +12,6 @@ tokio = "1.37.0"
rand = "0.8.5" rand = "0.8.5"
task_executor = { path = "../../common/task_executor" } task_executor = { path = "../../common/task_executor" }
tracing = "0.1.40" tracing = "0.1.40"
ethereum-types = "0.14.1"
contract-interface = { path = "../../common/contract-interface" }
ethers = "^2"

View File

@ -1,4 +1,7 @@
use anyhow::Result; use anyhow::{anyhow, bail, Result};
use contract_interface::ChunkLinearReward;
use ethereum_types::Address;
use ethers::prelude::{Http, Provider};
use miner::MinerMessage; use miner::MinerMessage;
use rand::Rng; use rand::Rng;
use std::path::PathBuf; use std::path::PathBuf;
@ -13,6 +16,8 @@ use tracing::{debug, info};
// Start pruning when the db directory size exceeds 0.9 * limit. // Start pruning when the db directory size exceeds 0.9 * limit.
const PRUNE_THRESHOLD: f32 = 0.9; const PRUNE_THRESHOLD: f32 = 0.9;
const FIRST_REWARDABLE_CHUNK_KEY: &str = "first_rewardable_chunk";
#[derive(Debug)] #[derive(Debug)]
pub struct PrunerConfig { pub struct PrunerConfig {
pub shard_config: ShardConfig, pub shard_config: ShardConfig,
@ -21,20 +26,32 @@ pub struct PrunerConfig {
pub check_time: Duration, pub check_time: Duration,
pub batch_size: usize, pub batch_size: usize,
pub batch_wait_time: Duration, pub batch_wait_time: Duration,
pub rpc_endpoint_url: String,
pub reward_address: Address,
} }
impl PrunerConfig { impl PrunerConfig {
fn start_prune_size(&self) -> u64 { fn start_prune_size(&self) -> u64 {
(self.max_num_chunks as f32 * PRUNE_THRESHOLD) as u64 (self.max_num_chunks as f32 * PRUNE_THRESHOLD) as u64
} }
fn make_provider(&self) -> Result<Provider<Http>> {
Provider::<Http>::try_from(&self.rpc_endpoint_url)
.map_err(|e| anyhow!("Can not parse blockchain endpoint: {:?}", e))
}
} }
pub struct Pruner { pub struct Pruner {
config: PrunerConfig, config: PrunerConfig,
first_rewardable_chunk: u64,
store: Arc<Store>, store: Arc<Store>,
sender: mpsc::UnboundedSender<PrunerMessage>, sender: mpsc::UnboundedSender<PrunerMessage>,
miner_sender: Option<broadcast::Sender<MinerMessage>>, miner_sender: Option<broadcast::Sender<MinerMessage>>,
reward_contract: ChunkLinearReward<Provider<Http>>,
} }
impl Pruner { impl Pruner {
@ -47,12 +64,19 @@ impl Pruner {
if let Some(shard_config) = get_shard_config(store.as_ref()).await? { if let Some(shard_config) = get_shard_config(store.as_ref()).await? {
config.shard_config = shard_config; config.shard_config = shard_config;
} }
let first_rewardable_chunk = get_first_rewardable_chunk(store.as_ref())
.await?
.unwrap_or(0);
let reward_contract =
ChunkLinearReward::new(config.reward_address, Arc::new(config.make_provider()?));
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();
let pruner = Pruner { let pruner = Pruner {
config, config,
first_rewardable_chunk,
store, store,
sender: tx, sender: tx,
miner_sender, miner_sender,
reward_contract,
}; };
pruner.put_shard_config().await?; pruner.put_shard_config().await?;
executor.spawn( executor.spawn(
@ -66,20 +90,22 @@ impl Pruner {
pub async fn start(mut self) -> Result<()> { pub async fn start(mut self) -> Result<()> {
loop { loop {
// Check shard config update and prune unneeded data.
if let Some(delete_list) = self.maybe_update().await? { if let Some(delete_list) = self.maybe_update().await? {
info!(new_config = ?self.config.shard_config, "new shard config"); info!(new_config = ?self.config.shard_config, "new shard config");
self.put_shard_config().await?; self.put_shard_config().await?;
let mut batch = Vec::with_capacity(self.config.batch_size); self.prune_in_batch(delete_list).await?;
let mut iter = delete_list.peekable(); }
while let Some(index) = iter.next() {
batch.push(index); // Check no reward chunks and prune.
if batch.len() == self.config.batch_size || iter.peek().is_none() { let new_first_rewardable = self.reward_contract.first_rewardable_chunk().call().await?;
debug!(start = batch.first(), end = batch.last(), "prune batch"); if let Some(no_reward_list) = self
self.store.remove_chunks_batch(&batch).await?; .maybe_forward_first_rewardable(new_first_rewardable)
batch = Vec::with_capacity(self.config.batch_size); .await?
tokio::time::sleep(self.config.batch_wait_time).await; {
} self.prune_in_batch(no_reward_list).await?;
} self.put_first_rewardable_chunk_index(new_first_rewardable)
.await?;
} }
tokio::time::sleep(self.config.check_time).await; tokio::time::sleep(self.config.check_time).await;
} }
@ -92,7 +118,9 @@ impl Pruner {
config = ?self.config.shard_config, config = ?self.config.shard_config,
"maybe_update" "maybe_update"
); );
if current_size >= self.config.start_prune_size() { if current_size < self.config.start_prune_size() {
Ok(None)
} else {
// Update config and generate delete list should be done in a single lock to ensure // Update config and generate delete list should be done in a single lock to ensure
// the list is complete. // the list is complete.
let config = &mut self.config.shard_config; let config = &mut self.config.shard_config;
@ -110,11 +138,44 @@ impl Pruner {
// Generate delete list // Generate delete list
let flow_len = self.store.get_context().await?.1; let flow_len = self.store.get_context().await?.1;
let start_index = old_shard_id + (!rand_bit) as usize * old_num_shard; let start_index = old_shard_id + (!rand_bit) as usize * old_num_shard;
return Ok(Some(Box::new( Ok(Some(Box::new(
(start_index as u64..flow_len).step_by(config.num_shard), (start_index as u64..flow_len).step_by(config.num_shard),
))); )))
} }
Ok(None) }
async fn maybe_forward_first_rewardable(
&mut self,
new_first_rewardable: u64,
) -> Result<Option<Box<dyn Send + Iterator<Item = u64>>>> {
if self.first_rewardable_chunk == new_first_rewardable {
Ok(None)
} else if self.first_rewardable_chunk > new_first_rewardable {
bail!(
"Unexpected first_rewardable_chunk revert: old={} new={}",
self.first_rewardable_chunk,
new_first_rewardable
);
} else {
Ok(Some(Box::new(
self.first_rewardable_chunk..new_first_rewardable,
)))
}
}
async fn prune_in_batch(&self, to_prune: Box<dyn Send + Iterator<Item = u64>>) -> Result<()> {
let mut batch = Vec::with_capacity(self.config.batch_size);
let mut iter = to_prune.peekable();
while let Some(index) = iter.next() {
batch.push(index);
if batch.len() == self.config.batch_size || iter.peek().is_none() {
debug!(start = batch.first(), end = batch.last(), "prune batch");
self.store.remove_chunks_batch(&batch).await?;
batch = Vec::with_capacity(self.config.batch_size);
tokio::time::sleep(self.config.batch_wait_time).await;
}
}
Ok(())
} }
async fn put_shard_config(&self) -> Result<()> { async fn put_shard_config(&self) -> Result<()> {
@ -130,12 +191,25 @@ impl Pruner {
.set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config) .set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config)
.await .await
} }
async fn put_first_rewardable_chunk_index(
&self,
new_first_rewardable_chunk: u64,
) -> Result<()> {
self.store
.set_config_encoded(&FIRST_REWARDABLE_CHUNK_KEY, &new_first_rewardable_chunk)
.await
}
} }
async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> { async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> {
store.get_config_decoded(&SHARD_CONFIG_KEY).await store.get_config_decoded(&SHARD_CONFIG_KEY).await
} }
async fn get_first_rewardable_chunk(store: &Store) -> Result<Option<u64>> {
store.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY).await
}
#[derive(Debug)] #[derive(Debug)]
pub enum PrunerMessage { pub enum PrunerMessage {
ChangeShardConfig(ShardConfig), ChangeShardConfig(ShardConfig),

View File

@ -203,6 +203,10 @@ impl ZgsConfig {
pub fn pruner_config(&self) -> Result<Option<PrunerConfig>, String> { pub fn pruner_config(&self) -> Result<Option<PrunerConfig>, String> {
if let Some(max_num_chunks) = self.db_max_num_chunks { if let Some(max_num_chunks) = self.db_max_num_chunks {
let shard_config = self.shard_config()?; let shard_config = self.shard_config()?;
let reward_address = self
.reward_contract_address
.parse::<ContractAddress>()
.map_err(|e| format!("Unable to parse reward_contract_address: {:?}", e))?;
Ok(Some(PrunerConfig { Ok(Some(PrunerConfig {
shard_config, shard_config,
db_path: self.db_dir.clone().into(), db_path: self.db_dir.clone().into(),
@ -210,6 +214,8 @@ impl ZgsConfig {
check_time: Duration::from_secs(self.prune_check_time_s), check_time: Duration::from_secs(self.prune_check_time_s),
batch_size: self.prune_batch_size, batch_size: self.prune_batch_size,
batch_wait_time: Duration::from_millis(self.prune_batch_wait_time_ms), batch_wait_time: Duration::from_millis(self.prune_batch_wait_time_ms),
rpc_endpoint_url: self.blockchain_rpc_endpoint.clone(),
reward_address,
})) }))
} else { } else {
Ok(None) Ok(None)

View File

@ -79,6 +79,7 @@ build_config! {
(miner_submission_gas, (Option<u64>), None) (miner_submission_gas, (Option<u64>), None)
(miner_cpu_percentage, (u64), 100) (miner_cpu_percentage, (u64), 100)
(mine_iter_batch_size, (usize), 100) (mine_iter_batch_size, (usize), 100)
(reward_contract_address, (String), "".to_string())
(shard_position, (Option<String>), None) (shard_position, (Option<String>), None)
} }