Files
pgcat/src/main.rs

217 lines
6.4 KiB
Rust
Raw Normal View History

2022-02-08 14:59:44 -08:00
// PgCat, a PostgreSQL pooler with load balancing, failover, and sharding support.
// Copyright (C) 2022 Lev Kokotov <lev@levthe.dev>
2022-02-08 14:59:10 -08:00
2022-02-08 14:59:44 -08:00
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
2022-02-08 14:59:10 -08:00
2022-02-08 14:59:44 -08:00
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
2022-02-08 15:00:16 -08:00
// GNU General Public License for more details.
2022-02-08 14:59:10 -08:00
2022-02-08 14:59:44 -08:00
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
extern crate async_trait;
extern crate bb8;
2022-02-03 13:35:40 -08:00
extern crate bytes;
2022-02-03 15:17:04 -08:00
extern crate md5;
2022-02-10 17:05:20 -08:00
extern crate num_cpus;
extern crate once_cell;
2022-02-08 09:25:59 -08:00
extern crate serde;
extern crate serde_derive;
2022-02-03 13:35:40 -08:00
extern crate tokio;
2022-02-08 09:25:59 -08:00
extern crate toml;
2022-02-03 13:35:40 -08:00
2022-02-10 17:05:20 -08:00
use regex::Regex;
use tokio::net::TcpListener;
use tokio::signal;
2022-02-03 13:35:40 -08:00
2022-02-04 09:28:52 -08:00
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
2022-02-03 15:17:04 -08:00
mod client;
2022-02-05 10:02:13 -08:00
mod config;
2022-02-03 13:35:40 -08:00
mod errors;
mod messages;
2022-02-03 16:25:05 -08:00
mod pool;
mod server;
2022-02-05 19:43:48 -08:00
mod sharding;
2022-02-03 13:35:40 -08:00
2022-02-05 10:02:13 -08:00
// Support for query cancellation: this maps our process_ids and
// secret keys to the backend's.
2022-02-11 11:19:40 -08:00
use config::Role;
2022-02-05 18:20:53 -08:00
use pool::{ClientServerMap, ConnectionPool};
2022-02-05 13:15:53 -08:00
/// Main!
2022-02-10 13:48:56 -08:00
#[tokio::main(worker_threads = 4)]
2022-02-03 13:35:40 -08:00
async fn main() {
2022-02-05 10:02:13 -08:00
println!("> Welcome to PgCat! Meow.");
2022-02-03 13:35:40 -08:00
2022-02-10 17:05:20 -08:00
client::SHARDING_REGEX_RE
.set(Regex::new(client::SHARDING_REGEX).unwrap())
.unwrap();
client::ROLE_REGEX_RE
.set(Regex::new(client::ROLE_REGEX).unwrap())
.unwrap();
2022-02-08 09:25:59 -08:00
let config = match config::parse("pgcat.toml").await {
Ok(config) => config,
Err(err) => {
2022-02-08 17:08:17 -08:00
println!("> Config parse error: {:?}", err);
2022-02-08 09:25:59 -08:00
return;
}
};
let addr = format!("{}:{}", config.general.host, config.general.port);
let listener = match TcpListener::bind(&addr).await {
2022-02-03 13:35:40 -08:00
Ok(sock) => sock,
Err(err) => {
println!("> Error: {:?}", err);
return;
}
};
2022-02-05 10:02:13 -08:00
println!("> Running on {}", addr);
2022-02-08 09:25:59 -08:00
// Tracks which client is connected to which server for query cancellation.
2022-02-04 16:01:35 -08:00
let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new()));
2022-02-05 10:02:13 -08:00
2022-02-08 09:25:59 -08:00
println!("> Pool size: {}", config.general.pool_size);
println!("> Pool mode: {}", config.general.pool_mode);
println!("> Ban time: {}s", config.general.ban_time);
println!(
"> Healthcheck timeout: {}ms",
config.general.healthcheck_timeout
);
2022-02-10 08:54:06 -08:00
println!("> Connection timeout: {}ms", config.general.connect_timeout);
2022-02-05 10:02:13 -08:00
let mut pool = ConnectionPool::from_config(config.clone(), client_server_map.clone()).await;
2022-02-08 09:25:59 -08:00
let transaction_mode = config.general.pool_mode == "transaction";
2022-02-11 11:19:40 -08:00
let default_server_role = match config.query_router.default_role.as_ref() {
"any" => None,
"primary" => Some(Role::Primary),
"replica" => Some(Role::Replica),
_ => {
println!("> Config error, got unexpected query_router.default_role.");
return;
}
};
2022-02-05 13:15:53 -08:00
let server_info = match pool.validate().await {
Ok(info) => info,
Err(err) => {
println!("> Could not validate connection pool: {:?}", err);
return;
}
};
2022-02-08 09:25:59 -08:00
println!("> Waiting for clients...");
2022-02-03 16:25:05 -08:00
// Main app runs here.
tokio::task::spawn(async move {
loop {
let pool = pool.clone();
let client_server_map = client_server_map.clone();
let server_info = server_info.clone();
2022-02-03 13:35:40 -08:00
let (socket, addr) = match listener.accept().await {
Ok((socket, addr)) => (socket, addr),
2022-02-03 13:35:40 -08:00
Err(err) => {
println!("> Listener: {:?}", err);
continue;
2022-02-03 13:35:40 -08:00
}
};
// Client goes to another thread, bye.
tokio::task::spawn(async move {
let start = chrono::offset::Utc::now().naive_utc();
println!(">> Client {:?} connected", addr);
match client::Client::startup(
socket,
client_server_map,
transaction_mode,
default_server_role,
server_info,
)
.await
{
Ok(mut client) => {
println!(">> Client {:?} authenticated successfully!", addr);
match client.handle(pool).await {
Ok(()) => {
let duration = chrono::offset::Utc::now().naive_utc() - start;
println!(
">> Client {:?} disconnected, session duration: {}",
addr,
format_duration(&duration)
);
}
Err(err) => {
println!(">> Client disconnected with error: {:?}", err);
client.release();
}
}
}
Err(err) => {
println!(">> Error: {:?}", err);
}
};
});
}
});
// Setup shut down sequence
match signal::ctrl_c().await {
Ok(()) => {}
Err(err) => {
eprintln!("Unable to listen for shutdown signal: {}", err);
}
};
2022-02-03 13:35:40 -08:00
}
/// Format chrono::Duration to be more human-friendly.
///
/// # Arguments
///
/// * `duration` - A duration of time
fn format_duration(duration: &chrono::Duration) -> String {
let seconds = {
let seconds = duration.num_seconds() % 60;
if seconds < 10 {
format!("0{}", seconds)
} else {
format!("{}", seconds)
}
};
let minutes = {
let minutes = duration.num_minutes() % 60;
if minutes < 10 {
format!("0{}", minutes)
} else {
format!("{}", minutes)
}
};
let hours = {
let hours = duration.num_hours() % 24;
if hours < 10 {
format!("0{}", hours)
} else {
format!("{}", hours)
}
};
let days = duration.num_days().to_string();
format!("{}d {}:{}:{}", days, hours, minutes, seconds)
}