have new primary communicate to standbys

This commit is contained in:
Ian Barwick
2017-06-30 21:45:25 +09:00
parent 1857e23fef
commit debe5a18c5
5 changed files with 178 additions and 9 deletions

View File

@@ -2429,6 +2429,60 @@ announce_candidature(PGconn *conn, t_node_info *this_node, t_node_info *other_no
return retval;
}
void
notify_follow_primary(PGconn *conn, int primary_node_id)
{
PQExpBufferData query;
PGresult *res;
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
"SELECT repmgr.notify_follow_primary(%i)",
primary_node_id);
// XXX handle failure
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
PQclear(res);
return;
}
bool
get_new_primary(PGconn *conn, int *primary_node_id)
{
PQExpBufferData query;
PGresult *res;
int new_primary_node_id;
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
"SELECT repmgr.get_new_primary()");
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
// XXX handle error
new_primary_node_id = atoi(PQgetvalue(res, 0, 0));
if (new_primary_node_id == UNKNOWN_NODE_ID)
{
PQclear(res);
return false;
}
PQclear(res);
*primary_node_id = new_primary_node_id;
return true;
}
/* ============================ */
/* replication status functions */
/* ============================ */

View File

@@ -247,6 +247,8 @@ NodeVotingStatus get_voting_status(PGconn *conn);
int request_vote(PGconn *conn, t_node_info *this_node, t_node_info *other_node, int electoral_term);
int set_voting_status_initiated(PGconn *conn);
bool announce_candidature(PGconn *conn, t_node_info *this_node, t_node_info *other_node, int electoral_term);
void notify_follow_primary(PGconn *conn, int primary_node_id);
bool get_new_primary(PGconn *conn, int *primary_node_id);
/* replication status functions */

View File

@@ -55,3 +55,13 @@ CREATE FUNCTION other_node_is_candidate(INT, INT)
RETURNS BOOL
AS '$libdir/repmgr', 'other_node_is_candidate'
LANGUAGE C STRICT;
CREATE FUNCTION notify_follow_primary(INT)
RETURNS VOID
AS '$libdir/repmgr', 'notify_follow_primary'
LANGUAGE C STRICT;
CREATE FUNCTION get_new_primary()
RETURNS INT
AS '$libdir/repmgr', 'get_new_primary'
LANGUAGE C STRICT;

View File

@@ -32,7 +32,6 @@
#define UNKNOWN_NODE_ID -1
#define MAXFNAMELEN 64
#define TRANCHE_NAME "repmgrd"
PG_MODULE_MAGIC;
@@ -50,6 +49,7 @@ typedef struct repmgrdSharedState
NodeVotingStatus voting_status;
int current_electoral_term;
int candidate_node_id;
bool follow_new_primary;
} repmgrdSharedState;
static repmgrdSharedState *shared_state = NULL;
@@ -73,6 +73,14 @@ PG_FUNCTION_INFO_V1(set_voting_status_initiated);
Datum other_node_is_candidate(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(other_node_is_candidate);
Datum notify_follow_primary(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(notify_follow_primary);
Datum get_new_primary(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(get_new_primary);
/*
* Module load callback
*/
@@ -146,6 +154,7 @@ repmgr_shmem_startup(void)
shared_state->voting_status = VS_NO_VOTE;
shared_state->candidate_node_id = UNKNOWN_NODE_ID;
shared_state->current_electoral_term = 0;
shared_state->follow_new_primary = false;
}
LWLockRelease(AddinShmemInitLock);
@@ -168,8 +177,9 @@ request_vote(PG_FUNCTION_ARGS)
LWLockAcquire(shared_state->lock, LW_SHARED);
/* this node has initiated voting or already responded to another node */
if (current_electoral_term == shared_state->current_electoral_term
&& shared_state->voting_status != VS_NO_VOTE)
// if (current_electoral_term == shared_state->current_electoral_term
// && shared_state->voting_status != VS_NO_VOTE)
if (shared_state->voting_status != VS_NO_VOTE)
{
LWLockRelease(shared_state->lock);
@@ -270,3 +280,34 @@ other_node_is_candidate(PG_FUNCTION_ARGS)
elog(INFO, "node %i is candidate", requesting_node_id);
PG_RETURN_BOOL(true);
}
Datum
notify_follow_primary(PG_FUNCTION_ARGS)
{
int primary_node_id = PG_GETARG_INT32(0);
LWLockAcquire(shared_state->lock, LW_SHARED);
/* Explicitly set the primary node id */
shared_state->candidate_node_id = primary_node_id;
shared_state->follow_new_primary = true;
LWLockRelease(shared_state->lock);
PG_RETURN_VOID();
}
Datum
get_new_primary(PG_FUNCTION_ARGS)
{
int new_primary_node_id = UNKNOWN_NODE_ID;
LWLockAcquire(shared_state->lock, LW_SHARED);
if (shared_state->follow_new_primary == true)
new_primary_node_id = shared_state->candidate_node_id;
LWLockRelease(shared_state->lock);
PG_RETURN_INT32(new_primary_node_id);
}

View File

@@ -28,7 +28,8 @@ typedef enum {
FAILOVER_STATE_PROMOTED,
FAILOVER_STATE_PROMOTION_FAILED,
FAILOVER_STATE_PRIMARY_REAPPEARED,
FAILOVER_STATE_LOCAL_NODE_FAILURE
FAILOVER_STATE_LOCAL_NODE_FAILURE,
FAILOVER_STATE_WAITING_NEW_PRIMARY
// FOLLOWED_NEW_PRIMARY
// FOLLOW_WAIT_TIMEOUT
} FailoverState;
@@ -90,6 +91,8 @@ static const char *_print_voting_status(NodeVotingStatus voting_status);
static const char *_print_election_result(ElectionResult result);
static FailoverState promote_self(void);
static void wait_primary_notification(void);
static void notify_followers(NodeInfoList *standby_nodes);
static void close_connections();
static void terminate(int retval);
@@ -579,14 +582,27 @@ monitor_streaming_standby(void)
// --> need timeout in case new primary doesn't come up, then rerun election
log_info("I am a follower and am waiting to be informed by the winner");
failover_state = FAILOVER_STATE_WAITING_NEW_PRIMARY;
}
switch(failover_state)
{
case FAILOVER_STATE_PROMOTED:
// inform nodes
// pass control back down and start primary monitoring
break;
/* inform former siblings that we are Number 1 */
notify_followers(&standby_nodes);
/* we no longer care about our former siblings */
clear_node_info_list(&standby_nodes);
/* pass control back down to start_monitoring() */
log_info(_("switching to primary monitoring mode"));
return;
case FAILOVER_STATE_WAITING_NEW_PRIMARY:
/* either follow or time out; either way resume monitoring */
wait_primary_notification();
/* pass control back down to start_monitoring() */
return;
case FAILOVER_STATE_PROMOTION_FAILED:
case FAILOVER_STATE_PRIMARY_REAPPEARED:
case FAILOVER_STATE_LOCAL_NODE_FAILURE:
@@ -644,7 +660,7 @@ promote_self(void)
// XXX handle this
return FAILOVER_STATE_LOCAL_NODE_FAILURE;
}
}
}
if (r != 0)
{
@@ -701,6 +717,52 @@ promote_self(void)
return FAILOVER_STATE_PROMOTED;
}
static void
wait_primary_notification(void)
{
// XXX make this configurable
int wait_primary_timeout = 60;
int i;
int new_primary_id;
for (i = 0; i < wait_primary_timeout; i++)
{
if (get_new_primary(local_conn, &new_primary_id) == true)
{
log_debug("XXX new primary is %i", new_primary_id);
//return;
}
sleep(1);
}
}
static void
notify_followers(NodeInfoList *standby_nodes)
{
NodeInfoListCell *cell;
for (cell = standby_nodes->head; cell; cell = cell->next)
{
log_debug("intending to notify %i... ", cell->node_info->node_id);
if (PQstatus(cell->node_info->conn) != CONNECTION_OK)
{
log_debug("connection to %i lost... ", cell->node_info->node_id);
cell->node_info->conn = establish_db_connection(cell->node_info->conninfo, false);
}
if (PQstatus(cell->node_info->conn) != CONNECTION_OK)
{
log_debug("unable to reconnect to %i ... ", cell->node_info->node_id);
continue;
}
log_debug("notifying node %i to follow new primary", cell->node_info->node_id);
notify_follow_primary(cell->node_info->conn, local_node_info.node_id);
}
}
static const char *
_print_voting_status(NodeVotingStatus voting_status)