Add functionality to "pause" repmgrd

In some circumstances, e.g. while performing a switchover, it is essential
that repmgrd does not take any kind of failover action, as this will put
the cluster into an incorrect state.

Previously it was necessary to stop repmgrd on all nodes (or at least
those nodes which repmgrd would consider as promotion candidates), however
this is a cumbersome and potentially risk-prone operation, particularly if the
replication cluster contains more than a couple of servers.

To prevent this issue from occurring, this patch introduces the ability
to "pause" repmgrd on all nodes wth a single command ("repmgr daemon pause")
which notifies repmgrd not to take any failover action until the node
is "unpaused" ("repmgr daemon unpause").

"repmgr daemon status" provides an overview of each node and whether repmgrd
is running, and if so whether it is paused.

"repmgr standby switchover" has been modified to automatically pause repmgrd
while carrying out the switchover.

See documentation for further details.
This commit is contained in:
Ian Barwick
2018-09-27 16:42:10 +09:00
parent fce3c02760
commit 2491b8ae52
27 changed files with 1943 additions and 121 deletions

260
repmgr.c
View File

@@ -26,6 +26,7 @@
#include "access/xlog.h"
#include "miscadmin.h"
#include "replication/walreceiver.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/procarray.h"
@@ -43,14 +44,21 @@
#include "lib/stringinfo.h"
#include "access/xact.h"
#include "utils/snapmgr.h"
#include "pgstat.h"
#if (PG_VERSION_NUM >= 90400)
#include "pgstat.h"
#else
#define PGSTAT_STAT_PERMANENT_DIRECTORY "pg_stat"
#endif
#include "voting.h"
#define UNKNOWN_NODE_ID -1
#define UNKNOWN_PID -1
#define TRANCHE_NAME "repmgrd"
#define REPMGRD_STATE_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/repmgrd_state.txt"
PG_MODULE_MAGIC;
@@ -66,6 +74,9 @@ typedef struct repmgrdSharedState
LWLockId lock; /* protects search/modification */
TimestampTz last_updated;
int local_node_id;
int repmgrd_pid;
char repmgrd_pidfile[MAXPGPATH];
bool repmgrd_paused;
/* streaming failover */
NodeVotingStatus voting_status;
int current_electoral_term;
@@ -112,6 +123,25 @@ PG_FUNCTION_INFO_V1(am_bdr_failover_handler);
Datum unset_bdr_failover_handler(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(unset_bdr_failover_handler);
Datum set_repmgrd_pid(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(set_repmgrd_pid);
Datum get_repmgrd_pid(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(get_repmgrd_pid);
Datum get_repmgrd_pidfile(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(get_repmgrd_pidfile);
Datum repmgrd_is_running(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(repmgrd_is_running);
Datum repmgrd_pause(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(repmgrd_pause);
Datum repmgrd_is_paused(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(repmgrd_is_paused);
/*
* Module load callback
@@ -185,6 +215,9 @@ repmgr_shmem_startup(void)
#endif
shared_state->local_node_id = UNKNOWN_NODE_ID;
shared_state->repmgrd_pid = UNKNOWN_PID;
memset(shared_state->repmgrd_pidfile, 0, MAXPGPATH);
shared_state->repmgrd_paused = false;
shared_state->current_electoral_term = 0;
shared_state->voting_status = VS_NO_VOTE;
shared_state->candidate_node_id = UNKNOWN_NODE_ID;
@@ -204,6 +237,8 @@ Datum
set_local_node_id(PG_FUNCTION_ARGS)
{
int local_node_id = UNKNOWN_NODE_ID;
int stored_node_id = UNKNOWN_NODE_ID;
int paused = -1;
if (!shared_state)
PG_RETURN_NULL();
@@ -213,6 +248,34 @@ set_local_node_id(PG_FUNCTION_ARGS)
local_node_id = PG_GETARG_INT32(0);
/* read state file and if exists/valid, update "repmgrd_paused" */
{
FILE *file = NULL;
file = AllocateFile(REPMGRD_STATE_FILE, PG_BINARY_R);
if (file != NULL)
{
int buffer_size = 128;
char buffer[buffer_size];
if (fgets(buffer, buffer_size, file) != NULL)
{
if (sscanf(buffer, "%i:%i", &stored_node_id, &paused) != 2)
{
elog(WARNING, "unable to parse repmgrd state file");
}
else
{
elog(DEBUG1, "node_id: %i; paused: %i", stored_node_id, paused);
}
}
FreeFile(file);
}
}
LWLockAcquire(shared_state->lock, LW_EXCLUSIVE);
/* only set local_node_id once, as it should never change */
@@ -221,6 +284,19 @@ set_local_node_id(PG_FUNCTION_ARGS)
shared_state->local_node_id = local_node_id;
}
/* only update if state file valid */
if (stored_node_id == shared_state->local_node_id)
{
if (paused == 0)
{
shared_state->repmgrd_paused = false;
}
else if (paused == 1)
{
shared_state->repmgrd_paused = true;
}
}
LWLockRelease(shared_state->lock);
PG_RETURN_VOID();
@@ -422,3 +498,185 @@ unset_bdr_failover_handler(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
/*
* Returns the repmgrd pid; or NULL if none set; or -1 if set but repmgrd
* process not running (TODO!)
*/
Datum
get_repmgrd_pid(PG_FUNCTION_ARGS)
{
int repmgrd_pid = UNKNOWN_PID;
if (!shared_state)
PG_RETURN_NULL();
LWLockAcquire(shared_state->lock, LW_SHARED);
repmgrd_pid = shared_state->repmgrd_pid;
LWLockRelease(shared_state->lock);
PG_RETURN_INT32(repmgrd_pid);
}
/*
* Returns the repmgrd pidfile
*/
Datum
get_repmgrd_pidfile(PG_FUNCTION_ARGS)
{
char repmgrd_pidfile[MAXPGPATH];
if (!shared_state)
PG_RETURN_NULL();
memset(repmgrd_pidfile, 0, MAXPGPATH);
LWLockAcquire(shared_state->lock, LW_SHARED);
strncpy(repmgrd_pidfile, shared_state->repmgrd_pidfile, MAXPGPATH);
LWLockRelease(shared_state->lock);
if (repmgrd_pidfile[0] == '\0')
PG_RETURN_NULL();
PG_RETURN_TEXT_P(cstring_to_text(repmgrd_pidfile));
}
Datum
set_repmgrd_pid(PG_FUNCTION_ARGS)
{
int repmgrd_pid = UNKNOWN_PID;
char *repmgrd_pidfile = NULL;
if (!shared_state)
PG_RETURN_VOID();
if (PG_ARGISNULL(0))
{
repmgrd_pid = UNKNOWN_PID;
}
else
{
repmgrd_pid = PG_GETARG_INT32(0);
}
elog(DEBUG3, "set_repmgrd_pid(): provided pid is %i", repmgrd_pid);
if (repmgrd_pid != UNKNOWN_PID && !PG_ARGISNULL(1))
{
repmgrd_pidfile = text_to_cstring(PG_GETARG_TEXT_PP(1));
elog(INFO, "set_repmgrd_pid(): provided pidfile is %s", repmgrd_pidfile);
}
LWLockAcquire(shared_state->lock, LW_EXCLUSIVE);
shared_state->repmgrd_pid = repmgrd_pid;
memset(shared_state->repmgrd_pidfile, 0, MAXPGPATH);
if(repmgrd_pidfile != NULL)
{
strncpy(shared_state->repmgrd_pidfile, repmgrd_pidfile, MAXPGPATH);
}
LWLockRelease(shared_state->lock);
PG_RETURN_VOID();
}
Datum
repmgrd_is_running(PG_FUNCTION_ARGS)
{
int repmgrd_pid = UNKNOWN_PID;
int kill_ret;
if (!shared_state)
PG_RETURN_NULL();
LWLockAcquire(shared_state->lock, LW_SHARED);
repmgrd_pid = shared_state->repmgrd_pid;
LWLockRelease(shared_state->lock);
/* No PID registered - assume not running */
if (repmgrd_pid == UNKNOWN_PID)
{
PG_RETURN_BOOL(false);
}
kill_ret = kill(repmgrd_pid, 0);
if (kill_ret == 0)
{
PG_RETURN_BOOL(true);
}
PG_RETURN_BOOL(false);
}
Datum
repmgrd_pause(PG_FUNCTION_ARGS)
{
bool pause;
FILE *file = NULL;
StringInfoData buf;
if (!shared_state)
PG_RETURN_NULL();
if (PG_ARGISNULL(0))
PG_RETURN_NULL();
pause = PG_GETARG_BOOL(0);
LWLockAcquire(shared_state->lock, LW_EXCLUSIVE);
shared_state->repmgrd_paused = pause;
LWLockRelease(shared_state->lock);
/* write state to file */
file = AllocateFile(REPMGRD_STATE_FILE, PG_BINARY_W);
if (file == NULL)
{
elog(DEBUG1, "unable to allocate %s", REPMGRD_STATE_FILE);
// XXX anything else we can do? log?
PG_RETURN_VOID();
}
elog(DEBUG1, "allocated");
initStringInfo(&buf);
LWLockAcquire(shared_state->lock, LW_SHARED);
appendStringInfo(&buf, "%i:%i",
shared_state->local_node_id,
pause ? 1 : 0);
LWLockRelease(shared_state->lock);
// XXX check success
fwrite(buf.data, strlen(buf.data) + 1, 1, file);
resetStringInfo(&buf);
FreeFile(file);
PG_RETURN_VOID();
}
Datum
repmgrd_is_paused(PG_FUNCTION_ARGS)
{
bool is_paused;
if (!shared_state)
PG_RETURN_NULL();
LWLockAcquire(shared_state->lock, LW_SHARED);
is_paused = shared_state->repmgrd_paused;
LWLockRelease(shared_state->lock);
PG_RETURN_BOOL(is_paused);
}