Improve replication connection check

Previously, the check verifying that a node has connected to its upstream
merely assumed that the presence of a record in pg_stat_replication indicates
a successful replication connection. However, the record may contain a
state other than "streaming", typically "startup" (which occurs when
a node has diverged from its upstream and will therefore never
transition to "streaming"); this needs to be taken into account when
considering the state of the replication connection, to avoid false
positives.
This commit is contained in:
Ian Barwick
2020-08-27 16:05:12 +09:00
parent c50a2d049c
commit 0630d9644e
5 changed files with 90 additions and 35 deletions

View File

@@ -5758,16 +5758,16 @@ get_node_replication_stats(PGconn *conn, t_node_info *node_info)
NodeAttached NodeAttached
is_downstream_node_attached(PGconn *conn, char *node_name) is_downstream_node_attached(PGconn *conn, char *node_name, char **node_state)
{ {
PQExpBufferData query; PQExpBufferData query;
PGresult *res = NULL; PGresult *res = NULL;
int c = 0; const char *state = NULL;
initPQExpBuffer(&query); initPQExpBuffer(&query);
appendPQExpBuffer(&query, appendPQExpBuffer(&query,
" SELECT pg_catalog.count(*) " " SELECT pid, state "
" FROM pg_catalog.pg_stat_replication " " FROM pg_catalog.pg_stat_replication "
" WHERE application_name = '%s'", " WHERE application_name = '%s'",
node_name); node_name);
@@ -5786,31 +5786,53 @@ is_downstream_node_attached(PGconn *conn, char *node_name)
return NODE_ATTACHED_UNKNOWN; return NODE_ATTACHED_UNKNOWN;
} }
if (PQntuples(res) != 1) termPQExpBuffer(&query);
{
log_verbose(LOG_WARNING, _("unexpected number of tuples (%i) returned"), PQntuples(res)); /*
* If there's more than one entry in pg_stat_replication, there's no
* way we can reliably determine which one belongs to the node we're
* checking, so there's nothing more we can do.
*/
if (PQntuples(res) > 1)
{
log_error(_("multiple entries with \"application_name\" set to \"%s\" found in \"pg_stat_replication\""),
node_name);
log_hint(_("verify that a unique node name is configured for each node"));
termPQExpBuffer(&query);
PQclear(res); PQclear(res);
return NODE_ATTACHED_UNKNOWN; return NODE_ATTACHED_UNKNOWN;
} }
c = atoi(PQgetvalue(res, 0, 0)); if (PQntuples(res) == 0)
termPQExpBuffer(&query);
PQclear(res);
if (c == 0)
{ {
log_verbose(LOG_WARNING, _("node \"%s\" not found in \"pg_stat_replication\""), node_name); log_warning(_("node \"%s\" not found in \"pg_stat_replication\""), node_name);
PQclear(res);
return NODE_DETACHED; return NODE_DETACHED;
} }
if (c > 1) state = PQgetvalue(res, 0, 1);
log_verbose(LOG_WARNING, _("multiple entries with \"application_name\" set to \"%s\" found in \"pg_stat_replication\""),
node_name); if (node_state != NULL)
{
*node_state = palloc0(strlen(state) + 1);
strncpy(*node_state, state, strlen(state));
}
if (strcmp(state, "streaming") != 0)
{
log_warning(_("node \"%s\" attached in state \"%s\""),
node_name,
state);
PQclear(res);
return NODE_NOT_ATTACHED;
}
PQclear(res);
return NODE_ATTACHED; return NODE_ATTACHED;
} }

View File

@@ -119,9 +119,14 @@ typedef enum
typedef enum typedef enum
{ {
/* unable to query "pg_stat_replication" or other error */
NODE_ATTACHED_UNKNOWN = -1, NODE_ATTACHED_UNKNOWN = -1,
NODE_DETACHED, /* node has record in "pg_stat_replication" and state is "streaming" */
NODE_ATTACHED NODE_ATTACHED,
/* node has record in "pg_stat_replication" but state is not "streaming" */
NODE_NOT_ATTACHED,
/* node has no record in "pg_stat_replication" */
NODE_DETACHED
} NodeAttached; } NodeAttached;
typedef enum typedef enum
@@ -590,7 +595,7 @@ bool get_replication_info(PGconn *conn, t_server_type node_type, ReplInfo *repl
int get_replication_lag_seconds(PGconn *conn); int get_replication_lag_seconds(PGconn *conn);
TimeLineID get_node_timeline(PGconn *conn, char *timeline_id_str); TimeLineID get_node_timeline(PGconn *conn, char *timeline_id_str);
void get_node_replication_stats(PGconn *conn, t_node_info *node_info); void get_node_replication_stats(PGconn *conn, t_node_info *node_info);
NodeAttached is_downstream_node_attached(PGconn *conn, char *node_name); NodeAttached is_downstream_node_attached(PGconn *conn, char *node_name, char **node_state);
void set_upstream_last_seen(PGconn *conn, int upstream_node_id); void set_upstream_last_seen(PGconn *conn, int upstream_node_id);
int get_upstream_last_seen(PGconn *conn, t_server_type node_type); int get_upstream_last_seen(PGconn *conn, t_server_type node_type);

View File

@@ -307,7 +307,7 @@ do_node_status(void)
continue; continue;
} }
if (is_downstream_node_attached(conn, node_cell->node_info->node_name) != NODE_ATTACHED) if (is_downstream_node_attached(conn, node_cell->node_info->node_name, NULL) != NODE_ATTACHED)
{ {
missing_nodes_count++; missing_nodes_count++;
item_list_append_format(&missing_nodes, item_list_append_format(&missing_nodes,
@@ -1302,7 +1302,7 @@ do_node_check_downstream(PGconn *conn, OutputMode mode, t_node_info *node_info,
continue; continue;
} }
if (is_downstream_node_attached(conn, cell->node_info->node_name) != NODE_ATTACHED) if (is_downstream_node_attached(conn, cell->node_info->node_name, NULL) != NODE_ATTACHED)
{ {
missing_nodes_count++; missing_nodes_count++;
item_list_append_format(&missing_nodes, item_list_append_format(&missing_nodes,
@@ -1485,7 +1485,7 @@ do_node_check_upstream(PGconn *conn, OutputMode mode, t_node_info *node_info, Ch
upstream_conn = establish_db_connection(upstream_node_info.conninfo, true); upstream_conn = establish_db_connection(upstream_node_info.conninfo, true);
/* check our node is connected */ /* check our node is connected */
if (is_downstream_node_attached(upstream_conn, config_file_options.node_name) != NODE_ATTACHED) if (is_downstream_node_attached(upstream_conn, config_file_options.node_name, NULL) != NODE_ATTACHED)
{ {
appendPQExpBuffer(&details, appendPQExpBuffer(&details,
_("node \"%s\" (ID: %i) is not attached to expected upstream node \"%s\" (ID: %i)"), _("node \"%s\" (ID: %i) is not attached to expected upstream node \"%s\" (ID: %i)"),
@@ -2940,7 +2940,7 @@ do_node_rejoin(void)
config_file_options.node_rejoin_timeout); config_file_options.node_rejoin_timeout);
} }
else { else {
log_detail(_("no record for local node \"%s\" found in node \"%s\"'s \"pg_stat_replication\" table"), log_detail(_("no active record for local node \"%s\" found in node \"%s\"'s \"pg_stat_replication\" table"),
config_file_options.node_name, config_file_options.node_name,
primary_node_record.node_name); primary_node_record.node_name);
} }
@@ -2952,7 +2952,7 @@ do_node_rejoin(void)
else else
{ {
/* -W/--no-wait provided - check once */ /* -W/--no-wait provided - check once */
NodeAttached node_attached = is_downstream_node_attached(primary_conn, config_file_options.node_name); NodeAttached node_attached = is_downstream_node_attached(primary_conn, config_file_options.node_name, NULL);
if (node_attached == NODE_ATTACHED) if (node_attached == NODE_ATTACHED)
success = true; success = true;
} }

View File

@@ -1800,7 +1800,7 @@ do_standby_register(void)
else else
{ {
/* check our standby is connected */ /* check our standby is connected */
if (is_downstream_node_attached(upstream_conn, config_file_options.node_name) == NODE_ATTACHED) if (is_downstream_node_attached(upstream_conn, config_file_options.node_name, NULL) == NODE_ATTACHED)
{ {
log_verbose(LOG_INFO, _("local node is attached to specified upstream node %i"), runtime_options.upstream_node_id); log_verbose(LOG_INFO, _("local node is attached to specified upstream node %i"), runtime_options.upstream_node_id);
} }
@@ -1859,7 +1859,7 @@ do_standby_register(void)
primary_node_id); primary_node_id);
/* check our standby is connected */ /* check our standby is connected */
if (is_downstream_node_attached(primary_conn, config_file_options.node_name) == NODE_ATTACHED) if (is_downstream_node_attached(primary_conn, config_file_options.node_name, NULL) == NODE_ATTACHED)
{ {
log_verbose(LOG_INFO, _("local node is attached to primary")); log_verbose(LOG_INFO, _("local node is attached to primary"));
} }
@@ -3076,7 +3076,9 @@ do_standby_follow(void)
for (timer = 0; timer < config_file_options.standby_follow_timeout; timer++) for (timer = 0; timer < config_file_options.standby_follow_timeout; timer++)
{ {
NodeAttached node_attached = is_downstream_node_attached(follow_target_conn, config_file_options.node_name); NodeAttached node_attached = is_downstream_node_attached(follow_target_conn,
config_file_options.node_name,
NULL);
if (node_attached == NODE_ATTACHED) if (node_attached == NODE_ATTACHED)
{ {
@@ -3717,7 +3719,7 @@ do_standby_switchover(void)
exit(ERR_BAD_CONFIG); exit(ERR_BAD_CONFIG);
} }
if (is_downstream_node_attached(remote_conn, local_node_record.node_name) != NODE_ATTACHED) if (is_downstream_node_attached(remote_conn, local_node_record.node_name, NULL) != NODE_ATTACHED)
{ {
log_error(_("local node \"%s\" (ID: %i) is not attached to demotion candidate \"%s\" (ID: %i)"), log_error(_("local node \"%s\" (ID: %i) is not attached to demotion candidate \"%s\" (ID: %i)"),
local_node_record.node_name, local_node_record.node_name,
@@ -5341,7 +5343,8 @@ do_standby_switchover(void)
*/ */
node_attached = is_downstream_node_attached(local_conn, node_attached = is_downstream_node_attached(local_conn,
remote_node_record.node_name); remote_node_record.node_name,
NULL);
if (node_attached == NODE_ATTACHED) if (node_attached == NODE_ATTACHED)
{ {
switchover_success = true; switchover_success = true;

View File

@@ -2354,6 +2354,7 @@ format_node_status(t_node_info *node_info, PQExpBufferData *node_status, PQExpBu
* connected to the upstream * connected to the upstream
*/ */
NodeAttached attached_to_upstream = NODE_ATTACHED_UNKNOWN; NodeAttached attached_to_upstream = NODE_ATTACHED_UNKNOWN;
char *replication_state = NULL;
t_node_info upstream_node_rec = T_NODE_INFO_INITIALIZER; t_node_info upstream_node_rec = T_NODE_INFO_INITIALIZER;
RecordStatus upstream_node_rec_found = get_node_record(node_info->conn, RecordStatus upstream_node_rec_found = get_node_record(node_info->conn,
node_info->upstream_node_id, node_info->upstream_node_id,
@@ -2381,7 +2382,7 @@ format_node_status(t_node_info *node_info, PQExpBufferData *node_status, PQExpBu
} }
else else
{ {
attached_to_upstream = is_downstream_node_attached(upstream_conn, node_info->node_name); attached_to_upstream = is_downstream_node_attached(upstream_conn, node_info->node_name, &replication_state);
} }
PQfinish(upstream_conn); PQfinish(upstream_conn);
@@ -2397,6 +2398,18 @@ format_node_status(t_node_info *node_info, PQExpBufferData *node_status, PQExpBu
upstream_node_rec.node_name, upstream_node_rec.node_name,
upstream_node_rec.node_id); upstream_node_rec.node_id);
} }
if (attached_to_upstream == NODE_NOT_ATTACHED)
{
appendPQExpBufferStr(upstream, "? ");
item_list_append_format(warnings,
"node \"%s\" (ID: %i) attached to its upstream node \"%s\" (ID: %i) in state \"%s\"",
node_info->node_name,
node_info->node_id,
upstream_node_rec.node_name,
upstream_node_rec.node_id,
replication_state);
}
else if (attached_to_upstream == NODE_DETACHED) else if (attached_to_upstream == NODE_DETACHED)
{ {
appendPQExpBufferStr(upstream, "! "); appendPQExpBufferStr(upstream, "! ");
@@ -3991,8 +4004,10 @@ check_standby_join(PGconn *upstream_conn, t_node_info *upstream_node_record, t_n
for (; i < config_file_options.node_rejoin_timeout; i++) for (; i < config_file_options.node_rejoin_timeout; i++)
{ {
char *node_state = NULL;
NodeAttached node_attached = is_downstream_node_attached(upstream_conn, NodeAttached node_attached = is_downstream_node_attached(upstream_conn,
standby_node_record->node_name); standby_node_record->node_name,
&node_state);
if (node_attached == NODE_ATTACHED) if (node_attached == NODE_ATTACHED)
{ {
log_verbose(LOG_INFO, _("node \"%s\" (ID: %i) has attached to its upstream node"), log_verbose(LOG_INFO, _("node \"%s\" (ID: %i) has attached to its upstream node"),
@@ -4009,9 +4024,19 @@ check_standby_join(PGconn *upstream_conn, t_node_info *upstream_node_record, t_n
i + 1, i + 1,
config_file_options.node_rejoin_timeout); config_file_options.node_rejoin_timeout);
log_detail(_("checking for record in node \"%s\"'s \"pg_stat_replication\" table where \"application_name\" is \"%s\""), if (node_attached == NODE_NOT_ATTACHED)
upstream_node_record->node_name, {
standby_node_record->node_name); log_detail(_("node \"%s\" (ID: %i) is currrently attached to its upstream node in state \"%s\""),
upstream_node_record->node_name,
standby_node_record->node_id,
node_state);
}
else
{
log_detail(_("checking for record in node \"%s\"'s \"pg_stat_replication\" table where \"application_name\" is \"%s\""),
upstream_node_record->node_name,
standby_node_record->node_name);
}
} }
else else
{ {