From a659132ea4006cd03d9bceb67cbc3d650bdc50f8 Mon Sep 17 00:00:00 2001 From: Ian Barwick Date: Thu, 24 Aug 2017 11:49:44 +0900 Subject: [PATCH] repmgrd: write monitoring statistics --- dbutils.c | 163 +++++++++++++++++++++++++++++++++++++-------- dbutils.h | 23 ++++++- repmgrd-physical.c | 84 ++++++++++++++++++++++- repmgrd.c | 2 - 4 files changed, 237 insertions(+), 35 deletions(-) diff --git a/dbutils.c b/dbutils.c index 174dc8b0..e04a9114 100644 --- a/dbutils.c +++ b/dbutils.c @@ -1068,7 +1068,7 @@ get_recovery_type(PGconn *conn) PGconn * _get_primary_connection(PGconn *conn, - int *primary_id, char *primary_conninfo_out, bool quiet) + int *primary_id, char *primary_conninfo_out, bool quiet) { PQExpBufferData query; @@ -1250,30 +1250,34 @@ get_replication_info(PGconn *conn, ReplInfo *replication_info) initPQExpBuffer(&query); appendPQExpBuffer( &query, - " SELECT last_wal_receive_lsn, " - " last_wal_replay_lsn, " - " last_xact_replay_timestamp, " + " SELECT ts, " + " last_wal_receive_lsn, " + " last_wal_replay_lsn, " + " last_xact_replay_timestamp, " " CASE WHEN (last_wal_receive_lsn = last_wal_replay_lsn) " - " THEN 0::INT " - " ELSE " - " EXTRACT(epoch FROM (clock_timestamp() - last_xact_replay_timestamp))::INT " - " END AS replication_lag_time " - " FROM ( "); + " THEN 0::INT " + " ELSE " + " EXTRACT(epoch FROM (clock_timestamp() - last_xact_replay_timestamp))::INT " + " END AS replication_lag_time, " + " COALESCE(last_wal_receive_lsn, '0/0') >= last_wal_replay_lsn AS receiving_streamed_wal " + " FROM ( "); if (server_version_num >= 100000) { appendPQExpBuffer( &query, - " SELECT pg_last_wal_receive_lsn() AS last_wal_receive_lsn, " - " pg_last_wal_replay_lsn() AS last_wal_replay_lsn, " + " SELECT CURRENT_TIMESTAMP AS ts, " + " pg_last_wal_receive_lsn() AS last_wal_receive_lsn, " + " pg_last_wal_replay_lsn() AS last_wal_replay_lsn, " " pg_last_xact_replay_timestamp() AS last_xact_replay_timestamp "); } else { appendPQExpBuffer( &query, - " SELECT pg_last_xlog_receive_location() AS last_wal_receive_lsn, " - " pg_last_xlog_replay_location() AS last_wal_replay_lsn, " + " SELECT CURRENT_TIMESTAMP AS ts, " + " pg_last_xlog_receive_location() AS last_wal_receive_lsn, " + " pg_last_xlog_replay_location() AS last_wal_replay_lsn, " " pg_last_xact_replay_timestamp() AS last_xact_replay_timestamp "); } @@ -1295,10 +1299,12 @@ get_replication_info(PGconn *conn, ReplInfo *replication_info) return false; } - replication_info->last_wal_receive_lsn = parse_lsn(PQgetvalue(res, 0, 0)); - replication_info->last_wal_replay_lsn = parse_lsn(PQgetvalue(res, 0, 1)); - replication_info->replication_lag_time = atoi(PQgetvalue(res, 0, 3)); - + strncpy(replication_info->current_timestamp, PQgetvalue(res, 0, 0), MAXLEN); + replication_info->last_wal_receive_lsn = parse_lsn(PQgetvalue(res, 0, 1)); + replication_info->last_wal_replay_lsn = parse_lsn(PQgetvalue(res, 0, 2)); + strncpy(replication_info->last_xact_replay_timestamp, PQgetvalue(res, 0, 3), MAXLEN); + replication_info->replication_lag_time = atoi(PQgetvalue(res, 0, 4)); + replication_info->receiving_streamed_wal = atobool(PQgetvalue(res, 0, 5)); PQclear(res); return true; @@ -2631,7 +2637,7 @@ get_configuration_file_locations(PGconn *conn, t_configfile_list *list) config_file_list_add(list, PQgetvalue(res, i, 0), PQgetvalue(res, i, 1), - strcmp(PQgetvalue(res, i, 2), "t") == 1 ? true : false); + atobool(PQgetvalue(res, i, 2))); } PQclear(res); @@ -2676,7 +2682,7 @@ get_configuration_file_locations(PGconn *conn, t_configfile_list *list) list, PQgetvalue(res, i, 0), PQgetvalue(res, i, 1), - strcmp(PQgetvalue(res, i, 2), "t") == 1 ? true : false); + atobool(PQgetvalue(res, i, 2))); } PQclear(res); @@ -3171,9 +3177,7 @@ get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record) strncpy(record->slot_name, PQgetvalue(res, 0, 0), MAXLEN); strncpy(record->slot_type, PQgetvalue(res, 0, 1), MAXLEN); - record->active = (strcmp(PQgetvalue(res, 0, 2), "t") == 0) - ? true - : false; + record->active = atobool(PQgetvalue(res, 0, 2)); PQclear(res); @@ -3347,6 +3351,84 @@ is_server_available(const char *conninfo) } +/* ==================== */ +/* monitoring functions */ +/* ==================== */ + +void +add_monitoring_record( + PGconn *conn, + int primary_node_id, + int local_node_id, + char *monitor_standby_timestamp, + XLogRecPtr primary_last_wal_location, + XLogRecPtr last_wal_receive_lsn, + char *last_xact_replay_timestamp, + long long unsigned int replication_lag_bytes, + long long unsigned int apply_lag_bytes + ) +{ + PQExpBufferData query; + + initPQExpBuffer(&query); + + appendPQExpBuffer( + &query, + "INSERT INTO repmgr.monitoring_history " + " (primary_node_id, " + " standby_node_id, " + " last_monitor_time, " + " last_apply_time, " + " last_wal_primary_location, " + " last_wal_standby_location, " + " replication_lag, " + " apply_lag ) " + " VALUES(%i, " + " %i, " + " '%s'::TIMESTAMP WITH TIME ZONE, " + " '%s'::TIMESTAMP WITH TIME ZONE, " + " '%X/%X', " + " '%X/%X', " + " %llu, " + " %llu) ", + primary_node_id, + local_node_id, + monitor_standby_timestamp, + last_xact_replay_timestamp, + format_lsn(primary_last_wal_location), + format_lsn(last_wal_receive_lsn), + replication_lag_bytes, + apply_lag_bytes); + + log_verbose(LOG_DEBUG, "standby_monitor:()\n%s", query.data); + + if (PQsendQuery(conn, query.data) == 0) + { + log_warning(_("query could not be sent to master: %s\n"), + PQerrorMessage(conn)); + } + else + { + //PGresult *res = NULL; + +/* sqlquery_snprintf(sqlquery, + "SELECT %s.repmgr_update_last_updated();", + get_repmgr_schema_quoted(my_local_conn)); + res = PQexec(my_local_conn, sqlquery);*/ + + /* not critical if the above query fails*/ +/* if (PQresultStatus(res) != PGRES_TUPLES_OK) + log_warning(_("unable to set last_updated: %s\n"), PQerrorMessage(my_local_conn)); + + PQclear(res);*/ + } + + termPQExpBuffer(&query); + + return; +} + + /* * node voting functions * @@ -3512,9 +3594,7 @@ announce_candidature(PGconn *conn, t_node_info *this_node, t_node_info *other_no res = PQexec(conn, query.data); termPQExpBuffer(&query); - retval = (strcmp(PQgetvalue(res, 0, 0), "t") == 0) - ? true - : false; + retval = atobool(PQgetvalue(res, 0, 0)); PQclear(res); @@ -3613,6 +3693,33 @@ reset_voting_status(PGconn *conn) /* replication status functions */ /* ============================ */ + +XLogRecPtr +get_current_wal_lsn(PGconn *conn) +{ + PGresult *res = NULL; + XLogRecPtr ptr = InvalidXLogRecPtr; + + + if (server_version_num >= 100000) + { + res = PQexec(conn, "SELECT pg_catalog.pg_current_wal_lsn()"); + } + else + { + res = PQexec(conn, "SELECT pg_catalog.pg_current_xlog_location()"); + } + + if (PQresultStatus(res) == PGRES_TUPLES_OK) + { + ptr = parse_lsn(PQgetvalue(res, 0, 0)); + } + + PQclear(res); + + return ptr; +} + XLogRecPtr get_last_wal_receive_location(PGconn *conn) { @@ -3690,7 +3797,7 @@ is_bdr_db(PGconn *conn, PQExpBufferData *output) res = PQexec(conn, query.data); termPQExpBuffer(&query); - is_bdr_db = strcmp(PQgetvalue(res, 0, 0), "t") == 0 ? true : false; + is_bdr_db = atobool(PQgetvalue(res, 0, 0)); if (is_bdr_db == false) { @@ -3910,7 +4017,7 @@ get_bdr_node_replication_slot_status(PGconn *conn, const char *node_name) } else { - status = (strcmp(PQgetvalue(res, 0, 0), "t") == 0) + status = (atobool(PQgetvalue(res, 0, 0)) == true) ? SLOT_ACTIVE : SLOT_INACTIVE; } @@ -4143,7 +4250,7 @@ am_bdr_failover_handler(PGconn *conn, int node_id) } - am_handler = (strcmp(PQgetvalue(res, 0, 0), "t") == 0) ? true : false; + am_handler = atobool(PQgetvalue(res, 0, 0)); PQclear(res); diff --git a/dbutils.h b/dbutils.h index c8e946f4..e34d4da4 100644 --- a/dbutils.h +++ b/dbutils.h @@ -247,14 +247,19 @@ typedef struct BdrNodeInfoList } typedef struct { + char current_timestamp[MAXLEN]; uint64 last_wal_receive_lsn; uint64 last_wal_replay_lsn; + char last_xact_replay_timestamp[MAXLEN]; int replication_lag_time; + bool receiving_streamed_wal; } ReplInfo; #define T_REPLINFO_INTIALIZER { \ + "", \ InvalidXLogRecPtr, \ InvalidXLogRecPtr, \ + "", \ 0 \ } @@ -304,6 +309,7 @@ XLogRecPtr parse_lsn(const char *str); extern void wrap_ddl_query(PQExpBufferData *query_buf, int replication_type, const char *fmt, ...) __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 4))); +bool atobool(const char *value); /* connection functions */ PGconn *establish_db_connection(const char *conninfo, @@ -366,8 +372,7 @@ ExtensionStatus get_repmgr_extension_status(PGconn *conn); /* node management functions */ void checkpoint(PGconn *conn); -/* result functions */ -bool atobool(const char *value); + /* node record functions */ t_server_type parse_node_type(const char *type); @@ -424,6 +429,19 @@ int wait_connection_availability(PGconn *conn, long long timeout); /* node availability functions */ bool is_server_available(const char *conninfo); +/* monitoring functions */ +void add_monitoring_record( + PGconn *conn, + int primary_node_id, + int local_node_id, + char *monitor_standby_timestamp, + XLogRecPtr primary_last_wal_location, + XLogRecPtr last_wal_receive_lsn, + char *last_xact_replay_timestamp, + long long unsigned int replication_lag_bytes, + long long unsigned int apply_lag_bytes + ); + /* node voting functions */ NodeVotingStatus get_voting_status(PGconn *conn); @@ -435,6 +453,7 @@ bool get_new_primary(PGconn *conn, int *primary_node_id); void reset_voting_status(PGconn *conn); /* replication status functions */ +XLogRecPtr get_current_wal_lsn(PGconn *conn); XLogRecPtr get_last_wal_receive_location(PGconn *conn); bool get_replication_info(PGconn *conn, ReplInfo *replication_info); int get_replication_lag_seconds(PGconn *conn); diff --git a/repmgrd-physical.c b/repmgrd-physical.c index 484dc378..0eae0aa5 100644 --- a/repmgrd-physical.c +++ b/repmgrd-physical.c @@ -42,6 +42,7 @@ static PGconn *primary_conn = NULL; #ifndef BDR_ONLY static FailoverState failover_state = FAILOVER_STATE_UNKNOWN; +static int primary_node_id = UNKNOWN_NODE_ID; static t_node_info upstream_node_info = T_NODE_INFO_INITIALIZER; static NodeInfoList standby_nodes = T_NODE_INFO_LIST_INITIALIZER; @@ -66,6 +67,7 @@ void close_connections_physical(); static bool do_primary_failover(void); static bool do_upstream_standby_failover(void); +static void update_monitoring_history(void); #endif @@ -355,6 +357,7 @@ monitor_streaming_primary(void) } } + log_verbose(LOG_DEBUG, "sleeping %i seconds (parameter \"monitor_interval_secs\")", config_file_options.monitor_interval_secs); @@ -423,7 +426,7 @@ monitor_streaming_standby(void) * Upstream node must be running. * * We could possibly have repmgrd skip to degraded monitoring mode until it - * comes up, but there doesn't seem to be much point in doint that. + * comes up, but there doesn't seem to be much point in doing that. */ if (PQstatus(upstream_conn) != CONNECTION_OK) { @@ -461,6 +464,8 @@ monitor_streaming_standby(void) primary_conn = upstream_conn; } + primary_node_id = get_primary_node_id(primary_conn); + /* Log startup event */ if (startup_event_logged == false) { @@ -567,7 +572,10 @@ monitor_streaming_standby(void) // it's possible it will make sense to return in // all cases to restart monitoring if (failover_done == true) + { + primary_node_id = get_primary_node_id(primary_conn); return; + } } } } @@ -764,6 +772,9 @@ monitor_streaming_standby(void) check_connection(&local_node_info, local_conn); + if (config_file_options.monitoring_history == true) + update_monitoring_history(); + sleep(config_file_options.monitor_interval_secs); } #endif @@ -1029,12 +1040,79 @@ do_primary_failover(void) return false; } - /* should never reach here */ return false; } + + +static void +update_monitoring_history(void) +{ + ReplInfo replication_info = T_REPLINFO_INTIALIZER; + XLogRecPtr primary_last_wal_location = InvalidXLogRecPtr; + + long long unsigned int apply_lag_bytes = 0; + long long unsigned int replication_lag_bytes = 0; + + /* both local and primary connections must be available */ + if (PQstatus(primary_conn) != CONNECTION_OK || PQstatus(local_conn) != CONNECTION_OK) + return; + + if (get_replication_info(local_conn, &replication_info) == false) + { + log_warning(_("unable to retrieve replication status information")); + return; + } + + if (replication_info.receiving_streamed_wal == false) + { + log_verbose(LOG_DEBUG, _("standby %i not connected to streaming replication"), + local_node_info.node_id); + } + + primary_last_wal_location = get_current_wal_lsn(primary_conn); + + /* calculate apply lag in bytes */ + if (replication_info.last_wal_receive_lsn >= replication_info.last_wal_replay_lsn) + { + apply_lag_bytes = (long long unsigned int) (replication_info.last_wal_receive_lsn - replication_info.last_wal_replay_lsn); + } + else + { + /* if this happens, it probably indicates archive recovery */ + apply_lag_bytes = 0; + } + + /* calculate replication lag in bytes */ + + if (primary_last_wal_location >= replication_info.last_wal_receive_lsn) + { + replication_lag_bytes = (long long unsigned int)(primary_last_wal_location - replication_info.last_wal_receive_lsn); + } + else + { + /* This should never happen, but in case it does set replication lag to zero */ + log_warning("primary xlog (%X/%X) location appears less than standby receive location (%X/%X)", + format_lsn(primary_last_wal_location), + format_lsn(replication_info.last_wal_receive_lsn)); + replication_lag_bytes = 0; + } + + add_monitoring_record( + primary_conn, + primary_node_id, + local_node_info.node_id, + replication_info.current_timestamp, + primary_last_wal_location, + replication_info.last_wal_receive_lsn, + replication_info.last_xact_replay_timestamp, + replication_lag_bytes, + apply_lag_bytes); +} + + /* * do_upstream_standby_failover() * @@ -1268,7 +1346,7 @@ promote_self(void) // XXX handle this! // -> we'll need to let the other nodes know too.... /* no failover occurred but we'll want to restart connections */ - //failover_done = true; + return FAILOVER_STATE_PRIMARY_REAPPEARED; } diff --git a/repmgrd.c b/repmgrd.c index 2579ec80..660425c6 100644 --- a/repmgrd.c +++ b/repmgrd.c @@ -242,7 +242,6 @@ main(int argc, char **argv) config_file_options.monitoring_history = true; } - fd = freopen("/dev/null", "r", stdin); if (fd == NULL) { @@ -273,7 +272,6 @@ main(int argc, char **argv) } } - log_info(_("connecting to database \"%s\""), config_file_options.conninfo);