repmgr: automatically create slot name if missing

It's possible that a node was registered with "use_replication_slots=false"
but that was later changed to "use_replication_slots=true". If the node
was not subsequently re-registered, the node record will contain an empty
slot name, which will cause any slot creation operation during
"standby follow" or "node rejoin" to fail.

To prevent this happening, check for an empty slot name and automatically
set it before proceeding.

Addresses GitHub #343.
This commit is contained in:
Ian Barwick
2018-01-11 11:13:41 +09:00
parent faffb2a6e7
commit ae7963dc64
5 changed files with 80 additions and 16 deletions

View File

@@ -2592,6 +2592,36 @@ truncate_node_records(PGconn *conn)
return true;
}
/*
 * update_node_record_slot_name()
 *
 * Store "slot_name" in the "slot_name" column of the repmgr.nodes row
 * whose node_id matches "node_id", executing the UPDATE over
 * "primary_conn".
 *
 * Returns true if the statement completed with PGRES_COMMAND_OK; otherwise
 * logs the connection's error message and returns false.
 *
 * Note that "slot_name" is interpolated into the SQL text as-is (no
 * escaping is applied here).
 */
bool
update_node_record_slot_name(PGconn *primary_conn, int node_id, char *slot_name)
{
	PQExpBufferData sql;
	PGresult   *result = NULL;
	bool		success = false;

	/* build the statement text */
	initPQExpBuffer(&sql);
	appendPQExpBuffer(&sql,
					  " UPDATE repmgr.nodes "
					  " SET slot_name = '%s' "
					  " WHERE node_id = %i ",
					  slot_name,
					  node_id);

	/* the buffer is no longer needed once the statement has been sent */
	result = PQexec(primary_conn, sql.data);
	termPQExpBuffer(&sql);

	success = (PQresultStatus(result) == PGRES_COMMAND_OK);

	if (!success)
	{
		log_error(_("unable to set node record slot name:\n %s"),
				  PQerrorMessage(primary_conn));
	}

	/* single cleanup point for both the success and the failure path */
	PQclear(result);

	return success;
}
void
get_node_replication_stats(PGconn *conn, int server_version_num, t_node_info *node_info)
{
@@ -3350,6 +3380,14 @@ get_event_records(PGconn *conn, int node_id, const char *node_name, const char *
/* replication slot functions */
/* ========================== */
/*
 * create_slot_name()
 *
 * Write the replication slot name for node "node_id" into the
 * caller-supplied buffer "slot_name", using the fixed pattern
 * "repmgr_slot_<node_id>".
 *
 * NOTE(review): the write goes through maxlen_snprintf(), which is defined
 * outside this view; presumably it bounds the output to the project's
 * standard string buffer length, so "slot_name" should be at least that
 * large - confirm against the macro definition.
 */
void
create_slot_name(char *slot_name, int node_id)
{
maxlen_snprintf(slot_name, "repmgr_slot_%i", node_id);
}
bool
create_replication_slot(PGconn *conn, char *slot_name, int server_version_num, PQExpBufferData *error_msg)
{

View File

@@ -422,10 +422,10 @@ bool update_node_record_set_primary(PGconn *conn, int this_node_id);
bool update_node_record_set_upstream(PGconn *conn, int this_node_id, int new_upstream_node_id);
bool update_node_record_status(PGconn *conn, int this_node_id, char *type, int upstream_node_id, bool active);
bool update_node_record_conn_priority(PGconn *conn, t_configuration_options *options);
bool update_node_record_slot_name(PGconn *primary_conn, int node_id, char *slot_name);
bool witness_copy_node_records(PGconn *primary_conn, PGconn *witness_conn);
void clear_node_info_list(NodeInfoList *nodes);
/* PostgreSQL configuration file location functions */
@@ -441,6 +441,7 @@ bool create_event_notification_extended(PGconn *conn, t_configuration_options *
PGresult *get_event_records(PGconn *conn, int node_id, const char *node_name, const char *event, bool all, int limit);
/* replication slot functions */
void create_slot_name(char *slot_name, int node_id);
bool create_replication_slot(PGconn *conn, char *slot_name, int server_version_num, PQExpBufferData *error_msg);
bool drop_replication_slot(PGconn *conn, char *slot_name);
RecordStatus get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record);

View File

@@ -1575,13 +1575,11 @@ parse_server_action(const char *action_name)
/*
* Intended mainly for "internal" use by "standby switchover", which
* calls this on the target server to execute pg_rewind on a demoted
* primary with a forked (sic) timeline. This function does not
* currently check whether this is a useful thing to do (however
* "standby switchover" will perform a check before calling it).
* Rejoin a dormant (shut down) node to the replication cluster; this
* is typically a former primary which needs to be demoted to a standby.
*
* TODO: make this into a more generally useful function.
* Note that "repmgr node rejoin" is also executed by
* "repmgr standby switchover" after promoting the new primary.
*/
void
do_node_rejoin(void)
@@ -1634,14 +1632,21 @@ do_node_rejoin(void)
/* check if cleanly shut down */
if (db_state != DB_SHUTDOWNED && db_state != DB_SHUTDOWNED_IN_RECOVERY)
{
log_error(_("database is not shut down cleanly"));
if (runtime_options.force_rewind == true)
if (db_state == DB_SHUTDOWNING)
{
log_detail(_("pg_rewind will not be able to run"));
log_error(_("database is still shutting down"));
}
else
{
log_error(_("database is not shut down cleanly"));
if (runtime_options.force_rewind == true)
{
log_detail(_("pg_rewind will not be able to run"));
}
log_hint(_("database should be restarted then shut down cleanly after crash recovery completes"));
exit(ERR_BAD_CONFIG);
}
log_hint(_("database should be restarted and shut down cleanly after crash recovery completes"));
exit(ERR_BAD_CONFIG);
}

View File

@@ -1775,6 +1775,26 @@ do_standby_follow_internal(PGconn *primary_conn, t_node_info *primary_node_recor
{
int primary_server_version_num = get_server_version(primary_conn, NULL);
/*
* Here we add a sanity check for the "slot_name" field - it's possible
* the node was initially registered with "use_replication_slots=false"
* but the configuration was subsequently changed, leaving the field NULL.
*
* To avoid annoying failures we can just update the node record and proceed.
*/
if (!strlen(local_node_record.slot_name))
{
create_slot_name(local_node_record.slot_name, config_file_options.node_id);
log_notice(_("setting node %i's slot name to \"%s\""),
config_file_options.node_id,
local_node_record.slot_name);
update_node_record_slot_name(primary_conn, config_file_options.node_id, local_node_record.slot_name);
}
if (create_replication_slot(primary_conn,
local_node_record.slot_name,
primary_server_version_num,
@@ -1867,8 +1887,8 @@ do_standby_follow_internal(PGconn *primary_conn, t_node_info *primary_node_recor
/* Set the replication user from the primary node record */
param_set(&recovery_conninfo, "user", primary_node_record->repluser);
log_info(_("setting node %i's primary to node %i"),
config_file_options.node_id, primary_node_record->node_id);
log_notice(_("setting node %i's primary to node %i"),
config_file_options.node_id, primary_node_record->node_id);
if (!create_recovery_file(&local_node_record, &recovery_conninfo, config_file_options.data_directory))
{

View File

@@ -2729,6 +2729,6 @@ init_node_record(t_node_info *node_record)
if (config_file_options.use_replication_slots == true)
{
maxlen_snprintf(node_record->slot_name, "repmgr_slot_%i", config_file_options.node_id);
create_slot_name(node_record->slot_name, config_file_options.node_id);
}
}