create and update node records

This commit is contained in:
Ian Barwick
2017-04-24 10:24:44 +09:00
parent 18c2cfda0b
commit ca6b493763
4 changed files with 385 additions and 4 deletions

319
dbutils.c
View File

@@ -18,7 +18,11 @@ static PGconn *_establish_db_connection(const char *conninfo,
const bool exit_on_error,
const bool log_notice,
const bool verbose_only);
static bool _set_config(PGconn *conn, const char *config_param, const char *sqlquery);
static int _get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info);
static void _populate_node_record(PGresult *res, t_node_info *node_info, int row);
/* ==================== */
/* Connection functions */
/* ==================== */
@@ -703,3 +707,318 @@ get_master_node_id(PGconn *conn)
return retval;
}
/* ================ */
/* result functions */
/* ================ */
bool atobool(const char *value)
{
return (strcmp(value, "t") == 0)
? true
: false;
}
/* ===================== */
/* Node record functions */
/* ===================== */
static int
_get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info)
{
int ntuples;
PGresult *res;
res = PQexec(conn, sqlquery);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
PQclear(res);
return NODE_RECORD_QUERY_ERROR;
}
ntuples = PQntuples(res);
if (ntuples == 0)
{
PQclear(res);
return NODE_RECORD_NOT_FOUND;
}
_populate_node_record(res, node_info, 0);
PQclear(res);
return ntuples;
}
static void
_populate_node_record(PGresult *res, t_node_info *node_info, int row)
{
node_info->node_id = atoi(PQgetvalue(res, row, 0));
node_info->type = parse_node_type(PQgetvalue(res, row, 1));
if (PQgetisnull(res, row, 2))
{
node_info->upstream_node_id = NO_UPSTREAM_NODE;
}
else
{
node_info->upstream_node_id = atoi(PQgetvalue(res, row, 2));
}
strncpy(node_info->node_name, PQgetvalue(res, row, 3), MAXLEN);
strncpy(node_info->conninfo, PQgetvalue(res, row, 4), MAXLEN);
strncpy(node_info->slot_name, PQgetvalue(res, row, 5), MAXLEN);
node_info->priority = atoi(PQgetvalue(res, row, 6));
node_info->active = atobool(PQgetvalue(res, row, 7));
/* Set remaining struct fields with default values */
node_info->is_ready = false;
node_info->is_visible = false;
node_info->xlog_location = InvalidXLogRecPtr;
}
t_server_type
parse_node_type(const char *type)
{
if (strcmp(type, "master") == 0)
{
return MASTER;
}
else if (strcmp(type, "standby") == 0)
{
return STANDBY;
}
else if (strcmp(type, "witness") == 0)
{
return WITNESS;
}
else if (strcmp(type, "bdr") == 0)
{
return BDR;
}
return UNKNOWN;
}
const char *
get_node_type_string(t_server_type type)
{
switch(type)
{
case MASTER:
return "master";
case STANDBY:
return "standby";
case WITNESS:
return "witness";
case BDR:
return "bdr";
/* this should never happen */
case UNKNOWN:
default:
log_error(_("unknown node type %i"), type);
return "unknown";
}
}
int
get_node_record(PGconn *conn, int node_id, t_node_info *node_info)
{
PQExpBufferData query;
int result;
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
"SELECT node_id, type, upstream_node_id, node_name, conninfo, slot_name, priority, active"
" FROM repmgr.nodes "
" WHERE node_id = %i",
node_id);
log_verbose(LOG_DEBUG, "get_node_record():\n%s", query.data);
result = _get_node_record(conn, query.data, node_info);
termPQExpBuffer(&query);
if (result == NODE_RECORD_NOT_FOUND)
{
log_verbose(LOG_DEBUG, "get_node_record(): no record found for node %i", node_id);
}
return result;
}
bool
create_node_record(PGconn *conn, char *action, t_node_info *node_info)
{
PQExpBufferData query;
char upstream_node_id[MAXLEN];
char slot_name_buf[MAXLEN];
PGresult *res;
if (node_info->upstream_node_id == NO_UPSTREAM_NODE)
{
/*
* No explicit upstream node id provided for standby - attempt to
* get primary node id
*/
if (node_info->type == STANDBY)
{
int primary_node_id = get_master_node_id(conn);
maxlen_snprintf(upstream_node_id, "%i", primary_node_id);
}
else
{
maxlen_snprintf(upstream_node_id, "%s", "NULL");
}
}
else
{
maxlen_snprintf(upstream_node_id, "%i", node_info->upstream_node_id);
}
if (node_info->slot_name[0])
{
maxlen_snprintf(slot_name_buf, "'%s'", node_info->slot_name);
}
else
{
maxlen_snprintf(slot_name_buf, "%s", "NULL");
}
/* XXX convert to placeholder query */
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
"INSERT INTO repmgr.nodes "
" (node_id, type, upstream_node_id, "
" node_name, conninfo, slot_name, "
" priority, active) "
"VALUES (%i, '%s', %s, '%s', '%s', %s, %i, %s) ",
node_info->node_id,
get_node_type_string(node_info->type),
upstream_node_id,
node_info->node_name,
node_info->conninfo,
slot_name_buf,
node_info->priority,
node_info->active == true ? "TRUE" : "FALSE");
log_verbose(LOG_DEBUG, "create_node_record(): %s", query.data);
if (action != NULL)
{
log_verbose(LOG_DEBUG, "create_node_record(): action is \"%s\"", action);
}
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_error(_("unable to create node record:\n %s"),
PQerrorMessage(conn));
PQclear(res);
return false;
}
PQclear(res);
return true;
}
bool
update_node_record(PGconn *conn, char *action, t_node_info *node_info)
{
PQExpBufferData query;
char upstream_node_id[MAXLEN];
char slot_name_buf[MAXLEN];
PGresult *res;
/* XXX this segment copied from create_node_record() */
if (node_info->upstream_node_id == NO_UPSTREAM_NODE)
{
/*
* No explicit upstream node id provided for standby - attempt to
* get primary node id
*/
if (node_info->type == STANDBY)
{
int primary_node_id = get_master_node_id(conn);
maxlen_snprintf(upstream_node_id, "%i", primary_node_id);
}
else
{
maxlen_snprintf(upstream_node_id, "%s", "NULL");
}
}
else
{
maxlen_snprintf(upstream_node_id, "%i", node_info->upstream_node_id);
}
if (node_info->slot_name[0])
{
maxlen_snprintf(slot_name_buf, "'%s'", node_info->slot_name);
}
else
{
maxlen_snprintf(slot_name_buf, "%s", "NULL");
}
initPQExpBuffer(&query);
/* XXX convert to placeholder query */
appendPQExpBuffer(&query,
"UPDATE repmgr.nodes SET "
" type = '%s', "
" upstream_node_id = %s, "
" node_name = '%s', "
" conninfo = '%s', "
" slot_name = %s, "
" priority = %i, "
" active = %s "
" WHERE node_id = %i ",
get_node_type_string(node_info->type),
upstream_node_id,
node_info->node_name,
node_info->conninfo,
slot_name_buf,
node_info->priority,
node_info->active == true ? "TRUE" : "FALSE",
node_info->node_id);
log_verbose(LOG_DEBUG, "update_node_record(): %s", query.data);
if (action != NULL)
{
log_verbose(LOG_DEBUG, "update_node_record(): action is \"%s\"", action);
}
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_error(_("unable to update node record:\n %s"),
PQerrorMessage(conn));
PQclear(res);
return false;
}
PQclear(res);
return true;
}