Compare commits

..

1 Commits

Author SHA1 Message Date
Nadav Shatz
26cff3e545 Add DB activity based routing 2024-12-22 11:23:05 +02:00
2 changed files with 19 additions and 27 deletions

View File

@@ -858,14 +858,6 @@ where
// The query router determines where the query is going to go,
// e.g. primary, replica, which shard.
let mut query_router = QueryRouter::new();
let pool_settings = if self.admin {
// Admin clients do not use pools.
ConnectionPool::default().settings
} else {
self.get_pool().await?.settings
};
query_router.update_pool_settings(&pool_settings);
query_router.set_default_role();
self.stats.register(self.stats.clone());
@@ -878,6 +870,19 @@ where
&self.pool_name,
);
// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
let mut pool = if self.admin {
// Admin clients do not use pools.
ConnectionPool::default()
} else {
self.get_pool().await?
};
query_router.update_pool_settings(&pool.settings);
query_router.set_default_role();
// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries
// or issue commands for our sharding and server selection protocol.
@@ -928,18 +933,6 @@ where
continue;
}
// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
let mut pool = if self.admin {
// Admin clients do not use pools.
ConnectionPool::default()
} else {
self.get_pool().await?
};
query_router.update_pool_settings(&pool.settings);
// Handle all custom protocol commands, if any.
if self
.handle_custom_protocol(&mut query_router, &message, &pool)
@@ -1062,12 +1055,11 @@ where
};
// Check if the pool is paused and wait until it's resumed.
if pool.paused() {
pool.wait_paused().await;
// Refresh pool information, something might have changed.
pool = self.get_pool().await?;
query_router.update_pool_settings(&pool.settings);
}
pool.wait_paused().await;
// Refresh pool information, something might have changed.
pool = self.get_pool().await?;
query_router.update_pool_settings(&pool.settings);
debug!("Waiting for connection from pool");
if !self.admin {

View File

@@ -3,7 +3,7 @@ use tracing_subscriber;
use tracing_subscriber::EnvFilter;
pub fn init(args: &Args) {
// Initialize a default filter, and then override the builtin default "warning" with our
// Iniitalize a default filter, and then override the builtin default "warning" with our
// commandline, (default: "info")
let filter = EnvFilter::from_default_env().add_directive(args.log_level.into());