From 8f74d1b94630c9c39f9e31556c0046cc0da1eb9c Mon Sep 17 00:00:00 2001 From: Ian Barwick Date: Tue, 2 May 2017 00:38:36 +0900 Subject: [PATCH] "standby clone": create replication slot --- dbutils.c | 128 +++++++++++++++++++++++++++++++++++++++- dbutils.h | 6 +- repmgr-action-standby.c | 37 ++++++++++++ 3 files changed, 168 insertions(+), 3 deletions(-) diff --git a/dbutils.c b/dbutils.c index 9b82c203..51931b88 100644 --- a/dbutils.c +++ b/dbutils.c @@ -10,7 +10,7 @@ #include #include "repmgr.h" - +#include "dbutils.h" #include "catalog/pg_control.h" @@ -23,6 +23,7 @@ static bool _set_config(PGconn *conn, const char *config_param, const char *sqlq static int _get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info); static void _populate_node_record(PGresult *res, t_node_info *node_info, int row); static bool _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info); +static bool _create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info); /* =================== */ /* extension functions */ @@ -1321,7 +1322,7 @@ create_event_record_extended(PGconn *conn, t_configuration_options *options, int return _create_event_record(conn, options, node_id, event, successful, details, event_info); } -bool +static bool _create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info) { PQExpBufferData query; @@ -1546,3 +1547,126 @@ _create_event_record(PGconn *conn, t_configuration_options *options, int node_id return success; } + + +/* ========================== */ +/* replication slot functions */ +/* ========================== */ + +bool +create_replication_slot(PGconn *conn, char *slot_name, int server_version_num, PQExpBufferData *error_msg) +{ + PQExpBufferData query; + int query_res; + PGresult *res; + t_replication_slot slot_info; + + /* + * Check whether slot exists already; if it exists and is active, that + * means another active standby is using it, which creates an error situation; + * if not we can reuse it as-is + */ + + query_res = get_slot_record(conn, slot_name, &slot_info); + + if (query_res) + { + if (strcmp(slot_info.slot_type, "physical") != 0) + { + appendPQExpBuffer(error_msg, + _("slot '%s' exists and is not a physical slot\n"), + slot_name); + return false; + } + + if (slot_info.active == false) + { + // XXX is this a good idea? + log_debug("replication slot '%s' exists but is inactive; reusing", + slot_name); + + return true; + } + + appendPQExpBuffer(error_msg, + _("slot '%s' already exists as an active slot\n"), + slot_name); + return false; + } + + initPQExpBuffer(&query); + + /* In 9.6 and later, reserve the LSN straight away */ + if (server_version_num >= 90600) + { + appendPQExpBuffer(&query, + "SELECT * FROM pg_catalog.pg_create_physical_replication_slot('%s', TRUE)", + slot_name); + } + else + { + appendPQExpBuffer(&query, + "SELECT * FROM pg_catalog.pg_create_physical_replication_slot('%s')", + slot_name); + } + + log_debug(_("create_replication_slot(): Creating slot '%s' on upstream"), slot_name); + log_verbose(LOG_DEBUG, "create_replication_slot():\n%s", query.data); + + res = PQexec(conn, query.data); + termPQExpBuffer(&query); + + if (!res || PQresultStatus(res) != PGRES_TUPLES_OK) + { + appendPQExpBuffer(error_msg, + _("unable to create slot '%s' on the master node: %s\n"), + slot_name, + PQerrorMessage(conn)); + PQclear(res); + return false; + } + + PQclear(res); + return true; +} + +int +get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record) +{ + PQExpBufferData query; + PGresult *res; + + initPQExpBuffer(&query); + + appendPQExpBuffer(&query, + "SELECT slot_name, slot_type, active " + " FROM pg_catalog.pg_replication_slots " + " WHERE slot_name = '%s' ", + slot_name); + + log_verbose(LOG_DEBUG, "get_slot_record():\n%s", query.data); + + res = PQexec(conn, query.data); + if (!res || PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_error(_("unable to query pg_replication_slots:\n %s"), + PQerrorMessage(conn)); + PQclear(res); + return -1; + } + + if (!PQntuples(res)) + { + return 0; + } + + strncpy(record->slot_name, PQgetvalue(res, 0, 0), MAXLEN); + strncpy(record->slot_type, PQgetvalue(res, 0, 1), MAXLEN); + record->active = (strcmp(PQgetvalue(res, 0, 2), "t") == 0) + ? true + : false; + + PQclear(res); + + return 1; +} diff --git a/dbutils.h b/dbutils.h index c2b646eb..27ef3227 100644 --- a/dbutils.h +++ b/dbutils.h @@ -178,7 +178,11 @@ bool update_node_record(PGconn *conn, char *repmgr_action, t_node_info *node_in /* event record functions */ bool create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details); bool create_event_record_extended(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info); -bool _create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info); + +/* replication slot functions */ +bool create_replication_slot(PGconn *conn, char *slot_name, int server_version_num, PQExpBufferData *error_msg); + +int get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record); #endif diff --git a/repmgr-action-standby.c b/repmgr-action-standby.c index 4e1e1a07..1ee75925 100644 --- a/repmgr-action-standby.c +++ b/repmgr-action-standby.c @@ -751,6 +751,43 @@ initialise_direct_clone(void) } PQclear(res); + + /* + * If replication slots requested, create appropriate slot on + * the primary; this must be done before pg_start_backup() is + * issued, either by us or by pg_basebackup. + * + * Replication slots are not supported (and not very useful + * anyway) in Barman mode. + */ + + if (config_file_options.use_replication_slots) + { + PQExpBufferData event_details; + initPQExpBuffer(&event_details); + + if (create_replication_slot(source_conn, repmgr_slot_name, server_version_num, &event_details) == false) + { + log_error("%s", event_details.data); + + create_event_record(primary_conn, + &config_file_options, + config_file_options.node_id, + "standby_clone", + false, + event_details.data); + + PQfinish(source_conn); + exit(ERR_DB_QUERY); + } + + termPQExpBuffer(&event_details); + + log_notice(_("replication slot '%s' created on upstream node (node_id: %i)"), + repmgr_slot_name, + upstream_node_id); + } + }