Refactor follow verification to reduce need for CHECKPOINT

A CHECKPOINT is not always required; hopefully we can narrow it down
to one corner case where we need to determine the minimum recovery
location.

Also get local timeline ID via IDENTIFY_SYSTEM, as fetching it from
pg_control risks returning the prior timeline ID if the timeline
switch has just taken place and no restart point has yet occurred.
This commit is contained in:
Ian Barwick
2018-12-04 15:26:05 +09:00
parent 10d46f7e85
commit 313aa3c5d7
2 changed files with 136 additions and 44 deletions

View File

@@ -1624,7 +1624,7 @@ get_timeline_history(PGconn *repl_conn, TimeLineID tli)
termPQExpBuffer(&query); termPQExpBuffer(&query);
if (PQnfields(res) != 2 || PQntuples(res) != 1) if (PQntuples(res) != 1 || PQnfields(res) != 2)
{ {
log_error(_("unexpected response to TIMELINE_HISTORY command")); log_error(_("unexpected response to TIMELINE_HISTORY command"));
log_detail(_("got %i rows and %i fields, expected %i rows and %i fields"), log_detail(_("got %i rows and %i fields, expected %i rows and %i fields"),
@@ -1645,12 +1645,13 @@ get_timeline_history(PGconn *repl_conn, TimeLineID tli)
return NULL; return NULL;
} }
PQclear(res);
history = (TimeLineHistoryEntry *) palloc(sizeof(TimeLineHistoryEntry)); history = (TimeLineHistoryEntry *) palloc(sizeof(TimeLineHistoryEntry));
history->tli = file_tli; history->tli = file_tli;
history->begin = InvalidXLogRecPtr; /* we don't care about this */ history->begin = InvalidXLogRecPtr; /* we don't care about this */
history->end = ((uint64) (switchpoint_hi)) << 32 | (uint64) switchpoint_lo; history->end = ((uint64) (switchpoint_hi)) << 32 | (uint64) switchpoint_lo;
PQclear(res);
return history; return history;
} }

View File

@@ -2204,8 +2204,8 @@ do_standby_follow(void)
int follow_error_code = SUCCESS; int follow_error_code = SUCCESS;
uint64 local_system_identifier = UNKNOWN_SYSTEM_IDENTIFIER; uint64 local_system_identifier = UNKNOWN_SYSTEM_IDENTIFIER;
t_conninfo_param_list repl_conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER; t_conninfo_param_list follow_target_repl_conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
PGconn *repl_conn = NULL; PGconn *follow_target_repl_conn = NULL;
t_system_identification follow_target_identification = T_SYSTEM_IDENTIFICATION_INITIALIZER; t_system_identification follow_target_identification = T_SYSTEM_IDENTIFICATION_INITIALIZER;
TimeLineHistoryEntry *follow_target_history = NULL; TimeLineHistoryEntry *follow_target_history = NULL;
@@ -2364,7 +2364,11 @@ do_standby_follow(void)
event_info.node_name = follow_target_node_record.node_name; event_info.node_name = follow_target_node_record.node_name;
event_info.conninfo_str = follow_target_node_record.conninfo; event_info.conninfo_str = follow_target_node_record.conninfo;
// check whether follow target is in recovery, format message /*
* Check whether follow target is in recovery, so we know later whether
* we'll need to open a connection to the primary to update the metadata.
* Also emit an informative message.
*/
{ {
PQExpBufferData node_info_msg; PQExpBufferData node_info_msg;
RecoveryType recovery_type = RECTYPE_UNKNOWN; RecoveryType recovery_type = RECTYPE_UNKNOWN;
@@ -2446,21 +2450,23 @@ do_standby_follow(void)
/* XXX check this is not current upstream anyway */ /* XXX check this is not current upstream anyway */
/* check replication connection */ /* check replication connection */
initialize_conninfo_params(&repl_conninfo, false); initialize_conninfo_params(&follow_target_repl_conninfo, false);
conn_to_param_list(follow_target_conn, &repl_conninfo); conn_to_param_list(follow_target_conn, &follow_target_repl_conninfo);
if (strcmp(param_get(&repl_conninfo, "user"), follow_target_node_record.repluser) != 0) if (strcmp(param_get(&follow_target_repl_conninfo, "user"), follow_target_node_record.repluser) != 0)
{ {
param_set(&repl_conninfo, "user", follow_target_node_record.repluser); param_set(&follow_target_repl_conninfo, "user", follow_target_node_record.repluser);
param_set(&repl_conninfo, "dbname", "replication"); param_set(&follow_target_repl_conninfo, "dbname", "replication");
} }
param_set(&repl_conninfo, "replication", "1"); param_set(&follow_target_repl_conninfo, "replication", "1");
repl_conn = establish_db_connection_by_params(&repl_conninfo, false); follow_target_repl_conn = establish_db_connection_by_params(&follow_target_repl_conninfo, false);
if (PQstatus(repl_conn) != CONNECTION_OK) free_conninfo_params(&follow_target_repl_conninfo);
if (PQstatus(follow_target_repl_conn) != CONNECTION_OK)
{ {
log_error(_("unable to establish a replication connection to the follow target node")); log_error(_("unable to establish a replication connection to the follow target node"));
PQfinish(follow_target_conn); PQfinish(follow_target_conn);
@@ -2475,13 +2481,13 @@ do_standby_follow(void)
/* check system_identifiers match */ /* check system_identifiers match */
local_system_identifier = get_system_identifier(config_file_options.data_directory); local_system_identifier = get_system_identifier(config_file_options.data_directory);
success = identify_system(repl_conn, &follow_target_identification); success = identify_system(follow_target_repl_conn, &follow_target_identification);
if (success == false) if (success == false)
{ {
log_error(_("unable to query the follow target node's system identification")); log_error(_("unable to query the follow target node's system identification"));
PQfinish(follow_target_conn); PQfinish(follow_target_conn);
PQfinish(repl_conn); PQfinish(follow_target_repl_conn);
exit(ERR_FOLLOW_FAIL); exit(ERR_FOLLOW_FAIL);
} }
@@ -2492,7 +2498,7 @@ do_standby_follow(void)
local_system_identifier, local_system_identifier,
follow_target_identification.system_identifier); follow_target_identification.system_identifier);
PQfinish(follow_target_conn); PQfinish(follow_target_conn);
PQfinish(repl_conn); PQfinish(follow_target_repl_conn);
exit(ERR_FOLLOW_FAIL); exit(ERR_FOLLOW_FAIL);
} }
else if (runtime_options.dry_run == true) else if (runtime_options.dry_run == true)
@@ -2501,42 +2507,80 @@ do_standby_follow(void)
log_detail(_("system identifier is %lu"), local_system_identifier); log_detail(_("system identifier is %lu"), local_system_identifier);
} }
/*
* Execute CHECKPOINT on the local node - we'll need this to update
* the pg_control file so we can compare positions with the new upstream.
* There is no way of avoiding this for --dry-run.
*/
checkpoint(local_conn);
/* /*
* Here we'll perform some sanity checks to ensure the follow target * Here we'll perform some timeline sanity checks to ensure the follow target
* can actually be followed. * can actually be followed.
*/ */
{ {
t_conninfo_param_list local_repl_conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
PGconn *local_repl_conn = NULL;
t_system_identification local_identification = T_SYSTEM_IDENTIFICATION_INITIALIZER;
/*
* Check local replication connection - we want to execute IDENTIFY_SYSTEM
* to get the current timeline ID, which might not yet be written to
* pg_control.
*
* TODO: from 9.6, query "pg_stat_wal_receiver" via the existing local connection
*/
initialize_conninfo_params(&local_repl_conninfo, false);
conn_to_param_list(local_conn, &local_repl_conninfo);
param_set(&local_repl_conninfo, "replication", "1");
local_repl_conn = establish_db_connection_by_params(&local_repl_conninfo, false);
free_conninfo_params(&local_repl_conninfo);
if (PQstatus(local_repl_conn) != CONNECTION_OK)
{
log_error(_("unable to establish a replication connection to the local node"));
PQfinish(follow_target_conn);
PQfinish(local_conn);
exit(ERR_FOLLOW_FAIL);
}
else if (runtime_options.dry_run == true)
{
log_info(_("replication connection to the local node was successful"));
}
success = identify_system(local_repl_conn, &local_identification);
if (success == false)
{
log_error(_("unable to query the local node's system identification"));
PQfinish(follow_target_conn);
PQfinish(follow_target_repl_conn);
PQfinish(local_conn);
PQfinish(local_repl_conn);
exit(ERR_FOLLOW_FAIL);
}
PQfinish(local_repl_conn);
/* check timelines */ /* check timelines */
TimeLineID local_timeline = get_timeline(config_file_options.data_directory);
XLogRecPtr min_recovery_location = get_min_recovery_location(config_file_options.data_directory);
log_verbose(LOG_DEBUG, "local timeline: %i; follow target timeline: %i", log_verbose(LOG_DEBUG, "local timeline: %i; follow target timeline: %i",
local_timeline, local_identification.timeline,
follow_target_identification.timeline); follow_target_identification.timeline);
/* upstream's timeline is lower than ours - impossible case */ /* upstream's timeline is lower than ours - impossible case */
if (follow_target_identification.timeline < local_timeline) if (follow_target_identification.timeline < local_identification.timeline)
{ {
log_error(_("this node's timeline is ahead of the follow target node's timeline")); log_error(_("this node's timeline is ahead of the follow target node's timeline"));
log_detail(_("this node's timeline is %i, follow target node's timeline is %i"), log_detail(_("this node's timeline is %i, follow target node's timeline is %i"),
local_timeline, local_identification.timeline,
follow_target_identification.timeline); follow_target_identification.timeline);
PQfinish(follow_target_conn); PQfinish(follow_target_conn);
PQfinish(repl_conn); PQfinish(follow_target_repl_conn);
PQfinish(local_conn); PQfinish(local_conn);
exit(ERR_FOLLOW_FAIL); exit(ERR_FOLLOW_FAIL);
} }
if (follow_target_identification.timeline == local_timeline) if (follow_target_identification.timeline == local_identification.timeline)
{ {
XLogRecPtr local_xlogpos = get_current_lsn(local_conn); XLogRecPtr local_xlogpos = get_current_lsn(local_conn);
XLogRecPtr follow_target_xlogpos = get_current_lsn(follow_target_conn); XLogRecPtr follow_target_xlogpos = get_current_lsn(follow_target_conn);
@@ -2545,7 +2589,7 @@ do_standby_follow(void)
{ {
log_error(_("unable to compare LSN positions")); log_error(_("unable to compare LSN positions"));
PQfinish(follow_target_conn); PQfinish(follow_target_conn);
PQfinish(repl_conn); PQfinish(follow_target_repl_conn);
PQfinish(local_conn); PQfinish(local_conn);
exit(ERR_FOLLOW_FAIL); exit(ERR_FOLLOW_FAIL);
} }
@@ -2565,38 +2609,86 @@ do_standby_follow(void)
format_lsn(local_xlogpos), format_lsn(local_xlogpos),
format_lsn(follow_target_xlogpos)); format_lsn(follow_target_xlogpos));
PQfinish(follow_target_conn); PQfinish(follow_target_conn);
PQfinish(repl_conn); PQfinish(follow_target_repl_conn);
PQfinish(local_conn); PQfinish(local_conn);
exit(ERR_FOLLOW_FAIL); exit(ERR_FOLLOW_FAIL);
} }
} }
else else
{ {
/* upstream has higher timeline - check where it forked off from this node's timeline */ XLogRecPtr local_xlogpos = get_current_lsn(local_conn);
follow_target_history = get_timeline_history(repl_conn, local_timeline + 1); bool can_follow = true;
XLogRecPtr local_min_recovery_location = InvalidXLogRecPtr;
/*
* upstream has higher timeline - check where it forked off from this node's timeline
*/
follow_target_history = get_timeline_history(follow_target_repl_conn, local_identification.timeline + 1);
if (follow_target_history == NULL) if (follow_target_history == NULL)
{ {
/* get_timeline_history() will emit relevant error messages */
PQfinish(follow_target_conn); PQfinish(follow_target_conn);
PQfinish(repl_conn); PQfinish(follow_target_repl_conn);
PQfinish(local_conn); PQfinish(local_conn);
exit(ERR_FOLLOW_FAIL); exit(ERR_FOLLOW_FAIL);
} }
log_debug("upstream tli: %i; branch LSN: %X/%X", local_min_recovery_location = get_min_recovery_location(config_file_options.data_directory);
follow_target_history->tli, format_lsn(follow_target_history->end));
if (follow_target_history->end < min_recovery_location) /*
* Local node has proceeded beyond the follow target's fork, so we
* definitely can't attach.
*
* This could be the case if the follow target was promoted, but does
* not contain all changes which are being replayed to this standby.
*/
if (local_xlogpos > follow_target_history->end)
{
can_follow = false;
}
else
{
/*
* XXX can we establish what the window is where we *need* to execute
* a CHECKPOINT?
*/
/*
* Execute CHECKPOINT on the local node - we'll need this to update
* the pg_control file so we can compare positions with the new upstream.
* There is no way of avoiding this for --dry-run.
*/
if (is_superuser_connection(local_conn, NULL) == true)
{
log_notice(_("executing CHECKPOINT"));
checkpoint(local_conn);
}
else
{
log_warning(_("connection is not a superuser, unable to execute CHECKPOINT"));
log_detail(_("a CHECKPOINT is required in order to compare local and follow target states"));
}
log_debug("upstream tli: %i; branch LSN: %X/%X",
follow_target_history->tli, format_lsn(follow_target_history->end));
if (follow_target_history->end < local_min_recovery_location)
can_follow = false;
}
if (can_follow == false)
{ {
log_error(_("this node cannot attach to follow target node %i"), log_error(_("this node cannot attach to follow target node %i"),
follow_target_node_id); follow_target_node_id);
log_detail(_("follow target server's timeline %i forked off current database system timeline %i before current recovery point %X/%X\n"), log_detail(_("follow target server's timeline %i forked off current database system timeline %i before current recovery point %X/%X\n"),
local_timeline + 1, local_identification.timeline + 1,
local_timeline, local_identification.timeline,
format_lsn(min_recovery_location)); format_lsn(local_min_recovery_location));
PQfinish(follow_target_conn); PQfinish(follow_target_conn);
PQfinish(repl_conn); PQfinish(follow_target_repl_conn);
PQfinish(local_conn); PQfinish(local_conn);
exit(ERR_FOLLOW_FAIL); exit(ERR_FOLLOW_FAIL);
} }
@@ -2605,8 +2697,7 @@ do_standby_follow(void)
PQfinish(local_conn); PQfinish(local_conn);
PQfinish(repl_conn); PQfinish(follow_target_repl_conn);
free_conninfo_params(&repl_conninfo);
if (runtime_options.dry_run == true) if (runtime_options.dry_run == true)
{ {