Mark peer connected if FileAnnouncement RPC message received

This commit is contained in:
boqiu 2024-10-24 19:29:08 +08:00
parent 25524bd9d2
commit cebc3c8247
4 changed files with 81 additions and 28 deletions

View File

@ -182,6 +182,8 @@ pub struct DataByHashRequest {
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct FileAnnouncement {
pub tx_id: TxID,
pub num_shard: usize,
pub shard_id: usize,
}
/// Request a chunk array from a peer.

View File

@ -222,11 +222,23 @@ impl Libp2pEventHandler {
metrics::LIBP2P_HANDLE_GET_CHUNKS_REQUEST.mark(1);
}
Request::AnnounceFile(announcement) => {
self.send_to_sync(SyncMessage::AnnounceFile {
peer_id,
request_id,
announcement,
});
match ShardConfig::new(announcement.shard_id, announcement.num_shard) {
Ok(v) => {
self.file_location_cache.insert_peer_config(peer_id, v);
self.send_to_sync(SyncMessage::AnnounceFile {
peer_id,
request_id,
announcement,
});
}
Err(_) => self.send_to_network(NetworkMessage::ReportPeer {
peer_id,
action: PeerAction::Fatal,
source: ReportSource::RPC,
msg: "Invalid shard config in FileAnnouncement",
}),
}
}
Request::DataByHash(_) => {
// ignore
@ -411,6 +423,9 @@ impl Libp2pEventHandler {
self.send_to_sync(SyncMessage::NewFile { from, msg });
}
self.file_location_cache
.insert_peer_config(from, announced_shard_config);
MessageAcceptance::Ignore
}
@ -604,7 +619,11 @@ impl Libp2pEventHandler {
if msg.neighbors_only {
self.send_to_network(NetworkMessage::SendRequest {
peer_id,
request: Request::AnnounceFile(FileAnnouncement { tx_id }),
request: Request::AnnounceFile(FileAnnouncement {
tx_id,
num_shard: my_shard_config.num_shard,
shard_id: my_shard_config.shard_id,
}),
request_id: RequestId::Router(Instant::now()),
});
} else if self.publish_file(tx_id).await.is_some() {
@ -617,7 +636,7 @@ impl Libp2pEventHandler {
if msg.neighbors_only {
// do not forward to other peers anymore
return MessageAcceptance::Ignore
return MessageAcceptance::Ignore;
}
// try from cache

View File

@ -14,12 +14,13 @@ use shared_types::{timestamp_now, ChunkArrayWithProof, TxID, CHUNK_SIZE};
use ssz::Encode;
use std::{sync::Arc, time::Instant};
use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE};
use storage_async::Store;
use storage_async::{ShardConfig, Store};
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum FailureReason {
DBError(String),
TxReverted(TxID),
TimeoutFindFile,
}
#[derive(Clone, Debug, PartialEq, Eq)]
@ -88,6 +89,9 @@ pub struct SerialSyncController {
/// Cache for storing and serving gossip messages.
file_location_cache: Arc<FileLocationCache>,
/// Whether to find files from neighbors only.
neighbors_only: bool,
}
impl SerialSyncController {
@ -114,6 +118,7 @@ impl SerialSyncController {
ctx,
store,
file_location_cache,
neighbors_only: true,
}
}
@ -159,11 +164,14 @@ impl SerialSyncController {
/// Find more peers to sync chunks. Return whether `FindFile` pubsub message published,
fn try_find_peers(&mut self) {
let (published, num_new_peers) = if self.goal.is_all_chunks() {
self.publish_find_file()
} else {
let (published, num_new_peers) = if !self.goal.is_all_chunks() {
self.publish_find_chunks();
(true, 0)
} else if self.neighbors_only {
self.do_publish_find_file();
(true, 0)
} else {
self.publish_find_file()
};
info!(%self.tx_seq, %published, %num_new_peers, "Finding peers");
@ -199,16 +207,21 @@ impl SerialSyncController {
return (false, num_new_peers);
}
self.do_publish_find_file();
(true, num_new_peers)
}
fn do_publish_find_file(&self) {
let shard_config = self.store.get_store().get_shard_config();
self.ctx.publish(PubsubMessage::FindFile(FindFile {
tx_id: self.tx_id,
num_shard: shard_config.num_shard,
shard_id: shard_config.shard_id,
neighbors_only: false,
neighbors_only: self.neighbors_only,
timestamp: timestamp_now(),
}));
(true, num_new_peers)
}
fn publish_find_chunks(&self) {
@ -341,6 +354,14 @@ impl SerialSyncController {
}
}
/// Triggered when any peer (TCP connected) announced file via RPC message.
pub fn on_peer_announced(&mut self, peer_id: PeerId, shard_config: ShardConfig) {
self.peers
.add_new_peer_with_config(peer_id, Multiaddr::empty(), shard_config);
self.peers
.update_state_force(&peer_id, PeerState::Connected);
}
pub fn on_dail_failed(&mut self, peer_id: PeerId, err: &DialError) {
match err {
DialError::ConnectionLimit(_) => {
@ -643,12 +664,19 @@ impl SerialSyncController {
{
self.state = SyncState::FoundPeers;
} else {
// storage node may not have the specific file when `FindFile`
// gossip message received. In this case, just broadcast the
// `FindFile` message again.
// FindFile timeout
if since.elapsed() >= self.config.peer_find_timeout {
debug!(%self.tx_seq, "Finding peer timeout and try to find peers again");
self.try_find_peers();
if self.neighbors_only {
self.state = SyncState::Failed {
reason: FailureReason::TimeoutFindFile,
};
} else {
// storage node may not have the specific file when `FindFile`
// gossip message received. In this case, just broadcast the
// `FindFile` message again.
debug!(%self.tx_seq, "Finding peer timeout and try to find peers again");
self.try_find_peers();
}
}
completed = true;

View File

@ -779,22 +779,26 @@ impl SyncService {
debug!(%from, ?msg, "Received NewFile gossip");
if let Some(controller) = self.controllers.get_mut(&msg.tx_id.seq) {
// Notify new peer found if file already in sync
// TODO qbit: do not require remote address since already TCP connected
// controller.on_peer_found(from, addr);
controller.transition();
// Notify new peer announced if file already in sync
if let Ok(shard_config) = ShardConfig::new(msg.shard_id, msg.num_shard) {
controller.on_peer_announced(from, shard_config);
controller.transition();
}
} else if let Some(manager) = &self.auto_sync_manager {
let _ = manager.new_file_send.send(msg.tx_id.seq);
}
}
/// Handle on AnnounceFile RPC message received.
async fn on_announce_file(&mut self, _peer_id: PeerId, announcement: FileAnnouncement) {
async fn on_announce_file(&mut self, peer_id: PeerId, announcement: FileAnnouncement) {
if let Some(controller) = self.controllers.get_mut(&announcement.tx_id.seq) {
// Notify new peer found if file already in sync
// TODO qbit: do not require remote address since already TCP connected
// controller.on_peer_found(from, addr);
controller.transition();
// Notify new peer announced if file already in sync
if let Ok(shard_config) =
ShardConfig::new(announcement.shard_id, announcement.num_shard)
{
controller.on_peer_announced(peer_id, shard_config);
controller.transition();
}
}
}