"standby follow": verify status of follow target

This commit adds infrastruture for repmgr to be able to check
whether one standby can attach to another node, regardless whether
it is a standby or a primary.

This is intended to prevent a node from attempting to follow a
node whose timeline has diverged. The --dry-run option makes
it possible to test a follow operation before it is carried out.

As a useful side-effect this makes it possible for a standby to
follow another standby.

This is an initial implementation; documentation and possibly
further changes to follow.
This commit is contained in:
Ian Barwick
2018-11-29 17:14:38 +09:00
parent c53782cda3
commit 9e90fcd584
5 changed files with 355 additions and 97 deletions

View File

@@ -114,7 +114,7 @@ get_db_state(const char *data_directory)
}
extern XLogRecPtr
XLogRecPtr
get_latest_checkpoint_location(const char *data_directory)
{
ControlFileInfo *control_file_info = NULL;
@@ -171,6 +171,38 @@ describe_db_state(DBState state)
}
TimeLineID
get_timeline(const char *data_directory)
{
ControlFileInfo *control_file_info = NULL;
TimeLineID timeline = -1;
control_file_info = get_controlfile(data_directory);
timeline = (int) control_file_info->timeline;
pfree(control_file_info);
return timeline;
}
XLogRecPtr
get_min_recovery_location(const char *data_directory)
{
ControlFileInfo *control_file_info = NULL;
XLogRecPtr minRecoveryPoint = InvalidXLogRecPtr;
control_file_info = get_controlfile(data_directory);
minRecoveryPoint = control_file_info->minRecoveryPoint;
pfree(control_file_info);
return minRecoveryPoint;
}
/*
* We maintain our own version of get_controlfile() as we need cross-version
* compatibility, and also don't care if the file isn't readable.
@@ -193,6 +225,7 @@ get_controlfile(const char *DataDir)
control_file_info->state = DB_SHUTDOWNED;
control_file_info->checkPoint = InvalidXLogRecPtr;
control_file_info->data_checksum_version = -1;
control_file_info->timeline = -1;
/*
* Read PG_VERSION, as we'll need to determine which struct to read
@@ -264,6 +297,8 @@ get_controlfile(const char *DataDir)
control_file_info->state = ptr->state;
control_file_info->checkPoint = ptr->checkPoint;
control_file_info->data_checksum_version = ptr->data_checksum_version;
control_file_info->timeline = ptr->checkPointCopy.ThisTimeLineID;
control_file_info->minRecoveryPoint = ptr->minRecoveryPoint;
}
else if (version_num >= 90500)
{
@@ -272,6 +307,8 @@ get_controlfile(const char *DataDir)
control_file_info->state = ptr->state;
control_file_info->checkPoint = ptr->checkPoint;
control_file_info->data_checksum_version = ptr->data_checksum_version;
control_file_info->timeline = ptr->checkPointCopy.ThisTimeLineID;
control_file_info->minRecoveryPoint = ptr->minRecoveryPoint;
}
else if (version_num >= 90400)
{
@@ -280,6 +317,8 @@ get_controlfile(const char *DataDir)
control_file_info->state = ptr->state;
control_file_info->checkPoint = ptr->checkPoint;
control_file_info->data_checksum_version = ptr->data_checksum_version;
control_file_info->timeline = ptr->checkPointCopy.ThisTimeLineID;
control_file_info->minRecoveryPoint = ptr->minRecoveryPoint;
}
else if (version_num >= 90300)
{
@@ -288,6 +327,8 @@ get_controlfile(const char *DataDir)
control_file_info->state = ptr->state;
control_file_info->checkPoint = ptr->checkPoint;
control_file_info->data_checksum_version = ptr->data_checksum_version;
control_file_info->timeline = ptr->checkPointCopy.ThisTimeLineID;
control_file_info->minRecoveryPoint = ptr->minRecoveryPoint;
}
pfree(ControlFileDataPtr);

View File

@@ -24,6 +24,8 @@ typedef struct
DBState state;
XLogRecPtr checkPoint;
uint32 data_checksum_version;
TimeLineID timeline;
XLogRecPtr minRecoveryPoint;
} ControlFileInfo;
@@ -336,5 +338,7 @@ extern const char *describe_db_state(DBState state);
extern int get_data_checksum_version(const char *data_directory);
extern uint64 get_system_identifier(const char *data_directory);
extern XLogRecPtr get_latest_checkpoint_location(const char *data_directory);
extern TimeLineID get_timeline(const char *data_directory);
extern XLogRecPtr get_min_recovery_location(const char *data_directory);
#endif /* _CONTROLDATA_H_ */

View File

@@ -2301,7 +2301,14 @@ do_node_rejoin(void)
initPQExpBuffer(&follow_output);
/*
* do_standby_follow_internal() can handle situations where the follow
* target is not the primary, so requires database handles to both
* (even if they point to the same node). For the time being,
* "node rejoin" will only attatch a standby to the primary.
*/
success = do_standby_follow_internal(upstream_conn,
upstream_conn,
&primary_node_record,
&follow_output,
&follow_error_code);

View File

@@ -55,7 +55,7 @@ static bool local_data_directory_provided = false;
static bool upstream_conninfo_found = false;
static int upstream_node_id = UNKNOWN_NODE_ID;
static char upstream_data_directory[MAXPGPATH];
static char upstream_data_directory[MAXPGPATH] = "";
static t_conninfo_param_list recovery_conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
static char recovery_conninfo_str[MAXLEN] = "";
@@ -69,8 +69,8 @@ static t_configfile_list config_files = T_CONFIGFILE_LIST_INITIALIZER;
static standy_clone_mode mode = pg_basebackup;
/* used by barman mode */
static char local_repmgr_tmp_directory[MAXPGPATH];
static char datadir_list_filename[MAXLEN];
static char local_repmgr_tmp_directory[MAXPGPATH] = "";
static char datadir_list_filename[MAXLEN] = "";
static char barman_command_buf[MAXLEN] = "";
static void _do_standby_promote_internal(PGconn *conn, int server_version_num);
@@ -2187,7 +2187,12 @@ do_standby_follow(void)
PGconn *primary_conn = NULL;
int primary_node_id = UNKNOWN_NODE_ID;
t_node_info primary_node_record = T_NODE_INFO_INITIALIZER;
PGconn *follow_target_conn = NULL;
int follow_target_node_id = UNKNOWN_NODE_ID;
t_node_info follow_target_node_record = T_NODE_INFO_INITIALIZER;
bool follow_target_is_primary = true;
RecordStatus record_status = RECORD_NOT_FOUND;
/* so we can pass info about the primary to event notification scripts */
t_event_info event_info = T_EVENT_INFO_INITIALIZER;
@@ -2201,7 +2206,8 @@ do_standby_follow(void)
uint64 local_system_identifier = UNKNOWN_SYSTEM_IDENTIFIER;
t_conninfo_param_list repl_conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
PGconn *repl_conn = NULL;
t_system_identification primary_identification = T_SYSTEM_IDENTIFICATION_INITIALIZER;
t_system_identification follow_target_identifcation = T_SYSTEM_IDENTIFICATION_INITIALIZER;
TimeLineHistoryEntry *follow_target_history = NULL;
log_verbose(LOG_DEBUG, "do_standby_follow()");
@@ -2223,11 +2229,11 @@ do_standby_follow(void)
check_93_config();
/*
* --upstream-node-id provided; assume that is the desired primary
* and retrieve its record. We'll check if it's actualy a primary later.
* --upstream-node-id provided - attempt to follow that node
*/
if (runtime_options.upstream_node_id != UNKNOWN_NODE_ID)
{
/* we can't follow ourselves */
if (runtime_options.upstream_node_id == config_file_options.node_id)
{
log_error(_("provided --upstream-node-id %i is the current node"),
@@ -2236,67 +2242,91 @@ do_standby_follow(void)
exit(ERR_FOLLOW_FAIL);
}
primary_node_id = runtime_options.upstream_node_id;
record_status = get_node_record(local_conn, primary_node_id, &primary_node_record);
follow_target_node_id = runtime_options.upstream_node_id;
record_status = get_node_record(local_conn,
follow_target_node_id,
&follow_target_node_record);
/* but we must follow a node which exists (=registered) */
if (record_status != RECORD_FOUND)
{
log_error(_("unable to find record for upstream node %i"),
log_error(_("unable to find record for intended upstream node %i"),
runtime_options.upstream_node_id);
PQfinish(local_conn);
exit(ERR_FOLLOW_FAIL);
}
}
/*
* otherwise determine the current primary and attempt to follow that
*/
else
{
log_notice(_("attempting find to follow current primary"));
}
/*
* Attempt to connect to primary.
* Attempt to connect to follow target - if this was provided with --upstream-node-id,
* we'll connect to that, otherwise we'll attempt to find the current primary.
*
* If --wait provided, loop for up `primary_follow_timeout` seconds
* before giving up
*
* XXX add `upstream_follow_timeout` ?
*/
for (timer = 0; timer < config_file_options.primary_follow_timeout; timer++)
{
/* --upstream-node-id provided */
if (primary_node_id != UNKNOWN_NODE_ID)
/* --upstream-node-id provided - connect to specified node*/
if (follow_target_node_id != UNKNOWN_NODE_ID)
{
primary_conn = establish_db_connection_quiet(primary_node_record.conninfo);
follow_target_conn = establish_db_connection_quiet(follow_target_node_record.conninfo);
}
/* attempt to find current primary node */
else
{
primary_conn = get_primary_connection_quiet(local_conn,
&primary_node_id,
NULL);
follow_target_conn = get_primary_connection_quiet(local_conn,
&follow_target_node_id,
NULL);
}
if (PQstatus(primary_conn) == CONNECTION_OK || runtime_options.wait == false)
if (PQstatus(follow_target_conn) == CONNECTION_OK || runtime_options.wait == false)
{
break;
}
sleep(1);
}
PQfinish(local_conn);
if (PQstatus(primary_conn) != CONNECTION_OK)
/* unable to connect to the follow target */
if (PQstatus(follow_target_conn) != CONNECTION_OK)
{
if (primary_node_id == UNKNOWN_NODE_ID)
if (follow_target_node_id == UNKNOWN_NODE_ID)
{
log_error(_("unable to find a primary node"));
}
else
{
log_error(_("unable to connect to primary node %i"), primary_node_id);
log_error(_("unable to connect to target node %i"), follow_target_node_id);
}
if (runtime_options.wait == true)
{
log_detail(_("no primary appeared after %i seconds"),
config_file_options.primary_follow_timeout);
if (follow_target_node_id == UNKNOWN_NODE_ID)
{
log_detail(_("no primary appeared after %i seconds"),
config_file_options.primary_follow_timeout);
}
else
{
log_detail(_("unable to connect to target node %i after %i seconds"),
follow_target_node_id,
config_file_options.primary_follow_timeout);
}
log_hint(_("alter \"primary_follow_timeout\" in \"repmgr.conf\" to change this value"));
}
PQfinish(local_conn);
exit(ERR_FOLLOW_FAIL);
}
@@ -2305,126 +2335,275 @@ do_standby_follow(void)
{
if (runtime_options.dry_run == true)
{
log_info(_("connected to node %i, checking for current primary"), primary_node_id);
log_info(_("connected to node %i, checking for current primary"), follow_target_node_id);
}
else
{
log_verbose(LOG_INFO, _("connected to node %i, checking for current primary"), primary_node_id);
log_verbose(LOG_INFO, _("connected to node %i, checking for current primary"), follow_target_node_id);
}
record_status = get_node_record(primary_conn, primary_node_id, &primary_node_record);
record_status = get_node_record(follow_target_conn,
follow_target_node_id,
&follow_target_node_record);
if (record_status != RECORD_FOUND)
{
log_error(_("unable to find record for new primarynode %i"),
primary_node_id);
PQfinish(primary_conn);
log_error(_("unable to find record for follow target node %i"),
follow_target_node_id);
PQfinish(follow_target_conn);
exit(ERR_FOLLOW_FAIL);
}
}
/*
* Populate "event_info" with info about the primary for event notifications
* Populate "event_info" with info about the node to follow for event notifications
*
* XXX need to differentiate between primary and non-primary?
*/
event_info.node_id = primary_node_id;
event_info.node_name = primary_node_record.node_name;
event_info.conninfo_str = primary_node_record.conninfo;
event_info.node_id = follow_target_node_id;
event_info.node_name = follow_target_node_record.node_name;
event_info.conninfo_str = follow_target_node_record.conninfo;
if (runtime_options.dry_run == true)
// check whether follow target is in recovery, format message
{
log_info(_("primary node is \"%s\" (ID: %i)"),
primary_node_record.node_name,
primary_node_id);
}
else
{
log_verbose(LOG_INFO, ("primary node is \"%s\" (ID: %i)"),
primary_node_record.node_name,
primary_node_id);
PQExpBufferData node_info_msg;
RecoveryType recovery_type = RECTYPE_UNKNOWN;
initPQExpBuffer(&node_info_msg);
recovery_type = get_recovery_type(follow_target_conn);
/*
* unlikely this will happen, but it's conceivable the follow target will
* have vanished since we last talked to it, or something
*/
if (recovery_type == RECTYPE_UNKNOWN)
{
log_error(_("unable to determine recovery type of follow target"));
PQfinish(follow_target_conn);
exit(ERR_FOLLOW_FAIL);
}
if (recovery_type == RECTYPE_PRIMARY)
{
follow_target_is_primary = true;
appendPQExpBuffer(&node_info_msg,
_("follow target is primary node \"%s\" (ID: %i)"),
follow_target_node_record.node_name,
follow_target_node_id);
}
else
{
follow_target_is_primary = false;
appendPQExpBuffer(&node_info_msg,
_("follow target is standby node \"%s\" (ID: %i)"),
follow_target_node_record.node_name,
follow_target_node_id);
}
if (runtime_options.dry_run == true)
{
log_info("%s", node_info_msg.data);
}
else
{
log_verbose(LOG_INFO, "%s", node_info_msg.data);
}
termPQExpBuffer(&node_info_msg);
}
/* if replication slots in use, check at least one free slot is available */
if (config_file_options.use_replication_slots)
{
int free_slots = get_free_replication_slot_count(primary_conn);
int free_slots = get_free_replication_slot_count(follow_target_conn);
if (free_slots < 0)
{
log_error(_("unable to determine number of free replication slots on the primary"));
PQfinish(primary_conn);
log_error(_("unable to determine number of free replication slots on node %i"),
follow_target_node_id);
PQfinish(follow_target_conn);
PQfinish(local_conn);
exit(ERR_FOLLOW_FAIL);
}
if (free_slots == 0)
{
log_error(_("no free replication slots available on the primary"));
log_error(_("no free replication slots available on node %i"), follow_target_node_id);
log_hint(_("consider increasing \"max_replication_slots\""));
PQfinish(primary_conn);
PQfinish(follow_target_conn);
PQfinish(local_conn);
exit(ERR_FOLLOW_FAIL);
}
else if (runtime_options.dry_run == true)
{
log_info(_("replication slots in use, %i free slots on the primary"),
log_info(_("replication slots in use, %i free slots on node %i"),
follow_target_node_id,
free_slots);
}
}
/* XXX check this is not current upstream anyway */
/* check replication connection */
initialize_conninfo_params(&repl_conninfo, false);
conn_to_param_list(primary_conn, &repl_conninfo);
conn_to_param_list(follow_target_conn, &repl_conninfo);
if (strcmp(param_get(&repl_conninfo, "user"), primary_node_record.repluser) != 0)
if (strcmp(param_get(&repl_conninfo, "user"), follow_target_node_record.repluser) != 0)
{
param_set(&repl_conninfo, "user", primary_node_record.repluser);
param_set(&repl_conninfo, "user", follow_target_node_record.repluser);
param_set(&repl_conninfo, "dbname", "replication");
}
param_set(&repl_conninfo, "replication", "1");
repl_conn = establish_db_connection_by_params(&repl_conninfo, false);
if (PQstatus(repl_conn) != CONNECTION_OK)
{
log_error(_("unable to establish a replication connection to the primary node"));
PQfinish(primary_conn);
log_error(_("unable to establish a replication connection to the follow target node"));
PQfinish(follow_target_conn);
exit(ERR_FOLLOW_FAIL);
}
else if (runtime_options.dry_run == true)
{
log_info(_("replication connection to primary node was successful"));
log_info(_("replication connection to the follow target node was successful"));
}
/* check system_identifiers match */
local_system_identifier = get_system_identifier(config_file_options.data_directory);
success = identify_system(repl_conn, &primary_identification);
success = identify_system(repl_conn, &follow_target_identifcation);
if (success == false)
{
log_error(_("unable to query the primary node's system identification"));
PQfinish(primary_conn);
log_error(_("unable to query the follow target node's system identification"));
PQfinish(follow_target_conn);
PQfinish(repl_conn);
exit(ERR_FOLLOW_FAIL);
}
if (primary_identification.system_identifier != local_system_identifier)
if (follow_target_identifcation.system_identifier != local_system_identifier)
{
log_error(_("this node is not part of the primary node's replication cluster"));
log_detail(_("this node's system identifier is %lu, primary node's system identifier is %lu"),
log_error(_("this node is not part of the follow target node's replication cluster"));
log_detail(_("this node's system identifier is %lu, follow target node's system identifier is %lu"),
local_system_identifier,
primary_identification.system_identifier);
PQfinish(primary_conn);
follow_target_identifcation.system_identifier);
PQfinish(follow_target_conn);
PQfinish(repl_conn);
exit(ERR_FOLLOW_FAIL);
}
else if (runtime_options.dry_run == true)
{
log_info(_("local and primary system identifiers match"));
log_info(_("local and follow target system identifiers match"));
log_detail(_("system identifier is %lu"), local_system_identifier);
}
/* TODO: check timelines */
/*
* Execute CHECKPOINT on the local node - we'll need this to update
* the pg_control file so we can compare positions with the new upstream.
* There is no way of avoiding this for --dry-run.
*/
checkpoint(local_conn);
/*
* Here we'll perform some sanity checks to ensure the follow target
* can actually be followed.
*/
{
/* check timelines */
TimeLineID local_timeline = get_timeline(config_file_options.data_directory);
XLogRecPtr min_recovery_location = get_min_recovery_location(config_file_options.data_directory);
log_verbose(LOG_DEBUG, "local timeline: %i; follow target timeline: %i",
local_timeline,
follow_target_identifcation.timeline);
/* upstream's timeline is lower than ours - impossible case */
if (follow_target_identifcation.timeline < local_timeline)
{
log_error(_("this node's timeline is ahead of the follow target node's timeline"));
log_detail(_("this node's timeline is %i, follow target node's timeline is %i"),
local_timeline,
follow_target_identifcation.timeline);
PQfinish(follow_target_conn);
PQfinish(repl_conn);
PQfinish(local_conn);
exit(ERR_FOLLOW_FAIL);
}
if (follow_target_identifcation.timeline == local_timeline)
{
XLogRecPtr local_xlogpos = get_current_lsn(local_conn);
XLogRecPtr follow_target_xlogpos = get_current_lsn(follow_target_conn);
if (local_xlogpos == InvalidXLogRecPtr || follow_target_xlogpos == InvalidXLogRecPtr)
{
log_error(_("unable to compare LSN positions"));
PQfinish(follow_target_conn);
PQfinish(repl_conn);
PQfinish(local_conn);
exit(ERR_FOLLOW_FAIL);
}
/* timeline is the same - check relative positions */
if (local_xlogpos <= follow_target_xlogpos)
{
log_info(_("timelines are same, this server is not ahead"));
log_detail(_("local node lsn is %X/%X, follow target lsn is %X/%X"),
format_lsn(local_xlogpos),
format_lsn(follow_target_xlogpos));
}
else
{
log_error(_("this node is ahead of the follow target"));
log_detail(_("local node lsn is %X/%X, follow target lsn is %X/%X"),
format_lsn(local_xlogpos),
format_lsn(follow_target_xlogpos));
PQfinish(follow_target_conn);
PQfinish(repl_conn);
PQfinish(local_conn);
exit(ERR_FOLLOW_FAIL);
}
}
else
{
/* upstream has higher timeline - check where it forked off from this node's timeline */
follow_target_history = get_timeline_history(repl_conn, local_timeline + 1);
if (follow_target_history == NULL)
{
PQfinish(follow_target_conn);
PQfinish(repl_conn);
PQfinish(local_conn);
exit(ERR_FOLLOW_FAIL);
}
log_debug("upstream tli: %i; branch LSN: %X/%X",
follow_target_history->tli, format_lsn(follow_target_history->end));
if (follow_target_history->end < min_recovery_location)
{
log_error(_("this node cannot attach to follow target node %i"),
follow_target_node_id);
log_detail(_("follow target server's timeline %i forked off current database system timeline %i before current recovery point %X/%X\n"),
local_timeline + 1,
local_timeline,
format_lsn(min_recovery_location));
PQfinish(follow_target_conn);
PQfinish(repl_conn);
PQfinish(local_conn);
exit(ERR_FOLLOW_FAIL);
}
}
}
PQfinish(local_conn);
PQfinish(repl_conn);
free_conninfo_params(&repl_conninfo);
@@ -2435,18 +2614,38 @@ do_standby_follow(void)
exit(SUCCESS);
}
/*
* Here we'll need a connection to the primary, if the upstream is not a primary.
*/
if (follow_target_is_primary == false)
{
/*
* We'll try and establish primary from follow target, in the assumption its node
* record is more up-to-date.
*/
primary_conn = get_primary_connection_quiet(follow_target_conn,
&primary_node_id,
NULL);
}
else
{
primary_conn = follow_target_conn;
}
initPQExpBuffer(&follow_output);
success = do_standby_follow_internal(primary_conn,
&primary_node_record,
&follow_output,
&follow_error_code);
success = do_standby_follow_internal(
primary_conn,
follow_target_conn,
&follow_target_node_record,
&follow_output,
&follow_error_code);
/* unable to restart the standby */
if (success == false)
{
create_event_notification_extended(
primary_conn,
follow_target_conn,
&config_file_options,
config_file_options.node_id,
"standby_follow",
@@ -2454,7 +2653,10 @@ do_standby_follow(void)
follow_output.data,
&event_info);
PQfinish(primary_conn);
PQfinish(follow_target_conn);
if (follow_target_is_primary == false)
PQfinish(primary_conn);
log_notice(_("STANDBY FOLLOW failed"));
if (strlen( follow_output.data ))
@@ -2479,7 +2681,7 @@ do_standby_follow(void)
for (timer = 0; timer < config_file_options.standby_follow_timeout; timer++)
{
success = is_downstream_node_attached(primary_conn, config_file_options.node_name);
success = is_downstream_node_attached(follow_target_conn, config_file_options.node_name);
if (success == true)
break;
@@ -2494,16 +2696,16 @@ do_standby_follow(void)
log_notice(_("STANDBY FOLLOW successful"));
appendPQExpBuffer(&follow_output,
"standby attached to upstream node \"%s\" (node ID: %i)",
primary_node_record.node_name,
primary_node_id);
follow_target_node_record.node_name,
follow_target_node_id);
}
else
{
log_error(_("STANDBY FOLLOW failed"));
appendPQExpBuffer(&follow_output,
"standby did not attach to upstream node \"%s\" (node ID: %i) after %i seconds",
primary_node_record.node_name,
primary_node_id,
follow_target_node_record.node_name,
follow_target_node_id,
config_file_options.standby_follow_timeout);
}
@@ -2519,10 +2721,13 @@ do_standby_follow(void)
follow_output.data,
&event_info);
PQfinish(primary_conn);
termPQExpBuffer(&follow_output);
PQfinish(follow_target_conn);
if (follow_target_is_primary == false)
PQfinish(primary_conn);
if (success == false)
exit(ERR_FOLLOW_FAIL);
@@ -2538,7 +2743,7 @@ do_standby_follow(void)
* this function.
*/
bool
do_standby_follow_internal(PGconn *primary_conn, t_node_info *primary_node_record, PQExpBufferData *output, int *error_code)
do_standby_follow_internal(PGconn *primary_conn, PGconn *follow_target_conn, t_node_info *follow_target_node_record, PQExpBufferData *output, int *error_code)
{
t_node_info local_node_record = T_NODE_INFO_INITIALIZER;
int original_upstream_node_id = UNKNOWN_NODE_ID;
@@ -2548,10 +2753,12 @@ do_standby_follow_internal(PGconn *primary_conn, t_node_info *primary_node_recor
char *errmsg = NULL;
bool remove_old_replication_slot = false;
/*
* Fetch our node record so we can write application_name, if set, and to
* get the upstream node ID, which we'll need to know if replication slots
* are in use and we want to delete the old slot.
* get the current upstream node ID, which we'll need to know if replication
* slots are in use and we want to delete this node's slot on the current
* upstream.
*/
record_status = get_node_record(primary_conn,
config_file_options.node_id,
@@ -2567,8 +2774,8 @@ do_standby_follow_internal(PGconn *primary_conn, t_node_info *primary_node_recor
}
/*
* If replication slots are in use, we'll need to create a slot on the new
* primary
* If replication slots are in use, we'll need to create a slot on the
* follow target
*/
if (config_file_options.use_replication_slots)
@@ -2593,7 +2800,7 @@ do_standby_follow_internal(PGconn *primary_conn, t_node_info *primary_node_recor
}
if (create_replication_slot(primary_conn,
if (create_replication_slot(follow_target_conn,
local_node_record.slot_name,
output) == false)
{
@@ -2607,7 +2814,7 @@ do_standby_follow_internal(PGconn *primary_conn, t_node_info *primary_node_recor
initialize_conninfo_params(&recovery_conninfo, false);
/* We ignore any application_name set in the primary's conninfo */
parse_conninfo_string(primary_node_record->conninfo, &recovery_conninfo, &errmsg, true);
parse_conninfo_string(follow_target_node_record->conninfo, &recovery_conninfo, &errmsg, true);
{
t_conninfo_param_list local_node_conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
@@ -2646,7 +2853,7 @@ do_standby_follow_internal(PGconn *primary_conn, t_node_info *primary_node_recor
}
else
{
original_upstream_node_id = primary_node_record->node_id;
original_upstream_node_id = follow_target_node_record->node_id;
}
@@ -2681,11 +2888,11 @@ do_standby_follow_internal(PGconn *primary_conn, t_node_info *primary_node_recor
/* Set the application name to this node's name */
param_set(&recovery_conninfo, "application_name", config_file_options.node_name);
/* Set the replication user from the primary node record */
param_set(&recovery_conninfo, "user", primary_node_record->repluser);
/* Set the replication user from the follow target node record */
param_set(&recovery_conninfo, "user", follow_target_node_record->repluser);
log_notice(_("setting node %i's primary to node %i"),
config_file_options.node_id, primary_node_record->node_id);
log_notice(_("setting node %i's upstream to node %i"),
config_file_options.node_id, follow_target_node_record->node_id);
if (!create_recovery_file(&local_node_record, &recovery_conninfo, config_file_options.data_directory, true))
{
@@ -2772,7 +2979,6 @@ do_standby_follow_internal(PGconn *primary_conn, t_node_info *primary_node_recor
}
else
{
action = "start";
get_server_action(ACTION_START, server_command, config_file_options.data_directory);
@@ -2833,7 +3039,7 @@ do_standby_follow_internal(PGconn *primary_conn, t_node_info *primary_node_recor
if (update_node_record_status(primary_conn,
config_file_options.node_id,
"standby",
primary_node_record->node_id,
follow_target_node_record->node_id,
true) == false)
{
appendPQExpBufferStr(output,
@@ -2845,7 +3051,7 @@ do_standby_follow_internal(PGconn *primary_conn, t_node_info *primary_node_recor
appendPQExpBuffer(output,
_("node %i is now attached to node %i"),
config_file_options.node_id,
primary_node_record->node_id);
follow_target_node_record->node_id);
return true;
}

View File

@@ -28,7 +28,7 @@ extern void do_standby_switchover(void);
extern void do_standby_help(void);
extern bool do_standby_follow_internal(PGconn *primary_conn, t_node_info *primary_node_record, PQExpBufferData *output, int *error_code);
extern bool do_standby_follow_internal(PGconn *primary_conn, PGconn *follow_target_conn, t_node_info *follow_target_node_record, PQExpBufferData *output, int *error_code);