Add "standby switchover" mode

Perform a switchover by:
 - stopping current primary node
 - promoting this standby node to primary
 - forcing previous primary node to follow this node

Caveats:
 - repmgrd must not be running, otherwise it may
   attempt a failover
   (TODO: find some way of notifying repmgrd of planned
    activity like this)
 - currently only set up for two-node operation; any other
   standbys will probably become downstream cascaded standbys
   of the old primary once it's restarted
 - as we're executing repmgr remotely (on the old primary),
   we'll need the location of its configuration file; this
   can be provided explicitly with -C/--remote-config-file,
   otherwise repmgr will look in default locations on the
   remote server
 - this does not yet support "rewinding" stopped nodes
   which will be unable to catch up with the primary

TODO:
 - update help, docs
 - make connection test timeouts/intervals configurable
This commit is contained in:
Ian Barwick
2015-11-25 16:25:15 +09:00
parent f6d1db5edb
commit 120688013e
6 changed files with 1079 additions and 184 deletions

View File

@@ -31,7 +31,7 @@ static void exit_with_errors(ErrorList *config_errors);
const static char *_progname = '\0';
static char config_file_path[MAXPGPATH];
static bool config_file_provided = false;
static bool config_file_found = false;
bool config_file_found = false;
void
@@ -149,7 +149,7 @@ load_config(const char *config_file, bool verbose, t_configuration_options *opti
if (verbose == true)
{
log_notice(_("looking for configuration file in %s"), sysconf_etc_path);
log_notice(_("looking for configuration file in %s\n"), sysconf_etc_path);
}
snprintf(config_file_path, MAXPGPATH, "%s/%s", sysconf_etc_path, CONFIG_FILE_NAME);

306
dbutils.c
View File

@@ -445,7 +445,6 @@ get_cluster_size(PGconn *conn, char *size)
}
bool
get_pg_setting(PGconn *conn, const char *setting, char *output)
{
@@ -488,7 +487,7 @@ get_pg_setting(PGconn *conn, const char *setting, char *output)
if (success == true)
{
log_debug(_("get_pg_setting(): returned value is \"%s\"\n"), output);
log_verbose(LOG_DEBUG, _("get_pg_setting(): returned value is \"%s\"\n"), output);
}
PQclear(res);
@@ -497,6 +496,48 @@ get_pg_setting(PGconn *conn, const char *setting, char *output)
}
/*
* get_conninfo_value()
*
* Extract the value represented by 'keyword' in 'conninfo' and copy
* it to the 'output' buffer.
*
* Returns true on success, or false on failure (conninfo string could
* not be parsed, or provided keyword not found).
*/
bool
get_conninfo_value(const char *conninfo, const char *keyword, char *output)
{
PQconninfoOption *conninfo_options;
PQconninfoOption *conninfo_option;
conninfo_options = PQconninfoParse(conninfo, NULL);
if (conninfo_options == false)
{
log_err(_("Unable to parse provided conninfo string \"%s\""), conninfo);
return false;
}
for (conninfo_option = conninfo_options; conninfo_option->keyword != NULL; conninfo_option++)
{
if (strcmp(conninfo_option->keyword, keyword) == 0)
{
if (conninfo_option->val != NULL && conninfo_option->val[0] != '\0')
{
strncpy(output, conninfo_option->val, MAXLEN);
break;
}
}
}
PQconninfoFree(conninfo_options);
return true;
}
/*
* get_upstream_connection()
*
@@ -598,6 +639,13 @@ get_master_connection(PGconn *standby_conn, char *cluster,
int i,
node_id;
/*
* If the caller wanted to get a copy of the connection info string, sub
* out the local stack pointer for the pointer passed by the caller.
*/
if (master_conninfo_out != NULL)
remote_conninfo = master_conninfo_out;
if (master_id != NULL)
{
*master_id = NODE_NOT_FOUND;
@@ -819,8 +867,10 @@ get_repmgr_schema_quoted(PGconn *conn)
bool
create_replication_slot(PGconn *conn, char *slot_name)
{
char sqlquery[QUERY_STR_LEN];
PGresult *res;
char sqlquery[QUERY_STR_LEN];
int query_res;
PGresult *res;
t_replication_slot slot_info;
/*
* Check whether slot exists already; if it exists and is active, that
@@ -828,40 +878,25 @@ create_replication_slot(PGconn *conn, char *slot_name)
* if not we can reuse it as-is
*/
sqlquery_snprintf(sqlquery,
"SELECT active, slot_type "
" FROM pg_replication_slots "
" WHERE slot_name = '%s' ",
slot_name);
query_res = get_slot_record(conn, slot_name, &slot_info);
log_verbose(LOG_DEBUG, "create_replication_slot():\n%s\n", sqlquery);
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
if (query_res)
{
log_err(_("unable to query pg_replication_slots: %s\n"),
PQerrorMessage(conn));
PQclear(res);
return false;
}
if (PQntuples(res))
{
if (strcmp(PQgetvalue(res, 0, 1), "physical") != 0)
if (strcmp(slot_info.slot_type, "physical") != 0)
{
log_err(_("Slot '%s' exists and is not a physical slot\n"),
slot_name);
PQclear(res);
return false;
}
if (strcmp(PQgetvalue(res, 0, 0), "f") == 0)
if (slot_info.active == false)
{
PQclear(res);
log_debug("Replication slot '%s' exists but is inactive; reusing\n",
slot_name);
return true;
}
PQclear(res);
log_err(_("Slot '%s' already exists as an active slot\n"),
slot_name);
return false;
@@ -889,6 +924,73 @@ create_replication_slot(PGconn *conn, char *slot_name)
}
int
get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record)
{
char sqlquery[QUERY_STR_LEN];
PGresult *res;
sqlquery_snprintf(sqlquery,
"SELECT slot_name, slot_type, active "
" FROM pg_replication_slots "
" WHERE slot_name = '%s' ",
slot_name);
log_verbose(LOG_DEBUG, "get_slot_record():\n%s\n", sqlquery);
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
{
log_err(_("unable to query pg_replication_slots: %s\n"),
PQerrorMessage(conn));
PQclear(res);
return -1;
}
if (!PQntuples(res))
{
return 0;
}
strncpy(record->slot_name, PQgetvalue(res, 0, 0), MAXLEN);
strncpy(record->slot_type, PQgetvalue(res, 0, 1), MAXLEN);
record->active = (strcmp(PQgetvalue(res, 0, 2), "t") == 0)
? true
: false;
PQclear(res);
return 1;
}
bool
drop_replication_slot(PGconn *conn, char *slot_name)
{
char sqlquery[QUERY_STR_LEN];
PGresult *res;
sqlquery_snprintf(sqlquery,
"SELECT pg_drop_replication_slot('%s')",
slot_name);
log_verbose(LOG_DEBUG, "drop_replication_slot():\n%s\n", sqlquery);
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
{
log_err(_("unable to drop replication slot \"%s\":\n %s\n"),
slot_name,
PQerrorMessage(conn));
PQclear(res);
return false;
}
log_verbose(LOG_DEBUG, "replication slot \"%s\" successfully dropped\n",
slot_name);
return true;
}
bool
start_backup(PGconn *conn, char *first_wal_segment, bool fast_checkpoint)
{
@@ -1400,6 +1502,51 @@ create_event_record(PGconn *conn, t_configuration_options *options, int node_id,
return success;
}
/*
* Update node record following change of status
* (e.g. inactive primary converted to standby)
*/
bool
update_node_record_status(PGconn *conn, char *cluster_name, int this_node_id, char *type, int upstream_node_id, bool active)
{
PGresult *res;
char sqlquery[QUERY_STR_LEN];
sqlquery_snprintf(sqlquery,
" UPDATE %s.repl_nodes "
" SET type = '%s', "
" upstream_node_id = %i, "
" active = %s "
" WHERE cluster = '%s' "
" AND id = %i ",
get_repmgr_schema_quoted(conn),
type,
upstream_node_id,
active ? "TRUE" : "FALSE",
cluster_name,
this_node_id);
log_verbose(LOG_DEBUG, "update_node_record_status():\n%s\n", sqlquery);
res = PQexec(conn, sqlquery);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_err(_("Unable to update node record: %s\n"),
PQerrorMessage(conn));
PQclear(res);
return false;
}
PQclear(res);
return true;
}
bool
update_node_record_set_upstream(PGconn *conn, char *cluster_name, int this_node_id, int new_upstream_node_id)
{
@@ -1437,21 +1584,106 @@ update_node_record_set_upstream(PGconn *conn, char *cluster_name, int this_node_
}
PGresult *
get_node_record(PGconn *conn, char *cluster, int node_id)
int
get_node_record(PGconn *conn, char *cluster, int node_id, t_node_info *node_info)
{
char sqlquery[QUERY_STR_LEN];
PGresult *res;
int ntuples;
sprintf(sqlquery,
"SELECT id, upstream_node_id, conninfo, type, slot_name, active "
" FROM %s.repl_nodes "
" WHERE cluster = '%s' "
" AND id = %i",
get_repmgr_schema_quoted(conn),
cluster,
node_id);
sqlquery_snprintf(
sqlquery,
"SELECT id, type, upstream_node_id, name, conninfo, slot_name, priority, active"
" FROM %s.repl_nodes "
" WHERE cluster = '%s' "
" AND id = %i",
get_repmgr_schema_quoted(conn),
cluster,
node_id);
log_verbose(LOG_DEBUG, "get_node_record():\n%s\n", sqlquery);
return PQexec(conn, sqlquery);
res = PQexec(conn, sqlquery);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
return -1;
}
ntuples = PQntuples(res);
if (ntuples == 0)
{
log_verbose(LOG_DEBUG, "get_node_record(): no record found for node %i\n", node_id);
return 0;
}
node_info->node_id = atoi(PQgetvalue(res, 0, 0));
node_info->type = parse_node_type(PQgetvalue(res, 0, 1));
node_info->upstream_node_id = atoi(PQgetvalue(res, 0, 2));
strncpy(node_info->name, PQgetvalue(res, 0, 3), MAXLEN);
strncpy(node_info->conninfo_str, PQgetvalue(res, 0, 4), MAXLEN);
strncpy(node_info->slot_name, PQgetvalue(res, 0, 5), MAXLEN);
node_info->priority = atoi(PQgetvalue(res, 0, 6));
node_info->active = (strcmp(PQgetvalue(res, 0, 7), "t") == 0)
? true
: false;
PQclear(res);
return ntuples;
}
int
get_node_replication_state(PGconn *conn, char *node_name, char *output)
{
char sqlquery[QUERY_STR_LEN];
PGresult * res;
sqlquery_snprintf(
sqlquery,
" SELECT state "
" FROM pg_catalog.pg_stat_replication"
" WHERE application_name = '%s'",
node_name
);
res = PQexec(conn, sqlquery);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
PQclear(res);
return -1;
}
if (PQntuples(res) == 0)
{
PQclear(res);
return 0;
}
strncpy(output, PQgetvalue(res, 0, 0), MAXLEN);
PQclear(res);
return true;
}
t_server_type
parse_node_type(const char *type)
{
if (strcmp(type, "master") == 0)
{
return MASTER;
}
else if (strcmp(type, "standby") == 0)
{
return STANDBY;
}
else if (strcmp(type, "witness") == 0)
{
return WITNESS;
}
return UNKNOWN;
}

View File

@@ -52,6 +52,18 @@ typedef struct s_node_info
} t_node_info;
/*
* Struct to store replication slot information
*/
typedef struct s_replication_slot
{
char slot_name[MAXLEN];
char slot_type[MAXLEN];
bool active;
} t_replication_slot;
#define T_NODE_INFO_INITIALIZER { \
NODE_NOT_FOUND, \
NO_UPSTREAM_NODE, \
@@ -86,7 +98,7 @@ int guc_set(PGconn *conn, const char *parameter, const char *op,
const char *value);
int guc_set_typed(PGconn *conn, const char *parameter, const char *op,
const char *value, const char *datatype);
bool get_conninfo_value(const char *conninfo, const char *keyword, char *output);
PGconn *get_upstream_connection(PGconn *standby_conn, char *cluster,
int node_id,
int *upstream_node_id_ptr,
@@ -99,15 +111,19 @@ bool cancel_query(PGconn *conn, int timeout);
char *get_repmgr_schema(void);
char *get_repmgr_schema_quoted(PGconn *conn);
bool create_replication_slot(PGconn *conn, char *slot_name);
int get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record);
bool drop_replication_slot(PGconn *conn, char *slot_name);
bool start_backup(PGconn *conn, char *first_wal_segment, bool fast_checkpoint);
bool stop_backup(PGconn *conn, char *last_wal_segment);
bool set_config_bool(PGconn *conn, const char *config_param, bool state);
bool copy_configuration(PGconn *masterconn, PGconn *witnessconn, char *cluster_name);
bool create_node_record(PGconn *conn, char *action, int node, char *type, int upstream_node, char *cluster_name, char *node_name, char *conninfo, int priority, char *slot_name);
bool delete_node_record(PGconn *conn, int node, char *action);
bool create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details);
int get_node_record(PGconn *conn, char *cluster, int node_id, t_node_info *node_info);
bool update_node_record_status(PGconn *conn, char *cluster_name, int this_node_id, char *type, int upstream_node_id, bool active);
bool update_node_record_set_upstream(PGconn *conn, char *cluster_name, int this_node_id, int new_upstream_node_id);
PGresult * get_node_record(PGconn *conn, char *cluster, int node_id);
bool create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details);
int get_node_replication_state(PGconn *conn, char *node_name, char *output);
t_server_type parse_node_type(const char *type);
#endif

870
repmgr.c

File diff suppressed because it is too large Load Diff

View File

@@ -73,21 +73,30 @@ typedef struct
bool rsync_only;
bool fast_checkpoint;
bool ignore_external_config_files;
char pg_ctl_mode[MAXLEN];
char masterport[MAXLEN];
char localport[MAXLEN];
/*
* configuration file parameters which can be overridden on the
* command line
*/
char loglevel[MAXLEN];
/* parameter used by STANDBY SWITCHOVER */
char remote_config_file[MAXLEN];
/* parameter used by CLUSTER CLEANUP */
int keep_history;
char pg_bindir[MAXLEN];
char recovery_min_apply_delay[MAXLEN];
/* deprecated command line option */
char localport[MAXLEN];
} t_runtime_options;
#define T_RUNTIME_OPTIONS_INITIALIZER { "", "", "", "", "", "", "", DEFAULT_WAL_KEEP_SEGMENTS, false, false, false, false, false, false, false, false, false, "", "", "", 0, "", "" }
#define T_RUNTIME_OPTIONS_INITIALIZER { "", "", "", "", "", "", "", DEFAULT_WAL_KEEP_SEGMENTS, false, false, false, false, false, false, false, false, false, "smart", "", "", "", 0, "", "", "" }
extern char repmgr_schema[MAXLEN];
extern bool config_file_found;
#endif

View File

@@ -79,7 +79,6 @@ static void do_master_failover(void);
static bool do_upstream_standby_failover(t_node_info upstream_node);
static t_node_info get_node_info(PGconn *conn, char *cluster, int node_id);
static t_server_type parse_node_type(const char *type);
static XLogRecPtr lsn_to_xlogrecptr(char *lsn, bool *format_ok);
/*
@@ -2268,13 +2267,13 @@ check_and_create_pid_file(const char *pid_file)
t_node_info
get_node_info(PGconn *conn, char *cluster, int node_id)
{
PGresult *res;
int res;
t_node_info node_info = T_NODE_INFO_INITIALIZER;
res = get_node_record(conn, cluster, node_id);
res = get_node_record(conn, cluster, node_id, &node_info);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
if (res == -1)
{
PQExpBufferData errmsg;
initPQExpBuffer(&errmsg);
@@ -2293,47 +2292,16 @@ get_node_info(PGconn *conn, char *cluster, int node_id)
false,
errmsg.data);
PQclear(res);
PQfinish(conn);
terminate(ERR_DB_QUERY);
}
if (!PQntuples(res)) {
if (res == 0)
{
log_warning(_("No record found record for node %i\n"), node_id);
PQclear(res);
node_info.node_id = NODE_NOT_FOUND;
return node_info;
}
node_info.node_id = atoi(PQgetvalue(res, 0, 0));
node_info.upstream_node_id = atoi(PQgetvalue(res, 0, 1));
strncpy(node_info.conninfo_str, PQgetvalue(res, 0, 2), MAXLEN);
node_info.type = parse_node_type(PQgetvalue(res, 0, 3));
strncpy(node_info.slot_name, PQgetvalue(res, 0, 4), MAXLEN);
node_info.active = (strcmp(PQgetvalue(res, 0, 5), "t") == 0)
? true
: false;
PQclear(res);
return node_info;
}
static t_server_type
parse_node_type(const char *type)
{
if (strcmp(type, "master") == 0)
{
return MASTER;
}
else if (strcmp(type, "standby") == 0)
{
return STANDBY;
}
else if (strcmp(type, "witness") == 0)
{
return WITNESS;
}
return UNKNOWN;
}