diff --git a/repmgr.c b/repmgr.c index c0c0d71c..e976bdc7 100644 --- a/repmgr.c +++ b/repmgr.c @@ -96,8 +96,8 @@ static int copy_remote_files(char *host, char *remote_user, char *remote_path, static int run_basebackup(const char *data_dir, int server_version); static void check_parameters_for_action(const int action); static bool create_schema(PGconn *conn); -static bool create_recovery_file(const char *data_dir, PGconn *primary_conn, const char *barmans_conninfo); -static void write_primary_conninfo(char *line, PGconn *primary_conn); +static bool create_recovery_file(const char *data_dir, t_conninfo_param_list *upstream_conninfo, const char *barmans_conninfo); +static void write_primary_conninfo(char *line, t_conninfo_param_list *param_list); static bool write_recovery_file_line(FILE *recovery_file, char *recovery_file_path, char *line); static void check_master_standby_version_match(PGconn *conn, PGconn *master_conn); static int check_server_version(PGconn *conn, char *server_type, bool exit_on_error, char *server_version_string); @@ -145,8 +145,8 @@ static bool copy_file(const char *old_filename, const char *new_filename); static bool read_backup_label(const char *local_data_directory, struct BackupLabel *out_backup_label); -static void initialize_conninfo_params(t_conninfo_param_list *param_list); -static void param_set_new(t_conninfo_param_list *param_list, const char *param, const char *value); +static void initialize_conninfo_params(t_conninfo_param_list *param_list, bool set_defaults); +static void param_set(t_conninfo_param_list *param_list, const char *param, const char *value); static void parse_pg_basebackup_options(const char *pg_basebackup_options, t_basebackup_options *backup_options); @@ -156,8 +156,7 @@ static PQconninfoOption *opts = NULL; /* conninfo params for the node we're cloning from */ t_conninfo_param_list source_conninfo; -/* conninfo params for the actual upstream node (which might be different) */ -t_conninfo_param_list upstream_conninfo; + static bool config_file_required = true; @@ -253,10 +252,10 @@ main(int argc, char **argv) exit(1); } - initialize_conninfo_params(&source_conninfo); + initialize_conninfo_params(&source_conninfo, true); /* - * Pre-set any defaults, which can be overwritten if matching + * Pre-set any defaults , which can be overwritten if matching * command line parameters are provided */ @@ -351,13 +350,13 @@ main(int argc, char **argv) break; case 'h': strncpy(runtime_options.host, optarg, MAXLEN); - param_set_new(&source_conninfo, "host", optarg); + param_set(&source_conninfo, "host", optarg); connection_param_provided = true; host_param_provided = true; break; case 'p': repmgr_atoi(optarg, "-p/--port", &cli_errors, false); - param_set_new(&source_conninfo, "port", optarg); + param_set(&source_conninfo, "port", optarg); strncpy(runtime_options.masterport, optarg, MAXLEN); @@ -365,7 +364,7 @@ main(int argc, char **argv) break; case 'U': strncpy(runtime_options.username, optarg, MAXLEN); - param_set_new(&source_conninfo, "user", optarg); + param_set(&source_conninfo, "user", optarg); connection_param_provided = true; break; case 'S': @@ -564,7 +563,7 @@ main(int argc, char **argv) { if (opt->val != NULL && opt->val[0] != '\0') { - param_set_new(&source_conninfo, opt->keyword, opt->val); + param_set(&source_conninfo, opt->keyword, opt->val); } if (strcmp(opt->keyword, "host") == 0 && @@ -594,7 +593,7 @@ main(int argc, char **argv) } else { - param_set_new(&source_conninfo, "dbname", runtime_options.dbname); + param_set(&source_conninfo, "dbname", runtime_options.dbname); } } @@ -715,7 +714,7 @@ main(int argc, char **argv) else { strncpy(runtime_options.host, argv[optind++], MAXLEN); - param_set_new(&source_conninfo, "host", runtime_options.host); + param_set(&source_conninfo, "host", runtime_options.host); } } } @@ -1700,9 +1699,19 @@ static void do_standby_clone(void) { PGconn *primary_conn = NULL; - PGconn *upstream_conn = NULL; + PGconn *source_conn = NULL; PGresult *res; + char upstream_conninfo_str[MAXLEN]; + bool upstream_record_found = false; + int upstream_node_id; + + /* + * conninfo params for the actual upstream node (which might be different + * to the node we're cloning from) + */ + t_conninfo_param_list upstream_conninfo; + enum { barman, rsync, @@ -1748,6 +1757,7 @@ do_standby_clone(void) PQExpBufferData event_details; + /* * Detecting the appropriate mode */ @@ -1770,20 +1780,20 @@ do_standby_clone(void) runtime_options.dest_dir); } - - param_set_new(&source_conninfo, "application_name", options.node_name); + // XXX set this to "repmgr ? + param_set(&source_conninfo, "application_name", options.node_name); /* Attempt to connect to the upstream server to verify its configuration */ log_info(_("connecting to upstream node\n")); - upstream_conn = establish_db_connection_by_params((const char**)source_conninfo.keywords, (const char**)source_conninfo.values, false); + source_conn = establish_db_connection_by_params((const char**)source_conninfo.keywords, (const char**)source_conninfo.values, false); /* * Unless in barman mode, exit with an error; * establish_db_connection_by_params() will have already logged an error message */ - if (mode != barman && PQstatus(upstream_conn) != CONNECTION_OK) + if (mode != barman && PQstatus(source_conn) != CONNECTION_OK) { - PQfinish(upstream_conn); + PQfinish(source_conn); exit(ERR_DB_CON); } @@ -1791,15 +1801,15 @@ do_standby_clone(void) * If a connection was established, perform some sanity checks on the * provided upstream connection */ - if (PQstatus(upstream_conn) == CONNECTION_OK) + if (PQstatus(source_conn) == CONNECTION_OK) { /* Verify that upstream node is a supported server version */ log_verbose(LOG_INFO, _("connected to upstream node, checking its state\n")); - server_version_num = check_server_version(upstream_conn, "master", true, NULL); + server_version_num = check_server_version(source_conn, "master", true, NULL); - check_upstream_config(upstream_conn, server_version_num, true); + check_upstream_config(source_conn, server_version_num, true); - if (get_cluster_size(upstream_conn, cluster_size) == false) + if (get_cluster_size(source_conn, cluster_size) == false) exit(ERR_DB_QUERY); log_info(_("Successfully connected to upstream node. Current installation size is %s\n"), @@ -1814,7 +1824,7 @@ do_standby_clone(void) if (server_version_num < 90400) { log_err(_("PostgreSQL 9.4 or greater required for --recovery-min-apply-delay\n")); - PQfinish(upstream_conn); + PQfinish(source_conn); exit(ERR_BAD_CONFIG); } } @@ -1823,38 +1833,164 @@ do_standby_clone(void) * If the upstream node is a standby, try to connect to the primary too so we * can write an event record */ - if (is_standby(upstream_conn)) + if (is_standby(source_conn)) { if (strlen(options.cluster_name)) { - primary_conn = get_master_connection(upstream_conn, options.cluster_name, + primary_conn = get_master_connection(source_conn, options.cluster_name, NULL, NULL); } } else { - primary_conn = upstream_conn; + primary_conn = source_conn; } } + // XXX + --no-upstream-connection // now set up conninfo params for the actual upstream, as we may be cloning from a different node // if upstream conn // get upstream node rec directly - // else - // attempt to get upstream node rec via Barman - // if no upstream node rec - // if not --force - // exit with error - // else - // copy upstream_conn params to recovery primary_conninfo params - // else - // parse returned conninfo to recovery primary_conninfo params - // - // finally, set application name + if (options.upstream_node == NO_UPSTREAM_NODE) + { + upstream_node_id = get_master_node_id(source_conn, options.cluster_name); + } + else + { + upstream_node_id = options.upstream_node; + } + + + + // XXX merge with previous if() + if (PQstatus(source_conn) == CONNECTION_OK) + { + t_node_info upstream_node_record = T_NODE_INFO_INITIALIZER; + int query_result; + + query_result = get_node_record(source_conn, options.cluster_name, upstream_node_id, &upstream_node_record); + + if (query_result) + { + upstream_record_found = true; + strncpy(upstream_conninfo_str, upstream_node_record.conninfo_str, MAXLEN); + } + } + else + { + // attempt to get upstream node rec via Barman: + // - get barman connstr + // - parse to list (no set defaults) + // - set dbname to the one provided by the user + /*char buf[MAXLEN]; + char where_condition[MAXLEN]; + PQExpBufferData command_output; + + switch(options.upstream_node) + { + case NO_UPSTREAM_NODE: + maxlen_snprintf(where_condition, "type='master'"); + break; + default: + maxlen_snprintf(where_condition, "id=%d", options.upstream_node); + break; + } + + initPQExpBuffer(&command_output); + maxlen_snprintf(buf, + "ssh %s \"psql -Aqt \\\"%s\\\" -c \\\"" + " SELECT conninfo" + " FROM repmgr_%s.repl_nodes" + " WHERE %s" + " AND active" + "\\\"\"", options.barman_server, conninfo_on_barman, + options.cluster_name, where_condition); + (void)local_command(buf, &command_output); + maxlen_snprintf(buf, "%s", command_output.data); + string_remove_trailing_newlines(buf); + maxlen_snprintf(line, "primary_conninfo = '%s'\n", buf);*/ + } + + printf("upstream found? %c\n", upstream_record_found == true ? 'y' : 'n'); + initialize_conninfo_params(&upstream_conninfo, true); + + /* + * Copy the source connection so that we have some default values, + * particularly stuff like passwords extracted from PGPASSFILE; + * these will be overridden from the upstream conninfo, if provided + */ + // XXX merge with previous if() + if (PQstatus(source_conn) == CONNECTION_OK) + { + PQconninfoOption *connOptions; + PQconninfoOption *option; + + connOptions = PQconninfo(source_conn); + for (option = connOptions; option && option->keyword; option++) + { + /* Ignore non-set or blank parameter values*/ + if((option->val == NULL) || + (option->val != NULL && option->val[0] == '\0')) + continue; + + param_set(&upstream_conninfo, option->keyword, option->val); + } + } + + /* + * If no upstream node record found, we'll abort with an error here, unless + * --force is used, in which case we'll use the conninfo parameters + * from source_conn, and if that's not available, using those provided + * on the command line (and assume the user knows what they're doing). + */ + if (upstream_record_found == false) + { + // if not --force + // exit with error + if (!runtime_options.force) + { + log_err(_("No record found for upstream node with id %i\n"), upstream_node_id); + PQfinish(source_conn); + exit(ERR_BAD_CONFIG); + } + // copy upstream_conn params to recovery primary_conninfo params + if (PQstatus(source_conn) != CONNECTION_OK) + { + // copy command line options ??? + } + + } + else + { + /* parse returned upstream conninfo string to recovery primary_conninfo params*/ + + PQconninfoOption *connOptions; + PQconninfoOption *option; + char *errmsg = NULL; + + connOptions = PQconninfoParse(upstream_conninfo_str, &errmsg); + + printf("upstr conn: %s\n", upstream_conninfo_str); + if (connOptions == NULL) + { + log_err(_("Unable to parse conninfo string \"%s\" for upstream node %i\n"), + upstream_conninfo_str, upstream_node_id); + PQfinish(source_conn); + exit(ERR_BAD_CONFIG); + } + + for (option = connOptions; option && option->keyword; option++) + { + /* Ignore non-set or blank parameter values*/ + if((option->val == NULL) || + (option->val != NULL && option->val[0] == '\0')) + continue; + + param_set(&upstream_conninfo, option->keyword, option->val); + } + } - // !! make write_primary_conninfo() use recovery primary_conninfo params, - // !! don't pass the connection if (mode != barman) { @@ -1877,7 +2013,7 @@ do_standby_clone(void) if (server_version_num < 90400 && !runtime_options.rsync_only) { log_err(_("in PostgreSQL 9.3, tablespace mapping can only be used in conjunction with --rsync-only\n")); - PQfinish(upstream_conn); + PQfinish(source_conn); exit(ERR_BAD_CONFIG); } @@ -1888,12 +2024,12 @@ do_standby_clone(void) " FROM pg_tablespace " " WHERE pg_tablespace_location(oid) = '%s'", cell->old_dir); - res = PQexec(upstream_conn, sqlquery); + res = PQexec(source_conn, sqlquery); if (PQresultStatus(res) != PGRES_TUPLES_OK) { - log_err(_("unable to execute tablespace query: %s\n"), PQerrorMessage(upstream_conn)); + log_err(_("unable to execute tablespace query: %s\n"), PQerrorMessage(source_conn)); PQclear(res); - PQfinish(upstream_conn); + PQfinish(source_conn); exit(ERR_BAD_CONFIG); } @@ -1901,7 +2037,7 @@ do_standby_clone(void) { log_err(_("no tablespace matching path '%s' found\n"), cell->old_dir); PQclear(res); - PQfinish(upstream_conn); + PQfinish(source_conn); exit(ERR_BAD_CONFIG); } } @@ -1930,13 +2066,13 @@ do_standby_clone(void) " ORDER BY 1 "); log_debug(_("standby clone: %s\n"), sqlquery); - res = PQexec(upstream_conn, sqlquery); + res = PQexec(source_conn, sqlquery); if (PQresultStatus(res) != PGRES_TUPLES_OK) { log_err(_("can't get info about data directory and configuration files: %s\n"), - PQerrorMessage(upstream_conn)); + PQerrorMessage(source_conn)); PQclear(res); - PQfinish(upstream_conn); + PQfinish(source_conn); exit(ERR_BAD_CONFIG); } @@ -1945,7 +2081,7 @@ do_standby_clone(void) { log_err("STANDBY CLONE should be run by a SUPERUSER\n"); PQclear(res); - PQfinish(upstream_conn); + PQfinish(source_conn); exit(ERR_BAD_CONFIG); } @@ -2059,9 +2195,9 @@ do_standby_clone(void) */ if (mode != barman && options.use_replication_slots) { - if (create_replication_slot(upstream_conn, repmgr_slot_name, server_version_num) == false) + if (create_replication_slot(source_conn, repmgr_slot_name, server_version_num) == false) { - PQfinish(upstream_conn); + PQfinish(source_conn); exit(ERR_DB_QUERY); } } @@ -2384,14 +2520,14 @@ do_standby_clone(void) * From 9.1 default is to wait for a sync standby to ack, avoid that by * turning off sync rep for this session */ - if (set_config_bool(upstream_conn, "synchronous_commit", false) == false) + if (set_config_bool(source_conn, "synchronous_commit", false) == false) { r = ERR_BAD_CONFIG; retval = ERR_BAD_CONFIG; goto stop_backup; } - if (start_backup(upstream_conn, first_wal_segment, runtime_options.fast_checkpoint) == false) + if (start_backup(source_conn, first_wal_segment, runtime_options.fast_checkpoint) == false) { r = ERR_BAD_BASEBACKUP; retval = ERR_BAD_BASEBACKUP; @@ -2440,7 +2576,7 @@ do_standby_clone(void) } /* Copy tablespaces and, if required, remap to a new location */ - retval = get_tablespace_data(upstream_conn, &tablespace_list); + retval = get_tablespace_data(source_conn, &tablespace_list); if(retval != SUCCESS) goto stop_backup; } @@ -2735,7 +2871,7 @@ stop_backup: if (mode == rsync && pg_start_backup_executed) { log_notice(_("notifying master about backup completion...\n")); - if (stop_backup(upstream_conn, last_wal_segment) == false) + if (stop_backup(source_conn, last_wal_segment) == false) { r = ERR_BAD_BASEBACKUP; retval = ERR_BAD_BASEBACKUP; @@ -2748,14 +2884,14 @@ stop_backup: /* If a replication slot was previously created, drop it */ if (options.use_replication_slots) { - drop_replication_slot(upstream_conn, repmgr_slot_name); + drop_replication_slot(source_conn, repmgr_slot_name); } log_err(_("unable to take a base backup of the master server\n")); log_warning(_("destination directory (%s) may need to be cleaned up manually\n"), local_data_directory); - PQfinish(upstream_conn); + PQfinish(source_conn); exit(retval); } @@ -2826,7 +2962,7 @@ stop_backup: } /* Finally, write the recovery.conf file */ - if (mode == barman && upstream_conn == NULL) + if (mode == barman && source_conn == NULL) { char conninfo_on_barman[MAXLEN]; @@ -2836,14 +2972,14 @@ stop_backup: */ get_barman_property(conninfo_on_barman, "conninfo", local_repmgr_directory); - create_recovery_file(local_data_directory, upstream_conn, conninfo_on_barman); + create_recovery_file(local_data_directory, &upstream_conninfo, conninfo_on_barman); /* In Barman mode, remove local_repmgr_directory */ rmdir(local_repmgr_directory); } else { - create_recovery_file(local_data_directory, upstream_conn, NULL); + create_recovery_file(local_data_directory, &upstream_conninfo, NULL); } switch(mode) @@ -2907,8 +3043,20 @@ stop_backup: runtime_options.masterport); appendPQExpBuffer(&event_details, - _("; backup method: %s"), - runtime_options.rsync_only ? "rsync" : "pg_basebackup"); + _("; backup method: ")); + + switch(mode) + { + case rsync: + appendPQExpBuffer(&event_details, "rsync"); + break; + case pg_basebackup: + appendPQExpBuffer(&event_details, "pg_basebackup"); + break; + case barman: + appendPQExpBuffer(&event_details, "barman"); + break; + } appendPQExpBuffer(&event_details, _("; --force: %s"), @@ -2922,7 +3070,7 @@ stop_backup: event_details.data); } - PQfinish(upstream_conn); + PQfinish(source_conn); exit(retval); } @@ -3396,8 +3544,8 @@ do_standby_follow(void) log_info(_("changing standby's master\n")); /* write the recovery.conf file */ - if (!create_recovery_file(data_dir, master_conn, NULL)) - exit(ERR_BAD_CONFIG); +// if (!create_recovery_file(data_dir, master_conn, NULL)) +// exit(ERR_BAD_CONFIG); /* Finally, restart the service */ if (*options.restart_command) @@ -4483,8 +4631,8 @@ do_witness_create(void) get_conninfo_value(options.conninfo, "user", repmgr_user); get_conninfo_value(options.conninfo, "dbname", repmgr_db); - param_set_new(&source_conninfo, "user", repmgr_user); - param_set_new(&source_conninfo, "dbname", repmgr_db); + param_set(&source_conninfo, "user", repmgr_user); + param_set(&source_conninfo, "dbname", repmgr_db); /* We need to connect to check configuration and copy it */ masterconn = establish_db_connection_by_params((const char**)source_conninfo.keywords, (const char**)source_conninfo.values, false); @@ -4823,8 +4971,8 @@ do_witness_register(PGconn *masterconn) get_conninfo_value(options.conninfo, "user", repmgr_user); get_conninfo_value(options.conninfo, "dbname", repmgr_db); - param_set_new(&source_conninfo, "user", repmgr_user); - param_set_new(&source_conninfo, "dbname", repmgr_db); + param_set(&source_conninfo, "user", repmgr_user); + param_set(&source_conninfo, "dbname", repmgr_db); /* masterconn will only be set when called from do_witness_create() */ if (PQstatus(masterconn) != CONNECTION_OK) @@ -5162,7 +5310,7 @@ do_help(void) * Creates a recovery file for a standby. */ static bool -create_recovery_file(const char *data_dir, PGconn *primary_conn, const char *conninfo_on_barman) +create_recovery_file(const char *data_dir, t_conninfo_param_list *upstream_conninfo, const char *conninfo_on_barman) { FILE *recovery_file; char recovery_file_path[MAXLEN]; @@ -5187,42 +5335,8 @@ create_recovery_file(const char *data_dir, PGconn *primary_conn, const char *con log_debug(_("recovery.conf: %s"), line); - if (primary_conn == NULL) - { - char buf[MAXLEN]; - char where_condition[MAXLEN]; - PQExpBufferData command_output; - - switch(options.upstream_node) - { - case NO_UPSTREAM_NODE: - maxlen_snprintf(where_condition, "type='master'"); - break; - default: - maxlen_snprintf(where_condition, "id=%d", options.upstream_node); - break; - } - - initPQExpBuffer(&command_output); - maxlen_snprintf(buf, - "ssh %s \"psql -Aqt \\\"%s\\\" -c \\\"" - " SELECT conninfo" - " FROM repmgr_%s.repl_nodes" - " WHERE %s" - " AND active" - "\\\"\"", - options.barman_server, conninfo_on_barman, - options.cluster_name, where_condition); - (void)local_command(buf, &command_output); - maxlen_snprintf(buf, "%s", command_output.data); - string_remove_trailing_newlines(buf); - maxlen_snprintf(line, "primary_conninfo = '%s'\n", buf); - } - else - { - /* primary_conninfo = '...' */ - write_primary_conninfo(line, primary_conn); - } + /* primary_conninfo = '...' */ + write_primary_conninfo(line, upstream_conninfo); if (write_recovery_file_line(recovery_file, recovery_file_path, line) == false) return false; @@ -6069,37 +6183,34 @@ create_schema(PGconn *conn) static void -write_primary_conninfo(char *line, PGconn *primary_conn) +write_primary_conninfo(char *line, t_conninfo_param_list *param_list) { - PQconninfoOption *connOptions; - PQconninfoOption *option; PQExpBufferData conninfo_buf; bool application_name_provided = false; - - connOptions = PQconninfo(primary_conn); + int c; initPQExpBuffer(&conninfo_buf); - for (option = connOptions; option && option->keyword; option++) + for (c = 0; c < param_list->size && param_list->keywords[c] != NULL; c++) { /* * Skip empty settings and ones which don't make any sense in * recovery.conf */ - if (strcmp(option->keyword, "dbname") == 0 || - strcmp(option->keyword, "replication") == 0 || - (option->val == NULL) || - (option->val != NULL && option->val[0] == '\0')) + if (strcmp(param_list->keywords[c], "dbname") == 0 || + strcmp(param_list->keywords[c], "replication") == 0 || + (param_list->values[c] == NULL) || + (param_list->values[c] != NULL && param_list->values[c][0] == '\0')) continue; if (conninfo_buf.len != 0) appendPQExpBufferChar(&conninfo_buf, ' '); - if (strcmp(option->keyword, "application_name") == 0) + if (strcmp(param_list->keywords[c], "application_name") == 0) application_name_provided = true; - /* XXX escape option->val */ - appendPQExpBuffer(&conninfo_buf, "%s=%s", option->keyword, option->val); + /* XXX escape option->values */ + appendPQExpBuffer(&conninfo_buf, "%s=%s", param_list->keywords[c], param_list->values[c]); } /* `application_name` not provided - default to repmgr node name */ @@ -6785,7 +6896,7 @@ copy_file(const char *old_filename, const char *new_filename) static void -initialize_conninfo_params(t_conninfo_param_list *param_list) +initialize_conninfo_params(t_conninfo_param_list *param_list, bool set_defaults) { PQconninfoOption *defs = NULL; PQconninfoOption *def; @@ -6808,21 +6919,23 @@ initialize_conninfo_params(t_conninfo_param_list *param_list) param_list->values[c] = NULL; } - /* Pre-set any defaults */ - - for (def = defs; def->keyword; def++) + if (set_defaults == true) { - if (def->val != NULL && def->val[0] != '\0') + /* Pre-set any defaults */ + + for (def = defs; def->keyword; def++) { - param_set_new(param_list, def->keyword, def->val); + if (def->val != NULL && def->val[0] != '\0') + { + param_set(param_list, def->keyword, def->val); + } } } - } static void -param_set_new(t_conninfo_param_list *param_list, const char *param, const char *value) +param_set(t_conninfo_param_list *param_list, const char *param, const char *value) { int c; int value_len = strlen(value) + 1;