mirror of
https://github.com/EnterpriseDB/repmgr.git
synced 2026-03-26 16:46:28 +00:00
Clean up dbutils.c
Put functions into the same "section" as noted in the header file.
This commit is contained in:
612
dbutils.c
612
dbutils.c
@@ -1414,83 +1414,6 @@ get_primary_node_id(PGconn *conn)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
|
||||||
init_replication_info(ReplInfo *replication_info)
|
|
||||||
{
|
|
||||||
memset(replication_info->current_timestamp, 0, sizeof(replication_info->current_timestamp));
|
|
||||||
replication_info->last_wal_receive_lsn = InvalidXLogRecPtr;
|
|
||||||
replication_info->last_wal_replay_lsn = InvalidXLogRecPtr;
|
|
||||||
memset(replication_info->last_xact_replay_timestamp, 0, sizeof(replication_info->last_xact_replay_timestamp));
|
|
||||||
replication_info->replication_lag_time = 0;
|
|
||||||
replication_info->receiving_streamed_wal = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool
|
|
||||||
get_replication_info(PGconn *conn, ReplInfo *replication_info)
|
|
||||||
{
|
|
||||||
PQExpBufferData query;
|
|
||||||
PGresult *res = NULL;
|
|
||||||
bool success = true;
|
|
||||||
|
|
||||||
initPQExpBuffer(&query);
|
|
||||||
appendPQExpBufferStr(&query,
|
|
||||||
" SELECT ts, "
|
|
||||||
" last_wal_receive_lsn, "
|
|
||||||
" last_wal_replay_lsn, "
|
|
||||||
" last_xact_replay_timestamp, "
|
|
||||||
" CASE WHEN (last_wal_receive_lsn = last_wal_replay_lsn) "
|
|
||||||
" THEN 0::INT "
|
|
||||||
" ELSE "
|
|
||||||
" EXTRACT(epoch FROM (pg_catalog.clock_timestamp() - last_xact_replay_timestamp))::INT "
|
|
||||||
" END AS replication_lag_time, "
|
|
||||||
" COALESCE(last_wal_receive_lsn, '0/0') >= last_wal_replay_lsn AS receiving_streamed_wal "
|
|
||||||
" FROM ( ");
|
|
||||||
|
|
||||||
if (PQserverVersion(conn) >= 100000)
|
|
||||||
{
|
|
||||||
appendPQExpBufferStr(&query,
|
|
||||||
" SELECT CURRENT_TIMESTAMP AS ts, "
|
|
||||||
" pg_catalog.pg_last_wal_receive_lsn() AS last_wal_receive_lsn, "
|
|
||||||
" pg_catalog.pg_last_wal_replay_lsn() AS last_wal_replay_lsn, "
|
|
||||||
" pg_catalog.pg_last_xact_replay_timestamp() AS last_xact_replay_timestamp ");
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
appendPQExpBufferStr(&query,
|
|
||||||
" SELECT CURRENT_TIMESTAMP AS ts, "
|
|
||||||
" pg_catalog.pg_last_xlog_receive_location() AS last_wal_receive_lsn, "
|
|
||||||
" pg_catalog.pg_last_xlog_replay_location() AS last_wal_replay_lsn, "
|
|
||||||
" pg_catalog.pg_last_xact_replay_timestamp() AS last_xact_replay_timestamp ");
|
|
||||||
}
|
|
||||||
|
|
||||||
appendPQExpBufferStr(&query,
|
|
||||||
" ) q ");
|
|
||||||
|
|
||||||
log_verbose(LOG_DEBUG, "get_replication_info():\n%s", query.data);
|
|
||||||
|
|
||||||
res = PQexec(conn, query.data);
|
|
||||||
|
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res))
|
|
||||||
{
|
|
||||||
log_db_error(conn, query.data, _("get_replication_info(): unable to execute query"));
|
|
||||||
|
|
||||||
success = false;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
strncpy(replication_info->current_timestamp, PQgetvalue(res, 0, 0), MAXLEN);
|
|
||||||
replication_info->last_wal_receive_lsn = parse_lsn(PQgetvalue(res, 0, 1));
|
|
||||||
replication_info->last_wal_replay_lsn = parse_lsn(PQgetvalue(res, 0, 2));
|
|
||||||
strncpy(replication_info->last_xact_replay_timestamp, PQgetvalue(res, 0, 3), MAXLEN);
|
|
||||||
replication_info->replication_lag_time = atoi(PQgetvalue(res, 0, 4));
|
|
||||||
replication_info->receiving_streamed_wal = atobool(PQgetvalue(res, 0, 5));
|
|
||||||
}
|
|
||||||
|
|
||||||
termPQExpBuffer(&query);
|
|
||||||
PQclear(res);
|
|
||||||
|
|
||||||
return success;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
int
|
int
|
||||||
@@ -1570,57 +1493,6 @@ get_ready_archive_files(PGconn *conn, const char *data_directory)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int
|
|
||||||
get_replication_lag_seconds(PGconn *conn)
|
|
||||||
{
|
|
||||||
PQExpBufferData query;
|
|
||||||
PGresult *res = NULL;
|
|
||||||
int lag_seconds = 0;
|
|
||||||
|
|
||||||
initPQExpBuffer(&query);
|
|
||||||
|
|
||||||
if (PQserverVersion(conn) >= 100000)
|
|
||||||
{
|
|
||||||
appendPQExpBufferStr(&query,
|
|
||||||
" SELECT CASE WHEN (pg_catalog.pg_last_wal_receive_lsn() = pg_catalog.pg_last_wal_replay_lsn()) ");
|
|
||||||
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
appendPQExpBufferStr(&query,
|
|
||||||
" SELECT CASE WHEN (pg_catalog.pg_last_xlog_receive_location() = pg_catalog.pg_last_xlog_replay_location()) ");
|
|
||||||
}
|
|
||||||
|
|
||||||
appendPQExpBufferStr(&query,
|
|
||||||
" THEN 0 "
|
|
||||||
" ELSE EXTRACT(epoch FROM (pg_catalog.clock_timestamp() - pg_catalog.pg_last_xact_replay_timestamp()))::INT "
|
|
||||||
" END "
|
|
||||||
" AS lag_seconds");
|
|
||||||
|
|
||||||
res = PQexec(conn, query.data);
|
|
||||||
log_verbose(LOG_DEBUG, "get_replication_lag_seconds():\n%s", query.data);
|
|
||||||
termPQExpBuffer(&query);
|
|
||||||
|
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
||||||
{
|
|
||||||
log_warning("%s", PQerrorMessage(conn));
|
|
||||||
PQclear(res);
|
|
||||||
|
|
||||||
/* XXX magic number */
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!PQntuples(res))
|
|
||||||
{
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
lag_seconds = atoi(PQgetvalue(res, 0, 0));
|
|
||||||
|
|
||||||
PQclear(res);
|
|
||||||
return lag_seconds;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
bool
|
bool
|
||||||
identify_system(PGconn *repl_conn, t_system_identification *identification)
|
identify_system(PGconn *repl_conn, t_system_identification *identification)
|
||||||
@@ -3206,180 +3078,6 @@ update_node_record_slot_name(PGconn *primary_conn, int node_id, char *slot_name)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
|
||||||
get_node_replication_stats(PGconn *conn, t_node_info *node_info)
|
|
||||||
{
|
|
||||||
PQExpBufferData query;
|
|
||||||
PGresult *res = NULL;
|
|
||||||
|
|
||||||
initPQExpBuffer(&query);
|
|
||||||
|
|
||||||
appendPQExpBufferStr(&query,
|
|
||||||
" SELECT pg_catalog.current_setting('max_wal_senders')::INT AS max_wal_senders, "
|
|
||||||
" (SELECT pg_catalog.count(*) FROM pg_catalog.pg_stat_replication) AS attached_wal_receivers, ");
|
|
||||||
|
|
||||||
/* no replication slots in PostgreSQL 9.3 */
|
|
||||||
if (PQserverVersion(conn) < 90400)
|
|
||||||
{
|
|
||||||
appendPQExpBufferStr(&query,
|
|
||||||
" 0 AS max_replication_slots, "
|
|
||||||
" 0 AS total_replication_slots, "
|
|
||||||
" 0 AS active_replication_slots, "
|
|
||||||
" 0 AS inactive_replication_slots, ");
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
appendPQExpBufferStr(&query,
|
|
||||||
" current_setting('max_replication_slots')::INT AS max_replication_slots, "
|
|
||||||
" (SELECT pg_catalog.count(*) FROM pg_catalog.pg_replication_slots) AS total_replication_slots, "
|
|
||||||
" (SELECT pg_catalog.count(*) FROM pg_catalog.pg_replication_slots WHERE active IS TRUE) AS active_replication_slots, "
|
|
||||||
" (SELECT pg_catalog.count(*) FROM pg_catalog.pg_replication_slots WHERE active IS FALSE) AS inactive_replication_slots, ");
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
appendPQExpBufferStr(&query,
|
|
||||||
" pg_catalog.pg_is_in_recovery() AS in_recovery");
|
|
||||||
|
|
||||||
log_verbose(LOG_DEBUG, "get_node_replication_stats():\n%s", query.data);
|
|
||||||
|
|
||||||
res = PQexec(conn, query.data);
|
|
||||||
|
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
||||||
{
|
|
||||||
log_warning(_("unable to retrieve node replication statistics"));
|
|
||||||
log_detail("%s", PQerrorMessage(conn));
|
|
||||||
log_detail("%s", query.data);
|
|
||||||
|
|
||||||
termPQExpBuffer(&query);
|
|
||||||
PQclear(res);
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
node_info->max_wal_senders = atoi(PQgetvalue(res, 0, 0));
|
|
||||||
node_info->attached_wal_receivers = atoi(PQgetvalue(res, 0, 1));
|
|
||||||
node_info->max_replication_slots = atoi(PQgetvalue(res, 0, 2));
|
|
||||||
node_info->total_replication_slots = atoi(PQgetvalue(res, 0, 3));
|
|
||||||
node_info->active_replication_slots = atoi(PQgetvalue(res, 0, 4));
|
|
||||||
node_info->inactive_replication_slots = atoi(PQgetvalue(res, 0, 5));
|
|
||||||
node_info->recovery_type = strcmp(PQgetvalue(res, 0, 6), "f") == 0 ? RECTYPE_PRIMARY : RECTYPE_STANDBY;
|
|
||||||
|
|
||||||
termPQExpBuffer(&query);
|
|
||||||
PQclear(res);
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
bool
|
|
||||||
is_downstream_node_attached(PGconn *conn, char *node_name)
|
|
||||||
{
|
|
||||||
PQExpBufferData query;
|
|
||||||
PGresult *res = NULL;
|
|
||||||
int c = 0;
|
|
||||||
|
|
||||||
initPQExpBuffer(&query);
|
|
||||||
|
|
||||||
appendPQExpBuffer(&query,
|
|
||||||
" SELECT pg_catalog.count(*) FROM pg_catalog.pg_stat_replication "
|
|
||||||
" WHERE application_name = '%s'",
|
|
||||||
node_name);
|
|
||||||
|
|
||||||
res = PQexec(conn, query.data);
|
|
||||||
|
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
||||||
{
|
|
||||||
log_verbose(LOG_WARNING, _("unable to query pg_stat_replication"));
|
|
||||||
log_detail("%s", PQerrorMessage(conn));
|
|
||||||
log_detail("%s", query.data);
|
|
||||||
|
|
||||||
termPQExpBuffer(&query);
|
|
||||||
PQclear(res);
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (PQntuples(res) != 1)
|
|
||||||
{
|
|
||||||
log_verbose(LOG_WARNING, _("unexpected number of tuples (%i) returned"), PQntuples(res));
|
|
||||||
|
|
||||||
termPQExpBuffer(&query);
|
|
||||||
PQclear(res);
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
c = atoi(PQgetvalue(res, 0, 0));
|
|
||||||
|
|
||||||
termPQExpBuffer(&query);
|
|
||||||
PQclear(res);
|
|
||||||
|
|
||||||
if (c == 0)
|
|
||||||
{
|
|
||||||
log_verbose(LOG_WARNING, _("node \"%s\" not found in \"pg_stat_replication\""), node_name);
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (c > 1)
|
|
||||||
log_verbose(LOG_WARNING, _("multiple entries with \"application_name\" set to \"%s\" found in \"pg_stat_replication\""),
|
|
||||||
node_name);
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void
|
|
||||||
set_primary_last_seen(PGconn *conn)
|
|
||||||
{
|
|
||||||
PQExpBufferData query;
|
|
||||||
PGresult *res = NULL;
|
|
||||||
|
|
||||||
initPQExpBuffer(&query);
|
|
||||||
|
|
||||||
appendPQExpBufferStr(&query,
|
|
||||||
"SELECT repmgr.set_primary_last_seen()");
|
|
||||||
|
|
||||||
res = PQexec(conn, query.data);
|
|
||||||
|
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
||||||
{
|
|
||||||
log_db_error(conn, query.data, _("unable to execute repmgr.set_primary_last_seen()"));
|
|
||||||
}
|
|
||||||
|
|
||||||
termPQExpBuffer(&query);
|
|
||||||
PQclear(res);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
int
|
|
||||||
get_primary_last_seen(PGconn *conn)
|
|
||||||
{
|
|
||||||
PQExpBufferData query;
|
|
||||||
PGresult *res = NULL;
|
|
||||||
int primary_last_seen = -1;
|
|
||||||
|
|
||||||
initPQExpBuffer(&query);
|
|
||||||
|
|
||||||
appendPQExpBufferStr(&query,
|
|
||||||
"SELECT repmgr.get_primary_last_seen()");
|
|
||||||
|
|
||||||
res = PQexec(conn, query.data);
|
|
||||||
|
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
||||||
{
|
|
||||||
log_db_error(conn, query.data, _("unable to execute repmgr.get_primary_last_seen()"));
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
primary_last_seen = atoi(PQgetvalue(res, 0, 0));
|
|
||||||
}
|
|
||||||
|
|
||||||
termPQExpBuffer(&query);
|
|
||||||
PQclear(res);
|
|
||||||
|
|
||||||
return primary_last_seen;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void
|
void
|
||||||
@@ -4904,6 +4602,7 @@ get_last_wal_receive_location(PGconn *conn)
|
|||||||
return ptr;
|
return ptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
XLogRecPtr
|
XLogRecPtr
|
||||||
get_current_lsn(PGconn *conn)
|
get_current_lsn(PGconn *conn)
|
||||||
{
|
{
|
||||||
@@ -4992,6 +4691,315 @@ get_current_lsn(PGconn *conn)
|
|||||||
return ptr;
|
return ptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void
|
||||||
|
init_replication_info(ReplInfo *replication_info)
|
||||||
|
{
|
||||||
|
memset(replication_info->current_timestamp, 0, sizeof(replication_info->current_timestamp));
|
||||||
|
replication_info->last_wal_receive_lsn = InvalidXLogRecPtr;
|
||||||
|
replication_info->last_wal_replay_lsn = InvalidXLogRecPtr;
|
||||||
|
memset(replication_info->last_xact_replay_timestamp, 0, sizeof(replication_info->last_xact_replay_timestamp));
|
||||||
|
replication_info->replication_lag_time = 0;
|
||||||
|
replication_info->receiving_streamed_wal = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
bool
|
||||||
|
get_replication_info(PGconn *conn, ReplInfo *replication_info)
|
||||||
|
{
|
||||||
|
PQExpBufferData query;
|
||||||
|
PGresult *res = NULL;
|
||||||
|
bool success = true;
|
||||||
|
|
||||||
|
initPQExpBuffer(&query);
|
||||||
|
appendPQExpBufferStr(&query,
|
||||||
|
" SELECT ts, "
|
||||||
|
" last_wal_receive_lsn, "
|
||||||
|
" last_wal_replay_lsn, "
|
||||||
|
" last_xact_replay_timestamp, "
|
||||||
|
" CASE WHEN (last_wal_receive_lsn = last_wal_replay_lsn) "
|
||||||
|
" THEN 0::INT "
|
||||||
|
" ELSE "
|
||||||
|
" EXTRACT(epoch FROM (pg_catalog.clock_timestamp() - last_xact_replay_timestamp))::INT "
|
||||||
|
" END AS replication_lag_time, "
|
||||||
|
" COALESCE(last_wal_receive_lsn, '0/0') >= last_wal_replay_lsn AS receiving_streamed_wal "
|
||||||
|
" FROM ( ");
|
||||||
|
|
||||||
|
if (PQserverVersion(conn) >= 100000)
|
||||||
|
{
|
||||||
|
appendPQExpBufferStr(&query,
|
||||||
|
" SELECT CURRENT_TIMESTAMP AS ts, "
|
||||||
|
" pg_catalog.pg_last_wal_receive_lsn() AS last_wal_receive_lsn, "
|
||||||
|
" pg_catalog.pg_last_wal_replay_lsn() AS last_wal_replay_lsn, "
|
||||||
|
" pg_catalog.pg_last_xact_replay_timestamp() AS last_xact_replay_timestamp ");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
appendPQExpBufferStr(&query,
|
||||||
|
" SELECT CURRENT_TIMESTAMP AS ts, "
|
||||||
|
" pg_catalog.pg_last_xlog_receive_location() AS last_wal_receive_lsn, "
|
||||||
|
" pg_catalog.pg_last_xlog_replay_location() AS last_wal_replay_lsn, "
|
||||||
|
" pg_catalog.pg_last_xact_replay_timestamp() AS last_xact_replay_timestamp ");
|
||||||
|
}
|
||||||
|
|
||||||
|
appendPQExpBufferStr(&query,
|
||||||
|
" ) q ");
|
||||||
|
|
||||||
|
log_verbose(LOG_DEBUG, "get_replication_info():\n%s", query.data);
|
||||||
|
|
||||||
|
res = PQexec(conn, query.data);
|
||||||
|
|
||||||
|
if (PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res))
|
||||||
|
{
|
||||||
|
log_db_error(conn, query.data, _("get_replication_info(): unable to execute query"));
|
||||||
|
|
||||||
|
success = false;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
strncpy(replication_info->current_timestamp, PQgetvalue(res, 0, 0), MAXLEN);
|
||||||
|
replication_info->last_wal_receive_lsn = parse_lsn(PQgetvalue(res, 0, 1));
|
||||||
|
replication_info->last_wal_replay_lsn = parse_lsn(PQgetvalue(res, 0, 2));
|
||||||
|
strncpy(replication_info->last_xact_replay_timestamp, PQgetvalue(res, 0, 3), MAXLEN);
|
||||||
|
replication_info->replication_lag_time = atoi(PQgetvalue(res, 0, 4));
|
||||||
|
replication_info->receiving_streamed_wal = atobool(PQgetvalue(res, 0, 5));
|
||||||
|
}
|
||||||
|
|
||||||
|
termPQExpBuffer(&query);
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int
|
||||||
|
get_replication_lag_seconds(PGconn *conn)
|
||||||
|
{
|
||||||
|
PQExpBufferData query;
|
||||||
|
PGresult *res = NULL;
|
||||||
|
int lag_seconds = 0;
|
||||||
|
|
||||||
|
initPQExpBuffer(&query);
|
||||||
|
|
||||||
|
if (PQserverVersion(conn) >= 100000)
|
||||||
|
{
|
||||||
|
appendPQExpBufferStr(&query,
|
||||||
|
" SELECT CASE WHEN (pg_catalog.pg_last_wal_receive_lsn() = pg_catalog.pg_last_wal_replay_lsn()) ");
|
||||||
|
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
appendPQExpBufferStr(&query,
|
||||||
|
" SELECT CASE WHEN (pg_catalog.pg_last_xlog_receive_location() = pg_catalog.pg_last_xlog_replay_location()) ");
|
||||||
|
}
|
||||||
|
|
||||||
|
appendPQExpBufferStr(&query,
|
||||||
|
" THEN 0 "
|
||||||
|
" ELSE EXTRACT(epoch FROM (pg_catalog.clock_timestamp() - pg_catalog.pg_last_xact_replay_timestamp()))::INT "
|
||||||
|
" END "
|
||||||
|
" AS lag_seconds");
|
||||||
|
|
||||||
|
res = PQexec(conn, query.data);
|
||||||
|
log_verbose(LOG_DEBUG, "get_replication_lag_seconds():\n%s", query.data);
|
||||||
|
termPQExpBuffer(&query);
|
||||||
|
|
||||||
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
|
{
|
||||||
|
log_warning("%s", PQerrorMessage(conn));
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
|
/* XXX magic number */
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!PQntuples(res))
|
||||||
|
{
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
lag_seconds = atoi(PQgetvalue(res, 0, 0));
|
||||||
|
|
||||||
|
PQclear(res);
|
||||||
|
return lag_seconds;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void
|
||||||
|
get_node_replication_stats(PGconn *conn, t_node_info *node_info)
|
||||||
|
{
|
||||||
|
PQExpBufferData query;
|
||||||
|
PGresult *res = NULL;
|
||||||
|
|
||||||
|
initPQExpBuffer(&query);
|
||||||
|
|
||||||
|
appendPQExpBufferStr(&query,
|
||||||
|
" SELECT pg_catalog.current_setting('max_wal_senders')::INT AS max_wal_senders, "
|
||||||
|
" (SELECT pg_catalog.count(*) FROM pg_catalog.pg_stat_replication) AS attached_wal_receivers, ");
|
||||||
|
|
||||||
|
/* no replication slots in PostgreSQL 9.3 */
|
||||||
|
if (PQserverVersion(conn) < 90400)
|
||||||
|
{
|
||||||
|
appendPQExpBufferStr(&query,
|
||||||
|
" 0 AS max_replication_slots, "
|
||||||
|
" 0 AS total_replication_slots, "
|
||||||
|
" 0 AS active_replication_slots, "
|
||||||
|
" 0 AS inactive_replication_slots, ");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
appendPQExpBufferStr(&query,
|
||||||
|
" current_setting('max_replication_slots')::INT AS max_replication_slots, "
|
||||||
|
" (SELECT pg_catalog.count(*) FROM pg_catalog.pg_replication_slots) AS total_replication_slots, "
|
||||||
|
" (SELECT pg_catalog.count(*) FROM pg_catalog.pg_replication_slots WHERE active IS TRUE) AS active_replication_slots, "
|
||||||
|
" (SELECT pg_catalog.count(*) FROM pg_catalog.pg_replication_slots WHERE active IS FALSE) AS inactive_replication_slots, ");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
appendPQExpBufferStr(&query,
|
||||||
|
" pg_catalog.pg_is_in_recovery() AS in_recovery");
|
||||||
|
|
||||||
|
log_verbose(LOG_DEBUG, "get_node_replication_stats():\n%s", query.data);
|
||||||
|
|
||||||
|
res = PQexec(conn, query.data);
|
||||||
|
|
||||||
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
|
{
|
||||||
|
log_warning(_("unable to retrieve node replication statistics"));
|
||||||
|
log_detail("%s", PQerrorMessage(conn));
|
||||||
|
log_detail("%s", query.data);
|
||||||
|
|
||||||
|
termPQExpBuffer(&query);
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
node_info->max_wal_senders = atoi(PQgetvalue(res, 0, 0));
|
||||||
|
node_info->attached_wal_receivers = atoi(PQgetvalue(res, 0, 1));
|
||||||
|
node_info->max_replication_slots = atoi(PQgetvalue(res, 0, 2));
|
||||||
|
node_info->total_replication_slots = atoi(PQgetvalue(res, 0, 3));
|
||||||
|
node_info->active_replication_slots = atoi(PQgetvalue(res, 0, 4));
|
||||||
|
node_info->inactive_replication_slots = atoi(PQgetvalue(res, 0, 5));
|
||||||
|
node_info->recovery_type = strcmp(PQgetvalue(res, 0, 6), "f") == 0 ? RECTYPE_PRIMARY : RECTYPE_STANDBY;
|
||||||
|
|
||||||
|
termPQExpBuffer(&query);
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
bool
|
||||||
|
is_downstream_node_attached(PGconn *conn, char *node_name)
|
||||||
|
{
|
||||||
|
PQExpBufferData query;
|
||||||
|
PGresult *res = NULL;
|
||||||
|
int c = 0;
|
||||||
|
|
||||||
|
initPQExpBuffer(&query);
|
||||||
|
|
||||||
|
appendPQExpBuffer(&query,
|
||||||
|
" SELECT pg_catalog.count(*) FROM pg_catalog.pg_stat_replication "
|
||||||
|
" WHERE application_name = '%s'",
|
||||||
|
node_name);
|
||||||
|
|
||||||
|
res = PQexec(conn, query.data);
|
||||||
|
|
||||||
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
|
{
|
||||||
|
log_verbose(LOG_WARNING, _("unable to query pg_stat_replication"));
|
||||||
|
log_detail("%s", PQerrorMessage(conn));
|
||||||
|
log_detail("%s", query.data);
|
||||||
|
|
||||||
|
termPQExpBuffer(&query);
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (PQntuples(res) != 1)
|
||||||
|
{
|
||||||
|
log_verbose(LOG_WARNING, _("unexpected number of tuples (%i) returned"), PQntuples(res));
|
||||||
|
|
||||||
|
termPQExpBuffer(&query);
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
c = atoi(PQgetvalue(res, 0, 0));
|
||||||
|
|
||||||
|
termPQExpBuffer(&query);
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
|
if (c == 0)
|
||||||
|
{
|
||||||
|
log_verbose(LOG_WARNING, _("node \"%s\" not found in \"pg_stat_replication\""), node_name);
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (c > 1)
|
||||||
|
log_verbose(LOG_WARNING, _("multiple entries with \"application_name\" set to \"%s\" found in \"pg_stat_replication\""),
|
||||||
|
node_name);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
set_primary_last_seen(PGconn *conn)
|
||||||
|
{
|
||||||
|
PQExpBufferData query;
|
||||||
|
PGresult *res = NULL;
|
||||||
|
|
||||||
|
initPQExpBuffer(&query);
|
||||||
|
|
||||||
|
appendPQExpBufferStr(&query,
|
||||||
|
"SELECT repmgr.set_primary_last_seen()");
|
||||||
|
|
||||||
|
res = PQexec(conn, query.data);
|
||||||
|
|
||||||
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
|
{
|
||||||
|
log_db_error(conn, query.data, _("unable to execute repmgr.set_primary_last_seen()"));
|
||||||
|
}
|
||||||
|
|
||||||
|
termPQExpBuffer(&query);
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int
|
||||||
|
get_primary_last_seen(PGconn *conn)
|
||||||
|
{
|
||||||
|
PQExpBufferData query;
|
||||||
|
PGresult *res = NULL;
|
||||||
|
int primary_last_seen = -1;
|
||||||
|
|
||||||
|
initPQExpBuffer(&query);
|
||||||
|
|
||||||
|
appendPQExpBufferStr(&query,
|
||||||
|
"SELECT repmgr.get_primary_last_seen()");
|
||||||
|
|
||||||
|
res = PQexec(conn, query.data);
|
||||||
|
|
||||||
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
|
{
|
||||||
|
log_db_error(conn, query.data, _("unable to execute repmgr.get_primary_last_seen()"));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
primary_last_seen = atoi(PQgetvalue(res, 0, 0));
|
||||||
|
}
|
||||||
|
|
||||||
|
termPQExpBuffer(&query);
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
|
return primary_last_seen;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* ============= */
|
/* ============= */
|
||||||
/* BDR functions */
|
/* BDR functions */
|
||||||
/* ============= */
|
/* ============= */
|
||||||
|
|||||||
Reference in New Issue
Block a user