diff --git a/repmgr.c b/repmgr.c index 7ddc6465..51cf2ebd 100644 --- a/repmgr.c +++ b/repmgr.c @@ -157,9 +157,6 @@ static PQconninfoOption *opts = NULL; /* conninfo params for the node we're cloning from */ t_conninfo_param_list source_conninfo; - - - static bool config_file_required = true; /* Initialization of runtime options */ @@ -6670,6 +6667,10 @@ check_upstream_config(PGconn *conn, int server_version_num, bool exit_on_error) config_ok = false; } + /* + * work out how many replication connections we'll be making (1 or 2) + */ + i = guc_set_typed(conn, "max_wal_senders", ">", "0", "integer"); if (i == 0 || i == -1) { @@ -6688,6 +6689,81 @@ check_upstream_config(PGconn *conn, int server_version_num, bool exit_on_error) config_ok = false; } + if (!runtime_options.rsync_only) + { + t_basebackup_options backup_options = T_BASEBACKUP_OPTIONS_INITIALIZER; + PGconn **connections; + bool xlog_stream = true; + int i; + + int min_replication_connections = 1, + possible_replication_connections = 0; + + t_conninfo_param_list repl_conninfo; + + parse_pg_basebackup_options(options.pg_basebackup_options, &backup_options); + + /* Make a copy of the connection parameter arrays, and append "replication" */ + + initialize_conninfo_params(&repl_conninfo, false); + + conn_to_param_list(conn, &repl_conninfo); + + param_set(&repl_conninfo, "replication", "1"); + + /* + * work out how many replication connections are required (1 or 2) + */ + + /* + * --xlog-method set by user to something other than "stream" (should be "fetch", + * but we're just checking it's not "stream") + */ + if (strlen(backup_options.xlog_method) && strcmp(backup_options.xlog_method, "stream") != 0) + xlog_stream = false; + + if (xlog_stream == true) + min_replication_connections += 1; + + log_verbose(LOG_NOTICE, "checking for available walsenders (%i required)\n", min_replication_connections); + + connections = pg_malloc0(sizeof(PGconn *) * min_replication_connections); + + /* Attempt to create the minimum number of required concurrent connections */ + for (i = 0; i < min_replication_connections; i++) + { + PGconn *replication_conn; + + replication_conn = establish_db_connection_by_params((const char**)repl_conninfo.keywords, (const char**)repl_conninfo.values, false); + + if (PQstatus(replication_conn) == CONNECTION_OK) + { + connections[i] = replication_conn; + possible_replication_connections++; + } + } + + /* Close previously created connections */ + for (i = 0; i < possible_replication_connections; i++) + { + PQfinish(connections[i]); + } + + if (possible_replication_connections < min_replication_connections) + { + config_ok = false; + log_err(_("unable to establish necessary replication connections\n")); + log_hint(_("increase 'max_wal_senders' by at least %i\n"), min_replication_connections - possible_replication_connections); + + + if (exit_on_error == true) + { + PQfinish(conn); + exit(ERR_BAD_CONFIG); + } + } + } + return config_ok; } @@ -7009,9 +7085,9 @@ format_db_cli_params(const char *conninfo, char *output) strncpy(output, buf.data, MAXLEN); termPQExpBuffer(&buf); - } + bool copy_file(const char *old_filename, const char *new_filename) { @@ -7053,8 +7129,6 @@ copy_file(const char *old_filename, const char *new_filename) } - - static void initialize_conninfo_params(t_conninfo_param_list *param_list, bool set_defaults) { @@ -7138,6 +7212,11 @@ param_set(t_conninfo_param_list *param_list, const char *param, const char *valu } +/* + * Parse a conninfo string into a t_conninfo_param_list + * + * See conn_to_param_list() to do the same for a PQconn + */ static bool parse_conninfo_string(const char *conninfo_str, t_conninfo_param_list *param_list, char *errmsg) { @@ -7163,6 +7242,11 @@ parse_conninfo_string(const char *conninfo_str, t_conninfo_param_list *param_lis } +/* + * Parse a PQconn into a t_conninfo_param_list + * + * See parse_conninfo_string() to do the same for a conninfo string + */ static void conn_to_param_list(PGconn *conn, t_conninfo_param_list *param_list) {