mirror of
https://github.com/EnterpriseDB/repmgr.git
synced 2026-03-23 07:06:30 +00:00
Lag is now monitored in bytes; we show both the replication lag
and the apply lag.
This commit is contained in:
112
main.c
112
main.c
@@ -30,12 +30,12 @@ void setMyLocalMode(void);
|
||||
void checkClusterConfiguration(void);
|
||||
void checkNodeConfiguration(char *conninfo);
|
||||
void getPrimaryConnection(void);
|
||||
void getLocalMonitoredInfo(char *currTimestamp, char *xlogLocation,
|
||||
char *xlogTimestamp);
|
||||
|
||||
void MonitorCheck(void);
|
||||
void MonitorExecute(void);
|
||||
|
||||
unsigned long long int walLocationToBytes(char *wal_location);
|
||||
|
||||
|
||||
int
|
||||
main(int argc, char **argv)
|
||||
@@ -123,7 +123,7 @@ getPrimaryConnection(void)
|
||||
for (i = 0; i < PQntuples(res1); i++)
|
||||
{
|
||||
primaryId = atoi(PQgetvalue(res1, i, 0));
|
||||
strcmp(primaryConninfo, PQgetvalue(res1, i, 1));
|
||||
strcmp(primaryConninfo, PQgetvalue(res1, i, 2));
|
||||
primaryConn = establishDBConnection(primaryConninfo, false);
|
||||
|
||||
res2 = PQexec(primaryConn, "SELECT pg_is_in_recovery()");
|
||||
@@ -169,32 +169,6 @@ getPrimaryConnection(void)
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
getLocalMonitoredInfo(char *currTimestamp, char *xlogLocation, char *xlogTimestamp)
|
||||
{
|
||||
PGresult *res;
|
||||
char sqlquery[8192];
|
||||
|
||||
sprintf(sqlquery,
|
||||
"SELECT CURRENT_TIMESTAMP, pg_last_xlog_receive_location(), "
|
||||
" get_last_xlog_replay_timestamp()");
|
||||
|
||||
res = PQexec(myLocalConn, sqlquery);
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
{
|
||||
fprintf(stderr, "PQexec failed: %s", PQerrorMessage(myLocalConn));
|
||||
PQclear(res);
|
||||
PQfinish(myLocalConn);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
strcpy(currTimestamp, PQgetvalue(res, 0, 0));
|
||||
strcpy(xlogLocation , PQgetvalue(res, 0, 1));
|
||||
strcpy(xlogTimestamp, PQgetvalue(res, 0, 2));
|
||||
PQclear(res);
|
||||
return;
|
||||
}
|
||||
|
||||
void
|
||||
MonitorCheck(void) {
|
||||
/*
|
||||
@@ -215,14 +189,55 @@ MonitorCheck(void) {
|
||||
void
|
||||
MonitorExecute(void)
|
||||
{
|
||||
PGresult *res;
|
||||
char sqlquery[8192];
|
||||
char monitor_timestamp[MAXLEN];
|
||||
char last_wal_location[MAXLEN];
|
||||
char last_wal_timestamp[MAXLEN];
|
||||
char monitor_standby_timestamp[MAXLEN];
|
||||
char last_wal_primary_location[MAXLEN];
|
||||
char last_wal_standby_received[MAXLEN];
|
||||
char last_wal_standby_applied[MAXLEN];
|
||||
char last_wal_standby_timestamp[MAXLEN];
|
||||
|
||||
getLocalMonitoredInfo(monitor_timestamp,
|
||||
last_wal_location,
|
||||
last_wal_timestamp);
|
||||
unsigned long long int lsn_primary;
|
||||
unsigned long long int lsn_standby_received;
|
||||
unsigned long long int lsn_standby_applied;
|
||||
|
||||
/* Get local xlog info */
|
||||
sprintf(sqlquery,
|
||||
"SELECT CURRENT_TIMESTAMP, pg_last_xlog_receive_location(), "
|
||||
"pg_last_xlog_replay_location(), get_last_xlog_replay_timestamp()");
|
||||
|
||||
res = PQexec(myLocalConn, sqlquery);
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
{
|
||||
fprintf(stderr, "PQexec failed: %s", PQerrorMessage(myLocalConn));
|
||||
PQclear(res);
|
||||
return;
|
||||
}
|
||||
|
||||
strcpy(monitor_standby_timestamp, PQgetvalue(res, 0, 0));
|
||||
strcpy(last_wal_standby_received , PQgetvalue(res, 0, 1));
|
||||
strcpy(last_wal_standby_applied , PQgetvalue(res, 0, 2));
|
||||
strcpy(last_wal_standby_timestamp, PQgetvalue(res, 0, 3));
|
||||
PQclear(res);
|
||||
|
||||
/* Get primary xlog info */
|
||||
sprintf(sqlquery, "SELECT pg_current_xlog_location() ");
|
||||
|
||||
res = PQexec(primaryConn, sqlquery);
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
{
|
||||
fprintf(stderr, "PQexec failed: %s", PQerrorMessage(primaryConn));
|
||||
PQclear(res);
|
||||
return;
|
||||
}
|
||||
|
||||
strcpy(last_wal_primary_location, PQgetvalue(res, 0, 0));
|
||||
PQclear(res);
|
||||
|
||||
/* Calculate the lag */
|
||||
lsn_primary = walLocationToBytes(last_wal_primary_location);
|
||||
lsn_standby_received = walLocationToBytes(last_wal_standby_received);
|
||||
lsn_standby_applied = walLocationToBytes(last_wal_standby_applied);
|
||||
|
||||
/*
|
||||
* Build the SQL to execute on primary
|
||||
@@ -230,12 +245,14 @@ MonitorExecute(void)
|
||||
sprintf(sqlquery,
|
||||
"INSERT INTO repl_status "
|
||||
"VALUES(%d, %d, '%s'::timestamp with time zone, "
|
||||
" pg_current_xlog_location(), '%s', "
|
||||
" '%s', '%s', "
|
||||
" '%s'::timestamp with time zone, "
|
||||
" CURRENT_TIMESTAMP - '%s'::timestamp with time zone) ",
|
||||
primaryId, myLocalId, monitor_timestamp,
|
||||
last_wal_location, last_wal_timestamp,
|
||||
last_wal_timestamp);
|
||||
" %lld, %lld)",
|
||||
primaryId, myLocalId, monitor_standby_timestamp,
|
||||
last_wal_primary_location,
|
||||
last_wal_standby_received, last_wal_standby_timestamp,
|
||||
(lsn_primary - lsn_standby_received),
|
||||
(lsn_standby_applied - lsn_standby_received));
|
||||
|
||||
/*
|
||||
* Execute the query asynchronously, but don't check for a result. We
|
||||
@@ -326,3 +343,18 @@ checkNodeConfiguration(char *conninfo)
|
||||
}
|
||||
PQclear(res);
|
||||
}
|
||||
|
||||
|
||||
/*
 * walLocationToBytes
 *
 * Convert a textual WAL location of the form "XLOGID/XRECOFF" (two
 * hexadecimal numbers separated by a slash) into a single byte position:
 *
 *     xlogid * (16 * 1024 * 1024 * 255) + xrecoff
 *
 * wal_location: NUL-terminated string to parse; it is not modified.
 *
 * Returns the computed position, or 0 after printing a message on stderr
 * when wal_location does not match the expected format.
 */
unsigned long long int
walLocationToBytes(char *wal_location)
{
	unsigned int xlogid;
	unsigned int xrecoff;

	if (sscanf(wal_location, "%X/%X", &xlogid, &xrecoff) != 2)
	{
		fprintf(stderr, "wrong log location format: %s", wal_location);
		return 0;
	}

	/*
	 * Widen xlogid before multiplying.  Done in unsigned int, the product
	 * silently wraps around for any xlogid >= 2 and yields a bogus position.
	 */
	return ((unsigned long long int) xlogid * 16 * 1024 * 1024 * 255) + xrecoff;
}
|
||||
|
||||
@@ -13,6 +13,6 @@ CREATE TABLE repl_status(
|
||||
last_wal_primary_location TEXT NOT NULL,
|
||||
last_wal_standby_location TEXT NOT NULL,
|
||||
last_wal_standby_timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
|
||||
lag_time INTERVAL NOT NULL
|
||||
-- num_wal_packets INTEGER NOT NULL
|
||||
replication_lag BIGINT NOT NULL,
|
||||
apply_lag BIGINT NOT NULL
|
||||
);
|
||||
|
||||
@@ -19,7 +19,7 @@ Datum
|
||||
last_xlog_replay_timestamp(PG_FUNCTION_ARGS)
|
||||
{
|
||||
TimestampTz rTime;
|
||||
bool fromSource;
|
||||
bool fromStream;
|
||||
|
||||
if (!InRecovery)
|
||||
PG_RETURN_NULL();
|
||||
|
||||
Reference in New Issue
Block a user