mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 01:16:30 +00:00
Compare commits
5 Commits
circleci_p
...
mostafa_fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3fc9e5dec1 | ||
|
|
f7c5c0faf9 | ||
|
|
982d03c374 | ||
|
|
686b7ca7c5 | ||
|
|
7c55bf78fe |
@@ -26,7 +26,6 @@ PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
|
||||
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
|
||||
|
||||
# Start Toxiproxy
|
||||
kill -9 $(pgrep toxiproxy) || true
|
||||
LOG_LEVEL=error toxiproxy-server &
|
||||
sleep 1
|
||||
|
||||
@@ -178,6 +177,3 @@ killall pgcat -s SIGINT
|
||||
|
||||
# Allow for graceful shutdown
|
||||
sleep 1
|
||||
|
||||
kill -9 $(pgrep toxiproxy)
|
||||
sleep 1
|
||||
|
||||
@@ -1729,14 +1729,13 @@ where
|
||||
/// and also the pool's statement cache. Add it to extended protocol data.
|
||||
fn buffer_parse(&mut self, message: BytesMut, pool: &ConnectionPool) -> Result<(), Error> {
|
||||
// Avoid parsing if prepared statements not enabled
|
||||
if !self.prepared_statements_enabled {
|
||||
let client_given_name = Parse::get_name(&message)?;
|
||||
if !self.prepared_statements_enabled || client_given_name.is_empty() {
|
||||
debug!("Anonymous parse message");
|
||||
self.extended_protocol_data_buffer
|
||||
.push_back(ExtendedProtocolData::create_new_parse(message, None));
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let client_given_name = Parse::get_name(&message)?;
|
||||
let parse: Parse = (&message).try_into()?;
|
||||
|
||||
// Compute the hash of the parse statement
|
||||
@@ -1774,15 +1773,14 @@ where
|
||||
/// saved in the client cache.
|
||||
async fn buffer_bind(&mut self, message: BytesMut) -> Result<(), Error> {
|
||||
// Avoid parsing if prepared statements not enabled
|
||||
if !self.prepared_statements_enabled {
|
||||
let client_given_name = Bind::get_name(&message)?;
|
||||
if !self.prepared_statements_enabled || client_given_name.is_empty() {
|
||||
debug!("Anonymous bind message");
|
||||
self.extended_protocol_data_buffer
|
||||
.push_back(ExtendedProtocolData::create_new_bind(message, None));
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let client_given_name = Bind::get_name(&message)?;
|
||||
|
||||
match self.prepared_statements.get(&client_given_name) {
|
||||
Some((rewritten_parse, _)) => {
|
||||
let message = Bind::rename(message, &rewritten_parse.name)?;
|
||||
@@ -1834,7 +1832,8 @@ where
|
||||
}
|
||||
|
||||
let describe: Describe = (&message).try_into()?;
|
||||
if describe.target == 'P' {
|
||||
let client_given_name = describe.statement_name.clone();
|
||||
if describe.target == 'P' || client_given_name.is_empty() {
|
||||
debug!("Portal describe message");
|
||||
self.extended_protocol_data_buffer
|
||||
.push_back(ExtendedProtocolData::create_new_describe(message, None));
|
||||
@@ -1842,8 +1841,6 @@ where
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let client_given_name = describe.statement_name.clone();
|
||||
|
||||
match self.prepared_statements.get(&client_given_name) {
|
||||
Some((rewritten_parse, _)) => {
|
||||
let describe = describe.rename(&rewritten_parse.name);
|
||||
|
||||
@@ -821,10 +821,10 @@ impl ExtendedProtocolData {
|
||||
pub struct Parse {
|
||||
code: char,
|
||||
#[allow(dead_code)]
|
||||
len: i32,
|
||||
len: u32,
|
||||
pub name: String,
|
||||
query: String,
|
||||
num_params: i16,
|
||||
num_params: u16,
|
||||
param_types: Vec<i32>,
|
||||
}
|
||||
|
||||
@@ -834,12 +834,11 @@ impl TryFrom<&BytesMut> for Parse {
|
||||
fn try_from(buf: &BytesMut) -> Result<Parse, Error> {
|
||||
let mut cursor = Cursor::new(buf);
|
||||
let code = cursor.get_u8() as char;
|
||||
let len = cursor.get_i32();
|
||||
let len = cursor.get_u32();
|
||||
let name = cursor.read_string()?;
|
||||
let query = cursor.read_string()?;
|
||||
let num_params = cursor.get_i16();
|
||||
let num_params = cursor.get_u16();
|
||||
let mut param_types = Vec::new();
|
||||
|
||||
for _ in 0..num_params {
|
||||
param_types.push(cursor.get_i32());
|
||||
}
|
||||
@@ -875,10 +874,10 @@ impl TryFrom<Parse> for BytesMut {
|
||||
+ 4 * parse.num_params as usize;
|
||||
|
||||
bytes.put_u8(parse.code as u8);
|
||||
bytes.put_i32(len as i32);
|
||||
bytes.put_u32(len as u32);
|
||||
bytes.put_slice(name);
|
||||
bytes.put_slice(query);
|
||||
bytes.put_i16(parse.num_params);
|
||||
bytes.put_u16(parse.num_params);
|
||||
for param in parse.param_types {
|
||||
bytes.put_i32(param);
|
||||
}
|
||||
@@ -945,14 +944,14 @@ impl Parse {
|
||||
pub struct Bind {
|
||||
code: char,
|
||||
#[allow(dead_code)]
|
||||
len: i64,
|
||||
len: u64,
|
||||
portal: String,
|
||||
pub prepared_statement: String,
|
||||
num_param_format_codes: i16,
|
||||
num_param_format_codes: u16,
|
||||
param_format_codes: Vec<i16>,
|
||||
num_param_values: i16,
|
||||
num_param_values: u16,
|
||||
param_values: Vec<(i32, BytesMut)>,
|
||||
num_result_column_format_codes: i16,
|
||||
num_result_column_format_codes: u16,
|
||||
result_columns_format_codes: Vec<i16>,
|
||||
}
|
||||
|
||||
@@ -962,17 +961,17 @@ impl TryFrom<&BytesMut> for Bind {
|
||||
fn try_from(buf: &BytesMut) -> Result<Bind, Error> {
|
||||
let mut cursor = Cursor::new(buf);
|
||||
let code = cursor.get_u8() as char;
|
||||
let len = cursor.get_i32();
|
||||
let len = cursor.get_u32();
|
||||
let portal = cursor.read_string()?;
|
||||
let prepared_statement = cursor.read_string()?;
|
||||
let num_param_format_codes = cursor.get_i16();
|
||||
let num_param_format_codes = cursor.get_u16();
|
||||
let mut param_format_codes = Vec::new();
|
||||
|
||||
for _ in 0..num_param_format_codes {
|
||||
param_format_codes.push(cursor.get_i16());
|
||||
}
|
||||
|
||||
let num_param_values = cursor.get_i16();
|
||||
let num_param_values = cursor.get_u16();
|
||||
let mut param_values = Vec::new();
|
||||
|
||||
for _ in 0..num_param_values {
|
||||
@@ -994,7 +993,7 @@ impl TryFrom<&BytesMut> for Bind {
|
||||
}
|
||||
}
|
||||
|
||||
let num_result_column_format_codes = cursor.get_i16();
|
||||
let num_result_column_format_codes = cursor.get_u16();
|
||||
let mut result_columns_format_codes = Vec::new();
|
||||
|
||||
for _ in 0..num_result_column_format_codes {
|
||||
@@ -1003,7 +1002,7 @@ impl TryFrom<&BytesMut> for Bind {
|
||||
|
||||
Ok(Bind {
|
||||
code,
|
||||
len: len as i64,
|
||||
len: len as u64,
|
||||
portal,
|
||||
prepared_statement,
|
||||
num_param_format_codes,
|
||||
@@ -1042,19 +1041,19 @@ impl TryFrom<Bind> for BytesMut {
|
||||
len += 2 * bind.num_result_column_format_codes as usize;
|
||||
|
||||
bytes.put_u8(bind.code as u8);
|
||||
bytes.put_i32(len as i32);
|
||||
bytes.put_u32(len as u32);
|
||||
bytes.put_slice(portal);
|
||||
bytes.put_slice(prepared_statement);
|
||||
bytes.put_i16(bind.num_param_format_codes);
|
||||
bytes.put_u16(bind.num_param_format_codes);
|
||||
for param_format_code in bind.param_format_codes {
|
||||
bytes.put_i16(param_format_code);
|
||||
}
|
||||
bytes.put_i16(bind.num_param_values);
|
||||
bytes.put_u16(bind.num_param_values);
|
||||
for (param_len, param) in bind.param_values {
|
||||
bytes.put_i32(param_len);
|
||||
bytes.put_slice(¶m);
|
||||
}
|
||||
bytes.put_i16(bind.num_result_column_format_codes);
|
||||
bytes.put_u16(bind.num_result_column_format_codes);
|
||||
for result_column_format_code in bind.result_columns_format_codes {
|
||||
bytes.put_i16(result_column_format_code);
|
||||
}
|
||||
@@ -1068,7 +1067,7 @@ impl Bind {
|
||||
pub fn get_name(buf: &BytesMut) -> Result<String, Error> {
|
||||
let mut cursor = Cursor::new(buf);
|
||||
// Skip the code and length
|
||||
cursor.advance(mem::size_of::<u8>() + mem::size_of::<i32>());
|
||||
cursor.advance(mem::size_of::<u8>() + mem::size_of::<u32>());
|
||||
cursor.read_string()?;
|
||||
cursor.read_string()
|
||||
}
|
||||
@@ -1078,17 +1077,17 @@ impl Bind {
|
||||
let mut cursor = Cursor::new(&buf);
|
||||
// Read basic data from the cursor
|
||||
let code = cursor.get_u8();
|
||||
let current_len = cursor.get_i32();
|
||||
let current_len = cursor.get_u32();
|
||||
let portal = cursor.read_string()?;
|
||||
let prepared_statement = cursor.read_string()?;
|
||||
|
||||
// Calculate new length
|
||||
let new_len = current_len + new_name.len() as i32 - prepared_statement.len() as i32;
|
||||
let new_len = current_len + new_name.len() as u32 - prepared_statement.len() as u32;
|
||||
|
||||
// Begin building the response buffer
|
||||
let mut response_buf = BytesMut::with_capacity(new_len as usize + 1);
|
||||
response_buf.put_u8(code);
|
||||
response_buf.put_i32(new_len);
|
||||
response_buf.put_u32(new_len);
|
||||
|
||||
// Put the portal and new name into the buffer
|
||||
// Note: panic if the provided string contains null byte
|
||||
@@ -1112,7 +1111,7 @@ pub struct Describe {
|
||||
code: char,
|
||||
|
||||
#[allow(dead_code)]
|
||||
len: i32,
|
||||
len: u32,
|
||||
pub target: char,
|
||||
pub statement_name: String,
|
||||
}
|
||||
@@ -1123,7 +1122,7 @@ impl TryFrom<&BytesMut> for Describe {
|
||||
fn try_from(bytes: &BytesMut) -> Result<Describe, Error> {
|
||||
let mut cursor = Cursor::new(bytes);
|
||||
let code = cursor.get_u8() as char;
|
||||
let len = cursor.get_i32();
|
||||
let len = cursor.get_u32();
|
||||
let target = cursor.get_u8() as char;
|
||||
let statement_name = cursor.read_string()?;
|
||||
|
||||
@@ -1146,7 +1145,7 @@ impl TryFrom<Describe> for BytesMut {
|
||||
let len = 4 + 1 + statement_name.len();
|
||||
|
||||
bytes.put_u8(describe.code as u8);
|
||||
bytes.put_i32(len as i32);
|
||||
bytes.put_u32(len as u32);
|
||||
bytes.put_u8(describe.target as u8);
|
||||
bytes.put_slice(statement_name);
|
||||
|
||||
|
||||
@@ -200,17 +200,18 @@ struct PrometheusMetric<Value: fmt::Display> {
|
||||
|
||||
impl<Value: fmt::Display> fmt::Display for PrometheusMetric<Value> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
let mut sorted_labels: Vec<_> = self.labels.iter().collect();
|
||||
sorted_labels.sort_by_key(|&(key, _)| key);
|
||||
let formatted_labels = sorted_labels
|
||||
let formatted_labels = self
|
||||
.labels
|
||||
.iter()
|
||||
.map(|(key, value)| format!("{}=\"{}\"", key, value))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",");
|
||||
write!(
|
||||
f,
|
||||
"{name}{{{formatted_labels}}} {value}",
|
||||
"# HELP {name} {help}\n# TYPE {name} {ty}\n{name}{{{formatted_labels}}} {value}\n",
|
||||
name = format_args!("pgcat_{}", self.name),
|
||||
help = self.help,
|
||||
ty = self.ty,
|
||||
formatted_labels = formatted_labels,
|
||||
value = self.value
|
||||
)
|
||||
@@ -246,7 +247,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
||||
labels.insert("pool", address.pool_name.clone());
|
||||
labels.insert("index", address.address_index.to_string());
|
||||
labels.insert("database", address.database.to_string());
|
||||
labels.insert("username", address.username.clone());
|
||||
labels.insert("user", address.username.clone());
|
||||
|
||||
Self::from_name(&format!("databases_{}", name), value, labels)
|
||||
}
|
||||
@@ -263,8 +264,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
||||
labels.insert("pool", address.pool_name.clone());
|
||||
labels.insert("index", address.address_index.to_string());
|
||||
labels.insert("database", address.database.to_string());
|
||||
labels.insert("username", address.username.clone());
|
||||
|
||||
labels.insert("user", address.username.clone());
|
||||
Self::from_name(&format!("servers_{}", name), value, labels)
|
||||
}
|
||||
|
||||
@@ -276,7 +276,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
||||
labels.insert("role", address.role.to_string());
|
||||
labels.insert("index", address.address_index.to_string());
|
||||
labels.insert("database", address.database.to_string());
|
||||
labels.insert("username", address.username.clone());
|
||||
labels.insert("user", address.username.clone());
|
||||
|
||||
Self::from_name(&format!("stats_{}", name), value, labels)
|
||||
}
|
||||
@@ -288,15 +288,6 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
||||
|
||||
Self::from_name(&format!("pools_{}", name), value, labels)
|
||||
}
|
||||
|
||||
fn get_header(&self) -> String {
|
||||
format!(
|
||||
"\n# HELP {name} {help}\n# TYPE {name} {ty}",
|
||||
name = format_args!("pgcat_{}", self.name),
|
||||
help = self.help,
|
||||
ty = self.ty,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
async fn prometheus_stats(
|
||||
@@ -322,7 +313,6 @@ async fn prometheus_stats(
|
||||
|
||||
// Adds metrics shown in a SHOW STATS admin command.
|
||||
fn push_address_stats(lines: &mut Vec<String>) {
|
||||
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
|
||||
for (_, pool) in get_all_pools() {
|
||||
for shard in 0..pool.shards() {
|
||||
for server in 0..pool.servers(shard) {
|
||||
@@ -332,10 +322,7 @@ fn push_address_stats(lines: &mut Vec<String>) {
|
||||
if let Some(prometheus_metric) =
|
||||
PrometheusMetric::<u64>::from_address(address, &key, value)
|
||||
{
|
||||
grouped_metrics
|
||||
.entry(key)
|
||||
.or_default()
|
||||
.push(prometheus_metric);
|
||||
lines.push(prometheus_metric.to_string());
|
||||
} else {
|
||||
debug!("Metric {} not implemented for {}", key, address.name());
|
||||
}
|
||||
@@ -343,53 +330,33 @@ fn push_address_stats(lines: &mut Vec<String>) {
|
||||
}
|
||||
}
|
||||
}
|
||||
for (_key, metrics) in grouped_metrics {
|
||||
if !metrics.is_empty() {
|
||||
lines.push(metrics[0].get_header());
|
||||
for metric in metrics {
|
||||
lines.push(metric.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Adds relevant metrics shown in a SHOW POOLS admin command.
|
||||
fn push_pool_stats(lines: &mut Vec<String>) {
|
||||
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
|
||||
let pool_stats = PoolStats::construct_pool_lookup();
|
||||
for (pool_id, stats) in pool_stats.iter() {
|
||||
for (name, value) in stats.clone() {
|
||||
if let Some(prometheus_metric) =
|
||||
PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value)
|
||||
{
|
||||
grouped_metrics
|
||||
.entry(name)
|
||||
.or_default()
|
||||
.push(prometheus_metric);
|
||||
lines.push(prometheus_metric.to_string());
|
||||
} else {
|
||||
debug!("Metric {} not implemented for ({})", name, *pool_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
for (_key, metrics) in grouped_metrics {
|
||||
if !metrics.is_empty() {
|
||||
lines.push(metrics[0].get_header());
|
||||
for metric in metrics {
|
||||
lines.push(metric.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Adds relevant metrics shown in a SHOW DATABASES admin command.
|
||||
fn push_database_stats(lines: &mut Vec<String>) {
|
||||
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u32>>> = HashMap::new();
|
||||
for (_, pool) in get_all_pools() {
|
||||
let pool_config = pool.settings.clone();
|
||||
for shard in 0..pool.shards() {
|
||||
for server in 0..pool.servers(shard) {
|
||||
let address = pool.address(shard, server);
|
||||
let pool_state = pool.pool_state(shard, server);
|
||||
|
||||
let metrics = vec![
|
||||
("pool_size", pool_config.user.pool_size),
|
||||
("current_connections", pool_state.connections),
|
||||
@@ -398,10 +365,7 @@ fn push_database_stats(lines: &mut Vec<String>) {
|
||||
if let Some(prometheus_metric) =
|
||||
PrometheusMetric::<u32>::from_database_info(address, key, value)
|
||||
{
|
||||
grouped_metrics
|
||||
.entry(key.to_string())
|
||||
.or_default()
|
||||
.push(prometheus_metric);
|
||||
lines.push(prometheus_metric.to_string());
|
||||
} else {
|
||||
debug!("Metric {} not implemented for {}", key, address.name());
|
||||
}
|
||||
@@ -409,14 +373,6 @@ fn push_database_stats(lines: &mut Vec<String>) {
|
||||
}
|
||||
}
|
||||
}
|
||||
for (_key, metrics) in grouped_metrics {
|
||||
if !metrics.is_empty() {
|
||||
lines.push(metrics[0].get_header());
|
||||
for metric in metrics {
|
||||
lines.push(metric.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Adds relevant metrics shown in a SHOW SERVERS admin command.
|
||||
@@ -449,7 +405,7 @@ fn push_server_stats(lines: &mut Vec<String>) {
|
||||
crate::stats::ServerState::Idle => entry.idle_count += 1,
|
||||
}
|
||||
}
|
||||
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
|
||||
|
||||
for (_, pool) in get_all_pools() {
|
||||
for shard in 0..pool.shards() {
|
||||
for server in 0..pool.servers(shard) {
|
||||
@@ -472,10 +428,7 @@ fn push_server_stats(lines: &mut Vec<String>) {
|
||||
if let Some(prometheus_metric) =
|
||||
PrometheusMetric::<u64>::from_server_info(address, key, value)
|
||||
{
|
||||
grouped_metrics
|
||||
.entry(key.to_string())
|
||||
.or_default()
|
||||
.push(prometheus_metric);
|
||||
lines.push(prometheus_metric.to_string());
|
||||
} else {
|
||||
debug!("Metric {} not implemented for {}", key, address.name());
|
||||
}
|
||||
@@ -484,14 +437,6 @@ fn push_server_stats(lines: &mut Vec<String>) {
|
||||
}
|
||||
}
|
||||
}
|
||||
for (_key, metrics) in grouped_metrics {
|
||||
if !metrics.is_empty() {
|
||||
lines.push(metrics[0].get_header());
|
||||
for metric in metrics {
|
||||
lines.push(metric.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start_metric_server(http_addr: SocketAddr) {
|
||||
|
||||
@@ -698,7 +698,6 @@ impl Server {
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
trace!("Error: {}", error_code);
|
||||
|
||||
match error_code {
|
||||
@@ -1013,6 +1012,12 @@ impl Server {
|
||||
// which can leak between clients. This is a best effort to block bad clients
|
||||
// from poisoning a transaction-mode pool by setting inappropriate session variables
|
||||
match command.as_str() {
|
||||
"DISCARD ALL" => {
|
||||
self.clear_prepared_statement_cache();
|
||||
}
|
||||
"DEALLOCATE ALL" => {
|
||||
self.clear_prepared_statement_cache();
|
||||
}
|
||||
"SET" => {
|
||||
// We don't detect set statements in transactions
|
||||
// No great way to differentiate between set and set local
|
||||
@@ -1132,6 +1137,12 @@ impl Server {
|
||||
has_it
|
||||
}
|
||||
|
||||
fn clear_prepared_statement_cache(&mut self) {
|
||||
if let Some(cache) = &mut self.prepared_statement_cache {
|
||||
cache.clear();
|
||||
}
|
||||
}
|
||||
|
||||
fn add_prepared_statement_to_cache(&mut self, name: &str) -> Option<String> {
|
||||
let cache = match &mut self.prepared_statement_cache {
|
||||
Some(cache) => cache,
|
||||
|
||||
145
tests/ruby/helpers/frontend_messages.rb
Normal file
145
tests/ruby/helpers/frontend_messages.rb
Normal file
@@ -0,0 +1,145 @@
|
||||
|
||||
class PostgresMessage
|
||||
# Base class for common functionality
|
||||
|
||||
def encode_string(str)
|
||||
"#{str}\0" # Encode a string with a null terminator
|
||||
end
|
||||
|
||||
def encode_int16(value)
|
||||
[value].pack('n') # Encode an Int16
|
||||
end
|
||||
|
||||
def encode_int32(value)
|
||||
[value].pack('N') # Encode an Int32
|
||||
end
|
||||
|
||||
def message_prefix(type, length)
|
||||
"#{type}#{encode_int32(length)}" # Message type and length prefix
|
||||
end
|
||||
end
|
||||
|
||||
class SimpleQueryMessage < PostgresMessage
|
||||
attr_accessor :query
|
||||
|
||||
def initialize(query = "")
|
||||
@query = query
|
||||
end
|
||||
|
||||
def to_bytes
|
||||
query_bytes = encode_string(@query)
|
||||
length = 4 + query_bytes.size # Length includes 4 bytes for length itself
|
||||
message_prefix('Q', length) + query_bytes
|
||||
end
|
||||
end
|
||||
|
||||
class ParseMessage < PostgresMessage
|
||||
attr_accessor :statement_name, :query, :parameter_types
|
||||
|
||||
def initialize(statement_name = "", query = "", parameter_types = [])
|
||||
@statement_name = statement_name
|
||||
@query = query
|
||||
@parameter_types = parameter_types
|
||||
end
|
||||
|
||||
def to_bytes
|
||||
statement_name_bytes = encode_string(@statement_name)
|
||||
query_bytes = encode_string(@query)
|
||||
parameter_types_bytes = @parameter_types.pack('N*')
|
||||
|
||||
length = 4 + statement_name_bytes.size + query_bytes.size + 2 + parameter_types_bytes.size
|
||||
message_prefix('P', length) + statement_name_bytes + query_bytes + encode_int16(@parameter_types.size) + parameter_types_bytes
|
||||
end
|
||||
end
|
||||
|
||||
class BindMessage < PostgresMessage
|
||||
attr_accessor :portal_name, :statement_name, :parameter_format_codes, :parameters, :result_column_format_codes
|
||||
|
||||
def initialize(portal_name = "", statement_name = "", parameter_format_codes = [], parameters = [], result_column_format_codes = [])
|
||||
@portal_name = portal_name
|
||||
@statement_name = statement_name
|
||||
@parameter_format_codes = parameter_format_codes
|
||||
@parameters = parameters
|
||||
@result_column_format_codes = result_column_format_codes
|
||||
end
|
||||
|
||||
def to_bytes
|
||||
portal_name_bytes = encode_string(@portal_name)
|
||||
statement_name_bytes = encode_string(@statement_name)
|
||||
parameter_format_codes_bytes = @parameter_format_codes.pack('n*')
|
||||
|
||||
parameters_bytes = @parameters.map do |param|
|
||||
if param.nil?
|
||||
encode_int32(-1)
|
||||
else
|
||||
encode_int32(param.bytesize) + param
|
||||
end
|
||||
end.join
|
||||
|
||||
result_column_format_codes_bytes = @result_column_format_codes.pack('n*')
|
||||
|
||||
length = 4 + portal_name_bytes.size + statement_name_bytes.size + 2 + parameter_format_codes_bytes.size + 2 + parameters_bytes.size + 2 + result_column_format_codes_bytes.size
|
||||
message_prefix('B', length) + portal_name_bytes + statement_name_bytes + encode_int16(@parameter_format_codes.size) + parameter_format_codes_bytes + encode_int16(@parameters.size) + parameters_bytes + encode_int16(@result_column_format_codes.size) + result_column_format_codes_bytes
|
||||
end
|
||||
end
|
||||
|
||||
class DescribeMessage < PostgresMessage
|
||||
attr_accessor :type, :name
|
||||
|
||||
def initialize(type = 'S', name = "")
|
||||
@type = type
|
||||
@name = name
|
||||
end
|
||||
|
||||
def to_bytes
|
||||
name_bytes = encode_string(@name)
|
||||
length = 4 + 1 + name_bytes.size
|
||||
message_prefix('D', length) + @type + name_bytes
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
class ExecuteMessage < PostgresMessage
|
||||
attr_accessor :portal_name, :max_rows
|
||||
|
||||
def initialize(portal_name = "", max_rows = 0)
|
||||
@portal_name = portal_name
|
||||
@max_rows = max_rows
|
||||
end
|
||||
|
||||
def to_bytes
|
||||
portal_name_bytes = encode_string(@portal_name)
|
||||
length = 4 + portal_name_bytes.size + 4
|
||||
message_prefix('E', length) + portal_name_bytes + encode_int32(@max_rows)
|
||||
end
|
||||
end
|
||||
|
||||
class FlushMessage < PostgresMessage
|
||||
def to_bytes
|
||||
length = 4
|
||||
message_prefix('H', length)
|
||||
end
|
||||
end
|
||||
|
||||
class SyncMessage < PostgresMessage
|
||||
def to_bytes
|
||||
length = 4
|
||||
message_prefix('S', length)
|
||||
end
|
||||
end
|
||||
|
||||
class CloseMessage < PostgresMessage
|
||||
attr_accessor :type, :name
|
||||
|
||||
def initialize(type = 'S', name = "")
|
||||
@type = type
|
||||
@name = name
|
||||
end
|
||||
|
||||
def to_bytes
|
||||
name_bytes = encode_string(@name)
|
||||
length = 4 + 1 + name_bytes.size
|
||||
message_prefix('C', length) + @type + name_bytes
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
require 'socket'
|
||||
require 'digest/md5'
|
||||
require_relative 'frontend_messages'
|
||||
|
||||
BACKEND_MESSAGE_CODES = {
|
||||
'Z' => "ReadyForQuery",
|
||||
@@ -18,9 +19,13 @@ class PostgresSocket
|
||||
@host = host
|
||||
@socket = TCPSocket.new @host, @port
|
||||
@parameters = {}
|
||||
@verbose = true
|
||||
@verbose = false
|
||||
end
|
||||
|
||||
def send_message(message)
|
||||
@socket.write(message.to_bytes)
|
||||
end
|
||||
|
||||
def send_md5_password_message(username, password, salt)
|
||||
m = Digest::MD5.hexdigest(password + username)
|
||||
m = Digest::MD5.hexdigest(m + salt.map(&:chr).join(""))
|
||||
@@ -113,107 +118,6 @@ class PostgresSocket
|
||||
log "[F] Sent CancelRequest message"
|
||||
end
|
||||
|
||||
def send_query_message(query)
|
||||
query_size = query.length
|
||||
message_size = 1 + 4 + query_size
|
||||
message = []
|
||||
message << "Q".ord
|
||||
message << [message_size].pack('l>').unpack('CCCC') # 4
|
||||
message << query.split('').map(&:ord) # 2, 11
|
||||
message << 0 # 1, 12
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent Q message (#{query})"
|
||||
end
|
||||
|
||||
def send_parse_message(query)
|
||||
query_size = query.length
|
||||
message_size = 2 + 2 + 4 + query_size
|
||||
message = []
|
||||
message << "P".ord
|
||||
message << [message_size].pack('l>').unpack('CCCC') # 4
|
||||
message << 0 # unnamed statement
|
||||
message << query.split('').map(&:ord) # 2, 11
|
||||
message << 0 # 1, 12
|
||||
message << [0, 0]
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent P message (#{query})"
|
||||
end
|
||||
|
||||
def send_bind_message
|
||||
message = []
|
||||
message << "B".ord
|
||||
message << [12].pack('l>').unpack('CCCC') # 4
|
||||
message << 0 # unnamed statement
|
||||
message << 0 # unnamed statement
|
||||
message << [0, 0] # 2
|
||||
message << [0, 0] # 2
|
||||
message << [0, 0] # 2
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent B message"
|
||||
end
|
||||
|
||||
def send_describe_message(mode)
|
||||
message = []
|
||||
message << "D".ord
|
||||
message << [6].pack('l>').unpack('CCCC') # 4
|
||||
message << mode.ord
|
||||
message << 0 # unnamed statement
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent D message"
|
||||
end
|
||||
|
||||
def send_execute_message(limit=0)
|
||||
message = []
|
||||
message << "E".ord
|
||||
message << [9].pack('l>').unpack('CCCC') # 4
|
||||
message << 0 # unnamed statement
|
||||
message << [limit].pack('l>').unpack('CCCC') # 4
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent E message"
|
||||
end
|
||||
|
||||
def send_sync_message
|
||||
message = []
|
||||
message << "S".ord
|
||||
message << [4].pack('l>').unpack('CCCC') # 4
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent S message"
|
||||
end
|
||||
|
||||
def send_copydone_message
|
||||
message = []
|
||||
message << "c".ord
|
||||
message << [4].pack('l>').unpack('CCCC') # 4
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent c message"
|
||||
end
|
||||
|
||||
def send_copyfail_message
|
||||
message = []
|
||||
message << "f".ord
|
||||
message << [5].pack('l>').unpack('CCCC') # 4
|
||||
message << 0
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent f message"
|
||||
end
|
||||
|
||||
def send_flush_message
|
||||
message = []
|
||||
message << "H".ord
|
||||
message << [4].pack('l>').unpack('CCCC') # 4
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent H message"
|
||||
end
|
||||
|
||||
def read_from_server()
|
||||
output_messages = []
|
||||
retry_count = 0
|
||||
|
||||
@@ -16,10 +16,14 @@ describe "Portocol handling" do
|
||||
end
|
||||
|
||||
def run_comparison(sequence, socket_a, socket_b)
|
||||
sequence.each do |msg, *args|
|
||||
socket_a.send(msg, *args)
|
||||
socket_b.send(msg, *args)
|
||||
|
||||
sequence.each do |msg|
|
||||
if msg.is_a?(Symbol)
|
||||
socket_a.send(msg)
|
||||
socket_b.send(msg)
|
||||
else
|
||||
socket_a.send_message(msg)
|
||||
socket_b.send_message(msg)
|
||||
end
|
||||
compare_messages(
|
||||
socket_a.read_from_server,
|
||||
socket_b.read_from_server
|
||||
@@ -83,9 +87,9 @@ describe "Portocol handling" do
|
||||
|
||||
context "Cancel Query" do
|
||||
let(:sequence) {
|
||||
[
|
||||
[:send_query_message, "SELECT pg_sleep(5)"],
|
||||
[:cancel_query]
|
||||
[
|
||||
SimpleQueryMessage.new("SELECT pg_sleep(5)"),
|
||||
:cancel_query
|
||||
]
|
||||
}
|
||||
|
||||
@@ -95,12 +99,12 @@ describe "Portocol handling" do
|
||||
xcontext "Simple query after parse" do
|
||||
let(:sequence) {
|
||||
[
|
||||
[:send_parse_message, "SELECT 5"],
|
||||
[:send_query_message, "SELECT 1"],
|
||||
[:send_bind_message],
|
||||
[:send_describe_message, "P"],
|
||||
[:send_execute_message],
|
||||
[:send_sync_message],
|
||||
ParseMessage.new("", "SELECT 5", []),
|
||||
SimpleQueryMessage.new("SELECT 1"),
|
||||
BindMessage.new("", "", [], [], [0]),
|
||||
DescribeMessage.new("P", ""),
|
||||
ExecuteMessage.new("", 1),
|
||||
SyncMessage.new
|
||||
]
|
||||
}
|
||||
|
||||
@@ -111,8 +115,8 @@ describe "Portocol handling" do
|
||||
xcontext "Flush message" do
|
||||
let(:sequence) {
|
||||
[
|
||||
[:send_parse_message, "SELECT 1"],
|
||||
[:send_flush_message]
|
||||
ParseMessage.new("", "SELECT 1", []),
|
||||
FlushMessage.new
|
||||
]
|
||||
}
|
||||
|
||||
@@ -122,9 +126,7 @@ describe "Portocol handling" do
|
||||
|
||||
xcontext "Bind without parse" do
|
||||
let(:sequence) {
|
||||
[
|
||||
[:send_bind_message]
|
||||
]
|
||||
[BindMessage.new("", "", [], [], [0])]
|
||||
}
|
||||
# This is known to fail.
|
||||
# Server responds immediately, Proxy buffers the message
|
||||
@@ -133,23 +135,155 @@ describe "Portocol handling" do
|
||||
|
||||
context "Simple message" do
|
||||
let(:sequence) {
|
||||
[[:send_query_message, "SELECT 1"]]
|
||||
[SimpleQueryMessage.new("SELECT 1")]
|
||||
}
|
||||
|
||||
it_behaves_like "at parity with database"
|
||||
end
|
||||
|
||||
10.times do |i|
|
||||
context "Extended protocol" do
|
||||
let(:sequence) {
|
||||
[
|
||||
ParseMessage.new("", "SELECT 1", []),
|
||||
BindMessage.new("", "", [], [], [0]),
|
||||
DescribeMessage.new("S", ""),
|
||||
ExecuteMessage.new("", 1),
|
||||
SyncMessage.new
|
||||
]
|
||||
}
|
||||
|
||||
context "Extended protocol" do
|
||||
let(:sequence) {
|
||||
[
|
||||
[:send_parse_message, "SELECT 1"],
|
||||
[:send_bind_message],
|
||||
[:send_describe_message, "P"],
|
||||
[:send_execute_message],
|
||||
[:send_sync_message],
|
||||
]
|
||||
}
|
||||
|
||||
it_behaves_like "at parity with database"
|
||||
it_behaves_like "at parity with database"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "Protocol-level prepared statements" do
|
||||
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 1, "transaction") }
|
||||
before do
|
||||
q_sock = PostgresSocket.new('localhost', processes.pgcat.port)
|
||||
q_sock.send_startup_message("sharding_user", "sharded_db", "sharding_user")
|
||||
table_query = "CREATE TABLE IF NOT EXISTS employees (employee_id SERIAL PRIMARY KEY, salary NUMERIC(10, 2) CHECK (salary > 0));"
|
||||
q_sock.send_message(SimpleQueryMessage.new(table_query))
|
||||
q_sock.close
|
||||
|
||||
current_configs = processes.pgcat.current_config
|
||||
current_configs["pools"]["sharded_db"]["prepared_statements_cache_size"] = 500
|
||||
processes.pgcat.update_config(current_configs)
|
||||
processes.pgcat.reload_config
|
||||
end
|
||||
after do
|
||||
q_sock = PostgresSocket.new('localhost', processes.pgcat.port)
|
||||
q_sock.send_startup_message("sharding_user", "sharded_db", "sharding_user")
|
||||
table_query = "DROP TABLE IF EXISTS employees;"
|
||||
q_sock.send_message(SimpleQueryMessage.new(table_query))
|
||||
q_sock.close
|
||||
end
|
||||
|
||||
context "When unnamed prepared statements are used" do
|
||||
it "does not cache them" do
|
||||
socket = PostgresSocket.new('localhost', processes.pgcat.port)
|
||||
socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
|
||||
|
||||
socket.send_message(SimpleQueryMessage.new("DISCARD ALL"))
|
||||
socket.read_from_server
|
||||
|
||||
10.times do |i|
|
||||
socket.send_message(ParseMessage.new("", "SELECT #{i}", []))
|
||||
socket.send_message(BindMessage.new("", "", [], [], [0]))
|
||||
socket.send_message(DescribeMessage.new("S", ""))
|
||||
socket.send_message(ExecuteMessage.new("", 1))
|
||||
socket.send_message(SyncMessage.new)
|
||||
socket.read_from_server
|
||||
end
|
||||
|
||||
socket.send_message(SimpleQueryMessage.new("SELECT name, statement, prepare_time, parameter_types FROM pg_prepared_statements"))
|
||||
result = socket.read_from_server
|
||||
number_of_saved_statements = result.count { |m| m[:code] == 'D' }
|
||||
expect(number_of_saved_statements).to eq(0)
|
||||
end
|
||||
end
|
||||
|
||||
context "When named prepared statements are used" do
|
||||
it "caches them" do
|
||||
socket = PostgresSocket.new('localhost', processes.pgcat.port)
|
||||
socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
|
||||
|
||||
socket.send_message(SimpleQueryMessage.new("DISCARD ALL"))
|
||||
socket.read_from_server
|
||||
|
||||
3.times do
|
||||
socket.send_message(ParseMessage.new("my_query", "SELECT * FROM employees WHERE employee_id in ($1,$2,$3)", [0,0,0]))
|
||||
socket.send_message(BindMessage.new("", "my_query", [0,0,0], [0,0,0].map(&:to_s), [0,0,0,0,0,0]))
|
||||
socket.send_message(SyncMessage.new)
|
||||
socket.read_from_server
|
||||
end
|
||||
|
||||
3.times do
|
||||
socket.send_message(ParseMessage.new("my_other_query", "SELECT * FROM employees WHERE salary in ($1,$2,$3)", [0,0,0]))
|
||||
socket.send_message(BindMessage.new("", "my_other_query", [0,0,0], [0,0,0].map(&:to_s), [0,0,0,0,0,0]))
|
||||
socket.send_message(SyncMessage.new)
|
||||
socket.read_from_server
|
||||
end
|
||||
|
||||
socket.send_message(SimpleQueryMessage.new("SELECT name, statement, prepare_time, parameter_types FROM pg_prepared_statements"))
|
||||
result = socket.read_from_server
|
||||
number_of_saved_statements = result.count { |m| m[:code] == 'D' }
|
||||
expect(number_of_saved_statements).to eq(2)
|
||||
end
|
||||
end
|
||||
|
||||
context "When DISCARD ALL/DEALLOCATE ALL are called" do
|
||||
it "resets server and client caches" do
|
||||
socket = PostgresSocket.new('localhost', processes.pgcat.port)
|
||||
socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
|
||||
|
||||
20.times do |i|
|
||||
socket.send_message(ParseMessage.new("my_query_#{i}", "SELECT * FROM employees WHERE employee_id in ($1,$2,$3)", [0,0,0]))
|
||||
end
|
||||
|
||||
20.times do |i|
|
||||
socket.send_message(BindMessage.new("", "my_query_#{i}", [0,0,0], [0,0,0].map(&:to_s), [0,0]))
|
||||
end
|
||||
|
||||
socket.send_message(SyncMessage.new)
|
||||
socket.read_from_server
|
||||
|
||||
socket.send_message(SimpleQueryMessage.new("DISCARD ALL"))
|
||||
socket.read_from_server
|
||||
responses = []
|
||||
4.times do |i|
|
||||
socket.send_message(ParseMessage.new("my_query_#{i}", "SELECT * FROM employees WHERE employee_id in ($1,$2,$3)", [0,0,0]))
|
||||
socket.send_message(BindMessage.new("", "my_query_#{i}", [0,0,0], [0,0,0].map(&:to_s), [0,0]))
|
||||
socket.send_message(SyncMessage.new)
|
||||
|
||||
responses += socket.read_from_server
|
||||
end
|
||||
|
||||
errors = responses.select { |message| message[:code] == 'E' }
|
||||
error_message = errors.map { |message| message[:bytes].map(&:chr).join("") }.join("\n")
|
||||
raise StandardError, "Encountered the following errors: #{error_message}" if errors.length > 0
|
||||
end
|
||||
end
|
||||
|
||||
context "Maximum number of bound paramters" do
|
||||
it "does not crash" do
|
||||
test_socket = PostgresSocket.new('localhost', processes.pgcat.port)
|
||||
test_socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
|
||||
|
||||
types = Array.new(65_535) { |i| 0 }
|
||||
|
||||
params = Array.new(65_535) { |i| "$#{i+1}" }.join(",")
|
||||
test_socket.send_message(ParseMessage.new("my_query", "SELECT * FROM employees WHERE employee_id in (#{params})", types))
|
||||
|
||||
test_socket.send_message(BindMessage.new("my_query", "my_query", types, types.map(&:to_s), types))
|
||||
|
||||
test_socket.send_message(SyncMessage.new)
|
||||
|
||||
# If the proxy crashes, this will raise an error
|
||||
expect { test_socket.read_from_server }.to_not raise_error
|
||||
|
||||
test_socket.close
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
Reference in New Issue
Block a user