/*
 * Review summary for this patch (dbutils.c, dbutils.h, repmgrd-physical.c).
 * This preamble sits before the first "diff --git" header and is not part
 * of any hunk; the hunk text below is unchanged.
 *
 * dbutils.c / dbutils.h
 *   - _get_node_record() and _populate_node_record() gain a trailing
 *     "bool init_defaults" parameter. When it is true,
 *     _populate_node_record() assigns default values to node_status,
 *     recovery_type, last_wal_receive_lsn, monitoring_state and conn;
 *     when it is false those five fields are not written.
 *   - get_node_record(), get_node_record_with_upstream(),
 *     get_node_record_by_name() and _populate_node_records() pass true.
 *   - New function refresh_node_record(conn, node_id, node_info), declared
 *     in dbutils.h: builds a SELECT of REPMGR_NODES_COLUMNS from
 *     repmgr.nodes filtered on node_id, calls _get_node_record() with
 *     init_defaults = false, emits a LOG_DEBUG message when the result is
 *     RECORD_NOT_FOUND, and returns the RecordStatus.
 *
 * repmgrd-physical.c, monitor_streaming_standby()
 *   - Adds the local last_known_upstream_node_id, initialised to
 *     UNKNOWN_NODE_ID and later assigned from
 *     local_node_info.upstream_node_id.
 *   - The "refresh own internal node record" call now uses
 *     refresh_node_record() instead of get_node_record().
 *   - In the branch that logs "connection to local node recovered"
 *     (local_monitoring_state == MS_DEGRADED), the local record is
 *     refreshed; if upstream_node_id differs from
 *     last_known_upstream_node_id, or type is not STANDBY, a notice is
 *     logged, close_connection(&upstream_conn) is called and the function
 *     returns.
 *   - stored_local_node_id is now initialised to UNKNOWN_NODE_ID and the
 *     repmgrd_get_local_node_id() assignment moves below that branch.
 *   - Before the sleep(monitor_interval_secs) call the local record is
 *     refreshed again; if local_monitoring_state == MS_NORMAL and the
 *     upstream id changed, the function logs, closes upstream_conn and
 *     returns.
 *
 * NOTE(review): refresh_node_record() logs its query with the prefix
 *   "get_node_record():" -- looks like a copy/paste leftover; confirm
 *   whether "refresh_node_record():" was intended.
 * NOTE(review): the two new bare refresh_node_record() calls in
 *   monitor_streaming_standby() discard the returned RecordStatus --
 *   confirm that ignoring a failed refresh is intended.
 * NOTE(review): the block comment directly above the
 *   "local_node_info.type != STANDBY" check is empty.
 * NOTE(review): line breaks and indentation in this copy of the patch are
 *   collapsed. In the "@@ -875,7 +881,6 @@" hunk it cannot be determined
 *   from this text whether the single removed line is a blank line or the
 *   check_connection() call -- verify against the original patch.
 */
diff --git a/dbutils.c b/dbutils.c index 0b3c6a23..4971b84a 100644 --- a/dbutils.c +++ b/dbutils.c @@ -51,8 +51,8 @@ static PGconn *_establish_db_connection(const char *conninfo, static PGconn *_get_primary_connection(PGconn *standby_conn, int *primary_id, char *primary_conninfo_out, bool quiet); static bool _set_config(PGconn *conn, const char *config_param, const char *sqlquery); -static RecordStatus _get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info); -static void _populate_node_record(PGresult *res, t_node_info *node_info, int row); +static RecordStatus _get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info, bool init_defaults); +static void _populate_node_record(PGresult *res, t_node_info *node_info, int row, bool init_defaults); static void _populate_node_records(PGresult *res, NodeInfoList *node_list); @@ -2094,7 +2094,7 @@ promote_standby(PGconn *conn, bool wait, int wait_seconds) static RecordStatus -_get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info) +_get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info, bool init_defaults) { int ntuples = 0; PGresult *res = NULL; @@ -2115,7 +2115,7 @@ _get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info) return RECORD_NOT_FOUND; } - _populate_node_record(res, node_info, 0); + _populate_node_record(res, node_info, 0, init_defaults); PQclear(res); @@ -2124,7 +2124,7 @@ _get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info) static void -_populate_node_record(PGresult *res, t_node_info *node_info, int row) +_populate_node_record(PGresult *res, t_node_info *node_info, int row, bool init_defaults) { node_info->node_id = atoi(PQgetvalue(res, row, 0)); node_info->type = parse_node_type(PQgetvalue(res, row, 1)); @@ -2151,11 +2151,15 @@ _populate_node_record(PGresult *res, t_node_info *node_info, int row) strncpy(node_info->upstream_node_name, PQgetvalue(res, row, 11), MAXLEN); /* Set remaining struct fields with default values 
*/ - node_info->node_status = NODE_STATUS_UNKNOWN; - node_info->recovery_type = RECTYPE_UNKNOWN; - node_info->last_wal_receive_lsn = InvalidXLogRecPtr; - node_info->monitoring_state = MS_NORMAL; - node_info->conn = NULL; + + if (init_defaults == true) + { + node_info->node_status = NODE_STATUS_UNKNOWN; + node_info->recovery_type = RECTYPE_UNKNOWN; + node_info->last_wal_receive_lsn = InvalidXLogRecPtr; + node_info->monitoring_state = MS_NORMAL; + node_info->conn = NULL; + } } @@ -2220,7 +2224,7 @@ get_node_record(PGconn *conn, int node_id, t_node_info *node_info) log_verbose(LOG_DEBUG, "get_node_record():\n %s", query.data); - result = _get_node_record(conn, query.data, node_info); + result = _get_node_record(conn, query.data, node_info, true); termPQExpBuffer(&query); if (result == RECORD_NOT_FOUND) @@ -2232,6 +2236,33 @@ get_node_record(PGconn *conn, int node_id, t_node_info *node_info) } +RecordStatus +refresh_node_record(PGconn *conn, int node_id, t_node_info *node_info) +{ + PQExpBufferData query; + RecordStatus result; + + initPQExpBuffer(&query); + appendPQExpBuffer(&query, + "SELECT " REPMGR_NODES_COLUMNS + " FROM repmgr.nodes n " + " WHERE n.node_id = %i", + node_id); + + log_verbose(LOG_DEBUG, "get_node_record():\n %s", query.data); + + result = _get_node_record(conn, query.data, node_info, false); + termPQExpBuffer(&query); + + if (result == RECORD_NOT_FOUND) + { + log_verbose(LOG_DEBUG, "refresh_node_record(): no record found for node %i", node_id); + } + + return result; +} + + RecordStatus get_node_record_with_upstream(PGconn *conn, int node_id, t_node_info *node_info) { @@ -2250,7 +2281,7 @@ get_node_record_with_upstream(PGconn *conn, int node_id, t_node_info *node_info) log_verbose(LOG_DEBUG, "get_node_record():\n %s", query.data); - result = _get_node_record(conn, query.data, node_info); + result = _get_node_record(conn, query.data, node_info, true); termPQExpBuffer(&query); if (result == RECORD_NOT_FOUND) @@ -2278,7 +2309,7 @@ 
get_node_record_by_name(PGconn *conn, const char *node_name, t_node_info *node_i log_verbose(LOG_DEBUG, "get_node_record_by_name():\n %s", query.data); - record_status = _get_node_record(conn, query.data, node_info); + record_status = _get_node_record(conn, query.data, node_info, true); termPQExpBuffer(&query); @@ -2373,7 +2404,7 @@ _populate_node_records(PGresult *res, NodeInfoList *node_list) cell->node_info = pg_malloc0(sizeof(t_node_info)); - _populate_node_record(res, cell->node_info, i); + _populate_node_record(res, cell->node_info, i, true); if (node_list->tail) node_list->tail->next = cell; diff --git a/dbutils.h b/dbutils.h index 560603f3..ff22dd4e 100644 --- a/dbutils.h +++ b/dbutils.h @@ -451,6 +451,8 @@ t_server_type parse_node_type(const char *type); const char *get_node_type_string(t_server_type type); RecordStatus get_node_record(PGconn *conn, int node_id, t_node_info *node_info); +RecordStatus refresh_node_record(PGconn *conn, int node_id, t_node_info *node_info); + RecordStatus get_node_record_with_upstream(PGconn *conn, int node_id, t_node_info *node_info); RecordStatus get_node_record_by_name(PGconn *conn, const char *node_name, t_node_info *node_info); diff --git a/repmgrd-physical.c b/repmgrd-physical.c index 2468a12d..b18ac852 100644 --- a/repmgrd-physical.c +++ b/repmgrd-physical.c @@ -630,7 +630,9 @@ check_primary_status(int degraded_monitoring_elapsed) } - +/* + * repmgrd running on a standby server + */ void monitor_streaming_standby(void) { @@ -641,6 +643,8 @@ monitor_streaming_standby(void) MonitoringState local_monitoring_state = MS_NORMAL; instr_time local_degraded_monitoring_start; + int last_known_upstream_node_id = UNKNOWN_NODE_ID; + log_debug("monitor_streaming_standby()"); reset_node_voting_status(); @@ -729,6 +733,8 @@ monitor_streaming_standby(void) return; } + last_known_upstream_node_id = local_node_info.upstream_node_id; + /* * refresh upstream node record from upstream node, so it's as up-to-date * as possible @@ -875,7 
+881,6 @@ monitor_streaming_standby(void) if (PQstatus(local_conn) != CONNECTION_OK) { - check_connection(&local_node_info, &local_conn); } @@ -1144,7 +1149,7 @@ monitor_streaming_standby(void) */ /* refresh own internal node record */ - record_status = get_node_record(local_conn, local_node_info.node_id, &local_node_info); + record_status = refresh_node_record(local_conn, local_node_info.node_id, &local_node_info); if (local_node_info.type == PRIMARY) { @@ -1354,20 +1359,48 @@ loop: } else { - int stored_local_node_id = repmgrd_get_local_node_id(local_conn); - + int stored_local_node_id = UNKNOWN_NODE_ID; if (local_monitoring_state == MS_DEGRADED) { log_info(_("connection to local node recovered after %i seconds"), calculate_elapsed(local_degraded_monitoring_start)); local_monitoring_state = MS_NORMAL; + + /* + * Check if anything has changed since the local node came back on line; + * we may need to restart monitoring. + */ + refresh_node_record(local_conn, local_node_info.node_id, &local_node_info); + + if (last_known_upstream_node_id != local_node_info.upstream_node_id) + { + log_notice(_("local node %i upstream appears to have changed, restarting monitoring"), + local_node_info.node_id); + log_detail(_("currently monitoring upstream %i; new upstream is %i"), + last_known_upstream_node_id, + local_node_info.upstream_node_id); + close_connection(&upstream_conn); + return; + } + + /* + * + */ + if (local_node_info.type != STANDBY) + { + log_notice(_("local node %i is no longer a standby, restarting monitoring"), + local_node_info.node_id); + close_connection(&upstream_conn); + return; + } } /* * If the local node was restarted, we'll need to reinitialise values * stored in shared memory. 
*/ + stored_local_node_id = repmgrd_get_local_node_id(local_conn); if (stored_local_node_id == UNKNOWN_NODE_ID) { @@ -1428,9 +1461,23 @@ loop: handle_sighup(&local_conn, STANDBY); } + refresh_node_record(local_conn, local_node_info.node_id, &local_node_info); + + if (local_monitoring_state == MS_NORMAL && last_known_upstream_node_id != local_node_info.upstream_node_id) + { + log_notice(_("local node %i's upstream appears to have changed, restarting monitoring"), + local_node_info.node_id); + log_detail(_("currently monitoring upstream %i; new upstream is %i"), + last_known_upstream_node_id, + local_node_info.upstream_node_id); + close_connection(&upstream_conn); + return; + } + log_verbose(LOG_DEBUG, "sleeping %i seconds (parameter \"monitor_interval_secs\")", config_file_options.monitor_interval_secs); + sleep(config_file_options.monitor_interval_secs); } }