Add repmgr.nodes to the BDR replication set

This commit is contained in:
Ian Barwick
2018-04-23 17:15:32 +09:00
parent e1d807188d
commit bf0d67c60a
4 changed files with 199 additions and 73 deletions

205
dbutils.c
View File

@@ -2248,6 +2248,7 @@ _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info)
const char *param_values[param_count];
PGresult *res;
bool success = true;
maxlen_snprintf(node_id, "%i", node_info->node_id);
maxlen_snprintf(priority, "%i", node_info->priority);
@@ -2334,13 +2335,13 @@ _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info)
node_info->node_name,
node_info->node_id);
log_detail("%s", PQerrorMessage(conn));
PQclear(res);
return false;
success = false;
}
PQclear(res);
return true;
return success;
}
@@ -2349,6 +2350,7 @@ update_node_record_set_active(PGconn *conn, int this_node_id, bool active)
{
PQExpBufferData query;
PGresult *res = NULL;
bool success = true;
initPQExpBuffer(&query);
@@ -2367,13 +2369,13 @@ update_node_record_set_active(PGconn *conn, int this_node_id, bool active)
{
log_error(_("unable to update node record:\n %s"),
PQerrorMessage(conn));
PQclear(res);
return false;
success = false;
}
PQclear(res);
return true;
return success;
}
@@ -2382,6 +2384,7 @@ update_node_record_set_active_standby(PGconn *conn, int this_node_id)
{
PQExpBufferData query;
PGresult *res = NULL;
bool success = true;
initPQExpBuffer(&query);
@@ -2401,13 +2404,13 @@ update_node_record_set_active_standby(PGconn *conn, int this_node_id)
{
log_error(_("unable to update node record:\n %s"),
PQerrorMessage(conn));
PQclear(res);
return false;
success = false;
}
PQclear(res);
return true;
return success;
}
@@ -2476,11 +2479,13 @@ update_node_record_set_primary(PGconn *conn, int this_node_id)
return commit_transaction(conn);
}
bool
update_node_record_set_upstream(PGconn *conn, int this_node_id, int new_upstream_node_id)
{
PQExpBufferData query;
PGresult *res = NULL;
bool success = true;
log_debug(_("update_node_record_set_upstream(): Updating node %i's upstream node to %i"),
this_node_id, new_upstream_node_id);
@@ -2502,14 +2507,13 @@ update_node_record_set_upstream(PGconn *conn, int this_node_id, int new_upstream
{
log_error(_("unable to set new upstream node id:\n %s"),
PQerrorMessage(conn));
PQclear(res);
return false;
success = false;
}
PQclear(res);
return true;
return success;
}
@@ -2522,6 +2526,7 @@ update_node_record_status(PGconn *conn, int this_node_id, char *type, int upstre
{
PQExpBufferData query;
PGresult *res = NULL;
bool success = true;
initPQExpBuffer(&query);
@@ -2545,14 +2550,13 @@ update_node_record_status(PGconn *conn, int this_node_id, char *type, int upstre
{
log_error(_("unable to update node record:\n %s"),
PQerrorMessage(conn));
PQclear(res);
return false;
success = false;
}
PQclear(res);
return true;
return success;
}
@@ -2565,6 +2569,7 @@ update_node_record_conn_priority(PGconn *conn, t_configuration_options *options)
{
PQExpBufferData query;
PGresult *res = NULL;
bool success = true;
initPQExpBuffer(&query);
@@ -2582,13 +2587,12 @@ update_node_record_conn_priority(PGconn *conn, t_configuration_options *options)
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
PQclear(res);
return false;
success = false;
}
PQclear(res);
return true;
return success;
}
@@ -2652,6 +2656,7 @@ delete_node_record(PGconn *conn, int node)
{
PQExpBufferData query;
PGresult *res = NULL;
bool success = true;
initPQExpBuffer(&query);
@@ -2669,19 +2674,20 @@ delete_node_record(PGconn *conn, int node)
{
log_error(_("unable to delete node record:\n %s"),
PQerrorMessage(conn));
PQclear(res);
return false;
success = false;
}
PQclear(res);
return true;
return success;
}
bool
truncate_node_records(PGconn *conn)
{
PGresult *res = NULL;
bool success = true;
res = PQexec(conn, "TRUNCATE TABLE repmgr.nodes");
@@ -2689,12 +2695,13 @@ truncate_node_records(PGconn *conn)
{
log_error(_("unable to truncate node record table:\n %s"),
PQerrorMessage(conn));
PQclear(res);
return false;
success = false;
}
PQclear(res);
return true;
return success;
}
@@ -4320,6 +4327,7 @@ get_last_wal_receive_location(PGconn *conn)
/* BDR functions */
/* ============= */
static bool
_is_bdr_db(PGconn *conn, PQExpBufferData *output, bool quiet)
{
@@ -4350,6 +4358,8 @@ _is_bdr_db(PGconn *conn, PQExpBufferData *output, bool quiet)
PQclear(res);
log_verbose(LOG_DEBUG, "BDR ext version number is %i", bdr_version_num);
if (is_bdr_db == false)
{
const char *warning = _("BDR extension is not available for this database");
@@ -4405,6 +4415,12 @@ is_bdr_db_quiet(PGconn *conn)
}
int
get_bdr_version_num(void)
{
return bdr_version_num;
}
bool
is_active_bdr_node(PGconn *conn, const char *node_name)
{
@@ -4489,6 +4505,64 @@ is_bdr_repmgr(PGconn *conn)
}
/*
* Get name of default BDR replication set.
*
* Caller must free provided value.
*/
char *
get_default_bdr_replication_set(PGconn *conn)
{
PQExpBufferData query;
PGresult *res = NULL;
char *default_replication_set = NULL;
int namelen;
if (bdr_version_num < 3)
{
/* For BDR2, we use a custom replication set */
namelen = strlen(BDR2_REPLICATION_SET_NAME);
default_replication_set = pg_malloc0(namelen + 1);
strncpy(default_replication_set, BDR2_REPLICATION_SET_NAME, namelen);
return default_replication_set;
}
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
" SELECT rs.set_name "
" FROM pglogical.replication_set rs "
" INNER JOIN bdr.node_group ng "
" ON ng.node_group_default_repset = rs.set_id ");
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
if (PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) == 0)
{
log_warning(_("unable to retrieve default BDR replication set name"));
if (PQresultStatus(res) != PGRES_TUPLES_OK)
log_detail("%s", PQerrorMessage(conn));
PQclear(res);
return NULL;
}
namelen = strlen(PQgetvalue(res, 0, 0));
default_replication_set = pg_malloc0(namelen + 1);
strncpy(default_replication_set, PQgetvalue(res, 0, 0), namelen);
PQclear(res);
return default_replication_set;
}
bool
is_table_in_bdr_replication_set(PGconn *conn, const char *tablename, const char *set)
{
@@ -4509,8 +4583,16 @@ is_table_in_bdr_replication_set(PGconn *conn, const char *tablename, const char
}
else
{
puts("is_table_in_bdr_replication_set(): not yet implemented for 3");
exit(1);
appendPQExpBuffer(&query,
" SELECT pg_catalog.count(*) "
" FROM pglogical.replication_set s "
" INNER JOIN pglogical.replication_set_table st "
" ON s.set_id = st.set_id "
" WHERE s.set_name = '%s' "
" AND st.set_reloid = 'repmgr.%s'::REGCLASS ",
set,
tablename);
}
res = PQexec(conn, query.data);
@@ -4537,6 +4619,7 @@ add_table_to_bdr_replication_set(PGconn *conn, const char *tablename, const char
{
PQExpBufferData query;
PGresult *res = NULL;
bool success = true;
initPQExpBuffer(&query);
@@ -4549,8 +4632,13 @@ add_table_to_bdr_replication_set(PGconn *conn, const char *tablename, const char
}
else
{
puts("add_table_to_bdr_replication_set(): not yet implemented for 3");
exit(1);
appendPQExpBuffer(&query,
" SELECT bdr.replication_set_add_table( "
" relation := 'repmgr.%s', "
" set_name := '%s' "
" ) ",
tablename,
set);
}
res = PQexec(conn, query.data);
@@ -4558,19 +4646,17 @@ add_table_to_bdr_replication_set(PGconn *conn, const char *tablename, const char
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
log_error(_("unable to add table \"repmgr.%s\" to replication set \"%s\":\n %s"),
log_error(_("unable to add table \"repmgr.%s\" to replication set \"%s\""),
tablename,
set,
PQerrorMessage(conn));
set);
log_detail("%s", PQerrorMessage(conn));
if (res != NULL)
PQclear(res);
return false;
success = false;
}
PQclear(res);
return true;
return success;
}
@@ -4703,6 +4789,9 @@ get_bdr_other_node_name(PGconn *conn, int node_id, char *node_name)
}
/*
* For BDR 2.x only
*/
void
add_extension_tables_to_bdr_replication_set(PGconn *conn)
{
@@ -4724,7 +4813,7 @@ add_extension_tables_to_bdr_replication_set(PGconn *conn)
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
/* */
/* XXX log error */
}
else
{
@@ -4933,21 +5022,17 @@ bdr_node_has_repmgr_set(PGconn *conn, const char *node_name)
PGresult *res = NULL;
bool has_repmgr_set = false;
if (bdr_version_num >= 3)
return true;
initPQExpBuffer(&query);
if (bdr_version_num < 3)
{
appendPQExpBuffer(&query,
" SELECT pg_catalog.count(*) "
" FROM pg_catalog.unnest(bdr.connection_get_replication_sets('%s') AS repset "
" WHERE repset = 'repmgr'",
node_name);
}
else
{
puts("bdr_node_has_repmgr_set(): not implemented yet for BDR3");
exit(1);
}
" WHERE repset = '%s'",
node_name,
BDR2_REPLICATION_SET_NAME);
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
@@ -4974,34 +5059,40 @@ bdr_node_set_repmgr_set(PGconn *conn, const char *node_name)
PGresult *res = NULL;
bool success = true;
if (bdr_version_num >= 3)
return true;
initPQExpBuffer(&query);
if (bdr_version_num < 3)
{
/*
* Here we extract a list of existing replication sets, add 'repmgr', and
* set the replication sets to the new list.
*/
appendPQExpBuffer(&query,
" SELECT bdr.connection_set_replication_sets( "
" ARRAY( "
" SELECT repset::TEXT "
" FROM pg_catalog.unnest(bdr.connection_get_replication_sets('%s')) AS repset "
" UNION "
" SELECT 'repmgr'::TEXT "
" SELECT '%s'::TEXT "
" ), "
" '%s' "
" ) ",
node_name,
BDR2_REPLICATION_SET_NAME,
node_name);
}
else
{
puts("bdr_node_set_repmgr_set(): not implemented yet for BDR3");
exit(1);
}
log_debug("bdr_node_set_repmgr_set():\n%s", query.data);
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
log_debug("result status: %s", PQresStatus(PQresultStatus(res)));
log_error(_("unable to create replication set \"repmgr\""));
log_detail("%s", PQerrorMessage(conn));
success = false;
}

View File

@@ -503,12 +503,14 @@ void get_node_replication_stats(PGconn *conn, int server_version_num, t_node_in
bool is_downstream_node_attached(PGconn *conn, char *node_name);
/* BDR functions */
int get_bdr_version_num(void);
void get_all_bdr_node_records(PGconn *conn, BdrNodeInfoList *node_list);
RecordStatus get_bdr_node_record_by_name(PGconn *conn, const char *node_name, t_bdr_node_info *node_info);
bool is_bdr_db(PGconn *conn, PQExpBufferData *output);
bool is_bdr_db_quiet(PGconn *conn);
bool is_active_bdr_node(PGconn *conn, const char *node_name);
bool is_bdr_repmgr(PGconn *conn);
char *get_default_bdr_replication_set(PGconn *conn);
bool is_table_in_bdr_replication_set(PGconn *conn, const char *tablename, const char *set);
bool add_table_to_bdr_replication_set(PGconn *conn, const char *tablename, const char *set);
void add_extension_tables_to_bdr_replication_set(PGconn *conn);

View File

@@ -83,7 +83,8 @@ do_bdr_register(void)
exit(ERR_BAD_CONFIG);
}
if (bdr_nodes.node_count > 2)
/* BDR 2 implementation is for 2 nodes only */
if (get_bdr_version_num() < 3 && bdr_nodes.node_count > 2)
{
log_error(_("repmgr can only support BDR clusters with 2 nodes"));
log_detail(_("this BDR cluster has %i nodes"), bdr_nodes.node_count);
@@ -176,6 +177,7 @@ do_bdr_register(void)
if (bdr_node_has_repmgr_set(conn, config_file_options.node_name) == false)
{
log_debug("bdr_node_has_repmgr_set() = false");
bdr_node_set_repmgr_set(conn, config_file_options.node_name);
}
@@ -201,6 +203,7 @@ do_bdr_register(void)
if (bdr_nodes.node_count == 0)
{
log_error(_("unable to retrieve any BDR node records"));
log_detail("%s", PQerrorMessage(conn));
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
@@ -252,7 +255,35 @@ do_bdr_register(void)
}
/* Add the repmgr extension tables to a replication set */
if (get_bdr_version_num() < 3)
{
add_extension_tables_to_bdr_replication_set(conn);
}
else
{
/* this is the only table we need to replicate */
char *replication_set = get_default_bdr_replication_set(conn);
/*
* this probably won't happen, but we need to be sure we're using
* the replication set metadata correctly...
*/
if (conn == NULL)
{
log_error(_("unable to retrieve default BDR replication set"));
log_hint(_("see preceding messages"));
log_debug("check query in get_default_bdr_replication_set()");
exit(ERR_BAD_CONFIG);
}
if (is_table_in_bdr_replication_set(conn, "nodes", replication_set) == false)
{
add_table_to_bdr_replication_set(conn, "nodes", replication_set);
}
pfree(replication_set);
}
initPQExpBuffer(&event_details);

View File

@@ -60,6 +60,8 @@
#define VOTING_TERM_NOT_SET -1
#define BDR2_REPLICATION_SET_NAME "repmgr"
/*
* various default values - ensure repmgr.conf.sample is update
* if any of these are changed