mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
Compare commits
9 Commits
cebc3c8247
...
cc6cf0fdb2
Author | SHA1 | Date | |
---|---|---|---|
![]() |
cc6cf0fdb2 | ||
![]() |
51a2305c2d | ||
![]() |
98c62b314e | ||
![]() |
da903fefe7 | ||
![]() |
98438fd0e5 | ||
![]() |
247b1aaf8f | ||
![]() |
3e46e10a72 | ||
![]() |
43afc79a4a | ||
![]() |
0f93b035f1 |
@ -39,7 +39,7 @@ pub struct GossipCacheBuilder {
|
||||
default_timeout: Option<Duration>,
|
||||
/// Timeout for Example messages.
|
||||
example: Option<Duration>,
|
||||
/// Timeout for NewFile messges.
|
||||
/// Timeout for NewFile messages.
|
||||
new_file: Option<Duration>,
|
||||
/// Timeout for blocks FindFile messages.
|
||||
find_file: Option<Duration>,
|
||||
|
@ -93,7 +93,10 @@ pub use peer_manager::{
|
||||
};
|
||||
pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_FILENAME};
|
||||
|
||||
pub const PROTOCOL_VERSION: [u8; 3] = [0, 1, 0];
|
||||
/// Defines the current P2P protocol version.
|
||||
/// - v1: Broadcast FindFile & AnnounceFile messages in the whole network, which made the network load too heavy.
|
||||
/// - v2: Publish NewFile to neighbors only and announce file via RPC message.
|
||||
pub const PROTOCOL_VERSION: [u8; 3] = [0, 2, 0];
|
||||
|
||||
/// Application level requests sent to the network.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
|
@ -114,6 +114,7 @@ impl ssz::Decode for WrappedPeerId {
|
||||
}
|
||||
}
|
||||
|
||||
/// Published when a file is uploaded or has completed syncing from other peers.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
|
||||
pub struct NewFile {
|
||||
pub tx_id: TxID,
|
||||
@ -127,6 +128,7 @@ pub struct FindFile {
|
||||
pub tx_id: TxID,
|
||||
pub num_shard: usize,
|
||||
pub shard_id: usize,
|
||||
/// Indicates whether to publish to neighbor nodes only.
|
||||
pub neighbors_only: bool,
|
||||
pub timestamp: u32,
|
||||
}
|
||||
|
@ -30,8 +30,12 @@ use crate::peer_manager::PeerManager;
|
||||
use crate::Config;
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
/// Timeout to publish NewFile message to neighbor nodes.
|
||||
pub static ref NEW_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30);
|
||||
pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30);
|
||||
/// Timeout to publish FindFile message to neighbor nodes.
|
||||
pub static ref FIND_FILE_NEIGHBORS_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30);
|
||||
/// Timeout to publish FindFile message in the whole network.
|
||||
pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
|
||||
pub static ref ANNOUNCE_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
|
||||
pub static ref ANNOUNCE_SHARD_CONFIG_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
|
||||
pub static ref TOLERABLE_DRIFT: chrono::Duration = chrono::Duration::seconds(10);
|
||||
@ -236,7 +240,7 @@ impl Libp2pEventHandler {
|
||||
peer_id,
|
||||
action: PeerAction::Fatal,
|
||||
source: ReportSource::RPC,
|
||||
msg: "Invalid shard config in FileAnnouncement",
|
||||
msg: "Invalid shard config in AnnounceFile RPC message",
|
||||
}),
|
||||
}
|
||||
}
|
||||
@ -339,11 +343,11 @@ impl Libp2pEventHandler {
|
||||
PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore,
|
||||
PubsubMessage::NewFile(msg) => {
|
||||
metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE.mark(1);
|
||||
self.on_new_file(source, msg).await
|
||||
self.on_new_file(propagation_source, msg).await
|
||||
}
|
||||
PubsubMessage::FindFile(msg) => {
|
||||
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1);
|
||||
self.on_find_file(source, msg).await
|
||||
self.on_find_file(propagation_source, msg).await
|
||||
}
|
||||
PubsubMessage::FindChunks(msg) => {
|
||||
metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS.mark(1);
|
||||
@ -373,6 +377,7 @@ impl Libp2pEventHandler {
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle NewFile pubsub message `msg` that was published by the `from` peer.
|
||||
async fn on_new_file(&self, from: PeerId, msg: NewFile) -> MessageAcceptance {
|
||||
// verify timestamp
|
||||
let d = duration_since(
|
||||
@ -397,6 +402,12 @@ impl Libp2pEventHandler {
|
||||
Err(_) => return MessageAcceptance::Reject,
|
||||
};
|
||||
|
||||
// ignore if shard config mismatch
|
||||
let my_shard_config = self.store.get_store().get_shard_config();
|
||||
if !my_shard_config.intersect(&announced_shard_config) {
|
||||
return MessageAcceptance::Ignore;
|
||||
}
|
||||
|
||||
// ignore if already exists
|
||||
match self.store.check_tx_completed(msg.tx_id.seq).await {
|
||||
Ok(true) => return MessageAcceptance::Ignore,
|
||||
@ -417,14 +428,8 @@ impl Libp2pEventHandler {
|
||||
}
|
||||
}
|
||||
|
||||
// notify sync layer if shard config matches
|
||||
let my_shard_config = self.store.get_store().get_shard_config();
|
||||
if my_shard_config.intersect(&announced_shard_config) {
|
||||
// notify sync layer to handle in advance
|
||||
self.send_to_sync(SyncMessage::NewFile { from, msg });
|
||||
}
|
||||
|
||||
self.file_location_cache
|
||||
.insert_peer_config(from, announced_shard_config);
|
||||
|
||||
MessageAcceptance::Ignore
|
||||
}
|
||||
@ -566,7 +571,7 @@ impl Libp2pEventHandler {
|
||||
Some(PubsubMessage::AnnounceShardConfig(signed))
|
||||
}
|
||||
|
||||
async fn on_find_file(&self, peer_id: PeerId, msg: FindFile) -> MessageAcceptance {
|
||||
async fn on_find_file(&self, from: PeerId, msg: FindFile) -> MessageAcceptance {
|
||||
let FindFile {
|
||||
tx_id, timestamp, ..
|
||||
} = msg;
|
||||
@ -576,12 +581,17 @@ impl Libp2pEventHandler {
|
||||
timestamp,
|
||||
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY.clone(),
|
||||
);
|
||||
if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT {
|
||||
let timeout = if msg.neighbors_only {
|
||||
*FIND_FILE_NEIGHBORS_TIMEOUT
|
||||
} else {
|
||||
*FIND_FILE_TIMEOUT
|
||||
};
|
||||
if d < TOLERABLE_DRIFT.neg() || d > timeout {
|
||||
debug!(%timestamp, ?d, "Invalid timestamp, ignoring FindFile message");
|
||||
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_TIMEOUT.mark(1);
|
||||
if msg.neighbors_only {
|
||||
self.send_to_network(NetworkMessage::ReportPeer {
|
||||
peer_id,
|
||||
peer_id: from,
|
||||
action: PeerAction::LowToleranceError,
|
||||
source: ReportSource::Gossipsub,
|
||||
msg: "Received out of date FindFile message",
|
||||
@ -596,7 +606,7 @@ impl Libp2pEventHandler {
|
||||
Err(_) => return MessageAcceptance::Reject,
|
||||
};
|
||||
|
||||
// ignore if shard config mismatch
|
||||
// handle on shard config mismatch
|
||||
let my_shard_config = self.store.get_store().get_shard_config();
|
||||
if !my_shard_config.intersect(&announced_shard_config) {
|
||||
return if msg.neighbors_only {
|
||||
@ -606,10 +616,6 @@ impl Libp2pEventHandler {
|
||||
};
|
||||
}
|
||||
|
||||
// update peer shard config in cache
|
||||
self.file_location_cache
|
||||
.insert_peer_config(peer_id, announced_shard_config);
|
||||
|
||||
// check if we have it
|
||||
if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) {
|
||||
if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await {
|
||||
@ -617,8 +623,9 @@ impl Libp2pEventHandler {
|
||||
trace!(?tx_id, "Found file locally, responding to FindFile query");
|
||||
|
||||
if msg.neighbors_only {
|
||||
// announce file via RPC to avoid flooding pubsub message
|
||||
self.send_to_network(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
peer_id: from,
|
||||
request: Request::AnnounceFile(FileAnnouncement {
|
||||
tx_id,
|
||||
num_shard: my_shard_config.num_shard,
|
||||
@ -634,8 +641,8 @@ impl Libp2pEventHandler {
|
||||
}
|
||||
}
|
||||
|
||||
// do not forward to whole network if only find file from neighbor nodes
|
||||
if msg.neighbors_only {
|
||||
// do not forward to other peers anymore
|
||||
return MessageAcceptance::Ignore;
|
||||
}
|
||||
|
||||
|
@ -35,17 +35,21 @@ impl AutoSyncManager {
|
||||
executor: &TaskExecutor,
|
||||
store: Store,
|
||||
sync_send: SyncSender,
|
||||
_log_sync_recv: broadcast::Receiver<LogSyncEvent>,
|
||||
log_sync_recv: broadcast::Receiver<LogSyncEvent>,
|
||||
catch_up_end_recv: oneshot::Receiver<()>,
|
||||
) -> Result<Self> {
|
||||
let (file_announcement_send, _file_announcement_recv) = unbounded_channel();
|
||||
let (file_announcement_send, file_announcement_recv) = unbounded_channel();
|
||||
let (new_file_send, new_file_recv) = unbounded_channel();
|
||||
let sync_store = if config.neighbors_only {
|
||||
// use v2 db to avoid reading v1 files that were announced from the whole network instead of neighbors
|
||||
let sync_store = Arc::new(SyncStore::new_with_name(
|
||||
Arc::new(SyncStore::new_with_name(
|
||||
store.clone(),
|
||||
"pendingv2",
|
||||
"readyv2",
|
||||
));
|
||||
))
|
||||
} else {
|
||||
Arc::new(SyncStore::new(store.clone()))
|
||||
};
|
||||
let catched_up = Arc::new(AtomicBool::new(false));
|
||||
|
||||
// handle new file
|
||||
@ -55,15 +59,21 @@ impl AutoSyncManager {
|
||||
);
|
||||
|
||||
// sync in sequence
|
||||
// let serial =
|
||||
// SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone())
|
||||
// .await?;
|
||||
// executor.spawn(
|
||||
// serial
|
||||
// .clone()
|
||||
// .start(file_announcement_recv, log_sync_recv, catched_up.clone()),
|
||||
// "auto_sync_serial",
|
||||
// );
|
||||
let serial = if config.neighbors_only {
|
||||
None
|
||||
} else {
|
||||
let serial =
|
||||
SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone())
|
||||
.await?;
|
||||
executor.spawn(
|
||||
serial
|
||||
.clone()
|
||||
.start(file_announcement_recv, log_sync_recv, catched_up.clone()),
|
||||
"auto_sync_serial",
|
||||
);
|
||||
|
||||
Some(serial)
|
||||
};
|
||||
|
||||
// sync randomly
|
||||
let random = RandomBatcher::new(config, store, sync_send, sync_store);
|
||||
@ -76,7 +86,7 @@ impl AutoSyncManager {
|
||||
);
|
||||
|
||||
Ok(Self {
|
||||
serial: None,
|
||||
serial,
|
||||
random,
|
||||
file_announcement_send,
|
||||
new_file_send,
|
||||
|
@ -89,9 +89,6 @@ pub struct SerialSyncController {
|
||||
|
||||
/// Cache for storing and serving gossip messages.
|
||||
file_location_cache: Arc<FileLocationCache>,
|
||||
|
||||
/// Whether to find files from neighbors only.
|
||||
neighbors_only: bool,
|
||||
}
|
||||
|
||||
impl SerialSyncController {
|
||||
@ -118,7 +115,6 @@ impl SerialSyncController {
|
||||
ctx,
|
||||
store,
|
||||
file_location_cache,
|
||||
neighbors_only: true,
|
||||
}
|
||||
}
|
||||
|
||||
@ -167,7 +163,7 @@ impl SerialSyncController {
|
||||
let (published, num_new_peers) = if !self.goal.is_all_chunks() {
|
||||
self.publish_find_chunks();
|
||||
(true, 0)
|
||||
} else if self.neighbors_only {
|
||||
} else if self.config.neighbors_only {
|
||||
self.do_publish_find_file();
|
||||
(true, 0)
|
||||
} else {
|
||||
@ -219,7 +215,7 @@ impl SerialSyncController {
|
||||
tx_id: self.tx_id,
|
||||
num_shard: shard_config.num_shard,
|
||||
shard_id: shard_config.shard_id,
|
||||
neighbors_only: self.neighbors_only,
|
||||
neighbors_only: self.config.neighbors_only,
|
||||
timestamp: timestamp_now(),
|
||||
}));
|
||||
}
|
||||
@ -570,6 +566,7 @@ impl SerialSyncController {
|
||||
info!(%self.tx_seq, "Succeeded to finalize file");
|
||||
self.state = SyncState::Completed;
|
||||
metrics::SERIAL_SYNC_FILE_COMPLETED.update_since(self.since.0);
|
||||
// notify neighbor nodes about new file completed to sync
|
||||
self.ctx
|
||||
.send(NetworkMessage::AnnounceLocalFile { tx_id: self.tx_id });
|
||||
}
|
||||
@ -666,7 +663,7 @@ impl SerialSyncController {
|
||||
} else {
|
||||
// FindFile timeout
|
||||
if since.elapsed() >= self.config.peer_find_timeout {
|
||||
if self.neighbors_only {
|
||||
if self.config.neighbors_only {
|
||||
self.state = SyncState::Failed {
|
||||
reason: FailureReason::TimeoutFindFile,
|
||||
};
|
||||
@ -1547,6 +1544,10 @@ mod tests {
|
||||
|
||||
controller.on_response(peer_id, chunks).await;
|
||||
assert_eq!(*controller.get_status(), SyncState::Completed);
|
||||
assert!(matches!(
|
||||
network_recv.try_recv().unwrap(),
|
||||
NetworkMessage::AnnounceLocalFile { .. }
|
||||
));
|
||||
assert!(network_recv.try_recv().is_err());
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,10 @@ use std::{
|
||||
#[serde(default)]
|
||||
pub struct Config {
|
||||
// sync service config
|
||||
/// Indicates whether to sync file from neighbor nodes only.
|
||||
/// This is to avoid flooding file announcements in the whole network,
|
||||
/// which leads to high latency or even timeout to sync files.
|
||||
pub neighbors_only: bool,
|
||||
#[serde(deserialize_with = "deserialize_duration")]
|
||||
pub heartbeat_interval: Duration,
|
||||
pub auto_sync_enabled: bool,
|
||||
@ -64,6 +68,7 @@ impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
// sync service config
|
||||
neighbors_only: false,
|
||||
heartbeat_interval: Duration::from_secs(5),
|
||||
auto_sync_enabled: false,
|
||||
max_sync_files: 8,
|
||||
|
@ -774,7 +774,7 @@ impl SyncService {
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle on NewFile gossip message received.
|
||||
/// Handle on `NewFile` gossip message received.
|
||||
async fn on_new_file_gossip(&mut self, from: PeerId, msg: NewFile) {
|
||||
debug!(%from, ?msg, "Received NewFile gossip");
|
||||
|
||||
@ -789,14 +789,14 @@ impl SyncService {
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle on AnnounceFile RPC message received.
|
||||
async fn on_announce_file(&mut self, peer_id: PeerId, announcement: FileAnnouncement) {
|
||||
if let Some(controller) = self.controllers.get_mut(&announcement.tx_id.seq) {
|
||||
/// Handle on `AnnounceFile` RPC message received.
|
||||
async fn on_announce_file(&mut self, from: PeerId, announcement: FileAnnouncement) {
|
||||
// Notify new peer announced if file already in sync
|
||||
if let Some(controller) = self.controllers.get_mut(&announcement.tx_id.seq) {
|
||||
if let Ok(shard_config) =
|
||||
ShardConfig::new(announcement.shard_id, announcement.num_shard)
|
||||
{
|
||||
controller.on_peer_announced(peer_id, shard_config);
|
||||
controller.on_peer_announced(from, shard_config);
|
||||
controller.transition();
|
||||
}
|
||||
}
|
||||
@ -1558,6 +1558,10 @@ mod tests {
|
||||
.await;
|
||||
|
||||
wait_for_tx_finalized(runtime.store.clone(), tx_seq).await;
|
||||
assert!(matches!(
|
||||
runtime.network_recv.try_recv().unwrap(),
|
||||
NetworkMessage::AnnounceLocalFile { .. }
|
||||
));
|
||||
|
||||
assert!(!runtime.store.check_tx_completed(0).unwrap());
|
||||
|
||||
@ -1582,6 +1586,10 @@ mod tests {
|
||||
.await;
|
||||
|
||||
wait_for_tx_finalized(runtime.store, tx_seq).await;
|
||||
assert!(matches!(
|
||||
runtime.network_recv.try_recv().unwrap(),
|
||||
NetworkMessage::AnnounceLocalFile { .. }
|
||||
));
|
||||
|
||||
sync_send
|
||||
.notify(SyncMessage::PeerDisconnected {
|
||||
|
@ -232,6 +232,10 @@ batcher_announcement_capacity = 100
|
||||
# all files, and sufficient disk space is required.
|
||||
auto_sync_enabled = true
|
||||
|
||||
# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file
|
||||
# announcements in the whole network, which leads to high latency or even timeout to sync files.
|
||||
neighbors_only = true
|
||||
|
||||
# Maximum number of files in sync from other peers simultaneously.
|
||||
# max_sync_files = 8
|
||||
|
||||
|
@ -244,6 +244,10 @@ batcher_announcement_capacity = 100
|
||||
# all files, and sufficient disk space is required.
|
||||
auto_sync_enabled = true
|
||||
|
||||
# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file
|
||||
# announcements in the whole network, which leads to high latency or even timeout to sync files.
|
||||
neighbors_only = true
|
||||
|
||||
# Maximum number of files in sync from other peers simultaneously.
|
||||
# max_sync_files = 8
|
||||
|
||||
|
@ -246,6 +246,10 @@
|
||||
# all files, and sufficient disk space is required.
|
||||
# auto_sync_enabled = false
|
||||
|
||||
# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file
|
||||
# announcements in the whole network, which leads to high latency or even timeout to sync files.
|
||||
neighbors_only = true
|
||||
|
||||
# Maximum number of files in sync from other peers simultaneously.
|
||||
# max_sync_files = 8
|
||||
|
||||
|
34
tests/sync_auto_random_v2_test.py
Normal file
34
tests/sync_auto_random_v2_test.py
Normal file
@ -0,0 +1,34 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from test_framework.test_framework import TestFramework
|
||||
from utility.utils import wait_until
|
||||
|
||||
class AutoRandomSyncV2Test(TestFramework):
|
||||
def setup_params(self):
|
||||
self.num_nodes = 4
|
||||
|
||||
# Enable random auto sync v2
|
||||
for i in range(self.num_nodes):
|
||||
self.zgs_node_configs[i] = {
|
||||
"sync": {
|
||||
"auto_sync_enabled": True,
|
||||
"max_sequential_workers": 0,
|
||||
"max_random_workers": 3,
|
||||
"neighbors_only": True,
|
||||
}
|
||||
}
|
||||
|
||||
def run_test(self):
|
||||
# Submit and upload files on node 0
|
||||
data_root_1 = self.__upload_file__(0, 256 * 1024)
|
||||
data_root_2 = self.__upload_file__(0, 256 * 1024)
|
||||
|
||||
# Files should be available on other nodes via auto sync
|
||||
for i in range(1, self.num_nodes):
|
||||
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1) is not None)
|
||||
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1)["finalized"])
|
||||
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None)
|
||||
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"])
|
||||
|
||||
if __name__ == "__main__":
|
||||
AutoRandomSyncV2Test().main()
|
Loading…
Reference in New Issue
Block a user