repmgr: before cloning a standby, verify sufficient free walsenders available

Previously repmgr only checked that 'max_wal_senders' is a positive value.
It will now additionally verify that the requisite number of replication
connections can actually be made before commencing with a cloning operation
using pg_basebackup.

GitHub #214
This commit is contained in:
Ian Barwick
2016-09-21 12:26:48 +09:00
parent e043d5c9a9
commit 5e338473f7

View File

@@ -157,9 +157,6 @@ static PQconninfoOption *opts = NULL;
/* conninfo params for the node we're cloning from */
t_conninfo_param_list source_conninfo;
static bool config_file_required = true;
/* Initialization of runtime options */
@@ -6670,6 +6667,10 @@ check_upstream_config(PGconn *conn, int server_version_num, bool exit_on_error)
config_ok = false;
}
/*
* work out how many replication connections we'll be making (1 or 2)
*/
i = guc_set_typed(conn, "max_wal_senders", ">", "0", "integer");
if (i == 0 || i == -1)
{
@@ -6688,6 +6689,81 @@ check_upstream_config(PGconn *conn, int server_version_num, bool exit_on_error)
config_ok = false;
}
if (!runtime_options.rsync_only)
{
t_basebackup_options backup_options = T_BASEBACKUP_OPTIONS_INITIALIZER;
PGconn **connections;
bool xlog_stream = true;
int i;
int min_replication_connections = 1,
possible_replication_connections = 0;
t_conninfo_param_list repl_conninfo;
parse_pg_basebackup_options(options.pg_basebackup_options, &backup_options);
/* Make a copy of the connection parameter arrays, and append "replication" */
initialize_conninfo_params(&repl_conninfo, false);
conn_to_param_list(conn, &repl_conninfo);
param_set(&repl_conninfo, "replication", "1");
/*
* work out how many replication connections are required (1 or 2)
*/
/*
* --xlog-method set by user to something other than "stream" (should be "fetch",
* but we're just checking it's not "stream")
*/
if (strlen(backup_options.xlog_method) && strcmp(backup_options.xlog_method, "stream") != 0)
xlog_stream = false;
if (xlog_stream == true)
min_replication_connections += 1;
log_verbose(LOG_NOTICE, "checking for available walsenders (%i required)\n", min_replication_connections);
connections = pg_malloc0(sizeof(PGconn *) * min_replication_connections);
/* Attempt to create the minimum number of required concurrent connections */
for (i = 0; i < min_replication_connections; i++)
{
PGconn *replication_conn;
replication_conn = establish_db_connection_by_params((const char**)repl_conninfo.keywords, (const char**)repl_conninfo.values, false);
if (PQstatus(replication_conn) == CONNECTION_OK)
{
connections[i] = replication_conn;
possible_replication_connections++;
}
}
/* Close previously created connections */
for (i = 0; i < possible_replication_connections; i++)
{
PQfinish(connections[i]);
}
if (possible_replication_connections < min_replication_connections)
{
config_ok = false;
log_err(_("unable to establish necessary replication connections\n"));
log_hint(_("increase 'max_wal_senders' by at least %i\n"), min_replication_connections - possible_replication_connections);
if (exit_on_error == true)
{
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
}
}
return config_ok;
}
@@ -7009,9 +7085,9 @@ format_db_cli_params(const char *conninfo, char *output)
strncpy(output, buf.data, MAXLEN);
termPQExpBuffer(&buf);
}
bool
copy_file(const char *old_filename, const char *new_filename)
{
@@ -7053,8 +7129,6 @@ copy_file(const char *old_filename, const char *new_filename)
}
static void
initialize_conninfo_params(t_conninfo_param_list *param_list, bool set_defaults)
{
@@ -7138,6 +7212,11 @@ param_set(t_conninfo_param_list *param_list, const char *param, const char *valu
}
/*
* Parse a conninfo string into a t_conninfo_param_list
*
* See conn_to_param_list() to do the same for a PQconn
*/
static bool
parse_conninfo_string(const char *conninfo_str, t_conninfo_param_list *param_list, char *errmsg)
{
@@ -7163,6 +7242,11 @@ parse_conninfo_string(const char *conninfo_str, t_conninfo_param_list *param_lis
}
/*
* Parse a PQconn into a t_conninfo_param_list
*
* See parse_conninfo_string() to do the same for a conninfo string
*/
static void
conn_to_param_list(PGconn *conn, t_conninfo_param_list *param_list)
{