From a28bbd68eb7f46250a81e505df3f802913e44d32 Mon Sep 17 00:00:00 2001 From: Ian Barwick Date: Wed, 6 Sep 2017 12:16:02 +0900 Subject: [PATCH] "standby clone": improve replication slots handling Ensure replication slot is created on the upstream node and deleted from the source node, if upstream node and source nodes differ. --- dbutils.c | 2 + repmgr-action-standby.c | 180 +++++++++++++++++++++++++++++++--------- 2 files changed, 145 insertions(+), 37 deletions(-) diff --git a/dbutils.c b/dbutils.c index d9510a46..85988664 100644 --- a/dbutils.c +++ b/dbutils.c @@ -3184,6 +3184,8 @@ drop_replication_slot(PGconn *conn, char *slot_name) log_verbose(LOG_DEBUG, "replication slot \"%s\" successfully dropped", slot_name); + PQclear(res); + return true; } diff --git a/repmgr-action-standby.c b/repmgr-action-standby.c index 9c45975e..89004453 100644 --- a/repmgr-action-standby.c +++ b/repmgr-action-standby.c @@ -2804,7 +2804,6 @@ check_source_server() * provided upstream connection */ - /* Verify that upstream node is a supported server version */ log_verbose(LOG_INFO, _("connected to source node, checking its state")); @@ -2813,7 +2812,6 @@ check_source_server() if (get_cluster_size(source_conn, cluster_size) == false) exit(ERR_DB_QUERY); - log_info(_("successfully connected to source node")); log_detail(_("current installation size is %s"), cluster_size); @@ -2825,7 +2823,11 @@ check_source_server() { primary_conn = get_primary_connection(source_conn, NULL, NULL); - // XXX check this worked? + if (PQstatus(primary_conn) != CONNECTION_OK) + { + log_error(_("unable to connect to primary node")); + exit(ERR_BAD_CONFIG); + } } else { @@ -2928,31 +2930,35 @@ check_source_server() else upstream_node_id = runtime_options.upstream_node_id; - record_status = get_node_record(source_conn, upstream_node_id, &node_record); - if (record_status == RECORD_FOUND) - { - upstream_conninfo_found = true; - strncpy(recovery_conninfo_str, node_record.conninfo, MAXLEN); - strncpy(upstream_repluser, node_record.repluser, NAMEDATALEN); - } + log_debug("upstream_node_id determined as %i", upstream_node_id); - /* - * check that there's no existing node record with the same name but - * different ID - */ - record_status = get_node_record_by_name(source_conn, config_file_options.node_name, &node_record); - - if (record_status == RECORD_FOUND && node_record.node_id != config_file_options.node_id) + if (upstream_node_id != UNKNOWN_NODE_ID) { - log_error(_("another node (node_id: %i) already exists with node_name \"%s\""), - node_record.node_id, - config_file_options.node_name); - PQfinish(source_conn); - exit(ERR_BAD_CONFIG); + record_status = get_node_record(source_conn, upstream_node_id, &node_record); + if (record_status == RECORD_FOUND) + { + upstream_conninfo_found = true; + strncpy(recovery_conninfo_str, node_record.conninfo, MAXLEN); + strncpy(upstream_repluser, node_record.repluser, NAMEDATALEN); + } + + /* + * check that there's no existing node record with the same name but + * different ID + */ + record_status = get_node_record_by_name(source_conn, config_file_options.node_name, &node_record); + + if (record_status == RECORD_FOUND && node_record.node_id != config_file_options.node_id) + { + log_error(_("another node (node_id: %i) already exists with node_name \"%s\""), + node_record.node_id, + config_file_options.node_name); + PQfinish(source_conn); + exit(ERR_BAD_CONFIG); + } } check_upstream_config(source_conn, source_server_version_num, &node_record, true); - } @@ -3412,10 +3418,6 @@ check_upstream_config(PGconn *conn, int server_version_num, t_node_info *node_in static void initialise_direct_clone(t_node_info *node_record) { - PGconn *superuser_conn = NULL; - PGconn *privileged_conn = NULL; - bool success = false; - /* * Check the destination data directory can be used * (in Barman mode, this directory will already have been created) @@ -3443,6 +3445,7 @@ initialise_direct_clone(t_node_info *node_record) TablespaceListCell *cell = false; KeyValueList not_found = { NULL, NULL }; int total = 0, matched = 0; + bool success = false; for (cell = config_file_options.tablespace_mapping.head; cell; cell = cell->next) { @@ -3492,12 +3495,14 @@ initialise_direct_clone(t_node_info *node_record) } } - - get_superuser_connection(&source_conn, &superuser_conn, &privileged_conn); - /* * If replication slots requested, create appropriate slot on the - * primary; this must be done before pg_basebackup is called. + * source node; this must be done before pg_basebackup is called. + * + * Note: if the source node is different to the specified upstream + * node, we'll need to drop the slot and recreate it on the upstream. + * + * TODO: skip this for Pg10, and ensure temp slot option used * * Replication slots are not supported (and not very useful * anyway) in Barman mode. @@ -3505,9 +3510,13 @@ initialise_direct_clone(t_node_info *node_record) if (config_file_options.use_replication_slots == true) { + PGconn *superuser_conn = NULL; + PGconn *privileged_conn = NULL; PQExpBufferData event_details; initPQExpBuffer(&event_details); + get_superuser_connection(&source_conn, &superuser_conn, &privileged_conn); + if (create_replication_slot(privileged_conn, node_record->slot_name, source_server_version_num, &event_details) == false) { log_error("%s", event_details.data); @@ -3530,13 +3539,13 @@ initialise_direct_clone(t_node_info *node_record) termPQExpBuffer(&event_details); - log_notice(_("replication slot \"%s\" created on upstream node (node_id: %i)"), - node_record->slot_name, - upstream_node_id); - } + log_verbose(LOG_INFO, + _("replication slot \"%s\" created on upstream node"), + node_record->slot_name); - if (superuser_conn != NULL) - PQfinish(superuser_conn); + if (superuser_conn != NULL) + PQfinish(superuser_conn); + } return; } @@ -3707,6 +3716,103 @@ run_basebackup(t_node_info *node_record) if (r != 0) return ERR_BAD_BASEBACKUP; + /* + * If replication slots in use, check the created slot is on the correct node; + * the slot will initially get created on the source node, and will need to be + * dropped and recreated on the actual upstream node if these differ. + */ + if (config_file_options.use_replication_slots && upstream_node_id != UNKNOWN_NODE_ID) + { + PGconn *upstream_conn = NULL; + + + t_node_info upstream_node_record = T_NODE_INFO_INITIALIZER; + t_replication_slot slot_info = T_REPLICATION_SLOT_INITIALIZER; + RecordStatus record_status = RECORD_NOT_FOUND; + + //PQExpBufferData event_details; + //initPQExpBuffer(&event_details); + + //get_superuser_connection(&source_conn, &superuser_conn, &privileged_conn); + + /* existence of node record previously established in check_source_server() */ + get_node_record(source_conn, upstream_node_id, &upstream_node_record); + upstream_conn = establish_db_connection(upstream_node_record.conninfo, true); + + record_status = get_slot_record(upstream_conn, node_record->slot_name, &slot_info); + + if (record_status == RECORD_FOUND) + { + log_verbose(LOG_INFO, + _("replication slot \"%s\" aleady exists on upstream node %i"), + node_record->slot_name, + upstream_node_id); + } + else + { + PGconn *superuser_conn = NULL; + PGconn *privileged_conn = NULL; + PQExpBufferData event_details; + initPQExpBuffer(&event_details); + + log_notice(_("creating replication slot \"%s\" on upstream node %i"), + node_record->slot_name, + upstream_node_id); + + get_superuser_connection(&upstream_conn, &superuser_conn, &privileged_conn); + + if (create_replication_slot(privileged_conn, node_record->slot_name, source_server_version_num, &event_details) == false) + { + log_error("%s", event_details.data); + + create_event_notification( + primary_conn, + &config_file_options, + config_file_options.node_id, + "standby_clone", + false, + event_details.data); + + PQfinish(source_conn); + + if (superuser_conn != NULL) + PQfinish(superuser_conn); + + exit(ERR_DB_QUERY); + } + + termPQExpBuffer(&event_details); + if (superuser_conn != NULL) + PQfinish(superuser_conn); + + get_superuser_connection(&source_conn, &superuser_conn, &privileged_conn); + + if (slot_info.active == false) + { + if (drop_replication_slot(source_conn, node_record->slot_name) == true) + { + log_notice(_("replication slot \"%s\" deleted on source node"), node_record->slot_name); + } + else + { + log_error(_("unable to delete replication slot \"%s\" on source node"), node_record->slot_name); + } + } + /* if replication slot is still active (shouldn't happen), emit a warning*/ + else + { + log_warning(_("replication slot \"%s\" is still active on source node"), node_record->slot_name); + } + + if (superuser_conn != NULL) + PQfinish(superuser_conn); + + + } + + PQfinish(upstream_conn); + } + return SUCCESS; }