Add "witness register" functionality

This commit is contained in:
Ian Barwick
2017-11-13 14:53:12 +09:00
parent 7fffe3ed96
commit a6cc4d80f0
13 changed files with 397 additions and 23 deletions

View File

@@ -50,6 +50,7 @@ static void _populate_node_records(PGresult *res, NodeInfoList *node_list);
static bool _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info);
static bool _create_event(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info, bool send_notification);
static bool _is_bdr_db(PGconn *conn, PQExpBufferData *output, bool quiet);
static void _populate_bdr_node_record(PGresult *res, t_bdr_node_info *node_info, int row);
static void _populate_bdr_node_records(PGresult *res, BdrNodeInfoList *node_list);
@@ -1816,6 +1817,10 @@ parse_node_type(const char *type)
{
return STANDBY;
}
else if (strcmp(type, "witness") == 0)
{
return WITNESS;
}
else if (strcmp(type, "bdr") == 0)
{
return BDR;
@@ -1834,6 +1839,8 @@ get_node_type_string(t_server_type type)
return "primary";
case STANDBY:
return "standby";
case WITNESS:
return "witness";
case BDR:
return "bdr";
/* this should never happen */
@@ -2484,6 +2491,57 @@ update_node_record_conn_priority(PGconn *conn, t_configuration_options *options)
}
/*
* Copy node records from primary to witness servers.
*
* This is used when initially registering a witness server, and
* by repmgrd to update the node records when required.
*/
bool
witness_copy_node_records(PGconn *primary_conn, PGconn *witness_conn)
{
PGresult *res = NULL;
NodeInfoList nodes = T_NODE_INFO_LIST_INITIALIZER;
NodeInfoListCell *cell = NULL;
begin_transaction(witness_conn);
/* Defer constraints */
res = PQexec(witness_conn, "SET CONSTRAINTS ALL DEFERRED");
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_error(_("unable to defer constraints:\n %s"),
PQerrorMessage(witness_conn));
rollback_transaction(witness_conn);
return false;
}
/* truncate existing records */
if (truncate_node_records(witness_conn) == false)
{
rollback_transaction(witness_conn);
return false;
}
get_all_node_records(primary_conn, &nodes);
for (cell = nodes.head; cell; cell = cell->next)
{
create_node_record(witness_conn, NULL, cell->node_info);
}
/* and done */
commit_transaction(witness_conn);
return true;
}
bool
delete_node_record(PGconn *conn, int node)
{
@@ -2515,6 +2573,24 @@ delete_node_record(PGconn *conn, int node)
return true;
}
bool
truncate_node_records(PGconn *conn)
{
PGresult *res = NULL;
res = PQexec(conn, "TRUNCATE TABLE repmgr.nodes");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_error(_("unable to truncate node record table:\n %s"),
PQerrorMessage(conn));
PQclear(res);
return false;
}
PQclear(res);
return true;
}
void
get_node_replication_stats(PGconn *conn, t_node_info *node_info)
@@ -3906,8 +3982,8 @@ get_last_wal_receive_location(PGconn *conn)
/* BDR functions */
/* ============= */
bool
is_bdr_db(PGconn *conn, PQExpBufferData *output)
static bool
_is_bdr_db(PGconn *conn, PQExpBufferData *output, bool quiet)
{
PQExpBufferData query;
PGresult *res = NULL;
@@ -3938,7 +4014,7 @@ is_bdr_db(PGconn *conn, PQExpBufferData *output)
if (output != NULL)
appendPQExpBuffer(output, "%s", warning);
else
else if (quiet == false)
log_warning("%s", warning);
return is_bdr_db;
@@ -3959,7 +4035,7 @@ is_bdr_db(PGconn *conn, PQExpBufferData *output)
if (output != NULL)
appendPQExpBuffer(output, "%s", warning);
else
else if (quiet == false)
log_warning("%s", warning);
}
@@ -3968,6 +4044,19 @@ is_bdr_db(PGconn *conn, PQExpBufferData *output)
return is_bdr_db;
}
bool
is_bdr_db(PGconn *conn, PQExpBufferData *output)
{
return _is_bdr_db(conn, output, false);
}
bool
is_bdr_db_quiet(PGconn *conn)
{
return _is_bdr_db(conn, NULL, true);
}
bool
is_active_bdr_node(PGconn *conn, const char *node_name)