checkpoint

This commit is contained in:
Lev Kokotov
2022-09-12 13:50:39 -04:00
parent 075167431d
commit cb55802917
2 changed files with 184 additions and 150 deletions

View File

@@ -88,6 +88,9 @@ pub struct Client<S, T> {
/// Used to notify clients about an impending shutdown
shutdown: Receiver<()>,
// Sharding key column position
sharding_key_column: Option<usize>,
}
/// Client entrypoint.
@@ -505,6 +508,7 @@ where
application_name: application_name.to_string(),
shutdown,
connected_to_server: false,
sharding_key_column: None,
});
}
@@ -539,6 +543,7 @@ where
application_name: String::from("undefined"),
shutdown,
connected_to_server: false,
sharding_key_column: None,
});
}
@@ -724,6 +729,13 @@ where
show_response(&mut self.write, "primary reads", &value).await?;
continue;
}
// COPY .. SHARDING_KEY_COLUMN ..
Some((Command::StartShardedCopy, value)) => {
custom_protocol_response_ok(&mut self.write, "SHARDED_COPY").await?;
self.sharding_key_column = Some(value.parse::<usize>().unwrap());
continue;
}
};
debug!("Waiting for connection from pool");
@@ -792,7 +804,7 @@ where
// If the client is in session mode, no more custom protocol
// commands will be accepted.
loop {
let mut message = if message.len() == 0 {
let message = if message.len() == 0 {
trace!("Waiting for message inside transaction or in session mode");
match read_message(&mut self.read).await {
@@ -811,10 +823,26 @@ where
msg
};
// The message will be forwarded to the server intact. We still would like to
// parse it below to figure out what to do with it.
let original = message.clone();
match self.handle_message(&pool, server, &address, message).await? {
Some(done) => if done { break; },
None => return Ok(()),
};
}
// The server is no longer bound to us, we can't cancel it's queries anymore.
debug!("Releasing server back into the pool");
server.checkin_cleanup().await?;
self.stats.server_idle(server.process_id(), address.id);
self.connected_to_server = false;
self.release();
self.stats.client_idle(self.process_id, address.id);
}
}
async fn handle_message(&mut self, pool: &ConnectionPool, server: &mut Server, address: &Address, mut message: BytesMut) -> Result<Option<bool>, Error> {
let original = message.clone();
let code = message.get_u8() as char;
let _len = message.get_i32() as usize;
@@ -835,7 +863,7 @@ where
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
break;
return Ok(Some(true));
}
}
}
@@ -845,7 +873,7 @@ where
server.checkin_cleanup().await?;
self.release();
return Ok(());
return Ok(None);
}
// Parse
@@ -913,7 +941,7 @@ where
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
break;
return Ok(Some(true));
}
}
}
@@ -948,7 +976,7 @@ where
// Release server back to the pool if we are in transaction mode.
// If we are in session mode, we keep the server until the client disconnects.
if self.transaction_mode {
break;
return Ok(Some(true));
}
}
}
@@ -958,18 +986,9 @@ where
_ => {
error!("Unexpected code: {}", code);
}
}
}
};
// The server is no longer bound to us, we can't cancel it's queries anymore.
debug!("Releasing server back into the pool");
server.checkin_cleanup().await?;
self.stats.server_idle(server.process_id(), address.id);
self.connected_to_server = false;
self.release();
self.stats.client_idle(self.process_id, address.id);
}
Ok(Some(false))
}
/// Release the server from the client: it can't cancel its queries anymore.

View File

@@ -13,7 +13,7 @@ use crate::pool::PoolSettings;
use crate::sharding::Sharder;
/// Regexes used to parse custom commands.
const CUSTOM_SQL_REGEXES: [&str; 7] = [
const CUSTOM_SQL_REGEXES: [&str; 8] = [
r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$",
r"(?i)^ *SET SHARD TO '?([0-9]+|ANY)'? *;? *$",
r"(?i)^ *SHOW SHARD *;? *$",
@@ -21,6 +21,7 @@ const CUSTOM_SQL_REGEXES: [&str; 7] = [
r"(?i)^ *SHOW SERVER ROLE *;? *$",
r"(?i)^ *SET PRIMARY READS TO '?(on|off|default)'? *;? *$",
r"(?i)^ *SHOW PRIMARY READS *;? *$",
r"(?i)^ *SHARDED_COPY '?([0-9]+)'? *;? *$",
];
/// Custom commands.
@@ -33,6 +34,7 @@ pub enum Command {
ShowServerRole,
SetPrimaryReads,
ShowPrimaryReads,
StartShardedCopy,
}
/// Quickly test for match when a query is received.
@@ -57,6 +59,9 @@ pub struct QueryRouter {
/// Pool configuration.
pool_settings: PoolSettings,
// Sharding key column
sharding_key_column: Option<usize>,
}
impl QueryRouter {
@@ -98,6 +103,7 @@ impl QueryRouter {
query_parser_enabled: false,
primary_reads_enabled: false,
pool_settings: PoolSettings::default(),
sharding_key_column: None,
}
}
@@ -145,6 +151,7 @@ impl QueryRouter {
4 => Command::ShowServerRole,
5 => Command::SetPrimaryReads,
6 => Command::ShowPrimaryReads,
7 => Command::StartShardedCopy,
_ => unreachable!(),
};
@@ -152,7 +159,8 @@ impl QueryRouter {
Command::SetShardingKey
| Command::SetShard
| Command::SetServerRole
| Command::SetPrimaryReads => {
| Command::SetPrimaryReads
| Command::StartShardedCopy => {
// Capture value. I know this re-runs the regex engine, but I haven't
// figured out a better way just yet. I think I can write a single Regex
// that matches all 5 custom SQL patterns, but maybe that's not very legible?
@@ -204,6 +212,13 @@ impl QueryRouter {
};
}
Command::StartShardedCopy => {
self.sharding_key_column = match value.parse::<usize>() {
Ok(value) => Some(value),
Err(_) => return None,
}
}
Command::SetServerRole => {
self.active_role = match value.to_ascii_lowercase().as_ref() {
"primary" => {