"standby follow": check for replication slot availability on target node

This commit is contained in:
Ian Barwick
2018-02-02 15:01:23 +09:00
parent 94e187c476
commit 50894b6124
3 changed files with 69 additions and 1 deletions

View File

@@ -3550,6 +3550,45 @@ get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record)
return RECORD_FOUND;
}
int
get_free_replication_slots(PGconn *conn)
{
PQExpBufferData query;
PGresult *res = NULL;
int free_slots = 0;
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
" SELECT pg_catalog.current_setting('max_replication_slots')::INT - "
" COUNT(*) AS free_slots"
" FROM pg_catalog.pg_replication_slots");
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
log_error(_("unable to execute replication slot query"));
log_detail("%s", PQerrorMessage(conn));
PQclear(res);
return -1;
}
if (PQntuples(res) == 0)
{
PQclear(res);
return -1;
}
free_slots = atoi(PQgetvalue(res, 0, 0));
PQclear(res);
return free_slots;
}
/* ==================== */
/* tablespace functions */
/* ==================== */

View File

@@ -445,6 +445,7 @@ void create_slot_name(char *slot_name, int node_id);
bool create_replication_slot(PGconn *conn, char *slot_name, int server_version_num, PQExpBufferData *error_msg);
bool drop_replication_slot(PGconn *conn, char *slot_name);
RecordStatus get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record);
int get_free_replication_slots(PGconn *conn);
/* tablespace functions */
bool get_tablespace_name_by_location(PGconn *conn, const char *location, char *name);

View File

@@ -1679,6 +1679,33 @@ do_standby_follow(void)
primary_id);
}
/* if replication slots in use, check at least one free slot is available */
if (config_file_options.use_replication_slots)
{
int free_slots = get_free_replication_slots(primary_conn);
if (free_slots < 0)
{
log_error(_("unable to determine number of free replication slots on the primary"));
PQfinish(primary_conn);
exit(ERR_BAD_CONFIG);
}
if (free_slots == 0)
{
log_error(_("no free replication slots available on the primary"));
log_hint(_("consider increasing \"max_replication_slots\""));
PQfinish(primary_conn);
exit(ERR_BAD_CONFIG);
}
else if (runtime_options.dry_run == true)
{
log_info(_("replication slots in use, %i free slots on the primary"),
free_slots);
}
}
/* XXX check this is not current upstream anyway */
/* check replication connection */
initialize_conninfo_params(&repl_conninfo, false);
@@ -1700,6 +1727,7 @@ do_standby_follow(void)
log_info(_("replication connection to primary node was successful"));
}
/* check system_identifiers match */
local_system_identifier = get_system_identifier(config_file_options.data_directory);
success = identify_system(repl_conn, &primary_identification);
@@ -1728,7 +1756,7 @@ do_standby_follow(void)
log_detail(_("system identifier is %lu"), local_system_identifier);
}
/* TODO: check timelines*/
/* TODO: check timelines */
PQfinish(repl_conn);
free_conninfo_params(&repl_conninfo);