mirror of
https://github.com/EnterpriseDB/repmgr.git
synced 2026-03-22 22:56:29 +00:00
"standby clone": improve replication slots handling
Ensure replication slot is created on the upstream node and deleted from the source node, if upstream node and source nodes differ.
This commit is contained in:
@@ -3184,6 +3184,8 @@ drop_replication_slot(PGconn *conn, char *slot_name)
|
||||
log_verbose(LOG_DEBUG, "replication slot \"%s\" successfully dropped",
|
||||
slot_name);
|
||||
|
||||
PQclear(res);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@@ -2804,7 +2804,6 @@ check_source_server()
|
||||
* provided upstream connection
|
||||
*/
|
||||
|
||||
|
||||
/* Verify that upstream node is a supported server version */
|
||||
log_verbose(LOG_INFO, _("connected to source node, checking its state"));
|
||||
|
||||
@@ -2813,7 +2812,6 @@ check_source_server()
|
||||
if (get_cluster_size(source_conn, cluster_size) == false)
|
||||
exit(ERR_DB_QUERY);
|
||||
|
||||
log_info(_("successfully connected to source node"));
|
||||
log_detail(_("current installation size is %s"),
|
||||
cluster_size);
|
||||
|
||||
@@ -2825,7 +2823,11 @@ check_source_server()
|
||||
{
|
||||
primary_conn = get_primary_connection(source_conn, NULL, NULL);
|
||||
|
||||
// XXX check this worked?
|
||||
if (PQstatus(primary_conn) != CONNECTION_OK)
|
||||
{
|
||||
log_error(_("unable to connect to primary node"));
|
||||
exit(ERR_BAD_CONFIG);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -2928,31 +2930,35 @@ check_source_server()
|
||||
else
|
||||
upstream_node_id = runtime_options.upstream_node_id;
|
||||
|
||||
record_status = get_node_record(source_conn, upstream_node_id, &node_record);
|
||||
if (record_status == RECORD_FOUND)
|
||||
{
|
||||
upstream_conninfo_found = true;
|
||||
strncpy(recovery_conninfo_str, node_record.conninfo, MAXLEN);
|
||||
strncpy(upstream_repluser, node_record.repluser, NAMEDATALEN);
|
||||
}
|
||||
log_debug("upstream_node_id determined as %i", upstream_node_id);
|
||||
|
||||
/*
|
||||
* check that there's no existing node record with the same name but
|
||||
* different ID
|
||||
*/
|
||||
record_status = get_node_record_by_name(source_conn, config_file_options.node_name, &node_record);
|
||||
|
||||
if (record_status == RECORD_FOUND && node_record.node_id != config_file_options.node_id)
|
||||
if (upstream_node_id != UNKNOWN_NODE_ID)
|
||||
{
|
||||
log_error(_("another node (node_id: %i) already exists with node_name \"%s\""),
|
||||
node_record.node_id,
|
||||
config_file_options.node_name);
|
||||
PQfinish(source_conn);
|
||||
exit(ERR_BAD_CONFIG);
|
||||
record_status = get_node_record(source_conn, upstream_node_id, &node_record);
|
||||
if (record_status == RECORD_FOUND)
|
||||
{
|
||||
upstream_conninfo_found = true;
|
||||
strncpy(recovery_conninfo_str, node_record.conninfo, MAXLEN);
|
||||
strncpy(upstream_repluser, node_record.repluser, NAMEDATALEN);
|
||||
}
|
||||
|
||||
/*
|
||||
* check that there's no existing node record with the same name but
|
||||
* different ID
|
||||
*/
|
||||
record_status = get_node_record_by_name(source_conn, config_file_options.node_name, &node_record);
|
||||
|
||||
if (record_status == RECORD_FOUND && node_record.node_id != config_file_options.node_id)
|
||||
{
|
||||
log_error(_("another node (node_id: %i) already exists with node_name \"%s\""),
|
||||
node_record.node_id,
|
||||
config_file_options.node_name);
|
||||
PQfinish(source_conn);
|
||||
exit(ERR_BAD_CONFIG);
|
||||
}
|
||||
}
|
||||
|
||||
check_upstream_config(source_conn, source_server_version_num, &node_record, true);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -3412,10 +3418,6 @@ check_upstream_config(PGconn *conn, int server_version_num, t_node_info *node_in
|
||||
static void
|
||||
initialise_direct_clone(t_node_info *node_record)
|
||||
{
|
||||
PGconn *superuser_conn = NULL;
|
||||
PGconn *privileged_conn = NULL;
|
||||
bool success = false;
|
||||
|
||||
/*
|
||||
* Check the destination data directory can be used
|
||||
* (in Barman mode, this directory will already have been created)
|
||||
@@ -3443,6 +3445,7 @@ initialise_direct_clone(t_node_info *node_record)
|
||||
TablespaceListCell *cell = false;
|
||||
KeyValueList not_found = { NULL, NULL };
|
||||
int total = 0, matched = 0;
|
||||
bool success = false;
|
||||
|
||||
for (cell = config_file_options.tablespace_mapping.head; cell; cell = cell->next)
|
||||
{
|
||||
@@ -3492,12 +3495,14 @@ initialise_direct_clone(t_node_info *node_record)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
get_superuser_connection(&source_conn, &superuser_conn, &privileged_conn);
|
||||
|
||||
/*
|
||||
* If replication slots requested, create appropriate slot on the
|
||||
* primary; this must be done before pg_basebackup is called.
|
||||
* source node; this must be done before pg_basebackup is called.
|
||||
*
|
||||
* Note: if the source node is different to the specified upstream
|
||||
* node, we'll need to drop the slot and recreate it on the upstream.
|
||||
*
|
||||
* TODO: skip this for Pg10, and ensure temp slot option used
|
||||
*
|
||||
* Replication slots are not supported (and not very useful
|
||||
* anyway) in Barman mode.
|
||||
@@ -3505,9 +3510,13 @@ initialise_direct_clone(t_node_info *node_record)
|
||||
|
||||
if (config_file_options.use_replication_slots == true)
|
||||
{
|
||||
PGconn *superuser_conn = NULL;
|
||||
PGconn *privileged_conn = NULL;
|
||||
PQExpBufferData event_details;
|
||||
initPQExpBuffer(&event_details);
|
||||
|
||||
get_superuser_connection(&source_conn, &superuser_conn, &privileged_conn);
|
||||
|
||||
if (create_replication_slot(privileged_conn, node_record->slot_name, source_server_version_num, &event_details) == false)
|
||||
{
|
||||
log_error("%s", event_details.data);
|
||||
@@ -3530,13 +3539,13 @@ initialise_direct_clone(t_node_info *node_record)
|
||||
|
||||
termPQExpBuffer(&event_details);
|
||||
|
||||
log_notice(_("replication slot \"%s\" created on upstream node (node_id: %i)"),
|
||||
node_record->slot_name,
|
||||
upstream_node_id);
|
||||
}
|
||||
log_verbose(LOG_INFO,
|
||||
_("replication slot \"%s\" created on upstream node"),
|
||||
node_record->slot_name);
|
||||
|
||||
if (superuser_conn != NULL)
|
||||
PQfinish(superuser_conn);
|
||||
if (superuser_conn != NULL)
|
||||
PQfinish(superuser_conn);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
@@ -3707,6 +3716,103 @@ run_basebackup(t_node_info *node_record)
|
||||
if (r != 0)
|
||||
return ERR_BAD_BASEBACKUP;
|
||||
|
||||
/*
|
||||
* If replication slots in use, check the created slot is on the correct node;
|
||||
* the slot will initially get created on the source node, and will need to be
|
||||
* dropped and recreated on the actual upstream node if these differ.
|
||||
*/
|
||||
if (config_file_options.use_replication_slots && upstream_node_id != UNKNOWN_NODE_ID)
|
||||
{
|
||||
PGconn *upstream_conn = NULL;
|
||||
|
||||
|
||||
t_node_info upstream_node_record = T_NODE_INFO_INITIALIZER;
|
||||
t_replication_slot slot_info = T_REPLICATION_SLOT_INITIALIZER;
|
||||
RecordStatus record_status = RECORD_NOT_FOUND;
|
||||
|
||||
//PQExpBufferData event_details;
|
||||
//initPQExpBuffer(&event_details);
|
||||
|
||||
//get_superuser_connection(&source_conn, &superuser_conn, &privileged_conn);
|
||||
|
||||
/* existence of node record previously established in check_source_server() */
|
||||
get_node_record(source_conn, upstream_node_id, &upstream_node_record);
|
||||
upstream_conn = establish_db_connection(upstream_node_record.conninfo, true);
|
||||
|
||||
record_status = get_slot_record(upstream_conn, node_record->slot_name, &slot_info);
|
||||
|
||||
if (record_status == RECORD_FOUND)
|
||||
{
|
||||
log_verbose(LOG_INFO,
|
||||
_("replication slot \"%s\" aleady exists on upstream node %i"),
|
||||
node_record->slot_name,
|
||||
upstream_node_id);
|
||||
}
|
||||
else
|
||||
{
|
||||
PGconn *superuser_conn = NULL;
|
||||
PGconn *privileged_conn = NULL;
|
||||
PQExpBufferData event_details;
|
||||
initPQExpBuffer(&event_details);
|
||||
|
||||
log_notice(_("creating replication slot \"%s\" on upstream node %i"),
|
||||
node_record->slot_name,
|
||||
upstream_node_id);
|
||||
|
||||
get_superuser_connection(&upstream_conn, &superuser_conn, &privileged_conn);
|
||||
|
||||
if (create_replication_slot(privileged_conn, node_record->slot_name, source_server_version_num, &event_details) == false)
|
||||
{
|
||||
log_error("%s", event_details.data);
|
||||
|
||||
create_event_notification(
|
||||
primary_conn,
|
||||
&config_file_options,
|
||||
config_file_options.node_id,
|
||||
"standby_clone",
|
||||
false,
|
||||
event_details.data);
|
||||
|
||||
PQfinish(source_conn);
|
||||
|
||||
if (superuser_conn != NULL)
|
||||
PQfinish(superuser_conn);
|
||||
|
||||
exit(ERR_DB_QUERY);
|
||||
}
|
||||
|
||||
termPQExpBuffer(&event_details);
|
||||
if (superuser_conn != NULL)
|
||||
PQfinish(superuser_conn);
|
||||
|
||||
get_superuser_connection(&source_conn, &superuser_conn, &privileged_conn);
|
||||
|
||||
if (slot_info.active == false)
|
||||
{
|
||||
if (drop_replication_slot(source_conn, node_record->slot_name) == true)
|
||||
{
|
||||
log_notice(_("replication slot \"%s\" deleted on source node"), node_record->slot_name);
|
||||
}
|
||||
else
|
||||
{
|
||||
log_error(_("unable to delete replication slot \"%s\" on source node"), node_record->slot_name);
|
||||
}
|
||||
}
|
||||
/* if replication slot is still active (shouldn't happen), emit a warning*/
|
||||
else
|
||||
{
|
||||
log_warning(_("replication slot \"%s\" is still active on source node"), node_record->slot_name);
|
||||
}
|
||||
|
||||
if (superuser_conn != NULL)
|
||||
PQfinish(superuser_conn);
|
||||
|
||||
|
||||
}
|
||||
|
||||
PQfinish(upstream_conn);
|
||||
}
|
||||
|
||||
return SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user