Consolidate replication connection code

In a few places, replication connections are generated from the
parameters used by existing connections. This has resulted in a
number of similar blocks of code which do more-or-less the same
thing almost but not quite identically. In two cases, the code
omitted to set "dbname=replication", which can cause problems
in some contexts.

These code blocks have now been consolidated into standardized
functions.

This also resolves the issue addressed by GitHub #619.
This commit is contained in:
Ian Barwick
2020-03-05 15:12:03 +09:00
parent 10304a1a3b
commit 9de31428f1
6 changed files with 59 additions and 77 deletions

View File

@@ -43,6 +43,8 @@ static PGconn *_establish_db_connection(const char *conninfo,
const bool log_notice,
const bool verbose_only);
static PGconn * _establish_replication_connection_from_params(PGconn *conn, const char *conninfo, const char *repluser);
static PGconn *_get_primary_connection(PGconn *standby_conn, int *primary_id, char *primary_conninfo_out, bool quiet);
static bool _set_config(PGconn *conn, const char *config_param, const char *sqlquery);
@@ -306,6 +308,50 @@ establish_db_connection_by_params(t_conninfo_param_list *param_list,
}
/*
* Given an existing active connection and the name of a replication
* user, extract the connection parameters from that connection and
* attempt to return a replication connection.
*/
PGconn *
establish_replication_connection_from_conn(PGconn *conn, const char *repluser)
{
return _establish_replication_connection_from_params(conn, NULL, repluser);
}
PGconn *
establish_replication_connection_from_conninfo(const char *conninfo, const char *repluser)
{
return _establish_replication_connection_from_params(NULL, conninfo, repluser);
}
static PGconn *
_establish_replication_connection_from_params(PGconn *conn, const char *conninfo, const char *repluser)
{
t_conninfo_param_list repl_conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
PGconn *repl_conn = NULL;
initialize_conninfo_params(&repl_conninfo, false);
if (conn != NULL)
conn_to_param_list(conn, &repl_conninfo);
else if (conninfo != NULL)
parse_conninfo_string(conninfo, &repl_conninfo, NULL, false);
/* Set the provided replication user */
param_set(&repl_conninfo, "user", repluser);
param_set(&repl_conninfo, "replication", "1");
param_set(&repl_conninfo, "dbname", "replication");
repl_conn = establish_db_connection_by_params(&repl_conninfo, false);
free_conninfo_params(&repl_conninfo);
return repl_conn;
}
PGconn *
get_primary_connection(PGconn *conn,
int *primary_id, char *primary_conninfo_out)

View File

@@ -388,6 +388,9 @@ PGconn *establish_db_connection(const char *conninfo,
PGconn *establish_db_connection_quiet(const char *conninfo);
PGconn *establish_db_connection_by_params(t_conninfo_param_list *param_list,
const bool exit_on_error);
PGconn *establish_replication_connection_from_conn(PGconn *conn, const char *repluser);
PGconn *establish_replication_connection_from_conninfo(const char *conninfo, const char *repluser);
PGconn *establish_primary_db_connection(PGconn *conn,
const bool exit_on_error);
PGconn *get_primary_connection(PGconn *standby_conn, int *primary_id, char *primary_conninfo_out);

View File

@@ -918,7 +918,6 @@ do_node_check_replication_connection(void)
PGconn *repl_conn = NULL;
t_node_info node_record = T_NODE_INFO_INITIALIZER;
RecordStatus record_status = RECORD_NOT_FOUND;
t_conninfo_param_list remote_conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
PQExpBufferData output;
@@ -948,17 +947,8 @@ do_node_check_replication_connection(void)
return;
}
initialize_conninfo_params(&remote_conninfo, false);
parse_conninfo_string(node_record.conninfo, &remote_conninfo, NULL, false);
if (strcmp(param_get(&remote_conninfo, "user"), node_record.repluser) != 0)
{
param_set(&remote_conninfo, "user", node_record.repluser);
param_set(&remote_conninfo, "dbname", "replication");
}
param_set(&remote_conninfo, "replication", "1");
repl_conn = establish_db_connection_by_params(&remote_conninfo, false);
repl_conn = establish_replication_connection_from_conninfo(node_record.conninfo,
node_record.repluser);
if (PQstatus(repl_conn) != CONNECTION_OK)
{

View File

@@ -2870,10 +2870,8 @@ do_standby_follow(void)
/* XXX check this is not current upstream anyway */
/* check if we can attach to the follow target */
{
t_conninfo_param_list local_repl_conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
PGconn *local_repl_conn = NULL;
t_system_identification local_identification = T_SYSTEM_IDENTIFICATION_INITIALIZER;
@@ -2887,18 +2885,8 @@ do_standby_follow(void)
* TODO: from 9.6, query "pg_stat_wal_receiver" via the existing local connection
*/
initialize_conninfo_params(&local_repl_conninfo, false);
conn_to_param_list(local_conn, &local_repl_conninfo);
/* Set the replication user from the node record */
param_set(&local_repl_conninfo, "user", local_node_record.repluser);
param_set(&local_repl_conninfo, "replication", "1");
local_repl_conn = establish_db_connection_by_params(&local_repl_conninfo, false);
free_conninfo_params(&local_repl_conninfo);
local_repl_conn = establish_replication_connection_from_conn(local_conn,
local_node_record.repluser);
if (PQstatus(local_repl_conn) != CONNECTION_OK)
{
log_error(_("unable to establish a replication connection to the local node"));

View File

@@ -4014,7 +4014,6 @@ bool
check_node_can_attach(TimeLineID local_tli, XLogRecPtr local_xlogpos, PGconn *follow_target_conn, t_node_info *follow_target_node_record, bool is_rejoin)
{
uint64 local_system_identifier = UNKNOWN_SYSTEM_IDENTIFIER;
t_conninfo_param_list follow_target_repl_conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
PGconn *follow_target_repl_conn = NULL;
t_system_identification follow_target_identification = T_SYSTEM_IDENTIFICATION_INITIALIZER;
bool success = true;
@@ -4022,21 +4021,8 @@ check_node_can_attach(TimeLineID local_tli, XLogRecPtr local_xlogpos, PGconn *fo
const char *action = is_rejoin == true ? "rejoin" : "follow";
/* check replication connection */
initialize_conninfo_params(&follow_target_repl_conninfo, false);
conn_to_param_list(follow_target_conn, &follow_target_repl_conninfo);
if (strcmp(param_get(&follow_target_repl_conninfo, "user"), follow_target_node_record->repluser) != 0)
{
param_set(&follow_target_repl_conninfo, "user", follow_target_node_record->repluser);
param_set(&follow_target_repl_conninfo, "dbname", "replication");
}
param_set(&follow_target_repl_conninfo, "replication", "1");
follow_target_repl_conn = establish_db_connection_by_params(&follow_target_repl_conninfo, false);
free_conninfo_params(&follow_target_repl_conninfo);
follow_target_repl_conn = establish_replication_connection_from_conn(follow_target_conn,
follow_target_node_record->repluser);
if (PQstatus(follow_target_repl_conn) != CONNECTION_OK)
{

View File

@@ -4779,11 +4779,9 @@ parse_failover_validation_command(const char *template, t_node_info *node_info,
static bool
check_node_can_follow(PGconn *local_conn, XLogRecPtr local_xlogpos, PGconn *follow_target_conn, t_node_info *follow_target_node_info)
{
t_conninfo_param_list local_repl_conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
PGconn *local_repl_conn = NULL;
t_system_identification local_identification = T_SYSTEM_IDENTIFICATION_INITIALIZER;
t_conninfo_param_list follow_target_repl_conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
PGconn *follow_target_repl_conn = NULL;
t_system_identification follow_target_identification = T_SYSTEM_IDENTIFICATION_INITIALIZER;
TimeLineHistoryEntry *follow_target_history = NULL;
@@ -4791,23 +4789,7 @@ check_node_can_follow(PGconn *local_conn, XLogRecPtr local_xlogpos, PGconn *foll
bool can_follow = true;
bool success;
/* Check local replication connection - we want to execute IDENTIFY_SYSTEM
* to get the current timeline ID, which might not yet be written to
* pg_control.
*
* TODO: from 9.6, query "pg_stat_wal_receiver" via the existing local connection
*/
initialize_conninfo_params(&local_repl_conninfo, false);
conn_to_param_list(local_conn, &local_repl_conninfo);
/* Set the replication user from the node record */
param_set(&local_repl_conninfo, "user", local_node_info.repluser);
param_set(&local_repl_conninfo, "replication", "1");
local_repl_conn = establish_db_connection_by_params(&local_repl_conninfo, false);
free_conninfo_params(&local_repl_conninfo);
local_repl_conn = establish_replication_connection_from_conn(local_conn, local_node_info.repluser);
if (PQstatus(local_repl_conn) != CONNECTION_OK)
{
@@ -4816,6 +4798,7 @@ check_node_can_follow(PGconn *local_conn, XLogRecPtr local_xlogpos, PGconn *foll
return false;
}
success = identify_system(local_repl_conn, &local_identification);
PQfinish(local_repl_conn);
@@ -4827,22 +4810,8 @@ check_node_can_follow(PGconn *local_conn, XLogRecPtr local_xlogpos, PGconn *foll
}
/* check replication connection */
initialize_conninfo_params(&follow_target_repl_conninfo, false);
conn_to_param_list(follow_target_conn, &follow_target_repl_conninfo);
if (strcmp(param_get(&follow_target_repl_conninfo, "user"), follow_target_node_info->repluser) != 0)
{
param_set(&follow_target_repl_conninfo, "user", follow_target_node_info->repluser);
param_set(&follow_target_repl_conninfo, "dbname", "replication");
}
param_set(&follow_target_repl_conninfo, "replication", "1");
follow_target_repl_conn = establish_db_connection_by_params(&follow_target_repl_conninfo, false);
free_conninfo_params(&follow_target_repl_conninfo);
follow_target_repl_conn = establish_replication_connection_from_conn(follow_target_conn,
follow_target_node_info->repluser);
if (PQstatus(follow_target_repl_conn) != CONNECTION_OK)
{
log_error(_("unable to establish a replication connection to the follow target node"));