From eee8d65259bef7b177e3e09a78de237c6925a028 Mon Sep 17 00:00:00 2001
From: Ian Barwick
Date: Thu, 24 Aug 2017 15:05:13 +0900
Subject: [PATCH] Update view "replication_status"

---
 dbutils.c          | 20 ++++++------
 dbutils.h          |  3 +-
 repmgr--4.0.sql    | 58 ++++++++++++++++++++++------------
 repmgr.c           | 77 ++++++++++++++++++++++++++++++++++++++++++++++
 repmgrd-physical.c |  1 +
 5 files changed, 128 insertions(+), 31 deletions(-)

diff --git a/dbutils.c b/dbutils.c
index e04a9114..5221c761 100644
--- a/dbutils.c
+++ b/dbutils.c
@@ -3357,7 +3357,8 @@ is_server_available(const char *conninfo)
 
 void
 add_monitoring_record(
-	PGconn *conn,
+	PGconn *primary_conn,
+	PGconn *local_conn,
 	int primary_node_id,
 	int local_node_id,
 	char *monitor_standby_timestamp,
@@ -3402,25 +3403,22 @@ add_monitoring_record(
 
 	log_verbose(LOG_DEBUG, "standby_monitor:()\n%s", query.data);
 
-	if (PQsendQuery(conn, query.data) == 0)
+	if (PQsendQuery(primary_conn, query.data) == 0)
 	{
 		log_warning(_("query could not be sent to master: %s\n"),
-					PQerrorMessage(conn));
+					PQerrorMessage(primary_conn));
 	}
 	else
 	{
-		//PGresult *res = NULL;
+		PGresult *res = NULL;
 
-/*		sqlquery_snprintf(sqlquery,
-						  "SELECT %s.repmgr_update_last_updated();",
-						  get_repmgr_schema_quoted(my_local_conn));
-		res = PQexec(my_local_conn, sqlquery);*/
+		res = PQexec(local_conn, "SELECT repmgr.standby_set_last_updated()");
 
 		/* not critical if the above query fails*/
-/*		if (PQresultStatus(res) != PGRES_TUPLES_OK)
-			log_warning(_("unable to set last_updated: %s\n"), PQerrorMessage(my_local_conn));
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+			log_warning(_("unable to set last_updated:\n %s"), PQerrorMessage(local_conn));
 
-		PQclear(res);*/
+		PQclear(res);
 	}
 
 	termPQExpBuffer(&query);
diff --git a/dbutils.h b/dbutils.h
index e34d4da4..37b596cf 100644
--- a/dbutils.h
+++ b/dbutils.h
@@ -431,7 +431,8 @@ bool is_server_available(const char *conninfo);
 /* monitoring functions */
 void
 add_monitoring_record(
-	PGconn *conn,
+	PGconn *primary_conn,
+	PGconn *local_conn,
 	int primary_node_id,
 	int local_node_id,
 	char *monitor_standby_timestamp,
diff --git a/repmgr--4.0.sql b/repmgr--4.0.sql
index b24c82b6..e260890d 100644
--- a/repmgr--4.0.sql
+++ b/repmgr--4.0.sql
@@ -49,27 +49,25 @@ CREATE VIEW repmgr.show_nodes AS
 LEFT JOIN repmgr.nodes un
        ON un.node_id = n.upstream_node_id;
 
--- repmgr.repmgr_get_last_updated()
-CREATE VIEW repmgr.replication_status AS
-  SELECT m.primary_node_id, m.standby_node_id, n.node_name AS standby_name,
-         n.type AS node_type, n.active, last_monitor_time,
-         CASE WHEN n.type='standby' THEN m.last_wal_primary_location ELSE NULL END AS last_wal_primary_location,
-         m.last_wal_standby_location,
-         CASE WHEN n.type='standby' THEN pg_catalog.pg_size_pretty(m.replication_lag) ELSE NULL END AS replication_lag,
-         CASE WHEN n.type='standby' THEN
-           CASE WHEN replication_lag > 0 THEN age(now(), m.last_apply_time) ELSE '0'::INTERVAL END
-           ELSE NULL
-         END AS replication_time_lag,
-         CASE WHEN n.type='standby' THEN pg_catalog.pg_size_pretty(m.apply_lag) ELSE NULL END AS apply_lag,
-         AGE(NOW(), CASE WHEN pg_catalog.pg_is_in_recovery() THEN NOW() ELSE m.last_monitor_time END) AS communication_time_lag
-    FROM repmgr.monitoring_history m
-    JOIN repmgr.nodes n ON m.standby_node_id = n.node_id
-   WHERE (m.standby_node_id, m.last_monitor_time) IN (
-           SELECT m1.standby_node_id, MAX(m1.last_monitor_time)
-             FROM repmgr.monitoring_history m1 GROUP BY 1
-         );
 
+/* ================= */
 /* repmgrd functions */
+/* ================= */
+
+/* monitoring functions */
+
+CREATE FUNCTION standby_set_last_updated()
+  RETURNS TIMESTAMP WITH TIME ZONE
+  AS '$libdir/repmgr', 'standby_set_last_updated'
+  LANGUAGE C STRICT;
+
+CREATE FUNCTION standby_get_last_updated()
+  RETURNS TIMESTAMP WITH TIME ZONE
+  AS '$libdir/repmgr', 'standby_get_last_updated'
+  LANGUAGE C STRICT;
+
+
+/* failover functions */
 
 CREATE FUNCTION request_vote(INT,INT)
   RETURNS pg_lsn
@@ -117,3 +115,25 @@ CREATE FUNCTION unset_bdr_failover_handler()
   RETURNS VOID
   AS '$libdir/repmgr', 'unset_bdr_failover_handler'
   LANGUAGE C STRICT;
+
+
+
+CREATE VIEW repmgr.replication_status AS
+  SELECT m.primary_node_id, m.standby_node_id, n.node_name AS standby_name,
+         n.type AS node_type, n.active, last_monitor_time,
+         CASE WHEN n.type='standby' THEN m.last_wal_primary_location ELSE NULL END AS last_wal_primary_location,
+         m.last_wal_standby_location,
+         CASE WHEN n.type='standby' THEN pg_catalog.pg_size_pretty(m.replication_lag) ELSE NULL END AS replication_lag,
+         CASE WHEN n.type='standby' THEN
+           CASE WHEN replication_lag > 0 THEN age(now(), m.last_apply_time) ELSE '0'::INTERVAL END
+           ELSE NULL
+         END AS replication_time_lag,
+         CASE WHEN n.type='standby' THEN pg_catalog.pg_size_pretty(m.apply_lag) ELSE NULL END AS apply_lag,
+         AGE(NOW(), CASE WHEN pg_catalog.pg_is_in_recovery() THEN repmgr.standby_get_last_updated() ELSE m.last_monitor_time END) AS communication_time_lag
+    FROM repmgr.monitoring_history m
+    JOIN repmgr.nodes n ON m.standby_node_id = n.node_id
+   WHERE (m.standby_node_id, m.last_monitor_time) IN (
+           SELECT m1.standby_node_id, MAX(m1.last_monitor_time)
+             FROM repmgr.monitoring_history m1 GROUP BY 1
+         );
+
diff --git a/repmgr.c b/repmgr.c
index 697f178a..ac0d6661 100644
--- a/repmgr.c
+++ b/repmgr.c
@@ -45,6 +45,7 @@ typedef enum {
 typedef struct repmgrdSharedState
 {
 	LWLockId	lock;			/* protects search/modification */
+	TimestampTz last_updated;
 	/* streaming failover */
 	NodeState	node_state;
 	NodeVotingStatus voting_status;
@@ -65,6 +66,13 @@ void		_PG_fini(void);
 
 static void repmgr_shmem_startup(void);
 
+Datum		standby_set_last_updated(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1(standby_set_last_updated);
+
+Datum		standby_get_last_updated(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1(standby_get_last_updated);
+
+
 Datum		request_vote(PG_FUNCTION_ARGS);
 PG_FUNCTION_INFO_V1(request_vote);
 
@@ -174,6 +182,51 @@ repmgr_shmem_startup(void)
 }
 
 
+/* ==================== */
+/* monitoring functions */
+/* ==================== */
+
+/* update and return last updated with current timestamp */
+Datum
+standby_set_last_updated(PG_FUNCTION_ARGS)
+{
+	TimestampTz last_updated = GetCurrentTimestamp();
+
+	if (!shared_state)
+		PG_RETURN_NULL();
+
+	LWLockAcquire(shared_state->lock, LW_EXCLUSIVE);	/* modifying shared state */
+	shared_state->last_updated = last_updated;
+	LWLockRelease(shared_state->lock);
+
+	PG_RETURN_TIMESTAMPTZ(last_updated);
+}
+
+
+/* get last updated timestamp */
+Datum
+standby_get_last_updated(PG_FUNCTION_ARGS)
+{
+	TimestampTz last_updated;
+
+	/* Safety check... */
+	if (!shared_state)
+		PG_RETURN_NULL();
+
+	LWLockAcquire(shared_state->lock, LW_SHARED);	/* read-only access */
+	last_updated = shared_state->last_updated;
+	LWLockRelease(shared_state->lock);
+
+	PG_RETURN_TIMESTAMPTZ(last_updated);
+}
+
+
+
+
+/* ===================*/
+/* failover functions */
+/* ===================*/
+
 Datum
 request_vote(PG_FUNCTION_ARGS)
 {
@@ -188,6 +241,9 @@ request_vote(PG_FUNCTION_ARGS)
 	int			ret;
 	bool		isnull;
 
+	if (!shared_state)
+		PG_RETURN_NULL();
+
 	LWLockAcquire(shared_state->lock, LW_SHARED);
 
 	/* this node has initiated voting or already responded to another node */
@@ -256,6 +312,9 @@ get_voting_status(PG_FUNCTION_ARGS)
 #ifndef BDR_ONLY
 	NodeVotingStatus voting_status;
 
+	if (!shared_state)
+		PG_RETURN_NULL();
+
 	LWLockAcquire(shared_state->lock, LW_SHARED);
 	voting_status = shared_state->voting_status;
 	LWLockRelease(shared_state->lock);
@@ -294,6 +353,9 @@ other_node_is_candidate(PG_FUNCTION_ARGS)
 	int			requesting_node_id = PG_GETARG_INT32(0);
 	int			electoral_term = PG_GETARG_INT32(1);
 
+	if (!shared_state)
+		PG_RETURN_NULL();
+
 	LWLockAcquire(shared_state->lock, LW_SHARED);
 
 	if (shared_state->current_electoral_term == electoral_term)
@@ -323,6 +385,9 @@ notify_follow_primary(PG_FUNCTION_ARGS)
 #ifndef BDR_ONLY
 	int			primary_node_id = PG_GETARG_INT32(0);
 
+	if (!shared_state)
+		PG_RETURN_NULL();
+
 	elog(INFO, "received notification to follow node %i", primary_node_id);
 
 	LWLockAcquire(shared_state->lock, LW_SHARED);
@@ -341,6 +406,9 @@ get_new_primary(PG_FUNCTION_ARGS)
 {
 	int			new_primary_node_id = UNKNOWN_NODE_ID;
 
+	if (!shared_state)
+		PG_RETURN_NULL();
+
 #ifndef BDR_ONLY
 	LWLockAcquire(shared_state->lock, LW_SHARED);
 
@@ -357,6 +425,9 @@ Datum
 reset_voting_status(PG_FUNCTION_ARGS)
 {
 #ifndef BDR_ONLY
+	if (!shared_state)
+		PG_RETURN_NULL();
+
 	LWLockAcquire(shared_state->lock, LW_SHARED);
 
 	shared_state->voting_status = VS_NO_VOTE;
@@ -375,6 +446,9 @@ am_bdr_failover_handler(PG_FUNCTION_ARGS)
 	int			node_id = PG_GETARG_INT32(0);
 	bool		am_handler = false;
 
+	if (!shared_state)
+		PG_RETURN_NULL();
+
 	LWLockAcquire(shared_state->lock, LW_SHARED);
 
 	if (shared_state->bdr_failover_handler == UNKNOWN_NODE_ID)
@@ -396,6 +470,9 @@ am_bdr_failover_handler(PG_FUNCTION_ARGS)
 Datum
 unset_bdr_failover_handler(PG_FUNCTION_ARGS)
 {
+	if (!shared_state)
+		PG_RETURN_NULL();
+
 	LWLockAcquire(shared_state->lock, LW_SHARED);
 	shared_state->bdr_failover_handler = UNKNOWN_NODE_ID;
 	LWLockRelease(shared_state->lock);
diff --git a/repmgrd-physical.c b/repmgrd-physical.c
index 0eae0aa5..f291eb91 100644
--- a/repmgrd-physical.c
+++ b/repmgrd-physical.c
@@ -1102,6 +1102,7 @@ update_monitoring_history(void)
 
 	add_monitoring_record(
 		primary_conn,
+		local_conn,
 		primary_node_id,
 		local_node_info.node_id,
 		replication_info.current_timestamp,