From 1b166b462dc242af9b943569cb0ea3a2266c3030 Mon Sep 17 00:00:00 2001 From: Nicholas Dujay <3258756+dat2@users.noreply.github.com> Date: Tue, 9 Aug 2022 15:19:11 -0400 Subject: [PATCH] create a prometheus exporter on a standard http port (#107) * create a hyper server and add option to enable it in config * move prometheus stuff to its own file; update format * create metric type and help lookup table * finish the metric help type map * switch to a boolean and a standard port * dont emit unimplemented metrics * fail if curl returns a non 200 * resolve conflicts * move log out of config.show and into main * terminating new line * upgrade curl * include unimplemented stats --- .circleci/config.yml | 2 +- .circleci/pgcat.toml | 3 + .circleci/run_tests.sh | 3 + Cargo.lock | 247 +++++++++++++++++++++++++++++++++++++ Cargo.toml | 2 + docker-compose.yml | 1 + examples/docker/pgcat.toml | 3 + pgcat.toml | 5 +- src/config.rs | 2 + src/main.rs | 27 +++- src/prometheus.rs | 212 +++++++++++++++++++++++++++++++ 11 files changed, 500 insertions(+), 7 deletions(-) create mode 100644 src/prometheus.rs diff --git a/.circleci/config.yml b/.circleci/config.yml index aed5c18..337dcdc 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -34,7 +34,7 @@ jobs: command: "cargo fmt --check" - run: name: "Install dependencies" - command: "sudo apt-get update && sudo apt-get install -y psmisc postgresql-contrib-12 postgresql-client-12 ruby ruby-dev libpq-dev python3 python3-pip lcov llvm-11" + command: "sudo apt-get update && sudo apt-get install -y psmisc postgresql-contrib-12 postgresql-client-12 ruby ruby-dev libpq-dev python3 python3-pip lcov llvm-11 && sudo apt-get upgrade curl" - run: name: "Install rust tools" command: "cargo install cargo-binutils rustfilt && rustup component add llvm-tools-preview" diff --git a/.circleci/pgcat.toml b/.circleci/pgcat.toml index 502215b..bc37a29 100644 --- a/.circleci/pgcat.toml +++ b/.circleci/pgcat.toml @@ -11,6 +11,9 @@ host = "0.0.0.0" # Port to run on, same as PgBouncer used in this example. port = 6432 +# enable prometheus exporter on port 9930 +enable_prometheus_exporter = true + # How long to wait before aborting a server connection (ms). connect_timeout = 100 diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index 431f2d6..12e2036 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -31,6 +31,9 @@ toxiproxy-cli create -l 127.0.0.1:5433 -u 127.0.0.1:5432 postgres_replica start_pgcat "info" +# Check that prometheus is running +curl --fail localhost:9930/metrics + export PGPASSWORD=sharding_user export PGDATABASE=sharded_db diff --git a/Cargo.lock b/Cargo.lock index ddab730..2e20f70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -159,6 +159,12 @@ dependencies = [ "termcolor", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "futures-channel" version = "0.3.19" @@ -174,6 +180,12 @@ version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7" +[[package]] +name = "futures-sink" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" + [[package]] name = "futures-task" version = "0.3.19" @@ -215,6 +227,31 @@ dependencies = [ "wasi", ] +[[package]] +name = "h2" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37a82c6d637fc9515a4694bbf1cb2457b79d81ce52b3108bdeea58b07dd34a57" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -233,12 +270,80 @@ dependencies = [ "digest", ] +[[package]] +name = "http" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "496ce29bb5a52785b44e0f7ca2847ae0bb839c9bd28f69acac9b99d461c0c04c" + +[[package]] +name = "httpdate" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" + [[package]] name = "humantime" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "hyper" +version = "0.14.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02c929dc5c39e335a03c405292728118860721b10190d98c2a0f0efd5baafbac" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "indexmap" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" +dependencies = [ + "autocfg", + "hashbrown", +] + [[package]] name = "instant" version = "0.1.12" @@ -248,6 +353,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "itoa" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754" + [[package]] name = "js-sys" version = "0.3.58" @@ -405,11 +516,13 @@ dependencies = [ "chrono", "env_logger", "hmac", + "hyper", "log", "md-5", "num_cpus", "once_cell", "parking_lot", + "phf", "rand", "regex", "rustls-pemfile", @@ -424,6 +537,50 @@ dependencies = [ "toml", ] +[[package]] +name = "phf" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fabbf1ead8a5bcbc20f5f8b939ee3f5b0f6f281b6ad3468b84656b658b455259" +dependencies = [ + "phf_macros", + "phf_shared", + "proc-macro-hack", +] + +[[package]] +name = "phf_generator" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d5285893bb5eb82e6aaf5d59ee909a06a16737a8970984dd7746ba9283498d6" +dependencies = [ + "phf_shared", + "rand", +] + +[[package]] +name = "phf_macros" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58fdf3184dd560f160dd73922bea2d5cd6e8f064bf4b13110abd81b03697b4e0" +dependencies = [ + "phf_generator", + "phf_shared", + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "phf_shared" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6796ad771acdc0123d2a88dc428b5e38ef24456743ddb1744ed628f9815c096" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project-lite" version = "0.2.8" @@ -442,6 +599,12 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" +[[package]] +name = "proc-macro-hack" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" + [[package]] name = "proc-macro2" version = "1.0.36" @@ -626,6 +789,12 @@ dependencies = [ "libc", ] +[[package]] +name = "siphasher" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" + [[package]] name = "slab" version = "0.4.5" @@ -638,6 +807,16 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" +[[package]] +name = "socket2" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "spin" version = "0.5.2" @@ -756,6 +935,20 @@ dependencies = [ "webpki", ] +[[package]] +name = "tokio-util" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f988a1a1adc2fb21f9c12aa96441da33a1728193ae0b95d2be22dbd17fcb4e5c" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", + "tracing", +] + [[package]] name = "toml" version = "0.5.8" @@ -765,6 +958,50 @@ dependencies = [ "serde", ] +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + +[[package]] +name = "tracing" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11c75893af559bc8e10716548bdef5cb2b983f8e637db9d0e15126b61b484ee2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + [[package]] name = "typenum" version = "1.15.0" @@ -804,6 +1041,16 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + [[package]] name = "wasi" version = "0.10.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index 8bdeab6..3737024 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,3 +31,5 @@ base64 = "0.13" stringprep = "0.1" tokio-rustls = "0.23" rustls-pemfile = "1" +hyper = { version = "0.14", features = ["full"] } +phf = { version = "0.10", features = ["macros"] } diff --git a/docker-compose.yml b/docker-compose.yml index 510546e..89cb67e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,3 +14,4 @@ services: - "${PWD}/examples/docker/pgcat.toml:/etc/pgcat/pgcat.toml" ports: - "6432:6432" + - "9090:9090" diff --git a/examples/docker/pgcat.toml b/examples/docker/pgcat.toml index 40a5492..cbfb1d9 100644 --- a/examples/docker/pgcat.toml +++ b/examples/docker/pgcat.toml @@ -11,6 +11,9 @@ host = "0.0.0.0" # Port to run on, same as PgBouncer used in this example. port = 6432 +# enable prometheus exporter on port 9930 +enable_prometheus_exporter = true + # How long to wait before aborting a server connection (ms). connect_timeout = 5000 diff --git a/pgcat.toml b/pgcat.toml index 50d797f..d826994 100644 --- a/pgcat.toml +++ b/pgcat.toml @@ -11,6 +11,9 @@ host = "0.0.0.0" # Port to run on, same as PgBouncer used in this example. port = 6432 +# enable prometheus exporter on port 9930 +enable_prometheus_exporter = true + # How long to wait before aborting a server connection (ms). connect_timeout = 5000 @@ -61,7 +64,7 @@ query_parser_enabled = true # If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for # load balancing of read queries. Otherwise, the primary will only be used for write -# queries. The primary can always be explicitely selected with our custom protocol. +# queries. The primary can always be explicitly selected with our custom protocol. primary_reads_enabled = true # So what if you wanted to implement a different hashing function, diff --git a/src/config.rs b/src/config.rs index 05a1b36..17aad85 100644 --- a/src/config.rs +++ b/src/config.rs @@ -117,6 +117,7 @@ impl Default for User { pub struct General { pub host: String, pub port: i16, + pub enable_prometheus_exporter: Option, pub connect_timeout: u64, pub healthcheck_timeout: u64, pub shutdown_timeout: u64, @@ -133,6 +134,7 @@ impl Default for General { General { host: String::from("localhost"), port: 5432, + enable_prometheus_exporter: Some(false), connect_timeout: 5000, healthcheck_timeout: 1000, shutdown_timeout: 60000, diff --git a/src/main.rs b/src/main.rs index 5e5c924..7ae71fe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -45,6 +45,8 @@ use tokio::{ }; use std::collections::HashMap; +use std::net::SocketAddr; +use std::str::FromStr; use std::sync::Arc; use tokio::sync::broadcast; @@ -55,6 +57,7 @@ mod constants; mod errors; mod messages; mod pool; +mod prometheus; mod query_router; mod scram; mod server; @@ -62,11 +65,10 @@ mod sharding; mod stats; mod tls; -use config::{get_config, reload_config}; -use pool::{ClientServerMap, ConnectionPool}; -use stats::{Collector, Reporter, REPORTER}; - -use crate::config::VERSION; +use crate::config::{get_config, reload_config, VERSION}; +use crate::pool::{ClientServerMap, ConnectionPool}; +use crate::prometheus::start_metric_server; +use crate::stats::{Collector, Reporter, REPORTER}; #[tokio::main(worker_threads = 4)] async fn main() { @@ -95,6 +97,21 @@ async fn main() { }; let config = get_config(); + + if let Some(true) = config.general.enable_prometheus_exporter { + let http_addr_str = format!("{}:{}", config.general.host, crate::prometheus::HTTP_PORT); + let http_addr = match SocketAddr::from_str(&http_addr_str) { + Ok(addr) => addr, + Err(err) => { + error!("Invalid http address: {}", err); + return; + } + }; + tokio::task::spawn(async move { + start_metric_server(http_addr).await; + }); + } + let addr = format!("{}:{}", config.general.host, config.general.port); let listener = match TcpListener::bind(&addr).await { diff --git a/src/prometheus.rs b/src/prometheus.rs new file mode 100644 index 0000000..eeaa9b1 --- /dev/null +++ b/src/prometheus.rs @@ -0,0 +1,212 @@ +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Method, Request, Response, Server, StatusCode}; +use log::{error, info, warn}; +use phf::phf_map; +use std::collections::HashMap; +use std::fmt; +use std::net::SocketAddr; + +use crate::config::Address; +use crate::pool::get_all_pools; +use crate::stats::get_stats; + +pub const HTTP_PORT: usize = 9930; + +struct MetricHelpType { + help: &'static str, + ty: &'static str, +} + +// reference for metric types: https://prometheus.io/docs/concepts/metric_types/ +// counters only increase +// gauges can arbitrarily increase or decrease +static METRIC_HELP_AND_TYPES_LOOKUP: phf::Map<&'static str, MetricHelpType> = phf_map! { + "total_query_count" => MetricHelpType { + help: "Number of queries sent by all clients", + ty: "counter", + }, + "total_query_time" => MetricHelpType { + help: "Total amount of time for queries to execute", + ty: "counter", + }, + "total_received" => MetricHelpType { + help: "Number of bytes received from the server", + ty: "counter", + }, + "total_sent" => MetricHelpType { + help: "Number of bytes sent to the server", + ty: "counter", + }, + "total_xact_count" => MetricHelpType { + help: "Total number of transactions started by the client", + ty: "counter", + }, + "total_xact_time" => MetricHelpType { + help: "Total amount of time for all transactions to execute", + ty: "counter", + }, + "total_wait_time" => MetricHelpType { + help: "Total time client waited for a server connection", + ty: "counter", + }, + "avg_query_count" => MetricHelpType { + help: "Average of total_query_count every 15 seconds", + ty: "gauge", + }, + "avg_query_time" => MetricHelpType { + help: "Average time taken for queries to execute every 15 seconds", + ty: "gauge", + }, + "avg_recv" => MetricHelpType { + help: "Average of total_received bytes every 15 seconds", + ty: "gauge", + }, + "avg_sent" => MetricHelpType { + help: "Average of total_sent bytes every 15 seconds", + ty: "gauge", + }, + "avg_xact_count" => MetricHelpType { + help: "Average of total_xact_count every 15 seconds", + ty: "gauge", + }, + "avg_xact_time" => MetricHelpType { + help: "Average of total_xact_time every 15 seconds", + ty: "gauge", + }, + "avg_wait_time" => MetricHelpType { + help: "Average of total_wait_time every 15 seconds", + ty: "gauge", + }, + "maxwait_us" => MetricHelpType { + help: "The time a client waited for a server connection in microseconds", + ty: "gauge", + }, + "maxwait" => MetricHelpType { + help: "The time a client waited for a server connection in seconds", + ty: "gauge", + }, + "cl_waiting" => MetricHelpType { + help: "How many clients are waiting for a connection from the pool", + ty: "gauge", + }, + "cl_active" => MetricHelpType { + help: "How many clients are actively communicating with a server", + ty: "gauge", + }, + "cl_idle" => MetricHelpType { + help: "How many clients are idle", + ty: "gauge", + }, + "sv_idle" => MetricHelpType { + help: "How many server connections are idle", + ty: "gauge", + }, + "sv_active" => MetricHelpType { + help: "How many server connections are actively communicating with a client", + ty: "gauge", + }, + "sv_login" => MetricHelpType { + help: "How many server connections are currently being created", + ty: "gauge", + }, + "sv_tested" => MetricHelpType { + help: "How many server connections are currently waiting on a health check to succeed", + ty: "gauge", + }, +}; + +struct PrometheusMetric { + name: String, + help: String, + ty: String, + labels: HashMap<&'static str, String>, + value: i64, +} + +impl fmt::Display for PrometheusMetric { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let formatted_labels = self + .labels + .iter() + .map(|(key, value)| format!("{}=\"{}\"", key, value)) + .collect::>() + .join(","); + write!( + f, + "# HELP {name} {help}\n# TYPE {name} {ty}\n{name}{{{formatted_labels}}} {value}\n", + name = format_args!("pgcat_{}", self.name), + help = self.help, + ty = self.ty, + formatted_labels = formatted_labels, + value = self.value + ) + } +} + +impl PrometheusMetric { + fn new(address: &Address, name: &str, value: i64) -> Option { + let mut labels = HashMap::new(); + labels.insert("host", address.host.clone()); + labels.insert("shard", address.shard.to_string()); + labels.insert("role", address.role.to_string()); + labels.insert("database", address.database.to_string()); + + METRIC_HELP_AND_TYPES_LOOKUP + .get(name) + .map(|metric| PrometheusMetric { + name: name.to_owned(), + help: metric.help.to_owned(), + ty: metric.ty.to_owned(), + labels, + value, + }) + } +} + +async fn prometheus_stats(request: Request) -> Result, hyper::http::Error> { + match (request.method(), request.uri().path()) { + (&Method::GET, "/metrics") => { + let stats = get_stats(); + + let mut lines = Vec::new(); + for (_, pool) in get_all_pools() { + for shard in 0..pool.shards() { + for server in 0..pool.servers(shard) { + let address = pool.address(shard, server); + if let Some(address_stats) = stats.get(&address.id) { + for (key, value) in address_stats.iter() { + if let Some(prometheus_metric) = + PrometheusMetric::new(address, key, *value) + { + lines.push(prometheus_metric.to_string()); + } else { + warn!("Metric {} not implemented for {}", key, address.name()); + } + } + } + } + } + } + + Response::builder() + .header("content-type", "text/plain; version=0.0.4") + .body(lines.join("\n").into()) + } + _ => Response::builder() + .status(StatusCode::NOT_FOUND) + .body("".into()), + } +} + +pub async fn start_metric_server(http_addr: SocketAddr) { + let http_service_factory = + make_service_fn(|_conn| async { Ok::<_, hyper::Error>(service_fn(prometheus_stats)) }); + let server = Server::bind(&http_addr.into()).serve(http_service_factory); + info!( + "Exposing prometheus metrics on http://{}/metrics.", + http_addr + ); + if let Err(e) = server.await { + error!("Failed to run HTTP server: {}.", e); + } +}