Refactor "standby follow" functionality

"standby follow" was originally co-opted to start up a demoted node;
this functionality is now delegated to "node rejoin", with the core
functionality of "standby follow" implemented as an internal function.
This commit is contained in:
Ian Barwick
2017-08-09 13:26:27 +09:00
parent b1e544f962
commit df425a38b7
4 changed files with 215 additions and 204 deletions

View File

@@ -1211,127 +1211,83 @@ _do_standby_promote_internal(const char *data_dir)
/*
* Follow a new primary.
*
* This function has two "modes":
* 1) no primary info provided - determine primary from standby metadata
* 2) primary info provided - use that info to connect to the primary.
*
* (2) is mainly for when a node has been stopped as part of a switchover
* and needs to be started with recovery.conf correctly configured.
* Node must be running. To start an inactive node and point it at a
* new primary, use "repmgr node rejoin".
*/
void
do_standby_follow(void)
{
PGconn *local_conn;
t_node_info local_node_record = T_NODE_INFO_INITIALIZER;
int original_upstream_node_id = UNKNOWN_NODE_ID;
PGconn *local_conn = NULL;
PGconn *primary_conn = NULL;
int primary_id = UNKNOWN_NODE_ID;
t_node_info primary_node_record = T_NODE_INFO_INITIALIZER;
char data_dir[MAXPGPATH];
t_conninfo_param_list recovery_conninfo;
char *errmsg = NULL;
RecordStatus record_status;
char restart_command[MAXLEN];
int r;
PQExpBufferData event_details;
int timer;
log_verbose(LOG_DEBUG, "do_standby_follow()");
local_conn = establish_db_connection(config_file_options.conninfo, true);
log_verbose(LOG_INFO, _("connected to local node"));
/* check this is a standby */
check_recovery_type(local_conn);
/*
* If -h/--host wasn't provided, attempt to connect to standby
* to determine primary, and carry out some other checks while we're
* at it.
*/
if (runtime_options.host_param_provided == false)
{
bool success;
int timer;
local_conn = establish_db_connection(config_file_options.conninfo, true);
log_verbose(LOG_INFO, _("connected to local node"));
check_recovery_type(local_conn);
success = get_pg_setting(local_conn, "data_directory", data_dir);
if (success == false)
{
log_error(_("unable to determine data directory"));
PQfinish(local_conn);
exit(ERR_BAD_CONFIG);
}
/*
* Attempt to connect to primary.
*
* If --wait provided, loop for up `primary_response_timeout`
* seconds before giving up
*/
// XXX ??? primary_follow_timeout
for (timer = 0; timer < config_file_options.primary_follow_timeout; timer++)
{
primary_conn = get_primary_connection_quiet(local_conn,
&primary_id,
NULL);
if (PQstatus(primary_conn) == CONNECTION_OK || runtime_options.wait == false)
{
break;
}
}
if (PQstatus(primary_conn) != CONNECTION_OK)
{
log_error(_("unable to determine primary node"));
PQfinish(local_conn);
exit(ERR_BAD_CONFIG);
}
check_primary_standby_version_match(local_conn, primary_conn);
PQfinish(local_conn);
}
/* local data directory and primary server info explictly provided -
* attempt to connect to that
* Attempt to connect to primary.
*
* XXX --wait option won't be effective here
* If --wait provided, loop for up `primary_response_timeout`
* seconds before giving up
*/
else
// XXX ??? primary_follow_timeout
for (timer = 0; timer < config_file_options.primary_follow_timeout; timer++)
{
if (config_file_options.data_directory[0] == '\0')
{
if (runtime_options.data_dir[0] == '\0')
{
log_error(_("-D/--pgdata required when providing connection parameters for \"standby follow\""));
exit(ERR_BAD_CONFIG);
}
strncpy(data_dir, runtime_options.data_dir, MAXPGPATH);
}
else
{
strncpy(data_dir, config_file_options.data_directory, MAXPGPATH);
}
primary_conn = get_primary_connection_quiet(local_conn,
&primary_id,
NULL);
primary_conn = establish_db_connection_by_params(&source_conninfo, true);
primary_id = get_primary_node_id(primary_conn);
if (PQstatus(primary_conn) == CONNECTION_OK || runtime_options.wait == false)
{
break;
}
}
if (get_recovery_type(primary_conn) != RECTYPE_PRIMARY)
if (PQstatus(primary_conn) != CONNECTION_OK)
{
log_error(_("the node to follow is not a primary"));
// XXX log detail
PQfinish(primary_conn);
log_error(_("unable to determine primary node"));
PQfinish(local_conn);
exit(ERR_BAD_CONFIG);
}
check_primary_standby_version_match(local_conn, primary_conn);
PQfinish(local_conn);
get_node_record(primary_conn, primary_id, &primary_node_record);
do_standby_follow_internal(primary_conn, &primary_node_record);
}
/*
* Perform the actuall "follow" operation; this is executed by
* "node rejoin" too.
*/
void
do_standby_follow_internal(PGconn *primary_conn, t_node_info *primary_node_record)
{
t_node_info local_node_record = T_NODE_INFO_INITIALIZER;
int original_upstream_node_id = UNKNOWN_NODE_ID;
char restart_command[MAXLEN];
int r;
RecordStatus record_status;
PQExpBufferData event_details;
char *errmsg = NULL;
/*
* Fetch our node record so we can write application_name, if set,
@@ -1383,20 +1339,19 @@ do_standby_follow(void)
termPQExpBuffer(&event_details);
}
get_node_record(primary_conn, primary_id, &primary_node_record);
/* Initialise connection parameters to write as `primary_conninfo` */
initialize_conninfo_params(&recovery_conninfo, false);
/* We ignore any application_name set in the primary's conninfo */
parse_conninfo_string(primary_node_record.conninfo, &recovery_conninfo, errmsg, true);
parse_conninfo_string(primary_node_record->conninfo, &recovery_conninfo, errmsg, true);
/* Set the default application name to this node's name */
param_set(&recovery_conninfo, "application_name", config_file_options.node_name);
/* Set the replication user from the primary node record */
param_set(&recovery_conninfo, "user", primary_node_record.repluser);
param_set(&recovery_conninfo, "user", primary_node_record->repluser);
{
@@ -1431,14 +1386,14 @@ do_standby_follow(void)
}
else
{
original_upstream_node_id = primary_id;
original_upstream_node_id = primary_node_record->node_id;
}
}
log_info(_("changing node %i's primary to node %i"),
config_file_options.node_id, primary_id);
config_file_options.node_id, primary_node_record->node_id);
if (!create_recovery_file(&local_node_record, &recovery_conninfo, data_dir))
if (!create_recovery_file(&local_node_record, &recovery_conninfo, config_file_options.data_directory))
{
PQfinish(primary_conn);
exit(ERR_BAD_CONFIG);
@@ -1449,7 +1404,7 @@ do_standby_follow(void)
// XXX here check if service is running!! if not, start
// ensure that problem with pg_ctl output is caught here
get_server_action(ACTION_RESTART, restart_command, data_dir);
get_server_action(ACTION_RESTART, restart_command, config_file_options.data_directory);
log_notice(_("restarting server using '%s'"),
restart_command);
@@ -1474,6 +1429,7 @@ do_standby_follow(void)
{
t_node_info upstream_node_record = T_NODE_INFO_INITIALIZER;
RecordStatus upstream_record_status;
PGconn *local_conn;
log_verbose(LOG_INFO, "attempting to remove replication slot from old upstream node %i",
original_upstream_node_id);
@@ -1518,7 +1474,7 @@ do_standby_follow(void)
if (update_node_record_status(primary_conn,
config_file_options.node_id,
"standby",
primary_id,
primary_node_record->node_id,
true) == false)
{
log_error(_("unable to update upstream node"));
@@ -1527,12 +1483,13 @@ do_standby_follow(void)
exit(ERR_BAD_CONFIG);
}
// XXX return to caller
log_notice(_("STANDBY FOLLOW successful"));
initPQExpBuffer(&event_details);
appendPQExpBuffer(&event_details,
_("node %i is now attached to node %i"),
config_file_options.node_id, primary_id);
config_file_options.node_id, primary_node_record->node_id);
create_event_notification(primary_conn,
&config_file_options,
@@ -1591,6 +1548,7 @@ do_standby_switchover(void)
RecoveryType recovery_type;
PQExpBufferData remote_command_str;
PQExpBufferData command_output;
PQExpBufferData node_rejoin_options;
int r, i;
@@ -2160,7 +2118,13 @@ do_standby_switchover(void)
/* promote standby */
_do_standby_promote_internal(config_file_options.data_directory);
if (1 || replication_info.last_wal_receive_lsn < remote_last_checkpoint_lsn)
/*
* Execute `repmgr node rejoin` to create recovery.conf and start
* the remote server. Additionally execute "pg_rewind", if required
* and requested.
*/
initPQExpBuffer(&node_rejoin_options);
if (replication_info.last_wal_receive_lsn < remote_last_checkpoint_lsn)
{
KeyValueListCell *cell;
bool first_entry = true;
@@ -2170,60 +2134,46 @@ do_standby_switchover(void)
log_error(_("new primary diverges from former primary and --force-rewind not provided"));
/* TODO: "repmgr node rejoin" example, when available */
log_hint(_("the former primary will need to be restored manually"));
termPQExpBuffer(&node_rejoin_options);
PQfinish(local_conn);
exit(ERR_SWITCHOVER_FAIL);
}
initPQExpBuffer(&remote_command_str);
make_remote_repmgr_path(&remote_command_str);
appendPQExpBuffer(&remote_command_str,
"node rejoin --upstream-conninfo='%s'",
local_node_record.conninfo);
appendPQExpBuffer(&remote_command_str,
" --config-files=");
appendPQExpBuffer(&node_rejoin_options,
" --force-rewind --config-files=");
for (cell = remote_config_files.head; cell; cell = cell->next)
{
if (first_entry == false)
appendPQExpBuffer(&remote_command_str, ",");
appendPQExpBuffer(&node_rejoin_options, ",");
else
first_entry = false;
appendPQExpBuffer(&remote_command_str, "%s", cell->key);
appendPQExpBuffer(&node_rejoin_options, "%s", cell->key);
}
log_debug("executing:\n \"%s\"", remote_command_str.data);
(void)remote_command(
remote_host,
runtime_options.remote_user,
remote_command_str.data,
NULL);
termPQExpBuffer(&remote_command_str);
appendPQExpBuffer(&node_rejoin_options, " ");
}
else
{
/*
* Execute `repmgr standby follow` to create recovery.conf and start
* the remote server
*
* XXX replace with "node rejoin"
*/
initPQExpBuffer(&remote_command_str);
make_remote_repmgr_path(&remote_command_str);
appendPQExpBuffer(&remote_command_str,
" -d \\'%s\\' standby follow",
local_node_record.conninfo);
log_debug("executing:\n \"%s\"", remote_command_str.data);
(void)remote_command(
remote_host,
runtime_options.remote_user,
remote_command_str.data,
NULL);
termPQExpBuffer(&remote_command_str);
}
initPQExpBuffer(&remote_command_str);
make_remote_repmgr_path(&remote_command_str);
appendPQExpBuffer(&remote_command_str,
"%s--upstream-conninfo=\\'%s\\' node rejoin",
node_rejoin_options.data,
local_node_record.conninfo);
log_debug("executing:\n \"%s\"", remote_command_str.data);
(void)remote_command(
remote_host,
runtime_options.remote_user,
remote_command_str.data,
NULL);
termPQExpBuffer(&remote_command_str);
termPQExpBuffer(&node_rejoin_options);
/* TODO: verify this node's record was updated correctly */