mirror of
https://github.com/EnterpriseDB/repmgr.git
synced 2026-03-27 00:46:29 +00:00
Minimize user permissions requirements for replication slots
Enable operations which create or drop replication slots to be carried out with the minimum necessary user permissions, i.e. a user with the REPLICATION attribute. This can be the repmgr user, or a dedicated replication user. In the latter case, if the dedicated replication user is only permitted to make replication connections, the streaming replication protocol is used to create/drop slots. Implements part of GitHub #536.
This commit is contained in:
107
dbutils.c
107
dbutils.c
@@ -62,7 +62,7 @@ static void _populate_node_records(PGresult *res, NodeInfoList *node_list);
|
|||||||
|
|
||||||
static bool _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info);
|
static bool _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info);
|
||||||
|
|
||||||
static bool _verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_msg);
|
static ReplSlotStatus _verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_msg);
|
||||||
|
|
||||||
static bool _create_event(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info, bool send_notification);
|
static bool _create_event(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info, bool send_notification);
|
||||||
|
|
||||||
@@ -347,6 +347,44 @@ establish_db_connection_by_params(t_conninfo_param_list *param_list,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
PGconn *
|
||||||
|
get_primary_connection(PGconn *conn,
|
||||||
|
int *primary_id, char *primary_conninfo_out)
|
||||||
|
{
|
||||||
|
return _get_primary_connection(conn, primary_id, primary_conninfo_out, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
PGconn *
|
||||||
|
get_primary_connection_quiet(PGconn *conn,
|
||||||
|
int *primary_id, char *primary_conninfo_out)
|
||||||
|
{
|
||||||
|
return _get_primary_connection(conn, primary_id, primary_conninfo_out, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
PGconn *
|
||||||
|
duplicate_connection(PGconn *conn, const char *user, bool replication)
|
||||||
|
{
|
||||||
|
t_conninfo_param_list conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
|
||||||
|
PGconn *duplicate_conn = NULL;
|
||||||
|
|
||||||
|
initialize_conninfo_params(&conninfo, false);
|
||||||
|
conn_to_param_list(conn, &conninfo);
|
||||||
|
|
||||||
|
if (user != NULL)
|
||||||
|
param_set(&conninfo, "user", user);
|
||||||
|
|
||||||
|
if (replication == true)
|
||||||
|
param_set(&conninfo, "replication", "1");
|
||||||
|
|
||||||
|
duplicate_conn = establish_db_connection_by_params(&conninfo, false);
|
||||||
|
|
||||||
|
free_conninfo_params(&conninfo);
|
||||||
|
|
||||||
|
return duplicate_conn;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
bool
|
bool
|
||||||
is_superuser_connection(PGconn *conn, t_connection_user *userinfo)
|
is_superuser_connection(PGconn *conn, t_connection_user *userinfo)
|
||||||
{
|
{
|
||||||
@@ -1499,20 +1537,6 @@ _get_primary_connection(PGconn *conn,
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
PGconn *
|
|
||||||
get_primary_connection(PGconn *conn,
|
|
||||||
int *primary_id, char *primary_conninfo_out)
|
|
||||||
{
|
|
||||||
return _get_primary_connection(conn, primary_id, primary_conninfo_out, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
PGconn *
|
|
||||||
get_primary_connection_quiet(PGconn *conn,
|
|
||||||
int *primary_id, char *primary_conninfo_out)
|
|
||||||
{
|
|
||||||
return _get_primary_connection(conn, primary_id, primary_conninfo_out, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@@ -4151,7 +4175,7 @@ create_slot_name(char *slot_name, int node_id)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static bool
|
static ReplSlotStatus
|
||||||
_verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_msg)
|
_verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_msg)
|
||||||
{
|
{
|
||||||
RecordStatus record_status = RECORD_NOT_FOUND;
|
RecordStatus record_status = RECORD_NOT_FOUND;
|
||||||
@@ -4160,7 +4184,7 @@ _verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_m
|
|||||||
/*
|
/*
|
||||||
* Check whether slot exists already; if it exists and is active, that
|
* Check whether slot exists already; if it exists and is active, that
|
||||||
* means another active standby is using it, which creates an error
|
* means another active standby is using it, which creates an error
|
||||||
* situation; if not we can reuse it as-is
|
* situation; if not we can reuse it as-is.
|
||||||
*/
|
*/
|
||||||
record_status = get_slot_record(conn, slot_name, &slot_info);
|
record_status = get_slot_record(conn, slot_name, &slot_info);
|
||||||
|
|
||||||
@@ -4168,10 +4192,11 @@ _verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_m
|
|||||||
{
|
{
|
||||||
if (strcmp(slot_info.slot_type, "physical") != 0)
|
if (strcmp(slot_info.slot_type, "physical") != 0)
|
||||||
{
|
{
|
||||||
appendPQExpBuffer(error_msg,
|
if (error_msg)
|
||||||
_("slot \"%s\" exists and is not a physical slot\n"),
|
appendPQExpBuffer(error_msg,
|
||||||
slot_name);
|
_("slot \"%s\" exists and is not a physical slot\n"),
|
||||||
return false;
|
slot_name);
|
||||||
|
return SLOT_NOT_PHYSICAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (slot_info.active == false)
|
if (slot_info.active == false)
|
||||||
@@ -4179,16 +4204,17 @@ _verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_m
|
|||||||
log_debug("replication slot \"%s\" exists but is inactive; reusing",
|
log_debug("replication slot \"%s\" exists but is inactive; reusing",
|
||||||
slot_name);
|
slot_name);
|
||||||
|
|
||||||
return true;
|
return SLOT_INACTIVE;
|
||||||
}
|
}
|
||||||
|
|
||||||
appendPQExpBuffer(error_msg,
|
if (error_msg)
|
||||||
_("slot \"%s\" already exists as an active slot\n"),
|
appendPQExpBuffer(error_msg,
|
||||||
slot_name);
|
_("slot \"%s\" already exists as an active slot\n"),
|
||||||
return false;
|
slot_name);
|
||||||
|
return SLOT_ACTIVE;
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return SLOT_NOT_FOUND;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -4199,9 +4225,16 @@ create_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_nam
|
|||||||
PGresult *res = NULL;
|
PGresult *res = NULL;
|
||||||
bool success = true;
|
bool success = true;
|
||||||
|
|
||||||
if (_verify_replication_slot(conn, slot_name, error_msg) == false)
|
ReplSlotStatus slot_status = _verify_replication_slot(conn, slot_name, error_msg);
|
||||||
|
|
||||||
|
/* Replication slot is unusable */
|
||||||
|
if (slot_status == SLOT_NOT_PHYSICAL || slot_status == SLOT_ACTIVE)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
|
/* Replication slot can be reused */
|
||||||
|
if (slot_status == SLOT_INACTIVE)
|
||||||
|
return true;
|
||||||
|
|
||||||
initPQExpBuffer(&query);
|
initPQExpBuffer(&query);
|
||||||
|
|
||||||
appendPQExpBuffer(&query,
|
appendPQExpBuffer(&query,
|
||||||
@@ -4219,10 +4252,11 @@ create_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_nam
|
|||||||
|
|
||||||
res = PQexec(repl_conn, query.data);
|
res = PQexec(repl_conn, query.data);
|
||||||
|
|
||||||
|
|
||||||
if ((PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res)) && error_msg != NULL)
|
if ((PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res)) && error_msg != NULL)
|
||||||
{
|
{
|
||||||
appendPQExpBuffer(error_msg,
|
appendPQExpBuffer(error_msg,
|
||||||
_("unable to create slot \"%s\" on the upstream node: %s\n"),
|
_("unable to create replication slot \"%s\" on the upstream node: %s\n"),
|
||||||
slot_name,
|
slot_name,
|
||||||
PQerrorMessage(conn));
|
PQerrorMessage(conn));
|
||||||
success = false;
|
success = false;
|
||||||
@@ -4240,9 +4274,16 @@ create_replication_slot_sql(PGconn *conn, char *slot_name, PQExpBufferData *erro
|
|||||||
PGresult *res = NULL;
|
PGresult *res = NULL;
|
||||||
bool success = true;
|
bool success = true;
|
||||||
|
|
||||||
if (_verify_replication_slot(conn, slot_name, error_msg) == false)
|
ReplSlotStatus slot_status = _verify_replication_slot(conn, slot_name, error_msg);
|
||||||
|
|
||||||
|
/* Replication slot is unusable */
|
||||||
|
if (slot_status == SLOT_NOT_PHYSICAL || slot_status == SLOT_ACTIVE)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
|
/* Replication slot can be reused */
|
||||||
|
if (slot_status == SLOT_INACTIVE)
|
||||||
|
return true;
|
||||||
|
|
||||||
initPQExpBuffer(&query);
|
initPQExpBuffer(&query);
|
||||||
|
|
||||||
/* In 9.6 and later, reserve the LSN straight away */
|
/* In 9.6 and later, reserve the LSN straight away */
|
||||||
@@ -4268,7 +4309,7 @@ create_replication_slot_sql(PGconn *conn, char *slot_name, PQExpBufferData *erro
|
|||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK && error_msg != NULL)
|
if (PQresultStatus(res) != PGRES_TUPLES_OK && error_msg != NULL)
|
||||||
{
|
{
|
||||||
appendPQExpBuffer(error_msg,
|
appendPQExpBuffer(error_msg,
|
||||||
_("unable to create slot \"%s\" on the upstream node: %s\n"),
|
_("unable to create replication slot \"%s\" on the upstream node: %s\n"),
|
||||||
slot_name,
|
slot_name,
|
||||||
PQerrorMessage(conn));
|
PQerrorMessage(conn));
|
||||||
success = false;
|
success = false;
|
||||||
@@ -4318,7 +4359,7 @@ drop_replication_slot_sql(PGconn *conn, char *slot_name)
|
|||||||
|
|
||||||
|
|
||||||
bool
|
bool
|
||||||
drop_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_name)
|
drop_replication_slot_replprot(PGconn *repl_conn, char *slot_name)
|
||||||
{
|
{
|
||||||
PQExpBufferData query;
|
PQExpBufferData query;
|
||||||
PGresult *res = NULL;
|
PGresult *res = NULL;
|
||||||
@@ -4336,7 +4377,7 @@ drop_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_name)
|
|||||||
|
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
{
|
{
|
||||||
log_db_error(conn, query.data,
|
log_db_error(repl_conn, query.data,
|
||||||
_("drop_replication_slot_sql(): unable to drop replication slot \"%s\""),
|
_("drop_replication_slot_sql(): unable to drop replication slot \"%s\""),
|
||||||
slot_name);
|
slot_name);
|
||||||
|
|
||||||
|
|||||||
@@ -133,6 +133,8 @@ typedef enum
|
|||||||
typedef enum
|
typedef enum
|
||||||
{
|
{
|
||||||
SLOT_UNKNOWN = -1,
|
SLOT_UNKNOWN = -1,
|
||||||
|
SLOT_NOT_FOUND,
|
||||||
|
SLOT_NOT_PHYSICAL,
|
||||||
SLOT_INACTIVE,
|
SLOT_INACTIVE,
|
||||||
SLOT_ACTIVE
|
SLOT_ACTIVE
|
||||||
} ReplSlotStatus;
|
} ReplSlotStatus;
|
||||||
@@ -436,6 +438,7 @@ PGconn *establish_primary_db_connection(PGconn *conn,
|
|||||||
const bool exit_on_error);
|
const bool exit_on_error);
|
||||||
PGconn *get_primary_connection(PGconn *standby_conn, int *primary_id, char *primary_conninfo_out);
|
PGconn *get_primary_connection(PGconn *standby_conn, int *primary_id, char *primary_conninfo_out);
|
||||||
PGconn *get_primary_connection_quiet(PGconn *standby_conn, int *primary_id, char *primary_conninfo_out);
|
PGconn *get_primary_connection_quiet(PGconn *standby_conn, int *primary_id, char *primary_conninfo_out);
|
||||||
|
PGconn *duplicate_connection(PGconn *conn, const char *user, bool replication);
|
||||||
|
|
||||||
bool is_superuser_connection(PGconn *conn, t_connection_user *userinfo);\
|
bool is_superuser_connection(PGconn *conn, t_connection_user *userinfo);\
|
||||||
bool is_replication_role(PGconn *conn, char *rolname);
|
bool is_replication_role(PGconn *conn, char *rolname);
|
||||||
@@ -564,7 +567,7 @@ void create_slot_name(char *slot_name, int node_id);
|
|||||||
bool create_replication_slot_sql(PGconn *conn, char *slot_name, PQExpBufferData *error_msg);
|
bool create_replication_slot_sql(PGconn *conn, char *slot_name, PQExpBufferData *error_msg);
|
||||||
bool create_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_name, PQExpBufferData *error_msg);
|
bool create_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_name, PQExpBufferData *error_msg);
|
||||||
bool drop_replication_slot_sql(PGconn *conn, char *slot_name);
|
bool drop_replication_slot_sql(PGconn *conn, char *slot_name);
|
||||||
bool drop_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_name);
|
bool drop_replication_slot_replprot(PGconn *repl_conn, char *slot_name);
|
||||||
|
|
||||||
RecordStatus get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record);
|
RecordStatus get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record);
|
||||||
int get_free_replication_slot_count(PGconn *conn);
|
int get_free_replication_slot_count(PGconn *conn);
|
||||||
|
|||||||
@@ -107,7 +107,7 @@ static bool check_upstream_config(PGconn *conn, int server_version_num, t_node_i
|
|||||||
static void check_primary_standby_version_match(PGconn *conn, PGconn *primary_conn);
|
static void check_primary_standby_version_match(PGconn *conn, PGconn *primary_conn);
|
||||||
static void check_recovery_type(PGconn *conn);
|
static void check_recovery_type(PGconn *conn);
|
||||||
|
|
||||||
static void initialise_direct_clone(t_node_info *node_record);
|
static void initialise_direct_clone(t_node_info *local_node_record, t_node_info *upstream_node_record);
|
||||||
static int run_basebackup(t_node_info *node_record);
|
static int run_basebackup(t_node_info *node_record);
|
||||||
static int run_file_backup(t_node_info *node_record);
|
static int run_file_backup(t_node_info *node_record);
|
||||||
|
|
||||||
@@ -162,6 +162,7 @@ do_standby_clone(void)
|
|||||||
|
|
||||||
/* dummy node record */
|
/* dummy node record */
|
||||||
t_node_info local_node_record = T_NODE_INFO_INITIALIZER;
|
t_node_info local_node_record = T_NODE_INFO_INITIALIZER;
|
||||||
|
t_node_info upstream_node_record = T_NODE_INFO_INITIALIZER;
|
||||||
|
|
||||||
bool local_data_directory_provided = false;
|
bool local_data_directory_provided = false;
|
||||||
|
|
||||||
@@ -308,10 +309,12 @@ do_standby_clone(void)
|
|||||||
/*
|
/*
|
||||||
* By default attempt to connect to the source node. This will fail if no
|
* By default attempt to connect to the source node. This will fail if no
|
||||||
* connection is possible, unless in Barman mode, in which case we can
|
* connection is possible, unless in Barman mode, in which case we can
|
||||||
* fall back to connecting to the source node via Barman.
|
* fall back to connecting to the source node via Barman (if available).
|
||||||
*/
|
*/
|
||||||
if (runtime_options.no_upstream_connection == false)
|
if (runtime_options.no_upstream_connection == false)
|
||||||
{
|
{
|
||||||
|
RecordStatus record_status = RECORD_NOT_FOUND;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This connects to the source node and performs sanity checks, also
|
* This connects to the source node and performs sanity checks, also
|
||||||
* sets "recovery_conninfo_str", "upstream_repluser", "upstream_user" and
|
* sets "recovery_conninfo_str", "upstream_repluser", "upstream_user" and
|
||||||
@@ -321,6 +324,18 @@ do_standby_clone(void)
|
|||||||
* "barman" mode.
|
* "barman" mode.
|
||||||
*/
|
*/
|
||||||
check_source_server();
|
check_source_server();
|
||||||
|
|
||||||
|
/* attempt to retrieve upstream node record */
|
||||||
|
record_status = get_node_record(source_conn,
|
||||||
|
upstream_node_id,
|
||||||
|
&upstream_node_record);
|
||||||
|
|
||||||
|
if (record_status != RECORD_FOUND)
|
||||||
|
{
|
||||||
|
log_error(_("unable to retrieve record for upstream node %i"),
|
||||||
|
upstream_node_id);
|
||||||
|
exit(ERR_BAD_CONFIG);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@@ -585,6 +600,35 @@ do_standby_clone(void)
|
|||||||
|
|
||||||
if (runtime_options.dry_run == true)
|
if (runtime_options.dry_run == true)
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* If replication slots in use, sanity-check whether we can create them
|
||||||
|
* with the available user permissions.
|
||||||
|
*/
|
||||||
|
if (config_file_options.use_replication_slots == true && PQstatus(source_conn) == CONNECTION_OK)
|
||||||
|
{
|
||||||
|
PQExpBufferData msg;
|
||||||
|
bool success = true;
|
||||||
|
|
||||||
|
initPQExpBuffer(&msg);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* "create_replication_slot()" knows about --dry-run mode and
|
||||||
|
* will perform checks but not actually create the slot.
|
||||||
|
*/
|
||||||
|
success = create_replication_slot(source_conn,
|
||||||
|
local_node_record.slot_name,
|
||||||
|
&upstream_node_record,
|
||||||
|
&msg);
|
||||||
|
if (success == false)
|
||||||
|
{
|
||||||
|
log_error(_("prerequisites not met for creating a replication slot on upstream node %i"),
|
||||||
|
upstream_node_record.node_id);
|
||||||
|
termPQExpBuffer(&msg);
|
||||||
|
exit(ERR_BAD_CONFIG);
|
||||||
|
}
|
||||||
|
termPQExpBuffer(&msg);
|
||||||
|
}
|
||||||
|
|
||||||
if (upstream_node_id != UNKNOWN_NODE_ID)
|
if (upstream_node_id != UNKNOWN_NODE_ID)
|
||||||
{
|
{
|
||||||
log_notice(_("standby will attach to upstream node %i"), upstream_node_id);
|
log_notice(_("standby will attach to upstream node %i"), upstream_node_id);
|
||||||
@@ -599,15 +643,16 @@ do_standby_clone(void)
|
|||||||
log_hint(_("consider using the -c/--fast-checkpoint option"));
|
log_hint(_("consider using the -c/--fast-checkpoint option"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
PQfinish(source_conn);
|
||||||
|
|
||||||
log_info(_("all prerequisites for \"standby clone\" are met"));
|
log_info(_("all prerequisites for \"standby clone\" are met"));
|
||||||
|
|
||||||
PQfinish(source_conn);
|
|
||||||
exit(SUCCESS);
|
exit(SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mode != barman)
|
if (mode != barman)
|
||||||
{
|
{
|
||||||
initialise_direct_clone(&local_node_record);
|
initialise_direct_clone(&local_node_record, &upstream_node_record);
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (mode)
|
switch (mode)
|
||||||
@@ -650,7 +695,15 @@ do_standby_clone(void)
|
|||||||
/* If a replication slot was previously created, drop it */
|
/* If a replication slot was previously created, drop it */
|
||||||
if (config_file_options.use_replication_slots == true)
|
if (config_file_options.use_replication_slots == true)
|
||||||
{
|
{
|
||||||
drop_replication_slot_sql(source_conn, local_node_record.slot_name);
|
/*
|
||||||
|
* In the case where a standby is being cloned from a node other than its
|
||||||
|
* intended upstream, We can't be sure of the source node's node_id. This
|
||||||
|
* is only required by "drop_replication_slot_if_exists()" to determine
|
||||||
|
* from the node's record whether it has a different replication user, and
|
||||||
|
* as in this case that would need to be supplied via "--replication-user"
|
||||||
|
* it's not a problem.
|
||||||
|
*/
|
||||||
|
drop_replication_slot_if_exists(source_conn, UNKNOWN_NODE_ID, local_node_record.slot_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
log_error(_("unable to take a base backup of the primary server"));
|
log_error(_("unable to take a base backup of the primary server"));
|
||||||
@@ -1313,8 +1366,20 @@ _do_create_replication_conf(void)
|
|||||||
/* add replication slot, if required */
|
/* add replication slot, if required */
|
||||||
if (slot_creation_required == true)
|
if (slot_creation_required == true)
|
||||||
{
|
{
|
||||||
|
PQExpBufferData msg;
|
||||||
|
initPQExpBuffer(&msg);
|
||||||
|
|
||||||
if (runtime_options.dry_run == true)
|
if (runtime_options.dry_run == true)
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* In --dry-run mode this will check availability
|
||||||
|
* of a user who can create replication slots.
|
||||||
|
*/
|
||||||
|
// XXX check return value
|
||||||
|
create_replication_slot(upstream_conn,
|
||||||
|
local_node_record.slot_name,
|
||||||
|
NULL,
|
||||||
|
&msg);
|
||||||
log_info(_("would create replication slot \"%s\" on upstream node \"%s\" (ID: %i)"),
|
log_info(_("would create replication slot \"%s\" on upstream node \"%s\" (ID: %i)"),
|
||||||
local_node_record.slot_name,
|
local_node_record.slot_name,
|
||||||
upstream_node_record.node_name,
|
upstream_node_record.node_name,
|
||||||
@@ -1322,11 +1387,10 @@ _do_create_replication_conf(void)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
PQExpBufferData msg;
|
|
||||||
initPQExpBuffer(&msg);
|
|
||||||
|
|
||||||
if (create_replication_slot_sql(upstream_conn,
|
if (create_replication_slot(upstream_conn,
|
||||||
local_node_record.slot_name,
|
local_node_record.slot_name,
|
||||||
|
NULL,
|
||||||
&msg) == false)
|
&msg) == false)
|
||||||
{
|
{
|
||||||
log_error("%s", msg.data);
|
log_error("%s", msg.data);
|
||||||
@@ -1335,13 +1399,14 @@ _do_create_replication_conf(void)
|
|||||||
exit(ERR_BAD_CONFIG);
|
exit(ERR_BAD_CONFIG);
|
||||||
}
|
}
|
||||||
|
|
||||||
termPQExpBuffer(&msg);
|
|
||||||
|
|
||||||
log_notice(_("replication slot \"%s\" created on upstream node \"%s\" (ID: %i)"),
|
log_notice(_("replication slot \"%s\" created on upstream node \"%s\" (ID: %i)"),
|
||||||
local_node_record.slot_name,
|
local_node_record.slot_name,
|
||||||
upstream_node_record.node_name,
|
upstream_node_record.node_name,
|
||||||
upstream_node_id);
|
upstream_node_id);
|
||||||
}
|
}
|
||||||
|
termPQExpBuffer(&msg);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -3070,8 +3135,9 @@ do_standby_follow_internal(PGconn *primary_conn, PGconn *follow_target_conn, t_n
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (create_replication_slot_sql(follow_target_conn,
|
if (create_replication_slot(follow_target_conn,
|
||||||
local_node_record.slot_name,
|
local_node_record.slot_name,
|
||||||
|
NULL,
|
||||||
output) == false)
|
output) == false)
|
||||||
{
|
{
|
||||||
log_error("%s", output->data);
|
log_error("%s", output->data);
|
||||||
@@ -5163,6 +5229,14 @@ check_source_server()
|
|||||||
exit(ERR_BAD_CONFIG);
|
exit(ERR_BAD_CONFIG);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* To create replication slots, we'll need a user with the REPLICATION
|
||||||
|
* privilege, or a superuser.
|
||||||
|
*/
|
||||||
|
if (config_file_options.use_replication_slots == true)
|
||||||
|
{
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -5718,7 +5792,7 @@ check_upstream_config(PGconn *conn, int server_version_num, t_node_info *upstrea
|
|||||||
* - standby_clone
|
* - standby_clone
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
initialise_direct_clone(t_node_info *node_record)
|
initialise_direct_clone(t_node_info *local_node_record, t_node_info *upstream_node_record)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Check the destination data directory can be used (in Barman mode, this
|
* Check the destination data directory can be used (in Barman mode, this
|
||||||
@@ -5820,15 +5894,11 @@ initialise_direct_clone(t_node_info *node_record)
|
|||||||
|
|
||||||
if (config_file_options.use_replication_slots == true)
|
if (config_file_options.use_replication_slots == true)
|
||||||
{
|
{
|
||||||
PGconn *superuser_conn = NULL;
|
|
||||||
PGconn *privileged_conn = NULL;
|
|
||||||
PQExpBufferData event_details;
|
PQExpBufferData event_details;
|
||||||
|
|
||||||
initPQExpBuffer(&event_details);
|
initPQExpBuffer(&event_details);
|
||||||
|
|
||||||
get_superuser_connection(&source_conn, &superuser_conn, &privileged_conn);
|
if (create_replication_slot(source_conn, local_node_record->slot_name, upstream_node_record, &event_details) == false)
|
||||||
|
|
||||||
if (create_replication_slot_sql(privileged_conn, node_record->slot_name, &event_details) == false)
|
|
||||||
{
|
{
|
||||||
log_error("%s", event_details.data);
|
log_error("%s", event_details.data);
|
||||||
|
|
||||||
@@ -5841,9 +5911,6 @@ initialise_direct_clone(t_node_info *node_record)
|
|||||||
|
|
||||||
PQfinish(source_conn);
|
PQfinish(source_conn);
|
||||||
|
|
||||||
if (superuser_conn != NULL)
|
|
||||||
PQfinish(superuser_conn);
|
|
||||||
|
|
||||||
exit(ERR_DB_QUERY);
|
exit(ERR_DB_QUERY);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -5851,10 +5918,7 @@ initialise_direct_clone(t_node_info *node_record)
|
|||||||
|
|
||||||
log_verbose(LOG_INFO,
|
log_verbose(LOG_INFO,
|
||||||
_("replication slot \"%s\" created on source node"),
|
_("replication slot \"%s\" created on source node"),
|
||||||
node_record->slot_name);
|
local_node_record->slot_name);
|
||||||
|
|
||||||
if (superuser_conn != NULL)
|
|
||||||
PQfinish(superuser_conn);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
@@ -6050,10 +6114,6 @@ run_basebackup(t_node_info *node_record)
|
|||||||
*/
|
*/
|
||||||
if (config_file_options.use_replication_slots && upstream_node_id != UNKNOWN_NODE_ID)
|
if (config_file_options.use_replication_slots && upstream_node_id != UNKNOWN_NODE_ID)
|
||||||
{
|
{
|
||||||
|
|
||||||
PGconn *superuser_conn = NULL;
|
|
||||||
PGconn *privileged_conn = NULL;
|
|
||||||
|
|
||||||
t_node_info upstream_node_record = T_NODE_INFO_INITIALIZER;
|
t_node_info upstream_node_record = T_NODE_INFO_INITIALIZER;
|
||||||
t_replication_slot slot_info = T_REPLICATION_SLOT_INITIALIZER;
|
t_replication_slot slot_info = T_REPLICATION_SLOT_INITIALIZER;
|
||||||
RecordStatus record_status = RECORD_NOT_FOUND;
|
RecordStatus record_status = RECORD_NOT_FOUND;
|
||||||
@@ -6062,7 +6122,7 @@ run_basebackup(t_node_info *node_record)
|
|||||||
record_status = get_node_record(source_conn, upstream_node_id, &upstream_node_record);
|
record_status = get_node_record(source_conn, upstream_node_id, &upstream_node_record);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* if there's no upstream record, there's no point in trying to create
|
* If there's no upstream record, there's no point in trying to create
|
||||||
* a replication slot on the designated upstream, as the assumption is
|
* a replication slot on the designated upstream, as the assumption is
|
||||||
* it won't exist at this point.
|
* it won't exist at this point.
|
||||||
*/
|
*/
|
||||||
@@ -6111,10 +6171,8 @@ run_basebackup(t_node_info *node_record)
|
|||||||
node_record->slot_name,
|
node_record->slot_name,
|
||||||
upstream_node_id);
|
upstream_node_id);
|
||||||
|
|
||||||
get_superuser_connection(&upstream_conn, &superuser_conn, &privileged_conn);
|
|
||||||
|
|
||||||
initPQExpBuffer(&event_details);
|
initPQExpBuffer(&event_details);
|
||||||
if (create_replication_slot_sql(privileged_conn, node_record->slot_name, &event_details) == false)
|
if (create_replication_slot(upstream_conn, node_record->slot_name, &upstream_node_record, &event_details) == false)
|
||||||
{
|
{
|
||||||
log_error("%s", event_details.data);
|
log_error("%s", event_details.data);
|
||||||
|
|
||||||
@@ -6127,15 +6185,9 @@ run_basebackup(t_node_info *node_record)
|
|||||||
|
|
||||||
PQfinish(source_conn);
|
PQfinish(source_conn);
|
||||||
|
|
||||||
if (superuser_conn != NULL)
|
|
||||||
PQfinish(superuser_conn);
|
|
||||||
|
|
||||||
exit(ERR_DB_QUERY);
|
exit(ERR_DB_QUERY);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (superuser_conn != NULL)
|
|
||||||
PQfinish(superuser_conn);
|
|
||||||
|
|
||||||
termPQExpBuffer(&event_details);
|
termPQExpBuffer(&event_details);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -6143,21 +6195,22 @@ run_basebackup(t_node_info *node_record)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* delete slot on source server */
|
|
||||||
|
|
||||||
get_superuser_connection(&source_conn, &superuser_conn, &privileged_conn);
|
|
||||||
|
|
||||||
if (slot_info.active == false)
|
if (slot_info.active == false)
|
||||||
{
|
{
|
||||||
if (slot_exists_on_upstream == false)
|
if (slot_exists_on_upstream == false)
|
||||||
{
|
{
|
||||||
if (drop_replication_slot_sql(privileged_conn, node_record->slot_name) == true)
|
|
||||||
|
/* delete slot on source server */
|
||||||
|
|
||||||
|
if (drop_replication_slot_if_exists(source_conn, UNKNOWN_NODE_ID, node_record->slot_name) == true)
|
||||||
{
|
{
|
||||||
log_notice(_("replication slot \"%s\" deleted on source node"), node_record->slot_name);
|
log_notice(_("replication slot \"%s\" deleted on source node"),
|
||||||
|
node_record->slot_name);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
log_error(_("unable to delete replication slot \"%s\" on source node"), node_record->slot_name);
|
log_error(_("unable to delete replication slot \"%s\" on source node"),
|
||||||
|
node_record->slot_name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -6168,21 +6221,17 @@ run_basebackup(t_node_info *node_record)
|
|||||||
*/
|
*/
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
log_warning(_("replication slot \"%s\" is still active on source node"), node_record->slot_name);
|
log_warning(_("replication slot \"%s\" is still active on source node"),
|
||||||
|
node_record->slot_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (superuser_conn != NULL)
|
|
||||||
PQfinish(superuser_conn);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
return SUCCESS;
|
return SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int
|
static int
|
||||||
run_file_backup(t_node_info *node_record)
|
run_file_backup(t_node_info *local_node_record)
|
||||||
{
|
{
|
||||||
int r = SUCCESS,
|
int r = SUCCESS,
|
||||||
i;
|
i;
|
||||||
@@ -6681,31 +6730,37 @@ stop_backup:
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
record_status = get_slot_record(upstream_conn, node_record->slot_name, &slot_info);
|
record_status = get_slot_record(upstream_conn, local_node_record->slot_name, &slot_info);
|
||||||
|
|
||||||
if (record_status == RECORD_FOUND)
|
if (record_status == RECORD_FOUND)
|
||||||
{
|
{
|
||||||
log_verbose(LOG_INFO,
|
log_verbose(LOG_INFO,
|
||||||
_("replication slot \"%s\" aleady exists on upstream node %i"),
|
_("replication slot \"%s\" aleady exists on upstream node %i"),
|
||||||
node_record->slot_name,
|
local_node_record->slot_name,
|
||||||
upstream_node_id);
|
upstream_node_id);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
PQExpBufferData errmsg;
|
PQExpBufferData errmsg;
|
||||||
|
bool success;
|
||||||
|
|
||||||
initPQExpBuffer(&errmsg);
|
initPQExpBuffer(&errmsg);
|
||||||
|
success = create_replication_slot(upstream_conn,
|
||||||
if (create_replication_slot_sql(upstream_conn, node_record->slot_name, &errmsg) == false)
|
local_node_record->slot_name,
|
||||||
|
&upstream_node_record,
|
||||||
|
&errmsg);
|
||||||
|
if (success == false)
|
||||||
{
|
{
|
||||||
log_error(_("unable to create replication slot on upstream node %i"), upstream_node_id);
|
log_error(_("unable to create replication slot \"%s\" on upstream node %i"),
|
||||||
|
local_node_record->slot_name,
|
||||||
|
upstream_node_id);
|
||||||
log_detail("%s", errmsg.data);
|
log_detail("%s", errmsg.data);
|
||||||
slot_warning = true;
|
slot_warning = true;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
log_notice(_("replication slot \"%s\" created on upstream node \"%s\" (ID: %i)"),
|
log_notice(_("replication slot \"%s\" created on upstream node \"%s\" (ID: %i)"),
|
||||||
node_record->slot_name,
|
local_node_record->slot_name,
|
||||||
upstream_node_record.node_name,
|
upstream_node_record.node_name,
|
||||||
upstream_node_id );
|
upstream_node_id );
|
||||||
}
|
}
|
||||||
@@ -6722,7 +6777,7 @@ stop_backup:
|
|||||||
{
|
{
|
||||||
log_warning(_("\"use_replication_slots\" specified but a replication slot could not be created"));
|
log_warning(_("\"use_replication_slots\" specified but a replication slot could not be created"));
|
||||||
log_hint(_("ensure a replication slot called \"%s\" is created on the upstream node (ID: %i)"),
|
log_hint(_("ensure a replication slot called \"%s\" is created on the upstream node (ID: %i)"),
|
||||||
node_record->slot_name,
|
local_node_record->slot_name,
|
||||||
upstream_node_id);
|
upstream_node_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,14 +19,6 @@
|
|||||||
#ifndef _REPMGR_ACTION_STANDBY_H_
|
#ifndef _REPMGR_ACTION_STANDBY_H_
|
||||||
#define _REPMGR_ACTION_STANDBY_H_
|
#define _REPMGR_ACTION_STANDBY_H_
|
||||||
|
|
||||||
typedef enum
|
|
||||||
{
|
|
||||||
REPMGR_USER = 0,
|
|
||||||
REPLICATION_USER,
|
|
||||||
REPLICATION_PROTOCOL_USER,
|
|
||||||
SUPERUSER
|
|
||||||
} t_user_type;
|
|
||||||
|
|
||||||
extern void do_standby_clone(void);
|
extern void do_standby_clone(void);
|
||||||
extern void do_standby_register(void);
|
extern void do_standby_register(void);
|
||||||
extern void do_standby_unregister(void);
|
extern void do_standby_unregister(void);
|
||||||
|
|||||||
@@ -201,6 +201,16 @@ typedef enum
|
|||||||
} t_server_action;
|
} t_server_action;
|
||||||
|
|
||||||
|
|
||||||
|
typedef enum
|
||||||
|
{
|
||||||
|
USER_TYPE_UNKNOWN = -1,
|
||||||
|
REPMGR_USER,
|
||||||
|
REPLICATION_USER_OPT,
|
||||||
|
REPLICATION_USER_NODE,
|
||||||
|
SUPERUSER
|
||||||
|
} t_user_type;
|
||||||
|
|
||||||
|
|
||||||
typedef struct ColHeader
|
typedef struct ColHeader
|
||||||
{
|
{
|
||||||
char title[MAXLEN];
|
char title[MAXLEN];
|
||||||
@@ -255,7 +265,9 @@ extern void get_node_config_directory(char *config_dir_buf);
|
|||||||
extern void get_node_data_directory(char *data_dir_buf);
|
extern void get_node_data_directory(char *data_dir_buf);
|
||||||
extern void init_node_record(t_node_info *node_record);
|
extern void init_node_record(t_node_info *node_record);
|
||||||
extern bool can_use_pg_rewind(PGconn *conn, const char *data_directory, PQExpBufferData *reason);
|
extern bool can_use_pg_rewind(PGconn *conn, const char *data_directory, PQExpBufferData *reason);
|
||||||
extern void drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name);
|
|
||||||
|
extern bool create_replication_slot(PGconn *conn, char *slot_name, t_node_info *upstream_node_record, PQExpBufferData *error_msg);
|
||||||
|
extern bool drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name);
|
||||||
|
|
||||||
extern bool check_node_can_attach(TimeLineID local_tli, XLogRecPtr local_xlogpos, PGconn *follow_target_conn, t_node_info *follow_target_node_record, bool is_rejoin);
|
extern bool check_node_can_attach(TimeLineID local_tli, XLogRecPtr local_xlogpos, PGconn *follow_target_conn, t_node_info *follow_target_node_record, bool is_rejoin);
|
||||||
extern void check_shared_library(PGconn *conn);
|
extern void check_shared_library(PGconn *conn);
|
||||||
|
|||||||
269
repmgr-client.c
269
repmgr-client.c
@@ -95,11 +95,17 @@ char path_buf[MAXLEN] = "";
|
|||||||
*/
|
*/
|
||||||
t_node_info target_node_info = T_NODE_INFO_INITIALIZER;
|
t_node_info target_node_info = T_NODE_INFO_INITIALIZER;
|
||||||
|
|
||||||
|
/* used by create_replication_slot() */
|
||||||
|
static t_user_type ReplicationSlotUser = USER_TYPE_UNKNOWN;
|
||||||
|
|
||||||
/* Collate command line errors and warnings here for friendlier reporting */
|
/* Collate command line errors and warnings here for friendlier reporting */
|
||||||
static ItemList cli_errors = {NULL, NULL};
|
static ItemList cli_errors = {NULL, NULL};
|
||||||
static ItemList cli_warnings = {NULL, NULL};
|
static ItemList cli_warnings = {NULL, NULL};
|
||||||
|
|
||||||
|
static void _determine_replication_slot_user(PGconn *conn,
|
||||||
|
t_node_info *upstream_node_record,
|
||||||
|
char **replication_user);
|
||||||
|
|
||||||
int
|
int
|
||||||
main(int argc, char **argv)
|
main(int argc, char **argv)
|
||||||
{
|
{
|
||||||
@@ -3602,48 +3608,267 @@ can_use_pg_rewind(PGconn *conn, const char *data_directory, PQExpBufferData *rea
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
// provided connection should be for the normal repmgr user
|
||||||
|
// upstream_node_record may be NULL or initialised to default values
|
||||||
|
bool
|
||||||
|
create_replication_slot(PGconn *conn, char *slot_name, t_node_info *upstream_node_record, PQExpBufferData *error_msg)
|
||||||
|
{
|
||||||
|
PGconn *slot_conn = NULL;
|
||||||
|
bool use_replication_protocol = false;
|
||||||
|
bool success = true;
|
||||||
|
char *replication_user = NULL;
|
||||||
|
|
||||||
|
_determine_replication_slot_user(conn, upstream_node_record, &replication_user);
|
||||||
|
/*
|
||||||
|
* If called in --dry-run context, if the replication slot user is not the
|
||||||
|
* repmgr user, attempt to validate the connection.
|
||||||
|
*/
|
||||||
|
if (runtime_options.dry_run == true)
|
||||||
|
{
|
||||||
|
switch (ReplicationSlotUser)
|
||||||
|
{
|
||||||
|
case USER_TYPE_UNKNOWN:
|
||||||
|
log_error("unable to determine user for replication slot creation");
|
||||||
|
return false;
|
||||||
|
case REPMGR_USER:
|
||||||
|
log_info(_("replication slots will be created by user \"%s\""),
|
||||||
|
PQuser(conn));
|
||||||
|
return true;
|
||||||
|
|
||||||
|
case REPLICATION_USER_NODE:
|
||||||
|
case REPLICATION_USER_OPT:
|
||||||
|
{
|
||||||
|
PGconn *repl_conn = duplicate_connection(conn,
|
||||||
|
replication_user,
|
||||||
|
true);
|
||||||
|
if (repl_conn == NULL || PQstatus(repl_conn) != CONNECTION_OK)
|
||||||
|
{
|
||||||
|
log_error(_("unable to create replication connection as user \"%s\""),
|
||||||
|
replication_user);
|
||||||
|
log_detail("%s", PQerrorMessage(repl_conn));
|
||||||
|
|
||||||
|
PQfinish(repl_conn);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
log_info(_("replication slots will be created by replication user \"%s\""),
|
||||||
|
replication_user);
|
||||||
|
PQfinish(repl_conn);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
case SUPERUSER:
|
||||||
|
{
|
||||||
|
PGconn *superuser_conn = duplicate_connection(conn,
|
||||||
|
runtime_options.superuser,
|
||||||
|
false);
|
||||||
|
if (superuser_conn == NULL || PQstatus(superuser_conn )!= CONNECTION_OK)
|
||||||
|
{
|
||||||
|
log_error(_("unable to create superuser connection as user \"%s\""),
|
||||||
|
runtime_options.superuser);
|
||||||
|
log_detail("%s", PQerrorMessage(superuser_conn));
|
||||||
|
|
||||||
|
PQfinish(superuser_conn);
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
log_info(_("replication slots will be created by superuser \"%s\""),
|
||||||
|
runtime_options.superuser);
|
||||||
|
PQfinish(superuser_conn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If we can't create a replication slot with the connection provided to
|
||||||
|
* the function, create an connection with appropriate permissions.
|
||||||
|
*/
|
||||||
|
switch (ReplicationSlotUser)
|
||||||
|
{
|
||||||
|
case USER_TYPE_UNKNOWN:
|
||||||
|
log_error("unable to determine user for replication slot creation");
|
||||||
|
return false;
|
||||||
|
case REPMGR_USER:
|
||||||
|
slot_conn = conn;
|
||||||
|
log_info(_("creating replication slot as user \"%s\""),
|
||||||
|
PQuser(conn));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case REPLICATION_USER_NODE:
|
||||||
|
case REPLICATION_USER_OPT:
|
||||||
|
{
|
||||||
|
slot_conn = duplicate_connection(conn,
|
||||||
|
replication_user,
|
||||||
|
true);
|
||||||
|
if (slot_conn == NULL || PQstatus(slot_conn) != CONNECTION_OK)
|
||||||
|
{
|
||||||
|
log_error(_("unable to create replication connection as user \"%s\""),
|
||||||
|
runtime_options.replication_user);
|
||||||
|
log_detail("%s", PQerrorMessage(slot_conn));
|
||||||
|
|
||||||
|
PQfinish(slot_conn);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
use_replication_protocol = true;
|
||||||
|
log_info(_("creating replication slot as replication user \"%s\""),
|
||||||
|
replication_user);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case SUPERUSER:
|
||||||
|
{
|
||||||
|
slot_conn = duplicate_connection(conn,
|
||||||
|
runtime_options.superuser,
|
||||||
|
false);
|
||||||
|
if (slot_conn == NULL || PQstatus(slot_conn )!= CONNECTION_OK)
|
||||||
|
{
|
||||||
|
log_error(_("unable to create super connection as user \"%s\""),
|
||||||
|
runtime_options.superuser);
|
||||||
|
log_detail("%s", PQerrorMessage(slot_conn));
|
||||||
|
|
||||||
|
PQfinish(slot_conn);
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
log_info(_("creating replication slot as superuser \"%s\""),
|
||||||
|
runtime_options.superuser);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (use_replication_protocol == true)
|
||||||
|
{
|
||||||
|
success = create_replication_slot_replprot(conn, slot_conn, slot_name, error_msg);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
success = create_replication_slot_sql(slot_conn, slot_name, error_msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (slot_conn != conn)
|
||||||
|
PQfinish(slot_conn);
|
||||||
|
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
bool
|
||||||
drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name)
|
drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name)
|
||||||
{
|
{
|
||||||
|
t_node_info node_record = T_NODE_INFO_INITIALIZER;
|
||||||
t_replication_slot slot_info = T_REPLICATION_SLOT_INITIALIZER;
|
t_replication_slot slot_info = T_REPLICATION_SLOT_INITIALIZER;
|
||||||
RecordStatus record_status = get_slot_record(conn, slot_name, &slot_info);
|
RecordStatus record_status;
|
||||||
|
|
||||||
|
char *replication_user = NULL;
|
||||||
|
bool success = true;
|
||||||
|
|
||||||
|
if (node_id != UNKNOWN_NODE_ID)
|
||||||
|
{
|
||||||
|
record_status = get_node_record(conn, node_id, &node_record);
|
||||||
|
}
|
||||||
|
|
||||||
|
_determine_replication_slot_user(conn, &node_record, &replication_user);
|
||||||
|
|
||||||
|
record_status = get_slot_record(conn, slot_name, &slot_info);
|
||||||
|
|
||||||
log_verbose(LOG_DEBUG, "attempting to delete slot \"%s\" on node %i",
|
log_verbose(LOG_DEBUG, "attempting to delete slot \"%s\" on node %i",
|
||||||
slot_name, node_id);
|
slot_name, node_id);
|
||||||
|
|
||||||
if (record_status != RECORD_FOUND)
|
if (record_status != RECORD_FOUND)
|
||||||
{
|
{
|
||||||
/* this is a good thing */
|
/* this is not a bad good thing */
|
||||||
log_verbose(LOG_INFO,
|
log_verbose(LOG_INFO,
|
||||||
_("slot \"%s\" does not exist on node %i, nothing to remove"),
|
_("slot \"%s\" does not exist on node %i, nothing to remove"),
|
||||||
slot_name, node_id);
|
slot_name, node_id);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
|
||||||
if (slot_info.active == false)
|
|
||||||
{
|
|
||||||
if (drop_replication_slot_sql(conn, slot_name) == true)
|
|
||||||
{
|
|
||||||
log_notice(_("replication slot \"%s\" deleted on node %i"), slot_name, node_id);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
log_error(_("unable to delete replication slot \"%s\" on node %i"), slot_name, node_id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
if (slot_info.active == false)
|
||||||
* if active replication slot exists, call Houston as we have a
|
{
|
||||||
* problem
|
if (drop_replication_slot_sql(conn, slot_name) == true)
|
||||||
*/
|
{
|
||||||
|
log_notice(_("replication slot \"%s\" deleted on node %i"), slot_name, node_id);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
log_warning(_("replication slot \"%s\" is still active on node %i"), slot_name, node_id);
|
log_error(_("unable to delete replication slot \"%s\" on node %i"), slot_name, node_id);
|
||||||
|
success = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If an active replication slot exists, call Houston as we have a
|
||||||
|
* problem.
|
||||||
|
*/
|
||||||
|
else
|
||||||
|
{
|
||||||
|
log_warning(_("replication slot \"%s\" is still active on node %i"), slot_name, node_id);
|
||||||
|
success = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
_determine_replication_slot_user(PGconn *conn, t_node_info *upstream_node_record, char **replication_user)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* If not previously done, work out which user will be responsible
|
||||||
|
* for creating replication slots.
|
||||||
|
*/
|
||||||
|
if (ReplicationSlotUser == USER_TYPE_UNKNOWN)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Is the repmgr user a superuser?
|
||||||
|
*/
|
||||||
|
if (is_superuser_connection(conn, NULL))
|
||||||
|
{
|
||||||
|
ReplicationSlotUser = REPMGR_USER;
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
* Does the repmgr user have the REPLICATION role?
|
||||||
|
* Note we don't care here whether the repmgr user can actually
|
||||||
|
* make a replication connection, we're just confirming that the
|
||||||
|
* connection we have has the appropriate permissions.
|
||||||
|
*/
|
||||||
|
else if (is_replication_role(conn, NULL))
|
||||||
|
{
|
||||||
|
ReplicationSlotUser = REPMGR_USER;
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
* Is a superuser provided with --superuser?
|
||||||
|
* We'll check later whether we can make a connection as that user.
|
||||||
|
*/
|
||||||
|
else if (runtime_options.superuser[0] != '\0')
|
||||||
|
{
|
||||||
|
ReplicationSlotUser = SUPERUSER;
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
* Is a replication user provided with --replication-user?
|
||||||
|
* We'll check later whether we can make a replication connection as that user.
|
||||||
|
* Overrides any replication user defined in the upstream node record.
|
||||||
|
*/
|
||||||
|
else if (runtime_options.replication_user[0] != '\0')
|
||||||
|
{
|
||||||
|
ReplicationSlotUser = REPLICATION_USER_OPT;
|
||||||
|
*replication_user = runtime_options.replication_user;
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
* Is the upstream's node record provided, and does it have a different
|
||||||
|
* replication user?
|
||||||
|
* We'll check later whether we can make a replication connection as that user.
|
||||||
|
*/
|
||||||
|
else if (upstream_node_record != NULL && upstream_node_record->node_id != UNKNOWN_NODE_ID
|
||||||
|
&& strncmp(upstream_node_record->repluser, PQuser(conn), NAMEDATALEN) != 0)
|
||||||
|
{
|
||||||
|
ReplicationSlotUser = REPLICATION_USER_NODE;
|
||||||
|
*replication_user = upstream_node_record->repluser;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Here we'll perform some timeline sanity checks to ensure the follow target
|
* Here we'll perform some timeline sanity checks to ensure the follow target
|
||||||
* can actually be followed.
|
* can actually be followed.
|
||||||
|
|||||||
Reference in New Issue
Block a user