diff --git a/dbutils.c b/dbutils.c index 75980d60..bfd5aff1 100644 --- a/dbutils.c +++ b/dbutils.c @@ -1134,7 +1134,6 @@ get_repmgr_extension_status(PGconn *conn) } /* 1. Check extension is actually available */ - if (PQntuples(res) == 0) { return REPMGR_UNAVAILABLE; @@ -1234,6 +1233,7 @@ parse_node_type(const char *type) return UNKNOWN; } + const char * get_node_type_string(t_server_type type) { @@ -1311,6 +1311,24 @@ get_node_record_by_name(PGconn *conn, const char *node_name, t_node_info *node_i } +t_node_info * +get_node_record_pointer(PGconn *conn, int node_id) +{ + t_node_info *node_info = pg_malloc0(sizeof(t_node_info)); + RecordStatus record_status; + + record_status = get_node_record(conn, node_id, node_info); + + if (record_status != RECORD_FOUND) + { + pfree(node_info); + return NULL; + } + + return node_info; +} + + bool get_primary_node_record(PGconn *conn, t_node_info *node_info) { @@ -1444,11 +1462,42 @@ get_active_sibling_node_records(PGconn *conn, int node_id, int upstream_node_id, termPQExpBuffer(&query); + /* res cleared by this function */ _populate_node_records(res, node_list); return; } + +void +get_node_records_by_priority(PGconn *conn, NodeInfoList *node_list) +{ + PQExpBufferData query; + PGresult *res; + + clear_node_info_list(node_list); + + initPQExpBuffer(&query); + + appendPQExpBuffer( + &query, + " SELECT node_id, type, upstream_node_id, node_name, conninfo, repluser, slot_name, location, priority, active" + " FROM repmgr.nodes " + "ORDER BY priority DESC, node_name "); + + log_verbose(LOG_DEBUG, "get_node_records_by_priority():\n%s", query.data); + + res = PQexec(conn, query.data); + + termPQExpBuffer(&query); + + /* res cleared by this function */ + _populate_node_records(res, node_list); + + return; +} + + bool create_node_record(PGconn *conn, char *repmgr_action, t_node_info *node_info) { @@ -1573,6 +1622,41 @@ _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info) return true; } + +bool +update_node_record_set_active(PGconn *conn, int this_node_id, bool active) +{ + PQExpBufferData query; + PGresult *res; + + initPQExpBuffer(&query); + + appendPQExpBuffer( + &query, + "UPDATE repmgr.nodes SET active = %s " + " WHERE id = %i", + active == true ? "TRUE" : "FALSE", + this_node_id); + + log_verbose(LOG_DEBUG, "update_node_record_set_active():\n %s", query.data); + + res = PQexec(conn, query.data); + termPQExpBuffer(&query); + + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) + { + log_error(_("unable to update node record:\n %s"), + PQerrorMessage(conn)); + PQclear(res); + return false; + } + + PQclear(res); + + return true; +} + + bool update_node_record_set_primary(PGconn *conn, int this_node_id) { @@ -1593,6 +1677,7 @@ update_node_record_set_primary(PGconn *conn, int this_node_id) " AND active IS TRUE "); res = PQexec(conn, query.data); + termPQExpBuffer(&query); if (PQresultStatus(res) != PGRES_COMMAND_OK) { @@ -1605,7 +1690,6 @@ update_node_record_set_primary(PGconn *conn, int this_node_id) } PQclear(res); - termPQExpBuffer(&query); initPQExpBuffer(&query); @@ -1715,6 +1799,43 @@ update_node_record_status(PGconn *conn, int this_node_id, char *type, int upstre } +/* + * Update node record's "conninfo" and "priority" fields. Called by repmgrd + * following a configuration file reload. + */ +bool +update_node_record_conn_priority(PGconn *conn, t_configuration_options *options) +{ + PQExpBufferData query; + PGresult *res; + + initPQExpBuffer(&query); + + appendPQExpBuffer( + &query, + "UPDATE repmgr.nodes " + " SET conninfo = '%s', " + " priority = %d " + " WHERE id = %d ", + options->conninfo, + options->priority, + options->node_id); + + res = PQexec(conn, query.data); + termPQExpBuffer(&query); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + + PQclear(res); + return false; + } + + PQclear(res); + return true; +} + + bool delete_node_record(PGconn *conn, int node) { diff --git a/dbutils.h b/dbutils.h index 3b90e20b..e8afc689 100644 --- a/dbutils.h +++ b/dbutils.h @@ -212,18 +212,24 @@ const char * get_node_type_string(t_server_type type); RecordStatus get_node_record(PGconn *conn, int node_id, t_node_info *node_info); RecordStatus get_node_record_by_name(PGconn *conn, const char *node_name, t_node_info *node_info); +t_node_info *get_node_record_pointer(PGconn *conn, int node_id); + bool get_local_node_record(PGconn *conn, int node_id, t_node_info *node_info); bool get_primary_node_record(PGconn *conn, t_node_info *node_info); + void get_downstream_node_records(PGconn *conn, int node_id, NodeInfoList *nodes); void get_active_sibling_node_records(PGconn *conn, int node_id, int upstream_node_id, NodeInfoList *node_list); +void get_node_records_by_priority(PGconn *conn, NodeInfoList *node_list); bool create_node_record(PGconn *conn, char *repmgr_action, t_node_info *node_info); bool update_node_record(PGconn *conn, char *repmgr_action, t_node_info *node_info); bool delete_node_record(PGconn *conn, int node); +bool update_node_record_set_active(PGconn *conn, int this_node_id, bool active); bool update_node_record_set_primary(PGconn *conn, int this_node_id); bool update_node_record_set_upstream(PGconn *conn, int this_node_id, int new_upstream_node_id); bool update_node_record_status(PGconn *conn, int this_node_id, char *type, int upstream_node_id, bool active); +bool update_node_record_conn_priority(PGconn *conn, t_configuration_options *options); void clear_node_info_list(NodeInfoList *nodes); diff --git a/repmgrd.c b/repmgrd.c index e214f6ac..0543328e 100644 --- a/repmgrd.c +++ b/repmgrd.c @@ -95,17 +95,11 @@ static void monitor_streaming_primary(void); static void monitor_streaming_standby(void); static void monitor_bdr(void); - -#ifndef WIN32 -static void setup_event_handlers(void); -static void handle_sighup(SIGNAL_ARGS); -static void handle_sigint(SIGNAL_ARGS); -#endif - static PGconn *try_reconnect(const char *conninfo, NodeStatus *node_status); static bool do_primary_failover(void); static bool do_upstream_standby_failover(void); +static t_node_info *do_bdr_failover(NodeInfoList *nodes, t_node_info *monitored_node); static ElectionResult do_election(void); static const char *_print_voting_status(NodeVotingStatus voting_status); @@ -123,7 +117,13 @@ static FailoverState follow_new_primary(int new_primary_id); static void reset_node_voting_status(void); static int calculate_elapsed(instr_time start_time); +static void update_registration(PGconn *conn); +#ifndef WIN32 +static void setup_event_handlers(void); +static void handle_sighup(SIGNAL_ARGS); +static void handle_sigint(SIGNAL_ARGS); +#endif static void close_connections(); static void terminate(int retval); @@ -739,6 +739,8 @@ monitor_streaming_standby(void) { log_error(_("unable connect to upstream node (ID: %i), terminating"), local_node_info.upstream_node_id); + log_hint(_("upstream node must be running before repmgrd can start")); + PQfinish(local_conn); exit(ERR_DB_CONN); } @@ -2039,10 +2041,392 @@ reset_node_voting_status(void) static void monitor_bdr(void) { + NodeInfoList nodes = T_NODE_INFO_LIST_INITIALIZER; + PGconn *monitoring_conn = NULL; + t_node_info *monitored_node = NULL; + RecordStatus record_status; + + bool failover_done = false; + + /* sanity check local database */ + log_info(_("connecting to local database '%s'"), + config_file_options.conninfo); + + local_conn = establish_db_connection(config_file_options.conninfo, true); + + /* + * Local node must be running + */ + if (PQstatus(local_conn) != CONNECTION_OK) + { + log_error(_("unable connect to local node (ID: %i), terminating"), + local_node_info.node_id); + log_hint(_("local node must be running before repmgrd can start")); + PQfinish(local_conn); + exit(ERR_DB_CONN); + } + + /* + * Verify that database is a BDR one + * TODO: check if supported BDR version? + */ + log_info(_("connected to database, checking for BDR")); + + if (!is_bdr_db(local_conn)) + { + log_error(_("database is not BDR-enabled")); + exit(ERR_BAD_CONFIG); + } + + + if (is_table_in_bdr_replication_set(local_conn, "nodes", "repmgr")) + { + log_error(_("repmgr metadata table 'repmgr.%s' is not in the 'repmgr' replication set"), + "nodes"); + + /* TODO: add `repmgr bdr sync` or similar for this situation, and hint here */ + + exit(ERR_BAD_CONFIG); + } + + /* Retrieve record for this node from the local database */ + record_status = get_node_record(local_conn, config_file_options.node_id, &local_node_info); + + /* + * Terminate if we can't find the local node record. This is a "fix-the-config" + * situation, not a lot else we can do. + */ + if (record_status != RECORD_FOUND) + { + log_error(_("unable to retrieve record for local node (ID: %i), terminating"), + local_node_info.node_id); + log_hint(_("check that 'repmgr bdr register' was executed for this node\n")); + PQfinish(local_conn); + exit(ERR_BAD_CONFIG); + } + + + // check if inactive node + // -> what to do? + + /* Log startup event */ + + create_event_record(local_conn, + &config_file_options, + config_file_options.node_id, + "repmgrd_start", + true, + NULL); + + /* + * retrieve list of nodes - we'll need these if the DB connection goes away, + * or if we're monitoring a non-local node + */ + get_node_records_by_priority(local_conn, &nodes); + + /* decided which node to monitor */ + + if (config_file_options.bdr_monitoring_mode == BDR_MONITORING_LOCAL) + { + // if local, reuse local_conn and node info + //record_status = get_node_record(local_conn, config_file_options.node_id, &monitored_node); + monitored_node = &local_node_info; + + monitoring_conn = establish_db_connection(monitored_node->conninfo, false); + log_debug("main_loop_bdr() monitoring local node %i", config_file_options.node_id); + } + else + { + NodeInfoListCell *cell; + + for (cell = nodes.head; cell; cell = cell->next) + { + log_debug("main_loop_bdr() checking node %s %i", cell->node_info->node_name, cell->node_info->priority); + + monitoring_conn = establish_db_connection(cell->node_info->conninfo, false); + if (PQstatus(monitoring_conn) == CONNECTION_OK) + { + log_debug("main_loop_bdr() monitoring node '%s' (ID %i, priority %i)", + cell->node_info->node_name, cell->node_info->node_id, cell->node_info->priority); + /* fetch the record again, as the node list is transient */ + monitored_node = get_node_record_pointer(monitoring_conn, cell->node_info->node_id); + + break; + } + } + } + + // check monitored_node not null! + while (true) { - sleep(1); + /* normal state - connection active */ + if (PQstatus(monitoring_conn) == CONNECTION_OK) + { + // XXX detail + log_info(_("starting continuous bdr node monitoring")); + + /* monitoring loop */ + do + { + log_verbose(LOG_DEBUG, "bdr check loop..."); + + { + NodeInfoListCell *cell; + + for (cell = nodes.head; cell; cell = cell->next) + { + log_debug("bdr_monitor() %s", cell->node_info->node_name); + } + } + + if (is_server_available(monitored_node->conninfo) == false) + { + t_node_info *new_monitored_node; + + // XXX improve + log_warning("connection problem!"); + new_monitored_node = do_bdr_failover(&nodes, monitored_node); + + if (new_monitored_node != NULL) + { + pfree(monitored_node); + monitored_node = new_monitored_node; + } + log_notice(_("monitored_node->node_name is now '%s' \n"), monitored_node->node_name); + } + else + { + sleep(config_file_options.monitor_interval_secs); + } + + if (got_SIGHUP) + { + /* + * if we can reload, then could need to change + * local_conn + */ + if (reload_config(&config_file_options)) + { + PQfinish(local_conn); + local_conn = establish_db_connection(config_file_options.conninfo, true); + update_registration(local_conn); + } + + /* reload node list */ + get_node_records_by_priority(local_conn, &nodes); + + got_SIGHUP = false; + } + + } while (!failover_done); + } + /* local connection inactive - periodically try and connect */ + /* TODO: make this an option */ + else + { + + monitoring_conn = establish_db_connection(monitored_node->conninfo, false); + + if (PQstatus(monitoring_conn) == CONNECTION_OK) + { + // XXX event bdr_node_recovered -> if monitored == local node + + if (monitored_node->node_id == config_file_options.node_id) + { + log_notice(_("local connection has returned, resuming monitoring")); + } + else + { + log_notice(_("connection to '%s' has returned, resuming monitoring"), monitored_node->node_name); + } + } + else + { + sleep(config_file_options.monitor_interval_secs); + } + + + if (got_SIGHUP) + { + /* + * if we can reload, then could need to change + * local_conn + */ + if (reload_config(&config_file_options)) + { + if (PQstatus(local_conn) == CONNECTION_OK) + { + PQfinish(local_conn); + local_conn = establish_db_connection(config_file_options.conninfo, true); + update_registration(local_conn); + } + } + + /* reload node list */ + if (PQstatus(local_conn) == CONNECTION_OK) + get_node_records_by_priority(local_conn, &nodes); + + got_SIGHUP = false; + } + } + + failover_done = false; } + + return; +} + +/* + * do_bdr_failover() + * + * Here we attempt to perform a BDR "failover". + * + * As there's no equivalent of a physical replication failover, + * we'll do the following: + * + * - attempt to find another node, to set our node record as inactive + * - generate an event log record on that node + * - optionally execute `bdr_failover_command`, passing the conninfo string + * of that node to the command; this can be used for e.g. reconfiguring + * pgbouncer. + * - if mode is 'BDR_MONITORING_PRIORITY', redirect monitoring to that node. + * + */ +static t_node_info * +do_bdr_failover(NodeInfoList *nodes, t_node_info *monitored_node) +{ + PGconn *next_node_conn = NULL; + NodeInfoListCell *cell; + bool failover_success = false; + PQExpBufferData event_details; + t_event_info event_info = T_EVENT_INFO_INITIALIZER; + t_node_info *new_monitored_node = NULL; + + initPQExpBuffer(&event_details); + + /* get next active priority node */ + + for (cell = nodes->head; cell; cell = cell->next) + { + log_debug("do_bdr_failover() %s", cell->node_info->node_name); + + /* don't attempt to connect to the current monitored node, as that's the one which has failed */ + if (cell->node_info->node_id == monitored_node->node_id) + continue; + + /* XXX skip inactive node? */ + + next_node_conn = establish_db_connection(cell->node_info->conninfo, false); + + if (PQstatus(next_node_conn) == CONNECTION_OK) + { + // XXX check if record returned + new_monitored_node = get_node_record_pointer(next_node_conn, cell->node_info->node_id); + + break; + } + + next_node_conn = NULL; + } + + if (next_node_conn == NULL) + { + appendPQExpBuffer(&event_details, + _("no other available node found")); + + log_error("%s", event_details.data); + + + // no other nodes found + // continue degraded monitoring until node is restored? + } + else + { + log_info(_("connecting to target node %s"), cell->node_info->node_name); + + failover_success = true; + + event_info.conninfo_str = cell->node_info->conninfo; + event_info.node_name = cell->node_info->node_name; + + /* update our own record on the other node */ + if (monitored_node->node_id == config_file_options.node_id) + { + update_node_record_set_active(next_node_conn, monitored_node->node_id, false); + } + + if (config_file_options.bdr_monitoring_mode == BDR_MONITORING_PRIORITY) + { + log_notice(_("monitoring next available node by prioriy: %s (ID %i)"), + new_monitored_node->node_name, + new_monitored_node->node_id); + } + + appendPQExpBuffer(&event_details, + _("node '%s' (ID: %i) detected as failed; next available node is '%s' (ID: %i)"), + monitored_node->node_name, + monitored_node->node_id, + cell->node_info->node_name, + cell->node_info->node_id); + } + + /* + * Create an event record + * + * If we were able to connect to another node, we'll update the + * event log there. + * + * In any case the event notification command will be triggered + * with the event "bdr_failover" + */ + + create_event_notification_extended( + next_node_conn, + &config_file_options, + config_file_options.node_id, + "bdr_failover", + failover_success, + event_details.data, + &event_info); + + termPQExpBuffer(&event_details); + + //failover_done = true; + + if (config_file_options.bdr_monitoring_mode == BDR_MONITORING_PRIORITY) + return new_monitored_node; + + /* local monitoring mode - there's no new node to monitor */ + return NULL; +} + +static void +update_registration(PGconn *conn) +{ + bool success = update_node_record_conn_priority(local_conn, + &config_file_options); + // check values have actually changed + + if (success == false) + { + PQExpBufferData errmsg; + initPQExpBuffer(&errmsg); + + appendPQExpBuffer(&errmsg, + _("unable to update local node record:\n %s"), + PQerrorMessage(conn)); + + create_event_record(conn, + &config_file_options, + config_file_options.node_id, + "repmgrd_config_reload", + false, + errmsg.data); + termPQExpBuffer(&errmsg); + } + + return; }