Compare commits

..

4 Commits

Author SHA1 Message Date
Ian Barwick
4a73bdcfd0 need to provide the follow target node record for node follow internal
otherwise fails when using a dedicated replication user.

WIP.
2022-05-16 15:50:02 +09:00
Ian Barwick
e27f213949 split out replication slot connection creation code into function
WIP
2022-05-16 13:51:45 +09:00
Ian Barwick
cce5ca2245 doc: update release notes 2022-05-16 12:05:35 +09:00
Ian Barwick
6f87d2c61e repmgrd: improve walsender disable check
Specifically, don't attempt to disable walsenders if "standby_disconnect_on_failover"
is "true", but the repmgr user is not a superuser.

This restriction can be lifted from PostgreSQL 15.
2022-05-16 11:52:10 +09:00
3 changed files with 122 additions and 74 deletions

View File

@@ -14,6 +14,8 @@
<para>
See also: <xref linkend="upgrading-repmgr"/>
</para>
<!-- remember to update the release date in ../repmgr_version.h.in -->
<sect1 id="release-5.3.2">
<title id="release-current">Release 5.3.2</title>
<para><emphasis>??? ??? ???, 2022</emphasis></para>
@@ -64,7 +66,6 @@
</sect2>
</sect1>
<!-- remember to update the release date in ../repmgr_version.h.in -->
<sect1 id="release-5.3.1">
<title>Release 5.3.1</title>
<para><emphasis>Tue 15 February, 2022</emphasis></para>

View File

@@ -3348,10 +3348,9 @@ do_standby_follow_internal(PGconn *primary_conn, PGconn *follow_target_conn, t_n
update_node_record_slot_name(primary_conn, config_file_options.node_id, local_node_record.slot_name);
}
if (create_replication_slot(follow_target_conn,
local_node_record.slot_name,
NULL,
follow_target_node_record,
output) == false)
{
log_error("%s", output->data);

View File

@@ -90,17 +90,22 @@ char pg_bindir[MAXPGPATH] = "";
*/
t_node_info target_node_info = T_NODE_INFO_INITIALIZER;
/* used by create_replication_slot() */
/* set by the first call to _determine_replication_slot_user() */
static t_user_type ReplicationSlotUser = USER_TYPE_UNKNOWN;
/* Collate command line errors and warnings here for friendlier reporting */
static ItemList cli_errors = {NULL, NULL};
static ItemList cli_warnings = {NULL, NULL};
static void _determine_replication_slot_user(PGconn *conn,
t_node_info *upstream_node_record,
char **replication_user);
static PGconn *_get_replication_slot_connection(PGconn *conn,
char *replication_user,
bool *use_replication_protocol);
int
main(int argc, char **argv)
{
@@ -3739,6 +3744,7 @@ create_replication_slot(PGconn *conn, char *slot_name, t_node_info *upstream_nod
char *replication_user = NULL;
_determine_replication_slot_user(conn, upstream_node_record, &replication_user);
/*
* If called in --dry-run context, if the replication slot user is not the
* repmgr user, attempt to validate the connection.
@@ -3750,7 +3756,7 @@ create_replication_slot(PGconn *conn, char *slot_name, t_node_info *upstream_nod
case USER_TYPE_UNKNOWN:
log_error("unable to determine user for replication slot creation");
return false;
case REPMGR_USER:
case REPMGR_USER:
log_info(_("replication slots will be created by user \"%s\""),
PQuser(conn));
return true;
@@ -3796,65 +3802,12 @@ create_replication_slot(PGconn *conn, char *slot_name, t_node_info *upstream_nod
PQfinish(superuser_conn);
}
}
}
/*
* If we can't create a replication slot with the connection provided to
* the function, create an connection with appropriate permissions.
*/
switch (ReplicationSlotUser)
{
case USER_TYPE_UNKNOWN:
log_error("unable to determine user for replication slot creation");
return false;
case REPMGR_USER:
slot_conn = conn;
log_info(_("creating replication slot as user \"%s\""),
PQuser(conn));
break;
slot_conn = _get_replication_slot_connection(conn, replication_user, &use_replication_protocol);
case REPLICATION_USER_NODE:
case REPLICATION_USER_OPT:
{
slot_conn = duplicate_connection(conn,
replication_user,
true);
if (slot_conn == NULL || PQstatus(slot_conn) != CONNECTION_OK)
{
log_error(_("unable to create replication connection as user \"%s\""),
runtime_options.replication_user);
log_detail("%s", PQerrorMessage(slot_conn));
PQfinish(slot_conn);
return false;
}
use_replication_protocol = true;
log_info(_("creating replication slot as replication user \"%s\""),
replication_user);
}
break;
case SUPERUSER:
{
slot_conn = duplicate_connection(conn,
runtime_options.superuser,
false);
if (slot_conn == NULL || PQstatus(slot_conn )!= CONNECTION_OK)
{
log_error(_("unable to create super connection as user \"%s\""),
runtime_options.superuser);
log_detail("%s", PQerrorMessage(slot_conn));
PQfinish(slot_conn);
return false;
}
log_info(_("creating replication slot as superuser \"%s\""),
runtime_options.superuser);
}
break;
}
if (slot_conn == NULL)
return false;
if (use_replication_protocol == true)
{
@@ -3897,34 +3850,55 @@ drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name)
if (record_status != RECORD_FOUND)
{
/* this is not a bad good thing */
/* no slot, no problem */
log_verbose(LOG_INFO,
_("slot \"%s\" does not exist on node %i, nothing to remove"),
slot_name, node_id);
return true;
}
if (slot_info.active == false)
if (slot_info.active == true)
{
if (drop_replication_slot_sql(conn, slot_name) == true)
/*
* If an active replication slot exists, bail out as we have a problem
* we can't solve here.
*/
log_warning(_("replication slot \"%s\" is still active on node %i"), slot_name, node_id);
success = false;
}
else
{
/*
* Create the appropriate connection with which to drop the slot
*/
bool use_replication_protocol = false;
PGconn *slot_conn = NULL;
slot_conn = _get_replication_slot_connection(conn,
replication_user,
&use_replication_protocol);
if (use_replication_protocol == true)
{
success = drop_replication_slot_replprot(conn, slot_name);
}
else
{
success = drop_replication_slot_sql(conn, slot_name);
}
if (success == true)
{
log_notice(_("replication slot \"%s\" deleted on node %i"), slot_name, node_id);
}
else
{
log_error(_("unable to delete replication slot \"%s\" on node %i"), slot_name, node_id);
success = false;
}
}
/*
* If an active replication slot exists, call Houston as we have a
* problem.
*/
else
{
log_warning(_("replication slot \"%s\" is still active on node %i"), slot_name, node_id);
success = false;
if (slot_conn != conn)
PQfinish(slot_conn);
}
return success;
@@ -3986,10 +3960,84 @@ _determine_replication_slot_user(PGconn *conn, t_node_info *upstream_node_record
ReplicationSlotUser = REPLICATION_USER_NODE;
*replication_user = upstream_node_record->repluser;
}
else
{
/* This should never happen */
log_error("unable to determine replication slot user");
if (upstream_node_record != NULL)
log_debug("%i %s %s", upstream_node_record->node_id, upstream_node_record->repluser, PQuser(conn));
else
log_debug("upstream_node_record not provided");
}
}
}
static PGconn *
_get_replication_slot_connection(PGconn *conn, char *replication_user, bool *use_replication_protocol)
{
PGconn *slot_conn = NULL;
/*
* If we can't create a replication slot with the connection provided to
* the function, create an connection with appropriate permissions.
*/
switch (ReplicationSlotUser)
{
case USER_TYPE_UNKNOWN:
log_error("unable to determine user for replication slot creation");
return NULL;
case REPMGR_USER:
slot_conn = conn;
log_info(_("creating replication slot as user \"%s\""),
PQuser(conn));
break;
case REPLICATION_USER_NODE:
case REPLICATION_USER_OPT:
{
slot_conn = duplicate_connection(conn,
replication_user,
true);
if (slot_conn == NULL || PQstatus(slot_conn) != CONNECTION_OK)
{
log_error(_("unable to create replication connection as user \"%s\""),
runtime_options.replication_user);
log_detail("%s", PQerrorMessage(slot_conn));
PQfinish(slot_conn);
return NULL;
}
*use_replication_protocol = true;
log_info(_("creating replication slot as replication user \"%s\""),
replication_user);
}
break;
case SUPERUSER:
{
slot_conn = duplicate_connection(conn,
runtime_options.superuser,
false);
if (slot_conn == NULL || PQstatus(slot_conn )!= CONNECTION_OK)
{
log_error(_("unable to create super connection as user \"%s\""),
runtime_options.superuser);
log_detail("%s", PQerrorMessage(slot_conn));
PQfinish(slot_conn);
return NULL;
}
log_info(_("creating replication slot as superuser \"%s\""),
runtime_options.superuser);
}
break;
}
return slot_conn;
}
bool
check_replication_slots_available(int node_id, PGconn* conn)
{