Compare commits

..

No commits in common. "d3d0a3cd9f5c3464fc3fb81c6f7488875cd2a145" and "e7f2d46cd473270de40848e0d331a5ccbcb4771a" have entirely different histories.

3 changed files with 15 additions and 4 deletions

View File

@ -3,6 +3,7 @@ use crate::metrics::unbounded_channel;
use metrics::{Counter, CounterUsize}; use metrics::{Counter, CounterUsize};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio::time::timeout; use tokio::time::timeout;
@ -30,7 +31,7 @@ impl<N, Req, Res> Channel<N, Req, Res> {
chan: sender, chan: sender,
metrics_timeout, metrics_timeout,
}, },
receiver, Receiver { chan: receiver },
) )
} }
} }
@ -73,7 +74,19 @@ impl<N, Req, Res> Sender<N, Req, Res> {
} }
} }
pub type Receiver<N, Req, Res> = crate::metrics::Receiver<Message<N, Req, Res>>; pub struct Receiver<N, Req, Res> {
chan: crate::metrics::Receiver<Message<N, Req, Res>>,
}
impl<N, Req, Res> Receiver<N, Req, Res> {
pub async fn recv(&mut self) -> Option<Message<N, Req, Res>> {
self.chan.recv().await
}
pub fn try_recv(&mut self) -> Result<Message<N, Req, Res>, TryRecvError> {
self.chan.try_recv()
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {

View File

@ -6,6 +6,5 @@ lazy_static::lazy_static! {
pub static ref SERIAL_SYNC_FILE_COMPLETED: Arc<dyn Timer> = register_timer("sync_controllers_serial_sync_file_completed"); pub static ref SERIAL_SYNC_FILE_COMPLETED: Arc<dyn Timer> = register_timer("sync_controllers_serial_sync_file_completed");
pub static ref SERIAL_SYNC_CHUNKS_COMPLETED: Arc<dyn Timer> = register_timer("sync_controllers_serial_sync_chunks_completed"); pub static ref SERIAL_SYNC_CHUNKS_COMPLETED: Arc<dyn Timer> = register_timer("sync_controllers_serial_sync_chunks_completed");
pub static ref SERIAL_SYNC_SEGMENT_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_controllers_serial_sync_segment_latency", 1024); pub static ref SERIAL_SYNC_SEGMENT_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_controllers_serial_sync_segment_latency", 1024);
pub static ref SERIAL_SYNC_SEGMENT_TIMEOUT: Arc<dyn Counter<usize>> = CounterUsize::register("sync_controllers_serial_sync_segment_timeout");
pub static ref SERIAL_SYNC_UNEXPECTED_ERRORS: Arc<dyn Counter<usize>> = CounterUsize::register("sync_controllers_serial_sync_unexpected_errors"); pub static ref SERIAL_SYNC_UNEXPECTED_ERRORS: Arc<dyn Counter<usize>> = CounterUsize::register("sync_controllers_serial_sync_unexpected_errors");
} }

View File

@ -684,7 +684,6 @@ impl SerialSyncController {
debug!(%self.tx_seq, "No peer to continue downloading and try to find other peers to download"); debug!(%self.tx_seq, "No peer to continue downloading and try to find other peers to download");
self.state = SyncState::Idle; self.state = SyncState::Idle;
} else if since.elapsed() >= self.config.peer_chunks_download_timeout { } else if since.elapsed() >= self.config.peer_chunks_download_timeout {
metrics::SERIAL_SYNC_SEGMENT_TIMEOUT.inc(1);
self.handle_response_failure(peer_id, "RPC timeout"); self.handle_response_failure(peer_id, "RPC timeout");
} else { } else {
completed = true; completed = true;