repmgrd: optionally disconnect WAL receivers during failover

This is intended to ensure that all nodes have a constant LSN while
making the failover decision.

This feature is experimental and needs to be explicitly enabled with the
configuration file option "standby_disconnect_on_failover".

Note enabling this option will result in a delay in the failover decision
until the WAL receiver is disconnected on all nodes.
This commit is contained in:
Ian Barwick
2019-03-01 18:20:44 +09:00
committed by Ian Barwick
parent dd04ebb809
commit 1615353f48
16 changed files with 404 additions and 17 deletions

View File

@@ -358,6 +358,7 @@ _parse_config(t_configuration_options *options, ItemList *error_list, ItemList *
options->primary_notification_timeout = DEFAULT_PRIMARY_NOTIFICATION_TIMEOUT;
options->repmgrd_standby_startup_timeout = -1; /* defaults to "standby_reconnect_timeout" if not set */
memset(options->repmgrd_pid_file, 0, sizeof(options->repmgrd_pid_file));
options->standby_disconnect_on_failover = false;
options->connection_check_type = CHECK_PING;
/*-------------
@@ -619,6 +620,8 @@ _parse_config(t_configuration_options *options, ItemList *error_list, ItemList *
options->repmgrd_standby_startup_timeout = repmgr_atoi(value, name, error_list, 0);
else if (strcmp(name, "repmgrd_pid_file") == 0)
strncpy(options->repmgrd_pid_file, value, MAXPGPATH);
else if (strcmp(name, "standby_disconnect_on_failover") == 0)
options->standby_disconnect_on_failover = parse_bool(value, name, error_list);
else if (strcmp(name, "connection_check_type") == 0)
{
if (strcasecmp(value, "ping") == 0)

View File

@@ -141,6 +141,7 @@ typedef struct
int primary_notification_timeout;
int repmgrd_standby_startup_timeout;
char repmgrd_pid_file[MAXPGPATH];
bool standby_disconnect_on_failover;
ConnectionCheckType connection_check_type;
/* BDR settings */
@@ -212,8 +213,8 @@ typedef struct
DEFAULT_RECONNECTION_INTERVAL, \
false, -1, \
DEFAULT_ASYNC_QUERY_TIMEOUT, \
DEFAULT_PRIMARY_NOTIFICATION_TIMEOUT, \
-1, "", CHECK_PING, \
DEFAULT_PRIMARY_NOTIFICATION_TIMEOUT, \
-1, "", false, CHECK_PING, \
/* BDR settings */ \
false, DEFAULT_BDR_RECOVERY_TIMEOUT, \
/* service settings */ \

View File

@@ -1079,7 +1079,7 @@ get_pg_setting(PGconn *conn, const char *setting, char *output)
}
else
{
/* XXX highly unlikely this would ever happen */
/* highly unlikely this would ever happen */
log_error(_("get_pg_setting(): unknown parameter \"%s\""), PQgetvalue(res, i, 0));
}
}
@@ -1096,6 +1096,56 @@ get_pg_setting(PGconn *conn, const char *setting, char *output)
}
bool
alter_system_int(PGconn *conn, const char *name, int value)
{
PQExpBufferData query;
PGresult *res = NULL;
bool success = false;
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
"ALTER SYSTEM SET %s = %i",
name, value);
res = PQexec(conn, query.data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_db_error(conn, query.data, _("alter_system_int() - unable to execute query"));
success = false;
}
termPQExpBuffer(&query);
PQclear(res);
return success;
}
bool
pg_reload_conf(PGconn *conn)
{
PGresult *res = NULL;
bool success = false;
res = PQexec(conn, "SELECT pg_catalog.pg_reload_conf()");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
log_db_error(conn, NULL, _("pg_reload_conf() - unable to execute query"));
success = false;
}
PQclear(res);
return success;
}
/* ============================ */
/* Server information functions */
/* ============================ */
@@ -1859,6 +1909,29 @@ repmgrd_pause(PGconn *conn, bool pause)
return success;
}
pid_t
get_wal_receiver_pid(PGconn *conn)
{
PGresult *res = NULL;
pid_t wal_receiver_pid = UNKNOWN_PID;
res = PQexec(conn, "SELECT repmgr.get_wal_receiver_pid()");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
log_error(_("unable to execute \"SELECT repmgr.get_wal_receiver_pid()\""));
log_detail("%s", PQerrorMessage(conn));
}
else if (!PQgetisnull(res, 0, 0))
{
wal_receiver_pid = atoi(PQgetvalue(res, 0, 0));
}
PQclear(res);
return wal_receiver_pid;
}
/* ================ */
/* result functions */
/* ================ */

View File

@@ -415,6 +415,8 @@ bool set_config_bool(PGconn *conn, const char *config_param, bool state);
int guc_set(PGconn *conn, const char *parameter, const char *op, const char *value);
int guc_set_typed(PGconn *conn, const char *parameter, const char *op, const char *value, const char *datatype);
bool get_pg_setting(PGconn *conn, const char *setting, char *output);
bool alter_system_int(PGconn *conn, const char *name, int value);
bool pg_reload_conf(PGconn *conn);
/* server information functions */
bool get_cluster_size(PGconn *conn, char *size);
@@ -436,6 +438,7 @@ pid_t repmgrd_get_pid(PGconn *conn);
bool repmgrd_is_running(PGconn *conn);
bool repmgrd_is_paused(PGconn *conn);
bool repmgrd_pause(PGconn *conn, bool pause);
pid_t get_wal_receiver_pid(PGconn *conn);
/* extension functions */
ExtensionStatus get_repmgr_extension_status(PGconn *conn, t_extension_versions *extversions);

View File

@@ -10,3 +10,8 @@ CREATE FUNCTION get_upstream_last_seen()
RETURNS INT
AS 'MODULE_PATHNAME', 'get_upstream_last_seen'
LANGUAGE C STRICT;
CREATE FUNCTION get_wal_receiver_pid()
RETURNS INT
AS 'MODULE_PATHNAME', 'get_wal_receiver_pid'
LANGUAGE C STRICT;

View File

@@ -128,6 +128,7 @@ CREATE FUNCTION get_upstream_last_seen()
AS 'MODULE_PATHNAME', 'get_upstream_last_seen'
LANGUAGE C STRICT;
/* failover functions */
CREATE FUNCTION notify_follow_primary(INT)
@@ -185,6 +186,15 @@ CREATE FUNCTION repmgrd_is_paused()
AS 'MODULE_PATHNAME', 'repmgrd_is_paused'
LANGUAGE C STRICT;
CREATE FUNCTION get_wal_receiver_pid()
RETURNS INT
AS 'MODULE_PATHNAME', 'get_wal_receiver_pid'
LANGUAGE C STRICT;
/* views */
CREATE VIEW repmgr.replication_status AS
SELECT m.primary_node_id, m.standby_node_id, n.node_name AS standby_name,

View File

@@ -2681,6 +2681,39 @@ do_node_rejoin(void)
}
/*
* Currently for testing purposes only, not documented;
* use at own risk!
*/
void
do_node_control(void)
{
PGconn *conn = NULL;
pid_t wal_receiver_pid = UNKNOWN_PID;
conn = establish_db_connection(config_file_options.conninfo, true);
if (runtime_options.disable_wal_receiver == true)
{
wal_receiver_pid = disable_wal_receiver(conn);
PQfinish(conn);
if (wal_receiver_pid == UNKNOWN_PID)
exit(ERR_BAD_CONFIG);
exit(SUCCESS);
}
if (runtime_options.enable_wal_receiver == true)
{
wal_receiver_pid = enable_wal_receiver(conn);
}
PQfinish(conn);
}
/*
* For "internal" use by `node rejoin` on the local node when
* called by "standby switchover" from the remote node.

View File

@@ -24,6 +24,7 @@ extern void do_node_check(void);
extern void do_node_rejoin(void);
extern void do_node_service(void);
extern void do_node_control(void);
extern void do_node_help(void);

View File

@@ -135,6 +135,8 @@ typedef struct
/* following options for internal use */
char config_archive_dir[MAXPGPATH];
OutputMode output_mode;
bool disable_wal_receiver;
bool enable_wal_receiver;
} t_runtime_options;
#define T_RUNTIME_OPTIONS_INITIALIZER { \
@@ -174,7 +176,7 @@ typedef struct
/* "cluster cleanup" options */ \
0, \
/* following options for internal use */ \
"/tmp", OM_TEXT \
"/tmp", OM_TEXT, false, false \
}

View File

@@ -31,6 +31,7 @@
* NODE CHECK
* NODE REJOIN
* NODE SERVICE
* NODE CONTROL
*
* DAEMON STATUS
* DAEMON PAUSE
@@ -624,7 +625,7 @@ main(int argc, char **argv)
break;
/*--------------
/*---------------
* output options
*---------------
*/
@@ -640,6 +641,19 @@ main(int argc, char **argv)
runtime_options.optformat = true;
break;
/*---------------------------------
* undocumented options for testing
*----------------------------------
*/
case OPT_DISABLE_WAL_RECEIVER:
runtime_options.disable_wal_receiver = true;
break;
case OPT_ENABLE_WAL_RECEIVER:
runtime_options.enable_wal_receiver = true;
break;
/*-----------------------------
* options deprecated since 3.3
*-----------------------------
@@ -912,6 +926,8 @@ main(int argc, char **argv)
action = NODE_REJOIN;
else if (strcasecmp(repmgr_action, "SERVICE") == 0)
action = NODE_SERVICE;
else if (strcasecmp(repmgr_action, "CONTROL") == 0)
action = NODE_CONTROL;
}
else if (strcasecmp(repmgr_command, "CLUSTER") == 0)
@@ -1335,6 +1351,9 @@ main(int argc, char **argv)
case NODE_SERVICE:
do_node_service();
break;
case NODE_CONTROL:
do_node_control();
break;
/* CLUSTER */
case CLUSTER_SHOW:

View File

@@ -40,16 +40,17 @@
#define NODE_CHECK 14
#define NODE_SERVICE 15
#define NODE_REJOIN 16
#define CLUSTER_SHOW 17
#define CLUSTER_CLEANUP 18
#define CLUSTER_MATRIX 19
#define CLUSTER_CROSSCHECK 20
#define CLUSTER_EVENT 21
#define DAEMON_STATUS 22
#define DAEMON_PAUSE 23
#define DAEMON_UNPAUSE 24
#define DAEMON_START 25
#define DAEMON_STOP 26
#define NODE_CONTROL 17
#define CLUSTER_SHOW 18
#define CLUSTER_CLEANUP 19
#define CLUSTER_MATRIX 20
#define CLUSTER_CROSSCHECK 21
#define CLUSTER_EVENT 22
#define DAEMON_STATUS 23
#define DAEMON_PAUSE 24
#define DAEMON_UNPAUSE 25
#define DAEMON_START 26
#define DAEMON_STOP 27
/* command line options without short versions */
#define OPT_HELP 1001
@@ -97,7 +98,8 @@
#define OPT_VERSION_NUMBER 1043
#define OPT_DATA_DIRECTORY_CONFIG 1044
#define OPT_COMPACT 1045
#define OPT_DISABLE_WAL_RECEIVER 1046
#define OPT_ENABLE_WAL_RECEIVER 1047
/* deprecated since 3.3 */
#define OPT_DATA_DIR 999
@@ -202,6 +204,10 @@ static struct option long_options[] =
/* "cluster cleanup" options */
{"keep-history", required_argument, NULL, 'k'},
/* undocumented options for testing */
{"disable-wal-receiver", no_argument, NULL, OPT_DISABLE_WAL_RECEIVER},
{"enable-wal-receiver", no_argument, NULL, OPT_ENABLE_WAL_RECEIVER},
/* deprecated */
{"check-upstream-config", no_argument, NULL, OPT_CHECK_UPSTREAM_CONFIG},
{"no-conninfo-password", no_argument, NULL, OPT_NO_CONNINFO_PASSWORD},

View File

@@ -147,6 +147,8 @@ PG_FUNCTION_INFO_V1(repmgrd_pause);
Datum repmgrd_is_paused(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(repmgrd_is_paused);
Datum get_wal_receiver_pid(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(get_wal_receiver_pid);
/*
@@ -740,3 +742,17 @@ repmgrd_is_paused(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(is_paused);
}
Datum
get_wal_receiver_pid(PG_FUNCTION_ARGS)
{
int wal_receiver_pid;
if (!shared_state)
PG_RETURN_NULL();
wal_receiver_pid = WalRcv->pid;
PG_RETURN_INT32(wal_receiver_pid);
}

View File

@@ -92,6 +92,8 @@
#define DEFAULT_NODE_REJOIN_TIMEOUT 60 /* seconds */
#define DEFAULT_WAL_RECEIVE_CHECK_TIMEOUT 30 /* seconds */
#define WALRECEIVER_DISABLE_TIMEOUT_VALUE 86400000 /* milliseconds */
#ifndef RECOVERY_COMMAND_FILE
#define RECOVERY_COMMAND_FILE "recovery.conf"
#endif

View File

@@ -87,7 +87,7 @@ static void update_monitoring_history(void);
static void handle_sighup(PGconn **conn, t_server_type server_type);
static const char * format_failover_state(FailoverState failover_state);
static const char *format_failover_state(FailoverState failover_state);
void
@@ -1979,6 +1979,85 @@ do_primary_failover(void)
*/
check_connection(&local_node_info, &local_conn);
/*
* if requested, disable WAL receiver and wait until WAL receivers on all
* sibling nodes are disconnected
*/
if (config_file_options.standby_disconnect_on_failover == true)
{
NodeInfoListCell *cell = NULL;
static NodeInfoList check_sibling_nodes = T_NODE_INFO_LIST_INITIALIZER;
int i;
// XXX make configurable
int sibling_nodes_disconnect_timeout = 30;
bool sibling_node_wal_receiver_connected = false;
disable_wal_receiver(local_conn);
/*
* Loop through all reachable sibling nodes to determine whether
* they have disabled their WAL receivers.
*
* TODO: do_election() also calls get_active_sibling_node_records(),
* consolidate calls if feasible
*
*/
get_active_sibling_node_records(local_conn,
local_node_info.node_id,
local_node_info.upstream_node_id,
&check_sibling_nodes);
for (i = 0; i < sibling_nodes_disconnect_timeout; i++)
{
for (cell = check_sibling_nodes.head; cell; cell = cell->next)
{
pid_t sibling_wal_receiver_pid;
if (cell->node_info->conn == NULL)
cell->node_info->conn = establish_db_connection(cell->node_info->conninfo, false);
sibling_wal_receiver_pid = (pid_t)get_wal_receiver_pid(cell->node_info->conn);
if (sibling_wal_receiver_pid == UNKNOWN_PID)
{
log_warning(_("unable to query WAL receiver PID on node %i"),
cell->node_info->node_id);
}
else if (sibling_wal_receiver_pid > 0)
{
log_info(_("WAL receiver PID on node %i is %i"),
cell->node_info->node_id,
sibling_wal_receiver_pid);
sibling_node_wal_receiver_connected = true;
}
}
if (sibling_node_wal_receiver_connected == false)
{
log_notice(_("WAL receiver disconnected on all sibling nodes"));
break;
}
log_debug("sleeping %i of max %i seconds (\"sibling_nodes_disconnect_timeout\")",
i + 1, sibling_nodes_disconnect_timeout)
sleep(1);
}
if (sibling_node_wal_receiver_connected == true)
{
// XXX what do we do here? abort or continue? make configurable?
log_warning(_("WAL receiver still connected on at least one sibling node"));
}
else
{
log_info(_("WAL receiver disconnected on all %i sibling nodes"),
check_sibling_nodes.node_count);
}
clear_node_info_list(&check_sibling_nodes);
}
/* attempt to initiate voting process */
election_result = do_election();
@@ -1987,6 +2066,13 @@ do_primary_failover(void)
log_debug("election result: %s", _print_election_result(election_result));
/* Reenable WAL receiver, if disabled and node is not the promotion candidate */
if (config_file_options.standby_disconnect_on_failover == true && election_result != ELECTION_WON)
{
// XXX check return value
enable_wal_receiver(local_conn);
}
if (election_result == ELECTION_CANCELLED)
{
log_notice(_("election cancelled"));
@@ -3685,3 +3771,5 @@ handle_sighup(PGconn **conn, t_server_type server_type)
got_SIGHUP = false;
}

View File

@@ -17,6 +17,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <signal.h>
#include "repmgr.h"
static bool _local_command(const char *command, PQExpBufferData *outputbuf, bool simple, int *return_value);
@@ -176,3 +178,122 @@ remote_command(const char *host, const char *user, const char *command, const ch
return true;
}
pid_t
disable_wal_receiver(PGconn *conn)
{
char buf[MAXLEN];
int wal_retrieve_retry_interval;
pid_t wal_receiver_pid = UNKNOWN_PID;
if (is_superuser_connection(conn, NULL) == false)
{
log_error(_("superuser connection required"));
return UNKNOWN_PID;
}
get_pg_setting(conn, "wal_retrieve_retry_interval", buf);
// XXX handle error
wal_retrieve_retry_interval = atoi(buf);
if (wal_retrieve_retry_interval < WALRECEIVER_DISABLE_TIMEOUT_VALUE)
{
alter_system_int(conn, "wal_retrieve_retry_interval", wal_retrieve_retry_interval + WALRECEIVER_DISABLE_TIMEOUT_VALUE);
pg_reload_conf(conn);
}
wal_receiver_pid = (pid_t)get_wal_receiver_pid(conn);
if (wal_receiver_pid == UNKNOWN_PID)
{
log_warning(_("unable to retrieve walreceiver PID"));
return UNKNOWN_PID;
}
if (wal_receiver_pid == 0)
{
log_warning(_("walreceiver not running"));
}
else
{
int kill_ret;
int i, j;
int max_retries = 2;
for (i = 0; i < max_retries; i++)
{
/* why 5? */
sleep(5);
log_notice(_("killing walreceiver with PID %i"), (int)wal_receiver_pid);
kill((int)wal_receiver_pid, SIGTERM);
for (j = 0; j < 30; j++)
{
kill_ret = kill(wal_receiver_pid, 0);
if (kill_ret != 0)
{
log_info("killed");
break;
}
sleep(1);
}
/* */
sleep(1);
wal_receiver_pid = (pid_t)get_wal_receiver_pid(conn);
if (wal_receiver_pid == UNKNOWN_PID || wal_receiver_pid == 0)
break;
}
}
return wal_receiver_pid;
}
pid_t
enable_wal_receiver(PGconn *conn)
{
char buf[MAXLEN];
int wal_retrieve_retry_interval;
pid_t wal_receiver_pid = UNKNOWN_PID;
if (is_superuser_connection(conn, NULL) == false)
{
log_error(_("superuser connection required"));
return UNKNOWN_PID;
}
get_pg_setting(conn, "wal_retrieve_retry_interval", buf);
// XXX handle error
wal_retrieve_retry_interval = atoi(buf);
if (wal_retrieve_retry_interval > WALRECEIVER_DISABLE_TIMEOUT_VALUE)
{
int new_wal_retrieve_retry_interval = wal_retrieve_retry_interval - WALRECEIVER_DISABLE_TIMEOUT_VALUE;
log_notice(_("setting \"wal_retrieve_retry_interval\" to %i ms"),
new_wal_retrieve_retry_interval);
// XXX handle error
alter_system_int(conn,
"wal_retrieve_retry_interval",
new_wal_retrieve_retry_interval);
pg_reload_conf(conn);
}
else
{
// XXX add threshold sanity check
log_info(_("\"wal_retrieve_retry_interval\" is %i, not changing"),
wal_retrieve_retry_interval);
}
// XXX get wal receiver PID
return wal_receiver_pid;
}

View File

@@ -25,4 +25,8 @@ extern bool local_command_simple(const char *command, PQExpBufferData *outputbuf
extern bool remote_command(const char *host, const char *user, const char *command, const char *ssh_options, PQExpBufferData *outputbuf);
extern pid_t disable_wal_receiver(PGconn *conn);
extern pid_t enable_wal_receiver(PGconn *conn);
#endif /* _SYSUTILS_H_ */