mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-23 01:16:30 +00:00
Protocol-level test helpers (#393)
I needed to have granular control over protocol message testing. For example, being able to send protocol messages one-by-one and then be able to inspect the results. In order to do that, I created this low-level ruby client that can be used to send protocol messages in any order without blocking and also allows inspection of response messages.
This commit is contained in:
committed by
GitHub
parent
faa9c1f64a
commit
7ddd23b514
259
tests/ruby/helpers/pg_socket.rb
Normal file
259
tests/ruby/helpers/pg_socket.rb
Normal file
@@ -0,0 +1,259 @@
|
||||
require 'socket'
|
||||
require 'digest/md5'
|
||||
|
||||
BACKEND_MESSAGE_CODES = {
|
||||
'Z' => "ReadyForQuery",
|
||||
'C' => "CommandComplete",
|
||||
'T' => "RowDescription",
|
||||
'D' => "DataRow",
|
||||
'1' => "ParseComplete",
|
||||
'2' => "BindComplete",
|
||||
'E' => "ErrorResponse",
|
||||
's' => "PortalSuspended",
|
||||
}
|
||||
|
||||
class PostgresSocket
|
||||
def initialize(host, port)
|
||||
@port = port
|
||||
@host = host
|
||||
@socket = TCPSocket.new @host, @port
|
||||
@parameters = {}
|
||||
@verbose = true
|
||||
end
|
||||
|
||||
def send_md5_password_message(username, password, salt)
|
||||
m = Digest::MD5.hexdigest(password + username)
|
||||
m = Digest::MD5.hexdigest(m + salt.map(&:chr).join(""))
|
||||
m = 'md5' + m
|
||||
bytes = (m.split("").map(&:ord) + [0]).flatten
|
||||
message_size = bytes.count + 4
|
||||
|
||||
message = []
|
||||
|
||||
message << 'p'.ord
|
||||
message << [message_size].pack('l>').unpack('CCCC') # 4
|
||||
message << bytes
|
||||
message.flatten!
|
||||
|
||||
|
||||
@socket.write(message.pack('C*'))
|
||||
end
|
||||
|
||||
def send_startup_message(username, database, password)
|
||||
message = []
|
||||
|
||||
message << [196608].pack('l>').unpack('CCCC') # 4
|
||||
message << "user".split('').map(&:ord) # 4, 8
|
||||
message << 0 # 1, 9
|
||||
message << username.split('').map(&:ord) # 2, 11
|
||||
message << 0 # 1, 12
|
||||
message << "database".split('').map(&:ord) # 8, 20
|
||||
message << 0 # 1, 21
|
||||
message << database.split('').map(&:ord) # 2, 23
|
||||
message << 0 # 1, 24
|
||||
message << 0 # 1, 25
|
||||
message.flatten!
|
||||
|
||||
total_message_size = message.size + 4
|
||||
|
||||
message_len = [total_message_size].pack('l>').unpack('CCCC')
|
||||
|
||||
@socket.write([message_len + message].flatten.pack('C*'))
|
||||
|
||||
sleep 0.1
|
||||
|
||||
read_startup_response(username, password)
|
||||
end
|
||||
|
||||
def read_startup_response(username, password)
|
||||
message_code, message_len = @socket.recv(5).unpack("al>")
|
||||
while message_code == 'R'
|
||||
auth_code = @socket.recv(4).unpack('l>').pop
|
||||
case auth_code
|
||||
when 5 # md5
|
||||
salt = @socket.recv(4).unpack('CCCC')
|
||||
send_md5_password_message(username, password, salt)
|
||||
message_code, message_len = @socket.recv(5).unpack("al>")
|
||||
when 0 # trust
|
||||
break
|
||||
end
|
||||
end
|
||||
loop do
|
||||
message_code, message_len = @socket.recv(5).unpack("al>")
|
||||
if message_code == 'Z'
|
||||
@socket.recv(1).unpack("a") # most likely I
|
||||
break # We are good to go
|
||||
end
|
||||
if message_code == 'S'
|
||||
actual_message = @socket.recv(message_len - 4).unpack("C*")
|
||||
k,v = actual_message.pack('U*').split(/\x00/)
|
||||
@parameters[k] = v
|
||||
end
|
||||
if message_code == 'K'
|
||||
process_id, secret_key = @socket.recv(message_len - 4).unpack("l>l>")
|
||||
@parameters["process_id"] = process_id
|
||||
@parameters["secret_key"] = secret_key
|
||||
end
|
||||
end
|
||||
return @parameters
|
||||
end
|
||||
|
||||
def cancel_query
|
||||
socket = TCPSocket.new @host, @port
|
||||
process_key = @parameters["process_id"]
|
||||
secret_key = @parameters["secret_key"]
|
||||
message = []
|
||||
message << [16].pack('l>').unpack('CCCC') # 4
|
||||
message << [80877102].pack('l>').unpack('CCCC') # 4
|
||||
message << [process_key.to_i].pack('l>').unpack('CCCC') # 4
|
||||
message << [secret_key.to_i].pack('l>').unpack('CCCC') # 4
|
||||
message.flatten!
|
||||
socket.write(message.flatten.pack('C*'))
|
||||
socket.close
|
||||
log "[F] Sent CancelRequest message"
|
||||
end
|
||||
|
||||
def send_query_message(query)
|
||||
query_size = query.length
|
||||
message_size = 1 + 4 + query_size
|
||||
message = []
|
||||
message << "Q".ord
|
||||
message << [message_size].pack('l>').unpack('CCCC') # 4
|
||||
message << query.split('').map(&:ord) # 2, 11
|
||||
message << 0 # 1, 12
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent Q message (#{query})"
|
||||
end
|
||||
|
||||
def send_parse_message(query)
|
||||
query_size = query.length
|
||||
message_size = 2 + 2 + 4 + query_size
|
||||
message = []
|
||||
message << "P".ord
|
||||
message << [message_size].pack('l>').unpack('CCCC') # 4
|
||||
message << 0 # unnamed statement
|
||||
message << query.split('').map(&:ord) # 2, 11
|
||||
message << 0 # 1, 12
|
||||
message << [0, 0]
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent P message (#{query})"
|
||||
end
|
||||
|
||||
def send_bind_message
|
||||
message = []
|
||||
message << "B".ord
|
||||
message << [12].pack('l>').unpack('CCCC') # 4
|
||||
message << 0 # unnamed statement
|
||||
message << 0 # unnamed statement
|
||||
message << [0, 0] # 2
|
||||
message << [0, 0] # 2
|
||||
message << [0, 0] # 2
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent B message"
|
||||
end
|
||||
|
||||
def send_describe_message(mode)
|
||||
message = []
|
||||
message << "D".ord
|
||||
message << [6].pack('l>').unpack('CCCC') # 4
|
||||
message << mode.ord
|
||||
message << 0 # unnamed statement
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent D message"
|
||||
end
|
||||
|
||||
def send_execute_message(limit=0)
|
||||
message = []
|
||||
message << "E".ord
|
||||
message << [9].pack('l>').unpack('CCCC') # 4
|
||||
message << 0 # unnamed statement
|
||||
message << [limit].pack('l>').unpack('CCCC') # 4
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent E message"
|
||||
end
|
||||
|
||||
def send_sync_message
|
||||
message = []
|
||||
message << "S".ord
|
||||
message << [4].pack('l>').unpack('CCCC') # 4
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent S message"
|
||||
end
|
||||
|
||||
def send_copydone_message
|
||||
message = []
|
||||
message << "c".ord
|
||||
message << [4].pack('l>').unpack('CCCC') # 4
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent c message"
|
||||
end
|
||||
|
||||
def send_copyfail_message
|
||||
message = []
|
||||
message << "f".ord
|
||||
message << [5].pack('l>').unpack('CCCC') # 4
|
||||
message << 0
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent f message"
|
||||
end
|
||||
|
||||
def send_flush_message
|
||||
message = []
|
||||
message << "H".ord
|
||||
message << [4].pack('l>').unpack('CCCC') # 4
|
||||
message.flatten!
|
||||
@socket.write(message.flatten.pack('C*'))
|
||||
log "[F] Sent H message"
|
||||
end
|
||||
|
||||
def read_from_server()
|
||||
output_messages = []
|
||||
retry_count = 0
|
||||
message_code = nil
|
||||
message_len = 0
|
||||
loop do
|
||||
begin
|
||||
message_code, message_len = @socket.recv_nonblock(5).unpack("al>")
|
||||
rescue IO::WaitReadable
|
||||
return output_messages if retry_count > 50
|
||||
|
||||
retry_count += 1
|
||||
sleep(0.01)
|
||||
next
|
||||
end
|
||||
message = {
|
||||
code: message_code,
|
||||
len: message_len,
|
||||
bytes: []
|
||||
}
|
||||
log "[B] #{BACKEND_MESSAGE_CODES[message_code] || ('UnknownMessage(' + message_code + ')')}"
|
||||
|
||||
actual_message_length = message_len - 4
|
||||
if actual_message_length > 0
|
||||
message[:bytes] = @socket.recv(message_len - 4).unpack("C*")
|
||||
log "\t#{message[:bytes].join(",")}"
|
||||
log "\t#{message[:bytes].map(&:chr).join(" ")}"
|
||||
end
|
||||
output_messages << message
|
||||
return output_messages if message_code == 'Z'
|
||||
end
|
||||
end
|
||||
|
||||
def log(msg)
|
||||
return unless @verbose
|
||||
|
||||
puts msg
|
||||
end
|
||||
|
||||
def close
|
||||
@socket.close
|
||||
end
|
||||
end
|
||||
@@ -2,6 +2,7 @@ require 'json'
|
||||
require 'ostruct'
|
||||
require_relative 'pgcat_process'
|
||||
require_relative 'pg_instance'
|
||||
require_relative 'pg_socket'
|
||||
|
||||
class ::Hash
|
||||
def deep_merge(second)
|
||||
|
||||
155
tests/ruby/protocol_spec.rb
Normal file
155
tests/ruby/protocol_spec.rb
Normal file
@@ -0,0 +1,155 @@
|
||||
# frozen_string_literal: true
|
||||
require_relative 'spec_helper'
|
||||
|
||||
|
||||
describe "Portocol handling" do
|
||||
let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 1, "session") }
|
||||
let(:sequence) { [] }
|
||||
let(:pgcat_socket) { PostgresSocket.new('localhost', processes.pgcat.port) }
|
||||
let(:pgdb_socket) { PostgresSocket.new('localhost', processes.all_databases.first.port) }
|
||||
|
||||
after do
|
||||
pgdb_socket.close
|
||||
pgcat_socket.close
|
||||
processes.all_databases.map(&:reset)
|
||||
processes.pgcat.shutdown
|
||||
end
|
||||
|
||||
def run_comparison(sequence, socket_a, socket_b)
|
||||
sequence.each do |msg, *args|
|
||||
socket_a.send(msg, *args)
|
||||
socket_b.send(msg, *args)
|
||||
|
||||
compare_messages(
|
||||
socket_a.read_from_server,
|
||||
socket_b.read_from_server
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
def compare_messages(msg_arr0, msg_arr1)
|
||||
if msg_arr0.count != msg_arr1.count
|
||||
error_output = []
|
||||
|
||||
error_output << "#{msg_arr0.count} : #{msg_arr1.count}"
|
||||
error_output << "PgCat Messages"
|
||||
error_output += msg_arr0.map { |message| "\t#{message[:code]} - #{message[:bytes].map(&:chr).join(" ")}" }
|
||||
error_output << "PgServer Messages"
|
||||
error_output += msg_arr1.map { |message| "\t#{message[:code]} - #{message[:bytes].map(&:chr).join(" ")}" }
|
||||
error_desc = error_output.join("\n")
|
||||
raise StandardError, "Message count mismatch #{error_desc}"
|
||||
end
|
||||
|
||||
(0..msg_arr0.count - 1).all? do |i|
|
||||
msg0 = msg_arr0[i]
|
||||
msg1 = msg_arr1[i]
|
||||
|
||||
result = [
|
||||
msg0[:code] == msg1[:code],
|
||||
msg0[:len] == msg1[:len],
|
||||
msg0[:bytes] == msg1[:bytes],
|
||||
].all?
|
||||
|
||||
next result if result
|
||||
|
||||
if result == false
|
||||
error_string = []
|
||||
if msg0[:code] != msg1[:code]
|
||||
error_string << "code #{msg0[:code]} != #{msg1[:code]}"
|
||||
end
|
||||
if msg0[:len] != msg1[:len]
|
||||
error_string << "len #{msg0[:len]} != #{msg1[:len]}"
|
||||
end
|
||||
if msg0[:bytes] != msg1[:bytes]
|
||||
error_string << "bytes #{msg0[:bytes]} != #{msg1[:bytes]}"
|
||||
end
|
||||
err = error_string.join("\n")
|
||||
|
||||
raise StandardError, "Message mismatch #{err}"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
RSpec.shared_examples "at parity with database" do
|
||||
before do
|
||||
pgcat_socket.send_startup_message("sharding_user", "sharded_db", "sharding_user")
|
||||
pgdb_socket.send_startup_message("sharding_user", "shard0", "sharding_user")
|
||||
end
|
||||
|
||||
it "works" do
|
||||
run_comparison(sequence, pgcat_socket, pgdb_socket)
|
||||
end
|
||||
end
|
||||
|
||||
context "Cancel Query" do
|
||||
let(:sequence) {
|
||||
[
|
||||
[:send_query_message, "SELECT pg_sleep(5)"],
|
||||
[:cancel_query]
|
||||
]
|
||||
}
|
||||
|
||||
it_behaves_like "at parity with database"
|
||||
end
|
||||
|
||||
xcontext "Simple query after parse" do
|
||||
let(:sequence) {
|
||||
[
|
||||
[:send_parse_message, "SELECT 5"],
|
||||
[:send_query_message, "SELECT 1"],
|
||||
[:send_bind_message],
|
||||
[:send_describe_message, "P"],
|
||||
[:send_execute_message],
|
||||
[:send_sync_message],
|
||||
]
|
||||
}
|
||||
|
||||
# Known to fail due to PgCat not supporting flush
|
||||
it_behaves_like "at parity with database"
|
||||
end
|
||||
|
||||
xcontext "Flush message" do
|
||||
let(:sequence) {
|
||||
[
|
||||
[:send_parse_message, "SELECT 1"],
|
||||
[:send_flush_message]
|
||||
]
|
||||
}
|
||||
|
||||
# Known to fail due to PgCat not supporting flush
|
||||
it_behaves_like "at parity with database"
|
||||
end
|
||||
|
||||
xcontext "Bind without parse" do
|
||||
let(:sequence) {
|
||||
[
|
||||
[:send_bind_message]
|
||||
]
|
||||
}
|
||||
# This is known to fail.
|
||||
# Server responds immediately, Proxy buffers the message
|
||||
it_behaves_like "at parity with database"
|
||||
end
|
||||
|
||||
context "Simple message" do
|
||||
let(:sequence) {
|
||||
[[:send_query_message, "SELECT 1"]]
|
||||
}
|
||||
|
||||
it_behaves_like "at parity with database"
|
||||
end
|
||||
|
||||
context "Extended protocol" do
|
||||
let(:sequence) {
|
||||
[
|
||||
[:send_parse_message, "SELECT 1"],
|
||||
[:send_bind_message],
|
||||
[:send_describe_message, "P"],
|
||||
[:send_execute_message],
|
||||
[:send_sync_message],
|
||||
]
|
||||
}
|
||||
|
||||
it_behaves_like "at parity with database"
|
||||
end
|
||||
end
|
||||
Reference in New Issue
Block a user