diff --git a/configfile.c b/configfile.c
index 677e1920..fdbfe5fc 100644
--- a/configfile.c
+++ b/configfile.c
@@ -358,6 +358,7 @@ _parse_config(t_configuration_options *options, ItemList *error_list, ItemList *
options->primary_notification_timeout = DEFAULT_PRIMARY_NOTIFICATION_TIMEOUT;
options->repmgrd_standby_startup_timeout = -1; /* defaults to "standby_reconnect_timeout" if not set */
memset(options->repmgrd_pid_file, 0, sizeof(options->repmgrd_pid_file));
+ options->standby_disconnect_on_failover = false;
options->connection_check_type = CHECK_PING;
/*-------------
@@ -619,6 +620,8 @@ _parse_config(t_configuration_options *options, ItemList *error_list, ItemList *
options->repmgrd_standby_startup_timeout = repmgr_atoi(value, name, error_list, 0);
else if (strcmp(name, "repmgrd_pid_file") == 0)
strncpy(options->repmgrd_pid_file, value, MAXPGPATH);
+ else if (strcmp(name, "standby_disconnect_on_failover") == 0)
+ options->standby_disconnect_on_failover = parse_bool(value, name, error_list);
else if (strcmp(name, "connection_check_type") == 0)
{
if (strcasecmp(value, "ping") == 0)
diff --git a/configfile.h b/configfile.h
index 34e220f7..319a5959 100644
--- a/configfile.h
+++ b/configfile.h
@@ -141,6 +141,7 @@ typedef struct
int primary_notification_timeout;
int repmgrd_standby_startup_timeout;
char repmgrd_pid_file[MAXPGPATH];
+ bool standby_disconnect_on_failover;
ConnectionCheckType connection_check_type;
/* BDR settings */
@@ -212,8 +213,8 @@ typedef struct
DEFAULT_RECONNECTION_INTERVAL, \
false, -1, \
DEFAULT_ASYNC_QUERY_TIMEOUT, \
- DEFAULT_PRIMARY_NOTIFICATION_TIMEOUT, \
- -1, "", CHECK_PING, \
+ DEFAULT_PRIMARY_NOTIFICATION_TIMEOUT, \
+ -1, "", false, CHECK_PING, \
/* BDR settings */ \
false, DEFAULT_BDR_RECOVERY_TIMEOUT, \
/* service settings */ \
diff --git a/dbutils.c b/dbutils.c
index 3b9f267a..7074625f 100644
--- a/dbutils.c
+++ b/dbutils.c
@@ -1079,7 +1079,7 @@ get_pg_setting(PGconn *conn, const char *setting, char *output)
}
else
{
- /* XXX highly unlikely this would ever happen */
+ /* highly unlikely this would ever happen */
log_error(_("get_pg_setting(): unknown parameter \"%s\""), PQgetvalue(res, i, 0));
}
}
@@ -1096,6 +1096,56 @@ get_pg_setting(PGconn *conn, const char *setting, char *output)
}
+bool
+alter_system_int(PGconn *conn, const char *name, int value)
+{
+ PQExpBufferData query;
+ PGresult *res = NULL;
+ bool success = false;
+
+ initPQExpBuffer(&query);
+ appendPQExpBuffer(&query,
+ "ALTER SYSTEM SET %s = %i",
+ name, value);
+
+ res = PQexec(conn, query.data);
+
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ log_db_error(conn, query.data, _("alter_system_int() - unable to execute query"));
+
+ success = false;
+ }
+
+
+ termPQExpBuffer(&query);
+ PQclear(res);
+
+ return success;
+}
+
+
+bool
+pg_reload_conf(PGconn *conn)
+{
+ PGresult *res = NULL;
+ bool success = false;
+
+ res = PQexec(conn, "SELECT pg_catalog.pg_reload_conf()");
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ log_db_error(conn, NULL, _("pg_reload_conf() - unable to execute query"));
+
+ success = false;
+ }
+
+ PQclear(res);
+
+ return success;
+}
+
+
/* ============================ */
/* Server information functions */
/* ============================ */
@@ -1859,6 +1909,29 @@ repmgrd_pause(PGconn *conn, bool pause)
return success;
}
+pid_t
+get_wal_receiver_pid(PGconn *conn)
+{
+ PGresult *res = NULL;
+ pid_t wal_receiver_pid = UNKNOWN_PID;
+
+ res = PQexec(conn, "SELECT repmgr.get_wal_receiver_pid()");
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ log_error(_("unable to execute \"SELECT repmgr.get_wal_receiver_pid()\""));
+ log_detail("%s", PQerrorMessage(conn));
+ }
+ else if (!PQgetisnull(res, 0, 0))
+ {
+ wal_receiver_pid = atoi(PQgetvalue(res, 0, 0));
+ }
+
+ PQclear(res);
+
+ return wal_receiver_pid;
+}
+
/* ================ */
/* result functions */
/* ================ */
diff --git a/dbutils.h b/dbutils.h
index 268b3fb8..f4d9ea0f 100644
--- a/dbutils.h
+++ b/dbutils.h
@@ -415,6 +415,8 @@ bool set_config_bool(PGconn *conn, const char *config_param, bool state);
int guc_set(PGconn *conn, const char *parameter, const char *op, const char *value);
int guc_set_typed(PGconn *conn, const char *parameter, const char *op, const char *value, const char *datatype);
bool get_pg_setting(PGconn *conn, const char *setting, char *output);
+bool alter_system_int(PGconn *conn, const char *name, int value);
+bool pg_reload_conf(PGconn *conn);
/* server information functions */
bool get_cluster_size(PGconn *conn, char *size);
@@ -436,6 +438,7 @@ pid_t repmgrd_get_pid(PGconn *conn);
bool repmgrd_is_running(PGconn *conn);
bool repmgrd_is_paused(PGconn *conn);
bool repmgrd_pause(PGconn *conn, bool pause);
+pid_t get_wal_receiver_pid(PGconn *conn);
/* extension functions */
ExtensionStatus get_repmgr_extension_status(PGconn *conn, t_extension_versions *extversions);
diff --git a/repmgr--4.2--4.3.sql b/repmgr--4.2--4.3.sql
index 962957c0..96a09196 100644
--- a/repmgr--4.2--4.3.sql
+++ b/repmgr--4.2--4.3.sql
@@ -10,3 +10,8 @@ CREATE FUNCTION get_upstream_last_seen()
RETURNS INT
AS 'MODULE_PATHNAME', 'get_upstream_last_seen'
LANGUAGE C STRICT;
+
+CREATE FUNCTION get_wal_receiver_pid()
+ RETURNS INT
+ AS 'MODULE_PATHNAME', 'get_wal_receiver_pid'
+ LANGUAGE C STRICT;
diff --git a/repmgr--4.3.sql b/repmgr--4.3.sql
index ce6ccfc3..8496b607 100644
--- a/repmgr--4.3.sql
+++ b/repmgr--4.3.sql
@@ -128,6 +128,7 @@ CREATE FUNCTION get_upstream_last_seen()
AS 'MODULE_PATHNAME', 'get_upstream_last_seen'
LANGUAGE C STRICT;
+
/* failover functions */
CREATE FUNCTION notify_follow_primary(INT)
@@ -185,6 +186,15 @@ CREATE FUNCTION repmgrd_is_paused()
AS 'MODULE_PATHNAME', 'repmgrd_is_paused'
LANGUAGE C STRICT;
+CREATE FUNCTION get_wal_receiver_pid()
+ RETURNS INT
+ AS 'MODULE_PATHNAME', 'get_wal_receiver_pid'
+ LANGUAGE C STRICT;
+
+
+
+
+/* views */
CREATE VIEW repmgr.replication_status AS
SELECT m.primary_node_id, m.standby_node_id, n.node_name AS standby_name,
diff --git a/repmgr-action-node.c b/repmgr-action-node.c
index 63b98ee2..7f26d929 100644
--- a/repmgr-action-node.c
+++ b/repmgr-action-node.c
@@ -2681,6 +2681,39 @@ do_node_rejoin(void)
}
+/*
+ * Currently for testing purposes only, not documented;
+ * use at own risk!
+ */
+
+void
+do_node_control(void)
+{
+ PGconn *conn = NULL;
+ pid_t wal_receiver_pid = UNKNOWN_PID;
+ conn = establish_db_connection(config_file_options.conninfo, true);
+
+ if (runtime_options.disable_wal_receiver == true)
+ {
+ wal_receiver_pid = disable_wal_receiver(conn);
+
+ PQfinish(conn);
+
+ if (wal_receiver_pid == UNKNOWN_PID)
+ exit(ERR_BAD_CONFIG);
+
+ exit(SUCCESS);
+ }
+
+ if (runtime_options.enable_wal_receiver == true)
+ {
+ wal_receiver_pid = enable_wal_receiver(conn);
+ }
+
+ PQfinish(conn);
+}
+
+
/*
* For "internal" use by `node rejoin` on the local node when
* called by "standby switchover" from the remote node.
diff --git a/repmgr-action-node.h b/repmgr-action-node.h
index d6bec2a1..d1553235 100644
--- a/repmgr-action-node.h
+++ b/repmgr-action-node.h
@@ -24,6 +24,7 @@ extern void do_node_check(void);
extern void do_node_rejoin(void);
extern void do_node_service(void);
+extern void do_node_control(void);
extern void do_node_help(void);
diff --git a/repmgr-client-global.h b/repmgr-client-global.h
index e7c70b37..46dfb66e 100644
--- a/repmgr-client-global.h
+++ b/repmgr-client-global.h
@@ -135,6 +135,8 @@ typedef struct
/* following options for internal use */
char config_archive_dir[MAXPGPATH];
OutputMode output_mode;
+ bool disable_wal_receiver;
+ bool enable_wal_receiver;
} t_runtime_options;
#define T_RUNTIME_OPTIONS_INITIALIZER { \
@@ -174,7 +176,7 @@ typedef struct
/* "cluster cleanup" options */ \
0, \
/* following options for internal use */ \
- "/tmp", OM_TEXT \
+ "/tmp", OM_TEXT, false, false \
}
diff --git a/repmgr-client.c b/repmgr-client.c
index 0df6f420..33091274 100644
--- a/repmgr-client.c
+++ b/repmgr-client.c
@@ -31,6 +31,7 @@
* NODE CHECK
* NODE REJOIN
* NODE SERVICE
+ * NODE CONTROL
*
* DAEMON STATUS
* DAEMON PAUSE
@@ -624,7 +625,7 @@ main(int argc, char **argv)
break;
- /*--------------
+ /*---------------
* output options
*---------------
*/
@@ -640,6 +641,19 @@ main(int argc, char **argv)
runtime_options.optformat = true;
break;
+ /*---------------------------------
+ * undocumented options for testing
+ *----------------------------------
+ */
+
+ case OPT_DISABLE_WAL_RECEIVER:
+ runtime_options.disable_wal_receiver = true;
+ break;
+
+ case OPT_ENABLE_WAL_RECEIVER:
+ runtime_options.enable_wal_receiver = true;
+ break;
+
/*-----------------------------
* options deprecated since 3.3
*-----------------------------
@@ -912,6 +926,8 @@ main(int argc, char **argv)
action = NODE_REJOIN;
else if (strcasecmp(repmgr_action, "SERVICE") == 0)
action = NODE_SERVICE;
+ else if (strcasecmp(repmgr_action, "CONTROL") == 0)
+ action = NODE_CONTROL;
}
else if (strcasecmp(repmgr_command, "CLUSTER") == 0)
@@ -1335,6 +1351,9 @@ main(int argc, char **argv)
case NODE_SERVICE:
do_node_service();
break;
+ case NODE_CONTROL:
+ do_node_control();
+ break;
/* CLUSTER */
case CLUSTER_SHOW:
diff --git a/repmgr-client.h b/repmgr-client.h
index 8a8bbf15..ed4bbf68 100644
--- a/repmgr-client.h
+++ b/repmgr-client.h
@@ -40,16 +40,17 @@
#define NODE_CHECK 14
#define NODE_SERVICE 15
#define NODE_REJOIN 16
-#define CLUSTER_SHOW 17
-#define CLUSTER_CLEANUP 18
-#define CLUSTER_MATRIX 19
-#define CLUSTER_CROSSCHECK 20
-#define CLUSTER_EVENT 21
-#define DAEMON_STATUS 22
-#define DAEMON_PAUSE 23
-#define DAEMON_UNPAUSE 24
-#define DAEMON_START 25
-#define DAEMON_STOP 26
+#define NODE_CONTROL 17
+#define CLUSTER_SHOW 18
+#define CLUSTER_CLEANUP 19
+#define CLUSTER_MATRIX 20
+#define CLUSTER_CROSSCHECK 21
+#define CLUSTER_EVENT 22
+#define DAEMON_STATUS 23
+#define DAEMON_PAUSE 24
+#define DAEMON_UNPAUSE 25
+#define DAEMON_START 26
+#define DAEMON_STOP 27
/* command line options without short versions */
#define OPT_HELP 1001
@@ -97,7 +98,8 @@
#define OPT_VERSION_NUMBER 1043
#define OPT_DATA_DIRECTORY_CONFIG 1044
#define OPT_COMPACT 1045
-
+#define OPT_DISABLE_WAL_RECEIVER 1046
+#define OPT_ENABLE_WAL_RECEIVER 1047
/* deprecated since 3.3 */
#define OPT_DATA_DIR 999
@@ -202,6 +204,10 @@ static struct option long_options[] =
/* "cluster cleanup" options */
{"keep-history", required_argument, NULL, 'k'},
+/* undocumented options for testing */
+ {"disable-wal-receiver", no_argument, NULL, OPT_DISABLE_WAL_RECEIVER},
+ {"enable-wal-receiver", no_argument, NULL, OPT_ENABLE_WAL_RECEIVER},
+
/* deprecated */
{"check-upstream-config", no_argument, NULL, OPT_CHECK_UPSTREAM_CONFIG},
{"no-conninfo-password", no_argument, NULL, OPT_NO_CONNINFO_PASSWORD},
diff --git a/repmgr.c b/repmgr.c
index 0a73b5fe..a6f49607 100644
--- a/repmgr.c
+++ b/repmgr.c
@@ -147,6 +147,8 @@ PG_FUNCTION_INFO_V1(repmgrd_pause);
Datum repmgrd_is_paused(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(repmgrd_is_paused);
+Datum get_wal_receiver_pid(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1(get_wal_receiver_pid);
/*
@@ -740,3 +742,17 @@ repmgrd_is_paused(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(is_paused);
}
+
+
+Datum
+get_wal_receiver_pid(PG_FUNCTION_ARGS)
+{
+ int wal_receiver_pid;
+
+ if (!shared_state)
+ PG_RETURN_NULL();
+
+ wal_receiver_pid = WalRcv->pid;
+
+ PG_RETURN_INT32(wal_receiver_pid);
+}
diff --git a/repmgr.h b/repmgr.h
index 25586020..ceac63aa 100644
--- a/repmgr.h
+++ b/repmgr.h
@@ -92,6 +92,8 @@
#define DEFAULT_NODE_REJOIN_TIMEOUT 60 /* seconds */
#define DEFAULT_WAL_RECEIVE_CHECK_TIMEOUT 30 /* seconds */
+#define WALRECEIVER_DISABLE_TIMEOUT_VALUE 86400000 /* milliseconds */
+
#ifndef RECOVERY_COMMAND_FILE
#define RECOVERY_COMMAND_FILE "recovery.conf"
#endif
diff --git a/repmgrd-physical.c b/repmgrd-physical.c
index 2ffdaa92..6b9c4e29 100644
--- a/repmgrd-physical.c
+++ b/repmgrd-physical.c
@@ -87,7 +87,7 @@ static void update_monitoring_history(void);
static void handle_sighup(PGconn **conn, t_server_type server_type);
-static const char * format_failover_state(FailoverState failover_state);
+static const char *format_failover_state(FailoverState failover_state);
void
@@ -1979,6 +1979,85 @@ do_primary_failover(void)
*/
check_connection(&local_node_info, &local_conn);
+ /*
+ * if requested, disable WAL receiver and wait until WAL receivers on all
+ * sibling nodes are disconnected
+ */
+ if (config_file_options.standby_disconnect_on_failover == true)
+ {
+ NodeInfoListCell *cell = NULL;
+ static NodeInfoList check_sibling_nodes = T_NODE_INFO_LIST_INITIALIZER;
+ int i;
+
+ // XXX make configurable
+ int sibling_nodes_disconnect_timeout = 30;
+ bool sibling_node_wal_receiver_connected = false;
+
+ disable_wal_receiver(local_conn);
+
+ /*
+ * Loop through all reachable sibling nodes to determine whether
+ * they have disabled their WAL receivers.
+ *
+ * TODO: do_election() also calls get_active_sibling_node_records(),
+ * consolidate calls if feasible
+ *
+ */
+ get_active_sibling_node_records(local_conn,
+ local_node_info.node_id,
+ local_node_info.upstream_node_id,
+ &check_sibling_nodes);
+
+ for (i = 0; i < sibling_nodes_disconnect_timeout; i++)
+ {
+ for (cell = check_sibling_nodes.head; cell; cell = cell->next)
+ {
+ pid_t sibling_wal_receiver_pid;
+
+ if (cell->node_info->conn == NULL)
+ cell->node_info->conn = establish_db_connection(cell->node_info->conninfo, false);
+
+ sibling_wal_receiver_pid = (pid_t)get_wal_receiver_pid(cell->node_info->conn);
+
+ if (sibling_wal_receiver_pid == UNKNOWN_PID)
+ {
+ log_warning(_("unable to query WAL receiver PID on node %i"),
+ cell->node_info->node_id);
+ }
+ else if (sibling_wal_receiver_pid > 0)
+ {
+ log_info(_("WAL receiver PID on node %i is %i"),
+ cell->node_info->node_id,
+ sibling_wal_receiver_pid);
+ sibling_node_wal_receiver_connected = true;
+ }
+ }
+
+ if (sibling_node_wal_receiver_connected == false)
+ {
+ log_notice(_("WAL receiver disconnected on all sibling nodes"));
+ break;
+ }
+
+ log_debug("sleeping %i of max %i seconds (\"sibling_nodes_disconnect_timeout\")",
+ i + 1, sibling_nodes_disconnect_timeout)
+ sleep(1);
+ }
+
+ if (sibling_node_wal_receiver_connected == true)
+ {
+ // XXX what do we do here? abort or continue? make configurable?
+ log_warning(_("WAL receiver still connected on at least one sibling node"));
+ }
+ else
+ {
+ log_info(_("WAL receiver disconnected on all %i sibling nodes"),
+ check_sibling_nodes.node_count);
+ }
+
+ clear_node_info_list(&check_sibling_nodes);
+ }
+
/* attempt to initiate voting process */
election_result = do_election();
@@ -1987,6 +2066,13 @@ do_primary_failover(void)
log_debug("election result: %s", _print_election_result(election_result));
+ /* Reenable WAL receiver, if disabled and node is not the promotion candidate */
+ if (config_file_options.standby_disconnect_on_failover == true && election_result != ELECTION_WON)
+ {
+ // XXX check return value
+ enable_wal_receiver(local_conn);
+ }
+
if (election_result == ELECTION_CANCELLED)
{
log_notice(_("election cancelled"));
@@ -3685,3 +3771,5 @@ handle_sighup(PGconn **conn, t_server_type server_type)
got_SIGHUP = false;
}
+
+
diff --git a/sysutils.c b/sysutils.c
index af5b1063..da17541d 100644
--- a/sysutils.c
+++ b/sysutils.c
@@ -17,6 +17,8 @@
* along with this program. If not, see .
*/
+#include
+
#include "repmgr.h"
static bool _local_command(const char *command, PQExpBufferData *outputbuf, bool simple, int *return_value);
@@ -176,3 +178,122 @@ remote_command(const char *host, const char *user, const char *command, const ch
return true;
}
+
+
+pid_t
+disable_wal_receiver(PGconn *conn)
+{
+ char buf[MAXLEN];
+ int wal_retrieve_retry_interval;
+ pid_t wal_receiver_pid = UNKNOWN_PID;
+
+ if (is_superuser_connection(conn, NULL) == false)
+ {
+ log_error(_("superuser connection required"));
+ return UNKNOWN_PID;
+ }
+
+ get_pg_setting(conn, "wal_retrieve_retry_interval", buf);
+
+ // XXX handle error
+ wal_retrieve_retry_interval = atoi(buf);
+
+
+ if (wal_retrieve_retry_interval < WALRECEIVER_DISABLE_TIMEOUT_VALUE)
+ {
+ alter_system_int(conn, "wal_retrieve_retry_interval", wal_retrieve_retry_interval + WALRECEIVER_DISABLE_TIMEOUT_VALUE);
+ pg_reload_conf(conn);
+ }
+
+ wal_receiver_pid = (pid_t)get_wal_receiver_pid(conn);
+
+ if (wal_receiver_pid == UNKNOWN_PID)
+ {
+ log_warning(_("unable to retrieve walreceiver PID"));
+ return UNKNOWN_PID;
+ }
+
+ if (wal_receiver_pid == 0)
+ {
+ log_warning(_("walreceiver not running"));
+ }
+ else
+ {
+ int kill_ret;
+ int i, j;
+ int max_retries = 2;
+
+ for (i = 0; i < max_retries; i++)
+ {
+ /* why 5? */
+ sleep(5);
+ log_notice(_("killing walreceiver with PID %i"), (int)wal_receiver_pid);
+
+ kill((int)wal_receiver_pid, SIGTERM);
+
+ for (j = 0; j < 30; j++)
+ {
+ kill_ret = kill(wal_receiver_pid, 0);
+
+ if (kill_ret != 0)
+ {
+ log_info("killed");
+ break;
+ }
+ sleep(1);
+ }
+
+ /* */
+ sleep(1);
+ wal_receiver_pid = (pid_t)get_wal_receiver_pid(conn);
+ if (wal_receiver_pid == UNKNOWN_PID || wal_receiver_pid == 0)
+ break;
+ }
+ }
+
+
+ return wal_receiver_pid;
+}
+
+pid_t
+enable_wal_receiver(PGconn *conn)
+{
+ char buf[MAXLEN];
+ int wal_retrieve_retry_interval;
+ pid_t wal_receiver_pid = UNKNOWN_PID;
+
+ if (is_superuser_connection(conn, NULL) == false)
+ {
+ log_error(_("superuser connection required"));
+ return UNKNOWN_PID;
+ }
+
+ get_pg_setting(conn, "wal_retrieve_retry_interval", buf);
+
+ // XXX handle error
+ wal_retrieve_retry_interval = atoi(buf);
+
+ if (wal_retrieve_retry_interval > WALRECEIVER_DISABLE_TIMEOUT_VALUE)
+ {
+ int new_wal_retrieve_retry_interval = wal_retrieve_retry_interval - WALRECEIVER_DISABLE_TIMEOUT_VALUE;
+ log_notice(_("setting \"wal_retrieve_retry_interval\" to %i ms"),
+ new_wal_retrieve_retry_interval);
+
+ // XXX handle error
+ alter_system_int(conn,
+ "wal_retrieve_retry_interval",
+ new_wal_retrieve_retry_interval);
+ pg_reload_conf(conn);
+ }
+ else
+ {
+ // XXX add threshold sanity check
+ log_info(_("\"wal_retrieve_retry_interval\" is %i, not changing"),
+ wal_retrieve_retry_interval);
+ }
+
+
+ // XXX get wal receiver PID
+
+ return wal_receiver_pid;
+}
diff --git a/sysutils.h b/sysutils.h
index 92d114e6..4ddc3d36 100644
--- a/sysutils.h
+++ b/sysutils.h
@@ -25,4 +25,8 @@ extern bool local_command_simple(const char *command, PQExpBufferData *outputbuf
extern bool remote_command(const char *host, const char *user, const char *command, const char *ssh_options, PQExpBufferData *outputbuf);
+extern pid_t disable_wal_receiver(PGconn *conn);
+extern pid_t enable_wal_receiver(PGconn *conn);
+
+
#endif /* _SYSUTILS_H_ */