mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-27 18:56:30 +00:00
print session duration; connect to all servers when validating (#11)
This commit is contained in:
53
src/main.rs
53
src/main.rs
@@ -123,10 +123,9 @@ async fn main() {
|
|||||||
|
|
||||||
// Client goes to another thread, bye.
|
// Client goes to another thread, bye.
|
||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
println!(
|
let start = chrono::offset::Utc::now().naive_utc();
|
||||||
">> Client {:?} connected, transaction pooling: {}",
|
|
||||||
addr, transaction_mode
|
println!(">> Client {:?} connected", addr);
|
||||||
);
|
|
||||||
|
|
||||||
match client::Client::startup(
|
match client::Client::startup(
|
||||||
socket,
|
socket,
|
||||||
@@ -142,7 +141,13 @@ async fn main() {
|
|||||||
|
|
||||||
match client.handle(pool).await {
|
match client.handle(pool).await {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
println!(">> Client {:?} disconnected.", addr);
|
let duration = chrono::offset::Utc::now().naive_utc() - start;
|
||||||
|
|
||||||
|
println!(
|
||||||
|
">> Client {:?} disconnected, session duration: {}",
|
||||||
|
addr,
|
||||||
|
format_duration(&duration)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
@@ -159,3 +164,41 @@ async fn main() {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Format chrono::Duration to be more human-friendly.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `duration` - A duration of time
|
||||||
|
fn format_duration(duration: &chrono::Duration) -> String {
|
||||||
|
let seconds = {
|
||||||
|
let seconds = duration.num_seconds() % 60;
|
||||||
|
if seconds < 10 {
|
||||||
|
format!("0{}", seconds)
|
||||||
|
} else {
|
||||||
|
format!("{}", seconds)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let minutes = {
|
||||||
|
let minutes = duration.num_minutes() % 60;
|
||||||
|
if minutes < 10 {
|
||||||
|
format!("0{}", minutes)
|
||||||
|
} else {
|
||||||
|
format!("{}", minutes)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let hours = {
|
||||||
|
let hours = duration.num_hours() % 24;
|
||||||
|
if hours < 10 {
|
||||||
|
format!("0{}", hours)
|
||||||
|
} else {
|
||||||
|
format!("{}", hours)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let days = duration.num_days().to_string();
|
||||||
|
|
||||||
|
format!("{}d {}:{}:{}", days, hours, minutes, seconds)
|
||||||
|
}
|
||||||
|
|||||||
31
src/pool.rs
31
src/pool.rs
@@ -109,24 +109,27 @@ impl ConnectionPool {
|
|||||||
/// Connect to all shards and grab server information.
|
/// Connect to all shards and grab server information.
|
||||||
/// Return server information we will pass to the clients
|
/// Return server information we will pass to the clients
|
||||||
/// when they connect.
|
/// when they connect.
|
||||||
|
/// This also warms up the pool for clients that connect when
|
||||||
|
/// the pooler starts up.
|
||||||
pub async fn validate(&mut self) -> Result<BytesMut, Error> {
|
pub async fn validate(&mut self) -> Result<BytesMut, Error> {
|
||||||
let mut server_infos = Vec::new();
|
let mut server_infos = Vec::new();
|
||||||
|
|
||||||
for shard in 0..self.shards() {
|
for shard in 0..self.shards() {
|
||||||
// TODO: query all primary and replicas in the shard configuration.
|
for _ in 0..self.replicas(shard) {
|
||||||
let connection = match self.get(Some(shard), None).await {
|
let connection = match self.get(Some(shard), None).await {
|
||||||
Ok(conn) => conn,
|
Ok(conn) => conn,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
println!("> Shard {} down or misconfigured.", shard);
|
println!("> Shard {} down or misconfigured.", shard);
|
||||||
return Err(err);
|
return Err(err);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut proxy = connection.0;
|
let mut proxy = connection.0;
|
||||||
let _address = connection.1;
|
let _address = connection.1;
|
||||||
let server = &mut *proxy;
|
let server = &mut *proxy;
|
||||||
|
|
||||||
server_infos.push(server.server_info());
|
server_infos.push(server.server_info());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: compare server information to make sure
|
// TODO: compare server information to make sure
|
||||||
@@ -326,6 +329,10 @@ impl ConnectionPool {
|
|||||||
pub fn shards(&self) -> usize {
|
pub fn shards(&self) -> usize {
|
||||||
self.databases.len()
|
self.databases.len()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn replicas(&self, shard: usize) -> usize {
|
||||||
|
self.addresses[shard].len()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ServerPool {
|
pub struct ServerPool {
|
||||||
|
|||||||
Reference in New Issue
Block a user