master register: create repmgr extension

This commit is contained in:
Ian Barwick
2017-04-21 18:38:28 +09:00
parent 001d887e8d
commit 77870d3887
4 changed files with 543 additions and 5 deletions

282
dbutils.c
View File

@@ -104,6 +104,262 @@ establish_db_connection(const char *conninfo, const bool exit_on_error)
return _establish_db_connection(conninfo, exit_on_error, false, false);
}
PGconn *
establish_db_connection_as_user(const char *conninfo,
const char *user,
const bool exit_on_error)
{
PGconn *conn = NULL;
t_conninfo_param_list conninfo_params;
bool parse_success;
char *errmsg = NULL;
initialize_conninfo_params(&conninfo_params, false);
parse_success = parse_conninfo_string(conninfo, &conninfo_params, errmsg, true);
if (parse_success == false)
{
log_error(_("unable to pass provided conninfo string:\n %s"), errmsg);
return NULL;
}
param_set(&conninfo_params, "user", user);
conn = establish_db_connection_by_params((const char**)conninfo_params.keywords,
(const char**)conninfo_params.values,
false);
return conn;
}
PGconn *
establish_db_connection_by_params(const char *keywords[], const char *values[],
const bool exit_on_error)
{
PGconn *conn;
bool replication_connection = false;
int i;
/* Connect to the database using the provided parameters */
conn = PQconnectdbParams(keywords, values, true);
/* Check to see that the backend connection was successfully made */
if ((PQstatus(conn) != CONNECTION_OK))
{
log_error(_("connection to database failed:\n %s"),
PQerrorMessage(conn));
if (exit_on_error)
{
PQfinish(conn);
exit(ERR_DB_CON);
}
}
else
{
/*
* set "synchronous_commit" to "local" in case synchronous replication is in
* use (provided this is not a replication connection)
*/
for (i = 0; keywords[i]; i++)
{
if (strcmp(keywords[i], "replication") == 0)
replication_connection = true;
}
if (replication_connection == false && set_config(conn, "synchronous_commit", "local") == false)
{
if (exit_on_error)
{
PQfinish(conn);
exit(ERR_DB_CON);
}
}
}
return conn;
}
/* =============================== */
/* conninfo manipulation functions */
/* =============================== */
void
initialize_conninfo_params(t_conninfo_param_list *param_list, bool set_defaults)
{
PQconninfoOption *defs = NULL;
PQconninfoOption *def;
int c;
defs = PQconndefaults();
param_list->size = 0;
/* Count maximum number of parameters */
for (def = defs; def->keyword; def++)
param_list->size ++;
/* Initialize our internal parameter list */
param_list->keywords = pg_malloc0(sizeof(char *) * (param_list->size + 1));
param_list->values = pg_malloc0(sizeof(char *) * (param_list->size + 1));
for (c = 0; c < param_list->size; c++)
{
param_list->keywords[c] = NULL;
param_list->values[c] = NULL;
}
if (set_defaults == true)
{
/* Pre-set any defaults */
for (def = defs; def->keyword; def++)
{
if (def->val != NULL && def->val[0] != '\0')
{
param_set(param_list, def->keyword, def->val);
}
}
}
}
void
copy_conninfo_params(t_conninfo_param_list *dest_list, t_conninfo_param_list *source_list)
{
int c;
for (c = 0; c < source_list->size && source_list->keywords[c] != NULL; c++)
{
if (source_list->values[c] != NULL && source_list->values[c][0] != '\0')
{
param_set(dest_list, source_list->keywords[c], source_list->values[c]);
}
}
}
void
param_set(t_conninfo_param_list *param_list, const char *param, const char *value)
{
int c;
int value_len = strlen(value) + 1;
/*
* Scan array to see if the parameter is already set - if not, replace it
*/
for (c = 0; c < param_list->size && param_list->keywords[c] != NULL; c++)
{
if (strcmp(param_list->keywords[c], param) == 0)
{
if (param_list->values[c] != NULL)
pfree(param_list->values[c]);
param_list->values[c] = pg_malloc0(value_len);
strncpy(param_list->values[c], value, value_len);
return;
}
}
/*
* Parameter not in array - add it and its associated value
*/
if (c < param_list->size)
{
int param_len = strlen(param) + 1;
param_list->keywords[c] = pg_malloc0(param_len);
param_list->values[c] = pg_malloc0(value_len);
strncpy(param_list->keywords[c], param, param_len);
strncpy(param_list->values[c], value, value_len);
}
/*
* It's theoretically possible a parameter couldn't be added as
* the array is full, but it's highly improbable so we won't
* handle it at the moment.
*/
}
char *
param_get(t_conninfo_param_list *param_list, const char *param)
{
int c;
for (c = 0; c < param_list->size && param_list->keywords[c] != NULL; c++)
{
if (strcmp(param_list->keywords[c], param) == 0)
{
if (param_list->values[c] != NULL && param_list->values[c][0] != '\0')
return param_list->values[c];
else
return NULL;
}
}
return NULL;
}
/*
* Parse a conninfo string into a t_conninfo_param_list
*
* See conn_to_param_list() to do the same for a PQconn
*/
bool
parse_conninfo_string(const char *conninfo_str, t_conninfo_param_list *param_list, char *errmsg, bool ignore_application_name)
{
PQconninfoOption *connOptions;
PQconninfoOption *option;
connOptions = PQconninfoParse(conninfo_str, &errmsg);
if (connOptions == NULL)
return false;
for (option = connOptions; option && option->keyword; option++)
{
/* Ignore non-set or blank parameter values*/
if ((option->val == NULL) ||
(option->val != NULL && option->val[0] == '\0'))
continue;
/* Ignore application_name */
if (ignore_application_name == true && strcmp(option->keyword, "application_name") == 0)
continue;
param_set(param_list, option->keyword, option->val);
}
return true;
}
/*
* Parse a PQconn into a t_conninfo_param_list
*
* See parse_conninfo_string() to do the same for a conninfo string
*/
void
conn_to_param_list(PGconn *conn, t_conninfo_param_list *param_list)
{
PQconninfoOption *connOptions;
PQconninfoOption *option;
connOptions = PQconninfo(conn);
for (option = connOptions; option && option->keyword; option++)
{
/* Ignore non-set or blank parameter values*/
if ((option->val == NULL) ||
(option->val != NULL && option->val[0] == '\0'))
continue;
param_set(param_list, option->keyword, option->val);
}
}
/* ========================== */
/* GUC manipulation functions */
/* ========================== */
@@ -185,3 +441,29 @@ get_server_version(PGconn *conn, char *server_version)
return atoi(PQgetvalue(res, 0, 0));
}
int
is_standby(PGconn *conn)
{
PGresult *res;
int result = 0;
char *sqlquery = "SELECT pg_catalog.pg_is_in_recovery()";
log_verbose(LOG_DEBUG, "is_standby(): %s", sqlquery);
res = PQexec(conn, sqlquery);
if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK)
{
log_error(_("unable to determine if server is in recovery: %s"),
PQerrorMessage(conn));
result = -1;
}
else if (PQntuples(res) == 1 && strcmp(PQgetvalue(res, 0, 0), "t") == 0)
{
result = 1;
}
PQclear(res);
return result;
}

View File

@@ -82,10 +82,21 @@ typedef struct s_event_info
NULL \
}
/*
* Struct to store list of conninfo keywords and values
*/
typedef struct
{
int size;
char **keywords;
char **values;
} t_conninfo_param_list;
/*
* Struct to store replication slot information
*/
typedef struct s_replication_slot
{
char slot_name[MAXLEN];
@@ -98,6 +109,22 @@ typedef struct s_replication_slot
PGconn *establish_db_connection(const char *conninfo,
const bool exit_on_error);
PGconn *establish_db_connection_as_user(const char *conninfo,
const char *user,
const bool exit_on_error);
PGconn *establish_db_connection_by_params(const char *keywords[],
const char *values[],
const bool exit_on_error);
/* conninfo manipulation functions */
void initialize_conninfo_params(t_conninfo_param_list *param_list, bool set_defaults);
void copy_conninfo_params(t_conninfo_param_list *dest_list, t_conninfo_param_list *source_list);
void conn_to_param_list(PGconn *conn, t_conninfo_param_list *param_list);
void param_set(t_conninfo_param_list *param_list, const char *param, const char *value);
char *param_get(t_conninfo_param_list *param_list, const char *param);
bool parse_conninfo_string(const char *conninfo_str, t_conninfo_param_list *param_list, char *errmsg, bool ignore_application_name);
/* GUC manipulation functions */
bool set_config(PGconn *conn, const char *config_param, const char *config_value);
@@ -105,6 +132,7 @@ bool set_config_bool(PGconn *conn, const char *config_param, bool state);
/* Server information functions */
int get_server_version(PGconn *conn, char *server_version);
int is_standby(PGconn *conn);
#endif

View File

@@ -27,6 +27,7 @@ ItemList cli_errors = { NULL, NULL };
ItemList cli_warnings = { NULL, NULL };
static bool config_file_required = true;
static char pg_bindir[MAXLEN] = "";
static char repmgr_slot_name[MAXLEN] = "";
static char *repmgr_slot_name_ptr = NULL;
@@ -56,7 +57,7 @@ main(int argc, char **argv)
logger_output_mode = OM_COMMAND_LINE;
while ((c = getopt_long(argc, argv, "?Vf:vtFb:", long_options,
while ((c = getopt_long(argc, argv, "?Vf:vtFb:S:L:", long_options,
&optindex)) != -1)
{
/*
@@ -102,6 +103,14 @@ main(int argc, char **argv)
strncpy(runtime_options.pg_bindir, optarg, MAXLEN);
break;
/* connection options */
/* ------------------ */
/* -S/--superuser */
case 'S':
strncpy(runtime_options.superuser, optarg, MAXLEN);
break;
/* logging options
* --------------- */
@@ -255,6 +264,37 @@ main(int argc, char **argv)
runtime_options.verbose,
&config_file_options,
argv[0]);
/* Some configuration file items can be overriden by command line options */
/* Command-line parameter -L/--log-level overrides any setting in config file*/
if (*runtime_options.loglevel != '\0')
{
strncpy(config_file_options.loglevel, runtime_options.loglevel, MAXLEN);
}
/*
* Initialise pg_bindir - command line parameter will override
* any setting in the configuration file
*/
if (!strlen(runtime_options.pg_bindir))
{
strncpy(runtime_options.pg_bindir, config_file_options.pg_bindir, MAXLEN);
}
/* Add trailing slash */
if (strlen(runtime_options.pg_bindir))
{
int len = strlen(runtime_options.pg_bindir);
if (runtime_options.pg_bindir[len - 1] != '/')
{
maxlen_snprintf(pg_bindir, "%s/", runtime_options.pg_bindir);
}
else
{
strncpy(pg_bindir, runtime_options.pg_bindir, MAXLEN);
}
}
/*
* Initialize the logger. We've previously requested STDERR logging only
* to ensure the repmgr command doesn't have its output diverted to a logging
@@ -395,16 +435,193 @@ do_help(void)
static void
do_master_register(void)
{
PGconn *conn;
PGconn *conn = NULL;
int ret;
log_info(_("connecting to master database..."));
// XXX if con fails, have this print offending conninfo!
conn = establish_db_connection(config_file_options.conninfo, true);
log_verbose(LOG_INFO, _("connected to server, checking its state"));
/* verify that node is running a supported server version */
check_server_version(conn, "master", true, NULL);
/* check that node is actually a master */
ret = is_standby(conn);
if (ret)
{
log_error(_(ret == 1 ? "server is in standby mode and cannot be registered as a master" :
"connection to node lost!"));
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
log_verbose(LOG_INFO, _("server is not in recovery"));
/* create the repmgr extension if it doesn't already exist */
if (!create_repmgr_extension(conn))
{
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
}
// this should be the only place where superuser rights required
static
bool create_repmgr_extension(PGconn *conn)
{
PQExpBufferData query;
PGresult *res;
char *current_user;
const char *superuser_status;
bool is_superuser;
PGconn *superuser_conn = NULL;
PGconn *schema_create_conn = NULL;
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
" SELECT ae.name, e.extname "
" FROM pg_catalog.pg_available_extensions ae "
"LEFT JOIN pg_catalog.pg_extension e "
" ON e.extname=ae.name "
" WHERE ae.name='repmgr' ");
res = PQexec(conn, query.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
log_error(_("unable to execute extension query:\n %s"),
PQerrorMessage(conn));
PQclear(res);
return false;
}
/* 1. Check if extension installed */
if (PQgetisnull(res, 0, 1) == 0)
{
/* TODO: check version */
log_info(_("extension \"repmgr\" already installed"));
return true;
}
/* 2. If not, check extension available */
if (PQgetisnull(res, 0, 0) == 1)
{
log_error(_("\"repmgr\" extension is not available"));
return false;
}
PQclear(res);
termPQExpBuffer(&query);
log_notice(_("attempting to install extension \"repmgr\""));
/* 3. Check if repmgr user is superuser, if not connect as superuser */
current_user = PQuser(conn);
superuser_status = PQparameterStatus(conn, "is_superuser");
is_superuser = (strcmp(superuser_status, "on") == 0) ? true : false;
if (is_superuser == false)
{
if (runtime_options.superuser[0] == '\0')
{
log_error(_("\"%s\" is not a superuser and no superuser name supplied"), current_user);
log_hint(_("supply a valid superuser name with -S/--superuser"));
return false;
}
superuser_conn = establish_db_connection_as_user(config_file_options.conninfo,
runtime_options.superuser,
false);
if (PQstatus(superuser_conn) != CONNECTION_OK)
{
log_error(_("unable to establish superuser connection as \"%s\""), runtime_options.superuser);
return false;
}
superuser_status = PQparameterStatus(superuser_conn, "is_superuser");
if (strcmp(superuser_status, "off") == 0)
{
log_error(_("\"%s\" is not a superuser"), runtime_options.superuser);
PQfinish(superuser_conn);
return false;
}
schema_create_conn = superuser_conn;
}
else
{
schema_create_conn = conn;
}
/* 4. Create extension */
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
"CREATE EXTENSION repmgr");
res = PQexec(schema_create_conn, query.data);
termPQExpBuffer(&query);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_error(_("unable to create \"repmgr\" extension:\n %s"),
PQerrorMessage(schema_create_conn));
log_hint(_("check that the provided user has sufficient privileges for CREATE EXTENSION"));
PQclear(res);
if (superuser_conn != 0)
PQfinish(superuser_conn);
return false;
}
PQclear(res);
/* 5. If not superuser, grant usage */
if (is_superuser == false)
{
initPQExpBuffer(&query);
appendPQExpBuffer(&query,
"GRANT ALL ON ALL TABLES IN SCHEMA repmgr TO %s",
current_user);
res = PQexec(schema_create_conn, query.data);
termPQExpBuffer(&query);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_error(_("unable to grant usage on \"repmgr\" extension to %s:\n %s"),
current_user,
PQerrorMessage(schema_create_conn));
PQclear(res);
if (superuser_conn != 0)
PQfinish(superuser_conn);
return false;
}
}
if (superuser_conn != 0)
PQfinish(superuser_conn);
log_notice(_("\"repmgr\" extension successfully installed"));
return true;
}
/**
* check_server_version()
*

View File

@@ -64,11 +64,15 @@ static struct option long_options[] =
/* general options */
{"version", no_argument, NULL, 'V'},
{"help", no_argument, NULL, OPT_HELP},
/* general configuration options */
{"config-file", required_argument, NULL, 'f'},
{"force", no_argument, NULL, 'F'},
{"pg_bindir", required_argument, NULL, 'b'},
/* connection options */
{"superuser", required_argument, NULL, 'S'},
/* logging options */
{"log-level", required_argument, NULL, 'L'},
{"log-to-file", no_argument, NULL, OPT_LOG_TO_FILE},
@@ -115,12 +119,16 @@ typedef struct
char config_file[MAXPGPATH];
bool force;
char pg_bindir[MAXLEN]; /* overrides setting in repmgr.conf */
/* logging options */
char loglevel[MAXLEN]; /* overrides setting in repmgr.conf */
bool log_to_file;
bool terse;
bool verbose;
/* connection options */
char superuser[MAXLEN];
} t_runtime_options;
@@ -128,7 +136,9 @@ typedef struct
/* general configuration options */ \
"", false, "", \
/* logging options */ \
"", false, false, false}
"", false, false, false, \
/* connection options */ \
""}
static void do_help(void);
static void do_master_register(void);
@@ -136,5 +146,6 @@ static void do_master_register(void);
static void exit_with_errors(void);
static void print_error_list(ItemList *error_list, int log_level);
static int check_server_version(PGconn *conn, char *server_type, bool exit_on_error, char *server_version_string);
static bool create_repmgr_extension(PGconn *conn);
#endif