mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-27 10:46:30 +00:00
sharded query routing
This commit is contained in:
27
Cargo.lock
generated
27
Cargo.lock
generated
@@ -2,6 +2,15 @@
|
|||||||
# It is not intended for manual editing.
|
# It is not intended for manual editing.
|
||||||
version = 3
|
version = 3
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "aho-corasick"
|
||||||
|
version = "0.7.18"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f"
|
||||||
|
dependencies = [
|
||||||
|
"memchr",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "async-trait"
|
name = "async-trait"
|
||||||
version = "0.1.52"
|
version = "0.1.52"
|
||||||
@@ -315,6 +324,7 @@ dependencies = [
|
|||||||
"chrono",
|
"chrono",
|
||||||
"md-5",
|
"md-5",
|
||||||
"rand",
|
"rand",
|
||||||
|
"regex",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_derive",
|
"serde_derive",
|
||||||
"sha-1",
|
"sha-1",
|
||||||
@@ -407,6 +417,23 @@ dependencies = [
|
|||||||
"bitflags",
|
"bitflags",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "regex"
|
||||||
|
version = "1.5.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461"
|
||||||
|
dependencies = [
|
||||||
|
"aho-corasick",
|
||||||
|
"memchr",
|
||||||
|
"regex-syntax",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "regex-syntax"
|
||||||
|
version = "0.6.25"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "scopeguard"
|
name = "scopeguard"
|
||||||
version = "1.1.0"
|
version = "1.1.0"
|
||||||
|
|||||||
@@ -16,4 +16,5 @@ chrono = "0.4"
|
|||||||
sha-1 = "*"
|
sha-1 = "*"
|
||||||
toml = "*"
|
toml = "*"
|
||||||
serde = "*"
|
serde = "*"
|
||||||
serde_derive = "*"
|
serde_derive = "*"
|
||||||
|
regex = "1"
|
||||||
@@ -3,6 +3,7 @@
|
|||||||
/// and this module implements that.
|
/// and this module implements that.
|
||||||
use bytes::{Buf, BufMut, BytesMut};
|
use bytes::{Buf, BufMut, BytesMut};
|
||||||
use rand::{distributions::Alphanumeric, Rng};
|
use rand::{distributions::Alphanumeric, Rng};
|
||||||
|
use regex::Regex;
|
||||||
use tokio::io::{AsyncReadExt, BufReader};
|
use tokio::io::{AsyncReadExt, BufReader};
|
||||||
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
|
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
@@ -11,6 +12,9 @@ use crate::errors::Error;
|
|||||||
use crate::messages::*;
|
use crate::messages::*;
|
||||||
use crate::pool::{ClientServerMap, ConnectionPool};
|
use crate::pool::{ClientServerMap, ConnectionPool};
|
||||||
use crate::server::Server;
|
use crate::server::Server;
|
||||||
|
use crate::sharding::Sharder;
|
||||||
|
|
||||||
|
const SHARDING_REGEX: &str = r"SET SHARDING KEY TO '[0-9]+';";
|
||||||
|
|
||||||
/// The client state. One of these is created per client.
|
/// The client state. One of these is created per client.
|
||||||
pub struct Client {
|
pub struct Client {
|
||||||
@@ -39,6 +43,9 @@ pub struct Client {
|
|||||||
// Clients are mapped to servers while they use them. This allows a client
|
// Clients are mapped to servers while they use them. This allows a client
|
||||||
// to connect and cancel a query.
|
// to connect and cancel a query.
|
||||||
client_server_map: ClientServerMap,
|
client_server_map: ClientServerMap,
|
||||||
|
|
||||||
|
// sharding regex
|
||||||
|
sharding_regex: Regex,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Client {
|
impl Client {
|
||||||
@@ -50,6 +57,8 @@ impl Client {
|
|||||||
client_server_map: ClientServerMap,
|
client_server_map: ClientServerMap,
|
||||||
transaction_mode: bool,
|
transaction_mode: bool,
|
||||||
) -> Result<Client, Error> {
|
) -> Result<Client, Error> {
|
||||||
|
let sharding_regex = Regex::new(SHARDING_REGEX).unwrap();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
// Could be StartupMessage or SSLRequest
|
// Could be StartupMessage or SSLRequest
|
||||||
// which makes this variable length.
|
// which makes this variable length.
|
||||||
@@ -105,6 +114,7 @@ impl Client {
|
|||||||
process_id: process_id,
|
process_id: process_id,
|
||||||
secret_key: secret_key,
|
secret_key: secret_key,
|
||||||
client_server_map: client_server_map,
|
client_server_map: client_server_map,
|
||||||
|
sharding_regex: sharding_regex,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -124,6 +134,7 @@ impl Client {
|
|||||||
process_id: process_id,
|
process_id: process_id,
|
||||||
secret_key: secret_key,
|
secret_key: secret_key,
|
||||||
client_server_map: client_server_map,
|
client_server_map: client_server_map,
|
||||||
|
sharding_regex: sharding_regex,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -156,6 +167,12 @@ impl Client {
|
|||||||
return Ok(Server::cancel(&address, &port, process_id, secret_key).await?);
|
return Ok(Server::cancel(&address, &port, process_id, secret_key).await?);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Active shard we're talking to.
|
||||||
|
// The lifetime of this depends on the pool mode:
|
||||||
|
// - if in session mode, this lives until client disconnects or changes it,
|
||||||
|
// - if in transaction mode, this lives for the duration of one transaction.
|
||||||
|
let mut shard: Option<usize> = None;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
// Read a complete message from the client, which normally would be
|
// Read a complete message from the client, which normally would be
|
||||||
// either a `Q` (query) or `P` (prepare, extended protocol).
|
// either a `Q` (query) or `P` (prepare, extended protocol).
|
||||||
@@ -164,15 +181,23 @@ impl Client {
|
|||||||
// SET sharding_context.key = '1234';
|
// SET sharding_context.key = '1234';
|
||||||
let mut message = read_message(&mut self.read).await?;
|
let mut message = read_message(&mut self.read).await?;
|
||||||
|
|
||||||
// TODO: parse the message here. If it's part of our protocol,
|
// Parse for special select shard command.
|
||||||
// don't grab a server yet and continue loop.
|
// SET SHARDING KEY TO 'bigint';
|
||||||
|
match self.select_shard(message.clone(), pool.shards()).await {
|
||||||
|
Some(s) => {
|
||||||
|
set_sharding_key(&mut self.write).await?;
|
||||||
|
shard = Some(s);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
None => (),
|
||||||
|
};
|
||||||
|
|
||||||
// The message is part of the regular protocol.
|
// The message is part of the regular protocol.
|
||||||
// self.buffer.put(message);
|
// self.buffer.put(message);
|
||||||
|
|
||||||
// Grab a server from the pool.
|
// Grab a server from the pool.
|
||||||
// None = any shard
|
// None = any shard
|
||||||
let connection = pool.get(None).await.unwrap();
|
let connection = pool.get(shard).await.unwrap();
|
||||||
let mut proxy = connection.0;
|
let mut proxy = connection.0;
|
||||||
let _address = connection.1;
|
let _address = connection.1;
|
||||||
let server = &mut *proxy;
|
let server = &mut *proxy;
|
||||||
@@ -230,6 +255,7 @@ impl Client {
|
|||||||
|
|
||||||
// Release server
|
// Release server
|
||||||
if !server.in_transaction() && self.transaction_mode {
|
if !server.in_transaction() && self.transaction_mode {
|
||||||
|
shard = None;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -288,6 +314,7 @@ impl Client {
|
|||||||
|
|
||||||
// Release server
|
// Release server
|
||||||
if !server.in_transaction() && self.transaction_mode {
|
if !server.in_transaction() && self.transaction_mode {
|
||||||
|
shard = None;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -314,6 +341,7 @@ impl Client {
|
|||||||
// Release the server
|
// Release the server
|
||||||
if !server.in_transaction() && self.transaction_mode {
|
if !server.in_transaction() && self.transaction_mode {
|
||||||
println!("Releasing after copy done");
|
println!("Releasing after copy done");
|
||||||
|
shard = None;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -333,4 +361,30 @@ impl Client {
|
|||||||
let mut guard = self.client_server_map.lock().unwrap();
|
let mut guard = self.client_server_map.lock().unwrap();
|
||||||
guard.remove(&(self.process_id, self.secret_key));
|
guard.remove(&(self.process_id, self.secret_key));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn select_shard(&mut self, mut buf: BytesMut, shards: usize) -> Option<usize> {
|
||||||
|
let code = buf.get_u8() as char;
|
||||||
|
|
||||||
|
match code {
|
||||||
|
'Q' => (),
|
||||||
|
// 'P' => (),
|
||||||
|
_ => return None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let len = buf.get_i32();
|
||||||
|
let query = String::from_utf8_lossy(&buf[..len as usize - 4 - 1]).to_ascii_uppercase(); // Don't read the ternminating null
|
||||||
|
|
||||||
|
if self.sharding_regex.is_match(&query) {
|
||||||
|
let shard = query.split("'").collect::<Vec<&str>>()[1];
|
||||||
|
match shard.parse::<i64>() {
|
||||||
|
Ok(shard) => {
|
||||||
|
let sharder = Sharder::new(shards);
|
||||||
|
Some(sharder.pg_bigint_hash(shard))
|
||||||
|
}
|
||||||
|
Err(_) => None,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -138,6 +138,23 @@ pub async fn md5_password(
|
|||||||
Ok(write_all(stream, message).await?)
|
Ok(write_all(stream, message).await?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn set_sharding_key(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
|
||||||
|
let mut res = BytesMut::with_capacity(25);
|
||||||
|
|
||||||
|
let set_complete = BytesMut::from(&"SET SHARDING KEY\0"[..]);
|
||||||
|
let len = (set_complete.len() + 4) as i32;
|
||||||
|
|
||||||
|
res.put_u8(b'C');
|
||||||
|
res.put_i32(len);
|
||||||
|
res.put_slice(&set_complete[..]);
|
||||||
|
|
||||||
|
res.put_u8(b'Z');
|
||||||
|
res.put_i32(5);
|
||||||
|
res.put_u8(b'I');
|
||||||
|
|
||||||
|
write_all_half(stream, res).await
|
||||||
|
}
|
||||||
|
|
||||||
/// Write all data in the buffer to the TcpStream.
|
/// Write all data in the buffer to the TcpStream.
|
||||||
pub async fn write_all(stream: &mut TcpStream, buf: BytesMut) -> Result<(), Error> {
|
pub async fn write_all(stream: &mut TcpStream, buf: BytesMut) -> Result<(), Error> {
|
||||||
match stream.write_all(&buf).await {
|
match stream.write_all(&buf).await {
|
||||||
|
|||||||
Reference in New Issue
Block a user