Add metrics in file location cache
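Adds lazy_static and metrics dependencies to the file_location_cache crate and registers three metrics: an insert QPS meter, a histogram of how many tx IDs each inserted announcement carries, and a histogram of the total number of cached announcements, updated on insert, lookup and removal.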

boqiu 2024-08-29 11:55:00 +08:00
parent 2fd9712d59
commit 75531a5878
3 changed files with 20 additions and 0 deletions

Cargo.lock (generated)

@@ -2373,6 +2373,8 @@ name = "file_location_cache"
 version = "0.1.0"
 dependencies = [
  "hashlink 0.8.4",
+ "lazy_static",
+ "metrics",
  "network",
  "parking_lot 0.12.3",
  "priority-queue",

file_location_cache/Cargo.toml

@@ -13,3 +13,5 @@ tracing = "0.1.35"
 priority-queue = "1.2.3"
 shared_types = { path = "../shared_types" }
 serde = { version = "1.0.137", features = ["derive"] }
+lazy_static = "1.4.0"
+metrics = { workspace = true }

file_location_cache source file (FileLocationCache implementation)

@@ -1,4 +1,5 @@
 use crate::Config;
+use metrics::{register_meter_with_group, Histogram, Meter, Sample};
 use network::types::SignedAnnounceFile;
 use network::PeerId;
 use parking_lot::Mutex;
@@ -7,8 +8,15 @@ use rand::seq::IteratorRandom;
 use shared_types::{timestamp_now, TxID};
 use std::cmp::Reverse;
 use std::collections::HashMap;
+use std::sync::Arc;
 use storage::config::ShardConfig;
 
+lazy_static::lazy_static! {
+    pub static ref INSERT_QPS: Arc<dyn Meter> = register_meter_with_group("file_location_cache_insert", "qps");
+    pub static ref INSERT_BATCH: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("file_location_cache_insert", "batch", 1024);
+    pub static ref TOTAL_CACHED: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("file_location_cache_size", 1024);
+}
+
 /// Caches limited announcements of specified file from different peers.
 struct AnnouncementCache {
     /// Maximum number of announcements in cache.
@@ -201,6 +209,7 @@ impl FileCache {
         let item = self.files.get_mut(&tx_id)?;
         let (result, collected) = item.random();
         self.update_on_announcement_cache_changed(&tx_id, collected);
+        TOTAL_CACHED.update(self.total_announcements as u64);
         result
     }
@@ -232,6 +241,7 @@ impl FileCache {
         let item = self.files.get_mut(&tx_id)?;
         let (result, collected) = item.all();
         self.update_on_announcement_cache_changed(&tx_id, collected);
+        TOTAL_CACHED.update(self.total_announcements as u64);
         Some(result)
     }
@@ -240,6 +250,7 @@ impl FileCache {
         let item = self.files.get_mut(tx_id)?;
         let result = item.remove(peer_id)?;
         self.update_on_announcement_cache_changed(tx_id, 1);
+        TOTAL_CACHED.update(self.total_announcements as u64);
         Some(result)
     }
 }
@@ -282,6 +293,9 @@ impl FileLocationCache {
     }
 
     pub fn insert(&self, announcement: SignedAnnounceFile) {
+        INSERT_QPS.mark(1);
+        INSERT_BATCH.update(announcement.tx_ids.len() as u64);
+
         let peer_id = *announcement.peer_id;
         // FIXME: Check validity.
         let shard_config = ShardConfig {
@@ -294,6 +308,8 @@ impl FileLocationCache {
         for tx_id in announcement.tx_ids.iter() {
             cache.insert(*tx_id, announcement.clone());
         }
+
+        TOTAL_CACHED.update(cache.total_announcements as u64);
     }
 
     pub fn get_one(&self, tx_id: TxID) -> Option<SignedAnnounceFile> {
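
Below is a minimal sketch of the registration-and-update pattern this commit introduces, assuming the workspace metrics crate exposes the calls used in the diff (register_meter_with_group, Sample::ExpDecay(..).register_with_group, Meter::mark and Histogram::update); the group and metric names here are illustrative placeholders, not the ones registered by file_location_cache.

use std::sync::Arc;

use metrics::{register_meter_with_group, Histogram, Meter, Sample};

lazy_static::lazy_static! {
    // Meter counting how often the hypothetical insert path is hit.
    static ref EXAMPLE_INSERT_QPS: Arc<dyn Meter> =
        register_meter_with_group("example_cache_insert", "qps");
    // Exponential-decay sampled histogram (alpha = 0.015, 1024 samples) of batch sizes.
    static ref EXAMPLE_INSERT_BATCH: Arc<dyn Histogram> =
        Sample::ExpDecay(0.015).register_with_group("example_cache_insert", "batch", 1024);
}

// Record one insert carrying batch_size items, mirroring FileLocationCache::insert above.
fn record_insert(batch_size: usize) {
    EXAMPLE_INSERT_QPS.mark(1);
    EXAMPLE_INSERT_BATCH.update(batch_size as u64);
}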