repmgrd: refactor standby monitoring status query and code

This had grown somewhat complex with the addition of handling for
various corner cases. Much of the work has now been delegated
to the query itself.
This commit is contained in:
Ian Barwick
2016-08-16 15:19:06 +09:00
parent 6bd1c6a36d
commit ef7bed1b3d

View File

@@ -718,13 +718,14 @@ static void
standby_monitor(void)
{
PGresult *res;
char sqlquery[QUERY_STR_LEN];
char monitor_standby_timestamp[MAXLEN];
char last_wal_primary_location[MAXLEN];
char last_xlog_receive_location[MAXLEN];
char last_xlog_replay_location[MAXLEN];
char last_xact_replay_timestamp[MAXLEN];
bool last_xlog_receive_location_gte_replayed;
char sqlquery[QUERY_STR_LEN];
bool receiving_streamed_wal = true;
XLogRecPtr lsn_master_current_xlog_location;
XLogRecPtr lsn_last_xlog_receive_location;
@@ -745,7 +746,6 @@ standby_monitor(void)
int active_master_id;
const char *upstream_node_type = NULL;
bool receiving_streamed_wal = true;
/*
@@ -1080,14 +1080,21 @@ standby_monitor(void)
if (wait_connection_availability(master_conn, local_options.master_response_timeout) != 1)
return;
/* Get local xlog info */
/* Get local xlog info
*
* If receive_location is NULL, we're in archive recovery and not streaming WAL.
* If receive_location is less than replay_location, we were streaming WAL but are
* somehow disconnected and evidently in archive recovery.
*/
sqlquery_snprintf(sqlquery,
" SELECT ts, "
" receive_location, "
" CASE WHEN (receive_location IS NULL OR receive_location < replay_location) "
" THEN replay_location "
" ELSE receive_location"
" END AS receive_location,"
" replay_location, "
" replay_timestamp, "
" receive_location >= replay_location "
" COALESCE(receive_location, '0/0') >= replay_location AS receiving_streamed_wal "
" FROM (SELECT CURRENT_TIMESTAMP AS ts, "
" pg_catalog.pg_last_xlog_receive_location() AS receive_location, "
" pg_catalog.pg_last_xlog_replay_location() AS replay_location, "
@@ -1095,6 +1102,7 @@ standby_monitor(void)
" ) q ");
res = PQexec(my_local_conn, sqlquery);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
@@ -1109,40 +1117,17 @@ standby_monitor(void)
strncpy(last_xlog_replay_location, PQgetvalue(res, 0, 2), MAXLEN);
strncpy(last_xact_replay_timestamp, PQgetvalue(res, 0, 3), MAXLEN);
last_xlog_receive_location_gte_replayed = (strcmp(PQgetvalue(res, 0, 4), "t") == 0)
receiving_streamed_wal = (strcmp(PQgetvalue(res, 0, 4), "t") == 0)
? true
: false;
/*
* If pg_last_xlog_receive_location is NULL, this means we're in archive
* recovery and will need to calculate lag based on pg_last_xlog_replay_location
*/
/*
* Replayed WAL is greater than received streamed WAL
*/
if (PQgetisnull(res, 0, 1))
if (receiving_streamed_wal == false)
{
receiving_streamed_wal = false;
log_verbose(LOG_DEBUG, _("standby %i not connected to streaming replication"), local_options.node);
}
PQclear(res);
/*
* In the unusual event of a standby becoming disconnected from the primary,
* while this repmgrd remains connected to the primary, subtracting
* "last_xlog_replay_location" from "lsn_last_xlog_receive_location" and coercing to
* (long long unsigned int) will result in a meaningless, very large
* value which will overflow a BIGINT column and spew error messages into the
* PostgreSQL log. In the absence of a better strategy, skip attempting
* to insert a monitoring record.
*/
if (receiving_streamed_wal == true && last_xlog_receive_location_gte_replayed == false)
{
log_verbose(LOG_WARNING,
"Replayed WAL newer than received WAL - is this standby connected to its upstream?\n");
}
/*
* Get master xlog position
*
@@ -1164,25 +1149,9 @@ standby_monitor(void)
lsn_master_current_xlog_location = lsn_to_xlogrecptr(last_wal_primary_location, NULL);
lsn_last_xlog_replay_location = lsn_to_xlogrecptr(last_xlog_replay_location, NULL);
lsn_last_xlog_receive_location = lsn_to_xlogrecptr(last_xlog_receive_location, NULL);
/* Calculate apply lag */
if (last_xlog_receive_location_gte_replayed == false)
{
/*
* We're not receiving streaming WAL - in this case the receive location
* equals the last replayed location
*/
apply_lag = 0;
strncpy(last_xlog_receive_location, last_xlog_replay_location, MAXLEN);
lsn_last_xlog_receive_location = lsn_to_xlogrecptr(last_xlog_replay_location, NULL);
}
else
{
lsn_last_xlog_receive_location = lsn_to_xlogrecptr(last_xlog_receive_location, NULL);
apply_lag = (long long unsigned int)lsn_last_xlog_receive_location - lsn_last_xlog_replay_location;
}
apply_lag = (long long unsigned int)lsn_last_xlog_receive_location - lsn_last_xlog_replay_location;
/* Calculate replication lag */
if (lsn_master_current_xlog_location >= lsn_last_xlog_receive_location)