diff --git a/dbutils.c b/dbutils.c index e97a30cb..507fd382 100644 --- a/dbutils.c +++ b/dbutils.c @@ -388,6 +388,7 @@ get_cluster_size(PGconn *conn, char *size) } + bool get_pg_setting(PGconn *conn, const char *setting, char *output) { @@ -467,7 +468,7 @@ get_upstream_connection(PGconn *standby_conn, char *cluster, int node_id, cluster, node_id); - log_debug("%s", sqlquery); + log_debug("%s\n", sqlquery); res = PQexec(standby_conn, sqlquery); diff --git a/dbutils.h b/dbutils.h index 6aad1081..e9ab4535 100644 --- a/dbutils.h +++ b/dbutils.h @@ -22,6 +22,7 @@ #include "strutil.h" + PGconn *establish_db_connection(const char *conninfo, const bool exit_on_error); PGconn *establish_db_connection_by_params(const char *keywords[], diff --git a/repmgr.h b/repmgr.h index 6caf1f72..58e0cb9a 100644 --- a/repmgr.h +++ b/repmgr.h @@ -21,9 +21,11 @@ #define _REPMGR_H_ #include "postgres_fe.h" -#include "getopt_long.h" #include "libpq-fe.h" + +#include "getopt_long.h" + #include "strutil.h" #include "dbutils.h" #include "errcode.h" @@ -53,6 +55,17 @@ #define AUTOMATIC_FAILOVER 1 #define NO_UPSTREAM_NODE -1 + + +typedef enum { + UNKNOWN = 0, + PRIMARY, + STANDBY, + WITNESS +} t_server_type; + + + /* Run time options type */ typedef struct { diff --git a/repmgrd.c b/repmgrd.c index b01f1dcd..1e0eb42f 100644 --- a/repmgrd.c +++ b/repmgrd.c @@ -32,6 +32,7 @@ #include #include + #include "repmgr.h" #include "config.h" #include "log.h" @@ -48,14 +49,16 @@ typedef struct s_node_info { int node_id; + int upstream_node_id; char conninfo_str[MAXLEN]; XLogRecPtr xlog_location; + t_server_type type; bool is_ready; bool is_visible; - bool is_witness; } t_node_info; + /* Local info */ t_configuration_options local_options; int my_local_mode = STANDBY_MODE; @@ -71,6 +74,7 @@ const char *progname; char *config_file = DEFAULT_CONFIG_FILE; bool verbose = false; bool monitoring_history = false; +t_node_info node_info; bool failover_done = false; @@ -90,6 +94,7 @@ static void update_shared_memory(char *last_wal_standby_applied); static void update_registration(void); static void do_failover(void); +static t_node_info get_node_info(PGconn *conn,char *cluster, int node_id); static XLogRecPtr lsn_to_xlogrecptr(char *lsn, bool *format_ok); /* @@ -264,6 +269,9 @@ main(int argc, char **argv) terminate(ERR_BAD_CONFIG); } + /* Retrieve record for this node from the database */ + node_info = get_node_info(my_local_conn, local_options.cluster_name, local_options.node); + log_debug("Node id is %i, upstream is %i\n", node_info.node_id, node_info.upstream_node_id); /* * MAIN LOOP This loops cycles at startup and once per failover and @@ -832,7 +840,9 @@ standby_monitor(void) PQerrorMessage(primary_conn)); } -// ZZZ witness + +// ZZZ this handles failovers where node's upstream is cluster primary + static void do_failover(void) { @@ -862,11 +872,11 @@ do_failover(void) t_node_info nodes[FAILOVER_NODES_MAX_CHECK]; /* initialize to keep compiler quiet */ - t_node_info best_candidate = {-1, "", InvalidXLogRecPtr, false, false, false}; + t_node_info best_candidate = {-1, NO_UPSTREAM_NODE, "", InvalidXLogRecPtr, UNKNOWN, false, false}; /* get a list of standby nodes, including myself */ sprintf(sqlquery, - "SELECT id, conninfo, type " + "SELECT id, conninfo, type, upstream_node_id " " FROM %s.repl_nodes " " WHERE cluster = '%s' " " ORDER BY priority, id " @@ -880,6 +890,7 @@ do_failover(void) { log_err(_("Unable to retrieve node records: %s\n"), PQerrorMessage(my_local_conn)); PQclear(res); + PQfinish(my_local_conn); terminate(ERR_DB_QUERY); } @@ -896,9 +907,28 @@ do_failover(void) for (i = 0; i < total_nodes; i++) { nodes[i].node_id = atoi(PQgetvalue(res, i, 0)); + strncpy(nodes[i].conninfo_str, PQgetvalue(res, i, 1), MAXLEN); - // ZZZ witness - nodes[i].is_witness = (strcmp(PQgetvalue(res, i, 2), "t") == 0) ? true : false; + + if(strcmp(PQgetvalue(res, i, 2), "primary") == 0) + { + nodes[i].type = PRIMARY; + } + else if(strcmp(PQgetvalue(res, i, 2), "standby") == 0) + { + nodes[i].type = STANDBY; + } + else if(strcmp(PQgetvalue(res, i, 2), "witness") == 0) + { + nodes[i].type = WITNESS; + } + else + { + // ZZZ handle exception + } + + nodes[i].upstream_node_id = atoi(PQgetvalue(res, i, 3)); + /* * Initialize on false so if we can't reach this node we know that @@ -909,9 +939,9 @@ do_failover(void) nodes[i].xlog_location = InvalidXLogRecPtr; - log_debug(_("%s: node=%d conninfo=\"%s\" witness=%s\n"), + log_debug(_("%s: node=%d conninfo=\"%s\" type=%s\n"), progname, nodes[i].node_id, nodes[i].conninfo_str, - (nodes[i].is_witness) ? "true" : "false"); + PQgetvalue(res, i, 2)); node_conn = establish_db_connection(nodes[i].conninfo_str, false); @@ -956,9 +986,14 @@ do_failover(void) continue; /* if the node is a witness node, skip it */ - if (nodes[i].is_witness) + if (nodes[i].type == WITNESS) continue; + /* if node does not have same upstream node, skip it */ + if (nodes[i].upstream_node_id != node_info.upstream_node_id) + continue; + + node_conn = establish_db_connection(nodes[i].conninfo_str, false); /* @@ -1026,7 +1061,7 @@ do_failover(void) * ensure witness server is marked as ready, and skip * LSN check */ - if (nodes[i].is_witness) + if (nodes[i].type == WITNESS) { if (!nodes[i].is_ready) { @@ -1040,6 +1075,10 @@ do_failover(void) if (!nodes[i].is_visible) continue; + /* if node does not have same upstream node, skip it */ + if (nodes[i].upstream_node_id != node_info.upstream_node_id) + continue; + node_conn = establish_db_connection(nodes[i].conninfo_str, false); /* @@ -1145,7 +1184,7 @@ do_failover(void) for (i = 0; i < total_nodes; i++) { /* witness server can never be a candidate */ - if (nodes[i].is_witness) + if (nodes[i].type == WITNESS) continue; if (!nodes[i].is_ready || !nodes[i].is_visible) @@ -1671,3 +1710,59 @@ check_and_create_pid_file(const char *pid_file) fprintf(fd, "%d", getpid()); fclose(fd); } + + +t_node_info +get_node_info(PGconn *conn,char *cluster, int node_id) +{ + char sqlquery[QUERY_STR_LEN]; + PGresult *res; + + t_node_info node_info = {-1, NO_UPSTREAM_NODE, "", InvalidXLogRecPtr, UNKNOWN, false, false}; + + sprintf(sqlquery, + "SELECT id, upstream_node_id, conninfo, type " + " FROM %s.repl_nodes " + " WHERE cluster = '%s' " + " AND id = %i", + get_repmgr_schema_quoted(conn), + local_options.cluster_name, + node_id); + + log_debug("get_node_info(): %s\n", sqlquery); + + res = PQexec(my_local_conn, sqlquery); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_err(_("Unable to retrieve record for node %i: %s\n"), node_id, PQerrorMessage(conn)); + PQclear(res); + PQfinish(conn); + terminate(ERR_DB_QUERY); + } + + node_info.node_id = atoi(PQgetvalue(res, 0, 0)); + node_info.upstream_node_id = atoi(PQgetvalue(res, 0, 1)); + strncpy(node_info.conninfo_str, PQgetvalue(res, 0, 2), MAXLEN); + + // ZZZ consolidate similar constructs + if(strcmp(PQgetvalue(res, 0, 3), "primary") == 0) + { + node_info.type = PRIMARY; + } + else if(strcmp(PQgetvalue(res, 0, 3), "standby") == 0) + { + node_info.type = STANDBY; + } + else if(strcmp(PQgetvalue(res, 0, 3), "witness") == 0) + { + node_info.type = WITNESS; + } + else + { + node_info.type = UNKNOWN; + } + + PQclear(res); + + return node_info; +}