repmgr: improve slot handling in "node rejoin"

On the rejoined node, if a replication slot for the new upstream exists
(which is typically the case after a failover), delete that slot.

Also emit a warning about any inactive replication slots which may need
to be cleaned up manually.

GitHub #499.
This commit is contained in:
Ian Barwick
2018-08-30 11:18:08 +09:00
parent 3573950425
commit 9681708b1a
4 changed files with 98 additions and 44 deletions

View File

@@ -2417,6 +2417,54 @@ do_node_rejoin(void)
success = is_downstream_node_attached(upstream_conn, config_file_options.node_name);
}
/*
* Handle replication slots:
* - if a slot for the new upstream exists, delete that
* - warn about any other inactive replication slots
*/
if (runtime_options.force_rewind_used == false && config_file_options.use_replication_slots)
{
PGconn *local_conn = NULL;
local_conn = establish_db_connection(config_file_options.conninfo, false);
if (PQstatus(local_conn) != CONNECTION_OK)
{
log_warning(_("unable to connect to local node to check replication slot status"));
log_hint(_("execute \"repmgr node check\" to check inactive slots and drop manually if necessary"));
}
else
{
KeyValueList inactive_replication_slots = {NULL, NULL};
KeyValueListCell *cell = NULL;
int inactive_count = 0;
PQExpBufferData slotinfo;
drop_replication_slot_if_exists(local_conn,
config_file_options.node_id,
primary_node_record.slot_name);
(void) get_inactive_replication_slots(local_conn, &inactive_replication_slots);
initPQExpBuffer(&slotinfo);
for (cell = inactive_replication_slots.head; cell; cell = cell->next)
{
appendPQExpBuffer(&slotinfo,
" - %s (%s)", cell->key, cell->value);
inactive_count++;
}
if (inactive_count > 0)
{
log_warning(_("%i inactive replication slots detected"), inactive_count);
log_detail(_("inactive replication slots:\n%s"), slotinfo.data);
log_hint(_("these replication slots may need to be removed manually"));
}
termPQExpBuffer(&slotinfo);
PQfinish(local_conn);
}
}
if (success == true)
{
@@ -2426,7 +2474,8 @@ do_node_rejoin(void)
else
{
/*
* if we reach here, no record found in upstream node's pg_stat_replication */
* if we reach here, no record found in upstream node's pg_stat_replication
*/
log_notice(_("NODE REJOIN has completed but node is not yet reattached to upstream"));
log_hint(_("you will need to manually check the node's replication status"));
}

View File

@@ -89,8 +89,6 @@ static int run_file_backup(t_node_info *node_record);
static void copy_configuration_files(bool delete_after_copy);
static void drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name);
static void tablespace_data_append(TablespaceDataList *list, const char *name, const char *oid, const char *location);
static void get_barman_property(char *dst, char *name, char *local_repmgr_directory);
@@ -2734,6 +2732,10 @@ do_standby_follow_internal(PGconn *primary_conn, t_node_info *primary_node_recor
* If replication slots are in use, and an inactive one for this node
* exists on the former upstream, drop it.
*
* Note that if this function is called by do_standby_switchover(), the
* "repmgr node rejoin" command executed on the demotion candidate may already
* have removed the slot, so there may be nothing to do.
*
* XXX check if former upstream is current primary?
*/
@@ -3116,8 +3118,6 @@ do_standby_switchover(void)
}
termPQExpBuffer(&command_output);
// check walsenders here
/*
* populate local node record with current state of various replication-related
* values, so we can check for sufficient walsenders and replication slots
@@ -6067,45 +6067,6 @@ check_recovery_type(PGconn *conn)
}
static void
drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name)
{
t_replication_slot slot_info = T_REPLICATION_SLOT_INITIALIZER;
RecordStatus record_status = get_slot_record(conn, slot_name, &slot_info);
log_verbose(LOG_DEBUG, "attempting to delete slot \"%s\" on node %i",
slot_name, node_id);
if (record_status != RECORD_FOUND)
{
log_info(_("no slot record found for slot \"%s\" on node %i"),
slot_name, node_id);
}
else
{
if (slot_info.active == false)
{
if (drop_replication_slot(conn, slot_name) == true)
{
log_notice(_("replication slot \"%s\" deleted on node %i"), slot_name, node_id);
}
else
{
log_error(_("unable to delete replication slot \"%s\" on node %i"), slot_name, node_id);
}
}
/*
* if active replication slot exists, call Houston as we have a
* problem
*/
else
{
log_warning(_("replication slot \"%s\" is still active on node %i"), slot_name, node_id);
}
}
}
/*
* Creates a recovery.conf file for a standby

View File

@@ -237,5 +237,6 @@ extern void get_node_config_directory(char *config_dir_buf);
extern void get_node_data_directory(char *data_dir_buf);
extern void init_node_record(t_node_info *node_record);
extern bool can_use_pg_rewind(PGconn *conn, const char *data_directory, PQExpBufferData *reason);
extern void drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name);
#endif /* _REPMGR_CLIENT_GLOBAL_H_ */

View File

@@ -2978,3 +2978,46 @@ can_use_pg_rewind(PGconn *conn, const char *data_directory, PQExpBufferData *rea
return can_use;
}
void
drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name)
{
t_replication_slot slot_info = T_REPLICATION_SLOT_INITIALIZER;
RecordStatus record_status = get_slot_record(conn, slot_name, &slot_info);
log_verbose(LOG_DEBUG, "attempting to delete slot \"%s\" on node %i",
slot_name, node_id);
if (record_status != RECORD_FOUND)
{
/* this is a good thing */
log_verbose(LOG_INFO,
_("slot \"%s\" does not exist on node %i, nothing to remove"),
slot_name, node_id);
}
else
{
if (slot_info.active == false)
{
if (drop_replication_slot(conn, slot_name) == true)
{
log_notice(_("replication slot \"%s\" deleted on node %i"), slot_name, node_id);
}
else
{
log_error(_("unable to delete replication slot \"%s\" on node %i"), slot_name, node_id);
}
}
/*
* if active replication slot exists, call Houston as we have a
* problem
*/
else
{
log_warning(_("replication slot \"%s\" is still active on node %i"), slot_name, node_id);
}
}
}