Various improvements to "repmgr bdr register/unregister"

This commit is contained in:
Ian Barwick
2017-07-12 22:38:03 +09:00
parent 0a1addfdc0
commit 7eadbf6b17
3 changed files with 180 additions and 79 deletions

115
dbutils.c
View File

@@ -30,6 +30,10 @@ static void _populate_node_records(PGresult *res, NodeInfoList *node_list);
static bool _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info);
static bool _create_event(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info, bool send_notification);
static void _populate_bdr_node_record(PGresult *res, t_bdr_node_info *node_info, int row);
static void _populate_bdr_node_records(PGresult *res, BdrNodeInfoList *node_list);
/* ================= */
/* utility functions */
/* ================= */
@@ -1457,6 +1461,8 @@ get_all_node_records(PGconn *conn, NodeInfoList *node_list)
_populate_node_records(res, node_list);
PQclear(res);
return;
}
@@ -1483,6 +1489,8 @@ get_downstream_node_records(PGconn *conn, int node_id, NodeInfoList *node_list)
_populate_node_records(res, node_list);
PQclear(res);
return;
}
@@ -1513,9 +1521,10 @@ get_active_sibling_node_records(PGconn *conn, int node_id, int upstream_node_id,
termPQExpBuffer(&query);
/* res cleared by this function */
_populate_node_records(res, node_list);
PQclear(res);
return;
}
@@ -1542,9 +1551,10 @@ get_node_records_by_priority(PGconn *conn, NodeInfoList *node_list)
termPQExpBuffer(&query);
/* res cleared by this function */
_populate_node_records(res, node_list);
PQclear(res);
return;
}
@@ -2991,9 +3001,8 @@ add_extension_tables_to_bdr_replication_set(PGconn *conn)
return;
}
RecordStatus
get_bdr_init_node_record(PGconn *conn, t_bdr_node_info *node_info)
void
get_all_bdr_node_records(PGconn *conn, BdrNodeInfoList *node_list)
{
PQExpBufferData query;
PGresult *res;
@@ -3002,39 +3011,79 @@ get_bdr_init_node_record(PGconn *conn, t_bdr_node_info *node_info)
appendPQExpBuffer(
&query,
" SELECT node_sysid, "
" node_timeline, "
" node_dboid, "
" node_status, "
" node_name, "
" node_local_dsn, "
" node_init_from_dsn, "
" node_read_only, "
" node_seq_id "
" FROM bdr.bdr_nodes "
" WHERE node_init_from_dsn IS NULL "
);
" SELECT node_sysid, "
" node_timeline, "
" node_dboid, "
" node_status, "
" node_name, "
" node_local_dsn, "
" node_init_from_dsn, "
" node_read_only, "
" node_seq_id "
" FROM bdr.bdr_nodes "
"ORDER BY node_seq_id ");
log_verbose(LOG_DEBUG, "get_all_node_records():\n%s", query.data);
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
{
PQclear(res);
return RECORD_NOT_FOUND;
}
else
{
strncpy(node_info->node_sysid, PQgetvalue(res, 0, 0), MAXLEN);
node_info->node_timeline = atoi(PQgetvalue(res, 0, 1));
node_info->node_dboid = atoi(PQgetvalue(res, 0, 2));
// node_status 3
strncpy(node_info->node_name, PQgetvalue(res, 0, 4), MAXLEN);
strncpy(node_info->node_local_dsn, PQgetvalue(res, 0, 5), MAXLEN);
strncpy(node_info->node_init_from_dsn, PQgetvalue(res, 0, 6), MAXLEN);
}
_populate_bdr_node_records(res, node_list);
PQclear(res);
return RECORD_FOUND;
return;
}
static
void _populate_bdr_node_records(PGresult *res, BdrNodeInfoList *node_list)
{
int i;
node_list->head = NULL;
node_list->tail = NULL;
node_list->node_count = 0;
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
return;
}
for (i = 0; i < PQntuples(res); i++)
{
BdrNodeInfoListCell *cell;
cell = (BdrNodeInfoListCell *) pg_malloc0(sizeof(BdrNodeInfoListCell));
cell->node_info = pg_malloc0(sizeof(t_bdr_node_info));
_populate_bdr_node_record(res, cell->node_info, i);
if (node_list->tail)
node_list->tail->next = cell;
else
node_list->head = cell;
node_list->tail = cell;
node_list->node_count++;
}
return;
}
static void
_populate_bdr_node_record(PGresult *res, t_bdr_node_info *node_info, int row)
{
char buf[MAXLEN];
strncpy(node_info->node_sysid, PQgetvalue(res, row, 0), MAXLEN);
node_info->node_timeline = atoi(PQgetvalue(res, row, 1));
node_info->node_dboid = atoi(PQgetvalue(res, row, 2));
strncpy(buf, PQgetvalue(res, row, 3), MAXLEN);
node_info->node_status = buf[0];
strncpy(node_info->node_name, PQgetvalue(res, row, 4), MAXLEN);
strncpy(node_info->node_local_dsn, PQgetvalue(res, row, 5), MAXLEN);
strncpy(node_info->node_init_from_dsn, PQgetvalue(res, row, 6), MAXLEN);
}

View File

@@ -84,7 +84,7 @@ typedef struct s_node_info
}
/* structs to store a list of node records */
/* structs to store a list of repmgr node records */
typedef struct NodeInfoListCell
{
struct NodeInfoListCell *next;
@@ -165,6 +165,27 @@ typedef struct s_bdr_node_info
false, -1 \
}
/* structs to store a list of BDR node records */
typedef struct BdrNodeInfoListCell
{
struct BdrNodeInfoListCell *next;
t_bdr_node_info *node_info;
} BdrNodeInfoListCell;
typedef struct BdrNodeInfoList
{
BdrNodeInfoListCell *head;
BdrNodeInfoListCell *tail;
int node_count;
} BdrNodeInfoList;
#define T_BDR_NODE_INFO_LIST_INITIALIZER { \
NULL, \
NULL, \
0 \
}
/* utility functions */
XLogRecPtr parse_lsn(const char *str);
@@ -277,19 +298,18 @@ bool is_server_available(const char *conninfo);
/* node voting functions */
NodeVotingStatus get_voting_status(PGconn *conn);
int request_vote(PGconn *conn, t_node_info *this_node, t_node_info *other_node, int electoral_term);
int set_voting_status_initiated(PGconn *conn);
bool announce_candidature(PGconn *conn, t_node_info *this_node, t_node_info *other_node, int electoral_term);
void notify_follow_primary(PGconn *conn, int primary_node_id);
bool get_new_primary(PGconn *conn, int *primary_node_id);
void reset_voting_status(PGconn *conn);
int request_vote(PGconn *conn, t_node_info *this_node, t_node_info *other_node, int electoral_term);
int set_voting_status_initiated(PGconn *conn);
bool announce_candidature(PGconn *conn, t_node_info *this_node, t_node_info *other_node, int electoral_term);
void notify_follow_primary(PGconn *conn, int primary_node_id);
bool get_new_primary(PGconn *conn, int *primary_node_id);
void reset_voting_status(PGconn *conn);
/* replication status functions */
XLogRecPtr get_last_wal_receive_location(PGconn *conn);
/* BDR functions */
RecordStatus get_bdr_init_node_record(PGconn *conn, t_bdr_node_info *node_info);
void get_all_bdr_node_records(PGconn *conn, BdrNodeInfoList *node_list);
bool is_bdr_db(PGconn *conn);
bool is_bdr_repmgr(PGconn *conn);
bool is_table_in_bdr_replication_set(PGconn *conn, const char *tablename, const char *set);

View File

@@ -27,6 +27,7 @@ do_bdr_register(void)
RecordStatus record_status;
PQExpBufferData event_details;
bool success = true;
char dbname[MAXLEN];
/* sanity-check configuration for BDR-compatability */
if (config_file_options.replication_type != REPLICATION_TYPE_BDR)
@@ -35,12 +36,14 @@ do_bdr_register(void)
exit(ERR_BAD_CONFIG);
}
/* store the database name for future reference */
get_conninfo_value(config_file_options.conninfo, "dbname", dbname);
conn = establish_db_connection(config_file_options.conninfo, true);
if (!is_bdr_db(conn))
{
/* TODO: name database */
log_error(_("database is not BDR-enabled"));
log_error(_("database \"%s\" is not BDR-enabled"), dbname);
log_hint(_("when using repmgr with BDR, the repmgr schema must be stored in the BDR database"));
exit(ERR_BAD_CONFIG);
}
@@ -50,7 +53,9 @@ do_bdr_register(void)
if (extension_status == REPMGR_UNKNOWN)
{
log_error(_("unable to determine status of \"repmgr\" extension"));
log_error(_("unable to determine status of \"repmgr\" extension in database \"%s\""),
dbname
);
PQfinish(conn);
}
@@ -72,7 +77,7 @@ do_bdr_register(void)
}
else
{
log_info(_("creating repmgr extension"));
log_debug("creating repmgr extension in database \"%s\"", dbname);
begin_transaction(conn);
@@ -87,6 +92,19 @@ do_bdr_register(void)
commit_transaction(conn);
}
/* check for a matching BDR node */
{
bool node_exists = bdr_node_exists(conn, config_file_options.node_name);
if (node_exists == false)
{
log_error(_("no BDR node with node_name \"%s\" found"), config_file_options.node_name);
log_hint(_("\"node_name\" in repmgr.conf must match \"node_name\" in bdr.bdr_nodes\n"));
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
}
/*
* before adding the extension tables to the replication set,
* if any other BDR nodes exist, populate repmgr.nodes with a copy
@@ -101,36 +119,60 @@ do_bdr_register(void)
if (local_node_records.node_count == 0)
{
/* XXX get all BDR node records */
RecordStatus bdr_record_status;
t_bdr_node_info bdr_init_node_info = T_BDR_NODE_INFO_INITIALIZER;
BdrNodeInfoList bdr_nodes = T_BDR_NODE_INFO_LIST_INITIALIZER;
BdrNodeInfoListCell *bdr_cell;
bdr_record_status = get_bdr_init_node_record(conn, &bdr_init_node_info);
get_all_bdr_node_records(conn, &bdr_nodes);
if (bdr_record_status != RECORD_FOUND)
if (bdr_nodes.node_count == 0)
{
/* XXX don't assume the original node will still be part of the cluster */
log_error(_("unable to retrieve record for originating node"));
log_error(_("unable to retrieve any BDR node records"));
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
if (strncmp(node_info.node_name, bdr_init_node_info.node_name, MAXLEN) != 0)
for (bdr_cell = bdr_nodes.head; bdr_cell; bdr_cell = bdr_cell->next)
{
/* */
PGconn *init_node_conn;
PGconn *bdr_node_conn = NULL;
NodeInfoList existing_nodes = T_NODE_INFO_LIST_INITIALIZER;
NodeInfoListCell *cell;
ExtensionStatus other_node_extension_status;
init_node_conn = establish_db_connection_quiet(bdr_init_node_info.node_local_dsn);
/* skip the local node */
if (strncmp(node_info.node_name, bdr_cell->node_info->node_name, MAXLEN) == 0)
{
continue;
}
/* XXX check repmgr schema exists */
get_all_node_records(init_node_conn, &existing_nodes);
log_debug("connecting to BDR node \"%s\" (conninfo: \"%s\")",
bdr_cell->node_info->node_name,
bdr_cell->node_info->node_local_dsn);
bdr_node_conn = establish_db_connection_quiet(bdr_cell->node_info->node_local_dsn);
if (PQstatus(bdr_node_conn) != CONNECTION_OK)
{
continue;
}
/* check repmgr schema exists, skip if not */
other_node_extension_status = get_repmgr_extension_status(bdr_node_conn);
if (other_node_extension_status != REPMGR_INSTALLED)
{
continue;
}
get_all_node_records(bdr_node_conn, &existing_nodes);
for (cell = existing_nodes.head; cell; cell = cell->next)
{
log_debug("creating record for node \"%s\" (ID: %i)",
cell->node_info->node_name, cell->node_info->node_id);
create_node_record(conn, "bdr register", cell->node_info);
}
PQfinish(bdr_node_conn);
break;
}
}
}
@@ -138,19 +180,6 @@ do_bdr_register(void)
/* Add the repmgr extension tables to a replication set */
add_extension_tables_to_bdr_replication_set(conn);
/* check for a matching BDR node */
{
bool node_exists = bdr_node_exists(conn, config_file_options.node_name);
if (node_exists == false)
{
log_error(_("no BDR node with node_name '%s' found"), config_file_options.node_name);
log_hint(_("'node_name' in repmgr.conf must match 'node_name' in bdr.bdr_nodes\n"));
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
}
initPQExpBuffer(&event_details);
begin_transaction(conn);
@@ -275,6 +304,7 @@ do_bdr_unregister(void)
RecordStatus record_status;
bool node_record_deleted;
PQExpBufferData event_details;
char dbname[MAXLEN];
/* sanity-check configuration for BDR-compatability */
@@ -284,19 +314,21 @@ do_bdr_unregister(void)
exit(ERR_BAD_CONFIG);
}
/* store the database name for future reference */
get_conninfo_value(config_file_options.conninfo, "dbname", dbname);
conn = establish_db_connection(config_file_options.conninfo, true);
if (!is_bdr_db(conn))
{
/* TODO: name database */
log_error(_("database is not BDR-enabled"));
log_error(_("database \"%s\" is not BDR-enabled"), dbname);
exit(ERR_BAD_CONFIG);
}
extension_status = get_repmgr_extension_status(conn);
if (extension_status != REPMGR_INSTALLED)
{
log_error(_("repmgr is not installed on this database"));
log_error(_("repmgr is not installed on database \"%s\""), dbname);
exit(ERR_BAD_CONFIG);
}
@@ -323,11 +355,9 @@ do_bdr_unregister(void)
exit(ERR_BAD_CONFIG);
}
// BDR node
begin_transaction(conn);
log_info(_("unregistering node %i"), target_node_id);
log_debug("unregistering node %i", target_node_id);
node_record_deleted = delete_node_record(conn, target_node_id);
@@ -337,6 +367,7 @@ do_bdr_unregister(void)
"unable to delete node record for node \"%s\" (ID: %i)",
node_info.node_name,
target_node_id);
rollback_transaction(conn);
}
else
{
@@ -344,8 +375,9 @@ do_bdr_unregister(void)
"node record deleted for node \"%s\" (ID: %i)",
node_info.node_name,
target_node_id);
commit_transaction(conn);
}
commit_transaction(conn);
/* Log the event */
create_event_notification(