Files
repmgr/repmgr.c
Ian Barwick 951c7dbd07 repmgrd: in BDR mode, have each repmgrd monitor each node
This will cover both the case when an entire node including
repmgrd goes down, and when one PostgreSQL instance goes down
but repmgrd is still up (in which case only one of the repmgrds
will handle the failover).
2017-07-14 15:01:18 +09:00

379 lines
8.1 KiB
C

/*
* repmgr.c - repmgr extension
*
* Copyright (c) 2ndQuadrant, 2010-2017
*
* This is the actual extension code; see repmgr-client.c for the code which
* generates the repmgr binary
*/
#include "postgres.h"
#include "fmgr.h"
#include "access/xlog.h"
#include "miscadmin.h"
#include "replication/walreceiver.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/procarray.h"
#include "storage/shmem.h"
#include "storage/spin.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#include "utils/timestamp.h"
#include "executor/spi.h"
#include "lib/stringinfo.h"
#include "access/xact.h"
#include "utils/snapmgr.h"
#include "pgstat.h"
#include "voting.h"
#define UNKNOWN_NODE_ID -1
#define TRANCHE_NAME "repmgrd"
PG_MODULE_MAGIC;
typedef enum {
LEADER_NODE,
FOLLOWER_NODE,
CANDIDATE_NODE
} NodeState;
typedef struct repmgrdSharedState
{
LWLockId lock; /* protects search/modification */
/* streaming failover */
NodeState node_state;
NodeVotingStatus voting_status;
int current_electoral_term;
int candidate_node_id;
bool follow_new_primary;
/* BDR failover */
int bdr_failover_handler;
} repmgrdSharedState;
static repmgrdSharedState *shared_state = NULL;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
void _PG_init(void);
void _PG_fini(void);
static void repmgr_shmem_startup(void);
Datum request_vote(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(request_vote);
Datum get_voting_status(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(get_voting_status);
Datum set_voting_status_initiated(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(set_voting_status_initiated);
Datum other_node_is_candidate(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(other_node_is_candidate);
Datum notify_follow_primary(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(notify_follow_primary);
Datum get_new_primary(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(get_new_primary);
Datum reset_voting_status(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(reset_voting_status);
Datum am_bdr_failover_handler(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(am_bdr_failover_handler);
Datum unset_bdr_failover_handler(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(unset_bdr_failover_handler);
/*
* Module load callback
*/
void
_PG_init(void)
{
elog(INFO, "repmgr init");
// error here?
if (!process_shared_preload_libraries_in_progress)
return;
RequestAddinShmemSpace(MAXALIGN(sizeof(repmgrdSharedState)));
#if (PG_VERSION_NUM >= 90600)
RequestNamedLWLockTranche(TRANCHE_NAME, 1);
#else
RequestAddinLWLocks(1);
#endif
/*
* Install hooks.
*/
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = repmgr_shmem_startup;
}
/*
* Module unload callback
*/
void
_PG_fini(void)
{
/* Uninstall hook */
shmem_startup_hook = prev_shmem_startup_hook;
}
/*
* shmem_startup hook: allocate or attach to shared memory,
*/
static void
repmgr_shmem_startup(void)
{
bool found;
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();
/* reset in case this is a restart within the postmaster */
shared_state = NULL;
/*
* Create or attach to the shared memory state, including hash table
*/
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
shared_state = ShmemInitStruct("repmgrd shared state",
sizeof(repmgrdSharedState),
&found);
if (!found)
{
/* Initialise shared memory struct */
#if (PG_VERSION_NUM >= 90600)
shared_state->lock = &(GetNamedLWLockTranche(TRANCHE_NAME))->lock;
#else
shared_state->lock = LWLockAssign();
#endif
shared_state->current_electoral_term = 0;
shared_state->voting_status = VS_NO_VOTE;
shared_state->candidate_node_id = UNKNOWN_NODE_ID;
shared_state->follow_new_primary = false;
shared_state->bdr_failover_handler = UNKNOWN_NODE_ID;
}
LWLockRelease(AddinShmemInitLock);
}
Datum
request_vote(PG_FUNCTION_ARGS)
{
StringInfoData query;
XLogRecPtr our_lsn = InvalidXLogRecPtr;
/* node_id used for logging purposes */
int requesting_node_id = PG_GETARG_INT32(0);
int current_electoral_term = PG_GETARG_INT32(1);
int ret;
bool isnull;
LWLockAcquire(shared_state->lock, LW_SHARED);
/* this node has initiated voting or already responded to another node */
// if (current_electoral_term == shared_state->current_electoral_term
// && shared_state->voting_status != VS_NO_VOTE)
if (shared_state->voting_status != VS_NO_VOTE)
{
LWLockRelease(shared_state->lock);
PG_RETURN_NULL();
}
elog(INFO, "requesting node id is %i for electoral term %i (our term: %i)",
requesting_node_id, current_electoral_term,
shared_state->current_electoral_term);
SPI_connect();
initStringInfo(&query);
appendStringInfo(
&query,
"SELECT pg_catalog.pg_last_wal_receive_lsn()");
elog(INFO, "query: %s", query.data);
ret = SPI_execute(query.data, true, 0);
// XXX handle errors
our_lsn = DatumGetLSN(SPI_getbinval(SPI_tuptable->vals[0],
SPI_tuptable->tupdesc,
1, &isnull));
elog(INFO, "Our LSN is %X/%X",
(uint32) (our_lsn >> 32),
(uint32) our_lsn);
SPI_finish();
/* indicate this node has responded to a vote request */
shared_state->voting_status = VS_VOTE_REQUEST_RECEIVED;
shared_state->current_electoral_term = current_electoral_term;
LWLockRelease(shared_state->lock);
// should we free "query" here?
PG_RETURN_LSN(our_lsn);
}
Datum
get_voting_status(PG_FUNCTION_ARGS)
{
NodeVotingStatus voting_status;
LWLockAcquire(shared_state->lock, LW_SHARED);
voting_status = shared_state->voting_status;
LWLockRelease(shared_state->lock);
PG_RETURN_INT32(voting_status);
}
Datum
set_voting_status_initiated(PG_FUNCTION_ARGS)
{
int electoral_term;
LWLockAcquire(shared_state->lock, LW_SHARED);
shared_state->voting_status = VS_VOTE_INITIATED;
shared_state->current_electoral_term += 1;
electoral_term = shared_state->current_electoral_term;
LWLockRelease(shared_state->lock);
elog(INFO, "setting voting term to %i", electoral_term);
PG_RETURN_INT32(electoral_term);
}
Datum
other_node_is_candidate(PG_FUNCTION_ARGS)
{
int requesting_node_id = PG_GETARG_INT32(0);
int electoral_term = PG_GETARG_INT32(1);
LWLockAcquire(shared_state->lock, LW_SHARED);
if (shared_state->current_electoral_term == electoral_term)
{
if (shared_state->candidate_node_id != UNKNOWN_NODE_ID)
{
elog(INFO, "node %i requesting candidature, but node %i already candidate",
requesting_node_id,
shared_state->candidate_node_id);
PG_RETURN_BOOL(false);
}
}
shared_state->candidate_node_id = requesting_node_id;
LWLockRelease(shared_state->lock);
elog(INFO, "node %i is candidate", requesting_node_id);
PG_RETURN_BOOL(true);
}
Datum
notify_follow_primary(PG_FUNCTION_ARGS)
{
int primary_node_id = PG_GETARG_INT32(0);
elog(INFO, "received notification to follow node %i", primary_node_id);
LWLockAcquire(shared_state->lock, LW_SHARED);
/* Explicitly set the primary node id */
shared_state->candidate_node_id = primary_node_id;
shared_state->follow_new_primary = true;
LWLockRelease(shared_state->lock);
PG_RETURN_VOID();
}
Datum
get_new_primary(PG_FUNCTION_ARGS)
{
int new_primary_node_id = UNKNOWN_NODE_ID;
LWLockAcquire(shared_state->lock, LW_SHARED);
if (shared_state->follow_new_primary == true)
new_primary_node_id = shared_state->candidate_node_id;
LWLockRelease(shared_state->lock);
PG_RETURN_INT32(new_primary_node_id);
}
Datum
reset_voting_status(PG_FUNCTION_ARGS)
{
LWLockAcquire(shared_state->lock, LW_SHARED);
shared_state->voting_status = VS_NO_VOTE;
shared_state->candidate_node_id = UNKNOWN_NODE_ID;
shared_state->follow_new_primary = false;
LWLockRelease(shared_state->lock);
PG_RETURN_VOID();
}
Datum
am_bdr_failover_handler(PG_FUNCTION_ARGS)
{
int node_id = PG_GETARG_INT32(0);
bool am_handler = false;
LWLockAcquire(shared_state->lock, LW_SHARED);
if (shared_state->bdr_failover_handler == UNKNOWN_NODE_ID)
{
shared_state->bdr_failover_handler = node_id;
am_handler = true;
}
else if (shared_state->bdr_failover_handler == node_id)
{
am_handler = true;
}
LWLockRelease(shared_state->lock);
PG_RETURN_BOOL(am_handler);
}
Datum
unset_bdr_failover_handler(PG_FUNCTION_ARGS)
{
LWLockAcquire(shared_state->lock, LW_SHARED);
shared_state->bdr_failover_handler = UNKNOWN_NODE_ID;
LWLockRelease(shared_state->lock);
PG_RETURN_VOID();
}