diff --git a/dbutils.c b/dbutils.c index 828a57a5..dd0c3576 100644 --- a/dbutils.c +++ b/dbutils.c @@ -1153,7 +1153,6 @@ get_primary_node_id(PGconn *conn) log_verbose(LOG_DEBUG, "get_primary_node_id():\n%s", query.data); res = PQexec(conn, query.data); - termPQExpBuffer(&query); if (PQresultStatus(res) != PGRES_TUPLES_OK) @@ -1177,6 +1176,65 @@ get_primary_node_id(PGconn *conn) } +bool +get_replication_info(PGconn *conn, ReplInfo *replication_info) +{ + PQExpBufferData query; + PGresult *res; + + initPQExpBuffer(&query); + appendPQExpBuffer( + &query, + " SELECT last_wal_receive_lsn, " + " last_wal_replay_lsn, " + " last_xact_replay_timestamp, " + " CASE WHEN (last_wal_receive_lsn = last_wal_replay_lsn) " + " THEN '0 seconds'::INTERVAL " + " ELSE " + " clock_timestamp() - last_xact_replay_timestamp " + " END AS replication_lag_time " + " FROM ( "); + + if (server_version_num >= 100000) + { + appendPQExpBuffer( + &query, + " SELECT pg_last_wal_receive_lsn() AS last_wal_receive_lsn, " + " pg_last_wal_replay_lsn() AS last_wal_replay_lsn, " + " pg_last_xact_replay_timestamp() AS last_xact_replay_timestamp "); + } + else + { + appendPQExpBuffer( + &query, + " SELECT pg_last_xlog_receive_location() AS last_wal_receive_lsn, " + " pg_last_xlog_replay_location() AS last_wal_replay_lsn, " + " pg_last_xact_replay_timestamp() AS last_xact_replay_timestamp "); + } + + appendPQExpBuffer( + &query, + " ) q "); + + res = PQexec(conn, query.data); + termPQExpBuffer(&query); + + if (PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res)) + { + log_error(_("unable to execute replication info query:\n %s"), + PQerrorMessage(conn)); + PQclear(res); + + return false; + } + + replication_info->last_wal_receive_lsn = parse_lsn(PQgetvalue(res, 0, 0)); + replication_info->last_wal_replay_lsn = parse_lsn(PQgetvalue(res, 0, 1)); + strncpy(replication_info->replication_lag_time, PQgetvalue(res, 0, 3), MAXLEN); + + return true; +} + /* ================ */ /* result functions */ @@ -1217,7 +1275,7 @@ get_repmgr_extension_status(PGconn *conn) if (PQresultStatus(res) != PGRES_TUPLES_OK) { - log_error(_("unable to execute extension query:\n %s"), + log_error(_("unable to execute extension query:\n %s"), PQerrorMessage(conn)); PQclear(res); diff --git a/dbutils.h b/dbutils.h index ffb06ee8..ee83f64c 100644 --- a/dbutils.h +++ b/dbutils.h @@ -183,6 +183,7 @@ typedef struct s_connection_user } t_connection_user; +/* represents an entry in bdr.bdr_nodes */ typedef struct s_bdr_node_info { char node_sysid[MAXLEN]; @@ -196,7 +197,6 @@ typedef struct s_bdr_node_info uint32 node_seq_id; } t_bdr_node_info; - #define T_BDR_NODE_INFO_INITIALIZER { \ "", InvalidOid, InvalidOid, \ '?', "", "", "", \ @@ -224,9 +224,24 @@ typedef struct BdrNodeInfoList 0 \ } +typedef struct { + uint64 last_wal_receive_lsn; + uint64 last_wal_replay_lsn; + char replication_lag_time[MAXLEN]; +} ReplInfo; +#define T_REPLINFO_INTIALIZER { \ + InvalidXLogRecPtr, \ + InvalidXLogRecPtr, \ + "" \ +} extern int server_version_num; +/* macros */ + +#define is_streaming_replication(x) (x == PRIMARY || x == STANDBY) +#define format_lsn(x) (uint32) (x >> 32), (uint32) x + /* utility functions */ XLogRecPtr parse_lsn(const char *str); @@ -280,10 +295,11 @@ int guc_set_typed(PGconn *conn, const char *parameter, const char *op, bool get_pg_setting(PGconn *conn, const char *setting, char *output); /* server information functions */ -bool get_cluster_size(PGconn *conn, char *size); -int get_server_version(PGconn *conn, char *server_version); +bool get_cluster_size(PGconn *conn, char *size); +int get_server_version(PGconn *conn, char *server_version); RecoveryType get_recovery_type(PGconn *conn); -int get_primary_node_id(PGconn *conn); +int get_primary_node_id(PGconn *conn); +bool get_replication_info(PGconn *conn, ReplInfo *replication_info); /* extension functions */ ExtensionStatus get_repmgr_extension_status(PGconn *conn); diff --git a/repmgr-action-node.c b/repmgr-action-node.c index 3e886c06..4251da3b 100644 --- a/repmgr-action-node.c +++ b/repmgr-action-node.c @@ -28,12 +28,15 @@ do_node_status(void) ItemList warnings = { NULL, NULL }; RecoveryType recovery_type; + ReplInfo replication_info = T_REPLINFO_INTIALIZER; if (strlen(config_file_options.conninfo)) conn = establish_db_connection(config_file_options.conninfo, true); else conn = establish_db_connection_by_params(&source_conninfo, true); + server_version_num = get_server_version(conn, NULL); + if (runtime_options.node_id != UNKNOWN_NODE_ID) target_node_id = runtime_options.node_id; else @@ -157,6 +160,43 @@ do_node_status(void) "disabled"); } + + // if standby (and in recovery), show: + // upstream + // -> check if matches expected; parse recovery.conf for < 9.6 (must be superuser), + // otherwise use pg_stat_wal_receiver + // streaming/in archive recovery/disconnected + // last received + // last replayed + // lag if streaming, or if in recovery can compare with upstream + + if (node_info.type == STANDBY) + { + get_replication_info(conn, &replication_info); + + key_value_list_set_format( + &node_status, + "Last received LSN", + "%X/%X", format_lsn(replication_info.last_wal_receive_lsn)); + + key_value_list_set_format( + &node_status, + "Last replayed LSN", + "%X/%X", format_lsn(replication_info.last_wal_replay_lsn)); + } + else + { + key_value_list_set( + &node_status, + "Last received LSN", + ""); + key_value_list_set( + &node_status, + "Last replayed LSN", + ""); + + } + initPQExpBuffer(&output); if (runtime_options.csv == true) @@ -176,7 +216,7 @@ do_node_status(void) /* we'll add the raw data as well */ appendPQExpBuffer( &output, - "max_walsenders,occupied_walsenders,max_replication_slots,active_replication_slots,inactive_replaction_slots\n"); + "\"max_walsenders\",\"occupied_walsenders\",\"max_replication_slots\",\"active_replication_slots\",\"inactive_replaction_slots\"\n"); /* output data */ appendPQExpBuffer( @@ -212,7 +252,7 @@ do_node_status(void) { appendPQExpBuffer( &output, - "\t%s: %s\n", cell->key, cell->value); + "\t%s: %s\n", cell->key, cell->value); } }