interim commit

This commit is contained in:
Ian Barwick
2017-06-28 16:38:41 +09:00
parent 35b6178e07
commit ded8d95e5a
7 changed files with 522 additions and 62 deletions

192
dbutils.c
View File

@@ -24,9 +24,28 @@ static PGconn *_get_primary_connection(PGconn *standby_conn, int *primary_id, c
static bool _set_config(PGconn *conn, const char *config_param, const char *sqlquery);
static RecordStatus _get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info);
static void _populate_node_record(PGresult *res, t_node_info *node_info, int row);
static void _populate_node_records(PGresult *res, NodeInfoList *node_list);
static bool _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info);
static bool _create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info);
/* ================= */
/* utility functions */
/* ================= */
XLogRecPtr
parse_lsn(const char *str)
{
XLogRecPtr ptr = InvalidXLogRecPtr;
uint32 high, low;
if (sscanf(str, "%x/%x", &high, &low) == 2)
ptr = (((XLogRecPtr)high) << 32) + (XLogRecPtr)low;
return ptr;
}
/* ==================== */
/* Connection functions */
@@ -835,6 +854,8 @@ int
get_server_version(PGconn *conn, char *server_version)
{
PGresult *res;
int server_version_num;
res = PQexec(conn,
"SELECT pg_catalog.current_setting('server_version_num'), "
" pg_catalog.current_setting('server_version')");
@@ -848,9 +869,12 @@ get_server_version(PGconn *conn, char *server_version)
}
if (server_version != NULL)
strcpy(server_version, PQgetvalue(res, 0, 0));
strcpy(server_version, PQgetvalue(res, 0, 1));
return atoi(PQgetvalue(res, 0, 0));
server_version_num = atoi(PQgetvalue(res, 0, 0));
PQclear(res);
return server_version_num;
}
@@ -1332,44 +1356,28 @@ get_local_node_record(PGconn *conn, int node_id, t_node_info *node_info)
}
void
get_downstream_node_records(PGconn *conn, int node_id, NodeInfoList *node_list)
static
void _populate_node_records(PGresult *res, NodeInfoList *node_list)
{
PQExpBufferData query;
PGresult *result;
int i;
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
" SELECT node_id, type, upstream_node_id, node_name, conninfo, repluser, slot_name, priority, active"
" FROM repmgr.nodes "
" WHERE upstream_node_id = %i "
"ORDER BY node_id ",
node_id);
log_verbose(LOG_DEBUG, "get_node_records_by_priority():\n%s", query.data);
result = PQexec(conn, query.data);
termPQExpBuffer(&query);
node_list->head = NULL;
node_list->tail = NULL;
node_list->node_count = 0;
if (PQresultStatus(result) != PGRES_TUPLES_OK)
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
return;
}
for (i = 0; i < PQntuples(result); i++)
for (i = 0; i < PQntuples(res); i++)
{
NodeInfoListCell *cell;
cell = (NodeInfoListCell *) pg_malloc0(sizeof(NodeInfoListCell));
cell->node_info = pg_malloc0(sizeof(t_node_info));
_populate_node_record(result, cell->node_info, i);
_populate_node_record(res, cell->node_info, i);
if (node_list->tail)
node_list->tail->next = cell;
@@ -1384,6 +1392,62 @@ get_downstream_node_records(PGconn *conn, int node_id, NodeInfoList *node_list)
}
void
get_downstream_node_records(PGconn *conn, int node_id, NodeInfoList *node_list)
{
PQExpBufferData query;
PGresult *res;
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
" SELECT node_id, type, upstream_node_id, node_name, conninfo, repluser, slot_name, priority, active"
" FROM repmgr.nodes "
" WHERE upstream_node_id = %i "
"ORDER BY node_id ",
node_id);
log_verbose(LOG_DEBUG, "get_downstream_node_records():\n%s", query.data);
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
_populate_node_records(res, node_list);
return;
}
void
get_active_sibling_node_records(PGconn *conn, int node_id, int upstream_node_id, NodeInfoList *node_list)
{
PQExpBufferData query;
PGresult *res;
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
" SELECT node_id, type, upstream_node_id, node_name, conninfo, repluser, slot_name, priority, active"
" FROM repmgr.nodes "
" WHERE upstream_node_id = %i "
" AND node_id != %i "
" AND active IS TRUE "
"ORDER BY node_id ",
upstream_node_id,
node_id);
log_verbose(LOG_DEBUG, "get_active_sibling_node_records():\n%s", query.data);
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
_populate_node_records(res, node_list);
return;
}
bool
create_node_record(PGconn *conn, char *repmgr_action, t_node_info *node_info)
{
@@ -2073,7 +2137,9 @@ get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record)
}
/* ============================ */
/* asynchronous query functions */
/* ============================ */
bool
cancel_query(PGconn *conn, int timeout)
@@ -2177,7 +2243,10 @@ wait_connection_availability(PGconn *conn, long long timeout)
return -1;
}
/* =========================== */
/* node availability functions */
/* =========================== */
bool
is_server_available(const char *conninfo)
@@ -2189,3 +2258,80 @@ is_server_available(const char *conninfo)
return false;
}
/*
* node voting functions
*
* These are intended to run under repmgrd and rely on shared memory
*/
NodeVotingStatus
get_voting_status(PGconn *conn)
{
PGresult *res;
NodeVotingStatus voting_status;
res = PQexec(conn, "SELECT repmgr.get_voting_status()");
if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
{
log_error(_("unable to query repmgr.get_voting_status():\n %s"),
PQerrorMessage(conn));
PQclear(res);
return VS_UNKNOWN;
}
voting_status = atoi(PQgetvalue(res, 0, 0));
PQclear(res);
return voting_status;
}
int request_vote(PGconn *conn, int this_node_id, int this_node_priority, XLogRecPtr last_wal_receive_lsn)
{
PQExpBufferData query;
PGresult *res;
int vote;
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
"SELECT repmgr.request_vote(%i, %i, '%X/%X'::pg_lsn)",
this_node_id,
this_node_priority,
(uint32) (last_wal_receive_lsn >> 32),
(uint32) last_wal_receive_lsn);
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
vote = (strcmp(PQgetvalue(res, 0, 0), "t") == 0) ? 1 : 0;
PQclear(res);
return vote;
}
/* ============================ */
/* replication status functions */
/* ============================ */
XLogRecPtr
get_last_wal_receive_location(PGconn *conn)
{
PGresult *res;
XLogRecPtr ptr = InvalidXLogRecPtr;
res = PQexec(conn, "SELECT pg_catalog.pg_last_wal_receive_lsn()");
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
ptr = parse_lsn(PQgetvalue(res, 0, 0));
}
PQclear(res);
return ptr;
}