diff --git a/dbutils.c b/dbutils.c
index 9a4294e9..34e4396a 100644
--- a/dbutils.c
+++ b/dbutils.c
@@ -1545,6 +1545,49 @@ update_node_record_set_master(PGconn *conn, int this_node_id)
 }
 
 
+/*
+ * Update the type, upstream node ID and active flag of the repmgr.nodes
+ * row for "this_node_id", e.g. once an inactive primary has become a
+ * standby.
+ *
+ * Returns true when the UPDATE reports PGRES_COMMAND_OK; otherwise the
+ * connection's error message is logged and false is returned.
+ */
+bool
+update_node_record_status(PGconn *conn, int this_node_id, char *type, int upstream_node_id, bool active)
+{
+	PQExpBufferData sql;
+	PGresult   *result;
+	bool		updated;
+	const char *active_literal = active ? "TRUE" : "FALSE";
+
+	initPQExpBuffer(&sql);
+	appendPQExpBuffer(&sql,
+					  " UPDATE repmgr.nodes "
+					  " SET type = '%s', "
+					  " upstream_node_id = %i, "
+					  " active = %s "
+					  " WHERE node_id = %i ",
+					  type, upstream_node_id, active_literal, this_node_id);
+
+	log_verbose(LOG_DEBUG, "update_node_record_status():\n %s", sql.data);
+
+	result = PQexec(conn, sql.data);
+	termPQExpBuffer(&sql);
+
+	updated = (PQresultStatus(result) == PGRES_COMMAND_OK);
+	if (updated == false)
+	{
+		log_error(_("unable to update node record:\n %s"),
+				  PQerrorMessage(conn));
+	}
+
+	PQclear(result);
+
+	return updated;
+}
+
+
 
 bool
 delete_node_record(PGconn *conn, int node)
@@ -1906,7 +1949,7 @@ create_replication_slot(PGconn *conn, char *slot_name, int server_version_num, P
 			slot_name);
 	}
 
-	log_debug(_("create_replication_slot(): Creating slot '%s' on upstream"), slot_name);
+	log_debug(_("create_replication_slot(): creating slot '%s' on upstream"), slot_name);
 	log_verbose(LOG_DEBUG, "create_replication_slot():\n%s", query.data);
 
 	res = PQexec(conn, query.data);
@@ -1915,7 +1958,7 @@ create_replication_slot(PGconn *conn, char *slot_name, int server_version_num, P
 	if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		appendPQExpBuffer(error_msg,
-			_("unable to create slot '%s' on the master node: %s\n"),
+			_("unable to create slot '%s' on the upstream node: %s\n"),
 			slot_name,
 			PQerrorMessage(conn));
 		PQclear(res);
diff --git a/dbutils.h b/dbutils.h
index 123c6c66..d671e0a0 100644
--- a/dbutils.h
+++ b/dbutils.h
@@ -208,6 +208,7 @@ bool
update_node_record(PGconn *conn, char *repmgr_action, t_node_info *node_in
 bool delete_node_record(PGconn *conn, int node);
 
 bool update_node_record_set_master(PGconn *conn, int this_node_id);
+bool update_node_record_status(PGconn *conn, int this_node_id, char *type, int upstream_node_id, bool active);
 
 /* event record functions */
 bool create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details);
diff --git a/repmgr-action-standby.c b/repmgr-action-standby.c
index 25c2b0d2..45b9ae52 100644
--- a/repmgr-action-standby.c
+++ b/repmgr-action-standby.c
@@ -73,6 +73,8 @@ static void copy_configuration_files(void);
 static int run_basebackup(void);
 static int run_file_backup(void);
 
+static void drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name);
+
 static void tablespace_data_append(TablespaceDataList *list, const char *name, const char *oid, const char *location);
 
 static void get_barman_property(char *dst, char *name, char *local_repmgr_directory);
@@ -1225,7 +1227,20 @@ void
 do_standby_follow(void)
 {
 	PGconn *local_conn;
+	t_node_info local_node_record = T_NODE_INFO_INITIALIZER;
+	int original_upstream_node_id = UNKNOWN_NODE_ID;
+
+	PGconn *master_conn = NULL;
+	int master_id = UNKNOWN_NODE_ID;
+	t_node_info master_node_record = T_NODE_INFO_INITIALIZER;
+
 	char data_dir[MAXPGPATH];
+	t_conninfo_param_list recovery_conninfo;
+	char *errmsg = NULL;
+	int query_result;
+	char restart_command[MAXLEN];
+	int r;
+
 
 	log_verbose(LOG_DEBUG, "do_standby_follow()");
 
@@ -1237,9 +1252,6 @@ do_standby_follow(void)
 	if (runtime_options.host_param_provided == false)
 	{
 		bool success;
-		PGconn *master_conn = NULL;
-		char master_conninfo[MAXLEN];
-		int master_id = UNKNOWN_NODE_ID;
 		int timer;
 
 		local_conn = establish_db_connection(config_file_options.conninfo, true);
@@ -1268,7 +1280,7 @@ do_standby_follow(void)
 		{
 			master_conn = get_master_connection(local_conn,
 				&master_id,
-				(char *) &master_conninfo);
+				NULL);
 
 			if (PQstatus(master_conn) == CONNECTION_OK || runtime_options.wait == false)
 			{
@@ -1283,8 +1295,264 @@ do_standby_follow(void)
 			exit(ERR_BAD_CONFIG);
 		}
 
-		puts("OK");
+		check_master_standby_version_match(local_conn, master_conn);
+
+		PQfinish(local_conn);
 	}
+	/* local data directory and primary server info explicitly provided -
+	 * attempt to connect to that
+	 *
+	 * XXX --wait option won't be effective here
+	 */
+	else
+	{
+		master_conn = establish_db_connection_by_params(
+			(const char**)source_conninfo.keywords,
+			(const char**)source_conninfo.values,
+			true);
+
+		master_id = get_master_node_id(master_conn);
+
+		/* snprintf() always NUL-terminates, unlike strncpy() on an over-long path */
+		snprintf(data_dir, sizeof(data_dir), "%s", runtime_options.data_dir);
+	}
+
+	if (get_recovery_type(master_conn) != RECTYPE_MASTER)
+	{
+		log_error(_("the node to follow is not a master"));
+		// XXX log detail
+		PQfinish(master_conn);
+		exit(ERR_BAD_CONFIG);
+	}
+
+
+
+	/*
+	 * If 9.4 or later, and replication slots in use, we'll need to create a
+	 * slot on the new master
+	 */
+
+	if (config_file_options.use_replication_slots)
+	{
+		int server_version_num = get_server_version(master_conn, NULL);
+
+		PQExpBufferData event_details;
+		initPQExpBuffer(&event_details);
+
+		if (create_replication_slot(master_conn, repmgr_slot_name, server_version_num, &event_details) == false)
+		{
+			log_error("%s", event_details.data);
+
+			create_event_record(master_conn,
+				&config_file_options,
+				config_file_options.node_id,
+				"standby_follow",
+				false,
+				event_details.data);
+
+			PQfinish(master_conn);
+			exit(ERR_DB_QUERY);
+		}
+
+		termPQExpBuffer(&event_details);
+	}
+
+	get_node_record(master_conn, master_id, &master_node_record);
+
+	/* Initialise connection parameters to write as `primary_conninfo` */
+	initialize_conninfo_params(&recovery_conninfo, false);
+
+	/* We ignore any application_name set in the master's conninfo */
+	parse_conninfo_string(master_node_record.conninfo, &recovery_conninfo, errmsg, true);
+
+
+	/* Set the default application name to this node's name */
+	param_set(&recovery_conninfo, "application_name", config_file_options.node_name);
+
+	/* Set the replication user from the master node record */
+	param_set(&recovery_conninfo, "user", master_node_record.repluser);
+
+	/*
+	 * Fetch our node record so we can write application_name, if set,
+	 * and to get the upstream node ID, which we'll need to know if
+	 * replication slots are in use and we want to delete the old slot.
+	 */
+	query_result = get_node_record(master_conn,
+		config_file_options.node_id,
+		&local_node_record);
+
+	if (query_result != 1)
+	{
+		/* this shouldn't happen, but if it does we'll plough on regardless */
+		log_warning(_("unable to retrieve record for node %i"),
+			config_file_options.node_id);
+	}
+	else
+	{
+		t_conninfo_param_list local_node_conninfo;
+		bool parse_success;
+
+		initialize_conninfo_params(&local_node_conninfo, false);
+
+		parse_success = parse_conninfo_string(local_node_record.conninfo, &local_node_conninfo, errmsg, false);
+
+		if (parse_success == false)
+		{
+			/*
+			 * this shouldn't happen, but if it does we'll plough on regardless;
+			 * "errmsg" is passed by value to parse_conninfo_string(), so it is
+			 * still NULL here and must not be handed to "%s" as-is
+			 */
+			log_warning(_("unable to parse conninfo string \"%s\":\n %s"),
+				local_node_record.conninfo,
+				errmsg != NULL ? errmsg : "(no detail available)");
+		}
+		else
+		{
+			char *application_name = param_get(&local_node_conninfo, "application_name");
+
+			if (application_name != NULL && strlen(application_name))
+				param_set(&recovery_conninfo, "application_name", application_name);
+		}
+
+		/*
+		 * store the original upstream node id so we can delete the replication slot,
+		 * if exists
+		 */
+		if (local_node_record.upstream_node_id != UNKNOWN_NODE_ID)
+		{
+			original_upstream_node_id = local_node_record.upstream_node_id;
+		}
+		else
+		{
+			original_upstream_node_id = master_id;
+		}
+	}
+	/*
+	 * NOTE(review): this looks like leftover debug output; "list" is never
+	 * released - confirm who owns the string param_list_to_string() returns
+	 */
+	{
+		char *list = param_list_to_string(&recovery_conninfo);
+		printf("recovery: %s\n", list);
+	}
+
+	log_info(_("changing standby's master to node %i"), master_id);
+
+	if (!create_recovery_file(data_dir, &recovery_conninfo))
+	{
+		PQfinish(master_conn);
+		exit(ERR_BAD_CONFIG);
+	}
+
+	/* restart the service */
+
+	// XXX here check if service is running!! if not, start
+	// ensure that problem with pg_ctl output is caught here
+	if (*config_file_options.service_restart_command)
+	{
+		maxlen_snprintf(restart_command, "%s", config_file_options.service_restart_command);
+	}
+	else
+	{
+		maxlen_snprintf(restart_command,
+			"%s %s -w -D %s -m fast restart",
+			make_pg_path("pg_ctl"),
+			config_file_options.pg_ctl_options,
+			data_dir);
+	}
+
+
+	log_notice(_("restarting server using '%s'"),
+		restart_command);
+
+	r = system(restart_command);
+	if (r != 0)
+	{
+		log_error(_("unable to restart server"));
+		PQfinish(master_conn);
+		exit(ERR_NO_RESTART);
+	}
+
+
+	/*
+	 * If replication slots are in use, and an inactive one for this node
+	 * exists on the former upstream, drop it.
+	 *
+	 * XXX check if former upstream is current master?
+	 */
+
+	if (config_file_options.use_replication_slots && runtime_options.host_param_provided == false && original_upstream_node_id != UNKNOWN_NODE_ID)
+	{
+		t_node_info upstream_node_record = T_NODE_INFO_INITIALIZER;
+		int upstream_query_result;
+
+		log_verbose(LOG_INFO, "attempting to remove replication slot from old upstream node %i",
+			original_upstream_node_id);
+
+		/* XXX should we poll for server restart? */
+		local_conn = establish_db_connection(config_file_options.conninfo, true);
+
+		upstream_query_result = get_node_record(local_conn,
+			original_upstream_node_id,
+			&upstream_node_record);
+
+		PQfinish(local_conn);
+
+
+		if (upstream_query_result != 1)
+		{
+			log_warning(_("unable to retrieve node record for old upstream node %i"),
+				original_upstream_node_id);
+		}
+		else
+		{
+			PGconn *old_upstream_conn = establish_db_connection_quiet(upstream_node_record.conninfo);
+
+			if (PQstatus(old_upstream_conn) != CONNECTION_OK)
+			{
+				log_info(_("unable to connect to old upstream node %i to remove replication slot"),
+					original_upstream_node_id);
+				log_hint(_("if reusing this node, you should manually remove any inactive replication slots"));
+			}
+			else
+			{
+				drop_replication_slot_if_exists(old_upstream_conn,
+					original_upstream_node_id,
+					local_node_record.slot_name);
+			}
+
+			/* close the connection on both paths so it is not leaked */
+			PQfinish(old_upstream_conn);
+		}
+	}
+
+	/*
+	 * It's possible this node was an inactive primary - update the
+	 * relevant fields to ensure it's marked as an active standby
+	 */
+	if (update_node_record_status(master_conn,
+		config_file_options.node_id,
+		"standby",
+		master_id,
+		true) == false)
+	{
+		log_error(_("unable to update upstream node"));
+		PQfinish(master_conn);
+
+		exit(ERR_BAD_CONFIG);
+	}
+
+	log_notice(_("STANDBY FOLLOW successful"));
+
+	create_event_record(master_conn,
+		&config_file_options,
+		config_file_options.node_id,
+		"standby_follow",
+		true,
+		NULL);
+
+	PQfinish(master_conn);
 
 	return;
 }
@@ -2748,3 +3016,40 @@ check_recovery_type(PGconn *conn)
 		}
 	}
 }
+
+
+/*
+ * drop_replication_slot_if_exists()
+ *
+ * Look up "slot_name" via "conn" and, if get_slot_record() returns non-zero,
+ * drop the slot when it is inactive; an active slot is left in place and a
+ * warning is logged. "node_id" is only used in log output.
+ */
+static void
+drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name)
+{
+	t_replication_slot slot_info;
+	int query_res;
+
+	query_res = get_slot_record(conn,slot_name, &slot_info);
+
+	if (query_res)
+	{
+		if (slot_info.active == false)
+		{
+			if (drop_replication_slot(conn, slot_name) == true)
+			{
+				log_notice(_("replication slot \"%s\" deleted on node %i"), slot_name, node_id);
+			}
+			else
+			{
+				log_error(_("unable to delete replication slot \"%s\" on node %i"), slot_name, node_id);
+			}
+		}
+		/* if active replication slot exists, call Houston as we have a problem */
+		else
+		{
+			log_warning(_("replication slot \"%s\" is still active on node %i"), slot_name, node_id);
+		}
+	}
+}