Prometheus metrics updates:

* Add username label to deconflict metrics that would otherwise
   have duplicate labels across different pools.
 * Group metrics by name and only print HELP and TYPE once per
   metric name.
 * Sort labels for a deterministic output.
This commit is contained in:
Curtis Myzie
2023-12-13 09:27:16 -05:00
committed by KnockOutEZ
parent 966b8e093c
commit ae2b83d239

View File

@@ -158,18 +158,17 @@ struct PrometheusMetric<Value: fmt::Display> {
impl<Value: fmt::Display> fmt::Display for PrometheusMetric<Value> { impl<Value: fmt::Display> fmt::Display for PrometheusMetric<Value> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let formatted_labels = self let mut sorted_labels: Vec<_> = self.labels.iter().collect();
.labels sorted_labels.sort_by_key(|&(key, _)| key);
let formatted_labels = sorted_labels
.iter() .iter()
.map(|(key, value)| format!("{}=\"{}\"", key, value)) .map(|(key, value)| format!("{}=\"{}\"", key, value))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(","); .join(",");
write!( write!(
f, f,
"# HELP {name} {help}\n# TYPE {name} {ty}\n{name}{{{formatted_labels}}} {value}\n", "{name}{{{formatted_labels}}} {value}",
name = format_args!("pgcat_{}", self.name), name = format_args!("pgcat_{}", self.name),
help = self.help,
ty = self.ty,
formatted_labels = formatted_labels, formatted_labels = formatted_labels,
value = self.value value = self.value
) )
@@ -204,6 +203,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("role", address.role.to_string()); labels.insert("role", address.role.to_string());
labels.insert("pool", address.pool_name.clone()); labels.insert("pool", address.pool_name.clone());
labels.insert("database", address.database.to_string()); labels.insert("database", address.database.to_string());
labels.insert("username", address.username.clone());
Self::from_name(&format!("databases_{}", name), value, labels) Self::from_name(&format!("databases_{}", name), value, labels)
} }
@@ -219,6 +219,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("role", address.role.to_string()); labels.insert("role", address.role.to_string());
labels.insert("pool", address.pool_name.clone()); labels.insert("pool", address.pool_name.clone());
labels.insert("database", address.database.to_string()); labels.insert("database", address.database.to_string());
labels.insert("username", address.username.clone());
Self::from_name(&format!("servers_{}", name), value, labels) Self::from_name(&format!("servers_{}", name), value, labels)
} }
@@ -230,6 +231,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("pool", address.pool_name.clone()); labels.insert("pool", address.pool_name.clone());
labels.insert("role", address.role.to_string()); labels.insert("role", address.role.to_string());
labels.insert("database", address.database.to_string()); labels.insert("database", address.database.to_string());
labels.insert("username", address.username.clone());
Self::from_name(&format!("stats_{}", name), value, labels) Self::from_name(&format!("stats_{}", name), value, labels)
} }
@@ -241,6 +243,15 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
Self::from_name(&format!("pools_{}", name), value, labels) Self::from_name(&format!("pools_{}", name), value, labels)
} }
fn get_header(&self) -> String {
format!(
"\n# HELP {name} {help}\n# TYPE {name} {ty}",
name = format_args!("pgcat_{}", self.name),
help = self.help,
ty = self.ty,
)
}
} }
async fn prometheus_stats(request: Request<Body>) -> Result<Response<Body>, hyper::http::Error> { async fn prometheus_stats(request: Request<Body>) -> Result<Response<Body>, hyper::http::Error> {
@@ -264,6 +275,7 @@ async fn prometheus_stats(request: Request<Body>) -> Result<Response<Body>, hype
// Adds metrics shown in a SHOW STATS admin command. // Adds metrics shown in a SHOW STATS admin command.
fn push_address_stats(lines: &mut Vec<String>) { fn push_address_stats(lines: &mut Vec<String>) {
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
for (_, pool) in get_all_pools() { for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() { for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) { for server in 0..pool.servers(shard) {
@@ -273,7 +285,10 @@ fn push_address_stats(lines: &mut Vec<String>) {
if let Some(prometheus_metric) = if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_address(address, &key, value) PrometheusMetric::<u64>::from_address(address, &key, value)
{ {
lines.push(prometheus_metric.to_string()); grouped_metrics
.entry(key)
.or_insert_with(Vec::new)
.push(prometheus_metric);
} else { } else {
debug!("Metric {} not implemented for {}", key, address.name()); debug!("Metric {} not implemented for {}", key, address.name());
} }
@@ -281,33 +296,53 @@ fn push_address_stats(lines: &mut Vec<String>) {
} }
} }
} }
for (_key, metrics) in grouped_metrics {
if metrics.len() > 0 {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
} }
// Adds relevant metrics shown in a SHOW POOLS admin command. // Adds relevant metrics shown in a SHOW POOLS admin command.
fn push_pool_stats(lines: &mut Vec<String>) { fn push_pool_stats(lines: &mut Vec<String>) {
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
let pool_stats = PoolStats::construct_pool_lookup(); let pool_stats = PoolStats::construct_pool_lookup();
for (pool_id, stats) in pool_stats.iter() { for (pool_id, stats) in pool_stats.iter() {
for (name, value) in stats.clone() { for (name, value) in stats.clone() {
if let Some(prometheus_metric) = if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value) PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value)
{ {
lines.push(prometheus_metric.to_string()); grouped_metrics
.entry(name)
.or_insert_with(Vec::new)
.push(prometheus_metric);
} else { } else {
debug!("Metric {} not implemented for ({})", name, *pool_id); debug!("Metric {} not implemented for ({})", name, *pool_id);
} }
} }
} }
for (_key, metrics) in grouped_metrics {
if metrics.len() > 0 {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
} }
// Adds relevant metrics shown in a SHOW DATABASES admin command. // Adds relevant metrics shown in a SHOW DATABASES admin command.
fn push_database_stats(lines: &mut Vec<String>) { fn push_database_stats(lines: &mut Vec<String>) {
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u32>>> = HashMap::new();
for (_, pool) in get_all_pools() { for (_, pool) in get_all_pools() {
let pool_config = pool.settings.clone(); let pool_config = pool.settings.clone();
for shard in 0..pool.shards() { for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) { for server in 0..pool.servers(shard) {
let address = pool.address(shard, server); let address = pool.address(shard, server);
let pool_state = pool.pool_state(shard, server); let pool_state = pool.pool_state(shard, server);
let metrics = vec![ let metrics = vec![
("pool_size", pool_config.user.pool_size), ("pool_size", pool_config.user.pool_size),
("current_connections", pool_state.connections), ("current_connections", pool_state.connections),
@@ -316,7 +351,10 @@ fn push_database_stats(lines: &mut Vec<String>) {
if let Some(prometheus_metric) = if let Some(prometheus_metric) =
PrometheusMetric::<u32>::from_database_info(address, key, value) PrometheusMetric::<u32>::from_database_info(address, key, value)
{ {
lines.push(prometheus_metric.to_string()); grouped_metrics
.entry(key.to_string())
.or_insert_with(Vec::new)
.push(prometheus_metric);
} else { } else {
debug!("Metric {} not implemented for {}", key, address.name()); debug!("Metric {} not implemented for {}", key, address.name());
} }
@@ -324,6 +362,14 @@ fn push_database_stats(lines: &mut Vec<String>) {
} }
} }
} }
for (_key, metrics) in grouped_metrics {
if metrics.len() > 0 {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
} }
// Adds relevant metrics shown in a SHOW SERVERS admin command. // Adds relevant metrics shown in a SHOW SERVERS admin command.
@@ -333,7 +379,7 @@ fn push_server_stats(lines: &mut Vec<String>) {
for (_, stats) in server_stats { for (_, stats) in server_stats {
server_stats_by_addresses.insert(stats.address_name(), stats); server_stats_by_addresses.insert(stats.address_name(), stats);
} }
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
for (_, pool) in get_all_pools() { for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() { for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) { for server in 0..pool.servers(shard) {
@@ -362,7 +408,10 @@ fn push_server_stats(lines: &mut Vec<String>) {
if let Some(prometheus_metric) = if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_server_info(address, key, value) PrometheusMetric::<u64>::from_server_info(address, key, value)
{ {
lines.push(prometheus_metric.to_string()); grouped_metrics
.entry(key.to_string())
.or_insert_with(Vec::new)
.push(prometheus_metric);
} else { } else {
debug!("Metric {} not implemented for {}", key, address.name()); debug!("Metric {} not implemented for {}", key, address.name());
} }
@@ -371,6 +420,14 @@ fn push_server_stats(lines: &mut Vec<String>) {
} }
} }
} }
for (_key, metrics) in grouped_metrics {
if metrics.len() > 0 {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
} }
pub async fn start_metric_server(http_addr: SocketAddr) { pub async fn start_metric_server(http_addr: SocketAddr) {