mirror of
https://github.com/EnterpriseDB/repmgr.git
synced 2026-03-25 16:16:29 +00:00
Update repl_nodes following failover
repl_nodes table updated by each node following failover to show that either it is the primary, or which primary it has started to follow.
This commit is contained in:
87
repmgrd.c
87
repmgrd.c
@@ -91,6 +91,8 @@ static void standby_monitor(void);
|
|||||||
static void witness_monitor(void);
|
static void witness_monitor(void);
|
||||||
static bool check_connection(PGconn *conn, const char *type);
|
static bool check_connection(PGconn *conn, const char *type);
|
||||||
static void update_node_record_set_primary(PGconn *conn, int this_node_id, int old_primary_node_id);
|
static void update_node_record_set_primary(PGconn *conn, int this_node_id, int old_primary_node_id);
|
||||||
|
static void update_node_record_set_upstream(PGconn *conn, int this_node_id, int new_upstream_node_id);
|
||||||
|
|
||||||
static void update_shared_memory(char *last_wal_standby_applied);
|
static void update_shared_memory(char *last_wal_standby_applied);
|
||||||
static void update_registration(void);
|
static void update_registration(void);
|
||||||
static void do_failover(void);
|
static void do_failover(void);
|
||||||
@@ -649,6 +651,7 @@ standby_monitor(void)
|
|||||||
&upstream_node_id, NULL);
|
&upstream_node_id, NULL);
|
||||||
|
|
||||||
|
|
||||||
|
// ZZZ "5 minutes"?
|
||||||
/*
|
/*
|
||||||
* Check if the upstream node is still available, if after 5 minutes of retries
|
* Check if the upstream node is still available, if after 5 minutes of retries
|
||||||
* we cannot reconnect, try to get a new upstream node.
|
* we cannot reconnect, try to get a new upstream node.
|
||||||
@@ -724,7 +727,7 @@ standby_monitor(void)
|
|||||||
{
|
{
|
||||||
log_debug("standby_monitor() - checking if still standby\n"); // ZZZ
|
log_debug("standby_monitor() - checking if still standby\n"); // ZZZ
|
||||||
ret = is_standby(my_local_conn);
|
ret = is_standby(my_local_conn);
|
||||||
log_debug("ret is %i", ret); // ZZZ
|
|
||||||
switch (ret)
|
switch (ret)
|
||||||
{
|
{
|
||||||
case 0:
|
case 0:
|
||||||
@@ -878,6 +881,7 @@ do_failover(void)
|
|||||||
|
|
||||||
/* initialize to keep compiler quiet */
|
/* initialize to keep compiler quiet */
|
||||||
t_node_info best_candidate = {-1, NO_UPSTREAM_NODE, "", InvalidXLogRecPtr, UNKNOWN, false, false};
|
t_node_info best_candidate = {-1, NO_UPSTREAM_NODE, "", InvalidXLogRecPtr, UNKNOWN, false, false};
|
||||||
|
|
||||||
/* get a list of standby nodes, including myself */
|
/* get a list of standby nodes, including myself */
|
||||||
sprintf(sqlquery,
|
sprintf(sqlquery,
|
||||||
"SELECT id, conninfo, type, upstream_node_id "
|
"SELECT id, conninfo, type, upstream_node_id "
|
||||||
@@ -913,7 +917,7 @@ do_failover(void)
|
|||||||
{
|
{
|
||||||
nodes[i].node_id = atoi(PQgetvalue(res, i, 0));
|
nodes[i].node_id = atoi(PQgetvalue(res, i, 0));
|
||||||
|
|
||||||
strncpy(nodes[i].conninfo_str, PQgetvalue(res, i, 1), MAXLEN);
|
strncpy(nodes[i].conninfo_str, PQgetvalue(res, i, 1), MAXCONNINFO);
|
||||||
|
|
||||||
if(strcmp(PQgetvalue(res, i, 2), "primary") == 0)
|
if(strcmp(PQgetvalue(res, i, 2), "primary") == 0)
|
||||||
{
|
{
|
||||||
@@ -1207,6 +1211,7 @@ do_failover(void)
|
|||||||
best_candidate.node_id = nodes[i].node_id;
|
best_candidate.node_id = nodes[i].node_id;
|
||||||
best_candidate.xlog_location = nodes[i].xlog_location;
|
best_candidate.xlog_location = nodes[i].xlog_location;
|
||||||
best_candidate.is_ready = nodes[i].is_ready;
|
best_candidate.is_ready = nodes[i].is_ready;
|
||||||
|
strncpy(best_candidate.conninfo_str, nodes[i].conninfo_str, MAXCONNINFO);
|
||||||
candidate_found = true;
|
candidate_found = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1220,6 +1225,7 @@ do_failover(void)
|
|||||||
best_candidate.node_id = nodes[i].node_id;
|
best_candidate.node_id = nodes[i].node_id;
|
||||||
best_candidate.xlog_location = nodes[i].xlog_location;
|
best_candidate.xlog_location = nodes[i].xlog_location;
|
||||||
best_candidate.is_ready = nodes[i].is_ready;
|
best_candidate.is_ready = nodes[i].is_ready;
|
||||||
|
strncpy(best_candidate.conninfo_str, nodes[i].conninfo_str, MAXCONNINFO);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1259,11 +1265,12 @@ do_failover(void)
|
|||||||
/* and reconnect to the local database */
|
/* and reconnect to the local database */
|
||||||
my_local_conn = establish_db_connection(local_options.conninfo, true);
|
my_local_conn = establish_db_connection(local_options.conninfo, true);
|
||||||
|
|
||||||
// ZZZ update master record, this node record
|
/* update node information to reflect new status */
|
||||||
update_node_record_set_primary(my_local_conn, node_info.node_id, failed_primary.node_id);
|
update_node_record_set_primary(my_local_conn, node_info.node_id, failed_primary.node_id);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
PGconn *upstream_conn;
|
||||||
/* wait */
|
/* wait */
|
||||||
sleep(10);
|
sleep(10);
|
||||||
|
|
||||||
@@ -1291,11 +1298,17 @@ do_failover(void)
|
|||||||
|
|
||||||
/* and reconnect to the local database */
|
/* and reconnect to the local database */
|
||||||
my_local_conn = establish_db_connection(local_options.conninfo, true);
|
my_local_conn = establish_db_connection(local_options.conninfo, true);
|
||||||
|
upstream_conn = establish_db_connection(best_candidate.conninfo_str, true);
|
||||||
|
// reconnect to new primary
|
||||||
|
/* update node information to reflect new status */
|
||||||
|
update_node_record_set_upstream(upstream_conn, node_info.node_id, best_candidate.node_id);
|
||||||
|
PQfinish(upstream_conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
log_debug("failover done\n"); // ZZZ
|
log_debug("failover done\n"); // ZZZ
|
||||||
|
|
||||||
/* to force it to re-calculate mode and master node */
|
/* to force it to re-calculate mode and master node */
|
||||||
|
// ^ ZZZ check that behaviour ^
|
||||||
failover_done = true;
|
failover_done = true;
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -1787,5 +1800,69 @@ get_node_info(PGconn *conn,char *cluster, int node_id)
|
|||||||
static void
|
static void
|
||||||
update_node_record_set_primary(PGconn *conn, int this_node_id, int old_primary_node_id)
|
update_node_record_set_primary(PGconn *conn, int this_node_id, int old_primary_node_id)
|
||||||
{
|
{
|
||||||
log_debug("this: %i failed: %i\n", this_node_id, old_primary_node_id);
|
PGresult *res;
|
||||||
|
char sqlquery[QUERY_STR_LEN];
|
||||||
|
|
||||||
|
log_debug(_("Setting failed node %i inactive; marking node %i as primary\n"), old_primary_node_id, this_node_id);
|
||||||
|
|
||||||
|
// ZZZ handle errors
|
||||||
|
sqlquery_snprintf(sqlquery, "BEGIN");
|
||||||
|
res = PQexec(conn, sqlquery);
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
|
sqlquery_snprintf(sqlquery,
|
||||||
|
" UPDATE %s.repl_nodes "
|
||||||
|
" SET active = FALSE "
|
||||||
|
" WHERE cluster = '%s' "
|
||||||
|
" AND id = %i ",
|
||||||
|
get_repmgr_schema_quoted(conn),
|
||||||
|
local_options.cluster_name,
|
||||||
|
old_primary_node_id);
|
||||||
|
|
||||||
|
res = PQexec(conn, sqlquery);
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
|
sqlquery_snprintf(sqlquery,
|
||||||
|
" UPDATE %s.repl_nodes "
|
||||||
|
" SET type = 'primary', "
|
||||||
|
" upstream_node_id = NULL "
|
||||||
|
" WHERE cluster = '%s' "
|
||||||
|
" AND id = %i ",
|
||||||
|
get_repmgr_schema_quoted(conn),
|
||||||
|
local_options.cluster_name,
|
||||||
|
this_node_id);
|
||||||
|
|
||||||
|
res = PQexec(conn, sqlquery);
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
|
sqlquery_snprintf(sqlquery, "COMMIT");
|
||||||
|
res = PQexec(conn, sqlquery);
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
update_node_record_set_upstream(PGconn *conn, int this_node_id, int new_upstream_node_id)
|
||||||
|
{
|
||||||
|
PGresult *res;
|
||||||
|
char sqlquery[QUERY_STR_LEN];
|
||||||
|
|
||||||
|
// ZZZ handle errors
|
||||||
|
|
||||||
|
log_debug(_("update_node_record_set_upstream(): Updating node %i's upstream node to %i\n"), this_node_id, new_upstream_node_id);
|
||||||
|
|
||||||
|
sqlquery_snprintf(sqlquery,
|
||||||
|
" UPDATE %s.repl_nodes "
|
||||||
|
" SET upstream_node_id = %i "
|
||||||
|
" WHERE cluster = '%s' "
|
||||||
|
" AND id = %i ",
|
||||||
|
get_repmgr_schema_quoted(conn),
|
||||||
|
new_upstream_node_id,
|
||||||
|
local_options.cluster_name,
|
||||||
|
this_node_id);
|
||||||
|
res = PQexec(conn, sqlquery);
|
||||||
|
|
||||||
|
PQclear(res);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user