Convert BDR query functions to handle BDR2/BDR3

This commit is contained in:
Ian Barwick
2018-04-13 20:59:46 +09:00
committed by Ian Barwick
parent 4f642f8332
commit 8377704596
2 changed files with 174 additions and 85 deletions

248
dbutils.c
View File

@@ -4359,23 +4359,26 @@ _is_bdr_db(PGconn *conn, PQExpBufferData *output, bool quiet)
return is_bdr_db;
}
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
"SELECT bdr.bdr_is_active_in_db()");
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
is_bdr_db = atobool(PQgetvalue(res, 0, 0));
if (is_bdr_db == false)
if (bdr_version_num < 3)
{
const char *warning = _("BDR extension available for this database, but the database is not configured for BDR");
initPQExpBuffer(&query);
if (output != NULL)
appendPQExpBuffer(output, "%s", warning);
else if (quiet == false)
log_warning("%s", warning);
appendPQExpBuffer(&query,
"SELECT bdr.bdr_is_active_in_db()");
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
is_bdr_db = atobool(PQgetvalue(res, 0, 0));
if (is_bdr_db == false)
{
const char *warning = _("BDR extension available for this database, but the database is not configured for BDR");
if (output != NULL)
appendPQExpBuffer(output, "%s", warning);
else if (quiet == false)
log_warning("%s", warning);
}
}
PQclear(res);
@@ -4406,13 +4409,29 @@ is_active_bdr_node(PGconn *conn, const char *node_name)
bool is_active_bdr_node = false;
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
" SELECT COALESCE(s.active, TRUE) AS active"
" FROM bdr.bdr_nodes n "
" LEFT JOIN pg_catalog.pg_replication_slots s "
" ON s.slot_name=bdr.bdr_format_slot_name(n.node_sysid, n.node_timeline, n.node_dboid, (SELECT oid FROM pg_catalog.pg_database WHERE datname = pg_catalog.current_database())) "
" WHERE n.node_name='%s' ",
node_name);
if (bdr_version_num < 3)
{
appendPQExpBuffer(&query,
" SELECT COALESCE(s.active, FALSE) AS active"
" FROM bdr.bdr_nodes n "
" LEFT JOIN pg_catalog.pg_replication_slots s "
" ON s.slot_name=bdr.bdr_format_slot_name(n.node_sysid, n.node_timeline, n.node_dboid, (SELECT oid FROM pg_catalog.pg_database WHERE datname = pg_catalog.current_database())) "
" WHERE n.node_name='%s' ",
node_name);
}
else
{
appendPQExpBuffer(&query,
" SELECT COALESCE(s.active, FALSE) AS active"
" FROM bdr.node bn "
" INNER JOIN pglogical.node pn "
" ON (pn.node_id = bn.pglogical_node_id) "
" LEFT JOIN pg_catalog.pg_replication_slots s "
" ON s.slot_name=bn.local_slot_name "
" WHERE pn.node_name='%s' ",
node_name);
}
log_verbose(LOG_DEBUG, "is_active_bdr_node():\n %s", query.data);
@@ -4475,12 +4494,20 @@ is_table_in_bdr_replication_set(PGconn *conn, const char *tablename, const char
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
"SELECT pg_catalog.count(*) "
" FROM pg_catalog.unnest(bdr.table_get_replication_sets('repmgr.%s')) AS repset "
" WHERE repset='%s' ",
tablename,
set);
if (bdr_version_num < 3)
{
appendPQExpBuffer(&query,
"SELECT pg_catalog.count(*) "
" FROM pg_catalog.unnest(bdr.table_get_replication_sets('repmgr.%s')) AS repset "
" WHERE repset='%s' ",
tablename,
set);
}
else
{
puts("is_table_in_bdr_replication_set(): not yet implemented for 3");
exit(1);
}
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
@@ -4509,10 +4536,18 @@ add_table_to_bdr_replication_set(PGconn *conn, const char *tablename, const char
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
"SELECT bdr.table_set_replication_sets('repmgr.%s', '{%s}')",
tablename,
set);
if (bdr_version_num < 3)
{
appendPQExpBuffer(&query,
"SELECT bdr.table_set_replication_sets('repmgr.%s', '{%s}')",
tablename,
set);
}
else
{
puts("add_table_to_bdr_replication_set(): not yet implemented for 3");
exit(1);
}
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
@@ -4544,8 +4579,16 @@ bdr_node_name_matches(PGconn *conn, const char *node_name, PQExpBufferData *bdr_
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
"SELECT bdr.bdr_get_local_node_name() AS node_name");
if (bdr_version_num < 3)
{
appendPQExpBuffer(&query,
"SELECT bdr.bdr_get_local_node_name() AS node_name");
}
else
{
appendPQExpBuffer(&query,
"SELECT node_name FROM bdr.local_node_info()");
}
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
@@ -4576,21 +4619,36 @@ get_bdr_node_replication_slot_status(PGconn *conn, const char *node_name)
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
" SELECT s.active "
" FROM pg_catalog.pg_replication_slots s "
" WHERE slot_name = "
" (SELECT bdr.bdr_format_slot_name(node_sysid, node_timeline, node_dboid, datoid) "
" FROM bdr.bdr_nodes "
" WHERE node_name = '%s') ",
node_name);
if (bdr_version_num < 3)
{
appendPQExpBuffer(&query,
" SELECT s.active "
" FROM pg_catalog.pg_replication_slots s "
" WHERE slot_name = "
" (SELECT bdr.bdr_format_slot_name(node_sysid, node_timeline, node_dboid, datoid) "
" FROM bdr.bdr_nodes "
" WHERE node_name = '%s') ",
node_name);
}
else
{
appendPQExpBuffer(&query,
" SELECT COALESCE(s.active, FALSE) AS active"
" FROM bdr.node bn "
" INNER JOIN pglogical.node pn "
" ON (pn.node_id = bn.pglogical_node_id) "
" INNER JOIN pg_catalog.pg_replication_slots s "
" ON s.slot_name=bn.local_slot_name "
" WHERE pn.node_name='%s' ",
node_name);
}
log_verbose(LOG_DEBUG, "get_bdr_node_replication_slot_status():\n %s", query.data);
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
if (PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) == 0)
{
status = SLOT_UNKNOWN;
}
@@ -4670,8 +4728,7 @@ add_extension_tables_to_bdr_replication_set(PGconn *conn)
for (i = 0; i < PQntuples(res); i++)
{
add_table_to_bdr_replication_set(
conn,
add_table_to_bdr_replication_set(conn,
PQgetvalue(res, i, 0),
"repmgr");
}
@@ -4690,10 +4747,21 @@ get_all_bdr_node_records(PGconn *conn, BdrNodeInfoList *node_list)
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
" SELECT " BDR_NODES_COLUMNS
" FROM bdr.bdr_nodes "
"ORDER BY node_seq_id ");
if (bdr_version_num < 3)
{
appendPQExpBuffer(&query,
" SELECT " BDR2_NODES_COLUMNS
" FROM bdr.bdr_nodes "
"ORDER BY node_seq_id ");
}
else
{
appendPQExpBuffer(&query,
" SELECT " BDR3_NODES_COLUMNS
" FROM bdr.node bn "
" INNER JOIN pglogical.node_interface pni "
" ON bn.pglogical_node_id = pni.if_nodeid ");
}
log_verbose(LOG_DEBUG, "get_all_node_records():\n%s", query.data);
@@ -4714,11 +4782,24 @@ get_bdr_node_record_by_name(PGconn *conn, const char *node_name, t_bdr_node_info
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
" SELECT " BDR_NODES_COLUMNS
" FROM bdr.bdr_nodes "
" WHERE node_name = '%s'",
node_name);
if (bdr_version_num < 3)
{
appendPQExpBuffer(&query,
" SELECT " BDR2_NODES_COLUMNS
" FROM bdr.bdr_nodes "
" WHERE node_name = '%s'",
node_name);
}
else
{
appendPQExpBuffer(&query,
" SELECT " BDR3_NODES_COLUMNS
" FROM bdr.node bn "
" INNER JOIN pglogical.node_interface pni "
" ON bn.pglogical_node_id = pni.if_nodeid "
" WHERE bn.node_name = '%s'",
node_name);
}
log_verbose(LOG_DEBUG, "get_bdr_node_record_by_name():\n%s", query.data);
@@ -4788,16 +4869,11 @@ _populate_bdr_node_records(PGresult *res, BdrNodeInfoList *node_list)
static void
_populate_bdr_node_record(PGresult *res, t_bdr_node_info *node_info, int row)
{
char buf[MAXLEN] = "";
strncpy(node_info->node_sysid, PQgetvalue(res, row, 0), MAXLEN);
node_info->node_timeline = atoi(PQgetvalue(res, row, 1));
node_info->node_dboid = atoi(PQgetvalue(res, row, 2));
strncpy(buf, PQgetvalue(res, row, 3), MAXLEN);
node_info->node_status = buf[0];
strncpy(node_info->node_name, PQgetvalue(res, row, 4), MAXLEN);
strncpy(node_info->node_local_dsn, PQgetvalue(res, row, 5), MAXLEN);
strncpy(node_info->node_init_from_dsn, PQgetvalue(res, row, 6), MAXLEN);
strncpy(node_info->node_name, PQgetvalue(res, row, 3), MAXLEN);
strncpy(node_info->node_local_dsn, PQgetvalue(res, row, 4), MAXLEN);
}
@@ -4854,11 +4930,19 @@ bdr_node_has_repmgr_set(PGconn *conn, const char *node_name)
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
" SELECT pg_catalog.count(*) "
" FROM pg_catalog.unnest(bdr.connection_get_replication_sets('%s') AS repset "
" WHERE repset = 'repmgr'",
node_name);
if (bdr_version_num < 3)
{
appendPQExpBuffer(&query,
" SELECT pg_catalog.count(*) "
" FROM pg_catalog.unnest(bdr.connection_get_replication_sets('%s') AS repset "
" WHERE repset = 'repmgr'",
node_name);
}
else
{
puts("bdr_node_has_repmgr_set(): not implemented yet for BDR3");
exit(1);
}
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
@@ -4887,18 +4971,26 @@ bdr_node_set_repmgr_set(PGconn *conn, const char *node_name)
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
" SELECT bdr.connection_set_replication_sets( "
" ARRAY( "
" SELECT repset::TEXT "
" FROM pg_catalog.unnest(bdr.connection_get_replication_sets('%s')) AS repset "
" UNION "
" SELECT 'repmgr'::TEXT "
" ), "
" '%s' "
" ) ",
node_name,
node_name);
if (bdr_version_num < 3)
{
appendPQExpBuffer(&query,
" SELECT bdr.connection_set_replication_sets( "
" ARRAY( "
" SELECT repset::TEXT "
" FROM pg_catalog.unnest(bdr.connection_get_replication_sets('%s')) AS repset "
" UNION "
" SELECT 'repmgr'::TEXT "
" ), "
" '%s' "
" ) ",
node_name,
node_name);
}
else
{
puts("bdr_node_set_repmgr_set(): not implemented yet for BDR3");
exit(1);
}
res = PQexec(conn, query.data);
termPQExpBuffer(&query);

View File

@@ -29,7 +29,9 @@
#include "voting.h"
#define REPMGR_NODES_COLUMNS "n.node_id, n.type, n.upstream_node_id, n.node_name, n.conninfo, n.repluser, n.slot_name, n.location, n.priority, n.active, n.config_file, '' AS upstream_node_name "
#define BDR_NODES_COLUMNS "node_sysid, node_timeline, node_dboid, node_status, node_name, node_local_dsn, node_init_from_dsn, node_read_only, node_seq_id"
#define BDR2_NODES_COLUMNS "node_sysid, node_timeline, node_dboid, node_name, node_local_dsn"
#define BDR3_NODES_COLUMNS "'', 0, 0, node_name, node_init_from_dsn"
#define ERRBUFF_SIZE 512
@@ -237,18 +239,13 @@ typedef struct s_bdr_node_info
char node_sysid[MAXLEN];
uint32 node_timeline;
uint32 node_dboid;
char node_status;
char node_name[MAXLEN];
char node_local_dsn[MAXLEN];
char node_init_from_dsn[MAXLEN];
bool read_only;
uint32 node_seq_id;
} t_bdr_node_info;
#define T_BDR_NODE_INFO_INITIALIZER { \
"", InvalidOid, InvalidOid, \
'?', "", "", "", \
false, -1 \
"", "", \
}