From bcb17dd71a8a3ad4501f429a43f8a2804cbab157 Mon Sep 17 00:00:00 2001 From: Ian Barwick Date: Sat, 29 Apr 2017 22:41:43 +0900 Subject: [PATCH] More functionality for "standby clone" --- config.c | 148 +++++++++++- config.h | 18 ++ dbutils.c | 234 ++++++++++++++++++ dbutils.h | 15 ++ repmgr-action-standby.c | 207 ++++++++++++++-- repmgr-client-global.h | 24 +- repmgr-client.c | 524 ++++++++++++++++++++++++++++++++++++---- repmgr-client.h | 11 +- 8 files changed, 1102 insertions(+), 79 deletions(-) diff --git a/config.c b/config.c index 7e421f3c..204964a2 100644 --- a/config.c +++ b/config.c @@ -539,11 +539,12 @@ _parse_config(t_configuration_options *options, ItemList *error_list, ItemList * } /* add warning about changed "barman_" parameter meanings */ - if (options->barman_server[0] == '\0' && options->barman_server[0] != '\0') + if ((options->barman_host[0] == '\0' && options->barman_server[0] != '\0') || + (options->barman_host[0] != '\0' && options->barman_server[0] == '\0')) { - item_list_append(warning_list, + item_list_append(error_list, _("use \"barman_host\" for the hostname of the Barman server")); - item_list_append(warning_list, + item_list_append(error_list, _("use \"barman_server\" for the name of the [server] section in the Barman configururation file")); } @@ -923,3 +924,144 @@ parse_event_notifications_list(t_configuration_options *options, const char *arg } } + +bool +parse_pg_basebackup_options(const char *pg_basebackup_options, t_basebackup_options *backup_options, int server_version_num, ItemList *error_list) +{ + int options_len = strlen(pg_basebackup_options) + 1; + char *options_string = pg_malloc(options_len); + + char *options_string_ptr = options_string; + /* + * Add parsed options to this list, then copy to an array + * to pass to getopt + */ + static ItemList option_argv = { NULL, NULL }; + + char *argv_item; + int c, argc_item = 1; + + char **argv_array; + ItemListCell *cell; + + int optindex = 0; + + struct option *long_options; + + bool backup_options_ok = true; + + /* We're only interested in these options */ + static struct option long_options_9[] = + { + {"slot", required_argument, NULL, 'S'}, + {"xlog-method", required_argument, NULL, 'X'}, + {NULL, 0, NULL, 0} + }; + + /* + * From PostgreSQL 10, --xlog-method is renamed --wal-method + * and there's also --no-slot, which we'll want to consider. + */ + static struct option long_options_10[] = + { + {"slot", required_argument, NULL, 'S'}, + {"wal-method", required_argument, NULL, 'X'}, + {"no-slot", no_argument, NULL, 1}, + {NULL, 0, NULL, 0} + }; + + /* Don't attempt to tokenise an empty string */ + if (!strlen(pg_basebackup_options)) + return backup_options_ok; + + if (server_version_num >= 100000) + long_options = long_options_10; + else + long_options = long_options_9; + + /* Copy the string before operating on it with strtok() */ + strncpy(options_string, pg_basebackup_options, options_len); + + /* Extract arguments into a list and keep a count of the total */ + while ((argv_item = strtok(options_string_ptr, " ")) != NULL) + { + item_list_append(&option_argv, argv_item); + + argc_item++; + + if (options_string_ptr != NULL) + options_string_ptr = NULL; + } + + /* + * Array of argument values to pass to getopt_long - this will need to + * include an empty string as the first value (normally this would be + * the program name) + */ + argv_array = pg_malloc0(sizeof(char *) * (argc_item + 2)); + + /* Insert a blank dummy program name at the start of the array */ + argv_array[0] = pg_malloc0(1); + + c = 1; + + /* + * Copy the previously extracted arguments from our list to the array + */ + for (cell = option_argv.head; cell; cell = cell->next) + { + int argv_len = strlen(cell->string) + 1; + + argv_array[c] = pg_malloc0(argv_len); + + strncpy(argv_array[c], cell->string, argv_len); + + c++; + } + + argv_array[c] = NULL; + + /* Reset getopt's optind variable */ + optind = 0; + + /* Prevent getopt from emitting errors */ + opterr = 0; + + while ((c = getopt_long(argc_item, argv_array, "S:X:", long_options, + &optindex)) != -1) + { + switch (c) + { + case 'S': + strncpy(backup_options->slot, optarg, MAXLEN); + break; + case 'X': + strncpy(backup_options->xlog_method, optarg, MAXLEN); + break; + case 1: + backup_options->no_slot = true; + break; + case '?': + if (server_version_num >= 100000 && optopt == 1) + { + if (error_list != NULL) + { + item_list_append(error_list, "invalid use of --no-slot"); + } + backup_options_ok = false; + } + break; + } + } + + if (backup_options->no_slot == true && backup_options->slot[0] != '\0') + { + if (error_list != NULL) + { + item_list_append(error_list, "--no-slot cannot be used with -S/--slot"); + } + backup_options_ok = false; + } + + return backup_options_ok; +} diff --git a/config.h b/config.h index 565e276f..ebf490a8 100644 --- a/config.h +++ b/config.h @@ -8,6 +8,8 @@ #ifndef _REPMGR_CONFIG_H_ #define _REPMGR_CONFIG_H_ +#include + #define CONFIG_FILE_NAME "repmgr.conf" #define MAXLINELENGTH 4096 extern bool config_file_found; @@ -127,6 +129,17 @@ typedef struct +typedef struct +{ + char slot[MAXLEN]; + char xlog_method[MAXLEN]; + bool no_slot; /* from PostgreSQL 10 */ +} t_basebackup_options; + +#define T_BASEBACKUP_OPTIONS_INITIALIZER { "", "", false } + + + void set_progname(const char *argv0); const char *progname(void); @@ -144,4 +157,9 @@ bool parse_bool(const char *s, const char *config_item, ItemList *error_list); +bool parse_pg_basebackup_options(const char *pg_basebackup_options, + t_basebackup_options *backup_options, + int server_version_num, + ItemList *error_list); + #endif diff --git a/dbutils.c b/dbutils.c index 8b6e894d..ba03c4c1 100644 --- a/dbutils.c +++ b/dbutils.c @@ -24,6 +24,56 @@ static int _get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info static void _populate_node_record(PGresult *res, t_node_info *node_info, int row); static bool _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info); +/* =================== */ +/* extension functions */ +/* =================== */ + +t_extension_status +get_repmgr_extension_status(PGconn *conn) +{ + PQExpBufferData query; + PGresult *res; + + /* TODO: check version */ + + initPQExpBuffer(&query); + + appendPQExpBuffer(&query, + " SELECT ae.name, e.extname " + " FROM pg_catalog.pg_available_extensions ae " + "LEFT JOIN pg_catalog.pg_extension e " + " ON e.extname=ae.name " + " WHERE ae.name='repmgr' "); + + res = PQexec(conn, query.data); + + termPQExpBuffer(&query); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_error(_("unable to execute extension query:\n %s"), + PQerrorMessage(conn)); + PQclear(res); + + return REPMGR_UNKNOWN; + } + + /* 1. Check extension is actually available */ + + if (PQntuples(res) == 0) + { + return REPMGR_UNAVAILABLE; + } + + /* 2. Check if extension installed */ + if (PQgetisnull(res, 0, 1) == 0) + { + return REPMGR_INSTALLED; + } + + return REPMGR_AVAILABLE; +} + /* ==================== */ /* Connection functions */ /* ==================== */ @@ -539,10 +589,193 @@ set_config_bool(PGconn *conn, const char *config_param, bool state) return _set_config(conn, config_param, sqlquery); } + +int +guc_set(PGconn *conn, const char *parameter, const char *op, + const char *value) +{ + PQExpBufferData query; + PGresult *res; + int retval = 1; + + char *escaped_parameter = escape_string(conn, parameter); + char *escaped_value = escape_string(conn, value); + + initPQExpBuffer(&query); + appendPQExpBuffer(&query, + "SELECT true FROM pg_catalog.pg_settings " + " WHERE name = '%s' AND setting %s '%s'", + escaped_parameter, op, escaped_value); + + log_verbose(LOG_DEBUG, "guc_set():\n%s", query.data); + + res = PQexec(conn, query.data); + + termPQExpBuffer(&query); + pfree(escaped_parameter); + pfree(escaped_value); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_error(_("guc_set(): unable to execute query\n%s"), + PQerrorMessage(conn)); + retval = -1; + } + else if (PQntuples(res) == 0) + { + retval = 0; + } + + PQclear(res); + + return retval; +} + +/** + * Just like guc_set except with an extra parameter containing the name of + * the pg datatype so that the comparison can be done properly. + */ +int +guc_set_typed(PGconn *conn, const char *parameter, const char *op, + const char *value, const char *datatype) +{ + PQExpBufferData query; + PGresult *res; + int retval = 1; + + char *escaped_parameter = escape_string(conn, parameter); + char *escaped_value = escape_string(conn, value); + + initPQExpBuffer(&query); + appendPQExpBuffer(&query, + "SELECT true FROM pg_catalog.pg_settings " + " WHERE name = '%s' AND setting::%s %s '%s'::%s", + parameter, datatype, op, value, datatype); + + log_verbose(LOG_DEBUG, "guc_set_typed():\n%s\n", query.data); + + res = PQexec(conn, query.data); + + termPQExpBuffer(&query); + pfree(escaped_parameter); + pfree(escaped_value); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_error(_("guc_set_typed(): unable to execute query\n %s"), + PQerrorMessage(conn)); + retval = -1; + } + else if (PQntuples(res) == 0) + { + retval = 0; + } + + PQclear(res); + + return retval; +} + + +bool +get_pg_setting(PGconn *conn, const char *setting, char *output) +{ + PQExpBufferData query; + PGresult *res; + int i; + bool success = false; + + char *escaped_setting = escape_string(conn, setting); + + if (escaped_setting == NULL) + { + log_error(_("unable to escape setting '%s'"), setting); + return false; + } + + initPQExpBuffer(&query); + appendPQExpBuffer(&query, + "SELECT name, setting " + " FROM pg_catalog.pg_settings WHERE name = '%s'", + escaped_setting); + + log_verbose(LOG_DEBUG, "get_pg_setting(): %s\n", query.data); + + res = PQexec(conn, query.data); + + termPQExpBuffer(&query); + pfree(escaped_setting); + + if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_error(_("get_pg_setting() - PQexec failed: %s"), + PQerrorMessage(conn)); + PQclear(res); + return false; + } + + for (i = 0; i < PQntuples(res); i++) + { + if (strcmp(PQgetvalue(res, i, 0), setting) == 0) + { + strncpy(output, PQgetvalue(res, i, 1), MAXLEN); + success = true; + break; + } + else + { + /* XXX highly unlikely this would ever happen */ + log_error(_("get_pg_setting(): unknown parameter \"%s\""), PQgetvalue(res, i, 0)); + } + } + + if (success == true) + { + log_verbose(LOG_DEBUG, _("get_pg_setting(): returned value is \"%s\""), output); + } + + PQclear(res); + + return success; +} + + /* ============================ */ /* Server information functions */ /* ============================ */ + +bool +get_cluster_size(PGconn *conn, char *size) +{ + PQExpBufferData query; + PGresult *res; + + initPQExpBuffer(&query); + appendPQExpBuffer(&query, + "SELECT pg_catalog.pg_size_pretty(SUM(pg_catalog.pg_database_size(oid))::bigint) " + " FROM pg_catalog.pg_database "); + + log_verbose(LOG_DEBUG, "get_cluster_size():\n%s\n", query.data); + + res = PQexec(conn, query.data); + termPQExpBuffer(&query); + + if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_error(_("get_cluster_size(): unable to execute query\n%s"), + PQerrorMessage(conn)); + + PQclear(res); + return false; + } + + strncpy(size, PQgetvalue(res, 0, 0), MAXLEN); + + PQclear(res); + return true; +} + /* * Return the server version number for the connection provided */ @@ -635,6 +868,7 @@ get_master_connection(PGconn *conn, /* find all registered nodes */ log_info(_("retrieving node list")); + initPQExpBuffer(&query); appendPQExpBuffer(&query, " SELECT node_id, conninfo, " diff --git a/dbutils.h b/dbutils.h index 5f792134..c2b646eb 100644 --- a/dbutils.h +++ b/dbutils.h @@ -26,6 +26,13 @@ typedef enum { BDR } t_server_type; +typedef enum { + REPMGR_INSTALLED = 0, + REPMGR_AVAILABLE, + REPMGR_UNAVAILABLE, + REPMGR_UNKNOWN +} t_extension_status; + /* * Struct to store node information */ @@ -119,6 +126,8 @@ PGconn *establish_db_connection_by_params(const char *keywords[], const char *values[], const bool exit_on_error); +/* extension functions */ +t_extension_status get_repmgr_extension_status(PGconn *conn); /* conninfo manipulation functions */ bool get_conninfo_value(const char *conninfo, const char *keyword, char *output); @@ -139,8 +148,14 @@ bool check_cluster_schema(PGconn *conn); /* GUC manipulation functions */ bool set_config(PGconn *conn, const char *config_param, const char *config_value); bool set_config_bool(PGconn *conn, const char *config_param, bool state); +int guc_set(PGconn *conn, const char *parameter, const char *op, + const char *value); +int guc_set_typed(PGconn *conn, const char *parameter, const char *op, + const char *value, const char *datatype); +bool get_pg_setting(PGconn *conn, const char *setting, char *output); /* server information functions */ +bool get_cluster_size(PGconn *conn, char *size); int get_server_version(PGconn *conn, char *server_version); int is_standby(PGconn *conn); PGconn *get_master_connection(PGconn *standby_conn, int *master_id, char *master_conninfo_out); diff --git a/repmgr-action-standby.c b/repmgr-action-standby.c index 131c456b..9f7f2b28 100644 --- a/repmgr-action-standby.c +++ b/repmgr-action-standby.c @@ -12,7 +12,21 @@ #include "repmgr-client-global.h" #include "repmgr-action-standby.h" +static PGconn *primary_conn = NULL; +static PGconn *source_conn = NULL; + + static char local_data_directory[MAXPGPATH]; +static bool local_data_directory_provided = false; + +static bool upstream_record_found = false; +static int upstream_node_id = UNKNOWN_NODE_ID; +static char upstream_data_directory[MAXPGPATH]; + +static t_conninfo_param_list recovery_conninfo; +static char recovery_conninfo_str[MAXLEN]; + +static standy_clone_mode mode; /* used by barman mode */ static char local_repmgr_tmp_directory[MAXPGPATH]; @@ -20,46 +34,22 @@ static char local_repmgr_tmp_directory[MAXPGPATH]; static void check_barman_config(void); static char *make_barman_ssh_command(char *buf); +static void check_source_server(void); void do_standby_clone(void) { - PGconn *primary_conn = NULL; - PGconn *source_conn = NULL; - PGresult *res; - - int server_version_num = UNKNOWN_SERVER_VERSION_NUM; - char cluster_size[MAXLEN]; /* * conninfo params for the actual upstream node (which might be different * to the node we're cloning from) to write to recovery.conf */ - t_conninfo_param_list recovery_conninfo; - char recovery_conninfo_str[MAXLEN]; - bool upstream_record_found = false; - int upstream_node_id = UNKNOWN_NODE_ID; - - char upstream_data_directory[MAXPGPATH]; - bool local_data_directory_provided = false; - - enum { - barman, - rsync, - pg_basebackup - } mode; - /* * detecting the cloning mode */ - if (runtime_options.rsync_only) - mode = rsync; - else if (strcmp(config_file_options.barman_host, "") != 0 && ! runtime_options.without_barman) - mode = barman; - else - mode = pg_basebackup; + mode = get_standby_clone_mode(); /* * In rsync mode, we need to check the SSH connection early @@ -154,6 +144,22 @@ do_standby_clone(void) } } + + /* + * --upstream-conninfo supplied, which we interpret to imply + * --no-upstream-connection as well - the use case for this option is when + * the upstream is not available, so no point in checking for it. + */ + + if (*runtime_options.upstream_conninfo) + runtime_options.no_upstream_connection = false; + + /* By default attempt to connect to the source server */ + if (runtime_options.no_upstream_connection == false) + { + check_source_server(); + } + } @@ -253,3 +259,152 @@ make_barman_ssh_command(char *buf) return buf; } + + +static void +check_source_server() +{ + int server_version_num = UNKNOWN_SERVER_VERSION_NUM; + char cluster_size[MAXLEN]; + + /* Attempt to connect to the upstream server to verify its configuration */ + log_info(_("connecting to upstream node\n")); + + source_conn = establish_db_connection_by_params((const char**)source_conninfo.keywords, + (const char**)source_conninfo.values, + false); + + /* + * Unless in barman mode, exit with an error; + * establish_db_connection_by_params() will have already logged an error message + */ + if (PQstatus(source_conn) != CONNECTION_OK) + { + if (mode != barman) + { + PQfinish(source_conn); + exit(ERR_DB_CON); + } + } + else + { + /* + * If a connection was established, perform some sanity checks on the + * provided upstream connection + */ + t_node_info upstream_node_record = T_NODE_INFO_INITIALIZER; + int query_result; + t_extension_status extension_status; + + /* Verify that upstream node is a supported server version */ + log_verbose(LOG_INFO, _("connected to upstream node, checking its state\n")); + + server_version_num = check_server_version(source_conn, "master", true, NULL); + + check_upstream_config(source_conn, server_version_num, true); + + if (get_cluster_size(source_conn, cluster_size) == false) + exit(ERR_DB_QUERY); + + log_info(_("Successfully connected to source node. Current installation size is %s"), + cluster_size); + + /* + * If --recovery-min-apply-delay was passed, check that + * we're connected to PostgreSQL 9.4 or later + */ + // XXX should this be a config file parameter? + if (*runtime_options.recovery_min_apply_delay) + { + if (server_version_num < 90400) + { + log_error(_("PostgreSQL 9.4 or greater required for --recovery-min-apply-delay\n")); + PQfinish(source_conn); + exit(ERR_BAD_CONFIG); + } + } + + /* + * If the upstream node is a standby, try to connect to the primary too so we + * can write an event record + */ + if (is_standby(source_conn)) + { + primary_conn = get_master_connection(source_conn, NULL, NULL); + + // XXX check this worked? + } + else + { + primary_conn = source_conn; + } + + /* + * Sanity-check that the master node has a repmgr schema - if not + * present, fail with an error unless -F/--force is used (to enable + * repmgr to be used as a standalone clone tool) + */ + + extension_status = get_repmgr_extension_status(primary_conn); + + if (extension_status != REPMGR_INSTALLED) + { + if (!runtime_options.force) + { + /* schema doesn't exist */ + log_error(_("repmgr extension not found on upstream server")); + log_hint(_("check that the upstream server is part of a repmgr cluster")); + PQfinish(source_conn); + exit(ERR_BAD_CONFIG); + } + + log_warning(_("repmgr extension not found on upstream server")); + } + + /* Fetch the source's data directory */ + if (get_pg_setting(source_conn, "data_directory", upstream_data_directory) == false) + { + log_error(_("unable to retrieve upstream node's data directory")); + log_hint(_("STANDBY CLONE must be run as a database superuser")); + PQfinish(source_conn); + exit(ERR_BAD_CONFIG); + } + + /* + * If no target directory was explicitly provided, we'll default to + * the same directory as on the source host. + */ + if (local_data_directory_provided == false) + { + strncpy(local_data_directory, upstream_data_directory, MAXPGPATH); + + log_notice(_("setting data directory to: \"%s\""), local_data_directory); + log_hint(_("use -D/--pgdata to explicitly specify a data directory")); + } + + /* + * Copy the source connection so that we have some default values, + * particularly stuff like passwords extracted from PGPASSFILE; + * these will be overridden from the upstream conninfo, if provided. + */ + conn_to_param_list(source_conn, &recovery_conninfo); + + /* + * Attempt to find the upstream node record + */ + if (config_file_options.upstream_node_id == NO_UPSTREAM_NODE) + upstream_node_id = get_master_node_id(source_conn); + else + upstream_node_id = config_file_options.upstream_node_id; + + query_result = get_node_record(source_conn, upstream_node_id, &upstream_node_record); + + if (query_result) + { + upstream_record_found = true; + strncpy(recovery_conninfo_str, upstream_node_record.conninfo, MAXLEN); + } + } +} + + diff --git a/repmgr-client-global.h b/repmgr-client-global.h index 3b5d743d..4d0f593d 100644 --- a/repmgr-client-global.h +++ b/repmgr-client-global.h @@ -15,6 +15,7 @@ typedef struct bool connection_param_provided; bool host_param_provided; bool limit_provided; + bool wal_keep_segments_used; /* general configuration options */ char config_file[MAXPGPATH]; @@ -38,19 +39,25 @@ typedef struct char data_dir[MAXPGPATH]; /* standby clone options */ + bool fast_checkpoint; bool rsync_only; + bool no_upstream_connection; + char recovery_min_apply_delay[MAXLEN]; + char replication_user[MAXLEN]; + char upstream_conninfo[MAXLEN]; + char wal_keep_segments[MAXLEN]; bool without_barman; /* event options */ + bool all; char event[MAXLEN]; int limit; - bool all; } t_runtime_options; #define T_RUNTIME_OPTIONS_INITIALIZER { \ /* configuration metadata */ \ - false, false, false, false, \ + false, false, false, false, false, \ /* general configuration options */ \ "", false, "", \ /* logging options */ \ @@ -60,11 +67,17 @@ typedef struct /* node options */ \ UNKNOWN_NODE_ID, "", "", \ /* standby clone options */ \ - false, false, \ + false, false, false, "", "", "", "", false, \ /* event options */ \ - "", 20, false} + false, "", 20 } +typedef enum { + barman, + rsync, + pg_basebackup +} standy_clone_mode; + /* global configuration structures */ extern t_runtime_options runtime_options; @@ -86,6 +99,9 @@ extern int check_server_version(PGconn *conn, char *server_type, bool exit_on_er extern bool create_repmgr_extension(PGconn *conn); extern int test_ssh_connection(char *host, char *remote_user); extern bool local_command(const char *command, PQExpBufferData *outputbuf); +extern bool check_upstream_config(PGconn *conn, int server_version_num, bool exit_on_error); +extern standy_clone_mode get_standby_clone_mode(void); +extern void print_error_list(ItemList *error_list, int log_level); #endif diff --git a/repmgr-client.c b/repmgr-client.c index 2ad9c09e..a30ad42b 100644 --- a/repmgr-client.c +++ b/repmgr-client.c @@ -65,7 +65,6 @@ main(int argc, char **argv) bool config_file_parsed = false; - set_progname(argv[0]); /* @@ -75,8 +74,7 @@ main(int argc, char **argv) */ logger_output_mode = OM_COMMAND_LINE; - - while ((c = getopt_long(argc, argv, "?Vf:Fb:S:L:vtD:", long_options, + while ((c = getopt_long(argc, argv, "?Vf:Fb:S:L:vtD:cr", long_options, &optindex)) != -1) { /* @@ -161,6 +159,59 @@ main(int argc, char **argv) strncpy(runtime_options.node_name, optarg, MAXLEN); break; + /* standby clone options * + * --------------------- */ + + case 'c': + runtime_options.fast_checkpoint = true; + break; + + case 'r': + runtime_options.rsync_only = true; + break; + + case OPT_NO_UPSTREAM_CONNECTION: + runtime_options.no_upstream_connection = true; + break; + + case OPT_RECOVERY_MIN_APPLY_DELAY: + { + char *ptr = NULL; + int targ = strtol(optarg, &ptr, 10); + + if (targ < 1) + { + item_list_append(&cli_errors, _("Invalid value provided for '--recovery-min-apply-delay'")); + break; + } + if (ptr && *ptr) + { + if (strcmp(ptr, "ms") != 0 && strcmp(ptr, "s") != 0 && + strcmp(ptr, "min") != 0 && strcmp(ptr, "h") != 0 && + strcmp(ptr, "d") != 0) + { + item_list_append(&cli_errors, + _("Value provided for '--recovery-min-apply-delay' must be one of ms/s/min/h/d")); + break; + } + } + + strncpy(runtime_options.recovery_min_apply_delay, optarg, MAXLEN); + break; + } + + case OPT_REPLICATION_USER: + strncpy(runtime_options.replication_user, optarg, MAXLEN); + break; + + case OPT_UPSTREAM_CONNINFO: + strncpy(runtime_options.upstream_conninfo, optarg, MAXLEN); + break; + + case OPT_WITHOUT_BARMAN: + runtime_options.without_barman = true; + break; + /* event options * * ------------- */ @@ -204,16 +255,17 @@ main(int argc, char **argv) runtime_options.log_to_file = true; logger_output_mode = OM_DAEMON; break; + /* --terse */ case 't': runtime_options.terse = true; break; + /* --verbose */ case 'v': runtime_options.verbose = true; break; - } } @@ -694,6 +746,7 @@ action_name(const int action) return "UNKNOWN ACTION"; } + static void exit_with_errors(void) { @@ -719,6 +772,27 @@ print_item_list(ItemList *item_list) } +void +print_error_list(ItemList *error_list, int log_level) +{ + ItemListCell *cell; + + for (cell = error_list->head; cell; cell = cell->next) + { + switch(log_level) + { + /* Currently we only need errors and warnings */ + case LOG_ERROR: + log_error("%s", cell->string); + break; + case LOG_WARNING: + log_warning("%s", cell->string); + break; + } + } +} + + static void do_help(void) { @@ -771,9 +845,6 @@ do_help(void) - - - /* * Create the repmgr extension, and grant access for the repmgr * user if not a superuser. @@ -790,51 +861,36 @@ create_repmgr_extension(PGconn *conn) PQExpBufferData query; PGresult *res; + t_extension_status extension_status; char *current_user; const char *superuser_status; bool is_superuser; PGconn *superuser_conn = NULL; PGconn *schema_create_conn = NULL; - initPQExpBuffer(&query); + extension_status = get_repmgr_extension_status(conn); - appendPQExpBuffer(&query, - " SELECT ae.name, e.extname " - " FROM pg_catalog.pg_available_extensions ae " - "LEFT JOIN pg_catalog.pg_extension e " - " ON e.extname=ae.name " - " WHERE ae.name='repmgr' "); - - res = PQexec(conn, query.data); - if (PQresultStatus(res) != PGRES_TUPLES_OK) + switch(extension_status) { - log_error(_("unable to execute extension query:\n %s"), - PQerrorMessage(conn)); - PQclear(res); + case REPMGR_UNKNOWN: + log_error(_("unable to determine status of repmgr extension")); + return false; + + case REPMGR_UNAVAILABLE: + log_error(_("\"repmgr\" extension is not available")); + return false; + + case REPMGR_INSTALLED: + /* TODO: check version */ + log_info(_("extension \"repmgr\" already installed")); + return true; + + case REPMGR_AVAILABLE: + log_notice(_("attempting to install extension \"repmgr\"")); + break; - return false; } - /* 1. Check extension is actually available */ - - if (PQntuples(res) == 0) - { - log_error(_("\"repmgr\" extension is not available")); - return false; - } - - /* 2. Check if extension installed */ - if (PQgetisnull(res, 0, 1) == 0) - { - /* TODO: check version */ - log_info(_("extension \"repmgr\" already installed")); - return true; - } - - PQclear(res); - termPQExpBuffer(&query); - - log_notice(_("attempting to install extension \"repmgr\"")); /* 3. Check if repmgr user is superuser, if not connect as superuser */ current_user = PQuser(conn); @@ -1091,3 +1147,389 @@ local_command(const char *command, PQExpBufferData *outputbuf) return true; } } + + +/* + * check_upstream_config() + * + * Perform sanity check on upstream server configuration before starting cloning + * process + * + * TODO: + * - check user is qualified to perform base backup + */ + +bool +check_upstream_config(PGconn *conn, int server_version_num, bool exit_on_error) +{ + int i; + bool config_ok = true; + char *wal_error_message = NULL; + t_basebackup_options backup_options = T_BASEBACKUP_OPTIONS_INITIALIZER; + bool backup_options_ok = true; + ItemList backup_option_errors = { NULL, NULL }; + bool xlog_stream = true; + standy_clone_mode mode; + + /* + * Detecting the intended cloning mode + */ + mode = get_standby_clone_mode(); + + /* + * Parse `pg_basebackup_options`, if set, to detect whether --xlog-method + * has been set to something other than `stream` (i.e. `fetch`), as + * this will influence some checks + */ + + backup_options_ok = parse_pg_basebackup_options( + config_file_options.pg_basebackup_options, + &backup_options, server_version_num, + &backup_option_errors); + + if (backup_options_ok == false) + { + if (exit_on_error == true) + { + log_error(_("error(s) encountered parsing 'pg_basebackup_options'")); + print_error_list(&backup_option_errors, LOG_ERR); + log_hint(_("'pg_basebackup_options' is: '%s'"), + config_file_options.pg_basebackup_options); + exit(ERR_BAD_CONFIG); + } + + config_ok = false; + } + + if (strlen(backup_options.xlog_method) && strcmp(backup_options.xlog_method, "stream") != 0) + xlog_stream = false; + + /* Check that WAL level is set correctly */ + if (server_version_num < 90400) + { + i = guc_set(conn, "wal_level", "=", "hot_standby"); + wal_error_message = _("parameter 'wal_level' must be set to 'hot_standby'"); + } + else + { + char *levels_pre96[] = { + "hot_standby", + "logical", + NULL, + }; + + /* + * Note that in 9.6+, "hot_standby" and "archive" are accepted as aliases + * for "replica", but current_setting() will of course always return "replica" + */ + char *levels_96plus[] = { + "replica", + "logical", + NULL, + }; + + char **levels; + int j = 0; + + if (server_version_num < 90600) + { + levels = (char **)levels_pre96; + wal_error_message = _("parameter 'wal_level' must be set to 'hot_standby' or 'logical'"); + } + else + { + levels = (char **)levels_96plus; + wal_error_message = _("parameter 'wal_level' must be set to 'replica' or 'logical'"); + } + + do + { + i = guc_set(conn, "wal_level", "=", levels[j]); + if (i) + { + break; + } + j++; + } while (levels[j] != NULL); + } + + if (i == 0 || i == -1) + { + if (i == 0) + log_error("%s", + wal_error_message); + + if (exit_on_error == true) + { + PQfinish(conn); + exit(ERR_BAD_CONFIG); + } + + config_ok = false; + } + + if (config_file_options.use_replication_slots) + { + /* Does the server support physical replication slots? */ + if (server_version_num < 90400) + { + log_error(_("server version must be 9.4 or later to enable replication slots")); + + if (exit_on_error == true) + { + PQfinish(conn); + exit(ERR_BAD_CONFIG); + } + + config_ok = false; + } + /* Server is 9.4 or greater - non-zero `max_replication_slots` required */ + else + { + i = guc_set_typed(conn, "max_replication_slots", ">", + "0", "integer"); + if (i == 0 || i == -1) + { + if (i == 0) + { + log_error(_("parameter 'max_replication_slots' must be set to at least 1 to enable replication slots")); + log_hint(_("'max_replication_slots' should be set to at least the number of expected standbys")); + if (exit_on_error == true) + { + PQfinish(conn); + exit(ERR_BAD_CONFIG); + } + + config_ok = false; + } + } + } + + } + /* + * physical replication slots not available or not requested - check if + * there are any circumstances where `wal_keep_segments` should be set + */ + else if (mode != barman) + { + bool check_wal_keep_segments = false; + char min_wal_keep_segments[MAXLEN] = "1"; + + /* + * -w/--wal-keep-segments was supplied - check against that value + */ + if (runtime_options.wal_keep_segments_used == true) + { + check_wal_keep_segments = true; + strncpy(min_wal_keep_segments, runtime_options.wal_keep_segments, MAXLEN); + } + + /* + * A non-zero `wal_keep_segments` value will almost certainly be required + * if rsync mode is being used, or pg_basebackup with --xlog-method=fetch, + * *and* no restore command has been specified + */ + else if ( (runtime_options.rsync_only == true || xlog_stream == false) + && strcmp(config_file_options.restore_command, "") == 0) + { + check_wal_keep_segments = true; + } + + if (check_wal_keep_segments == true) + { + i = guc_set_typed(conn, "wal_keep_segments", ">=", min_wal_keep_segments, "integer"); + + if (i == 0 || i == -1) + { + if (i == 0) + { + log_error(_("parameter 'wal_keep_segments' on the upstream server must be be set to %s or greater"), + min_wal_keep_segments); + log_hint(_("Choose a value sufficiently high enough to retain enough WAL " + "until the standby has been cloned and started.\n " + "Alternatively set up WAL archiving using e.g. PgBarman and configure " + "'restore_command' in repmgr.conf to fetch WALs from there." + )); + if (server_version_num >= 90400) + { + log_hint(_("In PostgreSQL 9.4 and later, replication slots can be used, which " + "do not require 'wal_keep_segments' to be set " + "(set parameter 'use_replication_slots' in repmgr.conf to enable)\n" + )); + } + } + + if (exit_on_error == true) + { + PQfinish(conn); + exit(ERR_BAD_CONFIG); + } + + config_ok = false; + } + } + } + + + /* + * If archive_mode is enabled, check that 'archive_command' is non empty + * (however it's not practical to check that it actually represents a valid + * command). + * + * From PostgreSQL 9.5, archive_mode can be one of 'off', 'on' or 'always' + * so for ease of backwards compatibility, rather than explicitly check for an + * enabled mode, check that it's not "off". + */ + + if (guc_set(conn, "archive_mode", "!=", "off")) + { + i = guc_set(conn, "archive_command", "!=", ""); + + if (i == 0 || i == -1) + { + if (i == 0) + log_error(_("parameter 'archive_command' must be set to a valid command")); + + if (exit_on_error == true) + { + PQfinish(conn); + exit(ERR_BAD_CONFIG); + } + + config_ok = false; + } + } + + + /* + * Check that 'hot_standby' is on. This isn't strictly necessary + * for the primary server, however the assumption is that we'll be + * cloning standbys and thus copying the primary configuration; + * this way the standby will be correctly configured by default. + */ + + i = guc_set(conn, "hot_standby", "=", "on"); + if (i == 0 || i == -1) + { + if (i == 0) + log_error(_("parameter 'hot_standby' must be set to 'on'")); + + if (exit_on_error == true) + { + PQfinish(conn); + exit(ERR_BAD_CONFIG); + } + + config_ok = false; + } + + i = guc_set_typed(conn, "max_wal_senders", ">", "0", "integer"); + if (i == 0 || i == -1) + { + if (i == 0) + { + log_error(_("parameter 'max_wal_senders' must be set to be at least 1")); + log_hint(_("'max_wal_senders' should be set to at least the number of expected standbys")); + } + + if (exit_on_error == true) + { + PQfinish(conn); + exit(ERR_BAD_CONFIG); + } + + config_ok = false; + } + + /* + * If using pg_basebackup, ensure sufficient replication connections can be made. + * There's no guarantee they'll still be available by the time pg_basebackup + * is executed, but there's nothing we can do about that. + */ + if (mode == pg_basebackup) + { + + PGconn **connections; + int i; + int min_replication_connections = 1, + possible_replication_connections = 0; + + t_conninfo_param_list repl_conninfo; + + /* Make a copy of the connection parameter arrays, and append "replication" */ + + initialize_conninfo_params(&repl_conninfo, false); + + conn_to_param_list(conn, &repl_conninfo); + + param_set(&repl_conninfo, "replication", "1"); + + if (*runtime_options.replication_user) + param_set(&repl_conninfo, "user", runtime_options.replication_user); + + /* + * work out how many replication connections are required (1 or 2) + */ + + if (xlog_stream == true) + min_replication_connections += 1; + + log_verbose(LOG_NOTICE, "checking for available walsenders on upstream node (%i required)", + min_replication_connections); + + connections = pg_malloc0(sizeof(PGconn *) * min_replication_connections); + + /* Attempt to create the minimum number of required concurrent connections */ + for (i = 0; i < min_replication_connections; i++) + { + PGconn *replication_conn; + + replication_conn = establish_db_connection_by_params((const char**)repl_conninfo.keywords, (const char**)repl_conninfo.values, false); + + if (PQstatus(replication_conn) == CONNECTION_OK) + { + connections[i] = replication_conn; + possible_replication_connections++; + } + } + + /* Close previously created connections */ + for (i = 0; i < possible_replication_connections; i++) + { + PQfinish(connections[i]); + } + + if (possible_replication_connections < min_replication_connections) + { + config_ok = false; + log_error(_("unable to establish necessary replication connections")); + log_hint(_("increase 'max_wal_senders' by at least %i"), min_replication_connections - possible_replication_connections); + + if (exit_on_error == true) + { + PQfinish(conn); + exit(ERR_BAD_CONFIG); + } + } + + log_verbose(LOG_INFO, "sufficient walsenders available on upstream node (%i required)", + min_replication_connections); + } + + return config_ok; +} + +standy_clone_mode +get_standby_clone_mode(void) +{ + standy_clone_mode mode; + + if (runtime_options.rsync_only) + mode = rsync; + else if (strcmp(config_file_options.barman_host, "") != 0 && ! runtime_options.without_barman) + mode = barman; + else + mode = pg_basebackup; + + return mode; +} diff --git a/repmgr-client.h b/repmgr-client.h index cee10e8a..f6405595 100644 --- a/repmgr-client.h +++ b/repmgr-client.h @@ -88,13 +88,18 @@ static struct option long_options[] = {"verbose", no_argument, NULL, 'v'}, /* standby clone options */ + {"fast-checkpoint", no_argument, NULL, 'c'}, {"rsync-only", no_argument, NULL, 'r'}, + {"no-upstream-connection", no_argument, NULL, OPT_NO_UPSTREAM_CONNECTION}, + {"recovery-min-apply-delay", required_argument, NULL, OPT_RECOVERY_MIN_APPLY_DELAY}, + {"replication-user", required_argument, NULL, OPT_REPLICATION_USER}, + {"upstream-conninfo", required_argument, NULL, OPT_UPSTREAM_CONNINFO}, {"without-barman", no_argument, NULL, OPT_WITHOUT_BARMAN}, /* event options */ + {"all", no_argument, NULL, OPT_ALL }, {"event", required_argument, NULL, OPT_EVENT }, {"limit", required_argument, NULL, OPT_LIMIT }, - {"all", no_argument, NULL, OPT_ALL }, /* not yet handled */ {"dbname", required_argument, NULL, 'd'}, @@ -108,17 +113,13 @@ static struct option long_options[] = {"mode", required_argument, NULL, 'm'}, {"remote-config-file", required_argument, NULL, 'C'}, {"check-upstream-config", no_argument, NULL, OPT_CHECK_UPSTREAM_CONFIG}, - {"recovery-min-apply-delay", required_argument, NULL, OPT_RECOVERY_MIN_APPLY_DELAY}, {"pg_rewind", optional_argument, NULL, OPT_PG_REWIND}, {"pwprompt", optional_argument, NULL, OPT_PWPROMPT}, {"csv", no_argument, NULL, OPT_CSV}, {"node", required_argument, NULL, OPT_NODE}, {"without-barman", no_argument, NULL, OPT_WITHOUT_BARMAN}, - {"no-upstream-connection", no_argument, NULL, OPT_NO_UPSTREAM_CONNECTION}, {"copy-external-config-files", optional_argument, NULL, OPT_COPY_EXTERNAL_CONFIG_FILES}, {"wait-sync", optional_argument, NULL, OPT_REGISTER_WAIT}, - {"upstream-conninfo", required_argument, NULL, OPT_UPSTREAM_CONNINFO}, - {"replication-user", required_argument, NULL, OPT_REPLICATION_USER}, {"no-conninfo-password", no_argument, NULL, OPT_NO_CONNINFO_PASSWORD}, /* Following options for internal use */ {"cluster", required_argument, NULL, OPT_CLUSTER},