From 1615353f48e3f4d0aa1ed4d5b65bec83381847d1 Mon Sep 17 00:00:00 2001 From: Ian Barwick Date: Fri, 1 Mar 2019 18:20:44 +0900 Subject: [PATCH] repmgrd: optionally disconnect WAL receivers during failover This is intended to ensure that all nodes have a constant LSN while making the failover decision. This feature is experimental and needs to be explicitly enabled with the configuration file option "standby_disconnect_on_failover". Note enabling this option will result in a delay in the failover decision until the WAL receiver is disconnected on all nodes. --- configfile.c | 3 + configfile.h | 5 +- dbutils.c | 75 ++++++++++++++++++++++++- dbutils.h | 3 + repmgr--4.2--4.3.sql | 5 ++ repmgr--4.3.sql | 10 ++++ repmgr-action-node.c | 33 +++++++++++ repmgr-action-node.h | 1 + repmgr-client-global.h | 4 +- repmgr-client.c | 21 ++++++- repmgr-client.h | 28 ++++++---- repmgr.c | 16 ++++++ repmgr.h | 2 + repmgrd-physical.c | 90 +++++++++++++++++++++++++++++- sysutils.c | 121 +++++++++++++++++++++++++++++++++++++++++ sysutils.h | 4 ++ 16 files changed, 404 insertions(+), 17 deletions(-) diff --git a/configfile.c b/configfile.c index 677e1920..fdbfe5fc 100644 --- a/configfile.c +++ b/configfile.c @@ -358,6 +358,7 @@ _parse_config(t_configuration_options *options, ItemList *error_list, ItemList * options->primary_notification_timeout = DEFAULT_PRIMARY_NOTIFICATION_TIMEOUT; options->repmgrd_standby_startup_timeout = -1; /* defaults to "standby_reconnect_timeout" if not set */ memset(options->repmgrd_pid_file, 0, sizeof(options->repmgrd_pid_file)); + options->standby_disconnect_on_failover = false; options->connection_check_type = CHECK_PING; /*------------- @@ -619,6 +620,8 @@ _parse_config(t_configuration_options *options, ItemList *error_list, ItemList * options->repmgrd_standby_startup_timeout = repmgr_atoi(value, name, error_list, 0); else if (strcmp(name, "repmgrd_pid_file") == 0) strncpy(options->repmgrd_pid_file, value, MAXPGPATH); + else if (strcmp(name, "standby_disconnect_on_failover") == 0) + options->standby_disconnect_on_failover = parse_bool(value, name, error_list); else if (strcmp(name, "connection_check_type") == 0) { if (strcasecmp(value, "ping") == 0) diff --git a/configfile.h b/configfile.h index 34e220f7..319a5959 100644 --- a/configfile.h +++ b/configfile.h @@ -141,6 +141,7 @@ typedef struct int primary_notification_timeout; int repmgrd_standby_startup_timeout; char repmgrd_pid_file[MAXPGPATH]; + bool standby_disconnect_on_failover; ConnectionCheckType connection_check_type; /* BDR settings */ @@ -212,8 +213,8 @@ typedef struct DEFAULT_RECONNECTION_INTERVAL, \ false, -1, \ DEFAULT_ASYNC_QUERY_TIMEOUT, \ - DEFAULT_PRIMARY_NOTIFICATION_TIMEOUT, \ - -1, "", CHECK_PING, \ + DEFAULT_PRIMARY_NOTIFICATION_TIMEOUT, \ + -1, "", false, CHECK_PING, \ /* BDR settings */ \ false, DEFAULT_BDR_RECOVERY_TIMEOUT, \ /* service settings */ \ diff --git a/dbutils.c b/dbutils.c index 3b9f267a..7074625f 100644 --- a/dbutils.c +++ b/dbutils.c @@ -1079,7 +1079,7 @@ get_pg_setting(PGconn *conn, const char *setting, char *output) } else { - /* XXX highly unlikely this would ever happen */ + /* highly unlikely this would ever happen */ log_error(_("get_pg_setting(): unknown parameter \"%s\""), PQgetvalue(res, i, 0)); } } @@ -1096,6 +1096,56 @@ get_pg_setting(PGconn *conn, const char *setting, char *output) } +bool +alter_system_int(PGconn *conn, const char *name, int value) +{ + PQExpBufferData query; + PGresult *res = NULL; + bool success = false; + + initPQExpBuffer(&query); + appendPQExpBuffer(&query, + "ALTER SYSTEM SET %s = %i", + name, value); + + res = PQexec(conn, query.data); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + log_db_error(conn, query.data, _("alter_system_int() - unable to execute query")); + + success = false; + } + + + termPQExpBuffer(&query); + PQclear(res); + + return success; +} + + +bool +pg_reload_conf(PGconn *conn) +{ + PGresult *res = NULL; + bool success = false; + + res = PQexec(conn, "SELECT pg_catalog.pg_reload_conf()"); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_db_error(conn, NULL, _("pg_reload_conf() - unable to execute query")); + + success = false; + } + + PQclear(res); + + return success; +} + + /* ============================ */ /* Server information functions */ /* ============================ */ @@ -1859,6 +1909,29 @@ repmgrd_pause(PGconn *conn, bool pause) return success; } +pid_t +get_wal_receiver_pid(PGconn *conn) +{ + PGresult *res = NULL; + pid_t wal_receiver_pid = UNKNOWN_PID; + + res = PQexec(conn, "SELECT repmgr.get_wal_receiver_pid()"); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_error(_("unable to execute \"SELECT repmgr.get_wal_receiver_pid()\"")); + log_detail("%s", PQerrorMessage(conn)); + } + else if (!PQgetisnull(res, 0, 0)) + { + wal_receiver_pid = atoi(PQgetvalue(res, 0, 0)); + } + + PQclear(res); + + return wal_receiver_pid; +} + /* ================ */ /* result functions */ /* ================ */ diff --git a/dbutils.h b/dbutils.h index 268b3fb8..f4d9ea0f 100644 --- a/dbutils.h +++ b/dbutils.h @@ -415,6 +415,8 @@ bool set_config_bool(PGconn *conn, const char *config_param, bool state); int guc_set(PGconn *conn, const char *parameter, const char *op, const char *value); int guc_set_typed(PGconn *conn, const char *parameter, const char *op, const char *value, const char *datatype); bool get_pg_setting(PGconn *conn, const char *setting, char *output); +bool alter_system_int(PGconn *conn, const char *name, int value); +bool pg_reload_conf(PGconn *conn); /* server information functions */ bool get_cluster_size(PGconn *conn, char *size); @@ -436,6 +438,7 @@ pid_t repmgrd_get_pid(PGconn *conn); bool repmgrd_is_running(PGconn *conn); bool repmgrd_is_paused(PGconn *conn); bool repmgrd_pause(PGconn *conn, bool pause); +pid_t get_wal_receiver_pid(PGconn *conn); /* extension functions */ ExtensionStatus get_repmgr_extension_status(PGconn *conn, t_extension_versions *extversions); diff --git a/repmgr--4.2--4.3.sql b/repmgr--4.2--4.3.sql index 962957c0..96a09196 100644 --- a/repmgr--4.2--4.3.sql +++ b/repmgr--4.2--4.3.sql @@ -10,3 +10,8 @@ CREATE FUNCTION get_upstream_last_seen() RETURNS INT AS 'MODULE_PATHNAME', 'get_upstream_last_seen' LANGUAGE C STRICT; + +CREATE FUNCTION get_wal_receiver_pid() + RETURNS INT + AS 'MODULE_PATHNAME', 'get_wal_receiver_pid' + LANGUAGE C STRICT; diff --git a/repmgr--4.3.sql b/repmgr--4.3.sql index ce6ccfc3..8496b607 100644 --- a/repmgr--4.3.sql +++ b/repmgr--4.3.sql @@ -128,6 +128,7 @@ CREATE FUNCTION get_upstream_last_seen() AS 'MODULE_PATHNAME', 'get_upstream_last_seen' LANGUAGE C STRICT; + /* failover functions */ CREATE FUNCTION notify_follow_primary(INT) @@ -185,6 +186,15 @@ CREATE FUNCTION repmgrd_is_paused() AS 'MODULE_PATHNAME', 'repmgrd_is_paused' LANGUAGE C STRICT; +CREATE FUNCTION get_wal_receiver_pid() + RETURNS INT + AS 'MODULE_PATHNAME', 'get_wal_receiver_pid' + LANGUAGE C STRICT; + + + + +/* views */ CREATE VIEW repmgr.replication_status AS SELECT m.primary_node_id, m.standby_node_id, n.node_name AS standby_name, diff --git a/repmgr-action-node.c b/repmgr-action-node.c index 63b98ee2..7f26d929 100644 --- a/repmgr-action-node.c +++ b/repmgr-action-node.c @@ -2681,6 +2681,39 @@ do_node_rejoin(void) } +/* + * Currently for testing purposes only, not documented; + * use at own risk! + */ + +void +do_node_control(void) +{ + PGconn *conn = NULL; + pid_t wal_receiver_pid = UNKNOWN_PID; + conn = establish_db_connection(config_file_options.conninfo, true); + + if (runtime_options.disable_wal_receiver == true) + { + wal_receiver_pid = disable_wal_receiver(conn); + + PQfinish(conn); + + if (wal_receiver_pid == UNKNOWN_PID) + exit(ERR_BAD_CONFIG); + + exit(SUCCESS); + } + + if (runtime_options.enable_wal_receiver == true) + { + wal_receiver_pid = enable_wal_receiver(conn); + } + + PQfinish(conn); +} + + /* * For "internal" use by `node rejoin` on the local node when * called by "standby switchover" from the remote node. diff --git a/repmgr-action-node.h b/repmgr-action-node.h index d6bec2a1..d1553235 100644 --- a/repmgr-action-node.h +++ b/repmgr-action-node.h @@ -24,6 +24,7 @@ extern void do_node_check(void); extern void do_node_rejoin(void); extern void do_node_service(void); +extern void do_node_control(void); extern void do_node_help(void); diff --git a/repmgr-client-global.h b/repmgr-client-global.h index e7c70b37..46dfb66e 100644 --- a/repmgr-client-global.h +++ b/repmgr-client-global.h @@ -135,6 +135,8 @@ typedef struct /* following options for internal use */ char config_archive_dir[MAXPGPATH]; OutputMode output_mode; + bool disable_wal_receiver; + bool enable_wal_receiver; } t_runtime_options; #define T_RUNTIME_OPTIONS_INITIALIZER { \ @@ -174,7 +176,7 @@ typedef struct /* "cluster cleanup" options */ \ 0, \ /* following options for internal use */ \ - "/tmp", OM_TEXT \ + "/tmp", OM_TEXT, false, false \ } diff --git a/repmgr-client.c b/repmgr-client.c index 0df6f420..33091274 100644 --- a/repmgr-client.c +++ b/repmgr-client.c @@ -31,6 +31,7 @@ * NODE CHECK * NODE REJOIN * NODE SERVICE + * NODE CONTROL * * DAEMON STATUS * DAEMON PAUSE @@ -624,7 +625,7 @@ main(int argc, char **argv) break; - /*-------------- + /*--------------- * output options *--------------- */ @@ -640,6 +641,19 @@ main(int argc, char **argv) runtime_options.optformat = true; break; + /*--------------------------------- + * undocumented options for testing + *---------------------------------- + */ + + case OPT_DISABLE_WAL_RECEIVER: + runtime_options.disable_wal_receiver = true; + break; + + case OPT_ENABLE_WAL_RECEIVER: + runtime_options.enable_wal_receiver = true; + break; + /*----------------------------- * options deprecated since 3.3 *----------------------------- @@ -912,6 +926,8 @@ main(int argc, char **argv) action = NODE_REJOIN; else if (strcasecmp(repmgr_action, "SERVICE") == 0) action = NODE_SERVICE; + else if (strcasecmp(repmgr_action, "CONTROL") == 0) + action = NODE_CONTROL; } else if (strcasecmp(repmgr_command, "CLUSTER") == 0) @@ -1335,6 +1351,9 @@ main(int argc, char **argv) case NODE_SERVICE: do_node_service(); break; + case NODE_CONTROL: + do_node_control(); + break; /* CLUSTER */ case CLUSTER_SHOW: diff --git a/repmgr-client.h b/repmgr-client.h index 8a8bbf15..ed4bbf68 100644 --- a/repmgr-client.h +++ b/repmgr-client.h @@ -40,16 +40,17 @@ #define NODE_CHECK 14 #define NODE_SERVICE 15 #define NODE_REJOIN 16 -#define CLUSTER_SHOW 17 -#define CLUSTER_CLEANUP 18 -#define CLUSTER_MATRIX 19 -#define CLUSTER_CROSSCHECK 20 -#define CLUSTER_EVENT 21 -#define DAEMON_STATUS 22 -#define DAEMON_PAUSE 23 -#define DAEMON_UNPAUSE 24 -#define DAEMON_START 25 -#define DAEMON_STOP 26 +#define NODE_CONTROL 17 +#define CLUSTER_SHOW 18 +#define CLUSTER_CLEANUP 19 +#define CLUSTER_MATRIX 20 +#define CLUSTER_CROSSCHECK 21 +#define CLUSTER_EVENT 22 +#define DAEMON_STATUS 23 +#define DAEMON_PAUSE 24 +#define DAEMON_UNPAUSE 25 +#define DAEMON_START 26 +#define DAEMON_STOP 27 /* command line options without short versions */ #define OPT_HELP 1001 @@ -97,7 +98,8 @@ #define OPT_VERSION_NUMBER 1043 #define OPT_DATA_DIRECTORY_CONFIG 1044 #define OPT_COMPACT 1045 - +#define OPT_DISABLE_WAL_RECEIVER 1046 +#define OPT_ENABLE_WAL_RECEIVER 1047 /* deprecated since 3.3 */ #define OPT_DATA_DIR 999 @@ -202,6 +204,10 @@ static struct option long_options[] = /* "cluster cleanup" options */ {"keep-history", required_argument, NULL, 'k'}, +/* undocumented options for testing */ + {"disable-wal-receiver", no_argument, NULL, OPT_DISABLE_WAL_RECEIVER}, + {"enable-wal-receiver", no_argument, NULL, OPT_ENABLE_WAL_RECEIVER}, + /* deprecated */ {"check-upstream-config", no_argument, NULL, OPT_CHECK_UPSTREAM_CONFIG}, {"no-conninfo-password", no_argument, NULL, OPT_NO_CONNINFO_PASSWORD}, diff --git a/repmgr.c b/repmgr.c index 0a73b5fe..a6f49607 100644 --- a/repmgr.c +++ b/repmgr.c @@ -147,6 +147,8 @@ PG_FUNCTION_INFO_V1(repmgrd_pause); Datum repmgrd_is_paused(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(repmgrd_is_paused); +Datum get_wal_receiver_pid(PG_FUNCTION_ARGS); +PG_FUNCTION_INFO_V1(get_wal_receiver_pid); /* @@ -740,3 +742,17 @@ repmgrd_is_paused(PG_FUNCTION_ARGS) PG_RETURN_BOOL(is_paused); } + + +Datum +get_wal_receiver_pid(PG_FUNCTION_ARGS) +{ + int wal_receiver_pid; + + if (!shared_state) + PG_RETURN_NULL(); + + wal_receiver_pid = WalRcv->pid; + + PG_RETURN_INT32(wal_receiver_pid); +} diff --git a/repmgr.h b/repmgr.h index 25586020..ceac63aa 100644 --- a/repmgr.h +++ b/repmgr.h @@ -92,6 +92,8 @@ #define DEFAULT_NODE_REJOIN_TIMEOUT 60 /* seconds */ #define DEFAULT_WAL_RECEIVE_CHECK_TIMEOUT 30 /* seconds */ +#define WALRECEIVER_DISABLE_TIMEOUT_VALUE 86400000 /* milliseconds */ + #ifndef RECOVERY_COMMAND_FILE #define RECOVERY_COMMAND_FILE "recovery.conf" #endif diff --git a/repmgrd-physical.c b/repmgrd-physical.c index 2ffdaa92..6b9c4e29 100644 --- a/repmgrd-physical.c +++ b/repmgrd-physical.c @@ -87,7 +87,7 @@ static void update_monitoring_history(void); static void handle_sighup(PGconn **conn, t_server_type server_type); -static const char * format_failover_state(FailoverState failover_state); +static const char *format_failover_state(FailoverState failover_state); void @@ -1979,6 +1979,85 @@ do_primary_failover(void) */ check_connection(&local_node_info, &local_conn); + /* + * if requested, disable WAL receiver and wait until WAL receivers on all + * sibling nodes are disconnected + */ + if (config_file_options.standby_disconnect_on_failover == true) + { + NodeInfoListCell *cell = NULL; + static NodeInfoList check_sibling_nodes = T_NODE_INFO_LIST_INITIALIZER; + int i; + + // XXX make configurable + int sibling_nodes_disconnect_timeout = 30; + bool sibling_node_wal_receiver_connected = false; + + disable_wal_receiver(local_conn); + + /* + * Loop through all reachable sibling nodes to determine whether + * they have disabled their WAL receivers. + * + * TODO: do_election() also calls get_active_sibling_node_records(), + * consolidate calls if feasible + * + */ + get_active_sibling_node_records(local_conn, + local_node_info.node_id, + local_node_info.upstream_node_id, + &check_sibling_nodes); + + for (i = 0; i < sibling_nodes_disconnect_timeout; i++) + { + for (cell = check_sibling_nodes.head; cell; cell = cell->next) + { + pid_t sibling_wal_receiver_pid; + + if (cell->node_info->conn == NULL) + cell->node_info->conn = establish_db_connection(cell->node_info->conninfo, false); + + sibling_wal_receiver_pid = (pid_t)get_wal_receiver_pid(cell->node_info->conn); + + if (sibling_wal_receiver_pid == UNKNOWN_PID) + { + log_warning(_("unable to query WAL receiver PID on node %i"), + cell->node_info->node_id); + } + else if (sibling_wal_receiver_pid > 0) + { + log_info(_("WAL receiver PID on node %i is %i"), + cell->node_info->node_id, + sibling_wal_receiver_pid); + sibling_node_wal_receiver_connected = true; + } + } + + if (sibling_node_wal_receiver_connected == false) + { + log_notice(_("WAL receiver disconnected on all sibling nodes")); + break; + } + + log_debug("sleeping %i of max %i seconds (\"sibling_nodes_disconnect_timeout\")", + i + 1, sibling_nodes_disconnect_timeout) + sleep(1); + } + + if (sibling_node_wal_receiver_connected == true) + { + // XXX what do we do here? abort or continue? make configurable? + log_warning(_("WAL receiver still connected on at least one sibling node")); + } + else + { + log_info(_("WAL receiver disconnected on all %i sibling nodes"), + check_sibling_nodes.node_count); + } + + clear_node_info_list(&check_sibling_nodes); + } + /* attempt to initiate voting process */ election_result = do_election(); @@ -1987,6 +2066,13 @@ do_primary_failover(void) log_debug("election result: %s", _print_election_result(election_result)); + /* Reenable WAL receiver, if disabled and node is not the promotion candidate */ + if (config_file_options.standby_disconnect_on_failover == true && election_result != ELECTION_WON) + { + // XXX check return value + enable_wal_receiver(local_conn); + } + if (election_result == ELECTION_CANCELLED) { log_notice(_("election cancelled")); @@ -3685,3 +3771,5 @@ handle_sighup(PGconn **conn, t_server_type server_type) got_SIGHUP = false; } + + diff --git a/sysutils.c b/sysutils.c index af5b1063..da17541d 100644 --- a/sysutils.c +++ b/sysutils.c @@ -17,6 +17,8 @@ * along with this program. If not, see . */ +#include + #include "repmgr.h" static bool _local_command(const char *command, PQExpBufferData *outputbuf, bool simple, int *return_value); @@ -176,3 +178,122 @@ remote_command(const char *host, const char *user, const char *command, const ch return true; } + + +pid_t +disable_wal_receiver(PGconn *conn) +{ + char buf[MAXLEN]; + int wal_retrieve_retry_interval; + pid_t wal_receiver_pid = UNKNOWN_PID; + + if (is_superuser_connection(conn, NULL) == false) + { + log_error(_("superuser connection required")); + return UNKNOWN_PID; + } + + get_pg_setting(conn, "wal_retrieve_retry_interval", buf); + + // XXX handle error + wal_retrieve_retry_interval = atoi(buf); + + + if (wal_retrieve_retry_interval < WALRECEIVER_DISABLE_TIMEOUT_VALUE) + { + alter_system_int(conn, "wal_retrieve_retry_interval", wal_retrieve_retry_interval + WALRECEIVER_DISABLE_TIMEOUT_VALUE); + pg_reload_conf(conn); + } + + wal_receiver_pid = (pid_t)get_wal_receiver_pid(conn); + + if (wal_receiver_pid == UNKNOWN_PID) + { + log_warning(_("unable to retrieve walreceiver PID")); + return UNKNOWN_PID; + } + + if (wal_receiver_pid == 0) + { + log_warning(_("walreceiver not running")); + } + else + { + int kill_ret; + int i, j; + int max_retries = 2; + + for (i = 0; i < max_retries; i++) + { + /* why 5? */ + sleep(5); + log_notice(_("killing walreceiver with PID %i"), (int)wal_receiver_pid); + + kill((int)wal_receiver_pid, SIGTERM); + + for (j = 0; j < 30; j++) + { + kill_ret = kill(wal_receiver_pid, 0); + + if (kill_ret != 0) + { + log_info("killed"); + break; + } + sleep(1); + } + + /* */ + sleep(1); + wal_receiver_pid = (pid_t)get_wal_receiver_pid(conn); + if (wal_receiver_pid == UNKNOWN_PID || wal_receiver_pid == 0) + break; + } + } + + + return wal_receiver_pid; +} + +pid_t +enable_wal_receiver(PGconn *conn) +{ + char buf[MAXLEN]; + int wal_retrieve_retry_interval; + pid_t wal_receiver_pid = UNKNOWN_PID; + + if (is_superuser_connection(conn, NULL) == false) + { + log_error(_("superuser connection required")); + return UNKNOWN_PID; + } + + get_pg_setting(conn, "wal_retrieve_retry_interval", buf); + + // XXX handle error + wal_retrieve_retry_interval = atoi(buf); + + if (wal_retrieve_retry_interval > WALRECEIVER_DISABLE_TIMEOUT_VALUE) + { + int new_wal_retrieve_retry_interval = wal_retrieve_retry_interval - WALRECEIVER_DISABLE_TIMEOUT_VALUE; + log_notice(_("setting \"wal_retrieve_retry_interval\" to %i ms"), + new_wal_retrieve_retry_interval); + + // XXX handle error + alter_system_int(conn, + "wal_retrieve_retry_interval", + new_wal_retrieve_retry_interval); + pg_reload_conf(conn); + } + else + { + // XXX add threshold sanity check + log_info(_("\"wal_retrieve_retry_interval\" is %i, not changing"), + wal_retrieve_retry_interval); + } + + + // XXX get wal receiver PID + + return wal_receiver_pid; +} diff --git a/sysutils.h b/sysutils.h index 92d114e6..4ddc3d36 100644 --- a/sysutils.h +++ b/sysutils.h @@ -25,4 +25,8 @@ extern bool local_command_simple(const char *command, PQExpBufferData *outputbuf extern bool remote_command(const char *host, const char *user, const char *command, const char *ssh_options, PQExpBufferData *outputbuf); +extern pid_t disable_wal_receiver(PGconn *conn); +extern pid_t enable_wal_receiver(PGconn *conn); + + #endif /* _SYSUTILS_H_ */