From ded8d95e5ac990bf241e0ab92fcd4675bd6af648 Mon Sep 17 00:00:00 2001 From: Ian Barwick Date: Wed, 28 Jun 2017 16:38:41 +0900 Subject: [PATCH] interim commit --- dbutils.c | 192 ++++++++++++++++++++++++++++++++----- dbutils.h | 21 +++- repmgr--4.0.sql | 5 + repmgr.c | 101 +++++++++++++++++++- repmgr.h | 2 +- repmgrd.c | 247 +++++++++++++++++++++++++++++++++++++++++------- voting.h | 16 ++++ 7 files changed, 522 insertions(+), 62 deletions(-) create mode 100644 voting.h diff --git a/dbutils.c b/dbutils.c index 59b844e5..da02c1b9 100644 --- a/dbutils.c +++ b/dbutils.c @@ -24,9 +24,28 @@ static PGconn *_get_primary_connection(PGconn *standby_conn, int *primary_id, c static bool _set_config(PGconn *conn, const char *config_param, const char *sqlquery); static RecordStatus _get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info); static void _populate_node_record(PGresult *res, t_node_info *node_info, int row); + +static void _populate_node_records(PGresult *res, NodeInfoList *node_list); + static bool _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info); static bool _create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info); +/* ================= */ +/* utility functions */ +/* ================= */ + +XLogRecPtr +parse_lsn(const char *str) +{ + XLogRecPtr ptr = InvalidXLogRecPtr; + uint32 high, low; + + if (sscanf(str, "%x/%x", &high, &low) == 2) + ptr = (((XLogRecPtr)high) << 32) + (XLogRecPtr)low; + + return ptr; +} + /* ==================== */ /* Connection functions */ @@ -835,6 +854,8 @@ int get_server_version(PGconn *conn, char *server_version) { PGresult *res; + int server_version_num; + res = PQexec(conn, "SELECT pg_catalog.current_setting('server_version_num'), " " pg_catalog.current_setting('server_version')"); @@ -848,9 +869,12 @@ get_server_version(PGconn *conn, char *server_version) } if (server_version != NULL) - strcpy(server_version, PQgetvalue(res, 0, 0)); + strcpy(server_version, PQgetvalue(res, 0, 1)); - return atoi(PQgetvalue(res, 0, 0)); + server_version_num = atoi(PQgetvalue(res, 0, 0)); + + PQclear(res); + return server_version_num; } @@ -1332,44 +1356,28 @@ get_local_node_record(PGconn *conn, int node_id, t_node_info *node_info) } -void -get_downstream_node_records(PGconn *conn, int node_id, NodeInfoList *node_list) +static +void _populate_node_records(PGresult *res, NodeInfoList *node_list) { - PQExpBufferData query; - PGresult *result; int i; - initPQExpBuffer(&query); - - appendPQExpBuffer(&query, - " SELECT node_id, type, upstream_node_id, node_name, conninfo, repluser, slot_name, priority, active" - " FROM repmgr.nodes " - " WHERE upstream_node_id = %i " - "ORDER BY node_id ", - node_id); - - log_verbose(LOG_DEBUG, "get_node_records_by_priority():\n%s", query.data); - - result = PQexec(conn, query.data); - termPQExpBuffer(&query); - node_list->head = NULL; node_list->tail = NULL; node_list->node_count = 0; - if (PQresultStatus(result) != PGRES_TUPLES_OK) + if (PQresultStatus(res) != PGRES_TUPLES_OK) { return; } - for (i = 0; i < PQntuples(result); i++) + for (i = 0; i < PQntuples(res); i++) { NodeInfoListCell *cell; cell = (NodeInfoListCell *) pg_malloc0(sizeof(NodeInfoListCell)); cell->node_info = pg_malloc0(sizeof(t_node_info)); - _populate_node_record(result, cell->node_info, i); + _populate_node_record(res, cell->node_info, i); if (node_list->tail) node_list->tail->next = cell; @@ -1384,6 +1392,62 @@ get_downstream_node_records(PGconn *conn, int node_id, NodeInfoList *node_list) } +void +get_downstream_node_records(PGconn *conn, int node_id, NodeInfoList *node_list) +{ + PQExpBufferData query; + PGresult *res; + + initPQExpBuffer(&query); + + appendPQExpBuffer(&query, + " SELECT node_id, type, upstream_node_id, node_name, conninfo, repluser, slot_name, priority, active" + " FROM repmgr.nodes " + " WHERE upstream_node_id = %i " + "ORDER BY node_id ", + node_id); + + log_verbose(LOG_DEBUG, "get_downstream_node_records():\n%s", query.data); + + res = PQexec(conn, query.data); + + termPQExpBuffer(&query); + + _populate_node_records(res, node_list); + + return; +} + + +void +get_active_sibling_node_records(PGconn *conn, int node_id, int upstream_node_id, NodeInfoList *node_list) +{ + PQExpBufferData query; + PGresult *res; + + initPQExpBuffer(&query); + + appendPQExpBuffer(&query, + " SELECT node_id, type, upstream_node_id, node_name, conninfo, repluser, slot_name, priority, active" + " FROM repmgr.nodes " + " WHERE upstream_node_id = %i " + " AND node_id != %i " + " AND active IS TRUE " + "ORDER BY node_id ", + upstream_node_id, + node_id); + + log_verbose(LOG_DEBUG, "get_active_sibling_node_records():\n%s", query.data); + + res = PQexec(conn, query.data); + + termPQExpBuffer(&query); + + _populate_node_records(res, node_list); + + return; +} + bool create_node_record(PGconn *conn, char *repmgr_action, t_node_info *node_info) { @@ -2073,7 +2137,9 @@ get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record) } +/* ============================ */ /* asynchronous query functions */ +/* ============================ */ bool cancel_query(PGconn *conn, int timeout) @@ -2177,7 +2243,10 @@ wait_connection_availability(PGconn *conn, long long timeout) return -1; } + +/* =========================== */ /* node availability functions */ +/* =========================== */ bool is_server_available(const char *conninfo) @@ -2189,3 +2258,80 @@ is_server_available(const char *conninfo) return false; } + + +/* + * node voting functions + * + * These are intended to run under repmgrd and rely on shared memory + */ + +NodeVotingStatus +get_voting_status(PGconn *conn) +{ + PGresult *res; + NodeVotingStatus voting_status; + + res = PQexec(conn, "SELECT repmgr.get_voting_status()"); + + if (!res || PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_error(_("unable to query repmgr.get_voting_status():\n %s"), + PQerrorMessage(conn)); + PQclear(res); + return VS_UNKNOWN; + } + + voting_status = atoi(PQgetvalue(res, 0, 0)); + + PQclear(res); + return voting_status; +} + +int request_vote(PGconn *conn, int this_node_id, int this_node_priority, XLogRecPtr last_wal_receive_lsn) +{ + PQExpBufferData query; + PGresult *res; + int vote; + + initPQExpBuffer(&query); + + appendPQExpBuffer(&query, + "SELECT repmgr.request_vote(%i, %i, '%X/%X'::pg_lsn)", + this_node_id, + this_node_priority, + (uint32) (last_wal_receive_lsn >> 32), + (uint32) last_wal_receive_lsn); + + res = PQexec(conn, query.data); + + termPQExpBuffer(&query); + + vote = (strcmp(PQgetvalue(res, 0, 0), "t") == 0) ? 1 : 0; + + PQclear(res); + return vote; +} + + +/* ============================ */ +/* replication status functions */ +/* ============================ */ + +XLogRecPtr +get_last_wal_receive_location(PGconn *conn) +{ + PGresult *res; + XLogRecPtr ptr = InvalidXLogRecPtr; + + res = PQexec(conn, "SELECT pg_catalog.pg_last_wal_receive_lsn()"); + + if (PQresultStatus(res) == PGRES_TUPLES_OK) + { + ptr = parse_lsn(PQgetvalue(res, 0, 0)); + } + + PQclear(res); + + return ptr; +} diff --git a/dbutils.h b/dbutils.h index bc8d5366..8b93939e 100644 --- a/dbutils.h +++ b/dbutils.h @@ -14,6 +14,7 @@ #include "config.h" #include "strutil.h" +#include "voting.h" typedef enum { UNKNOWN = 0, @@ -63,6 +64,7 @@ typedef struct s_node_info bool is_ready; bool is_visible; XLogRecPtr xlog_location; + PGconn *conn; } t_node_info; @@ -78,7 +80,8 @@ typedef struct s_node_info true, \ false, \ false, \ - InvalidXLogRecPtr \ + InvalidXLogRecPtr, \ + NULL \ } @@ -142,6 +145,9 @@ typedef struct s_connection_user bool is_superuser; } t_connection_user; +/* utility functions */ + +XLogRecPtr parse_lsn(const char *str); /* connection functions */ @@ -210,6 +216,7 @@ RecordStatus get_node_record_by_name(PGconn *conn, const char *node_name, t_node bool get_local_node_record(PGconn *conn, int node_id, t_node_info *node_info); bool get_primary_node_record(PGconn *conn, t_node_info *node_info); void get_downstream_node_records(PGconn *conn, int node_id, NodeInfoList *nodes); +void get_active_sibling_node_records(PGconn *conn, int node_id, int upstream_node_id, NodeInfoList *node_list); bool create_node_record(PGconn *conn, char *repmgr_action, t_node_info *node_info); bool update_node_record(PGconn *conn, char *repmgr_action, t_node_info *node_info); @@ -218,6 +225,8 @@ bool delete_node_record(PGconn *conn, int node); bool update_node_record_set_primary(PGconn *conn, int this_node_id); bool update_node_record_status(PGconn *conn, int this_node_id, char *type, int upstream_node_id, bool active); + + /* event record functions */ bool create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details); bool create_event_record_extended(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info); @@ -235,5 +244,13 @@ int wait_connection_availability(PGconn *conn, long long timeout); bool is_server_available(const char *conninfo); -#endif +/* node voting functions */ +NodeVotingStatus get_voting_status(PGconn *conn); +int request_vote(PGconn *conn, int this_node_id, int this_node_priority, XLogRecPtr last_wal_receive_lsn); + +/* replication status functions */ + +XLogRecPtr get_last_wal_receive_location(PGconn *conn); + +#endif /* dbutils.h */ diff --git a/repmgr--4.0.sql b/repmgr--4.0.sql index 48489b60..ac57e98f 100644 --- a/repmgr--4.0.sql +++ b/repmgr--4.0.sql @@ -37,3 +37,8 @@ LEFT JOIN nodes un CREATE FUNCTION request_vote(int) RETURNS boolean AS '$libdir/repmgr', 'request_vote' LANGUAGE C STRICT; + + +CREATE FUNCTION get_voting_status() RETURNS int +AS '$libdir/repmgr', 'get_voting_status' +LANGUAGE C STRICT; diff --git a/repmgr.c b/repmgr.c index 84097917..97c77f78 100644 --- a/repmgr.c +++ b/repmgr.c @@ -20,17 +20,43 @@ #include "utils/builtins.h" #include "utils/timestamp.h" +#include "voting.h" + #define MAXFNAMELEN 64 +#define TRANCHE_NAME "repmgrd" PG_MODULE_MAGIC; +typedef enum { + LEADER_NODE, + FOLLOWER_NODE, + CANDIDATE_NODE +} NodeState; + +typedef struct repmgrdSharedState +{ + LWLockId lock; /* protects search/modification */ + NodeState node_state; + NodeVotingStatus voting_status; +} repmgrdSharedState; + +static repmgrdSharedState *shared_state = NULL; + +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; + void _PG_init(void); void _PG_fini(void); +static void repmgr_shmem_startup(void); + Datum request_vote(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(request_vote); +Datum get_voting_status(PG_FUNCTION_ARGS); +PG_FUNCTION_INFO_V1(get_voting_status); + + /* * Module load callback */ @@ -38,6 +64,25 @@ void _PG_init(void) { elog(INFO, "repmgr init"); + + // error here? + if (!process_shared_preload_libraries_in_progress) + return; + + RequestAddinShmemSpace(MAXALIGN(sizeof(repmgrdSharedState))); + +#if (PG_VERSION_NUM >= 90600) + RequestNamedLWLockTranche(TRANCHE_NAME, 1); +#else + RequestAddinLWLocks(1); +#endif + + /* + * Install hooks. + */ + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = repmgr_shmem_startup; + } /* * Module unload callback @@ -45,7 +90,47 @@ _PG_init(void) void _PG_fini(void) { - elog(INFO, "repmgr fini"); + /* Uninstall hook */ + shmem_startup_hook = prev_shmem_startup_hook; +} + + +/* + * shmem_startup hook: allocate or attach to shared memory, + */ +static void +repmgr_shmem_startup(void) +{ + bool found; + + if (prev_shmem_startup_hook) + prev_shmem_startup_hook(); + + /* reset in case this is a restart within the postmaster */ + shared_state = NULL; + + /* + * Create or attach to the shared memory state, including hash table + */ + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + shared_state = ShmemInitStruct("repmgrd shared state", + sizeof(repmgrdSharedState), + &found); + + if (!found) + { + /* First time through ... */ +#if (PG_VERSION_NUM >= 90600) + shared_state->lock = &(GetNamedLWLockTranche(TRANCHE_NAME))->lock; +#else + shared_state->lock = LWLockAssign(); +#endif + + shared_state->voting_status = VS_NO_VOTE; + } + + LWLockRelease(AddinShmemInitLock); } @@ -58,3 +143,17 @@ request_vote(PG_FUNCTION_ARGS) PG_RETURN_BOOL(true); } + + + +Datum +get_voting_status(PG_FUNCTION_ARGS) +{ + NodeVotingStatus voting_status; + + LWLockAcquire(shared_state->lock, LW_SHARED); + voting_status = shared_state->voting_status; + LWLockRelease(shared_state->lock); + + PG_RETURN_INT32(voting_status); +} diff --git a/repmgr.h b/repmgr.h index 1b143bbd..d76f1123 100644 --- a/repmgr.h +++ b/repmgr.h @@ -50,4 +50,4 @@ #define WITNESS_DEFAULT_PORT "5499" /* If this value is ever changed, remember * to update comments and documentation */ -#endif +#endif /* _REPMGR_H_ */ diff --git a/repmgrd.c b/repmgrd.c index 575900e2..8f33e51d 100644 --- a/repmgrd.c +++ b/repmgrd.c @@ -4,13 +4,13 @@ * Copyright (c) 2ndQuadrant, 2010-2017 */ -#include "repmgr.h" -#include "config.h" - #include #include #include +#include "repmgr.h" +#include "config.h" +#include "voting.h" #define OPT_HELP 1 @@ -31,7 +31,9 @@ t_configuration_options config_file_options = T_CONFIGURATION_OPTIONS_INITIALIZE static t_node_info local_node_info = T_NODE_INFO_INITIALIZER; static PGconn *local_conn = NULL; -static PGconn *primary_conn = NULL; +static t_node_info upstream_node_info = T_NODE_INFO_INITIALIZER; +static PGconn *upstream_conn = NULL; +static PGconn *primary_conn = NULL; /* Collate command line errors here for friendlier reporting */ static ItemList cli_errors = { NULL, NULL }; @@ -59,6 +61,9 @@ static void handle_sighup(SIGNAL_ARGS); static void handle_sigint(SIGNAL_ARGS); #endif +static PGconn *try_reconnect(const char *conninfo, NodeStatus *node_status); +static NodeVotingStatus do_election(void); + static void close_connections(); static void terminate(int retval); @@ -267,6 +272,7 @@ main(int argc, char **argv) log_info(_("connecting to database \"%s\""), config_file_options.conninfo); + /* abort if local node not available at startup */ local_conn = establish_db_connection(config_file_options.conninfo, true); /* @@ -383,7 +389,6 @@ main(int argc, char **argv) static void start_monitoring(void) { - log_notice(_("starting monitoring of node \"%s\" (ID: %i)"), local_node_info.node_name, local_node_info.node_id); @@ -428,7 +433,6 @@ monitor_streaming_primary(void) log_notice(_("monitoring cluster primary \"%s\" (node ID: %i)"), local_node_info.node_name, local_node_info.node_id); - } while (true) @@ -436,47 +440,30 @@ monitor_streaming_primary(void) // cache node list here, refresh at `node_list_refresh_interval` if (is_server_available(local_node_info.conninfo) == false) { + /* node is down, we were expecting it to be up */ if (node_status == NODE_STATUS_UP) { - int i; - - int max_attempts = 30; + // log disconnect event + log_warning(_("unable to connect to local node")); node_status = NODE_STATUS_UNKNOWN; - log_warning(_("unable to connect to local node")); - PQfinish(local_conn); - for (i = 0; i < max_attempts; i++) + + local_conn = try_reconnect(local_node_info.conninfo, &node_status); + + if (node_status == NODE_STATUS_UP) { - log_info(_("checking state of local node, %i of %i attempts"), i, max_attempts); - if (is_server_available(local_node_info.conninfo) == true) - { - log_notice(_("local node has recovered, reconnecting")); - - local_conn = establish_db_connection(local_node_info.conninfo, true); - - if (PQstatus(local_conn) == CONNECTION_OK) - { - // log reconnect event - node_status = NODE_STATUS_UP; - - goto loop; - } - - PQfinish(local_conn); - log_notice(_("unable to reconnect to local node")); - } - sleep(1); + // log reconnect event, details + log_notice(_("reconnected to local node")); + goto loop; } - - log_warning(_("unable to reconnect to local node after %i attempts"), max_attempts); - node_status = NODE_STATUS_DOWN; } if (node_status == NODE_STATUS_DOWN) { // attempt to find another node from cached list + // loop, if starts up check status, switch monitoring mode } } @@ -490,17 +477,169 @@ monitor_streaming_primary(void) static void monitor_streaming_standby(void) { - t_node_info upstream_node_info = T_NODE_INFO_INITIALIZER; + + NodeStatus upstream_node_status = NODE_STATUS_UP; // check result (void) get_node_record(local_conn, local_node_info.upstream_node_id, &upstream_node_info); + // check result, fail if not up (must start on running node) + upstream_conn = establish_db_connection(config_file_options.conninfo, false); + + // fix for cascaded standbys + primary_conn = upstream_conn; + + /* Log startup event */ + if (startup_event_logged == false) + { + create_event_record(upstream_conn, + &config_file_options, + config_file_options.node_id, + "repmgrd_start", + true, + NULL); + startup_event_logged = true; + + log_notice(_("monitoring node \"%s\" (node ID: %i)"), + upstream_node_info.node_name, + upstream_node_info.node_id); + } + + while (true) { + if (is_server_available(upstream_node_info.conninfo) == false) + { + /* upstream node is down, we were expecting it to be up */ + if (upstream_node_status == NODE_STATUS_UP) + { + // log disconnect event + log_warning(_("unable to connect to upstream node")); + upstream_node_status = NODE_STATUS_UNKNOWN; + + PQfinish(upstream_conn); + upstream_conn = try_reconnect(upstream_node_info.conninfo, &upstream_node_status); + + if (upstream_node_status == NODE_STATUS_UP) + { + // log reconnect event + log_notice(_("reconnected to upstream node")); + goto loop; + } + + /* still down after reconnect attempt(s) - */ + if (upstream_node_status == NODE_STATUS_DOWN) + { + do_election(); + // begin voting process + + // if VS_PROMOTION_CANDIDATE + // promote self, notify nodes + + // else if VS_VOTE_REQUEST_RECEIVED, look for new primary and follow if necessary + } + + } + } + + loop: sleep(1); } } + +static NodeVotingStatus +do_election(void) +{ + int total_eligible_nodes = 0; + /* current node votes for itself by default */ + int votes_for_me = 1; + + /* we're visible */ + int visible_nodes = 1; + + XLogRecPtr last_wal_receive_lsn = InvalidXLogRecPtr; + + // get voting status from shared memory + // should be "VS_NO_VOTE" or "VS_VOTE_REQUEST_RECEIVED" + // if VS_NO_VOTE, initiate voting process + NodeVotingStatus voting_status; + + NodeInfoList standby_nodes = T_NODE_INFO_LIST_INITIALIZER; + NodeInfoListCell *cell; + + voting_status = get_voting_status(local_conn); + log_debug("do_election(): node voting status is %i", (int)voting_status); + + if (voting_status == VS_VOTE_REQUEST_RECEIVED) + { + /* we've already been requested to vote, so can't become a candidate */ + return voting_status; + } + + /* get all active nodes attached to primary, excluding self */ + // XXX include barman node in results + + get_active_sibling_node_records(local_conn, + local_node_info.node_id, + upstream_node_info.node_id, + &standby_nodes); + + /* no other standbys - win by default */ + + if (standby_nodes.node_count == 0) + { + log_debug("no other nodes - we win by default"); + return VS_VOTE_WON; + } + + for (cell = standby_nodes.head; cell; cell = cell->next) + { + /* assume the worst case */ + cell->node_info->is_visible = false; + + // XXX handle witness-barman + cell->node_info->conn = establish_db_connection(local_node_info.conninfo, false); + + if (PQstatus(cell->node_info->conn) != CONNECTION_OK) + { + continue; + } + + cell->node_info->is_visible = true; + visible_nodes ++; + } + + // XXX check if > 50% visible + + /* check if we've been asked to vote again */ + // XXX do that + + // XXX should we mark ourselves as candidate? + // -> so any further vote requests are rejected? + + + /* get our lsn*/ + + last_wal_receive_lsn = get_last_wal_receive_location(local_conn); + + log_debug("LAST receive lsn = %X/%X", (uint32) (last_wal_receive_lsn >> 32), (uint32) last_wal_receive_lsn); + /* request vote */ + + for (cell = standby_nodes.head; cell; cell = cell->next) + { + /* ignore unreachable nodes */ + if (cell->node_info->is_visible == false) + continue; + votes_for_me += request_vote(cell->node_info->conn, + local_node_info.node_id, + local_node_info.priority, + last_wal_receive_lsn); + } + + return VS_VOTE_WON; +} + static void daemonize_process(void) { @@ -693,6 +832,44 @@ show_help(void) printf(_("%s monitors a cluster of servers and optionally performs failover.\n"), progname()); } +static PGconn * +try_reconnect(const char *conninfo, NodeStatus *node_status) +{ + PGconn *conn; + + int i; + + // XXX make this all configurable + int max_attempts = 15; + + for (i = 0; i < max_attempts; i++) + { + log_info(_("checking state of node, %i of %i attempts"), i, max_attempts); + if (is_server_available(conninfo) == true) + { + log_notice(_("node has recovered, reconnecting")); + + // XXX how to handle case where node is reachable + // but connection denied due to connection exhaustion + conn = establish_db_connection(conninfo, false); + if (PQstatus(conn) == CONNECTION_OK) + { + *node_status = NODE_STATUS_UP; + return conn; + } + + PQfinish(conn); + log_notice(_("unable to reconnect to node")); + } + sleep(1); + } + + + log_warning(_("unable to reconnect to node after %i attempts"), max_attempts); + *node_status = NODE_STATUS_DOWN; + return NULL; +} + static void close_connections() diff --git a/voting.h b/voting.h new file mode 100644 index 00000000..5ddd4a08 --- /dev/null +++ b/voting.h @@ -0,0 +1,16 @@ +/* + * voting.h + * Copyright (c) 2ndQuadrant, 2010-2017 + */ +#ifndef _VOTING_H_ +#define _VOTING_H_ + +typedef enum { + VS_UNKNOWN = -1, + VS_NO_VOTE, + VS_VOTE_REQUEST_RECEIVED, + VS_VOTE_INITIATED, + VS_VOTE_WON +} NodeVotingStatus; + +#endif /* _VOTING_H_ */