add default server role; bug fix

This commit is contained in:
Lev Kokotov
2022-02-11 11:19:40 -08:00
parent 595e564216
commit 0d369ab90a
5 changed files with 65 additions and 12 deletions

View File

@@ -69,3 +69,15 @@ servers = [
# [ "127.0.1.1", 5432, "replica" ],
]
database = "shard2"
# Settings for our query routing layer.
[query_router]
# If the client doesn't specify, route traffic to
# this role by default.
#
# any: round-robin between primary and replicas,
# replica: round-robin between replicas only without touching the primary,
# primary: all queries go to the primary unless otherwise specified.
default_role = "any"

View File

@@ -48,6 +48,10 @@ pub struct Client {
// Clients are mapped to servers while they use them. This allows a client
// to connect and cancel a query.
client_server_map: ClientServerMap,
// Unless client specifies, route queries to the servers that have this role,
// e.g. primary or replicas or any.
default_server_role: Option<Role>,
}
impl Client {
@@ -58,6 +62,7 @@ impl Client {
mut stream: TcpStream,
client_server_map: ClientServerMap,
transaction_mode: bool,
default_server_role: Option<Role>,
) -> Result<Client, Error> {
loop {
// Could be StartupMessage or SSLRequest
@@ -114,6 +119,7 @@ impl Client {
process_id: process_id,
secret_key: secret_key,
client_server_map: client_server_map,
default_server_role: default_server_role,
});
}
@@ -133,6 +139,7 @@ impl Client {
process_id: process_id,
secret_key: secret_key,
client_server_map: client_server_map,
default_server_role: default_server_role,
});
}
@@ -172,7 +179,7 @@ impl Client {
let mut shard: Option<usize> = None;
// Active database role we want to talk to, e.g. primary or replica.
let mut role: Option<Role> = None;
let mut role: Option<Role> = self.default_server_role;
loop {
// Read a complete message from the client, which normally would be
@@ -275,7 +282,7 @@ impl Client {
// Release server
if !server.in_transaction() && self.transaction_mode {
shard = None;
role = None;
role = self.default_server_role;
break;
}
}
@@ -338,7 +345,7 @@ impl Client {
// Release server
if !server.in_transaction() && self.transaction_mode {
shard = None;
role = None;
role = self.default_server_role;
break;
}
}
@@ -366,7 +373,7 @@ impl Client {
if !server.in_transaction() && self.transaction_mode {
println!("Releasing after copy done");
shard = None;
role = None;
role = self.default_server_role;
break;
}
}
@@ -382,7 +389,7 @@ impl Client {
}
/// Release the server from being mine. I can't cancel its queries anymore.
pub fn release(&mut self) {
pub fn release(&self) {
let mut guard = self.client_server_map.lock().unwrap();
guard.remove(&(self.process_id, self.secret_key));
}
@@ -390,7 +397,7 @@ impl Client {
/// Determine if the query is part of our special syntax, extract
/// the shard key, and return the shard to query based on Postgres'
/// PARTITION BY HASH function.
fn select_shard(&mut self, mut buf: BytesMut, shards: usize) -> Option<usize> {
fn select_shard(&self, mut buf: BytesMut, shards: usize) -> Option<usize> {
let code = buf.get_u8() as char;
// Only supporting simpe protocol here, so
@@ -425,7 +432,7 @@ impl Client {
}
// Pick a primary or a replica from the pool.
fn select_role(&mut self, mut buf: BytesMut) -> Option<Role> {
fn select_role(&self, mut buf: BytesMut) -> Option<Role> {
let code = buf.get_u8() as char;
// Same story as select_shard() above.

View File

@@ -43,11 +43,17 @@ pub struct Shard {
pub database: String,
}
#[derive(Deserialize, Debug, Clone)]
pub struct QueryRouter {
pub default_role: String,
}
#[derive(Deserialize, Debug, Clone)]
pub struct Config {
pub general: General,
pub user: User,
pub shards: HashMap<String, Shard>,
pub query_router: QueryRouter,
}
/// Parse the config.
@@ -118,6 +124,19 @@ pub async fn parse(path: &str) -> Result<Config, Error> {
}
}
match config.query_router.default_role.as_ref() {
"any" => (),
"primary" => (),
"replica" => (),
other => {
println!(
"> Query router default_role must be 'primary', 'replica', or 'any', got: '{}'",
other
);
return Err(Error::BadConfig);
}
};
Ok(config)
}
@@ -132,5 +151,6 @@ mod test {
assert_eq!(config.shards.len(), 3);
assert_eq!(config.shards["1"].servers[0].0, "127.0.0.1");
assert_eq!(config.shards["0"].servers[0].2, "primary");
assert_eq!(config.query_router.default_role, "any");
}
}

View File

@@ -40,6 +40,7 @@ mod sharding;
// Support for query cancellation: this maps our process_ids and
// secret keys to the backend's.
use config::Role;
use pool::{ClientServerMap, ConnectionPool};
/// Main!
@@ -87,6 +88,15 @@ async fn main() {
let pool = ConnectionPool::from_config(config.clone(), client_server_map.clone()).await;
let transaction_mode = config.general.pool_mode == "transaction";
let default_server_role = match config.query_router.default_role.as_ref() {
"any" => None,
"primary" => Some(Role::Primary),
"replica" => Some(Role::Replica),
_ => {
println!("> Config error, got unexpected query_router.default_role.");
return;
}
};
println!("> Waiting for clients...");
@@ -109,7 +119,14 @@ async fn main() {
addr, transaction_mode
);
match client::Client::startup(socket, client_server_map, transaction_mode).await {
match client::Client::startup(
socket,
client_server_map,
transaction_mode,
default_server_role,
)
.await
{
Ok(mut client) => {
println!(">> Client {:?} authenticated successfully!", addr);

View File

@@ -124,10 +124,7 @@ impl ConnectionPool {
// Make sure if a specific role is requested, it's available in the pool.
match role {
Some(role) => {
let role_count = addresses
.iter()
.filter(|&db| db.role == Role::Primary)
.count();
let role_count = addresses.iter().filter(|&db| db.role == role).count();
if role_count == 0 {
println!(