Compare commits

...

21 Commits

Author SHA1 Message Date
Lev Kokotov
1f2c6507f7 debug -> release 2023-08-01 17:47:34 -07:00
Lev Kokotov
aefcf4281c Fix for #534 and #535 2023-08-01 17:46:34 -07:00
Bertrand Paquet
9d1c46a3e9 Fix typo in the config documentation (#532) 2023-07-28 00:31:53 -07:00
Spindel Ljungmark
328108aeb5 Restore the ability to filter spammy log messages (#530)
* Move connection checkin log messages to their own target

Under heavy load they can happen thousands of times per second, and
should generally be considered a nuisance at best. This marks the state
discard as an info rather than a warning, and moves all the messages
into their own log-target, so they can be filtered separately from the
more relevant warnings.

Signed-off-by: D.S. Ljungmark <spider@skuggor.se>

* Remove left-over env_logger dependencies

When moving to tracing-subscriber for logging, the env_logger
dependencies were left around, this cuts them out as dead code.

Signed-off-by: D.S. Ljungmark <spider@skuggor.se>

* Restore ability to filter log messages at runtime

This restores the RUST_LOG filters from env_logger but now with the
tracing subscriber setup. The filters are chained so commandline options
mark the default in case either option is set, which should be the path
of least confusion for users.  ( RUST_LOG setting level to debug, and
commandline to warning is an odd user case, and I don't know what a user
who does that is expecting. )

It also bumps the version number as a fix to see which versions have
which behaviour.

Signed-off-by: D.S. Ljungmark <spider@skuggor.se>

---------

Signed-off-by: D.S. Ljungmark <spider@skuggor.se>
2023-07-27 08:51:23 -07:00
Lev Kokotov
4cf54a6122 Release 1.1 (#526) 2023-07-25 10:27:04 -07:00
Mostafa Abdelraouf
2a8f3653a6 Fix COPY FROM and add tests (#522)
* Fix COPY FROM and add tests

* E

* fmt
2023-07-20 23:06:01 -07:00
Sebastian Webber
19cb8a3022 add --no-color option to disable colors in the terminal (#518)
add --no-color option to disable colors

this commit adds a new option to disable colors in the terminal and also
moves the logger configuration to a different crate.

Signed-off-by: Sebastian Webber <sebastian@swebber.me>
2023-07-19 21:15:55 -07:00
Sebastian Webber
f85e5bd9e8 add support for multiple log formats (#517)
this commit adds the tracing-subscriber crate and use its formatters to
support multiple log formats.

More details in
https://github.com/postgresml/pgcat/issues/464#issuecomment-1641430299

Signed-off-by: Sebastian Webber <sebastian@swebber.me>
2023-07-18 23:07:13 -07:00
Sebastian Webber
7bdb4e5cd9 Add cmd line parser (#512)
This commit adds the clap library and configures the necessary args to
parse from the command line,  expanding the current option of a single
file and adding support for environment variables.

Signed-off-by: Sebastian Webber <sebastian@swebber.me>
2023-07-18 13:52:40 -07:00
Sebastian Webber
5d87e3781e push and build only in main and tags (#508)
this commit changes the CI behavior to only build and push when something is committed to main or is a new tag.
2023-07-14 10:30:49 -07:00
dependabot[bot]
3e08c6bd8d chore(deps): bump num_cpus from 1.15.0 to 1.16.0 (#507)
Bumps [num_cpus](https://github.com/seanmonstar/num_cpus) from 1.15.0 to 1.16.0.
- [Release notes](https://github.com/seanmonstar/num_cpus/releases)
- [Changelog](https://github.com/seanmonstar/num_cpus/blob/master/CHANGELOG.md)
- [Commits](https://github.com/seanmonstar/num_cpus/compare/v1.15.0...v1.16.0)

---
updated-dependencies:
- dependency-name: num_cpus
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-07-14 07:58:11 -07:00
Sebastian Webber
15b6db8e4e add "show help" command (#505)
This commit adds a new function to handle notify and use it
in the SHOW HELP command, which displays the available options
in the admin console.

Also, adding Fabrízio as a co-author for all the help with the
protocol and the help to structure this PR.

Signed-off-by: Sebastian Webber <sebastian@swebber.me>
Co-authored-by: Fabrízio de Royes Mello <fabriziomello@gmail.com>
2023-07-13 22:40:04 -07:00
dependabot[bot]
b2e6dfd9bb chore(deps): bump rustls-pemfile from 1.0.2 to 1.0.3 (#504)
Bumps [rustls-pemfile](https://github.com/rustls/pemfile) from 1.0.2 to 1.0.3.
- [Commits](https://github.com/rustls/pemfile/commits)

---
updated-dependencies:
- dependency-name: rustls-pemfile
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-07-12 21:41:48 -07:00
Mostafa Abdelraouf
3c9565d351 Add support for tcp_user_timeout (#503)
* Add support for tcp_user_timeout

* option

* duration

* Some()

* docs

* fmt, compile
2023-07-12 11:24:30 -07:00
dependabot[bot]
67579c9af4 chore(deps): bump rustls from 0.21.1 to 0.21.5 (#501)
Bumps [rustls](https://github.com/rustls/rustls) from 0.21.1 to 0.21.5.
- [Release notes](https://github.com/rustls/rustls/releases)
- [Commits](https://github.com/rustls/rustls/compare/v/0.21.1...v/0.21.5)

---
updated-dependencies:
- dependency-name: rustls
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-07-12 05:46:31 -07:00
Cluas
cf7f6f35ab docs: fix general.autoreload description (#491)
* docs: fix autoreload description

Signed-off-by: Cluas <Cluas@live.cn>

* docs: add blank line

Signed-off-by: Cluas <Cluas@live.cn>

---------

Signed-off-by: Cluas <Cluas@live.cn>
2023-07-12 05:42:44 -07:00
Voldemarich
7205537b49 [BUG] Fix binding of NULL value parameters in prepared statements (#496)
Fix binding of NULL value parameters in prepared statements

Co-authored-by: anon <anon@non.existent>
2023-07-10 10:35:43 +02:00
Zain Kabani
1ed6e925ed Fixes the default for round robing in General (#488) 2023-06-23 09:15:44 -07:00
Lev Kokotov
4b78af9676 Implement Close for prepared statements (#482)
* Partial support for Close

* Close

* respect config value

* prepared spec

* Hmm

* Print cache size
2023-06-18 23:02:34 -07:00
Lev Kokotov
73500c0c96 Fix build (#481) 2023-06-17 09:09:54 -07:00
Lev Kokotov
b167de5aa3 fmt (#480) 2023-06-17 08:57:33 -07:00
19 changed files with 1173 additions and 499 deletions

View File

@@ -1,6 +1,11 @@
name: Build and Push
on: push
on:
push:
branches:
- main
tags:
- v*
env:
registry: ghcr.io

View File

@@ -1,4 +1,4 @@
# PgCat Configurations
# PgCat Configurations
## `general` Section
### host
@@ -116,10 +116,10 @@ If we should log client disconnections
### autoreload
```
path: general.autoreload
default: 15000
default: 15000 # milliseconds
```
When set to true, PgCat reloads configs if it detects a change in the config file.
When set, PgCat automatically reloads its configurations at the specified interval (in milliseconds) if it detects changes in the configuration file. The default interval is 15000 milliseconds or 15 seconds.
### worker_threads
```
@@ -151,7 +151,13 @@ path: general.tcp_keepalives_interval
default: 5
```
Number of seconds between keepalive packets.
### tcp_user_timeout
```
path: general.tcp_user_timeout
default: 10000
```
A linux-only parameters that defines the amount of time in milliseconds that transmitted data may remain unacknowledged or buffered data may remain untransmitted (due to zero window size) before TCP will forcibly disconnect
### tls_certificate
```
@@ -224,7 +230,7 @@ default: "random"
Load balancing mode
`random` selects the server at random
`loc` selects the server with the least outstanding busy conncetions
`loc` selects the server with the least outstanding busy connections
### default_role
```

963
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[package]
name = "pgcat"
version = "1.0.2-alpha3"
version = "1.1.1"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -22,7 +22,6 @@ once_cell = "1"
sqlparser = {version = "0.34", features = ["visitor"] }
log = "0.4"
arc-swap = "1"
env_logger = "0.10"
parking_lot = "0.12.1"
hmac = "0.12"
sha2 = "0.10"
@@ -46,6 +45,9 @@ trust-dns-resolver = "0.22.0"
tokio-test = "0.4.2"
serde_json = "1"
itertools = "0.10"
clap = { version = "4.3.1", features = ["derive", "env"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter", "std"]}
[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5.0"

View File

@@ -63,6 +63,9 @@ tcp_keepalives_interval = 5
# Handle prepared statements.
prepared_statements = true
# Prepared statements server cache size.
prepared_statements_cache_size = 500
# Path to TLS Certificate file to use for TLS connections
# tls_certificate = ".circleci/server.cert"
# Path to TLS private key file to use for TLS connections
@@ -267,7 +270,7 @@ username = "sharding_user"
# if `server_password` is not set.
password = "sharding_user"
pool_mode = "session"
pool_mode = "transaction"
# PostgreSQL username used to connect to the server.
# server_username = "another_user"

View File

@@ -84,6 +84,10 @@ where
shutdown(stream).await
}
"SHOW" => match query_parts[1].to_ascii_uppercase().as_str() {
"HELP" => {
trace!("SHOW HELP");
show_help(stream).await
}
"BANS" => {
trace!("SHOW BANS");
show_bans(stream).await
@@ -271,6 +275,45 @@ where
write_all_half(stream, &res).await
}
/// Show all available options.
async fn show_help<T>(stream: &mut T) -> Result<(), Error>
where
T: tokio::io::AsyncWrite + std::marker::Unpin,
{
let mut res = BytesMut::new();
let detail_msg = vec![
"",
"SHOW HELP|CONFIG|DATABASES|POOLS|CLIENTS|SERVERS|USERS|VERSION",
// "SHOW PEERS|PEER_POOLS", // missing PEERS|PEER_POOLS
// "SHOW FDS|SOCKETS|ACTIVE_SOCKETS|LISTS|MEM|STATE", // missing FDS|SOCKETS|ACTIVE_SOCKETS|MEM|STATE
"SHOW LISTS",
// "SHOW DNS_HOSTS|DNS_ZONES", // missing DNS_HOSTS|DNS_ZONES
"SHOW STATS", // missing STATS_TOTALS|STATS_AVERAGES|TOTALS
"SET key = arg",
"RELOAD",
"PAUSE [<db>, <user>]",
"RESUME [<db>, <user>]",
// "DISABLE <db>", // missing
// "ENABLE <db>", // missing
// "RECONNECT [<db>]", missing
// "KILL <db>",
// "SUSPEND",
"SHUTDOWN",
// "WAIT_CLOSE [<db>]", // missing
];
res.put(notify("Console usage", detail_msg.join("\n\t")));
res.put(command_complete("SHOW"));
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, &res).await
}
/// Show shards and replicas.
async fn show_databases<T>(stream: &mut T) -> Result<(), Error>
where
@@ -701,6 +744,7 @@ where
("age_seconds", DataType::Numeric),
("prepare_cache_hit", DataType::Numeric),
("prepare_cache_miss", DataType::Numeric),
("prepare_cache_size", DataType::Numeric),
];
let new_map = get_server_stats();
@@ -732,6 +776,10 @@ where
.prepared_miss_count
.load(Ordering::Relaxed)
.to_string(),
server
.prepared_cache_size
.load(Ordering::Relaxed)
.to_string(),
];
res.put(data_row(&row));

View File

@@ -906,6 +906,19 @@ where
return Ok(());
}
// Close (F)
'C' => {
if prepared_statements_enabled {
let close: Close = (&message).try_into()?;
if close.is_prepared_statement() && !close.anonymous() {
self.prepared_statements.remove(&close.name);
write_all_flush(&mut self.write, &close_complete()).await?;
continue;
}
}
}
_ => (),
}
@@ -1130,7 +1143,17 @@ where
} else {
// The statement is not prepared on the server, so we need to prepare it.
if server.should_prepare(&statement.name) {
server.prepare(statement).await?;
match server.prepare(statement).await {
Ok(_) => (),
Err(err) => {
pool.ban(
&address,
BanReason::MessageSendFailed,
Some(&self.stats),
);
return Err(err);
}
}
}
}
@@ -1237,7 +1260,7 @@ where
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
if self.transaction_mode && !server.in_copy_mode() {
self.stats.idle();
break;
@@ -1251,6 +1274,10 @@ where
self.stats.disconnect();
self.release();
if prepared_statements_enabled {
server.maintain_cache().await?;
}
return Ok(());
}
@@ -1300,6 +1327,21 @@ where
// Close the prepared statement.
'C' => {
if prepared_statements_enabled {
let close: Close = (&message).try_into()?;
if close.is_prepared_statement() && !close.anonymous() {
match self.prepared_statements.get(&close.name) {
Some(parse) => {
server.will_close(&parse.generated_name);
}
// A prepared statement slipped through? Not impossible, since we don't support PREPARE yet.
None => (),
};
}
}
self.buffer.put(&message[..]);
}
@@ -1368,7 +1410,7 @@ where
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
if self.transaction_mode && !server.in_copy_mode() {
break;
}
}
@@ -1433,7 +1475,13 @@ where
// The server is no longer bound to us, we can't cancel it's queries anymore.
debug!("Releasing server back into the pool");
server.checkin_cleanup().await?;
if prepared_statements_enabled {
server.maintain_cache().await?;
}
server.stats().idle();
self.connected_to_server = false;

36
src/cmd_args.rs Normal file
View File

@@ -0,0 +1,36 @@
use clap::{Parser, ValueEnum};
use tracing::Level;
/// PgCat: Nextgen PostgreSQL Pooler
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct Args {
#[arg(default_value_t = String::from("pgcat.toml"), env)]
pub config_file: String,
#[arg(short, long, default_value_t = tracing::Level::INFO, env)]
pub log_level: Level,
#[clap(short='F', long, value_enum, default_value_t=LogFormat::Text, env)]
pub log_format: LogFormat,
#[arg(
short,
long,
default_value_t = false,
env,
help = "disable colors in the log output"
)]
pub no_color: bool,
}
pub fn parse() -> Args {
return Args::parse();
}
#[derive(ValueEnum, Clone, Debug)]
pub enum LogFormat {
Text,
Structured,
Debug,
}

View File

@@ -261,6 +261,8 @@ pub struct General {
pub tcp_keepalives_count: u32,
#[serde(default = "General::default_tcp_keepalives_interval")]
pub tcp_keepalives_interval: u64,
#[serde(default = "General::default_tcp_user_timeout")]
pub tcp_user_timeout: u64,
#[serde(default)] // False
pub log_client_connections: bool,
@@ -323,6 +325,9 @@ pub struct General {
#[serde(default)]
pub prepared_statements: bool,
#[serde(default = "General::default_prepared_statements_cache_size")]
pub prepared_statements_cache_size: usize,
}
impl General {
@@ -357,6 +362,10 @@ impl General {
5 // 5 seconds
}
pub fn default_tcp_user_timeout() -> u64 {
10000 // 10000 milliseconds
}
pub fn default_idle_timeout() -> u64 {
600000 // 10 minutes
}
@@ -400,6 +409,10 @@ impl General {
pub fn default_server_round_robin() -> bool {
true
}
pub fn default_prepared_statements_cache_size() -> usize {
500
}
}
impl Default for General {
@@ -420,6 +433,7 @@ impl Default for General {
tcp_keepalives_idle: Self::default_tcp_keepalives_idle(),
tcp_keepalives_count: Self::default_tcp_keepalives_count(),
tcp_keepalives_interval: Self::default_tcp_keepalives_interval(),
tcp_user_timeout: Self::default_tcp_user_timeout(),
log_client_connections: false,
log_client_disconnections: false,
autoreload: None,
@@ -435,9 +449,10 @@ impl Default for General {
auth_query_user: None,
auth_query_password: None,
server_lifetime: Self::default_server_lifetime(),
server_round_robin: false,
server_round_robin: Self::default_server_round_robin(),
validate_config: true,
prepared_statements: false,
prepared_statements_cache_size: 500,
}
}
}
@@ -1020,6 +1035,12 @@ impl Config {
self.general.verify_server_certificate
);
info!("Prepared statements: {}", self.general.prepared_statements);
if self.general.prepared_statements {
info!(
"Prepared statements server cache size: {}",
self.general.prepared_statements_cache_size
);
}
info!(
"Plugins: {}",
match self.plugins {
@@ -1239,13 +1260,15 @@ pub fn get_config() -> Config {
}
pub fn get_idle_client_in_transaction_timeout() -> u64 {
(*(*CONFIG.load()))
.general
.idle_client_in_transaction_timeout
CONFIG.load().general.idle_client_in_transaction_timeout
}
pub fn get_prepared_statements() -> bool {
(*(*CONFIG.load())).general.prepared_statements
CONFIG.load().general.prepared_statements
}
pub fn get_prepared_statements_cache_size() -> usize {
CONFIG.load().general.prepared_statements_cache_size
}
/// Parse the configuration file located at the path.

View File

@@ -1,13 +1,14 @@
pub mod admin;
pub mod auth_passthrough;
pub mod client;
pub mod cmd_args;
pub mod config;
pub mod constants;
pub mod dns_cache;
pub mod errors;
pub mod logger;
pub mod messages;
pub mod mirrors;
pub mod multi_logger;
pub mod plugins;
pub mod pool;
pub mod prometheus;

19
src/logger.rs Normal file
View File

@@ -0,0 +1,19 @@
use crate::cmd_args::{Args, LogFormat};
use tracing_subscriber;
use tracing_subscriber::EnvFilter;
pub fn init(args: &Args) {
// Iniitalize a default filter, and then override the builtin default "warning" with our
// commandline, (default: "info")
let filter = EnvFilter::from_default_env().add_directive(args.log_level.into());
let trace_sub = tracing_subscriber::fmt()
.with_env_filter(filter)
.with_ansi(!args.no_color);
match args.log_format {
LogFormat::Structured => trace_sub.json().init(),
LogFormat::Debug => trace_sub.pretty().init(),
_ => trace_sub.init(),
};
}

View File

@@ -23,7 +23,6 @@ extern crate arc_swap;
extern crate async_trait;
extern crate bb8;
extern crate bytes;
extern crate env_logger;
extern crate exitcode;
extern crate log;
extern crate md5;
@@ -61,15 +60,18 @@ use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::broadcast;
use pgcat::cmd_args;
use pgcat::config::{get_config, reload_config, VERSION};
use pgcat::dns_cache;
use pgcat::logger;
use pgcat::messages::configure_socket;
use pgcat::pool::{ClientServerMap, ConnectionPool};
use pgcat::prometheus::start_metric_server;
use pgcat::stats::{Collector, Reporter, REPORTER};
fn main() -> Result<(), Box<dyn std::error::Error>> {
pgcat::multi_logger::MultiLogger::init().unwrap();
let args = cmd_args::parse();
logger::init(&args);
info!("Welcome to PgCat! Meow. (Version {})", VERSION);
@@ -78,20 +80,12 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
std::process::exit(exitcode::CONFIG);
}
let args = std::env::args().collect::<Vec<String>>();
let config_file = if args.len() == 2 {
args[1].to_string()
} else {
String::from("pgcat.toml")
};
// Create a transient runtime for loading the config for the first time.
{
let runtime = Builder::new_multi_thread().worker_threads(1).build()?;
runtime.block_on(async {
match pgcat::config::parse(&config_file).await {
match pgcat::config::parse(args.config_file.as_str()).await {
Ok(_) => (),
Err(err) => {
error!("Config parse error: {:?}", err);

View File

@@ -1,7 +1,7 @@
/// Helper functions to send one-off protocol messages
/// and handle TcpStream (TCP socket).
use bytes::{Buf, BufMut, BytesMut};
use log::error;
use log::{debug, error};
use md5::{Digest, Md5};
use socket2::{SockRef, TcpKeepalive};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@@ -530,6 +530,26 @@ pub fn command_complete(command: &str) -> BytesMut {
res
}
/// Create a notify message.
pub fn notify(message: &str, details: String) -> BytesMut {
let mut notify_cmd = BytesMut::new();
notify_cmd.put_slice("SNOTICE\0".as_bytes());
notify_cmd.put_slice("C00000\0".as_bytes());
notify_cmd.put_slice(format!("M{}\0", message).as_bytes());
notify_cmd.put_slice(format!("D{}\0", details).as_bytes());
// this extra byte says that is the end of the package
notify_cmd.put_u8(0);
let mut res = BytesMut::new();
res.put_u8(b'N');
res.put_i32(notify_cmd.len() as i32 + 4);
res.put(notify_cmd);
res
}
pub fn flush() -> BytesMut {
let mut bytes = BytesMut::new();
bytes.put_u8(b'H');
@@ -669,6 +689,13 @@ pub fn configure_socket(stream: &TcpStream) {
let sock_ref = SockRef::from(stream);
let conf = get_config();
#[cfg(target_os = "linux")]
match sock_ref.set_tcp_user_timeout(Some(Duration::from_millis(conf.general.tcp_user_timeout)))
{
Ok(_) => (),
Err(err) => error!("Could not configure tcp_user_timeout for socket: {}", err),
}
match sock_ref.set_keepalive(true) {
Ok(_) => {
match sock_ref.set_tcp_keepalive(
@@ -678,7 +705,7 @@ pub fn configure_socket(stream: &TcpStream) {
.with_time(Duration::from_secs(conf.general.tcp_keepalives_idle)),
) {
Ok(_) => (),
Err(err) => error!("Could not configure socket: {}", err),
Err(err) => error!("Could not configure tcp_keepalive for socket: {}", err),
}
}
Err(err) => error!("Could not configure socket: {}", err),
@@ -832,10 +859,21 @@ impl TryFrom<&BytesMut> for Bind {
for _ in 0..num_param_values {
let param_len = cursor.get_i32();
let mut param = BytesMut::with_capacity(param_len as usize);
param.resize(param_len as usize, b'0');
cursor.copy_to_slice(&mut param);
param_values.push((param_len, param));
// There is special occasion when the parameter is NULL
// In that case, param length is defined as -1
// So if the passed parameter len is over 0
if param_len > 0 {
let mut param = BytesMut::with_capacity(param_len as usize);
param.resize(param_len as usize, b'0');
cursor.copy_to_slice(&mut param);
// we push and the length and the parameter into vector
param_values.push((param_len, param));
} else {
// otherwise we push a tuple with -1 and 0-len BytesMut
// which means that after encountering -1 postgres proceeds
// to processing another parameter
param_values.push((param_len, BytesMut::new()));
}
}
let num_result_column_format_codes = cursor.get_i16();
@@ -976,6 +1014,84 @@ impl Describe {
}
}
/// Close (F) message.
/// See: <https://www.postgresql.org/docs/current/protocol-message-formats.html>
#[derive(Clone, Debug)]
pub struct Close {
code: char,
#[allow(dead_code)]
len: i32,
close_type: char,
pub name: String,
}
impl TryFrom<&BytesMut> for Close {
type Error = Error;
fn try_from(bytes: &BytesMut) -> Result<Close, Error> {
let mut cursor = Cursor::new(bytes);
let code = cursor.get_u8() as char;
let len = cursor.get_i32();
let close_type = cursor.get_u8() as char;
let name = cursor.read_string()?;
Ok(Close {
code,
len,
close_type,
name,
})
}
}
impl TryFrom<Close> for BytesMut {
type Error = Error;
fn try_from(close: Close) -> Result<BytesMut, Error> {
debug!("Close: {:?}", close);
let mut bytes = BytesMut::new();
let name_binding = CString::new(close.name)?;
let name = name_binding.as_bytes_with_nul();
let len = 4 + 1 + name.len();
bytes.put_u8(close.code as u8);
bytes.put_i32(len as i32);
bytes.put_u8(close.close_type as u8);
bytes.put_slice(name);
Ok(bytes)
}
}
impl Close {
pub fn new(name: &str) -> Close {
let name = name.to_string();
Close {
code: 'C',
len: 4 + 1 + name.len() as i32 + 1, // will be recalculated
close_type: 'S',
name,
}
}
pub fn is_prepared_statement(&self) -> bool {
self.close_type == 'S'
}
pub fn anonymous(&self) -> bool {
self.name.is_empty()
}
}
pub fn close_complete() -> BytesMut {
let mut bytes = BytesMut::new();
bytes.put_u8(b'3');
bytes.put_i32(4);
bytes
}
pub fn prepared_statement_name() -> String {
format!(
"P_{}",

View File

@@ -1,80 +0,0 @@
use log::{Level, Log, Metadata, Record, SetLoggerError};
// This is a special kind of logger that allows sending logs to different
// targets depending on the log level.
//
// By default, if nothing is set, it acts as a regular env_log logger,
// it sends everything to standard error.
//
// If the Env variable `STDOUT_LOG` is defined, it will be used for
// configuring the standard out logger.
//
// The behavior is:
// - If it is an error, the message is written to standard error.
// - If it is not, and it matches the log level of the standard output logger (`STDOUT_LOG` env var), it will be send to standard output.
// - If the above is not true, it is sent to the stderr logger that will log it or not depending on the value
// of the RUST_LOG env var.
//
// So to summarize, if no `STDOUT_LOG` env var is present, the logger is the default logger. If `STDOUT_LOG` is set, everything
// but errors, that matches the log level set in the `STDOUT_LOG` env var is sent to stdout. You can have also some esoteric configuration
// where you set `RUST_LOG=debug` and `STDOUT_LOG=info`, in here, errors will go to stderr, warns and infos to stdout and debugs to stderr.
//
pub struct MultiLogger {
stderr_logger: env_logger::Logger,
stdout_logger: env_logger::Logger,
}
impl MultiLogger {
fn new() -> Self {
let stderr_logger = env_logger::builder().format_timestamp_micros().build();
let stdout_logger = env_logger::Builder::from_env("STDOUT_LOG")
.format_timestamp_micros()
.target(env_logger::Target::Stdout)
.build();
Self {
stderr_logger,
stdout_logger,
}
}
pub fn init() -> Result<(), SetLoggerError> {
let logger = Self::new();
log::set_max_level(logger.stderr_logger.filter());
log::set_boxed_logger(Box::new(logger))
}
}
impl Log for MultiLogger {
fn enabled(&self, metadata: &Metadata) -> bool {
self.stderr_logger.enabled(metadata) && self.stdout_logger.enabled(metadata)
}
fn log(&self, record: &Record) {
if record.level() == Level::Error {
self.stderr_logger.log(record);
} else {
if self.stdout_logger.matches(record) {
self.stdout_logger.log(record);
} else {
self.stderr_logger.log(record);
}
}
}
fn flush(&self) {
self.stderr_logger.flush();
self.stdout_logger.flush();
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_init() {
MultiLogger::init().unwrap();
}
}

View File

@@ -1,6 +1,6 @@
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use log::{error, info, debug};
use log::{debug, error, info};
use phf::phf_map;
use std::collections::HashMap;
use std::fmt;
@@ -364,7 +364,7 @@ fn push_server_stats(lines: &mut Vec<String>) {
{
lines.push(prometheus_metric.to_string());
} else {
warn!("Metric {} not implemented for {}", key, address.name());
debug!("Metric {} not implemented for {}", key, address.name());
}
}
}

View File

@@ -15,7 +15,7 @@ use tokio::net::TcpStream;
use tokio_rustls::rustls::{OwnedTrustAnchor, RootCertStore};
use tokio_rustls::{client::TlsStream, TlsConnector};
use crate::config::{get_config, Address, User};
use crate::config::{get_config, get_prepared_statements_cache_size, Address, User};
use crate::constants::*;
use crate::dns_cache::{AddrSet, CACHED_RESOLVER};
use crate::errors::{Error, ServerIdentifier};
@@ -170,6 +170,9 @@ pub struct Server {
/// Is there more data for the client to read.
data_available: bool,
/// Is the server in copy-in or copy-out modes
in_copy_mode: bool,
/// Is the server broken? We'll remote it from the pool if so.
bad: bool,
@@ -677,6 +680,7 @@ impl Server {
process_id,
secret_key,
in_transaction: false,
in_copy_mode: false,
data_available: false,
bad: false,
cleanup_state: CleanupState::new(),
@@ -828,8 +832,19 @@ impl Server {
break;
}
// ErrorResponse
'E' => {
if self.in_copy_mode {
self.in_copy_mode = false;
}
}
// CommandComplete
'C' => {
if self.in_copy_mode {
self.in_copy_mode = false;
}
let mut command_tag = String::new();
match message.reader().read_to_string(&mut command_tag) {
Ok(_) => {
@@ -873,10 +888,14 @@ impl Server {
}
// CopyInResponse: copy is starting from client to server.
'G' => break,
'G' => {
self.in_copy_mode = true;
break;
}
// CopyOutResponse: copy is starting from the server to the client.
'H' => {
self.in_copy_mode = true;
self.data_available = true;
break;
}
@@ -914,12 +933,16 @@ impl Server {
Ok(bytes)
}
/// Add the prepared statement to being tracked by this server.
/// The client is processing data that will create a prepared statement on this server.
pub fn will_prepare(&mut self, name: &str) {
debug!("Will prepare `{}`", name);
self.prepared_statements.insert(name.to_string());
self.stats.prepared_cache_add();
}
/// Check if we should prepare a statement on the server.
pub fn should_prepare(&self, name: &str) -> bool {
let should_prepare = !self.prepared_statements.contains(name);
@@ -934,6 +957,7 @@ impl Server {
should_prepare
}
/// Create a prepared statement on the server.
pub async fn prepare(&mut self, parse: &Parse) -> Result<(), Error> {
debug!("Preparing `{}`", parse.name);
@@ -942,15 +966,86 @@ impl Server {
self.send(&flush()).await?;
// Read and discard ParseComplete (B)
let _ = read_message(&mut self.stream).await?;
match read_message(&mut self.stream).await {
Ok(_) => (),
Err(err) => {
self.bad = true;
return Err(err);
}
}
self.prepared_statements.insert(parse.name.to_string());
self.stats.prepared_cache_add();
debug!("Prepared `{}`", parse.name);
Ok(())
}
/// Maintain adequate cache size on the server.
pub async fn maintain_cache(&mut self) -> Result<(), Error> {
debug!("Cache maintenance run");
let max_cache_size = get_prepared_statements_cache_size();
let mut names = Vec::new();
while self.prepared_statements.len() >= max_cache_size {
// The prepared statmeents are alphanumerically sorted by the BTree.
// FIFO.
if let Some(name) = self.prepared_statements.pop_last() {
names.push(name);
}
}
if !names.is_empty() {
self.deallocate(names).await?;
}
Ok(())
}
/// Remove the prepared statement from being tracked by this server.
/// The client is processing data that will cause the server to close the prepared statement.
pub fn will_close(&mut self, name: &str) {
debug!("Will close `{}`", name);
self.prepared_statements.remove(name);
}
/// Close a prepared statement on the server.
pub async fn deallocate(&mut self, names: Vec<String>) -> Result<(), Error> {
for name in &names {
debug!("Deallocating prepared statement `{}`", name);
let close = Close::new(name);
let bytes: BytesMut = close.try_into()?;
self.send(&bytes).await?;
}
if !names.is_empty() {
self.send(&flush()).await?;
}
// Read and discard CloseComplete (3)
for name in &names {
match read_message(&mut self.stream).await {
Ok(_) => {
self.prepared_statements.remove(name);
self.stats.prepared_cache_remove();
debug!("Closed `{}`", name);
}
Err(err) => {
self.bad = true;
return Err(err);
}
};
}
Ok(())
}
/// If the server is still inside a transaction.
/// If the client disconnects while the server is in a transaction, we will clean it up.
pub fn in_transaction(&self) -> bool {
@@ -958,6 +1053,10 @@ impl Server {
self.in_transaction
}
pub fn in_copy_mode(&self) -> bool {
self.in_copy_mode
}
/// We don't buffer all of server responses, e.g. COPY OUT produces too much data.
/// The client is responsible to call `self.recv()` while this method returns true.
pub fn is_data_available(&self) -> bool {
@@ -1041,7 +1140,7 @@ impl Server {
// server connection thrashing if clients repeatedly do this.
// Instead, we ROLLBACK that transaction before putting the connection back in the pool
if self.in_transaction() {
warn!("Server returned while still in transaction, rolling back transaction");
warn!(target: "pgcat::server::cleanup", "Server returned while still in transaction, rolling back transaction");
self.query("ROLLBACK").await?;
}
@@ -1051,12 +1150,16 @@ impl Server {
// send `DISCARD ALL` if we think the session is altered instead of just sending
// it before each checkin.
if self.cleanup_state.needs_cleanup() && self.cleanup_connections {
warn!("Server returned with session state altered, discarding state ({}) for application {}", self.cleanup_state, self.application_name);
info!(target: "pgcat::server::cleanup", "Server returned with session state altered, discarding state ({}) for application {}", self.cleanup_state, self.application_name);
self.query("DISCARD ALL").await?;
self.query("RESET ROLE").await?;
self.cleanup_state.reset();
}
if self.in_copy_mode() {
warn!(target: "pgcat::server::cleanup", "Server returned while still in copy-mode");
}
Ok(())
}

View File

@@ -49,6 +49,7 @@ pub struct ServerStats {
pub error_count: Arc<AtomicU64>,
pub prepared_hit_count: Arc<AtomicU64>,
pub prepared_miss_count: Arc<AtomicU64>,
pub prepared_cache_size: Arc<AtomicU64>,
}
impl Default for ServerStats {
@@ -67,6 +68,7 @@ impl Default for ServerStats {
reporter: get_reporter(),
prepared_hit_count: Arc::new(AtomicU64::new(0)),
prepared_miss_count: Arc::new(AtomicU64::new(0)),
prepared_cache_size: Arc::new(AtomicU64::new(0)),
}
}
}
@@ -213,4 +215,12 @@ impl ServerStats {
pub fn prepared_cache_miss(&self) {
self.prepared_miss_count.fetch_add(1, Ordering::Relaxed);
}
pub fn prepared_cache_add(&self) {
self.prepared_cache_size.fetch_add(1, Ordering::Relaxed);
}
pub fn prepared_cache_remove(&self) {
self.prepared_cache_size.fetch_sub(1, Ordering::Relaxed);
}
}

102
tests/ruby/copy_spec.rb Normal file
View File

@@ -0,0 +1,102 @@
# frozen_string_literal: true
require_relative 'spec_helper'
describe "COPY Handling" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 5) }
before do
new_configs = processes.pgcat.current_config
# Allow connections in the pool to expire faster
new_configs["general"]["idle_timeout"] = 5
processes.pgcat.update_config(new_configs)
# We need to kill the old process that was using the default configs
processes.pgcat.stop
processes.pgcat.start
processes.pgcat.wait_until_ready
end
before do
processes.all_databases.first.with_connection do |conn|
conn.async_exec "CREATE TABLE copy_test_table (a TEXT,b TEXT,c TEXT,d TEXT)"
end
end
after do
processes.all_databases.first.with_connection do |conn|
conn.async_exec "DROP TABLE copy_test_table;"
end
end
after do
processes.all_databases.map(&:reset)
processes.pgcat.shutdown
end
describe "COPY FROM" do
context "within transaction" do
it "finishes within alloted time" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
Timeout.timeout(3) do
conn.async_exec("BEGIN")
conn.copy_data "COPY copy_test_table FROM STDIN CSV" do
sleep 0.5
conn.put_copy_data "some,data,to,copy\n"
conn.put_copy_data "more,data,to,copy\n"
end
conn.async_exec("COMMIT")
end
res = conn.async_exec("SELECT * FROM copy_test_table").to_a
expect(res).to eq([
{"a"=>"some", "b"=>"data", "c"=>"to", "d"=>"copy"},
{"a"=>"more", "b"=>"data", "c"=>"to", "d"=>"copy"}
])
end
end
context "outside transaction" do
it "finishes within alloted time" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
Timeout.timeout(3) do
conn.copy_data "COPY copy_test_table FROM STDIN CSV" do
sleep 0.5
conn.put_copy_data "some,data,to,copy\n"
conn.put_copy_data "more,data,to,copy\n"
end
end
res = conn.async_exec("SELECT * FROM copy_test_table").to_a
expect(res).to eq([
{"a"=>"some", "b"=>"data", "c"=>"to", "d"=>"copy"},
{"a"=>"more", "b"=>"data", "c"=>"to", "d"=>"copy"}
])
end
end
end
describe "COPY TO" do
before do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("BEGIN")
conn.copy_data "COPY copy_test_table FROM STDIN CSV" do
conn.put_copy_data "some,data,to,copy\n"
conn.put_copy_data "more,data,to,copy\n"
end
conn.async_exec("COMMIT")
conn.close
end
it "works" do
res = []
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.copy_data "COPY copy_test_table TO STDOUT CSV" do
while row=conn.get_copy_data
res << row
end
end
expect(res).to eq(["some,data,to,copy\n", "more,data,to,copy\n"])
end
end
end

View File

@@ -0,0 +1,29 @@
require_relative 'spec_helper'
describe 'Prepared statements' do
let(:processes) { Helpers::Pgcat.three_shard_setup('sharded_db', 5) }
context 'enabled' do
it 'will work over the same connection' do
conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
10.times do |i|
statement_name = "statement_#{i}"
conn.prepare(statement_name, 'SELECT $1::int')
conn.exec_prepared(statement_name, [1])
conn.describe_prepared(statement_name)
end
end
it 'will work with new connections' do
10.times do
conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
statement_name = 'statement1'
conn.prepare('statement1', 'SELECT $1::int')
conn.exec_prepared('statement1', [1])
conn.describe_prepared('statement1')
end
end
end
end