Add functions for slot creation via the streaming replication protocol

This commit is contained in:
Ian Barwick
2019-10-23 14:22:48 +09:00
parent 9083f26990
commit cc540a54e5
2 changed files with 100 additions and 5 deletions

101
dbutils.c
View File

@@ -61,6 +61,9 @@ static void _populate_node_record(PGresult *res, t_node_info *node_info, int row
static void _populate_node_records(PGresult *res, NodeInfoList *node_list);
static bool _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info);
static bool _verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_msg);
static bool _create_event(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info, bool send_notification);
static bool _is_bdr_db(PGconn *conn, PQExpBufferData *output, bool quiet);
@@ -4148,12 +4151,10 @@ create_slot_name(char *slot_name, int node_id)
}
bool
create_replication_slot_sql(PGconn *conn, char *slot_name, PQExpBufferData *error_msg)
static bool
_verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_msg)
{
PQExpBufferData query;
RecordStatus record_status = RECORD_NOT_FOUND;
PGresult *res = NULL;
t_replication_slot slot_info = T_REPLICATION_SLOT_INITIALIZER;
/*
@@ -4161,7 +4162,6 @@ create_replication_slot_sql(PGconn *conn, char *slot_name, PQExpBufferData *erro
* means another active standby is using it, which creates an error
* situation; if not we can reuse it as-is
*/
record_status = get_slot_record(conn, slot_name, &slot_info);
if (record_status == RECORD_FOUND)
@@ -4188,6 +4188,59 @@ create_replication_slot_sql(PGconn *conn, char *slot_name, PQExpBufferData *erro
return false;
}
return true;
}
bool
create_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_name, PQExpBufferData *error_msg)
{
PQExpBufferData query;
PGresult *res = NULL;
if (_verify_replication_slot(conn, slot_name, error_msg) == false)
return false;
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
"CREATE_REPLICATION_SLOT %s PHYSICAL",
slot_name);
/* In 9.6 and later, reserve the LSN straight away */
if (PQserverVersion(conn) >= 90600)
{
appendPQExpBufferStr(&query,
" RESERVE_WAL");
}
appendPQExpBufferChar(&query, ';');
res = PQexec(repl_conn, query.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res))
{
log_db_error(conn, NULL, _("unable to execute CREATE_REPLICATION_SLOT"));
PQclear(res);
return false;
}
PQclear(res);
return true;
}
bool
create_replication_slot_sql(PGconn *conn, char *slot_name, PQExpBufferData *error_msg)
{
PQExpBufferData query;
PGresult *res = NULL;
if (_verify_replication_slot(conn, slot_name, error_msg) == false)
return false;
initPQExpBuffer(&query);
/* In 9.6 and later, reserve the LSN straight away */
@@ -4263,6 +4316,44 @@ drop_replication_slot_sql(PGconn *conn, char *slot_name)
}
bool
drop_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_name)
{
PQExpBufferData query;
PGresult *res = NULL;
bool success = true;
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
"DROP_REPLICATION_SLOT %s",
slot_name);
log_verbose(LOG_DEBUG, "drop_replication_slot_replprot():\n %s", query.data);
res = PQexec(repl_conn, query.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
log_db_error(conn, query.data,
_("drop_replication_slot_sql(): unable to drop replication slot \"%s\""),
slot_name);
success = false;
}
else
{
log_verbose(LOG_DEBUG, "replication slot \"%s\" successfully dropped",
slot_name);
}
termPQExpBuffer(&query);
PQclear(res);
return success;
}
RecordStatus
get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record)
{

View File

@@ -560,8 +560,12 @@ PGresult *get_event_records(PGconn *conn, int node_id, const char *node_name,
/* replication slot functions */
void create_slot_name(char *slot_name, int node_id);
bool create_replication_slot_sql(PGconn *conn, char *slot_name, PQExpBufferData *error_msg);
bool create_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_name, PQExpBufferData *error_msg);
bool drop_replication_slot_sql(PGconn *conn, char *slot_name);
bool drop_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_name);
RecordStatus get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record);
int get_free_replication_slot_count(PGconn *conn);
int get_inactive_replication_slots(PGconn *conn, KeyValueList *list);