Handle failover of top-level standby

Cascaded standbys will not go into failover so we need to ignore
these when looking for candidates for promotion.
This commit is contained in:
Ian Barwick
2015-01-16 12:35:01 +09:00
parent a82d37e48a
commit 609453a848
4 changed files with 123 additions and 13 deletions

View File

@@ -388,6 +388,7 @@ get_cluster_size(PGconn *conn, char *size)
} }
bool bool
get_pg_setting(PGconn *conn, const char *setting, char *output) get_pg_setting(PGconn *conn, const char *setting, char *output)
{ {
@@ -467,7 +468,7 @@ get_upstream_connection(PGconn *standby_conn, char *cluster, int node_id,
cluster, cluster,
node_id); node_id);
log_debug("%s", sqlquery); log_debug("%s\n", sqlquery);
res = PQexec(standby_conn, sqlquery); res = PQexec(standby_conn, sqlquery);

View File

@@ -22,6 +22,7 @@
#include "strutil.h" #include "strutil.h"
PGconn *establish_db_connection(const char *conninfo, PGconn *establish_db_connection(const char *conninfo,
const bool exit_on_error); const bool exit_on_error);
PGconn *establish_db_connection_by_params(const char *keywords[], PGconn *establish_db_connection_by_params(const char *keywords[],

View File

@@ -21,9 +21,11 @@
#define _REPMGR_H_ #define _REPMGR_H_
#include "postgres_fe.h" #include "postgres_fe.h"
#include "getopt_long.h"
#include "libpq-fe.h" #include "libpq-fe.h"
#include "getopt_long.h"
#include "strutil.h" #include "strutil.h"
#include "dbutils.h" #include "dbutils.h"
#include "errcode.h" #include "errcode.h"
@@ -53,6 +55,17 @@
#define AUTOMATIC_FAILOVER 1 #define AUTOMATIC_FAILOVER 1
#define NO_UPSTREAM_NODE -1 #define NO_UPSTREAM_NODE -1
/*
 * Role of a node within the replication cluster, as recorded in the
 * "type" column of repl_nodes ("primary", "standby" or "witness").
 * UNKNOWN doubles as the zero/default value for records whose type has
 * not been determined or is not recognised.
 */
typedef enum
{
	UNKNOWN = 0,	/* type not (yet) determined / unrecognised */
	PRIMARY = 1,	/* "primary" */
	STANDBY = 2,	/* "standby" */
	WITNESS = 3		/* "witness" */
} t_server_type;
/* Run time options type */ /* Run time options type */
typedef struct typedef struct
{ {

117
repmgrd.c
View File

@@ -32,6 +32,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <unistd.h> #include <unistd.h>
#include "repmgr.h" #include "repmgr.h"
#include "config.h" #include "config.h"
#include "log.h" #include "log.h"
@@ -48,14 +49,16 @@
typedef struct s_node_info typedef struct s_node_info
{ {
int node_id; int node_id;
int upstream_node_id;
char conninfo_str[MAXLEN]; char conninfo_str[MAXLEN];
XLogRecPtr xlog_location; XLogRecPtr xlog_location;
t_server_type type;
bool is_ready; bool is_ready;
bool is_visible; bool is_visible;
bool is_witness;
} t_node_info; } t_node_info;
/* Local info */ /* Local info */
t_configuration_options local_options; t_configuration_options local_options;
int my_local_mode = STANDBY_MODE; int my_local_mode = STANDBY_MODE;
@@ -71,6 +74,7 @@ const char *progname;
char *config_file = DEFAULT_CONFIG_FILE; char *config_file = DEFAULT_CONFIG_FILE;
bool verbose = false; bool verbose = false;
bool monitoring_history = false; bool monitoring_history = false;
t_node_info node_info;
bool failover_done = false; bool failover_done = false;
@@ -90,6 +94,7 @@ static void update_shared_memory(char *last_wal_standby_applied);
static void update_registration(void); static void update_registration(void);
static void do_failover(void); static void do_failover(void);
static t_node_info get_node_info(PGconn *conn,char *cluster, int node_id);
static XLogRecPtr lsn_to_xlogrecptr(char *lsn, bool *format_ok); static XLogRecPtr lsn_to_xlogrecptr(char *lsn, bool *format_ok);
/* /*
@@ -264,6 +269,9 @@ main(int argc, char **argv)
terminate(ERR_BAD_CONFIG); terminate(ERR_BAD_CONFIG);
} }
/* Retrieve record for this node from the database */
node_info = get_node_info(my_local_conn, local_options.cluster_name, local_options.node);
log_debug("Node id is %i, upstream is %i\n", node_info.node_id, node_info.upstream_node_id);
/* /*
* MAIN LOOP This loops cycles at startup and once per failover and * MAIN LOOP This loops cycles at startup and once per failover and
@@ -832,7 +840,9 @@ standby_monitor(void)
PQerrorMessage(primary_conn)); PQerrorMessage(primary_conn));
} }
// ZZZ witness
// ZZZ this handles failovers where node's upstream is cluster primary
static void static void
do_failover(void) do_failover(void)
{ {
@@ -862,11 +872,11 @@ do_failover(void)
t_node_info nodes[FAILOVER_NODES_MAX_CHECK]; t_node_info nodes[FAILOVER_NODES_MAX_CHECK];
/* initialize to keep compiler quiet */ /* initialize to keep compiler quiet */
t_node_info best_candidate = {-1, "", InvalidXLogRecPtr, false, false, false}; t_node_info best_candidate = {-1, NO_UPSTREAM_NODE, "", InvalidXLogRecPtr, UNKNOWN, false, false};
/* get a list of standby nodes, including myself */ /* get a list of standby nodes, including myself */
sprintf(sqlquery, sprintf(sqlquery,
"SELECT id, conninfo, type " "SELECT id, conninfo, type, upstream_node_id "
" FROM %s.repl_nodes " " FROM %s.repl_nodes "
" WHERE cluster = '%s' " " WHERE cluster = '%s' "
" ORDER BY priority, id " " ORDER BY priority, id "
@@ -880,6 +890,7 @@ do_failover(void)
{ {
log_err(_("Unable to retrieve node records: %s\n"), PQerrorMessage(my_local_conn)); log_err(_("Unable to retrieve node records: %s\n"), PQerrorMessage(my_local_conn));
PQclear(res); PQclear(res);
PQfinish(my_local_conn);
terminate(ERR_DB_QUERY); terminate(ERR_DB_QUERY);
} }
@@ -896,9 +907,28 @@ do_failover(void)
for (i = 0; i < total_nodes; i++) for (i = 0; i < total_nodes; i++)
{ {
nodes[i].node_id = atoi(PQgetvalue(res, i, 0)); nodes[i].node_id = atoi(PQgetvalue(res, i, 0));
strncpy(nodes[i].conninfo_str, PQgetvalue(res, i, 1), MAXLEN); strncpy(nodes[i].conninfo_str, PQgetvalue(res, i, 1), MAXLEN);
// ZZZ witness
nodes[i].is_witness = (strcmp(PQgetvalue(res, i, 2), "t") == 0) ? true : false; if(strcmp(PQgetvalue(res, i, 2), "primary") == 0)
{
nodes[i].type = PRIMARY;
}
else if(strcmp(PQgetvalue(res, i, 2), "standby") == 0)
{
nodes[i].type = STANDBY;
}
else if(strcmp(PQgetvalue(res, i, 2), "witness") == 0)
{
nodes[i].type = WITNESS;
}
else
{
// ZZZ handle exception
}
nodes[i].upstream_node_id = atoi(PQgetvalue(res, i, 3));
/* /*
* Initialize on false so if we can't reach this node we know that * Initialize on false so if we can't reach this node we know that
@@ -909,9 +939,9 @@ do_failover(void)
nodes[i].xlog_location = InvalidXLogRecPtr; nodes[i].xlog_location = InvalidXLogRecPtr;
log_debug(_("%s: node=%d conninfo=\"%s\" witness=%s\n"), log_debug(_("%s: node=%d conninfo=\"%s\" type=%s\n"),
progname, nodes[i].node_id, nodes[i].conninfo_str, progname, nodes[i].node_id, nodes[i].conninfo_str,
(nodes[i].is_witness) ? "true" : "false"); PQgetvalue(res, i, 2));
node_conn = establish_db_connection(nodes[i].conninfo_str, false); node_conn = establish_db_connection(nodes[i].conninfo_str, false);
@@ -956,9 +986,14 @@ do_failover(void)
continue; continue;
/* if the node is a witness node, skip it */ /* if the node is a witness node, skip it */
if (nodes[i].is_witness) if (nodes[i].type == WITNESS)
continue; continue;
/* if node does not have same upstream node, skip it */
if (nodes[i].upstream_node_id != node_info.upstream_node_id)
continue;
node_conn = establish_db_connection(nodes[i].conninfo_str, false); node_conn = establish_db_connection(nodes[i].conninfo_str, false);
/* /*
@@ -1026,7 +1061,7 @@ do_failover(void)
* ensure witness server is marked as ready, and skip * ensure witness server is marked as ready, and skip
* LSN check * LSN check
*/ */
if (nodes[i].is_witness) if (nodes[i].type == WITNESS)
{ {
if (!nodes[i].is_ready) if (!nodes[i].is_ready)
{ {
@@ -1040,6 +1075,10 @@ do_failover(void)
if (!nodes[i].is_visible) if (!nodes[i].is_visible)
continue; continue;
/* if node does not have same upstream node, skip it */
if (nodes[i].upstream_node_id != node_info.upstream_node_id)
continue;
node_conn = establish_db_connection(nodes[i].conninfo_str, false); node_conn = establish_db_connection(nodes[i].conninfo_str, false);
/* /*
@@ -1145,7 +1184,7 @@ do_failover(void)
for (i = 0; i < total_nodes; i++) for (i = 0; i < total_nodes; i++)
{ {
/* witness server can never be a candidate */ /* witness server can never be a candidate */
if (nodes[i].is_witness) if (nodes[i].type == WITNESS)
continue; continue;
if (!nodes[i].is_ready || !nodes[i].is_visible) if (!nodes[i].is_ready || !nodes[i].is_visible)
@@ -1671,3 +1710,59 @@ check_and_create_pid_file(const char *pid_file)
fprintf(fd, "%d", getpid()); fprintf(fd, "%d", getpid());
fclose(fd); fclose(fd);
} }
/*
 * get_node_info()
 *
 * Retrieve the repl_nodes record for node `node_id` of cluster `cluster`,
 * using connection `conn`, and return it as a t_node_info.
 *
 * Parameters:
 *   conn     - open connection on which the query is executed
 *   cluster  - cluster name used to filter repl_nodes
 *   node_id  - id of the node whose record is wanted
 *
 * Returns:
 *   The populated record. If the query succeeds but no matching row
 *   exists, an error is logged and the default record is returned
 *   (node_id == -1, upstream_node_id == NO_UPSTREAM_NODE, type == UNKNOWN),
 *   so callers can detect the condition by checking node_id.
 *
 * If the query itself fails, the error is logged, `conn` is closed and the
 * process is terminated with ERR_DB_QUERY.
 */
t_node_info
get_node_info(PGconn *conn, char *cluster, int node_id)
{
	char		sqlquery[QUERY_STR_LEN];
	PGresult   *res;
	const char *type_str;

	/* named so as not to shadow the file-level "node_info" */
	t_node_info node_record = {-1, NO_UPSTREAM_NODE, "", InvalidXLogRecPtr, UNKNOWN, false, false};

	/* bounded formatting: never write past the end of sqlquery */
	snprintf(sqlquery, sizeof(sqlquery),
			 "SELECT id, upstream_node_id, conninfo, type "
			 " FROM %s.repl_nodes "
			 " WHERE cluster = '%s' "
			 " AND id = %i",
			 get_repmgr_schema_quoted(conn),
			 cluster,
			 node_id);

	log_debug("get_node_info(): %s\n", sqlquery);

	/* run the query on the connection we were given, not a global one */
	res = PQexec(conn, sqlquery);

	if (PQresultStatus(res) != PGRES_TUPLES_OK)
	{
		log_err(_("Unable to retrieve record for node %i: %s\n"), node_id, PQerrorMessage(conn));
		PQclear(res);
		PQfinish(conn);
		terminate(ERR_DB_QUERY);
	}

	/*
	 * A successful query may still return zero rows; PQgetvalue() on a
	 * non-existent row yields NULL, which must not reach atoi()/strcmp().
	 */
	if (PQntuples(res) < 1)
	{
		log_err(_("No record found for node %i\n"), node_id);
		PQclear(res);
		return node_record;
	}

	node_record.node_id = atoi(PQgetvalue(res, 0, 0));

	/*
	 * NOTE(review): a NULL upstream_node_id is read as 0 here (libpq returns
	 * an empty string for NULL fields), which matches how do_failover()
	 * parses the same column - confirm 0 can never be a valid node id.
	 */
	node_record.upstream_node_id = atoi(PQgetvalue(res, 0, 1));

	/* copy at most MAXLEN - 1 bytes and always NUL-terminate */
	strncpy(node_record.conninfo_str, PQgetvalue(res, 0, 2), MAXLEN - 1);
	node_record.conninfo_str[MAXLEN - 1] = '\0';

	/* map the textual node type onto t_server_type */
	type_str = PQgetvalue(res, 0, 3);

	if (strcmp(type_str, "primary") == 0)
	{
		node_record.type = PRIMARY;
	}
	else if (strcmp(type_str, "standby") == 0)
	{
		node_record.type = STANDBY;
	}
	else if (strcmp(type_str, "witness") == 0)
	{
		node_record.type = WITNESS;
	}
	else
	{
		node_record.type = UNKNOWN;
	}

	PQclear(res);

	return node_record;
}