diff --git a/src/client.rs b/src/client.rs index 23392b7..3fd03e4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1204,9 +1204,12 @@ where if !server.in_transaction() { // Report transaction executed statistics. self.stats.transaction(); - server - .stats() - .transaction(self.server_parameters.get_application_name()); + server.stats().transaction( + Instant::now() + .duration_since(server.transaction_start().into()) + .as_millis() as u64, + self.server_parameters.get_application_name(), + ); // Release server back to the pool if we are in transaction mode. // If we are in session mode, we keep the server until the client disconnects. @@ -1460,9 +1463,12 @@ where if !server.in_transaction() { self.stats.transaction(); - server - .stats() - .transaction(self.server_parameters.get_application_name()); + server.stats().transaction( + Instant::now() + .duration_since(server.transaction_start().into()) + .as_millis() as u64, + self.server_parameters.get_application_name(), + ); // Release server back to the pool if we are in transaction mode. // If we are in session mode, we keep the server until the client disconnects. @@ -1511,9 +1517,12 @@ where if !server.in_transaction() { self.stats.transaction(); - server - .stats() - .transaction(self.server_parameters.get_application_name()); + server.stats().transaction( + Instant::now() + .duration_since(server.transaction_start().into()) + .as_millis() as u64, + self.server_parameters.get_application_name(), + ); // Release server back to the pool if we are in transaction mode. // If we are in session mode, we keep the server until the client disconnects. diff --git a/src/server.rs b/src/server.rs index 882450e..df43bfe 100644 --- a/src/server.rs +++ b/src/server.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use std::time::SystemTime; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, BufStream}; use tokio::net::TcpStream; +use tokio::time::Instant; use tokio_rustls::rustls::{OwnedTrustAnchor, RootCertStore}; use tokio_rustls::{client::TlsStream, TlsConnector}; @@ -285,6 +286,9 @@ pub struct Server { /// Is the server inside a transaction or idle. in_transaction: bool, + /// The time the most recent transaction started. + transaction_start: Instant, + /// Is there more data for the client to read. data_available: bool, @@ -804,6 +808,7 @@ impl Server { process_id, secret_key, in_transaction: false, + transaction_start: Instant::now(), in_copy_mode: false, data_available: false, bad: false, @@ -936,6 +941,7 @@ impl Server { // In transaction. 'T' => { self.in_transaction = true; + self.transaction_start = Instant::now(); } // Idle, transaction over. @@ -1220,6 +1226,12 @@ impl Server { self.in_transaction } + /// The start time of the most recent transaction. + /// Will be stale if not in a transaction. + pub fn transaction_start(&self) -> Instant { + self.transaction_start + } + /// Currently copying data from client to server or vice-versa. pub fn in_copy_mode(&self) -> bool { self.in_copy_mode diff --git a/src/stats/server.rs b/src/stats/server.rs index 5d25599..0de8e93 100644 --- a/src/stats/server.rs +++ b/src/stats/server.rs @@ -187,11 +187,12 @@ impl ServerStats { /// we report each individual queries outside a transaction as a transaction /// We only count the initial BEGIN as a transaction, all queries within do not /// count as transactions - pub fn transaction(&self, application_name: &str) { + pub fn transaction(&self, milliseconds: u64, application_name: &str) { self.set_application(application_name.to_string()); self.transaction_count.fetch_add(1, Ordering::Relaxed); self.address.stats.xact_count_add(); + self.address.stats.xact_time_add(milliseconds); } /// Report data sent to a server diff --git a/tests/ruby/stats_spec.rb b/tests/ruby/stats_spec.rb index 8a683a0..fff8be4 100644 --- a/tests/ruby/stats_spec.rb +++ b/tests/ruby/stats_spec.rb @@ -16,7 +16,7 @@ describe "Stats" do it "updates *_query_time and *_wait_time" do connections = Array.new(3) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } connections.each do |c| - Thread.new { c.async_exec("SELECT pg_sleep(0.25)") } + Thread.new { c.async_exec("BEGIN; SELECT pg_sleep(0.25); COMMIT;") } end sleep(1) connections.map(&:close) @@ -25,10 +25,32 @@ describe "Stats" do sleep(15.5) admin_conn = PG::connect(processes.pgcat.admin_connection_string) results = admin_conn.async_exec("SHOW STATS")[0] - admin_conn.close expect(results["total_query_time"].to_i).to be_within(200).of(750) expect(results["avg_query_time"].to_i).to be_within(50).of(250) + expect(results["total_xact_time"].to_i).to be_within(200).of(750) + expect(results["avg_xact_time"].to_i).to be_within(50).of(250) + + expect(results["total_wait_time"].to_i).to_not eq(0) + expect(results["avg_wait_time"].to_i).to_not eq(0) + + connections = Array.new(3) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + Thread.new { c.async_exec("SELECT pg_sleep(0.25);") } + end + sleep(1) + connections.map(&:close) + + results = admin_conn.async_exec("SHOW STATS")[0] + admin_conn.close + # This should increase with more queries + expect(results["total_query_time"].to_i).to be_within(400).of(1500) + expect(results["avg_query_time"].to_i).to be_within(50).of(250) + + # This should not increase as we did not run any additional transactions + expect(results["total_xact_time"].to_i).to be_within(200).of(750) + expect(results["avg_xact_time"].to_i).to be_within(50).of(250) + expect(results["total_wait_time"].to_i).to_not eq(0) expect(results["avg_wait_time"].to_i).to_not eq(0) end