mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
Add AnnounceFile RPC message in network layer
This commit is contained in:
parent
09b34fbf07
commit
35c4760724
@ -6,6 +6,7 @@ use crate::peer_manager::{
|
||||
ConnectionDirection, PeerManager, PeerManagerEvent,
|
||||
};
|
||||
use crate::rpc::methods::DataByHashRequest;
|
||||
use crate::rpc::methods::FileAnnouncement;
|
||||
use crate::rpc::methods::GetChunksRequest;
|
||||
use crate::rpc::*;
|
||||
use crate::service::Context as ServiceContext;
|
||||
@ -546,6 +547,9 @@ impl<AppReqId: ReqId> Behaviour<AppReqId> {
|
||||
Request::DataByHash { .. } => {
|
||||
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["data_by_hash"])
|
||||
}
|
||||
Request::AnnounceFile { .. } => {
|
||||
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["announce_file"])
|
||||
}
|
||||
Request::GetChunks { .. } => {
|
||||
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["get_chunks"])
|
||||
}
|
||||
@ -758,6 +762,9 @@ where
|
||||
InboundRequest::DataByHash(req) => {
|
||||
self.propagate_request(peer_request_id, peer_id, Request::DataByHash(req))
|
||||
}
|
||||
InboundRequest::AnnounceFile(req) => {
|
||||
self.propagate_request(peer_request_id, peer_id, Request::AnnounceFile(req))
|
||||
}
|
||||
InboundRequest::GetChunks(req) => {
|
||||
self.propagate_request(peer_request_id, peer_id, Request::GetChunks(req))
|
||||
}
|
||||
@ -972,6 +979,8 @@ pub enum Request {
|
||||
Status(StatusMessage),
|
||||
/// A data by hash request.
|
||||
DataByHash(DataByHashRequest),
|
||||
/// An AnnounceFile message.
|
||||
AnnounceFile(FileAnnouncement),
|
||||
/// A GetChunks request.
|
||||
GetChunks(GetChunksRequest),
|
||||
}
|
||||
@ -981,6 +990,7 @@ impl std::convert::From<Request> for OutboundRequest {
|
||||
match req {
|
||||
Request::Status(s) => OutboundRequest::Status(s),
|
||||
Request::DataByHash(r) => OutboundRequest::DataByHash(r),
|
||||
Request::AnnounceFile(r) => OutboundRequest::AnnounceFile(r),
|
||||
Request::GetChunks(r) => OutboundRequest::GetChunks(r),
|
||||
}
|
||||
}
|
||||
|
@ -460,6 +460,7 @@ impl PeerManager {
|
||||
Protocol::Goodbye => PeerAction::LowToleranceError,
|
||||
Protocol::Status => PeerAction::LowToleranceError,
|
||||
Protocol::DataByHash => PeerAction::MidToleranceError,
|
||||
Protocol::AnnounceFile => PeerAction::MidToleranceError,
|
||||
Protocol::GetChunks => PeerAction::MidToleranceError,
|
||||
},
|
||||
},
|
||||
@ -474,6 +475,7 @@ impl PeerManager {
|
||||
Protocol::Goodbye => return,
|
||||
Protocol::Status => PeerAction::LowToleranceError,
|
||||
Protocol::DataByHash => return,
|
||||
Protocol::AnnounceFile => return,
|
||||
Protocol::GetChunks => return,
|
||||
}
|
||||
}
|
||||
@ -488,6 +490,7 @@ impl PeerManager {
|
||||
Protocol::Goodbye => return,
|
||||
Protocol::Status => return,
|
||||
Protocol::DataByHash => PeerAction::MidToleranceError,
|
||||
Protocol::AnnounceFile => PeerAction::MidToleranceError,
|
||||
Protocol::GetChunks => PeerAction::MidToleranceError,
|
||||
},
|
||||
},
|
||||
|
@ -159,6 +159,7 @@ impl Encoder<OutboundRequest> for SSZSnappyOutboundCodec {
|
||||
OutboundRequest::Goodbye(req) => req.as_ssz_bytes(),
|
||||
OutboundRequest::Ping(req) => req.as_ssz_bytes(),
|
||||
OutboundRequest::DataByHash(req) => req.hashes.as_ssz_bytes(),
|
||||
OutboundRequest::AnnounceFile(req) => req.as_ssz_bytes(),
|
||||
OutboundRequest::GetChunks(req) => req.as_ssz_bytes(),
|
||||
};
|
||||
// SSZ encoded bytes should be within `max_packet_size`
|
||||
@ -346,6 +347,9 @@ fn handle_v1_request(
|
||||
Protocol::DataByHash => Ok(Some(InboundRequest::DataByHash(DataByHashRequest {
|
||||
hashes: VariableList::from_ssz_bytes(decoded_buffer)?,
|
||||
}))),
|
||||
Protocol::AnnounceFile => Ok(Some(InboundRequest::AnnounceFile(
|
||||
FileAnnouncement::from_ssz_bytes(decoded_buffer)?,
|
||||
))),
|
||||
Protocol::GetChunks => Ok(Some(InboundRequest::GetChunks(
|
||||
GetChunksRequest::from_ssz_bytes(decoded_buffer)?,
|
||||
))),
|
||||
@ -373,6 +377,10 @@ fn handle_v1_response(
|
||||
Protocol::DataByHash => Ok(Some(RPCResponse::DataByHash(Box::new(
|
||||
ZgsData::from_ssz_bytes(decoded_buffer)?,
|
||||
)))),
|
||||
// This case should be unreachable as `AnnounceFile` has no response.
|
||||
Protocol::AnnounceFile => Err(RPCError::InvalidData(
|
||||
"AnnounceFile RPC message has no valid response".to_string(),
|
||||
)),
|
||||
Protocol::GetChunks => Ok(Some(RPCResponse::Chunks(
|
||||
ChunkArrayWithProof::from_ssz_bytes(decoded_buffer)?,
|
||||
))),
|
||||
|
@ -178,6 +178,12 @@ pub struct DataByHashRequest {
|
||||
pub hashes: VariableList<Hash256, MaxRequestBlocks>,
|
||||
}
|
||||
|
||||
// The message of `AnnounceFile` RPC message.
|
||||
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
|
||||
pub struct FileAnnouncement {
|
||||
pub tx_id: TxID,
|
||||
}
|
||||
|
||||
/// Request a chunk array from a peer.
|
||||
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
|
||||
pub struct GetChunksRequest {
|
||||
|
@ -118,6 +118,7 @@ impl<Id: ReqId> RPC<Id> {
|
||||
.n_every(Protocol::Status, 5, Duration::from_secs(15))
|
||||
.one_every(Protocol::Goodbye, Duration::from_secs(10))
|
||||
.n_every(Protocol::DataByHash, 128, Duration::from_secs(10))
|
||||
.n_every(Protocol::AnnounceFile, 256, Duration::from_secs(10))
|
||||
.n_every(Protocol::GetChunks, 4096, Duration::from_secs(10))
|
||||
.build()
|
||||
.expect("Configuration parameters are valid");
|
||||
|
@ -34,6 +34,7 @@ pub enum OutboundRequest {
|
||||
Goodbye(GoodbyeReason),
|
||||
Ping(Ping),
|
||||
DataByHash(DataByHashRequest),
|
||||
AnnounceFile(FileAnnouncement),
|
||||
GetChunks(GetChunksRequest),
|
||||
}
|
||||
|
||||
@ -72,6 +73,11 @@ impl OutboundRequest {
|
||||
Version::V1,
|
||||
Encoding::SSZSnappy,
|
||||
)],
|
||||
OutboundRequest::AnnounceFile(_) => vec![ProtocolId::new(
|
||||
Protocol::AnnounceFile,
|
||||
Version::V1,
|
||||
Encoding::SSZSnappy,
|
||||
)],
|
||||
OutboundRequest::GetChunks(_) => vec![ProtocolId::new(
|
||||
Protocol::GetChunks,
|
||||
Version::V1,
|
||||
@ -89,6 +95,7 @@ impl OutboundRequest {
|
||||
OutboundRequest::Goodbye(_) => 0,
|
||||
OutboundRequest::Ping(_) => 1,
|
||||
OutboundRequest::DataByHash(req) => req.hashes.len() as u64,
|
||||
OutboundRequest::AnnounceFile(_) => 0,
|
||||
OutboundRequest::GetChunks(_) => 1,
|
||||
}
|
||||
}
|
||||
@ -100,6 +107,7 @@ impl OutboundRequest {
|
||||
OutboundRequest::Goodbye(_) => Protocol::Goodbye,
|
||||
OutboundRequest::Ping(_) => Protocol::Ping,
|
||||
OutboundRequest::DataByHash(_) => Protocol::DataByHash,
|
||||
OutboundRequest::AnnounceFile(_) => Protocol::AnnounceFile,
|
||||
OutboundRequest::GetChunks(_) => Protocol::GetChunks,
|
||||
}
|
||||
}
|
||||
@ -114,6 +122,7 @@ impl OutboundRequest {
|
||||
OutboundRequest::Status(_) => unreachable!(),
|
||||
OutboundRequest::Goodbye(_) => unreachable!(),
|
||||
OutboundRequest::Ping(_) => unreachable!(),
|
||||
OutboundRequest::AnnounceFile(_) => unreachable!(),
|
||||
OutboundRequest::GetChunks(_) => unreachable!(),
|
||||
}
|
||||
}
|
||||
@ -170,6 +179,9 @@ impl std::fmt::Display for OutboundRequest {
|
||||
OutboundRequest::DataByHash(req) => {
|
||||
write!(f, "Data by hash: {:?}", req)
|
||||
}
|
||||
OutboundRequest::AnnounceFile(req) => {
|
||||
write!(f, "AnnounceFile: {:?}", req)
|
||||
}
|
||||
OutboundRequest::GetChunks(req) => {
|
||||
write!(f, "GetChunks: {:?}", req)
|
||||
}
|
||||
|
@ -91,6 +91,8 @@ pub enum Protocol {
|
||||
/// TODO
|
||||
DataByHash,
|
||||
|
||||
/// The file announce protocol.
|
||||
AnnounceFile,
|
||||
/// The Chunk sync protocol.
|
||||
GetChunks,
|
||||
}
|
||||
@ -115,6 +117,7 @@ impl std::fmt::Display for Protocol {
|
||||
Protocol::Goodbye => "goodbye",
|
||||
Protocol::Ping => "ping",
|
||||
Protocol::DataByHash => "data_by_hash",
|
||||
Protocol::AnnounceFile => "announce_file",
|
||||
Protocol::GetChunks => "get_chunks",
|
||||
};
|
||||
f.write_str(repr)
|
||||
@ -155,6 +158,7 @@ impl UpgradeInfo for RPCProtocol {
|
||||
ProtocolId::new(Protocol::Goodbye, Version::V1, Encoding::SSZSnappy),
|
||||
ProtocolId::new(Protocol::Ping, Version::V1, Encoding::SSZSnappy),
|
||||
ProtocolId::new(Protocol::DataByHash, Version::V1, Encoding::SSZSnappy),
|
||||
ProtocolId::new(Protocol::AnnounceFile, Version::V1, Encoding::SSZSnappy),
|
||||
ProtocolId::new(Protocol::GetChunks, Version::V1, Encoding::SSZSnappy),
|
||||
]
|
||||
}
|
||||
@ -216,6 +220,10 @@ impl ProtocolId {
|
||||
// TODO
|
||||
RpcLimits::new(1, *DATA_BY_HASH_REQUEST_MAX)
|
||||
}
|
||||
Protocol::AnnounceFile => RpcLimits::new(
|
||||
<FileAnnouncement as Encode>::ssz_fixed_len(),
|
||||
<FileAnnouncement as Encode>::ssz_fixed_len(),
|
||||
),
|
||||
Protocol::GetChunks => RpcLimits::new(
|
||||
<GetChunksRequest as Encode>::ssz_fixed_len(),
|
||||
<GetChunksRequest as Encode>::ssz_fixed_len(),
|
||||
@ -243,6 +251,7 @@ impl ProtocolId {
|
||||
<ZgsData as Encode>::ssz_fixed_len(),
|
||||
),
|
||||
|
||||
Protocol::AnnounceFile => RpcLimits::new(0, 0), // AnnounceFile request has no response
|
||||
Protocol::GetChunks => RpcLimits::new(*CHUNKS_RESPONSE_MIN, *CHUNKS_RESPONSE_MAX),
|
||||
}
|
||||
}
|
||||
@ -325,6 +334,7 @@ pub enum InboundRequest {
|
||||
Goodbye(GoodbyeReason),
|
||||
Ping(Ping),
|
||||
DataByHash(DataByHashRequest),
|
||||
AnnounceFile(FileAnnouncement),
|
||||
GetChunks(GetChunksRequest),
|
||||
}
|
||||
|
||||
@ -363,6 +373,11 @@ impl InboundRequest {
|
||||
Version::V1,
|
||||
Encoding::SSZSnappy,
|
||||
)],
|
||||
InboundRequest::AnnounceFile(_) => vec![ProtocolId::new(
|
||||
Protocol::AnnounceFile,
|
||||
Version::V1,
|
||||
Encoding::SSZSnappy,
|
||||
)],
|
||||
InboundRequest::GetChunks(_) => vec![ProtocolId::new(
|
||||
Protocol::GetChunks,
|
||||
Version::V1,
|
||||
@ -380,6 +395,7 @@ impl InboundRequest {
|
||||
InboundRequest::Goodbye(_) => 0,
|
||||
InboundRequest::DataByHash(req) => req.hashes.len() as u64,
|
||||
InboundRequest::Ping(_) => 1,
|
||||
InboundRequest::AnnounceFile(_) => 0,
|
||||
InboundRequest::GetChunks(_) => 1,
|
||||
}
|
||||
}
|
||||
@ -391,6 +407,7 @@ impl InboundRequest {
|
||||
InboundRequest::Goodbye(_) => Protocol::Goodbye,
|
||||
InboundRequest::Ping(_) => Protocol::Ping,
|
||||
InboundRequest::DataByHash(_) => Protocol::DataByHash,
|
||||
InboundRequest::AnnounceFile(_) => Protocol::AnnounceFile,
|
||||
InboundRequest::GetChunks(_) => Protocol::GetChunks,
|
||||
}
|
||||
}
|
||||
@ -405,6 +422,7 @@ impl InboundRequest {
|
||||
InboundRequest::Status(_) => unreachable!(),
|
||||
InboundRequest::Goodbye(_) => unreachable!(),
|
||||
InboundRequest::Ping(_) => unreachable!(),
|
||||
InboundRequest::AnnounceFile(_) => unreachable!(),
|
||||
InboundRequest::GetChunks(_) => unreachable!(),
|
||||
}
|
||||
}
|
||||
@ -523,6 +541,9 @@ impl std::fmt::Display for InboundRequest {
|
||||
InboundRequest::DataByHash(req) => {
|
||||
write!(f, "Data by hash: {:?}", req)
|
||||
}
|
||||
InboundRequest::AnnounceFile(req) => {
|
||||
write!(f, "Announce File: {:?}", req)
|
||||
}
|
||||
InboundRequest::GetChunks(req) => {
|
||||
write!(f, "Get Chunks: {:?}", req)
|
||||
}
|
||||
|
@ -68,6 +68,8 @@ pub struct RPCRateLimiter {
|
||||
status_rl: Limiter<PeerId>,
|
||||
/// DataByHash rate limiter.
|
||||
data_by_hash_rl: Limiter<PeerId>,
|
||||
/// AnnounceFile rate limiter.
|
||||
announce_file_rl: Limiter<PeerId>,
|
||||
/// GetChunks rate limiter.
|
||||
get_chunks_rl: Limiter<PeerId>,
|
||||
}
|
||||
@ -91,6 +93,8 @@ pub struct RPCRateLimiterBuilder {
|
||||
status_quota: Option<Quota>,
|
||||
/// Quota for the DataByHash protocol.
|
||||
data_by_hash_quota: Option<Quota>,
|
||||
/// Quota for the AnnounceFile protocol.
|
||||
announce_file_quota: Option<Quota>,
|
||||
/// Quota for the GetChunks protocol.
|
||||
get_chunks_quota: Option<Quota>,
|
||||
}
|
||||
@ -109,6 +113,7 @@ impl RPCRateLimiterBuilder {
|
||||
Protocol::Status => self.status_quota = q,
|
||||
Protocol::Goodbye => self.goodbye_quota = q,
|
||||
Protocol::DataByHash => self.data_by_hash_quota = q,
|
||||
Protocol::AnnounceFile => self.announce_file_quota = q,
|
||||
Protocol::GetChunks => self.get_chunks_quota = q,
|
||||
}
|
||||
self
|
||||
@ -145,6 +150,9 @@ impl RPCRateLimiterBuilder {
|
||||
let data_by_hash_quota = self
|
||||
.data_by_hash_quota
|
||||
.ok_or("DataByHash quota not specified")?;
|
||||
let announce_file_quota = self
|
||||
.announce_file_quota
|
||||
.ok_or("AnnounceFile quota not specified")?;
|
||||
let get_chunks_quota = self
|
||||
.get_chunks_quota
|
||||
.ok_or("GetChunks quota not specified")?;
|
||||
@ -154,6 +162,7 @@ impl RPCRateLimiterBuilder {
|
||||
let status_rl = Limiter::from_quota(status_quota)?;
|
||||
let goodbye_rl = Limiter::from_quota(goodbye_quota)?;
|
||||
let data_by_hash_rl = Limiter::from_quota(data_by_hash_quota)?;
|
||||
let announce_file_rl = Limiter::from_quota(announce_file_quota)?;
|
||||
let get_chunks_rl = Limiter::from_quota(get_chunks_quota)?;
|
||||
|
||||
// check for peers to prune every 30 seconds, starting in 30 seconds
|
||||
@ -166,6 +175,7 @@ impl RPCRateLimiterBuilder {
|
||||
status_rl,
|
||||
goodbye_rl,
|
||||
data_by_hash_rl,
|
||||
announce_file_rl,
|
||||
get_chunks_rl,
|
||||
init_time: Instant::now(),
|
||||
})
|
||||
@ -210,6 +220,7 @@ impl RPCRateLimiter {
|
||||
Protocol::Status => &mut self.status_rl,
|
||||
Protocol::Goodbye => &mut self.goodbye_rl,
|
||||
Protocol::DataByHash => &mut self.data_by_hash_rl,
|
||||
Protocol::AnnounceFile => &mut self.announce_file_rl,
|
||||
Protocol::GetChunks => &mut self.get_chunks_rl,
|
||||
};
|
||||
check(limiter)
|
||||
|
@ -220,6 +220,13 @@ impl Libp2pEventHandler {
|
||||
});
|
||||
metrics::LIBP2P_HANDLE_GET_CHUNKS_REQUEST.mark(1);
|
||||
}
|
||||
Request::AnnounceFile(announcement) => {
|
||||
self.send_to_sync(SyncMessage::AnnounceFile {
|
||||
peer_id,
|
||||
request_id,
|
||||
announcement,
|
||||
});
|
||||
}
|
||||
Request::DataByHash(_) => {
|
||||
// ignore
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ use anyhow::{anyhow, bail, Result};
|
||||
use file_location_cache::FileLocationCache;
|
||||
use libp2p::swarm::DialError;
|
||||
use log_entry_sync::LogSyncEvent;
|
||||
use network::rpc::methods::FileAnnouncement;
|
||||
use network::types::{AnnounceChunks, FindFile, NewFile};
|
||||
use network::PubsubMessage;
|
||||
use network::{
|
||||
@ -74,6 +75,11 @@ pub enum SyncMessage {
|
||||
from: PeerId,
|
||||
msg: NewFile,
|
||||
},
|
||||
AnnounceFile {
|
||||
peer_id: PeerId,
|
||||
request_id: PeerRequestId,
|
||||
announcement: FileAnnouncement,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -270,6 +276,11 @@ impl SyncService {
|
||||
// FIXME: Check if controllers need to be reset?
|
||||
}
|
||||
SyncMessage::NewFile { from, msg } => self.on_new_file_gossip(from, msg).await,
|
||||
SyncMessage::AnnounceFile {
|
||||
peer_id,
|
||||
announcement,
|
||||
..
|
||||
} => self.on_announce_file(peer_id, announcement).await,
|
||||
}
|
||||
}
|
||||
|
||||
@ -762,6 +773,7 @@ impl SyncService {
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle on NewFile gossip message received.
|
||||
async fn on_new_file_gossip(&mut self, from: PeerId, msg: NewFile) {
|
||||
debug!(%from, ?msg, "Received NewFile gossip");
|
||||
|
||||
@ -775,6 +787,16 @@ impl SyncService {
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle on AnnounceFile RPC message received.
|
||||
async fn on_announce_file(&mut self, _peer_id: PeerId, announcement: FileAnnouncement) {
|
||||
if let Some(controller) = self.controllers.get_mut(&announcement.tx_id.seq) {
|
||||
// Notify new peer found if file already in sync
|
||||
// TODO qbit: do not require remote address since already TCP connected
|
||||
// controller.on_peer_found(from, addr);
|
||||
controller.transition();
|
||||
}
|
||||
}
|
||||
|
||||
/// Terminate file sync of `min_tx_seq`.
|
||||
/// If `is_reverted` is `true` (means confirmed transactions reverted),
|
||||
/// also terminate `tx_seq` greater than `min_tx_seq`
|
||||
|
Loading…
Reference in New Issue
Block a user