"standby clone": improve handling when designated upstream doesn't yet exist

This situation can occur in provisioning environments, where a node's
upstream may not exist at the point it's cloned. If replication slots
are in use, we'll need to make sure no attempt is made to create
the replication slot on the designated upstream, as that will end in
tears. We assume the user will be prepared to complete this step manually.
This commit is contained in:
Ian Barwick
2017-09-07 12:35:27 +09:00
parent 3787dd3795
commit b6a27b975d

View File

@@ -251,6 +251,7 @@ do_standby_clone(void)
if (recovery_conninfo_str[0] == '\0')
{
log_error(_("unable to determine a connection string to use as \"primary_conninfo\""));
log_hint(_("use \"--upstream-conninfo\" to explicitly provide a value for \"primary_conninfo\""));
if (PQstatus(source_conn) == CONNECTION_OK)
PQfinish(source_conn);
exit(ERR_BAD_CONFIG);
@@ -2737,7 +2738,7 @@ do_standby_switchover(void)
{
log_error(_("rejoin failed %i"), r);
if (strlen(command_output.data > 2))
if (strlen(command_output.data) > 2)
log_detail("%s", command_output.data);
create_event_record(local_conn,
@@ -2880,7 +2881,7 @@ check_source_server()
/* Attempt to connect to the upstream server to verify its configuration */
log_verbose(LOG_DEBUG, "check_source_server()");
log_info(_("connecting to upstream node"));
log_info(_("connecting to source node"));
source_conn = establish_db_connection_by_params(&source_conninfo, false);
@@ -2903,9 +2904,6 @@ check_source_server()
* provided upstream connection
*/
/* Verify that upstream node is a supported server version */
log_verbose(LOG_INFO, _("connected to source node, checking its state"));
source_server_version_num = check_server_version(source_conn, "primary", true, NULL);
if (get_cluster_size(source_conn, cluster_size) == false)
@@ -3496,7 +3494,7 @@ check_upstream_config(PGconn *conn, int server_version_num, t_node_info *node_in
}
}
log_verbose(LOG_INFO, "sufficient walsenders available on upstream node (%i required)",
log_verbose(LOG_INFO, "sufficient walsenders available on source node (%i required)",
min_replication_connections);
}
@@ -3639,7 +3637,7 @@ initialise_direct_clone(t_node_info *node_record)
termPQExpBuffer(&event_details);
log_verbose(LOG_INFO,
_("replication slot \"%s\" created on upstream node"),
_("replication slot \"%s\" created on source node"),
node_record->slot_name);
if (superuser_conn != NULL)
@@ -3822,96 +3820,109 @@ run_basebackup(t_node_info *node_record)
*/
if (config_file_options.use_replication_slots && upstream_node_id != UNKNOWN_NODE_ID)
{
PGconn *upstream_conn = NULL;
PGconn *superuser_conn = NULL;
PGconn *privileged_conn = NULL;
t_node_info upstream_node_record = T_NODE_INFO_INITIALIZER;
t_replication_slot slot_info = T_REPLICATION_SLOT_INITIALIZER;
RecordStatus record_status = RECORD_NOT_FOUND;
//PQExpBufferData event_details;
//initPQExpBuffer(&event_details);
record_status = get_node_record(source_conn, upstream_node_id, &upstream_node_record);
//get_superuser_connection(&source_conn, &superuser_conn, &privileged_conn);
/* existence of node record previously established in check_source_server() */
get_node_record(source_conn, upstream_node_id, &upstream_node_record);
upstream_conn = establish_db_connection(upstream_node_record.conninfo, true);
record_status = get_slot_record(upstream_conn, node_record->slot_name, &slot_info);
if (record_status == RECORD_FOUND)
/*
* if there's no upstream record, there's no point in trying to create a replication
* slot on the designated upstream, as the assumption is it won't exist at this point.
*/
if (record_status != RECORD_FOUND)
{
log_verbose(LOG_INFO,
_("replication slot \"%s\" aleady exists on upstream node %i"),
node_record->slot_name,
log_warning(_("no record exists for designated upstream node %i"),
upstream_node_id);
log_hint(_("you'll need to create the replication slot (\"%s\") manually"),
node_record->slot_name);
}
else
{
PGconn *superuser_conn = NULL;
PGconn *privileged_conn = NULL;
PQExpBufferData event_details;
initPQExpBuffer(&event_details);
PGconn *upstream_conn = NULL;
log_notice(_("creating replication slot \"%s\" on upstream node %i"),
node_record->slot_name,
upstream_node_id);
upstream_conn = establish_db_connection(upstream_node_record.conninfo, true);
get_superuser_connection(&upstream_conn, &superuser_conn, &privileged_conn);
record_status = get_slot_record(upstream_conn, node_record->slot_name, &slot_info);
if (create_replication_slot(privileged_conn, node_record->slot_name, source_server_version_num, &event_details) == false)
if (record_status == RECORD_FOUND)
{
log_verbose(LOG_INFO,
_("replication slot \"%s\" aleady exists on upstream node %i"),
node_record->slot_name,
upstream_node_id);
}
else
{
log_error("%s", event_details.data);
create_event_notification(
primary_conn,
&config_file_options,
config_file_options.node_id,
"standby_clone",
false,
event_details.data);
PQExpBufferData event_details;
PQfinish(source_conn);
log_notice(_("creating replication slot \"%s\" on upstream node %i"),
node_record->slot_name,
upstream_node_id);
get_superuser_connection(&upstream_conn, &superuser_conn, &privileged_conn);
initPQExpBuffer(&event_details);
if (create_replication_slot(privileged_conn, node_record->slot_name, source_server_version_num, &event_details) == false)
{
log_error("%s", event_details.data);
create_event_notification(
primary_conn,
&config_file_options,
config_file_options.node_id,
"standby_clone",
false,
event_details.data);
PQfinish(source_conn);
if (superuser_conn != NULL)
PQfinish(superuser_conn);
exit(ERR_DB_QUERY);
}
if (superuser_conn != NULL)
PQfinish(superuser_conn);
exit(ERR_DB_QUERY);
termPQExpBuffer(&event_details);
}
termPQExpBuffer(&event_details);
if (superuser_conn != NULL)
PQfinish(superuser_conn);
get_superuser_connection(&source_conn, &superuser_conn, &privileged_conn);
if (slot_info.active == false)
{
if (drop_replication_slot(source_conn, node_record->slot_name) == true)
{
log_notice(_("replication slot \"%s\" deleted on source node"), node_record->slot_name);
}
else
{
log_error(_("unable to delete replication slot \"%s\" on source node"), node_record->slot_name);
}
}
/* if replication slot is still active (shouldn't happen), emit a warning*/
else
{
log_warning(_("replication slot \"%s\" is still active on source node"), node_record->slot_name);
}
if (superuser_conn != NULL)
PQfinish(superuser_conn);
PQfinish(upstream_conn);
}
PQfinish(upstream_conn);
get_superuser_connection(&source_conn, &superuser_conn, &privileged_conn);
if (slot_info.active == false)
{
if (drop_replication_slot(source_conn, node_record->slot_name) == true)
{
log_notice(_("replication slot \"%s\" deleted on source node"), node_record->slot_name);
}
else
{
log_error(_("unable to delete replication slot \"%s\" on source node"), node_record->slot_name);
}
}
/* if replication slot is still active (shouldn't happen), emit a warning*/
else
{
log_warning(_("replication slot \"%s\" is still active on source node"), node_record->slot_name);
}
if (superuser_conn != NULL)
PQfinish(superuser_conn);
}
return SUCCESS;
}