mirror of
https://github.com/EnterpriseDB/repmgr.git
synced 2026-03-22 22:56:29 +00:00
Improve replication connection check
Previously the check verifying that a node has connected to its upstream merely assumed the presence of a record in pg_stat_replication indicates a successful replication connection. However the record may contain a state other than "streaming", typically "startup" (which will occur when a node has diverged from its upstream and will therefore never transition to "streaming"), which needs to be taken into account when considering the state of the replication connection to avoid false positives.
This commit is contained in:
56
dbutils.c
56
dbutils.c
@@ -5720,16 +5720,16 @@ get_node_replication_stats(PGconn *conn, t_node_info *node_info)
|
||||
|
||||
|
||||
NodeAttached
|
||||
is_downstream_node_attached(PGconn *conn, char *node_name)
|
||||
is_downstream_node_attached(PGconn *conn, char *node_name, char **node_state)
|
||||
{
|
||||
PQExpBufferData query;
|
||||
PGresult *res = NULL;
|
||||
int c = 0;
|
||||
const char *state = NULL;
|
||||
|
||||
initPQExpBuffer(&query);
|
||||
|
||||
appendPQExpBuffer(&query,
|
||||
" SELECT pg_catalog.count(*) "
|
||||
" SELECT pid, state "
|
||||
" FROM pg_catalog.pg_stat_replication "
|
||||
" WHERE application_name = '%s'",
|
||||
node_name);
|
||||
@@ -5748,31 +5748,53 @@ is_downstream_node_attached(PGconn *conn, char *node_name)
|
||||
return NODE_ATTACHED_UNKNOWN;
|
||||
}
|
||||
|
||||
if (PQntuples(res) != 1)
|
||||
{
|
||||
log_verbose(LOG_WARNING, _("unexpected number of tuples (%i) returned"), PQntuples(res));
|
||||
termPQExpBuffer(&query);
|
||||
|
||||
/*
|
||||
* If there's more than one entry in pg_stat_application, there's no
|
||||
* way we can reliably determine which one belongs to the node we're
|
||||
* checking, so there's nothing more we can do.
|
||||
*/
|
||||
if (PQntuples(res) > 1)
|
||||
{
|
||||
log_error(_("multiple entries with \"application_name\" set to \"%s\" found in \"pg_stat_replication\""),
|
||||
node_name);
|
||||
log_hint(_("verify that a unique node name is configured for each node"));
|
||||
|
||||
termPQExpBuffer(&query);
|
||||
PQclear(res);
|
||||
|
||||
return NODE_ATTACHED_UNKNOWN;
|
||||
}
|
||||
|
||||
c = atoi(PQgetvalue(res, 0, 0));
|
||||
|
||||
termPQExpBuffer(&query);
|
||||
PQclear(res);
|
||||
|
||||
if (c == 0)
|
||||
if (PQntuples(res) == 0)
|
||||
{
|
||||
log_verbose(LOG_WARNING, _("node \"%s\" not found in \"pg_stat_replication\""), node_name);
|
||||
log_warning(_("node \"%s\" not found in \"pg_stat_replication\""), node_name);
|
||||
|
||||
PQclear(res);
|
||||
|
||||
return NODE_DETACHED;
|
||||
}
|
||||
|
||||
if (c > 1)
|
||||
log_verbose(LOG_WARNING, _("multiple entries with \"application_name\" set to \"%s\" found in \"pg_stat_replication\""),
|
||||
node_name);
|
||||
state = PQgetvalue(res, 0, 1);
|
||||
|
||||
if (node_state != NULL)
|
||||
{
|
||||
*node_state = palloc0(strlen(state) + 1);
|
||||
strncpy(*node_state, state, strlen(state));
|
||||
}
|
||||
|
||||
if (strcmp(state, "streaming") != 0)
|
||||
{
|
||||
log_warning(_("node \"%s\" attached in state \"%s\""),
|
||||
node_name,
|
||||
state);
|
||||
|
||||
PQclear(res);
|
||||
|
||||
return NODE_NOT_ATTACHED;
|
||||
}
|
||||
|
||||
PQclear(res);
|
||||
|
||||
return NODE_ATTACHED;
|
||||
}
|
||||
|
||||
11
dbutils.h
11
dbutils.h
@@ -119,9 +119,14 @@ typedef enum
|
||||
|
||||
typedef enum
|
||||
{
|
||||
/* unable to query "pg_stat_replication" or other error */
|
||||
NODE_ATTACHED_UNKNOWN = -1,
|
||||
NODE_DETACHED,
|
||||
NODE_ATTACHED
|
||||
/* node has record in "pg_stat_replication" and state is not "streaming" */
|
||||
NODE_ATTACHED,
|
||||
/* node has record in "pg_stat_replication" but state is not "streaming" */
|
||||
NODE_NOT_ATTACHED,
|
||||
/* node has no record in "pg_stat_replication" */
|
||||
NODE_DETACHED
|
||||
} NodeAttached;
|
||||
|
||||
typedef enum
|
||||
@@ -589,7 +594,7 @@ bool get_replication_info(PGconn *conn, t_server_type node_type, ReplInfo *repl
|
||||
int get_replication_lag_seconds(PGconn *conn);
|
||||
TimeLineID get_node_timeline(PGconn *conn, char *timeline_id_str);
|
||||
void get_node_replication_stats(PGconn *conn, t_node_info *node_info);
|
||||
NodeAttached is_downstream_node_attached(PGconn *conn, char *node_name);
|
||||
NodeAttached is_downstream_node_attached(PGconn *conn, char *node_name, char **node_state);
|
||||
void set_upstream_last_seen(PGconn *conn, int upstream_node_id);
|
||||
int get_upstream_last_seen(PGconn *conn, t_server_type node_type);
|
||||
|
||||
|
||||
@@ -303,7 +303,7 @@ do_node_status(void)
|
||||
continue;
|
||||
}
|
||||
|
||||
if (is_downstream_node_attached(conn, node_cell->node_info->node_name) != NODE_ATTACHED)
|
||||
if (is_downstream_node_attached(conn, node_cell->node_info->node_name, NULL) != NODE_ATTACHED)
|
||||
{
|
||||
missing_nodes_count++;
|
||||
item_list_append_format(&missing_nodes,
|
||||
@@ -1227,7 +1227,7 @@ do_node_check_downstream(PGconn *conn, OutputMode mode, t_node_info *node_info,
|
||||
continue;
|
||||
}
|
||||
|
||||
if (is_downstream_node_attached(conn, cell->node_info->node_name) != NODE_ATTACHED)
|
||||
if (is_downstream_node_attached(conn, cell->node_info->node_name, NULL) != NODE_ATTACHED)
|
||||
{
|
||||
missing_nodes_count++;
|
||||
item_list_append_format(&missing_nodes,
|
||||
@@ -1410,7 +1410,7 @@ do_node_check_upstream(PGconn *conn, OutputMode mode, t_node_info *node_info, Ch
|
||||
upstream_conn = establish_db_connection(upstream_node_info.conninfo, true);
|
||||
|
||||
/* check our node is connected */
|
||||
if (is_downstream_node_attached(upstream_conn, config_file_options.node_name) != NODE_ATTACHED)
|
||||
if (is_downstream_node_attached(upstream_conn, config_file_options.node_name, NULL) != NODE_ATTACHED)
|
||||
{
|
||||
appendPQExpBuffer(&details,
|
||||
_("node \"%s\" (ID: %i) is not attached to expected upstream node \"%s\" (ID: %i)"),
|
||||
@@ -2799,7 +2799,7 @@ do_node_rejoin(void)
|
||||
config_file_options.node_rejoin_timeout);
|
||||
}
|
||||
else {
|
||||
log_detail(_("no record for local node \"%s\" found in node \"%s\"'s \"pg_stat_replication\" table"),
|
||||
log_detail(_("no active record for local node \"%s\" found in node \"%s\"'s \"pg_stat_replication\" table"),
|
||||
config_file_options.node_name,
|
||||
primary_node_record.node_name);
|
||||
}
|
||||
@@ -2811,7 +2811,7 @@ do_node_rejoin(void)
|
||||
else
|
||||
{
|
||||
/* -W/--no-wait provided - check once */
|
||||
NodeAttached node_attached = is_downstream_node_attached(primary_conn, config_file_options.node_name);
|
||||
NodeAttached node_attached = is_downstream_node_attached(primary_conn, config_file_options.node_name, NULL);
|
||||
if (node_attached == NODE_ATTACHED)
|
||||
success = true;
|
||||
}
|
||||
|
||||
@@ -1797,7 +1797,7 @@ do_standby_register(void)
|
||||
else
|
||||
{
|
||||
/* check our standby is connected */
|
||||
if (is_downstream_node_attached(upstream_conn, config_file_options.node_name) == NODE_ATTACHED)
|
||||
if (is_downstream_node_attached(upstream_conn, config_file_options.node_name, NULL) == NODE_ATTACHED)
|
||||
{
|
||||
log_verbose(LOG_INFO, _("local node is attached to specified upstream node %i"), runtime_options.upstream_node_id);
|
||||
}
|
||||
@@ -1856,7 +1856,7 @@ do_standby_register(void)
|
||||
primary_node_id);
|
||||
|
||||
/* check our standby is connected */
|
||||
if (is_downstream_node_attached(primary_conn, config_file_options.node_name) == NODE_ATTACHED)
|
||||
if (is_downstream_node_attached(primary_conn, config_file_options.node_name, NULL) == NODE_ATTACHED)
|
||||
{
|
||||
log_verbose(LOG_INFO, _("local node is attached to primary"));
|
||||
}
|
||||
@@ -3073,7 +3073,9 @@ do_standby_follow(void)
|
||||
|
||||
for (timer = 0; timer < config_file_options.standby_follow_timeout; timer++)
|
||||
{
|
||||
NodeAttached node_attached = is_downstream_node_attached(follow_target_conn, config_file_options.node_name);
|
||||
NodeAttached node_attached = is_downstream_node_attached(follow_target_conn,
|
||||
config_file_options.node_name,
|
||||
NULL);
|
||||
|
||||
if (node_attached == NODE_ATTACHED)
|
||||
{
|
||||
@@ -3711,7 +3713,7 @@ do_standby_switchover(void)
|
||||
exit(ERR_BAD_CONFIG);
|
||||
}
|
||||
|
||||
if (is_downstream_node_attached(remote_conn, local_node_record.node_name) != NODE_ATTACHED)
|
||||
if (is_downstream_node_attached(remote_conn, local_node_record.node_name, NULL) != NODE_ATTACHED)
|
||||
{
|
||||
log_error(_("local node \"%s\" (ID: %i) is not attached to demotion candidate \"%s\" (ID: %i)"),
|
||||
local_node_record.node_name,
|
||||
@@ -5195,7 +5197,8 @@ do_standby_switchover(void)
|
||||
*/
|
||||
|
||||
node_attached = is_downstream_node_attached(local_conn,
|
||||
remote_node_record.node_name);
|
||||
remote_node_record.node_name,
|
||||
NULL);
|
||||
if (node_attached == NODE_ATTACHED)
|
||||
{
|
||||
switchover_success = true;
|
||||
|
||||
@@ -2351,6 +2351,7 @@ format_node_status(t_node_info *node_info, PQExpBufferData *node_status, PQExpBu
|
||||
* connected to the upstream
|
||||
*/
|
||||
NodeAttached attached_to_upstream = NODE_ATTACHED_UNKNOWN;
|
||||
char *replication_state = NULL;
|
||||
t_node_info upstream_node_rec = T_NODE_INFO_INITIALIZER;
|
||||
RecordStatus upstream_node_rec_found = get_node_record(node_info->conn,
|
||||
node_info->upstream_node_id,
|
||||
@@ -2378,7 +2379,7 @@ format_node_status(t_node_info *node_info, PQExpBufferData *node_status, PQExpBu
|
||||
}
|
||||
else
|
||||
{
|
||||
attached_to_upstream = is_downstream_node_attached(upstream_conn, node_info->node_name);
|
||||
attached_to_upstream = is_downstream_node_attached(upstream_conn, node_info->node_name, &replication_state);
|
||||
}
|
||||
|
||||
PQfinish(upstream_conn);
|
||||
@@ -2394,6 +2395,18 @@ format_node_status(t_node_info *node_info, PQExpBufferData *node_status, PQExpBu
|
||||
upstream_node_rec.node_name,
|
||||
upstream_node_rec.node_id);
|
||||
}
|
||||
if (attached_to_upstream == NODE_NOT_ATTACHED)
|
||||
{
|
||||
appendPQExpBufferStr(upstream, "? ");
|
||||
item_list_append_format(warnings,
|
||||
"node \"%s\" (ID: %i) attached to its upstream node \"%s\" (ID: %i) in state \"%s\"",
|
||||
node_info->node_name,
|
||||
node_info->node_id,
|
||||
upstream_node_rec.node_name,
|
||||
upstream_node_rec.node_id,
|
||||
replication_state);
|
||||
}
|
||||
|
||||
else if (attached_to_upstream == NODE_DETACHED)
|
||||
{
|
||||
appendPQExpBufferStr(upstream, "! ");
|
||||
@@ -3986,8 +3999,10 @@ check_standby_join(PGconn *upstream_conn, t_node_info *upstream_node_record, t_n
|
||||
|
||||
for (; i < config_file_options.node_rejoin_timeout; i++)
|
||||
{
|
||||
char *node_state = NULL;
|
||||
NodeAttached node_attached = is_downstream_node_attached(upstream_conn,
|
||||
standby_node_record->node_name);
|
||||
standby_node_record->node_name,
|
||||
&node_state);
|
||||
if (node_attached == NODE_ATTACHED)
|
||||
{
|
||||
log_verbose(LOG_INFO, _("node \"%s\" (ID: %i) has attached to its upstream node"),
|
||||
@@ -4004,9 +4019,19 @@ check_standby_join(PGconn *upstream_conn, t_node_info *upstream_node_record, t_n
|
||||
i + 1,
|
||||
config_file_options.node_rejoin_timeout);
|
||||
|
||||
log_detail(_("checking for record in node \"%s\"'s \"pg_stat_replication\" table where \"application_name\" is \"%s\""),
|
||||
upstream_node_record->node_name,
|
||||
standby_node_record->node_name);
|
||||
if (node_attached == NODE_NOT_ATTACHED)
|
||||
{
|
||||
log_detail(_("node \"%s\" (ID: %i) is currrently attached to its upstream node in state \"%s\""),
|
||||
upstream_node_record->node_name,
|
||||
standby_node_record->node_id,
|
||||
node_state);
|
||||
}
|
||||
else
|
||||
{
|
||||
log_detail(_("checking for record in node \"%s\"'s \"pg_stat_replication\" table where \"application_name\" is \"%s\""),
|
||||
upstream_node_record->node_name,
|
||||
standby_node_record->node_name);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user