From ae2b83d239d168bf8268112338b7564c60a5f76c Mon Sep 17 00:00:00 2001 From: Curtis Myzie Date: Wed, 13 Dec 2023 09:27:16 -0500 Subject: [PATCH] Prometheus metrics updates: * Add username label to deconflict metrics that would otherwise have duplicate labels across different pools. * Group metrics by name and only print HELP and TYPE once per metric name. * Sort labels for a deterministic output. --- src/prometheus.rs | 79 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 68 insertions(+), 11 deletions(-) diff --git a/src/prometheus.rs b/src/prometheus.rs index 7e264dc..8b85ce0 100644 --- a/src/prometheus.rs +++ b/src/prometheus.rs @@ -158,18 +158,17 @@ struct PrometheusMetric { impl fmt::Display for PrometheusMetric { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let formatted_labels = self - .labels + let mut sorted_labels: Vec<_> = self.labels.iter().collect(); + sorted_labels.sort_by_key(|&(key, _)| key); + let formatted_labels = sorted_labels .iter() .map(|(key, value)| format!("{}=\"{}\"", key, value)) .collect::>() .join(","); write!( f, - "# HELP {name} {help}\n# TYPE {name} {ty}\n{name}{{{formatted_labels}}} {value}\n", + "{name}{{{formatted_labels}}} {value}", name = format_args!("pgcat_{}", self.name), - help = self.help, - ty = self.ty, formatted_labels = formatted_labels, value = self.value ) @@ -204,6 +203,7 @@ impl PrometheusMetric { labels.insert("role", address.role.to_string()); labels.insert("pool", address.pool_name.clone()); labels.insert("database", address.database.to_string()); + labels.insert("username", address.username.clone()); Self::from_name(&format!("databases_{}", name), value, labels) } @@ -219,6 +219,7 @@ impl PrometheusMetric { labels.insert("role", address.role.to_string()); labels.insert("pool", address.pool_name.clone()); labels.insert("database", address.database.to_string()); + labels.insert("username", address.username.clone()); Self::from_name(&format!("servers_{}", name), value, labels) } @@ -230,6 +231,7 @@ impl PrometheusMetric { labels.insert("pool", address.pool_name.clone()); labels.insert("role", address.role.to_string()); labels.insert("database", address.database.to_string()); + labels.insert("username", address.username.clone()); Self::from_name(&format!("stats_{}", name), value, labels) } @@ -241,6 +243,15 @@ impl PrometheusMetric { Self::from_name(&format!("pools_{}", name), value, labels) } + + fn get_header(&self) -> String { + format!( + "\n# HELP {name} {help}\n# TYPE {name} {ty}", + name = format_args!("pgcat_{}", self.name), + help = self.help, + ty = self.ty, + ) + } } async fn prometheus_stats(request: Request) -> Result, hyper::http::Error> { @@ -264,6 +275,7 @@ async fn prometheus_stats(request: Request) -> Result, hype // Adds metrics shown in a SHOW STATS admin command. fn push_address_stats(lines: &mut Vec) { + let mut grouped_metrics: HashMap>> = HashMap::new(); for (_, pool) in get_all_pools() { for shard in 0..pool.shards() { for server in 0..pool.servers(shard) { @@ -273,7 +285,10 @@ fn push_address_stats(lines: &mut Vec) { if let Some(prometheus_metric) = PrometheusMetric::::from_address(address, &key, value) { - lines.push(prometheus_metric.to_string()); + grouped_metrics + .entry(key) + .or_insert_with(Vec::new) + .push(prometheus_metric); } else { debug!("Metric {} not implemented for {}", key, address.name()); } @@ -281,33 +296,53 @@ fn push_address_stats(lines: &mut Vec) { } } } + for (_key, metrics) in grouped_metrics { + if metrics.len() > 0 { + lines.push(metrics[0].get_header()); + for metric in metrics { + lines.push(metric.to_string()); + } + } + } } // Adds relevant metrics shown in a SHOW POOLS admin command. fn push_pool_stats(lines: &mut Vec) { + let mut grouped_metrics: HashMap>> = HashMap::new(); let pool_stats = PoolStats::construct_pool_lookup(); for (pool_id, stats) in pool_stats.iter() { for (name, value) in stats.clone() { if let Some(prometheus_metric) = PrometheusMetric::::from_pool(pool_id.clone(), &name, value) { - lines.push(prometheus_metric.to_string()); + grouped_metrics + .entry(name) + .or_insert_with(Vec::new) + .push(prometheus_metric); } else { debug!("Metric {} not implemented for ({})", name, *pool_id); } } } + for (_key, metrics) in grouped_metrics { + if metrics.len() > 0 { + lines.push(metrics[0].get_header()); + for metric in metrics { + lines.push(metric.to_string()); + } + } + } } // Adds relevant metrics shown in a SHOW DATABASES admin command. fn push_database_stats(lines: &mut Vec) { + let mut grouped_metrics: HashMap>> = HashMap::new(); for (_, pool) in get_all_pools() { let pool_config = pool.settings.clone(); for shard in 0..pool.shards() { for server in 0..pool.servers(shard) { let address = pool.address(shard, server); let pool_state = pool.pool_state(shard, server); - let metrics = vec![ ("pool_size", pool_config.user.pool_size), ("current_connections", pool_state.connections), @@ -316,7 +351,10 @@ fn push_database_stats(lines: &mut Vec) { if let Some(prometheus_metric) = PrometheusMetric::::from_database_info(address, key, value) { - lines.push(prometheus_metric.to_string()); + grouped_metrics + .entry(key.to_string()) + .or_insert_with(Vec::new) + .push(prometheus_metric); } else { debug!("Metric {} not implemented for {}", key, address.name()); } @@ -324,6 +362,14 @@ fn push_database_stats(lines: &mut Vec) { } } } + for (_key, metrics) in grouped_metrics { + if metrics.len() > 0 { + lines.push(metrics[0].get_header()); + for metric in metrics { + lines.push(metric.to_string()); + } + } + } } // Adds relevant metrics shown in a SHOW SERVERS admin command. @@ -333,7 +379,7 @@ fn push_server_stats(lines: &mut Vec) { for (_, stats) in server_stats { server_stats_by_addresses.insert(stats.address_name(), stats); } - + let mut grouped_metrics: HashMap>> = HashMap::new(); for (_, pool) in get_all_pools() { for shard in 0..pool.shards() { for server in 0..pool.servers(shard) { @@ -362,7 +408,10 @@ fn push_server_stats(lines: &mut Vec) { if let Some(prometheus_metric) = PrometheusMetric::::from_server_info(address, key, value) { - lines.push(prometheus_metric.to_string()); + grouped_metrics + .entry(key.to_string()) + .or_insert_with(Vec::new) + .push(prometheus_metric); } else { debug!("Metric {} not implemented for {}", key, address.name()); } @@ -371,6 +420,14 @@ fn push_server_stats(lines: &mut Vec) { } } } + for (_key, metrics) in grouped_metrics { + if metrics.len() > 0 { + lines.push(metrics[0].get_header()); + for metric in metrics { + lines.push(metric.to_string()); + } + } + } } pub async fn start_metric_server(http_addr: SocketAddr) {