mirror of
https://github.com/postgresml/pgcat.git
synced 2026-03-27 18:56:30 +00:00
fixed health check; sharding setup and tests
This commit is contained in:
@@ -66,7 +66,7 @@ either lose 1/x of your traffic or risk losing it all eventually. Ideally you ov
|
|||||||
to make this choice :-).
|
to make this choice :-).
|
||||||
|
|
||||||
### Sharding
|
### Sharding
|
||||||
We're implemeting Postgres' `PARTITION BY HASH` sharding function for `BIGINT` fields. This works well for tables that use `BIGSERIAL` primary key which I think is common enough these days. We can also add many more functions here, but this is a good start. See `src/sharding.rs` and `tests/sharding/setup.sql` for more details on the implementation.
|
We're implemeting Postgres' `PARTITION BY HASH` sharding function for `BIGINT` fields. This works well for tables that use `BIGSERIAL` primary key which I think is common enough these days. We can also add many more functions here, but this is a good start. See `src/sharding.rs` and `tests/sharding/partition_hash_test_setup.sql` for more details on the implementation.
|
||||||
|
|
||||||
The biggest advantage of using this sharding function is that anyone can shard the dataset using Postgres partitions
|
The biggest advantage of using this sharding function is that anyone can shard the dataset using Postgres partitions
|
||||||
while also access it for both reads and writes using this pooler. No custom obscure sharding function is needed and database sharding can be done entirely in Postgres.
|
while also access it for both reads and writes using this pooler. No custom obscure sharding function is needed and database sharding can be done entirely in Postgres.
|
||||||
|
|||||||
16
pgcat.toml
16
pgcat.toml
@@ -32,8 +32,8 @@ ban_time = 60 # Seconds
|
|||||||
#
|
#
|
||||||
# User to use for authentication against the server.
|
# User to use for authentication against the server.
|
||||||
[user]
|
[user]
|
||||||
name = "lev"
|
name = "sharding_user"
|
||||||
password = "lev"
|
password = "sharding_user"
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
@@ -49,7 +49,7 @@ servers = [
|
|||||||
[ "localhost", 5432 ],
|
[ "localhost", 5432 ],
|
||||||
]
|
]
|
||||||
# Database name (e.g. "postgres")
|
# Database name (e.g. "postgres")
|
||||||
database = "lev"
|
database = "shard0"
|
||||||
|
|
||||||
[shards.1]
|
[shards.1]
|
||||||
# [ host, port ]
|
# [ host, port ]
|
||||||
@@ -57,4 +57,12 @@ servers = [
|
|||||||
[ "127.0.0.1", 5432 ],
|
[ "127.0.0.1", 5432 ],
|
||||||
[ "localhost", 5432 ],
|
[ "localhost", 5432 ],
|
||||||
]
|
]
|
||||||
database = "lev"
|
database = "shard1"
|
||||||
|
|
||||||
|
[shards.2]
|
||||||
|
# [ host, port ]
|
||||||
|
servers = [
|
||||||
|
[ "127.0.0.1", 5432 ],
|
||||||
|
[ "localhost", 5432 ],
|
||||||
|
]
|
||||||
|
database = "shard2"
|
||||||
18
src/pool.rs
18
src/pool.rs
@@ -185,10 +185,22 @@ impl ConnectionPool {
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(_) => return Ok((conn, address)),
|
// Check if health check succeeded
|
||||||
|
Ok(res) => match res {
|
||||||
|
Ok(_) => return Ok((conn, address)),
|
||||||
|
Err(_) => {
|
||||||
|
println!(
|
||||||
|
">> Banning replica {} because of failed health check",
|
||||||
|
index
|
||||||
|
);
|
||||||
|
self.ban(&address, shard);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
// Health check never came back, database is really really down
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
println!(
|
println!(
|
||||||
">> Banning replica {} because of failed health check",
|
">> Banning replica {} because of health check timeout",
|
||||||
index
|
index
|
||||||
);
|
);
|
||||||
self.ban(&address, shard);
|
self.ban(&address, shard);
|
||||||
@@ -280,7 +292,7 @@ impl ManageConnection for ServerPool {
|
|||||||
|
|
||||||
/// Attempts to create a new connection.
|
/// Attempts to create a new connection.
|
||||||
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
|
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
|
||||||
println!(">> Getting new connection from the pool");
|
println!(">> Creating a new connection for the pool");
|
||||||
|
|
||||||
Server::startup(
|
Server::startup(
|
||||||
&self.address.host,
|
&self.address.host,
|
||||||
|
|||||||
12
tests/sharding/query_routing.sh
Normal file
12
tests/sharding/query_routing.sh
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
#/bin/bash
|
||||||
|
|
||||||
|
# Setup all the shards.
|
||||||
|
sudo service postgresql restart
|
||||||
|
|
||||||
|
psql -f query_routing_setup.sql
|
||||||
|
|
||||||
|
psql -h 127.0.0.1 -p 6432 -f query_routing_test_insert.sql
|
||||||
|
|
||||||
|
psql -h 127.0.0.1 -p 6432 -f query_routing_test_select.sql
|
||||||
|
|
||||||
|
psql -f query_routing_test_validate.sql
|
||||||
61
tests/sharding/query_routing_setup.sql
Normal file
61
tests/sharding/query_routing_setup.sql
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
|
||||||
|
DROP DATABASE IF EXISTS shard0;
|
||||||
|
DROP DATABASE IF EXISTS shard1;
|
||||||
|
DROP DATABASE IF EXISTS shard2;
|
||||||
|
|
||||||
|
CREATE DATABASE shard0;
|
||||||
|
CREATE DATABASE shard1;
|
||||||
|
CREATE DATABASE shard2;
|
||||||
|
|
||||||
|
\c shard0
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS data CASCADE;
|
||||||
|
|
||||||
|
CREATE TABLE data (
|
||||||
|
id BIGINT,
|
||||||
|
value VARCHAR
|
||||||
|
) PARTITION BY HASH (id);
|
||||||
|
|
||||||
|
CREATE TABLE data_shard_0 PARTITION OF data FOR VALUES WITH (MODULUS 3, REMAINDER 0);
|
||||||
|
|
||||||
|
\c shard1
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS data CASCADE;
|
||||||
|
|
||||||
|
CREATE TABLE data (
|
||||||
|
id BIGINT,
|
||||||
|
value VARCHAR
|
||||||
|
) PARTITION BY HASH (id);
|
||||||
|
|
||||||
|
CREATE TABLE data_shard_1 PARTITION OF data FOR VALUES WITH (MODULUS 3, REMAINDER 1);
|
||||||
|
|
||||||
|
|
||||||
|
\c shard2
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS data CASCADE;
|
||||||
|
|
||||||
|
CREATE TABLE data (
|
||||||
|
id BIGINT,
|
||||||
|
value VARCHAR
|
||||||
|
) PARTITION BY HASH (id);
|
||||||
|
|
||||||
|
CREATE TABLE data_shard_2 PARTITION OF data FOR VALUES WITH (MODULUS 3, REMAINDER 2);
|
||||||
|
|
||||||
|
DROP ROLE IF EXISTS sharding_user;
|
||||||
|
CREATE ROLE sharding_user ENCRYPTED PASSWORD 'sharding_user' LOGIN;
|
||||||
|
|
||||||
|
GRANT CONNECT ON DATABASE shard0 TO sharding_user;
|
||||||
|
GRANT CONNECT ON DATABASE shard1 TO sharding_user;
|
||||||
|
GRANT CONNECT ON DATABASE shard2 TO sharding_user;
|
||||||
|
|
||||||
|
\c shard0
|
||||||
|
GRANT ALL ON SCHEMA public TO sharding_user;
|
||||||
|
GRANT ALL ON TABLE data TO sharding_user;
|
||||||
|
|
||||||
|
\c shard1
|
||||||
|
GRANT ALL ON SCHEMA public TO sharding_user;
|
||||||
|
GRANT ALL ON TABLE data TO sharding_user;
|
||||||
|
|
||||||
|
\c shard2
|
||||||
|
GRANT ALL ON SCHEMA public TO sharding_user;
|
||||||
|
GRANT ALL ON TABLE data TO sharding_user;
|
||||||
47
tests/sharding/query_routing_test_insert.sql
Normal file
47
tests/sharding/query_routing_test_insert.sql
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
SET SHARDING KEY TO '1';
|
||||||
|
INSERT INTO data (id, value) VALUES (1, 'value_1');
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '2';
|
||||||
|
INSERT INTO data (id, value) VALUES (2, 'value_1');
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '3';
|
||||||
|
INSERT INTO data (id, value) VALUES (3, 'value_1');
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '4';
|
||||||
|
INSERT INTO data (id, value) VALUES (4, 'value_1');
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '5';
|
||||||
|
INSERT INTO data (id, value) VALUES (5, 'value_1');
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '6';
|
||||||
|
INSERT INTO data (id, value) VALUES (6, 'value_1');
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '7';
|
||||||
|
INSERT INTO data (id, value) VALUES (7, 'value_1');
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '8';
|
||||||
|
INSERT INTO data (id, value) VALUES (8, 'value_1');
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '9';
|
||||||
|
INSERT INTO data (id, value) VALUES (9, 'value_1');
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '10';
|
||||||
|
INSERT INTO data (id, value) VALUES (10, 'value_1');
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '11';
|
||||||
|
INSERT INTO data (id, value) VALUES (11, 'value_1');
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '12';
|
||||||
|
INSERT INTO data (id, value) VALUES (12, 'value_1');
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '13';
|
||||||
|
INSERT INTO data (id, value) VALUES (13, 'value_1');
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '14';
|
||||||
|
INSERT INTO data (id, value) VALUES (14, 'value_1');
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '15';
|
||||||
|
INSERT INTO data (id, value) VALUES (15, 'value_1');
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '16';
|
||||||
|
INSERT INTO data (id, value) VALUES (16, 'value_1');
|
||||||
47
tests/sharding/query_routing_test_select.sql
Normal file
47
tests/sharding/query_routing_test_select.sql
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
SET SHARDING KEY TO '1';
|
||||||
|
SELECT * FROM data WHERE id = 1;
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '2';
|
||||||
|
SELECT * FROM data WHERE id = 2;
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '3';
|
||||||
|
SELECT * FROM data WHERE id = 3;
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '4';
|
||||||
|
SELECT * FROM data WHERE id = 4;
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '5';
|
||||||
|
SELECT * FROM data WHERE id = 5;
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '6';
|
||||||
|
SELECT * FROM data WHERE id = 6;
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '7';
|
||||||
|
SELECT * FROM data WHERE id = 7;
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '8';
|
||||||
|
SELECT * FROM data WHERE id = 8;
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '9';
|
||||||
|
SELECT * FROM data WHERE id = 9;
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '10';
|
||||||
|
SELECT * FROM data WHERE id = 10;
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '11';
|
||||||
|
SELECT * FROM data WHERE id = 11;
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '12';
|
||||||
|
SELECT * FROM data WHERE id = 12;
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '13';
|
||||||
|
SELECT * FROM data WHERE id = 13;
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '14';
|
||||||
|
SELECT * FROM data WHERE id = 14;
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '15';
|
||||||
|
SELECT * FROM data WHERE id = 15;
|
||||||
|
|
||||||
|
SET SHARDING KEY TO '16';
|
||||||
|
SELECT * FROM data WHERE id = 16;
|
||||||
11
tests/sharding/query_routing_test_validate.sql
Normal file
11
tests/sharding/query_routing_test_validate.sql
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
\c shard0
|
||||||
|
|
||||||
|
SELECT * FROM data;
|
||||||
|
|
||||||
|
\c shard1
|
||||||
|
|
||||||
|
SELECT * FROM data;
|
||||||
|
|
||||||
|
\c shard2
|
||||||
|
|
||||||
|
SELECT * FROM data;
|
||||||
Reference in New Issue
Block a user