repmgr: have "standby follow" delete old replication slot, if possible

Addresses GitHub #272
This commit is contained in:
Ian Barwick
2017-02-16 16:12:16 +09:00
committed by Ian Barwick
parent fa30382f2c
commit 989f683bc6
2 changed files with 90 additions and 26 deletions

View File

@@ -3,6 +3,8 @@
monitoring table (Ian)
repmgrd: fix error in XLogRecPtr conversion when calculating
monitoring statistics (Ian)
repmgr: if replication slots in use, where possible delete slot on old
upstream node after following new upstream (Ian)
3.3 2016-12-27
repmgr: always log to STDERR even if log facility defined (Ian)

114
repmgr.c
View File

@@ -166,6 +166,8 @@ static void config_file_list_add(t_configfile_list *list, const char *file, cons
static void matrix_set_node_status(t_node_matrix_rec **matrix_rec_list, int n, int node_id, int connection_node_id, int connection_status);
static void cube_set_node_status(t_node_status_cube **cube, int n, int node_id, int matrix_node_id, int connection_node_id, int connection_status);
static void drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name);
/* Global variables */
static PQconninfoOption *opts = NULL;
@@ -4527,7 +4529,8 @@ do_standby_follow(void)
char script[MAXLEN];
char master_conninfo[MAXLEN];
PGconn *master_conn;
int master_id = 0;
int master_id = UNKNOWN_NODE_ID,
original_upstream_node_id = UNKNOWN_NODE_ID;
int r,
retval;
@@ -4674,7 +4677,11 @@ do_standby_follow(void)
}
/* Fetch our node record so we can write application_name, if set */
/*
* Fetch our node record so we can write application_name, if set,
* and to get the upstream node ID, which we'll need to know if
* replication slots are in use and we want to delete the old slot.
*/
query_result = get_node_record(master_conn,
options.cluster_name,
options.node,
@@ -4707,6 +4714,15 @@ do_standby_follow(void)
if (application_name != NULL && strlen(application_name))
param_set(&recovery_conninfo, "application_name", application_name);
}
if (local_node_record.upstream_node_id != UNKNOWN_NODE_ID)
{
original_upstream_node_id = local_node_record.upstream_node_id;
}
else
{
original_upstream_node_id = master_id;
}
}
log_info(_("changing standby's master to node %i\n"), master_id);
@@ -4736,6 +4752,43 @@ do_standby_follow(void)
exit(ERR_NO_RESTART);
}
/*
* If replication slots are in use, and an inactive one for this node
* (a former standby) exists on the former upstream, drop it.
*/
if (options.use_replication_slots)
{
t_node_info upstream_node_record = T_NODE_INFO_INITIALIZER;
int upstream_query_result;
/* XXX should we poll for server restart? */
conn = establish_db_connection(options.conninfo, true);
upstream_query_result = get_node_record(conn,
options.cluster_name,
original_upstream_node_id,
&upstream_node_record);
PQfinish(conn);
conn = establish_db_connection(upstream_node_record.conninfo_str, false);
if (PQstatus(conn) != CONNECTION_OK)
{
log_warning("unable to connect to old upstream node %i to remove replication slot\n",
original_upstream_node_id);
}
else
{
log_debug("attempting to remove replication slot from old upstream node %i\n",
original_upstream_node_id);
drop_replication_slot_if_exists(conn,
original_upstream_node_id,
local_node_record.slot_name);
}
}
/*
* It's possible this node was an inactive primary - update the
* relevant fields to ensure it's marked as an active standby
@@ -5629,30 +5682,9 @@ do_standby_switchover(void)
}
else
{
t_replication_slot slot_info;
int query_res;
query_res = get_slot_record(remote_conn, local_node_record.slot_name, &slot_info);
if (query_res)
{
if (slot_info.active == false)
{
if (drop_replication_slot(remote_conn, local_node_record.slot_name) == true)
{
log_notice(_("replication slot \"%s\" deleted on former master\n"), local_node_record.slot_name);
}
else
{
log_err(_("unable to delete replication slot \"%s\" on former master\n"), local_node_record.slot_name);
}
}
/* if active replication slot exists, call Houston as we have a problem */
else
{
log_err(_("replication slot \"%s\" is still active on former master\n"), local_node_record.slot_name);
}
}
drop_replication_slot_if_exists(remote_conn,
remote_node_id,
local_node_record.slot_name);
}
PQfinish(remote_conn);
@@ -8754,3 +8786,33 @@ config_file_list_add(t_configfile_list *list, const char *file, const char *file
list->entries ++;
}
static void
drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name)
{
t_replication_slot slot_info;
int query_res;
query_res = get_slot_record(conn,slot_name, &slot_info);
if (query_res)
{
if (slot_info.active == false)
{
if (drop_replication_slot(conn, slot_name) == true)
{
log_notice(_("replication slot \"%s\" deleted on node %i\n"), slot_name, node_id);
}
else
{
log_err(_("unable to delete replication slot \"%s\" on node %i\n"), slot_name, node_id);
}
}
/* if active replication slot exists, call Houston as we have a problem */
else
{
log_err(_("replication slot \"%s\" is still active on node %i\n"), slot_name, node_id);
}
}
}