repmgr node status: add information about current LSN locations for streaming standbys

This commit is contained in:
Ian Barwick
2017-07-31 09:37:06 +09:00
committed by Ian Barwick
parent d00cb63179
commit 8a5665a421
3 changed files with 122 additions and 8 deletions

View File

@@ -1153,7 +1153,6 @@ get_primary_node_id(PGconn *conn)
log_verbose(LOG_DEBUG, "get_primary_node_id():\n%s", query.data);
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -1177,6 +1176,65 @@ get_primary_node_id(PGconn *conn)
}
bool
get_replication_info(PGconn *conn, ReplInfo *replication_info)
{
PQExpBufferData query;
PGresult *res;
initPQExpBuffer(&query);
appendPQExpBuffer(
&query,
" SELECT last_wal_receive_lsn, "
" last_wal_replay_lsn, "
" last_xact_replay_timestamp, "
" CASE WHEN (last_wal_receive_lsn = last_wal_replay_lsn) "
" THEN '0 seconds'::INTERVAL "
" ELSE "
" clock_timestamp() - last_xact_replay_timestamp "
" END AS replication_lag_time "
" FROM ( ");
if (server_version_num >= 100000)
{
appendPQExpBuffer(
&query,
" SELECT pg_last_wal_receive_lsn() AS last_wal_receive_lsn, "
" pg_last_wal_replay_lsn() AS last_wal_replay_lsn, "
" pg_last_xact_replay_timestamp() AS last_xact_replay_timestamp ");
}
else
{
appendPQExpBuffer(
&query,
" SELECT pg_last_xlog_receive_location() AS last_wal_receive_lsn, "
" pg_last_xlog_replay_location() AS last_wal_replay_lsn, "
" pg_last_xact_replay_timestamp() AS last_xact_replay_timestamp ");
}
appendPQExpBuffer(
&query,
" ) q ");
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
if (PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res))
{
log_error(_("unable to execute replication info query:\n %s"),
PQerrorMessage(conn));
PQclear(res);
return false;
}
replication_info->last_wal_receive_lsn = parse_lsn(PQgetvalue(res, 0, 0));
replication_info->last_wal_replay_lsn = parse_lsn(PQgetvalue(res, 0, 1));
strncpy(replication_info->replication_lag_time, PQgetvalue(res, 0, 3), MAXLEN);
return true;
}
/* ================ */
/* result functions */
@@ -1217,7 +1275,7 @@ get_repmgr_extension_status(PGconn *conn)
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
log_error(_("unable to execute extension query:\n %s"),
log_error(_("unable to execute extension query:\n %s"),
PQerrorMessage(conn));
PQclear(res);

View File

@@ -183,6 +183,7 @@ typedef struct s_connection_user
} t_connection_user;
/* represents an entry in bdr.bdr_nodes */
typedef struct s_bdr_node_info
{
char node_sysid[MAXLEN];
@@ -196,7 +197,6 @@ typedef struct s_bdr_node_info
uint32 node_seq_id;
} t_bdr_node_info;
#define T_BDR_NODE_INFO_INITIALIZER { \
"", InvalidOid, InvalidOid, \
'?', "", "", "", \
@@ -224,9 +224,24 @@ typedef struct BdrNodeInfoList
0 \
}
typedef struct {
uint64 last_wal_receive_lsn;
uint64 last_wal_replay_lsn;
char replication_lag_time[MAXLEN];
} ReplInfo;
#define T_REPLINFO_INTIALIZER { \
InvalidXLogRecPtr, \
InvalidXLogRecPtr, \
"" \
}
extern int server_version_num;
/* macros */
#define is_streaming_replication(x) (x == PRIMARY || x == STANDBY)
#define format_lsn(x) (uint32) (x >> 32), (uint32) x
/* utility functions */
XLogRecPtr parse_lsn(const char *str);
@@ -280,10 +295,11 @@ int guc_set_typed(PGconn *conn, const char *parameter, const char *op,
bool get_pg_setting(PGconn *conn, const char *setting, char *output);
/* server information functions */
bool get_cluster_size(PGconn *conn, char *size);
int get_server_version(PGconn *conn, char *server_version);
bool get_cluster_size(PGconn *conn, char *size);
int get_server_version(PGconn *conn, char *server_version);
RecoveryType get_recovery_type(PGconn *conn);
int get_primary_node_id(PGconn *conn);
int get_primary_node_id(PGconn *conn);
bool get_replication_info(PGconn *conn, ReplInfo *replication_info);
/* extension functions */
ExtensionStatus get_repmgr_extension_status(PGconn *conn);

View File

@@ -28,12 +28,15 @@ do_node_status(void)
ItemList warnings = { NULL, NULL };
RecoveryType recovery_type;
ReplInfo replication_info = T_REPLINFO_INTIALIZER;
if (strlen(config_file_options.conninfo))
conn = establish_db_connection(config_file_options.conninfo, true);
else
conn = establish_db_connection_by_params(&source_conninfo, true);
server_version_num = get_server_version(conn, NULL);
if (runtime_options.node_id != UNKNOWN_NODE_ID)
target_node_id = runtime_options.node_id;
else
@@ -157,6 +160,43 @@ do_node_status(void)
"disabled");
}
// if standby (and in recovery), show:
// upstream
// -> check if matches expected; parse recovery.conf for < 9.6 (must be superuser),
// otherwise use pg_stat_wal_receiver
// streaming/in archive recovery/disconnected
// last received
// last replayed
// lag if streaming, or if in recovery can compare with upstream
if (node_info.type == STANDBY)
{
get_replication_info(conn, &replication_info);
key_value_list_set_format(
&node_status,
"Last received LSN",
"%X/%X", format_lsn(replication_info.last_wal_receive_lsn));
key_value_list_set_format(
&node_status,
"Last replayed LSN",
"%X/%X", format_lsn(replication_info.last_wal_replay_lsn));
}
else
{
key_value_list_set(
&node_status,
"Last received LSN",
"");
key_value_list_set(
&node_status,
"Last replayed LSN",
"");
}
initPQExpBuffer(&output);
if (runtime_options.csv == true)
@@ -176,7 +216,7 @@ do_node_status(void)
/* we'll add the raw data as well */
appendPQExpBuffer(
&output,
"max_walsenders,occupied_walsenders,max_replication_slots,active_replication_slots,inactive_replaction_slots\n");
"\"max_walsenders\",\"occupied_walsenders\",\"max_replication_slots\",\"active_replication_slots\",\"inactive_replaction_slots\"\n");
/* output data */
appendPQExpBuffer(
@@ -212,7 +252,7 @@ do_node_status(void)
{
appendPQExpBuffer(
&output,
"\t%s: %s\n", cell->key, cell->value);
"\t%s: %s\n", cell->key, cell->value);
}
}