master register: check for existing master presence/active record

This commit is contained in:
Ian Barwick
2017-04-21 19:47:31 +09:00
parent 77870d3887
commit a16bbbf107
4 changed files with 295 additions and 10 deletions

238
dbutils.c
View File

@@ -360,6 +360,83 @@ conn_to_param_list(PGconn *conn, t_conninfo_param_list *param_list)
}
}
/* ===================== */
/* transaction functions */
/* ===================== */
bool
begin_transaction(PGconn *conn)
{
PGresult *res;
log_verbose(LOG_DEBUG, "begin_transaction()");
res = PQexec(conn, "BEGIN");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_error(_("Unable to begin transaction:\n %s"),
PQerrorMessage(conn));
PQclear(res);
return false;
}
PQclear(res);
return true;
}
bool
commit_transaction(PGconn *conn)
{
PGresult *res;
log_verbose(LOG_DEBUG, "commit_transaction()");
res = PQexec(conn, "COMMIT");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_error(_("Unable to commit transaction:\n %s"),
PQerrorMessage(conn));
PQclear(res);
return false;
}
PQclear(res);
return true;
}
bool
rollback_transaction(PGconn *conn)
{
PGresult *res;
log_verbose(LOG_DEBUG, "rollback_transaction()");
res = PQexec(conn, "ROLLBACK");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_error(_("Unable to rollback transaction:\n %s"),
PQerrorMessage(conn));
PQclear(res);
return false;
}
PQclear(res);
return true;
}
/* ========================== */
/* GUC manipulation functions */
/* ========================== */
@@ -455,7 +532,7 @@ is_standby(PGconn *conn)
if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK)
{
log_error(_("unable to determine if server is in recovery: %s"),
log_error(_("unable to determine if server is in recovery:\n %s"),
PQerrorMessage(conn));
result = -1;
}
@@ -467,3 +544,162 @@ is_standby(PGconn *conn)
PQclear(res);
return result;
}
/*
* Read the node list from the provided connection and attempt to connect to each node
* in turn to definitely establish if it's the cluster primary.
*
* The node list is returned in the order which makes it likely that the
* current primary will be returned first, reducing the number of speculative
* connections which need to be made to other nodes.
*
* If master_conninfo_out points to allocated memory of MAXCONNINFO in length,
* the primary server's conninfo string will be copied there.
*/
PGconn *
get_master_connection(PGconn *conn,
int *master_id, char *master_conninfo_out)
{
PQExpBufferData query;
PGconn *remote_conn = NULL;
PGresult *res;
char remote_conninfo_stack[MAXCONNINFO];
char *remote_conninfo = &*remote_conninfo_stack;
int i,
node_id;
/*
* If the caller wanted to get a copy of the connection info string, sub
* out the local stack pointer for the pointer passed by the caller.
*/
if (master_conninfo_out != NULL)
remote_conninfo = master_conninfo_out;
if (master_id != NULL)
{
*master_id = NODE_NOT_FOUND;
}
/* find all registered nodes */
log_info(_("retrieving node list"));
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
" SELECT node_id, conninfo, "
" CASE WHEN type = 'master' THEN 1 ELSE 2 END AS type_priority"
" FROM repmgr.nodes "
" WHERE type != 'witness' "
"ORDER BY active DESC, type_priority, priority, node_id");
log_verbose(LOG_DEBUG, "get_master_connection():\n%s", query.data);
res = PQexec(conn, query.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
log_error(_("unable to retrieve node records:\n %s"),
PQerrorMessage(conn));
PQclear(res);
return NULL;
}
termPQExpBuffer(&query);
for (i = 0; i < PQntuples(res); i++)
{
int is_node_standby;
/* initialize with the values of the current node being processed */
node_id = atoi(PQgetvalue(res, i, 0));
strncpy(remote_conninfo, PQgetvalue(res, i, 1), MAXCONNINFO);
log_verbose(LOG_INFO,
_("checking role of cluster node '%i'"),
node_id);
remote_conn = establish_db_connection(remote_conninfo, false);
if (PQstatus(remote_conn) != CONNECTION_OK)
continue;
is_node_standby = is_standby(remote_conn);
if (is_node_standby == -1)
{
log_error(_("unable to retrieve recovery state from node %i:\n %s"),
node_id,
PQerrorMessage(remote_conn));
PQfinish(remote_conn);
continue;
}
/* if is_standby() returns 0, queried node is the master */
if (is_node_standby == 0)
{
PQclear(res);
log_debug(_("get_master_connection(): current master node is %i"), node_id);
if (master_id != NULL)
{
*master_id = node_id;
}
return remote_conn;
}
PQfinish(remote_conn);
}
PQclear(res);
return NULL;
}
/*
* Return the id of the active master node, or NODE_NOT_FOUND if no
* record available.
*
* This reports the value stored in the database only and
* does not verify whether the node is actually available
*/
int
get_master_node_id(PGconn *conn)
{
PQExpBufferData query;
PGresult *res;
int retval;
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
"SELECT node_id "
" FROM repmgr.nodes "
" WHERE type = 'master' "
" AND active IS TRUE ");
log_verbose(LOG_DEBUG, "get_master_node_id():\n%s", query.data);
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
log_error(_("get_master_node_id(): query failed\n %s"),
PQerrorMessage(conn));
retval = NODE_NOT_FOUND;
}
else if (PQntuples(res) == 0)
{
log_verbose(LOG_WARNING, _("get_master_node_id(): no active primary found\n"));
retval = NODE_NOT_FOUND;
}
else
{
retval = atoi(PQgetvalue(res, 0, 0));
}
PQclear(res);
return retval;
}