mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-25 02:06:28 +00:00
"query cancellation"
This commit is contained in:
@@ -10,6 +10,7 @@ use tokio::net::TcpStream;
|
||||
|
||||
use crate::errors::Error;
|
||||
use crate::messages::*;
|
||||
use crate::ClientServerMap;
|
||||
|
||||
/// Server state.
|
||||
pub struct Server {
|
||||
@@ -22,6 +23,7 @@ pub struct Server {
|
||||
in_transaction: bool,
|
||||
data_available: bool,
|
||||
bad: bool,
|
||||
client_server_map: ClientServerMap,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
@@ -33,6 +35,7 @@ impl Server {
|
||||
user: &str,
|
||||
password: &str,
|
||||
database: &str,
|
||||
client_server_map: ClientServerMap,
|
||||
) -> Result<Server, Error> {
|
||||
let mut stream = match TcpStream::connect(&format!("{}:{}", host, port)).await {
|
||||
Ok(stream) => stream,
|
||||
@@ -144,6 +147,7 @@ impl Server {
|
||||
in_transaction: false,
|
||||
data_available: false,
|
||||
bad: false,
|
||||
client_server_map: client_server_map,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -155,6 +159,31 @@ impl Server {
|
||||
}
|
||||
}
|
||||
|
||||
/// Issue a cancellation request to the server.
|
||||
/// Uses a separate connection that's not part of the connection pool.
|
||||
pub async fn cancel(
|
||||
host: &str,
|
||||
port: &str,
|
||||
process_id: i32,
|
||||
secret_key: i32,
|
||||
) -> Result<(), Error> {
|
||||
let mut stream = match TcpStream::connect(&format!("{}:{}", host, port)).await {
|
||||
Ok(stream) => stream,
|
||||
Err(err) => {
|
||||
println!(">> Could not connect to server: {}", err);
|
||||
return Err(Error::SocketError);
|
||||
}
|
||||
};
|
||||
|
||||
let mut bytes = BytesMut::with_capacity(16);
|
||||
bytes.put_i32(16);
|
||||
bytes.put_i32(80877102);
|
||||
bytes.put_i32(process_id);
|
||||
bytes.put_i32(secret_key);
|
||||
|
||||
Ok(write_all(&mut stream, bytes).await?)
|
||||
}
|
||||
|
||||
/// Send data to the server from the client.
|
||||
pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> {
|
||||
match write_all_half(&mut self.write, messages).await {
|
||||
@@ -276,6 +305,11 @@ impl Server {
|
||||
self.bad = true;
|
||||
}
|
||||
|
||||
pub fn claim(&mut self, process_id: i32, secret_key: i32) {
|
||||
let mut guard = self.client_server_map.lock().unwrap();
|
||||
guard.insert((process_id, secret_key), (self.backend_id, self.secret_key));
|
||||
}
|
||||
|
||||
/// Execute an arbitrary query against the server.
|
||||
/// It will use the Simple query protocol.
|
||||
/// Result will not be returned, so this is useful for things like `SET` or `ROLLBACK`.
|
||||
|
||||
Reference in New Issue
Block a user