More functionality for "standby clone"

This commit is contained in:
Ian Barwick
2017-04-29 22:41:43 +09:00
parent dc347f1484
commit bcb17dd71a
8 changed files with 1102 additions and 79 deletions

View File

@@ -65,7 +65,6 @@ main(int argc, char **argv)
bool config_file_parsed = false;
set_progname(argv[0]);
/*
@@ -75,8 +74,7 @@ main(int argc, char **argv)
*/
logger_output_mode = OM_COMMAND_LINE;
while ((c = getopt_long(argc, argv, "?Vf:Fb:S:L:vtD:", long_options,
while ((c = getopt_long(argc, argv, "?Vf:Fb:S:L:vtD:cr", long_options,
&optindex)) != -1)
{
/*
@@ -161,6 +159,59 @@ main(int argc, char **argv)
strncpy(runtime_options.node_name, optarg, MAXLEN);
break;
/* standby clone options *
* --------------------- */
case 'c':
runtime_options.fast_checkpoint = true;
break;
case 'r':
runtime_options.rsync_only = true;
break;
case OPT_NO_UPSTREAM_CONNECTION:
runtime_options.no_upstream_connection = true;
break;
case OPT_RECOVERY_MIN_APPLY_DELAY:
{
char *ptr = NULL;
int targ = strtol(optarg, &ptr, 10);
if (targ < 1)
{
item_list_append(&cli_errors, _("Invalid value provided for '--recovery-min-apply-delay'"));
break;
}
if (ptr && *ptr)
{
if (strcmp(ptr, "ms") != 0 && strcmp(ptr, "s") != 0 &&
strcmp(ptr, "min") != 0 && strcmp(ptr, "h") != 0 &&
strcmp(ptr, "d") != 0)
{
item_list_append(&cli_errors,
_("Value provided for '--recovery-min-apply-delay' must be one of ms/s/min/h/d"));
break;
}
}
strncpy(runtime_options.recovery_min_apply_delay, optarg, MAXLEN);
break;
}
case OPT_REPLICATION_USER:
strncpy(runtime_options.replication_user, optarg, MAXLEN);
break;
case OPT_UPSTREAM_CONNINFO:
strncpy(runtime_options.upstream_conninfo, optarg, MAXLEN);
break;
case OPT_WITHOUT_BARMAN:
runtime_options.without_barman = true;
break;
/* event options *
* ------------- */
@@ -204,16 +255,17 @@ main(int argc, char **argv)
runtime_options.log_to_file = true;
logger_output_mode = OM_DAEMON;
break;
/* --terse */
case 't':
runtime_options.terse = true;
break;
/* --verbose */
case 'v':
runtime_options.verbose = true;
break;
}
}
@@ -694,6 +746,7 @@ action_name(const int action)
return "UNKNOWN ACTION";
}
static void
exit_with_errors(void)
{
@@ -719,6 +772,27 @@ print_item_list(ItemList *item_list)
}
void
print_error_list(ItemList *error_list, int log_level)
{
ItemListCell *cell;
for (cell = error_list->head; cell; cell = cell->next)
{
switch(log_level)
{
/* Currently we only need errors and warnings */
case LOG_ERROR:
log_error("%s", cell->string);
break;
case LOG_WARNING:
log_warning("%s", cell->string);
break;
}
}
}
static void
do_help(void)
{
@@ -771,9 +845,6 @@ do_help(void)
/*
* Create the repmgr extension, and grant access for the repmgr
* user if not a superuser.
@@ -790,51 +861,36 @@ create_repmgr_extension(PGconn *conn)
PQExpBufferData query;
PGresult *res;
t_extension_status extension_status;
char *current_user;
const char *superuser_status;
bool is_superuser;
PGconn *superuser_conn = NULL;
PGconn *schema_create_conn = NULL;
initPQExpBuffer(&query);
extension_status = get_repmgr_extension_status(conn);
appendPQExpBuffer(&query,
" SELECT ae.name, e.extname "
" FROM pg_catalog.pg_available_extensions ae "
"LEFT JOIN pg_catalog.pg_extension e "
" ON e.extname=ae.name "
" WHERE ae.name='repmgr' ");
res = PQexec(conn, query.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
switch(extension_status)
{
log_error(_("unable to execute extension query:\n %s"),
PQerrorMessage(conn));
PQclear(res);
case REPMGR_UNKNOWN:
log_error(_("unable to determine status of repmgr extension"));
return false;
case REPMGR_UNAVAILABLE:
log_error(_("\"repmgr\" extension is not available"));
return false;
case REPMGR_INSTALLED:
/* TODO: check version */
log_info(_("extension \"repmgr\" already installed"));
return true;
case REPMGR_AVAILABLE:
log_notice(_("attempting to install extension \"repmgr\""));
break;
return false;
}
/* 1. Check extension is actually available */
if (PQntuples(res) == 0)
{
log_error(_("\"repmgr\" extension is not available"));
return false;
}
/* 2. Check if extension installed */
if (PQgetisnull(res, 0, 1) == 0)
{
/* TODO: check version */
log_info(_("extension \"repmgr\" already installed"));
return true;
}
PQclear(res);
termPQExpBuffer(&query);
log_notice(_("attempting to install extension \"repmgr\""));
/* 3. Check if repmgr user is superuser, if not connect as superuser */
current_user = PQuser(conn);
@@ -1091,3 +1147,389 @@ local_command(const char *command, PQExpBufferData *outputbuf)
return true;
}
}
/*
* check_upstream_config()
*
* Perform sanity check on upstream server configuration before starting cloning
* process
*
* TODO:
* - check user is qualified to perform base backup
*/
bool
check_upstream_config(PGconn *conn, int server_version_num, bool exit_on_error)
{
int i;
bool config_ok = true;
char *wal_error_message = NULL;
t_basebackup_options backup_options = T_BASEBACKUP_OPTIONS_INITIALIZER;
bool backup_options_ok = true;
ItemList backup_option_errors = { NULL, NULL };
bool xlog_stream = true;
standy_clone_mode mode;
/*
* Detecting the intended cloning mode
*/
mode = get_standby_clone_mode();
/*
* Parse `pg_basebackup_options`, if set, to detect whether --xlog-method
* has been set to something other than `stream` (i.e. `fetch`), as
* this will influence some checks
*/
backup_options_ok = parse_pg_basebackup_options(
config_file_options.pg_basebackup_options,
&backup_options, server_version_num,
&backup_option_errors);
if (backup_options_ok == false)
{
if (exit_on_error == true)
{
log_error(_("error(s) encountered parsing 'pg_basebackup_options'"));
print_error_list(&backup_option_errors, LOG_ERR);
log_hint(_("'pg_basebackup_options' is: '%s'"),
config_file_options.pg_basebackup_options);
exit(ERR_BAD_CONFIG);
}
config_ok = false;
}
if (strlen(backup_options.xlog_method) && strcmp(backup_options.xlog_method, "stream") != 0)
xlog_stream = false;
/* Check that WAL level is set correctly */
if (server_version_num < 90400)
{
i = guc_set(conn, "wal_level", "=", "hot_standby");
wal_error_message = _("parameter 'wal_level' must be set to 'hot_standby'");
}
else
{
char *levels_pre96[] = {
"hot_standby",
"logical",
NULL,
};
/*
* Note that in 9.6+, "hot_standby" and "archive" are accepted as aliases
* for "replica", but current_setting() will of course always return "replica"
*/
char *levels_96plus[] = {
"replica",
"logical",
NULL,
};
char **levels;
int j = 0;
if (server_version_num < 90600)
{
levels = (char **)levels_pre96;
wal_error_message = _("parameter 'wal_level' must be set to 'hot_standby' or 'logical'");
}
else
{
levels = (char **)levels_96plus;
wal_error_message = _("parameter 'wal_level' must be set to 'replica' or 'logical'");
}
do
{
i = guc_set(conn, "wal_level", "=", levels[j]);
if (i)
{
break;
}
j++;
} while (levels[j] != NULL);
}
if (i == 0 || i == -1)
{
if (i == 0)
log_error("%s",
wal_error_message);
if (exit_on_error == true)
{
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
config_ok = false;
}
if (config_file_options.use_replication_slots)
{
/* Does the server support physical replication slots? */
if (server_version_num < 90400)
{
log_error(_("server version must be 9.4 or later to enable replication slots"));
if (exit_on_error == true)
{
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
config_ok = false;
}
/* Server is 9.4 or greater - non-zero `max_replication_slots` required */
else
{
i = guc_set_typed(conn, "max_replication_slots", ">",
"0", "integer");
if (i == 0 || i == -1)
{
if (i == 0)
{
log_error(_("parameter 'max_replication_slots' must be set to at least 1 to enable replication slots"));
log_hint(_("'max_replication_slots' should be set to at least the number of expected standbys"));
if (exit_on_error == true)
{
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
config_ok = false;
}
}
}
}
/*
* physical replication slots not available or not requested - check if
* there are any circumstances where `wal_keep_segments` should be set
*/
else if (mode != barman)
{
bool check_wal_keep_segments = false;
char min_wal_keep_segments[MAXLEN] = "1";
/*
* -w/--wal-keep-segments was supplied - check against that value
*/
if (runtime_options.wal_keep_segments_used == true)
{
check_wal_keep_segments = true;
strncpy(min_wal_keep_segments, runtime_options.wal_keep_segments, MAXLEN);
}
/*
* A non-zero `wal_keep_segments` value will almost certainly be required
* if rsync mode is being used, or pg_basebackup with --xlog-method=fetch,
* *and* no restore command has been specified
*/
else if ( (runtime_options.rsync_only == true || xlog_stream == false)
&& strcmp(config_file_options.restore_command, "") == 0)
{
check_wal_keep_segments = true;
}
if (check_wal_keep_segments == true)
{
i = guc_set_typed(conn, "wal_keep_segments", ">=", min_wal_keep_segments, "integer");
if (i == 0 || i == -1)
{
if (i == 0)
{
log_error(_("parameter 'wal_keep_segments' on the upstream server must be be set to %s or greater"),
min_wal_keep_segments);
log_hint(_("Choose a value sufficiently high enough to retain enough WAL "
"until the standby has been cloned and started.\n "
"Alternatively set up WAL archiving using e.g. PgBarman and configure "
"'restore_command' in repmgr.conf to fetch WALs from there."
));
if (server_version_num >= 90400)
{
log_hint(_("In PostgreSQL 9.4 and later, replication slots can be used, which "
"do not require 'wal_keep_segments' to be set "
"(set parameter 'use_replication_slots' in repmgr.conf to enable)\n"
));
}
}
if (exit_on_error == true)
{
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
config_ok = false;
}
}
}
/*
* If archive_mode is enabled, check that 'archive_command' is non empty
* (however it's not practical to check that it actually represents a valid
* command).
*
* From PostgreSQL 9.5, archive_mode can be one of 'off', 'on' or 'always'
* so for ease of backwards compatibility, rather than explicitly check for an
* enabled mode, check that it's not "off".
*/
if (guc_set(conn, "archive_mode", "!=", "off"))
{
i = guc_set(conn, "archive_command", "!=", "");
if (i == 0 || i == -1)
{
if (i == 0)
log_error(_("parameter 'archive_command' must be set to a valid command"));
if (exit_on_error == true)
{
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
config_ok = false;
}
}
/*
* Check that 'hot_standby' is on. This isn't strictly necessary
* for the primary server, however the assumption is that we'll be
* cloning standbys and thus copying the primary configuration;
* this way the standby will be correctly configured by default.
*/
i = guc_set(conn, "hot_standby", "=", "on");
if (i == 0 || i == -1)
{
if (i == 0)
log_error(_("parameter 'hot_standby' must be set to 'on'"));
if (exit_on_error == true)
{
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
config_ok = false;
}
i = guc_set_typed(conn, "max_wal_senders", ">", "0", "integer");
if (i == 0 || i == -1)
{
if (i == 0)
{
log_error(_("parameter 'max_wal_senders' must be set to be at least 1"));
log_hint(_("'max_wal_senders' should be set to at least the number of expected standbys"));
}
if (exit_on_error == true)
{
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
config_ok = false;
}
/*
* If using pg_basebackup, ensure sufficient replication connections can be made.
* There's no guarantee they'll still be available by the time pg_basebackup
* is executed, but there's nothing we can do about that.
*/
if (mode == pg_basebackup)
{
PGconn **connections;
int i;
int min_replication_connections = 1,
possible_replication_connections = 0;
t_conninfo_param_list repl_conninfo;
/* Make a copy of the connection parameter arrays, and append "replication" */
initialize_conninfo_params(&repl_conninfo, false);
conn_to_param_list(conn, &repl_conninfo);
param_set(&repl_conninfo, "replication", "1");
if (*runtime_options.replication_user)
param_set(&repl_conninfo, "user", runtime_options.replication_user);
/*
* work out how many replication connections are required (1 or 2)
*/
if (xlog_stream == true)
min_replication_connections += 1;
log_verbose(LOG_NOTICE, "checking for available walsenders on upstream node (%i required)",
min_replication_connections);
connections = pg_malloc0(sizeof(PGconn *) * min_replication_connections);
/* Attempt to create the minimum number of required concurrent connections */
for (i = 0; i < min_replication_connections; i++)
{
PGconn *replication_conn;
replication_conn = establish_db_connection_by_params((const char**)repl_conninfo.keywords, (const char**)repl_conninfo.values, false);
if (PQstatus(replication_conn) == CONNECTION_OK)
{
connections[i] = replication_conn;
possible_replication_connections++;
}
}
/* Close previously created connections */
for (i = 0; i < possible_replication_connections; i++)
{
PQfinish(connections[i]);
}
if (possible_replication_connections < min_replication_connections)
{
config_ok = false;
log_error(_("unable to establish necessary replication connections"));
log_hint(_("increase 'max_wal_senders' by at least %i"), min_replication_connections - possible_replication_connections);
if (exit_on_error == true)
{
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
}
log_verbose(LOG_INFO, "sufficient walsenders available on upstream node (%i required)",
min_replication_connections);
}
return config_ok;
}
standy_clone_mode
get_standby_clone_mode(void)
{
standy_clone_mode mode;
if (runtime_options.rsync_only)
mode = rsync;
else if (strcmp(config_file_options.barman_host, "") != 0 && ! runtime_options.without_barman)
mode = barman;
else
mode = pg_basebackup;
return mode;
}