From debe5a18c5c8a33932644aa73811ffde630e4603 Mon Sep 17 00:00:00 2001
From: Ian Barwick
Date: Fri, 30 Jun 2017 21:45:25 +0900
Subject: [PATCH] have new primary communicate to standbys

---
 dbutils.c       | 70 +++++++++++++++++++++++++++++++++++++++++++++++++++
 dbutils.h       |  2 ++
 repmgr--4.0.sql | 10 +++++++
 repmgr.c        | 58 +++++++++++++++++++++++++++++++++++++++---
 repmgrd.c       | 82 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++----
 5 files changed, 214 insertions(+), 8 deletions(-)

diff --git a/dbutils.c b/dbutils.c
index 904263b2..6211c41c 100644
--- a/dbutils.c
+++ b/dbutils.c
@@ -2429,6 +2429,76 @@ announce_candidature(PGconn *conn, t_node_info *this_node, t_node_info *other_no
 	return retval;
 }
 
+/*
+ * Execute repmgr.notify_follow_primary(primary_node_id) over "conn".
+ * The outcome of the query is not checked yet.
+ */
+void
+notify_follow_primary(PGconn *conn, int primary_node_id)
+{
+	PQExpBufferData query;
+	PGresult   *res;
+
+	initPQExpBuffer(&query);
+
+	appendPQExpBuffer(&query,
+					  "SELECT repmgr.notify_follow_primary(%i)",
+					  primary_node_id);
+
+	// XXX handle failure
+	res = PQexec(conn, query.data);
+	termPQExpBuffer(&query);
+
+	PQclear(res);
+	return;
+}
+
+
+/*
+ * Execute repmgr.get_new_primary() over "conn".
+ *
+ * Returns true and stores the result in "*primary_node_id" if a node id
+ * other than UNKNOWN_NODE_ID was returned. Returns false, leaving
+ * "*primary_node_id" untouched, if the query failed, returned no usable
+ * value, or returned UNKNOWN_NODE_ID.
+ */
+bool
+get_new_primary(PGconn *conn, int *primary_node_id)
+{
+	PQExpBufferData query;
+	PGresult   *res;
+	int			new_primary_node_id = UNKNOWN_NODE_ID;
+
+	initPQExpBuffer(&query);
+
+	appendPQExpBuffer(&query,
+					  "SELECT repmgr.get_new_primary()");
+
+	res = PQexec(conn, query.data);
+	termPQExpBuffer(&query);
+
+	/*
+	 * PQgetvalue() must only be called for a row and column which actually
+	 * exist, so validate the result before reading from it.
+	 */
+	if (PQresultStatus(res) == PGRES_TUPLES_OK
+		&& PQntuples(res) > 0
+		&& !PQgetisnull(res, 0, 0))
+	{
+		new_primary_node_id = atoi(PQgetvalue(res, 0, 0));
+	}
+
+	PQclear(res);
+
+	if (new_primary_node_id == UNKNOWN_NODE_ID)
+		return false;
+
+	*primary_node_id = new_primary_node_id;
+
+	return true;
+}
+
+
 /* ============================ */
 /* replication status functions */
 /* ============================ */
diff --git a/dbutils.h b/dbutils.h
index 8f419169..f0b80f0e 100644
--- a/dbutils.h
+++ b/dbutils.h
@@ -247,6 +247,8 @@ NodeVotingStatus get_voting_status(PGconn *conn);
 int			request_vote(PGconn *conn, t_node_info *this_node, t_node_info *other_node, int electoral_term);
 int			set_voting_status_initiated(PGconn *conn);
 bool		announce_candidature(PGconn *conn, t_node_info *this_node, t_node_info *other_node, int electoral_term);
+void		notify_follow_primary(PGconn *conn, int primary_node_id);
+bool		get_new_primary(PGconn *conn, int *primary_node_id);
 
 /* replication status functions */
 
diff --git a/repmgr--4.0.sql b/repmgr--4.0.sql
index eb9e9b58..23ee0cc4 100644
--- a/repmgr--4.0.sql
+++ b/repmgr--4.0.sql
@@ -55,3 +55,13 @@ CREATE FUNCTION other_node_is_candidate(INT, INT)
   RETURNS BOOL
   AS '$libdir/repmgr', 'other_node_is_candidate'
   LANGUAGE C STRICT;
+
+CREATE FUNCTION notify_follow_primary(INT)
+  RETURNS VOID
+  AS '$libdir/repmgr', 'notify_follow_primary'
+  LANGUAGE C STRICT;
+
+CREATE FUNCTION get_new_primary()
+  RETURNS INT
+  AS '$libdir/repmgr', 'get_new_primary'
+  LANGUAGE C STRICT;
diff --git a/repmgr.c b/repmgr.c
index 3e6f0f0d..d76d215e 100644
--- a/repmgr.c
+++ b/repmgr.c
@@ -32,7 +32,6 @@


 #define UNKNOWN_NODE_ID -1
-#define MAXFNAMELEN 64
 #define TRANCHE_NAME "repmgrd"

 PG_MODULE_MAGIC;
@@ -50,6 +49,7 @@ typedef struct repmgrdSharedState
 	NodeVotingStatus voting_status;
 	int current_electoral_term;
 	int candidate_node_id;
+	bool follow_new_primary;
 } repmgrdSharedState;

 static repmgrdSharedState *shared_state = NULL;
@@ -71,8 +71,16 @@ PG_FUNCTION_INFO_V1(get_voting_status);
 Datum set_voting_status_initiated(PG_FUNCTION_ARGS);
 PG_FUNCTION_INFO_V1(set_voting_status_initiated);

-Datum other_node_is_candidate(PG_FUNCTION_ARGS);
+Datum other_node_is_candidate(PG_FUNCTION_ARGS);
 PG_FUNCTION_INFO_V1(other_node_is_candidate);
+
+Datum notify_follow_primary(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1(notify_follow_primary);
+
+Datum get_new_primary(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1(get_new_primary);
+
+
 /*
  * Module load callback
  */
@@ -146,6 +154,7 @@ repmgr_shmem_startup(void)
 		shared_state->voting_status = VS_NO_VOTE;
 		shared_state->candidate_node_id = UNKNOWN_NODE_ID;
 		shared_state->current_electoral_term = 0;
+		shared_state->follow_new_primary = false;
 	}

 	LWLockRelease(AddinShmemInitLock);
@@ -168,8 +177,9 @@ request_vote(PG_FUNCTION_ARGS)
 	LWLockAcquire(shared_state->lock, LW_SHARED);

 	/* this node has initiated voting or already responded to another node */
-	if (current_electoral_term == shared_state->current_electoral_term
-		&& shared_state->voting_status != VS_NO_VOTE)
+//	if (current_electoral_term == shared_state->current_electoral_term
+//		&& shared_state->voting_status != VS_NO_VOTE)
+	if (shared_state->voting_status != VS_NO_VOTE)
 	{
 		LWLockRelease(shared_state->lock);

@@ -270,3 +280,43 @@ other_node_is_candidate(PG_FUNCTION_ARGS)
 	elog(INFO, "node %i is candidate", requesting_node_id);
 	PG_RETURN_BOOL(true);
 }
+
+/*
+ * Store the supplied node id in shared_state->candidate_node_id and
+ * raise shared_state->follow_new_primary.
+ */
+Datum
+notify_follow_primary(PG_FUNCTION_ARGS)
+{
+	int primary_node_id = PG_GETARG_INT32(0);
+
+	/* shared state is modified below, so a shared lock is not sufficient */
+	LWLockAcquire(shared_state->lock, LW_EXCLUSIVE);
+
+	/* Explicitly set the primary node id */
+	shared_state->candidate_node_id = primary_node_id;
+	shared_state->follow_new_primary = true;
+	LWLockRelease(shared_state->lock);
+
+	PG_RETURN_VOID();
+}
+
+
+/*
+ * Return shared_state->candidate_node_id if follow_new_primary is set,
+ * otherwise UNKNOWN_NODE_ID.
+ */
+Datum
+get_new_primary(PG_FUNCTION_ARGS)
+{
+	int new_primary_node_id = UNKNOWN_NODE_ID;
+
+	LWLockAcquire(shared_state->lock, LW_SHARED);
+
+	if (shared_state->follow_new_primary == true)
+		new_primary_node_id = shared_state->candidate_node_id;
+
+	LWLockRelease(shared_state->lock);
+
+	PG_RETURN_INT32(new_primary_node_id);
+}
diff --git a/repmgrd.c b/repmgrd.c
index 87e2bea7..cb8e3820 100644
--- a/repmgrd.c
+++ b/repmgrd.c
@@ -28,7 +28,8 @@ typedef enum {
 	FAILOVER_STATE_PROMOTED,
 	FAILOVER_STATE_PROMOTION_FAILED,
 	FAILOVER_STATE_PRIMARY_REAPPEARED,
-	FAILOVER_STATE_LOCAL_NODE_FAILURE
+	FAILOVER_STATE_LOCAL_NODE_FAILURE,
+	FAILOVER_STATE_WAITING_NEW_PRIMARY
 	// FOLLOWED_NEW_PRIMARY
 	// FOLLOW_WAIT_TIMEOUT
 } FailoverState;
@@ -90,6 +91,8 @@ static const char *_print_voting_status(NodeVotingStatus voting_status);
 static const char *_print_election_result(ElectionResult result);

 static FailoverState promote_self(void);
+static void wait_primary_notification(void);
+static void notify_followers(NodeInfoList *standby_nodes);

 static void close_connections();
 static void terminate(int retval);
@@ -579,14 +582,27 @@ monitor_streaming_standby(void)
 					// --> need timeout in case new primary doesn't come up, then rerun election

 					log_info("I am a follower and am waiting to be informed by the winner");
+					failover_state = FAILOVER_STATE_WAITING_NEW_PRIMARY;
 				}

 				switch(failover_state)
 				{
 					case FAILOVER_STATE_PROMOTED:
-						// inform nodes
-						// pass control back down and start primary monitoring
-						break;
+						/* inform former siblings that we are Number 1 */
+
+						notify_followers(&standby_nodes);
+						/* we no longer care about our former siblings */
+						clear_node_info_list(&standby_nodes);
+
+						/* pass control back down to start_monitoring() */
+						log_info(_("switching to primary monitoring mode"));
+
+						return;
+					case FAILOVER_STATE_WAITING_NEW_PRIMARY:
+						/* either follow or time out; either way resume monitoring */
+						wait_primary_notification();
+						/* pass control back down to start_monitoring() */
+						return;
 					case FAILOVER_STATE_PROMOTION_FAILED:
 					case FAILOVER_STATE_PRIMARY_REAPPEARED:
 					case FAILOVER_STATE_LOCAL_NODE_FAILURE:
@@ -701,6 +717,64 @@ promote_self(void)
 	return FAILOVER_STATE_PROMOTED;
 }

+/*
+ * Poll get_new_primary() on local_conn once per second, for
+ * wait_primary_timeout seconds. The early return on success is still
+ * commented out, so the full timeout always elapses.
+ */
+static void
+wait_primary_notification(void)
+{
+	// XXX make this configurable
+	int wait_primary_timeout = 60;
+	int i;
+	int new_primary_id;
+
+	for (i = 0; i < wait_primary_timeout; i++)
+	{
+		if (get_new_primary(local_conn, &new_primary_id) == true)
+		{
+			log_debug("XXX new primary is %i", new_primary_id);
+			//return;
+		}
+		sleep(1);
+	}
+}
+
+
+/*
+ * Call notify_follow_primary() with the local node's id for each node in
+ * "standby_nodes". A node whose connection is not CONNECTION_OK gets one
+ * reconnection attempt and is skipped if that fails as well.
+ */
+static void
+notify_followers(NodeInfoList *standby_nodes)
+{
+	NodeInfoListCell *cell;
+
+	for (cell = standby_nodes->head; cell; cell = cell->next)
+	{
+		log_debug("intending to notify %i... ", cell->node_info->node_id);
+		if (PQstatus(cell->node_info->conn) != CONNECTION_OK)
+		{
+			log_debug("connection to %i lost... ", cell->node_info->node_id);
+
+			/* release the unusable connection before replacing the pointer */
+			PQfinish(cell->node_info->conn);
+			cell->node_info->conn = establish_db_connection(cell->node_info->conninfo, false);
+		}
+
+		if (PQstatus(cell->node_info->conn) != CONNECTION_OK)
+		{
+			log_debug("unable to reconnect to %i ... ", cell->node_info->node_id);
+
+			continue;
+		}
+		log_debug("notifying node %i to follow new primary", cell->node_info->node_id);
+		notify_follow_primary(cell->node_info->conn, local_node_info.node_id);
+	}
+}
+

 static const char *
 _print_voting_status(NodeVotingStatus voting_status)