mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 17:36:28 +00:00
This is an implementation of Query mirroring in PgCat (outlined here #302) In configs, we match mirror hosts with the servers handling the traffic. A mirror host will receive the same protocol messages as the main server it was matched with. This is done by creating an async task for each mirror server, it communicates with the main server through two channels, one for the protocol messages and one for the exit signal. The mirror server sends the protocol packets to the underlying PostgreSQL server. We receive from the underlying PostgreSQL server as soon as the data is available and we immediately discard it. We use bb8 to manage the life cycle of the connection, not for pooling since each mirror server handler is more or less single-threaded. We don't have any connection pooling in the mirrors. Matching each mirror connection to an actual server connection guarantees that we will not have more connections to any of the mirrors than the parent pool would allow.
95 lines
2.6 KiB
Ruby
95 lines
2.6 KiB
Ruby
require 'pg'
|
|
require 'toxiproxy'
|
|
|
|
class PgInstance
|
|
attr_reader :port
|
|
attr_reader :username
|
|
attr_reader :password
|
|
attr_reader :database_name
|
|
|
|
def initialize(port, username, password, database_name)
|
|
@original_port = port
|
|
@toxiproxy_port = 10000 + port.to_i
|
|
@port = @toxiproxy_port
|
|
|
|
@username = username
|
|
@password = password
|
|
@database_name = database_name
|
|
@toxiproxy_name = "database_#{@original_port}"
|
|
Toxiproxy.populate([{
|
|
name: @toxiproxy_name,
|
|
listen: "0.0.0.0:#{@toxiproxy_port}",
|
|
upstream: "localhost:#{@original_port}",
|
|
}])
|
|
|
|
# Toxiproxy server will outlive our PgInstance objects
|
|
# so we want to destroy our proxies before exiting
|
|
# Ruby finalizer is ideal for doing this
|
|
ObjectSpace.define_finalizer(@toxiproxy_name, proc { Toxiproxy[@toxiproxy_name].destroy })
|
|
end
|
|
|
|
def with_connection
|
|
conn = PG.connect("postgres://#{@username}:#{@password}@localhost:#{port}/#{database_name}")
|
|
yield conn
|
|
ensure
|
|
conn&.close
|
|
end
|
|
|
|
def reset
|
|
reset_toxics
|
|
reset_stats
|
|
drop_connections
|
|
sleep 0.1
|
|
end
|
|
|
|
def toxiproxy
|
|
Toxiproxy[@toxiproxy_name]
|
|
end
|
|
|
|
def take_down
|
|
if block_given?
|
|
Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).apply { yield }
|
|
else
|
|
Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).toxics.each(&:save)
|
|
end
|
|
end
|
|
|
|
def add_latency(latency)
|
|
if block_given?
|
|
Toxiproxy[@toxiproxy_name].toxic(:latency, latency: latency).apply { yield }
|
|
else
|
|
Toxiproxy[@toxiproxy_name].toxic(:latency, latency: latency).toxics.each(&:save)
|
|
end
|
|
end
|
|
|
|
def delete_proxy
|
|
Toxiproxy[@toxiproxy_name].delete
|
|
end
|
|
|
|
def reset_toxics
|
|
Toxiproxy[@toxiproxy_name].toxics.each(&:destroy)
|
|
sleep 0.1
|
|
end
|
|
|
|
def reset_stats
|
|
with_connection { |c| c.async_exec("SELECT pg_stat_statements_reset()") }
|
|
end
|
|
|
|
def drop_connections
|
|
username = with_connection { |c| c.async_exec("SELECT current_user")[0]["current_user"] }
|
|
with_connection { |c| c.async_exec("SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND usename='#{username}'") }
|
|
end
|
|
|
|
def count_connections
|
|
with_connection { |c| c.async_exec("SELECT COUNT(*) as count FROM pg_stat_activity")[0]["count"].to_i }
|
|
end
|
|
|
|
def count_query(query)
|
|
with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = '#{query}'")[0]["sum"].to_i }
|
|
end
|
|
|
|
def count_select_1_plus_2
|
|
with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = 'SELECT $1 + $2'")[0]["sum"].to_i }
|
|
end
|
|
end
|