From 7ec3485811a747fef0b66aed34d7e09c7cde2fd1 Mon Sep 17 00:00:00 2001 From: postgres Date: Tue, 14 Sep 2010 01:40:59 -0500 Subject: [PATCH] Lag is now monitored in bytes, we show replication lag and applied lag --- main.c | 112 ++++++++++++++++++++----------- repmgr.sql | 4 +- sql_utils/repmgr_wrapper_funcs.c | 2 +- 3 files changed, 75 insertions(+), 43 deletions(-) diff --git a/main.c b/main.c index b664cc15..3f47aaa6 100644 --- a/main.c +++ b/main.c @@ -30,12 +30,12 @@ void setMyLocalMode(void); void checkClusterConfiguration(void); void checkNodeConfiguration(char *conninfo); void getPrimaryConnection(void); -void getLocalMonitoredInfo(char *currTimestamp, char *xlogLocation, - char *xlogTimestamp); void MonitorCheck(void); void MonitorExecute(void); +unsigned long long int walLocationToBytes(char *wal_location); + int main(int argc, char **argv) @@ -123,7 +123,7 @@ getPrimaryConnection(void) for (i = 0; i < PQntuples(res1); i++) { primaryId = atoi(PQgetvalue(res1, i, 0)); - strcmp(primaryConninfo, PQgetvalue(res1, i, 1)); + strcmp(primaryConninfo, PQgetvalue(res1, i, 2)); primaryConn = establishDBConnection(primaryConninfo, false); res2 = PQexec(primaryConn, "SELECT pg_is_in_recovery()"); @@ -169,32 +169,6 @@ getPrimaryConnection(void) } -void -getLocalMonitoredInfo(char *currTimestamp, char *xlogLocation, char *xlogTimestamp) -{ - PGresult *res; - char sqlquery[8192]; - - sprintf(sqlquery, - "SELECT CURRENT_TIMESTAMP, pg_last_xlog_receive_location(), " - " get_last_xlog_replay_timestamp()"); - - res = PQexec(myLocalConn, sqlquery); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - fprintf(stderr, "PQexec failed: %s", PQerrorMessage(myLocalConn)); - PQclear(res); - PQfinish(myLocalConn); - exit(1); - } - - strcpy(currTimestamp, PQgetvalue(res, 0, 0)); - strcpy(xlogLocation , PQgetvalue(res, 0, 1)); - strcpy(xlogTimestamp, PQgetvalue(res, 0, 2)); - PQclear(res); - return; -} - void MonitorCheck(void) { /* @@ -215,14 +189,55 @@ MonitorCheck(void) { void MonitorExecute(void) { + PGresult *res; char sqlquery[8192]; - char monitor_timestamp[MAXLEN]; - char last_wal_location[MAXLEN]; - char last_wal_timestamp[MAXLEN]; + char monitor_standby_timestamp[MAXLEN]; + char last_wal_primary_location[MAXLEN]; + char last_wal_standby_received[MAXLEN]; + char last_wal_standby_applied[MAXLEN]; + char last_wal_standby_timestamp[MAXLEN]; - getLocalMonitoredInfo(monitor_timestamp, - last_wal_location, - last_wal_timestamp); + unsigned long long int lsn_primary; + unsigned long long int lsn_standby_received; + unsigned long long int lsn_standby_applied; + + /* Get local xlog info */ + sprintf(sqlquery, + "SELECT CURRENT_TIMESTAMP, pg_last_xlog_receive_location(), " + "pg_last_xlog_replay_location(), get_last_xlog_replay_timestamp()"); + + res = PQexec(myLocalConn, sqlquery); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, "PQexec failed: %s", PQerrorMessage(myLocalConn)); + PQclear(res); + return; + } + + strcpy(monitor_standby_timestamp, PQgetvalue(res, 0, 0)); + strcpy(last_wal_standby_received , PQgetvalue(res, 0, 1)); + strcpy(last_wal_standby_applied , PQgetvalue(res, 0, 2)); + strcpy(last_wal_standby_timestamp, PQgetvalue(res, 0, 3)); + PQclear(res); + + /* Get primary xlog info */ + sprintf(sqlquery, "SELECT pg_current_xlog_location() "); + + res = PQexec(primaryConn, sqlquery); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, "PQexec failed: %s", PQerrorMessage(primaryConn)); + PQclear(res); + return; + } + + strcpy(last_wal_primary_location, PQgetvalue(res, 0, 0)); + PQclear(res); + + /* Calculate the lag */ + lsn_primary = walLocationToBytes(last_wal_primary_location); + lsn_standby_received = walLocationToBytes(last_wal_standby_received); + lsn_standby_applied = walLocationToBytes(last_wal_standby_applied); /* * Build the SQL to execute on primary @@ -230,12 +245,14 @@ MonitorExecute(void) sprintf(sqlquery, "INSERT INTO repl_status " "VALUES(%d, %d, '%s'::timestamp with time zone, " - " pg_current_xlog_location(), '%s', " + " '%s', '%s', " " '%s'::timestamp with time zone, " - " CURRENT_TIMESTAMP - '%s'::timestamp with time zone) ", - primaryId, myLocalId, monitor_timestamp, - last_wal_location, last_wal_timestamp, - last_wal_timestamp); + " %lld, %lld)", + primaryId, myLocalId, monitor_standby_timestamp, + last_wal_primary_location, + last_wal_standby_received, last_wal_standby_timestamp, + (lsn_primary - lsn_standby_received), + (lsn_standby_applied - lsn_standby_received)); /* * Execute the query asynchronously, but don't check for a result. We @@ -326,3 +343,18 @@ checkNodeConfiguration(char *conninfo) } PQclear(res); } + + +unsigned long long int +walLocationToBytes(char *wal_location) +{ + unsigned int xlogid; + unsigned int xrecoff; + + if (sscanf(wal_location, "%X/%X", &xlogid, &xrecoff) != 2) + { + fprintf(stderr, "wrong log location format: %s", wal_location); + return 0; + } + return ((xlogid * 16 * 1024 * 1024 * 255) + xrecoff); +} diff --git a/repmgr.sql b/repmgr.sql index a0092170..65205618 100644 --- a/repmgr.sql +++ b/repmgr.sql @@ -13,6 +13,6 @@ CREATE TABLE repl_status( last_wal_primary_location TEXT NOT NULL, last_wal_standby_location TEXT NOT NULL, last_wal_standby_timestamp TIMESTAMP WITH TIME ZONE NOT NULL, - lag_time INTERVAL NOT NULL --- num_wal_packets INTEGER NOT NULL + replication_lag BIGINT NOT NULL, + apply_lag BIGINT NOT NULL ); diff --git a/sql_utils/repmgr_wrapper_funcs.c b/sql_utils/repmgr_wrapper_funcs.c index ae952946..3278e34d 100644 --- a/sql_utils/repmgr_wrapper_funcs.c +++ b/sql_utils/repmgr_wrapper_funcs.c @@ -19,7 +19,7 @@ Datum last_xlog_replay_timestamp(PG_FUNCTION_ARGS) { TimestampTz rTime; -bool fromSource; +bool fromStream; if (!InRecovery) PG_RETURN_NULL();