diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index 22ad483..12d19a4 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -62,6 +62,10 @@ psql -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/null psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RELOAD' > /dev/null psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW CONFIG' > /dev/null psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES' > /dev/null +psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW LISTS' > /dev/null +psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW POOLS' > /dev/null +psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW VERSION' > /dev/null +psql -h 127.0.0.1 -p 6432 -d pgbouncer -c "SET client_encoding TO 'utf8'" > /dev/null # will ignore (! psql -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null) # Start PgCat in debug to demonstrate failover better diff --git a/src/admin.rs b/src/admin.rs index 0d2e197..3301c64 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -4,7 +4,7 @@ use tokio::net::tcp::OwnedWriteHalf; use std::collections::HashMap; -use crate::config::{get_config, parse, Role}; +use crate::config::{get_config, parse}; use crate::errors::Error; use crate::messages::*; use crate::pool::ConnectionPool; @@ -41,6 +41,15 @@ pub async fn handle_admin( } else if query.starts_with("SHOW DATABASES") { trace!("SHOW DATABASES"); show_databases(stream, &pool).await + } else if query.starts_with("SHOW POOLS") { + trace!("SHOW POOLS"); + show_pools(stream, &pool).await + } else if query.starts_with("SHOW LISTS") { + trace!("SHOW LISTS"); + show_lists(stream, &pool).await + } else if query.starts_with("SHOW VERSION") { + trace!("SHOW VERSION"); + show_version(stream).await } else if query.starts_with("SET ") { trace!("SET"); ignore_set(stream).await @@ -49,6 +58,118 @@ pub async fn handle_admin( } } +/// SHOW LISTS +async fn show_lists(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> { + let stats = get_stats(); + + let columns = vec![("list", DataType::Text), ("items", DataType::Int4)]; + + let mut res = BytesMut::new(); + res.put(row_description(&columns)); + res.put(data_row(&vec![ + "databases".to_string(), + pool.databases().to_string(), + ])); + res.put(data_row(&vec!["users".to_string(), "1".to_string()])); + res.put(data_row(&vec![ + "pools".to_string(), + pool.databases().to_string(), + ])); + res.put(data_row(&vec![ + "free_clients".to_string(), + stats["cl_idle"].to_string(), + ])); + res.put(data_row(&vec![ + "used_clients".to_string(), + stats["cl_active"].to_string(), + ])); + res.put(data_row(&vec![ + "login_clients".to_string(), + "0".to_string(), + ])); + res.put(data_row(&vec![ + "free_servers".to_string(), + stats["sv_idle"].to_string(), + ])); + res.put(data_row(&vec![ + "used_servers".to_string(), + stats["sv_active"].to_string(), + ])); + res.put(data_row(&vec!["dns_names".to_string(), "0".to_string()])); + res.put(data_row(&vec!["dns_zones".to_string(), "0".to_string()])); + res.put(data_row(&vec!["dns_queries".to_string(), "0".to_string()])); + res.put(data_row(&vec!["dns_pending".to_string(), "0".to_string()])); + + res.put(command_complete("SHOW")); + + res.put_u8(b'Z'); + res.put_i32(5); + res.put_u8(b'I'); + + write_all_half(stream, res).await +} + +/// SHOW VERSION +async fn show_version(stream: &mut OwnedWriteHalf) -> Result<(), Error> { + let mut res = BytesMut::new(); + + res.put(row_description(&vec![("version", DataType::Text)])); + res.put(data_row(&vec!["PgCat 0.1.0".to_string()])); + res.put(command_complete("SHOW")); + + res.put_u8(b'Z'); + res.put_i32(5); + res.put_u8(b'I'); + + write_all_half(stream, res).await +} + +/// SHOW POOLS +async fn show_pools(stream: &mut OwnedWriteHalf, _pool: &ConnectionPool) -> Result<(), Error> { + let stats = get_stats(); + let config = { + let guard = get_config(); + &*guard.clone() + }; + + let columns = vec![ + ("database", DataType::Text), + ("user", DataType::Text), + ("cl_active", DataType::Numeric), + ("cl_waiting", DataType::Numeric), + ("cl_cancel_req", DataType::Numeric), + ("sv_active", DataType::Numeric), + ("sv_idle", DataType::Numeric), + ("sv_used", DataType::Numeric), + ("sv_tested", DataType::Numeric), + ("sv_login", DataType::Numeric), + ("maxwait", DataType::Numeric), + ("maxwait_us", DataType::Numeric), + ("pool_mode", DataType::Text), + ]; + + let mut res = BytesMut::new(); + res.put(row_description(&columns)); + + let mut row = vec![String::from("all"), config.user.name.clone()]; + + for column in &columns[2..columns.len() - 1] { + let value = stats.get(column.0).unwrap_or(&0).to_string(); + row.push(value); + } + + row.push(config.general.pool_mode.to_string()); + + res.put(data_row(&row)); + res.put(command_complete("SHOW")); + + res.put_u8(b'Z'); + res.put_i32(5); + res.put_u8(b'I'); + + write_all_half(stream, res).await +} + /// SHOW DATABASES async fn show_databases(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> Result<(), Error> { let guard = get_config(); @@ -79,23 +200,13 @@ async fn show_databases(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> R for shard in 0..pool.shards() { let database_name = &config.shards[&shard.to_string()].database; - let mut replica_count = 0; for server in 0..pool.servers(shard) { let address = pool.address(shard, server); - let name = match address.role { - Role::Primary => format!("shard_{}_primary", shard), - - Role::Replica => { - let name = format!("shard_{}_replica_{}", shard, replica_count); - replica_count += 1; - name - } - }; let pool_state = pool.pool_state(shard, server); res.put(data_row(&vec![ - name, // name + address.name(), // name address.host.to_string(), // host address.port.to_string(), // port database_name.to_string(), // database @@ -222,7 +333,7 @@ async fn show_stats(stream: &mut OwnedWriteHalf) -> Result<(), Error> { res.put(row_description(&columns)); let mut row = vec![ - String::from("all shards"), // TODO: per-database stats, + String::from("all"), // TODO: per-database stats, ]; for column in &columns[1..] { diff --git a/src/config.rs b/src/config.rs index 1687ff3..663a9f4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -52,6 +52,7 @@ pub struct Address { pub port: String, pub shard: usize, pub role: Role, + pub replica_number: usize, } impl Default for Address { @@ -60,11 +61,22 @@ impl Default for Address { host: String::from("127.0.0.1"), port: String::from("5432"), shard: 0, + replica_number: 0, role: Role::Replica, } } } +impl Address { + pub fn name(&self) -> String { + match self.role { + Role::Primary => format!("shard_{}_primary", self.shard), + + Role::Replica => format!("shard_{}_replica_{}", self.shard, self.replica_number), + } + } +} + #[derive(Clone, PartialEq, Hash, std::cmp::Eq, Deserialize, Debug)] pub struct User { pub name: String, diff --git a/src/pool.rs b/src/pool.rs index 92b7663..b8a0ecf 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -49,7 +49,8 @@ impl ConnectionPool { for shard_idx in shard_ids { let shard = &config.shards[&shard_idx]; let mut pools = Vec::new(); - let mut replica_addresses = Vec::new(); + let mut servers = Vec::new(); + let mut replica_number = 0; for server in shard.servers.iter() { let role = match server.2.as_ref() { @@ -65,9 +66,14 @@ impl ConnectionPool { host: server.0.clone(), port: server.1.to_string(), role: role, + replica_number, shard: shard_idx.parse::().unwrap(), }; + if role == Role::Replica { + replica_number += 1; + } + let manager = ServerPool::new( address.clone(), config.user.clone(), @@ -87,11 +93,11 @@ impl ConnectionPool { .unwrap(); pools.push(pool); - replica_addresses.push(address); + servers.push(address); } shards.push(pools); - addresses.push(replica_addresses); + addresses.push(servers); banlist.push(HashMap::new()); } @@ -337,6 +343,14 @@ impl ConnectionPool { self.addresses[shard].len() } + pub fn databases(&self) -> usize { + let mut databases = 0; + for shard in 0..self.shards() { + databases += self.servers(shard); + } + databases + } + pub fn pool_state(&self, shard: usize, server: usize) -> bb8::State { self.databases[shard][server].state() }