From bf0d67c60a11a27eea9677fe3e5eec387da14d64 Mon Sep 17 00:00:00 2001 From: Ian Barwick Date: Mon, 23 Apr 2018 17:15:32 +0900 Subject: [PATCH] Add repmgr.nodes to the BDR replication set --- dbutils.c | 233 ++++++++++++++++++++++++++++++-------------- dbutils.h | 2 + repmgr-action-bdr.c | 35 ++++++- repmgr.h | 2 + 4 files changed, 199 insertions(+), 73 deletions(-) diff --git a/dbutils.c b/dbutils.c index d2419219..ca0ca979 100644 --- a/dbutils.c +++ b/dbutils.c @@ -2248,6 +2248,7 @@ _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info) const char *param_values[param_count]; PGresult *res; + bool success = true; maxlen_snprintf(node_id, "%i", node_info->node_id); maxlen_snprintf(priority, "%i", node_info->priority); @@ -2334,13 +2335,13 @@ _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info) node_info->node_name, node_info->node_id); log_detail("%s", PQerrorMessage(conn)); - PQclear(res); - return false; + + success = false; } PQclear(res); - return true; + return success; } @@ -2349,6 +2350,7 @@ update_node_record_set_active(PGconn *conn, int this_node_id, bool active) { PQExpBufferData query; PGresult *res = NULL; + bool success = true; initPQExpBuffer(&query); @@ -2367,13 +2369,13 @@ update_node_record_set_active(PGconn *conn, int this_node_id, bool active) { log_error(_("unable to update node record:\n %s"), PQerrorMessage(conn)); - PQclear(res); - return false; + + success = false; } PQclear(res); - return true; + return success; } @@ -2382,6 +2384,7 @@ update_node_record_set_active_standby(PGconn *conn, int this_node_id) { PQExpBufferData query; PGresult *res = NULL; + bool success = true; initPQExpBuffer(&query); @@ -2401,13 +2404,13 @@ update_node_record_set_active_standby(PGconn *conn, int this_node_id) { log_error(_("unable to update node record:\n %s"), PQerrorMessage(conn)); - PQclear(res); - return false; + + success = false; } PQclear(res); - return true; + return success; } @@ -2476,11 +2479,13 @@ update_node_record_set_primary(PGconn *conn, int this_node_id) return commit_transaction(conn); } + bool update_node_record_set_upstream(PGconn *conn, int this_node_id, int new_upstream_node_id) { PQExpBufferData query; PGresult *res = NULL; + bool success = true; log_debug(_("update_node_record_set_upstream(): Updating node %i's upstream node to %i"), this_node_id, new_upstream_node_id); @@ -2502,14 +2507,13 @@ update_node_record_set_upstream(PGconn *conn, int this_node_id, int new_upstream { log_error(_("unable to set new upstream node id:\n %s"), PQerrorMessage(conn)); - PQclear(res); - return false; + success = false; } PQclear(res); - return true; + return success; } @@ -2522,6 +2526,7 @@ update_node_record_status(PGconn *conn, int this_node_id, char *type, int upstre { PQExpBufferData query; PGresult *res = NULL; + bool success = true; initPQExpBuffer(&query); @@ -2545,14 +2550,13 @@ update_node_record_status(PGconn *conn, int this_node_id, char *type, int upstre { log_error(_("unable to update node record:\n %s"), PQerrorMessage(conn)); - PQclear(res); - return false; + success = false; } PQclear(res); - return true; + return success; } @@ -2565,6 +2569,7 @@ update_node_record_conn_priority(PGconn *conn, t_configuration_options *options) { PQExpBufferData query; PGresult *res = NULL; + bool success = true; initPQExpBuffer(&query); @@ -2582,13 +2587,12 @@ update_node_record_conn_priority(PGconn *conn, t_configuration_options *options) if (PQresultStatus(res) != PGRES_COMMAND_OK) { - - PQclear(res); - return false; + success = false; } PQclear(res); - return true; + + return success; } @@ -2652,6 +2656,7 @@ delete_node_record(PGconn *conn, int node) { PQExpBufferData query; PGresult *res = NULL; + bool success = true; initPQExpBuffer(&query); @@ -2669,19 +2674,20 @@ delete_node_record(PGconn *conn, int node) { log_error(_("unable to delete node record:\n %s"), PQerrorMessage(conn)); - PQclear(res); - return false; + + success = false; } PQclear(res); - return true; + return success; } bool truncate_node_records(PGconn *conn) { PGresult *res = NULL; + bool success = true; res = PQexec(conn, "TRUNCATE TABLE repmgr.nodes"); @@ -2689,12 +2695,13 @@ truncate_node_records(PGconn *conn) { log_error(_("unable to truncate node record table:\n %s"), PQerrorMessage(conn)); - PQclear(res); - return false; + + success = false; } PQclear(res); - return true; + + return success; } @@ -4320,6 +4327,7 @@ get_last_wal_receive_location(PGconn *conn) /* BDR functions */ /* ============= */ + static bool _is_bdr_db(PGconn *conn, PQExpBufferData *output, bool quiet) { @@ -4350,6 +4358,8 @@ _is_bdr_db(PGconn *conn, PQExpBufferData *output, bool quiet) PQclear(res); + log_verbose(LOG_DEBUG, "BDR ext version number is %i", bdr_version_num); + if (is_bdr_db == false) { const char *warning = _("BDR extension is not available for this database"); @@ -4405,6 +4415,12 @@ is_bdr_db_quiet(PGconn *conn) } +int +get_bdr_version_num(void) +{ + return bdr_version_num; +} + bool is_active_bdr_node(PGconn *conn, const char *node_name) { @@ -4489,6 +4505,64 @@ is_bdr_repmgr(PGconn *conn) } + +/* + * Get name of default BDR replication set. + * + * Caller must free provided value. + */ +char * +get_default_bdr_replication_set(PGconn *conn) +{ + PQExpBufferData query; + PGresult *res = NULL; + char *default_replication_set = NULL; + int namelen; + + if (bdr_version_num < 3) + { + /* For BDR2, we use a custom replication set */ + namelen = strlen(BDR2_REPLICATION_SET_NAME); + default_replication_set = pg_malloc0(namelen + 1); + strncpy(default_replication_set, BDR2_REPLICATION_SET_NAME, namelen); + + return default_replication_set; + } + + + initPQExpBuffer(&query); + + appendPQExpBuffer(&query, + " SELECT rs.set_name " + " FROM pglogical.replication_set rs " + " INNER JOIN bdr.node_group ng " + " ON ng.node_group_default_repset = rs.set_id "); + + res = PQexec(conn, query.data); + termPQExpBuffer(&query); + + if (PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) == 0) + { + log_warning(_("unable to retrieve default BDR replication set name")); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + log_detail("%s", PQerrorMessage(conn)); + + PQclear(res); + return NULL; + } + + namelen = strlen(PQgetvalue(res, 0, 0)); + default_replication_set = pg_malloc0(namelen + 1); + + strncpy(default_replication_set, PQgetvalue(res, 0, 0), namelen); + + PQclear(res); + + return default_replication_set; +} + + bool is_table_in_bdr_replication_set(PGconn *conn, const char *tablename, const char *set) { @@ -4509,8 +4583,16 @@ is_table_in_bdr_replication_set(PGconn *conn, const char *tablename, const char } else { - puts("is_table_in_bdr_replication_set(): not yet implemented for 3"); - exit(1); + appendPQExpBuffer(&query, + " SELECT pg_catalog.count(*) " + " FROM pglogical.replication_set s " + " INNER JOIN pglogical.replication_set_table st " + " ON s.set_id = st.set_id " + " WHERE s.set_name = '%s' " + " AND st.set_reloid = 'repmgr.%s'::REGCLASS ", + set, + tablename); + } res = PQexec(conn, query.data); @@ -4537,6 +4619,7 @@ add_table_to_bdr_replication_set(PGconn *conn, const char *tablename, const char { PQExpBufferData query; PGresult *res = NULL; + bool success = true; initPQExpBuffer(&query); @@ -4549,8 +4632,13 @@ add_table_to_bdr_replication_set(PGconn *conn, const char *tablename, const char } else { - puts("add_table_to_bdr_replication_set(): not yet implemented for 3"); - exit(1); + appendPQExpBuffer(&query, + " SELECT bdr.replication_set_add_table( " + " relation := 'repmgr.%s', " + " set_name := '%s' " + " ) ", + tablename, + set); } res = PQexec(conn, query.data); @@ -4558,19 +4646,17 @@ add_table_to_bdr_replication_set(PGconn *conn, const char *tablename, const char if (PQresultStatus(res) != PGRES_TUPLES_OK) { - log_error(_("unable to add table \"repmgr.%s\" to replication set \"%s\":\n %s"), + log_error(_("unable to add table \"repmgr.%s\" to replication set \"%s\""), tablename, - set, - PQerrorMessage(conn)); + set); + log_detail("%s", PQerrorMessage(conn)); - if (res != NULL) - PQclear(res); - - return false; + success = false; } + PQclear(res); - return true; + return success; } @@ -4703,6 +4789,9 @@ get_bdr_other_node_name(PGconn *conn, int node_id, char *node_name) } +/* + * For BDR 2.x only + */ void add_extension_tables_to_bdr_replication_set(PGconn *conn) { @@ -4724,7 +4813,7 @@ add_extension_tables_to_bdr_replication_set(PGconn *conn) if (PQresultStatus(res) != PGRES_TUPLES_OK) { - /* */ + /* XXX log error */ } else { @@ -4933,21 +5022,17 @@ bdr_node_has_repmgr_set(PGconn *conn, const char *node_name) PGresult *res = NULL; bool has_repmgr_set = false; + if (bdr_version_num >= 3) + return true; + initPQExpBuffer(&query); - if (bdr_version_num < 3) - { - appendPQExpBuffer(&query, - " SELECT pg_catalog.count(*) " - " FROM pg_catalog.unnest(bdr.connection_get_replication_sets('%s') AS repset " - " WHERE repset = 'repmgr'", - node_name); - } - else - { - puts("bdr_node_has_repmgr_set(): not implemented yet for BDR3"); - exit(1); - } + appendPQExpBuffer(&query, + " SELECT pg_catalog.count(*) " + " FROM pg_catalog.unnest(bdr.connection_get_replication_sets('%s') AS repset " + " WHERE repset = '%s'", + node_name, + BDR2_REPLICATION_SET_NAME); res = PQexec(conn, query.data); termPQExpBuffer(&query); @@ -4974,34 +5059,40 @@ bdr_node_set_repmgr_set(PGconn *conn, const char *node_name) PGresult *res = NULL; bool success = true; + if (bdr_version_num >= 3) + return true; + initPQExpBuffer(&query); - if (bdr_version_num < 3) - { - appendPQExpBuffer(&query, - " SELECT bdr.connection_set_replication_sets( " - " ARRAY( " - " SELECT repset::TEXT " - " FROM pg_catalog.unnest(bdr.connection_get_replication_sets('%s')) AS repset " - " UNION " - " SELECT 'repmgr'::TEXT " - " ), " - " '%s' " - " ) ", - node_name, - node_name); - } - else - { - puts("bdr_node_set_repmgr_set(): not implemented yet for BDR3"); - exit(1); - } + /* + * Here we extract a list of existing replication sets, add 'repmgr', and + * set the replication sets to the new list. + */ + appendPQExpBuffer(&query, + " SELECT bdr.connection_set_replication_sets( " + " ARRAY( " + " SELECT repset::TEXT " + " FROM pg_catalog.unnest(bdr.connection_get_replication_sets('%s')) AS repset " + " UNION " + " SELECT '%s'::TEXT " + " ), " + " '%s' " + " ) ", + node_name, + BDR2_REPLICATION_SET_NAME, + node_name); + + log_debug("bdr_node_set_repmgr_set():\n%s", query.data); res = PQexec(conn, query.data); termPQExpBuffer(&query); + if (PQresultStatus(res) != PGRES_TUPLES_OK) { + log_debug("result status: %s", PQresStatus(PQresultStatus(res))); + log_error(_("unable to create replication set \"repmgr\"")); + log_detail("%s", PQerrorMessage(conn)); success = false; } diff --git a/dbutils.h b/dbutils.h index 6ee14ee6..e1a024f6 100644 --- a/dbutils.h +++ b/dbutils.h @@ -503,12 +503,14 @@ void get_node_replication_stats(PGconn *conn, int server_version_num, t_node_in bool is_downstream_node_attached(PGconn *conn, char *node_name); /* BDR functions */ +int get_bdr_version_num(void); void get_all_bdr_node_records(PGconn *conn, BdrNodeInfoList *node_list); RecordStatus get_bdr_node_record_by_name(PGconn *conn, const char *node_name, t_bdr_node_info *node_info); bool is_bdr_db(PGconn *conn, PQExpBufferData *output); bool is_bdr_db_quiet(PGconn *conn); bool is_active_bdr_node(PGconn *conn, const char *node_name); bool is_bdr_repmgr(PGconn *conn); +char *get_default_bdr_replication_set(PGconn *conn); bool is_table_in_bdr_replication_set(PGconn *conn, const char *tablename, const char *set); bool add_table_to_bdr_replication_set(PGconn *conn, const char *tablename, const char *set); void add_extension_tables_to_bdr_replication_set(PGconn *conn); diff --git a/repmgr-action-bdr.c b/repmgr-action-bdr.c index 3c03d345..22e75765 100644 --- a/repmgr-action-bdr.c +++ b/repmgr-action-bdr.c @@ -83,7 +83,8 @@ do_bdr_register(void) exit(ERR_BAD_CONFIG); } - if (bdr_nodes.node_count > 2) + /* BDR 2 implementation is for 2 nodes only */ + if (get_bdr_version_num() < 3 && bdr_nodes.node_count > 2) { log_error(_("repmgr can only support BDR clusters with 2 nodes")); log_detail(_("this BDR cluster has %i nodes"), bdr_nodes.node_count); @@ -176,6 +177,7 @@ do_bdr_register(void) if (bdr_node_has_repmgr_set(conn, config_file_options.node_name) == false) { + log_debug("bdr_node_has_repmgr_set() = false"); bdr_node_set_repmgr_set(conn, config_file_options.node_name); } @@ -201,6 +203,7 @@ do_bdr_register(void) if (bdr_nodes.node_count == 0) { log_error(_("unable to retrieve any BDR node records")); + log_detail("%s", PQerrorMessage(conn)); PQfinish(conn); exit(ERR_BAD_CONFIG); } @@ -252,7 +255,35 @@ do_bdr_register(void) } /* Add the repmgr extension tables to a replication set */ - add_extension_tables_to_bdr_replication_set(conn); + + if (get_bdr_version_num() < 3) + { + add_extension_tables_to_bdr_replication_set(conn); + } + else + { + /* this is the only table we need to replicate */ + char *replication_set = get_default_bdr_replication_set(conn); + + /* + * this probably won't happen, but we need to be sure we're using + * the replication set metadata correctly... + */ + if (conn == NULL) + { + log_error(_("unable to retrieve default BDR replication set")); + log_hint(_("see preceding messages")); + log_debug("check query in get_default_bdr_replication_set()"); + exit(ERR_BAD_CONFIG); + } + + if (is_table_in_bdr_replication_set(conn, "nodes", replication_set) == false) + { + add_table_to_bdr_replication_set(conn, "nodes", replication_set); + } + + pfree(replication_set); + } initPQExpBuffer(&event_details); diff --git a/repmgr.h b/repmgr.h index ac5bacfb..8bdac74a 100644 --- a/repmgr.h +++ b/repmgr.h @@ -60,6 +60,8 @@ #define VOTING_TERM_NOT_SET -1 +#define BDR2_REPLICATION_SET_NAME "repmgr" + /* * various default values - ensure repmgr.conf.sample is update * if any of these are changed