repmgrd: check for a change to the upstream node

If the upstream node has changed, for example after "repmgr standby follow"
was manually executed, restart monitoring to ensure repmgrd is monitoring the
correct node.
This commit is contained in:
Ian Barwick
2019-01-22 13:31:00 +09:00
parent b6fe91ebcd
commit 1980deb480
3 changed files with 99 additions and 19 deletions

View File

@@ -51,8 +51,8 @@ static PGconn *_establish_db_connection(const char *conninfo,
static PGconn *_get_primary_connection(PGconn *standby_conn, int *primary_id, char *primary_conninfo_out, bool quiet);
static bool _set_config(PGconn *conn, const char *config_param, const char *sqlquery);
static RecordStatus _get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info);
static void _populate_node_record(PGresult *res, t_node_info *node_info, int row);
static RecordStatus _get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info, bool init_defaults);
static void _populate_node_record(PGresult *res, t_node_info *node_info, int row, bool init_defaults);
static void _populate_node_records(PGresult *res, NodeInfoList *node_list);
@@ -2094,7 +2094,7 @@ promote_standby(PGconn *conn, bool wait, int wait_seconds)
static RecordStatus
_get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info)
_get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info, bool init_defaults)
{
int ntuples = 0;
PGresult *res = NULL;
@@ -2115,7 +2115,7 @@ _get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info)
return RECORD_NOT_FOUND;
}
_populate_node_record(res, node_info, 0);
_populate_node_record(res, node_info, 0, init_defaults);
PQclear(res);
@@ -2124,7 +2124,7 @@ _get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info)
static void
_populate_node_record(PGresult *res, t_node_info *node_info, int row)
_populate_node_record(PGresult *res, t_node_info *node_info, int row, bool init_defaults)
{
node_info->node_id = atoi(PQgetvalue(res, row, 0));
node_info->type = parse_node_type(PQgetvalue(res, row, 1));
@@ -2151,11 +2151,15 @@ _populate_node_record(PGresult *res, t_node_info *node_info, int row)
strncpy(node_info->upstream_node_name, PQgetvalue(res, row, 11), MAXLEN);
/* Set remaining struct fields with default values */
node_info->node_status = NODE_STATUS_UNKNOWN;
node_info->recovery_type = RECTYPE_UNKNOWN;
node_info->last_wal_receive_lsn = InvalidXLogRecPtr;
node_info->monitoring_state = MS_NORMAL;
node_info->conn = NULL;
if (init_defaults == true)
{
node_info->node_status = NODE_STATUS_UNKNOWN;
node_info->recovery_type = RECTYPE_UNKNOWN;
node_info->last_wal_receive_lsn = InvalidXLogRecPtr;
node_info->monitoring_state = MS_NORMAL;
node_info->conn = NULL;
}
}
@@ -2220,7 +2224,7 @@ get_node_record(PGconn *conn, int node_id, t_node_info *node_info)
log_verbose(LOG_DEBUG, "get_node_record():\n %s", query.data);
result = _get_node_record(conn, query.data, node_info);
result = _get_node_record(conn, query.data, node_info, true);
termPQExpBuffer(&query);
if (result == RECORD_NOT_FOUND)
@@ -2232,6 +2236,33 @@ get_node_record(PGconn *conn, int node_id, t_node_info *node_info)
}
/*
 * refresh_node_record()
 *
 * Re-read the repmgr.nodes row for "node_id" over "conn" and copy the
 * column values into the existing "node_info" structure.
 *
 * The record is fetched with "init_defaults" set to false, so the struct
 * fields which are not populated from the query result (node_status,
 * recovery_type, last_wal_receive_lsn, monitoring_state and conn) are left
 * as the caller last set them rather than being reset to their defaults.
 *
 * Returns the RecordStatus reported by _get_node_record(); a
 * RECORD_NOT_FOUND result is additionally noted in the debug log.
 */
RecordStatus
refresh_node_record(PGconn *conn, int node_id, t_node_info *node_info)
{
	PQExpBufferData query;
	RecordStatus result;

	initPQExpBuffer(&query);
	appendPQExpBuffer(&query,
					  "SELECT " REPMGR_NODES_COLUMNS
					  " FROM repmgr.nodes n "
					  " WHERE n.node_id = %i",
					  node_id);

	/* label the query with this function's own name, not get_node_record() */
	log_verbose(LOG_DEBUG, "refresh_node_record():\n %s", query.data);

	/* "false": keep the caller's non-database fields intact */
	result = _get_node_record(conn, query.data, node_info, false);
	termPQExpBuffer(&query);

	if (result == RECORD_NOT_FOUND)
	{
		log_verbose(LOG_DEBUG, "refresh_node_record(): no record found for node %i", node_id);
	}

	return result;
}
RecordStatus
get_node_record_with_upstream(PGconn *conn, int node_id, t_node_info *node_info)
{
@@ -2250,7 +2281,7 @@ get_node_record_with_upstream(PGconn *conn, int node_id, t_node_info *node_info)
log_verbose(LOG_DEBUG, "get_node_record():\n %s", query.data);
result = _get_node_record(conn, query.data, node_info);
result = _get_node_record(conn, query.data, node_info, true);
termPQExpBuffer(&query);
if (result == RECORD_NOT_FOUND)
@@ -2278,7 +2309,7 @@ get_node_record_by_name(PGconn *conn, const char *node_name, t_node_info *node_i
log_verbose(LOG_DEBUG, "get_node_record_by_name():\n %s", query.data);
record_status = _get_node_record(conn, query.data, node_info);
record_status = _get_node_record(conn, query.data, node_info, true);
termPQExpBuffer(&query);
@@ -2373,7 +2404,7 @@ _populate_node_records(PGresult *res, NodeInfoList *node_list)
cell->node_info = pg_malloc0(sizeof(t_node_info));
_populate_node_record(res, cell->node_info, i);
_populate_node_record(res, cell->node_info, i, true);
if (node_list->tail)
node_list->tail->next = cell;

View File

@@ -451,6 +451,8 @@ t_server_type parse_node_type(const char *type);
const char *get_node_type_string(t_server_type type);
RecordStatus get_node_record(PGconn *conn, int node_id, t_node_info *node_info);
RecordStatus refresh_node_record(PGconn *conn, int node_id, t_node_info *node_info);
RecordStatus get_node_record_with_upstream(PGconn *conn, int node_id, t_node_info *node_info);
RecordStatus get_node_record_by_name(PGconn *conn, const char *node_name, t_node_info *node_info);

View File

@@ -630,7 +630,9 @@ check_primary_status(int degraded_monitoring_elapsed)
}
/*
* repmgrd running on a standby server
*/
void
monitor_streaming_standby(void)
{
@@ -641,6 +643,8 @@ monitor_streaming_standby(void)
MonitoringState local_monitoring_state = MS_NORMAL;
instr_time local_degraded_monitoring_start;
int last_known_upstream_node_id = UNKNOWN_NODE_ID;
log_debug("monitor_streaming_standby()");
reset_node_voting_status();
@@ -729,6 +733,8 @@ monitor_streaming_standby(void)
return;
}
last_known_upstream_node_id = local_node_info.upstream_node_id;
/*
* refresh upstream node record from upstream node, so it's as up-to-date
* as possible
@@ -875,7 +881,6 @@ monitor_streaming_standby(void)
if (PQstatus(local_conn) != CONNECTION_OK)
{
check_connection(&local_node_info, &local_conn);
}
@@ -1144,7 +1149,7 @@ monitor_streaming_standby(void)
*/
/* refresh own internal node record */
record_status = get_node_record(local_conn, local_node_info.node_id, &local_node_info);
record_status = refresh_node_record(local_conn, local_node_info.node_id, &local_node_info);
if (local_node_info.type == PRIMARY)
{
@@ -1354,20 +1359,48 @@ loop:
}
else
{
int stored_local_node_id = repmgrd_get_local_node_id(local_conn);
int stored_local_node_id = UNKNOWN_NODE_ID;
if (local_monitoring_state == MS_DEGRADED)
{
log_info(_("connection to local node recovered after %i seconds"),
calculate_elapsed(local_degraded_monitoring_start));
local_monitoring_state = MS_NORMAL;
/*
* Check if anything has changed since the local node came back on line;
* we may need to restart monitoring.
*/
refresh_node_record(local_conn, local_node_info.node_id, &local_node_info);
if (last_known_upstream_node_id != local_node_info.upstream_node_id)
{
log_notice(_("local node %i upstream appears to have changed, restarting monitoring"),
local_node_info.node_id);
log_detail(_("currently monitoring upstream %i; new upstream is %i"),
last_known_upstream_node_id,
local_node_info.upstream_node_id);
close_connection(&upstream_conn);
return;
}
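/*
 * Check the node type in the refreshed local node record; if the local
 * node is no longer a standby, restart monitoring.
 */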
if (local_node_info.type != STANDBY)
{
log_notice(_("local node %i is no longer a standby, restarting monitoring"),
local_node_info.node_id);
close_connection(&upstream_conn);
return;
}
}
/*
* If the local node was restarted, we'll need to reinitialise values
* stored in shared memory.
*/
stored_local_node_id = repmgrd_get_local_node_id(local_conn);
if (stored_local_node_id == UNKNOWN_NODE_ID)
{
@@ -1428,9 +1461,23 @@ loop:
handle_sighup(&local_conn, STANDBY);
}
refresh_node_record(local_conn, local_node_info.node_id, &local_node_info);
if (local_monitoring_state == MS_NORMAL && last_known_upstream_node_id != local_node_info.upstream_node_id)
{
log_notice(_("local node %i's upstream appears to have changed, restarting monitoring"),
local_node_info.node_id);
log_detail(_("currently monitoring upstream %i; new upstream is %i"),
last_known_upstream_node_id,
local_node_info.upstream_node_id);
close_connection(&upstream_conn);
return;
}
log_verbose(LOG_DEBUG, "sleeping %i seconds (parameter \"monitor_interval_secs\")",
config_file_options.monitor_interval_secs);
sleep(config_file_options.monitor_interval_secs);
}
}