From 2ece01495239a222a7fafd5a498e4014a267fdff Mon Sep 17 00:00:00 2001 From: Ian Barwick Date: Mon, 2 Feb 2015 15:53:53 +0900 Subject: [PATCH] Initial support for physical replication slots Todo: - if slots specified in repmgr.conf, verify server version - store generated slot name in `repl_nodes` table --- config.c | 2 ++ config.h | 3 ++- dbutils.c | 28 ++++++++++++++++++++++++++++ dbutils.h | 1 + repmgr.c | 47 ++++++++++++++++++++++++++++++++++++++++++++--- repmgrd.c | 38 +++++++++++++++++++++++++++++--------- 6 files changed, 106 insertions(+), 13 deletions(-) diff --git a/config.c b/config.c index b94fb93d..8ec50e17 100644 --- a/config.c +++ b/config.c @@ -137,6 +137,8 @@ parse_config(const char *config_file, t_configuration_options * options) options->monitor_interval_secs = atoi(value); else if (strcmp(name, "retry_promote_interval_secs") == 0) options->retry_promote_interval_secs = atoi(value); + else if (strcmp(name, "use_replication_slots") == 0) + options->use_replication_slots = atoi(value); else log_warning(_("%s/%s: Unknown name/value pair!\n"), name, value); } diff --git a/config.h b/config.h index 035ca7f2..e608b574 100644 --- a/config.h +++ b/config.h @@ -48,9 +48,10 @@ typedef struct char logfile[MAXLEN]; int monitor_interval_secs; int retry_promote_interval_secs; + int use_replication_slots; } t_configuration_options; -#define T_CONFIGURATION_OPTIONS_INITIALIZER { "", -1, NO_UPSTREAM_NODE, "", MANUAL_FAILOVER, -1, "", "", "", "", "", "", "", -1, -1, -1, "", "", "", 0, 0 } +#define T_CONFIGURATION_OPTIONS_INITIALIZER { "", -1, NO_UPSTREAM_NODE, "", MANUAL_FAILOVER, -1, "", "", "", "", "", "", "", -1, -1, -1, "", "", "", 0, 0, 0 } void parse_config(const char *config_file, t_configuration_options * options); diff --git a/dbutils.c b/dbutils.c index eae8441a..a961a6f6 100644 --- a/dbutils.c +++ b/dbutils.c @@ -558,6 +558,7 @@ get_master_connection(PGconn *standby_conn, char *cluster, { PQclear(res2); PQclear(res1); + log_debug(_("get_master_connection(): current primary node is %i\n"), node_id); if(master_id != NULL) { @@ -709,3 +710,30 @@ get_repmgr_schema_quoted(PGconn *conn) return repmgr_schema_quoted; } + + +bool +create_replication_slot(PGconn *conn, char *slot_name) +{ + char sqlquery[QUERY_STR_LEN]; + PGresult *res; + + sqlquery_snprintf(sqlquery, + "SELECT * FROM pg_create_physical_replication_slot('%s')", + slot_name); + + log_debug(_("create_replication_slot(): Creating slot '%s' on primary\n"), slot_name); + + res = PQexec(conn, sqlquery); + if (!res || PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_err(_("Unable to create slot '%s' on the primary node: %s\n"), + slot_name, + PQerrorMessage(conn)); + PQclear(res); + return false; + } + + PQclear(res); + return true; +} diff --git a/dbutils.h b/dbutils.h index dfd4115d..4b7d09de 100644 --- a/dbutils.h +++ b/dbutils.h @@ -53,5 +53,6 @@ int wait_connection_availability(PGconn *conn, long long timeout); bool cancel_query(PGconn *conn, int timeout); char *get_repmgr_schema(void); char *get_repmgr_schema_quoted(PGconn *conn); +bool create_replication_slot(PGconn *conn, char *slot_name); #endif diff --git a/repmgr.c b/repmgr.c index 480ecd3a..461765c4 100644 --- a/repmgr.c +++ b/repmgr.c @@ -96,6 +96,8 @@ t_configuration_options options = T_CONFIGURATION_OPTIONS_INITIALIZER; static char *server_mode = NULL; static char *server_cmd = NULL; +static char repmgr_slot_name[MAXLEN]; + int main(int argc, char **argv) { @@ -390,6 +392,12 @@ main(int argc, char **argv) maxlen_snprintf(repmgr_schema, "%s%s", DEFAULT_REPMGR_SCHEMA_PREFIX, options.cluster_name); + /* Initialise slot name, if required (9.4 and later) */ + if(options.use_replication_slots) + { + maxlen_snprintf(repmgr_slot_name, "repmgr_slot_%i", options.node); + } + switch (action) { case MASTER_REGISTER: @@ -1015,7 +1023,6 @@ do_standby_clone(void) { log_err(_("%s: Aborting, remote host %s is not reachable.\n"), progname, runtime_options.host); - PQfinish(conn); retval = ERR_BAD_SSH; goto stop_backup; } @@ -1071,14 +1078,38 @@ stop_backup: log_err(_("Unable to take a base backup of the master server\n")); log_warning(_("The destination directory (%s) will need to be cleaned up manually\n"), local_data_directory); + PQfinish(conn); exit(retval); } /* Finally, write the recovery.conf file */ create_recovery_file(local_data_directory); + /* + * If replication slots requested, create appropriate slot on the primary; + * create_recovery_file() will already have written `primary_slot_name` into + * `recovery.conf` + */ + if(options.use_replication_slots) + { + if(create_replication_slot(conn, repmgr_slot_name) == false) + { + log_err(_("Unable to create slot '%s' on the primary node: %s\n"), + repmgr_slot_name, + PQerrorMessage(conn)); + PQfinish(conn); + exit(ERR_DB_QUERY); + } + } + log_notice(_("%s base backup of standby complete\n"), progname); + /* + * XXX It might be nice to provide the following options: + * - have repmgr start the daemon automatically + * - provide a custom pg_ctl command + */ + log_notice("HINT: You can now start your postgresql server\n"); if (test_mode) { @@ -1087,9 +1118,10 @@ stop_backup: } else { - log_notice("for example : /etc/init.d/postgresql start\n"); + log_notice("for example : /etc/init.d/postgresql start\n"); } + PQfinish(conn); exit(retval); } @@ -1709,12 +1741,21 @@ create_recovery_file(const char *data_dir) /* min_recovery_apply_delay = ... (optional) */ if(*runtime_options.min_recovery_apply_delay) { - maxlen_snprintf(line, "\nmin_recovery_apply_delay = %s\n", + maxlen_snprintf(line, "min_recovery_apply_delay = %s\n", runtime_options.min_recovery_apply_delay); if(write_recovery_file_line(recovery_file, recovery_file_path, line) == false) return false; } + /* primary_slot_name = '...' (optional, for 9.4 and later) */ + if(options.use_replication_slots) + { + maxlen_snprintf(line, "primary_slot_name = %s\n", + repmgr_slot_name); + if(write_recovery_file_line(recovery_file, recovery_file_path, line) == false) + return false; + } + fclose(recovery_file); return true; diff --git a/repmgrd.c b/repmgrd.c index 86fa7751..377e9baa 100644 --- a/repmgrd.c +++ b/repmgrd.c @@ -1320,11 +1320,11 @@ do_primary_failover(void) terminate(ERR_DB_QUERY); } - /* and reconnect to the local database */ - my_local_conn = establish_db_connection(local_options.conninfo, true); + /* and reconnect to the local database */ + my_local_conn = establish_db_connection(local_options.conninfo, true); - /* update node information to reflect new status */ - if(update_node_record_set_primary(my_local_conn, node_info.node_id, failed_primary.node_id) == false) + /* update node information to reflect new status */ + if(update_node_record_set_primary(my_local_conn, node_info.node_id, failed_primary.node_id) == false) { terminate(ERR_DB_QUERY); } @@ -1345,7 +1345,7 @@ do_primary_failover(void) log_debug(_("follow command is: \"%s\"\n"), local_options.follow_command); /* - * New Primary need some time to be promoted. The follow command + * The new primary may some time to be promoted. The follow command * should take care of that. */ if (log_type == REPMGR_STDERR && *local_options.logfile) @@ -1353,6 +1353,28 @@ do_primary_failover(void) fflush(stderr); } + /* + * If 9.4 or later, and replication slots in use, we'll need to create a + * slot on the new primary + */ + new_primary_conn = establish_db_connection(best_candidate.conninfo_str, true); + + if(local_options.use_replication_slots) + { + // ZZZ store slot name in `repl_nodes` + char repmgr_slot_name[MAXLEN]; + maxlen_snprintf(repmgr_slot_name, "repmgr_slot_%i", local_options.node); + + if(create_replication_slot(new_primary_conn, repmgr_slot_name) == false) + { + log_err(_("Unable to create slot '%s' on the primary node: %s\n"), + repmgr_slot_name, + PQerrorMessage(new_primary_conn)); + PQfinish(new_primary_conn); + terminate(ERR_DB_QUERY); + } + } + r = system(local_options.follow_command); if (r != 0) { @@ -1361,10 +1383,8 @@ do_primary_failover(void) terminate(ERR_BAD_CONFIG); } - /* and reconnect to the local database */ - my_local_conn = establish_db_connection(local_options.conninfo, true); - - new_primary_conn = establish_db_connection(best_candidate.conninfo_str, true); + /* and reconnect to the local database */ + my_local_conn = establish_db_connection(local_options.conninfo, true); /* update node information to reflect new status */ update_node_record_set_upstream(new_primary_conn, node_info.node_id, best_candidate.node_id);