mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-26 18:36:28 +00:00
Compare commits
9 Commits
pgcat-0.1.
...
circleci_s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
04e346df80 | ||
|
|
cbf4d58144 | ||
|
|
731aa047ba | ||
|
|
88dbcc21d1 | ||
|
|
c34b15bddc | ||
|
|
0b034a6831 | ||
|
|
966b8e093c | ||
|
|
c9270a47d4 | ||
|
|
0d94d0b90a |
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -1510,9 +1510,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "sqlparser"
|
name = "sqlparser"
|
||||||
version = "0.34.0"
|
version = "0.41.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "37d3706eefb17039056234df6b566b0014f303f867f2656108334a55b8096f59"
|
checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"log",
|
"log",
|
||||||
"sqlparser_derive",
|
"sqlparser_derive",
|
||||||
@@ -1520,13 +1520,13 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "sqlparser_derive"
|
name = "sqlparser_derive"
|
||||||
version = "0.1.1"
|
version = "0.2.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e"
|
checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
"syn 1.0.109",
|
"syn 2.0.26",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ serde_derive = "1"
|
|||||||
regex = "1"
|
regex = "1"
|
||||||
num_cpus = "1"
|
num_cpus = "1"
|
||||||
once_cell = "1"
|
once_cell = "1"
|
||||||
sqlparser = {version = "0.34", features = ["visitor"] }
|
sqlparser = { version = "0.41", features = ["visitor"] }
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
arc-swap = "1"
|
arc-swap = "1"
|
||||||
parking_lot = "0.12.1"
|
parking_lot = "0.12.1"
|
||||||
@@ -47,9 +47,12 @@ serde_json = "1"
|
|||||||
itertools = "0.10"
|
itertools = "0.10"
|
||||||
clap = { version = "4.3.1", features = ["derive", "env"] }
|
clap = { version = "4.3.1", features = ["derive", "env"] }
|
||||||
tracing = "0.1.37"
|
tracing = "0.1.37"
|
||||||
tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter", "std"]}
|
tracing-subscriber = { version = "0.3.17", features = [
|
||||||
|
"json",
|
||||||
|
"env-filter",
|
||||||
|
"std",
|
||||||
|
] }
|
||||||
lru = "0.12.0"
|
lru = "0.12.0"
|
||||||
|
|
||||||
[target.'cfg(not(target_env = "msvc"))'.dependencies]
|
[target.'cfg(not(target_env = "msvc"))'.dependencies]
|
||||||
jemallocator = "0.5.0"
|
jemallocator = "0.5.0"
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
FROM rust:1-slim-bookworm AS builder
|
FROM rust:1.79.0-slim-bookworm AS builder
|
||||||
|
|
||||||
RUN apt-get update && \
|
RUN apt-get update && \
|
||||||
apt-get install -y build-essential
|
apt-get install -y build-essential
|
||||||
@@ -19,3 +19,4 @@ COPY --from=builder /app/pgcat.toml /etc/pgcat/pgcat.toml
|
|||||||
WORKDIR /etc/pgcat
|
WORKDIR /etc/pgcat
|
||||||
ENV RUST_LOG=info
|
ENV RUST_LOG=info
|
||||||
CMD ["pgcat"]
|
CMD ["pgcat"]
|
||||||
|
STOPSIGNAL SIGINT
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
FROM rust:1.70-bullseye
|
FROM rust:bullseye
|
||||||
|
|
||||||
# Dependencies
|
# Dependencies
|
||||||
COPY --from=sclevine/yj /bin/yj /bin/yj
|
COPY --from=sclevine/yj /bin/yj /bin/yj
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ RestartSec=1
|
|||||||
Environment=RUST_LOG=info
|
Environment=RUST_LOG=info
|
||||||
LimitNOFILE=65536
|
LimitNOFILE=65536
|
||||||
ExecStart=/usr/bin/pgcat /etc/pgcat.toml
|
ExecStart=/usr/bin/pgcat /etc/pgcat.toml
|
||||||
|
ExecReload=/bin/kill -SIGHUP $MAINPID
|
||||||
|
|
||||||
[Install]
|
[Install]
|
||||||
WantedBy=multi-user.target
|
WantedBy=multi-user.target
|
||||||
|
|||||||
@@ -1204,9 +1204,12 @@ where
|
|||||||
if !server.in_transaction() {
|
if !server.in_transaction() {
|
||||||
// Report transaction executed statistics.
|
// Report transaction executed statistics.
|
||||||
self.stats.transaction();
|
self.stats.transaction();
|
||||||
server
|
server.stats().transaction(
|
||||||
.stats()
|
Instant::now()
|
||||||
.transaction(self.server_parameters.get_application_name());
|
.duration_since(server.transaction_start().into())
|
||||||
|
.as_millis() as u64,
|
||||||
|
self.server_parameters.get_application_name(),
|
||||||
|
);
|
||||||
|
|
||||||
// Release server back to the pool if we are in transaction mode.
|
// Release server back to the pool if we are in transaction mode.
|
||||||
// If we are in session mode, we keep the server until the client disconnects.
|
// If we are in session mode, we keep the server until the client disconnects.
|
||||||
@@ -1460,9 +1463,12 @@ where
|
|||||||
|
|
||||||
if !server.in_transaction() {
|
if !server.in_transaction() {
|
||||||
self.stats.transaction();
|
self.stats.transaction();
|
||||||
server
|
server.stats().transaction(
|
||||||
.stats()
|
Instant::now()
|
||||||
.transaction(self.server_parameters.get_application_name());
|
.duration_since(server.transaction_start().into())
|
||||||
|
.as_millis() as u64,
|
||||||
|
self.server_parameters.get_application_name(),
|
||||||
|
);
|
||||||
|
|
||||||
// Release server back to the pool if we are in transaction mode.
|
// Release server back to the pool if we are in transaction mode.
|
||||||
// If we are in session mode, we keep the server until the client disconnects.
|
// If we are in session mode, we keep the server until the client disconnects.
|
||||||
@@ -1511,9 +1517,12 @@ where
|
|||||||
|
|
||||||
if !server.in_transaction() {
|
if !server.in_transaction() {
|
||||||
self.stats.transaction();
|
self.stats.transaction();
|
||||||
server
|
server.stats().transaction(
|
||||||
.stats()
|
Instant::now()
|
||||||
.transaction(self.server_parameters.get_application_name());
|
.duration_since(server.transaction_start().into())
|
||||||
|
.as_millis() as u64,
|
||||||
|
self.server_parameters.get_application_name(),
|
||||||
|
);
|
||||||
|
|
||||||
// Release server back to the pool if we are in transaction mode.
|
// Release server back to the pool if we are in transaction mode.
|
||||||
// If we are in session mode, we keep the server until the client disconnects.
|
// If we are in session mode, we keep the server until the client disconnects.
|
||||||
|
|||||||
@@ -38,12 +38,12 @@ pub enum Role {
|
|||||||
Mirror,
|
Mirror,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ToString for Role {
|
impl std::fmt::Display for Role {
|
||||||
fn to_string(&self) -> String {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
match *self {
|
match self {
|
||||||
Role::Primary => "primary".to_string(),
|
Role::Primary => write!(f, "primary"),
|
||||||
Role::Replica => "replica".to_string(),
|
Role::Replica => write!(f, "replica"),
|
||||||
Role::Mirror => "mirror".to_string(),
|
Role::Mirror => write!(f, "mirror"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -476,11 +476,11 @@ pub enum PoolMode {
|
|||||||
Session,
|
Session,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ToString for PoolMode {
|
impl std::fmt::Display for PoolMode {
|
||||||
fn to_string(&self) -> String {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
match *self {
|
match self {
|
||||||
PoolMode::Transaction => "transaction".to_string(),
|
PoolMode::Transaction => write!(f, "transaction"),
|
||||||
PoolMode::Session => "session".to_string(),
|
PoolMode::Session => write!(f, "session"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -493,12 +493,13 @@ pub enum LoadBalancingMode {
|
|||||||
#[serde(alias = "loc", alias = "LOC", alias = "least_outstanding_connections")]
|
#[serde(alias = "loc", alias = "LOC", alias = "least_outstanding_connections")]
|
||||||
LeastOutstandingConnections,
|
LeastOutstandingConnections,
|
||||||
}
|
}
|
||||||
impl ToString for LoadBalancingMode {
|
|
||||||
fn to_string(&self) -> String {
|
impl std::fmt::Display for LoadBalancingMode {
|
||||||
match *self {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
LoadBalancingMode::Random => "random".to_string(),
|
match self {
|
||||||
|
LoadBalancingMode::Random => write!(f, "random"),
|
||||||
LoadBalancingMode::LeastOutstandingConnections => {
|
LoadBalancingMode::LeastOutstandingConnections => {
|
||||||
"least_outstanding_connections".to_string()
|
write!(f, "least_outstanding_connections")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -999,15 +1000,17 @@ impl Config {
|
|||||||
pub fn fill_up_auth_query_config(&mut self) {
|
pub fn fill_up_auth_query_config(&mut self) {
|
||||||
for (_name, pool) in self.pools.iter_mut() {
|
for (_name, pool) in self.pools.iter_mut() {
|
||||||
if pool.auth_query.is_none() {
|
if pool.auth_query.is_none() {
|
||||||
pool.auth_query = self.general.auth_query.clone();
|
pool.auth_query.clone_from(&self.general.auth_query);
|
||||||
}
|
}
|
||||||
|
|
||||||
if pool.auth_query_user.is_none() {
|
if pool.auth_query_user.is_none() {
|
||||||
pool.auth_query_user = self.general.auth_query_user.clone();
|
pool.auth_query_user
|
||||||
|
.clone_from(&self.general.auth_query_user);
|
||||||
}
|
}
|
||||||
|
|
||||||
if pool.auth_query_password.is_none() {
|
if pool.auth_query_password.is_none() {
|
||||||
pool.auth_query_password = self.general.auth_query_password.clone();
|
pool.auth_query_password
|
||||||
|
.clone_from(&self.general.auth_query_password);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1155,7 +1158,7 @@ impl Config {
|
|||||||
"Default max server lifetime: {}ms",
|
"Default max server lifetime: {}ms",
|
||||||
self.general.server_lifetime
|
self.general.server_lifetime
|
||||||
);
|
);
|
||||||
info!("Sever round robin: {}", self.general.server_round_robin);
|
info!("Server round robin: {}", self.general.server_round_robin);
|
||||||
match self.general.tls_certificate.clone() {
|
match self.general.tls_certificate.clone() {
|
||||||
Some(tls_certificate) => {
|
Some(tls_certificate) => {
|
||||||
info!("TLS certificate: {}", tls_certificate);
|
info!("TLS certificate: {}", tls_certificate);
|
||||||
|
|||||||
@@ -733,6 +733,10 @@ pub fn configure_socket(stream: &TcpStream) {
|
|||||||
}
|
}
|
||||||
Err(err) => error!("Could not configure socket: {}", err),
|
Err(err) => error!("Could not configure socket: {}", err),
|
||||||
}
|
}
|
||||||
|
match sock_ref.set_nodelay(true) {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(err) => error!("Could not configure TCP_NODELAY for socket: {}", err),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait BytesMutReader {
|
pub trait BytesMutReader {
|
||||||
|
|||||||
@@ -813,7 +813,7 @@ impl ConnectionPool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
client_stats.checkout_success();
|
client_stats.checkout_error();
|
||||||
|
|
||||||
Err(Error::AllServersDown)
|
Err(Error::AllServersDown)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -499,6 +499,7 @@ impl QueryRouter {
|
|||||||
table: _,
|
table: _,
|
||||||
on: _,
|
on: _,
|
||||||
returning: _,
|
returning: _,
|
||||||
|
ignore: _,
|
||||||
} => {
|
} => {
|
||||||
// Not supported in postgres.
|
// Not supported in postgres.
|
||||||
assert!(or.is_none());
|
assert!(or.is_none());
|
||||||
@@ -506,7 +507,9 @@ impl QueryRouter {
|
|||||||
assert!(after_columns.is_empty());
|
assert!(after_columns.is_empty());
|
||||||
|
|
||||||
Self::process_table(table_name, &mut table_names);
|
Self::process_table(table_name, &mut table_names);
|
||||||
Self::process_query(source, &mut exprs, &mut table_names, &Some(columns));
|
if let Some(source) = source {
|
||||||
|
Self::process_query(source, &mut exprs, &mut table_names, &Some(columns));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Delete {
|
Delete {
|
||||||
tables,
|
tables,
|
||||||
@@ -514,6 +517,8 @@ impl QueryRouter {
|
|||||||
using,
|
using,
|
||||||
selection,
|
selection,
|
||||||
returning: _,
|
returning: _,
|
||||||
|
order_by: _,
|
||||||
|
limit: _,
|
||||||
} => {
|
} => {
|
||||||
if let Some(expr) = selection {
|
if let Some(expr) = selection {
|
||||||
exprs.push(expr.clone());
|
exprs.push(expr.clone());
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ use std::sync::Arc;
|
|||||||
use std::time::SystemTime;
|
use std::time::SystemTime;
|
||||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, BufStream};
|
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, BufStream};
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
|
use tokio::time::Instant;
|
||||||
use tokio_rustls::rustls::{OwnedTrustAnchor, RootCertStore};
|
use tokio_rustls::rustls::{OwnedTrustAnchor, RootCertStore};
|
||||||
use tokio_rustls::{client::TlsStream, TlsConnector};
|
use tokio_rustls::{client::TlsStream, TlsConnector};
|
||||||
|
|
||||||
@@ -285,6 +286,9 @@ pub struct Server {
|
|||||||
/// Is the server inside a transaction or idle.
|
/// Is the server inside a transaction or idle.
|
||||||
in_transaction: bool,
|
in_transaction: bool,
|
||||||
|
|
||||||
|
/// The time the most recent transaction started.
|
||||||
|
transaction_start: Instant,
|
||||||
|
|
||||||
/// Is there more data for the client to read.
|
/// Is there more data for the client to read.
|
||||||
data_available: bool,
|
data_available: bool,
|
||||||
|
|
||||||
@@ -804,6 +808,7 @@ impl Server {
|
|||||||
process_id,
|
process_id,
|
||||||
secret_key,
|
secret_key,
|
||||||
in_transaction: false,
|
in_transaction: false,
|
||||||
|
transaction_start: Instant::now(),
|
||||||
in_copy_mode: false,
|
in_copy_mode: false,
|
||||||
data_available: false,
|
data_available: false,
|
||||||
bad: false,
|
bad: false,
|
||||||
@@ -936,6 +941,7 @@ impl Server {
|
|||||||
// In transaction.
|
// In transaction.
|
||||||
'T' => {
|
'T' => {
|
||||||
self.in_transaction = true;
|
self.in_transaction = true;
|
||||||
|
self.transaction_start = Instant::now();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Idle, transaction over.
|
// Idle, transaction over.
|
||||||
@@ -1220,6 +1226,12 @@ impl Server {
|
|||||||
self.in_transaction
|
self.in_transaction
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The start time of the most recent transaction.
|
||||||
|
/// Will be stale if not in a transaction.
|
||||||
|
pub fn transaction_start(&self) -> Instant {
|
||||||
|
self.transaction_start
|
||||||
|
}
|
||||||
|
|
||||||
/// Currently copying data from client to server or vice-versa.
|
/// Currently copying data from client to server or vice-versa.
|
||||||
pub fn in_copy_mode(&self) -> bool {
|
pub fn in_copy_mode(&self) -> bool {
|
||||||
self.in_copy_mode
|
self.in_copy_mode
|
||||||
|
|||||||
@@ -14,11 +14,11 @@ pub enum ShardingFunction {
|
|||||||
Sha1,
|
Sha1,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ToString for ShardingFunction {
|
impl std::fmt::Display for ShardingFunction {
|
||||||
fn to_string(&self) -> String {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
match *self {
|
match self {
|
||||||
ShardingFunction::PgBigintHash => "pg_bigint_hash".to_string(),
|
ShardingFunction::PgBigintHash => write!(f, "pg_bigint_hash"),
|
||||||
ShardingFunction::Sha1 => "sha1".to_string(),
|
ShardingFunction::Sha1 => write!(f, "sha1"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -187,11 +187,12 @@ impl ServerStats {
|
|||||||
/// we report each individual queries outside a transaction as a transaction
|
/// we report each individual queries outside a transaction as a transaction
|
||||||
/// We only count the initial BEGIN as a transaction, all queries within do not
|
/// We only count the initial BEGIN as a transaction, all queries within do not
|
||||||
/// count as transactions
|
/// count as transactions
|
||||||
pub fn transaction(&self, application_name: &str) {
|
pub fn transaction(&self, milliseconds: u64, application_name: &str) {
|
||||||
self.set_application(application_name.to_string());
|
self.set_application(application_name.to_string());
|
||||||
|
|
||||||
self.transaction_count.fetch_add(1, Ordering::Relaxed);
|
self.transaction_count.fetch_add(1, Ordering::Relaxed);
|
||||||
self.address.stats.xact_count_add();
|
self.address.stats.xact_count_add();
|
||||||
|
self.address.stats.xact_time_add(milliseconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report data sent to a server
|
/// Report data sent to a server
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ describe "Stats" do
|
|||||||
it "updates *_query_time and *_wait_time" do
|
it "updates *_query_time and *_wait_time" do
|
||||||
connections = Array.new(3) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
|
connections = Array.new(3) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
|
||||||
connections.each do |c|
|
connections.each do |c|
|
||||||
Thread.new { c.async_exec("SELECT pg_sleep(0.25)") }
|
Thread.new { c.async_exec("BEGIN; SELECT pg_sleep(0.25); COMMIT;") }
|
||||||
end
|
end
|
||||||
sleep(1)
|
sleep(1)
|
||||||
connections.map(&:close)
|
connections.map(&:close)
|
||||||
@@ -25,10 +25,32 @@ describe "Stats" do
|
|||||||
sleep(15.5)
|
sleep(15.5)
|
||||||
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||||
results = admin_conn.async_exec("SHOW STATS")[0]
|
results = admin_conn.async_exec("SHOW STATS")[0]
|
||||||
admin_conn.close
|
|
||||||
expect(results["total_query_time"].to_i).to be_within(200).of(750)
|
expect(results["total_query_time"].to_i).to be_within(200).of(750)
|
||||||
expect(results["avg_query_time"].to_i).to be_within(50).of(250)
|
expect(results["avg_query_time"].to_i).to be_within(50).of(250)
|
||||||
|
|
||||||
|
expect(results["total_xact_time"].to_i).to be_within(200).of(750)
|
||||||
|
expect(results["avg_xact_time"].to_i).to be_within(50).of(250)
|
||||||
|
|
||||||
|
expect(results["total_wait_time"].to_i).to_not eq(0)
|
||||||
|
expect(results["avg_wait_time"].to_i).to_not eq(0)
|
||||||
|
|
||||||
|
connections = Array.new(3) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
|
||||||
|
connections.each do |c|
|
||||||
|
Thread.new { c.async_exec("SELECT pg_sleep(0.25);") }
|
||||||
|
end
|
||||||
|
sleep(1)
|
||||||
|
connections.map(&:close)
|
||||||
|
|
||||||
|
results = admin_conn.async_exec("SHOW STATS")[0]
|
||||||
|
admin_conn.close
|
||||||
|
# This should increase with more queries
|
||||||
|
expect(results["total_query_time"].to_i).to be_within(400).of(1500)
|
||||||
|
expect(results["avg_query_time"].to_i).to be_within(50).of(250)
|
||||||
|
|
||||||
|
# This should not increase as we did not run any additional transactions
|
||||||
|
expect(results["total_xact_time"].to_i).to be_within(200).of(750)
|
||||||
|
expect(results["avg_xact_time"].to_i).to be_within(50).of(250)
|
||||||
|
|
||||||
expect(results["total_wait_time"].to_i).to_not eq(0)
|
expect(results["total_wait_time"].to_i).to_not eq(0)
|
||||||
expect(results["avg_wait_time"].to_i).to_not eq(0)
|
expect(results["avg_wait_time"].to_i).to_not eq(0)
|
||||||
end
|
end
|
||||||
|
|||||||
Reference in New Issue
Block a user