Compare commits

...

4 Commits

Author SHA1 Message Date
bruno-valante
0bde65c81c
Merge d49bdf1ad9 into e701c8fdbd 2024-10-14 08:58:00 +02:00
Bo QIU
e701c8fdbd
Supports custom public ip to announce file (#233)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
* Supports custom public ip to announce file

* Fix comment
2024-10-14 14:57:42 +08:00
0g-peterzhb
a4b02a21b7
add retry (#232) 2024-10-14 14:19:05 +08:00
Bruno Valente
d49bdf1ad9 Test flow root consistency 2024-10-12 17:25:09 +08:00
9 changed files with 113 additions and 18 deletions

View File

@ -662,17 +662,24 @@ async fn check_watch_process(
"get block hash for block {} from RPC, assume there is no org", "get block hash for block {} from RPC, assume there is no org",
*progress - 1 *progress - 1
); );
match provider.get_block(*progress - 1).await { let hash = loop {
Ok(Some(v)) => { match provider.get_block(*progress - 1).await {
break v.hash.expect("parent block hash expect exist"); Ok(Some(v)) => {
break v.hash.expect("parent block hash expect exist");
}
Ok(None) => {
panic!("parent block {} expect exist", *progress - 1);
}
Err(e) => {
if e.to_string().contains("server is too busy") {
warn!("server busy, wait for parent block {}", *progress - 1);
} else {
panic!("parent block {} expect exist, error {}", *progress - 1, e);
}
}
} }
Ok(None) => { };
panic!("parent block {} expect exist", *progress - 1); break hash;
}
Err(e) => {
panic!("parent block {} expect exist, error {}", *progress - 1, e);
}
}
} }
}; };
} }

View File

@ -10,7 +10,7 @@ mod service;
use duration_str::deserialize_duration; use duration_str::deserialize_duration;
use network::Multiaddr; use network::Multiaddr;
use serde::Deserialize; use serde::Deserialize;
use std::time::Duration; use std::{net::IpAddr, time::Duration};
pub use crate::service::RouterService; pub use crate::service::RouterService;
@ -26,6 +26,7 @@ pub struct Config {
pub libp2p_nodes: Vec<Multiaddr>, pub libp2p_nodes: Vec<Multiaddr>,
pub private_ip_enabled: bool, pub private_ip_enabled: bool,
pub check_announced_ip: bool, pub check_announced_ip: bool,
pub public_address: Option<IpAddr>,
// batcher // batcher
/// Timeout to publish messages in batch /// Timeout to publish messages in batch
@ -47,6 +48,7 @@ impl Default for Config {
libp2p_nodes: vec![], libp2p_nodes: vec![],
private_ip_enabled: false, private_ip_enabled: false,
check_announced_ip: false, check_announced_ip: false,
public_address: None,
batcher_timeout: Duration::from_secs(1), batcher_timeout: Duration::from_secs(1),
batcher_file_capacity: 1, batcher_file_capacity: 1,

View File

@ -348,17 +348,26 @@ impl Libp2pEventHandler {
} }
} }
async fn get_listen_addr_or_add(&self) -> Option<Multiaddr> { async fn construct_announced_ip(&self) -> Option<Multiaddr> {
// public address configured
if let Some(ip) = self.config.public_address {
let mut addr = Multiaddr::empty();
addr.push(ip.into());
addr.push(Protocol::Tcp(self.network_globals.listen_port_tcp()));
return Some(addr);
}
// public listen address
if let Some(addr) = self.get_listen_addr() { if let Some(addr) = self.get_listen_addr() {
return Some(addr); return Some(addr);
} }
// auto detect public IP address
let ipv4_addr = public_ip::addr_v4().await?; let ipv4_addr = public_ip::addr_v4().await?;
let mut addr = Multiaddr::empty(); let mut addr = Multiaddr::empty();
addr.push(Protocol::Ip4(ipv4_addr)); addr.push(Protocol::Ip4(ipv4_addr));
addr.push(Protocol::Tcp(self.network_globals.listen_port_tcp())); addr.push(Protocol::Tcp(self.network_globals.listen_port_tcp()));
addr.push(Protocol::P2p(self.network_globals.local_peer_id().into()));
self.network_globals self.network_globals
.listen_multiaddrs .listen_multiaddrs
@ -420,7 +429,7 @@ impl Libp2pEventHandler {
let peer_id = *self.network_globals.peer_id.read(); let peer_id = *self.network_globals.peer_id.read();
let addr = self.get_listen_addr_or_add().await?; let addr = self.construct_announced_ip().await?;
let timestamp = timestamp_now(); let timestamp = timestamp_now();
let shard_config = self.store.get_store().get_shard_config(); let shard_config = self.store.get_store().get_shard_config();
@ -452,7 +461,7 @@ impl Libp2pEventHandler {
shard_config: ShardConfig, shard_config: ShardConfig,
) -> Option<PubsubMessage> { ) -> Option<PubsubMessage> {
let peer_id = *self.network_globals.peer_id.read(); let peer_id = *self.network_globals.peer_id.read();
let addr = self.get_listen_addr_or_add().await?; let addr = self.construct_announced_ip().await?;
let timestamp = timestamp_now(); let timestamp = timestamp_now();
let msg = AnnounceShardConfig { let msg = AnnounceShardConfig {
@ -528,7 +537,7 @@ impl Libp2pEventHandler {
index_end: u64, index_end: u64,
) -> Option<PubsubMessage> { ) -> Option<PubsubMessage> {
let peer_id = *self.network_globals.peer_id.read(); let peer_id = *self.network_globals.peer_id.read();
let addr = self.get_listen_addr_or_add().await?; let addr = self.construct_announced_ip().await?;
let timestamp = timestamp_now(); let timestamp = timestamp_now();
let msg = AnnounceChunks { let msg = AnnounceChunks {

View File

@ -2,7 +2,7 @@ use crate::types::{FileInfo, Segment, SegmentWithProof, Status};
use jsonrpsee::core::RpcResult; use jsonrpsee::core::RpcResult;
use jsonrpsee::proc_macros::rpc; use jsonrpsee::proc_macros::rpc;
use shared_types::{DataRoot, FlowProof, TxSeqOrRoot}; use shared_types::{DataRoot, FlowProof, TxSeqOrRoot};
use storage::config::ShardConfig; use storage::{config::ShardConfig, H256};
#[rpc(server, client, namespace = "zgs")] #[rpc(server, client, namespace = "zgs")]
pub trait Rpc { pub trait Rpc {
@ -77,4 +77,7 @@ pub trait Rpc {
sector_index: u64, sector_index: u64,
flow_root: Option<DataRoot>, flow_root: Option<DataRoot>,
) -> RpcResult<FlowProof>; ) -> RpcResult<FlowProof>;
    /// Returns the node's current flow context as a `(flow root, flow length)` pair.
    /// Exposed over JSON-RPC as `zgs_getFlowContext`; takes no parameters.
    #[method(name = "getFlowContext")]
    async fn get_flow_context(&self) -> RpcResult<(H256, u64)>;
} }

View File

@ -8,7 +8,7 @@ use jsonrpsee::core::RpcResult;
use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE}; use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE};
use std::fmt::{Debug, Formatter, Result}; use std::fmt::{Debug, Formatter, Result};
use storage::config::ShardConfig; use storage::config::ShardConfig;
use storage::try_option; use storage::{try_option, H256};
pub struct RpcServerImpl { pub struct RpcServerImpl {
pub ctx: Context, pub ctx: Context,
@ -198,6 +198,10 @@ impl RpcServer for RpcServerImpl {
assert_eq!(proof.left_proof, proof.right_proof); assert_eq!(proof.left_proof, proof.right_proof);
Ok(proof.right_proof) Ok(proof.right_proof)
} }
    /// Returns the current flow context `(root, length)` read from the log store.
    async fn get_flow_context(&self) -> RpcResult<(H256, u64)> {
        Ok(self.ctx.log_store.get_context().await?)
    }
} }
impl RpcServerImpl { impl RpcServerImpl {

View File

@ -200,6 +200,13 @@ impl ZgsConfig {
pub fn router_config(&self, network_config: &NetworkConfig) -> Result<router::Config, String> { pub fn router_config(&self, network_config: &NetworkConfig) -> Result<router::Config, String> {
let mut router_config = self.router.clone(); let mut router_config = self.router.clone();
router_config.libp2p_nodes = network_config.libp2p_nodes.to_vec(); router_config.libp2p_nodes = network_config.libp2p_nodes.to_vec();
if router_config.public_address.is_none() {
if let Some(addr) = &self.network_enr_address {
router_config.public_address = Some(addr.parse().unwrap());
}
}
Ok(router_config) Ok(router_config)
} }

55
tests/root_consistency_test.py Executable file
View File

@ -0,0 +1,55 @@
#!/usr/bin/env python3
from test_framework.test_framework import TestFramework
from config.node_config import MINER_ID, GENESIS_PRIV_KEY
from utility.submission import create_submission, submit_data
from utility.utils import wait_until, assert_equal
from test_framework.blockchain_node import BlockChainNodeType
class RootConsistencyTest(TestFramework):
    """Integration test: the storage node's flow context (root hash and flow
    length) must stay consistent with the on-chain flow contract after each
    data submission."""

    def setup_params(self):
        # Minimal topology: a single blockchain node and a single storage node.
        self.num_blockchain_nodes = 1
        self.num_nodes = 1

    def submit_data(self, item, size):
        # NOTE(review): this method shadows the module-level `submit_data`
        # imported from utility.submission; the bare call below still resolves
        # to the imported function, but renaming one of them would be clearer.
        submissions_before = self.contract.num_submissions()
        client = self.nodes[0]
        # One chunk is 256 bytes; `size` is the submission size in chunks.
        chunk_data = item * 256 * size

        submissions, data_root = create_submission(chunk_data)
        self.contract.submit(submissions)
        # Wait for the submission to land on-chain, then for the storage node
        # to pick up the new file entry.
        wait_until(lambda: self.contract.num_submissions() == submissions_before + 1)
        wait_until(lambda: client.zgs_get_file_info(data_root) is not None)

        # Upload the actual chunk data and wait until the node finalizes it.
        segment = submit_data(client, chunk_data)
        wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])

    def assert_flow_status(self, expected_length):
        # Compare the contract's view of the flow against the node's view.
        contract_root = self.contract.get_flow_root().hex()
        contract_length = self.contract.get_flow_length()
        # zgs_getFlowContext returns (root, length); called here directly on
        # the node's RPC proxy with no arguments.
        (node_root, node_length) = tuple(self.nodes[0].zgs_getFlowContext())

        assert_equal(contract_length, node_length)
        assert_equal(contract_length, expected_length)
        # The node root is 0x-prefixed hex; strip the prefix before comparing.
        assert_equal(contract_root, node_root[2:])

    def run_test(self):
        # Flow length starts at 1 (sector 0 is reserved), then grows with each
        # submission, padded up to power-of-two alignment by the flow tree.
        self.assert_flow_status(1)

        self.submit_data(b"\x11", 1)
        self.assert_flow_status(2)

        self.submit_data(b"\x11", 8 + 4 + 2)
        self.assert_flow_status(16 + 4 + 2)

        self.submit_data(b"\x12", 128 + 64)
        self.assert_flow_status(256 + 64)

        self.submit_data(b"\x13", 512 + 256)
        self.assert_flow_status(1024 + 512 + 256)


if __name__ == "__main__":
    RootConsistencyTest().main()

View File

@ -91,7 +91,12 @@ class FlowContractProxy(ContractProxy):
def get_mine_context(self, node_idx=0): def get_mine_context(self, node_idx=0):
return self._call("makeContextWithResult", node_idx) return self._call("makeContextWithResult", node_idx)
    def get_flow_root(self, node_idx=0):
        # Current flow Merkle root as computed by the contract.
        return self._call("computeFlowRoot", node_idx)
    def get_flow_length(self, node_idx=0):
        # Flow length is the first field of the contract's `tree` struct.
        return self._call("tree", node_idx)[0]
class MineContractProxy(ContractProxy): class MineContractProxy(ContractProxy):
def last_mined_epoch(self, node_idx=0): def last_mined_epoch(self, node_idx=0):

View File

@ -100,6 +100,9 @@ class ZgsNode(TestNode):
def zgs_get_file_info_by_tx_seq(self, tx_seq): def zgs_get_file_info_by_tx_seq(self, tx_seq):
return self.rpc.zgs_getFileInfoByTxSeq([tx_seq]) return self.rpc.zgs_getFileInfoByTxSeq([tx_seq])
def zgs_get_flow_context(self, tx_seq):
return self.rpc.zgs_getFlowContext([tx_seq])
def shutdown(self): def shutdown(self):
self.rpc.admin_shutdown() self.rpc.admin_shutdown()