Improve handling

not sure if we need to store the electoral term...
This commit is contained in:
Ian Barwick
2017-06-30 13:40:19 +09:00
parent 3514e20367
commit fc4f276844
6 changed files with 247 additions and 127 deletions

View File

@@ -1210,7 +1210,7 @@ _populate_node_record(PGresult *res, t_node_info *node_info, int row)
/* Set remaining struct fields with default values */
node_info->is_ready = false;
node_info->is_visible = false;
node_info->xlog_location = InvalidXLogRecPtr;
node_info->last_wal_receive_lsn = InvalidXLogRecPtr;
}
@@ -1676,7 +1676,6 @@ update_node_record_status(PGconn *conn, int this_node_id, char *type, int upstre
}
bool
delete_node_record(PGconn *conn, int node)
{
@@ -1703,10 +1702,40 @@ delete_node_record(PGconn *conn, int node)
}
PQclear(res);
return true;
return true;
}
void
clear_node_info_list(NodeInfoList *nodes)
{
NodeInfoListCell *cell;
NodeInfoListCell *next_cell;
/* close any open connections */
for (cell = nodes->head; cell; cell = cell->next)
{
if (cell->node_info->conn != NULL)
{
PQfinish(cell->node_info->conn);
cell->node_info->conn = NULL;
}
}
cell = nodes->head;
while (cell != NULL)
{
next_cell = cell->next;
pfree(cell->node_info);
pfree(cell);
cell = next_cell;
}
}
/* ====================== */
/* event record functions */
/* ====================== */
@@ -2289,37 +2318,43 @@ get_voting_status(PGconn *conn)
}
int
request_vote(PGconn *conn, t_node_info *this_node, t_node_info *other_node, XLogRecPtr last_wal_receive_lsn)
request_vote(PGconn *conn, t_node_info *this_node, t_node_info *other_node, int electoral_term)
{
PQExpBufferData query;
PGresult *res;
int lsn_diff;
other_node->last_wal_receive_lsn = InvalidXLogRecPtr;
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
"SELECT repmgr.request_vote(%i, '%X/%X'::pg_lsn)",
"SELECT repmgr.request_vote(%i, %i)",
this_node->node_id,
electoral_term);
/* "SELECT repmgr.request_vote(%i, '%X/%X'::pg_lsn)",
this_node->node_id,
(uint32) (last_wal_receive_lsn >> 32),
(uint32) last_wal_receive_lsn);
(uint32) last_wal_receive_lsn);*/
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
// check for NULL
/* check for NULL */
if (PQgetisnull(res, 0, 0))
{
log_debug("XXX NULL returned by repmgr.request_vote()");
return 0;
}
lsn_diff = atoi(PQgetvalue(res, 0, 0));
log_debug("XXX lsn_diff %i", lsn_diff);
other_node->last_wal_receive_lsn = parse_lsn(PQgetvalue(res, 0, 0));
PQclear(res);
lsn_diff = this_node->last_wal_receive_lsn - other_node->last_wal_receive_lsn;
log_debug("XXX lsn_diff %i", lsn_diff);
/* we're ahead */
if (lsn_diff > 0)
{
@@ -2327,6 +2362,7 @@ request_vote(PGconn *conn, t_node_info *this_node, t_node_info *other_node, XLog
return 1;
}
/* other node is ahead */
if (lsn_diff < 0)
{
@@ -2336,36 +2372,38 @@ request_vote(PGconn *conn, t_node_info *this_node, t_node_info *other_node, XLog
/* tiebreak */
/* we're higher priority */
if (this_node->priority > other_node->priority)
/* other node is higher priority */
if (this_node->priority < other_node->priority)
{
log_debug("this node has higher priority");
return 1;
log_debug("other node has higher priority");
return 0;
}
/* still tiebreak - decide by node_id */
// we're the candidate, so we win
/* still tiebreak - we're the candidate, so we win */
log_debug("win by default");
return 1;
}
void
int
set_voting_status_initiated(PGconn *conn)
{
PGresult *res;
int electoral_term;
res = PQexec(conn, "SELECT repmgr.set_voting_status_initiated()");
electoral_term = atoi(PQgetvalue(res, 0, 0));
PQclear(res);
return;
return electoral_term;
}
bool
announce_candidature(PGconn *conn, t_node_info *this_node, t_node_info *other_node)
announce_candidature(PGconn *conn, t_node_info *this_node, t_node_info *other_node, int electoral_term)
{
PQExpBufferData query;
PGresult *res;
@@ -2375,8 +2413,9 @@ announce_candidature(PGconn *conn, t_node_info *this_node, t_node_info *other_no
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
"SELECT repmgr.other_node_is_candidate(%i)",
this_node->node_id);
"SELECT repmgr.other_node_is_candidate(%i, %i)",
this_node->node_id,
electoral_term);
res = PQexec(conn, query.data);
termPQExpBuffer(&query);

View File

@@ -4,8 +4,6 @@
* Copyright (c) 2ndQuadrant, 2010-2017
*/
#ifndef _REPMGR_DBUTILS_H_
#define _REPMGR_DBUTILS_H_
@@ -63,7 +61,7 @@ typedef struct s_node_info
/* used during failover to track node status */
bool is_ready;
bool is_visible;
XLogRecPtr xlog_location;
XLogRecPtr last_wal_receive_lsn;
PGconn *conn;
} t_node_info;
@@ -225,7 +223,7 @@ bool delete_node_record(PGconn *conn, int node);
bool update_node_record_set_primary(PGconn *conn, int this_node_id);
bool update_node_record_status(PGconn *conn, int this_node_id, char *type, int upstream_node_id, bool active);
void clear_node_info_list(NodeInfoList *nodes);
/* event record functions */
bool create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details);
@@ -246,9 +244,9 @@ bool is_server_available(const char *conninfo);
/* node voting functions */
NodeVotingStatus get_voting_status(PGconn *conn);
int request_vote(PGconn *conn, t_node_info *this_node, t_node_info *other_node, XLogRecPtr last_wal_receive_lsn);
void set_voting_status_initiated(PGconn *conn);
bool announce_candidature(PGconn *conn, t_node_info *this_node, t_node_info *other_node);
int request_vote(PGconn *conn, t_node_info *this_node, t_node_info *other_node, int electoral_term);
int set_voting_status_initiated(PGconn *conn);
bool announce_candidature(PGconn *conn, t_node_info *this_node, t_node_info *other_node, int electoral_term);
/* replication status functions */

View File

@@ -36,23 +36,22 @@ LEFT JOIN nodes un
/* repmgrd functions */
CREATE FUNCTION request_vote(INT, pg_lsn)
RETURNS INT
CREATE FUNCTION request_vote(INT,INT)
RETURNS pg_lsn
AS '$libdir/repmgr', 'request_vote'
LANGUAGE C STRICT;
CREATE FUNCTION get_voting_status()
RETURNS INT
AS '$libdir/repmgr', 'get_voting_status'
LANGUAGE C STRICT;
CREATE FUNCTION set_voting_status_initiated()
RETURNS VOID
RETURNS INT
AS '$libdir/repmgr', 'set_voting_status_initiated'
LANGUAGE C STRICT;
CREATE FUNCTION other_node_is_candidate(INT)
CREATE FUNCTION other_node_is_candidate(INT, INT)
RETURNS BOOL
AS '$libdir/repmgr', 'other_node_is_candidate'
LANGUAGE C STRICT;

View File

@@ -48,6 +48,7 @@ typedef struct repmgrdSharedState
LWLockId lock; /* protects search/modification */
NodeState node_state;
NodeVotingStatus voting_status;
int current_electoral_term;
int candidate_node_id;
} repmgrdSharedState;
@@ -144,6 +145,7 @@ repmgr_shmem_startup(void)
shared_state->voting_status = VS_NO_VOTE;
shared_state->candidate_node_id = UNKNOWN_NODE_ID;
shared_state->current_electoral_term = 0;
}
LWLockRelease(AddinShmemInitLock);
@@ -153,59 +155,63 @@ repmgr_shmem_startup(void)
Datum
request_vote(PG_FUNCTION_ARGS)
{
StringInfoData query;
XLogRecPtr our_lsn = InvalidXLogRecPtr;
/* node_id used for logging purposes */
int requesting_node_id = PG_GETARG_INT32(0);
XLogRecPtr requesting_node_last_lsn = PG_GETARG_LSN(1);
StringInfoData query;
int current_electoral_term = PG_GETARG_INT32(1);
int ret;
bool isnull;
int lsn_diff;
NodeVotingStatus voting_status;
LWLockAcquire(shared_state->lock, LW_SHARED);
voting_status = shared_state->voting_status;
LWLockRelease(shared_state->lock);
// keep lock until end of function?
/* this node has initiated voting or already responded to another node */
if (voting_status != VS_NO_VOTE)
PG_RETURN_NULL();
if (shared_state->voting_status != VS_NO_VOTE)
{
LWLockRelease(shared_state->lock);
elog(INFO, "id is %i, lsn: %X/%X",
requesting_node_id,
(uint32) (requesting_node_last_lsn >> 32),
(uint32) requesting_node_last_lsn);
PG_RETURN_NULL();
}
elog(INFO, "requesting node id is %i", requesting_node_id);
SPI_connect();
initStringInfo(&query);
appendStringInfo(
&query,
"SELECT ('%X/%X'::pg_lsn - pg_catalog.pg_last_wal_receive_lsn()::pg_lsn)::INT",
(uint32) (requesting_node_last_lsn >> 32),
(uint32) requesting_node_last_lsn);
"SELECT pg_catalog.pg_last_wal_receive_lsn()"
);
elog(INFO, "query: %s", query.data);
ret = SPI_execute(query.data, true, 0);
// xxx handle errors
lsn_diff = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
SPI_tuptable->tupdesc,
1, &isnull));
our_lsn = DatumGetLSN(SPI_getbinval(SPI_tuptable->vals[0],
SPI_tuptable->tupdesc,
1, &isnull));
elog(INFO, "XXX diff is %i",
lsn_diff);
elog(INFO, "Our LSN is %X/%X",
(uint32) (our_lsn >> 32),
(uint32) our_lsn);
SPI_finish();
/* indicate this node has responded to a vote request */
LWLockAcquire(shared_state->lock, LW_SHARED);
shared_state->voting_status = VS_VOTE_REQUEST_RECEIVED;
shared_state->current_electoral_term += 1;
LWLockRelease(shared_state->lock);
PG_RETURN_INT32(lsn_diff);
// should we free "query" here?
PG_RETURN_LSN(our_lsn);
}
@@ -225,26 +231,35 @@ get_voting_status(PG_FUNCTION_ARGS)
Datum
set_voting_status_initiated(PG_FUNCTION_ARGS)
{
int electoral_term;
LWLockAcquire(shared_state->lock, LW_SHARED);
shared_state->voting_status = VS_VOTE_INITIATED;
shared_state->current_electoral_term += 1;
electoral_term = shared_state->current_electoral_term;
LWLockRelease(shared_state->lock);
PG_RETURN_VOID();
PG_RETURN_INT32(electoral_term);
}
Datum
other_node_is_candidate(PG_FUNCTION_ARGS)
{
int requesting_node_id = PG_GETARG_INT32(0);
int electoral_term = PG_GETARG_INT32(1);
LWLockAcquire(shared_state->lock, LW_SHARED);
if (shared_state->candidate_node_id != UNKNOWN_NODE_ID)
if (shared_state->current_electoral_term == electoral_term)
{
elog(INFO, "node %i requesting candidature, but node %i already candidate",
requesting_node_id,
shared_state->candidate_node_id);
PG_RETURN_BOOL(false);
if (shared_state->candidate_node_id != UNKNOWN_NODE_ID)
{
elog(INFO, "node %i requesting candidature, but node %i already candidate",
requesting_node_id,
shared_state->candidate_node_id);
PG_RETURN_BOOL(false);
}
}
shared_state->candidate_node_id = requesting_node_id;

189
repmgrd.c
View File

@@ -22,6 +22,12 @@ typedef enum {
NODE_STATUS_DOWN
} NodeStatus;
typedef enum {
ELECTION_NOT_CANDIDATE = -1,
ELECTION_WON,
ELECTION_LOST
} ElectionResult;
static char *config_file = NULL;
static bool verbose = false;
@@ -37,6 +43,8 @@ static t_node_info upstream_node_info = T_NODE_INFO_INITIALIZER;
static PGconn *upstream_conn = NULL;
static PGconn *primary_conn = NULL;
static NodeInfoList standby_nodes = T_NODE_INFO_LIST_INITIALIZER;
/* Collate command line errors here for friendlier reporting */
static ItemList cli_errors = { NULL, NULL };
@@ -64,7 +72,9 @@ static void handle_sigint(SIGNAL_ARGS);
#endif
static PGconn *try_reconnect(const char *conninfo, NodeStatus *node_status);
static NodeVotingStatus do_election(void);
static ElectionResult do_election(void);
static const char *_print_voting_status(NodeVotingStatus voting_status);
static const char *_print_election_result(ElectionResult result);
static void close_connections();
static void terminate(int retval);
@@ -534,38 +544,29 @@ monitor_streaming_standby(void)
/* still down after reconnect attempt(s) - */
if (upstream_node_status == NODE_STATUS_DOWN)
{
NodeVotingStatus voting_status = do_election();
switch(voting_status)
{
case VS_NO_VOTE:
log_info("NO VOTE");
break;
case VS_VOTE_REQUEST_RECEIVED:
log_info("VOTE REQUEST RECEIVED");
break;
case VS_VOTE_INITIATED:
log_info("VOTE REQUEST INITIATED");
break;
case VS_VOTE_WON:
log_info("VOTE REQUEST WON");
break;
case VS_VOTE_LOST:
log_info("VOTE REQUEST LOST");
break;
case VS_UNKNOWN:
log_info("VOTE REQUEST UNKNOWN");
break;
}
// begin voting process
// if VS_PROMOTION_CANDIDATE
ElectionResult election_result = do_election();
log_debug("election result: %s", _print_election_result(election_result));
if (election_result == ELECTION_WON)
{
log_info("I am the winner, will now promote self and inform other nodes");
}
else if (election_result == ELECTION_LOST)
{
log_info("I am the candidate but did not get all votes; will now determine the best candidate");
}
else
{
log_info("I am a follower and am waiting to be informed by the winner");
}
// if ELECTION_WON
// promote self, notify nodes
// else if VS_VOTE_REQUEST_RECEIVED, look for new primary and follow if necessary
// else if ELECTION_NOT_CANDIDATE, wait for new primary notification
// --> need timeout in case new primary doesn't come up, then rerun election
}
}
@@ -576,53 +577,104 @@ monitor_streaming_standby(void)
}
}
static const char *
_print_voting_status(NodeVotingStatus voting_status)
{
switch(voting_status)
{
case VS_NO_VOTE:
return "NO VOTE";
case VS_VOTE_REQUEST_RECEIVED:
return "VOTE REQUEST RECEIVED";
case VS_VOTE_INITIATED:
return "VOTE REQUEST INITIATED";
case VS_UNKNOWN:
return "VOTE REQUEST UNKNOWN";
}
return "UNKNOWN VOTE REQUEST STATE";
}
static const char *
_print_election_result(ElectionResult result)
{
switch(result)
{
case ELECTION_NOT_CANDIDATE:
return "NOT CANDIDATE";
case ELECTION_WON:
return "WON";
case ELECTION_LOST:
return "LOST";
}
/* should never reach here */
return "UNKNOWN";
}
// store lsndiffs, in the event we're not the best node,
// i.e. don't get all the votes, we pass the baton to the best node
static NodeVotingStatus
static ElectionResult
do_election(void)
{
int electoral_term = -1;
// int total_eligible_nodes = 0;
int votes_for_me = 0;
/* we're visible */
int visible_nodes = 1;
XLogRecPtr last_wal_receive_lsn = InvalidXLogRecPtr;
// get voting status from shared memory
// should be "VS_NO_VOTE" or "VS_VOTE_REQUEST_RECEIVED"
// if VS_NO_VOTE, initiate voting process
NodeVotingStatus voting_status;
NodeInfoList standby_nodes = T_NODE_INFO_LIST_INITIALIZER;
NodeInfoListCell *cell;
long unsigned rand_wait = (long) ((rand() % 50) + 10) * 10000;
bool other_node_is_candidate = false;
log_debug("do_election(): sleeping %li", rand_wait);
/* sleep for a random period of 100 ~ 500 ms
* XXX adjust this downwards if feasible
*/
long unsigned rand_wait = (long) ((rand() % 50) + 10) * 10000;
log_debug("do_election(): sleeping %lu", rand_wait);
pg_usleep(rand_wait);
local_node_info.last_wal_receive_lsn = InvalidXLogRecPtr;
log_debug("do_election(): executing get_voting_status()");
voting_status = get_voting_status(local_conn);
log_debug("do_election(): node voting status is %i", (int)voting_status);
log_debug("do_election(): node voting status is %s", _print_voting_status(voting_status));
if (voting_status == VS_VOTE_REQUEST_RECEIVED)
{
log_debug("vote request already received, not candidate");
/* we've already been requested to vote, so can't become a candidate */
return voting_status;
return ELECTION_NOT_CANDIDATE;
}
// XXX should we mark ourselves as candidate?
// -> so any further vote requests are rejected?
set_voting_status_initiated(local_conn);
// mark ourselves as candidate
// -> so any further vote requests are rejected
electoral_term = set_voting_status_initiated(local_conn);
/* get all active nodes attached to primary, excluding self */
// XXX include barman node in results
clear_node_info_list(&standby_nodes);
get_active_sibling_node_records(local_conn,
local_node_info.node_id,
upstream_node_info.node_id,
@@ -633,7 +685,7 @@ do_election(void)
if (standby_nodes.node_count == 0)
{
log_debug("no other nodes - we win by default");
return VS_VOTE_WON;
return ELECTION_WON;
}
for (cell = standby_nodes.head; cell; cell = cell->next)
@@ -649,10 +701,20 @@ do_election(void)
continue;
}
if (announce_candidature(cell->node_info->conn, &local_node_info, cell->node_info) == false)
/*
* tell the other node we're candidate - if the node has already declared
* itself, we withdraw
*
* XXX check for situations where more than one node could end up as candidate?
*/
// other node: if not candidate in this term, reset state (but don't bump term)
if (announce_candidature(cell->node_info->conn, &local_node_info, cell->node_info, electoral_term) == false)
{
log_debug("node %i is candidate", cell->node_info->node_id);
other_node_is_candidate = true;
/* don't perform any more checks */
break;
}
cell->node_info->is_visible = true;
@@ -661,6 +723,10 @@ do_election(void)
if (other_node_is_candidate == true)
{
clear_node_info_list(&standby_nodes);
// XXX do this
// unset_voting_status_initiated(local_conn);
return VS_NO_VOTE;
}
@@ -668,26 +734,31 @@ do_election(void)
/* check again if we've been asked to vote */
if (0)
{
voting_status = get_voting_status(local_conn);
log_debug("do_election(): node voting status is %i", (int)voting_status);
if (voting_status == VS_VOTE_REQUEST_RECEIVED)
if (0)
{
/* we've already been requested to vote, so can't become a candidate */
return voting_status;
voting_status = get_voting_status(local_conn);
log_debug("do_election(): node voting status is %i", (int)voting_status);
if (voting_status == VS_VOTE_REQUEST_RECEIVED)
{
/* we've already been requested to vote, so can't become a candidate */
return voting_status;
}
}
}
/* current node votes for itself by default */
// XXX check returned LSNs, if one is higher than ours, don't vote for ourselves
// either
votes_for_me += 1;
/* get our lsn*/
last_wal_receive_lsn = get_last_wal_receive_location(local_conn);
/* get our lsn */
local_node_info.last_wal_receive_lsn = get_last_wal_receive_location(local_conn);
log_debug("LAST receive lsn = %X/%X", (uint32) (last_wal_receive_lsn >> 32), (uint32) last_wal_receive_lsn);
log_debug("LAST receive lsn = %X/%X",
(uint32) (local_node_info.last_wal_receive_lsn >> 32),
(uint32) local_node_info.last_wal_receive_lsn);
/* request vote */
for (cell = standby_nodes.head; cell; cell = cell->next)
@@ -699,7 +770,7 @@ if (0)
votes_for_me += request_vote(cell->node_info->conn,
&local_node_info,
cell->node_info,
last_wal_receive_lsn);
electoral_term);
PQfinish(cell->node_info->conn);
cell->node_info->conn = NULL;
@@ -708,9 +779,9 @@ if (0)
log_notice(_("%i of of %i votes"), votes_for_me, visible_nodes);
if (votes_for_me == visible_nodes)
return VS_VOTE_WON;
return ELECTION_WON;
return VS_VOTE_LOST;
return ELECTION_LOST;
}
static void
@@ -913,7 +984,7 @@ try_reconnect(const char *conninfo, NodeStatus *node_status)
int i;
// XXX make this all configurable
int max_attempts = 10;
int max_attempts = 5;
for (i = 0; i < max_attempts; i++)
{

View File

@@ -9,9 +9,7 @@ typedef enum {
VS_UNKNOWN = -1,
VS_NO_VOTE,
VS_VOTE_REQUEST_RECEIVED,
VS_VOTE_INITIATED,
VS_VOTE_WON,
VS_VOTE_LOST
VS_VOTE_INITIATED
} NodeVotingStatus;
#endif /* _VOTING_H_ */