mirror of
				https://github.com/0glabs/0g-storage-node.git
				synced 2025-11-04 00:27:39 +00:00 
			
		
		
		
	Compare commits
	
		
			3 Commits
		
	
	
		
			891e00fa80
			...
			77d1b84974
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					77d1b84974 | ||
| 
						 | 
					d80e7e22ca | ||
| 
						 | 
					6ade66c086 | 
							
								
								
									
										4
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										4
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							@ -5824,6 +5824,9 @@ name = "pruner"
 | 
			
		||||
version = "0.1.0"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "anyhow",
 | 
			
		||||
 "contract-interface",
 | 
			
		||||
 "ethereum-types 0.14.1",
 | 
			
		||||
 "ethers",
 | 
			
		||||
 "miner",
 | 
			
		||||
 "rand 0.8.5",
 | 
			
		||||
 "storage",
 | 
			
		||||
@ -5831,6 +5834,7 @@ dependencies = [
 | 
			
		||||
 "task_executor",
 | 
			
		||||
 "tokio",
 | 
			
		||||
 "tracing",
 | 
			
		||||
 "zgs_spec",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
 | 
			
		||||
@ -8,6 +8,12 @@ abigen!(ZgsFlow, "../../storage-contracts-abis/Flow.json");
 | 
			
		||||
#[cfg(not(feature = "dev"))]
 | 
			
		||||
abigen!(PoraMine, "../../storage-contracts-abis/PoraMine.json");
 | 
			
		||||
 | 
			
		||||
#[cfg(not(feature = "dev"))]
 | 
			
		||||
abigen!(
 | 
			
		||||
    ChunkLinearReward,
 | 
			
		||||
    "../../storage-contracts-abis/ChunkLinearReward.json"
 | 
			
		||||
);
 | 
			
		||||
 | 
			
		||||
#[cfg(feature = "dev")]
 | 
			
		||||
abigen!(
 | 
			
		||||
    ZgsFlow,
 | 
			
		||||
@ -19,3 +25,9 @@ abigen!(
 | 
			
		||||
    PoraMine,
 | 
			
		||||
    "../../0g-storage-contracts-dev/artifacts/contracts/miner/Mine.sol/PoraMine.json"
 | 
			
		||||
);
 | 
			
		||||
 | 
			
		||||
#[cfg(feature = "dev")]
 | 
			
		||||
abigen!(
 | 
			
		||||
    ChunkLinearReward,
 | 
			
		||||
    "../../0g-storage-contracts-dev/artifacts/contracts/reward/ChunkLinearReward.sol/ChunkLinearReward.json"
 | 
			
		||||
);
 | 
			
		||||
 | 
			
		||||
@ -12,3 +12,7 @@ tokio = "1.37.0"
 | 
			
		||||
rand = "0.8.5"
 | 
			
		||||
task_executor = { path = "../../common/task_executor" }
 | 
			
		||||
tracing = "0.1.40"
 | 
			
		||||
ethereum-types = "0.14.1"
 | 
			
		||||
contract-interface = { path = "../../common/contract-interface" }
 | 
			
		||||
ethers = "^2"
 | 
			
		||||
zgs_spec = { path = "../../common/spec" }
 | 
			
		||||
@ -1,18 +1,28 @@
 | 
			
		||||
use anyhow::Result;
 | 
			
		||||
use anyhow::{anyhow, bail, Result};
 | 
			
		||||
use contract_interface::ChunkLinearReward;
 | 
			
		||||
use ethereum_types::Address;
 | 
			
		||||
use ethers::prelude::{Http, Provider};
 | 
			
		||||
use miner::MinerMessage;
 | 
			
		||||
use rand::Rng;
 | 
			
		||||
use std::cmp::Ordering;
 | 
			
		||||
use std::path::PathBuf;
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use std::time::Duration;
 | 
			
		||||
use storage::config::{ShardConfig, SHARD_CONFIG_KEY};
 | 
			
		||||
use storage::log_store::log_manager::PORA_CHUNK_SIZE;
 | 
			
		||||
use storage_async::Store;
 | 
			
		||||
use task_executor::TaskExecutor;
 | 
			
		||||
use tokio::sync::{broadcast, mpsc};
 | 
			
		||||
use tracing::{debug, info};
 | 
			
		||||
use zgs_spec::SECTORS_PER_PRICING;
 | 
			
		||||
 | 
			
		||||
// Start pruning when the db directory size exceeds 0.9 * limit.
 | 
			
		||||
const PRUNE_THRESHOLD: f32 = 0.9;
 | 
			
		||||
 | 
			
		||||
const FIRST_REWARDABLE_CHUNK_KEY: &str = "first_rewardable_chunk";
 | 
			
		||||
 | 
			
		||||
const CHUNKS_PER_PRICING: u64 = (SECTORS_PER_PRICING / PORA_CHUNK_SIZE) as u64;
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
pub struct PrunerConfig {
 | 
			
		||||
    pub shard_config: ShardConfig,
 | 
			
		||||
@ -21,20 +31,33 @@ pub struct PrunerConfig {
 | 
			
		||||
    pub check_time: Duration,
 | 
			
		||||
    pub batch_size: usize,
 | 
			
		||||
    pub batch_wait_time: Duration,
 | 
			
		||||
 | 
			
		||||
    pub rpc_endpoint_url: String,
 | 
			
		||||
    pub reward_address: Address,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl PrunerConfig {
 | 
			
		||||
    fn start_prune_size(&self) -> u64 {
 | 
			
		||||
        (self.max_num_sectors as f32 * PRUNE_THRESHOLD) as u64
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn make_provider(&self) -> Result<Provider<Http>> {
 | 
			
		||||
        Provider::<Http>::try_from(&self.rpc_endpoint_url)
 | 
			
		||||
            .map_err(|e| anyhow!("Can not parse blockchain endpoint: {:?}", e))
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub struct Pruner {
 | 
			
		||||
    config: PrunerConfig,
 | 
			
		||||
    first_rewardable_chunk: u64,
 | 
			
		||||
    first_tx_seq: u64,
 | 
			
		||||
 | 
			
		||||
    store: Arc<Store>,
 | 
			
		||||
 | 
			
		||||
    sender: mpsc::UnboundedSender<PrunerMessage>,
 | 
			
		||||
    miner_sender: Option<broadcast::Sender<MinerMessage>>,
 | 
			
		||||
 | 
			
		||||
    reward_contract: ChunkLinearReward<Provider<Http>>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Pruner {
 | 
			
		||||
@ -47,12 +70,20 @@ impl Pruner {
 | 
			
		||||
        if let Some(shard_config) = get_shard_config(store.as_ref()).await? {
 | 
			
		||||
            config.shard_config = shard_config;
 | 
			
		||||
        }
 | 
			
		||||
        let (first_rewardable_chunk, first_tx_seq) = get_first_rewardable_chunk(store.as_ref())
 | 
			
		||||
            .await?
 | 
			
		||||
            .unwrap_or((0, 0));
 | 
			
		||||
        let reward_contract =
 | 
			
		||||
            ChunkLinearReward::new(config.reward_address, Arc::new(config.make_provider()?));
 | 
			
		||||
        let (tx, rx) = mpsc::unbounded_channel();
 | 
			
		||||
        let pruner = Pruner {
 | 
			
		||||
            config,
 | 
			
		||||
            first_rewardable_chunk,
 | 
			
		||||
            first_tx_seq,
 | 
			
		||||
            store,
 | 
			
		||||
            sender: tx,
 | 
			
		||||
            miner_sender,
 | 
			
		||||
            reward_contract,
 | 
			
		||||
        };
 | 
			
		||||
        pruner.put_shard_config().await?;
 | 
			
		||||
        executor.spawn(
 | 
			
		||||
@ -66,20 +97,36 @@ impl Pruner {
 | 
			
		||||
 | 
			
		||||
    pub async fn start(mut self) -> Result<()> {
 | 
			
		||||
        loop {
 | 
			
		||||
            // Check shard config update and prune unneeded data.
 | 
			
		||||
            if let Some(delete_list) = self.maybe_update().await? {
 | 
			
		||||
                info!(new_config = ?self.config.shard_config, "new shard config");
 | 
			
		||||
                self.put_shard_config().await?;
 | 
			
		||||
                let mut batch = Vec::with_capacity(self.config.batch_size);
 | 
			
		||||
                let mut iter = delete_list.peekable();
 | 
			
		||||
                while let Some(index) = iter.next() {
 | 
			
		||||
                    batch.push(index);
 | 
			
		||||
                    if batch.len() == self.config.batch_size || iter.peek().is_none() {
 | 
			
		||||
                        debug!(start = batch.first(), end = batch.last(), "prune batch");
 | 
			
		||||
                        self.store.remove_chunks_batch(&batch).await?;
 | 
			
		||||
                        batch = Vec::with_capacity(self.config.batch_size);
 | 
			
		||||
                        tokio::time::sleep(self.config.batch_wait_time).await;
 | 
			
		||||
                    }
 | 
			
		||||
                self.prune_in_batch(delete_list).await?;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            // Check no reward chunks and prune.
 | 
			
		||||
            let new_first_rewardable = self.reward_contract.first_rewardable_chunk().call().await?;
 | 
			
		||||
            if let Some(no_reward_list) = self
 | 
			
		||||
                .maybe_forward_first_rewardable(new_first_rewardable)
 | 
			
		||||
                .await?
 | 
			
		||||
            {
 | 
			
		||||
                info!(
 | 
			
		||||
                    ?new_first_rewardable,
 | 
			
		||||
                    "first rewardable chunk moves forward, start pruning"
 | 
			
		||||
                );
 | 
			
		||||
                self.prune_tx(
 | 
			
		||||
                    self.first_rewardable_chunk * SECTORS_PER_PRICING as u64,
 | 
			
		||||
                    new_first_rewardable * SECTORS_PER_PRICING as u64,
 | 
			
		||||
                )
 | 
			
		||||
                .await?;
 | 
			
		||||
                self.prune_in_batch(no_reward_list).await?;
 | 
			
		||||
 | 
			
		||||
                self.first_rewardable_chunk = new_first_rewardable;
 | 
			
		||||
                self.put_first_rewardable_chunk_index(
 | 
			
		||||
                    self.first_rewardable_chunk,
 | 
			
		||||
                    self.first_tx_seq,
 | 
			
		||||
                )
 | 
			
		||||
                .await?;
 | 
			
		||||
            }
 | 
			
		||||
            tokio::time::sleep(self.config.check_time).await;
 | 
			
		||||
        }
 | 
			
		||||
@ -92,7 +139,9 @@ impl Pruner {
 | 
			
		||||
            config = ?self.config.shard_config,
 | 
			
		||||
            "maybe_update"
 | 
			
		||||
        );
 | 
			
		||||
        if current_size >= self.config.start_prune_size() {
 | 
			
		||||
        if current_size < self.config.start_prune_size() {
 | 
			
		||||
            Ok(None)
 | 
			
		||||
        } else {
 | 
			
		||||
            // Update config and generate delete list should be done in a single lock to ensure
 | 
			
		||||
            // the list is complete.
 | 
			
		||||
            let config = &mut self.config.shard_config;
 | 
			
		||||
@ -108,13 +157,72 @@ impl Pruner {
 | 
			
		||||
            config.num_shard *= 2;
 | 
			
		||||
 | 
			
		||||
            // Generate delete list
 | 
			
		||||
            let flow_len = self.store.get_context().await?.1;
 | 
			
		||||
            let flow_len = self
 | 
			
		||||
                .store
 | 
			
		||||
                .get_context()
 | 
			
		||||
                .await?
 | 
			
		||||
                .1
 | 
			
		||||
                .div_ceil(PORA_CHUNK_SIZE as u64);
 | 
			
		||||
            let start_index = old_shard_id + (!rand_bit) as usize * old_num_shard;
 | 
			
		||||
            return Ok(Some(Box::new(
 | 
			
		||||
            Ok(Some(Box::new(
 | 
			
		||||
                (start_index as u64..flow_len).step_by(config.num_shard),
 | 
			
		||||
            )));
 | 
			
		||||
            )))
 | 
			
		||||
        }
 | 
			
		||||
        Ok(None)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn maybe_forward_first_rewardable(
 | 
			
		||||
        &mut self,
 | 
			
		||||
        new_first_rewardable: u64,
 | 
			
		||||
    ) -> Result<Option<Box<dyn Send + Iterator<Item = u64>>>> {
 | 
			
		||||
        match self.first_rewardable_chunk.cmp(&new_first_rewardable) {
 | 
			
		||||
            Ordering::Less => Ok(Some(Box::new(
 | 
			
		||||
                self.first_rewardable_chunk * CHUNKS_PER_PRICING
 | 
			
		||||
                    ..new_first_rewardable * CHUNKS_PER_PRICING,
 | 
			
		||||
            ))),
 | 
			
		||||
            Ordering::Equal => Ok(None),
 | 
			
		||||
            Ordering::Greater => {
 | 
			
		||||
                bail!(
 | 
			
		||||
                    "Unexpected first_rewardable_chunk revert: old={} new={}",
 | 
			
		||||
                    self.first_rewardable_chunk,
 | 
			
		||||
                    new_first_rewardable
 | 
			
		||||
                );
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn prune_in_batch(&self, to_prune: Box<dyn Send + Iterator<Item = u64>>) -> Result<()> {
 | 
			
		||||
        let mut batch = Vec::with_capacity(self.config.batch_size);
 | 
			
		||||
        let mut iter = to_prune.peekable();
 | 
			
		||||
        while let Some(index) = iter.next() {
 | 
			
		||||
            batch.push(index);
 | 
			
		||||
            if batch.len() == self.config.batch_size || iter.peek().is_none() {
 | 
			
		||||
                debug!(start = batch.first(), end = batch.last(), "prune batch");
 | 
			
		||||
                self.store.remove_chunks_batch(&batch).await?;
 | 
			
		||||
                batch = Vec::with_capacity(self.config.batch_size);
 | 
			
		||||
                tokio::time::sleep(self.config.batch_wait_time).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn prune_tx(&mut self, start_sector: u64, end_sector: u64) -> Result<()> {
 | 
			
		||||
        while let Some(tx) = self.store.get_tx_by_seq_number(self.first_tx_seq).await? {
 | 
			
		||||
            // If a part of the tx data is pruned, we mark the tx as pruned.
 | 
			
		||||
            if tx.start_entry_index() >= start_sector && tx.start_entry_index() < end_sector {
 | 
			
		||||
                self.store.prune_tx(tx.seq).await?;
 | 
			
		||||
            } else if tx.start_entry_index() >= end_sector {
 | 
			
		||||
                break;
 | 
			
		||||
            } else {
 | 
			
		||||
                bail!(
 | 
			
		||||
                    "prune tx out of range: tx={:?}, start={} end={}",
 | 
			
		||||
                    tx,
 | 
			
		||||
                    start_sector,
 | 
			
		||||
                    end_sector
 | 
			
		||||
                );
 | 
			
		||||
            }
 | 
			
		||||
            self.first_tx_seq += 1;
 | 
			
		||||
        }
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn put_shard_config(&self) -> Result<()> {
 | 
			
		||||
@ -130,12 +238,29 @@ impl Pruner {
 | 
			
		||||
            .set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config)
 | 
			
		||||
            .await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn put_first_rewardable_chunk_index(
 | 
			
		||||
        &self,
 | 
			
		||||
        new_first_rewardable_chunk: u64,
 | 
			
		||||
        new_first_tx_seq: u64,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        self.store
 | 
			
		||||
            .set_config_encoded(
 | 
			
		||||
                &FIRST_REWARDABLE_CHUNK_KEY,
 | 
			
		||||
                &(new_first_rewardable_chunk, new_first_tx_seq),
 | 
			
		||||
            )
 | 
			
		||||
            .await
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> {
 | 
			
		||||
    store.get_config_decoded(&SHARD_CONFIG_KEY).await
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn get_first_rewardable_chunk(store: &Store) -> Result<Option<(u64, u64)>> {
 | 
			
		||||
    store.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY).await
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
pub enum PrunerMessage {
 | 
			
		||||
    ChangeShardConfig(ShardConfig),
 | 
			
		||||
 | 
			
		||||
@ -2,7 +2,7 @@ use crate::types::{LocationInfo, NetworkInfo, PeerInfo};
 | 
			
		||||
use jsonrpsee::core::RpcResult;
 | 
			
		||||
use jsonrpsee::proc_macros::rpc;
 | 
			
		||||
use std::collections::HashMap;
 | 
			
		||||
use sync::FileSyncInfo;
 | 
			
		||||
use sync::{FileSyncInfo, SyncServiceState};
 | 
			
		||||
 | 
			
		||||
#[rpc(server, client, namespace = "admin")]
 | 
			
		||||
pub trait Rpc {
 | 
			
		||||
@ -27,6 +27,9 @@ pub trait Rpc {
 | 
			
		||||
    #[method(name = "terminateSync")]
 | 
			
		||||
    async fn terminate_sync(&self, tx_seq: u64) -> RpcResult<bool>;
 | 
			
		||||
 | 
			
		||||
    #[method(name = "getSyncServiceState")]
 | 
			
		||||
    async fn get_sync_service_state(&self) -> RpcResult<SyncServiceState>;
 | 
			
		||||
 | 
			
		||||
    #[method(name = "getSyncStatus")]
 | 
			
		||||
    async fn get_sync_status(&self, tx_seq: u64) -> RpcResult<String>;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -8,7 +8,7 @@ use network::{multiaddr::Protocol, Multiaddr};
 | 
			
		||||
use std::collections::HashMap;
 | 
			
		||||
use std::net::IpAddr;
 | 
			
		||||
use storage::config::all_shards_available;
 | 
			
		||||
use sync::{FileSyncInfo, SyncRequest, SyncResponse};
 | 
			
		||||
use sync::{FileSyncInfo, SyncRequest, SyncResponse, SyncServiceState};
 | 
			
		||||
use task_executor::ShutdownReason;
 | 
			
		||||
 | 
			
		||||
pub struct RpcServerImpl {
 | 
			
		||||
@ -119,6 +119,17 @@ impl RpcServer for RpcServerImpl {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn get_sync_service_state(&self) -> RpcResult<SyncServiceState> {
 | 
			
		||||
        info!("admin_getSyncServiceState()");
 | 
			
		||||
 | 
			
		||||
        let response = self.ctx.request_sync(SyncRequest::SyncState).await?;
 | 
			
		||||
 | 
			
		||||
        match response {
 | 
			
		||||
            SyncResponse::SyncState { state } => Ok(state),
 | 
			
		||||
            _ => Err(error::internal_error("unexpected response type")),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[tracing::instrument(skip(self), err)]
 | 
			
		||||
    async fn get_sync_status(&self, tx_seq: u64) -> RpcResult<String> {
 | 
			
		||||
        info!("admin_getSyncStatus({tx_seq})");
 | 
			
		||||
 | 
			
		||||
@ -195,6 +195,10 @@ impl RpcServerImpl {
 | 
			
		||||
                ));
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            if self.ctx.log_store.check_tx_pruned(tx.seq).await? {
 | 
			
		||||
                return Err(error::invalid_params("root", "already pruned"));
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            Ok(false)
 | 
			
		||||
        } else {
 | 
			
		||||
            //Check whether file is small enough to cache in the system
 | 
			
		||||
 | 
			
		||||
@ -203,6 +203,10 @@ impl ZgsConfig {
 | 
			
		||||
    pub fn pruner_config(&self) -> Result<Option<PrunerConfig>, String> {
 | 
			
		||||
        if let Some(max_num_sectors) = self.db_max_num_sectors {
 | 
			
		||||
            let shard_config = self.shard_config()?;
 | 
			
		||||
            let reward_address = self
 | 
			
		||||
                .reward_contract_address
 | 
			
		||||
                .parse::<ContractAddress>()
 | 
			
		||||
                .map_err(|e| format!("Unable to parse reward_contract_address: {:?}", e))?;
 | 
			
		||||
            Ok(Some(PrunerConfig {
 | 
			
		||||
                shard_config,
 | 
			
		||||
                db_path: self.db_dir.clone().into(),
 | 
			
		||||
@ -210,6 +214,8 @@ impl ZgsConfig {
 | 
			
		||||
                check_time: Duration::from_secs(self.prune_check_time_s),
 | 
			
		||||
                batch_size: self.prune_batch_size,
 | 
			
		||||
                batch_wait_time: Duration::from_millis(self.prune_batch_wait_time_ms),
 | 
			
		||||
                rpc_endpoint_url: self.blockchain_rpc_endpoint.clone(),
 | 
			
		||||
                reward_address,
 | 
			
		||||
            }))
 | 
			
		||||
        } else {
 | 
			
		||||
            Ok(None)
 | 
			
		||||
 | 
			
		||||
@ -65,7 +65,7 @@ build_config! {
 | 
			
		||||
    (db_dir, (String), "db".to_string())
 | 
			
		||||
    (db_max_num_sectors, (Option<usize>), None)
 | 
			
		||||
    (prune_check_time_s, (u64), 60)
 | 
			
		||||
    (prune_batch_size, (usize), 1024)
 | 
			
		||||
    (prune_batch_size, (usize), 16 * 1024)
 | 
			
		||||
    (prune_batch_wait_time_ms, (u64), 1000)
 | 
			
		||||
 | 
			
		||||
    // misc
 | 
			
		||||
@ -79,6 +79,7 @@ build_config! {
 | 
			
		||||
    (miner_submission_gas, (Option<u64>), None)
 | 
			
		||||
    (miner_cpu_percentage, (u64), 100)
 | 
			
		||||
    (mine_iter_batch_size, (usize), 100)
 | 
			
		||||
    (reward_contract_address, (String), "".to_string())
 | 
			
		||||
    (shard_position, (Option<String>), None)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -20,10 +20,13 @@ pub fn configure(log_level_file: &str, log_directory: &str, executor: TaskExecut
 | 
			
		||||
    let handle = builder.reload_handle();
 | 
			
		||||
    builder.init();
 | 
			
		||||
 | 
			
		||||
    let level_file = log_level_file.to_string();
 | 
			
		||||
    let level_file = log_level_file.trim_end().to_string();
 | 
			
		||||
 | 
			
		||||
    // load config synchronously
 | 
			
		||||
    let mut config = std::fs::read_to_string(&level_file).unwrap_or_default();
 | 
			
		||||
    let mut config = std::fs::read_to_string(&level_file)
 | 
			
		||||
        .unwrap_or_default()
 | 
			
		||||
        .trim_end()
 | 
			
		||||
        .to_string();
 | 
			
		||||
    let _ = handle.reload(&config);
 | 
			
		||||
 | 
			
		||||
    // periodically check for config changes
 | 
			
		||||
@ -38,8 +41,14 @@ pub fn configure(log_level_file: &str, log_directory: &str, executor: TaskExecut
 | 
			
		||||
                interval.tick().await;
 | 
			
		||||
 | 
			
		||||
                let new_config = match tokio::fs::read_to_string(&level_file).await {
 | 
			
		||||
                    Ok(c) if c == config => continue,
 | 
			
		||||
                    Ok(c) => c,
 | 
			
		||||
                    Ok(c) => {
 | 
			
		||||
                        let nc = c.trim_end().to_string();
 | 
			
		||||
                        if nc == config {
 | 
			
		||||
                            continue;
 | 
			
		||||
                        } else {
 | 
			
		||||
                            nc
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                    Err(e) => {
 | 
			
		||||
                        println!("Unable to read log file {}: {:?}", level_file, e);
 | 
			
		||||
                        continue;
 | 
			
		||||
 | 
			
		||||
@ -45,6 +45,7 @@ impl Store {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    delegate!(fn check_tx_completed(tx_seq: u64) -> Result<bool>);
 | 
			
		||||
    delegate!(fn check_tx_pruned(tx_seq: u64) -> Result<bool>);
 | 
			
		||||
    delegate!(fn get_chunk_by_tx_and_index(tx_seq: u64, index: usize) -> Result<Option<Chunk>>);
 | 
			
		||||
    delegate!(fn get_chunks_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize) -> Result<Option<ChunkArray>>);
 | 
			
		||||
    delegate!(fn get_chunks_with_proof_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize, merkle_tx_seq: Option<u64>) -> Result<Option<ChunkArrayWithProof>>);
 | 
			
		||||
@ -53,6 +54,7 @@ impl Store {
 | 
			
		||||
    delegate!(fn put_chunks_with_tx_hash(tx_seq: u64, tx_hash: H256, chunks: ChunkArray, maybe_file_proof: Option<FlowProof>) -> Result<bool>);
 | 
			
		||||
    delegate!(fn get_chunk_by_flow_index(index: u64, length: u64) -> Result<Option<ChunkArray>>);
 | 
			
		||||
    delegate!(fn finalize_tx(tx_seq: u64) -> Result<()>);
 | 
			
		||||
    delegate!(fn prune_tx(tx_seq: u64) -> Result<()>);
 | 
			
		||||
    delegate!(fn finalize_tx_with_hash(tx_seq: u64, tx_hash: H256) -> Result<bool>);
 | 
			
		||||
    delegate!(fn get_proof_at_root(root: Option<DataRoot>, index: u64, length: u64) -> Result<FlowRangeProof>);
 | 
			
		||||
    delegate!(fn get_context() -> Result<(DataRoot, u64)>);
 | 
			
		||||
 | 
			
		||||
@ -337,6 +337,10 @@ impl LogStoreWrite for LogManager {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn prune_tx(&self, tx_seq: u64) -> crate::error::Result<()> {
 | 
			
		||||
        self.tx_store.prune_tx(tx_seq)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()> {
 | 
			
		||||
        self.tx_store.put_progress(progress)
 | 
			
		||||
    }
 | 
			
		||||
@ -563,6 +567,10 @@ impl LogStoreRead for LogManager {
 | 
			
		||||
            merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64,
 | 
			
		||||
        ))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn check_tx_pruned(&self, tx_seq: u64) -> crate::error::Result<bool> {
 | 
			
		||||
        self.tx_store.check_tx_pruned(tx_seq)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl LogManager {
 | 
			
		||||
 | 
			
		||||
@ -53,6 +53,8 @@ pub trait LogStoreRead: LogStoreChunkRead {
 | 
			
		||||
 | 
			
		||||
    fn check_tx_completed(&self, tx_seq: u64) -> Result<bool>;
 | 
			
		||||
 | 
			
		||||
    fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool>;
 | 
			
		||||
 | 
			
		||||
    fn next_tx_seq(&self) -> u64;
 | 
			
		||||
 | 
			
		||||
    fn get_sync_progress(&self) -> Result<Option<(u64, H256)>>;
 | 
			
		||||
@ -118,6 +120,8 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
 | 
			
		||||
    /// the caller is supposed to track chunk statuses and call this after storing all the chunks.
 | 
			
		||||
    fn finalize_tx(&self, tx_seq: u64) -> Result<()>;
 | 
			
		||||
    fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> Result<bool>;
 | 
			
		||||
    /// Mark the tx as pruned, meaning the data will not be stored.
 | 
			
		||||
    fn prune_tx(&self, tx_seq: u64) -> Result<()>;
 | 
			
		||||
 | 
			
		||||
    /// Store the progress of synced block number and its hash.
 | 
			
		||||
    fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()>;
 | 
			
		||||
 | 
			
		||||
@ -20,6 +20,9 @@ use tracing::{error, instrument};
 | 
			
		||||
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
 | 
			
		||||
const NEXT_TX_KEY: &str = "next_tx_seq";
 | 
			
		||||
 | 
			
		||||
const TX_STATUS_FINALIZED: u8 = 0;
 | 
			
		||||
const TX_STATUS_PRUNED: u8 = 1;
 | 
			
		||||
 | 
			
		||||
#[derive(Clone, Debug)]
 | 
			
		||||
pub struct BlockHashAndSubmissionIndex {
 | 
			
		||||
    pub block_hash: H256,
 | 
			
		||||
@ -156,13 +159,27 @@ impl TransactionStore {
 | 
			
		||||
 | 
			
		||||
    #[instrument(skip(self))]
 | 
			
		||||
    pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> {
 | 
			
		||||
        Ok(self.kvdb.put(
 | 
			
		||||
            COL_TX_COMPLETED,
 | 
			
		||||
            &tx_seq.to_be_bytes(),
 | 
			
		||||
            &[TX_STATUS_FINALIZED],
 | 
			
		||||
        )?)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[instrument(skip(self))]
 | 
			
		||||
    pub fn prune_tx(&self, tx_seq: u64) -> Result<()> {
 | 
			
		||||
        Ok(self
 | 
			
		||||
            .kvdb
 | 
			
		||||
            .put(COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[0])?)
 | 
			
		||||
            .put(COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[TX_STATUS_PRUNED])?)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> {
 | 
			
		||||
        Ok(self.kvdb.has_key(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?)
 | 
			
		||||
        Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
 | 
			
		||||
            == Some(vec![TX_STATUS_FINALIZED]))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool> {
 | 
			
		||||
        Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())? == Some(vec![TX_STATUS_PRUNED]))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn next_tx_seq(&self) -> u64 {
 | 
			
		||||
 | 
			
		||||
@ -87,7 +87,9 @@ impl Batcher {
 | 
			
		||||
 | 
			
		||||
    async fn poll_tx(&self, tx_seq: u64) -> Result<Option<SyncResult>> {
 | 
			
		||||
        // file already exists
 | 
			
		||||
        if self.store.check_tx_completed(tx_seq).await? {
 | 
			
		||||
        if self.store.check_tx_completed(tx_seq).await?
 | 
			
		||||
            || self.store.check_tx_pruned(tx_seq).await?
 | 
			
		||||
        {
 | 
			
		||||
            // File may be finalized during file sync, e.g. user uploaded file via RPC.
 | 
			
		||||
            // In this case, just terminate the file sync.
 | 
			
		||||
            let num_terminated = self.terminate_file_sync(tx_seq, false).await;
 | 
			
		||||
 | 
			
		||||
@ -7,9 +7,10 @@ mod controllers;
 | 
			
		||||
mod service;
 | 
			
		||||
pub mod test_util;
 | 
			
		||||
 | 
			
		||||
use auto_sync::{batcher_random::RandomBatcherState, batcher_serial::SerialBatcherState};
 | 
			
		||||
pub use controllers::FileSyncInfo;
 | 
			
		||||
use duration_str::deserialize_duration;
 | 
			
		||||
use serde::Deserialize;
 | 
			
		||||
use serde::{Deserialize, Serialize};
 | 
			
		||||
pub use service::{SyncMessage, SyncReceiver, SyncRequest, SyncResponse, SyncSender, SyncService};
 | 
			
		||||
use std::{
 | 
			
		||||
    fmt::Debug,
 | 
			
		||||
@ -64,3 +65,11 @@ impl InstantWrapper {
 | 
			
		||||
        self.0.elapsed()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug, Serialize, Deserialize)]
 | 
			
		||||
#[serde(rename_all = "camelCase")]
 | 
			
		||||
pub struct SyncServiceState {
 | 
			
		||||
    pub num_syncing: usize,
 | 
			
		||||
    pub auto_sync_serial: Option<SerialBatcherState>,
 | 
			
		||||
    pub auto_sync_random: Option<RandomBatcherState>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -4,7 +4,7 @@ use crate::controllers::{
 | 
			
		||||
    FailureReason, FileSyncGoal, FileSyncInfo, SerialSyncController, SyncState,
 | 
			
		||||
    MAX_CHUNKS_TO_REQUEST,
 | 
			
		||||
};
 | 
			
		||||
use crate::Config;
 | 
			
		||||
use crate::{Config, SyncServiceState};
 | 
			
		||||
use anyhow::{bail, Result};
 | 
			
		||||
use file_location_cache::FileLocationCache;
 | 
			
		||||
use libp2p::swarm::DialError;
 | 
			
		||||
@ -74,6 +74,7 @@ pub enum SyncMessage {
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
pub enum SyncRequest {
 | 
			
		||||
    SyncState,
 | 
			
		||||
    SyncStatus {
 | 
			
		||||
        tx_seq: u64,
 | 
			
		||||
    },
 | 
			
		||||
@ -99,6 +100,7 @@ pub enum SyncRequest {
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
pub enum SyncResponse {
 | 
			
		||||
    SyncState { state: SyncServiceState },
 | 
			
		||||
    SyncStatus { status: Option<SyncState> },
 | 
			
		||||
    SyncFile { err: String },
 | 
			
		||||
    FileSyncInfo { result: HashMap<u64, FileSyncInfo> },
 | 
			
		||||
@ -278,6 +280,23 @@ impl SyncService {
 | 
			
		||||
        sender: channel::ResponseSender<SyncResponse>,
 | 
			
		||||
    ) {
 | 
			
		||||
        match req {
 | 
			
		||||
            SyncRequest::SyncState => {
 | 
			
		||||
                let state = match &self.auto_sync_manager {
 | 
			
		||||
                    Some(manager) => SyncServiceState {
 | 
			
		||||
                        num_syncing: self.controllers.len(),
 | 
			
		||||
                        auto_sync_serial: Some(manager.serial.get_state().await),
 | 
			
		||||
                        auto_sync_random: manager.random.get_state().await.ok(),
 | 
			
		||||
                    },
 | 
			
		||||
                    None => SyncServiceState {
 | 
			
		||||
                        num_syncing: self.controllers.len(),
 | 
			
		||||
                        auto_sync_serial: None,
 | 
			
		||||
                        auto_sync_random: None,
 | 
			
		||||
                    },
 | 
			
		||||
                };
 | 
			
		||||
 | 
			
		||||
                let _ = sender.send(SyncResponse::SyncState { state });
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            SyncRequest::SyncStatus { tx_seq } => {
 | 
			
		||||
                let status = self
 | 
			
		||||
                    .controllers
 | 
			
		||||
@ -552,7 +571,9 @@ impl SyncService {
 | 
			
		||||
 | 
			
		||||
    async fn on_find_file(&mut self, tx_seq: u64) -> Result<()> {
 | 
			
		||||
        // file already exists
 | 
			
		||||
        if self.store.check_tx_completed(tx_seq).await? {
 | 
			
		||||
        if self.store.check_tx_completed(tx_seq).await?
 | 
			
		||||
            || self.store.check_tx_pruned(tx_seq).await?
 | 
			
		||||
        {
 | 
			
		||||
            return Ok(());
 | 
			
		||||
        }
 | 
			
		||||
        // broadcast find file
 | 
			
		||||
@ -612,7 +633,9 @@ impl SyncService {
 | 
			
		||||
                };
 | 
			
		||||
 | 
			
		||||
                // file already exists
 | 
			
		||||
                if self.store.check_tx_completed(tx_seq).await? {
 | 
			
		||||
                if self.store.check_tx_completed(tx_seq).await?
 | 
			
		||||
                    || self.store.check_tx_pruned(tx_seq).await?
 | 
			
		||||
                {
 | 
			
		||||
                    bail!("File already exists");
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
@ -684,6 +707,15 @@ impl SyncService {
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        match self.store.check_tx_pruned(tx_seq).await {
 | 
			
		||||
            Ok(true) => return,
 | 
			
		||||
            Ok(false) => {}
 | 
			
		||||
            Err(err) => {
 | 
			
		||||
                error!(%tx_seq, %err, "Failed to check if file pruned");
 | 
			
		||||
                return;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Now, always sync files among all nodes
 | 
			
		||||
        if let Err(err) = self
 | 
			
		||||
            .on_start_sync_file(tx_seq, None, Some((peer_id, addr)))
 | 
			
		||||
 | 
			
		||||
@ -2,8 +2,10 @@
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
from test_framework.test_framework import TestFramework
 | 
			
		||||
from config.node_config import GENESIS_PRIV_KEY
 | 
			
		||||
from mine_with_market_test import PRICE_PER_SECTOR
 | 
			
		||||
from utility.submission import create_submission, submit_data
 | 
			
		||||
from utility.utils import wait_until, assert_equal
 | 
			
		||||
from utility.utils import wait_until, assert_equal, estimate_st_performance
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PrunerTest(TestFramework):
 | 
			
		||||
@ -13,34 +15,46 @@ class PrunerTest(TestFramework):
 | 
			
		||||
        self.num_nodes = 1
 | 
			
		||||
        self.zgs_node_configs[0] = {
 | 
			
		||||
            "db_max_num_sectors": 16 * 1024,
 | 
			
		||||
            # "db_max_num_sectors": 32 * 1024 * 1024,
 | 
			
		||||
            "prune_check_time_s": 1,
 | 
			
		||||
            "prune_batch_wait_time_ms": 10,
 | 
			
		||||
            "prune_batch_wait_time_ms": 1,
 | 
			
		||||
        }
 | 
			
		||||
        self.enable_market = True
 | 
			
		||||
        self.mine_period = int(45 / self.block_time)
 | 
			
		||||
        self.lifetime_seconds = 240
 | 
			
		||||
        self.launch_wait_seconds = 15
 | 
			
		||||
        self.log.info("Contract Info: Est. block time %.2f, Mine period %d", self.block_time, self.mine_period)
 | 
			
		||||
 | 
			
		||||
    def run_test(self):
 | 
			
		||||
        client = self.nodes[0]
 | 
			
		||||
 | 
			
		||||
        chunk_data = b"\x02" * 16 * 256 * 1024
 | 
			
		||||
        # chunk_data = b"\x02" * 5 * 1024 * 1024 * 1024
 | 
			
		||||
        submissions, data_root = create_submission(chunk_data)
 | 
			
		||||
        self.contract.submit(submissions)
 | 
			
		||||
        self.contract.submit(submissions, tx_prarams = {"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)})
 | 
			
		||||
        wait_until(lambda: self.contract.num_submissions() == 1)
 | 
			
		||||
        wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
 | 
			
		||||
 | 
			
		||||
        segment = submit_data(client, chunk_data)
 | 
			
		||||
        self.log.info("segment: %s", len(segment))
 | 
			
		||||
        # Wait for 1 sec for the shard config to be updated
 | 
			
		||||
        time.sleep(1)
 | 
			
		||||
        time.sleep(2)
 | 
			
		||||
        shard_config = client.rpc.zgs_getShardConfig()
 | 
			
		||||
        shard_id = int(shard_config["shardId"])
 | 
			
		||||
        num_shard = int(shard_config["numShard"])
 | 
			
		||||
 | 
			
		||||
        # wait_until(lambda: self.reward_contract.first_rewardable_chunk() != 0, timeout=180)
 | 
			
		||||
        # first_rewardable = self.reward_contract.first_rewardable_chunk() * 32 * 1024
 | 
			
		||||
        # Wait for 1 sec for the no reward segments to be pruned.
 | 
			
		||||
        time.sleep(1)
 | 
			
		||||
        # Wait for chunks to be removed.
 | 
			
		||||
        for i in range(len(segment)):
 | 
			
		||||
            seg = client.zgs_download_segment(data_root, i * 1024, (i + 1) * 1024)
 | 
			
		||||
            if i % num_shard == shard_id:
 | 
			
		||||
                # base64 encoding size
 | 
			
		||||
                assert_equal(len(seg), 349528)
 | 
			
		||||
            else:
 | 
			
		||||
            # if i < first_rewardable or i % num_shard != shard_id:
 | 
			
		||||
            if i % num_shard != shard_id:
 | 
			
		||||
                assert_equal(seg, None)
 | 
			
		||||
            else:
 | 
			
		||||
                assert_equal(len(seg), 349528)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
 | 
			
		||||
@ -2,6 +2,7 @@
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
from test_framework.test_framework import TestFramework
 | 
			
		||||
from mine_with_market_test import PRICE_PER_SECTOR
 | 
			
		||||
from utility.submission import create_submission, submit_data, data_to_segments
 | 
			
		||||
from utility.utils import wait_until, assert_equal
 | 
			
		||||
 | 
			
		||||
@ -23,13 +24,14 @@ class PrunerTest(TestFramework):
 | 
			
		||||
            "db_max_num_sectors": 2 ** 30,
 | 
			
		||||
            "shard_position": "1/4"
 | 
			
		||||
        }
 | 
			
		||||
        self.enable_market = True
 | 
			
		||||
 | 
			
		||||
    def run_test(self):
 | 
			
		||||
        client = self.nodes[0]
 | 
			
		||||
 | 
			
		||||
        chunk_data = b"\x02" * 8 * 256 * 1024
 | 
			
		||||
        submissions, data_root = create_submission(chunk_data)
 | 
			
		||||
        self.contract.submit(submissions)
 | 
			
		||||
        self.contract.submit(submissions, tx_prarams = {"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)})
 | 
			
		||||
        wait_until(lambda: self.contract.num_submissions() == 1)
 | 
			
		||||
        wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -253,7 +253,7 @@ class BlockchainNode(TestNode):
 | 
			
		||||
    def wait_for_transaction_receipt(self, w3, tx_hash, timeout=120, parent_hash=None):
 | 
			
		||||
        return w3.eth.wait_for_transaction_receipt(tx_hash, timeout)
 | 
			
		||||
 | 
			
		||||
    def setup_contract(self, enable_market, mine_period):
 | 
			
		||||
    def setup_contract(self, enable_market, mine_period, lifetime_seconds):
 | 
			
		||||
        w3 = Web3(HTTPProvider(self.rpc_url))
 | 
			
		||||
 | 
			
		||||
        account1 = w3.eth.account.from_key(GENESIS_PRIV_KEY)
 | 
			
		||||
@ -314,9 +314,7 @@ class BlockchainNode(TestNode):
 | 
			
		||||
 | 
			
		||||
            return flow_contract, flow_initialize_hash, mine_contract, dummy_reward_contract
 | 
			
		||||
        
 | 
			
		||||
        def deploy_with_market():
 | 
			
		||||
            LIFETIME_MONTH = 1
 | 
			
		||||
 | 
			
		||||
        def deploy_with_market(lifetime_seconds):
 | 
			
		||||
            self.log.debug("Start deploy contracts")
 | 
			
		||||
            
 | 
			
		||||
            mine_contract, _ = deploy_contract("PoraMineTest", [0])
 | 
			
		||||
@ -325,7 +323,7 @@ class BlockchainNode(TestNode):
 | 
			
		||||
            market_contract, _ = deploy_contract("FixedPrice", [])
 | 
			
		||||
            self.log.debug("Market deployed")
 | 
			
		||||
            
 | 
			
		||||
            reward_contract, _ =deploy_contract("ChunkLinearReward", [LIFETIME_MONTH * 31 * 86400])
 | 
			
		||||
            reward_contract, _ = deploy_contract("ChunkLinearReward", [lifetime_seconds])
 | 
			
		||||
            self.log.debug("Reward deployed")
 | 
			
		||||
            
 | 
			
		||||
            flow_contract, _ = deploy_contract("FixedPriceFlow", [mine_period, 0])
 | 
			
		||||
@ -336,8 +334,9 @@ class BlockchainNode(TestNode):
 | 
			
		||||
            mine_contract.functions.setTargetSubmissions(2).transact(TX_PARAMS)
 | 
			
		||||
            self.log.debug("Mine Initialized")
 | 
			
		||||
            
 | 
			
		||||
            price_per_sector = int(LIFETIME_MONTH * 256 * 10 * 1_000_000_000_000_000_000 / 1024 / 1024 / 1024 / 12)
 | 
			
		||||
            market_contract.functions.initialize(price_per_sector, flow_contract.address, reward_contract.address).transact(TX_PARAMS)
 | 
			
		||||
            market_contract.functions.initialize(int(lifetime_seconds * 256 * 10 * 10 ** 18 /
 | 
			
		||||
                                                     2 ** 30 / 12 / 31 / 86400),
 | 
			
		||||
                                                 flow_contract.address, reward_contract.address).transact(TX_PARAMS)
 | 
			
		||||
            self.log.debug("Market Initialized")
 | 
			
		||||
            
 | 
			
		||||
            reward_contract.functions.initialize(market_contract.address, mine_contract.address).transact(TX_PARAMS)
 | 
			
		||||
@ -353,7 +352,7 @@ class BlockchainNode(TestNode):
 | 
			
		||||
            return flow_contract, flow_initialize_hash, mine_contract, reward_contract
 | 
			
		||||
        
 | 
			
		||||
        if enable_market:
 | 
			
		||||
            return deploy_with_market()
 | 
			
		||||
            return deploy_with_market(lifetime_seconds)
 | 
			
		||||
        else:
 | 
			
		||||
            return deploy_no_market()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -17,11 +17,11 @@ class ContractProxy:
 | 
			
		||||
            else self.blockchain_nodes[node_idx].get_contract(self.contract_address)
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def _call(self, fn_name, node_idx, **args):
 | 
			
		||||
    def _call(self, fn_name, node_idx, *args):
 | 
			
		||||
        assert node_idx < len(self.blockchain_nodes)
 | 
			
		||||
 | 
			
		||||
        contract = self._get_contract(node_idx)
 | 
			
		||||
        return getattr(contract.functions, fn_name)(**args).call()
 | 
			
		||||
        return getattr(contract.functions, fn_name)(*args).call()
 | 
			
		||||
 | 
			
		||||
    def _send(self, fn_name, node_idx, **args):
 | 
			
		||||
        assert node_idx < len(self.blockchain_nodes)
 | 
			
		||||
@ -114,3 +114,9 @@ class RewardContractProxy(ContractProxy):
 | 
			
		||||
    
 | 
			
		||||
    def base_reward(self, node_idx = 0):
 | 
			
		||||
        return self._call("baseReward", node_idx)
 | 
			
		||||
 | 
			
		||||
    def first_rewardable_chunk(self, node_idx = 0):
 | 
			
		||||
        return self._call("firstRewardableChunk", node_idx)
 | 
			
		||||
 | 
			
		||||
    def reward_deadline(self, node_idx = 0):
 | 
			
		||||
        return self._call("rewardDeadline", node_idx, 0)
 | 
			
		||||
 | 
			
		||||
@ -51,6 +51,7 @@ class TestFramework:
 | 
			
		||||
        self.block_time = blockchain_node_type.block_time()
 | 
			
		||||
        self.enable_market = False
 | 
			
		||||
        self.mine_period = 100
 | 
			
		||||
        self.lifetime_seconds = 3600
 | 
			
		||||
        self.launch_wait_seconds = 1
 | 
			
		||||
 | 
			
		||||
        # Set default binary path
 | 
			
		||||
@ -171,7 +172,7 @@ class TestFramework:
 | 
			
		||||
                wait_until(lambda: node.eth_blockNumber() is not None)
 | 
			
		||||
                wait_until(lambda: int(node.eth_blockNumber(), 16) > 0)
 | 
			
		||||
 | 
			
		||||
        contract, tx_hash, mine_contract, reward_contract = self.blockchain_nodes[0].setup_contract(self.enable_market, self.mine_period)
 | 
			
		||||
        contract, tx_hash, mine_contract, reward_contract = self.blockchain_nodes[0].setup_contract(self.enable_market, self.mine_period, self.lifetime_seconds)
 | 
			
		||||
        self.contract = FlowContractProxy(contract, self.blockchain_nodes)
 | 
			
		||||
        self.mine_contract = MineContractProxy(mine_contract, self.blockchain_nodes)
 | 
			
		||||
        self.reward_contract = RewardContractProxy(reward_contract, self.blockchain_nodes)
 | 
			
		||||
@ -197,6 +198,7 @@ class TestFramework:
 | 
			
		||||
                updated_config,
 | 
			
		||||
                self.contract.address(),
 | 
			
		||||
                self.mine_contract.address(),
 | 
			
		||||
                self.reward_contract.address(),
 | 
			
		||||
                self.log,
 | 
			
		||||
            )
 | 
			
		||||
            self.nodes.append(node)
 | 
			
		||||
 | 
			
		||||
@ -22,6 +22,7 @@ class ZgsNode(TestNode):
 | 
			
		||||
        updated_config,
 | 
			
		||||
        log_contract_address,
 | 
			
		||||
        mine_contract_address,
 | 
			
		||||
        reward_contract_address,
 | 
			
		||||
        log,
 | 
			
		||||
        rpc_timeout=10,
 | 
			
		||||
        libp2p_nodes=None,
 | 
			
		||||
@ -43,6 +44,7 @@ class ZgsNode(TestNode):
 | 
			
		||||
            "network_libp2p_nodes": libp2p_nodes,
 | 
			
		||||
            "log_contract_address": log_contract_address,
 | 
			
		||||
            "mine_contract_address": mine_contract_address,
 | 
			
		||||
            "reward_contract_address": reward_contract_address,
 | 
			
		||||
            "blockchain_rpc_endpoint": f"http://127.0.0.1:{blockchain_rpc_port(0)}",
 | 
			
		||||
        }
 | 
			
		||||
        # Set configs for this specific node.
 | 
			
		||||
 | 
			
		||||
@ -25,7 +25,7 @@ class RpcCaller:
 | 
			
		||||
            if isinstance(parsed, Ok):
 | 
			
		||||
                return parsed.result
 | 
			
		||||
            else:
 | 
			
		||||
                print("Failed to call RPC, method = %s(%s), error = %s" % (self.method, str(*args), parsed))
 | 
			
		||||
                print("Failed to call RPC, method = %s(%s), error = %s" % (self.method, str(*args)[-1500:], parsed))
 | 
			
		||||
        except Exception as ex:
 | 
			
		||||
            print("Failed to call RPC, method = %s(%s), exception = %s" % (self.method, str(*args), ex))
 | 
			
		||||
            print("Failed to call RPC, method = %s(%s), exception = %s" % (self.method, str(*args)[-1500:], ex))
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user