mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
Add comments
This commit is contained in:
parent
98c62b314e
commit
51a2305c2d
@ -39,7 +39,7 @@ pub struct GossipCacheBuilder {
|
|||||||
default_timeout: Option<Duration>,
|
default_timeout: Option<Duration>,
|
||||||
/// Timeout for Example messages.
|
/// Timeout for Example messages.
|
||||||
example: Option<Duration>,
|
example: Option<Duration>,
|
||||||
/// Timeout for NewFile messges.
|
/// Timeout for NewFile messages.
|
||||||
new_file: Option<Duration>,
|
new_file: Option<Duration>,
|
||||||
/// Timeout for blocks FindFile messages.
|
/// Timeout for blocks FindFile messages.
|
||||||
find_file: Option<Duration>,
|
find_file: Option<Duration>,
|
||||||
|
@ -114,6 +114,7 @@ impl ssz::Decode for WrappedPeerId {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Published when file uploaded or completed to sync from other peers.
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
|
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
|
||||||
pub struct NewFile {
|
pub struct NewFile {
|
||||||
pub tx_id: TxID,
|
pub tx_id: TxID,
|
||||||
@ -127,6 +128,7 @@ pub struct FindFile {
|
|||||||
pub tx_id: TxID,
|
pub tx_id: TxID,
|
||||||
pub num_shard: usize,
|
pub num_shard: usize,
|
||||||
pub shard_id: usize,
|
pub shard_id: usize,
|
||||||
|
/// Indicates whether publish to neighboar nodes only.
|
||||||
pub neighbors_only: bool,
|
pub neighbors_only: bool,
|
||||||
pub timestamp: u32,
|
pub timestamp: u32,
|
||||||
}
|
}
|
||||||
|
@ -30,8 +30,12 @@ use crate::peer_manager::PeerManager;
|
|||||||
use crate::Config;
|
use crate::Config;
|
||||||
|
|
||||||
lazy_static::lazy_static! {
|
lazy_static::lazy_static! {
|
||||||
|
/// Timeout to publish NewFile message to neighbor nodes.
|
||||||
pub static ref NEW_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30);
|
pub static ref NEW_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30);
|
||||||
pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30);
|
/// Timeout to publish FindFile message to neighbor nodes.
|
||||||
|
pub static ref FIND_FILE_NEIGHBORS_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30);
|
||||||
|
/// Timeout to publish FindFile message in the whole network.
|
||||||
|
pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
|
||||||
pub static ref ANNOUNCE_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
|
pub static ref ANNOUNCE_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
|
||||||
pub static ref ANNOUNCE_SHARD_CONFIG_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
|
pub static ref ANNOUNCE_SHARD_CONFIG_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
|
||||||
pub static ref TOLERABLE_DRIFT: chrono::Duration = chrono::Duration::seconds(10);
|
pub static ref TOLERABLE_DRIFT: chrono::Duration = chrono::Duration::seconds(10);
|
||||||
@ -236,7 +240,7 @@ impl Libp2pEventHandler {
|
|||||||
peer_id,
|
peer_id,
|
||||||
action: PeerAction::Fatal,
|
action: PeerAction::Fatal,
|
||||||
source: ReportSource::RPC,
|
source: ReportSource::RPC,
|
||||||
msg: "Invalid shard config in FileAnnouncement",
|
msg: "Invalid shard config in AnnounceFile RPC message",
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -339,11 +343,11 @@ impl Libp2pEventHandler {
|
|||||||
PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore,
|
PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore,
|
||||||
PubsubMessage::NewFile(msg) => {
|
PubsubMessage::NewFile(msg) => {
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE.mark(1);
|
metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE.mark(1);
|
||||||
self.on_new_file(source, msg).await
|
self.on_new_file(propagation_source, msg).await
|
||||||
}
|
}
|
||||||
PubsubMessage::FindFile(msg) => {
|
PubsubMessage::FindFile(msg) => {
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1);
|
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1);
|
||||||
self.on_find_file(source, msg).await
|
self.on_find_file(propagation_source, msg).await
|
||||||
}
|
}
|
||||||
PubsubMessage::FindChunks(msg) => {
|
PubsubMessage::FindChunks(msg) => {
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS.mark(1);
|
metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS.mark(1);
|
||||||
@ -373,6 +377,7 @@ impl Libp2pEventHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Handle NewFile pubsub message `msg` that published by `from` peer.
|
||||||
async fn on_new_file(&self, from: PeerId, msg: NewFile) -> MessageAcceptance {
|
async fn on_new_file(&self, from: PeerId, msg: NewFile) -> MessageAcceptance {
|
||||||
// verify timestamp
|
// verify timestamp
|
||||||
let d = duration_since(
|
let d = duration_since(
|
||||||
@ -397,9 +402,11 @@ impl Libp2pEventHandler {
|
|||||||
Err(_) => return MessageAcceptance::Reject,
|
Err(_) => return MessageAcceptance::Reject,
|
||||||
};
|
};
|
||||||
|
|
||||||
// update shard config cache
|
// ignore if shard config mismatch
|
||||||
self.file_location_cache
|
let my_shard_config = self.store.get_store().get_shard_config();
|
||||||
.insert_peer_config(from, announced_shard_config);
|
if !my_shard_config.intersect(&announced_shard_config) {
|
||||||
|
return MessageAcceptance::Ignore;
|
||||||
|
}
|
||||||
|
|
||||||
// ignore if already exists
|
// ignore if already exists
|
||||||
match self.store.check_tx_completed(msg.tx_id.seq).await {
|
match self.store.check_tx_completed(msg.tx_id.seq).await {
|
||||||
@ -421,11 +428,8 @@ impl Libp2pEventHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// notify sync layer if shard config matches
|
// notify sync layer to handle in advance
|
||||||
let my_shard_config = self.store.get_store().get_shard_config();
|
|
||||||
if my_shard_config.intersect(&announced_shard_config) {
|
|
||||||
self.send_to_sync(SyncMessage::NewFile { from, msg });
|
self.send_to_sync(SyncMessage::NewFile { from, msg });
|
||||||
}
|
|
||||||
|
|
||||||
MessageAcceptance::Ignore
|
MessageAcceptance::Ignore
|
||||||
}
|
}
|
||||||
@ -567,7 +571,7 @@ impl Libp2pEventHandler {
|
|||||||
Some(PubsubMessage::AnnounceShardConfig(signed))
|
Some(PubsubMessage::AnnounceShardConfig(signed))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn on_find_file(&self, peer_id: PeerId, msg: FindFile) -> MessageAcceptance {
|
async fn on_find_file(&self, from: PeerId, msg: FindFile) -> MessageAcceptance {
|
||||||
let FindFile {
|
let FindFile {
|
||||||
tx_id, timestamp, ..
|
tx_id, timestamp, ..
|
||||||
} = msg;
|
} = msg;
|
||||||
@ -577,12 +581,17 @@ impl Libp2pEventHandler {
|
|||||||
timestamp,
|
timestamp,
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY.clone(),
|
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY.clone(),
|
||||||
);
|
);
|
||||||
if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT {
|
let timeout = if msg.neighbors_only {
|
||||||
|
*FIND_FILE_NEIGHBORS_TIMEOUT
|
||||||
|
} else {
|
||||||
|
*FIND_FILE_TIMEOUT
|
||||||
|
};
|
||||||
|
if d < TOLERABLE_DRIFT.neg() || d > timeout {
|
||||||
debug!(%timestamp, ?d, "Invalid timestamp, ignoring FindFile message");
|
debug!(%timestamp, ?d, "Invalid timestamp, ignoring FindFile message");
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_TIMEOUT.mark(1);
|
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_TIMEOUT.mark(1);
|
||||||
if msg.neighbors_only {
|
if msg.neighbors_only {
|
||||||
self.send_to_network(NetworkMessage::ReportPeer {
|
self.send_to_network(NetworkMessage::ReportPeer {
|
||||||
peer_id,
|
peer_id: from,
|
||||||
action: PeerAction::LowToleranceError,
|
action: PeerAction::LowToleranceError,
|
||||||
source: ReportSource::Gossipsub,
|
source: ReportSource::Gossipsub,
|
||||||
msg: "Received out of date FindFile message",
|
msg: "Received out of date FindFile message",
|
||||||
@ -597,7 +606,7 @@ impl Libp2pEventHandler {
|
|||||||
Err(_) => return MessageAcceptance::Reject,
|
Err(_) => return MessageAcceptance::Reject,
|
||||||
};
|
};
|
||||||
|
|
||||||
// ignore if shard config mismatch
|
// handle on shard config mismatch
|
||||||
let my_shard_config = self.store.get_store().get_shard_config();
|
let my_shard_config = self.store.get_store().get_shard_config();
|
||||||
if !my_shard_config.intersect(&announced_shard_config) {
|
if !my_shard_config.intersect(&announced_shard_config) {
|
||||||
return if msg.neighbors_only {
|
return if msg.neighbors_only {
|
||||||
@ -607,10 +616,6 @@ impl Libp2pEventHandler {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
// update peer shard config in cache
|
|
||||||
self.file_location_cache
|
|
||||||
.insert_peer_config(peer_id, announced_shard_config);
|
|
||||||
|
|
||||||
// check if we have it
|
// check if we have it
|
||||||
if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) {
|
if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) {
|
||||||
if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await {
|
if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await {
|
||||||
@ -618,8 +623,9 @@ impl Libp2pEventHandler {
|
|||||||
trace!(?tx_id, "Found file locally, responding to FindFile query");
|
trace!(?tx_id, "Found file locally, responding to FindFile query");
|
||||||
|
|
||||||
if msg.neighbors_only {
|
if msg.neighbors_only {
|
||||||
|
// announce file via RPC to avoid flooding pubsub message
|
||||||
self.send_to_network(NetworkMessage::SendRequest {
|
self.send_to_network(NetworkMessage::SendRequest {
|
||||||
peer_id,
|
peer_id: from,
|
||||||
request: Request::AnnounceFile(FileAnnouncement {
|
request: Request::AnnounceFile(FileAnnouncement {
|
||||||
tx_id,
|
tx_id,
|
||||||
num_shard: my_shard_config.num_shard,
|
num_shard: my_shard_config.num_shard,
|
||||||
@ -635,8 +641,8 @@ impl Libp2pEventHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// do not forward to whole network if only find file from neighbor nodes
|
||||||
if msg.neighbors_only {
|
if msg.neighbors_only {
|
||||||
// do not forward to other peers anymore
|
|
||||||
return MessageAcceptance::Ignore;
|
return MessageAcceptance::Ignore;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -566,6 +566,7 @@ impl SerialSyncController {
|
|||||||
info!(%self.tx_seq, "Succeeded to finalize file");
|
info!(%self.tx_seq, "Succeeded to finalize file");
|
||||||
self.state = SyncState::Completed;
|
self.state = SyncState::Completed;
|
||||||
metrics::SERIAL_SYNC_FILE_COMPLETED.update_since(self.since.0);
|
metrics::SERIAL_SYNC_FILE_COMPLETED.update_since(self.since.0);
|
||||||
|
// notify neighbor nodes about new file completed to sync
|
||||||
self.ctx
|
self.ctx
|
||||||
.send(NetworkMessage::AnnounceLocalFile { tx_id: self.tx_id });
|
.send(NetworkMessage::AnnounceLocalFile { tx_id: self.tx_id });
|
||||||
}
|
}
|
||||||
|
@ -21,6 +21,9 @@ use std::{
|
|||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
// sync service config
|
// sync service config
|
||||||
|
/// Indicates whether to sync file from neighbor nodes only.
|
||||||
|
/// This is to avoid flooding file announcements in the whole network,
|
||||||
|
/// which leads to high latency or event timeout to sync files.
|
||||||
pub neighbors_only: bool,
|
pub neighbors_only: bool,
|
||||||
#[serde(deserialize_with = "deserialize_duration")]
|
#[serde(deserialize_with = "deserialize_duration")]
|
||||||
pub heartbeat_interval: Duration,
|
pub heartbeat_interval: Duration,
|
||||||
|
@ -774,7 +774,7 @@ impl SyncService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle on NewFile gossip message received.
|
/// Handle on `NewFile` gossip message received.
|
||||||
async fn on_new_file_gossip(&mut self, from: PeerId, msg: NewFile) {
|
async fn on_new_file_gossip(&mut self, from: PeerId, msg: NewFile) {
|
||||||
debug!(%from, ?msg, "Received NewFile gossip");
|
debug!(%from, ?msg, "Received NewFile gossip");
|
||||||
|
|
||||||
@ -789,14 +789,14 @@ impl SyncService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle on AnnounceFile RPC message received.
|
/// Handle on `AnnounceFile` RPC message received.
|
||||||
async fn on_announce_file(&mut self, peer_id: PeerId, announcement: FileAnnouncement) {
|
async fn on_announce_file(&mut self, from: PeerId, announcement: FileAnnouncement) {
|
||||||
if let Some(controller) = self.controllers.get_mut(&announcement.tx_id.seq) {
|
|
||||||
// Notify new peer announced if file already in sync
|
// Notify new peer announced if file already in sync
|
||||||
|
if let Some(controller) = self.controllers.get_mut(&announcement.tx_id.seq) {
|
||||||
if let Ok(shard_config) =
|
if let Ok(shard_config) =
|
||||||
ShardConfig::new(announcement.shard_id, announcement.num_shard)
|
ShardConfig::new(announcement.shard_id, announcement.num_shard)
|
||||||
{
|
{
|
||||||
controller.on_peer_announced(peer_id, shard_config);
|
controller.on_peer_announced(from, shard_config);
|
||||||
controller.transition();
|
controller.transition();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user