diff --git a/repmgrd.c b/repmgrd.c index 72db3c70..05d7d9cc 100644 --- a/repmgrd.c +++ b/repmgrd.c @@ -91,6 +91,8 @@ static void standby_monitor(void); static void witness_monitor(void); static bool check_connection(PGconn *conn, const char *type); static void update_node_record_set_primary(PGconn *conn, int this_node_id, int old_primary_node_id); +static void update_node_record_set_upstream(PGconn *conn, int this_node_id, int new_upstream_node_id); + static void update_shared_memory(char *last_wal_standby_applied); static void update_registration(void); static void do_failover(void); @@ -649,6 +651,7 @@ standby_monitor(void) &upstream_node_id, NULL); + // ZZZ "5 minutes"? /* * Check if the upstream node is still available, if after 5 minutes of retries * we cannot reconnect, try to get a new upstream node. @@ -724,7 +727,7 @@ standby_monitor(void) { log_debug("standby_monitor() - checking if still standby\n"); // ZZZ ret = is_standby(my_local_conn); - log_debug("ret is %i", ret); // ZZZ + switch (ret) { case 0: @@ -878,6 +881,7 @@ do_failover(void) /* initialize to keep compiler quiet */ t_node_info best_candidate = {-1, NO_UPSTREAM_NODE, "", InvalidXLogRecPtr, UNKNOWN, false, false}; + /* get a list of standby nodes, including myself */ sprintf(sqlquery, "SELECT id, conninfo, type, upstream_node_id " @@ -913,7 +917,7 @@ do_failover(void) { nodes[i].node_id = atoi(PQgetvalue(res, i, 0)); - strncpy(nodes[i].conninfo_str, PQgetvalue(res, i, 1), MAXLEN); + strncpy(nodes[i].conninfo_str, PQgetvalue(res, i, 1), MAXCONNINFO); if(strcmp(PQgetvalue(res, i, 2), "primary") == 0) { @@ -1207,6 +1211,7 @@ do_failover(void) best_candidate.node_id = nodes[i].node_id; best_candidate.xlog_location = nodes[i].xlog_location; best_candidate.is_ready = nodes[i].is_ready; + strncpy(best_candidate.conninfo_str, nodes[i].conninfo_str, MAXCONNINFO); candidate_found = true; } @@ -1220,6 +1225,7 @@ do_failover(void) best_candidate.node_id = nodes[i].node_id; best_candidate.xlog_location = nodes[i].xlog_location; best_candidate.is_ready = nodes[i].is_ready; + strncpy(best_candidate.conninfo_str, nodes[i].conninfo_str, MAXCONNINFO); } } @@ -1259,11 +1265,12 @@ do_failover(void) /* and reconnect to the local database */ my_local_conn = establish_db_connection(local_options.conninfo, true); - // ZZZ update master record, this node record + /* update node information to reflect new status */ update_node_record_set_primary(my_local_conn, node_info.node_id, failed_primary.node_id); } else { + PGconn *upstream_conn; /* wait */ sleep(10); @@ -1291,11 +1298,17 @@ do_failover(void) /* and reconnect to the local database */ my_local_conn = establish_db_connection(local_options.conninfo, true); - + upstream_conn = establish_db_connection(best_candidate.conninfo_str, true); + // reconnect to new primary + /* update node information to reflect new status */ + update_node_record_set_upstream(upstream_conn, node_info.node_id, best_candidate.node_id); + PQfinish(upstream_conn); } log_debug("failover done\n"); // ZZZ + /* to force it to re-calculate mode and master node */ + // ^ ZZZ check that behaviour ^ failover_done = true; } @@ -1787,5 +1800,69 @@ get_node_info(PGconn *conn,char *cluster, int node_id) static void update_node_record_set_primary(PGconn *conn, int this_node_id, int old_primary_node_id) { - log_debug("this: %i failed: %i\n", this_node_id, old_primary_node_id); + PGresult *res; + char sqlquery[QUERY_STR_LEN]; + + log_debug(_("Setting failed node %i inactive; marking node %i as primary\n"), old_primary_node_id, this_node_id); + + // ZZZ handle errors + sqlquery_snprintf(sqlquery, "BEGIN"); + res = PQexec(conn, sqlquery); + PQclear(res); + + sqlquery_snprintf(sqlquery, + " UPDATE %s.repl_nodes " + " SET active = FALSE " + " WHERE cluster = '%s' " + " AND id = %i ", + get_repmgr_schema_quoted(conn), + local_options.cluster_name, + old_primary_node_id); + + res = PQexec(conn, sqlquery); + PQclear(res); + + sqlquery_snprintf(sqlquery, + " UPDATE %s.repl_nodes " + " SET type = 'primary', " + " upstream_node_id = NULL " + " WHERE cluster = '%s' " + " AND id = %i ", + get_repmgr_schema_quoted(conn), + local_options.cluster_name, + this_node_id); + + res = PQexec(conn, sqlquery); + PQclear(res); + + sqlquery_snprintf(sqlquery, "COMMIT"); + res = PQexec(conn, sqlquery); + PQclear(res); + + +} + + +static void +update_node_record_set_upstream(PGconn *conn, int this_node_id, int new_upstream_node_id) +{ + PGresult *res; + char sqlquery[QUERY_STR_LEN]; + + // ZZZ handle errors + + log_debug(_("update_node_record_set_upstream(): Updating node %i's upstream node to %i\n"), this_node_id, new_upstream_node_id); + + sqlquery_snprintf(sqlquery, + " UPDATE %s.repl_nodes " + " SET upstream_node_id = %i " + " WHERE cluster = '%s' " + " AND id = %i ", + get_repmgr_schema_quoted(conn), + new_upstream_node_id, + local_options.cluster_name, + this_node_id); + res = PQexec(conn, sqlquery); + + PQclear(res); }