From a82d37e48a9e358124c89769b581ef4abca272b7 Mon Sep 17 00:00:00 2001 From: Ian Barwick Date: Fri, 16 Jan 2015 10:27:44 +0900 Subject: [PATCH] Improve node metadata and upstream connecting mechanism To handle cascaded replication we're going to have to keep track of each node's upstream node. Also enumerate the node type ("primary", "standby" or "witness") and mark if active. --- dbutils.c | 91 +++++++++++++++++++++++++++++++++++++++---------------- dbutils.h | 4 ++- repmgr.c | 68 +++++++++++++++++++++++++---------------- repmgrd.c | 12 +++++--- 4 files changed, 117 insertions(+), 58 deletions(-) diff --git a/dbutils.c b/dbutils.c index 0bb16796..e97a30cb 100644 --- a/dbutils.c +++ b/dbutils.c @@ -145,11 +145,13 @@ is_witness(PGconn *conn, char *cluster, int node_id) int result = 0; char sqlquery[QUERY_STR_LEN]; + // ZZZ witness sqlquery_snprintf(sqlquery, - "SELECT witness " + "SELECT TRUE " " FROM %s.repl_nodes " " WHERE cluster = '%s' " - " AND id = %d ", + " AND id = %d " + " AND type = 'witness' ", get_repmgr_schema_quoted(conn), cluster, node_id); @@ -224,6 +226,50 @@ is_pgup(PGconn *conn, int timeout) return true; } +/* + * Return the id of the active primary node, or -1 if no + * record available. + * + * This reports the value stored in the database only and + * does not verify whether the node is actually available + */ +int +get_primary_node_id(PGconn *conn, char *cluster) +{ + char sqlquery[QUERY_STR_LEN]; + PGresult *res; + int retval; + + sqlquery_snprintf(sqlquery, + "SELECT id " + " FROM %s.repl_nodes " + " WHERE cluster = '%s' " + " AND type = 'primary' " + " AND active IS TRUE ", + get_repmgr_schema_quoted(conn), + cluster); + + res = PQexec(conn, sqlquery); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_err(_("get_primary_node_id(): query failed\n%s\n"), + PQerrorMessage(conn)); + retval = -1; + } + else if (PQntuples(res) == 0) + { + log_warning(_("get_primary_node_id(): no active primary found\n")); + retval = -1; + } + else + { + retval = atoi(PQgetvalue(res, 0, 0)); + } + PQclear(res); + + return retval; +} + /* * Return the server version number for the connection provided @@ -393,8 +439,8 @@ get_pg_setting(PGconn *conn, const char *setting, char *output) PGconn * -get_upstream_connection(PGconn *standby_conn, char *cluster, - int *upstream_node_id, char *upstream_conninfo_out) +get_upstream_connection(PGconn *standby_conn, char *cluster, int node_id, + int *upstream_node_id_ptr, char *upstream_conninfo_out) { PGconn *upstream_conn = NULL; PGresult *res; @@ -409,23 +455,17 @@ get_upstream_connection(PGconn *standby_conn, char *cluster, if (upstream_conninfo_out != NULL) upstream_conninfo = upstream_conninfo_out; - /* hacky query */ sqlquery_snprintf(sqlquery, - "WITH i AS ( " - " WITH p AS ( " - " SELECT repmgr_get_primary_conninfo() AS conninfo " - " )" - " SELECT p.conninfo, " - " unnest(regexp_matches(conninfo,'application_name=(\\S+)')) AS name " - " FROM p " - ") " - " SELECT rn.conninfo, i.name, rn.id " - " FROM i " - "INNER JOIN %s.repl_nodes rn " - " ON rn.name = i.name " - " WHERE cluster = '%s' ", + " SELECT un.conninfo, un.name, un.id " + " FROM %s.repl_nodes un " + "INNER JOIN %s.repl_nodes n " + " ON (un.id = n.upstream_node_id AND un.cluster = n.cluster)" + " WHERE n.cluster = '%s' " + " AND n.id = %i ", get_repmgr_schema_quoted(standby_conn), - cluster); + get_repmgr_schema_quoted(standby_conn), + cluster, + node_id); log_debug("%s", sqlquery); @@ -439,20 +479,19 @@ get_upstream_connection(PGconn *standby_conn, char *cluster, return NULL; } - // ZZZ check for empty result - // maybe modify function to return NULL - strncpy(upstream_conninfo, PQgetvalue(res, 0, 0), MAXCONNINFO); - *upstream_node_id = atoi(PQgetvalue(res, 0, 1)); + + if(upstream_node_id_ptr != NULL) + *upstream_node_id_ptr = atoi(PQgetvalue(res, 0, 1)); PQclear(res); - log_debug("conninfo is: '%s'", upstream_conninfo); + log_debug("conninfo is: '%s'\n", upstream_conninfo); upstream_conn = establish_db_connection(upstream_conninfo, false); if (PQstatus(upstream_conn) != CONNECTION_OK) { - log_err(_("Unable to connect to upstream server: %s\n"), + log_err(_("Unable to connect to upstream node: %s\n"), PQerrorMessage(upstream_conn)); return NULL; } @@ -494,7 +533,7 @@ get_master_connection(PGconn *standby_conn, char *cluster, "SELECT id, conninfo " " FROM %s.repl_nodes " " WHERE cluster = '%s' " - " AND NOT witness ", + " AND type != 'witness' ", get_repmgr_schema_quoted(standby_conn), cluster); diff --git a/dbutils.h b/dbutils.h index fb572ae8..6aad1081 100644 --- a/dbutils.h +++ b/dbutils.h @@ -31,6 +31,7 @@ bool check_cluster_schema(PGconn *conn); int is_standby(PGconn *conn); int is_witness(PGconn *conn,char *cluster, int node_id); bool is_pgup(PGconn *conn, int timeout); +int get_primary_node_id(PGconn *conn, char *cluster); int get_server_version(PGconn *conn, char *server_version); bool get_cluster_size(PGconn *conn, char *size); bool get_pg_setting(PGconn *conn, const char *setting, char *output); @@ -42,7 +43,8 @@ int guc_set_typed(PGconn *conn, const char *parameter, const char *op, PGconn *get_upstream_connection(PGconn *standby_conn, char *cluster, - int *upstream_node_id, + int node_id, + int *upstream_node_id_ptr, char *upstream_conninfo_out); PGconn *get_master_connection(PGconn *standby_conn, char *cluster, int *master_id, char *master_conninfo_out); diff --git a/repmgr.c b/repmgr.c index 2a2a145b..c1161d38 100644 --- a/repmgr.c +++ b/repmgr.c @@ -65,7 +65,7 @@ static bool write_recovery_file_line(FILE *recovery_file, char *recovery_file_pa static int check_server_version(PGconn *conn, char *server_type, bool exit_on_error, char *server_version_string); static bool check_upstream_config(PGconn *conn, bool exit_on_error); -static bool create_node_record(PGconn *conn, char *action, int node, int upstream_node, char *cluster_name, char *node_name, char *conninfo, int priority, bool witness); +static bool create_node_record(PGconn *conn, char *action, int node, char *type, int upstream_node, char *cluster_name, char *node_name, char *conninfo, int priority); static void do_master_register(void); static void do_standby_register(void); @@ -439,7 +439,7 @@ do_cluster_show(void) conn = establish_db_connection(options.conninfo, true); sqlquery_snprintf(sqlquery, - "SELECT conninfo, witness " + "SELECT conninfo, type " " FROM %s.repl_nodes ", get_repmgr_schema_quoted(conn)); res = PQexec(conn, sqlquery); @@ -457,10 +457,11 @@ do_cluster_show(void) printf("Role | Connection String \n"); for (i = 0; i < PQntuples(res); i++) { + // ZZZ witness conn = establish_db_connection(PQgetvalue(res, i, 0), false); if (PQstatus(conn) != CONNECTION_OK) strcpy(node_role, " FAILED"); - else if (strcmp(PQgetvalue(res, i, 1), "t") == 0) + else if (strcmp(PQgetvalue(res, i, 1), "witness") == 0) strcpy(node_role, " witness"); else if (is_standby(conn)) strcpy(node_role, " standby"); @@ -632,12 +633,12 @@ do_master_register(void) node_record_created = create_node_record(conn, "master register", options.node, + "primary", NO_UPSTREAM_NODE, options.cluster_name, options.node_name, options.conninfo, - options.priority, - false); + options.priority); PQfinish(conn); @@ -754,12 +755,12 @@ do_standby_register(void) node_record_created = create_node_record(master_conn, "standby register", options.node, + "standby", options.upstream_node, options.cluster_name, options.node_name, options.conninfo, - options.priority, - false); + options.priority); PQfinish(master_conn); PQfinish(conn); @@ -1557,12 +1558,12 @@ do_witness_create(void) node_record_created = create_node_record(masterconn, "witness create", options.node, + "witness", NO_UPSTREAM_NODE, options.cluster_name, options.node_name, options.conninfo, - options.priority, - true); + options.priority); if(node_record_created == false) { @@ -2044,12 +2045,13 @@ create_schema(PGconn *conn) sqlquery_snprintf(sqlquery, "CREATE TABLE %s.repl_nodes ( " " id INTEGER PRIMARY KEY, " + " type TEXT NOT NULL CHECK (type IN('primary','standby','witness')), " " upstream_node_id INTEGER NULL REFERENCES %s.repl_nodes (id), " " cluster TEXT NOT NULL, " " name TEXT NOT NULL, " " conninfo TEXT NOT NULL, " " priority INTEGER NOT NULL, " - " witness BOOLEAN NOT NULL DEFAULT FALSE) ", + " active BOOLEAN NOT NULL DEFAULT TRUE) ", get_repmgr_schema_quoted(conn), get_repmgr_schema_quoted(conn)); @@ -2212,7 +2214,7 @@ copy_configuration(PGconn *masterconn, PGconn *witnessconn) } sqlquery_snprintf(sqlquery, - "SELECT id, name, conninfo, priority, witness FROM %s.repl_nodes", + "SELECT id, type, upstream_node_id, name, conninfo, priority FROM %s.repl_nodes", get_repmgr_schema_quoted(masterconn)); res = PQexec(masterconn, sqlquery); if (PQresultStatus(res) != PGRES_TUPLES_OK) @@ -2232,16 +2234,18 @@ copy_configuration(PGconn *masterconn, PGconn *witnessconn) node_record_created = create_node_record(witnessconn, "copy_configuration", atoi(PQgetvalue(res, i, 0)), - NO_UPSTREAM_NODE, - options.cluster_name, PQgetvalue(res, i, 1), - PQgetvalue(res, i, 2), - atoi(PQgetvalue(res, i, 3)), - strcmp(witness, "t") == 0 ? true : false); - + strlen(PQgetvalue(res, i, 2)) + ? atoi(PQgetvalue(res, i, 2)) + : NO_UPSTREAM_NODE, + options.cluster_name, + PQgetvalue(res, i, 3), + PQgetvalue(res, i, 4), + atoi(PQgetvalue(res, i, 5))); if (node_record_created == false) { + // ZZZ fix error message? fprintf(stderr, "Unable to create node record for witness: %s\n", PQerrorMessage(witnessconn)); return false; @@ -2484,34 +2488,46 @@ do_check_upstream_config(void) static bool -create_node_record(PGconn *conn, char *action, int node, int upstream_node, char *cluster_name, char *node_name, char *conninfo, int priority, bool witness) +create_node_record(PGconn *conn, char *action, int node, char *type, int upstream_node, char *cluster_name, char *node_name, char *conninfo, int priority) { char sqlquery[QUERY_STR_LEN]; char upstream_node_id[QUERY_STR_LEN]; PGresult *res; - if(upstream_node != NO_UPSTREAM_NODE) + if(upstream_node == NO_UPSTREAM_NODE) { - sqlquery_snprintf(upstream_node_id, "%i", upstream_node); + /* + * No explicit upstream node id provided for standby - attempt to + * get primary node id + */ + if(strcmp(type, "standby") == 0) + { + int primary_node_id = get_primary_node_id(conn, cluster_name); + sqlquery_snprintf(upstream_node_id, "%i", primary_node_id); + } + else + { + sqlquery_snprintf(upstream_node_id, "%s", "NULL"); + } } else { - sqlquery_snprintf(upstream_node_id, "%s", "NULL"); + sqlquery_snprintf(upstream_node_id, "%i", upstream_node); } sqlquery_snprintf(sqlquery, "INSERT INTO %s.repl_nodes " - " (id, upstream_node_id, cluster, " - " name, conninfo, priority, witness) " - "VALUES (%d, %s, '%s', '%s', '%s', %d, %s) ", + " (id, type, upstream_node_id, cluster, " + " name, conninfo, priority) " + "VALUES (%i, '%s', %s, '%s', '%s', '%s', %i) ", get_repmgr_schema_quoted(conn), node, + type, upstream_node_id, cluster_name, node_name, conninfo, - priority, - witness == true ? "TRUE" : "FALSE"); + priority); if(action != NULL) { diff --git a/repmgrd.c b/repmgrd.c index 4f42d86f..b01f1dcd 100644 --- a/repmgrd.c +++ b/repmgrd.c @@ -432,7 +432,7 @@ main(int argc, char **argv) else if (my_local_mode == STANDBY_MODE) { standby_monitor(); - log_debug(_("returned from standby_monitor()\n")); + log_debug(_("returned from standby_monitor()\n")); // ZZZ } sleep(local_options.monitor_interval_secs); @@ -635,12 +635,13 @@ standby_monitor(void) upstream_conn = get_upstream_connection(my_local_conn, local_options.cluster_name, + local_options.node, &upstream_node_id, NULL); /* - * Check if the master is still available, if after 5 minutes of retries - * we cannot reconnect, try to get a new master. + * Check if the upstream node is still available, if after 5 minutes of retries + * we cannot reconnect, try to get a new upstream node. */ check_connection(upstream_conn, "master"); /* this take up to * local_options.reconnect_atte @@ -831,7 +832,7 @@ standby_monitor(void) PQerrorMessage(primary_conn)); } - +// ZZZ witness static void do_failover(void) { @@ -865,7 +866,7 @@ do_failover(void) /* get a list of standby nodes, including myself */ sprintf(sqlquery, - "SELECT id, conninfo, witness " + "SELECT id, conninfo, type " " FROM %s.repl_nodes " " WHERE cluster = '%s' " " ORDER BY priority, id " @@ -896,6 +897,7 @@ do_failover(void) { nodes[i].node_id = atoi(PQgetvalue(res, i, 0)); strncpy(nodes[i].conninfo_str, PQgetvalue(res, i, 1), MAXLEN); + // ZZZ witness nodes[i].is_witness = (strcmp(PQgetvalue(res, i, 2), "t") == 0) ? true : false; /*