From 92c73b68a03cbdf4c524708c75bc068aa3821315 Mon Sep 17 00:00:00 2001 From: Ian Barwick Date: Tue, 5 Feb 2019 09:36:54 +0900 Subject: [PATCH] Clean up dbutils.c Put functions into the same "section" as noted in the header file. --- dbutils.c | 612 +++++++++++++++++++++++++++--------------------------- 1 file changed, 310 insertions(+), 302 deletions(-) diff --git a/dbutils.c b/dbutils.c index fc3feb6d..abc3610a 100644 --- a/dbutils.c +++ b/dbutils.c @@ -1414,83 +1414,6 @@ get_primary_node_id(PGconn *conn) } -void -init_replication_info(ReplInfo *replication_info) -{ - memset(replication_info->current_timestamp, 0, sizeof(replication_info->current_timestamp)); - replication_info->last_wal_receive_lsn = InvalidXLogRecPtr; - replication_info->last_wal_replay_lsn = InvalidXLogRecPtr; - memset(replication_info->last_xact_replay_timestamp, 0, sizeof(replication_info->last_xact_replay_timestamp)); - replication_info->replication_lag_time = 0; - replication_info->receiving_streamed_wal = true; -} - -bool -get_replication_info(PGconn *conn, ReplInfo *replication_info) -{ - PQExpBufferData query; - PGresult *res = NULL; - bool success = true; - - initPQExpBuffer(&query); - appendPQExpBufferStr(&query, - " SELECT ts, " - " last_wal_receive_lsn, " - " last_wal_replay_lsn, " - " last_xact_replay_timestamp, " - " CASE WHEN (last_wal_receive_lsn = last_wal_replay_lsn) " - " THEN 0::INT " - " ELSE " - " EXTRACT(epoch FROM (pg_catalog.clock_timestamp() - last_xact_replay_timestamp))::INT " - " END AS replication_lag_time, " - " COALESCE(last_wal_receive_lsn, '0/0') >= last_wal_replay_lsn AS receiving_streamed_wal " - " FROM ( "); - - if (PQserverVersion(conn) >= 100000) - { - appendPQExpBufferStr(&query, - " SELECT CURRENT_TIMESTAMP AS ts, " - " pg_catalog.pg_last_wal_receive_lsn() AS last_wal_receive_lsn, " - " pg_catalog.pg_last_wal_replay_lsn() AS last_wal_replay_lsn, " - " pg_catalog.pg_last_xact_replay_timestamp() AS last_xact_replay_timestamp "); - } - else - { - appendPQExpBufferStr(&query, - " SELECT CURRENT_TIMESTAMP AS ts, " - " pg_catalog.pg_last_xlog_receive_location() AS last_wal_receive_lsn, " - " pg_catalog.pg_last_xlog_replay_location() AS last_wal_replay_lsn, " - " pg_catalog.pg_last_xact_replay_timestamp() AS last_xact_replay_timestamp "); - } - - appendPQExpBufferStr(&query, - " ) q "); - - log_verbose(LOG_DEBUG, "get_replication_info():\n%s", query.data); - - res = PQexec(conn, query.data); - - if (PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res)) - { - log_db_error(conn, query.data, _("get_replication_info(): unable to execute query")); - - success = false; - } - else - { - strncpy(replication_info->current_timestamp, PQgetvalue(res, 0, 0), MAXLEN); - replication_info->last_wal_receive_lsn = parse_lsn(PQgetvalue(res, 0, 1)); - replication_info->last_wal_replay_lsn = parse_lsn(PQgetvalue(res, 0, 2)); - strncpy(replication_info->last_xact_replay_timestamp, PQgetvalue(res, 0, 3), MAXLEN); - replication_info->replication_lag_time = atoi(PQgetvalue(res, 0, 4)); - replication_info->receiving_streamed_wal = atobool(PQgetvalue(res, 0, 5)); - } - - termPQExpBuffer(&query); - PQclear(res); - - return success; -} int @@ -1570,57 +1493,6 @@ get_ready_archive_files(PGconn *conn, const char *data_directory) } -int -get_replication_lag_seconds(PGconn *conn) -{ - PQExpBufferData query; - PGresult *res = NULL; - int lag_seconds = 0; - - initPQExpBuffer(&query); - - if (PQserverVersion(conn) >= 100000) - { - appendPQExpBufferStr(&query, - " SELECT CASE WHEN (pg_catalog.pg_last_wal_receive_lsn() = pg_catalog.pg_last_wal_replay_lsn()) "); - - } - else - { - appendPQExpBufferStr(&query, - " SELECT CASE WHEN (pg_catalog.pg_last_xlog_receive_location() = pg_catalog.pg_last_xlog_replay_location()) "); - } - - appendPQExpBufferStr(&query, - " THEN 0 " - " ELSE EXTRACT(epoch FROM (pg_catalog.clock_timestamp() - pg_catalog.pg_last_xact_replay_timestamp()))::INT " - " END " - " AS lag_seconds"); - - res = PQexec(conn, query.data); - log_verbose(LOG_DEBUG, "get_replication_lag_seconds():\n%s", query.data); - termPQExpBuffer(&query); - - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - log_warning("%s", PQerrorMessage(conn)); - PQclear(res); - - /* XXX magic number */ - return -1; - } - - if (!PQntuples(res)) - { - return -1; - } - - lag_seconds = atoi(PQgetvalue(res, 0, 0)); - - PQclear(res); - return lag_seconds; -} - bool identify_system(PGconn *repl_conn, t_system_identification *identification) @@ -3206,180 +3078,6 @@ update_node_record_slot_name(PGconn *primary_conn, int node_id, char *slot_name) } -void -get_node_replication_stats(PGconn *conn, t_node_info *node_info) -{ - PQExpBufferData query; - PGresult *res = NULL; - - initPQExpBuffer(&query); - - appendPQExpBufferStr(&query, - " SELECT pg_catalog.current_setting('max_wal_senders')::INT AS max_wal_senders, " - " (SELECT pg_catalog.count(*) FROM pg_catalog.pg_stat_replication) AS attached_wal_receivers, "); - - /* no replication slots in PostgreSQL 9.3 */ - if (PQserverVersion(conn) < 90400) - { - appendPQExpBufferStr(&query, - " 0 AS max_replication_slots, " - " 0 AS total_replication_slots, " - " 0 AS active_replication_slots, " - " 0 AS inactive_replication_slots, "); - } - else - { - appendPQExpBufferStr(&query, - " current_setting('max_replication_slots')::INT AS max_replication_slots, " - " (SELECT pg_catalog.count(*) FROM pg_catalog.pg_replication_slots) AS total_replication_slots, " - " (SELECT pg_catalog.count(*) FROM pg_catalog.pg_replication_slots WHERE active IS TRUE) AS active_replication_slots, " - " (SELECT pg_catalog.count(*) FROM pg_catalog.pg_replication_slots WHERE active IS FALSE) AS inactive_replication_slots, "); - } - - - appendPQExpBufferStr(&query, - " pg_catalog.pg_is_in_recovery() AS in_recovery"); - - log_verbose(LOG_DEBUG, "get_node_replication_stats():\n%s", query.data); - - res = PQexec(conn, query.data); - - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - log_warning(_("unable to retrieve node replication statistics")); - log_detail("%s", PQerrorMessage(conn)); - log_detail("%s", query.data); - - termPQExpBuffer(&query); - PQclear(res); - - return; - } - - node_info->max_wal_senders = atoi(PQgetvalue(res, 0, 0)); - node_info->attached_wal_receivers = atoi(PQgetvalue(res, 0, 1)); - node_info->max_replication_slots = atoi(PQgetvalue(res, 0, 2)); - node_info->total_replication_slots = atoi(PQgetvalue(res, 0, 3)); - node_info->active_replication_slots = atoi(PQgetvalue(res, 0, 4)); - node_info->inactive_replication_slots = atoi(PQgetvalue(res, 0, 5)); - node_info->recovery_type = strcmp(PQgetvalue(res, 0, 6), "f") == 0 ? RECTYPE_PRIMARY : RECTYPE_STANDBY; - - termPQExpBuffer(&query); - PQclear(res); - - return; -} - - -bool -is_downstream_node_attached(PGconn *conn, char *node_name) -{ - PQExpBufferData query; - PGresult *res = NULL; - int c = 0; - - initPQExpBuffer(&query); - - appendPQExpBuffer(&query, - " SELECT pg_catalog.count(*) FROM pg_catalog.pg_stat_replication " - " WHERE application_name = '%s'", - node_name); - - res = PQexec(conn, query.data); - - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - log_verbose(LOG_WARNING, _("unable to query pg_stat_replication")); - log_detail("%s", PQerrorMessage(conn)); - log_detail("%s", query.data); - - termPQExpBuffer(&query); - PQclear(res); - - return false; - } - - if (PQntuples(res) != 1) - { - log_verbose(LOG_WARNING, _("unexpected number of tuples (%i) returned"), PQntuples(res)); - - termPQExpBuffer(&query); - PQclear(res); - - return false; - } - - c = atoi(PQgetvalue(res, 0, 0)); - - termPQExpBuffer(&query); - PQclear(res); - - if (c == 0) - { - log_verbose(LOG_WARNING, _("node \"%s\" not found in \"pg_stat_replication\""), node_name); - - return false; - } - - if (c > 1) - log_verbose(LOG_WARNING, _("multiple entries with \"application_name\" set to \"%s\" found in \"pg_stat_replication\""), - node_name); - - return true; -} - - -void -set_primary_last_seen(PGconn *conn) -{ - PQExpBufferData query; - PGresult *res = NULL; - - initPQExpBuffer(&query); - - appendPQExpBufferStr(&query, - "SELECT repmgr.set_primary_last_seen()"); - - res = PQexec(conn, query.data); - - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - log_db_error(conn, query.data, _("unable to execute repmgr.set_primary_last_seen()")); - } - - termPQExpBuffer(&query); - PQclear(res); - -} - -int -get_primary_last_seen(PGconn *conn) -{ - PQExpBufferData query; - PGresult *res = NULL; - int primary_last_seen = -1; - - initPQExpBuffer(&query); - - appendPQExpBufferStr(&query, - "SELECT repmgr.get_primary_last_seen()"); - - res = PQexec(conn, query.data); - - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - log_db_error(conn, query.data, _("unable to execute repmgr.get_primary_last_seen()")); - } - else - { - primary_last_seen = atoi(PQgetvalue(res, 0, 0)); - } - - termPQExpBuffer(&query); - PQclear(res); - - return primary_last_seen; -} void @@ -4904,6 +4602,7 @@ get_last_wal_receive_location(PGconn *conn) return ptr; } + XLogRecPtr get_current_lsn(PGconn *conn) { @@ -4992,6 +4691,315 @@ get_current_lsn(PGconn *conn) return ptr; } + +void +init_replication_info(ReplInfo *replication_info) +{ + memset(replication_info->current_timestamp, 0, sizeof(replication_info->current_timestamp)); + replication_info->last_wal_receive_lsn = InvalidXLogRecPtr; + replication_info->last_wal_replay_lsn = InvalidXLogRecPtr; + memset(replication_info->last_xact_replay_timestamp, 0, sizeof(replication_info->last_xact_replay_timestamp)); + replication_info->replication_lag_time = 0; + replication_info->receiving_streamed_wal = true; +} + + +bool +get_replication_info(PGconn *conn, ReplInfo *replication_info) +{ + PQExpBufferData query; + PGresult *res = NULL; + bool success = true; + + initPQExpBuffer(&query); + appendPQExpBufferStr(&query, + " SELECT ts, " + " last_wal_receive_lsn, " + " last_wal_replay_lsn, " + " last_xact_replay_timestamp, " + " CASE WHEN (last_wal_receive_lsn = last_wal_replay_lsn) " + " THEN 0::INT " + " ELSE " + " EXTRACT(epoch FROM (pg_catalog.clock_timestamp() - last_xact_replay_timestamp))::INT " + " END AS replication_lag_time, " + " COALESCE(last_wal_receive_lsn, '0/0') >= last_wal_replay_lsn AS receiving_streamed_wal " + " FROM ( "); + + if (PQserverVersion(conn) >= 100000) + { + appendPQExpBufferStr(&query, + " SELECT CURRENT_TIMESTAMP AS ts, " + " pg_catalog.pg_last_wal_receive_lsn() AS last_wal_receive_lsn, " + " pg_catalog.pg_last_wal_replay_lsn() AS last_wal_replay_lsn, " + " pg_catalog.pg_last_xact_replay_timestamp() AS last_xact_replay_timestamp "); + } + else + { + appendPQExpBufferStr(&query, + " SELECT CURRENT_TIMESTAMP AS ts, " + " pg_catalog.pg_last_xlog_receive_location() AS last_wal_receive_lsn, " + " pg_catalog.pg_last_xlog_replay_location() AS last_wal_replay_lsn, " + " pg_catalog.pg_last_xact_replay_timestamp() AS last_xact_replay_timestamp "); + } + + appendPQExpBufferStr(&query, + " ) q "); + + log_verbose(LOG_DEBUG, "get_replication_info():\n%s", query.data); + + res = PQexec(conn, query.data); + + if (PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res)) + { + log_db_error(conn, query.data, _("get_replication_info(): unable to execute query")); + + success = false; + } + else + { + strncpy(replication_info->current_timestamp, PQgetvalue(res, 0, 0), MAXLEN); + replication_info->last_wal_receive_lsn = parse_lsn(PQgetvalue(res, 0, 1)); + replication_info->last_wal_replay_lsn = parse_lsn(PQgetvalue(res, 0, 2)); + strncpy(replication_info->last_xact_replay_timestamp, PQgetvalue(res, 0, 3), MAXLEN); + replication_info->replication_lag_time = atoi(PQgetvalue(res, 0, 4)); + replication_info->receiving_streamed_wal = atobool(PQgetvalue(res, 0, 5)); + } + + termPQExpBuffer(&query); + PQclear(res); + + return success; +} + + +int +get_replication_lag_seconds(PGconn *conn) +{ + PQExpBufferData query; + PGresult *res = NULL; + int lag_seconds = 0; + + initPQExpBuffer(&query); + + if (PQserverVersion(conn) >= 100000) + { + appendPQExpBufferStr(&query, + " SELECT CASE WHEN (pg_catalog.pg_last_wal_receive_lsn() = pg_catalog.pg_last_wal_replay_lsn()) "); + + } + else + { + appendPQExpBufferStr(&query, + " SELECT CASE WHEN (pg_catalog.pg_last_xlog_receive_location() = pg_catalog.pg_last_xlog_replay_location()) "); + } + + appendPQExpBufferStr(&query, + " THEN 0 " + " ELSE EXTRACT(epoch FROM (pg_catalog.clock_timestamp() - pg_catalog.pg_last_xact_replay_timestamp()))::INT " + " END " + " AS lag_seconds"); + + res = PQexec(conn, query.data); + log_verbose(LOG_DEBUG, "get_replication_lag_seconds():\n%s", query.data); + termPQExpBuffer(&query); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_warning("%s", PQerrorMessage(conn)); + PQclear(res); + + /* XXX magic number */ + return -1; + } + + if (!PQntuples(res)) + { + return -1; + } + + lag_seconds = atoi(PQgetvalue(res, 0, 0)); + + PQclear(res); + return lag_seconds; +} + + +void +get_node_replication_stats(PGconn *conn, t_node_info *node_info) +{ + PQExpBufferData query; + PGresult *res = NULL; + + initPQExpBuffer(&query); + + appendPQExpBufferStr(&query, + " SELECT pg_catalog.current_setting('max_wal_senders')::INT AS max_wal_senders, " + " (SELECT pg_catalog.count(*) FROM pg_catalog.pg_stat_replication) AS attached_wal_receivers, "); + + /* no replication slots in PostgreSQL 9.3 */ + if (PQserverVersion(conn) < 90400) + { + appendPQExpBufferStr(&query, + " 0 AS max_replication_slots, " + " 0 AS total_replication_slots, " + " 0 AS active_replication_slots, " + " 0 AS inactive_replication_slots, "); + } + else + { + appendPQExpBufferStr(&query, + " current_setting('max_replication_slots')::INT AS max_replication_slots, " + " (SELECT pg_catalog.count(*) FROM pg_catalog.pg_replication_slots) AS total_replication_slots, " + " (SELECT pg_catalog.count(*) FROM pg_catalog.pg_replication_slots WHERE active IS TRUE) AS active_replication_slots, " + " (SELECT pg_catalog.count(*) FROM pg_catalog.pg_replication_slots WHERE active IS FALSE) AS inactive_replication_slots, "); + } + + + appendPQExpBufferStr(&query, + " pg_catalog.pg_is_in_recovery() AS in_recovery"); + + log_verbose(LOG_DEBUG, "get_node_replication_stats():\n%s", query.data); + + res = PQexec(conn, query.data); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_warning(_("unable to retrieve node replication statistics")); + log_detail("%s", PQerrorMessage(conn)); + log_detail("%s", query.data); + + termPQExpBuffer(&query); + PQclear(res); + + return; + } + + node_info->max_wal_senders = atoi(PQgetvalue(res, 0, 0)); + node_info->attached_wal_receivers = atoi(PQgetvalue(res, 0, 1)); + node_info->max_replication_slots = atoi(PQgetvalue(res, 0, 2)); + node_info->total_replication_slots = atoi(PQgetvalue(res, 0, 3)); + node_info->active_replication_slots = atoi(PQgetvalue(res, 0, 4)); + node_info->inactive_replication_slots = atoi(PQgetvalue(res, 0, 5)); + node_info->recovery_type = strcmp(PQgetvalue(res, 0, 6), "f") == 0 ? RECTYPE_PRIMARY : RECTYPE_STANDBY; + + termPQExpBuffer(&query); + PQclear(res); + + return; +} + + +bool +is_downstream_node_attached(PGconn *conn, char *node_name) +{ + PQExpBufferData query; + PGresult *res = NULL; + int c = 0; + + initPQExpBuffer(&query); + + appendPQExpBuffer(&query, + " SELECT pg_catalog.count(*) FROM pg_catalog.pg_stat_replication " + " WHERE application_name = '%s'", + node_name); + + res = PQexec(conn, query.data); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_verbose(LOG_WARNING, _("unable to query pg_stat_replication")); + log_detail("%s", PQerrorMessage(conn)); + log_detail("%s", query.data); + + termPQExpBuffer(&query); + PQclear(res); + + return false; + } + + if (PQntuples(res) != 1) + { + log_verbose(LOG_WARNING, _("unexpected number of tuples (%i) returned"), PQntuples(res)); + + termPQExpBuffer(&query); + PQclear(res); + + return false; + } + + c = atoi(PQgetvalue(res, 0, 0)); + + termPQExpBuffer(&query); + PQclear(res); + + if (c == 0) + { + log_verbose(LOG_WARNING, _("node \"%s\" not found in \"pg_stat_replication\""), node_name); + + return false; + } + + if (c > 1) + log_verbose(LOG_WARNING, _("multiple entries with \"application_name\" set to \"%s\" found in \"pg_stat_replication\""), + node_name); + + return true; +} + +void +set_primary_last_seen(PGconn *conn) +{ + PQExpBufferData query; + PGresult *res = NULL; + + initPQExpBuffer(&query); + + appendPQExpBufferStr(&query, + "SELECT repmgr.set_primary_last_seen()"); + + res = PQexec(conn, query.data); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_db_error(conn, query.data, _("unable to execute repmgr.set_primary_last_seen()")); + } + + termPQExpBuffer(&query); + PQclear(res); + +} + + +int +get_primary_last_seen(PGconn *conn) +{ + PQExpBufferData query; + PGresult *res = NULL; + int primary_last_seen = -1; + + initPQExpBuffer(&query); + + appendPQExpBufferStr(&query, + "SELECT repmgr.get_primary_last_seen()"); + + res = PQexec(conn, query.data); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_db_error(conn, query.data, _("unable to execute repmgr.get_primary_last_seen()")); + } + else + { + primary_last_seen = atoi(PQgetvalue(res, 0, 0)); + } + + termPQExpBuffer(&query); + PQclear(res); + + return primary_last_seen; +} + + /* ============= */ /* BDR functions */ /* ============= */