Compare commits

..

9 Commits

Author SHA1 Message Date
boqiu
cc6cf0fdb2 Enable file sync protocol v2 in config file by default 2024-10-25 15:01:32 +08:00
boqiu
51a2305c2d Add comments 2024-10-25 14:57:30 +08:00
boqiu
98c62b314e fix random test failure 2024-10-25 12:23:08 +08:00
boqiu
da903fefe7 remove dummy code in py test 2024-10-25 11:56:21 +08:00
boqiu
98438fd0e5 fmt code 2024-10-25 11:31:54 +08:00
boqiu
247b1aaf8f Add py test for auto sync v2 2024-10-25 11:09:30 +08:00
boqiu
3e46e10a72 Ignore py tests of sequential auto sync 2024-10-25 10:53:49 +08:00
boqiu
43afc79a4a Change P2P protocol version 2024-10-25 10:53:16 +08:00
boqiu
0f93b035f1 fix unit test failures 2024-10-24 20:01:57 +08:00
14 changed files with 136 additions and 54 deletions

View File

@ -39,7 +39,7 @@ pub struct GossipCacheBuilder {
default_timeout: Option<Duration>,
/// Timeout for Example messages.
example: Option<Duration>,
/// Timeout for NewFile messges.
/// Timeout for NewFile messages.
new_file: Option<Duration>,
/// Timeout for blocks FindFile messages.
find_file: Option<Duration>,

View File

@ -93,7 +93,10 @@ pub use peer_manager::{
};
pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_FILENAME};
pub const PROTOCOL_VERSION: [u8; 3] = [0, 1, 0];
/// Defines the current P2P protocol version.
/// - v1: Broadcast FindFile & AnnounceFile messages in the whole network, which made network traffic too heavy.
/// - v2: Publish NewFile to neighbors only and announce file via RPC message.
pub const PROTOCOL_VERSION: [u8; 3] = [0, 2, 0];
/// Application level requests sent to the network.
#[derive(Debug, Clone, Copy)]

View File

@ -114,6 +114,7 @@ impl ssz::Decode for WrappedPeerId {
}
}
/// Published when a file is uploaded or has finished syncing from other peers.
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
pub struct NewFile {
pub tx_id: TxID,
@ -127,6 +128,7 @@ pub struct FindFile {
pub tx_id: TxID,
pub num_shard: usize,
pub shard_id: usize,
/// Indicates whether to publish to neighbor nodes only.
pub neighbors_only: bool,
pub timestamp: u32,
}

View File

@ -30,8 +30,12 @@ use crate::peer_manager::PeerManager;
use crate::Config;
lazy_static::lazy_static! {
/// Timeout to publish NewFile message to neighbor nodes.
pub static ref NEW_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30);
pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30);
/// Timeout to publish FindFile message to neighbor nodes.
pub static ref FIND_FILE_NEIGHBORS_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30);
/// Timeout to publish FindFile message in the whole network.
pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
pub static ref ANNOUNCE_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
pub static ref ANNOUNCE_SHARD_CONFIG_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
pub static ref TOLERABLE_DRIFT: chrono::Duration = chrono::Duration::seconds(10);
@ -236,7 +240,7 @@ impl Libp2pEventHandler {
peer_id,
action: PeerAction::Fatal,
source: ReportSource::RPC,
msg: "Invalid shard config in FileAnnouncement",
msg: "Invalid shard config in AnnounceFile RPC message",
}),
}
}
@ -339,11 +343,11 @@ impl Libp2pEventHandler {
PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore,
PubsubMessage::NewFile(msg) => {
metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE.mark(1);
self.on_new_file(source, msg).await
self.on_new_file(propagation_source, msg).await
}
PubsubMessage::FindFile(msg) => {
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1);
self.on_find_file(source, msg).await
self.on_find_file(propagation_source, msg).await
}
PubsubMessage::FindChunks(msg) => {
metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS.mark(1);
@ -373,6 +377,7 @@ impl Libp2pEventHandler {
}
}
/// Handle NewFile pubsub message `msg` that was published by the `from` peer.
async fn on_new_file(&self, from: PeerId, msg: NewFile) -> MessageAcceptance {
// verify timestamp
let d = duration_since(
@ -397,6 +402,12 @@ impl Libp2pEventHandler {
Err(_) => return MessageAcceptance::Reject,
};
// ignore if shard config mismatch
let my_shard_config = self.store.get_store().get_shard_config();
if !my_shard_config.intersect(&announced_shard_config) {
return MessageAcceptance::Ignore;
}
// ignore if already exists
match self.store.check_tx_completed(msg.tx_id.seq).await {
Ok(true) => return MessageAcceptance::Ignore,
@ -417,14 +428,8 @@ impl Libp2pEventHandler {
}
}
// notify sync layer if shard config matches
let my_shard_config = self.store.get_store().get_shard_config();
if my_shard_config.intersect(&announced_shard_config) {
self.send_to_sync(SyncMessage::NewFile { from, msg });
}
self.file_location_cache
.insert_peer_config(from, announced_shard_config);
// notify sync layer to handle in advance
self.send_to_sync(SyncMessage::NewFile { from, msg });
MessageAcceptance::Ignore
}
@ -566,7 +571,7 @@ impl Libp2pEventHandler {
Some(PubsubMessage::AnnounceShardConfig(signed))
}
async fn on_find_file(&self, peer_id: PeerId, msg: FindFile) -> MessageAcceptance {
async fn on_find_file(&self, from: PeerId, msg: FindFile) -> MessageAcceptance {
let FindFile {
tx_id, timestamp, ..
} = msg;
@ -576,12 +581,17 @@ impl Libp2pEventHandler {
timestamp,
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY.clone(),
);
if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT {
let timeout = if msg.neighbors_only {
*FIND_FILE_NEIGHBORS_TIMEOUT
} else {
*FIND_FILE_TIMEOUT
};
if d < TOLERABLE_DRIFT.neg() || d > timeout {
debug!(%timestamp, ?d, "Invalid timestamp, ignoring FindFile message");
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_TIMEOUT.mark(1);
if msg.neighbors_only {
self.send_to_network(NetworkMessage::ReportPeer {
peer_id,
peer_id: from,
action: PeerAction::LowToleranceError,
source: ReportSource::Gossipsub,
msg: "Received out of date FindFile message",
@ -596,7 +606,7 @@ impl Libp2pEventHandler {
Err(_) => return MessageAcceptance::Reject,
};
// ignore if shard config mismatch
// handle on shard config mismatch
let my_shard_config = self.store.get_store().get_shard_config();
if !my_shard_config.intersect(&announced_shard_config) {
return if msg.neighbors_only {
@ -606,10 +616,6 @@ impl Libp2pEventHandler {
};
}
// update peer shard config in cache
self.file_location_cache
.insert_peer_config(peer_id, announced_shard_config);
// check if we have it
if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) {
if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await {
@ -617,8 +623,9 @@ impl Libp2pEventHandler {
trace!(?tx_id, "Found file locally, responding to FindFile query");
if msg.neighbors_only {
// announce file via RPC to avoid flooding pubsub message
self.send_to_network(NetworkMessage::SendRequest {
peer_id,
peer_id: from,
request: Request::AnnounceFile(FileAnnouncement {
tx_id,
num_shard: my_shard_config.num_shard,
@ -634,8 +641,8 @@ impl Libp2pEventHandler {
}
}
// do not forward to whole network if only find file from neighbor nodes
if msg.neighbors_only {
// do not forward to other peers anymore
return MessageAcceptance::Ignore;
}

View File

@ -35,17 +35,21 @@ impl AutoSyncManager {
executor: &TaskExecutor,
store: Store,
sync_send: SyncSender,
_log_sync_recv: broadcast::Receiver<LogSyncEvent>,
log_sync_recv: broadcast::Receiver<LogSyncEvent>,
catch_up_end_recv: oneshot::Receiver<()>,
) -> Result<Self> {
let (file_announcement_send, _file_announcement_recv) = unbounded_channel();
let (file_announcement_send, file_announcement_recv) = unbounded_channel();
let (new_file_send, new_file_recv) = unbounded_channel();
// use v2 db to avoid reading v1 files that announced from the whole network instead of neighbors
let sync_store = Arc::new(SyncStore::new_with_name(
store.clone(),
"pendingv2",
"readyv2",
));
let sync_store = if config.neighbors_only {
// use v2 db to avoid reading v1 files that were announced from the whole network instead of neighbors
Arc::new(SyncStore::new_with_name(
store.clone(),
"pendingv2",
"readyv2",
))
} else {
Arc::new(SyncStore::new(store.clone()))
};
let catched_up = Arc::new(AtomicBool::new(false));
// handle new file
@ -55,15 +59,21 @@ impl AutoSyncManager {
);
// sync in sequence
// let serial =
// SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone())
// .await?;
// executor.spawn(
// serial
// .clone()
// .start(file_announcement_recv, log_sync_recv, catched_up.clone()),
// "auto_sync_serial",
// );
let serial = if config.neighbors_only {
None
} else {
let serial =
SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone())
.await?;
executor.spawn(
serial
.clone()
.start(file_announcement_recv, log_sync_recv, catched_up.clone()),
"auto_sync_serial",
);
Some(serial)
};
// sync randomly
let random = RandomBatcher::new(config, store, sync_send, sync_store);
@ -76,7 +86,7 @@ impl AutoSyncManager {
);
Ok(Self {
serial: None,
serial,
random,
file_announcement_send,
new_file_send,

View File

@ -89,9 +89,6 @@ pub struct SerialSyncController {
/// Cache for storing and serving gossip messages.
file_location_cache: Arc<FileLocationCache>,
/// Whether to find files from neighbors only.
neighbors_only: bool,
}
impl SerialSyncController {
@ -118,7 +115,6 @@ impl SerialSyncController {
ctx,
store,
file_location_cache,
neighbors_only: true,
}
}
@ -167,7 +163,7 @@ impl SerialSyncController {
let (published, num_new_peers) = if !self.goal.is_all_chunks() {
self.publish_find_chunks();
(true, 0)
} else if self.neighbors_only {
} else if self.config.neighbors_only {
self.do_publish_find_file();
(true, 0)
} else {
@ -219,7 +215,7 @@ impl SerialSyncController {
tx_id: self.tx_id,
num_shard: shard_config.num_shard,
shard_id: shard_config.shard_id,
neighbors_only: self.neighbors_only,
neighbors_only: self.config.neighbors_only,
timestamp: timestamp_now(),
}));
}
@ -570,6 +566,7 @@ impl SerialSyncController {
info!(%self.tx_seq, "Succeeded to finalize file");
self.state = SyncState::Completed;
metrics::SERIAL_SYNC_FILE_COMPLETED.update_since(self.since.0);
// notify neighbor nodes about new file completed to sync
self.ctx
.send(NetworkMessage::AnnounceLocalFile { tx_id: self.tx_id });
}
@ -666,7 +663,7 @@ impl SerialSyncController {
} else {
// FindFile timeout
if since.elapsed() >= self.config.peer_find_timeout {
if self.neighbors_only {
if self.config.neighbors_only {
self.state = SyncState::Failed {
reason: FailureReason::TimeoutFindFile,
};
@ -1547,6 +1544,10 @@ mod tests {
controller.on_response(peer_id, chunks).await;
assert_eq!(*controller.get_status(), SyncState::Completed);
assert!(matches!(
network_recv.try_recv().unwrap(),
NetworkMessage::AnnounceLocalFile { .. }
));
assert!(network_recv.try_recv().is_err());
}

View File

@ -21,6 +21,10 @@ use std::{
#[serde(default)]
pub struct Config {
// sync service config
/// Indicates whether to sync file from neighbor nodes only.
/// This is to avoid flooding file announcements in the whole network,
/// which leads to high latency or even timeout to sync files.
pub neighbors_only: bool,
#[serde(deserialize_with = "deserialize_duration")]
pub heartbeat_interval: Duration,
pub auto_sync_enabled: bool,
@ -64,6 +68,7 @@ impl Default for Config {
fn default() -> Self {
Self {
// sync service config
neighbors_only: false,
heartbeat_interval: Duration::from_secs(5),
auto_sync_enabled: false,
max_sync_files: 8,

View File

@ -774,7 +774,7 @@ impl SyncService {
}
}
/// Handle on NewFile gossip message received.
/// Handle on `NewFile` gossip message received.
async fn on_new_file_gossip(&mut self, from: PeerId, msg: NewFile) {
debug!(%from, ?msg, "Received NewFile gossip");
@ -789,14 +789,14 @@ impl SyncService {
}
}
/// Handle on AnnounceFile RPC message received.
async fn on_announce_file(&mut self, peer_id: PeerId, announcement: FileAnnouncement) {
/// Handle on `AnnounceFile` RPC message received.
async fn on_announce_file(&mut self, from: PeerId, announcement: FileAnnouncement) {
// Notify new peer announced if file already in sync
if let Some(controller) = self.controllers.get_mut(&announcement.tx_id.seq) {
// Notify new peer announced if file already in sync
if let Ok(shard_config) =
ShardConfig::new(announcement.shard_id, announcement.num_shard)
{
controller.on_peer_announced(peer_id, shard_config);
controller.on_peer_announced(from, shard_config);
controller.transition();
}
}
@ -1558,6 +1558,10 @@ mod tests {
.await;
wait_for_tx_finalized(runtime.store.clone(), tx_seq).await;
assert!(matches!(
runtime.network_recv.try_recv().unwrap(),
NetworkMessage::AnnounceLocalFile { .. }
));
assert!(!runtime.store.check_tx_completed(0).unwrap());
@ -1582,6 +1586,10 @@ mod tests {
.await;
wait_for_tx_finalized(runtime.store, tx_seq).await;
assert!(matches!(
runtime.network_recv.try_recv().unwrap(),
NetworkMessage::AnnounceLocalFile { .. }
));
sync_send
.notify(SyncMessage::PeerDisconnected {

View File

@ -232,6 +232,10 @@ batcher_announcement_capacity = 100
# all files, and sufficient disk space is required.
auto_sync_enabled = true
# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file
# announcements in the whole network, which leads to high latency or even timeout to sync files.
neighbors_only = true
# Maximum number of files in sync from other peers simultaneously.
# max_sync_files = 8

View File

@ -244,6 +244,10 @@ batcher_announcement_capacity = 100
# all files, and sufficient disk space is required.
auto_sync_enabled = true
# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file
# announcements in the whole network, which leads to high latency or even timeout to sync files.
neighbors_only = true
# Maximum number of files in sync from other peers simultaneously.
# max_sync_files = 8

View File

@ -246,6 +246,10 @@
# all files, and sufficient disk space is required.
# auto_sync_enabled = false
# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file
# announcements in the whole network, which leads to high latency or even timeout to sync files.
neighbors_only = true
# Maximum number of files in sync from other peers simultaneously.
# max_sync_files = 8

View File

@ -0,0 +1,34 @@
#!/usr/bin/env python3
from test_framework.test_framework import TestFramework
from utility.utils import wait_until
class AutoRandomSyncV2Test(TestFramework):
    """Checks that files uploaded to node 0 show up as finalized on every other node.

    All nodes are configured with auto sync enabled and `neighbors_only` set.
    """

    def setup_params(self):
        """Configure a 4-node network with the same sync settings on each node."""
        self.num_nodes = 4

        # Enable random auto sync v2
        sync_settings = {
            "auto_sync_enabled": True,
            "max_sequential_workers": 0,
            "max_random_workers": 3,
            "neighbors_only": True,
        }
        for node_index in range(self.num_nodes):
            # Give every node its own copy so the per-node configs stay independent.
            self.zgs_node_configs[node_index] = {"sync": dict(sync_settings)}

    def run_test(self):
        """Upload two files on node 0, then wait for each to be finalized on nodes 1..N-1."""
        # Submit and upload files on node 0
        first_root = self.__upload_file__(0, 256 * 1024)
        second_root = self.__upload_file__(0, 256 * 1024)
        uploaded_roots = (first_root, second_root)

        # Files should be available on other nodes via auto sync
        for node_index in range(1, self.num_nodes):
            for root in uploaded_roots:
                # First wait for the file to be known at all, then for it to be finalized.
                wait_until(
                    lambda: self.nodes[node_index].zgs_get_file_info(root) is not None
                )
                wait_until(
                    lambda: self.nodes[node_index].zgs_get_file_info(root)["finalized"]
                )
if __name__ == "__main__":
AutoRandomSyncV2Test().main()