mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-26 10:26:30 +00:00
Minor refactor for configs (#172)
* Changes shard struct to use vector of ServerConfig * Adds to query router * Change client disconnect with error message to warn instead of debug * Add warning logs for clean up actions
This commit is contained in:
@@ -23,7 +23,9 @@ static CONFIG: Lazy<ArcSwap<Config>> = Lazy::new(|| ArcSwap::from_pointee(Config
|
|||||||
/// Server role: primary or replica.
|
/// Server role: primary or replica.
|
||||||
#[derive(Clone, PartialEq, Serialize, Deserialize, Hash, std::cmp::Eq, Debug, Copy)]
|
#[derive(Clone, PartialEq, Serialize, Deserialize, Hash, std::cmp::Eq, Debug, Copy)]
|
||||||
pub enum Role {
|
pub enum Role {
|
||||||
|
#[serde(alias = "primary", alias = "Primary")]
|
||||||
Primary,
|
Primary,
|
||||||
|
#[serde(alias = "replica", alias = "Replica")]
|
||||||
Replica,
|
Replica,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -202,17 +204,28 @@ impl Default for Pool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug, Hash, Eq)]
|
||||||
|
pub struct ServerConfig {
|
||||||
|
pub host: String,
|
||||||
|
pub port: u16,
|
||||||
|
pub role: Role,
|
||||||
|
}
|
||||||
|
|
||||||
/// Shard configuration.
|
/// Shard configuration.
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||||
pub struct Shard {
|
pub struct Shard {
|
||||||
pub database: String,
|
pub database: String,
|
||||||
pub servers: Vec<(String, u16, String)>,
|
pub servers: Vec<ServerConfig>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Shard {
|
impl Default for Shard {
|
||||||
fn default() -> Shard {
|
fn default() -> Shard {
|
||||||
Shard {
|
Shard {
|
||||||
servers: vec![(String::from("localhost"), 5432, String::from("primary"))],
|
servers: vec![ServerConfig {
|
||||||
|
host: String::from("localhost"),
|
||||||
|
port: 5432,
|
||||||
|
role: Role::Primary,
|
||||||
|
}],
|
||||||
database: String::from("postgres"),
|
database: String::from("postgres"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -538,23 +551,10 @@ pub async fn parse(path: &str) -> Result<(), Error> {
|
|||||||
dup_check.insert(server);
|
dup_check.insert(server);
|
||||||
|
|
||||||
// Check that we define only zero or one primary.
|
// Check that we define only zero or one primary.
|
||||||
match server.2.as_ref() {
|
match server.role {
|
||||||
"primary" => primary_count += 1,
|
Role::Primary => primary_count += 1,
|
||||||
_ => (),
|
_ => (),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Check role spelling.
|
|
||||||
match server.2.as_ref() {
|
|
||||||
"primary" => (),
|
|
||||||
"replica" => (),
|
|
||||||
_ => {
|
|
||||||
error!(
|
|
||||||
"Shard {} server role must be either 'primary' or 'replica', got: '{}'",
|
|
||||||
shard.0, server.2
|
|
||||||
);
|
|
||||||
return Err(Error::BadConfig);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if primary_count > 1 {
|
if primary_count > 1 {
|
||||||
@@ -617,12 +617,12 @@ mod test {
|
|||||||
assert_eq!(get_config().pools["simple_db"].users.len(), 1);
|
assert_eq!(get_config().pools["simple_db"].users.len(), 1);
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
get_config().pools["sharded_db"].shards["0"].servers[0].0,
|
get_config().pools["sharded_db"].shards["0"].servers[0].host,
|
||||||
"127.0.0.1"
|
"127.0.0.1"
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
get_config().pools["sharded_db"].shards["1"].servers[0].2,
|
get_config().pools["sharded_db"].shards["1"].servers[0].role,
|
||||||
"primary"
|
Role::Primary
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
get_config().pools["sharded_db"].shards["1"].database,
|
get_config().pools["sharded_db"].shards["1"].database,
|
||||||
@@ -640,11 +640,11 @@ mod test {
|
|||||||
assert_eq!(get_config().pools["sharded_db"].default_role, "any");
|
assert_eq!(get_config().pools["sharded_db"].default_role, "any");
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
get_config().pools["simple_db"].shards["0"].servers[0].0,
|
get_config().pools["simple_db"].shards["0"].servers[0].host,
|
||||||
"127.0.0.1"
|
"127.0.0.1"
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
get_config().pools["simple_db"].shards["0"].servers[0].1,
|
get_config().pools["simple_db"].shards["0"].servers[0].port,
|
||||||
5432
|
5432
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ extern crate tokio;
|
|||||||
extern crate tokio_rustls;
|
extern crate tokio_rustls;
|
||||||
extern crate toml;
|
extern crate toml;
|
||||||
|
|
||||||
use log::{debug, error, info};
|
use log::{error, info, warn};
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use pgcat::format_duration;
|
use pgcat::format_duration;
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
@@ -279,7 +279,7 @@ async fn main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("Client disconnected with error {:?}", err);
|
warn!("Client disconnected with error {:?}", err);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
|
|||||||
17
src/pool.rs
17
src/pool.rs
@@ -143,21 +143,12 @@ impl ConnectionPool {
|
|||||||
let mut replica_number = 0;
|
let mut replica_number = 0;
|
||||||
|
|
||||||
for server in shard.servers.iter() {
|
for server in shard.servers.iter() {
|
||||||
let role = match server.2.as_ref() {
|
|
||||||
"primary" => Role::Primary,
|
|
||||||
"replica" => Role::Replica,
|
|
||||||
_ => {
|
|
||||||
error!("Config error: server role can be 'primary' or 'replica', have: '{}'. Defaulting to 'replica'.", server.2);
|
|
||||||
Role::Replica
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let address = Address {
|
let address = Address {
|
||||||
id: address_id,
|
id: address_id,
|
||||||
database: shard.database.clone(),
|
database: shard.database.clone(),
|
||||||
host: server.0.clone(),
|
host: server.host.clone(),
|
||||||
port: server.1 as u16,
|
port: server.port,
|
||||||
role: role,
|
role: server.role,
|
||||||
address_index,
|
address_index,
|
||||||
replica_number,
|
replica_number,
|
||||||
shard: shard_idx.parse::<usize>().unwrap(),
|
shard: shard_idx.parse::<usize>().unwrap(),
|
||||||
@@ -168,7 +159,7 @@ impl ConnectionPool {
|
|||||||
address_id += 1;
|
address_id += 1;
|
||||||
address_index += 1;
|
address_index += 1;
|
||||||
|
|
||||||
if role == Role::Replica {
|
if server.role == Role::Replica {
|
||||||
replica_number += 1;
|
replica_number += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -169,8 +169,8 @@ impl QueryRouter {
|
|||||||
|
|
||||||
Command::ShowShard => self.shard().to_string(),
|
Command::ShowShard => self.shard().to_string(),
|
||||||
Command::ShowServerRole => match self.active_role {
|
Command::ShowServerRole => match self.active_role {
|
||||||
Some(Role::Primary) => String::from("primary"),
|
Some(Role::Primary) => Role::Primary.to_string(),
|
||||||
Some(Role::Replica) => String::from("replica"),
|
Some(Role::Replica) => Role::Replica.to_string(),
|
||||||
None => {
|
None => {
|
||||||
if self.query_parser_enabled {
|
if self.query_parser_enabled {
|
||||||
String::from("auto")
|
String::from("auto")
|
||||||
|
|||||||
@@ -591,6 +591,7 @@ impl Server {
|
|||||||
// server connection thrashing if clients repeatedly do this.
|
// server connection thrashing if clients repeatedly do this.
|
||||||
// Instead, we ROLLBACK that transaction before putting the connection back in the pool
|
// Instead, we ROLLBACK that transaction before putting the connection back in the pool
|
||||||
if self.in_transaction() {
|
if self.in_transaction() {
|
||||||
|
warn!("Server returned while still in transaction, rolling back transaction");
|
||||||
self.query("ROLLBACK").await?;
|
self.query("ROLLBACK").await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -600,6 +601,7 @@ impl Server {
|
|||||||
// send `DISCARD ALL` if we think the session is altered instead of just sending
|
// send `DISCARD ALL` if we think the session is altered instead of just sending
|
||||||
// it before each checkin.
|
// it before each checkin.
|
||||||
if self.needs_cleanup {
|
if self.needs_cleanup {
|
||||||
|
warn!("Server returned with session state altered, discarding state");
|
||||||
self.query("DISCARD ALL").await?;
|
self.query("DISCARD ALL").await?;
|
||||||
self.needs_cleanup = false;
|
self.needs_cleanup = false;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user