repmgrd: store upstream node ID in shared memory

This commit is contained in:
Ian Barwick
2019-04-04 14:55:25 +09:00
parent 5a8741199f
commit 008bd00a59
6 changed files with 86 additions and 8 deletions

View File

@@ -1940,6 +1940,31 @@ get_wal_receiver_pid(PGconn *conn)
return wal_receiver_pid; return wal_receiver_pid;
} }
int
repmgrd_get_upstream_node_id(PGconn *conn)
{
PGresult *res = NULL;
int upstream_node_id = UNKNOWN_NODE_ID;
const char *sqlquery = "SELECT repmgr.get_upstream_node_id()";
res = PQexec(conn, sqlquery);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
log_db_error(conn, sqlquery, _("repmgrd_get_upstream_node_id(): unable to execute query"));
}
else if (!PQgetisnull(res, 0, 0))
{
upstream_node_id = atoi(PQgetvalue(res, 0, 0));
}
PQclear(res);
return upstream_node_id;
}
/* ================ */ /* ================ */
/* result functions */ /* result functions */
/* ================ */ /* ================ */
@@ -5234,15 +5259,16 @@ is_downstream_node_attached(PGconn *conn, char *node_name)
void void
set_upstream_last_seen(PGconn *conn) set_upstream_last_seen(PGconn *conn, int upstream_node_id)
{ {
PQExpBufferData query; PQExpBufferData query;
PGresult *res = NULL; PGresult *res = NULL;
initPQExpBuffer(&query); initPQExpBuffer(&query);
appendPQExpBufferStr(&query, appendPQExpBuffer(&query,
"SELECT repmgr.set_upstream_last_seen()"); "SELECT repmgr.set_upstream_last_seen(%i)",
upstream_node_id);
res = PQexec(conn, query.data); res = PQexec(conn, query.data);

View File

@@ -440,6 +440,7 @@ bool repmgrd_is_running(PGconn *conn);
bool repmgrd_is_paused(PGconn *conn); bool repmgrd_is_paused(PGconn *conn);
bool repmgrd_pause(PGconn *conn, bool pause); bool repmgrd_pause(PGconn *conn, bool pause);
pid_t get_wal_receiver_pid(PGconn *conn); pid_t get_wal_receiver_pid(PGconn *conn);
int repmgrd_get_upstream_node_id(PGconn *conn);
/* extension functions */ /* extension functions */
ExtensionStatus get_repmgr_extension_status(PGconn *conn, t_extension_versions *extversions); ExtensionStatus get_repmgr_extension_status(PGconn *conn, t_extension_versions *extversions);
@@ -560,8 +561,9 @@ bool get_replication_info(PGconn *conn, t_server_type node_type, ReplInfo *repl
int get_replication_lag_seconds(PGconn *conn); int get_replication_lag_seconds(PGconn *conn);
void get_node_replication_stats(PGconn *conn, t_node_info *node_info); void get_node_replication_stats(PGconn *conn, t_node_info *node_info);
bool is_downstream_node_attached(PGconn *conn, char *node_name); bool is_downstream_node_attached(PGconn *conn, char *node_name);
void set_upstream_last_seen(PGconn *conn); void set_upstream_last_seen(PGconn *conn, int upstream_node_id);
int get_upstream_last_seen(PGconn *conn, t_server_type node_type); int get_upstream_last_seen(PGconn *conn, t_server_type node_type);
bool is_wal_replay_paused(PGconn *conn, bool check_pending_wal); bool is_wal_replay_paused(PGconn *conn, bool check_pending_wal);
/* BDR functions */ /* BDR functions */

View File

@@ -1,2 +1,14 @@
-- complain if script is sourced in psql, rather than via CREATE EXTENSION -- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION repmgr" to load this file. \quit \echo Use "CREATE EXTENSION repmgr" to load this file. \quit
DROP FUNCTION set_upstream_last_seen();
CREATE FUNCTION set_upstream_last_seen(INT)
RETURNS VOID
AS 'MODULE_PATHNAME', 'set_upstream_last_seen'
LANGUAGE C STRICT;
CREATE FUNCTION get_upstream_node_id()
RETURNS INT
AS 'MODULE_PATHNAME', 'get_upstream_node_id'
LANGUAGE C STRICT;

View File

@@ -118,7 +118,7 @@ CREATE FUNCTION standby_get_last_updated()
AS 'MODULE_PATHNAME', 'standby_get_last_updated' AS 'MODULE_PATHNAME', 'standby_get_last_updated'
LANGUAGE C STRICT; LANGUAGE C STRICT;
CREATE FUNCTION set_upstream_last_seen() CREATE FUNCTION set_upstream_last_seen(INT)
RETURNS VOID RETURNS VOID
AS 'MODULE_PATHNAME', 'set_upstream_last_seen' AS 'MODULE_PATHNAME', 'set_upstream_last_seen'
LANGUAGE C STRICT; LANGUAGE C STRICT;
@@ -128,6 +128,11 @@ CREATE FUNCTION get_upstream_last_seen()
AS 'MODULE_PATHNAME', 'get_upstream_last_seen' AS 'MODULE_PATHNAME', 'get_upstream_last_seen'
LANGUAGE C STRICT; LANGUAGE C STRICT;
CREATE FUNCTION get_upstream_node_id()
RETURNS INT
AS 'MODULE_PATHNAME', 'get_upstream_node_id'
LANGUAGE C STRICT;
/* failover functions */ /* failover functions */

View File

@@ -78,6 +78,7 @@ typedef struct repmgrdSharedState
char repmgrd_pidfile[MAXPGPATH]; char repmgrd_pidfile[MAXPGPATH];
bool repmgrd_paused; bool repmgrd_paused;
/* streaming failover */ /* streaming failover */
int upstream_node_id;
TimestampTz upstream_last_seen; TimestampTz upstream_last_seen;
NodeVotingStatus voting_status; NodeVotingStatus voting_status;
int current_electoral_term; int current_electoral_term;
@@ -115,6 +116,9 @@ PG_FUNCTION_INFO_V1(set_upstream_last_seen);
Datum get_upstream_last_seen(PG_FUNCTION_ARGS); Datum get_upstream_last_seen(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(get_upstream_last_seen); PG_FUNCTION_INFO_V1(get_upstream_last_seen);
Datum get_upstream_node_id(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(get_upstream_node_id);
Datum notify_follow_primary(PG_FUNCTION_ARGS); Datum notify_follow_primary(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(notify_follow_primary); PG_FUNCTION_INFO_V1(notify_follow_primary);
@@ -228,6 +232,7 @@ repmgr_shmem_startup(void)
memset(shared_state->repmgrd_pidfile, 0, MAXPGPATH); memset(shared_state->repmgrd_pidfile, 0, MAXPGPATH);
shared_state->repmgrd_paused = false; shared_state->repmgrd_paused = false;
shared_state->current_electoral_term = 0; shared_state->current_electoral_term = 0;
shared_state->upstream_node_id = UNKNOWN_NODE_ID;
/* arbitrary "magic" date to indicate this field hasn't been updated */ /* arbitrary "magic" date to indicate this field hasn't been updated */
shared_state->upstream_last_seen = POSTGRES_EPOCH_JDATE; shared_state->upstream_last_seen = POSTGRES_EPOCH_JDATE;
shared_state->voting_status = VS_NO_VOTE; shared_state->voting_status = VS_NO_VOTE;
@@ -368,13 +373,20 @@ standby_get_last_updated(PG_FUNCTION_ARGS)
Datum Datum
set_upstream_last_seen(PG_FUNCTION_ARGS) set_upstream_last_seen(PG_FUNCTION_ARGS)
{ {
int upstream_node_id = UNKNOWN_NODE_ID;
if (!shared_state) if (!shared_state)
PG_RETURN_VOID(); PG_RETURN_VOID();
if (PG_ARGISNULL(0))
PG_RETURN_NULL();
upstream_node_id = PG_GETARG_INT32(0);
LWLockAcquire(shared_state->lock, LW_EXCLUSIVE); LWLockAcquire(shared_state->lock, LW_EXCLUSIVE);
shared_state->upstream_last_seen = GetCurrentTimestamp(); shared_state->upstream_last_seen = GetCurrentTimestamp();
shared_state->upstream_node_id = upstream_node_id;
LWLockRelease(shared_state->lock); LWLockRelease(shared_state->lock);
PG_RETURN_VOID(); PG_RETURN_VOID();
@@ -415,6 +427,27 @@ get_upstream_last_seen(PG_FUNCTION_ARGS)
} }
Datum
get_upstream_node_id(PG_FUNCTION_ARGS)
{
int upstream_node_id = UNKNOWN_NODE_ID;
if (!shared_state)
PG_RETURN_NULL();
/* A primary node cannot have an upstream ID */
if (!RecoveryInProgress())
PG_RETURN_INT32(UNKNOWN_NODE_ID);
LWLockAcquire(shared_state->lock, LW_SHARED);
upstream_node_id = shared_state->upstream_node_id;
LWLockRelease(shared_state->lock);
PG_RETURN_INT32(upstream_node_id);
}
/* ===================*/ /* ===================*/
/* failover functions */ /* failover functions */
/* ===================*/ /* ===================*/

View File

@@ -878,7 +878,7 @@ monitor_streaming_standby(void)
log_verbose(LOG_DEBUG, "checking %s", upstream_node_info.conninfo); log_verbose(LOG_DEBUG, "checking %s", upstream_node_info.conninfo);
if (check_upstream_connection(&upstream_conn, upstream_node_info.conninfo) == true) if (check_upstream_connection(&upstream_conn, upstream_node_info.conninfo) == true)
{ {
set_upstream_last_seen(local_conn); set_upstream_last_seen(local_conn, upstream_node_info.node_id);
} }
else else
{ {
@@ -1671,7 +1671,7 @@ monitor_streaming_witness(void)
{ {
if (check_upstream_connection(&primary_conn, upstream_node_info.conninfo) == true) if (check_upstream_connection(&primary_conn, upstream_node_info.conninfo) == true)
{ {
set_upstream_last_seen(local_conn); set_upstream_last_seen(local_conn, upstream_node_info.node_id);
} }
else else
{ {