From a16bbbf107f8ce66b3b3ee779fdaaac733bace80 Mon Sep 17 00:00:00 2001
From: Ian Barwick
Date: Fri, 21 Apr 2017 19:47:31 +0900
Subject: [PATCH] master register: check for existing master presence/active record

---
 dbutils.c | 238 +++++++++++++++++++++++++++++++++++++++++++++++-
 dbutils.h | 21 +++--
 repmgr-client.c | 42 ++++++++-
 strutil.h | 4 +
 4 files changed, 295 insertions(+), 10 deletions(-)

diff --git a/dbutils.c b/dbutils.c
index b5750cfe..60b0f1ef 100644
--- a/dbutils.c
+++ b/dbutils.c
@@ -360,6 +360,83 @@ conn_to_param_list(PGconn *conn, t_conninfo_param_list *param_list)
 	}
 }
 
+
+/* ===================== */
+/* transaction functions */
+/* ===================== */
+
+/*
+ * Issue BEGIN on conn. Returns true on success; otherwise logs the error and returns false.
+ */
+bool
+begin_transaction(PGconn *conn)
+{
+	PGresult *result;
+	bool ok;
+
+	log_verbose(LOG_DEBUG, "begin_transaction()");
+
+	result = PQexec(conn, "BEGIN");
+	ok = (PQresultStatus(result) == PGRES_COMMAND_OK);
+	if (!ok)
+	{
+		log_error(_("Unable to begin transaction:\n %s"),
+				  PQerrorMessage(conn));
+	}
+
+	PQclear(result);
+	return ok;
+}
+
+
+/*
+ * Issue COMMIT on conn. Returns true on success; otherwise logs the error and returns false.
+ */
+bool
+commit_transaction(PGconn *conn)
+{
+	PGresult *result;
+	bool ok;
+
+	log_verbose(LOG_DEBUG, "commit_transaction()");
+
+	result = PQexec(conn, "COMMIT");
+	ok = (PQresultStatus(result) == PGRES_COMMAND_OK);
+	if (!ok)
+	{
+		log_error(_("Unable to commit transaction:\n %s"),
+				  PQerrorMessage(conn));
+	}
+
+	PQclear(result);
+	return ok;
+}
+
+
+/*
+ * Issue ROLLBACK on conn. Returns true on success; otherwise logs the error and returns false.
+ */
+bool
+rollback_transaction(PGconn *conn)
+{
+	PGresult *result;
+	bool ok;
+
+	log_verbose(LOG_DEBUG, "rollback_transaction()");
+
+	result = PQexec(conn, "ROLLBACK");
+	ok = (PQresultStatus(result) == PGRES_COMMAND_OK);
+	if (!ok)
+	{
+		log_error(_("Unable to rollback transaction:\n %s"),
+				  PQerrorMessage(conn));
+	}
+
+	PQclear(result);
+	return ok;
+}
+
+
 /* ========================== */
 /* GUC manipulation functions */
 /* ========================== */
@@ -455,7 +532,7 @@ is_standby(PGconn *conn)
 
 	if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
-		log_error(_("unable to determine if 
server is in recovery: %s"),
+		log_error(_("unable to determine if server is in recovery:\n %s"),
 				  PQerrorMessage(conn));
 		result = -1;
 	}
@@ -467,3 +544,162 @@ is_standby(PGconn *conn)
 	PQclear(res);
 	return result;
 }
+
+/*
+ * Read the node list from the provided connection and attempt to connect to
+ * each node in turn to definitively establish if it's the cluster primary.
+ *
+ * The node list is returned in the order which makes it likely that the
+ * current primary will be returned first, reducing the number of speculative
+ * connections which need to be made to other nodes.
+ *
+ * If master_id is not NULL, it is reset to NODE_NOT_FOUND and then set to the
+ * master's node ID once found. If master_conninfo_out is not NULL, it must
+ * point to MAXCONNINFO bytes; each candidate's conninfo is copied into it, so
+ * it holds the master's conninfo only when a connection is returned.
+ *
+ * Returns an open connection to the master (the caller must PQfinish() it),
+ * or NULL if the node list could not be read or no master was identified.
+ */
+PGconn *
+get_master_connection(PGconn *conn, int *master_id, char *master_conninfo_out)
+{
+	PQExpBufferData query;
+	PGresult *res;
+	char remote_conninfo_stack[MAXCONNINFO];
+	char *remote_conninfo = remote_conninfo_stack;
+	int i;
+
+	/* work directly in the caller's buffer when one was supplied */
+	if (master_conninfo_out != NULL)
+		remote_conninfo = master_conninfo_out;
+
+	if (master_id != NULL)
+	{
+		*master_id = NODE_NOT_FOUND;
+	}
+
+	/* find all registered nodes */
+	log_info(_("retrieving node list"));
+	initPQExpBuffer(&query);
+	appendPQExpBuffer(&query,
+					  " SELECT node_id, conninfo, "
+					  " CASE WHEN type = 'master' THEN 1 ELSE 2 END AS type_priority"
+					  " FROM repmgr.nodes "
+					  " WHERE type != 'witness' "
+					  "ORDER BY active DESC, type_priority, priority, node_id");
+
+	log_verbose(LOG_DEBUG, "get_master_connection():\n%s", query.data);
+	res = PQexec(conn, query.data);
+
+	/* free the query text first so the error path below cannot leak it */
+	termPQExpBuffer(&query);
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		log_error(_("unable to retrieve node records:\n %s"), PQerrorMessage(conn));
+		PQclear(res);
+		return NULL;
+	}
+
+	for (i = 0; i < PQntuples(res); i++)
+	{
+		PGconn *remote_conn;
+		int node_id;
+		int is_node_standby;
+
+		/* initialize with the values of the current node being processed */
+		node_id = atoi(PQgetvalue(res, i, 0));
+		strncpy(remote_conninfo, PQgetvalue(res, i, 1), MAXCONNINFO);
+		/* strncpy() does not terminate the copy when the source fills the buffer */
+		remote_conninfo[MAXCONNINFO - 1] = '\0';
+
+		log_verbose(LOG_INFO, _("checking role of cluster node '%i'"), node_id);
+		remote_conn = establish_db_connection(remote_conninfo, false);
+
+		if (PQstatus(remote_conn) != CONNECTION_OK)
+		{
+			/* release the connection object even though it failed to open */
+			PQfinish(remote_conn);
+			continue;
+		}
+
+		is_node_standby = is_standby(remote_conn);
+
+		if (is_node_standby == -1)
+		{
+			log_error(_("unable to retrieve recovery state from node %i:\n %s"),
+					  node_id, PQerrorMessage(remote_conn));
+			PQfinish(remote_conn);
+			continue;
+		}
+
+		/* if is_standby() returns 0, queried node is the master */
+		if (is_node_standby == 0)
+		{
+			PQclear(res);
+			log_debug(_("get_master_connection(): current master node is %i"), node_id);
+
+			if (master_id != NULL)
+			{
+				*master_id = node_id;
+			}
+
+			return remote_conn;
+		}
+
+		PQfinish(remote_conn);
+	}
+
+	PQclear(res);
+	return NULL;
+}
+
+
+/*
+ * Return the id of the active master node, or NODE_NOT_FOUND if no
+ * record available.
+ *
+ * This reports the value stored in the database only and
+ * does not verify whether the node is actually available
+ */
+int
+get_master_node_id(PGconn *conn)
+{
+	PQExpBufferData query;
+	PGresult *res;
+	int retval;
+
+	initPQExpBuffer(&query);
+	appendPQExpBuffer(&query,
+					  "SELECT node_id "
+					  " FROM repmgr.nodes "
+					  " WHERE type = 'master' "
+					  " AND active IS TRUE ");
+
+	log_verbose(LOG_DEBUG, "get_master_node_id():\n%s", query.data);
+
+	res = PQexec(conn, query.data);
+
+	termPQExpBuffer(&query);	/* query text is not used again after PQexec() */
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		log_error(_("get_master_node_id(): query failed\n %s"),
+				  PQerrorMessage(conn));
+		retval = NODE_NOT_FOUND;	/* a failed query yields the same value as an empty result */
+	}
+	else if (PQntuples(res) == 0)
+	{
+		log_verbose(LOG_WARNING, _("get_master_node_id(): no active primary found\n"));
+		retval = NODE_NOT_FOUND;
+	}
+	else
+	{
+		retval = atoi(PQgetvalue(res, 0, 0));	/* only row 0 is read, even if more rows matched */
+	}
+	PQclear(res);	/* single release point shared by all three branches */
+
+	return retval;
+}
+
diff --git a/dbutils.h b/dbutils.h
index 5686d323..9784322f 100644
--- a/dbutils.h
+++ b/dbutils.h
@@ -119,12 +119,18 @@ PGconn *establish_db_connection_by_params(const char *keywords[],
 
 
 /* conninfo manipulation functions */
-void initialize_conninfo_params(t_conninfo_param_list *param_list, bool set_defaults);
-void copy_conninfo_params(t_conninfo_param_list *dest_list, t_conninfo_param_list *source_list);
-void conn_to_param_list(PGconn *conn, t_conninfo_param_list *param_list);
-void param_set(t_conninfo_param_list *param_list, const char *param, const char *value);
-char *param_get(t_conninfo_param_list *param_list, const char *param);
-bool parse_conninfo_string(const char *conninfo_str, t_conninfo_param_list *param_list, char *errmsg, bool ignore_application_name);
+void initialize_conninfo_params(t_conninfo_param_list *param_list, bool set_defaults);
+void copy_conninfo_params(t_conninfo_param_list *dest_list, t_conninfo_param_list *source_list);
+void conn_to_param_list(PGconn *conn, t_conninfo_param_list *param_list);
+void 
param_set(t_conninfo_param_list *param_list, const char *param, const char *value);
+char *param_get(t_conninfo_param_list *param_list, const char *param);
+bool parse_conninfo_string(const char *conninfo_str, t_conninfo_param_list *param_list, char *errmsg, bool ignore_application_name);
+
+/* transaction functions */
+bool begin_transaction(PGconn *conn);
+bool commit_transaction(PGconn *conn);
+bool rollback_transaction(PGconn *conn);
+bool check_cluster_schema(PGconn *conn);	/* NOTE(review): no definition is visible in this patch - confirm it exists elsewhere */
 
 /* GUC manipulation functions */
 bool set_config(PGconn *conn, const char *config_param, const char *config_value);
@@ -133,6 +139,7 @@ bool set_config_bool(PGconn *conn, const char *config_param, bool state);
 /* Server information functions */
 int get_server_version(PGconn *conn, char *server_version);
 int is_standby(PGconn *conn);
-
+PGconn *get_master_connection(PGconn *conn, int *master_id, char *master_conninfo_out);
+int get_master_node_id(PGconn *conn);
 
 #endif
diff --git a/repmgr-client.c b/repmgr-client.c
index 13440557..c41e0760 100644
--- a/repmgr-client.c
+++ b/repmgr-client.c
@@ -436,8 +436,8 @@ static void
 do_master_register(void)
 {
 	PGconn *conn = NULL;
-
-
+	PGconn *master_conn = NULL;
+	int current_master_id = UNKNOWN_NODE_ID;
 	int ret;
 
 	log_info(_("connecting to master database..."));
@@ -468,6 +468,44 @@ do_master_register(void)
 		exit(ERR_BAD_CONFIG);
 	}
 
+	/* Ensure there isn't another active master already registered */
+	master_conn = get_master_connection(conn, &current_master_id, NULL);
+
+	if (master_conn != NULL)
+	{
+		if (current_master_id != config_file_options.node_id)
+		{
+			/* it's impossible to add a second master to a streaming replication cluster */
+			log_error(_("there is already an active registered master (node ID: %i) in this cluster"), current_master_id);
+			PQfinish(master_conn);
+			PQfinish(conn);
+			exit(ERR_BAD_CONFIG);
+		}
+
+		/* the master found has this node's own ID, so drop the extra connection */
+		PQfinish(master_conn);
+	}
+
+
+	begin_transaction(conn);	/* NOTE(review): result ignored; a failed BEGIN is only logged */
+
+	/*
+	 * Check if a node with a different ID is registered as master. This shouldn't
+	 * happen but could do if an existing master was shut down without being
+	 * unregistered.
+	 */
+
+	current_master_id = get_master_node_id(conn);
+	if (current_master_id != NODE_NOT_FOUND && current_master_id != config_file_options.node_id)
+	{
+		log_error(_("another node with id %i is already registered as master"), current_master_id);
+		/* TODO: attempt to connect, add info/hint depending if active... */
+		log_info(_("a streaming replication cluster can have only one master node"));
+		rollback_transaction(conn);
+		PQfinish(conn);
+		exit(ERR_BAD_CONFIG);
+	}
+
 }
 
 
diff --git a/strutil.h b/strutil.h
index f0940727..8a18b205 100644
--- a/strutil.h
+++ b/strutil.h
@@ -8,6 +8,10 @@
 
 #define MAXLEN 1024
 #define MAX_QUERY_LEN 8192
+
+/* same as defined in src/include/replication/walreceiver.h */
+#define MAXCONNINFO 1024
+
 
 /* Why? http://stackoverflow.com/a/5459929/398670 */
 #define STR(x) CppAsString(x)