Support syncing partial chunks (#4)

* refactor p2p signed message

* add new pubsub messages in network layer to find chunks

* handle find chunks pubsub message in router

* Support syncing partial chunks

* add admin rpc to sync chunks

* limit number of chunks to sync at a time

* refactor code to sync file and chunks

* add more switches to trigger file sync

* fix ut failure

* refactor code
This commit is contained in:
Bo QIU 2024-01-19 14:04:59 +08:00 committed by GitHub
parent 971d344acd
commit 9b4b0436c3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 518 additions and 134 deletions

View File

@ -1,6 +1,6 @@
use network::{
libp2p::identity,
types::{AnnounceFile, SignedAnnounceFile},
types::{AnnounceFile, SignedAnnounceFile, SignedMessage},
Multiaddr, PeerId,
};
use shared_types::{timestamp_now, TxID};
@ -42,6 +42,6 @@ impl AnnounceFileBuilder {
};
let keypair = identity::Keypair::generate_secp256k1();
msg.into_signed(&keypair).unwrap()
SignedMessage::sign_message(msg, &keypair).unwrap()
}
}

View File

@ -22,8 +22,12 @@ pub struct GossipCache {
example: Option<Duration>,
/// Timeout for FindFile messages.
find_file: Option<Duration>,
/// Timeout for FindChunks messages.
find_chunks: Option<Duration>,
/// Timeout for AnnounceFile.
announce_file: Option<Duration>,
/// Timeout for AnnounceChunks.
announce_chunks: Option<Duration>,
}
#[derive(Default)]
@ -33,8 +37,12 @@ pub struct GossipCacheBuilder {
example: Option<Duration>,
/// Timeout for FindFile messages.
find_file: Option<Duration>,
/// Timeout for FindChunks messages.
find_chunks: Option<Duration>,
/// Timeout for AnnounceFile messages.
announce_file: Option<Duration>,
/// Timeout for AnnounceChunks messages.
announce_chunks: Option<Duration>,
}
#[allow(dead_code)]
@ -58,18 +66,32 @@ impl GossipCacheBuilder {
self
}
/// Timeout for FindChunks messages.
pub fn find_chunks_timeout(mut self, timeout: Duration) -> Self {
self.find_chunks = Some(timeout);
self
}
/// Timeout for AnnounceFile messages.
pub fn announce_file_timeout(mut self, timeout: Duration) -> Self {
self.announce_file = Some(timeout);
self
}
/// Timeout for AnnounceChunks messages.
pub fn announce_chunks_timeout(mut self, timeout: Duration) -> Self {
self.announce_chunks = Some(timeout);
self
}
pub fn build(self) -> GossipCache {
let GossipCacheBuilder {
default_timeout,
example,
find_file,
find_chunks,
announce_file,
announce_chunks,
} = self;
GossipCache {
@ -77,7 +99,9 @@ impl GossipCacheBuilder {
topic_msgs: HashMap::default(),
example: example.or(default_timeout),
find_file: find_file.or(default_timeout),
find_chunks: find_chunks.or(default_timeout),
announce_file: announce_file.or(default_timeout),
announce_chunks: announce_chunks.or(default_timeout),
}
}
}
@ -94,7 +118,9 @@ impl GossipCache {
let expire_timeout = match topic.kind() {
GossipKind::Example => self.example,
GossipKind::FindFile => self.find_file,
GossipKind::FindChunks => self.find_chunks,
GossipKind::AnnounceFile => self.announce_file,
GossipKind::AnnounceChunks => self.announce_chunks,
};
let expire_timeout = match expire_timeout {

View File

@ -6,5 +6,8 @@ mod topics;
pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;
pub use globals::NetworkGlobals;
pub use pubsub::{AnnounceFile, FindFile, PubsubMessage, SignedAnnounceFile, SnappyTransform};
pub use pubsub::{
AnnounceChunks, AnnounceFile, FindChunks, FindFile, HasSignature, PubsubMessage,
SignedAnnounceChunks, SignedAnnounceFile, SignedMessage, SnappyTransform,
};
pub use topics::{GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS};

View File

@ -120,6 +120,14 @@ pub struct FindFile {
pub timestamp: u32,
}
/// Gossip query asking for peers that hold the chunk range
/// `[index_start, index_end)` of the file identified by `tx_id`.
///
/// The message is not signed; it is SSZ-encoded directly as a pubsub payload.
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
pub struct FindChunks {
/// Transaction whose chunks are being looked for.
pub tx_id: TxID,
/// First requested chunk index.
pub index_start: u64, // inclusive
/// End of the requested chunk range.
pub index_end: u64, // exclusive
/// Creation timestamp; the router validates it against clock drift and a timeout.
pub timestamp: u32,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)]
pub struct AnnounceFile {
pub tx_id: TxID,
@ -128,46 +136,65 @@ pub struct AnnounceFile {
pub timestamp: u32,
}
impl AnnounceFile {
pub fn into_signed(self, keypair: &Keypair) -> Result<SignedAnnounceFile, SigningError> {
let raw = self.as_ssz_bytes();
/// Announcement that a peer can serve the chunk range
/// `[index_start, index_end)` of the file identified by `tx_id`.
///
/// Sent over gossip wrapped in a `SignedMessage` (see `SignedAnnounceChunks`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)]
pub struct AnnounceChunks {
/// Transaction the announced chunks belong to.
pub tx_id: TxID,
/// First announced chunk index.
pub index_start: u64, // inclusive
/// End of the announced chunk range.
pub index_end: u64, // exclusive
/// Peer that announces the chunks; the router derives the public key used for
/// signature verification from this id.
pub peer_id: WrappedPeerId,
/// Address at which the announcing peer can be reached.
pub at: WrappedMultiaddr,
/// Timestamp taken when the announcement was constructed.
pub timestamp: u32,
}
/// Generic envelope that pairs a message with a signature over its SSZ encoding.
///
/// Only `inner` is covered by the signature: `resend_timestamp` is set after
/// signing and is not part of the signed bytes.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)]
pub struct SignedMessage<T: Encode + Decode> {
/// The wrapped message; also reachable through `Deref`.
pub inner: T,
/// Signature over `inner.as_ssz_bytes()`.
pub signature: Vec<u8>,
/// Timestamp of the latest (re)publication; initialised to 0 by `sign_message`.
pub resend_timestamp: u32,
}
impl<T: Encode + Decode> SignedMessage<T> {
pub fn sign_message(msg: T, keypair: &Keypair) -> Result<SignedMessage<T>, SigningError> {
let raw = msg.as_ssz_bytes();
let signature = keypair.sign(&raw)?;
Ok(SignedAnnounceFile {
inner: self,
Ok(SignedMessage {
inner: msg,
signature,
resend_timestamp: 0,
})
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)]
pub struct SignedAnnounceFile {
pub inner: AnnounceFile,
pub signature: Vec<u8>,
pub resend_timestamp: u32,
}
impl SignedAnnounceFile {
pub fn verify_signature(&self, public_key: &PublicKey) -> bool {
let raw = self.inner.as_ssz_bytes();
public_key.verify(&raw, &self.signature)
}
}
impl Deref for SignedAnnounceFile {
type Target = AnnounceFile;
impl<T: Encode + Decode> Deref for SignedMessage<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
/// Implemented by messages that carry a signature which can be checked
/// against a public key. Used as `&dyn HasSignature` by the router.
pub trait HasSignature {
/// Returns `true` when the message signature is valid for `public_key`.
fn verify_signature(&self, public_key: &PublicKey) -> bool;
}
impl<T: Encode + Decode> HasSignature for SignedMessage<T> {
    /// Checks `signature` against the SSZ encoding of `inner` using `public_key`.
    ///
    /// `resend_timestamp` is not part of the verified bytes.
    fn verify_signature(&self, public_key: &PublicKey) -> bool {
        public_key.verify(&self.inner.as_ssz_bytes(), &self.signature)
    }
}
pub type SignedAnnounceFile = SignedMessage<AnnounceFile>;
pub type SignedAnnounceChunks = SignedMessage<AnnounceChunks>;
/// Messages exchanged over gossipsub. Each variant corresponds to exactly one
/// `GossipKind` (see `PubsubMessage::kind`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PubsubMessage {
/// Example payload published on the `GossipKind::Example` topic.
ExampleMessage(u64),
/// Unsigned query looking for a file.
FindFile(FindFile),
/// Unsigned query looking for a range of chunks of a file.
FindChunks(FindChunks),
/// Signed announcement of a file.
AnnounceFile(SignedAnnounceFile),
/// Signed announcement of a range of chunks of a file.
AnnounceChunks(SignedAnnounceChunks),
}
// Implements the `DataTransform` trait of gossipsub to employ snappy compression
@ -242,7 +269,9 @@ impl PubsubMessage {
match self {
PubsubMessage::ExampleMessage(_) => GossipKind::Example,
PubsubMessage::FindFile(_) => GossipKind::FindFile,
PubsubMessage::FindChunks(_) => GossipKind::FindChunks,
PubsubMessage::AnnounceFile(_) => GossipKind::AnnounceFile,
PubsubMessage::AnnounceChunks(_) => GossipKind::AnnounceChunks,
}
}
@ -267,9 +296,16 @@ impl PubsubMessage {
GossipKind::FindFile => Ok(PubsubMessage::FindFile(
FindFile::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?,
)),
GossipKind::FindChunks => Ok(PubsubMessage::FindChunks(
FindChunks::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?,
)),
GossipKind::AnnounceFile => Ok(PubsubMessage::AnnounceFile(
SignedAnnounceFile::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?,
)),
GossipKind::AnnounceChunks => Ok(PubsubMessage::AnnounceChunks(
SignedAnnounceChunks::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?,
)),
}
}
}
@ -285,7 +321,9 @@ impl PubsubMessage {
match &self {
PubsubMessage::ExampleMessage(data) => data.as_ssz_bytes(),
PubsubMessage::FindFile(data) => data.as_ssz_bytes(),
PubsubMessage::FindChunks(data) => data.as_ssz_bytes(),
PubsubMessage::AnnounceFile(data) => data.as_ssz_bytes(),
PubsubMessage::AnnounceChunks(data) => data.as_ssz_bytes(),
}
}
}
@ -299,9 +337,15 @@ impl std::fmt::Display for PubsubMessage {
PubsubMessage::FindFile(msg) => {
write!(f, "FindFile message: {:?}", msg)
}
PubsubMessage::FindChunks(msg) => {
write!(f, "FindChunks message: {:?}", msg)
}
PubsubMessage::AnnounceFile(msg) => {
write!(f, "AnnounceFile message: {:?}", msg)
}
PubsubMessage::AnnounceChunks(msg) => {
write!(f, "AnnounceChunks message: {:?}", msg)
}
}
}
}

View File

@ -9,9 +9,16 @@ pub const TOPIC_PREFIX: &str = "eth2";
pub const SSZ_SNAPPY_ENCODING_POSTFIX: &str = "ssz_snappy";
pub const EXAMPLE_TOPIC: &str = "example";
pub const FIND_FILE_TOPIC: &str = "find_file";
pub const FIND_CHUNKS_TOPIC: &str = "find_chunks";
pub const ANNOUNCE_FILE_TOPIC: &str = "announce_file";
pub const ANNOUNCE_CHUNKS_TOPIC: &str = "announce_chunks";
pub const CORE_TOPICS: [GossipKind; 2] = [GossipKind::FindFile, GossipKind::AnnounceFile];
pub const CORE_TOPICS: [GossipKind; 4] = [
GossipKind::FindFile,
GossipKind::FindChunks,
GossipKind::AnnounceFile,
GossipKind::AnnounceChunks,
];
/// A gossipsub topic which encapsulates the type of messages that should be sent and received over
/// the pubsub protocol and the way the messages should be encoded.
@ -30,7 +37,9 @@ pub struct GossipTopic {
pub enum GossipKind {
Example,
FindFile,
FindChunks,
AnnounceFile,
AnnounceChunks,
}
/// The known encoding types for gossipsub messages.
@ -67,7 +76,9 @@ impl GossipTopic {
let kind = match topic_parts[2] {
EXAMPLE_TOPIC => GossipKind::Example,
FIND_FILE_TOPIC => GossipKind::FindFile,
FIND_CHUNKS_TOPIC => GossipKind::FindChunks,
ANNOUNCE_FILE_TOPIC => GossipKind::AnnounceFile,
ANNOUNCE_CHUNKS_TOPIC => GossipKind::AnnounceChunks,
_ => return Err(format!("Unknown topic: {}", topic)),
};
@ -93,7 +104,9 @@ impl From<GossipTopic> for String {
let kind = match topic.kind {
GossipKind::Example => EXAMPLE_TOPIC,
GossipKind::FindFile => FIND_FILE_TOPIC,
GossipKind::FindChunks => FIND_CHUNKS_TOPIC,
GossipKind::AnnounceFile => ANNOUNCE_FILE_TOPIC,
GossipKind::AnnounceChunks => ANNOUNCE_CHUNKS_TOPIC,
};
format!("/{}/{}/{}", TOPIC_PREFIX, kind, encoding)
@ -109,7 +122,9 @@ impl std::fmt::Display for GossipTopic {
let kind = match self.kind {
GossipKind::Example => EXAMPLE_TOPIC,
GossipKind::FindFile => FIND_FILE_TOPIC,
GossipKind::FindChunks => FIND_CHUNKS_TOPIC,
GossipKind::AnnounceFile => ANNOUNCE_FILE_TOPIC,
GossipKind::AnnounceChunks => ANNOUNCE_CHUNKS_TOPIC,
};
write!(f, "/{}/{}/{}", TOPIC_PREFIX, kind, encoding)

View File

@ -3,11 +3,14 @@ use std::{ops::Neg, sync::Arc};
use file_location_cache::FileLocationCache;
use network::{
rpc::StatusMessage,
types::{AnnounceFile, FindFile, SignedAnnounceFile},
types::{
AnnounceChunks, AnnounceFile, FindChunks, FindFile, HasSignature, SignedAnnounceChunks,
SignedAnnounceFile, SignedMessage,
},
Keypair, MessageAcceptance, MessageId, NetworkGlobals, NetworkMessage, PeerId, PeerRequestId,
PublicKey, PubsubMessage, Request, RequestId, Response,
};
use shared_types::{timestamp_now, TxID};
use shared_types::{bytes_to_chunks, timestamp_now, TxID};
use storage_async::Store;
use sync::{SyncMessage, SyncSender};
use tokio::sync::{mpsc, RwLock};
@ -40,6 +43,21 @@ fn peer_id_to_public_key(peer_id: &PeerId) -> Result<PublicKey, String> {
})
}
/// Verifies the signature of `msg` with the public key derived from `peer_id`.
///
/// Returns `false` when the public key cannot be derived from `peer_id` (this is
/// logged as an error together with `propagation_source`) or when the signature
/// does not match.
fn verify_signature(msg: &dyn HasSignature, peer_id: &PeerId, propagation_source: PeerId) -> bool {
    let pub_key = match peer_id_to_public_key(peer_id) {
        Ok(pub_key) => pub_key,
        Err(err) => {
            error!(
                ?err,
                ?peer_id,
                ?propagation_source,
                "Failed to verify signature"
            );
            return false;
        }
    };

    msg.verify_signature(&pub_key)
}
pub struct Libp2pEventHandler {
/// A collection of global variables, accessible outside of the network service.
network_globals: Arc<NetworkGlobals>,
@ -213,7 +231,9 @@ impl Libp2pEventHandler {
match message {
PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore,
PubsubMessage::FindFile(msg) => self.on_find_file(msg).await,
PubsubMessage::FindChunks(msg) => self.on_find_chunks(msg).await,
PubsubMessage::AnnounceFile(msg) => self.on_announce_file(propagation_source, msg),
PubsubMessage::AnnounceChunks(msg) => self.on_announce_chunks(propagation_source, msg),
}
}
@ -237,7 +257,7 @@ impl Libp2pEventHandler {
timestamp,
};
let mut signed = match msg.into_signed(&self.local_keypair) {
let mut signed = match SignedMessage::sign_message(msg, &self.local_keypair) {
Ok(signed) => signed,
Err(e) => {
error!(%tx_id.seq, %e, "Failed to sign AnnounceFile message");
@ -292,28 +312,106 @@ impl Libp2pEventHandler {
MessageAcceptance::Accept
}
/// Builds a signed `AnnounceChunks` pubsub message advertising that the local
/// node holds chunks `[index_start, index_end)` of `tx_id`.
///
/// Returns `None` when the node has no listen address yet or when signing
/// with the local keypair fails (the latter is logged as an error).
pub fn construct_announce_chunks_message(
    &self,
    tx_id: TxID,
    index_start: u64,
    index_end: u64,
) -> Option<PubsubMessage> {
    let peer_id = *self.network_globals.peer_id.read();
    // Advertise the first listen address; bail out if there is none.
    let addr = self
        .network_globals
        .listen_multiaddrs
        .read()
        .first()?
        .clone();
    let timestamp = timestamp_now();

    let announcement = AnnounceChunks {
        tx_id,
        index_start,
        index_end,
        peer_id: peer_id.into(),
        at: addr.into(),
        timestamp,
    };

    match SignedMessage::sign_message(announcement, &self.local_keypair) {
        Ok(mut signed) => {
            // The resend timestamp is not covered by the signature, so it is set afterwards.
            signed.resend_timestamp = timestamp;
            Some(PubsubMessage::AnnounceChunks(signed))
        }
        Err(e) => {
            error!(%tx_id.seq, %e, "Failed to sign AnnounceChunks message");
            None
        }
    }
}
/// Handles a `FindChunks` gossip query.
///
/// Outcome by case:
/// - `Reject`: the range is empty or inverted, or `index_end` exceeds the number
///   of chunks of the (locally known) transaction.
/// - `Ignore`: the timestamp is outside the tolerated window, or the local node
///   holds the requested chunks and has published an `AnnounceChunks` reply.
/// - `Accept`: the transaction or the chunks are not available locally (or no
///   announcement could be built), so the query keeps propagating to other nodes.
async fn on_find_chunks(&self, msg: FindChunks) -> MessageAcceptance {
    // An empty or inverted range can never be served.
    if msg.index_start >= msg.index_end {
        debug!(?msg, "Invalid chunk index range");
        return MessageAcceptance::Reject;
    }

    // Verify timestamp.
    // NOTE(review): FindChunks reuses FIND_FILE_TIMEOUT; confirm no dedicated timeout is intended.
    let d = duration_since(msg.timestamp);
    if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT {
        debug!(%msg.timestamp, "Invalid timestamp, ignoring FindChunks message");
        return MessageAcceptance::Ignore;
    }

    // Chunks may be available even if the file is not finalized yet, so only the
    // transaction itself is required here. Unknown or mismatching tx: let others answer.
    let tx = match self.store.get_tx_by_seq_number(msg.tx_id.seq).await {
        Ok(Some(tx)) if tx.id() == msg.tx_id => tx,
        _ => return MessageAcceptance::Accept,
    };

    // Validate the end index against the file size.
    if let Ok(size) = usize::try_from(tx.size) {
        let num_chunks = bytes_to_chunks(size);
        if msg.index_end > num_chunks as u64 {
            debug!(?msg, "Invalid chunk end index for FindChunks message");
            return MessageAcceptance::Reject;
        }
    }

    // TODO(qhz): check if there is better way to check existence of requested chunks.
    // The loaded chunk data is only used as an existence check and dropped right away.
    let found = matches!(
        self.store
            .get_chunks_by_tx_and_index_range(
                msg.tx_id.seq,
                msg.index_start as usize,
                msg.index_end as usize,
            )
            .await,
        Ok(Some(_))
    );
    if !found {
        return MessageAcceptance::Accept;
    }

    debug!(?msg, "Found chunks to respond FindChunks message");

    match self.construct_announce_chunks_message(msg.tx_id, msg.index_start, msg.index_end) {
        Some(announcement) => {
            self.publish(announcement);
            MessageAcceptance::Ignore
        }
        // propagate FindChunks query to other nodes
        None => MessageAcceptance::Accept,
    }
}
fn on_announce_file(
&self,
propagation_source: PeerId,
msg: SignedAnnounceFile,
) -> MessageAcceptance {
// verify message signature
let pk = match peer_id_to_public_key(&msg.peer_id) {
Ok(pk) => pk,
Err(e) => {
error!(
"Failed to convert peer id {:?} to public key: {:?}",
msg.peer_id, e
);
return MessageAcceptance::Reject;
}
};
if !msg.verify_signature(&pk) {
warn!(
"Received message with invalid signature from peer {:?}",
propagation_source
);
if !verify_signature(&msg, &msg.peer_id, propagation_source) {
return MessageAcceptance::Reject;
}
@ -336,6 +434,29 @@ impl Libp2pEventHandler {
MessageAcceptance::Accept
}
/// Handles a signed `AnnounceChunks` gossip message.
///
/// Rejects messages whose signature does not verify against the announcing
/// peer, ignores messages whose resend timestamp is outside the tolerated
/// window, and otherwise forwards the announcement to the sync layer and
/// accepts it.
fn on_announce_chunks(
    &self,
    propagation_source: PeerId,
    msg: SignedAnnounceChunks,
) -> MessageAcceptance {
    // The signature must belong to the peer named inside the announcement.
    let signature_ok = verify_signature(&msg, &msg.peer_id, propagation_source);
    if !signature_ok {
        return MessageAcceptance::Reject;
    }

    // Only announcements that were (re)sent recently are processed and propagated.
    let since_resend = duration_since(msg.resend_timestamp);
    let out_of_window =
        since_resend < TOLERABLE_DRIFT.neg() || since_resend > *ANNOUNCE_FILE_TIMEOUT;
    if out_of_window {
        debug!(%msg.resend_timestamp, "Invalid resend timestamp, ignoring AnnounceChunks message");
        return MessageAcceptance::Ignore;
    }

    // Hand the unwrapped announcement over to the sync layer.
    self.send_to_sync(SyncMessage::AnnounceChunksGossip { msg: msg.inner });

    MessageAcceptance::Accept
}
}
#[cfg(test)]

View File

@ -12,6 +12,14 @@ pub trait Rpc {
#[method(name = "startSyncFile")]
async fn start_sync_file(&self, tx_seq: u64) -> RpcResult<()>;
/// Requests the node to sync a range of chunks of the file with sequence
/// number `tx_seq`.
///
/// The range is `[start_index, end_index)`: `start_index` is the first chunk
/// index to sync and `end_index` is exclusive.
#[method(name = "startSyncChunks")]
async fn start_sync_chunks(
&self,
tx_seq: u64,
start_index: u64,
end_index: u64, // exclusive
) -> RpcResult<()>;
#[method(name = "getSyncStatus")]
async fn get_sync_status(&self, tx_seq: u64) -> RpcResult<String>;

View File

@ -47,6 +47,36 @@ impl RpcServer for RpcServerImpl {
}
}
#[tracing::instrument(skip(self), err)]
async fn start_sync_chunks(
&self,
tx_seq: u64,
start_index: u64,
end_index: u64,
) -> RpcResult<()> {
info!("admin_startSyncChunks({tx_seq}, {start_index}, {end_index})");
let response = self
.ctx
.request_sync(SyncRequest::SyncChunks {
tx_seq,
start_index,
end_index,
})
.await?;
match response {
SyncResponse::SyncFile { err } => {
if err.is_empty() {
Ok(())
} else {
Err(error::internal_error(err))
}
}
_ => Err(error::internal_error("unexpected response type")),
}
}
#[tracing::instrument(skip(self), err)]
async fn get_sync_status(&self, tx_seq: u64) -> RpcResult<String> {
info!("admin_getSyncStatus({tx_seq})");

View File

@ -6,7 +6,6 @@ use log_entry_sync::{CacheConfig, ContractAddress, LogSyncConfig};
use miner::MinerConfig;
use network::NetworkConfig;
use rpc::RPCConfig;
use std::time::Duration;
use storage::StorageConfig;
impl ZgsConfig {
@ -148,11 +147,4 @@ impl ZgsConfig {
router_config.libp2p_nodes = network_config.libp2p_nodes.to_vec();
Ok(router_config)
}
pub fn sync_config(&self) -> sync::Config {
let mut config = self.sync.clone();
config.find_peer_timeout = Duration::from_secs(self.find_peer_timeout_secs);
config.enable_chunk_request = self.enable_chunk_request;
config
}
}

View File

@ -57,10 +57,6 @@ build_config! {
(mine_contract_address, (String), "".to_string())
(miner_id, (Option<String>), None)
(miner_key, (Option<String>), None)
// sync
(find_peer_timeout_secs, (u64), 30)
(enable_chunk_request, (bool), true)
}
#[derive(Debug, Default, Deserialize)]
@ -68,6 +64,7 @@ build_config! {
pub struct ZgsConfig {
pub raw_conf: RawConfiguration,
// sync config, configured by [sync] section by `config` crate.
pub sync: sync::Config,
}

View File

@ -17,7 +17,6 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
let log_sync_config = config.log_sync_config()?;
let miner_config = config.mine_config()?;
let router_config = config.router_config(&network_config)?;
let sync_config = config.sync_config();
ClientBuilder::default()
.with_runtime_context(context)
@ -27,7 +26,7 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
.with_file_location_cache()
.with_network(&network_config)
.await?
.with_sync(sync_config)
.with_sync(config.sync)
.await?
.with_miner(miner_config)
.await?

View File

@ -3,14 +3,47 @@ mod serial;
use serde::{Deserialize, Serialize};
pub use serial::{FailureReason, SerialSyncController, SyncState};
pub use serial::{FailureReason, SerialSyncController, SyncState, MAX_CHUNKS_TO_REQUEST};
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FileSyncGoal {
/// File chunks in total.
pub num_chunks: u64,
/// Chunk index to sync from (starts from 0, inclusive).
pub index_start: u64,
/// Chunk index to sync to (exclusive).
pub index_end: u64,
}
impl FileSyncGoal {
    /// Creates a goal to sync chunks `[index_start, index_end)` of a file that
    /// has `num_chunks` chunks in total.
    ///
    /// # Panics
    ///
    /// Panics if the range is empty (`index_start >= index_end`) or if
    /// `index_end` is greater than `num_chunks`.
    pub fn new(num_chunks: u64, index_start: u64, index_end: u64) -> Self {
        let is_valid_range = index_start < index_end && index_end <= num_chunks;
        assert!(is_valid_range, "invalid index_end");

        Self {
            num_chunks,
            index_start,
            index_end,
        }
    }

    /// Creates a goal that covers every chunk of the file.
    ///
    /// # Panics
    ///
    /// Panics if `num_chunks` is zero, because the resulting range would be empty.
    pub fn new_file(num_chunks: u64) -> Self {
        let (index_start, index_end) = (0, num_chunks);
        Self::new(num_chunks, index_start, index_end)
    }

    /// Returns `true` when the goal spans the whole file rather than a partial range.
    pub fn is_all_chunks(&self) -> bool {
        (self.index_start, self.index_end) == (0, self.num_chunks)
    }
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FileSyncInfo {
pub elapsed_secs: u64,
pub peers: usize,
pub num_chunks: u64,
pub goal: FileSyncGoal,
pub next_chunks: u64,
pub state: String,
}

View File

@ -1,8 +1,9 @@
use crate::context::SyncNetworkContext;
use crate::controllers::peers::{PeerState, SyncPeers};
use crate::controllers::FileSyncInfo;
use crate::controllers::{FileSyncGoal, FileSyncInfo};
use file_location_cache::FileLocationCache;
use libp2p::swarm::DialError;
use network::types::FindChunks;
use network::{
multiaddr::Protocol, rpc::GetChunksRequest, types::FindFile, Multiaddr, NetworkMessage,
PeerAction, PeerId, PubsubMessage, SyncId as RequestId,
@ -14,7 +15,7 @@ use std::{
};
use storage_async::Store;
const MAX_CHUNKS_TO_REQUEST: u64 = 2 * 1024;
pub const MAX_CHUNKS_TO_REQUEST: u64 = 2 * 1024;
const MAX_REQUEST_FAILURES: usize = 100;
const PEER_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(5);
@ -63,8 +64,8 @@ pub struct SerialSyncController {
since: Instant,
/// The size of the file to be synced.
num_chunks: u64,
/// File sync goal.
goal: FileSyncGoal,
/// The next chunk id that we need to retrieve.
next_chunk: u64,
@ -91,7 +92,7 @@ pub struct SerialSyncController {
impl SerialSyncController {
pub fn new(
tx_id: TxID,
num_chunks: u64,
goal: FileSyncGoal,
ctx: Arc<SyncNetworkContext>,
store: Store,
file_location_cache: Arc<FileLocationCache>,
@ -100,8 +101,8 @@ impl SerialSyncController {
tx_seq: tx_id.seq,
tx_id,
since: Instant::now(),
num_chunks,
next_chunk: 0,
goal,
next_chunk: goal.index_start,
failures: 0,
state: SyncState::Idle,
peers: Default::default(),
@ -115,7 +116,7 @@ impl SerialSyncController {
FileSyncInfo {
elapsed_secs: self.since.elapsed().as_secs(),
peers: self.peers.count(&[PeerState::Connected]),
num_chunks: self.num_chunks,
goal: self.goal,
next_chunks: self.next_chunk,
state: format!("{:?}", self.state),
}
@ -127,7 +128,7 @@ impl SerialSyncController {
/// Resets the status to re-sync file when failed.
pub fn reset(&mut self) {
self.next_chunk = 0;
self.next_chunk = self.goal.index_start;
self.failures = 0;
self.state = SyncState::Idle;
// remove disconnected peers
@ -137,6 +138,23 @@ impl SerialSyncController {
/// Looks for peers that can serve the sync goal and moves the controller into
/// the `FindingPeers` state.
///
/// A whole-file goal goes through `publish_find_file`; a partial goal publishes
/// a `FindChunks` query. If the controller is already finding peers, the
/// original `since` instant is kept and only `updated` is refreshed.
fn try_find_peers(&mut self) {
    info!(%self.tx_seq, "Finding peers");

    if self.goal.is_all_chunks() {
        self.publish_find_file();
    } else {
        self.publish_find_chunks();
    }

    let now = Instant::now();
    let since = match self.state {
        SyncState::FindingPeers { since, .. } => since,
        _ => now,
    };
    self.state = SyncState::FindingPeers {
        since,
        updated: now,
    };
}
fn publish_find_file(&mut self) {
// try from cache
let mut found_new_peer = false;
@ -149,21 +167,23 @@ impl SerialSyncController {
found_new_peer = self.on_peer_found(peer_id, addr) || found_new_peer;
}
if !found_new_peer {
if found_new_peer {
return;
}
self.ctx.publish(PubsubMessage::FindFile(FindFile {
tx_id: self.tx_id,
timestamp: timestamp_now(),
}));
}
let now = Instant::now();
let (since, updated) = match self.state {
SyncState::FindingPeers { since, .. } => (since, now),
_ => (now, now),
};
self.state = SyncState::FindingPeers { since, updated };
/// Publishes a `FindChunks` query for the chunk range of the current sync goal,
/// stamped with the current time.
fn publish_find_chunks(&self) {
    let goal = self.goal;
    let query = FindChunks {
        tx_id: self.tx_id,
        index_start: goal.index_start,
        index_end: goal.index_end,
        timestamp: timestamp_now(),
    };

    self.ctx.publish(PubsubMessage::FindChunks(query));
}
fn try_connect(&mut self) {
@ -201,7 +221,7 @@ impl SerialSyncController {
// request next chunk array
let from_chunk = self.next_chunk;
let to_chunk = std::cmp::min(from_chunk + MAX_CHUNKS_TO_REQUEST, self.num_chunks);
let to_chunk = std::cmp::min(from_chunk + MAX_CHUNKS_TO_REQUEST, self.goal.index_end);
let request_id = network::RequestId::Sync(RequestId::SerialSync { tx_id: self.tx_id });
@ -412,11 +432,17 @@ impl SerialSyncController {
}
// prepare to download next
if self.next_chunk < self.num_chunks {
if self.next_chunk < self.goal.index_end {
self.state = SyncState::Idle;
return;
}
// completed to download chunks
if !self.goal.is_all_chunks() {
self.state = SyncState::Completed;
return;
}
// finalize tx if all chunks downloaded
match self
.store
@ -1317,7 +1343,8 @@ mod tests {
since: Instant::now(),
};
controller.num_chunks = 2048;
controller.goal.num_chunks = 2048;
controller.goal.index_end = 2048;
controller.on_response(peer_id, chunks).await;
match controller.get_status() {
@ -1499,7 +1526,7 @@ mod tests {
let controller = SerialSyncController::new(
tx_id,
num_chunks as u64,
FileSyncGoal::new_file(num_chunks as u64),
ctx,
Store::new(store, task_executor),
file_location_cache,

View File

@ -13,30 +13,25 @@ use serde::Deserialize;
pub use service::{SyncMessage, SyncReceiver, SyncRequest, SyncResponse, SyncSender, SyncService};
use std::time::Duration;
#[derive(Clone, Debug, Deserialize)]
#[derive(Clone, Copy, Debug, Deserialize)]
#[serde(default)]
pub struct Config {
pub auto_sync_disabled: bool,
pub auto_sync_enabled: bool,
pub max_sync_files: usize,
#[serde(deserialize_with = "deserialize_duration")]
pub find_peer_timeout: Duration,
pub enable_chunk_request: bool,
pub sync_file_by_rpc_enabled: bool,
pub sync_file_on_announcement_enabled: bool,
}
impl Default for Config {
fn default() -> Self {
Self {
auto_sync_disabled: false,
auto_sync_enabled: false,
max_sync_files: 100,
find_peer_timeout: Duration::from_secs(30),
enable_chunk_request: false,
sync_file_by_rpc_enabled: true,
sync_file_on_announcement_enabled: false,
}
}
}
impl Config {
pub fn disable_auto_sync(mut self) -> Self {
self.auto_sync_disabled = true;
self
}
}

View File

@ -1,11 +1,15 @@
use crate::auto_sync::AutoSyncManager;
use crate::context::SyncNetworkContext;
use crate::controllers::{FailureReason, FileSyncInfo, SerialSyncController, SyncState};
use crate::controllers::{
FailureReason, FileSyncGoal, FileSyncInfo, SerialSyncController, SyncState,
MAX_CHUNKS_TO_REQUEST,
};
use crate::Config;
use anyhow::{bail, Result};
use file_location_cache::FileLocationCache;
use libp2p::swarm::DialError;
use log_entry_sync::LogSyncEvent;
use network::types::AnnounceChunks;
use network::{
rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, PeerId,
PeerRequestId, SyncId as RequestId,
@ -56,14 +60,31 @@ pub enum SyncMessage {
peer_id: PeerId,
addr: Multiaddr,
},
AnnounceChunksGossip {
msg: AnnounceChunks,
},
}
#[derive(Debug)]
pub enum SyncRequest {
SyncStatus { tx_seq: u64 },
SyncFile { tx_seq: u64 },
FileSyncInfo { tx_seq: Option<u64> },
TerminateFileSync { tx_seq: u64, is_reverted: bool },
SyncStatus {
tx_seq: u64,
},
SyncFile {
tx_seq: u64,
},
SyncChunks {
tx_seq: u64,
start_index: u64,
end_index: u64,
},
FileSyncInfo {
tx_seq: Option<u64>,
},
TerminateFileSync {
tx_seq: u64,
is_reverted: bool,
},
}
#[derive(Debug)]
@ -134,7 +155,7 @@ impl SyncService {
let manager =
AutoSyncManager::new(store.clone(), sync_send.clone(), config.clone()).await?;
if !config.auto_sync_disabled {
if config.auto_sync_enabled {
manager.spwn(&executor, event_recv);
}
@ -218,6 +239,8 @@ impl SyncService {
} => {
self.on_announce_file_gossip(tx_id, peer_id, addr).await;
}
SyncMessage::AnnounceChunksGossip { msg } => self.on_announce_chunks_gossip(msg).await,
}
}
@ -237,24 +260,19 @@ impl SyncService {
}
SyncRequest::SyncFile { tx_seq } => {
if !self.controllers.contains_key(&tx_seq)
&& self.controllers.len() >= self.config.max_sync_files
{
let _ = sender.send(SyncResponse::SyncFile {
err: format!(
"max sync file limitation reached: {:?}",
self.config.max_sync_files
),
});
return;
let result = self.on_sync_file_request(tx_seq, None).await;
let _ = sender.send(SyncResponse::SyncFile { err: result });
}
let err = match self.on_start_sync_file(tx_seq, None).await {
Ok(()) => "".into(),
Err(err) => err.to_string(),
};
let _ = sender.send(SyncResponse::SyncFile { err });
SyncRequest::SyncChunks {
tx_seq,
start_index,
end_index,
} => {
let result = self
.on_sync_file_request(tx_seq, Some((start_index, end_index)))
.await;
let _ = sender.send(SyncResponse::SyncFile { err: result });
}
SyncRequest::FileSyncInfo { tx_seq } => {
@ -348,6 +366,12 @@ impl SyncService {
return Ok(());
}
// ban peer if requested too many chunks
if request.index_end - request.index_start > MAX_CHUNKS_TO_REQUEST {
self.ctx.ban_peer(peer_id, "Too many chunks requested");
return Ok(());
}
// ban peer if invalid tx requested
let tx = match self.store.get_tx_by_seq_number(request.tx_id.seq).await? {
Some(tx) => tx,
@ -463,15 +487,37 @@ impl SyncService {
}
}
/// Handles a request to sync a whole file (`maybe_range` is `None`) or a chunk
/// range of it (`Some((start, end))`).
///
/// Returns an empty string on success, otherwise a description of why the sync
/// was not started: whole-file sync is disabled by config, the limit of
/// concurrently synced files is reached, or starting the sync failed.
async fn on_sync_file_request(
    &mut self,
    tx_seq: u64,
    maybe_range: Option<(u64, u64)>,
) -> String {
    // The switch only gates whole-file requests; chunk-range requests always pass.
    let is_whole_file = maybe_range.is_none();
    if is_whole_file && !self.config.sync_file_by_rpc_enabled {
        return "Disabled to sync file".into();
    }

    // Files that already have a controller do not count against the limit.
    let max_sync_files = self.config.max_sync_files;
    let is_new_file = !self.controllers.contains_key(&tx_seq);
    if is_new_file && self.controllers.len() >= max_sync_files {
        return format!("Max sync file limitation reached: {}", max_sync_files);
    }

    self.on_start_sync_file(tx_seq, maybe_range, None)
        .await
        .err()
        .map(|e| e.to_string())
        .unwrap_or_default()
}
async fn on_start_sync_file(
&mut self,
tx_seq: u64,
maybe_range: Option<(u64, u64)>,
maybe_peer: Option<(PeerId, Multiaddr)>,
) -> Result<()> {
info!(%tx_seq, "Start to sync file");
if !self.config.enable_chunk_request {
return Ok(());
}
// remove failed entry if caused by tx reverted, so as to re-sync
// file with latest tx_id.
@ -498,7 +544,7 @@ impl SyncService {
};
let num_chunks = match usize::try_from(tx.size) {
Ok(size) => bytes_to_chunks(size),
Ok(size) => bytes_to_chunks(size) as u64,
Err(_) => {
error!(%tx_seq, "Unexpected transaction size: {}", tx.size);
bail!("Unexpected transaction size");
@ -510,9 +556,18 @@ impl SyncService {
bail!("File already exists");
}
let (index_start, index_end) = match maybe_range {
Some((start, end)) => (start, end),
None => (0, num_chunks),
};
if index_start >= index_end || index_end > num_chunks {
bail!("invalid chunk range");
}
entry.insert(SerialSyncController::new(
tx.id(),
num_chunks as u64,
FileSyncGoal::new(num_chunks, index_start, index_end),
self.ctx.clone(),
self.store.clone(),
self.file_location_cache.clone(),
@ -542,8 +597,16 @@ impl SyncService {
// File already in sync
if let Some(controller) = self.controllers.get_mut(&tx_seq) {
let info = controller.get_sync_info();
if info.goal.is_all_chunks() {
controller.on_peer_found(peer_id, addr);
controller.transition();
}
return;
}
if !self.config.sync_file_on_announcement_enabled {
return;
}
@ -558,12 +621,30 @@ impl SyncService {
}
// Now, always sync files among all nodes
if let Err(err) = self.on_start_sync_file(tx_seq, Some((peer_id, addr))).await {
if let Err(err) = self
.on_start_sync_file(tx_seq, None, Some((peer_id, addr)))
.await
{
// FIXME(zz): This is possible for tx missing. Is it expected?
error!(%tx_seq, %err, "Failed to sync file");
}
}
/// Handles an `AnnounceChunks` gossip forwarded by the router.
///
/// The announcing peer is added to the controller of `msg.tx_id.seq` only when
/// that controller syncs a partial range which exactly matches the announced
/// `[index_start, index_end)`; every other announcement is dropped.
async fn on_announce_chunks_gossip(&mut self, msg: AnnounceChunks) {
    info!(?msg, "Received AnnounceChunks gossip");

    let controller = match self.controllers.get_mut(&msg.tx_id.seq) {
        Some(controller) => controller,
        None => return,
    };

    let goal = controller.get_sync_info().goal;
    let same_range = goal.index_start == msg.index_start && goal.index_end == msg.index_end;
    if goal.is_all_chunks() || !same_range {
        return;
    }

    controller.on_peer_found(msg.peer_id.into(), msg.at.into());
    controller.transition();
}
/// Terminate file sync of `min_tx_seq`.
/// If `is_reverted` is `true` (means confirmed transactions reverted),
/// also terminate `tx_seq` greater than `min_tx_seq`
@ -679,6 +760,15 @@ mod tests {
}
async fn spawn_sync_service(&self, with_peer_store: bool) -> SyncSender {
self.spawn_sync_service_with_config(with_peer_store, Config::default())
.await
}
async fn spawn_sync_service_with_config(
&self,
with_peer_store: bool,
config: Config,
) -> SyncSender {
let store = if with_peer_store {
self.peer_store.clone()
} else {
@ -686,7 +776,7 @@ mod tests {
};
SyncService::spawn_with_config(
Config::default().disable_auto_sync(),
config,
self.runtime.task_executor.clone(),
self.network_send.clone(),
store,
@ -719,7 +809,7 @@ mod tests {
.unwrap();
let mut sync = SyncService {
config: Config::default().disable_auto_sync(),
config: Config::default(),
msg_recv: sync_recv,
ctx: Arc::new(SyncNetworkContext::new(network_send)),
store,
@ -754,7 +844,7 @@ mod tests {
.unwrap();
let mut sync = SyncService {
config: Config::default().disable_auto_sync(),
config: Config::default(),
msg_recv: sync_recv,
ctx: Arc::new(SyncNetworkContext::new(network_send)),
store,
@ -1070,7 +1160,7 @@ mod tests {
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
let (_event_send, event_recv) = broadcast::channel(16);
let sync_send = SyncService::spawn_with_config(
Config::default().disable_auto_sync(),
Config::default(),
runtime.task_executor.clone(),
network_send,
store.clone(),
@ -1341,7 +1431,9 @@ mod tests {
#[tokio::test]
async fn test_announce_file() {
let mut runtime = TestSyncRuntime::new(vec![1535], 0);
let sync_send = runtime.spawn_sync_service(false).await;
let mut config = Config::default();
config.sync_file_on_announcement_enabled = true;
let sync_send = runtime.spawn_sync_service_with_config(false, config).await;
let tx_seq = 0u64;
let address: Multiaddr = "/ip4/127.0.0.1/tcp/10000".parse().unwrap();

View File

@ -27,6 +27,8 @@ log_sync_start_block_number = 134253180
log_page_size = 999
# [sync]
# auto_sync_disabled = false
# auto_sync_enabled = false
# max_sync_files = 8
# find_peer_timeout = "30s"
# sync_file_by_rpc_enabled = true
# sync_file_on_announcement_enabled = false