Some infrastructure for supporting cascading replication

Does not fully work yet.
This commit is contained in:
Ian Barwick
2015-01-15 15:37:09 +09:00
parent d141d2a8aa
commit 2ae27521a3
3 changed files with 96 additions and 15 deletions

View File

@@ -356,6 +356,75 @@ get_pg_setting(PGconn *conn, const char *setting, char *output)
}
PGconn *
get_upstream_connection(PGconn *standby_conn, char *cluster,
int *upstream_node_id, char *upstream_conninfo_out)
{
PGconn *upstream_conn = NULL;
PGresult *res;
char sqlquery[QUERY_STR_LEN];
char upstream_conninfo_stack[MAXCONNINFO];
char *upstream_conninfo = &*upstream_conninfo_stack;
/*
* If the caller wanted to get a copy of the connection info string, sub
* out the local stack pointer for the pointer passed by the caller.
*/
if (upstream_conninfo_out != NULL)
upstream_conninfo = upstream_conninfo_out;
/* hacky query */
sqlquery_snprintf(sqlquery,
"WITH i AS ( "
" WITH p AS ( "
" SELECT repmgr_get_primary_conninfo() AS conninfo "
" )"
" SELECT p.conninfo, "
" unnest(regexp_matches(conninfo,'application_name=(\\S+)')) AS name "
" FROM p "
") "
" SELECT rn.conninfo, i.name, rn.id "
" FROM i "
"INNER JOIN %s.repl_nodes rn "
" ON rn.name = i.name "
" WHERE cluster = '%s' ",
get_repmgr_schema_quoted(standby_conn),
cluster);
log_debug("%s", sqlquery);
res = PQexec(standby_conn, sqlquery);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
log_err(_("Unable to get conninfo for upstream server: %s\n"),
PQerrorMessage(standby_conn));
PQclear(res);
return NULL;
}
// ZZZ check for empty result
// maybe modify function to return NULL
strncpy(upstream_conninfo, PQgetvalue(res, 0, 0), MAXCONNINFO);
*upstream_node_id = atoi(PQgetvalue(res, 0, 1));
PQclear(res);
log_debug("conninfo is: '%s'", upstream_conninfo);
upstream_conn = establish_db_connection(upstream_conninfo, false);
if (PQstatus(upstream_conn) != CONNECTION_OK)
{
log_err(_("Unable to connect to upstream server: %s\n"),
PQerrorMessage(upstream_conn));
return NULL;
}
return upstream_conn;
}
/*
* get a connection to master by reading repl_nodes, creating a connection
* to each node (one at a time) and finding if it is a master or a standby
@@ -364,6 +433,8 @@ get_pg_setting(PGconn *conn, const char *setting, char *output)
* point to allocated memory of MAXCONNINFO in length, and the master server
* connection string is placed there.
*/
// ZZZ value placed in `master_id` used by callers in repmgrd
PGconn *
get_master_connection(PGconn *standby_conn, char *cluster,
int *master_id, char *master_conninfo_out)
@@ -377,14 +448,8 @@ get_master_connection(PGconn *standby_conn, char *cluster,
int i;
/*
* If the caller wanted to get a copy of the connection info string, sub
* out the local stack pointer for the pointer passed by the caller.
*/
if (master_conninfo_out != NULL)
master_conninfo = master_conninfo_out;
// ZZZ below old stuff
/* find all nodes belonging to this cluster */
log_info(_("finding node list for cluster '%s'\n"),
cluster);

View File

@@ -40,6 +40,9 @@ int guc_set_typed(PGconn *conn, const char *parameter, const char *op,
const char *value, const char *datatype);
PGconn *get_upstream_connection(PGconn *standby_conn, char *cluster,
int *upstream_node_id,
char *upstream_conninfo_out);
PGconn *get_master_connection(PGconn *standby_conn, char *cluster,
int *master_id, char *master_conninfo_out);

View File

@@ -384,12 +384,15 @@ main(int argc, char **argv)
case WITNESS_MODE:
case STANDBY_MODE:
/* I need the id of the primary as well as a connection to it */
/* We need the node id of the upstream server as well as a connection to it */
log_info(_("%s Connecting to primary for cluster '%s'\n"),
progname, local_options.cluster_name);
primary_conn = get_master_connection(my_local_conn,
local_options.cluster_name,
&primary_options.node, NULL);
if (primary_conn == NULL)
{
terminate(ERR_BAD_CONFIG);
@@ -398,7 +401,7 @@ main(int argc, char **argv)
check_cluster_configuration(my_local_conn);
check_node_configuration();
if (reload_config(config_file, &local_options))
if (0 && reload_config(config_file, &local_options))
{
PQfinish(my_local_conn);
my_local_conn = establish_db_connection(local_options.conninfo, true);
@@ -627,11 +630,19 @@ standby_monitor(void)
ret;
bool did_retry = false;
PGconn *upstream_conn;
int upstream_node_id;
upstream_conn = get_upstream_connection(my_local_conn,
local_options.cluster_name,
&upstream_node_id, NULL);
/*
* Check if the master is still available, if after 5 minutes of retries
* we cannot reconnect, try to get a new master.
*/
check_connection(primary_conn, "master"); /* this take up to
check_connection(upstream_conn, "master"); /* this take up to
* local_options.reconnect_atte
* mpts *
* local_options.reconnect_intv
@@ -643,10 +654,10 @@ standby_monitor(void)
terminate(1);
}
if (PQstatus(primary_conn) != CONNECTION_OK)
if (PQstatus(upstream_conn) != CONNECTION_OK)
{
PQfinish(primary_conn);
primary_conn = NULL;
PQfinish(upstream_conn);
upstream_conn = NULL;
if (local_options.failover == MANUAL_FAILOVER)
{
@@ -695,12 +706,14 @@ standby_monitor(void)
}
}
PQfinish(upstream_conn);
/* Check if we still are a standby, we could have been promoted */
do
{
log_debug("standby_monitor() - checking if still standby\n");
log_debug("standby_monitor() - checking if still standby\n"); // ZZZ
ret = is_standby(my_local_conn);
log_debug("ret is %i", ret); // ZZZ
switch (ret)
{
case 0: