diff --git a/dbutils.c b/dbutils.c index 91915063..2acc78f1 100644 --- a/dbutils.c +++ b/dbutils.c @@ -62,7 +62,7 @@ static void _populate_node_records(PGresult *res, NodeInfoList *node_list); static bool _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info); -static bool _verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_msg); +static ReplSlotStatus _verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_msg); static bool _create_event(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info, bool send_notification); @@ -347,6 +347,44 @@ establish_db_connection_by_params(t_conninfo_param_list *param_list, } +PGconn * +get_primary_connection(PGconn *conn, + int *primary_id, char *primary_conninfo_out) +{ + return _get_primary_connection(conn, primary_id, primary_conninfo_out, false); +} + + +PGconn * +get_primary_connection_quiet(PGconn *conn, + int *primary_id, char *primary_conninfo_out) +{ + return _get_primary_connection(conn, primary_id, primary_conninfo_out, true); +} + +PGconn * +duplicate_connection(PGconn *conn, const char *user, bool replication) +{ + t_conninfo_param_list conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER; + PGconn *duplicate_conn = NULL; + + initialize_conninfo_params(&conninfo, false); + conn_to_param_list(conn, &conninfo); + + if (user != NULL) + param_set(&conninfo, "user", user); + + if (replication == true) + param_set(&conninfo, "replication", "1"); + + duplicate_conn = establish_db_connection_by_params(&conninfo, false); + + free_conninfo_params(&conninfo); + + return duplicate_conn; +} + + bool is_superuser_connection(PGconn *conn, t_connection_user *userinfo) { @@ -1499,20 +1537,6 @@ _get_primary_connection(PGconn *conn, return NULL; } -PGconn * -get_primary_connection(PGconn *conn, - int *primary_id, char *primary_conninfo_out) -{ - return _get_primary_connection(conn, primary_id, primary_conninfo_out, false); -} - - -PGconn * -get_primary_connection_quiet(PGconn *conn, - int *primary_id, char *primary_conninfo_out) -{ - return _get_primary_connection(conn, primary_id, primary_conninfo_out, true); -} /* @@ -4151,7 +4175,7 @@ create_slot_name(char *slot_name, int node_id) } -static bool +static ReplSlotStatus _verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_msg) { RecordStatus record_status = RECORD_NOT_FOUND; @@ -4160,7 +4184,7 @@ _verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_m /* * Check whether slot exists already; if it exists and is active, that * means another active standby is using it, which creates an error - * situation; if not we can reuse it as-is + * situation; if not we can reuse it as-is. */ record_status = get_slot_record(conn, slot_name, &slot_info); @@ -4168,10 +4192,11 @@ _verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_m { if (strcmp(slot_info.slot_type, "physical") != 0) { - appendPQExpBuffer(error_msg, - _("slot \"%s\" exists and is not a physical slot\n"), - slot_name); - return false; + if (error_msg) + appendPQExpBuffer(error_msg, + _("slot \"%s\" exists and is not a physical slot\n"), + slot_name); + return SLOT_NOT_PHYSICAL; } if (slot_info.active == false) @@ -4179,16 +4204,17 @@ _verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_m log_debug("replication slot \"%s\" exists but is inactive; reusing", slot_name); - return true; + return SLOT_INACTIVE; } - appendPQExpBuffer(error_msg, - _("slot \"%s\" already exists as an active slot\n"), - slot_name); - return false; + if (error_msg) + appendPQExpBuffer(error_msg, + _("slot \"%s\" already exists as an active slot\n"), + slot_name); + return SLOT_ACTIVE; } - return true; + return SLOT_NOT_FOUND; } @@ -4199,9 +4225,16 @@ create_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_nam PGresult *res = NULL; bool success = true; - if (_verify_replication_slot(conn, slot_name, error_msg) == false) + ReplSlotStatus slot_status = _verify_replication_slot(conn, slot_name, error_msg); + + /* Replication slot is unusable */ + if (slot_status == SLOT_NOT_PHYSICAL || slot_status == SLOT_ACTIVE) return false; + /* Replication slot can be reused */ + if (slot_status == SLOT_INACTIVE) + return true; + initPQExpBuffer(&query); appendPQExpBuffer(&query, @@ -4219,10 +4252,11 @@ create_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_nam res = PQexec(repl_conn, query.data); + if ((PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res)) && error_msg != NULL) { appendPQExpBuffer(error_msg, - _("unable to create slot \"%s\" on the upstream node: %s\n"), + _("unable to create replication slot \"%s\" on the upstream node: %s\n"), slot_name, PQerrorMessage(conn)); success = false; @@ -4240,9 +4274,16 @@ create_replication_slot_sql(PGconn *conn, char *slot_name, PQExpBufferData *erro PGresult *res = NULL; bool success = true; - if (_verify_replication_slot(conn, slot_name, error_msg) == false) + ReplSlotStatus slot_status = _verify_replication_slot(conn, slot_name, error_msg); + + /* Replication slot is unusable */ + if (slot_status == SLOT_NOT_PHYSICAL || slot_status == SLOT_ACTIVE) return false; + /* Replication slot can be reused */ + if (slot_status == SLOT_INACTIVE) + return true; + initPQExpBuffer(&query); /* In 9.6 and later, reserve the LSN straight away */ @@ -4268,7 +4309,7 @@ create_replication_slot_sql(PGconn *conn, char *slot_name, PQExpBufferData *erro if (PQresultStatus(res) != PGRES_TUPLES_OK && error_msg != NULL) { appendPQExpBuffer(error_msg, - _("unable to create slot \"%s\" on the upstream node: %s\n"), + _("unable to create replication slot \"%s\" on the upstream node: %s\n"), slot_name, PQerrorMessage(conn)); success = false; @@ -4318,7 +4359,7 @@ drop_replication_slot_sql(PGconn *conn, char *slot_name) bool -drop_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_name) +drop_replication_slot_replprot(PGconn *repl_conn, char *slot_name) { PQExpBufferData query; PGresult *res = NULL; @@ -4336,7 +4377,7 @@ drop_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_name) if (PQresultStatus(res) != PGRES_TUPLES_OK) { - log_db_error(conn, query.data, + log_db_error(repl_conn, query.data, _("drop_replication_slot_sql(): unable to drop replication slot \"%s\""), slot_name); diff --git a/dbutils.h b/dbutils.h index b1c0bb02..00882e38 100644 --- a/dbutils.h +++ b/dbutils.h @@ -133,6 +133,8 @@ typedef enum typedef enum { SLOT_UNKNOWN = -1, + SLOT_NOT_FOUND, + SLOT_NOT_PHYSICAL, SLOT_INACTIVE, SLOT_ACTIVE } ReplSlotStatus; @@ -436,6 +438,7 @@ PGconn *establish_primary_db_connection(PGconn *conn, const bool exit_on_error); PGconn *get_primary_connection(PGconn *standby_conn, int *primary_id, char *primary_conninfo_out); PGconn *get_primary_connection_quiet(PGconn *standby_conn, int *primary_id, char *primary_conninfo_out); +PGconn *duplicate_connection(PGconn *conn, const char *user, bool replication); bool is_superuser_connection(PGconn *conn, t_connection_user *userinfo);\ bool is_replication_role(PGconn *conn, char *rolname); @@ -564,7 +567,7 @@ void create_slot_name(char *slot_name, int node_id); bool create_replication_slot_sql(PGconn *conn, char *slot_name, PQExpBufferData *error_msg); bool create_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_name, PQExpBufferData *error_msg); bool drop_replication_slot_sql(PGconn *conn, char *slot_name); -bool drop_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_name); +bool drop_replication_slot_replprot(PGconn *repl_conn, char *slot_name); RecordStatus get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record); int get_free_replication_slot_count(PGconn *conn); diff --git a/repmgr-action-standby.c b/repmgr-action-standby.c index 223fbe02..3ee914dc 100644 --- a/repmgr-action-standby.c +++ b/repmgr-action-standby.c @@ -107,7 +107,7 @@ static bool check_upstream_config(PGconn *conn, int server_version_num, t_node_i static void check_primary_standby_version_match(PGconn *conn, PGconn *primary_conn); static void check_recovery_type(PGconn *conn); -static void initialise_direct_clone(t_node_info *node_record); +static void initialise_direct_clone(t_node_info *local_node_record, t_node_info *upstream_node_record); static int run_basebackup(t_node_info *node_record); static int run_file_backup(t_node_info *node_record); @@ -162,6 +162,7 @@ do_standby_clone(void) /* dummy node record */ t_node_info local_node_record = T_NODE_INFO_INITIALIZER; + t_node_info upstream_node_record = T_NODE_INFO_INITIALIZER; bool local_data_directory_provided = false; @@ -308,10 +309,12 @@ do_standby_clone(void) /* * By default attempt to connect to the source node. This will fail if no * connection is possible, unless in Barman mode, in which case we can - * fall back to connecting to the source node via Barman. + * fall back to connecting to the source node via Barman (if available). */ if (runtime_options.no_upstream_connection == false) { + RecordStatus record_status = RECORD_NOT_FOUND; + /* * This connects to the source node and performs sanity checks, also * sets "recovery_conninfo_str", "upstream_repluser", "upstream_user" and @@ -321,6 +324,18 @@ do_standby_clone(void) * "barman" mode. */ check_source_server(); + + /* attempt to retrieve upstream node record */ + record_status = get_node_record(source_conn, + upstream_node_id, + &upstream_node_record); + + if (record_status != RECORD_FOUND) + { + log_error(_("unable to retrieve record for upstream node %i"), + upstream_node_id); + exit(ERR_BAD_CONFIG); + } } else { @@ -585,6 +600,35 @@ do_standby_clone(void) if (runtime_options.dry_run == true) { + /* + * If replication slots in use, sanity-check whether we can create them + * with the available user permissions. + */ + if (config_file_options.use_replication_slots == true && PQstatus(source_conn) == CONNECTION_OK) + { + PQExpBufferData msg; + bool success = true; + + initPQExpBuffer(&msg); + + /* + * "create_replication_slot()" knows about --dry-run mode and + * will perform checks but not actually create the slot. + */ + success = create_replication_slot(source_conn, + local_node_record.slot_name, + &upstream_node_record, + &msg); + if (success == false) + { + log_error(_("prerequisites not met for creating a replication slot on upstream node %i"), + upstream_node_record.node_id); + termPQExpBuffer(&msg); + exit(ERR_BAD_CONFIG); + } + termPQExpBuffer(&msg); + } + if (upstream_node_id != UNKNOWN_NODE_ID) { log_notice(_("standby will attach to upstream node %i"), upstream_node_id); @@ -599,15 +643,16 @@ do_standby_clone(void) log_hint(_("consider using the -c/--fast-checkpoint option")); } + PQfinish(source_conn); + log_info(_("all prerequisites for \"standby clone\" are met")); - PQfinish(source_conn); exit(SUCCESS); } if (mode != barman) { - initialise_direct_clone(&local_node_record); + initialise_direct_clone(&local_node_record, &upstream_node_record); } switch (mode) @@ -650,7 +695,15 @@ do_standby_clone(void) /* If a replication slot was previously created, drop it */ if (config_file_options.use_replication_slots == true) { - drop_replication_slot_sql(source_conn, local_node_record.slot_name); + /* + * In the case where a standby is being cloned from a node other than its + * intended upstream, We can't be sure of the source node's node_id. This + * is only required by "drop_replication_slot_if_exists()" to determine + * from the node's record whether it has a different replication user, and + * as in this case that would need to be supplied via "--replication-user" + * it's not a problem. + */ + drop_replication_slot_if_exists(source_conn, UNKNOWN_NODE_ID, local_node_record.slot_name); } log_error(_("unable to take a base backup of the primary server")); @@ -1313,8 +1366,20 @@ _do_create_replication_conf(void) /* add replication slot, if required */ if (slot_creation_required == true) { + PQExpBufferData msg; + initPQExpBuffer(&msg); + if (runtime_options.dry_run == true) { + /* + * In --dry-run mode this will check availability + * of a user who can create replication slots. + */ + // XXX check return value + create_replication_slot(upstream_conn, + local_node_record.slot_name, + NULL, + &msg); log_info(_("would create replication slot \"%s\" on upstream node \"%s\" (ID: %i)"), local_node_record.slot_name, upstream_node_record.node_name, @@ -1322,11 +1387,10 @@ _do_create_replication_conf(void) } else { - PQExpBufferData msg; - initPQExpBuffer(&msg); - if (create_replication_slot_sql(upstream_conn, + if (create_replication_slot(upstream_conn, local_node_record.slot_name, + NULL, &msg) == false) { log_error("%s", msg.data); @@ -1335,13 +1399,14 @@ _do_create_replication_conf(void) exit(ERR_BAD_CONFIG); } - termPQExpBuffer(&msg); log_notice(_("replication slot \"%s\" created on upstream node \"%s\" (ID: %i)"), local_node_record.slot_name, upstream_node_record.node_name, upstream_node_id); } + termPQExpBuffer(&msg); + } @@ -3070,8 +3135,9 @@ do_standby_follow_internal(PGconn *primary_conn, PGconn *follow_target_conn, t_n } - if (create_replication_slot_sql(follow_target_conn, + if (create_replication_slot(follow_target_conn, local_node_record.slot_name, + NULL, output) == false) { log_error("%s", output->data); @@ -5163,6 +5229,14 @@ check_source_server() exit(ERR_BAD_CONFIG); } } + + /* + * To create replication slots, we'll need a user with the REPLICATION + * privilege, or a superuser. + */ + if (config_file_options.use_replication_slots == true) + { + } } @@ -5718,7 +5792,7 @@ check_upstream_config(PGconn *conn, int server_version_num, t_node_info *upstrea * - standby_clone */ static void -initialise_direct_clone(t_node_info *node_record) +initialise_direct_clone(t_node_info *local_node_record, t_node_info *upstream_node_record) { /* * Check the destination data directory can be used (in Barman mode, this @@ -5820,15 +5894,11 @@ initialise_direct_clone(t_node_info *node_record) if (config_file_options.use_replication_slots == true) { - PGconn *superuser_conn = NULL; - PGconn *privileged_conn = NULL; PQExpBufferData event_details; initPQExpBuffer(&event_details); - get_superuser_connection(&source_conn, &superuser_conn, &privileged_conn); - - if (create_replication_slot_sql(privileged_conn, node_record->slot_name, &event_details) == false) + if (create_replication_slot(source_conn, local_node_record->slot_name, upstream_node_record, &event_details) == false) { log_error("%s", event_details.data); @@ -5841,9 +5911,6 @@ initialise_direct_clone(t_node_info *node_record) PQfinish(source_conn); - if (superuser_conn != NULL) - PQfinish(superuser_conn); - exit(ERR_DB_QUERY); } @@ -5851,10 +5918,7 @@ initialise_direct_clone(t_node_info *node_record) log_verbose(LOG_INFO, _("replication slot \"%s\" created on source node"), - node_record->slot_name); - - if (superuser_conn != NULL) - PQfinish(superuser_conn); + local_node_record->slot_name); } return; @@ -6050,10 +6114,6 @@ run_basebackup(t_node_info *node_record) */ if (config_file_options.use_replication_slots && upstream_node_id != UNKNOWN_NODE_ID) { - - PGconn *superuser_conn = NULL; - PGconn *privileged_conn = NULL; - t_node_info upstream_node_record = T_NODE_INFO_INITIALIZER; t_replication_slot slot_info = T_REPLICATION_SLOT_INITIALIZER; RecordStatus record_status = RECORD_NOT_FOUND; @@ -6062,7 +6122,7 @@ run_basebackup(t_node_info *node_record) record_status = get_node_record(source_conn, upstream_node_id, &upstream_node_record); /* - * if there's no upstream record, there's no point in trying to create + * If there's no upstream record, there's no point in trying to create * a replication slot on the designated upstream, as the assumption is * it won't exist at this point. */ @@ -6111,10 +6171,8 @@ run_basebackup(t_node_info *node_record) node_record->slot_name, upstream_node_id); - get_superuser_connection(&upstream_conn, &superuser_conn, &privileged_conn); - initPQExpBuffer(&event_details); - if (create_replication_slot_sql(privileged_conn, node_record->slot_name, &event_details) == false) + if (create_replication_slot(upstream_conn, node_record->slot_name, &upstream_node_record, &event_details) == false) { log_error("%s", event_details.data); @@ -6127,15 +6185,9 @@ run_basebackup(t_node_info *node_record) PQfinish(source_conn); - if (superuser_conn != NULL) - PQfinish(superuser_conn); - exit(ERR_DB_QUERY); } - if (superuser_conn != NULL) - PQfinish(superuser_conn); - termPQExpBuffer(&event_details); } @@ -6143,21 +6195,22 @@ run_basebackup(t_node_info *node_record) } } - /* delete slot on source server */ - - get_superuser_connection(&source_conn, &superuser_conn, &privileged_conn); - if (slot_info.active == false) { if (slot_exists_on_upstream == false) { - if (drop_replication_slot_sql(privileged_conn, node_record->slot_name) == true) + + /* delete slot on source server */ + + if (drop_replication_slot_if_exists(source_conn, UNKNOWN_NODE_ID, node_record->slot_name) == true) { - log_notice(_("replication slot \"%s\" deleted on source node"), node_record->slot_name); + log_notice(_("replication slot \"%s\" deleted on source node"), + node_record->slot_name); } else { - log_error(_("unable to delete replication slot \"%s\" on source node"), node_record->slot_name); + log_error(_("unable to delete replication slot \"%s\" on source node"), + node_record->slot_name); } } } @@ -6168,21 +6221,17 @@ run_basebackup(t_node_info *node_record) */ else { - log_warning(_("replication slot \"%s\" is still active on source node"), node_record->slot_name); + log_warning(_("replication slot \"%s\" is still active on source node"), + node_record->slot_name); } - - if (superuser_conn != NULL) - PQfinish(superuser_conn); - } - return SUCCESS; } static int -run_file_backup(t_node_info *node_record) +run_file_backup(t_node_info *local_node_record) { int r = SUCCESS, i; @@ -6681,31 +6730,37 @@ stop_backup: } else { - record_status = get_slot_record(upstream_conn, node_record->slot_name, &slot_info); + record_status = get_slot_record(upstream_conn, local_node_record->slot_name, &slot_info); if (record_status == RECORD_FOUND) { log_verbose(LOG_INFO, _("replication slot \"%s\" aleady exists on upstream node %i"), - node_record->slot_name, + local_node_record->slot_name, upstream_node_id); } else { PQExpBufferData errmsg; + bool success; initPQExpBuffer(&errmsg); - - if (create_replication_slot_sql(upstream_conn, node_record->slot_name, &errmsg) == false) + success = create_replication_slot(upstream_conn, + local_node_record->slot_name, + &upstream_node_record, + &errmsg); + if (success == false) { - log_error(_("unable to create replication slot on upstream node %i"), upstream_node_id); + log_error(_("unable to create replication slot \"%s\" on upstream node %i"), + local_node_record->slot_name, + upstream_node_id); log_detail("%s", errmsg.data); slot_warning = true; } else { log_notice(_("replication slot \"%s\" created on upstream node \"%s\" (ID: %i)"), - node_record->slot_name, + local_node_record->slot_name, upstream_node_record.node_name, upstream_node_id ); } @@ -6722,7 +6777,7 @@ stop_backup: { log_warning(_("\"use_replication_slots\" specified but a replication slot could not be created")); log_hint(_("ensure a replication slot called \"%s\" is created on the upstream node (ID: %i)"), - node_record->slot_name, + local_node_record->slot_name, upstream_node_id); } } diff --git a/repmgr-action-standby.h b/repmgr-action-standby.h index 243916da..eaba0871 100644 --- a/repmgr-action-standby.h +++ b/repmgr-action-standby.h @@ -19,14 +19,6 @@ #ifndef _REPMGR_ACTION_STANDBY_H_ #define _REPMGR_ACTION_STANDBY_H_ -typedef enum -{ - REPMGR_USER = 0, - REPLICATION_USER, - REPLICATION_PROTOCOL_USER, - SUPERUSER -} t_user_type; - extern void do_standby_clone(void); extern void do_standby_register(void); extern void do_standby_unregister(void); diff --git a/repmgr-client-global.h b/repmgr-client-global.h index 1a5884b5..f5b5431b 100644 --- a/repmgr-client-global.h +++ b/repmgr-client-global.h @@ -201,6 +201,16 @@ typedef enum } t_server_action; +typedef enum +{ + USER_TYPE_UNKNOWN = -1, + REPMGR_USER, + REPLICATION_USER_OPT, + REPLICATION_USER_NODE, + SUPERUSER +} t_user_type; + + typedef struct ColHeader { char title[MAXLEN]; @@ -255,7 +265,9 @@ extern void get_node_config_directory(char *config_dir_buf); extern void get_node_data_directory(char *data_dir_buf); extern void init_node_record(t_node_info *node_record); extern bool can_use_pg_rewind(PGconn *conn, const char *data_directory, PQExpBufferData *reason); -extern void drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name); + +extern bool create_replication_slot(PGconn *conn, char *slot_name, t_node_info *upstream_node_record, PQExpBufferData *error_msg); +extern bool drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name); extern bool check_node_can_attach(TimeLineID local_tli, XLogRecPtr local_xlogpos, PGconn *follow_target_conn, t_node_info *follow_target_node_record, bool is_rejoin); extern void check_shared_library(PGconn *conn); diff --git a/repmgr-client.c b/repmgr-client.c index 49c1025e..fad11c6d 100644 --- a/repmgr-client.c +++ b/repmgr-client.c @@ -95,11 +95,17 @@ char path_buf[MAXLEN] = ""; */ t_node_info target_node_info = T_NODE_INFO_INITIALIZER; +/* used by create_replication_slot() */ +static t_user_type ReplicationSlotUser = USER_TYPE_UNKNOWN; /* Collate command line errors and warnings here for friendlier reporting */ static ItemList cli_errors = {NULL, NULL}; static ItemList cli_warnings = {NULL, NULL}; +static void _determine_replication_slot_user(PGconn *conn, + t_node_info *upstream_node_record, + char **replication_user); + int main(int argc, char **argv) { @@ -3602,48 +3608,267 @@ can_use_pg_rewind(PGconn *conn, const char *data_directory, PQExpBufferData *rea } -void +// provided connection should be for the normal repmgr user +// upstream_node_record may be NULL or initialised to default values +bool +create_replication_slot(PGconn *conn, char *slot_name, t_node_info *upstream_node_record, PQExpBufferData *error_msg) +{ + PGconn *slot_conn = NULL; + bool use_replication_protocol = false; + bool success = true; + char *replication_user = NULL; + + _determine_replication_slot_user(conn, upstream_node_record, &replication_user); + /* + * If called in --dry-run context, if the replication slot user is not the + * repmgr user, attempt to validate the connection. + */ + if (runtime_options.dry_run == true) + { + switch (ReplicationSlotUser) + { + case USER_TYPE_UNKNOWN: + log_error("unable to determine user for replication slot creation"); + return false; + case REPMGR_USER: + log_info(_("replication slots will be created by user \"%s\""), + PQuser(conn)); + return true; + + case REPLICATION_USER_NODE: + case REPLICATION_USER_OPT: + { + PGconn *repl_conn = duplicate_connection(conn, + replication_user, + true); + if (repl_conn == NULL || PQstatus(repl_conn) != CONNECTION_OK) + { + log_error(_("unable to create replication connection as user \"%s\""), + replication_user); + log_detail("%s", PQerrorMessage(repl_conn)); + + PQfinish(repl_conn); + return false; + } + log_info(_("replication slots will be created by replication user \"%s\""), + replication_user); + PQfinish(repl_conn); + return true; + } + case SUPERUSER: + { + PGconn *superuser_conn = duplicate_connection(conn, + runtime_options.superuser, + false); + if (superuser_conn == NULL || PQstatus(superuser_conn )!= CONNECTION_OK) + { + log_error(_("unable to create superuser connection as user \"%s\""), + runtime_options.superuser); + log_detail("%s", PQerrorMessage(superuser_conn)); + + PQfinish(superuser_conn); + + return false; + } + + log_info(_("replication slots will be created by superuser \"%s\""), + runtime_options.superuser); + PQfinish(superuser_conn); + } + } + + } + + /* + * If we can't create a replication slot with the connection provided to + * the function, create an connection with appropriate permissions. + */ + switch (ReplicationSlotUser) + { + case USER_TYPE_UNKNOWN: + log_error("unable to determine user for replication slot creation"); + return false; + case REPMGR_USER: + slot_conn = conn; + log_info(_("creating replication slot as user \"%s\""), + PQuser(conn)); + break; + + case REPLICATION_USER_NODE: + case REPLICATION_USER_OPT: + { + slot_conn = duplicate_connection(conn, + replication_user, + true); + if (slot_conn == NULL || PQstatus(slot_conn) != CONNECTION_OK) + { + log_error(_("unable to create replication connection as user \"%s\""), + runtime_options.replication_user); + log_detail("%s", PQerrorMessage(slot_conn)); + + PQfinish(slot_conn); + return false; + } + use_replication_protocol = true; + log_info(_("creating replication slot as replication user \"%s\""), + replication_user); + } + break; + + case SUPERUSER: + { + slot_conn = duplicate_connection(conn, + runtime_options.superuser, + false); + if (slot_conn == NULL || PQstatus(slot_conn )!= CONNECTION_OK) + { + log_error(_("unable to create super connection as user \"%s\""), + runtime_options.superuser); + log_detail("%s", PQerrorMessage(slot_conn)); + + PQfinish(slot_conn); + + return false; + } + log_info(_("creating replication slot as superuser \"%s\""), + runtime_options.superuser); + } + break; + } + + if (use_replication_protocol == true) + { + success = create_replication_slot_replprot(conn, slot_conn, slot_name, error_msg); + } + else + { + success = create_replication_slot_sql(slot_conn, slot_name, error_msg); + } + + + if (slot_conn != conn) + PQfinish(slot_conn); + + return success; +} + + +bool drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name) { + t_node_info node_record = T_NODE_INFO_INITIALIZER; t_replication_slot slot_info = T_REPLICATION_SLOT_INITIALIZER; - RecordStatus record_status = get_slot_record(conn, slot_name, &slot_info); + RecordStatus record_status; + + char *replication_user = NULL; + bool success = true; + + if (node_id != UNKNOWN_NODE_ID) + { + record_status = get_node_record(conn, node_id, &node_record); + } + + _determine_replication_slot_user(conn, &node_record, &replication_user); + + record_status = get_slot_record(conn, slot_name, &slot_info); log_verbose(LOG_DEBUG, "attempting to delete slot \"%s\" on node %i", slot_name, node_id); if (record_status != RECORD_FOUND) { - /* this is a good thing */ + /* this is not a bad good thing */ log_verbose(LOG_INFO, _("slot \"%s\" does not exist on node %i, nothing to remove"), slot_name, node_id); + return true; } - else - { - if (slot_info.active == false) - { - if (drop_replication_slot_sql(conn, slot_name) == true) - { - log_notice(_("replication slot \"%s\" deleted on node %i"), slot_name, node_id); - } - else - { - log_error(_("unable to delete replication slot \"%s\" on node %i"), slot_name, node_id); - } - } - /* - * if active replication slot exists, call Houston as we have a - * problem - */ + if (slot_info.active == false) + { + if (drop_replication_slot_sql(conn, slot_name) == true) + { + log_notice(_("replication slot \"%s\" deleted on node %i"), slot_name, node_id); + } else { - log_warning(_("replication slot \"%s\" is still active on node %i"), slot_name, node_id); + log_error(_("unable to delete replication slot \"%s\" on node %i"), slot_name, node_id); + success = false; + } + } + + /* + * If an active replication slot exists, call Houston as we have a + * problem. + */ + else + { + log_warning(_("replication slot \"%s\" is still active on node %i"), slot_name, node_id); + success = false; + } + + return success; +} + + +static void +_determine_replication_slot_user(PGconn *conn, t_node_info *upstream_node_record, char **replication_user) +{ + /* + * If not previously done, work out which user will be responsible + * for creating replication slots. + */ + if (ReplicationSlotUser == USER_TYPE_UNKNOWN) + { + /* + * Is the repmgr user a superuser? + */ + if (is_superuser_connection(conn, NULL)) + { + ReplicationSlotUser = REPMGR_USER; + } + /* + * Does the repmgr user have the REPLICATION role? + * Note we don't care here whether the repmgr user can actually + * make a replication connection, we're just confirming that the + * connection we have has the appropriate permissions. + */ + else if (is_replication_role(conn, NULL)) + { + ReplicationSlotUser = REPMGR_USER; + } + /* + * Is a superuser provided with --superuser? + * We'll check later whether we can make a connection as that user. + */ + else if (runtime_options.superuser[0] != '\0') + { + ReplicationSlotUser = SUPERUSER; + } + /* + * Is a replication user provided with --replication-user? + * We'll check later whether we can make a replication connection as that user. + * Overrides any replication user defined in the upstream node record. + */ + else if (runtime_options.replication_user[0] != '\0') + { + ReplicationSlotUser = REPLICATION_USER_OPT; + *replication_user = runtime_options.replication_user; + } + /* + * Is the upstream's node record provided, and does it have a different + * replication user? + * We'll check later whether we can make a replication connection as that user. + */ + else if (upstream_node_record != NULL && upstream_node_record->node_id != UNKNOWN_NODE_ID + && strncmp(upstream_node_record->repluser, PQuser(conn), NAMEDATALEN) != 0) + { + ReplicationSlotUser = REPLICATION_USER_NODE; + *replication_user = upstream_node_record->repluser; } } } - /* * Here we'll perform some timeline sanity checks to ensure the follow target * can actually be followed.