diff --git a/src/errors.rs b/src/errors.rs index 34f7da2..47510f4 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -5,4 +5,5 @@ pub enum Error { ClientBadStartup, ProtocolSyncError, ServerError, + ServerTimeout, } diff --git a/src/pool.rs b/src/pool.rs index 11ca9a8..783d537 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -43,8 +43,20 @@ impl ManageConnection for ServerPool { } /// Determines if the connection is still connected to the database. - async fn is_valid(&self, _conn: &mut PooledConnection<'_, Self>) -> Result<(), Self::Error> { - Ok(()) + async fn is_valid(&self, conn: &mut PooledConnection<'_, Self>) -> Result<(), Self::Error> { + let server = &mut *conn; + + // If this fails, the connection will be closed and another will be grabbed from the pool quietly :-). + // Failover, step 1, complete. + match tokio::time::timeout( + tokio::time::Duration::from_millis(1000), + server.query("SELECT 1"), + ) + .await + { + Ok(_) => Ok(()), + Err(_err) => Err(Error::ServerTimeout), + } } /// Synchronously determine if the connection is no longer usable, if possible. diff --git a/src/server.rs b/src/server.rs index 7c4eced..baba5cd 100644 --- a/src/server.rs +++ b/src/server.rs @@ -209,8 +209,8 @@ impl Server { self.bad = true; } - pub async fn set_name(&mut self, name: &str) -> Result<(), Error> { - let mut query = BytesMut::from(&format!("SET application_name = {}", name).as_bytes()[..]); + pub async fn query(&mut self, query: &str) -> Result<(), Error> { + let mut query = BytesMut::from(&query.as_bytes()[..]); query.put_u8(0); let len = query.len() as i32 + 4; @@ -226,4 +226,10 @@ impl Server { Ok(()) } + + pub async fn set_name(&mut self, name: &str) -> Result<(), Error> { + Ok(self + .query(&format!("SET application_name = '{}'", name)) + .await?) + } }