mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-01-18 11:05:18 +00:00
Implement file sync protocol V2 (#249)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
* Add new P2P protocol NewFile * Publish NewFile message when any file finalized * handle NewFile message in router * handle NewFile in sync servic to write in db * use propagation source to handle NewFile message * Disable sequential sync and store new file in v2 sync store * Add shard config in FindFile * Add AnnounceFile RPC message in network layer * do not propagate FindFile to whole network * Mark peer connected if FileAnnouncement RPC message received * fix unit test failures * Change P2P protocol version * Ignore py tests of sequential auto sync * Add py test for auto sync v2 * fmt code * remove dummy code in py test * fix random test failure * Add comments * Enable file sync protocol v2 in config file by default
This commit is contained in:
parent
789eae5cc1
commit
9b68a8b7d7
@ -20,6 +20,8 @@ pub struct GossipCache {
|
|||||||
topic_msgs: HashMap<GossipTopic, HashMap<Vec<u8>, Key>>,
|
topic_msgs: HashMap<GossipTopic, HashMap<Vec<u8>, Key>>,
|
||||||
/// Timeout for Example messages.
|
/// Timeout for Example messages.
|
||||||
example: Option<Duration>,
|
example: Option<Duration>,
|
||||||
|
/// Timeout for NewFile messages.
|
||||||
|
new_file: Option<Duration>,
|
||||||
/// Timeout for FindFile messages.
|
/// Timeout for FindFile messages.
|
||||||
find_file: Option<Duration>,
|
find_file: Option<Duration>,
|
||||||
/// Timeout for FindChunks messages.
|
/// Timeout for FindChunks messages.
|
||||||
@ -37,6 +39,8 @@ pub struct GossipCacheBuilder {
|
|||||||
default_timeout: Option<Duration>,
|
default_timeout: Option<Duration>,
|
||||||
/// Timeout for Example messages.
|
/// Timeout for Example messages.
|
||||||
example: Option<Duration>,
|
example: Option<Duration>,
|
||||||
|
/// Timeout for NewFile messages.
|
||||||
|
new_file: Option<Duration>,
|
||||||
/// Timeout for blocks FindFile messages.
|
/// Timeout for blocks FindFile messages.
|
||||||
find_file: Option<Duration>,
|
find_file: Option<Duration>,
|
||||||
/// Timeout for blocks FindChunks messages.
|
/// Timeout for blocks FindChunks messages.
|
||||||
@ -64,6 +68,12 @@ impl GossipCacheBuilder {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Timeout for NewFile messages.
|
||||||
|
pub fn new_file_timeout(mut self, timeout: Duration) -> Self {
|
||||||
|
self.new_file = Some(timeout);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Timeout for FindFile messages.
|
/// Timeout for FindFile messages.
|
||||||
pub fn find_file_timeout(mut self, timeout: Duration) -> Self {
|
pub fn find_file_timeout(mut self, timeout: Duration) -> Self {
|
||||||
self.find_file = Some(timeout);
|
self.find_file = Some(timeout);
|
||||||
@ -98,6 +108,7 @@ impl GossipCacheBuilder {
|
|||||||
let GossipCacheBuilder {
|
let GossipCacheBuilder {
|
||||||
default_timeout,
|
default_timeout,
|
||||||
example,
|
example,
|
||||||
|
new_file,
|
||||||
find_file,
|
find_file,
|
||||||
find_chunks,
|
find_chunks,
|
||||||
announce_file,
|
announce_file,
|
||||||
@ -109,6 +120,7 @@ impl GossipCacheBuilder {
|
|||||||
expirations: DelayQueue::default(),
|
expirations: DelayQueue::default(),
|
||||||
topic_msgs: HashMap::default(),
|
topic_msgs: HashMap::default(),
|
||||||
example: example.or(default_timeout),
|
example: example.or(default_timeout),
|
||||||
|
new_file: new_file.or(default_timeout),
|
||||||
find_file: find_file.or(default_timeout),
|
find_file: find_file.or(default_timeout),
|
||||||
find_chunks: find_chunks.or(default_timeout),
|
find_chunks: find_chunks.or(default_timeout),
|
||||||
announce_file: announce_file.or(default_timeout),
|
announce_file: announce_file.or(default_timeout),
|
||||||
@ -129,6 +141,7 @@ impl GossipCache {
|
|||||||
pub fn insert(&mut self, topic: GossipTopic, data: Vec<u8>) {
|
pub fn insert(&mut self, topic: GossipTopic, data: Vec<u8>) {
|
||||||
let expire_timeout = match topic.kind() {
|
let expire_timeout = match topic.kind() {
|
||||||
GossipKind::Example => self.example,
|
GossipKind::Example => self.example,
|
||||||
|
GossipKind::NewFile => self.new_file,
|
||||||
GossipKind::FindFile => self.find_file,
|
GossipKind::FindFile => self.find_file,
|
||||||
GossipKind::FindChunks => self.find_chunks,
|
GossipKind::FindChunks => self.find_chunks,
|
||||||
GossipKind::AnnounceFile => self.announce_file,
|
GossipKind::AnnounceFile => self.announce_file,
|
||||||
|
@ -6,6 +6,7 @@ use crate::peer_manager::{
|
|||||||
ConnectionDirection, PeerManager, PeerManagerEvent,
|
ConnectionDirection, PeerManager, PeerManagerEvent,
|
||||||
};
|
};
|
||||||
use crate::rpc::methods::DataByHashRequest;
|
use crate::rpc::methods::DataByHashRequest;
|
||||||
|
use crate::rpc::methods::FileAnnouncement;
|
||||||
use crate::rpc::methods::GetChunksRequest;
|
use crate::rpc::methods::GetChunksRequest;
|
||||||
use crate::rpc::*;
|
use crate::rpc::*;
|
||||||
use crate::service::Context as ServiceContext;
|
use crate::service::Context as ServiceContext;
|
||||||
@ -232,6 +233,9 @@ impl<AppReqId: ReqId> Behaviour<AppReqId> {
|
|||||||
let topic: Topic = GossipTopic::new(kind, GossipEncoding::default()).into();
|
let topic: Topic = GossipTopic::new(kind, GossipEncoding::default()).into();
|
||||||
topic.hash()
|
topic.hash()
|
||||||
};
|
};
|
||||||
|
params
|
||||||
|
.topics
|
||||||
|
.insert(get_hash(GossipKind::NewFile), TopicScoreParams::default());
|
||||||
params
|
params
|
||||||
.topics
|
.topics
|
||||||
.insert(get_hash(GossipKind::FindFile), TopicScoreParams::default());
|
.insert(get_hash(GossipKind::FindFile), TopicScoreParams::default());
|
||||||
@ -543,6 +547,9 @@ impl<AppReqId: ReqId> Behaviour<AppReqId> {
|
|||||||
Request::DataByHash { .. } => {
|
Request::DataByHash { .. } => {
|
||||||
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["data_by_hash"])
|
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["data_by_hash"])
|
||||||
}
|
}
|
||||||
|
Request::AnnounceFile { .. } => {
|
||||||
|
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["announce_file"])
|
||||||
|
}
|
||||||
Request::GetChunks { .. } => {
|
Request::GetChunks { .. } => {
|
||||||
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["get_chunks"])
|
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["get_chunks"])
|
||||||
}
|
}
|
||||||
@ -755,6 +762,9 @@ where
|
|||||||
InboundRequest::DataByHash(req) => {
|
InboundRequest::DataByHash(req) => {
|
||||||
self.propagate_request(peer_request_id, peer_id, Request::DataByHash(req))
|
self.propagate_request(peer_request_id, peer_id, Request::DataByHash(req))
|
||||||
}
|
}
|
||||||
|
InboundRequest::AnnounceFile(req) => {
|
||||||
|
self.propagate_request(peer_request_id, peer_id, Request::AnnounceFile(req))
|
||||||
|
}
|
||||||
InboundRequest::GetChunks(req) => {
|
InboundRequest::GetChunks(req) => {
|
||||||
self.propagate_request(peer_request_id, peer_id, Request::GetChunks(req))
|
self.propagate_request(peer_request_id, peer_id, Request::GetChunks(req))
|
||||||
}
|
}
|
||||||
@ -969,6 +979,8 @@ pub enum Request {
|
|||||||
Status(StatusMessage),
|
Status(StatusMessage),
|
||||||
/// A data by hash request.
|
/// A data by hash request.
|
||||||
DataByHash(DataByHashRequest),
|
DataByHash(DataByHashRequest),
|
||||||
|
/// An AnnounceFile message.
|
||||||
|
AnnounceFile(FileAnnouncement),
|
||||||
/// A GetChunks request.
|
/// A GetChunks request.
|
||||||
GetChunks(GetChunksRequest),
|
GetChunks(GetChunksRequest),
|
||||||
}
|
}
|
||||||
@ -978,6 +990,7 @@ impl std::convert::From<Request> for OutboundRequest {
|
|||||||
match req {
|
match req {
|
||||||
Request::Status(s) => OutboundRequest::Status(s),
|
Request::Status(s) => OutboundRequest::Status(s),
|
||||||
Request::DataByHash(r) => OutboundRequest::DataByHash(r),
|
Request::DataByHash(r) => OutboundRequest::DataByHash(r),
|
||||||
|
Request::AnnounceFile(r) => OutboundRequest::AnnounceFile(r),
|
||||||
Request::GetChunks(r) => OutboundRequest::GetChunks(r),
|
Request::GetChunks(r) => OutboundRequest::GetChunks(r),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -93,7 +93,10 @@ pub use peer_manager::{
|
|||||||
};
|
};
|
||||||
pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_FILENAME};
|
pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_FILENAME};
|
||||||
|
|
||||||
pub const PROTOCOL_VERSION: [u8; 3] = [0, 1, 0];
|
/// Defines the current P2P protocol version.
|
||||||
|
/// - v1: Broadcast FindFile & AnnounceFile messages in the whole network, which caused network too heavey.
|
||||||
|
/// - v2: Publish NewFile to neighbors only and announce file via RPC message.
|
||||||
|
pub const PROTOCOL_VERSION: [u8; 3] = [0, 2, 0];
|
||||||
|
|
||||||
/// Application level requests sent to the network.
|
/// Application level requests sent to the network.
|
||||||
#[derive(Debug, Clone, Copy)]
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
@ -460,6 +460,7 @@ impl PeerManager {
|
|||||||
Protocol::Goodbye => PeerAction::LowToleranceError,
|
Protocol::Goodbye => PeerAction::LowToleranceError,
|
||||||
Protocol::Status => PeerAction::LowToleranceError,
|
Protocol::Status => PeerAction::LowToleranceError,
|
||||||
Protocol::DataByHash => PeerAction::MidToleranceError,
|
Protocol::DataByHash => PeerAction::MidToleranceError,
|
||||||
|
Protocol::AnnounceFile => PeerAction::MidToleranceError,
|
||||||
Protocol::GetChunks => PeerAction::MidToleranceError,
|
Protocol::GetChunks => PeerAction::MidToleranceError,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -474,6 +475,7 @@ impl PeerManager {
|
|||||||
Protocol::Goodbye => return,
|
Protocol::Goodbye => return,
|
||||||
Protocol::Status => PeerAction::LowToleranceError,
|
Protocol::Status => PeerAction::LowToleranceError,
|
||||||
Protocol::DataByHash => return,
|
Protocol::DataByHash => return,
|
||||||
|
Protocol::AnnounceFile => return,
|
||||||
Protocol::GetChunks => return,
|
Protocol::GetChunks => return,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -488,6 +490,7 @@ impl PeerManager {
|
|||||||
Protocol::Goodbye => return,
|
Protocol::Goodbye => return,
|
||||||
Protocol::Status => return,
|
Protocol::Status => return,
|
||||||
Protocol::DataByHash => PeerAction::MidToleranceError,
|
Protocol::DataByHash => PeerAction::MidToleranceError,
|
||||||
|
Protocol::AnnounceFile => PeerAction::MidToleranceError,
|
||||||
Protocol::GetChunks => PeerAction::MidToleranceError,
|
Protocol::GetChunks => PeerAction::MidToleranceError,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -159,6 +159,7 @@ impl Encoder<OutboundRequest> for SSZSnappyOutboundCodec {
|
|||||||
OutboundRequest::Goodbye(req) => req.as_ssz_bytes(),
|
OutboundRequest::Goodbye(req) => req.as_ssz_bytes(),
|
||||||
OutboundRequest::Ping(req) => req.as_ssz_bytes(),
|
OutboundRequest::Ping(req) => req.as_ssz_bytes(),
|
||||||
OutboundRequest::DataByHash(req) => req.hashes.as_ssz_bytes(),
|
OutboundRequest::DataByHash(req) => req.hashes.as_ssz_bytes(),
|
||||||
|
OutboundRequest::AnnounceFile(req) => req.as_ssz_bytes(),
|
||||||
OutboundRequest::GetChunks(req) => req.as_ssz_bytes(),
|
OutboundRequest::GetChunks(req) => req.as_ssz_bytes(),
|
||||||
};
|
};
|
||||||
// SSZ encoded bytes should be within `max_packet_size`
|
// SSZ encoded bytes should be within `max_packet_size`
|
||||||
@ -346,6 +347,9 @@ fn handle_v1_request(
|
|||||||
Protocol::DataByHash => Ok(Some(InboundRequest::DataByHash(DataByHashRequest {
|
Protocol::DataByHash => Ok(Some(InboundRequest::DataByHash(DataByHashRequest {
|
||||||
hashes: VariableList::from_ssz_bytes(decoded_buffer)?,
|
hashes: VariableList::from_ssz_bytes(decoded_buffer)?,
|
||||||
}))),
|
}))),
|
||||||
|
Protocol::AnnounceFile => Ok(Some(InboundRequest::AnnounceFile(
|
||||||
|
FileAnnouncement::from_ssz_bytes(decoded_buffer)?,
|
||||||
|
))),
|
||||||
Protocol::GetChunks => Ok(Some(InboundRequest::GetChunks(
|
Protocol::GetChunks => Ok(Some(InboundRequest::GetChunks(
|
||||||
GetChunksRequest::from_ssz_bytes(decoded_buffer)?,
|
GetChunksRequest::from_ssz_bytes(decoded_buffer)?,
|
||||||
))),
|
))),
|
||||||
@ -373,6 +377,10 @@ fn handle_v1_response(
|
|||||||
Protocol::DataByHash => Ok(Some(RPCResponse::DataByHash(Box::new(
|
Protocol::DataByHash => Ok(Some(RPCResponse::DataByHash(Box::new(
|
||||||
ZgsData::from_ssz_bytes(decoded_buffer)?,
|
ZgsData::from_ssz_bytes(decoded_buffer)?,
|
||||||
)))),
|
)))),
|
||||||
|
// This case should be unreachable as `AnnounceFile` has no response.
|
||||||
|
Protocol::AnnounceFile => Err(RPCError::InvalidData(
|
||||||
|
"AnnounceFile RPC message has no valid response".to_string(),
|
||||||
|
)),
|
||||||
Protocol::GetChunks => Ok(Some(RPCResponse::Chunks(
|
Protocol::GetChunks => Ok(Some(RPCResponse::Chunks(
|
||||||
ChunkArrayWithProof::from_ssz_bytes(decoded_buffer)?,
|
ChunkArrayWithProof::from_ssz_bytes(decoded_buffer)?,
|
||||||
))),
|
))),
|
||||||
|
@ -178,6 +178,14 @@ pub struct DataByHashRequest {
|
|||||||
pub hashes: VariableList<Hash256, MaxRequestBlocks>,
|
pub hashes: VariableList<Hash256, MaxRequestBlocks>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The message of `AnnounceFile` RPC message.
|
||||||
|
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
|
||||||
|
pub struct FileAnnouncement {
|
||||||
|
pub tx_id: TxID,
|
||||||
|
pub num_shard: usize,
|
||||||
|
pub shard_id: usize,
|
||||||
|
}
|
||||||
|
|
||||||
/// Request a chunk array from a peer.
|
/// Request a chunk array from a peer.
|
||||||
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
|
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
|
||||||
pub struct GetChunksRequest {
|
pub struct GetChunksRequest {
|
||||||
|
@ -118,6 +118,7 @@ impl<Id: ReqId> RPC<Id> {
|
|||||||
.n_every(Protocol::Status, 5, Duration::from_secs(15))
|
.n_every(Protocol::Status, 5, Duration::from_secs(15))
|
||||||
.one_every(Protocol::Goodbye, Duration::from_secs(10))
|
.one_every(Protocol::Goodbye, Duration::from_secs(10))
|
||||||
.n_every(Protocol::DataByHash, 128, Duration::from_secs(10))
|
.n_every(Protocol::DataByHash, 128, Duration::from_secs(10))
|
||||||
|
.n_every(Protocol::AnnounceFile, 256, Duration::from_secs(10))
|
||||||
.n_every(Protocol::GetChunks, 4096, Duration::from_secs(10))
|
.n_every(Protocol::GetChunks, 4096, Duration::from_secs(10))
|
||||||
.build()
|
.build()
|
||||||
.expect("Configuration parameters are valid");
|
.expect("Configuration parameters are valid");
|
||||||
|
@ -34,6 +34,7 @@ pub enum OutboundRequest {
|
|||||||
Goodbye(GoodbyeReason),
|
Goodbye(GoodbyeReason),
|
||||||
Ping(Ping),
|
Ping(Ping),
|
||||||
DataByHash(DataByHashRequest),
|
DataByHash(DataByHashRequest),
|
||||||
|
AnnounceFile(FileAnnouncement),
|
||||||
GetChunks(GetChunksRequest),
|
GetChunks(GetChunksRequest),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -72,6 +73,11 @@ impl OutboundRequest {
|
|||||||
Version::V1,
|
Version::V1,
|
||||||
Encoding::SSZSnappy,
|
Encoding::SSZSnappy,
|
||||||
)],
|
)],
|
||||||
|
OutboundRequest::AnnounceFile(_) => vec![ProtocolId::new(
|
||||||
|
Protocol::AnnounceFile,
|
||||||
|
Version::V1,
|
||||||
|
Encoding::SSZSnappy,
|
||||||
|
)],
|
||||||
OutboundRequest::GetChunks(_) => vec![ProtocolId::new(
|
OutboundRequest::GetChunks(_) => vec![ProtocolId::new(
|
||||||
Protocol::GetChunks,
|
Protocol::GetChunks,
|
||||||
Version::V1,
|
Version::V1,
|
||||||
@ -89,6 +95,7 @@ impl OutboundRequest {
|
|||||||
OutboundRequest::Goodbye(_) => 0,
|
OutboundRequest::Goodbye(_) => 0,
|
||||||
OutboundRequest::Ping(_) => 1,
|
OutboundRequest::Ping(_) => 1,
|
||||||
OutboundRequest::DataByHash(req) => req.hashes.len() as u64,
|
OutboundRequest::DataByHash(req) => req.hashes.len() as u64,
|
||||||
|
OutboundRequest::AnnounceFile(_) => 0,
|
||||||
OutboundRequest::GetChunks(_) => 1,
|
OutboundRequest::GetChunks(_) => 1,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -100,6 +107,7 @@ impl OutboundRequest {
|
|||||||
OutboundRequest::Goodbye(_) => Protocol::Goodbye,
|
OutboundRequest::Goodbye(_) => Protocol::Goodbye,
|
||||||
OutboundRequest::Ping(_) => Protocol::Ping,
|
OutboundRequest::Ping(_) => Protocol::Ping,
|
||||||
OutboundRequest::DataByHash(_) => Protocol::DataByHash,
|
OutboundRequest::DataByHash(_) => Protocol::DataByHash,
|
||||||
|
OutboundRequest::AnnounceFile(_) => Protocol::AnnounceFile,
|
||||||
OutboundRequest::GetChunks(_) => Protocol::GetChunks,
|
OutboundRequest::GetChunks(_) => Protocol::GetChunks,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -114,6 +122,7 @@ impl OutboundRequest {
|
|||||||
OutboundRequest::Status(_) => unreachable!(),
|
OutboundRequest::Status(_) => unreachable!(),
|
||||||
OutboundRequest::Goodbye(_) => unreachable!(),
|
OutboundRequest::Goodbye(_) => unreachable!(),
|
||||||
OutboundRequest::Ping(_) => unreachable!(),
|
OutboundRequest::Ping(_) => unreachable!(),
|
||||||
|
OutboundRequest::AnnounceFile(_) => unreachable!(),
|
||||||
OutboundRequest::GetChunks(_) => unreachable!(),
|
OutboundRequest::GetChunks(_) => unreachable!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -170,6 +179,9 @@ impl std::fmt::Display for OutboundRequest {
|
|||||||
OutboundRequest::DataByHash(req) => {
|
OutboundRequest::DataByHash(req) => {
|
||||||
write!(f, "Data by hash: {:?}", req)
|
write!(f, "Data by hash: {:?}", req)
|
||||||
}
|
}
|
||||||
|
OutboundRequest::AnnounceFile(req) => {
|
||||||
|
write!(f, "AnnounceFile: {:?}", req)
|
||||||
|
}
|
||||||
OutboundRequest::GetChunks(req) => {
|
OutboundRequest::GetChunks(req) => {
|
||||||
write!(f, "GetChunks: {:?}", req)
|
write!(f, "GetChunks: {:?}", req)
|
||||||
}
|
}
|
||||||
|
@ -91,6 +91,8 @@ pub enum Protocol {
|
|||||||
/// TODO
|
/// TODO
|
||||||
DataByHash,
|
DataByHash,
|
||||||
|
|
||||||
|
/// The file announce protocol.
|
||||||
|
AnnounceFile,
|
||||||
/// The Chunk sync protocol.
|
/// The Chunk sync protocol.
|
||||||
GetChunks,
|
GetChunks,
|
||||||
}
|
}
|
||||||
@ -115,6 +117,7 @@ impl std::fmt::Display for Protocol {
|
|||||||
Protocol::Goodbye => "goodbye",
|
Protocol::Goodbye => "goodbye",
|
||||||
Protocol::Ping => "ping",
|
Protocol::Ping => "ping",
|
||||||
Protocol::DataByHash => "data_by_hash",
|
Protocol::DataByHash => "data_by_hash",
|
||||||
|
Protocol::AnnounceFile => "announce_file",
|
||||||
Protocol::GetChunks => "get_chunks",
|
Protocol::GetChunks => "get_chunks",
|
||||||
};
|
};
|
||||||
f.write_str(repr)
|
f.write_str(repr)
|
||||||
@ -155,6 +158,7 @@ impl UpgradeInfo for RPCProtocol {
|
|||||||
ProtocolId::new(Protocol::Goodbye, Version::V1, Encoding::SSZSnappy),
|
ProtocolId::new(Protocol::Goodbye, Version::V1, Encoding::SSZSnappy),
|
||||||
ProtocolId::new(Protocol::Ping, Version::V1, Encoding::SSZSnappy),
|
ProtocolId::new(Protocol::Ping, Version::V1, Encoding::SSZSnappy),
|
||||||
ProtocolId::new(Protocol::DataByHash, Version::V1, Encoding::SSZSnappy),
|
ProtocolId::new(Protocol::DataByHash, Version::V1, Encoding::SSZSnappy),
|
||||||
|
ProtocolId::new(Protocol::AnnounceFile, Version::V1, Encoding::SSZSnappy),
|
||||||
ProtocolId::new(Protocol::GetChunks, Version::V1, Encoding::SSZSnappy),
|
ProtocolId::new(Protocol::GetChunks, Version::V1, Encoding::SSZSnappy),
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
@ -216,6 +220,10 @@ impl ProtocolId {
|
|||||||
// TODO
|
// TODO
|
||||||
RpcLimits::new(1, *DATA_BY_HASH_REQUEST_MAX)
|
RpcLimits::new(1, *DATA_BY_HASH_REQUEST_MAX)
|
||||||
}
|
}
|
||||||
|
Protocol::AnnounceFile => RpcLimits::new(
|
||||||
|
<FileAnnouncement as Encode>::ssz_fixed_len(),
|
||||||
|
<FileAnnouncement as Encode>::ssz_fixed_len(),
|
||||||
|
),
|
||||||
Protocol::GetChunks => RpcLimits::new(
|
Protocol::GetChunks => RpcLimits::new(
|
||||||
<GetChunksRequest as Encode>::ssz_fixed_len(),
|
<GetChunksRequest as Encode>::ssz_fixed_len(),
|
||||||
<GetChunksRequest as Encode>::ssz_fixed_len(),
|
<GetChunksRequest as Encode>::ssz_fixed_len(),
|
||||||
@ -243,6 +251,7 @@ impl ProtocolId {
|
|||||||
<ZgsData as Encode>::ssz_fixed_len(),
|
<ZgsData as Encode>::ssz_fixed_len(),
|
||||||
),
|
),
|
||||||
|
|
||||||
|
Protocol::AnnounceFile => RpcLimits::new(0, 0), // AnnounceFile request has no response
|
||||||
Protocol::GetChunks => RpcLimits::new(*CHUNKS_RESPONSE_MIN, *CHUNKS_RESPONSE_MAX),
|
Protocol::GetChunks => RpcLimits::new(*CHUNKS_RESPONSE_MIN, *CHUNKS_RESPONSE_MAX),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -325,6 +334,7 @@ pub enum InboundRequest {
|
|||||||
Goodbye(GoodbyeReason),
|
Goodbye(GoodbyeReason),
|
||||||
Ping(Ping),
|
Ping(Ping),
|
||||||
DataByHash(DataByHashRequest),
|
DataByHash(DataByHashRequest),
|
||||||
|
AnnounceFile(FileAnnouncement),
|
||||||
GetChunks(GetChunksRequest),
|
GetChunks(GetChunksRequest),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -363,6 +373,11 @@ impl InboundRequest {
|
|||||||
Version::V1,
|
Version::V1,
|
||||||
Encoding::SSZSnappy,
|
Encoding::SSZSnappy,
|
||||||
)],
|
)],
|
||||||
|
InboundRequest::AnnounceFile(_) => vec![ProtocolId::new(
|
||||||
|
Protocol::AnnounceFile,
|
||||||
|
Version::V1,
|
||||||
|
Encoding::SSZSnappy,
|
||||||
|
)],
|
||||||
InboundRequest::GetChunks(_) => vec![ProtocolId::new(
|
InboundRequest::GetChunks(_) => vec![ProtocolId::new(
|
||||||
Protocol::GetChunks,
|
Protocol::GetChunks,
|
||||||
Version::V1,
|
Version::V1,
|
||||||
@ -380,6 +395,7 @@ impl InboundRequest {
|
|||||||
InboundRequest::Goodbye(_) => 0,
|
InboundRequest::Goodbye(_) => 0,
|
||||||
InboundRequest::DataByHash(req) => req.hashes.len() as u64,
|
InboundRequest::DataByHash(req) => req.hashes.len() as u64,
|
||||||
InboundRequest::Ping(_) => 1,
|
InboundRequest::Ping(_) => 1,
|
||||||
|
InboundRequest::AnnounceFile(_) => 0,
|
||||||
InboundRequest::GetChunks(_) => 1,
|
InboundRequest::GetChunks(_) => 1,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -391,6 +407,7 @@ impl InboundRequest {
|
|||||||
InboundRequest::Goodbye(_) => Protocol::Goodbye,
|
InboundRequest::Goodbye(_) => Protocol::Goodbye,
|
||||||
InboundRequest::Ping(_) => Protocol::Ping,
|
InboundRequest::Ping(_) => Protocol::Ping,
|
||||||
InboundRequest::DataByHash(_) => Protocol::DataByHash,
|
InboundRequest::DataByHash(_) => Protocol::DataByHash,
|
||||||
|
InboundRequest::AnnounceFile(_) => Protocol::AnnounceFile,
|
||||||
InboundRequest::GetChunks(_) => Protocol::GetChunks,
|
InboundRequest::GetChunks(_) => Protocol::GetChunks,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -405,6 +422,7 @@ impl InboundRequest {
|
|||||||
InboundRequest::Status(_) => unreachable!(),
|
InboundRequest::Status(_) => unreachable!(),
|
||||||
InboundRequest::Goodbye(_) => unreachable!(),
|
InboundRequest::Goodbye(_) => unreachable!(),
|
||||||
InboundRequest::Ping(_) => unreachable!(),
|
InboundRequest::Ping(_) => unreachable!(),
|
||||||
|
InboundRequest::AnnounceFile(_) => unreachable!(),
|
||||||
InboundRequest::GetChunks(_) => unreachable!(),
|
InboundRequest::GetChunks(_) => unreachable!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -523,6 +541,9 @@ impl std::fmt::Display for InboundRequest {
|
|||||||
InboundRequest::DataByHash(req) => {
|
InboundRequest::DataByHash(req) => {
|
||||||
write!(f, "Data by hash: {:?}", req)
|
write!(f, "Data by hash: {:?}", req)
|
||||||
}
|
}
|
||||||
|
InboundRequest::AnnounceFile(req) => {
|
||||||
|
write!(f, "Announce File: {:?}", req)
|
||||||
|
}
|
||||||
InboundRequest::GetChunks(req) => {
|
InboundRequest::GetChunks(req) => {
|
||||||
write!(f, "Get Chunks: {:?}", req)
|
write!(f, "Get Chunks: {:?}", req)
|
||||||
}
|
}
|
||||||
|
@ -68,6 +68,8 @@ pub struct RPCRateLimiter {
|
|||||||
status_rl: Limiter<PeerId>,
|
status_rl: Limiter<PeerId>,
|
||||||
/// DataByHash rate limiter.
|
/// DataByHash rate limiter.
|
||||||
data_by_hash_rl: Limiter<PeerId>,
|
data_by_hash_rl: Limiter<PeerId>,
|
||||||
|
/// AnnounceFile rate limiter.
|
||||||
|
announce_file_rl: Limiter<PeerId>,
|
||||||
/// GetChunks rate limiter.
|
/// GetChunks rate limiter.
|
||||||
get_chunks_rl: Limiter<PeerId>,
|
get_chunks_rl: Limiter<PeerId>,
|
||||||
}
|
}
|
||||||
@ -91,6 +93,8 @@ pub struct RPCRateLimiterBuilder {
|
|||||||
status_quota: Option<Quota>,
|
status_quota: Option<Quota>,
|
||||||
/// Quota for the DataByHash protocol.
|
/// Quota for the DataByHash protocol.
|
||||||
data_by_hash_quota: Option<Quota>,
|
data_by_hash_quota: Option<Quota>,
|
||||||
|
/// Quota for the AnnounceFile protocol.
|
||||||
|
announce_file_quota: Option<Quota>,
|
||||||
/// Quota for the GetChunks protocol.
|
/// Quota for the GetChunks protocol.
|
||||||
get_chunks_quota: Option<Quota>,
|
get_chunks_quota: Option<Quota>,
|
||||||
}
|
}
|
||||||
@ -109,6 +113,7 @@ impl RPCRateLimiterBuilder {
|
|||||||
Protocol::Status => self.status_quota = q,
|
Protocol::Status => self.status_quota = q,
|
||||||
Protocol::Goodbye => self.goodbye_quota = q,
|
Protocol::Goodbye => self.goodbye_quota = q,
|
||||||
Protocol::DataByHash => self.data_by_hash_quota = q,
|
Protocol::DataByHash => self.data_by_hash_quota = q,
|
||||||
|
Protocol::AnnounceFile => self.announce_file_quota = q,
|
||||||
Protocol::GetChunks => self.get_chunks_quota = q,
|
Protocol::GetChunks => self.get_chunks_quota = q,
|
||||||
}
|
}
|
||||||
self
|
self
|
||||||
@ -145,6 +150,9 @@ impl RPCRateLimiterBuilder {
|
|||||||
let data_by_hash_quota = self
|
let data_by_hash_quota = self
|
||||||
.data_by_hash_quota
|
.data_by_hash_quota
|
||||||
.ok_or("DataByHash quota not specified")?;
|
.ok_or("DataByHash quota not specified")?;
|
||||||
|
let announce_file_quota = self
|
||||||
|
.announce_file_quota
|
||||||
|
.ok_or("AnnounceFile quota not specified")?;
|
||||||
let get_chunks_quota = self
|
let get_chunks_quota = self
|
||||||
.get_chunks_quota
|
.get_chunks_quota
|
||||||
.ok_or("GetChunks quota not specified")?;
|
.ok_or("GetChunks quota not specified")?;
|
||||||
@ -154,6 +162,7 @@ impl RPCRateLimiterBuilder {
|
|||||||
let status_rl = Limiter::from_quota(status_quota)?;
|
let status_rl = Limiter::from_quota(status_quota)?;
|
||||||
let goodbye_rl = Limiter::from_quota(goodbye_quota)?;
|
let goodbye_rl = Limiter::from_quota(goodbye_quota)?;
|
||||||
let data_by_hash_rl = Limiter::from_quota(data_by_hash_quota)?;
|
let data_by_hash_rl = Limiter::from_quota(data_by_hash_quota)?;
|
||||||
|
let announce_file_rl = Limiter::from_quota(announce_file_quota)?;
|
||||||
let get_chunks_rl = Limiter::from_quota(get_chunks_quota)?;
|
let get_chunks_rl = Limiter::from_quota(get_chunks_quota)?;
|
||||||
|
|
||||||
// check for peers to prune every 30 seconds, starting in 30 seconds
|
// check for peers to prune every 30 seconds, starting in 30 seconds
|
||||||
@ -166,6 +175,7 @@ impl RPCRateLimiterBuilder {
|
|||||||
status_rl,
|
status_rl,
|
||||||
goodbye_rl,
|
goodbye_rl,
|
||||||
data_by_hash_rl,
|
data_by_hash_rl,
|
||||||
|
announce_file_rl,
|
||||||
get_chunks_rl,
|
get_chunks_rl,
|
||||||
init_time: Instant::now(),
|
init_time: Instant::now(),
|
||||||
})
|
})
|
||||||
@ -210,6 +220,7 @@ impl RPCRateLimiter {
|
|||||||
Protocol::Status => &mut self.status_rl,
|
Protocol::Status => &mut self.status_rl,
|
||||||
Protocol::Goodbye => &mut self.goodbye_rl,
|
Protocol::Goodbye => &mut self.goodbye_rl,
|
||||||
Protocol::DataByHash => &mut self.data_by_hash_rl,
|
Protocol::DataByHash => &mut self.data_by_hash_rl,
|
||||||
|
Protocol::AnnounceFile => &mut self.announce_file_rl,
|
||||||
Protocol::GetChunks => &mut self.get_chunks_rl,
|
Protocol::GetChunks => &mut self.get_chunks_rl,
|
||||||
};
|
};
|
||||||
check(limiter)
|
check(limiter)
|
||||||
|
@ -7,7 +7,7 @@ pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;
|
|||||||
|
|
||||||
pub use globals::NetworkGlobals;
|
pub use globals::NetworkGlobals;
|
||||||
pub use pubsub::{
|
pub use pubsub::{
|
||||||
AnnounceChunks, AnnounceFile, AnnounceShardConfig, FindChunks, FindFile, HasSignature,
|
AnnounceChunks, AnnounceFile, AnnounceShardConfig, FindChunks, FindFile, HasSignature, NewFile,
|
||||||
PubsubMessage, SignedAnnounceChunks, SignedAnnounceFile, SignedAnnounceShardConfig,
|
PubsubMessage, SignedAnnounceChunks, SignedAnnounceFile, SignedAnnounceShardConfig,
|
||||||
SignedMessage, SnappyTransform,
|
SignedMessage, SnappyTransform,
|
||||||
};
|
};
|
||||||
|
@ -114,9 +114,22 @@ impl ssz::Decode for WrappedPeerId {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Published when file uploaded or completed to sync from other peers.
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
|
||||||
|
pub struct NewFile {
|
||||||
|
pub tx_id: TxID,
|
||||||
|
pub num_shard: usize,
|
||||||
|
pub shard_id: usize,
|
||||||
|
pub timestamp: u32,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
|
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
|
||||||
pub struct FindFile {
|
pub struct FindFile {
|
||||||
pub tx_id: TxID,
|
pub tx_id: TxID,
|
||||||
|
pub num_shard: usize,
|
||||||
|
pub shard_id: usize,
|
||||||
|
/// Indicates whether publish to neighboar nodes only.
|
||||||
|
pub neighbors_only: bool,
|
||||||
pub timestamp: u32,
|
pub timestamp: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -205,6 +218,7 @@ type SignedAnnounceFiles = Vec<SignedAnnounceFile>;
|
|||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub enum PubsubMessage {
|
pub enum PubsubMessage {
|
||||||
ExampleMessage(u64),
|
ExampleMessage(u64),
|
||||||
|
NewFile(NewFile),
|
||||||
FindFile(FindFile),
|
FindFile(FindFile),
|
||||||
FindChunks(FindChunks),
|
FindChunks(FindChunks),
|
||||||
AnnounceFile(Vec<SignedAnnounceFile>),
|
AnnounceFile(Vec<SignedAnnounceFile>),
|
||||||
@ -283,6 +297,7 @@ impl PubsubMessage {
|
|||||||
pub fn kind(&self) -> GossipKind {
|
pub fn kind(&self) -> GossipKind {
|
||||||
match self {
|
match self {
|
||||||
PubsubMessage::ExampleMessage(_) => GossipKind::Example,
|
PubsubMessage::ExampleMessage(_) => GossipKind::Example,
|
||||||
|
PubsubMessage::NewFile(_) => GossipKind::NewFile,
|
||||||
PubsubMessage::FindFile(_) => GossipKind::FindFile,
|
PubsubMessage::FindFile(_) => GossipKind::FindFile,
|
||||||
PubsubMessage::FindChunks(_) => GossipKind::FindChunks,
|
PubsubMessage::FindChunks(_) => GossipKind::FindChunks,
|
||||||
PubsubMessage::AnnounceFile(_) => GossipKind::AnnounceFile,
|
PubsubMessage::AnnounceFile(_) => GossipKind::AnnounceFile,
|
||||||
@ -309,6 +324,9 @@ impl PubsubMessage {
|
|||||||
GossipKind::Example => Ok(PubsubMessage::ExampleMessage(
|
GossipKind::Example => Ok(PubsubMessage::ExampleMessage(
|
||||||
u64::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?,
|
u64::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?,
|
||||||
)),
|
)),
|
||||||
|
GossipKind::NewFile => Ok(PubsubMessage::NewFile(
|
||||||
|
NewFile::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?,
|
||||||
|
)),
|
||||||
GossipKind::FindFile => Ok(PubsubMessage::FindFile(
|
GossipKind::FindFile => Ok(PubsubMessage::FindFile(
|
||||||
FindFile::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?,
|
FindFile::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?,
|
||||||
)),
|
)),
|
||||||
@ -341,6 +359,7 @@ impl PubsubMessage {
|
|||||||
// messages for us.
|
// messages for us.
|
||||||
match &self {
|
match &self {
|
||||||
PubsubMessage::ExampleMessage(data) => data.as_ssz_bytes(),
|
PubsubMessage::ExampleMessage(data) => data.as_ssz_bytes(),
|
||||||
|
PubsubMessage::NewFile(data) => data.as_ssz_bytes(),
|
||||||
PubsubMessage::FindFile(data) => data.as_ssz_bytes(),
|
PubsubMessage::FindFile(data) => data.as_ssz_bytes(),
|
||||||
PubsubMessage::FindChunks(data) => data.as_ssz_bytes(),
|
PubsubMessage::FindChunks(data) => data.as_ssz_bytes(),
|
||||||
PubsubMessage::AnnounceFile(data) => data.as_ssz_bytes(),
|
PubsubMessage::AnnounceFile(data) => data.as_ssz_bytes(),
|
||||||
@ -356,6 +375,9 @@ impl std::fmt::Display for PubsubMessage {
|
|||||||
PubsubMessage::ExampleMessage(msg) => {
|
PubsubMessage::ExampleMessage(msg) => {
|
||||||
write!(f, "Example message: {}", msg)
|
write!(f, "Example message: {}", msg)
|
||||||
}
|
}
|
||||||
|
PubsubMessage::NewFile(msg) => {
|
||||||
|
write!(f, "NewFile message: {:?}", msg)
|
||||||
|
}
|
||||||
PubsubMessage::FindFile(msg) => {
|
PubsubMessage::FindFile(msg) => {
|
||||||
write!(f, "FindFile message: {:?}", msg)
|
write!(f, "FindFile message: {:?}", msg)
|
||||||
}
|
}
|
||||||
|
@ -8,13 +8,15 @@ use strum::AsRefStr;
|
|||||||
pub const TOPIC_PREFIX: &str = "eth2";
|
pub const TOPIC_PREFIX: &str = "eth2";
|
||||||
pub const SSZ_SNAPPY_ENCODING_POSTFIX: &str = "ssz_snappy";
|
pub const SSZ_SNAPPY_ENCODING_POSTFIX: &str = "ssz_snappy";
|
||||||
pub const EXAMPLE_TOPIC: &str = "example";
|
pub const EXAMPLE_TOPIC: &str = "example";
|
||||||
|
pub const NEW_FILE_TOPIC: &str = "new_file";
|
||||||
pub const FIND_FILE_TOPIC: &str = "find_file";
|
pub const FIND_FILE_TOPIC: &str = "find_file";
|
||||||
pub const FIND_CHUNKS_TOPIC: &str = "find_chunks";
|
pub const FIND_CHUNKS_TOPIC: &str = "find_chunks";
|
||||||
pub const ANNOUNCE_FILE_TOPIC: &str = "announce_file";
|
pub const ANNOUNCE_FILE_TOPIC: &str = "announce_file";
|
||||||
pub const ANNOUNCE_CHUNKS_TOPIC: &str = "announce_chunks";
|
pub const ANNOUNCE_CHUNKS_TOPIC: &str = "announce_chunks";
|
||||||
pub const ANNOUNCE_SHARD_CONFIG_TOPIC: &str = "announce_shard_config";
|
pub const ANNOUNCE_SHARD_CONFIG_TOPIC: &str = "announce_shard_config";
|
||||||
|
|
||||||
pub const CORE_TOPICS: [GossipKind; 4] = [
|
pub const CORE_TOPICS: [GossipKind; 5] = [
|
||||||
|
GossipKind::NewFile,
|
||||||
GossipKind::FindFile,
|
GossipKind::FindFile,
|
||||||
GossipKind::FindChunks,
|
GossipKind::FindChunks,
|
||||||
GossipKind::AnnounceFile,
|
GossipKind::AnnounceFile,
|
||||||
@ -37,6 +39,7 @@ pub struct GossipTopic {
|
|||||||
#[strum(serialize_all = "snake_case")]
|
#[strum(serialize_all = "snake_case")]
|
||||||
pub enum GossipKind {
|
pub enum GossipKind {
|
||||||
Example,
|
Example,
|
||||||
|
NewFile,
|
||||||
FindFile,
|
FindFile,
|
||||||
FindChunks,
|
FindChunks,
|
||||||
AnnounceFile,
|
AnnounceFile,
|
||||||
@ -77,6 +80,7 @@ impl GossipTopic {
|
|||||||
|
|
||||||
let kind = match topic_parts[2] {
|
let kind = match topic_parts[2] {
|
||||||
EXAMPLE_TOPIC => GossipKind::Example,
|
EXAMPLE_TOPIC => GossipKind::Example,
|
||||||
|
NEW_FILE_TOPIC => GossipKind::NewFile,
|
||||||
FIND_FILE_TOPIC => GossipKind::FindFile,
|
FIND_FILE_TOPIC => GossipKind::FindFile,
|
||||||
FIND_CHUNKS_TOPIC => GossipKind::FindChunks,
|
FIND_CHUNKS_TOPIC => GossipKind::FindChunks,
|
||||||
ANNOUNCE_FILE_TOPIC => GossipKind::AnnounceFile,
|
ANNOUNCE_FILE_TOPIC => GossipKind::AnnounceFile,
|
||||||
@ -106,6 +110,7 @@ impl From<GossipTopic> for String {
|
|||||||
|
|
||||||
let kind = match topic.kind {
|
let kind = match topic.kind {
|
||||||
GossipKind::Example => EXAMPLE_TOPIC,
|
GossipKind::Example => EXAMPLE_TOPIC,
|
||||||
|
GossipKind::NewFile => NEW_FILE_TOPIC,
|
||||||
GossipKind::FindFile => FIND_FILE_TOPIC,
|
GossipKind::FindFile => FIND_FILE_TOPIC,
|
||||||
GossipKind::FindChunks => FIND_CHUNKS_TOPIC,
|
GossipKind::FindChunks => FIND_CHUNKS_TOPIC,
|
||||||
GossipKind::AnnounceFile => ANNOUNCE_FILE_TOPIC,
|
GossipKind::AnnounceFile => ANNOUNCE_FILE_TOPIC,
|
||||||
@ -125,6 +130,7 @@ impl std::fmt::Display for GossipTopic {
|
|||||||
|
|
||||||
let kind = match self.kind {
|
let kind = match self.kind {
|
||||||
GossipKind::Example => EXAMPLE_TOPIC,
|
GossipKind::Example => EXAMPLE_TOPIC,
|
||||||
|
GossipKind::NewFile => NEW_FILE_TOPIC,
|
||||||
GossipKind::FindFile => FIND_FILE_TOPIC,
|
GossipKind::FindFile => FIND_FILE_TOPIC,
|
||||||
GossipKind::FindChunks => FIND_CHUNKS_TOPIC,
|
GossipKind::FindChunks => FIND_CHUNKS_TOPIC,
|
||||||
GossipKind::AnnounceFile => ANNOUNCE_FILE_TOPIC,
|
GossipKind::AnnounceFile => ANNOUNCE_FILE_TOPIC,
|
||||||
|
@ -5,7 +5,8 @@ use std::{ops::Neg, sync::Arc};
|
|||||||
use chunk_pool::ChunkPoolMessage;
|
use chunk_pool::ChunkPoolMessage;
|
||||||
use file_location_cache::FileLocationCache;
|
use file_location_cache::FileLocationCache;
|
||||||
use network::multiaddr::Protocol;
|
use network::multiaddr::Protocol;
|
||||||
use network::types::{AnnounceShardConfig, SignedAnnounceShardConfig};
|
use network::rpc::methods::FileAnnouncement;
|
||||||
|
use network::types::{AnnounceShardConfig, NewFile, SignedAnnounceShardConfig};
|
||||||
use network::{
|
use network::{
|
||||||
rpc::StatusMessage,
|
rpc::StatusMessage,
|
||||||
types::{
|
types::{
|
||||||
@ -29,6 +30,11 @@ use crate::peer_manager::PeerManager;
|
|||||||
use crate::Config;
|
use crate::Config;
|
||||||
|
|
||||||
lazy_static::lazy_static! {
|
lazy_static::lazy_static! {
|
||||||
|
/// Timeout to publish NewFile message to neighbor nodes.
|
||||||
|
pub static ref NEW_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30);
|
||||||
|
/// Timeout to publish FindFile message to neighbor nodes.
|
||||||
|
pub static ref FIND_FILE_NEIGHBORS_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30);
|
||||||
|
/// Timeout to publish FindFile message in the whole network.
|
||||||
pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
|
pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
|
||||||
pub static ref ANNOUNCE_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
|
pub static ref ANNOUNCE_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
|
||||||
pub static ref ANNOUNCE_SHARD_CONFIG_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
|
pub static ref ANNOUNCE_SHARD_CONFIG_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
|
||||||
@ -219,6 +225,25 @@ impl Libp2pEventHandler {
|
|||||||
});
|
});
|
||||||
metrics::LIBP2P_HANDLE_GET_CHUNKS_REQUEST.mark(1);
|
metrics::LIBP2P_HANDLE_GET_CHUNKS_REQUEST.mark(1);
|
||||||
}
|
}
|
||||||
|
Request::AnnounceFile(announcement) => {
|
||||||
|
match ShardConfig::new(announcement.shard_id, announcement.num_shard) {
|
||||||
|
Ok(v) => {
|
||||||
|
self.file_location_cache.insert_peer_config(peer_id, v);
|
||||||
|
|
||||||
|
self.send_to_sync(SyncMessage::AnnounceFile {
|
||||||
|
peer_id,
|
||||||
|
request_id,
|
||||||
|
announcement,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(_) => self.send_to_network(NetworkMessage::ReportPeer {
|
||||||
|
peer_id,
|
||||||
|
action: PeerAction::Fatal,
|
||||||
|
source: ReportSource::RPC,
|
||||||
|
msg: "Invalid shard config in AnnounceFile RPC message",
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
Request::DataByHash(_) => {
|
Request::DataByHash(_) => {
|
||||||
// ignore
|
// ignore
|
||||||
}
|
}
|
||||||
@ -316,9 +341,13 @@ impl Libp2pEventHandler {
|
|||||||
|
|
||||||
match message {
|
match message {
|
||||||
PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore,
|
PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore,
|
||||||
|
PubsubMessage::NewFile(msg) => {
|
||||||
|
metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE.mark(1);
|
||||||
|
self.on_new_file(propagation_source, msg).await
|
||||||
|
}
|
||||||
PubsubMessage::FindFile(msg) => {
|
PubsubMessage::FindFile(msg) => {
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1);
|
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1);
|
||||||
self.on_find_file(msg).await
|
self.on_find_file(propagation_source, msg).await
|
||||||
}
|
}
|
||||||
PubsubMessage::FindChunks(msg) => {
|
PubsubMessage::FindChunks(msg) => {
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS.mark(1);
|
metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS.mark(1);
|
||||||
@ -348,6 +377,63 @@ impl Libp2pEventHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Handle NewFile pubsub message `msg` that published by `from` peer.
|
||||||
|
async fn on_new_file(&self, from: PeerId, msg: NewFile) -> MessageAcceptance {
|
||||||
|
// verify timestamp
|
||||||
|
let d = duration_since(
|
||||||
|
msg.timestamp,
|
||||||
|
metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE_LATENCY.clone(),
|
||||||
|
);
|
||||||
|
if d < TOLERABLE_DRIFT.neg() || d > *NEW_FILE_TIMEOUT {
|
||||||
|
debug!(?d, ?msg, "Invalid timestamp, ignoring NewFile message");
|
||||||
|
metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE_TIMEOUT.mark(1);
|
||||||
|
self.send_to_network(NetworkMessage::ReportPeer {
|
||||||
|
peer_id: from,
|
||||||
|
action: PeerAction::LowToleranceError,
|
||||||
|
source: ReportSource::Gossipsub,
|
||||||
|
msg: "Received out of date NewFile message",
|
||||||
|
});
|
||||||
|
return MessageAcceptance::Ignore;
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify announced shard config
|
||||||
|
let announced_shard_config = match ShardConfig::new(msg.shard_id, msg.num_shard) {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(_) => return MessageAcceptance::Reject,
|
||||||
|
};
|
||||||
|
|
||||||
|
// ignore if shard config mismatch
|
||||||
|
let my_shard_config = self.store.get_store().get_shard_config();
|
||||||
|
if !my_shard_config.intersect(&announced_shard_config) {
|
||||||
|
return MessageAcceptance::Ignore;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ignore if already exists
|
||||||
|
match self.store.check_tx_completed(msg.tx_id.seq).await {
|
||||||
|
Ok(true) => return MessageAcceptance::Ignore,
|
||||||
|
Ok(false) => {}
|
||||||
|
Err(err) => {
|
||||||
|
warn!(?err, tx_seq = %msg.tx_id.seq, "Failed to check tx completed");
|
||||||
|
return MessageAcceptance::Ignore;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ignore if already pruned
|
||||||
|
match self.store.check_tx_pruned(msg.tx_id.seq).await {
|
||||||
|
Ok(true) => return MessageAcceptance::Ignore,
|
||||||
|
Ok(false) => {}
|
||||||
|
Err(err) => {
|
||||||
|
warn!(?err, tx_seq = %msg.tx_id.seq, "Failed to check tx pruned");
|
||||||
|
return MessageAcceptance::Ignore;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// notify sync layer to handle in advance
|
||||||
|
self.send_to_sync(SyncMessage::NewFile { from, msg });
|
||||||
|
|
||||||
|
MessageAcceptance::Ignore
|
||||||
|
}
|
||||||
|
|
||||||
async fn construct_announced_ip(&self) -> Option<Multiaddr> {
|
async fn construct_announced_ip(&self) -> Option<Multiaddr> {
|
||||||
// public address configured
|
// public address configured
|
||||||
if let Some(ip) = self.config.public_address {
|
if let Some(ip) = self.config.public_address {
|
||||||
@ -485,27 +571,69 @@ impl Libp2pEventHandler {
|
|||||||
Some(PubsubMessage::AnnounceShardConfig(signed))
|
Some(PubsubMessage::AnnounceShardConfig(signed))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn on_find_file(&self, msg: FindFile) -> MessageAcceptance {
|
async fn on_find_file(&self, from: PeerId, msg: FindFile) -> MessageAcceptance {
|
||||||
let FindFile { tx_id, timestamp } = msg;
|
let FindFile {
|
||||||
|
tx_id, timestamp, ..
|
||||||
|
} = msg;
|
||||||
|
|
||||||
// verify timestamp
|
// verify timestamp
|
||||||
let d = duration_since(
|
let d = duration_since(
|
||||||
timestamp,
|
timestamp,
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY.clone(),
|
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY.clone(),
|
||||||
);
|
);
|
||||||
if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT {
|
let timeout = if msg.neighbors_only {
|
||||||
|
*FIND_FILE_NEIGHBORS_TIMEOUT
|
||||||
|
} else {
|
||||||
|
*FIND_FILE_TIMEOUT
|
||||||
|
};
|
||||||
|
if d < TOLERABLE_DRIFT.neg() || d > timeout {
|
||||||
debug!(%timestamp, ?d, "Invalid timestamp, ignoring FindFile message");
|
debug!(%timestamp, ?d, "Invalid timestamp, ignoring FindFile message");
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_TIMEOUT.mark(1);
|
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_TIMEOUT.mark(1);
|
||||||
|
if msg.neighbors_only {
|
||||||
|
self.send_to_network(NetworkMessage::ReportPeer {
|
||||||
|
peer_id: from,
|
||||||
|
action: PeerAction::LowToleranceError,
|
||||||
|
source: ReportSource::Gossipsub,
|
||||||
|
msg: "Received out of date FindFile message",
|
||||||
|
});
|
||||||
|
}
|
||||||
return MessageAcceptance::Ignore;
|
return MessageAcceptance::Ignore;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// verify announced shard config
|
||||||
|
let announced_shard_config = match ShardConfig::new(msg.shard_id, msg.num_shard) {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(_) => return MessageAcceptance::Reject,
|
||||||
|
};
|
||||||
|
|
||||||
|
// handle on shard config mismatch
|
||||||
|
let my_shard_config = self.store.get_store().get_shard_config();
|
||||||
|
if !my_shard_config.intersect(&announced_shard_config) {
|
||||||
|
return if msg.neighbors_only {
|
||||||
|
MessageAcceptance::Ignore
|
||||||
|
} else {
|
||||||
|
MessageAcceptance::Accept
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
// check if we have it
|
// check if we have it
|
||||||
if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) {
|
if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) {
|
||||||
if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await {
|
if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await {
|
||||||
if tx.id() == tx_id {
|
if tx.id() == tx_id {
|
||||||
trace!(?tx_id, "Found file locally, responding to FindFile query");
|
trace!(?tx_id, "Found file locally, responding to FindFile query");
|
||||||
|
|
||||||
if self.publish_file(tx_id).await.is_some() {
|
if msg.neighbors_only {
|
||||||
|
// announce file via RPC to avoid flooding pubsub message
|
||||||
|
self.send_to_network(NetworkMessage::SendRequest {
|
||||||
|
peer_id: from,
|
||||||
|
request: Request::AnnounceFile(FileAnnouncement {
|
||||||
|
tx_id,
|
||||||
|
num_shard: my_shard_config.num_shard,
|
||||||
|
shard_id: my_shard_config.shard_id,
|
||||||
|
}),
|
||||||
|
request_id: RequestId::Router(Instant::now()),
|
||||||
|
});
|
||||||
|
} else if self.publish_file(tx_id).await.is_some() {
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_STORE.mark(1);
|
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_STORE.mark(1);
|
||||||
return MessageAcceptance::Ignore;
|
return MessageAcceptance::Ignore;
|
||||||
}
|
}
|
||||||
@ -513,6 +641,11 @@ impl Libp2pEventHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// do not forward to whole network if only find file from neighbor nodes
|
||||||
|
if msg.neighbors_only {
|
||||||
|
return MessageAcceptance::Ignore;
|
||||||
|
}
|
||||||
|
|
||||||
// try from cache
|
// try from cache
|
||||||
if let Some(mut msg) = self.file_location_cache.get_one(tx_id) {
|
if let Some(mut msg) = self.file_location_cache.get_one(tx_id) {
|
||||||
trace!(?tx_id, "Found file in cache, responding to FindFile query");
|
trace!(?tx_id, "Found file in cache, responding to FindFile query");
|
||||||
@ -834,7 +967,7 @@ impl Libp2pEventHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn publish_file(&self, tx_id: TxID) -> Option<bool> {
|
async fn publish_file(&self, tx_id: TxID) -> Option<bool> {
|
||||||
match self.file_batcher.write().await.add(tx_id) {
|
match self.file_batcher.write().await.add(tx_id) {
|
||||||
Some(batch) => {
|
Some(batch) => {
|
||||||
let announcement = self.construct_announce_file_message(batch).await?;
|
let announcement = self.construct_announce_file_message(batch).await?;
|
||||||
@ -1203,7 +1336,13 @@ mod tests {
|
|||||||
) -> MessageAcceptance {
|
) -> MessageAcceptance {
|
||||||
let (alice, bob) = (PeerId::random(), PeerId::random());
|
let (alice, bob) = (PeerId::random(), PeerId::random());
|
||||||
let id = MessageId::new(b"dummy message");
|
let id = MessageId::new(b"dummy message");
|
||||||
let message = PubsubMessage::FindFile(FindFile { tx_id, timestamp });
|
let message = PubsubMessage::FindFile(FindFile {
|
||||||
|
tx_id,
|
||||||
|
num_shard: 1,
|
||||||
|
shard_id: 0,
|
||||||
|
neighbors_only: false,
|
||||||
|
timestamp,
|
||||||
|
});
|
||||||
handler.on_pubsub_message(alice, bob, &id, message).await
|
handler.on_pubsub_message(alice, bob, &id, message).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,6 +44,11 @@ lazy_static::lazy_static! {
|
|||||||
pub static ref LIBP2P_HANDLE_RESPONSE_ERROR: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_error", "qps");
|
pub static ref LIBP2P_HANDLE_RESPONSE_ERROR: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_error", "qps");
|
||||||
pub static ref LIBP2P_HANDLE_RESPONSE_ERROR_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_error", "latency", 1024);
|
pub static ref LIBP2P_HANDLE_RESPONSE_ERROR_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_error", "latency", 1024);
|
||||||
|
|
||||||
|
// libp2p_event_handler: new file
|
||||||
|
pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_new_file", "qps");
|
||||||
|
pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_new_file", "latency", 1024);
|
||||||
|
pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE_TIMEOUT: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_new_file", "timeout");
|
||||||
|
|
||||||
// libp2p_event_handler: find & announce file
|
// libp2p_event_handler: find & announce file
|
||||||
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "qps");
|
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "qps");
|
||||||
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_find_file", "latency", 1024);
|
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_find_file", "latency", 1024);
|
||||||
|
@ -5,11 +5,14 @@ use chunk_pool::ChunkPoolMessage;
|
|||||||
use file_location_cache::FileLocationCache;
|
use file_location_cache::FileLocationCache;
|
||||||
use futures::{channel::mpsc::Sender, prelude::*};
|
use futures::{channel::mpsc::Sender, prelude::*};
|
||||||
use miner::MinerMessage;
|
use miner::MinerMessage;
|
||||||
|
use network::types::NewFile;
|
||||||
|
use network::PubsubMessage;
|
||||||
use network::{
|
use network::{
|
||||||
BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage, RequestId,
|
BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage, RequestId,
|
||||||
Service as LibP2PService, Swarm,
|
Service as LibP2PService, Swarm,
|
||||||
};
|
};
|
||||||
use pruner::PrunerMessage;
|
use pruner::PrunerMessage;
|
||||||
|
use shared_types::timestamp_now;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use storage::log_store::Store as LogStore;
|
use storage::log_store::Store as LogStore;
|
||||||
use storage_async::Store;
|
use storage_async::Store;
|
||||||
@ -44,6 +47,8 @@ pub struct RouterService {
|
|||||||
/// Stores potentially created UPnP mappings to be removed on shutdown. (TCP port and UDP
|
/// Stores potentially created UPnP mappings to be removed on shutdown. (TCP port and UDP
|
||||||
/// port).
|
/// port).
|
||||||
upnp_mappings: (Option<u16>, Option<u16>),
|
upnp_mappings: (Option<u16>, Option<u16>),
|
||||||
|
|
||||||
|
store: Arc<dyn LogStore>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RouterService {
|
impl RouterService {
|
||||||
@ -63,7 +68,6 @@ impl RouterService {
|
|||||||
local_keypair: Keypair,
|
local_keypair: Keypair,
|
||||||
config: Config,
|
config: Config,
|
||||||
) {
|
) {
|
||||||
let store = Store::new(store, executor.clone());
|
|
||||||
let peers = Arc::new(RwLock::new(PeerManager::new(config.clone())));
|
let peers = Arc::new(RwLock::new(PeerManager::new(config.clone())));
|
||||||
|
|
||||||
// create the network service and spawn the task
|
// create the network service and spawn the task
|
||||||
@ -81,11 +85,12 @@ impl RouterService {
|
|||||||
sync_send,
|
sync_send,
|
||||||
chunk_pool_send,
|
chunk_pool_send,
|
||||||
local_keypair,
|
local_keypair,
|
||||||
store,
|
Store::new(store.clone(), executor.clone()),
|
||||||
file_location_cache,
|
file_location_cache,
|
||||||
peers,
|
peers,
|
||||||
),
|
),
|
||||||
upnp_mappings: (None, None),
|
upnp_mappings: (None, None),
|
||||||
|
store,
|
||||||
};
|
};
|
||||||
|
|
||||||
// spawn service
|
// spawn service
|
||||||
@ -328,15 +333,16 @@ impl RouterService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
NetworkMessage::AnnounceLocalFile { tx_id } => {
|
NetworkMessage::AnnounceLocalFile { tx_id } => {
|
||||||
if self
|
let shard_config = self.store.get_shard_config();
|
||||||
.libp2p_event_handler
|
let msg = PubsubMessage::NewFile(NewFile {
|
||||||
.publish_file(tx_id)
|
tx_id,
|
||||||
.await
|
num_shard: shard_config.num_shard,
|
||||||
.is_some()
|
shard_id: shard_config.shard_id,
|
||||||
{
|
timestamp: timestamp_now(),
|
||||||
|
});
|
||||||
|
self.libp2p.swarm.behaviour_mut().publish(vec![msg]);
|
||||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE.mark(1);
|
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE.mark(1);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
NetworkMessage::UPnPMappingEstablished {
|
NetworkMessage::UPnPMappingEstablished {
|
||||||
tcp_socket,
|
tcp_socket,
|
||||||
udp_socket,
|
udp_socket,
|
||||||
|
@ -59,14 +59,13 @@ impl RandomBatcher {
|
|||||||
pub async fn start(mut self, catched_up: Arc<AtomicBool>) {
|
pub async fn start(mut self, catched_up: Arc<AtomicBool>) {
|
||||||
info!("Start to sync files");
|
info!("Start to sync files");
|
||||||
|
|
||||||
loop {
|
// wait for log entry sync catched up
|
||||||
// disable file sync until catched up
|
while !catched_up.load(Ordering::Relaxed) {
|
||||||
if !catched_up.load(Ordering::Relaxed) {
|
|
||||||
trace!("Cannot sync file in catch-up phase");
|
trace!("Cannot sync file in catch-up phase");
|
||||||
sleep(self.config.auto_sync_idle_interval).await;
|
sleep(self.config.auto_sync_idle_interval).await;
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
loop {
|
||||||
if let Ok(state) = self.get_state().await {
|
if let Ok(state) = self.get_state().await {
|
||||||
metrics::RANDOM_STATE_TXS_SYNCING.update(state.tasks.len() as u64);
|
metrics::RANDOM_STATE_TXS_SYNCING.update(state.tasks.len() as u64);
|
||||||
metrics::RANDOM_STATE_TXS_READY.update(state.ready_txs as u64);
|
metrics::RANDOM_STATE_TXS_READY.update(state.ready_txs as u64);
|
||||||
|
@ -9,18 +9,23 @@ use storage_async::Store;
|
|||||||
use task_executor::TaskExecutor;
|
use task_executor::TaskExecutor;
|
||||||
use tokio::sync::{
|
use tokio::sync::{
|
||||||
broadcast,
|
broadcast,
|
||||||
mpsc::{unbounded_channel, UnboundedSender},
|
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
|
||||||
oneshot,
|
oneshot,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{Config, SyncSender};
|
use crate::{Config, SyncSender};
|
||||||
|
|
||||||
use super::{batcher_random::RandomBatcher, batcher_serial::SerialBatcher, sync_store::SyncStore};
|
use super::{
|
||||||
|
batcher_random::RandomBatcher,
|
||||||
|
batcher_serial::SerialBatcher,
|
||||||
|
sync_store::{Queue, SyncStore},
|
||||||
|
};
|
||||||
|
|
||||||
pub struct AutoSyncManager {
|
pub struct AutoSyncManager {
|
||||||
pub serial: SerialBatcher,
|
pub serial: Option<SerialBatcher>,
|
||||||
pub random: RandomBatcher,
|
pub random: RandomBatcher,
|
||||||
pub file_announcement_send: UnboundedSender<u64>,
|
pub file_announcement_send: UnboundedSender<u64>,
|
||||||
|
pub new_file_send: UnboundedSender<u64>,
|
||||||
pub catched_up: Arc<AtomicBool>,
|
pub catched_up: Arc<AtomicBool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -33,42 +38,80 @@ impl AutoSyncManager {
|
|||||||
log_sync_recv: broadcast::Receiver<LogSyncEvent>,
|
log_sync_recv: broadcast::Receiver<LogSyncEvent>,
|
||||||
catch_up_end_recv: oneshot::Receiver<()>,
|
catch_up_end_recv: oneshot::Receiver<()>,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let (send, recv) = unbounded_channel();
|
let (file_announcement_send, file_announcement_recv) = unbounded_channel();
|
||||||
let sync_store = Arc::new(SyncStore::new(store.clone()));
|
let (new_file_send, new_file_recv) = unbounded_channel();
|
||||||
|
let sync_store = if config.neighbors_only {
|
||||||
|
// use v2 db to avoid reading v1 files that announced from the whole network instead of neighbors
|
||||||
|
Arc::new(SyncStore::new_with_name(
|
||||||
|
store.clone(),
|
||||||
|
"pendingv2",
|
||||||
|
"readyv2",
|
||||||
|
))
|
||||||
|
} else {
|
||||||
|
Arc::new(SyncStore::new(store.clone()))
|
||||||
|
};
|
||||||
let catched_up = Arc::new(AtomicBool::new(false));
|
let catched_up = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
|
// handle new file
|
||||||
|
executor.spawn(
|
||||||
|
Self::handle_new_file(new_file_recv, sync_store.clone()),
|
||||||
|
"auto_sync_handle_new_file",
|
||||||
|
);
|
||||||
|
|
||||||
// sync in sequence
|
// sync in sequence
|
||||||
|
let serial = if config.neighbors_only {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
let serial =
|
let serial =
|
||||||
SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone())
|
SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone())
|
||||||
.await?;
|
.await?;
|
||||||
executor.spawn(
|
executor.spawn(
|
||||||
serial
|
serial
|
||||||
.clone()
|
.clone()
|
||||||
.start(recv, log_sync_recv, catched_up.clone()),
|
.start(file_announcement_recv, log_sync_recv, catched_up.clone()),
|
||||||
"auto_sync_serial",
|
"auto_sync_serial",
|
||||||
);
|
);
|
||||||
|
|
||||||
|
Some(serial)
|
||||||
|
};
|
||||||
|
|
||||||
// sync randomly
|
// sync randomly
|
||||||
let random = RandomBatcher::new(config, store, sync_send, sync_store);
|
let random = RandomBatcher::new(config, store, sync_send, sync_store);
|
||||||
executor.spawn(random.clone().start(catched_up.clone()), "auto_sync_random");
|
executor.spawn(random.clone().start(catched_up.clone()), "auto_sync_random");
|
||||||
|
|
||||||
// handle on catched up notification
|
// handle on catched up notification
|
||||||
let catched_up_cloned = catched_up.clone();
|
|
||||||
executor.spawn(
|
executor.spawn(
|
||||||
async move {
|
Self::listen_catch_up(catch_up_end_recv, catched_up.clone()),
|
||||||
if catch_up_end_recv.await.is_ok() {
|
|
||||||
info!("log entry catched up");
|
|
||||||
catched_up_cloned.store(true, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"auto_sync_wait_for_catchup",
|
"auto_sync_wait_for_catchup",
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
serial,
|
serial,
|
||||||
random,
|
random,
|
||||||
file_announcement_send: send,
|
file_announcement_send,
|
||||||
|
new_file_send,
|
||||||
catched_up,
|
catched_up,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_new_file(
|
||||||
|
mut new_file_recv: UnboundedReceiver<u64>,
|
||||||
|
sync_store: Arc<SyncStore>,
|
||||||
|
) {
|
||||||
|
while let Some(tx_seq) = new_file_recv.recv().await {
|
||||||
|
if let Err(err) = sync_store.insert(tx_seq, Queue::Ready).await {
|
||||||
|
warn!(?err, %tx_seq, "Failed to insert new file to ready queue");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn listen_catch_up(
|
||||||
|
catch_up_end_recv: oneshot::Receiver<()>,
|
||||||
|
catched_up: Arc<AtomicBool>,
|
||||||
|
) {
|
||||||
|
if catch_up_end_recv.await.is_ok() {
|
||||||
|
info!("log entry catched up");
|
||||||
|
catched_up.store(true, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -42,6 +42,14 @@ impl SyncStore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn new_with_name(store: Store, pending: &'static str, ready: &'static str) -> Self {
|
||||||
|
Self {
|
||||||
|
store: Arc::new(RwLock::new(store)),
|
||||||
|
pending_txs: TxStore::new(pending),
|
||||||
|
ready_txs: TxStore::new(ready),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the number of pending txs and ready txs.
|
/// Returns the number of pending txs and ready txs.
|
||||||
pub async fn stat(&self) -> Result<(usize, usize)> {
|
pub async fn stat(&self) -> Result<(usize, usize)> {
|
||||||
let async_store = self.store.read().await;
|
let async_store = self.store.read().await;
|
||||||
|
@ -14,12 +14,13 @@ use shared_types::{timestamp_now, ChunkArrayWithProof, TxID, CHUNK_SIZE};
|
|||||||
use ssz::Encode;
|
use ssz::Encode;
|
||||||
use std::{sync::Arc, time::Instant};
|
use std::{sync::Arc, time::Instant};
|
||||||
use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE};
|
use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE};
|
||||||
use storage_async::Store;
|
use storage_async::{ShardConfig, Store};
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||||
pub enum FailureReason {
|
pub enum FailureReason {
|
||||||
DBError(String),
|
DBError(String),
|
||||||
TxReverted(TxID),
|
TxReverted(TxID),
|
||||||
|
TimeoutFindFile,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||||
@ -159,11 +160,14 @@ impl SerialSyncController {
|
|||||||
|
|
||||||
/// Find more peers to sync chunks. Return whether `FindFile` pubsub message published,
|
/// Find more peers to sync chunks. Return whether `FindFile` pubsub message published,
|
||||||
fn try_find_peers(&mut self) {
|
fn try_find_peers(&mut self) {
|
||||||
let (published, num_new_peers) = if self.goal.is_all_chunks() {
|
let (published, num_new_peers) = if !self.goal.is_all_chunks() {
|
||||||
self.publish_find_file()
|
|
||||||
} else {
|
|
||||||
self.publish_find_chunks();
|
self.publish_find_chunks();
|
||||||
(true, 0)
|
(true, 0)
|
||||||
|
} else if self.config.neighbors_only {
|
||||||
|
self.do_publish_find_file();
|
||||||
|
(true, 0)
|
||||||
|
} else {
|
||||||
|
self.publish_find_file()
|
||||||
};
|
};
|
||||||
|
|
||||||
info!(%self.tx_seq, %published, %num_new_peers, "Finding peers");
|
info!(%self.tx_seq, %published, %num_new_peers, "Finding peers");
|
||||||
@ -199,14 +203,23 @@ impl SerialSyncController {
|
|||||||
return (false, num_new_peers);
|
return (false, num_new_peers);
|
||||||
}
|
}
|
||||||
|
|
||||||
self.ctx.publish(PubsubMessage::FindFile(FindFile {
|
self.do_publish_find_file();
|
||||||
tx_id: self.tx_id,
|
|
||||||
timestamp: timestamp_now(),
|
|
||||||
}));
|
|
||||||
|
|
||||||
(true, num_new_peers)
|
(true, num_new_peers)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn do_publish_find_file(&self) {
|
||||||
|
let shard_config = self.store.get_store().get_shard_config();
|
||||||
|
|
||||||
|
self.ctx.publish(PubsubMessage::FindFile(FindFile {
|
||||||
|
tx_id: self.tx_id,
|
||||||
|
num_shard: shard_config.num_shard,
|
||||||
|
shard_id: shard_config.shard_id,
|
||||||
|
neighbors_only: self.config.neighbors_only,
|
||||||
|
timestamp: timestamp_now(),
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
fn publish_find_chunks(&self) {
|
fn publish_find_chunks(&self) {
|
||||||
self.ctx.publish(PubsubMessage::FindChunks(FindChunks {
|
self.ctx.publish(PubsubMessage::FindChunks(FindChunks {
|
||||||
tx_id: self.tx_id,
|
tx_id: self.tx_id,
|
||||||
@ -337,6 +350,14 @@ impl SerialSyncController {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Triggered when any peer (TCP connected) announced file via RPC message.
|
||||||
|
pub fn on_peer_announced(&mut self, peer_id: PeerId, shard_config: ShardConfig) {
|
||||||
|
self.peers
|
||||||
|
.add_new_peer_with_config(peer_id, Multiaddr::empty(), shard_config);
|
||||||
|
self.peers
|
||||||
|
.update_state_force(&peer_id, PeerState::Connected);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn on_dail_failed(&mut self, peer_id: PeerId, err: &DialError) {
|
pub fn on_dail_failed(&mut self, peer_id: PeerId, err: &DialError) {
|
||||||
match err {
|
match err {
|
||||||
DialError::ConnectionLimit(_) => {
|
DialError::ConnectionLimit(_) => {
|
||||||
@ -545,6 +566,9 @@ impl SerialSyncController {
|
|||||||
info!(%self.tx_seq, "Succeeded to finalize file");
|
info!(%self.tx_seq, "Succeeded to finalize file");
|
||||||
self.state = SyncState::Completed;
|
self.state = SyncState::Completed;
|
||||||
metrics::SERIAL_SYNC_FILE_COMPLETED.update_since(self.since.0);
|
metrics::SERIAL_SYNC_FILE_COMPLETED.update_since(self.since.0);
|
||||||
|
// notify neighbor nodes about new file completed to sync
|
||||||
|
self.ctx
|
||||||
|
.send(NetworkMessage::AnnounceLocalFile { tx_id: self.tx_id });
|
||||||
}
|
}
|
||||||
Ok(false) => {
|
Ok(false) => {
|
||||||
warn!(?self.tx_id, %self.tx_seq, "Transaction reverted during finalize_tx");
|
warn!(?self.tx_id, %self.tx_seq, "Transaction reverted during finalize_tx");
|
||||||
@ -636,14 +660,21 @@ impl SerialSyncController {
|
|||||||
.all_shards_available(vec![Found, Connecting, Connected])
|
.all_shards_available(vec![Found, Connecting, Connected])
|
||||||
{
|
{
|
||||||
self.state = SyncState::FoundPeers;
|
self.state = SyncState::FoundPeers;
|
||||||
|
} else {
|
||||||
|
// FindFile timeout
|
||||||
|
if since.elapsed() >= self.config.peer_find_timeout {
|
||||||
|
if self.config.neighbors_only {
|
||||||
|
self.state = SyncState::Failed {
|
||||||
|
reason: FailureReason::TimeoutFindFile,
|
||||||
|
};
|
||||||
} else {
|
} else {
|
||||||
// storage node may not have the specific file when `FindFile`
|
// storage node may not have the specific file when `FindFile`
|
||||||
// gossip message received. In this case, just broadcast the
|
// gossip message received. In this case, just broadcast the
|
||||||
// `FindFile` message again.
|
// `FindFile` message again.
|
||||||
if since.elapsed() >= self.config.peer_find_timeout {
|
|
||||||
debug!(%self.tx_seq, "Finding peer timeout and try to find peers again");
|
debug!(%self.tx_seq, "Finding peer timeout and try to find peers again");
|
||||||
self.try_find_peers();
|
self.try_find_peers();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
completed = true;
|
completed = true;
|
||||||
}
|
}
|
||||||
@ -1513,6 +1544,10 @@ mod tests {
|
|||||||
|
|
||||||
controller.on_response(peer_id, chunks).await;
|
controller.on_response(peer_id, chunks).await;
|
||||||
assert_eq!(*controller.get_status(), SyncState::Completed);
|
assert_eq!(*controller.get_status(), SyncState::Completed);
|
||||||
|
assert!(matches!(
|
||||||
|
network_recv.try_recv().unwrap(),
|
||||||
|
NetworkMessage::AnnounceLocalFile { .. }
|
||||||
|
));
|
||||||
assert!(network_recv.try_recv().is_err());
|
assert!(network_recv.try_recv().is_err());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,6 +21,10 @@ use std::{
|
|||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
// sync service config
|
// sync service config
|
||||||
|
/// Indicates whether to sync file from neighbor nodes only.
|
||||||
|
/// This is to avoid flooding file announcements in the whole network,
|
||||||
|
/// which leads to high latency or even timeout to sync files.
|
||||||
|
pub neighbors_only: bool,
|
||||||
#[serde(deserialize_with = "deserialize_duration")]
|
#[serde(deserialize_with = "deserialize_duration")]
|
||||||
pub heartbeat_interval: Duration,
|
pub heartbeat_interval: Duration,
|
||||||
pub auto_sync_enabled: bool,
|
pub auto_sync_enabled: bool,
|
||||||
@ -64,6 +68,7 @@ impl Default for Config {
|
|||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
// sync service config
|
// sync service config
|
||||||
|
neighbors_only: false,
|
||||||
heartbeat_interval: Duration::from_secs(5),
|
heartbeat_interval: Duration::from_secs(5),
|
||||||
auto_sync_enabled: false,
|
auto_sync_enabled: false,
|
||||||
max_sync_files: 8,
|
max_sync_files: 8,
|
||||||
|
@ -8,7 +8,8 @@ use anyhow::{anyhow, bail, Result};
|
|||||||
use file_location_cache::FileLocationCache;
|
use file_location_cache::FileLocationCache;
|
||||||
use libp2p::swarm::DialError;
|
use libp2p::swarm::DialError;
|
||||||
use log_entry_sync::LogSyncEvent;
|
use log_entry_sync::LogSyncEvent;
|
||||||
use network::types::{AnnounceChunks, FindFile};
|
use network::rpc::methods::FileAnnouncement;
|
||||||
|
use network::types::{AnnounceChunks, FindFile, NewFile};
|
||||||
use network::PubsubMessage;
|
use network::PubsubMessage;
|
||||||
use network::{
|
use network::{
|
||||||
rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, PeerId,
|
rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, PeerId,
|
||||||
@ -70,6 +71,15 @@ pub enum SyncMessage {
|
|||||||
AnnounceChunksGossip {
|
AnnounceChunksGossip {
|
||||||
msg: AnnounceChunks,
|
msg: AnnounceChunks,
|
||||||
},
|
},
|
||||||
|
NewFile {
|
||||||
|
from: PeerId,
|
||||||
|
msg: NewFile,
|
||||||
|
},
|
||||||
|
AnnounceFile {
|
||||||
|
peer_id: PeerId,
|
||||||
|
request_id: PeerRequestId,
|
||||||
|
announcement: FileAnnouncement,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@ -265,6 +275,12 @@ impl SyncService {
|
|||||||
SyncMessage::AnnounceShardConfig { .. } => {
|
SyncMessage::AnnounceShardConfig { .. } => {
|
||||||
// FIXME: Check if controllers need to be reset?
|
// FIXME: Check if controllers need to be reset?
|
||||||
}
|
}
|
||||||
|
SyncMessage::NewFile { from, msg } => self.on_new_file_gossip(from, msg).await,
|
||||||
|
SyncMessage::AnnounceFile {
|
||||||
|
peer_id,
|
||||||
|
announcement,
|
||||||
|
..
|
||||||
|
} => self.on_announce_file(peer_id, announcement).await,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -279,7 +295,10 @@ impl SyncService {
|
|||||||
Some(manager) => SyncServiceState {
|
Some(manager) => SyncServiceState {
|
||||||
num_syncing: self.controllers.len(),
|
num_syncing: self.controllers.len(),
|
||||||
catched_up: Some(manager.catched_up.load(Ordering::Relaxed)),
|
catched_up: Some(manager.catched_up.load(Ordering::Relaxed)),
|
||||||
auto_sync_serial: Some(manager.serial.get_state().await),
|
auto_sync_serial: match &manager.serial {
|
||||||
|
Some(v) => Some(v.get_state().await),
|
||||||
|
None => None,
|
||||||
|
},
|
||||||
auto_sync_random: manager.random.get_state().await.ok(),
|
auto_sync_random: manager.random.get_state().await.ok(),
|
||||||
},
|
},
|
||||||
None => SyncServiceState {
|
None => SyncServiceState {
|
||||||
@ -577,8 +596,12 @@ impl SyncService {
|
|||||||
Some(tx) => tx,
|
Some(tx) => tx,
|
||||||
None => bail!("Transaction not found"),
|
None => bail!("Transaction not found"),
|
||||||
};
|
};
|
||||||
|
let shard_config = self.store.get_store().get_shard_config();
|
||||||
self.ctx.publish(PubsubMessage::FindFile(FindFile {
|
self.ctx.publish(PubsubMessage::FindFile(FindFile {
|
||||||
tx_id: tx.id(),
|
tx_id: tx.id(),
|
||||||
|
num_shard: shard_config.num_shard,
|
||||||
|
shard_id: shard_config.shard_id,
|
||||||
|
neighbors_only: false,
|
||||||
timestamp: timestamp_now(),
|
timestamp: timestamp_now(),
|
||||||
}));
|
}));
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -642,7 +665,10 @@ impl SyncService {
|
|||||||
Some(s) => s,
|
Some(s) => s,
|
||||||
None => {
|
None => {
|
||||||
debug!(%tx.seq, "No more data needed");
|
debug!(%tx.seq, "No more data needed");
|
||||||
self.store.finalize_tx_with_hash(tx.seq, tx.hash()).await?;
|
if self.store.finalize_tx_with_hash(tx.seq, tx.hash()).await? {
|
||||||
|
self.ctx
|
||||||
|
.send(NetworkMessage::AnnounceLocalFile { tx_id: tx.id() });
|
||||||
|
}
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -748,6 +774,34 @@ impl SyncService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Handle on `NewFile` gossip message received.
|
||||||
|
async fn on_new_file_gossip(&mut self, from: PeerId, msg: NewFile) {
|
||||||
|
debug!(%from, ?msg, "Received NewFile gossip");
|
||||||
|
|
||||||
|
if let Some(controller) = self.controllers.get_mut(&msg.tx_id.seq) {
|
||||||
|
// Notify new peer announced if file already in sync
|
||||||
|
if let Ok(shard_config) = ShardConfig::new(msg.shard_id, msg.num_shard) {
|
||||||
|
controller.on_peer_announced(from, shard_config);
|
||||||
|
controller.transition();
|
||||||
|
}
|
||||||
|
} else if let Some(manager) = &self.auto_sync_manager {
|
||||||
|
let _ = manager.new_file_send.send(msg.tx_id.seq);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle on `AnnounceFile` RPC message received.
|
||||||
|
async fn on_announce_file(&mut self, from: PeerId, announcement: FileAnnouncement) {
|
||||||
|
// Notify new peer announced if file already in sync
|
||||||
|
if let Some(controller) = self.controllers.get_mut(&announcement.tx_id.seq) {
|
||||||
|
if let Ok(shard_config) =
|
||||||
|
ShardConfig::new(announcement.shard_id, announcement.num_shard)
|
||||||
|
{
|
||||||
|
controller.on_peer_announced(from, shard_config);
|
||||||
|
controller.transition();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Terminate file sync of `min_tx_seq`.
|
/// Terminate file sync of `min_tx_seq`.
|
||||||
/// If `is_reverted` is `true` (means confirmed transactions reverted),
|
/// If `is_reverted` is `true` (means confirmed transactions reverted),
|
||||||
/// also terminate `tx_seq` greater than `min_tx_seq`
|
/// also terminate `tx_seq` greater than `min_tx_seq`
|
||||||
@ -1504,6 +1558,10 @@ mod tests {
|
|||||||
.await;
|
.await;
|
||||||
|
|
||||||
wait_for_tx_finalized(runtime.store.clone(), tx_seq).await;
|
wait_for_tx_finalized(runtime.store.clone(), tx_seq).await;
|
||||||
|
assert!(matches!(
|
||||||
|
runtime.network_recv.try_recv().unwrap(),
|
||||||
|
NetworkMessage::AnnounceLocalFile { .. }
|
||||||
|
));
|
||||||
|
|
||||||
assert!(!runtime.store.check_tx_completed(0).unwrap());
|
assert!(!runtime.store.check_tx_completed(0).unwrap());
|
||||||
|
|
||||||
@ -1528,6 +1586,10 @@ mod tests {
|
|||||||
.await;
|
.await;
|
||||||
|
|
||||||
wait_for_tx_finalized(runtime.store, tx_seq).await;
|
wait_for_tx_finalized(runtime.store, tx_seq).await;
|
||||||
|
assert!(matches!(
|
||||||
|
runtime.network_recv.try_recv().unwrap(),
|
||||||
|
NetworkMessage::AnnounceLocalFile { .. }
|
||||||
|
));
|
||||||
|
|
||||||
sync_send
|
sync_send
|
||||||
.notify(SyncMessage::PeerDisconnected {
|
.notify(SyncMessage::PeerDisconnected {
|
||||||
|
@ -232,6 +232,10 @@ batcher_announcement_capacity = 100
|
|||||||
# all files, and sufficient disk space is required.
|
# all files, and sufficient disk space is required.
|
||||||
auto_sync_enabled = true
|
auto_sync_enabled = true
|
||||||
|
|
||||||
|
# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file
|
||||||
|
# announcements in the whole network, which leads to high latency or even timeout to sync files.
|
||||||
|
neighbors_only = true
|
||||||
|
|
||||||
# Maximum number of files in sync from other peers simultaneously.
|
# Maximum number of files in sync from other peers simultaneously.
|
||||||
# max_sync_files = 8
|
# max_sync_files = 8
|
||||||
|
|
||||||
|
@ -244,6 +244,10 @@ batcher_announcement_capacity = 100
|
|||||||
# all files, and sufficient disk space is required.
|
# all files, and sufficient disk space is required.
|
||||||
auto_sync_enabled = true
|
auto_sync_enabled = true
|
||||||
|
|
||||||
|
# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file
|
||||||
|
# announcements in the whole network, which leads to high latency or even timeout to sync files.
|
||||||
|
neighbors_only = true
|
||||||
|
|
||||||
# Maximum number of files in sync from other peers simultaneously.
|
# Maximum number of files in sync from other peers simultaneously.
|
||||||
# max_sync_files = 8
|
# max_sync_files = 8
|
||||||
|
|
||||||
|
@ -246,6 +246,10 @@
|
|||||||
# all files, and sufficient disk space is required.
|
# all files, and sufficient disk space is required.
|
||||||
# auto_sync_enabled = false
|
# auto_sync_enabled = false
|
||||||
|
|
||||||
|
# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file
|
||||||
|
# announcements in the whole network, which leads to high latency or even timeout to sync files.
|
||||||
|
neighbors_only = true
|
||||||
|
|
||||||
# Maximum number of files in sync from other peers simultaneously.
|
# Maximum number of files in sync from other peers simultaneously.
|
||||||
# max_sync_files = 8
|
# max_sync_files = 8
|
||||||
|
|
||||||
|
34
tests/sync_auto_random_v2_test.py
Normal file
34
tests/sync_auto_random_v2_test.py
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
from test_framework.test_framework import TestFramework
|
||||||
|
from utility.utils import wait_until
|
||||||
|
|
||||||
|
class AutoRandomSyncV2Test(TestFramework):
|
||||||
|
def setup_params(self):
|
||||||
|
self.num_nodes = 4
|
||||||
|
|
||||||
|
# Enable random auto sync v2
|
||||||
|
for i in range(self.num_nodes):
|
||||||
|
self.zgs_node_configs[i] = {
|
||||||
|
"sync": {
|
||||||
|
"auto_sync_enabled": True,
|
||||||
|
"max_sequential_workers": 0,
|
||||||
|
"max_random_workers": 3,
|
||||||
|
"neighbors_only": True,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def run_test(self):
|
||||||
|
# Submit and upload files on node 0
|
||||||
|
data_root_1 = self.__upload_file__(0, 256 * 1024)
|
||||||
|
data_root_2 = self.__upload_file__(0, 256 * 1024)
|
||||||
|
|
||||||
|
# Files should be available on other nodes via auto sync
|
||||||
|
for i in range(1, self.num_nodes):
|
||||||
|
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1) is not None)
|
||||||
|
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1)["finalized"])
|
||||||
|
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None)
|
||||||
|
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"])
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
AutoRandomSyncV2Test().main()
|
Loading…
Reference in New Issue
Block a user