From d3a2118985cc5ecb0ca15e94680f6c6b030852c6 Mon Sep 17 00:00:00 2001 From: Bo QIU <35757521+boqiu@users.noreply.github.com> Date: Mon, 9 Dec 2024 16:04:55 +0800 Subject: [PATCH] Sync file from neighbors by default (#295) * sync from neighbors by default * change p2p protocol version * minor fix for crash python test * Fix pubsub message id issue * fix python tests --- node/network/src/behaviour/mod.rs | 2 +- node/network/src/config.rs | 64 +++++++++++++++++------------ node/network/src/lib.rs | 2 + node/router/src/service.rs | 1 + node/src/config/convert.rs | 10 ++--- node/sync/src/controllers/serial.rs | 7 +++- node/sync/src/lib.rs | 2 +- node/sync/src/service.rs | 10 ++++- run/config-testnet-standard.toml | 4 -- run/config-testnet-turbo.toml | 4 -- run/config.toml | 4 -- tests/crash_test.py | 3 +- tests/sync_auto_random_v2_test.py | 1 - tests/sync_test.py | 6 ++- 14 files changed, 65 insertions(+), 55 deletions(-) diff --git a/node/network/src/behaviour/mod.rs b/node/network/src/behaviour/mod.rs index e18f0af..f0e9da7 100644 --- a/node/network/src/behaviour/mod.rs +++ b/node/network/src/behaviour/mod.rs @@ -390,7 +390,7 @@ impl Behaviour { .gossipsub .publish(topic.clone().into(), message_data.clone()) { - warn!(error = ?e, "Could not publish message"); + warn!(error = ?e, topic = ?topic.kind(), "Failed to publish message"); // add to metrics if let Some(v) = metrics::get_int_gauge( diff --git a/node/network/src/config.rs b/node/network/src/config.rs index 616bd8b..2f44dff 100644 --- a/node/network/src/config.rs +++ b/node/network/src/config.rs @@ -294,32 +294,6 @@ impl From for NetworkLoad { /// Return a Lighthouse specific `GossipsubConfig` where the `message_id_fn` depends on the current fork. 
pub fn gossipsub_config(network_load: u8) -> GossipsubConfig { - // The function used to generate a gossipsub message id - // We use the first 8 bytes of SHA256(data) for content addressing - let fast_gossip_message_id = - |message: &RawGossipsubMessage| FastMessageId::from(&Sha256::digest(&message.data)[..8]); - fn prefix(prefix: [u8; 4], message: &GossipsubMessage) -> Vec { - let topic_bytes = message.topic.as_str().as_bytes(); - - // according to: https://github.com/ethereum/consensus-specs/blob/dev/specs/merge/p2p-interface.md#the-gossip-domain-gossipsub - // the derivation of the message-id remains the same in the merge - let topic_len_bytes = topic_bytes.len().to_le_bytes(); - let mut vec = Vec::with_capacity( - prefix.len() + topic_len_bytes.len() + topic_bytes.len() + message.data.len(), - ); - vec.extend_from_slice(&prefix); - vec.extend_from_slice(&topic_len_bytes); - vec.extend_from_slice(topic_bytes); - vec.extend_from_slice(&message.data); - vec - } - - let gossip_message_id = move |message: &GossipsubMessage| { - MessageId::from( - &Sha256::digest(prefix(MESSAGE_DOMAIN_VALID_SNAPPY, message).as_slice())[..20], - ) - }; - let load = NetworkLoad::from(network_load); GossipsubConfigBuilder::default() @@ -368,3 +342,41 @@ fn is_global(addr: &std::net::Ipv4Addr) -> bool { // Make sure the address is not in 0.0.0.0/8 && addr.octets()[0] != 0 } + +fn fast_gossip_message_id(message: &RawGossipsubMessage) -> FastMessageId { + // source | topic | data | nonce + let topic_bytes = message.topic.as_str().as_bytes(); + let mut buf = Vec::with_capacity(64 + topic_bytes.len() + message.data.len() + 8); + + if let Some(peer_id) = message.source { + buf.extend_from_slice(&peer_id.to_bytes()); + } + + buf.extend_from_slice(&topic_bytes); + buf.extend_from_slice(&message.data); + + if let Some(nonce) = message.sequence_number { + buf.extend_from_slice(&nonce.to_le_bytes()); + } + + FastMessageId::from(&Sha256::digest(&buf)[..8]) +} + +fn gossip_message_id(message: 
&GossipsubMessage) -> MessageId { + // prefix | source | topic | data | nonce + let topic_bytes = message.topic.as_str().as_bytes(); + let mut vec = Vec::with_capacity( + MESSAGE_DOMAIN_VALID_SNAPPY.len() + 64 + topic_bytes.len() + message.data.len() + 8, + ); + vec.extend_from_slice(&MESSAGE_DOMAIN_VALID_SNAPPY); + if let Some(peer_id) = message.source { + vec.extend_from_slice(&peer_id.to_bytes()); + } + vec.extend_from_slice(topic_bytes); + vec.extend_from_slice(&message.data); + if let Some(nonce) = message.sequence_number { + vec.extend_from_slice(&nonce.to_le_bytes()); + } + + MessageId::from(&Sha256::digest(&vec)[..20]) +} diff --git a/node/network/src/lib.rs b/node/network/src/lib.rs index 29a503f..2ed2f8f 100644 --- a/node/network/src/lib.rs +++ b/node/network/src/lib.rs @@ -97,9 +97,11 @@ pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_F /// - v1: Broadcast FindFile & AnnounceFile messages in the whole network, which caused network too heavey. /// - v2: Publish NewFile to neighbors only and announce file via RPC message. /// - v3: Add shard config in Status message. +/// - v4: Refactor pubsub messages. pub const PROTOCOL_VERSION_V1: [u8; 3] = [0, 1, 1]; pub const PROTOCOL_VERSION_V2: [u8; 3] = [0, 2, 1]; pub const PROTOCOL_VERSION_V3: [u8; 3] = [0, 3, 0]; +pub const PROTOCOL_VERSION_V4: [u8; 3] = [0, 4, 0]; /// Application level requests sent to the network. 
#[derive(Debug, Clone, Copy)] diff --git a/node/router/src/service.rs b/node/router/src/service.rs index 190ff0c..f9d4160 100644 --- a/node/router/src/service.rs +++ b/node/router/src/service.rs @@ -343,6 +343,7 @@ impl RouterService { let msg = PubsubMessage::NewFile(new_file.into()); self.libp2p.swarm.behaviour_mut().publish(vec![msg]); metrics::SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE.mark(1); + debug!(?new_file, "Publish NewFile message"); } NetworkMessage::UPnPMappingEstablished { tcp_socket, diff --git a/node/src/config/convert.rs b/node/src/config/convert.rs index 0ba2a1e..5f76559 100644 --- a/node/src/config/convert.rs +++ b/node/src/config/convert.rs @@ -43,13 +43,9 @@ impl ZgsConfig { chain_id, flow_address, p2p_protocol_version: ProtocolVersion { - major: network::PROTOCOL_VERSION_V3[0], - minor: network::PROTOCOL_VERSION_V3[1], - build: if self.sync.neighbors_only { - network::PROTOCOL_VERSION_V3[2] + 1 - } else { - network::PROTOCOL_VERSION_V3[2] - }, + major: network::PROTOCOL_VERSION_V4[0], + minor: network::PROTOCOL_VERSION_V4[1], + build: network::PROTOCOL_VERSION_V4[2], }, }; network_config.network_id = local_network_id.clone(); diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index 0865b3e..cba6b69 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -677,7 +677,7 @@ impl SerialSyncController { } else { // FindFile timeout if since.elapsed() >= self.config.peer_find_timeout { - if self.config.neighbors_only { + if self.goal.is_all_chunks() && self.config.neighbors_only { self.state = SyncState::Failed { reason: FailureReason::TimeoutFindFile, }; @@ -1694,7 +1694,10 @@ mod tests { let file_location_cache = create_file_location_cache(peer_id, vec![tx_id]); let controller = SerialSyncController::new( - Config::default(), + Config { + neighbors_only: false, + ..Default::default() + }, tx_id, 0, FileSyncGoal::new_file(num_chunks as u64), diff --git 
a/node/sync/src/lib.rs b/node/sync/src/lib.rs index 3f08d85..cfe2df5 100644 --- a/node/sync/src/lib.rs +++ b/node/sync/src/lib.rs @@ -68,7 +68,7 @@ impl Default for Config { fn default() -> Self { Self { // sync service config - neighbors_only: false, + neighbors_only: true, heartbeat_interval: Duration::from_secs(5), auto_sync_enabled: false, max_sync_files: 8, diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index 90e1c31..646da5b 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -941,8 +941,14 @@ mod tests { } async fn spawn_sync_service(&mut self, with_peer_store: bool) -> SyncSender { - self.spawn_sync_service_with_config(with_peer_store, Config::default()) - .await + self.spawn_sync_service_with_config( + with_peer_store, + Config { + neighbors_only: false, + ..Default::default() + }, + ) + .await } async fn spawn_sync_service_with_config( diff --git a/run/config-testnet-standard.toml b/run/config-testnet-standard.toml index d39b57d..5434154 100644 --- a/run/config-testnet-standard.toml +++ b/run/config-testnet-standard.toml @@ -232,10 +232,6 @@ batcher_announcement_capacity = 100 # all files, and sufficient disk space is required. auto_sync_enabled = true -# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file -# announcements in the whole network, which leads to high latency or even timeout to sync files. -neighbors_only = true - # Maximum number of files in sync from other peers simultaneously. # max_sync_files = 8 diff --git a/run/config-testnet-turbo.toml b/run/config-testnet-turbo.toml index f3083cc..a59047a 100644 --- a/run/config-testnet-turbo.toml +++ b/run/config-testnet-turbo.toml @@ -244,10 +244,6 @@ batcher_announcement_capacity = 100 # all files, and sufficient disk space is required. auto_sync_enabled = true -# Indicates whether to sync file from neighbor nodes only. 
This is to avoid flooding file -# announcements in the whole network, which leads to high latency or even timeout to sync files. -neighbors_only = true - # Maximum number of files in sync from other peers simultaneously. # max_sync_files = 8 diff --git a/run/config.toml b/run/config.toml index fe7ab12..2c43742 100644 --- a/run/config.toml +++ b/run/config.toml @@ -246,10 +246,6 @@ # all files, and sufficient disk space is required. # auto_sync_enabled = false -# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file -# announcements in the whole network, which leads to high latency or even timeout to sync files. -neighbors_only = true - # Maximum number of files in sync from other peers simultaneously. # max_sync_files = 8 diff --git a/tests/crash_test.py b/tests/crash_test.py index 503e614..22c09c1 100755 --- a/tests/crash_test.py +++ b/tests/crash_test.py @@ -20,8 +20,9 @@ class CrashTest(TestFramework): segment = submit_data(self.nodes[0], chunk_data) self.log.info("segment: %s", segment) + wait_until(lambda: self.nodes[0].zgs_get_file_info(data_root)["finalized"] is True) - for i in range(self.num_nodes): + for i in range(1, self.num_nodes): wait_until( lambda: self.nodes[i].zgs_get_file_info(data_root) is not None ) diff --git a/tests/sync_auto_random_v2_test.py b/tests/sync_auto_random_v2_test.py index f584283..a00c559 100644 --- a/tests/sync_auto_random_v2_test.py +++ b/tests/sync_auto_random_v2_test.py @@ -14,7 +14,6 @@ class AutoRandomSyncV2Test(TestFramework): "auto_sync_enabled": True, "max_sequential_workers": 0, "max_random_workers": 3, - "neighbors_only": True, } } diff --git a/tests/sync_test.py b/tests/sync_test.py index 45f5bb5..e2e8f7f 100755 --- a/tests/sync_test.py +++ b/tests/sync_test.py @@ -45,13 +45,15 @@ class SyncTest(TestFramework): wait_until(lambda: client2.zgs_get_file_info(data_root) is not None) time.sleep(3) assert_equal(client2.zgs_get_file_info(data_root)["finalized"], False) - 
assert(client2.admin_get_file_location(0) is None) + # File sync uses the ASK_FILE & ANSWER_FILE protocol and no longer caches file announcements. + # assert(client2.admin_get_file_location(0) is None) # Trigger file sync by rpc assert(client2.admin_start_sync_file(0) is None) wait_until(lambda: client2.sync_status_is_completed_or_unknown(0)) wait_until(lambda: client2.zgs_get_file_info(data_root)["finalized"]) - assert(client2.admin_get_file_location(0) is not None) + # File sync uses the ASK_FILE & ANSWER_FILE protocol and no longer caches file announcements. + # assert(client2.admin_get_file_location(0) is not None) # Validate data assert_equal(