Improve node metadata and upstream connecting mechanism

To handle cascaded replication we need to keep track of each
node's upstream node. Also record each node's type ("primary",
"standby" or "witness") and whether it is active.
This commit is contained in:
Ian Barwick
2015-01-16 10:27:44 +09:00
parent 4b6c097a3e
commit a82d37e48a
4 changed files with 117 additions and 58 deletions

View File

@@ -145,11 +145,13 @@ is_witness(PGconn *conn, char *cluster, int node_id)
int result = 0;
char sqlquery[QUERY_STR_LEN];
// ZZZ witness
sqlquery_snprintf(sqlquery,
"SELECT witness "
"SELECT TRUE "
" FROM %s.repl_nodes "
" WHERE cluster = '%s' "
" AND id = %d ",
" AND id = %d "
" AND type = 'witness' ",
get_repmgr_schema_quoted(conn),
cluster,
node_id);
@@ -224,6 +226,50 @@ is_pgup(PGconn *conn, int timeout)
return true;
}
/*
 * get_primary_node_id()
 *
 * Look up, in the repl_nodes table, the id of the node registered as
 * the active primary of the given cluster.
 *
 * conn:    connection used to run the lookup query
 * cluster: cluster name matched against repl_nodes.cluster
 *
 * Returns the id from the first matching row, or -1 when the query
 * fails or no row matches.
 *
 * Only the stored metadata is consulted; nothing here checks whether
 * the node in question can actually be reached.
 */
int
get_primary_node_id(PGconn *conn, char *cluster)
{
	char		query[QUERY_STR_LEN];
	PGresult   *result;
	int			primary_id = -1;	/* stays -1 unless a row is found */

	sqlquery_snprintf(query,
					  "SELECT id "
					  " FROM %s.repl_nodes "
					  " WHERE cluster = '%s' "
					  " AND type = 'primary' "
					  " AND active IS TRUE ",
					  get_repmgr_schema_quoted(conn),
					  cluster);

	result = PQexec(conn, query);

	if (PQresultStatus(result) == PGRES_TUPLES_OK)
	{
		if (PQntuples(result) > 0)
		{
			/* take the id from the first row returned */
			primary_id = atoi(PQgetvalue(result, 0, 0));
		}
		else
		{
			log_warning(_("get_primary_node_id(): no active primary found\n"));
		}
	}
	else
	{
		log_err(_("get_primary_node_id(): query failed\n%s\n"),
				PQerrorMessage(conn));
	}

	PQclear(result);

	return primary_id;
}
/*
* Return the server version number for the connection provided
@@ -393,8 +439,8 @@ get_pg_setting(PGconn *conn, const char *setting, char *output)
PGconn *
get_upstream_connection(PGconn *standby_conn, char *cluster,
int *upstream_node_id, char *upstream_conninfo_out)
get_upstream_connection(PGconn *standby_conn, char *cluster, int node_id,
int *upstream_node_id_ptr, char *upstream_conninfo_out)
{
PGconn *upstream_conn = NULL;
PGresult *res;
@@ -409,23 +455,17 @@ get_upstream_connection(PGconn *standby_conn, char *cluster,
if (upstream_conninfo_out != NULL)
upstream_conninfo = upstream_conninfo_out;
/* hacky query */
sqlquery_snprintf(sqlquery,
"WITH i AS ( "
" WITH p AS ( "
" SELECT repmgr_get_primary_conninfo() AS conninfo "
" )"
" SELECT p.conninfo, "
" unnest(regexp_matches(conninfo,'application_name=(\\S+)')) AS name "
" FROM p "
") "
" SELECT rn.conninfo, i.name, rn.id "
" FROM i "
"INNER JOIN %s.repl_nodes rn "
" ON rn.name = i.name "
" WHERE cluster = '%s' ",
" SELECT un.conninfo, un.name, un.id "
" FROM %s.repl_nodes un "
"INNER JOIN %s.repl_nodes n "
" ON (un.id = n.upstream_node_id AND un.cluster = n.cluster)"
" WHERE n.cluster = '%s' "
" AND n.id = %i ",
get_repmgr_schema_quoted(standby_conn),
cluster);
get_repmgr_schema_quoted(standby_conn),
cluster,
node_id);
log_debug("%s", sqlquery);
@@ -439,20 +479,19 @@ get_upstream_connection(PGconn *standby_conn, char *cluster,
return NULL;
}
// ZZZ check for empty result
// maybe modify function to return NULL
strncpy(upstream_conninfo, PQgetvalue(res, 0, 0), MAXCONNINFO);
*upstream_node_id = atoi(PQgetvalue(res, 0, 1));
if(upstream_node_id_ptr != NULL)
*upstream_node_id_ptr = atoi(PQgetvalue(res, 0, 1));
PQclear(res);
log_debug("conninfo is: '%s'", upstream_conninfo);
log_debug("conninfo is: '%s'\n", upstream_conninfo);
upstream_conn = establish_db_connection(upstream_conninfo, false);
if (PQstatus(upstream_conn) != CONNECTION_OK)
{
log_err(_("Unable to connect to upstream server: %s\n"),
log_err(_("Unable to connect to upstream node: %s\n"),
PQerrorMessage(upstream_conn));
return NULL;
}
@@ -494,7 +533,7 @@ get_master_connection(PGconn *standby_conn, char *cluster,
"SELECT id, conninfo "
" FROM %s.repl_nodes "
" WHERE cluster = '%s' "
" AND NOT witness ",
" AND type != 'witness' ",
get_repmgr_schema_quoted(standby_conn),
cluster);

View File

@@ -31,6 +31,7 @@ bool check_cluster_schema(PGconn *conn);
int is_standby(PGconn *conn);
int is_witness(PGconn *conn,char *cluster, int node_id);
bool is_pgup(PGconn *conn, int timeout);
int get_primary_node_id(PGconn *conn, char *cluster);
int get_server_version(PGconn *conn, char *server_version);
bool get_cluster_size(PGconn *conn, char *size);
bool get_pg_setting(PGconn *conn, const char *setting, char *output);
@@ -42,7 +43,8 @@ int guc_set_typed(PGconn *conn, const char *parameter, const char *op,
PGconn *get_upstream_connection(PGconn *standby_conn, char *cluster,
int *upstream_node_id,
int node_id,
int *upstream_node_id_ptr,
char *upstream_conninfo_out);
PGconn *get_master_connection(PGconn *standby_conn, char *cluster,
int *master_id, char *master_conninfo_out);

View File

@@ -65,7 +65,7 @@ static bool write_recovery_file_line(FILE *recovery_file, char *recovery_file_pa
static int check_server_version(PGconn *conn, char *server_type, bool exit_on_error, char *server_version_string);
static bool check_upstream_config(PGconn *conn, bool exit_on_error);
static bool create_node_record(PGconn *conn, char *action, int node, int upstream_node, char *cluster_name, char *node_name, char *conninfo, int priority, bool witness);
static bool create_node_record(PGconn *conn, char *action, int node, char *type, int upstream_node, char *cluster_name, char *node_name, char *conninfo, int priority);
static void do_master_register(void);
static void do_standby_register(void);
@@ -439,7 +439,7 @@ do_cluster_show(void)
conn = establish_db_connection(options.conninfo, true);
sqlquery_snprintf(sqlquery,
"SELECT conninfo, witness "
"SELECT conninfo, type "
" FROM %s.repl_nodes ",
get_repmgr_schema_quoted(conn));
res = PQexec(conn, sqlquery);
@@ -457,10 +457,11 @@ do_cluster_show(void)
printf("Role | Connection String \n");
for (i = 0; i < PQntuples(res); i++)
{
// ZZZ witness
conn = establish_db_connection(PQgetvalue(res, i, 0), false);
if (PQstatus(conn) != CONNECTION_OK)
strcpy(node_role, " FAILED");
else if (strcmp(PQgetvalue(res, i, 1), "t") == 0)
else if (strcmp(PQgetvalue(res, i, 1), "witness") == 0)
strcpy(node_role, " witness");
else if (is_standby(conn))
strcpy(node_role, " standby");
@@ -632,12 +633,12 @@ do_master_register(void)
node_record_created = create_node_record(conn,
"master register",
options.node,
"primary",
NO_UPSTREAM_NODE,
options.cluster_name,
options.node_name,
options.conninfo,
options.priority,
false);
options.priority);
PQfinish(conn);
@@ -754,12 +755,12 @@ do_standby_register(void)
node_record_created = create_node_record(master_conn,
"standby register",
options.node,
"standby",
options.upstream_node,
options.cluster_name,
options.node_name,
options.conninfo,
options.priority,
false);
options.priority);
PQfinish(master_conn);
PQfinish(conn);
@@ -1557,12 +1558,12 @@ do_witness_create(void)
node_record_created = create_node_record(masterconn,
"witness create",
options.node,
"witness",
NO_UPSTREAM_NODE,
options.cluster_name,
options.node_name,
options.conninfo,
options.priority,
true);
options.priority);
if(node_record_created == false)
{
@@ -2044,12 +2045,13 @@ create_schema(PGconn *conn)
sqlquery_snprintf(sqlquery,
"CREATE TABLE %s.repl_nodes ( "
" id INTEGER PRIMARY KEY, "
" type TEXT NOT NULL CHECK (type IN('primary','standby','witness')), "
" upstream_node_id INTEGER NULL REFERENCES %s.repl_nodes (id), "
" cluster TEXT NOT NULL, "
" name TEXT NOT NULL, "
" conninfo TEXT NOT NULL, "
" priority INTEGER NOT NULL, "
" witness BOOLEAN NOT NULL DEFAULT FALSE) ",
" active BOOLEAN NOT NULL DEFAULT TRUE) ",
get_repmgr_schema_quoted(conn),
get_repmgr_schema_quoted(conn));
@@ -2212,7 +2214,7 @@ copy_configuration(PGconn *masterconn, PGconn *witnessconn)
}
sqlquery_snprintf(sqlquery,
"SELECT id, name, conninfo, priority, witness FROM %s.repl_nodes",
"SELECT id, type, upstream_node_id, name, conninfo, priority FROM %s.repl_nodes",
get_repmgr_schema_quoted(masterconn));
res = PQexec(masterconn, sqlquery);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -2232,16 +2234,18 @@ copy_configuration(PGconn *masterconn, PGconn *witnessconn)
node_record_created = create_node_record(witnessconn,
"copy_configuration",
atoi(PQgetvalue(res, i, 0)),
NO_UPSTREAM_NODE,
options.cluster_name,
PQgetvalue(res, i, 1),
PQgetvalue(res, i, 2),
atoi(PQgetvalue(res, i, 3)),
strcmp(witness, "t") == 0 ? true : false);
strlen(PQgetvalue(res, i, 2))
? atoi(PQgetvalue(res, i, 2))
: NO_UPSTREAM_NODE,
options.cluster_name,
PQgetvalue(res, i, 3),
PQgetvalue(res, i, 4),
atoi(PQgetvalue(res, i, 5)));
if (node_record_created == false)
{
// ZZZ fix error message?
fprintf(stderr, "Unable to create node record for witness: %s\n",
PQerrorMessage(witnessconn));
return false;
@@ -2484,34 +2488,46 @@ do_check_upstream_config(void)
static bool
create_node_record(PGconn *conn, char *action, int node, int upstream_node, char *cluster_name, char *node_name, char *conninfo, int priority, bool witness)
create_node_record(PGconn *conn, char *action, int node, char *type, int upstream_node, char *cluster_name, char *node_name, char *conninfo, int priority)
{
char sqlquery[QUERY_STR_LEN];
char upstream_node_id[QUERY_STR_LEN];
PGresult *res;
if(upstream_node != NO_UPSTREAM_NODE)
if(upstream_node == NO_UPSTREAM_NODE)
{
sqlquery_snprintf(upstream_node_id, "%i", upstream_node);
/*
* No explicit upstream node id provided for standby - attempt to
* get primary node id
*/
if(strcmp(type, "standby") == 0)
{
int primary_node_id = get_primary_node_id(conn, cluster_name);
sqlquery_snprintf(upstream_node_id, "%i", primary_node_id);
}
else
{
sqlquery_snprintf(upstream_node_id, "%s", "NULL");
}
}
else
{
sqlquery_snprintf(upstream_node_id, "%s", "NULL");
sqlquery_snprintf(upstream_node_id, "%i", upstream_node);
}
sqlquery_snprintf(sqlquery,
"INSERT INTO %s.repl_nodes "
" (id, upstream_node_id, cluster, "
" name, conninfo, priority, witness) "
"VALUES (%d, %s, '%s', '%s', '%s', %d, %s) ",
" (id, type, upstream_node_id, cluster, "
" name, conninfo, priority) "
"VALUES (%i, '%s', %s, '%s', '%s', '%s', %i) ",
get_repmgr_schema_quoted(conn),
node,
type,
upstream_node_id,
cluster_name,
node_name,
conninfo,
priority,
witness == true ? "TRUE" : "FALSE");
priority);
if(action != NULL)
{

View File

@@ -432,7 +432,7 @@ main(int argc, char **argv)
else if (my_local_mode == STANDBY_MODE)
{
standby_monitor();
log_debug(_("returned from standby_monitor()\n"));
log_debug(_("returned from standby_monitor()\n")); // ZZZ
}
sleep(local_options.monitor_interval_secs);
@@ -635,12 +635,13 @@ standby_monitor(void)
upstream_conn = get_upstream_connection(my_local_conn,
local_options.cluster_name,
local_options.node,
&upstream_node_id, NULL);
/*
* Check if the master is still available, if after 5 minutes of retries
* we cannot reconnect, try to get a new master.
* Check if the upstream node is still available, if after 5 minutes of retries
* we cannot reconnect, try to get a new upstream node.
*/
check_connection(upstream_conn, "master"); /* this take up to
* local_options.reconnect_atte
@@ -831,7 +832,7 @@ standby_monitor(void)
PQerrorMessage(primary_conn));
}
// ZZZ witness
static void
do_failover(void)
{
@@ -865,7 +866,7 @@ do_failover(void)
/* get a list of standby nodes, including myself */
sprintf(sqlquery,
"SELECT id, conninfo, witness "
"SELECT id, conninfo, type "
" FROM %s.repl_nodes "
" WHERE cluster = '%s' "
" ORDER BY priority, id "
@@ -896,6 +897,7 @@ do_failover(void)
{
nodes[i].node_id = atoi(PQgetvalue(res, i, 0));
strncpy(nodes[i].conninfo_str, PQgetvalue(res, i, 1), MAXLEN);
// ZZZ witness
nodes[i].is_witness = (strcmp(PQgetvalue(res, i, 2), "t") == 0) ? true : false;
/*