"standby clone": create replication slot

This commit is contained in:
Ian Barwick
2017-05-02 00:38:36 +09:00
parent 84a2bf4375
commit 8f74d1b946
3 changed files with 168 additions and 3 deletions

128
dbutils.c
View File

@@ -10,7 +10,7 @@
#include <sys/time.h>
#include "repmgr.h"
#include "dbutils.h"
#include "catalog/pg_control.h"
@@ -23,6 +23,7 @@ static bool _set_config(PGconn *conn, const char *config_param, const char *sqlq
static int _get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info);
static void _populate_node_record(PGresult *res, t_node_info *node_info, int row);
static bool _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info);
static bool _create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info);
/* =================== */
/* extension functions */
@@ -1321,7 +1322,7 @@ create_event_record_extended(PGconn *conn, t_configuration_options *options, int
return _create_event_record(conn, options, node_id, event, successful, details, event_info);
}
bool
static bool
_create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info)
{
PQExpBufferData query;
@@ -1546,3 +1547,126 @@ _create_event_record(PGconn *conn, t_configuration_options *options, int node_id
return success;
}
/* ========================== */
/* replication slot functions */
/* ========================== */
bool
create_replication_slot(PGconn *conn, char *slot_name, int server_version_num, PQExpBufferData *error_msg)
{
PQExpBufferData query;
int query_res;
PGresult *res;
t_replication_slot slot_info;
/*
* Check whether slot exists already; if it exists and is active, that
* means another active standby is using it, which creates an error situation;
* if not we can reuse it as-is
*/
query_res = get_slot_record(conn, slot_name, &slot_info);
if (query_res)
{
if (strcmp(slot_info.slot_type, "physical") != 0)
{
appendPQExpBuffer(error_msg,
_("slot '%s' exists and is not a physical slot\n"),
slot_name);
return false;
}
if (slot_info.active == false)
{
// XXX is this a good idea?
log_debug("replication slot '%s' exists but is inactive; reusing",
slot_name);
return true;
}
appendPQExpBuffer(error_msg,
_("slot '%s' already exists as an active slot\n"),
slot_name);
return false;
}
initPQExpBuffer(&query);
/* In 9.6 and later, reserve the LSN straight away */
if (server_version_num >= 90600)
{
appendPQExpBuffer(&query,
"SELECT * FROM pg_catalog.pg_create_physical_replication_slot('%s', TRUE)",
slot_name);
}
else
{
appendPQExpBuffer(&query,
"SELECT * FROM pg_catalog.pg_create_physical_replication_slot('%s')",
slot_name);
}
log_debug(_("create_replication_slot(): Creating slot '%s' on upstream"), slot_name);
log_verbose(LOG_DEBUG, "create_replication_slot():\n%s", query.data);
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
{
appendPQExpBuffer(error_msg,
_("unable to create slot '%s' on the master node: %s\n"),
slot_name,
PQerrorMessage(conn));
PQclear(res);
return false;
}
PQclear(res);
return true;
}
int
get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record)
{
PQExpBufferData query;
PGresult *res;
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
"SELECT slot_name, slot_type, active "
" FROM pg_catalog.pg_replication_slots "
" WHERE slot_name = '%s' ",
slot_name);
log_verbose(LOG_DEBUG, "get_slot_record():\n%s", query.data);
res = PQexec(conn, query.data);
if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
{
log_error(_("unable to query pg_replication_slots:\n %s"),
PQerrorMessage(conn));
PQclear(res);
return -1;
}
if (!PQntuples(res))
{
return 0;
}
strncpy(record->slot_name, PQgetvalue(res, 0, 0), MAXLEN);
strncpy(record->slot_type, PQgetvalue(res, 0, 1), MAXLEN);
record->active = (strcmp(PQgetvalue(res, 0, 2), "t") == 0)
? true
: false;
PQclear(res);
return 1;
}

View File

@@ -178,7 +178,11 @@ bool update_node_record(PGconn *conn, char *repmgr_action, t_node_info *node_in
/* event record functions */
bool create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details);
bool create_event_record_extended(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info);
bool _create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info);
/* replication slot functions */
bool create_replication_slot(PGconn *conn, char *slot_name, int server_version_num, PQExpBufferData *error_msg);
int get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record);
#endif

View File

@@ -751,6 +751,43 @@ initialise_direct_clone(void)
}
PQclear(res);
/*
* If replication slots requested, create appropriate slot on
* the primary; this must be done before pg_start_backup() is
* issued, either by us or by pg_basebackup.
*
* Replication slots are not supported (and not very useful
* anyway) in Barman mode.
*/
if (config_file_options.use_replication_slots)
{
PQExpBufferData event_details;
initPQExpBuffer(&event_details);
if (create_replication_slot(source_conn, repmgr_slot_name, server_version_num, &event_details) == false)
{
log_error("%s", event_details.data);
create_event_record(primary_conn,
&config_file_options,
config_file_options.node_id,
"standby_clone",
false,
event_details.data);
PQfinish(source_conn);
exit(ERR_DB_QUERY);
}
termPQExpBuffer(&event_details);
log_notice(_("replication slot '%s' created on upstream node (node_id: %i)"),
repmgr_slot_name,
upstream_node_id);
}
}