From cc540a54e51aa1637215cb9b1021bff99e663a43 Mon Sep 17 00:00:00 2001 From: Ian Barwick Date: Wed, 23 Oct 2019 14:22:48 +0900 Subject: [PATCH] Add functions for slot creation via the streaming replication protocol --- dbutils.c | 101 +++++++++++++++++++++++++++++++++++++++++++++++++++--- dbutils.h | 4 +++ 2 files changed, 100 insertions(+), 5 deletions(-) diff --git a/dbutils.c b/dbutils.c index ced2e403..01d30381 100644 --- a/dbutils.c +++ b/dbutils.c @@ -61,6 +61,9 @@ static void _populate_node_record(PGresult *res, t_node_info *node_info, int row static void _populate_node_records(PGresult *res, NodeInfoList *node_list); static bool _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info); + +static bool _verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_msg); + static bool _create_event(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info, bool send_notification); static bool _is_bdr_db(PGconn *conn, PQExpBufferData *output, bool quiet); @@ -4148,12 +4151,10 @@ create_slot_name(char *slot_name, int node_id) } -bool -create_replication_slot_sql(PGconn *conn, char *slot_name, PQExpBufferData *error_msg) +static bool +_verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_msg) { - PQExpBufferData query; RecordStatus record_status = RECORD_NOT_FOUND; - PGresult *res = NULL; t_replication_slot slot_info = T_REPLICATION_SLOT_INITIALIZER; /* @@ -4161,7 +4162,6 @@ create_replication_slot_sql(PGconn *conn, char *slot_name, PQExpBufferData *erro * means another active standby is using it, which creates an error * situation; if not we can reuse it as-is */ - record_status = get_slot_record(conn, slot_name, &slot_info); if (record_status == RECORD_FOUND) @@ -4188,6 +4188,59 @@ create_replication_slot_sql(PGconn *conn, char *slot_name, PQExpBufferData *erro return false; } + return true; +} + + +bool +create_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_name, PQExpBufferData *error_msg) +{ + PQExpBufferData query; + PGresult *res = NULL; + + if (_verify_replication_slot(conn, slot_name, error_msg) == false) + return false; + + + initPQExpBuffer(&query); + + appendPQExpBuffer(&query, + "CREATE_REPLICATION_SLOT %s PHYSICAL", + slot_name); + + /* In 9.6 and later, reserve the LSN straight away */ + if (PQserverVersion(conn) >= 90600) + { + appendPQExpBufferStr(&query, + " RESERVE_WAL"); + } + + appendPQExpBufferChar(&query, ';'); + + res = PQexec(repl_conn, query.data); + + if (PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res)) + { + log_db_error(conn, NULL, _("unable to execute CREATE_REPLICATION_SLOT")); + + PQclear(res); + return false; + } + + PQclear(res); + return true; +} + + +bool +create_replication_slot_sql(PGconn *conn, char *slot_name, PQExpBufferData *error_msg) +{ + PQExpBufferData query; + PGresult *res = NULL; + + if (_verify_replication_slot(conn, slot_name, error_msg) == false) + return false; + initPQExpBuffer(&query); /* In 9.6 and later, reserve the LSN straight away */ @@ -4263,6 +4316,44 @@ drop_replication_slot_sql(PGconn *conn, char *slot_name) } +bool +drop_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_name) +{ + PQExpBufferData query; + PGresult *res = NULL; + bool success = true; + + initPQExpBuffer(&query); + + appendPQExpBuffer(&query, + "DROP_REPLICATION_SLOT %s", + slot_name); + + log_verbose(LOG_DEBUG, "drop_replication_slot_replprot():\n %s", query.data); + + res = PQexec(repl_conn, query.data); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_db_error(conn, query.data, + _("drop_replication_slot_sql(): unable to drop replication slot \"%s\""), + slot_name); + + success = false; + } + else + { + log_verbose(LOG_DEBUG, "replication slot \"%s\" successfully dropped", + slot_name); + } + + termPQExpBuffer(&query); + PQclear(res); + + return success; +} + + RecordStatus get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record) { diff --git a/dbutils.h b/dbutils.h index eb10b3e2..b1c0bb02 100644 --- a/dbutils.h +++ b/dbutils.h @@ -560,8 +560,12 @@ PGresult *get_event_records(PGconn *conn, int node_id, const char *node_name, /* replication slot functions */ void create_slot_name(char *slot_name, int node_id); + bool create_replication_slot_sql(PGconn *conn, char *slot_name, PQExpBufferData *error_msg); +bool create_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_name, PQExpBufferData *error_msg); bool drop_replication_slot_sql(PGconn *conn, char *slot_name); +bool drop_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_name); + RecordStatus get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record); int get_free_replication_slot_count(PGconn *conn); int get_inactive_replication_slots(PGconn *conn, KeyValueList *list);