Compare commits

..

17 Commits

Author SHA1 Message Date
Lev Kokotov
4cf54a6122 Release 1.1 (#526) 2023-07-25 10:27:04 -07:00
Mostafa Abdelraouf
2a8f3653a6 Fix COPY FROM and add tests (#522)
* Fix COPY FROM and add tests

* E

* fmt
2023-07-20 23:06:01 -07:00
Sebastian Webber
19cb8a3022 add --no-color option to disable colors in the terminal (#518)
add --no-color option to disable colors

this commit adds a new option to disable colors in the terminal and also
moves the logger configuration to a different crate.

Signed-off-by: Sebastian Webber <sebastian@swebber.me>
2023-07-19 21:15:55 -07:00
Sebastian Webber
f85e5bd9e8 add support for multiple log formats (#517)
this commit adds the tracing-subscriber crate and use its formatters to
support multiple log formats.

More details in
https://github.com/postgresml/pgcat/issues/464#issuecomment-1641430299

Signed-off-by: Sebastian Webber <sebastian@swebber.me>
2023-07-18 23:07:13 -07:00
Sebastian Webber
7bdb4e5cd9 Add cmd line parser (#512)
This commit adds the clap library and configures the necessary args to
parse from the command line,  expanding the current option of a single
file and adding support for environment variables.

Signed-off-by: Sebastian Webber <sebastian@swebber.me>
2023-07-18 13:52:40 -07:00
Sebastian Webber
5d87e3781e push and build only in main and tags (#508)
this commit changes the CI behavior to only build and push when something is committed to main or is a new tag.
2023-07-14 10:30:49 -07:00
dependabot[bot]
3e08c6bd8d chore(deps): bump num_cpus from 1.15.0 to 1.16.0 (#507)
Bumps [num_cpus](https://github.com/seanmonstar/num_cpus) from 1.15.0 to 1.16.0.
- [Release notes](https://github.com/seanmonstar/num_cpus/releases)
- [Changelog](https://github.com/seanmonstar/num_cpus/blob/master/CHANGELOG.md)
- [Commits](https://github.com/seanmonstar/num_cpus/compare/v1.15.0...v1.16.0)

---
updated-dependencies:
- dependency-name: num_cpus
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-07-14 07:58:11 -07:00
Sebastian Webber
15b6db8e4e add "show help" command (#505)
This commit adds a new function to handle notify and use it
in the SHOW HELP command, which displays the available options
in the admin console.

Also, adding Fabrízio as a co-author for all the help with the
protocol and the help to structure this PR.

Signed-off-by: Sebastian Webber <sebastian@swebber.me>
Co-authored-by: Fabrízio de Royes Mello <fabriziomello@gmail.com>
2023-07-13 22:40:04 -07:00
dependabot[bot]
b2e6dfd9bb chore(deps): bump rustls-pemfile from 1.0.2 to 1.0.3 (#504)
Bumps [rustls-pemfile](https://github.com/rustls/pemfile) from 1.0.2 to 1.0.3.
- [Commits](https://github.com/rustls/pemfile/commits)

---
updated-dependencies:
- dependency-name: rustls-pemfile
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-07-12 21:41:48 -07:00
Mostafa Abdelraouf
3c9565d351 Add support for tcp_user_timeout (#503)
* Add support for tcp_user_timeout

* option

* duration

* Some()

* docs

* fmt, compile
2023-07-12 11:24:30 -07:00
dependabot[bot]
67579c9af4 chore(deps): bump rustls from 0.21.1 to 0.21.5 (#501)
Bumps [rustls](https://github.com/rustls/rustls) from 0.21.1 to 0.21.5.
- [Release notes](https://github.com/rustls/rustls/releases)
- [Commits](https://github.com/rustls/rustls/compare/v/0.21.1...v/0.21.5)

---
updated-dependencies:
- dependency-name: rustls
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-07-12 05:46:31 -07:00
Cluas
cf7f6f35ab docs: fix general.autoreload description (#491)
* docs: fix autoreload description

Signed-off-by: Cluas <Cluas@live.cn>

* docs: add blank line

Signed-off-by: Cluas <Cluas@live.cn>

---------

Signed-off-by: Cluas <Cluas@live.cn>
2023-07-12 05:42:44 -07:00
Voldemarich
7205537b49 [BUG] Fix binding of NULL value parameters in prepared statements (#496)
Fix binding of NULL value parameters in prepared statements

Co-authored-by: anon <anon@non.existent>
2023-07-10 10:35:43 +02:00
Zain Kabani
1ed6e925ed Fixes the default for round robing in General (#488) 2023-06-23 09:15:44 -07:00
Lev Kokotov
4b78af9676 Implement Close for prepared statements (#482)
* Partial support for Close

* Close

* respect config value

* prepared spec

* Hmm

* Print cache size
2023-06-18 23:02:34 -07:00
Lev Kokotov
73500c0c96 Fix build (#481) 2023-06-17 09:09:54 -07:00
Lev Kokotov
b167de5aa3 fmt (#480) 2023-06-17 08:57:33 -07:00
19 changed files with 1133 additions and 456 deletions

View File

@@ -1,6 +1,11 @@
name: Build and Push
on: push
on:
push:
branches:
- main
tags:
- v*
env:
registry: ghcr.io

View File

@@ -1,4 +1,4 @@
# PgCat Configurations
# PgCat Configurations
## `general` Section
### host
@@ -116,10 +116,10 @@ If we should log client disconnections
### autoreload
```
path: general.autoreload
default: 15000
default: 15000 # milliseconds
```
When set to true, PgCat reloads configs if it detects a change in the config file.
When set, PgCat automatically reloads its configurations at the specified interval (in milliseconds) if it detects changes in the configuration file. The default interval is 15000 milliseconds or 15 seconds.
### worker_threads
```
@@ -151,7 +151,13 @@ path: general.tcp_keepalives_interval
default: 5
```
Number of seconds between keepalive packets.
### tcp_user_timeout
```
path: general.tcp_user_timeout
default: 10000
```
A linux-only parameters that defines the amount of time in milliseconds that transmitted data may remain unacknowledged or buffered data may remain untransmitted (due to zero window size) before TCP will forcibly disconnect
### tls_certificate
```

901
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[package]
name = "pgcat"
version = "1.0.2-alpha3"
version = "1.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -46,6 +46,9 @@ trust-dns-resolver = "0.22.0"
tokio-test = "0.4.2"
serde_json = "1"
itertools = "0.10"
clap = { version = "4.3.1", features = ["derive", "env"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["json"]}
[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5.0"

View File

@@ -63,6 +63,9 @@ tcp_keepalives_interval = 5
# Handle prepared statements.
prepared_statements = true
# Prepared statements server cache size.
prepared_statements_cache_size = 500
# Path to TLS Certificate file to use for TLS connections
# tls_certificate = ".circleci/server.cert"
# Path to TLS private key file to use for TLS connections

View File

@@ -84,6 +84,10 @@ where
shutdown(stream).await
}
"SHOW" => match query_parts[1].to_ascii_uppercase().as_str() {
"HELP" => {
trace!("SHOW HELP");
show_help(stream).await
}
"BANS" => {
trace!("SHOW BANS");
show_bans(stream).await
@@ -271,6 +275,45 @@ where
write_all_half(stream, &res).await
}
/// Show all available options.
async fn show_help<T>(stream: &mut T) -> Result<(), Error>
where
T: tokio::io::AsyncWrite + std::marker::Unpin,
{
let mut res = BytesMut::new();
let detail_msg = vec![
"",
"SHOW HELP|CONFIG|DATABASES|POOLS|CLIENTS|SERVERS|USERS|VERSION",
// "SHOW PEERS|PEER_POOLS", // missing PEERS|PEER_POOLS
// "SHOW FDS|SOCKETS|ACTIVE_SOCKETS|LISTS|MEM|STATE", // missing FDS|SOCKETS|ACTIVE_SOCKETS|MEM|STATE
"SHOW LISTS",
// "SHOW DNS_HOSTS|DNS_ZONES", // missing DNS_HOSTS|DNS_ZONES
"SHOW STATS", // missing STATS_TOTALS|STATS_AVERAGES|TOTALS
"SET key = arg",
"RELOAD",
"PAUSE [<db>, <user>]",
"RESUME [<db>, <user>]",
// "DISABLE <db>", // missing
// "ENABLE <db>", // missing
// "RECONNECT [<db>]", missing
// "KILL <db>",
// "SUSPEND",
"SHUTDOWN",
// "WAIT_CLOSE [<db>]", // missing
];
res.put(notify("Console usage", detail_msg.join("\n\t")));
res.put(command_complete("SHOW"));
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
write_all_half(stream, &res).await
}
/// Show shards and replicas.
async fn show_databases<T>(stream: &mut T) -> Result<(), Error>
where
@@ -701,6 +744,7 @@ where
("age_seconds", DataType::Numeric),
("prepare_cache_hit", DataType::Numeric),
("prepare_cache_miss", DataType::Numeric),
("prepare_cache_size", DataType::Numeric),
];
let new_map = get_server_stats();
@@ -732,6 +776,10 @@ where
.prepared_miss_count
.load(Ordering::Relaxed)
.to_string(),
server
.prepared_cache_size
.load(Ordering::Relaxed)
.to_string(),
];
res.put(data_row(&row));

View File

@@ -906,6 +906,19 @@ where
return Ok(());
}
// Close (F)
'C' => {
if prepared_statements_enabled {
let close: Close = (&message).try_into()?;
if close.is_prepared_statement() && !close.anonymous() {
self.prepared_statements.remove(&close.name);
write_all_flush(&mut self.write, &close_complete()).await?;
continue;
}
}
}
_ => (),
}
@@ -1130,7 +1143,17 @@ where
} else {
// The statement is not prepared on the server, so we need to prepare it.
if server.should_prepare(&statement.name) {
server.prepare(statement).await?;
match server.prepare(statement).await {
Ok(_) => (),
Err(err) => {
pool.ban(
&address,
BanReason::MessageSendFailed,
Some(&self.stats),
);
return Err(err);
}
}
}
}
@@ -1237,7 +1260,7 @@ where
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
if self.transaction_mode && !server.in_copy_mode() {
self.stats.idle();
break;
@@ -1251,6 +1274,10 @@ where
self.stats.disconnect();
self.release();
if prepared_statements_enabled {
server.maintain_cache().await?;
}
return Ok(());
}
@@ -1300,6 +1327,21 @@ where
// Close the prepared statement.
'C' => {
if prepared_statements_enabled {
let close: Close = (&message).try_into()?;
if close.is_prepared_statement() && !close.anonymous() {
match self.prepared_statements.get(&close.name) {
Some(parse) => {
server.will_close(&parse.generated_name);
}
// A prepared statement slipped through? Not impossible, since we don't support PREPARE yet.
None => (),
};
}
}
self.buffer.put(&message[..]);
}
@@ -1368,7 +1410,7 @@ where
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
if self.transaction_mode && !server.in_copy_mode() {
break;
}
}
@@ -1433,7 +1475,13 @@ where
// The server is no longer bound to us, we can't cancel it's queries anymore.
debug!("Releasing server back into the pool");
server.checkin_cleanup().await?;
if prepared_statements_enabled {
server.maintain_cache().await?;
}
server.stats().idle();
self.connected_to_server = false;

36
src/cmd_args.rs Normal file
View File

@@ -0,0 +1,36 @@
use clap::{Parser, ValueEnum};
use tracing::Level;
/// PgCat: Nextgen PostgreSQL Pooler
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct Args {
#[arg(default_value_t = String::from("pgcat.toml"), env)]
pub config_file: String,
#[arg(short, long, default_value_t = tracing::Level::INFO, env)]
pub log_level: Level,
#[clap(short='F', long, value_enum, default_value_t=LogFormat::Text, env)]
pub log_format: LogFormat,
#[arg(
short,
long,
default_value_t = false,
env,
help = "disable colors in the log output"
)]
pub no_color: bool,
}
pub fn parse() -> Args {
return Args::parse();
}
#[derive(ValueEnum, Clone, Debug)]
pub enum LogFormat {
Text,
Structured,
Debug,
}

View File

@@ -261,6 +261,8 @@ pub struct General {
pub tcp_keepalives_count: u32,
#[serde(default = "General::default_tcp_keepalives_interval")]
pub tcp_keepalives_interval: u64,
#[serde(default = "General::default_tcp_user_timeout")]
pub tcp_user_timeout: u64,
#[serde(default)] // False
pub log_client_connections: bool,
@@ -323,6 +325,9 @@ pub struct General {
#[serde(default)]
pub prepared_statements: bool,
#[serde(default = "General::default_prepared_statements_cache_size")]
pub prepared_statements_cache_size: usize,
}
impl General {
@@ -357,6 +362,10 @@ impl General {
5 // 5 seconds
}
pub fn default_tcp_user_timeout() -> u64 {
10000 // 10000 milliseconds
}
pub fn default_idle_timeout() -> u64 {
600000 // 10 minutes
}
@@ -400,6 +409,10 @@ impl General {
pub fn default_server_round_robin() -> bool {
true
}
pub fn default_prepared_statements_cache_size() -> usize {
500
}
}
impl Default for General {
@@ -420,6 +433,7 @@ impl Default for General {
tcp_keepalives_idle: Self::default_tcp_keepalives_idle(),
tcp_keepalives_count: Self::default_tcp_keepalives_count(),
tcp_keepalives_interval: Self::default_tcp_keepalives_interval(),
tcp_user_timeout: Self::default_tcp_user_timeout(),
log_client_connections: false,
log_client_disconnections: false,
autoreload: None,
@@ -435,9 +449,10 @@ impl Default for General {
auth_query_user: None,
auth_query_password: None,
server_lifetime: Self::default_server_lifetime(),
server_round_robin: false,
server_round_robin: Self::default_server_round_robin(),
validate_config: true,
prepared_statements: false,
prepared_statements_cache_size: 500,
}
}
}
@@ -1020,6 +1035,12 @@ impl Config {
self.general.verify_server_certificate
);
info!("Prepared statements: {}", self.general.prepared_statements);
if self.general.prepared_statements {
info!(
"Prepared statements server cache size: {}",
self.general.prepared_statements_cache_size
);
}
info!(
"Plugins: {}",
match self.plugins {
@@ -1239,13 +1260,15 @@ pub fn get_config() -> Config {
}
pub fn get_idle_client_in_transaction_timeout() -> u64 {
(*(*CONFIG.load()))
.general
.idle_client_in_transaction_timeout
CONFIG.load().general.idle_client_in_transaction_timeout
}
pub fn get_prepared_statements() -> bool {
(*(*CONFIG.load())).general.prepared_statements
CONFIG.load().general.prepared_statements
}
pub fn get_prepared_statements_cache_size() -> usize {
CONFIG.load().general.prepared_statements_cache_size
}
/// Parse the configuration file located at the path.

View File

@@ -1,13 +1,14 @@
pub mod admin;
pub mod auth_passthrough;
pub mod client;
pub mod cmd_args;
pub mod config;
pub mod constants;
pub mod dns_cache;
pub mod errors;
pub mod logger;
pub mod messages;
pub mod mirrors;
pub mod multi_logger;
pub mod plugins;
pub mod pool;
pub mod prometheus;

14
src/logger.rs Normal file
View File

@@ -0,0 +1,14 @@
use crate::cmd_args::{Args, LogFormat};
use tracing_subscriber;
pub fn init(args: &Args) {
let trace_sub = tracing_subscriber::fmt()
.with_max_level(args.log_level)
.with_ansi(!args.no_color);
match args.log_format {
LogFormat::Structured => trace_sub.json().init(),
LogFormat::Debug => trace_sub.pretty().init(),
_ => trace_sub.init(),
};
}

View File

@@ -61,15 +61,18 @@ use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::broadcast;
use pgcat::cmd_args;
use pgcat::config::{get_config, reload_config, VERSION};
use pgcat::dns_cache;
use pgcat::logger;
use pgcat::messages::configure_socket;
use pgcat::pool::{ClientServerMap, ConnectionPool};
use pgcat::prometheus::start_metric_server;
use pgcat::stats::{Collector, Reporter, REPORTER};
fn main() -> Result<(), Box<dyn std::error::Error>> {
pgcat::multi_logger::MultiLogger::init().unwrap();
let args = cmd_args::parse();
logger::init(&args);
info!("Welcome to PgCat! Meow. (Version {})", VERSION);
@@ -78,20 +81,12 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
std::process::exit(exitcode::CONFIG);
}
let args = std::env::args().collect::<Vec<String>>();
let config_file = if args.len() == 2 {
args[1].to_string()
} else {
String::from("pgcat.toml")
};
// Create a transient runtime for loading the config for the first time.
{
let runtime = Builder::new_multi_thread().worker_threads(1).build()?;
runtime.block_on(async {
match pgcat::config::parse(&config_file).await {
match pgcat::config::parse(args.config_file.as_str()).await {
Ok(_) => (),
Err(err) => {
error!("Config parse error: {:?}", err);

View File

@@ -1,7 +1,7 @@
/// Helper functions to send one-off protocol messages
/// and handle TcpStream (TCP socket).
use bytes::{Buf, BufMut, BytesMut};
use log::error;
use log::{debug, error};
use md5::{Digest, Md5};
use socket2::{SockRef, TcpKeepalive};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@@ -530,6 +530,26 @@ pub fn command_complete(command: &str) -> BytesMut {
res
}
/// Create a notify message.
pub fn notify(message: &str, details: String) -> BytesMut {
let mut notify_cmd = BytesMut::new();
notify_cmd.put_slice("SNOTICE\0".as_bytes());
notify_cmd.put_slice("C00000\0".as_bytes());
notify_cmd.put_slice(format!("M{}\0", message).as_bytes());
notify_cmd.put_slice(format!("D{}\0", details).as_bytes());
// this extra byte says that is the end of the package
notify_cmd.put_u8(0);
let mut res = BytesMut::new();
res.put_u8(b'N');
res.put_i32(notify_cmd.len() as i32 + 4);
res.put(notify_cmd);
res
}
pub fn flush() -> BytesMut {
let mut bytes = BytesMut::new();
bytes.put_u8(b'H');
@@ -669,6 +689,13 @@ pub fn configure_socket(stream: &TcpStream) {
let sock_ref = SockRef::from(stream);
let conf = get_config();
#[cfg(target_os = "linux")]
match sock_ref.set_tcp_user_timeout(Some(Duration::from_millis(conf.general.tcp_user_timeout)))
{
Ok(_) => (),
Err(err) => error!("Could not configure tcp_user_timeout for socket: {}", err),
}
match sock_ref.set_keepalive(true) {
Ok(_) => {
match sock_ref.set_tcp_keepalive(
@@ -678,7 +705,7 @@ pub fn configure_socket(stream: &TcpStream) {
.with_time(Duration::from_secs(conf.general.tcp_keepalives_idle)),
) {
Ok(_) => (),
Err(err) => error!("Could not configure socket: {}", err),
Err(err) => error!("Could not configure tcp_keepalive for socket: {}", err),
}
}
Err(err) => error!("Could not configure socket: {}", err),
@@ -832,10 +859,21 @@ impl TryFrom<&BytesMut> for Bind {
for _ in 0..num_param_values {
let param_len = cursor.get_i32();
let mut param = BytesMut::with_capacity(param_len as usize);
param.resize(param_len as usize, b'0');
cursor.copy_to_slice(&mut param);
param_values.push((param_len, param));
// There is special occasion when the parameter is NULL
// In that case, param length is defined as -1
// So if the passed parameter len is over 0
if param_len > 0 {
let mut param = BytesMut::with_capacity(param_len as usize);
param.resize(param_len as usize, b'0');
cursor.copy_to_slice(&mut param);
// we push and the length and the parameter into vector
param_values.push((param_len, param));
} else {
// otherwise we push a tuple with -1 and 0-len BytesMut
// which means that after encountering -1 postgres proceeds
// to processing another parameter
param_values.push((param_len, BytesMut::new()));
}
}
let num_result_column_format_codes = cursor.get_i16();
@@ -976,6 +1014,84 @@ impl Describe {
}
}
/// Close (F) message.
/// See: <https://www.postgresql.org/docs/current/protocol-message-formats.html>
#[derive(Clone, Debug)]
pub struct Close {
code: char,
#[allow(dead_code)]
len: i32,
close_type: char,
pub name: String,
}
impl TryFrom<&BytesMut> for Close {
type Error = Error;
fn try_from(bytes: &BytesMut) -> Result<Close, Error> {
let mut cursor = Cursor::new(bytes);
let code = cursor.get_u8() as char;
let len = cursor.get_i32();
let close_type = cursor.get_u8() as char;
let name = cursor.read_string()?;
Ok(Close {
code,
len,
close_type,
name,
})
}
}
impl TryFrom<Close> for BytesMut {
type Error = Error;
fn try_from(close: Close) -> Result<BytesMut, Error> {
debug!("Close: {:?}", close);
let mut bytes = BytesMut::new();
let name_binding = CString::new(close.name)?;
let name = name_binding.as_bytes_with_nul();
let len = 4 + 1 + name.len();
bytes.put_u8(close.code as u8);
bytes.put_i32(len as i32);
bytes.put_u8(close.close_type as u8);
bytes.put_slice(name);
Ok(bytes)
}
}
impl Close {
pub fn new(name: &str) -> Close {
let name = name.to_string();
Close {
code: 'C',
len: 4 + 1 + name.len() as i32 + 1, // will be recalculated
close_type: 'S',
name,
}
}
pub fn is_prepared_statement(&self) -> bool {
self.close_type == 'S'
}
pub fn anonymous(&self) -> bool {
self.name.is_empty()
}
}
pub fn close_complete() -> BytesMut {
let mut bytes = BytesMut::new();
bytes.put_u8(b'3');
bytes.put_i32(4);
bytes
}
pub fn prepared_statement_name() -> String {
format!(
"P_{}",

View File

@@ -1,80 +0,0 @@
use log::{Level, Log, Metadata, Record, SetLoggerError};
// This is a special kind of logger that allows sending logs to different
// targets depending on the log level.
//
// By default, if nothing is set, it acts as a regular env_log logger,
// it sends everything to standard error.
//
// If the Env variable `STDOUT_LOG` is defined, it will be used for
// configuring the standard out logger.
//
// The behavior is:
// - If it is an error, the message is written to standard error.
// - If it is not, and it matches the log level of the standard output logger (`STDOUT_LOG` env var), it will be send to standard output.
// - If the above is not true, it is sent to the stderr logger that will log it or not depending on the value
// of the RUST_LOG env var.
//
// So to summarize, if no `STDOUT_LOG` env var is present, the logger is the default logger. If `STDOUT_LOG` is set, everything
// but errors, that matches the log level set in the `STDOUT_LOG` env var is sent to stdout. You can have also some esoteric configuration
// where you set `RUST_LOG=debug` and `STDOUT_LOG=info`, in here, errors will go to stderr, warns and infos to stdout and debugs to stderr.
//
pub struct MultiLogger {
stderr_logger: env_logger::Logger,
stdout_logger: env_logger::Logger,
}
impl MultiLogger {
fn new() -> Self {
let stderr_logger = env_logger::builder().format_timestamp_micros().build();
let stdout_logger = env_logger::Builder::from_env("STDOUT_LOG")
.format_timestamp_micros()
.target(env_logger::Target::Stdout)
.build();
Self {
stderr_logger,
stdout_logger,
}
}
pub fn init() -> Result<(), SetLoggerError> {
let logger = Self::new();
log::set_max_level(logger.stderr_logger.filter());
log::set_boxed_logger(Box::new(logger))
}
}
impl Log for MultiLogger {
fn enabled(&self, metadata: &Metadata) -> bool {
self.stderr_logger.enabled(metadata) && self.stdout_logger.enabled(metadata)
}
fn log(&self, record: &Record) {
if record.level() == Level::Error {
self.stderr_logger.log(record);
} else {
if self.stdout_logger.matches(record) {
self.stdout_logger.log(record);
} else {
self.stderr_logger.log(record);
}
}
}
fn flush(&self) {
self.stderr_logger.flush();
self.stdout_logger.flush();
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_init() {
MultiLogger::init().unwrap();
}
}

View File

@@ -364,7 +364,7 @@ fn push_server_stats(lines: &mut Vec<String>) {
{
lines.push(prometheus_metric.to_string());
} else {
warn!("Metric {} not implemented for {}", key, address.name());
debug!("Metric {} not implemented for {}", key, address.name());
}
}
}

View File

@@ -15,7 +15,7 @@ use tokio::net::TcpStream;
use tokio_rustls::rustls::{OwnedTrustAnchor, RootCertStore};
use tokio_rustls::{client::TlsStream, TlsConnector};
use crate::config::{get_config, Address, User};
use crate::config::{get_config, get_prepared_statements_cache_size, Address, User};
use crate::constants::*;
use crate::dns_cache::{AddrSet, CACHED_RESOLVER};
use crate::errors::{Error, ServerIdentifier};
@@ -170,6 +170,9 @@ pub struct Server {
/// Is there more data for the client to read.
data_available: bool,
/// Is the server in copy-in or copy-out modes
in_copy_mode: bool,
/// Is the server broken? We'll remote it from the pool if so.
bad: bool,
@@ -677,6 +680,7 @@ impl Server {
process_id,
secret_key,
in_transaction: false,
in_copy_mode: false,
data_available: false,
bad: false,
cleanup_state: CleanupState::new(),
@@ -828,8 +832,19 @@ impl Server {
break;
}
// ErrorResponse
'E' => {
if self.in_copy_mode {
self.in_copy_mode = false;
}
}
// CommandComplete
'C' => {
if self.in_copy_mode {
self.in_copy_mode = false;
}
let mut command_tag = String::new();
match message.reader().read_to_string(&mut command_tag) {
Ok(_) => {
@@ -873,10 +888,14 @@ impl Server {
}
// CopyInResponse: copy is starting from client to server.
'G' => break,
'G' => {
self.in_copy_mode = true;
break;
}
// CopyOutResponse: copy is starting from the server to the client.
'H' => {
self.in_copy_mode = true;
self.data_available = true;
break;
}
@@ -914,12 +933,16 @@ impl Server {
Ok(bytes)
}
/// Add the prepared statement to being tracked by this server.
/// The client is processing data that will create a prepared statement on this server.
pub fn will_prepare(&mut self, name: &str) {
debug!("Will prepare `{}`", name);
self.prepared_statements.insert(name.to_string());
self.stats.prepared_cache_add();
}
/// Check if we should prepare a statement on the server.
pub fn should_prepare(&self, name: &str) -> bool {
let should_prepare = !self.prepared_statements.contains(name);
@@ -934,6 +957,7 @@ impl Server {
should_prepare
}
/// Create a prepared statement on the server.
pub async fn prepare(&mut self, parse: &Parse) -> Result<(), Error> {
debug!("Preparing `{}`", parse.name);
@@ -942,15 +966,82 @@ impl Server {
self.send(&flush()).await?;
// Read and discard ParseComplete (B)
let _ = read_message(&mut self.stream).await?;
match read_message(&mut self.stream).await {
Ok(_) => (),
Err(err) => {
self.bad = true;
return Err(err);
}
}
self.prepared_statements.insert(parse.name.to_string());
self.stats.prepared_cache_add();
debug!("Prepared `{}`", parse.name);
Ok(())
}
/// Maintain adequate cache size on the server.
pub async fn maintain_cache(&mut self) -> Result<(), Error> {
debug!("Cache maintenance run");
let max_cache_size = get_prepared_statements_cache_size();
let mut names = Vec::new();
while self.prepared_statements.len() >= max_cache_size {
// The prepared statmeents are alphanumerically sorted by the BTree.
// FIFO.
if let Some(name) = self.prepared_statements.pop_last() {
names.push(name);
}
}
self.deallocate(names).await?;
Ok(())
}
/// Remove the prepared statement from being tracked by this server.
/// The client is processing data that will cause the server to close the prepared statement.
pub fn will_close(&mut self, name: &str) {
debug!("Will close `{}`", name);
self.prepared_statements.remove(name);
}
/// Close a prepared statement on the server.
pub async fn deallocate(&mut self, names: Vec<String>) -> Result<(), Error> {
for name in &names {
debug!("Deallocating prepared statement `{}`", name);
let close = Close::new(name);
let bytes: BytesMut = close.try_into()?;
self.send(&bytes).await?;
}
self.send(&flush()).await?;
// Read and discard CloseComplete (3)
for name in &names {
match read_message(&mut self.stream).await {
Ok(_) => {
self.prepared_statements.remove(name);
self.stats.prepared_cache_remove();
debug!("Closed `{}`", name);
}
Err(err) => {
self.bad = true;
return Err(err);
}
};
}
Ok(())
}
/// If the server is still inside a transaction.
/// If the client disconnects while the server is in a transaction, we will clean it up.
pub fn in_transaction(&self) -> bool {
@@ -958,6 +1049,10 @@ impl Server {
self.in_transaction
}
pub fn in_copy_mode(&self) -> bool {
self.in_copy_mode
}
/// We don't buffer all of server responses, e.g. COPY OUT produces too much data.
/// The client is responsible to call `self.recv()` while this method returns true.
pub fn is_data_available(&self) -> bool {
@@ -1057,6 +1152,10 @@ impl Server {
self.cleanup_state.reset();
}
if self.in_copy_mode() {
warn!("Server returned while still in copy-mode");
}
Ok(())
}

View File

@@ -49,6 +49,7 @@ pub struct ServerStats {
pub error_count: Arc<AtomicU64>,
pub prepared_hit_count: Arc<AtomicU64>,
pub prepared_miss_count: Arc<AtomicU64>,
pub prepared_cache_size: Arc<AtomicU64>,
}
impl Default for ServerStats {
@@ -67,6 +68,7 @@ impl Default for ServerStats {
reporter: get_reporter(),
prepared_hit_count: Arc::new(AtomicU64::new(0)),
prepared_miss_count: Arc::new(AtomicU64::new(0)),
prepared_cache_size: Arc::new(AtomicU64::new(0)),
}
}
}
@@ -213,4 +215,12 @@ impl ServerStats {
pub fn prepared_cache_miss(&self) {
self.prepared_miss_count.fetch_add(1, Ordering::Relaxed);
}
pub fn prepared_cache_add(&self) {
self.prepared_cache_size.fetch_add(1, Ordering::Relaxed);
}
pub fn prepared_cache_remove(&self) {
self.prepared_cache_size.fetch_sub(1, Ordering::Relaxed);
}
}

102
tests/ruby/copy_spec.rb Normal file
View File

@@ -0,0 +1,102 @@
# frozen_string_literal: true
require_relative 'spec_helper'
describe "COPY Handling" do
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 5) }
before do
new_configs = processes.pgcat.current_config
# Allow connections in the pool to expire faster
new_configs["general"]["idle_timeout"] = 5
processes.pgcat.update_config(new_configs)
# We need to kill the old process that was using the default configs
processes.pgcat.stop
processes.pgcat.start
processes.pgcat.wait_until_ready
end
before do
processes.all_databases.first.with_connection do |conn|
conn.async_exec "CREATE TABLE copy_test_table (a TEXT,b TEXT,c TEXT,d TEXT)"
end
end
after do
processes.all_databases.first.with_connection do |conn|
conn.async_exec "DROP TABLE copy_test_table;"
end
end
after do
processes.all_databases.map(&:reset)
processes.pgcat.shutdown
end
describe "COPY FROM" do
context "within transaction" do
it "finishes within alloted time" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
Timeout.timeout(3) do
conn.async_exec("BEGIN")
conn.copy_data "COPY copy_test_table FROM STDIN CSV" do
sleep 0.5
conn.put_copy_data "some,data,to,copy\n"
conn.put_copy_data "more,data,to,copy\n"
end
conn.async_exec("COMMIT")
end
res = conn.async_exec("SELECT * FROM copy_test_table").to_a
expect(res).to eq([
{"a"=>"some", "b"=>"data", "c"=>"to", "d"=>"copy"},
{"a"=>"more", "b"=>"data", "c"=>"to", "d"=>"copy"}
])
end
end
context "outside transaction" do
it "finishes within alloted time" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
Timeout.timeout(3) do
conn.copy_data "COPY copy_test_table FROM STDIN CSV" do
sleep 0.5
conn.put_copy_data "some,data,to,copy\n"
conn.put_copy_data "more,data,to,copy\n"
end
end
res = conn.async_exec("SELECT * FROM copy_test_table").to_a
expect(res).to eq([
{"a"=>"some", "b"=>"data", "c"=>"to", "d"=>"copy"},
{"a"=>"more", "b"=>"data", "c"=>"to", "d"=>"copy"}
])
end
end
end
describe "COPY TO" do
before do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("BEGIN")
conn.copy_data "COPY copy_test_table FROM STDIN CSV" do
conn.put_copy_data "some,data,to,copy\n"
conn.put_copy_data "more,data,to,copy\n"
end
conn.async_exec("COMMIT")
conn.close
end
it "works" do
res = []
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.copy_data "COPY copy_test_table TO STDOUT CSV" do
while row=conn.get_copy_data
res << row
end
end
expect(res).to eq(["some,data,to,copy\n", "more,data,to,copy\n"])
end
end
end

View File

@@ -0,0 +1,29 @@
require_relative 'spec_helper'
describe 'Prepared statements' do
let(:processes) { Helpers::Pgcat.three_shard_setup('sharded_db', 5) }
context 'enabled' do
it 'will work over the same connection' do
conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
10.times do |i|
statement_name = "statement_#{i}"
conn.prepare(statement_name, 'SELECT $1::int')
conn.exec_prepared(statement_name, [1])
conn.describe_prepared(statement_name)
end
end
it 'will work with new connections' do
10.times do
conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
statement_name = 'statement1'
conn.prepare('statement1', 'SELECT $1::int')
conn.exec_prepared('statement1', [1])
conn.describe_prepared('statement1')
end
end
end
end