mirror of
				https://github.com/0glabs/0g-storage-node.git
				synced 2025-11-04 08:37:27 +00:00 
			
		
		
		
	Compare commits
	
		
			4 Commits
		
	
	
		
			954d792198
			...
			0bde65c81c
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					0bde65c81c | ||
| 
						 | 
					e701c8fdbd | ||
| 
						 | 
					a4b02a21b7 | ||
| 
						 | 
					d49bdf1ad9 | 
@ -662,17 +662,24 @@ async fn check_watch_process(
 | 
				
			|||||||
                    "get block hash for block {} from RPC, assume there is no org",
 | 
					                    "get block hash for block {} from RPC, assume there is no org",
 | 
				
			||||||
                    *progress - 1
 | 
					                    *progress - 1
 | 
				
			||||||
                );
 | 
					                );
 | 
				
			||||||
                match provider.get_block(*progress - 1).await {
 | 
					                let hash = loop {
 | 
				
			||||||
                    Ok(Some(v)) => {
 | 
					                    match provider.get_block(*progress - 1).await {
 | 
				
			||||||
                        break v.hash.expect("parent block hash expect exist");
 | 
					                        Ok(Some(v)) => {
 | 
				
			||||||
 | 
					                            break v.hash.expect("parent block hash expect exist");
 | 
				
			||||||
 | 
					                        }
 | 
				
			||||||
 | 
					                        Ok(None) => {
 | 
				
			||||||
 | 
					                            panic!("parent block {} expect exist", *progress - 1);
 | 
				
			||||||
 | 
					                        }
 | 
				
			||||||
 | 
					                        Err(e) => {
 | 
				
			||||||
 | 
					                            if e.to_string().contains("server is too busy") {
 | 
				
			||||||
 | 
					                                warn!("server busy, wait for parent block {}", *progress - 1);
 | 
				
			||||||
 | 
					                            } else {
 | 
				
			||||||
 | 
					                                panic!("parent block {} expect exist, error {}", *progress - 1, e);
 | 
				
			||||||
 | 
					                            }
 | 
				
			||||||
 | 
					                        }
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
                    Ok(None) => {
 | 
					                };
 | 
				
			||||||
                        panic!("parent block {} expect exist", *progress - 1);
 | 
					                break hash;
 | 
				
			||||||
                    }
 | 
					 | 
				
			||||||
                    Err(e) => {
 | 
					 | 
				
			||||||
                        panic!("parent block {} expect exist, error {}", *progress - 1, e);
 | 
					 | 
				
			||||||
                    }
 | 
					 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        };
 | 
					        };
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
				
			|||||||
@ -10,7 +10,7 @@ mod service;
 | 
				
			|||||||
use duration_str::deserialize_duration;
 | 
					use duration_str::deserialize_duration;
 | 
				
			||||||
use network::Multiaddr;
 | 
					use network::Multiaddr;
 | 
				
			||||||
use serde::Deserialize;
 | 
					use serde::Deserialize;
 | 
				
			||||||
use std::time::Duration;
 | 
					use std::{net::IpAddr, time::Duration};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub use crate::service::RouterService;
 | 
					pub use crate::service::RouterService;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -26,6 +26,7 @@ pub struct Config {
 | 
				
			|||||||
    pub libp2p_nodes: Vec<Multiaddr>,
 | 
					    pub libp2p_nodes: Vec<Multiaddr>,
 | 
				
			||||||
    pub private_ip_enabled: bool,
 | 
					    pub private_ip_enabled: bool,
 | 
				
			||||||
    pub check_announced_ip: bool,
 | 
					    pub check_announced_ip: bool,
 | 
				
			||||||
 | 
					    pub public_address: Option<IpAddr>,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // batcher
 | 
					    // batcher
 | 
				
			||||||
    /// Timeout to publish messages in batch
 | 
					    /// Timeout to publish messages in batch
 | 
				
			||||||
@ -47,6 +48,7 @@ impl Default for Config {
 | 
				
			|||||||
            libp2p_nodes: vec![],
 | 
					            libp2p_nodes: vec![],
 | 
				
			||||||
            private_ip_enabled: false,
 | 
					            private_ip_enabled: false,
 | 
				
			||||||
            check_announced_ip: false,
 | 
					            check_announced_ip: false,
 | 
				
			||||||
 | 
					            public_address: None,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            batcher_timeout: Duration::from_secs(1),
 | 
					            batcher_timeout: Duration::from_secs(1),
 | 
				
			||||||
            batcher_file_capacity: 1,
 | 
					            batcher_file_capacity: 1,
 | 
				
			||||||
 | 
				
			|||||||
@ -348,17 +348,26 @@ impl Libp2pEventHandler {
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async fn get_listen_addr_or_add(&self) -> Option<Multiaddr> {
 | 
					    async fn construct_announced_ip(&self) -> Option<Multiaddr> {
 | 
				
			||||||
 | 
					        // public address configured
 | 
				
			||||||
 | 
					        if let Some(ip) = self.config.public_address {
 | 
				
			||||||
 | 
					            let mut addr = Multiaddr::empty();
 | 
				
			||||||
 | 
					            addr.push(ip.into());
 | 
				
			||||||
 | 
					            addr.push(Protocol::Tcp(self.network_globals.listen_port_tcp()));
 | 
				
			||||||
 | 
					            return Some(addr);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // public listen address
 | 
				
			||||||
        if let Some(addr) = self.get_listen_addr() {
 | 
					        if let Some(addr) = self.get_listen_addr() {
 | 
				
			||||||
            return Some(addr);
 | 
					            return Some(addr);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // auto detect public IP address
 | 
				
			||||||
        let ipv4_addr = public_ip::addr_v4().await?;
 | 
					        let ipv4_addr = public_ip::addr_v4().await?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let mut addr = Multiaddr::empty();
 | 
					        let mut addr = Multiaddr::empty();
 | 
				
			||||||
        addr.push(Protocol::Ip4(ipv4_addr));
 | 
					        addr.push(Protocol::Ip4(ipv4_addr));
 | 
				
			||||||
        addr.push(Protocol::Tcp(self.network_globals.listen_port_tcp()));
 | 
					        addr.push(Protocol::Tcp(self.network_globals.listen_port_tcp()));
 | 
				
			||||||
        addr.push(Protocol::P2p(self.network_globals.local_peer_id().into()));
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.network_globals
 | 
					        self.network_globals
 | 
				
			||||||
            .listen_multiaddrs
 | 
					            .listen_multiaddrs
 | 
				
			||||||
@ -420,7 +429,7 @@ impl Libp2pEventHandler {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        let peer_id = *self.network_globals.peer_id.read();
 | 
					        let peer_id = *self.network_globals.peer_id.read();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let addr = self.get_listen_addr_or_add().await?;
 | 
					        let addr = self.construct_announced_ip().await?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let timestamp = timestamp_now();
 | 
					        let timestamp = timestamp_now();
 | 
				
			||||||
        let shard_config = self.store.get_store().get_shard_config();
 | 
					        let shard_config = self.store.get_store().get_shard_config();
 | 
				
			||||||
@ -452,7 +461,7 @@ impl Libp2pEventHandler {
 | 
				
			|||||||
        shard_config: ShardConfig,
 | 
					        shard_config: ShardConfig,
 | 
				
			||||||
    ) -> Option<PubsubMessage> {
 | 
					    ) -> Option<PubsubMessage> {
 | 
				
			||||||
        let peer_id = *self.network_globals.peer_id.read();
 | 
					        let peer_id = *self.network_globals.peer_id.read();
 | 
				
			||||||
        let addr = self.get_listen_addr_or_add().await?;
 | 
					        let addr = self.construct_announced_ip().await?;
 | 
				
			||||||
        let timestamp = timestamp_now();
 | 
					        let timestamp = timestamp_now();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let msg = AnnounceShardConfig {
 | 
					        let msg = AnnounceShardConfig {
 | 
				
			||||||
@ -528,7 +537,7 @@ impl Libp2pEventHandler {
 | 
				
			|||||||
        index_end: u64,
 | 
					        index_end: u64,
 | 
				
			||||||
    ) -> Option<PubsubMessage> {
 | 
					    ) -> Option<PubsubMessage> {
 | 
				
			||||||
        let peer_id = *self.network_globals.peer_id.read();
 | 
					        let peer_id = *self.network_globals.peer_id.read();
 | 
				
			||||||
        let addr = self.get_listen_addr_or_add().await?;
 | 
					        let addr = self.construct_announced_ip().await?;
 | 
				
			||||||
        let timestamp = timestamp_now();
 | 
					        let timestamp = timestamp_now();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let msg = AnnounceChunks {
 | 
					        let msg = AnnounceChunks {
 | 
				
			||||||
 | 
				
			|||||||
@ -2,7 +2,7 @@ use crate::types::{FileInfo, Segment, SegmentWithProof, Status};
 | 
				
			|||||||
use jsonrpsee::core::RpcResult;
 | 
					use jsonrpsee::core::RpcResult;
 | 
				
			||||||
use jsonrpsee::proc_macros::rpc;
 | 
					use jsonrpsee::proc_macros::rpc;
 | 
				
			||||||
use shared_types::{DataRoot, FlowProof, TxSeqOrRoot};
 | 
					use shared_types::{DataRoot, FlowProof, TxSeqOrRoot};
 | 
				
			||||||
use storage::config::ShardConfig;
 | 
					use storage::{config::ShardConfig, H256};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[rpc(server, client, namespace = "zgs")]
 | 
					#[rpc(server, client, namespace = "zgs")]
 | 
				
			||||||
pub trait Rpc {
 | 
					pub trait Rpc {
 | 
				
			||||||
@ -77,4 +77,7 @@ pub trait Rpc {
 | 
				
			|||||||
        sector_index: u64,
 | 
					        sector_index: u64,
 | 
				
			||||||
        flow_root: Option<DataRoot>,
 | 
					        flow_root: Option<DataRoot>,
 | 
				
			||||||
    ) -> RpcResult<FlowProof>;
 | 
					    ) -> RpcResult<FlowProof>;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    #[method(name = "getFlowContext")]
 | 
				
			||||||
 | 
					    async fn get_flow_context(&self) -> RpcResult<(H256, u64)>;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -8,7 +8,7 @@ use jsonrpsee::core::RpcResult;
 | 
				
			|||||||
use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE};
 | 
					use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE};
 | 
				
			||||||
use std::fmt::{Debug, Formatter, Result};
 | 
					use std::fmt::{Debug, Formatter, Result};
 | 
				
			||||||
use storage::config::ShardConfig;
 | 
					use storage::config::ShardConfig;
 | 
				
			||||||
use storage::try_option;
 | 
					use storage::{try_option, H256};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub struct RpcServerImpl {
 | 
					pub struct RpcServerImpl {
 | 
				
			||||||
    pub ctx: Context,
 | 
					    pub ctx: Context,
 | 
				
			||||||
@ -198,6 +198,10 @@ impl RpcServer for RpcServerImpl {
 | 
				
			|||||||
        assert_eq!(proof.left_proof, proof.right_proof);
 | 
					        assert_eq!(proof.left_proof, proof.right_proof);
 | 
				
			||||||
        Ok(proof.right_proof)
 | 
					        Ok(proof.right_proof)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    async fn get_flow_context(&self) -> RpcResult<(H256, u64)> {
 | 
				
			||||||
 | 
					        Ok(self.ctx.log_store.get_context().await?)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl RpcServerImpl {
 | 
					impl RpcServerImpl {
 | 
				
			||||||
 | 
				
			|||||||
@ -200,6 +200,13 @@ impl ZgsConfig {
 | 
				
			|||||||
    pub fn router_config(&self, network_config: &NetworkConfig) -> Result<router::Config, String> {
 | 
					    pub fn router_config(&self, network_config: &NetworkConfig) -> Result<router::Config, String> {
 | 
				
			||||||
        let mut router_config = self.router.clone();
 | 
					        let mut router_config = self.router.clone();
 | 
				
			||||||
        router_config.libp2p_nodes = network_config.libp2p_nodes.to_vec();
 | 
					        router_config.libp2p_nodes = network_config.libp2p_nodes.to_vec();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if router_config.public_address.is_none() {
 | 
				
			||||||
 | 
					            if let Some(addr) = &self.network_enr_address {
 | 
				
			||||||
 | 
					                router_config.public_address = Some(addr.parse().unwrap());
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        Ok(router_config)
 | 
					        Ok(router_config)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										55
									
								
								tests/root_consistency_test.py
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										55
									
								
								tests/root_consistency_test.py
									
									
									
									
									
										Executable file
									
								
							@ -0,0 +1,55 @@
 | 
				
			|||||||
 | 
					#!/usr/bin/env python3
 | 
				
			||||||
 | 
					from test_framework.test_framework import TestFramework
 | 
				
			||||||
 | 
					from config.node_config import MINER_ID, GENESIS_PRIV_KEY
 | 
				
			||||||
 | 
					from utility.submission import create_submission, submit_data
 | 
				
			||||||
 | 
					from utility.utils import wait_until, assert_equal
 | 
				
			||||||
 | 
					from test_framework.blockchain_node import BlockChainNodeType
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class RootConsistencyTest(TestFramework):
 | 
				
			||||||
 | 
					    def setup_params(self):
 | 
				
			||||||
 | 
					        self.num_blockchain_nodes = 1
 | 
				
			||||||
 | 
					        self.num_nodes = 1
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def submit_data(self, item, size):
 | 
				
			||||||
 | 
					        submissions_before = self.contract.num_submissions()
 | 
				
			||||||
 | 
					        client = self.nodes[0]
 | 
				
			||||||
 | 
					        chunk_data = item * 256 * size
 | 
				
			||||||
 | 
					        submissions, data_root = create_submission(chunk_data)
 | 
				
			||||||
 | 
					        self.contract.submit(submissions)
 | 
				
			||||||
 | 
					        wait_until(lambda: self.contract.num_submissions() == submissions_before + 1)
 | 
				
			||||||
 | 
					        wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        segment = submit_data(client, chunk_data)
 | 
				
			||||||
 | 
					        wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def assert_flow_status(self, expected_length):
 | 
				
			||||||
 | 
					        contract_root = self.contract.get_flow_root().hex()
 | 
				
			||||||
 | 
					        contract_length = self.contract.get_flow_length()
 | 
				
			||||||
 | 
					        (node_root, node_length) = tuple(self.nodes[0].zgs_getFlowContext())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        assert_equal(contract_length, node_length)
 | 
				
			||||||
 | 
					        assert_equal(contract_length, expected_length)
 | 
				
			||||||
 | 
					        assert_equal(contract_root, node_root[2:])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def run_test(self):
 | 
				
			||||||
 | 
					        self.assert_flow_status(1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.submit_data(b"\x11", 1)
 | 
				
			||||||
 | 
					        self.assert_flow_status(2)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.submit_data(b"\x11", 8 + 4 + 2)
 | 
				
			||||||
 | 
					        self.assert_flow_status(16 + 4 + 2)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.submit_data(b"\x12", 128 + 64)
 | 
				
			||||||
 | 
					        self.assert_flow_status(256 + 64)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.submit_data(b"\x13", 512 + 256)
 | 
				
			||||||
 | 
					        self.assert_flow_status(1024 + 512 + 256)
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					if __name__ == "__main__":
 | 
				
			||||||
 | 
					    RootConsistencyTest().main()
 | 
				
			||||||
@ -91,7 +91,12 @@ class FlowContractProxy(ContractProxy):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    def get_mine_context(self, node_idx=0):
 | 
					    def get_mine_context(self, node_idx=0):
 | 
				
			||||||
        return self._call("makeContextWithResult", node_idx)
 | 
					        return self._call("makeContextWithResult", node_idx)
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    def get_flow_root(self, node_idx=0):
 | 
				
			||||||
 | 
					        return self._call("computeFlowRoot", node_idx)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def get_flow_length(self, node_idx=0):
 | 
				
			||||||
 | 
					        return self._call("tree", node_idx)[0]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class MineContractProxy(ContractProxy):
 | 
					class MineContractProxy(ContractProxy):
 | 
				
			||||||
    def last_mined_epoch(self, node_idx=0):
 | 
					    def last_mined_epoch(self, node_idx=0):
 | 
				
			||||||
 | 
				
			|||||||
@ -100,6 +100,9 @@ class ZgsNode(TestNode):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    def zgs_get_file_info_by_tx_seq(self, tx_seq):
 | 
					    def zgs_get_file_info_by_tx_seq(self, tx_seq):
 | 
				
			||||||
        return self.rpc.zgs_getFileInfoByTxSeq([tx_seq])
 | 
					        return self.rpc.zgs_getFileInfoByTxSeq([tx_seq])
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    def zgs_get_flow_context(self, tx_seq):
 | 
				
			||||||
 | 
					        return self.rpc.zgs_getFlowContext([tx_seq])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def shutdown(self):
 | 
					    def shutdown(self):
 | 
				
			||||||
        self.rpc.admin_shutdown()
 | 
					        self.rpc.admin_shutdown()
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
		Reference in New Issue
	
	Block a user