Ensure witness server updates its node records following a failover

This involves mainly abstracting the functions which copy
and create records from repmgr.c to dbutils.c, as they need
to be shared between repmgr and repmgrd.

Per issue noted here:

  https://groups.google.com/forum/#!topic/repmgr/v5nu1Xwf6X0
This commit is contained in:
Ian Barwick
2015-03-03 08:57:20 +09:00
parent db5db06244
commit 3d3f082617
4 changed files with 172 additions and 144 deletions

145
dbutils.c
View File

@@ -22,6 +22,7 @@
#include <sys/time.h>
#include "repmgr.h"
#include "config.h"
#include "strutil.h"
#include "log.h"
@@ -830,3 +831,147 @@ set_config_bool(PGconn *conn, const char *config_param, bool state)
return true;
}
/*
* copy_configuration()
*
* Copy records in master's `repl_nodes` table to witness database
*
* This is used by `repmgr` when setting up the witness database, and
* `repmgrd` after a failover event occurs
*/
bool
copy_configuration(PGconn *masterconn, PGconn *witnessconn, char *cluster_name)
{
char sqlquery[MAXLEN];
PGresult *res;
int i;
sqlquery_snprintf(sqlquery, "TRUNCATE TABLE %s.repl_nodes", get_repmgr_schema_quoted(witnessconn));
log_debug("copy_configuration: %s\n", sqlquery);
res = PQexec(witnessconn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Cannot clean node details in the witness, %s\n",
PQerrorMessage(witnessconn));
return false;
}
sqlquery_snprintf(sqlquery,
"SELECT id, type, upstream_node_id, name, conninfo, priority, slot_name FROM %s.repl_nodes",
get_repmgr_schema_quoted(masterconn));
res = PQexec(masterconn, sqlquery);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, "Can't get configuration from master: %s\n",
PQerrorMessage(masterconn));
PQclear(res);
return false;
}
for (i = 0; i < PQntuples(res); i++)
{
bool node_record_created;
char *witness = PQgetvalue(res, i, 4);
log_debug(_("copy_configuration(): %s\n"), witness);
node_record_created = create_node_record(witnessconn,
"copy_configuration",
atoi(PQgetvalue(res, i, 0)),
PQgetvalue(res, i, 1),
strlen(PQgetvalue(res, i, 2))
? atoi(PQgetvalue(res, i, 2))
: NO_UPSTREAM_NODE,
cluster_name,
PQgetvalue(res, i, 3),
PQgetvalue(res, i, 4),
atoi(PQgetvalue(res, i, 5)),
strlen(PQgetvalue(res, i, 6))
? PQgetvalue(res, i, 6)
: NULL
);
if (node_record_created == false)
{
fprintf(stderr, "Unable to copy node record to witness database: %s\n",
PQerrorMessage(witnessconn));
return false;
}
}
PQclear(res);
return true;
}
bool
create_node_record(PGconn *conn, char *action, int node, char *type, int upstream_node, char *cluster_name, char *node_name, char *conninfo, int priority, char *slot_name)
{
char sqlquery[QUERY_STR_LEN];
char upstream_node_id[MAXLEN];
char slot_name_buf[MAXLEN];
PGresult *res;
if(upstream_node == NO_UPSTREAM_NODE)
{
/*
* No explicit upstream node id provided for standby - attempt to
* get primary node id
*/
if(strcmp(type, "standby") == 0)
{
int primary_node_id = get_primary_node_id(conn, cluster_name);
maxlen_snprintf(upstream_node_id, "%i", primary_node_id);
}
else
{
maxlen_snprintf(upstream_node_id, "%s", "NULL");
}
}
else
{
maxlen_snprintf(upstream_node_id, "%i", upstream_node);
}
if(slot_name == NULL)
{
maxlen_snprintf(slot_name_buf, "'%s'", slot_name);
}
else
{
maxlen_snprintf(slot_name_buf, "%s", "NULL");
}
sqlquery_snprintf(sqlquery,
"INSERT INTO %s.repl_nodes "
" (id, type, upstream_node_id, cluster, "
" name, conninfo, slot_name, priority) "
"VALUES (%i, '%s', %s, '%s', '%s', '%s', %s, %i) ",
get_repmgr_schema_quoted(conn),
node,
type,
upstream_node_id,
cluster_name,
node_name,
conninfo,
slot_name_buf,
priority);
if(action != NULL)
{
log_debug(_("%s: %s\n"), action, sqlquery);
}
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_warning(_("Cannot insert node details, %s\n"),
PQerrorMessage(conn));
return false;
}
PQclear(res);
return true;
}

View File

@@ -21,7 +21,7 @@
#define _REPMGR_DBUTILS_H_
#include "strutil.h"
#include "config.h"
PGconn *establish_db_connection(const char *conninfo,
const bool exit_on_error);
@@ -58,5 +58,7 @@ bool create_replication_slot(PGconn *conn, char *slot_name);
bool start_backup(PGconn *conn, char *first_wal_segment);
bool stop_backup(PGconn *conn, char *last_wal_segment);
bool set_config_bool(PGconn *conn, const char *config_param, bool state);
bool copy_configuration(PGconn *masterconn, PGconn *witnessconn, char *cluster_name);
bool create_node_record(PGconn *conn, char *action, int node, char *type, int upstream_node, char *cluster_name, char *node_name, char *conninfo, int priority, char *slot_name);
#endif

155
repmgr.c
View File

@@ -56,6 +56,8 @@
#define CLUSTER_SHOW 7
#define CLUSTER_CLEANUP 8
static bool create_recovery_file(const char *data_dir);
static int test_ssh_connection(char *host, char *remote_user);
static int copy_remote_files(char *host, char *remote_user, char *remote_path,
@@ -63,12 +65,10 @@ static int copy_remote_files(char *host, char *remote_user, char *remote_path,
static int run_basebackup(void);
static bool check_parameters_for_action(const int action);
static bool create_schema(PGconn *conn);
static bool copy_configuration(PGconn *masterconn, PGconn *witnessconn);
static void write_primary_conninfo(char *line);
static bool write_recovery_file_line(FILE *recovery_file, char *recovery_file_path, char *line);
static int check_server_version(PGconn *conn, char *server_type, bool exit_on_error, char *server_version_string);
static bool check_upstream_config(PGconn *conn, int server_version_num, bool exit_on_error);
static bool create_node_record(PGconn *conn, char *action, int node, char *type, int upstream_node, char *cluster_name, char *node_name, char *conninfo, int priority);
static char *make_pg_path(char *file);
static void do_master_register(void);
@@ -102,6 +102,7 @@ static char *server_cmd = NULL;
static char pg_bindir[MAXLEN] = "";
static char repmgr_slot_name[MAXLEN] = "";
static char *repmgr_slot_name_ptr = NULL;
static char path_buf[MAXLEN] = "";
int
@@ -485,6 +486,7 @@ main(int argc, char **argv)
if(options.use_replication_slots)
{
maxlen_snprintf(repmgr_slot_name, "repmgr_slot_%i", options.node);
repmgr_slot_name_ptr = repmgr_slot_name;
}
switch (action)
@@ -733,7 +735,8 @@ do_master_register(void)
options.cluster_name,
options.node_name,
options.conninfo,
options.priority);
options.priority,
repmgr_slot_name_ptr);
PQfinish(conn);
@@ -854,7 +857,8 @@ do_standby_register(void)
options.cluster_name,
options.node_name,
options.conninfo,
options.priority);
options.priority,
repmgr_slot_name_ptr);
PQfinish(master_conn);
PQfinish(conn);
@@ -1825,7 +1829,7 @@ do_witness_create(void)
/* check if we need to create a user */
if (runtime_options.username[0] && runtime_options.localport[0] && strcmp(runtime_options.username,"postgres")!=0 )
{
/* create required user needs to be superuser to create untrusted language function in c */
/* create required user; needs to be superuser to create untrusted language function in c */
sprintf(script, "%s -p %s --superuser --login -U %s %s",
make_pg_path("createuser"),
runtime_options.localport, runtime_options.superuser, runtime_options.username);
@@ -1834,7 +1838,7 @@ do_witness_create(void)
r = system(script);
if (r != 0)
{
log_err("Can't create user for witness server\n");
log_err(_("Can't create user for witness server\n"));
PQfinish(masterconn);
exit(ERR_BAD_CONFIG);
}
@@ -1899,7 +1903,8 @@ do_witness_create(void)
options.cluster_name,
options.node_name,
options.conninfo,
options.priority);
options.priority,
NULL);
if(node_record_created == false)
{
@@ -1920,7 +1925,7 @@ do_witness_create(void)
}
/* copy configuration from master, only repl_nodes is needed */
if (!copy_configuration(masterconn, witnessconn))
if (!copy_configuration(masterconn, witnessconn, options.cluster_name))
{
PQfinish(masterconn);
PQfinish(witnessconn);
@@ -2595,70 +2600,6 @@ create_schema(PGconn *conn)
return true;
}
/*
* copy_configuration()
*
* Copy records in master's `repl_nodes` table to witness database
*/
static bool
copy_configuration(PGconn *masterconn, PGconn *witnessconn)
{
char sqlquery[MAXLEN];
PGresult *res;
int i;
sqlquery_snprintf(sqlquery, "TRUNCATE TABLE %s.repl_nodes", get_repmgr_schema_quoted(witnessconn));
log_debug("copy_configuration: %s\n", sqlquery);
res = PQexec(witnessconn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Cannot clean node details in the witness, %s\n",
PQerrorMessage(witnessconn));
return false;
}
sqlquery_snprintf(sqlquery,
"SELECT id, type, upstream_node_id, name, conninfo, priority FROM %s.repl_nodes",
get_repmgr_schema_quoted(masterconn));
res = PQexec(masterconn, sqlquery);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, "Can't get configuration from master: %s\n",
PQerrorMessage(masterconn));
PQclear(res);
return false;
}
for (i = 0; i < PQntuples(res); i++)
{
bool node_record_created;
char *witness = PQgetvalue(res, i, 4);
log_debug(_("copy_configuration(): %s\n"), witness);
node_record_created = create_node_record(witnessconn,
"copy_configuration",
atoi(PQgetvalue(res, i, 0)),
PQgetvalue(res, i, 1),
strlen(PQgetvalue(res, i, 2))
? atoi(PQgetvalue(res, i, 2))
: NO_UPSTREAM_NODE,
options.cluster_name,
PQgetvalue(res, i, 3),
PQgetvalue(res, i, 4),
atoi(PQgetvalue(res, i, 5)));
if (node_record_created == false)
{
fprintf(stderr, "Unable to copy node record to witness database: %s\n",
PQerrorMessage(witnessconn));
return false;
}
}
PQclear(res);
return true;
}
/* This function uses global variables to determine connection settings. Special
* usage of the PGPASSWORD variable is handled, but strongly discouraged */
@@ -2947,76 +2888,6 @@ do_check_upstream_config(void)
}
static bool
create_node_record(PGconn *conn, char *action, int node, char *type, int upstream_node, char *cluster_name, char *node_name, char *conninfo, int priority)
{
char sqlquery[QUERY_STR_LEN];
char upstream_node_id[MAXLEN];
char slot_name[MAXLEN];
PGresult *res;
if(upstream_node == NO_UPSTREAM_NODE)
{
/*
* No explicit upstream node id provided for standby - attempt to
* get primary node id
*/
if(strcmp(type, "standby") == 0)
{
int primary_node_id = get_primary_node_id(conn, cluster_name);
maxlen_snprintf(upstream_node_id, "%i", primary_node_id);
}
else
{
maxlen_snprintf(upstream_node_id, "%s", "NULL");
}
}
else
{
maxlen_snprintf(upstream_node_id, "%i", upstream_node);
}
if(options.use_replication_slots && strcmp(type, "standby") == 0)
{
maxlen_snprintf(slot_name, "'%s'", repmgr_slot_name);
}
else
{
maxlen_snprintf(slot_name, "%s", "NULL");
}
sqlquery_snprintf(sqlquery,
"INSERT INTO %s.repl_nodes "
" (id, type, upstream_node_id, cluster, "
" name, conninfo, slot_name, priority) "
"VALUES (%i, '%s', %s, '%s', '%s', '%s', %s, %i) ",
get_repmgr_schema_quoted(conn),
node,
type,
upstream_node_id,
cluster_name,
node_name,
conninfo,
slot_name,
priority);
if(action != NULL)
{
log_debug(_("%s: %s\n"), action, sqlquery);
}
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_warning(_("Cannot insert node details, %s\n"),
PQerrorMessage(conn));
return false;
}
PQclear(res);
return true;
}
static char *
make_pg_path(char *file)

View File

@@ -519,7 +519,7 @@ witness_monitor(void)
local_options.reconnect_attempts
);
primary_conn = get_master_connection(my_local_conn,
local_options.cluster_name, &primary_options.node, NULL);
local_options.cluster_name, &primary_options.node, NULL);
if (PQstatus(primary_conn) != CONNECTION_OK)
{
@@ -534,6 +534,16 @@ witness_monitor(void)
{
log_debug(_("New master found with node ID: %i\n"), primary_options.node);
connection_ok = true;
/*
* Update the repl_nodes table from the new primary to reflect the changed
* node configuration
*
* XXX it would be neat to be able to handle this with e.g. table-based
* logical replication
*/
copy_configuration(primary_conn, my_local_conn, local_options.cluster_name);
break;
}
}