Compare commits

..

2 Commits

Author SHA1 Message Date
Mostafa
2b694509f5 rebase on main 2024-08-30 04:10:00 -05:00
Javier Goday
e1f607b732 QueryRouter: route to primary when locks exists (select for update) 2024-07-06 09:07:16 +02:00
8 changed files with 35 additions and 2239 deletions

2
Cargo.lock generated
View File

@@ -1034,7 +1034,7 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]]
name = "pgcat"
version = "1.2.0"
version = "1.1.2-dev4"
dependencies = [
"arc-swap",
"async-trait",

View File

@@ -1,6 +1,6 @@
[package]
name = "pgcat"
version = "1.2.0"
version = "1.1.2-dev4"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View File

@@ -268,8 +268,6 @@ psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES'
Additionally, Prometheus statistics are available at `/metrics` via HTTP.
We also have a [basic Grafana dashboard](https://github.com/postgresml/pgcat/blob/main/grafana_dashboard.json) based on Prometheus metrics that you can import into Grafana and build on it or use it for monitoring.
### Live configuration reloading
The config can be reloaded by sending a `kill -s SIGHUP` to the process or by querying `RELOAD` to the admin database. All settings except the `host` and `port` can be reloaded without restarting the pooler, including sharding and replicas configurations.

View File

@@ -4,5 +4,5 @@ description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBounce
maintainers:
- name: Wildcard
email: support@w6d.io
appVersion: "1.2.0"
version: 0.2.0
appVersion: "1.1.1"
version: 0.1.0

View File

@@ -170,13 +170,13 @@ configuration:
connect_timeout: 5000
# How long an idle connection with a server is left open (ms).
idle_timeout: 30000 # milliseconds
idle_timeout: 30000 # milliseconds
# Max connection lifetime before it's closed, even if actively used.
server_lifetime: 86400000 # 24 hours
server_lifetime: 86400000 # 24 hours
# How long a client is allowed to be idle while in a transaction (ms).
idle_client_in_transaction_timeout: 0 # milliseconds
idle_client_in_transaction_timeout: 0 # milliseconds
# @param configuration.general.healthcheck_timeout How much time to give `SELECT 1` health check query to return with a result (ms).
healthcheck_timeout: 1000
@@ -240,15 +240,7 @@ configuration:
## the pool_name is what clients use as database name when connecting
## For the example below a client can connect using "postgres://sharding_user:sharding_user@pgcat_host:pgcat_port/sharded"
## @param [object]
pools:
[{
name: "simple", pool_mode: "transaction",
users: [{username: "user", password: "pass", pool_size: 5, statement_timeout: 0}],
shards: [{
servers: [{host: "postgres", port: 5432, role: "primary"}],
database: "postgres"
}]
}]
pools: []
# - ## default values
# ##
# ##

File diff suppressed because it is too large Load Diff

View File

@@ -12,30 +12,19 @@ use std::collections::HashMap;
use std::fmt;
use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::net::TcpListener;
use crate::config::Address;
use crate::pool::{get_all_pools, PoolIdentifier};
use crate::stats::get_server_stats;
use crate::stats::pool::PoolStats;
use crate::stats::{get_server_stats, ServerStats};
struct MetricHelpType {
help: &'static str,
ty: &'static str,
}
struct ServerPrometheusStats {
bytes_received: u64,
bytes_sent: u64,
transaction_count: u64,
query_count: u64,
error_count: u64,
active_count: u64,
idle_count: u64,
login_count: u64,
tested_count: u64,
}
// reference for metric types: https://prometheus.io/docs/concepts/metric_types/
// counters only increase
// gauges can arbitrarily increase or decrease
@@ -138,46 +127,22 @@ static METRIC_HELP_AND_TYPES_LOOKUP: phf::Map<&'static str, MetricHelpType> = ph
},
"servers_bytes_received" => MetricHelpType {
help: "Volume in bytes of network traffic received by server",
ty: "counter",
ty: "gauge",
},
"servers_bytes_sent" => MetricHelpType {
help: "Volume in bytes of network traffic sent by server",
ty: "counter",
ty: "gauge",
},
"servers_transaction_count" => MetricHelpType {
help: "Number of transactions executed by server",
ty: "counter",
ty: "gauge",
},
"servers_query_count" => MetricHelpType {
help: "Number of queries executed by server",
ty: "counter",
ty: "gauge",
},
"servers_error_count" => MetricHelpType {
help: "Number of errors",
ty: "counter",
},
"servers_idle_count" => MetricHelpType {
help: "Number of server connection in idle state",
ty: "gauge",
},
"servers_active_count" => MetricHelpType {
help: "Number of server connection in active state",
ty: "gauge",
},
"servers_tested_count" => MetricHelpType {
help: "Number of server connection in tested state",
ty: "gauge",
},
"servers_login_count" => MetricHelpType {
help: "Number of server connection in login state",
ty: "gauge",
},
"servers_is_banned" => MetricHelpType {
help: "0 if server is not banned, 1 if server is banned",
ty: "gauge",
},
"servers_is_paused" => MetricHelpType {
help: "0 if server is not paused, 1 if server is paused",
ty: "gauge",
},
"databases_pool_size" => MetricHelpType {
@@ -245,9 +210,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("shard", address.shard.to_string());
labels.insert("role", address.role.to_string());
labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone());
Self::from_name(&format!("databases_{}", name), value, labels)
}
@@ -262,9 +225,8 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("shard", address.shard.to_string());
labels.insert("role", address.role.to_string());
labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone());
Self::from_name(&format!("servers_{}", name), value, labels)
}
@@ -274,9 +236,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("shard", address.shard.to_string());
labels.insert("pool", address.pool_name.clone());
labels.insert("role", address.role.to_string());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone());
Self::from_name(&format!("stats_{}", name), value, labels)
}
@@ -378,51 +338,34 @@ fn push_database_stats(lines: &mut Vec<String>) {
// Adds relevant metrics shown in a SHOW SERVERS admin command.
fn push_server_stats(lines: &mut Vec<String>) {
let server_stats = get_server_stats();
let mut prom_stats = HashMap::<String, ServerPrometheusStats>::new();
let mut server_stats_by_addresses = HashMap::<String, Arc<ServerStats>>::new();
for (_, stats) in server_stats {
let entry = prom_stats
.entry(stats.address_name())
.or_insert(ServerPrometheusStats {
bytes_received: 0,
bytes_sent: 0,
transaction_count: 0,
query_count: 0,
error_count: 0,
active_count: 0,
idle_count: 0,
login_count: 0,
tested_count: 0,
});
entry.bytes_received += stats.bytes_received.load(Ordering::Relaxed);
entry.bytes_sent += stats.bytes_sent.load(Ordering::Relaxed);
entry.transaction_count += stats.transaction_count.load(Ordering::Relaxed);
entry.query_count += stats.query_count.load(Ordering::Relaxed);
entry.error_count += stats.error_count.load(Ordering::Relaxed);
match stats.state.load(Ordering::Relaxed) {
crate::stats::ServerState::Login => entry.login_count += 1,
crate::stats::ServerState::Active => entry.active_count += 1,
crate::stats::ServerState::Tested => entry.tested_count += 1,
crate::stats::ServerState::Idle => entry.idle_count += 1,
}
server_stats_by_addresses.insert(stats.address_name(), stats);
}
for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
let address = pool.address(shard, server);
if let Some(server_info) = prom_stats.get(&address.name()) {
if let Some(server_info) = server_stats_by_addresses.get(&address.name()) {
let metrics = [
("bytes_received", server_info.bytes_received),
("bytes_sent", server_info.bytes_sent),
("transaction_count", server_info.transaction_count),
("query_count", server_info.query_count),
("error_count", server_info.error_count),
("idle_count", server_info.idle_count),
("active_count", server_info.active_count),
("login_count", server_info.login_count),
("tested_count", server_info.tested_count),
("is_banned", if pool.is_banned(address) { 1 } else { 0 }),
("is_paused", if pool.paused() { 1 } else { 0 }),
(
"bytes_received",
server_info.bytes_received.load(Ordering::Relaxed),
),
("bytes_sent", server_info.bytes_sent.load(Ordering::Relaxed)),
(
"transaction_count",
server_info.transaction_count.load(Ordering::Relaxed),
),
(
"query_count",
server_info.query_count.load(Ordering::Relaxed),
),
(
"error_count",
server_info.error_count.load(Ordering::Relaxed),
),
];
for (key, value) in metrics {
if let Some(prometheus_metric) =

View File

@@ -1399,19 +1399,6 @@ mod test {
assert!(!qr.query_parser_enabled());
}
#[test]
fn test_query_parser() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.pool_settings.query_parser_read_write_splitting = true;
let query = simple_query("SELECT req_tab_0.* FROM validation req_tab_0 WHERE array['http://www.w3.org/ns/shacl#ValidationResult'] && req_tab_0.type::text[] AND ( ( (req_tab_0.focusnode = 'DataSource_Credilogic_DataSourceAddress_144959227') ) )");
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
let query = simple_query("WITH EmployeeSalaries AS (SELECT Department, Salary FROM Employees) SELECT Department, AVG(Salary) AS AverageSalary FROM EmployeeSalaries GROUP BY Department;");
assert!(qr.infer(&qr.parse(&query).unwrap()).is_ok());
}
#[test]
fn test_update_from_pool_settings() {
QueryRouter::setup();