Update view "replication_status"

This commit is contained in:
Ian Barwick
2017-08-24 15:05:13 +09:00
parent a127e8face
commit eee8d65259
5 changed files with 128 additions and 31 deletions

View File

@@ -3357,7 +3357,8 @@ is_server_available(const char *conninfo)
void void
add_monitoring_record( add_monitoring_record(
PGconn *conn, PGconn *primary_conn,
PGconn *local_conn,
int primary_node_id, int primary_node_id,
int local_node_id, int local_node_id,
char *monitor_standby_timestamp, char *monitor_standby_timestamp,
@@ -3402,25 +3403,22 @@ add_monitoring_record(
log_verbose(LOG_DEBUG, "standby_monitor:()\n%s", query.data); log_verbose(LOG_DEBUG, "standby_monitor:()\n%s", query.data);
if (PQsendQuery(conn, query.data) == 0) if (PQsendQuery(primary_conn, query.data) == 0)
{ {
log_warning(_("query could not be sent to master: %s\n"), log_warning(_("query could not be sent to master: %s\n"),
PQerrorMessage(conn)); PQerrorMessage(primary_conn));
} }
else else
{ {
//PGresult *res = NULL; PGresult *res = NULL;
/* sqlquery_snprintf(sqlquery, res = PQexec(local_conn, "SELECT repmgr.standby_set_last_updated()");
"SELECT %s.repmgr_update_last_updated();",
get_repmgr_schema_quoted(my_local_conn));
res = PQexec(my_local_conn, sqlquery);*/
/* not critical if the above query fails*/ /* not critical if the above query fails*/
/* if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
log_warning(_("unable to set last_updated: %s\n"), PQerrorMessage(my_local_conn)); log_warning(_("unable to set last_updated:\n %s"), PQerrorMessage(local_conn));
PQclear(res);*/ PQclear(res);
} }
termPQExpBuffer(&query); termPQExpBuffer(&query);

View File

@@ -431,7 +431,8 @@ bool is_server_available(const char *conninfo);
/* monitoring functions */ /* monitoring functions */
void add_monitoring_record( void add_monitoring_record(
PGconn *conn, PGconn *primary_conn,
PGconn *local_conn,
int primary_node_id, int primary_node_id,
int local_node_id, int local_node_id,
char *monitor_standby_timestamp, char *monitor_standby_timestamp,

View File

@@ -49,27 +49,25 @@ CREATE VIEW repmgr.show_nodes AS
LEFT JOIN repmgr.nodes un LEFT JOIN repmgr.nodes un
ON un.node_id = n.upstream_node_id; ON un.node_id = n.upstream_node_id;
-- repmgr.repmgr_get_last_updated()
CREATE VIEW repmgr.replication_status AS
SELECT m.primary_node_id, m.standby_node_id, n.node_name AS standby_name,
n.type AS node_type, n.active, last_monitor_time,
CASE WHEN n.type='standby' THEN m.last_wal_primary_location ELSE NULL END AS last_wal_primary_location,
m.last_wal_standby_location,
CASE WHEN n.type='standby' THEN pg_catalog.pg_size_pretty(m.replication_lag) ELSE NULL END AS replication_lag,
CASE WHEN n.type='standby' THEN
CASE WHEN replication_lag > 0 THEN age(now(), m.last_apply_time) ELSE '0'::INTERVAL END
ELSE NULL
END AS replication_time_lag,
CASE WHEN n.type='standby' THEN pg_catalog.pg_size_pretty(m.apply_lag) ELSE NULL END AS apply_lag,
AGE(NOW(), CASE WHEN pg_catalog.pg_is_in_recovery() THEN NOW() ELSE m.last_monitor_time END) AS communication_time_lag
FROM repmgr.monitoring_history m
JOIN repmgr.nodes n ON m.standby_node_id = n.node_id
WHERE (m.standby_node_id, m.last_monitor_time) IN (
SELECT m1.standby_node_id, MAX(m1.last_monitor_time)
FROM repmgr.monitoring_history m1 GROUP BY 1
);
/* ================= */
/* repmgrd functions */ /* repmgrd functions */
/* ================= */
/* monitoring functions */
CREATE FUNCTION standby_set_last_updated()
RETURNS TIMESTAMP WITH TIME ZONE
AS '$libdir/repmgr', 'standby_set_last_updated'
LANGUAGE C STRICT;
CREATE FUNCTION standby_get_last_updated()
RETURNS TIMESTAMP WITH TIME ZONE
AS '$libdir/repmgr', 'standby_get_last_updated'
LANGUAGE C STRICT;
/* failover functions */
CREATE FUNCTION request_vote(INT,INT) CREATE FUNCTION request_vote(INT,INT)
RETURNS pg_lsn RETURNS pg_lsn
@@ -117,3 +115,25 @@ CREATE FUNCTION unset_bdr_failover_handler()
RETURNS VOID RETURNS VOID
AS '$libdir/repmgr', 'unset_bdr_failover_handler' AS '$libdir/repmgr', 'unset_bdr_failover_handler'
LANGUAGE C STRICT; LANGUAGE C STRICT;
CREATE VIEW repmgr.replication_status AS
SELECT m.primary_node_id, m.standby_node_id, n.node_name AS standby_name,
n.type AS node_type, n.active, last_monitor_time,
CASE WHEN n.type='standby' THEN m.last_wal_primary_location ELSE NULL END AS last_wal_primary_location,
m.last_wal_standby_location,
CASE WHEN n.type='standby' THEN pg_catalog.pg_size_pretty(m.replication_lag) ELSE NULL END AS replication_lag,
CASE WHEN n.type='standby' THEN
CASE WHEN replication_lag > 0 THEN age(now(), m.last_apply_time) ELSE '0'::INTERVAL END
ELSE NULL
END AS replication_time_lag,
CASE WHEN n.type='standby' THEN pg_catalog.pg_size_pretty(m.apply_lag) ELSE NULL END AS apply_lag,
AGE(NOW(), CASE WHEN pg_catalog.pg_is_in_recovery() THEN repmgr.standby_get_last_updated() ELSE m.last_monitor_time END) AS communication_time_lag
FROM repmgr.monitoring_history m
JOIN repmgr.nodes n ON m.standby_node_id = n.node_id
WHERE (m.standby_node_id, m.last_monitor_time) IN (
SELECT m1.standby_node_id, MAX(m1.last_monitor_time)
FROM repmgr.monitoring_history m1 GROUP BY 1
);

View File

@@ -45,6 +45,7 @@ typedef enum {
typedef struct repmgrdSharedState typedef struct repmgrdSharedState
{ {
LWLockId lock; /* protects search/modification */ LWLockId lock; /* protects search/modification */
TimestampTz last_updated;
/* streaming failover */ /* streaming failover */
NodeState node_state; NodeState node_state;
NodeVotingStatus voting_status; NodeVotingStatus voting_status;
@@ -65,6 +66,13 @@ void _PG_fini(void);
static void repmgr_shmem_startup(void); static void repmgr_shmem_startup(void);
Datum standby_set_last_updated(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(standby_set_last_updated);
Datum standby_get_last_updated(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(standby_get_last_updated);
Datum request_vote(PG_FUNCTION_ARGS); Datum request_vote(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(request_vote); PG_FUNCTION_INFO_V1(request_vote);
@@ -174,6 +182,51 @@ repmgr_shmem_startup(void)
} }
/* ==================== */
/* monitoring functions */
/* ==================== */
/* update and return last updated with current timestamp */
Datum
standby_set_last_updated(PG_FUNCTION_ARGS)
{
TimestampTz last_updated = GetCurrentTimestamp();
if (!shared_state)
PG_RETURN_NULL();
LWLockAcquire(shared_state->lock, LW_SHARED);
shared_state->last_updated = last_updated;
LWLockRelease(shared_state->lock);
PG_RETURN_TIMESTAMPTZ(last_updated);
}
/* get last updated timestamp */
Datum
standby_get_last_updated(PG_FUNCTION_ARGS)
{
TimestampTz last_updated;
/* Safety check... */
if (!shared_state)
PG_RETURN_NULL();
LWLockAcquire(shared_state->lock, LW_EXCLUSIVE);
last_updated = shared_state->last_updated;
LWLockRelease(shared_state->lock);
PG_RETURN_TIMESTAMPTZ(last_updated);
}
/* ===================*/
/* failover functions */
/* ===================*/
Datum Datum
request_vote(PG_FUNCTION_ARGS) request_vote(PG_FUNCTION_ARGS)
{ {
@@ -188,6 +241,9 @@ request_vote(PG_FUNCTION_ARGS)
int ret; int ret;
bool isnull; bool isnull;
if (!shared_state)
PG_RETURN_NULL();
LWLockAcquire(shared_state->lock, LW_SHARED); LWLockAcquire(shared_state->lock, LW_SHARED);
/* this node has initiated voting or already responded to another node */ /* this node has initiated voting or already responded to another node */
@@ -256,6 +312,9 @@ get_voting_status(PG_FUNCTION_ARGS)
#ifndef BDR_ONLY #ifndef BDR_ONLY
NodeVotingStatus voting_status; NodeVotingStatus voting_status;
if (!shared_state)
PG_RETURN_NULL();
LWLockAcquire(shared_state->lock, LW_SHARED); LWLockAcquire(shared_state->lock, LW_SHARED);
voting_status = shared_state->voting_status; voting_status = shared_state->voting_status;
LWLockRelease(shared_state->lock); LWLockRelease(shared_state->lock);
@@ -294,6 +353,9 @@ other_node_is_candidate(PG_FUNCTION_ARGS)
int requesting_node_id = PG_GETARG_INT32(0); int requesting_node_id = PG_GETARG_INT32(0);
int electoral_term = PG_GETARG_INT32(1); int electoral_term = PG_GETARG_INT32(1);
if (!shared_state)
PG_RETURN_NULL();
LWLockAcquire(shared_state->lock, LW_SHARED); LWLockAcquire(shared_state->lock, LW_SHARED);
if (shared_state->current_electoral_term == electoral_term) if (shared_state->current_electoral_term == electoral_term)
@@ -323,6 +385,9 @@ notify_follow_primary(PG_FUNCTION_ARGS)
#ifndef BDR_ONLY #ifndef BDR_ONLY
int primary_node_id = PG_GETARG_INT32(0); int primary_node_id = PG_GETARG_INT32(0);
if (!shared_state)
PG_RETURN_NULL();
elog(INFO, "received notification to follow node %i", primary_node_id); elog(INFO, "received notification to follow node %i", primary_node_id);
LWLockAcquire(shared_state->lock, LW_SHARED); LWLockAcquire(shared_state->lock, LW_SHARED);
@@ -341,6 +406,9 @@ get_new_primary(PG_FUNCTION_ARGS)
{ {
int new_primary_node_id = UNKNOWN_NODE_ID; int new_primary_node_id = UNKNOWN_NODE_ID;
if (!shared_state)
PG_RETURN_NULL();
#ifndef BDR_ONLY #ifndef BDR_ONLY
LWLockAcquire(shared_state->lock, LW_SHARED); LWLockAcquire(shared_state->lock, LW_SHARED);
@@ -357,6 +425,9 @@ Datum
reset_voting_status(PG_FUNCTION_ARGS) reset_voting_status(PG_FUNCTION_ARGS)
{ {
#ifndef BDR_ONLY #ifndef BDR_ONLY
if (!shared_state)
PG_RETURN_NULL();
LWLockAcquire(shared_state->lock, LW_SHARED); LWLockAcquire(shared_state->lock, LW_SHARED);
shared_state->voting_status = VS_NO_VOTE; shared_state->voting_status = VS_NO_VOTE;
@@ -375,6 +446,9 @@ am_bdr_failover_handler(PG_FUNCTION_ARGS)
int node_id = PG_GETARG_INT32(0); int node_id = PG_GETARG_INT32(0);
bool am_handler = false; bool am_handler = false;
if (!shared_state)
PG_RETURN_NULL();
LWLockAcquire(shared_state->lock, LW_SHARED); LWLockAcquire(shared_state->lock, LW_SHARED);
if (shared_state->bdr_failover_handler == UNKNOWN_NODE_ID) if (shared_state->bdr_failover_handler == UNKNOWN_NODE_ID)
@@ -396,6 +470,9 @@ am_bdr_failover_handler(PG_FUNCTION_ARGS)
Datum Datum
unset_bdr_failover_handler(PG_FUNCTION_ARGS) unset_bdr_failover_handler(PG_FUNCTION_ARGS)
{ {
if (!shared_state)
PG_RETURN_NULL();
LWLockAcquire(shared_state->lock, LW_SHARED); LWLockAcquire(shared_state->lock, LW_SHARED);
shared_state->bdr_failover_handler = UNKNOWN_NODE_ID; shared_state->bdr_failover_handler = UNKNOWN_NODE_ID;
LWLockRelease(shared_state->lock); LWLockRelease(shared_state->lock);

View File

@@ -1102,6 +1102,7 @@ update_monitoring_history(void)
add_monitoring_record( add_monitoring_record(
primary_conn, primary_conn,
local_conn,
primary_node_id, primary_node_id,
local_node_info.node_id, local_node_info.node_id,
replication_info.current_timestamp, replication_info.current_timestamp,