Refactor upstream attachment check code

Move it from the "standby follow" code to an independent function so it can
be used in other contexts, e.g. "node rejoin".
This commit is contained in:
Ian Barwick
2019-01-23 15:03:40 +09:00
parent 42fa9a2a88
commit 3f5762e03a
4 changed files with 184 additions and 177 deletions

View File

@@ -2218,12 +2218,6 @@ do_standby_follow(void)
bool success = false;
int follow_error_code = SUCCESS;
uint64 local_system_identifier = UNKNOWN_SYSTEM_IDENTIFIER;
t_conninfo_param_list follow_target_repl_conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
PGconn *follow_target_repl_conn = NULL;
t_system_identification follow_target_identification = T_SYSTEM_IDENTIFICATION_INITIALIZER;
TimeLineHistoryEntry *follow_target_history = NULL;
log_verbose(LOG_DEBUG, "do_standby_follow()");
local_conn = establish_db_connection(config_file_options.conninfo, false);
@@ -2466,82 +2460,23 @@ do_standby_follow(void)
/* XXX check this is not current upstream anyway */
/* check replication connection */
initialize_conninfo_params(&follow_target_repl_conninfo, false);
conn_to_param_list(follow_target_conn, &follow_target_repl_conninfo);
if (strcmp(param_get(&follow_target_repl_conninfo, "user"), follow_target_node_record.repluser) != 0)
{
param_set(&follow_target_repl_conninfo, "user", follow_target_node_record.repluser);
param_set(&follow_target_repl_conninfo, "dbname", "replication");
}
param_set(&follow_target_repl_conninfo, "replication", "1");
follow_target_repl_conn = establish_db_connection_by_params(&follow_target_repl_conninfo, false);
free_conninfo_params(&follow_target_repl_conninfo);
if (PQstatus(follow_target_repl_conn) != CONNECTION_OK)
{
log_error(_("unable to establish a replication connection to the follow target node"));
PQfinish(follow_target_conn);
exit(ERR_FOLLOW_FAIL);
}
else if (runtime_options.dry_run == true)
{
log_info(_("replication connection to the follow target node was successful"));
}
/* check system_identifiers match */
local_system_identifier = get_system_identifier(config_file_options.data_directory);
success = identify_system(follow_target_repl_conn, &follow_target_identification);
if (success == false)
{
log_error(_("unable to query the follow target node's system identification"));
PQfinish(follow_target_conn);
PQfinish(follow_target_repl_conn);
exit(ERR_FOLLOW_FAIL);
}
if (follow_target_identification.system_identifier != local_system_identifier)
{
log_error(_("this node is not part of the follow target node's replication cluster"));
log_detail(_("this node's system identifier is %lu, follow target node's system identifier is %lu"),
local_system_identifier,
follow_target_identification.system_identifier);
PQfinish(follow_target_conn);
PQfinish(follow_target_repl_conn);
exit(ERR_FOLLOW_FAIL);
}
else if (runtime_options.dry_run == true)
{
log_info(_("local and follow target system identifiers match"));
log_detail(_("system identifier is %lu"), local_system_identifier);
}
/*
* Here we'll perform some timeline sanity checks to ensure the follow target
* can actually be followed.
*/
/* check if we can attach to the follow target */
{
t_conninfo_param_list local_repl_conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
PGconn *local_repl_conn = NULL;
t_system_identification local_identification = T_SYSTEM_IDENTIFICATION_INITIALIZER;
/*
* Check local replication connection - we want to execute IDENTIFY_SYSTEM
bool can_follow;
XLogRecPtr local_xlogpos = get_current_lsn(local_conn);
/* Check local replication connection - we want to execute IDENTIFY_SYSTEM
* to get the current timeline ID, which might not yet be written to
* pg_control.
*
* TODO: from 9.6, query "pg_stat_wal_receiver" via the existing local connection
*/
initialize_conninfo_params(&local_repl_conninfo, false);
conn_to_param_list(local_conn, &local_repl_conninfo);
@@ -2554,8 +2489,8 @@ do_standby_follow(void)
if (PQstatus(local_repl_conn) != CONNECTION_OK)
{
log_error(_("unable to establish a replication connection to the local node"));
PQfinish(follow_target_conn);
PQfinish(local_conn);
PQfinish(follow_target_conn);
exit(ERR_FOLLOW_FAIL);
}
else if (runtime_options.dry_run == true)
@@ -2568,121 +2503,27 @@ do_standby_follow(void)
if (success == false)
{
log_error(_("unable to query the local node's system identification"));
PQfinish(follow_target_conn);
PQfinish(follow_target_repl_conn);
PQfinish(local_conn);
PQfinish(local_repl_conn);
PQfinish(follow_target_conn);
exit(ERR_FOLLOW_FAIL);
}
PQfinish(local_repl_conn);
/* check timelines */
can_follow = check_node_can_attach(local_identification.timeline,
local_xlogpos,
follow_target_conn,
&follow_target_node_record);
log_verbose(LOG_DEBUG, "local timeline: %i; follow target timeline: %i",
local_identification.timeline,
follow_target_identification.timeline);
/* upstream's timeline is lower than ours - impossible case */
if (follow_target_identification.timeline < local_identification.timeline)
if (can_follow == false)
{
log_error(_("this node's timeline is ahead of the follow target node's timeline"));
log_detail(_("this node's timeline is %i, follow target node's timeline is %i"),
local_identification.timeline,
follow_target_identification.timeline);
PQfinish(follow_target_conn);
PQfinish(follow_target_repl_conn);
PQfinish(local_conn);
PQfinish(follow_target_conn);
exit(ERR_FOLLOW_FAIL);
}
if (follow_target_identification.timeline == local_identification.timeline)
{
XLogRecPtr local_xlogpos = get_current_lsn(local_conn);
XLogRecPtr follow_target_xlogpos = get_current_lsn(follow_target_conn);
if (local_xlogpos == InvalidXLogRecPtr || follow_target_xlogpos == InvalidXLogRecPtr)
{
log_error(_("unable to compare LSN positions"));
PQfinish(follow_target_conn);
PQfinish(follow_target_repl_conn);
PQfinish(local_conn);
exit(ERR_FOLLOW_FAIL);
}
/* timeline is the same - check relative positions */
if (local_xlogpos <= follow_target_xlogpos)
{
log_info(_("timelines are same, this server is not ahead"));
log_detail(_("local node lsn is %X/%X, follow target lsn is %X/%X"),
format_lsn(local_xlogpos),
format_lsn(follow_target_xlogpos));
}
else
{
log_error(_("this node is ahead of the follow target"));
log_detail(_("local node lsn is %X/%X, follow target lsn is %X/%X"),
format_lsn(local_xlogpos),
format_lsn(follow_target_xlogpos));
PQfinish(follow_target_conn);
PQfinish(follow_target_repl_conn);
PQfinish(local_conn);
exit(ERR_FOLLOW_FAIL);
}
}
else
{
XLogRecPtr local_xlogpos = get_current_lsn(local_conn);
/*
* upstream has higher timeline - check where it forked off from this node's timeline
*/
follow_target_history = get_timeline_history(follow_target_repl_conn, local_identification.timeline + 1);
if (follow_target_history == NULL)
{
/* get_timeline_history() will emit relevant error messages */
PQfinish(follow_target_conn);
PQfinish(follow_target_repl_conn);
PQfinish(local_conn);
exit(ERR_FOLLOW_FAIL);
}
/*
* Local node has proceeded beyond the follow target's fork, so we
* definitely can't attach.
*
* This could be the case if the follow target was promoted, but does
* not contain all changes which are being replayed to this standby.
*/
if (local_xlogpos > follow_target_history->end)
{
log_error(_("this node cannot attach to follow target node %i"),
follow_target_node_id);
log_detail(_("follow target server's timeline %i forked off current database system timeline %i before current recovery point %X/%X\n"),
local_identification.timeline + 1,
local_identification.timeline,
format_lsn(local_xlogpos));
PQfinish(follow_target_conn);
PQfinish(local_conn);
exit(ERR_FOLLOW_FAIL);
}
if (runtime_options.dry_run == true)
{
log_info(_("local node %i can follow target node %i"),
config_file_options.node_id,
follow_target_node_id);
log_detail(_("local node's recovery point: %X/%X; follow target node's fork point: %X/%X"),
format_lsn(local_xlogpos),
format_lsn(follow_target_history->end));
}
}
}
PQfinish(local_conn);
PQfinish(follow_target_repl_conn);
if (runtime_options.dry_run == true)
{