mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-27 18:56:30 +00:00
Report Query times (#166)
* Report avg and total query timing * Report query times * fmt
This commit is contained in:
committed by
GitHub
parent
4ae1bc8d32
commit
f7a951745c
@@ -2,6 +2,7 @@
|
|||||||
use bytes::{Buf, BufMut, BytesMut};
|
use bytes::{Buf, BufMut, BytesMut};
|
||||||
use log::{debug, error, info, trace};
|
use log::{debug, error, info, trace};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::time::Instant;
|
||||||
use tokio::io::{split, AsyncReadExt, BufReader, ReadHalf, WriteHalf};
|
use tokio::io::{split, AsyncReadExt, BufReader, ReadHalf, WriteHalf};
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
use tokio::sync::broadcast::Receiver;
|
use tokio::sync::broadcast::Receiver;
|
||||||
@@ -994,6 +995,7 @@ where
|
|||||||
self.send_server_message(server, message, &address, &pool)
|
self.send_server_message(server, message, &address, &pool)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
let query_start = Instant::now();
|
||||||
// Read all data the server has to offer, which can be multiple messages
|
// Read all data the server has to offer, which can be multiple messages
|
||||||
// buffered in 8196 bytes chunks.
|
// buffered in 8196 bytes chunks.
|
||||||
loop {
|
loop {
|
||||||
@@ -1013,7 +1015,11 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Report query executed statistics.
|
// Report query executed statistics.
|
||||||
self.stats.query(self.process_id, server.server_id());
|
self.stats.query(
|
||||||
|
self.process_id,
|
||||||
|
server.server_id(),
|
||||||
|
Instant::now().duration_since(query_start).as_millis(),
|
||||||
|
);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
11
src/stats.rs
11
src/stats.rs
@@ -9,7 +9,6 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
|
|||||||
use tokio::time::Instant;
|
use tokio::time::Instant;
|
||||||
|
|
||||||
use crate::pool::{get_all_pools, get_number_of_addresses};
|
use crate::pool::{get_all_pools, get_number_of_addresses};
|
||||||
use crate::server;
|
|
||||||
|
|
||||||
/// Convenience types for various stats
|
/// Convenience types for various stats
|
||||||
type ClientStatesLookup = HashMap<i32, ClientInformation>;
|
type ClientStatesLookup = HashMap<i32, ClientInformation>;
|
||||||
@@ -138,6 +137,7 @@ enum EventName {
|
|||||||
Query {
|
Query {
|
||||||
client_id: i32,
|
client_id: i32,
|
||||||
server_id: i32,
|
server_id: i32,
|
||||||
|
duration_ms: u128,
|
||||||
},
|
},
|
||||||
Transaction {
|
Transaction {
|
||||||
client_id: i32,
|
client_id: i32,
|
||||||
@@ -269,11 +269,12 @@ impl Reporter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Report a query executed by a client against a server
|
/// Report a query executed by a client against a server
|
||||||
pub fn query(&self, client_id: i32, server_id: i32) {
|
pub fn query(&self, client_id: i32, server_id: i32, duration_ms: u128) {
|
||||||
let event = Event {
|
let event = Event {
|
||||||
name: EventName::Query {
|
name: EventName::Query {
|
||||||
client_id,
|
client_id,
|
||||||
server_id,
|
server_id,
|
||||||
|
duration_ms,
|
||||||
},
|
},
|
||||||
value: 1,
|
value: 1,
|
||||||
};
|
};
|
||||||
@@ -562,6 +563,7 @@ impl Collector {
|
|||||||
EventName::Query {
|
EventName::Query {
|
||||||
client_id,
|
client_id,
|
||||||
server_id,
|
server_id,
|
||||||
|
duration_ms,
|
||||||
} => {
|
} => {
|
||||||
// Update client stats
|
// Update client stats
|
||||||
let app_name = match client_states.get_mut(&client_id) {
|
let app_name = match client_states.get_mut(&client_id) {
|
||||||
@@ -585,6 +587,11 @@ impl Collector {
|
|||||||
.entry("total_query_count".to_string())
|
.entry("total_query_count".to_string())
|
||||||
.or_insert(0);
|
.or_insert(0);
|
||||||
*counter += stat.value;
|
*counter += stat.value;
|
||||||
|
|
||||||
|
let duration = pool_stats
|
||||||
|
.entry("total_query_time".to_string())
|
||||||
|
.or_insert(0);
|
||||||
|
*duration += duration_ms as i64;
|
||||||
}
|
}
|
||||||
None => (),
|
None => (),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,6 +11,29 @@ describe "Admin" do
|
|||||||
processes.pgcat.shutdown
|
processes.pgcat.shutdown
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe "SHOW STATS" do
|
||||||
|
context "clients connect and make one query" do
|
||||||
|
it "updates *_query_time and *_wait_time" do
|
||||||
|
connection = PG::connect("#{pgcat_conn_str}?application_name=one_query")
|
||||||
|
connection.async_exec("SELECT pg_sleep(0.25)")
|
||||||
|
connection.async_exec("SELECT pg_sleep(0.25)")
|
||||||
|
connection.async_exec("SELECT pg_sleep(0.25)")
|
||||||
|
connection.close
|
||||||
|
|
||||||
|
# wait for averages to be calculated, we shouldn't do this too often
|
||||||
|
sleep(15.5)
|
||||||
|
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
|
||||||
|
results = admin_conn.async_exec("SHOW STATS")[0]
|
||||||
|
admin_conn.close
|
||||||
|
expect(results["total_query_time"].to_i).to be_within(200).of(750)
|
||||||
|
expect(results["avg_query_time"].to_i).to_not eq(0)
|
||||||
|
|
||||||
|
expect(results["total_wait_time"].to_i).to_not eq(0)
|
||||||
|
expect(results["avg_wait_time"].to_i).to_not eq(0)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
describe "SHOW POOLS" do
|
describe "SHOW POOLS" do
|
||||||
context "bad credentials" do
|
context "bad credentials" do
|
||||||
it "does not change any stats" do
|
it "does not change any stats" do
|
||||||
|
|||||||
Reference in New Issue
Block a user