repmgr master unregister: check for downstream nodes

Foreign key dependencies will make it impossible to remove the node
if it still has downstream nodes pointing to it.
This commit is contained in:
Ian Barwick
2017-06-12 22:24:50 +09:00
parent fbb534e394
commit bb7d3e41c3
3 changed files with 110 additions and 4 deletions

View File

@@ -20,10 +20,10 @@ static PGconn *_establish_db_connection(const char *conninfo,
const bool verbose_only);
static bool _set_config(PGconn *conn, const char *config_param, const char *sqlquery);
static int _get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info);
static int _get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info);
static void _populate_node_record(PGresult *res, t_node_info *node_info, int row);
static bool _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info);
static bool _create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info);
static bool _create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info);
/* ==================== */
@@ -1303,6 +1303,58 @@ get_local_node_record(PGconn *conn, int node_id, t_node_info *node_info)
}
void
get_downstream_node_records(PGconn *conn, int node_id, NodeInfoList *node_list)
{
PQExpBufferData query;
PGresult *result;
int i;
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
" SELECT node_id, type, upstream_node_id, node_name, conninfo, slot_name, priority, active"
" FROM repmgr.nodes "
" WHERE upstream_node_id = %i "
"ORDER BY node_id ",
node_id);
log_verbose(LOG_DEBUG, "get_node_records_by_priority():\n%s", query.data);
result = PQexec(conn, query.data);
termPQExpBuffer(&query);
node_list->head = NULL;
node_list->tail = NULL;
node_list->node_count = 0;
if (PQresultStatus(result) != PGRES_TUPLES_OK)
{
return;
}
for (i = 0; i < PQntuples(result); i++)
{
NodeInfoListCell *cell;
cell = (NodeInfoListCell *) pg_malloc0(sizeof(NodeInfoListCell));
cell->node_info = pg_malloc0(sizeof(t_node_info));
_populate_node_record(result, cell->node_info, i);
if (node_list->tail)
node_list->tail->next = cell;
else
node_list->head = cell;
node_list->tail = cell;
node_list->node_count++;
}
return;
}
bool
create_node_record(PGconn *conn, char *repmgr_action, t_node_info *node_info)
{

View File

@@ -73,6 +73,7 @@ typedef struct s_node_info
}
/* structs to store a list of node records */
typedef struct NodeInfoListCell
{
struct NodeInfoListCell *next;
@@ -83,8 +84,14 @@ typedef struct NodeInfoList
{
NodeInfoListCell *head;
NodeInfoListCell *tail;
int node_count;
} NodeInfoList;
#define T_NODE_INFO_LIST_INITIALIZER { \
NULL, \
NULL, \
0 \
}
typedef struct s_event_info
{
@@ -126,6 +133,8 @@ typedef struct s_connection_user
bool is_superuser;
} t_connection_user;
/* connection functions */
PGconn *establish_db_connection(const char *conninfo,
const bool exit_on_error);
@@ -190,6 +199,7 @@ int get_node_record(PGconn *conn, int node_id, t_node_info *node_info);
int get_node_record_by_name(PGconn *conn, const char *node_name, t_node_info *node_info);
bool get_local_node_record(PGconn *conn, int node_id, t_node_info *node_info);
bool get_master_node_record(PGconn *conn, t_node_info *node_info);
void get_downstream_node_records(PGconn *conn, int node_id, NodeInfoList *nodes);
bool create_node_record(PGconn *conn, char *repmgr_action, t_node_info *node_info);
bool update_node_record(PGconn *conn, char *repmgr_action, t_node_info *node_info);

View File

@@ -242,13 +242,18 @@ do_master_unregister(void)
t_node_info *target_node_info_ptr;
PGconn *target_node_conn = NULL;
NodeInfoList downstream_nodes = T_NODE_INFO_LIST_INITIALIZER;
/* We must be able to connect to the local node */
local_conn = establish_db_connection(config_file_options.conninfo, true);
/* Get local node record */
get_local_node_record(local_conn, config_file_options.node_id, &local_node_info);
/* Obtain a connection to the master node */
/*
* Obtain a connection to the current master node - if this isn't possible,
* abort as we won't be able to update the "nodes" table anyway.
*/
master_conn = establish_master_db_connection(local_conn, false);
if (PQstatus(master_conn) != CONNECTION_OK)
@@ -270,7 +275,6 @@ do_master_unregister(void)
exit(ERR_DB_CONN);
}
/* Local connection no longer required */
PQfinish(local_conn);
@@ -287,7 +291,47 @@ do_master_unregister(void)
target_node_info_ptr = &target_node_info;
}
/*
* Check for downstream nodes - if any still defined, we won't be able to
* delete the node record due to foreign key constraints.
*/
get_downstream_node_records(master_conn, target_node_info_ptr->node_id, &downstream_nodes);
if (downstream_nodes.node_count > 0)
{
NodeInfoListCell *cell;
PQExpBufferData detail;
if (downstream_nodes.node_count == 1)
{
log_error(_("%i other node still has this node as its upstream node"),
downstream_nodes.node_count);
}
else
{
log_error(_("%i other nodes still have this node as their upstream node"),
downstream_nodes.node_count);
}
log_hint(_("ensure these nodes are following the current master with \"repmgr standby follow\""));
initPQExpBuffer(&detail);
for (cell = downstream_nodes.head; cell; cell = cell->next)
{
appendPQExpBuffer(&detail,
" %s (id: %i)\n",
cell->node_info->node_name,
cell->node_info->node_id);
}
log_detail(_("the affected node(s) are:\n%s"), detail.data);
termPQExpBuffer(&detail);
PQfinish(master_conn);
exit(ERR_BAD_CONFIG);
}
target_node_conn = establish_db_connection_quiet(target_node_info_ptr->conninfo);