mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
do not propagate FindFile to whole network
This commit is contained in:
parent
35c4760724
commit
25524bd9d2
@ -127,6 +127,7 @@ pub struct FindFile {
|
|||||||
pub tx_id: TxID,
|
pub tx_id: TxID,
|
||||||
pub num_shard: usize,
|
pub num_shard: usize,
|
||||||
pub shard_id: usize,
|
pub shard_id: usize,
|
||||||
|
pub neighbors_only: bool,
|
||||||
pub timestamp: u32,
|
pub timestamp: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5,6 +5,7 @@ use std::{ops::Neg, sync::Arc};
|
|||||||
use chunk_pool::ChunkPoolMessage;
|
use chunk_pool::ChunkPoolMessage;
|
||||||
use file_location_cache::FileLocationCache;
|
use file_location_cache::FileLocationCache;
|
||||||
use network::multiaddr::Protocol;
|
use network::multiaddr::Protocol;
|
||||||
|
use network::rpc::methods::FileAnnouncement;
|
||||||
use network::types::{AnnounceShardConfig, NewFile, SignedAnnounceShardConfig};
|
use network::types::{AnnounceShardConfig, NewFile, SignedAnnounceShardConfig};
|
||||||
use network::{
|
use network::{
|
||||||
rpc::StatusMessage,
|
rpc::StatusMessage,
|
||||||
@ -30,7 +31,7 @@ use crate::Config;
|
|||||||
|
|
||||||
lazy_static::lazy_static! {
|
lazy_static::lazy_static! {
|
||||||
pub static ref NEW_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30);
|
pub static ref NEW_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30);
|
||||||
pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
|
pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30);
|
||||||
pub static ref ANNOUNCE_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
|
pub static ref ANNOUNCE_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
|
||||||
pub static ref ANNOUNCE_SHARD_CONFIG_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
|
pub static ref ANNOUNCE_SHARD_CONFIG_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
|
||||||
pub static ref TOLERABLE_DRIFT: chrono::Duration = chrono::Duration::seconds(10);
|
pub static ref TOLERABLE_DRIFT: chrono::Duration = chrono::Duration::seconds(10);
|
||||||
@ -326,11 +327,11 @@ impl Libp2pEventHandler {
|
|||||||
PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore,
|
PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore,
|
||||||
PubsubMessage::NewFile(msg) => {
|
PubsubMessage::NewFile(msg) => {
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE.mark(1);
|
metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE.mark(1);
|
||||||
self.on_new_file(propagation_source, msg).await
|
self.on_new_file(source, msg).await
|
||||||
}
|
}
|
||||||
PubsubMessage::FindFile(msg) => {
|
PubsubMessage::FindFile(msg) => {
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1);
|
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1);
|
||||||
self.on_find_file(msg).await
|
self.on_find_file(source, msg).await
|
||||||
}
|
}
|
||||||
PubsubMessage::FindChunks(msg) => {
|
PubsubMessage::FindChunks(msg) => {
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS.mark(1);
|
metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS.mark(1);
|
||||||
@ -550,7 +551,7 @@ impl Libp2pEventHandler {
|
|||||||
Some(PubsubMessage::AnnounceShardConfig(signed))
|
Some(PubsubMessage::AnnounceShardConfig(signed))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn on_find_file(&self, msg: FindFile) -> MessageAcceptance {
|
async fn on_find_file(&self, peer_id: PeerId, msg: FindFile) -> MessageAcceptance {
|
||||||
let FindFile {
|
let FindFile {
|
||||||
tx_id, timestamp, ..
|
tx_id, timestamp, ..
|
||||||
} = msg;
|
} = msg;
|
||||||
@ -563,6 +564,14 @@ impl Libp2pEventHandler {
|
|||||||
if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT {
|
if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT {
|
||||||
debug!(%timestamp, ?d, "Invalid timestamp, ignoring FindFile message");
|
debug!(%timestamp, ?d, "Invalid timestamp, ignoring FindFile message");
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_TIMEOUT.mark(1);
|
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_TIMEOUT.mark(1);
|
||||||
|
if msg.neighbors_only {
|
||||||
|
self.send_to_network(NetworkMessage::ReportPeer {
|
||||||
|
peer_id,
|
||||||
|
action: PeerAction::LowToleranceError,
|
||||||
|
source: ReportSource::Gossipsub,
|
||||||
|
msg: "Received out of date FindFile message",
|
||||||
|
});
|
||||||
|
}
|
||||||
return MessageAcceptance::Ignore;
|
return MessageAcceptance::Ignore;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -572,20 +581,33 @@ impl Libp2pEventHandler {
|
|||||||
Err(_) => return MessageAcceptance::Reject,
|
Err(_) => return MessageAcceptance::Reject,
|
||||||
};
|
};
|
||||||
|
|
||||||
// propagate FindFile query to other nodes if shard mismatch
|
// ignore if shard config mismatch
|
||||||
let my_shard_config = self.store.get_store().get_shard_config();
|
let my_shard_config = self.store.get_store().get_shard_config();
|
||||||
if !my_shard_config.intersect(&announced_shard_config) {
|
if !my_shard_config.intersect(&announced_shard_config) {
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_FORWARD.mark(1);
|
return if msg.neighbors_only {
|
||||||
return MessageAcceptance::Accept;
|
MessageAcceptance::Ignore
|
||||||
|
} else {
|
||||||
|
MessageAcceptance::Accept
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// update peer shard config in cache
|
||||||
|
self.file_location_cache
|
||||||
|
.insert_peer_config(peer_id, announced_shard_config);
|
||||||
|
|
||||||
// check if we have it
|
// check if we have it
|
||||||
if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) {
|
if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) {
|
||||||
if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await {
|
if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await {
|
||||||
if tx.id() == tx_id {
|
if tx.id() == tx_id {
|
||||||
trace!(?tx_id, "Found file locally, responding to FindFile query");
|
trace!(?tx_id, "Found file locally, responding to FindFile query");
|
||||||
|
|
||||||
if self.publish_file(tx_id).await.is_some() {
|
if msg.neighbors_only {
|
||||||
|
self.send_to_network(NetworkMessage::SendRequest {
|
||||||
|
peer_id,
|
||||||
|
request: Request::AnnounceFile(FileAnnouncement { tx_id }),
|
||||||
|
request_id: RequestId::Router(Instant::now()),
|
||||||
|
});
|
||||||
|
} else if self.publish_file(tx_id).await.is_some() {
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_STORE.mark(1);
|
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_STORE.mark(1);
|
||||||
return MessageAcceptance::Ignore;
|
return MessageAcceptance::Ignore;
|
||||||
}
|
}
|
||||||
@ -593,6 +615,11 @@ impl Libp2pEventHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if msg.neighbors_only {
|
||||||
|
// do not forward to other peers anymore
|
||||||
|
return MessageAcceptance::Ignore
|
||||||
|
}
|
||||||
|
|
||||||
// try from cache
|
// try from cache
|
||||||
if let Some(mut msg) = self.file_location_cache.get_one(tx_id) {
|
if let Some(mut msg) = self.file_location_cache.get_one(tx_id) {
|
||||||
trace!(?tx_id, "Found file in cache, responding to FindFile query");
|
trace!(?tx_id, "Found file in cache, responding to FindFile query");
|
||||||
@ -914,7 +941,7 @@ impl Libp2pEventHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn publish_file(&self, tx_id: TxID) -> Option<bool> {
|
async fn publish_file(&self, tx_id: TxID) -> Option<bool> {
|
||||||
match self.file_batcher.write().await.add(tx_id) {
|
match self.file_batcher.write().await.add(tx_id) {
|
||||||
Some(batch) => {
|
Some(batch) => {
|
||||||
let announcement = self.construct_announce_file_message(batch).await?;
|
let announcement = self.construct_announce_file_message(batch).await?;
|
||||||
@ -1287,6 +1314,7 @@ mod tests {
|
|||||||
tx_id,
|
tx_id,
|
||||||
num_shard: 1,
|
num_shard: 1,
|
||||||
shard_id: 0,
|
shard_id: 0,
|
||||||
|
neighbors_only: false,
|
||||||
timestamp,
|
timestamp,
|
||||||
});
|
});
|
||||||
handler.on_pubsub_message(alice, bob, &id, message).await
|
handler.on_pubsub_message(alice, bob, &id, message).await
|
||||||
|
@ -204,6 +204,7 @@ impl SerialSyncController {
|
|||||||
tx_id: self.tx_id,
|
tx_id: self.tx_id,
|
||||||
num_shard: shard_config.num_shard,
|
num_shard: shard_config.num_shard,
|
||||||
shard_id: shard_config.shard_id,
|
shard_id: shard_config.shard_id,
|
||||||
|
neighbors_only: false,
|
||||||
timestamp: timestamp_now(),
|
timestamp: timestamp_now(),
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
@ -601,6 +601,7 @@ impl SyncService {
|
|||||||
tx_id: tx.id(),
|
tx_id: tx.id(),
|
||||||
num_shard: shard_config.num_shard,
|
num_shard: shard_config.num_shard,
|
||||||
shard_id: shard_config.shard_id,
|
shard_id: shard_config.shard_id,
|
||||||
|
neighbors_only: false,
|
||||||
timestamp: timestamp_now(),
|
timestamp: timestamp_now(),
|
||||||
}));
|
}));
|
||||||
Ok(())
|
Ok(())
|
||||||
|
Loading…
Reference in New Issue
Block a user