repmgrd: write monitoring statistics

This commit is contained in:
Ian Barwick
2017-08-24 11:49:44 +09:00
parent 5dfb8a5b06
commit a659132ea4
4 changed files with 237 additions and 35 deletions

163
dbutils.c
View File

@@ -1068,7 +1068,7 @@ get_recovery_type(PGconn *conn)
PGconn *
_get_primary_connection(PGconn *conn,
int *primary_id, char *primary_conninfo_out, bool quiet)
int *primary_id, char *primary_conninfo_out, bool quiet)
{
PQExpBufferData query;
@@ -1250,30 +1250,34 @@ get_replication_info(PGconn *conn, ReplInfo *replication_info)
initPQExpBuffer(&query);
appendPQExpBuffer(
&query,
" SELECT last_wal_receive_lsn, "
" last_wal_replay_lsn, "
" last_xact_replay_timestamp, "
" SELECT ts, "
" last_wal_receive_lsn, "
" last_wal_replay_lsn, "
" last_xact_replay_timestamp, "
" CASE WHEN (last_wal_receive_lsn = last_wal_replay_lsn) "
" THEN 0::INT "
" ELSE "
" EXTRACT(epoch FROM (clock_timestamp() - last_xact_replay_timestamp))::INT "
" END AS replication_lag_time "
" FROM ( ");
" THEN 0::INT "
" ELSE "
" EXTRACT(epoch FROM (clock_timestamp() - last_xact_replay_timestamp))::INT "
" END AS replication_lag_time, "
" COALESCE(last_wal_receive_lsn, '0/0') >= last_wal_replay_lsn AS receiving_streamed_wal "
" FROM ( ");
if (server_version_num >= 100000)
{
appendPQExpBuffer(
&query,
" SELECT pg_last_wal_receive_lsn() AS last_wal_receive_lsn, "
" pg_last_wal_replay_lsn() AS last_wal_replay_lsn, "
" SELECT CURRENT_TIMESTAMP AS ts, "
" pg_last_wal_receive_lsn() AS last_wal_receive_lsn, "
" pg_last_wal_replay_lsn() AS last_wal_replay_lsn, "
" pg_last_xact_replay_timestamp() AS last_xact_replay_timestamp ");
}
else
{
appendPQExpBuffer(
&query,
" SELECT pg_last_xlog_receive_location() AS last_wal_receive_lsn, "
" pg_last_xlog_replay_location() AS last_wal_replay_lsn, "
" SELECT CURRENT_TIMESTAMP AS ts, "
" pg_last_xlog_receive_location() AS last_wal_receive_lsn, "
" pg_last_xlog_replay_location() AS last_wal_replay_lsn, "
" pg_last_xact_replay_timestamp() AS last_xact_replay_timestamp ");
}
@@ -1295,10 +1299,12 @@ get_replication_info(PGconn *conn, ReplInfo *replication_info)
return false;
}
replication_info->last_wal_receive_lsn = parse_lsn(PQgetvalue(res, 0, 0));
replication_info->last_wal_replay_lsn = parse_lsn(PQgetvalue(res, 0, 1));
replication_info->replication_lag_time = atoi(PQgetvalue(res, 0, 3));
strncpy(replication_info->current_timestamp, PQgetvalue(res, 0, 0), MAXLEN);
replication_info->last_wal_receive_lsn = parse_lsn(PQgetvalue(res, 0, 1));
replication_info->last_wal_replay_lsn = parse_lsn(PQgetvalue(res, 0, 2));
strncpy(replication_info->last_xact_replay_timestamp, PQgetvalue(res, 0, 3), MAXLEN);
replication_info->replication_lag_time = atoi(PQgetvalue(res, 0, 4));
replication_info->receiving_streamed_wal = atobool(PQgetvalue(res, 0, 5));
PQclear(res);
return true;
@@ -2631,7 +2637,7 @@ get_configuration_file_locations(PGconn *conn, t_configfile_list *list)
config_file_list_add(list,
PQgetvalue(res, i, 0),
PQgetvalue(res, i, 1),
strcmp(PQgetvalue(res, i, 2), "t") == 1 ? true : false);
atobool(PQgetvalue(res, i, 2)));
}
PQclear(res);
@@ -2676,7 +2682,7 @@ get_configuration_file_locations(PGconn *conn, t_configfile_list *list)
list,
PQgetvalue(res, i, 0),
PQgetvalue(res, i, 1),
strcmp(PQgetvalue(res, i, 2), "t") == 1 ? true : false);
atobool(PQgetvalue(res, i, 2)));
}
PQclear(res);
@@ -3171,9 +3177,7 @@ get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record)
strncpy(record->slot_name, PQgetvalue(res, 0, 0), MAXLEN);
strncpy(record->slot_type, PQgetvalue(res, 0, 1), MAXLEN);
record->active = (strcmp(PQgetvalue(res, 0, 2), "t") == 0)
? true
: false;
record->active = atobool(PQgetvalue(res, 0, 2));
PQclear(res);
@@ -3347,6 +3351,84 @@ is_server_available(const char *conninfo)
}
/* ==================== */
/* monitoring functions */
/* ==================== */
/*
 * add_monitoring_record()
 *
 * Build an INSERT for repmgr.monitoring_history from the supplied values
 * and submit it on "conn" with PQsendQuery().
 *
 * Parameters:
 *   conn                       - connection the INSERT is sent on
 *   primary_node_id            - stored as "primary_node_id"
 *   local_node_id              - stored as "standby_node_id"
 *   monitor_standby_timestamp  - timestamp text, stored as "last_monitor_time"
 *   primary_last_wal_location  - stored as "last_wal_primary_location"
 *   last_wal_receive_lsn       - stored as "last_wal_standby_location"
 *   last_xact_replay_timestamp - timestamp text, stored as "last_apply_time";
 *                                NULL or an empty string is stored as SQL NULL
 *   replication_lag_bytes      - stored as "replication_lag"
 *   apply_lag_bytes            - stored as "apply_lag"
 *
 * The query is only submitted; this function does not collect its result.
 * A failure to submit the query is logged as a warning and is otherwise
 * not reported to the caller.
 */
void
add_monitoring_record(
	PGconn *conn,
	int primary_node_id,
	int local_node_id,
	char *monitor_standby_timestamp,
	XLogRecPtr primary_last_wal_location,
	XLogRecPtr last_wal_receive_lsn,
	char *last_xact_replay_timestamp,
	long long unsigned int replication_lag_bytes,
	long long unsigned int apply_lag_bytes
)
{
	PQExpBufferData query;

	initPQExpBuffer(&query);

	appendPQExpBuffer(
		&query,
		"INSERT INTO repmgr.monitoring_history "
		" (primary_node_id, "
		" standby_node_id, "
		" last_monitor_time, "
		" last_apply_time, "
		" last_wal_primary_location, "
		" last_wal_standby_location, "
		" replication_lag, "
		" apply_lag ) "
		" VALUES(%i, "
		" %i, "
		" '%s'::TIMESTAMP WITH TIME ZONE, ",
		primary_node_id,
		local_node_id,
		monitor_standby_timestamp);

	/*
	 * The replay timestamp is empty when no transaction has been replayed
	 * yet; casting '' to a timestamp would make the whole INSERT fail, so
	 * store NULL instead.
	 * NOTE(review): assumes "last_apply_time" is nullable - confirm against
	 * the extension's table definition.
	 */
	if (last_xact_replay_timestamp == NULL || last_xact_replay_timestamp[0] == '\0')
	{
		appendPQExpBuffer(
			&query,
			" NULL, ");
	}
	else
	{
		appendPQExpBuffer(
			&query,
			" '%s'::TIMESTAMP WITH TIME ZONE, ",
			last_xact_replay_timestamp);
	}

	appendPQExpBuffer(
		&query,
		" '%X/%X', "
		" '%X/%X', "
		" %llu, "
		" %llu) ",
		format_lsn(primary_last_wal_location),
		format_lsn(last_wal_receive_lsn),
		replication_lag_bytes,
		apply_lag_bytes);

	log_verbose(LOG_DEBUG, "standby_monitor:()\n%s", query.data);

	/* submit without waiting for the result */
	if (PQsendQuery(conn, query.data) == 0)
	{
		log_warning(_("query could not be sent to master: %s\n"),
					PQerrorMessage(conn));
	}

	termPQExpBuffer(&query);

	return;
}
/*
* node voting functions
*
@@ -3512,9 +3594,7 @@ announce_candidature(PGconn *conn, t_node_info *this_node, t_node_info *other_no
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
retval = (strcmp(PQgetvalue(res, 0, 0), "t") == 0)
? true
: false;
retval = atobool(PQgetvalue(res, 0, 0));
PQclear(res);
@@ -3613,6 +3693,33 @@ reset_voting_status(PGconn *conn)
/* replication status functions */
/* ============================ */
/*
 * get_current_wal_lsn()
 *
 * Ask the server behind "conn" for its current WAL write location.
 * For server_version_num >= 100000 the query calls pg_current_wal_lsn(),
 * otherwise pg_current_xlog_location().
 *
 * Returns the parsed LSN, or InvalidXLogRecPtr if the query did not
 * return tuples.
 */
XLogRecPtr
get_current_wal_lsn(PGconn *conn)
{
	const char *lsn_query = (server_version_num >= 100000)
		? "SELECT pg_catalog.pg_current_wal_lsn()"
		: "SELECT pg_catalog.pg_current_xlog_location()";
	PGresult   *result = PQexec(conn, lsn_query);
	XLogRecPtr	current_lsn;

	if (PQresultStatus(result) != PGRES_TUPLES_OK)
	{
		current_lsn = InvalidXLogRecPtr;
	}
	else
	{
		current_lsn = parse_lsn(PQgetvalue(result, 0, 0));
	}

	PQclear(result);

	return current_lsn;
}
XLogRecPtr
get_last_wal_receive_location(PGconn *conn)
{
@@ -3690,7 +3797,7 @@ is_bdr_db(PGconn *conn, PQExpBufferData *output)
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
is_bdr_db = strcmp(PQgetvalue(res, 0, 0), "t") == 0 ? true : false;
is_bdr_db = atobool(PQgetvalue(res, 0, 0));
if (is_bdr_db == false)
{
@@ -3910,7 +4017,7 @@ get_bdr_node_replication_slot_status(PGconn *conn, const char *node_name)
}
else
{
status = (strcmp(PQgetvalue(res, 0, 0), "t") == 0)
status = (atobool(PQgetvalue(res, 0, 0)) == true)
? SLOT_ACTIVE
: SLOT_INACTIVE;
}
@@ -4143,7 +4250,7 @@ am_bdr_failover_handler(PGconn *conn, int node_id)
}
am_handler = (strcmp(PQgetvalue(res, 0, 0), "t") == 0) ? true : false;
am_handler = atobool(PQgetvalue(res, 0, 0));
PQclear(res);

View File

@@ -247,14 +247,19 @@ typedef struct BdrNodeInfoList
}
/*
 * Snapshot of a node's replication state as filled in by
 * get_replication_info(). Field order must stay in step with
 * T_REPLINFO_INTIALIZER, which initializes the members positionally.
 */
typedef struct {
/* text of the "ts" column (CURRENT_TIMESTAMP) of the status query */
char current_timestamp[MAXLEN];
/* last WAL location received, parsed with parse_lsn() */
uint64 last_wal_receive_lsn;
/* last WAL location replayed, parsed with parse_lsn() */
uint64 last_wal_replay_lsn;
/* text of pg_last_xact_replay_timestamp() as returned by the status query */
char last_xact_replay_timestamp[MAXLEN];
/* seconds since the last replayed transaction; 0 when receive and replay LSNs match */
int replication_lag_time;
/* result of "COALESCE(last_wal_receive_lsn, '0/0') >= last_wal_replay_lsn" */
bool receiving_streamed_wal;
} ReplInfo;
/*
 * Positional initializer for ReplInfo; one value per struct member, in
 * declaration order: current_timestamp, last_wal_receive_lsn,
 * last_wal_replay_lsn, last_xact_replay_timestamp, replication_lag_time,
 * receiving_streamed_wal.
 */
#define T_REPLINFO_INTIALIZER { \
	"", \
	InvalidXLogRecPtr, \
	InvalidXLogRecPtr, \
	"", \
	0, \
	false \
}
@@ -304,6 +309,7 @@ XLogRecPtr parse_lsn(const char *str);
extern void wrap_ddl_query(PQExpBufferData *query_buf, int replication_type, const char *fmt, ...)
__attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 4)));
bool atobool(const char *value);
/* connection functions */
PGconn *establish_db_connection(const char *conninfo,
@@ -366,8 +372,7 @@ ExtensionStatus get_repmgr_extension_status(PGconn *conn);
/* node management functions */
void checkpoint(PGconn *conn);
/* result functions */
bool atobool(const char *value);
/* node record functions */
t_server_type parse_node_type(const char *type);
@@ -424,6 +429,19 @@ int wait_connection_availability(PGconn *conn, long long timeout);
/* node availability functions */
bool is_server_available(const char *conninfo);
/* monitoring functions */
/*
 * Builds an INSERT into repmgr.monitoring_history from the supplied node
 * IDs, timestamps (passed as text), WAL locations and lag values (bytes),
 * and submits it on "conn". Returns nothing; a failure to submit the query
 * is only logged.
 */
void add_monitoring_record(
PGconn *conn,
int primary_node_id,
int local_node_id,
char *monitor_standby_timestamp,
XLogRecPtr primary_last_wal_location,
XLogRecPtr last_wal_receive_lsn,
char *last_xact_replay_timestamp,
long long unsigned int replication_lag_bytes,
long long unsigned int apply_lag_bytes
);
/* node voting functions */
NodeVotingStatus get_voting_status(PGconn *conn);
@@ -435,6 +453,7 @@ bool get_new_primary(PGconn *conn, int *primary_node_id);
void reset_voting_status(PGconn *conn);
/* replication status functions */
XLogRecPtr get_current_wal_lsn(PGconn *conn);
XLogRecPtr get_last_wal_receive_location(PGconn *conn);
bool get_replication_info(PGconn *conn, ReplInfo *replication_info);
int get_replication_lag_seconds(PGconn *conn);

View File

@@ -42,6 +42,7 @@ static PGconn *primary_conn = NULL;
#ifndef BDR_ONLY
static FailoverState failover_state = FAILOVER_STATE_UNKNOWN;
static int primary_node_id = UNKNOWN_NODE_ID;
static t_node_info upstream_node_info = T_NODE_INFO_INITIALIZER;
static NodeInfoList standby_nodes = T_NODE_INFO_LIST_INITIALIZER;
@@ -66,6 +67,7 @@ void close_connections_physical();
static bool do_primary_failover(void);
static bool do_upstream_standby_failover(void);
static void update_monitoring_history(void);
#endif
@@ -355,6 +357,7 @@ monitor_streaming_primary(void)
}
}
log_verbose(LOG_DEBUG, "sleeping %i seconds (parameter \"monitor_interval_secs\")",
config_file_options.monitor_interval_secs);
@@ -423,7 +426,7 @@ monitor_streaming_standby(void)
* Upstream node must be running.
*
* We could possibly have repmgrd skip to degraded monitoring mode until it
* comes up, but there doesn't seem to be much point in doint that.
* comes up, but there doesn't seem to be much point in doing that.
*/
if (PQstatus(upstream_conn) != CONNECTION_OK)
{
@@ -461,6 +464,8 @@ monitor_streaming_standby(void)
primary_conn = upstream_conn;
}
primary_node_id = get_primary_node_id(primary_conn);
/* Log startup event */
if (startup_event_logged == false)
{
@@ -567,7 +572,10 @@ monitor_streaming_standby(void)
// it's possible it will make sense to return in
// all cases to restart monitoring
if (failover_done == true)
{
primary_node_id = get_primary_node_id(primary_conn);
return;
}
}
}
}
@@ -764,6 +772,9 @@ monitor_streaming_standby(void)
check_connection(&local_node_info, local_conn);
if (config_file_options.monitoring_history == true)
update_monitoring_history();
sleep(config_file_options.monitor_interval_secs);
}
#endif
@@ -1029,12 +1040,79 @@ do_primary_failover(void)
return false;
}
/* should never reach here */
return false;
}
/*
 * update_monitoring_history()
 *
 * Collect the local node's replication status and the primary's current
 * WAL location, derive replication and apply lag in bytes, and hand the
 * values to add_monitoring_record() to be written via the primary
 * connection.
 *
 * Does nothing if either connection is not CONNECTION_OK, if the local
 * replication status cannot be retrieved, or if the primary's current
 * WAL location cannot be retrieved.
 */
static void
update_monitoring_history(void)
{
	ReplInfo	replication_info = T_REPLINFO_INTIALIZER;
	XLogRecPtr	primary_last_wal_location = InvalidXLogRecPtr;

	long long unsigned int apply_lag_bytes = 0;
	long long unsigned int replication_lag_bytes = 0;

	/* both local and primary connections must be available */
	if (PQstatus(primary_conn) != CONNECTION_OK || PQstatus(local_conn) != CONNECTION_OK)
		return;

	if (get_replication_info(local_conn, &replication_info) == false)
	{
		log_warning(_("unable to retrieve replication status information"));
		return;
	}

	/* not fatal: a record is still written, this is only noted at debug level */
	if (replication_info.receiving_streamed_wal == false)
	{
		log_verbose(LOG_DEBUG, _("standby %i not connected to streaming replication"),
					local_node_info.node_id);
	}

	primary_last_wal_location = get_current_wal_lsn(primary_conn);

	/*
	 * get_current_wal_lsn() returns InvalidXLogRecPtr when its query fails;
	 * without a valid primary location the lag calculation below would be
	 * meaningless and a bogus record would be written.
	 */
	if (primary_last_wal_location == InvalidXLogRecPtr)
	{
		log_warning(_("unable to retrieve primary's current WAL location"));
		return;
	}

	/* calculate apply lag in bytes */
	if (replication_info.last_wal_receive_lsn >= replication_info.last_wal_replay_lsn)
	{
		apply_lag_bytes = (long long unsigned int) (replication_info.last_wal_receive_lsn - replication_info.last_wal_replay_lsn);
	}
	else
	{
		/* if this happens, it probably indicates archive recovery */
		apply_lag_bytes = 0;
	}

	/* calculate replication lag in bytes */
	if (primary_last_wal_location >= replication_info.last_wal_receive_lsn)
	{
		replication_lag_bytes = (long long unsigned int)(primary_last_wal_location - replication_info.last_wal_receive_lsn);
	}
	else
	{
		/* This should never happen, but in case it does set replication lag to zero */
		log_warning("primary xlog (%X/%X) location appears less than standby receive location (%X/%X)",
					format_lsn(primary_last_wal_location),
					format_lsn(replication_info.last_wal_receive_lsn));
		replication_lag_bytes = 0;
	}

	add_monitoring_record(
		primary_conn,
		primary_node_id,
		local_node_info.node_id,
		replication_info.current_timestamp,
		primary_last_wal_location,
		replication_info.last_wal_receive_lsn,
		replication_info.last_xact_replay_timestamp,
		replication_lag_bytes,
		apply_lag_bytes);
}
/*
* do_upstream_standby_failover()
*
@@ -1268,7 +1346,7 @@ promote_self(void)
// XXX handle this!
// -> we'll need to let the other nodes know too....
/* no failover occurred but we'll want to restart connections */
//failover_done = true;
return FAILOVER_STATE_PRIMARY_REAPPEARED;
}

View File

@@ -242,7 +242,6 @@ main(int argc, char **argv)
config_file_options.monitoring_history = true;
}
fd = freopen("/dev/null", "r", stdin);
if (fd == NULL)
{
@@ -273,7 +272,6 @@ main(int argc, char **argv)
}
}
log_info(_("connecting to database \"%s\""),
config_file_options.conninfo);