mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 01:16:30 +00:00
COPY OUT support
This commit is contained in:
@@ -118,14 +118,21 @@ impl Client {
|
||||
match code {
|
||||
'Q' => {
|
||||
server.send(original).await?;
|
||||
let response = server.recv().await?;
|
||||
match write_all_half(&mut self.write, response).await {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
server.mark_bad();
|
||||
return Err(err);
|
||||
|
||||
loop {
|
||||
let response = server.recv().await?;
|
||||
match write_all_half(&mut self.write, response).await {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
server.mark_bad();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
if !server.is_data_available() {
|
||||
break;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// Release server
|
||||
if !server.in_transaction() {
|
||||
@@ -166,14 +173,20 @@ impl Client {
|
||||
server.send(self.buffer.clone()).await?;
|
||||
self.buffer.clear();
|
||||
|
||||
let response = server.recv().await?;
|
||||
match write_all_half(&mut self.write, response).await {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
server.mark_bad();
|
||||
return Err(err);
|
||||
loop {
|
||||
let response = server.recv().await?;
|
||||
match write_all_half(&mut self.write, response).await {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
server.mark_bad();
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
if !server.is_data_available() {
|
||||
break;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// Release server
|
||||
if !server.in_transaction() {
|
||||
|
||||
@@ -17,6 +17,7 @@ pub struct Server {
|
||||
backend_id: i32,
|
||||
secret_key: i32,
|
||||
in_transaction: bool,
|
||||
data_available: bool,
|
||||
bad: bool,
|
||||
}
|
||||
|
||||
@@ -136,6 +137,7 @@ impl Server {
|
||||
backend_id: backend_id,
|
||||
secret_key: secret_key,
|
||||
in_transaction: false,
|
||||
data_available: false,
|
||||
bad: false,
|
||||
});
|
||||
}
|
||||
@@ -208,6 +210,21 @@ impl Server {
|
||||
// CopyInResponse: copy is starting from client to server
|
||||
'G' => break,
|
||||
|
||||
// CopyOutResponse: copy is starting from the server to the client
|
||||
'H' => {
|
||||
self.data_available = true;
|
||||
break;
|
||||
}
|
||||
|
||||
// CopyData
|
||||
'd' => break,
|
||||
|
||||
// CopyDone
|
||||
'c' => {
|
||||
self.data_available = false;
|
||||
// Buffer until ReadyForQuery shows up
|
||||
}
|
||||
|
||||
_ => {
|
||||
// Keep buffering,
|
||||
}
|
||||
@@ -224,6 +241,10 @@ impl Server {
|
||||
self.in_transaction
|
||||
}
|
||||
|
||||
pub fn is_data_available(&self) -> bool {
|
||||
self.data_available
|
||||
}
|
||||
|
||||
pub fn is_bad(&self) -> bool {
|
||||
self.bad
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user