repmgr standby follow: main code

This commit is contained in:
Ian Barwick
2017-06-16 21:38:53 +09:00
parent 7b976ef2df
commit 030fdc046b
3 changed files with 335 additions and 7 deletions

View File

@@ -1545,6 +1545,49 @@ update_node_record_set_master(PGconn *conn, int this_node_id)
}
/*
* Update node record following change of status
* (e.g. inactive primary converted to standby)
*/
bool
update_node_record_status(PGconn *conn, int this_node_id, char *type, int upstream_node_id, bool active)
{
PQExpBufferData query;
PGresult *res;
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
" UPDATE repmgr.nodes "
" SET type = '%s', "
" upstream_node_id = %i, "
" active = %s "
" WHERE node_id = %i ",
type,
upstream_node_id,
active ? "TRUE" : "FALSE",
this_node_id);
log_verbose(LOG_DEBUG, "update_node_record_status():\n %s", query.data);
res = PQexec(conn, query.data);
termPQExpBuffer(&query);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_error(_("unable to update node record:\n %s"),
PQerrorMessage(conn));
PQclear(res);
return false;
}
PQclear(res);
return true;
}
bool
delete_node_record(PGconn *conn, int node)
@@ -1906,7 +1949,7 @@ create_replication_slot(PGconn *conn, char *slot_name, int server_version_num, P
slot_name);
}
log_debug(_("create_replication_slot(): Creating slot '%s' on upstream"), slot_name);
log_debug(_("create_replication_slot(): creating slot '%s' on upstream"), slot_name);
log_verbose(LOG_DEBUG, "create_replication_slot():\n%s", query.data);
res = PQexec(conn, query.data);
@@ -1915,7 +1958,7 @@ create_replication_slot(PGconn *conn, char *slot_name, int server_version_num, P
if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
{
appendPQExpBuffer(error_msg,
_("unable to create slot '%s' on the master node: %s\n"),
_("unable to create slot '%s' on the upstream node: %s\n"),
slot_name,
PQerrorMessage(conn));
PQclear(res);

View File

@@ -208,6 +208,7 @@ bool update_node_record(PGconn *conn, char *repmgr_action, t_node_info *node_in
bool delete_node_record(PGconn *conn, int node);
bool update_node_record_set_master(PGconn *conn, int this_node_id);
bool update_node_record_status(PGconn *conn, int this_node_id, char *type, int upstream_node_id, bool active);
/* event record functions */
bool create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details);

View File

@@ -73,6 +73,8 @@ static void copy_configuration_files(void);
static int run_basebackup(void);
static int run_file_backup(void);
static void drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name);
static void tablespace_data_append(TablespaceDataList *list, const char *name, const char *oid, const char *location);
static void get_barman_property(char *dst, char *name, char *local_repmgr_directory);
@@ -1225,7 +1227,20 @@ void
do_standby_follow(void)
{
PGconn *local_conn;
t_node_info local_node_record = T_NODE_INFO_INITIALIZER;
int original_upstream_node_id = UNKNOWN_NODE_ID;
PGconn *master_conn = NULL;
int master_id = UNKNOWN_NODE_ID;
t_node_info master_node_record = T_NODE_INFO_INITIALIZER;
char data_dir[MAXPGPATH];
t_conninfo_param_list recovery_conninfo;
char *errmsg = NULL;
int query_result;
char restart_command[MAXLEN];
int r;
log_verbose(LOG_DEBUG, "do_standby_follow()");
@@ -1237,9 +1252,6 @@ do_standby_follow(void)
if (runtime_options.host_param_provided == false)
{
bool success;
PGconn *master_conn = NULL;
char master_conninfo[MAXLEN];
int master_id = UNKNOWN_NODE_ID;
int timer;
local_conn = establish_db_connection(config_file_options.conninfo, true);
@@ -1268,7 +1280,7 @@ do_standby_follow(void)
{
master_conn = get_master_connection(local_conn,
&master_id,
(char *) &master_conninfo);
NULL);
if (PQstatus(master_conn) == CONNECTION_OK || runtime_options.wait == false)
{
@@ -1283,8 +1295,250 @@ do_standby_follow(void)
exit(ERR_BAD_CONFIG);
}
puts("OK");
check_master_standby_version_match(local_conn, master_conn);
PQfinish(local_conn);
}
/* local data directory and primary server info explictly provided -
* attempt to connect to that
*
* XXX --wait option won't be effective here
*/
else
{
master_conn = establish_db_connection_by_params(
(const char**)source_conninfo.keywords,
(const char**)source_conninfo.values,
true);
master_id = get_master_node_id(master_conn);
strncpy(data_dir, runtime_options.data_dir, MAXPGPATH);
}
if (get_recovery_type(master_conn) != RECTYPE_MASTER)
{
log_error(_("the node to follow is not a master"));
// XXX log detail
PQfinish(master_conn);
exit(ERR_BAD_CONFIG);
}
/*
* If 9.4 or later, and replication slots in use, we'll need to create a
* slot on the new master
*/
if (config_file_options.use_replication_slots)
{
int server_version_num = get_server_version(master_conn, NULL);
PQExpBufferData event_details;
initPQExpBuffer(&event_details);
if (create_replication_slot(master_conn, repmgr_slot_name, server_version_num, &event_details) == false)
{
log_error("%s", event_details.data);
create_event_record(master_conn,
&config_file_options,
config_file_options.node_id,
"standby_follow",
false,
event_details.data);
PQfinish(master_conn);
exit(ERR_DB_QUERY);
}
termPQExpBuffer(&event_details);
}
get_node_record(master_conn, master_id, &master_node_record);
/* Initialise connection parameters to write as `primary_conninfo` */
initialize_conninfo_params(&recovery_conninfo, false);
/* We ignore any application_name set in the master's conninfo */
parse_conninfo_string(master_node_record.conninfo, &recovery_conninfo, errmsg, true);
/* Set the default application name to this node's name */
param_set(&recovery_conninfo, "application_name", config_file_options.node_name);
/* Set the replication user from the master node record */
param_set(&recovery_conninfo, "user", master_node_record.repluser);
/*
* Fetch our node record so we can write application_name, if set,
* and to get the upstream node ID, which we'll need to know if
* replication slots are in use and we want to delete the old slot.
*/
query_result = get_node_record(master_conn,
config_file_options.node_id,
&local_node_record);
if (query_result != 1)
{
/* this shouldn't happen, but if it does we'll plough on regardless */
log_warning(_("unable to retrieve record for node %i"),
config_file_options.node_id);
}
else
{
t_conninfo_param_list local_node_conninfo;
bool parse_success;
initialize_conninfo_params(&local_node_conninfo, false);
parse_success = parse_conninfo_string(local_node_record.conninfo, &local_node_conninfo, errmsg, false);
if (parse_success == false)
{
/* this shouldn't happen, but if it does we'll plough on regardless */
log_warning(_("unable to parse conninfo string \"%s\":\n %s"),
local_node_record.conninfo, errmsg);
}
else
{
char *application_name = param_get(&local_node_conninfo, "application_name");
if (application_name != NULL && strlen(application_name))
param_set(&recovery_conninfo, "application_name", application_name);
}
/*
* store the original upstream node id so we can delete the replication slot,
* if exists
*/
if (local_node_record.upstream_node_id != UNKNOWN_NODE_ID)
{
original_upstream_node_id = local_node_record.upstream_node_id;
}
else
{
original_upstream_node_id = master_id;
}
}
{
char *list = param_list_to_string(&recovery_conninfo);
printf("recovery: %s\n", list);
}
log_info(_("changing standby's master to node %i"), master_id);
if (!create_recovery_file(data_dir, &recovery_conninfo))
{
PQfinish(master_conn);
exit(ERR_BAD_CONFIG);
}
/* restart the service */
// XXX here check if service is running!! if not, start
// ensure that problem with pg_ctl output is caught here
if (*config_file_options.service_restart_command)
{
maxlen_snprintf(restart_command, "%s", config_file_options.service_restart_command);
}
else
{
maxlen_snprintf(restart_command,
"%s %s -w -D %s -m fast restart",
make_pg_path("pg_ctl"),
config_file_options.pg_ctl_options,
data_dir);
}
log_notice(_("restarting server using '%s'"),
restart_command);
r = system(restart_command);
if (r != 0)
{
log_error(_("unable to restart server"));
PQfinish(master_conn);
exit(ERR_NO_RESTART);
}
/*
* If replication slots are in use, and an inactive one for this node
* exists on the former upstream, drop it.
*
* XXX check if former upstream is current master?
*/
if (config_file_options.use_replication_slots && runtime_options.host_param_provided == false && original_upstream_node_id != UNKNOWN_NODE_ID)
{
t_node_info upstream_node_record = T_NODE_INFO_INITIALIZER;
int upstream_query_result;
log_verbose(LOG_INFO, "attempting to remove replication slot from old upstream node %i",
original_upstream_node_id);
/* XXX should we poll for server restart? */
local_conn = establish_db_connection(config_file_options.conninfo, true);
upstream_query_result = get_node_record(local_conn,
original_upstream_node_id,
&upstream_node_record);
PQfinish(local_conn);
if (upstream_query_result != 1)
{
log_warning(_("unable to retrieve node record for old upstream node %i"),
original_upstream_node_id);
}
else
{
PGconn *old_upstream_conn = establish_db_connection_quiet(upstream_node_record.conninfo);
if (PQstatus(old_upstream_conn) != CONNECTION_OK)
{
log_info(_("unable to connect to old upstream node %i to remove replication slot"),
original_upstream_node_id);
log_hint(_("if reusing this node, you should manually remove any inactive replication slots"));
}
else
{
drop_replication_slot_if_exists(old_upstream_conn,
original_upstream_node_id,
local_node_record.slot_name);
}
}
}
/*
* It's possible this node was an inactive primary - update the
* relevant fields to ensure it's marked as an active standby
*/
if (update_node_record_status(master_conn,
config_file_options.node_id,
"standby",
master_id,
true) == false)
{
log_error(_("unable to update upstream node"));
PQfinish(master_conn);
exit(ERR_BAD_CONFIG);
}
log_notice(_("STANDBY FOLLOW successful"));
create_event_record(master_conn,
&config_file_options,
config_file_options.node_id,
"standby_follow",
true,
NULL);
PQfinish(master_conn);
return;
}
@@ -2748,3 +3002,33 @@ check_recovery_type(PGconn *conn)
}
}
}
static void
drop_replication_slot_if_exists(PGconn *conn, int node_id, char *slot_name)
{
t_replication_slot slot_info;
int query_res;
query_res = get_slot_record(conn,slot_name, &slot_info);
if (query_res)
{
if (slot_info.active == false)
{
if (drop_replication_slot(conn, slot_name) == true)
{
log_notice(_("replication slot \"%s\" deleted on node %i"), slot_name, node_id);
}
else
{
log_error(_("unable to delete replication slot \"%s\" on node %i"), slot_name, node_id);
}
}
/* if active replication slot exists, call Houston as we have a problem */
else
{
log_warning(_("replication slot \"%s\" is still active on node %i"), slot_name, node_id);
}
}
}