Compare commits

...

1 Commits

Author SHA1 Message Date
Lev Kokotov
cb55802917 checkpoint 2022-09-12 13:50:39 -04:00
2 changed files with 184 additions and 150 deletions

View File

@@ -88,6 +88,9 @@ pub struct Client<S, T> {
/// Used to notify clients about an impending shutdown /// Used to notify clients about an impending shutdown
shutdown: Receiver<()>, shutdown: Receiver<()>,
// Sharding key column position
sharding_key_column: Option<usize>,
} }
/// Client entrypoint. /// Client entrypoint.
@@ -505,6 +508,7 @@ where
application_name: application_name.to_string(), application_name: application_name.to_string(),
shutdown, shutdown,
connected_to_server: false, connected_to_server: false,
sharding_key_column: None,
}); });
} }
@@ -539,6 +543,7 @@ where
application_name: String::from("undefined"), application_name: String::from("undefined"),
shutdown, shutdown,
connected_to_server: false, connected_to_server: false,
sharding_key_column: None,
}); });
} }
@@ -724,6 +729,13 @@ where
show_response(&mut self.write, "primary reads", &value).await?; show_response(&mut self.write, "primary reads", &value).await?;
continue; continue;
} }
// COPY .. SHARDING_KEY_COLUMN ..
Some((Command::StartShardedCopy, value)) => {
custom_protocol_response_ok(&mut self.write, "SHARDED_COPY").await?;
self.sharding_key_column = Some(value.parse::<usize>().unwrap());
continue;
}
}; };
debug!("Waiting for connection from pool"); debug!("Waiting for connection from pool");
@@ -792,7 +804,7 @@ where
// If the client is in session mode, no more custom protocol // If the client is in session mode, no more custom protocol
// commands will be accepted. // commands will be accepted.
loop { loop {
let mut message = if message.len() == 0 { let message = if message.len() == 0 {
trace!("Waiting for message inside transaction or in session mode"); trace!("Waiting for message inside transaction or in session mode");
match read_message(&mut self.read).await { match read_message(&mut self.read).await {
@@ -811,10 +823,26 @@ where
msg msg
}; };
// The message will be forwarded to the server intact. We still would like to match self.handle_message(&pool, server, &address, message).await? {
// parse it below to figure out what to do with it. Some(done) => if done { break; },
let original = message.clone(); None => return Ok(()),
};
}
// The server is no longer bound to us, we can't cancel it's queries anymore.
debug!("Releasing server back into the pool");
server.checkin_cleanup().await?;
self.stats.server_idle(server.process_id(), address.id);
self.connected_to_server = false;
self.release();
self.stats.client_idle(self.process_id, address.id);
}
}
async fn handle_message(&mut self, pool: &ConnectionPool, server: &mut Server, address: &Address, mut message: BytesMut) -> Result<Option<bool>, Error> {
let original = message.clone();
let code = message.get_u8() as char; let code = message.get_u8() as char;
let _len = message.get_i32() as usize; let _len = message.get_i32() as usize;
@@ -835,7 +863,7 @@ where
// Release server back to the pool if we are in transaction mode. // Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects. // If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode { if self.transaction_mode {
break; return Ok(Some(true));
} }
} }
} }
@@ -845,7 +873,7 @@ where
server.checkin_cleanup().await?; server.checkin_cleanup().await?;
self.release(); self.release();
return Ok(()); return Ok(None);
} }
// Parse // Parse
@@ -913,7 +941,7 @@ where
// Release server back to the pool if we are in transaction mode. // Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects. // If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode { if self.transaction_mode {
break; return Ok(Some(true));
} }
} }
} }
@@ -948,7 +976,7 @@ where
// Release server back to the pool if we are in transaction mode. // Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects. // If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode { if self.transaction_mode {
break; return Ok(Some(true));
} }
} }
} }
@@ -958,18 +986,9 @@ where
_ => { _ => {
error!("Unexpected code: {}", code); error!("Unexpected code: {}", code);
} }
} };
}
// The server is no longer bound to us, we can't cancel it's queries anymore. Ok(Some(false))
debug!("Releasing server back into the pool");
server.checkin_cleanup().await?;
self.stats.server_idle(server.process_id(), address.id);
self.connected_to_server = false;
self.release();
self.stats.client_idle(self.process_id, address.id);
}
} }
/// Release the server from the client: it can't cancel its queries anymore. /// Release the server from the client: it can't cancel its queries anymore.

View File

@@ -13,7 +13,7 @@ use crate::pool::PoolSettings;
use crate::sharding::Sharder; use crate::sharding::Sharder;
/// Regexes used to parse custom commands. /// Regexes used to parse custom commands.
const CUSTOM_SQL_REGEXES: [&str; 7] = [ const CUSTOM_SQL_REGEXES: [&str; 8] = [
r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$", r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$",
r"(?i)^ *SET SHARD TO '?([0-9]+|ANY)'? *;? *$", r"(?i)^ *SET SHARD TO '?([0-9]+|ANY)'? *;? *$",
r"(?i)^ *SHOW SHARD *;? *$", r"(?i)^ *SHOW SHARD *;? *$",
@@ -21,6 +21,7 @@ const CUSTOM_SQL_REGEXES: [&str; 7] = [
r"(?i)^ *SHOW SERVER ROLE *;? *$", r"(?i)^ *SHOW SERVER ROLE *;? *$",
r"(?i)^ *SET PRIMARY READS TO '?(on|off|default)'? *;? *$", r"(?i)^ *SET PRIMARY READS TO '?(on|off|default)'? *;? *$",
r"(?i)^ *SHOW PRIMARY READS *;? *$", r"(?i)^ *SHOW PRIMARY READS *;? *$",
r"(?i)^ *SHARDED_COPY '?([0-9]+)'? *;? *$",
]; ];
/// Custom commands. /// Custom commands.
@@ -33,6 +34,7 @@ pub enum Command {
ShowServerRole, ShowServerRole,
SetPrimaryReads, SetPrimaryReads,
ShowPrimaryReads, ShowPrimaryReads,
StartShardedCopy,
} }
/// Quickly test for match when a query is received. /// Quickly test for match when a query is received.
@@ -57,6 +59,9 @@ pub struct QueryRouter {
/// Pool configuration. /// Pool configuration.
pool_settings: PoolSettings, pool_settings: PoolSettings,
// Sharding key column
sharding_key_column: Option<usize>,
} }
impl QueryRouter { impl QueryRouter {
@@ -98,6 +103,7 @@ impl QueryRouter {
query_parser_enabled: false, query_parser_enabled: false,
primary_reads_enabled: false, primary_reads_enabled: false,
pool_settings: PoolSettings::default(), pool_settings: PoolSettings::default(),
sharding_key_column: None,
} }
} }
@@ -145,6 +151,7 @@ impl QueryRouter {
4 => Command::ShowServerRole, 4 => Command::ShowServerRole,
5 => Command::SetPrimaryReads, 5 => Command::SetPrimaryReads,
6 => Command::ShowPrimaryReads, 6 => Command::ShowPrimaryReads,
7 => Command::StartShardedCopy,
_ => unreachable!(), _ => unreachable!(),
}; };
@@ -152,7 +159,8 @@ impl QueryRouter {
Command::SetShardingKey Command::SetShardingKey
| Command::SetShard | Command::SetShard
| Command::SetServerRole | Command::SetServerRole
| Command::SetPrimaryReads => { | Command::SetPrimaryReads
| Command::StartShardedCopy => {
// Capture value. I know this re-runs the regex engine, but I haven't // Capture value. I know this re-runs the regex engine, but I haven't
// figured out a better way just yet. I think I can write a single Regex // figured out a better way just yet. I think I can write a single Regex
// that matches all 5 custom SQL patterns, but maybe that's not very legible? // that matches all 5 custom SQL patterns, but maybe that's not very legible?
@@ -204,6 +212,13 @@ impl QueryRouter {
}; };
} }
Command::StartShardedCopy => {
self.sharding_key_column = match value.parse::<usize>() {
Ok(value) => Some(value),
Err(_) => return None,
}
}
Command::SetServerRole => { Command::SetServerRole => {
self.active_role = match value.to_ascii_lowercase().as_ref() { self.active_role = match value.to_ascii_lowercase().as_ref() {
"primary" => { "primary" => {