Compare commits

..

9 Commits

Author SHA1 Message Date
Sean McGivern
04e346df80 Track transaction time 2024-09-06 16:46:40 +01:00
brandonpike
cbf4d58144 Fix lint warnings for rust-1.79 (#769)
2 things that are recommended by rust-lang - implementing `std::fmt::Display` rather than ToString (1) and using clone_from (2).

[1] https://rust-lang.github.io/rust-clippy/master/index.html#/to_string_trait_impl
[2] https://rust-lang.github.io/rust-clippy/master/index.html#assigning_clones

Signed-off-by: Brandon Pike <pikebrandon@att.net>
2024-07-15 20:30:26 -07:00
Олег Дулецкий
731aa047ba Add ExecReload option to pgcat.service for configuration reloads (#760) 2024-06-24 08:57:58 -07:00
Adrian Garcia Badaracco
88dbcc21d1 update rust version in docker image (#762) 2024-06-24 08:51:38 -07:00
Adrian Garcia Badaracco
c34b15bddc Add STOPSIGNAL to Dockerfile (#758) 2024-06-20 23:23:41 -07:00
Andrey Stikheev
0b034a6831 Add TCP_NODELAY option to improve performance for large response queries (#749)
This commit adds the TCP_NODELAY option to the socket configuration in
`configure_socket` function. Without this option, we observed significant
performance issues when executing SELECT queries with large responses.

Before the fix:
postgres=> SELECT repeat('a', 1); SELECT repeat('a', 8153);
Time: 1.368 ms
Time: 41.364 ms

After the fix:
postgres=> SELECT repeat('a', 1); SELECT repeat('a', 8153);
Time: 1.332 ms
Time: 1.528 ms

By setting TCP_NODELAY, we eliminate the Nagle's algorithm delay, which
results in a substantial improvement in response times for large queries.

This problem was discussed in https://github.com/postgresml/pgcat/issues/616.
2024-05-26 14:47:21 -07:00
Mostafa Abdelraouf
966b8e093c Report checkout error when all servers are down (#736)
We shouldn't report checkout_success when we are going to return Error.
2024-05-08 12:18:27 -05:00
Horacio
c9270a47d4 Use rust:bullseye as base image (#725)
Use rust:bullseye base image

With the original rust:1.70-bullseye image, the container cannot be
built:

17.06   Installing /usr/local/cargo/bin/rustfilt
17.06    Installed package `rustfilt v0.2.1` (executable `rustfilt`)
17.06 error: failed to compile `cargo-binutils v0.3.6`, intermediate artifacts can be found at `/tmp/cargo-installrc6mPb`
17.06
17.06 Caused by:
17.06   package `cargo-platform v0.1.8` cannot be built because it requires rustc 1.73 or newer, while the currently active rustc version is 1.70.0
17.06   Try re-running cargo install with `--locked`
17.06      Summary Successfully installed rustfilt! Failed to install cargo-binutils (see error(s) above).
17.06 error: some crates failed to install

This is the same base image used on tests/docker/Dockerfile
2024-04-19 09:12:57 -07:00
Toby Hede
0d94d0b90a Update sqlparser to 0.41 (#666) 2024-04-12 22:12:37 -07:00
15 changed files with 111 additions and 50 deletions

View File

@@ -27,7 +27,7 @@ jobs:
python-version: 3.7
- name: Set up chart-testing
uses: helm/chart-testing-action@v2.6.1
uses: helm/chart-testing-action@v2.2.1
with:
version: v3.5.1

10
Cargo.lock generated
View File

@@ -1510,9 +1510,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "sqlparser"
version = "0.34.0"
version = "0.41.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37d3706eefb17039056234df6b566b0014f303f867f2656108334a55b8096f59"
checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
dependencies = [
"log",
"sqlparser_derive",
@@ -1520,13 +1520,13 @@ dependencies = [
[[package]]
name = "sqlparser_derive"
version = "0.1.1"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e"
checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
"syn 2.0.26",
]
[[package]]

View File

@@ -19,7 +19,7 @@ serde_derive = "1"
regex = "1"
num_cpus = "1"
once_cell = "1"
sqlparser = {version = "0.34", features = ["visitor"] }
sqlparser = { version = "0.41", features = ["visitor"] }
log = "0.4"
arc-swap = "1"
parking_lot = "0.12.1"
@@ -47,9 +47,12 @@ serde_json = "1"
itertools = "0.10"
clap = { version = "4.3.1", features = ["derive", "env"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter", "std"]}
tracing-subscriber = { version = "0.3.17", features = [
"json",
"env-filter",
"std",
] }
lru = "0.12.0"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5.0"

View File

@@ -1,4 +1,4 @@
FROM rust:1-slim-bookworm AS builder
FROM rust:1.79.0-slim-bookworm AS builder
RUN apt-get update && \
apt-get install -y build-essential
@@ -19,3 +19,4 @@ COPY --from=builder /app/pgcat.toml /etc/pgcat/pgcat.toml
WORKDIR /etc/pgcat
ENV RUST_LOG=info
CMD ["pgcat"]
STOPSIGNAL SIGINT

View File

@@ -1,4 +1,4 @@
FROM rust:1.70-bullseye
FROM rust:bullseye
# Dependencies
COPY --from=sclevine/yj /bin/yj /bin/yj

View File

@@ -11,6 +11,7 @@ RestartSec=1
Environment=RUST_LOG=info
LimitNOFILE=65536
ExecStart=/usr/bin/pgcat /etc/pgcat.toml
ExecReload=/bin/kill -SIGHUP $MAINPID
[Install]
WantedBy=multi-user.target

View File

@@ -1204,9 +1204,12 @@ where
if !server.in_transaction() {
// Report transaction executed statistics.
self.stats.transaction();
server
.stats()
.transaction(self.server_parameters.get_application_name());
server.stats().transaction(
Instant::now()
.duration_since(server.transaction_start().into())
.as_millis() as u64,
self.server_parameters.get_application_name(),
);
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
@@ -1460,9 +1463,12 @@ where
if !server.in_transaction() {
self.stats.transaction();
server
.stats()
.transaction(self.server_parameters.get_application_name());
server.stats().transaction(
Instant::now()
.duration_since(server.transaction_start().into())
.as_millis() as u64,
self.server_parameters.get_application_name(),
);
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
@@ -1511,9 +1517,12 @@ where
if !server.in_transaction() {
self.stats.transaction();
server
.stats()
.transaction(self.server_parameters.get_application_name());
server.stats().transaction(
Instant::now()
.duration_since(server.transaction_start().into())
.as_millis() as u64,
self.server_parameters.get_application_name(),
);
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.

View File

@@ -38,12 +38,12 @@ pub enum Role {
Mirror,
}
impl ToString for Role {
fn to_string(&self) -> String {
match *self {
Role::Primary => "primary".to_string(),
Role::Replica => "replica".to_string(),
Role::Mirror => "mirror".to_string(),
impl std::fmt::Display for Role {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Role::Primary => write!(f, "primary"),
Role::Replica => write!(f, "replica"),
Role::Mirror => write!(f, "mirror"),
}
}
}
@@ -476,11 +476,11 @@ pub enum PoolMode {
Session,
}
impl ToString for PoolMode {
fn to_string(&self) -> String {
match *self {
PoolMode::Transaction => "transaction".to_string(),
PoolMode::Session => "session".to_string(),
impl std::fmt::Display for PoolMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PoolMode::Transaction => write!(f, "transaction"),
PoolMode::Session => write!(f, "session"),
}
}
}
@@ -493,12 +493,13 @@ pub enum LoadBalancingMode {
#[serde(alias = "loc", alias = "LOC", alias = "least_outstanding_connections")]
LeastOutstandingConnections,
}
impl ToString for LoadBalancingMode {
fn to_string(&self) -> String {
match *self {
LoadBalancingMode::Random => "random".to_string(),
impl std::fmt::Display for LoadBalancingMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
LoadBalancingMode::Random => write!(f, "random"),
LoadBalancingMode::LeastOutstandingConnections => {
"least_outstanding_connections".to_string()
write!(f, "least_outstanding_connections")
}
}
}
@@ -999,15 +1000,17 @@ impl Config {
pub fn fill_up_auth_query_config(&mut self) {
for (_name, pool) in self.pools.iter_mut() {
if pool.auth_query.is_none() {
pool.auth_query = self.general.auth_query.clone();
pool.auth_query.clone_from(&self.general.auth_query);
}
if pool.auth_query_user.is_none() {
pool.auth_query_user = self.general.auth_query_user.clone();
pool.auth_query_user
.clone_from(&self.general.auth_query_user);
}
if pool.auth_query_password.is_none() {
pool.auth_query_password = self.general.auth_query_password.clone();
pool.auth_query_password
.clone_from(&self.general.auth_query_password);
}
}
}
@@ -1155,7 +1158,7 @@ impl Config {
"Default max server lifetime: {}ms",
self.general.server_lifetime
);
info!("Sever round robin: {}", self.general.server_round_robin);
info!("Server round robin: {}", self.general.server_round_robin);
match self.general.tls_certificate.clone() {
Some(tls_certificate) => {
info!("TLS certificate: {}", tls_certificate);

View File

@@ -733,6 +733,10 @@ pub fn configure_socket(stream: &TcpStream) {
}
Err(err) => error!("Could not configure socket: {}", err),
}
match sock_ref.set_nodelay(true) {
Ok(_) => (),
Err(err) => error!("Could not configure TCP_NODELAY for socket: {}", err),
}
}
pub trait BytesMutReader {

View File

@@ -813,7 +813,7 @@ impl ConnectionPool {
}
}
client_stats.checkout_success();
client_stats.checkout_error();
Err(Error::AllServersDown)
}

View File

@@ -499,6 +499,7 @@ impl QueryRouter {
table: _,
on: _,
returning: _,
ignore: _,
} => {
// Not supported in postgres.
assert!(or.is_none());
@@ -506,7 +507,9 @@ impl QueryRouter {
assert!(after_columns.is_empty());
Self::process_table(table_name, &mut table_names);
Self::process_query(source, &mut exprs, &mut table_names, &Some(columns));
if let Some(source) = source {
Self::process_query(source, &mut exprs, &mut table_names, &Some(columns));
}
}
Delete {
tables,
@@ -514,6 +517,8 @@ impl QueryRouter {
using,
selection,
returning: _,
order_by: _,
limit: _,
} => {
if let Some(expr) = selection {
exprs.push(expr.clone());

View File

@@ -15,6 +15,7 @@ use std::sync::Arc;
use std::time::SystemTime;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, BufStream};
use tokio::net::TcpStream;
use tokio::time::Instant;
use tokio_rustls::rustls::{OwnedTrustAnchor, RootCertStore};
use tokio_rustls::{client::TlsStream, TlsConnector};
@@ -285,6 +286,9 @@ pub struct Server {
/// Is the server inside a transaction or idle.
in_transaction: bool,
/// The time the most recent transaction started.
transaction_start: Instant,
/// Is there more data for the client to read.
data_available: bool,
@@ -804,6 +808,7 @@ impl Server {
process_id,
secret_key,
in_transaction: false,
transaction_start: Instant::now(),
in_copy_mode: false,
data_available: false,
bad: false,
@@ -936,6 +941,7 @@ impl Server {
// In transaction.
'T' => {
self.in_transaction = true;
self.transaction_start = Instant::now();
}
// Idle, transaction over.
@@ -1220,6 +1226,12 @@ impl Server {
self.in_transaction
}
/// The start time of the most recent transaction.
/// Will be stale if not in a transaction.
pub fn transaction_start(&self) -> Instant {
self.transaction_start
}
/// Currently copying data from client to server or vice-versa.
pub fn in_copy_mode(&self) -> bool {
self.in_copy_mode

View File

@@ -14,11 +14,11 @@ pub enum ShardingFunction {
Sha1,
}
impl ToString for ShardingFunction {
fn to_string(&self) -> String {
match *self {
ShardingFunction::PgBigintHash => "pg_bigint_hash".to_string(),
ShardingFunction::Sha1 => "sha1".to_string(),
impl std::fmt::Display for ShardingFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ShardingFunction::PgBigintHash => write!(f, "pg_bigint_hash"),
ShardingFunction::Sha1 => write!(f, "sha1"),
}
}
}

View File

@@ -187,11 +187,12 @@ impl ServerStats {
/// we report each individual queries outside a transaction as a transaction
/// We only count the initial BEGIN as a transaction, all queries within do not
/// count as transactions
pub fn transaction(&self, application_name: &str) {
pub fn transaction(&self, milliseconds: u64, application_name: &str) {
self.set_application(application_name.to_string());
self.transaction_count.fetch_add(1, Ordering::Relaxed);
self.address.stats.xact_count_add();
self.address.stats.xact_time_add(milliseconds);
}
/// Report data sent to a server

View File

@@ -16,7 +16,7 @@ describe "Stats" do
it "updates *_query_time and *_wait_time" do
connections = Array.new(3) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
Thread.new { c.async_exec("SELECT pg_sleep(0.25)") }
Thread.new { c.async_exec("BEGIN; SELECT pg_sleep(0.25); COMMIT;") }
end
sleep(1)
connections.map(&:close)
@@ -25,10 +25,32 @@ describe "Stats" do
sleep(15.5)
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW STATS")[0]
admin_conn.close
expect(results["total_query_time"].to_i).to be_within(200).of(750)
expect(results["avg_query_time"].to_i).to be_within(50).of(250)
expect(results["total_xact_time"].to_i).to be_within(200).of(750)
expect(results["avg_xact_time"].to_i).to be_within(50).of(250)
expect(results["total_wait_time"].to_i).to_not eq(0)
expect(results["avg_wait_time"].to_i).to_not eq(0)
connections = Array.new(3) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
connections.each do |c|
Thread.new { c.async_exec("SELECT pg_sleep(0.25);") }
end
sleep(1)
connections.map(&:close)
results = admin_conn.async_exec("SHOW STATS")[0]
admin_conn.close
# This should increase with more queries
expect(results["total_query_time"].to_i).to be_within(400).of(1500)
expect(results["avg_query_time"].to_i).to be_within(50).of(250)
# This should not increase as we did not run any additional transactions
expect(results["total_xact_time"].to_i).to be_within(200).of(750)
expect(results["avg_xact_time"].to_i).to be_within(50).of(250)
expect(results["total_wait_time"].to_i).to_not eq(0)
expect(results["avg_wait_time"].to_i).to_not eq(0)
end