diff --git a/dbutils.c b/dbutils.c index 28a2578c..21601ecd 100644 --- a/dbutils.c +++ b/dbutils.c @@ -1940,6 +1940,31 @@ get_wal_receiver_pid(PGconn *conn) return wal_receiver_pid; } + +int +repmgrd_get_upstream_node_id(PGconn *conn) +{ + PGresult *res = NULL; + int upstream_node_id = UNKNOWN_NODE_ID; + + const char *sqlquery = "SELECT repmgr.get_upstream_node_id()"; + + res = PQexec(conn, sqlquery); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_db_error(conn, sqlquery, _("repmgrd_get_upstream_node_id(): unable to execute query")); + } + else if (!PQgetisnull(res, 0, 0)) + { + upstream_node_id = atoi(PQgetvalue(res, 0, 0)); + } + + PQclear(res); + + return upstream_node_id; +} + /* ================ */ /* result functions */ /* ================ */ @@ -5234,15 +5259,16 @@ is_downstream_node_attached(PGconn *conn, char *node_name) void -set_upstream_last_seen(PGconn *conn) +set_upstream_last_seen(PGconn *conn, int upstream_node_id) { PQExpBufferData query; PGresult *res = NULL; initPQExpBuffer(&query); - appendPQExpBufferStr(&query, - "SELECT repmgr.set_upstream_last_seen()"); + appendPQExpBuffer(&query, + "SELECT repmgr.set_upstream_last_seen(%i)", + upstream_node_id); res = PQexec(conn, query.data); diff --git a/dbutils.h b/dbutils.h index d15de5e1..29bdf4b4 100644 --- a/dbutils.h +++ b/dbutils.h @@ -440,6 +440,7 @@ bool repmgrd_is_running(PGconn *conn); bool repmgrd_is_paused(PGconn *conn); bool repmgrd_pause(PGconn *conn, bool pause); pid_t get_wal_receiver_pid(PGconn *conn); +int repmgrd_get_upstream_node_id(PGconn *conn); /* extension functions */ ExtensionStatus get_repmgr_extension_status(PGconn *conn, t_extension_versions *extversions); @@ -560,8 +561,9 @@ bool get_replication_info(PGconn *conn, t_server_type node_type, ReplInfo *repl int get_replication_lag_seconds(PGconn *conn); void get_node_replication_stats(PGconn *conn, t_node_info *node_info); bool is_downstream_node_attached(PGconn *conn, char *node_name); -void set_upstream_last_seen(PGconn *conn); +void set_upstream_last_seen(PGconn *conn, int upstream_node_id); int get_upstream_last_seen(PGconn *conn, t_server_type node_type); + bool is_wal_replay_paused(PGconn *conn, bool check_pending_wal); /* BDR functions */ diff --git a/repmgr--4.3--4.4.sql b/repmgr--4.3--4.4.sql index efddf870..b5666bec 100644 --- a/repmgr--4.3--4.4.sql +++ b/repmgr--4.3--4.4.sql @@ -1,2 +1,14 @@ -- complain if script is sourced in psql, rather than via CREATE EXTENSION \echo Use "CREATE EXTENSION repmgr" to load this file. \quit + +DROP FUNCTION set_upstream_last_seen(); + +CREATE FUNCTION set_upstream_last_seen(INT) + RETURNS VOID + AS 'MODULE_PATHNAME', 'set_upstream_last_seen' + LANGUAGE C STRICT; + +CREATE FUNCTION get_upstream_node_id() + RETURNS INT + AS 'MODULE_PATHNAME', 'get_upstream_node_id' + LANGUAGE C STRICT; diff --git a/repmgr--4.4.sql b/repmgr--4.4.sql index 8496b607..a4ff77d6 100644 --- a/repmgr--4.4.sql +++ b/repmgr--4.4.sql @@ -118,7 +118,7 @@ CREATE FUNCTION standby_get_last_updated() AS 'MODULE_PATHNAME', 'standby_get_last_updated' LANGUAGE C STRICT; -CREATE FUNCTION set_upstream_last_seen() +CREATE FUNCTION set_upstream_last_seen(INT) RETURNS VOID AS 'MODULE_PATHNAME', 'set_upstream_last_seen' LANGUAGE C STRICT; @@ -128,6 +128,11 @@ CREATE FUNCTION get_upstream_last_seen() AS 'MODULE_PATHNAME', 'get_upstream_last_seen' LANGUAGE C STRICT; +CREATE FUNCTION get_upstream_node_id() + RETURNS INT + AS 'MODULE_PATHNAME', 'get_upstream_node_id' + LANGUAGE C STRICT; + /* failover functions */ diff --git a/repmgr.c b/repmgr.c index 29a12785..666b2719 100644 --- a/repmgr.c +++ b/repmgr.c @@ -78,6 +78,7 @@ typedef struct repmgrdSharedState char repmgrd_pidfile[MAXPGPATH]; bool repmgrd_paused; /* streaming failover */ + int upstream_node_id; TimestampTz upstream_last_seen; NodeVotingStatus voting_status; int current_electoral_term; @@ -115,6 +116,9 @@ PG_FUNCTION_INFO_V1(set_upstream_last_seen); Datum get_upstream_last_seen(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(get_upstream_last_seen); +Datum get_upstream_node_id(PG_FUNCTION_ARGS); +PG_FUNCTION_INFO_V1(get_upstream_node_id); + Datum notify_follow_primary(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(notify_follow_primary); @@ -228,6 +232,7 @@ repmgr_shmem_startup(void) memset(shared_state->repmgrd_pidfile, 0, MAXPGPATH); shared_state->repmgrd_paused = false; shared_state->current_electoral_term = 0; + shared_state->upstream_node_id = UNKNOWN_NODE_ID; /* arbitrary "magic" date to indicate this field hasn't been updated */ shared_state->upstream_last_seen = POSTGRES_EPOCH_JDATE; shared_state->voting_status = VS_NO_VOTE; @@ -368,13 +373,20 @@ standby_get_last_updated(PG_FUNCTION_ARGS) Datum set_upstream_last_seen(PG_FUNCTION_ARGS) { + int upstream_node_id = UNKNOWN_NODE_ID; + if (!shared_state) PG_RETURN_VOID(); + if (PG_ARGISNULL(0)) + PG_RETURN_NULL(); + + upstream_node_id = PG_GETARG_INT32(0); + LWLockAcquire(shared_state->lock, LW_EXCLUSIVE); shared_state->upstream_last_seen = GetCurrentTimestamp(); - + shared_state->upstream_node_id = upstream_node_id; LWLockRelease(shared_state->lock); PG_RETURN_VOID(); @@ -415,6 +427,27 @@ get_upstream_last_seen(PG_FUNCTION_ARGS) } + +Datum +get_upstream_node_id(PG_FUNCTION_ARGS) +{ + int upstream_node_id = UNKNOWN_NODE_ID; + + if (!shared_state) + PG_RETURN_NULL(); + + /* A primary node cannot have an upstream ID */ + if (!RecoveryInProgress()) + PG_RETURN_INT32(UNKNOWN_NODE_ID); + + LWLockAcquire(shared_state->lock, LW_SHARED); + upstream_node_id = shared_state->upstream_node_id; + LWLockRelease(shared_state->lock); + + PG_RETURN_INT32(upstream_node_id); +} + + /* ===================*/ /* failover functions */ /* ===================*/ diff --git a/repmgrd-physical.c b/repmgrd-physical.c index b8e662d0..31fbed6c 100644 --- a/repmgrd-physical.c +++ b/repmgrd-physical.c @@ -878,7 +878,7 @@ monitor_streaming_standby(void) log_verbose(LOG_DEBUG, "checking %s", upstream_node_info.conninfo); if (check_upstream_connection(&upstream_conn, upstream_node_info.conninfo) == true) { - set_upstream_last_seen(local_conn); + set_upstream_last_seen(local_conn, upstream_node_info.node_id); } else { @@ -1671,7 +1671,7 @@ monitor_streaming_witness(void) { if (check_upstream_connection(&primary_conn, upstream_node_info.conninfo) == true) { - set_upstream_last_seen(local_conn); + set_upstream_last_seen(local_conn, upstream_node_info.node_id); } else {