mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-28 03:06:29 +00:00
Compare commits
3 Commits
levkk-log-
...
levkk-drop
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d6a13d047d | ||
|
|
5948fef6cf | ||
|
|
790898c20e |
120
src/client.rs
120
src/client.rs
@@ -667,7 +667,6 @@ where
|
|||||||
.client_disconnecting(self.process_id, last_address_id);
|
.client_disconnecting(self.process_id, last_address_id);
|
||||||
}
|
}
|
||||||
self.stats.client_active(self.process_id, address.id);
|
self.stats.client_active(self.process_id, address.id);
|
||||||
self.stats.server_active(server.process_id(), address.id);
|
|
||||||
|
|
||||||
self.last_address_id = Some(address.id);
|
self.last_address_id = Some(address.id);
|
||||||
self.last_server_id = Some(server.process_id());
|
self.last_server_id = Some(server.process_id());
|
||||||
@@ -731,20 +730,9 @@ where
|
|||||||
'Q' => {
|
'Q' => {
|
||||||
debug!("Sending query to server");
|
debug!("Sending query to server");
|
||||||
|
|
||||||
self.send_server_message(
|
self.send_and_receive_loop(
|
||||||
server,
|
code,
|
||||||
original,
|
original,
|
||||||
&address,
|
|
||||||
query_router.shard(),
|
|
||||||
&pool,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
// Read all data the server has to offer, which can be multiple messages
|
|
||||||
// buffered in 8196 bytes chunks.
|
|
||||||
loop {
|
|
||||||
let response = self
|
|
||||||
.receive_server_message(
|
|
||||||
server,
|
server,
|
||||||
&address,
|
&address,
|
||||||
query_router.shard(),
|
query_router.shard(),
|
||||||
@@ -752,23 +740,6 @@ where
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// Send server reply to the client.
|
|
||||||
match write_all_half(&mut self.write, response).await {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(err) => {
|
|
||||||
server.mark_bad();
|
|
||||||
return Err(err);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if !server.is_data_available() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Report query executed statistics.
|
|
||||||
self.stats.query(self.process_id, address.id);
|
|
||||||
|
|
||||||
if !server.in_transaction() {
|
if !server.in_transaction() {
|
||||||
// Report transaction executed statistics.
|
// Report transaction executed statistics.
|
||||||
self.stats.transaction(self.process_id, address.id);
|
self.stats.transaction(self.process_id, address.id);
|
||||||
@@ -776,7 +747,6 @@ where
|
|||||||
// Release server back to the pool if we are in transaction mode.
|
// Release server back to the pool if we are in transaction mode.
|
||||||
// If we are in session mode, we keep the server until the client disconnects.
|
// If we are in session mode, we keep the server until the client disconnects.
|
||||||
if self.transaction_mode {
|
if self.transaction_mode {
|
||||||
self.stats.server_idle(server.process_id(), address.id);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -789,9 +759,13 @@ where
|
|||||||
// Pgbouncer closes the connection which leads to
|
// Pgbouncer closes the connection which leads to
|
||||||
// connection thrashing when clients misbehave.
|
// connection thrashing when clients misbehave.
|
||||||
if server.in_transaction() {
|
if server.in_transaction() {
|
||||||
server.query("ROLLBACK").await?;
|
// server.query("ROLLBACK").await?;
|
||||||
server.query("DISCARD ALL").await?;
|
// server.query("DISCARD ALL").await?;
|
||||||
server.set_name("pgcat").await?;
|
// server.set_name("pgcat").await?;
|
||||||
|
|
||||||
|
// TODO: Figure out a clever way to ensure
|
||||||
|
// the server has no more messages for us.
|
||||||
|
server.mark_bad();
|
||||||
}
|
}
|
||||||
|
|
||||||
self.release();
|
self.release();
|
||||||
@@ -830,9 +804,10 @@ where
|
|||||||
|
|
||||||
self.buffer.put(&original[..]);
|
self.buffer.put(&original[..]);
|
||||||
|
|
||||||
self.send_server_message(
|
self.send_and_receive_loop(
|
||||||
server,
|
code,
|
||||||
self.buffer.clone(),
|
self.buffer.clone(),
|
||||||
|
server,
|
||||||
&address,
|
&address,
|
||||||
query_router.shard(),
|
query_router.shard(),
|
||||||
&pool,
|
&pool,
|
||||||
@@ -841,41 +816,12 @@ where
|
|||||||
|
|
||||||
self.buffer.clear();
|
self.buffer.clear();
|
||||||
|
|
||||||
// Read all data the server has to offer, which can be multiple messages
|
|
||||||
// buffered in 8196 bytes chunks.
|
|
||||||
loop {
|
|
||||||
let response = self
|
|
||||||
.receive_server_message(
|
|
||||||
server,
|
|
||||||
&address,
|
|
||||||
query_router.shard(),
|
|
||||||
&pool,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
match write_all_half(&mut self.write, response).await {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(err) => {
|
|
||||||
server.mark_bad();
|
|
||||||
return Err(err);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if !server.is_data_available() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Report query executed statistics.
|
|
||||||
self.stats.query(self.process_id, address.id);
|
|
||||||
|
|
||||||
if !server.in_transaction() {
|
if !server.in_transaction() {
|
||||||
self.stats.transaction(self.process_id, address.id);
|
self.stats.transaction(self.process_id, address.id);
|
||||||
|
|
||||||
// Release server back to the pool if we are in transaction mode.
|
// Release server back to the pool if we are in transaction mode.
|
||||||
// If we are in session mode, we keep the server until the client disconnects.
|
// If we are in session mode, we keep the server until the client disconnects.
|
||||||
if self.transaction_mode {
|
if self.transaction_mode {
|
||||||
self.stats.server_idle(server.process_id(), address.id);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -925,7 +871,6 @@ where
|
|||||||
// Release server back to the pool if we are in transaction mode.
|
// Release server back to the pool if we are in transaction mode.
|
||||||
// If we are in session mode, we keep the server until the client disconnects.
|
// If we are in session mode, we keep the server until the client disconnects.
|
||||||
if self.transaction_mode {
|
if self.transaction_mode {
|
||||||
self.stats.server_idle(server.process_id(), address.id);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -941,6 +886,7 @@ where
|
|||||||
|
|
||||||
// The server is no longer bound to us, we can't cancel it's queries anymore.
|
// The server is no longer bound to us, we can't cancel it's queries anymore.
|
||||||
debug!("Releasing server back into the pool");
|
debug!("Releasing server back into the pool");
|
||||||
|
self.stats.server_idle(server.process_id(), address.id);
|
||||||
self.release();
|
self.release();
|
||||||
self.stats.client_idle(self.process_id, address.id);
|
self.stats.client_idle(self.process_id, address.id);
|
||||||
}
|
}
|
||||||
@@ -952,6 +898,46 @@ where
|
|||||||
guard.remove(&(self.process_id, self.secret_key));
|
guard.remove(&(self.process_id, self.secret_key));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn send_and_receive_loop(
|
||||||
|
&mut self,
|
||||||
|
code: char,
|
||||||
|
message: BytesMut,
|
||||||
|
server: &mut Server,
|
||||||
|
address: &Address,
|
||||||
|
shard: usize,
|
||||||
|
pool: &ConnectionPool,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
debug!("Sending {} to server", code);
|
||||||
|
|
||||||
|
self.send_server_message(server, message, &address, shard, &pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Read all data the server has to offer, which can be multiple messages
|
||||||
|
// buffered in 8196 bytes chunks.
|
||||||
|
loop {
|
||||||
|
let response = self
|
||||||
|
.receive_server_message(server, &address, shard, &pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
match write_all_half(&mut self.write, response).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(err) => {
|
||||||
|
server.mark_bad();
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if !server.is_data_available() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Report query executed statistics.
|
||||||
|
self.stats.query(self.process_id, address.id);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn send_server_message(
|
async fn send_server_message(
|
||||||
&self,
|
&self,
|
||||||
server: &mut Server,
|
server: &mut Server,
|
||||||
|
|||||||
@@ -64,6 +64,8 @@ pub struct Address {
|
|||||||
pub database: String,
|
pub database: String,
|
||||||
pub role: Role,
|
pub role: Role,
|
||||||
pub replica_number: usize,
|
pub replica_number: usize,
|
||||||
|
pub username: String,
|
||||||
|
pub poolname: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Address {
|
impl Default for Address {
|
||||||
@@ -76,6 +78,8 @@ impl Default for Address {
|
|||||||
replica_number: 0,
|
replica_number: 0,
|
||||||
database: String::from("database"),
|
database: String::from("database"),
|
||||||
role: Role::Replica,
|
role: Role::Replica,
|
||||||
|
username: String::from("username"),
|
||||||
|
poolname: String::from("poolname"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -84,11 +88,11 @@ impl Address {
|
|||||||
/// Address name (aka database) used in `SHOW STATS`, `SHOW DATABASES`, and `SHOW POOLS`.
|
/// Address name (aka database) used in `SHOW STATS`, `SHOW DATABASES`, and `SHOW POOLS`.
|
||||||
pub fn name(&self) -> String {
|
pub fn name(&self) -> String {
|
||||||
match self.role {
|
match self.role {
|
||||||
Role::Primary => format!("{}_shard_{}_primary", self.database, self.shard),
|
Role::Primary => format!("{}_shard_{}_primary", self.poolname, self.shard),
|
||||||
|
|
||||||
Role::Replica => format!(
|
Role::Replica => format!(
|
||||||
"{}_shard_{}_replica_{}",
|
"{}_shard_{}_replica_{}",
|
||||||
self.database, self.shard, self.replica_number
|
self.poolname, self.shard, self.replica_number
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -114,12 +114,14 @@ impl ConnectionPool {
|
|||||||
|
|
||||||
let address = Address {
|
let address = Address {
|
||||||
id: address_id,
|
id: address_id,
|
||||||
database: pool_name.clone(),
|
database: shard.database.clone(),
|
||||||
host: server.0.clone(),
|
host: server.0.clone(),
|
||||||
port: server.1.to_string(),
|
port: server.1.to_string(),
|
||||||
role: role,
|
role: role,
|
||||||
replica_number,
|
replica_number,
|
||||||
shard: shard_idx.parse::<usize>().unwrap(),
|
shard: shard_idx.parse::<usize>().unwrap(),
|
||||||
|
username: user_info.username.clone(),
|
||||||
|
poolname: pool_name.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
address_id += 1;
|
address_id += 1;
|
||||||
@@ -333,7 +335,7 @@ impl ConnectionPool {
|
|||||||
if !require_healthcheck {
|
if !require_healthcheck {
|
||||||
self.stats
|
self.stats
|
||||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||||
self.stats.server_idle(conn.process_id(), address.id);
|
self.stats.server_active(conn.process_id(), address.id);
|
||||||
return Ok((conn, address.clone()));
|
return Ok((conn, address.clone()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -352,7 +354,7 @@ impl ConnectionPool {
|
|||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
self.stats
|
self.stats
|
||||||
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
|
||||||
self.stats.server_idle(conn.process_id(), address.id);
|
self.stats.server_active(conn.process_id(), address.id);
|
||||||
return Ok((conn, address.clone()));
|
return Ok((conn, address.clone()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user