diff --git a/dbutils.c b/dbutils.c index 91698ba6..904263b2 100644 --- a/dbutils.c +++ b/dbutils.c @@ -1210,7 +1210,7 @@ _populate_node_record(PGresult *res, t_node_info *node_info, int row) /* Set remaining struct fields with default values */ node_info->is_ready = false; node_info->is_visible = false; - node_info->xlog_location = InvalidXLogRecPtr; + node_info->last_wal_receive_lsn = InvalidXLogRecPtr; } @@ -1676,7 +1676,6 @@ update_node_record_status(PGconn *conn, int this_node_id, char *type, int upstre } - bool delete_node_record(PGconn *conn, int node) { @@ -1703,10 +1702,40 @@ delete_node_record(PGconn *conn, int node) } PQclear(res); - return true; + return true; } + + +void +clear_node_info_list(NodeInfoList *nodes) +{ + NodeInfoListCell *cell; + NodeInfoListCell *next_cell; + + /* close any open connections */ + for (cell = nodes->head; cell; cell = cell->next) + { + if (cell->node_info->conn != NULL) + { + PQfinish(cell->node_info->conn); + cell->node_info->conn = NULL; + } + } + + cell = nodes->head; + + while (cell != NULL) + { + next_cell = cell->next; + pfree(cell->node_info); + pfree(cell); + cell = next_cell; + } +} + + /* ====================== */ /* event record functions */ /* ====================== */ @@ -2289,37 +2318,43 @@ get_voting_status(PGconn *conn) } int -request_vote(PGconn *conn, t_node_info *this_node, t_node_info *other_node, XLogRecPtr last_wal_receive_lsn) +request_vote(PGconn *conn, t_node_info *this_node, t_node_info *other_node, int electoral_term) { PQExpBufferData query; PGresult *res; int lsn_diff; + other_node->last_wal_receive_lsn = InvalidXLogRecPtr; + initPQExpBuffer(&query); appendPQExpBuffer(&query, - "SELECT repmgr.request_vote(%i, '%X/%X'::pg_lsn)", + "SELECT repmgr.request_vote(%i, %i)", + this_node->node_id, + electoral_term); +/* "SELECT repmgr.request_vote(%i, '%X/%X'::pg_lsn)", this_node->node_id, (uint32) (last_wal_receive_lsn >> 32), - (uint32) last_wal_receive_lsn); + (uint32) last_wal_receive_lsn);*/ res = PQexec(conn, query.data); - termPQExpBuffer(&query); - // check for NULL + /* check for NULL */ if (PQgetisnull(res, 0, 0)) { log_debug("XXX NULL returned by repmgr.request_vote()"); return 0; } - lsn_diff = atoi(PQgetvalue(res, 0, 0)); - - log_debug("XXX lsn_diff %i", lsn_diff); + other_node->last_wal_receive_lsn = parse_lsn(PQgetvalue(res, 0, 0)); PQclear(res); + lsn_diff = this_node->last_wal_receive_lsn - other_node->last_wal_receive_lsn; + + log_debug("XXX lsn_diff %i", lsn_diff); + /* we're ahead */ if (lsn_diff > 0) { @@ -2327,6 +2362,7 @@ request_vote(PGconn *conn, t_node_info *this_node, t_node_info *other_node, XLog return 1; } + /* other node is ahead */ if (lsn_diff < 0) { @@ -2336,36 +2372,38 @@ request_vote(PGconn *conn, t_node_info *this_node, t_node_info *other_node, XLog /* tiebreak */ - /* we're higher priority */ - if (this_node->priority > other_node->priority) + /* other node is higher priority */ + if (this_node->priority < other_node->priority) { - log_debug("this node has higher priority"); - return 1; + log_debug("other node has higher priority"); + return 0; } - /* still tiebreak - decide by node_id */ - - // we're the candidate, so we win + /* still tiebreak - we're the candidate, so we win */ log_debug("win by default"); return 1; } -void +int set_voting_status_initiated(PGconn *conn) { PGresult *res; + int electoral_term; res = PQexec(conn, "SELECT repmgr.set_voting_status_initiated()"); + electoral_term = atoi(PQgetvalue(res, 0, 0)); + PQclear(res); - return; + + return electoral_term; } bool -announce_candidature(PGconn *conn, t_node_info *this_node, t_node_info *other_node) +announce_candidature(PGconn *conn, t_node_info *this_node, t_node_info *other_node, int electoral_term) { PQExpBufferData query; PGresult *res; @@ -2375,8 +2413,9 @@ announce_candidature(PGconn *conn, t_node_info *this_node, t_node_info *other_no initPQExpBuffer(&query); appendPQExpBuffer(&query, - "SELECT repmgr.other_node_is_candidate(%i)", - this_node->node_id); + "SELECT repmgr.other_node_is_candidate(%i, %i)", + this_node->node_id, + electoral_term); res = PQexec(conn, query.data); termPQExpBuffer(&query); diff --git a/dbutils.h b/dbutils.h index 33c3fa19..8f419169 100644 --- a/dbutils.h +++ b/dbutils.h @@ -4,8 +4,6 @@ * Copyright (c) 2ndQuadrant, 2010-2017 */ - - #ifndef _REPMGR_DBUTILS_H_ #define _REPMGR_DBUTILS_H_ @@ -63,7 +61,7 @@ typedef struct s_node_info /* used during failover to track node status */ bool is_ready; bool is_visible; - XLogRecPtr xlog_location; + XLogRecPtr last_wal_receive_lsn; PGconn *conn; } t_node_info; @@ -225,7 +223,7 @@ bool delete_node_record(PGconn *conn, int node); bool update_node_record_set_primary(PGconn *conn, int this_node_id); bool update_node_record_status(PGconn *conn, int this_node_id, char *type, int upstream_node_id, bool active); - +void clear_node_info_list(NodeInfoList *nodes); /* event record functions */ bool create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details); @@ -246,9 +244,9 @@ bool is_server_available(const char *conninfo); /* node voting functions */ NodeVotingStatus get_voting_status(PGconn *conn); -int request_vote(PGconn *conn, t_node_info *this_node, t_node_info *other_node, XLogRecPtr last_wal_receive_lsn); -void set_voting_status_initiated(PGconn *conn); -bool announce_candidature(PGconn *conn, t_node_info *this_node, t_node_info *other_node); +int request_vote(PGconn *conn, t_node_info *this_node, t_node_info *other_node, int electoral_term); +int set_voting_status_initiated(PGconn *conn); +bool announce_candidature(PGconn *conn, t_node_info *this_node, t_node_info *other_node, int electoral_term); /* replication status functions */ diff --git a/repmgr--4.0.sql b/repmgr--4.0.sql index 9142a209..eb9e9b58 100644 --- a/repmgr--4.0.sql +++ b/repmgr--4.0.sql @@ -36,23 +36,22 @@ LEFT JOIN nodes un /* repmgrd functions */ -CREATE FUNCTION request_vote(INT, pg_lsn) - RETURNS INT +CREATE FUNCTION request_vote(INT,INT) + RETURNS pg_lsn AS '$libdir/repmgr', 'request_vote' LANGUAGE C STRICT; - CREATE FUNCTION get_voting_status() RETURNS INT AS '$libdir/repmgr', 'get_voting_status' LANGUAGE C STRICT; CREATE FUNCTION set_voting_status_initiated() - RETURNS VOID + RETURNS INT AS '$libdir/repmgr', 'set_voting_status_initiated' LANGUAGE C STRICT; -CREATE FUNCTION other_node_is_candidate(INT) +CREATE FUNCTION other_node_is_candidate(INT, INT) RETURNS BOOL AS '$libdir/repmgr', 'other_node_is_candidate' LANGUAGE C STRICT; diff --git a/repmgr.c b/repmgr.c index c21ca7f0..0d36f5fd 100644 --- a/repmgr.c +++ b/repmgr.c @@ -48,6 +48,7 @@ typedef struct repmgrdSharedState LWLockId lock; /* protects search/modification */ NodeState node_state; NodeVotingStatus voting_status; + int current_electoral_term; int candidate_node_id; } repmgrdSharedState; @@ -144,6 +145,7 @@ repmgr_shmem_startup(void) shared_state->voting_status = VS_NO_VOTE; shared_state->candidate_node_id = UNKNOWN_NODE_ID; + shared_state->current_electoral_term = 0; } LWLockRelease(AddinShmemInitLock); @@ -153,59 +155,63 @@ repmgr_shmem_startup(void) Datum request_vote(PG_FUNCTION_ARGS) { + StringInfoData query; + XLogRecPtr our_lsn = InvalidXLogRecPtr; + /* node_id used for logging purposes */ int requesting_node_id = PG_GETARG_INT32(0); - XLogRecPtr requesting_node_last_lsn = PG_GETARG_LSN(1); - StringInfoData query; + int current_electoral_term = PG_GETARG_INT32(1); int ret; bool isnull; - int lsn_diff; - - NodeVotingStatus voting_status; - LWLockAcquire(shared_state->lock, LW_SHARED); - voting_status = shared_state->voting_status; - LWLockRelease(shared_state->lock); + + // keep lock until end of function? /* this node has initiated voting or already responded to another node */ - if (voting_status != VS_NO_VOTE) - PG_RETURN_NULL(); + if (shared_state->voting_status != VS_NO_VOTE) + { + LWLockRelease(shared_state->lock); - elog(INFO, "id is %i, lsn: %X/%X", - requesting_node_id, - (uint32) (requesting_node_last_lsn >> 32), - (uint32) requesting_node_last_lsn); + PG_RETURN_NULL(); + } + + + elog(INFO, "requesting node id is %i", requesting_node_id); SPI_connect(); initStringInfo(&query); appendStringInfo( &query, - "SELECT ('%X/%X'::pg_lsn - pg_catalog.pg_last_wal_receive_lsn()::pg_lsn)::INT", - (uint32) (requesting_node_last_lsn >> 32), - (uint32) requesting_node_last_lsn); + "SELECT pg_catalog.pg_last_wal_receive_lsn()" + ); elog(INFO, "query: %s", query.data); ret = SPI_execute(query.data, true, 0); // xxx handle errors - lsn_diff = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0], - SPI_tuptable->tupdesc, - 1, &isnull)); + our_lsn = DatumGetLSN(SPI_getbinval(SPI_tuptable->vals[0], + SPI_tuptable->tupdesc, + 1, &isnull)); - elog(INFO, "XXX diff is %i", - lsn_diff); + + elog(INFO, "Our LSN is %X/%X", + (uint32) (our_lsn >> 32), + (uint32) our_lsn); SPI_finish(); /* indicate this node has responded to a vote request */ - LWLockAcquire(shared_state->lock, LW_SHARED); shared_state->voting_status = VS_VOTE_REQUEST_RECEIVED; + shared_state->current_electoral_term += 1; + LWLockRelease(shared_state->lock); - PG_RETURN_INT32(lsn_diff); + // should we free "query" here? + + PG_RETURN_LSN(our_lsn); } @@ -225,26 +231,35 @@ get_voting_status(PG_FUNCTION_ARGS) Datum set_voting_status_initiated(PG_FUNCTION_ARGS) { + int electoral_term; + LWLockAcquire(shared_state->lock, LW_SHARED); shared_state->voting_status = VS_VOTE_INITIATED; + shared_state->current_electoral_term += 1; + + electoral_term = shared_state->current_electoral_term; LWLockRelease(shared_state->lock); - PG_RETURN_VOID(); + PG_RETURN_INT32(electoral_term); } Datum other_node_is_candidate(PG_FUNCTION_ARGS) { int requesting_node_id = PG_GETARG_INT32(0); + int electoral_term = PG_GETARG_INT32(1); LWLockAcquire(shared_state->lock, LW_SHARED); - if (shared_state->candidate_node_id != UNKNOWN_NODE_ID) + if (shared_state->current_electoral_term == electoral_term) { - elog(INFO, "node %i requesting candidature, but node %i already candidate", - requesting_node_id, - shared_state->candidate_node_id); - PG_RETURN_BOOL(false); + if (shared_state->candidate_node_id != UNKNOWN_NODE_ID) + { + elog(INFO, "node %i requesting candidature, but node %i already candidate", + requesting_node_id, + shared_state->candidate_node_id); + PG_RETURN_BOOL(false); + } } shared_state->candidate_node_id = requesting_node_id; diff --git a/repmgrd.c b/repmgrd.c index 8746ead0..693833fe 100644 --- a/repmgrd.c +++ b/repmgrd.c @@ -22,6 +22,12 @@ typedef enum { NODE_STATUS_DOWN } NodeStatus; +typedef enum { + ELECTION_NOT_CANDIDATE = -1, + ELECTION_WON, + ELECTION_LOST +} ElectionResult; + static char *config_file = NULL; static bool verbose = false; @@ -37,6 +43,8 @@ static t_node_info upstream_node_info = T_NODE_INFO_INITIALIZER; static PGconn *upstream_conn = NULL; static PGconn *primary_conn = NULL; +static NodeInfoList standby_nodes = T_NODE_INFO_LIST_INITIALIZER; + /* Collate command line errors here for friendlier reporting */ static ItemList cli_errors = { NULL, NULL }; @@ -64,7 +72,9 @@ static void handle_sigint(SIGNAL_ARGS); #endif static PGconn *try_reconnect(const char *conninfo, NodeStatus *node_status); -static NodeVotingStatus do_election(void); +static ElectionResult do_election(void); +static const char *_print_voting_status(NodeVotingStatus voting_status); +static const char *_print_election_result(ElectionResult result); static void close_connections(); static void terminate(int retval); @@ -534,38 +544,29 @@ monitor_streaming_standby(void) /* still down after reconnect attempt(s) - */ if (upstream_node_status == NODE_STATUS_DOWN) { - NodeVotingStatus voting_status = do_election(); - - switch(voting_status) - { - case VS_NO_VOTE: - log_info("NO VOTE"); - break; - case VS_VOTE_REQUEST_RECEIVED: - log_info("VOTE REQUEST RECEIVED"); - break; - case VS_VOTE_INITIATED: - log_info("VOTE REQUEST INITIATED"); - break; - - case VS_VOTE_WON: - log_info("VOTE REQUEST WON"); - break; - case VS_VOTE_LOST: - log_info("VOTE REQUEST LOST"); - break; - - case VS_UNKNOWN: - log_info("VOTE REQUEST UNKNOWN"); - break; - } - // begin voting process - // if VS_PROMOTION_CANDIDATE + ElectionResult election_result = do_election(); + + log_debug("election result: %s", _print_election_result(election_result)); + + if (election_result == ELECTION_WON) + { + log_info("I am the winner, will now promote self and inform other nodes"); + } + else if (election_result == ELECTION_LOST) + { + log_info("I am the candidate but did not get all votes; will now determine the best candidate"); + } + else + { + log_info("I am a follower and am waiting to be informed by the winner"); + } + // if ELECTION_WON // promote self, notify nodes - // else if VS_VOTE_REQUEST_RECEIVED, look for new primary and follow if necessary + // else if ELECTION_NOT_CANDIDATE, wait for new primary notification + // --> need timeout in case new primary doesn't come up, then rerun election } } @@ -576,53 +577,104 @@ monitor_streaming_standby(void) } } + +static const char * +_print_voting_status(NodeVotingStatus voting_status) +{ + switch(voting_status) + { + case VS_NO_VOTE: + return "NO VOTE"; + + case VS_VOTE_REQUEST_RECEIVED: + return "VOTE REQUEST RECEIVED"; + + case VS_VOTE_INITIATED: + return "VOTE REQUEST INITIATED"; + + case VS_UNKNOWN: + return "VOTE REQUEST UNKNOWN"; + } + + return "UNKNOWN VOTE REQUEST STATE"; +} + +static const char * +_print_election_result(ElectionResult result) +{ + switch(result) + { + case ELECTION_NOT_CANDIDATE: + return "NOT CANDIDATE"; + + case ELECTION_WON: + return "WON"; + + case ELECTION_LOST: + return "LOST"; + } + + /* should never reach here */ + return "UNKNOWN"; +} + + + // store lsndiffs, in the event we're not the best node, // i.e. don't get all the votes, we pass the baton to the best node -static NodeVotingStatus +static ElectionResult do_election(void) { + int electoral_term = -1; + // int total_eligible_nodes = 0; int votes_for_me = 0; /* we're visible */ int visible_nodes = 1; - XLogRecPtr last_wal_receive_lsn = InvalidXLogRecPtr; // get voting status from shared memory // should be "VS_NO_VOTE" or "VS_VOTE_REQUEST_RECEIVED" // if VS_NO_VOTE, initiate voting process NodeVotingStatus voting_status; - NodeInfoList standby_nodes = T_NODE_INFO_LIST_INITIALIZER; NodeInfoListCell *cell; - long unsigned rand_wait = (long) ((rand() % 50) + 10) * 10000; - bool other_node_is_candidate = false; - log_debug("do_election(): sleeping %li", rand_wait); + /* sleep for a random period of 100 ~ 500 ms + * XXX adjust this downwards if feasible + */ + + long unsigned rand_wait = (long) ((rand() % 50) + 10) * 10000; + + log_debug("do_election(): sleeping %lu", rand_wait); pg_usleep(rand_wait); + local_node_info.last_wal_receive_lsn = InvalidXLogRecPtr; + log_debug("do_election(): executing get_voting_status()"); voting_status = get_voting_status(local_conn); - log_debug("do_election(): node voting status is %i", (int)voting_status); + log_debug("do_election(): node voting status is %s", _print_voting_status(voting_status)); if (voting_status == VS_VOTE_REQUEST_RECEIVED) { log_debug("vote request already received, not candidate"); /* we've already been requested to vote, so can't become a candidate */ - return voting_status; + return ELECTION_NOT_CANDIDATE; } - // XXX should we mark ourselves as candidate? - // -> so any further vote requests are rejected? - set_voting_status_initiated(local_conn); + // mark ourselves as candidate + // -> so any further vote requests are rejected + electoral_term = set_voting_status_initiated(local_conn); /* get all active nodes attached to primary, excluding self */ // XXX include barman node in results + clear_node_info_list(&standby_nodes); + get_active_sibling_node_records(local_conn, local_node_info.node_id, upstream_node_info.node_id, @@ -633,7 +685,7 @@ do_election(void) if (standby_nodes.node_count == 0) { log_debug("no other nodes - we win by default"); - return VS_VOTE_WON; + return ELECTION_WON; } for (cell = standby_nodes.head; cell; cell = cell->next) @@ -649,10 +701,20 @@ do_election(void) continue; } - if (announce_candidature(cell->node_info->conn, &local_node_info, cell->node_info) == false) + /* + * tell the other node we're candidate - if the node has already declared + * itself, we withdraw + * + * XXX check for situations where more than one node could end up as candidate? + */ + // other node: if not candidate in this term, reset state (but don't bump term) + if (announce_candidature(cell->node_info->conn, &local_node_info, cell->node_info, electoral_term) == false) { log_debug("node %i is candidate", cell->node_info->node_id); other_node_is_candidate = true; + + /* don't perform any more checks */ + break; } cell->node_info->is_visible = true; @@ -661,6 +723,10 @@ do_election(void) if (other_node_is_candidate == true) { + clear_node_info_list(&standby_nodes); + + // XXX do this + // unset_voting_status_initiated(local_conn); return VS_NO_VOTE; } @@ -668,26 +734,31 @@ do_election(void) /* check again if we've been asked to vote */ -if (0) -{ - voting_status = get_voting_status(local_conn); - log_debug("do_election(): node voting status is %i", (int)voting_status); - - - if (voting_status == VS_VOTE_REQUEST_RECEIVED) + if (0) { - /* we've already been requested to vote, so can't become a candidate */ - return voting_status; + voting_status = get_voting_status(local_conn); + log_debug("do_election(): node voting status is %i", (int)voting_status); + + + if (voting_status == VS_VOTE_REQUEST_RECEIVED) + { + /* we've already been requested to vote, so can't become a candidate */ + return voting_status; + } } -} /* current node votes for itself by default */ + // XXX check returned LSNs, if one is higher than ours, don't vote for ourselves + // either + votes_for_me += 1; - /* get our lsn*/ - last_wal_receive_lsn = get_last_wal_receive_location(local_conn); + /* get our lsn */ + local_node_info.last_wal_receive_lsn = get_last_wal_receive_location(local_conn); - log_debug("LAST receive lsn = %X/%X", (uint32) (last_wal_receive_lsn >> 32), (uint32) last_wal_receive_lsn); + log_debug("LAST receive lsn = %X/%X", + (uint32) (local_node_info.last_wal_receive_lsn >> 32), + (uint32) local_node_info.last_wal_receive_lsn); /* request vote */ for (cell = standby_nodes.head; cell; cell = cell->next) @@ -699,7 +770,7 @@ if (0) votes_for_me += request_vote(cell->node_info->conn, &local_node_info, cell->node_info, - last_wal_receive_lsn); + electoral_term); PQfinish(cell->node_info->conn); cell->node_info->conn = NULL; @@ -708,9 +779,9 @@ if (0) log_notice(_("%i of of %i votes"), votes_for_me, visible_nodes); if (votes_for_me == visible_nodes) - return VS_VOTE_WON; + return ELECTION_WON; - return VS_VOTE_LOST; + return ELECTION_LOST; } static void @@ -913,7 +984,7 @@ try_reconnect(const char *conninfo, NodeStatus *node_status) int i; // XXX make this all configurable - int max_attempts = 10; + int max_attempts = 5; for (i = 0; i < max_attempts; i++) { diff --git a/voting.h b/voting.h index abad4386..083f293a 100644 --- a/voting.h +++ b/voting.h @@ -9,9 +9,7 @@ typedef enum { VS_UNKNOWN = -1, VS_NO_VOTE, VS_VOTE_REQUEST_RECEIVED, - VS_VOTE_INITIATED, - VS_VOTE_WON, - VS_VOTE_LOST + VS_VOTE_INITIATED } NodeVotingStatus; #endif /* _VOTING_H_ */