Initial support for physical replication slots

Todo:
 - if slots specified in repmgr.conf, verify server version
 - store generated slot name in `repl_nodes` table
This commit is contained in:
Ian Barwick
2015-02-02 15:53:53 +09:00
parent 01360c3d39
commit 2ece014952
6 changed files with 106 additions and 13 deletions

View File

@@ -137,6 +137,8 @@ parse_config(const char *config_file, t_configuration_options * options)
options->monitor_interval_secs = atoi(value);
else if (strcmp(name, "retry_promote_interval_secs") == 0)
options->retry_promote_interval_secs = atoi(value);
else if (strcmp(name, "use_replication_slots") == 0)
options->use_replication_slots = atoi(value);
else
log_warning(_("%s/%s: Unknown name/value pair!\n"), name, value);
}

View File

@@ -48,9 +48,10 @@ typedef struct
char logfile[MAXLEN];
int monitor_interval_secs;
int retry_promote_interval_secs;
int use_replication_slots;
} t_configuration_options;
#define T_CONFIGURATION_OPTIONS_INITIALIZER { "", -1, NO_UPSTREAM_NODE, "", MANUAL_FAILOVER, -1, "", "", "", "", "", "", "", -1, -1, -1, "", "", "", 0, 0 }
#define T_CONFIGURATION_OPTIONS_INITIALIZER { "", -1, NO_UPSTREAM_NODE, "", MANUAL_FAILOVER, -1, "", "", "", "", "", "", "", -1, -1, -1, "", "", "", 0, 0, 0 }
void parse_config(const char *config_file, t_configuration_options * options);

View File

@@ -558,6 +558,7 @@ get_master_connection(PGconn *standby_conn, char *cluster,
{
PQclear(res2);
PQclear(res1);
log_debug(_("get_master_connection(): current primary node is %i\n"), node_id);
if(master_id != NULL)
{
@@ -709,3 +710,30 @@ get_repmgr_schema_quoted(PGconn *conn)
return repmgr_schema_quoted;
}
/*
 * create_replication_slot()
 *
 * Create a physical replication slot named `slot_name` by executing
 * pg_create_physical_replication_slot() over the connection `conn`.
 *
 * Note that `slot_name` is interpolated into the query text as-is.
 *
 * Returns true when the query yields a tuple result; otherwise the libpq
 * error message is logged and false is returned.
 */
bool
create_replication_slot(PGconn *conn, char *slot_name)
{
	char		query[QUERY_STR_LEN];
	PGresult   *result;
	bool		created;

	sqlquery_snprintf(query,
					  "SELECT * FROM pg_create_physical_replication_slot('%s')",
					  slot_name);

	log_debug(_("create_replication_slot(): Creating slot '%s' on primary\n"), slot_name);

	result = PQexec(conn, query);

	/* A NULL result or any status other than "tuples returned" is a failure */
	created = (result != NULL && PQresultStatus(result) == PGRES_TUPLES_OK);

	if (!created)
	{
		log_err(_("Unable to create slot '%s' on the primary node: %s\n"),
				slot_name,
				PQerrorMessage(conn));
	}

	/* PQclear() accepts NULL, so a single release covers both outcomes */
	PQclear(result);

	return created;
}

View File

@@ -53,5 +53,6 @@ int wait_connection_availability(PGconn *conn, long long timeout);
bool cancel_query(PGconn *conn, int timeout);
char *get_repmgr_schema(void);
char *get_repmgr_schema_quoted(PGconn *conn);
/*
 * Create a physical replication slot called `slot_name` using connection
 * `conn`; returns true on success, false (after logging the error) on failure.
 */
bool create_replication_slot(PGconn *conn, char *slot_name);
#endif

View File

@@ -96,6 +96,8 @@ t_configuration_options options = T_CONFIGURATION_OPTIONS_INITIALIZER;
static char *server_mode = NULL;
static char *server_cmd = NULL;
/*
 * Replication slot name for this node ("repmgr_slot_<node id>"); set in
 * main() only when `use_replication_slots` is enabled, and read when cloning
 * a standby and when writing recovery.conf.
 */
static char repmgr_slot_name[MAXLEN];
int
main(int argc, char **argv)
{
@@ -390,6 +392,12 @@ main(int argc, char **argv)
maxlen_snprintf(repmgr_schema, "%s%s", DEFAULT_REPMGR_SCHEMA_PREFIX,
options.cluster_name);
/* Initialise slot name, if required (9.4 and later) */
if(options.use_replication_slots)
{
maxlen_snprintf(repmgr_slot_name, "repmgr_slot_%i", options.node);
}
switch (action)
{
case MASTER_REGISTER:
@@ -1015,7 +1023,6 @@ do_standby_clone(void)
{
log_err(_("%s: Aborting, remote host %s is not reachable.\n"),
progname, runtime_options.host);
PQfinish(conn);
retval = ERR_BAD_SSH;
goto stop_backup;
}
@@ -1071,14 +1078,38 @@ stop_backup:
log_err(_("Unable to take a base backup of the master server\n"));
log_warning(_("The destination directory (%s) will need to be cleaned up manually\n"),
local_data_directory);
PQfinish(conn);
exit(retval);
}
/* Finally, write the recovery.conf file */
create_recovery_file(local_data_directory);
/*
* If replication slots requested, create appropriate slot on the primary;
* create_recovery_file() will already have written `primary_slot_name` into
* `recovery.conf`
*/
if(options.use_replication_slots)
{
if(create_replication_slot(conn, repmgr_slot_name) == false)
{
log_err(_("Unable to create slot '%s' on the primary node: %s\n"),
repmgr_slot_name,
PQerrorMessage(conn));
PQfinish(conn);
exit(ERR_DB_QUERY);
}
}
log_notice(_("%s base backup of standby complete\n"), progname);
/*
* XXX It might be nice to provide the following options:
* - have repmgr start the daemon automatically
* - provide a custom pg_ctl command
*/
log_notice("HINT: You can now start your postgresql server\n");
if (test_mode)
{
@@ -1087,9 +1118,10 @@ stop_backup:
}
else
{
log_notice("for example : /etc/init.d/postgresql start\n");
log_notice("for example : /etc/init.d/postgresql start\n");
}
PQfinish(conn);
exit(retval);
}
@@ -1709,12 +1741,21 @@ create_recovery_file(const char *data_dir)
/* min_recovery_apply_delay = ... (optional) */
if(*runtime_options.min_recovery_apply_delay)
{
maxlen_snprintf(line, "\nmin_recovery_apply_delay = %s\n",
maxlen_snprintf(line, "min_recovery_apply_delay = %s\n",
runtime_options.min_recovery_apply_delay);
if(write_recovery_file_line(recovery_file, recovery_file_path, line) == false)
return false;
}
/* primary_slot_name = '...' (optional, for 9.4 and later) */
if(options.use_replication_slots)
{
maxlen_snprintf(line, "primary_slot_name = %s\n",
repmgr_slot_name);
if(write_recovery_file_line(recovery_file, recovery_file_path, line) == false)
return false;
}
fclose(recovery_file);
return true;

View File

@@ -1320,11 +1320,11 @@ do_primary_failover(void)
terminate(ERR_DB_QUERY);
}
/* and reconnect to the local database */
my_local_conn = establish_db_connection(local_options.conninfo, true);
/* and reconnect to the local database */
my_local_conn = establish_db_connection(local_options.conninfo, true);
/* update node information to reflect new status */
if(update_node_record_set_primary(my_local_conn, node_info.node_id, failed_primary.node_id) == false)
/* update node information to reflect new status */
if(update_node_record_set_primary(my_local_conn, node_info.node_id, failed_primary.node_id) == false)
{
terminate(ERR_DB_QUERY);
}
@@ -1345,7 +1345,7 @@ do_primary_failover(void)
log_debug(_("follow command is: \"%s\"\n"), local_options.follow_command);
/*
* New Primary need some time to be promoted. The follow command
* The new primary may need some time to be promoted. The follow command
* should take care of that.
*/
if (log_type == REPMGR_STDERR && *local_options.logfile)
@@ -1353,6 +1353,28 @@ do_primary_failover(void)
fflush(stderr);
}
/*
* If 9.4 or later, and replication slots in use, we'll need to create a
* slot on the new primary
*/
new_primary_conn = establish_db_connection(best_candidate.conninfo_str, true);
if(local_options.use_replication_slots)
{
// ZZZ store slot name in `repl_nodes`
char repmgr_slot_name[MAXLEN];
maxlen_snprintf(repmgr_slot_name, "repmgr_slot_%i", local_options.node);
if(create_replication_slot(new_primary_conn, repmgr_slot_name) == false)
{
log_err(_("Unable to create slot '%s' on the primary node: %s\n"),
repmgr_slot_name,
PQerrorMessage(new_primary_conn));
PQfinish(new_primary_conn);
terminate(ERR_DB_QUERY);
}
}
r = system(local_options.follow_command);
if (r != 0)
{
@@ -1361,10 +1383,8 @@ do_primary_failover(void)
terminate(ERR_BAD_CONFIG);
}
/* and reconnect to the local database */
my_local_conn = establish_db_connection(local_options.conninfo, true);
new_primary_conn = establish_db_connection(best_candidate.conninfo_str, true);
/* and reconnect to the local database */
my_local_conn = establish_db_connection(local_options.conninfo, true);
/* update node information to reflect new status */
update_node_record_set_upstream(new_primary_conn, node_info.node_id, best_candidate.node_id);