From 2ae27521a3559c9867a660da1adc9f0c73f68796 Mon Sep 17 00:00:00 2001 From: Ian Barwick Date: Thu, 15 Jan 2015 15:37:09 +0900 Subject: [PATCH] Some infrastructure for supporting cascading replication Does not fully work yet. --- dbutils.c | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++----- dbutils.h | 3 +++ repmgrd.c | 29 ++++++++++++++------ 3 files changed, 96 insertions(+), 15 deletions(-) diff --git a/dbutils.c b/dbutils.c index 69cf48bc..7b9e8306 100644 --- a/dbutils.c +++ b/dbutils.c @@ -356,6 +356,75 @@ get_pg_setting(PGconn *conn, const char *setting, char *output) } +PGconn * +get_upstream_connection(PGconn *standby_conn, char *cluster, + int *upstream_node_id, char *upstream_conninfo_out) +{ + PGconn *upstream_conn = NULL; + PGresult *res; + char sqlquery[QUERY_STR_LEN]; + char upstream_conninfo_stack[MAXCONNINFO]; + char *upstream_conninfo = &*upstream_conninfo_stack; + + /* + * If the caller wanted to get a copy of the connection info string, sub + * out the local stack pointer for the pointer passed by the caller. + */ + if (upstream_conninfo_out != NULL) + upstream_conninfo = upstream_conninfo_out; + + /* hacky query */ + sqlquery_snprintf(sqlquery, + "WITH i AS ( " + " WITH p AS ( " + " SELECT repmgr_get_primary_conninfo() AS conninfo " + " )" + " SELECT p.conninfo, " + " unnest(regexp_matches(conninfo,'application_name=(\\S+)')) AS name " + " FROM p " + ") " + " SELECT rn.conninfo, i.name, rn.id " + " FROM i " + "INNER JOIN %s.repl_nodes rn " + " ON rn.name = i.name " + " WHERE cluster = '%s' ", + get_repmgr_schema_quoted(standby_conn), + cluster); + + log_debug("%s", sqlquery); + + res = PQexec(standby_conn, sqlquery); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_err(_("Unable to get conninfo for upstream server: %s\n"), + PQerrorMessage(standby_conn)); + PQclear(res); + return NULL; + } + + // ZZZ check for empty result + // maybe modify function to return NULL + + strncpy(upstream_conninfo, PQgetvalue(res, 0, 0), MAXCONNINFO); + *upstream_node_id = atoi(PQgetvalue(res, 0, 1)); + + PQclear(res); + + log_debug("conninfo is: '%s'", upstream_conninfo); + upstream_conn = establish_db_connection(upstream_conninfo, false); + + if (PQstatus(upstream_conn) != CONNECTION_OK) + { + log_err(_("Unable to connect to upstream server: %s\n"), + PQerrorMessage(upstream_conn)); + return NULL; + } + + return upstream_conn; +} + + /* * get a connection to master by reading repl_nodes, creating a connection * to each node (one at a time) and finding if it is a master or a standby @@ -364,6 +433,8 @@ get_pg_setting(PGconn *conn, const char *setting, char *output) * point to allocated memory of MAXCONNINFO in length, and the master server * connection string is placed there. */ + +// ZZZ value placed in `master_id` used by callers in repmgrd PGconn * get_master_connection(PGconn *standby_conn, char *cluster, int *master_id, char *master_conninfo_out) @@ -377,14 +448,8 @@ get_master_connection(PGconn *standby_conn, char *cluster, int i; - /* - * If the caller wanted to get a copy of the connection info string, sub - * out the local stack pointer for the pointer passed by the caller. - */ - if (master_conninfo_out != NULL) - master_conninfo = master_conninfo_out; - + // ZZZ below old stuff /* find all nodes belonging to this cluster */ log_info(_("finding node list for cluster '%s'\n"), cluster); diff --git a/dbutils.h b/dbutils.h index 0157edc3..e8399774 100644 --- a/dbutils.h +++ b/dbutils.h @@ -40,6 +40,9 @@ int guc_set_typed(PGconn *conn, const char *parameter, const char *op, const char *value, const char *datatype); +PGconn *get_upstream_connection(PGconn *standby_conn, char *cluster, + int *upstream_node_id, + char *upstream_conninfo_out); PGconn *get_master_connection(PGconn *standby_conn, char *cluster, int *master_id, char *master_conninfo_out); diff --git a/repmgrd.c b/repmgrd.c index ee68fc7e..4f42d86f 100644 --- a/repmgrd.c +++ b/repmgrd.c @@ -384,12 +384,15 @@ main(int argc, char **argv) case WITNESS_MODE: case STANDBY_MODE: - /* I need the id of the primary as well as a connection to it */ + + /* We need the node id of the upstream server as well as a connection to it */ log_info(_("%s Connecting to primary for cluster '%s'\n"), progname, local_options.cluster_name); + primary_conn = get_master_connection(my_local_conn, local_options.cluster_name, &primary_options.node, NULL); + if (primary_conn == NULL) { terminate(ERR_BAD_CONFIG); @@ -398,7 +401,7 @@ main(int argc, char **argv) check_cluster_configuration(my_local_conn); check_node_configuration(); - if (reload_config(config_file, &local_options)) + if (0 && reload_config(config_file, &local_options)) { PQfinish(my_local_conn); my_local_conn = establish_db_connection(local_options.conninfo, true); @@ -627,11 +630,19 @@ standby_monitor(void) ret; bool did_retry = false; + PGconn *upstream_conn; + int upstream_node_id; + + upstream_conn = get_upstream_connection(my_local_conn, + local_options.cluster_name, + &upstream_node_id, NULL); + + /* * Check if the master is still available, if after 5 minutes of retries * we cannot reconnect, try to get a new master. */ - check_connection(primary_conn, "master"); /* this take up to + check_connection(upstream_conn, "master"); /* this take up to * local_options.reconnect_atte * mpts * * local_options.reconnect_intv @@ -643,10 +654,10 @@ standby_monitor(void) terminate(1); } - if (PQstatus(primary_conn) != CONNECTION_OK) + if (PQstatus(upstream_conn) != CONNECTION_OK) { - PQfinish(primary_conn); - primary_conn = NULL; + PQfinish(upstream_conn); + upstream_conn = NULL; if (local_options.failover == MANUAL_FAILOVER) { @@ -695,12 +706,14 @@ standby_monitor(void) } } + PQfinish(upstream_conn); + /* Check if we still are a standby, we could have been promoted */ do { - log_debug("standby_monitor() - checking if still standby\n"); + log_debug("standby_monitor() - checking if still standby\n"); // ZZZ ret = is_standby(my_local_conn); - + log_debug("ret is %i", ret); // ZZZ switch (ret) { case 0: