diff --git a/dbutils.c b/dbutils.c index 0bb16796..e97a30cb 100644 --- a/dbutils.c +++ b/dbutils.c @@ -145,11 +145,13 @@ is_witness(PGconn *conn, char *cluster, int node_id) int result = 0; char sqlquery[QUERY_STR_LEN]; + // ZZZ witness sqlquery_snprintf(sqlquery, - "SELECT witness " + "SELECT TRUE " " FROM %s.repl_nodes " " WHERE cluster = '%s' " - " AND id = %d ", + " AND id = %d " + " AND type = 'witness' ", get_repmgr_schema_quoted(conn), cluster, node_id); @@ -224,6 +226,50 @@ is_pgup(PGconn *conn, int timeout) return true; } +/* + * Return the id of the active primary node, or -1 if no + * record available. + * + * This reports the value stored in the database only and + * does not verify whether the node is actually available + */ +int +get_primary_node_id(PGconn *conn, char *cluster) +{ + char sqlquery[QUERY_STR_LEN]; + PGresult *res; + int retval; + + sqlquery_snprintf(sqlquery, + "SELECT id " + " FROM %s.repl_nodes " + " WHERE cluster = '%s' " + " AND type = 'primary' " + " AND active IS TRUE ", + get_repmgr_schema_quoted(conn), + cluster); + + res = PQexec(conn, sqlquery); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_err(_("get_primary_node_id(): query failed\n%s\n"), + PQerrorMessage(conn)); + retval = -1; + } + else if (PQntuples(res) == 0) + { + log_warning(_("get_primary_node_id(): no active primary found\n")); + retval = -1; + } + else + { + retval = atoi(PQgetvalue(res, 0, 0)); + } + PQclear(res); + + return retval; +} + /* * Return the server version number for the connection provided @@ -393,8 +439,8 @@ get_pg_setting(PGconn *conn, const char *setting, char *output) PGconn * -get_upstream_connection(PGconn *standby_conn, char *cluster, - int *upstream_node_id, char *upstream_conninfo_out) +get_upstream_connection(PGconn *standby_conn, char *cluster, int node_id, + int *upstream_node_id_ptr, char *upstream_conninfo_out) { PGconn *upstream_conn = NULL; PGresult *res; @@ -409,23 +455,17 @@ get_upstream_connection(PGconn *standby_conn, char *cluster, if (upstream_conninfo_out != NULL) upstream_conninfo = upstream_conninfo_out; - /* hacky query */ sqlquery_snprintf(sqlquery, - "WITH i AS ( " - " WITH p AS ( " - " SELECT repmgr_get_primary_conninfo() AS conninfo " - " )" - " SELECT p.conninfo, " - " unnest(regexp_matches(conninfo,'application_name=(\\S+)')) AS name " - " FROM p " - ") " - " SELECT rn.conninfo, i.name, rn.id " - " FROM i " - "INNER JOIN %s.repl_nodes rn " - " ON rn.name = i.name " - " WHERE cluster = '%s' ", + " SELECT un.conninfo, un.name, un.id " + " FROM %s.repl_nodes un " + "INNER JOIN %s.repl_nodes n " + " ON (un.id = n.upstream_node_id AND un.cluster = n.cluster)" + " WHERE n.cluster = '%s' " + " AND n.id = %i ", get_repmgr_schema_quoted(standby_conn), - cluster); + get_repmgr_schema_quoted(standby_conn), + cluster, + node_id); log_debug("%s", sqlquery); @@ -439,20 +479,19 @@ get_upstream_connection(PGconn *standby_conn, char *cluster, return NULL; } - // ZZZ check for empty result - // maybe modify function to return NULL - strncpy(upstream_conninfo, PQgetvalue(res, 0, 0), MAXCONNINFO); - *upstream_node_id = atoi(PQgetvalue(res, 0, 1)); + + if(upstream_node_id_ptr != NULL) + *upstream_node_id_ptr = atoi(PQgetvalue(res, 0, 1)); PQclear(res); - log_debug("conninfo is: '%s'", upstream_conninfo); + log_debug("conninfo is: '%s'\n", upstream_conninfo); upstream_conn = establish_db_connection(upstream_conninfo, false); if (PQstatus(upstream_conn) != CONNECTION_OK) { - log_err(_("Unable to connect to upstream server: %s\n"), + log_err(_("Unable to connect to upstream node: %s\n"), PQerrorMessage(upstream_conn)); return NULL; } @@ -494,7 +533,7 @@ get_master_connection(PGconn *standby_conn, char *cluster, "SELECT id, conninfo " " FROM %s.repl_nodes " " WHERE cluster = '%s' " - " AND NOT witness ", + " AND type != 'witness' ", get_repmgr_schema_quoted(standby_conn), cluster); diff --git a/dbutils.h b/dbutils.h index fb572ae8..6aad1081 100644 --- a/dbutils.h +++ b/dbutils.h @@ -31,6 +31,7 @@ bool check_cluster_schema(PGconn *conn); int is_standby(PGconn *conn); int is_witness(PGconn *conn,char *cluster, int node_id); bool is_pgup(PGconn *conn, int timeout); +int get_primary_node_id(PGconn *conn, char *cluster); int get_server_version(PGconn *conn, char *server_version); bool get_cluster_size(PGconn *conn, char *size); bool get_pg_setting(PGconn *conn, const char *setting, char *output); @@ -42,7 +43,8 @@ int guc_set_typed(PGconn *conn, const char *parameter, const char *op, PGconn *get_upstream_connection(PGconn *standby_conn, char *cluster, - int *upstream_node_id, + int node_id, + int *upstream_node_id_ptr, char *upstream_conninfo_out); PGconn *get_master_connection(PGconn *standby_conn, char *cluster, int *master_id, char *master_conninfo_out); diff --git a/repmgr.c b/repmgr.c index 2a2a145b..c1161d38 100644 --- a/repmgr.c +++ b/repmgr.c @@ -65,7 +65,7 @@ static bool write_recovery_file_line(FILE *recovery_file, char *recovery_file_pa static int check_server_version(PGconn *conn, char *server_type, bool exit_on_error, char *server_version_string); static bool check_upstream_config(PGconn *conn, bool exit_on_error); -static bool create_node_record(PGconn *conn, char *action, int node, int upstream_node, char *cluster_name, char *node_name, char *conninfo, int priority, bool witness); +static bool create_node_record(PGconn *conn, char *action, int node, char *type, int upstream_node, char *cluster_name, char *node_name, char *conninfo, int priority); static void do_master_register(void); static void do_standby_register(void); @@ -439,7 +439,7 @@ do_cluster_show(void) conn = establish_db_connection(options.conninfo, true); sqlquery_snprintf(sqlquery, - "SELECT conninfo, witness " + "SELECT conninfo, type " " FROM %s.repl_nodes ", get_repmgr_schema_quoted(conn)); res = PQexec(conn, sqlquery); @@ -457,10 +457,11 @@ do_cluster_show(void) printf("Role | Connection String \n"); for (i = 0; i < PQntuples(res); i++) { + // ZZZ witness conn = establish_db_connection(PQgetvalue(res, i, 0), false); if (PQstatus(conn) != CONNECTION_OK) strcpy(node_role, " FAILED"); - else if (strcmp(PQgetvalue(res, i, 1), "t") == 0) + else if (strcmp(PQgetvalue(res, i, 1), "witness") == 0) strcpy(node_role, " witness"); else if (is_standby(conn)) strcpy(node_role, " standby"); @@ -632,12 +633,12 @@ do_master_register(void) node_record_created = create_node_record(conn, "master register", options.node, + "primary", NO_UPSTREAM_NODE, options.cluster_name, options.node_name, options.conninfo, - options.priority, - false); + options.priority); PQfinish(conn); @@ -754,12 +755,12 @@ do_standby_register(void) node_record_created = create_node_record(master_conn, "standby register", options.node, + "standby", options.upstream_node, options.cluster_name, options.node_name, options.conninfo, - options.priority, - false); + options.priority); PQfinish(master_conn); PQfinish(conn); @@ -1557,12 +1558,12 @@ do_witness_create(void) node_record_created = create_node_record(masterconn, "witness create", options.node, + "witness", NO_UPSTREAM_NODE, options.cluster_name, options.node_name, options.conninfo, - options.priority, - true); + options.priority); if(node_record_created == false) { @@ -2044,12 +2045,13 @@ create_schema(PGconn *conn) sqlquery_snprintf(sqlquery, "CREATE TABLE %s.repl_nodes ( " " id INTEGER PRIMARY KEY, " + " type TEXT NOT NULL CHECK (type IN('primary','standby','witness')), " " upstream_node_id INTEGER NULL REFERENCES %s.repl_nodes (id), " " cluster TEXT NOT NULL, " " name TEXT NOT NULL, " " conninfo TEXT NOT NULL, " " priority INTEGER NOT NULL, " - " witness BOOLEAN NOT NULL DEFAULT FALSE) ", + " active BOOLEAN NOT NULL DEFAULT TRUE) ", get_repmgr_schema_quoted(conn), get_repmgr_schema_quoted(conn)); @@ -2212,7 +2214,7 @@ copy_configuration(PGconn *masterconn, PGconn *witnessconn) } sqlquery_snprintf(sqlquery, - "SELECT id, name, conninfo, priority, witness FROM %s.repl_nodes", + "SELECT id, type, upstream_node_id, name, conninfo, priority FROM %s.repl_nodes", get_repmgr_schema_quoted(masterconn)); res = PQexec(masterconn, sqlquery); if (PQresultStatus(res) != PGRES_TUPLES_OK) @@ -2232,16 +2234,18 @@ copy_configuration(PGconn *masterconn, PGconn *witnessconn) node_record_created = create_node_record(witnessconn, "copy_configuration", atoi(PQgetvalue(res, i, 0)), - NO_UPSTREAM_NODE, - options.cluster_name, PQgetvalue(res, i, 1), - PQgetvalue(res, i, 2), - atoi(PQgetvalue(res, i, 3)), - strcmp(witness, "t") == 0 ? true : false); - + strlen(PQgetvalue(res, i, 2)) + ? atoi(PQgetvalue(res, i, 2)) + : NO_UPSTREAM_NODE, + options.cluster_name, + PQgetvalue(res, i, 3), + PQgetvalue(res, i, 4), + atoi(PQgetvalue(res, i, 5))); if (node_record_created == false) { + // ZZZ fix error message? fprintf(stderr, "Unable to create node record for witness: %s\n", PQerrorMessage(witnessconn)); return false; @@ -2484,34 +2488,46 @@ do_check_upstream_config(void) static bool -create_node_record(PGconn *conn, char *action, int node, int upstream_node, char *cluster_name, char *node_name, char *conninfo, int priority, bool witness) +create_node_record(PGconn *conn, char *action, int node, char *type, int upstream_node, char *cluster_name, char *node_name, char *conninfo, int priority) { char sqlquery[QUERY_STR_LEN]; char upstream_node_id[QUERY_STR_LEN]; PGresult *res; - if(upstream_node != NO_UPSTREAM_NODE) + if(upstream_node == NO_UPSTREAM_NODE) { - sqlquery_snprintf(upstream_node_id, "%i", upstream_node); + /* + * No explicit upstream node id provided for standby - attempt to + * get primary node id + */ + if(strcmp(type, "standby") == 0) + { + int primary_node_id = get_primary_node_id(conn, cluster_name); + sqlquery_snprintf(upstream_node_id, "%i", primary_node_id); + } + else + { + sqlquery_snprintf(upstream_node_id, "%s", "NULL"); + } } else { - sqlquery_snprintf(upstream_node_id, "%s", "NULL"); + sqlquery_snprintf(upstream_node_id, "%i", upstream_node); } sqlquery_snprintf(sqlquery, "INSERT INTO %s.repl_nodes " - " (id, upstream_node_id, cluster, " - " name, conninfo, priority, witness) " - "VALUES (%d, %s, '%s', '%s', '%s', %d, %s) ", + " (id, type, upstream_node_id, cluster, " + " name, conninfo, priority) " + "VALUES (%i, '%s', %s, '%s', '%s', '%s', %i) ", get_repmgr_schema_quoted(conn), node, + type, upstream_node_id, cluster_name, node_name, conninfo, - priority, - witness == true ? "TRUE" : "FALSE"); + priority); if(action != NULL) { diff --git a/repmgrd.c b/repmgrd.c index 4f42d86f..b01f1dcd 100644 --- a/repmgrd.c +++ b/repmgrd.c @@ -432,7 +432,7 @@ main(int argc, char **argv) else if (my_local_mode == STANDBY_MODE) { standby_monitor(); - log_debug(_("returned from standby_monitor()\n")); + log_debug(_("returned from standby_monitor()\n")); // ZZZ } sleep(local_options.monitor_interval_secs); @@ -635,12 +635,13 @@ standby_monitor(void) upstream_conn = get_upstream_connection(my_local_conn, local_options.cluster_name, + local_options.node, &upstream_node_id, NULL); /* - * Check if the master is still available, if after 5 minutes of retries - * we cannot reconnect, try to get a new master. + * Check if the upstream node is still available, if after 5 minutes of retries + * we cannot reconnect, try to get a new upstream node. */ check_connection(upstream_conn, "master"); /* this take up to * local_options.reconnect_atte @@ -831,7 +832,7 @@ standby_monitor(void) PQerrorMessage(primary_conn)); } - +// ZZZ witness static void do_failover(void) { @@ -865,7 +866,7 @@ do_failover(void) /* get a list of standby nodes, including myself */ sprintf(sqlquery, - "SELECT id, conninfo, witness " + "SELECT id, conninfo, type " " FROM %s.repl_nodes " " WHERE cluster = '%s' " " ORDER BY priority, id " @@ -896,6 +897,7 @@ do_failover(void) { nodes[i].node_id = atoi(PQgetvalue(res, i, 0)); strncpy(nodes[i].conninfo_str, PQgetvalue(res, i, 1), MAXLEN); + // ZZZ witness nodes[i].is_witness = (strcmp(PQgetvalue(res, i, 2), "t") == 0) ? true : false; /*