Implement "standby register"

This commit is contained in:
Ian Barwick
2017-05-05 10:50:41 +09:00
parent 06b79f2263
commit 7844afa074
9 changed files with 485 additions and 13 deletions

View File

@@ -113,6 +113,15 @@ establish_db_connection(const char *conninfo, const bool exit_on_error)
return _establish_db_connection(conninfo, exit_on_error, false, false); return _establish_db_connection(conninfo, exit_on_error, false, false);
} }
/*
* Attempt to establish a database connection, never exit on error, only
* output error messages if --verbose option used
*/
PGconn *
establish_db_connection_quiet(const char *conninfo)
{
return _establish_db_connection(conninfo, false, false, true);
}
PGconn * PGconn *
establish_db_connection_as_user(const char *conninfo, establish_db_connection_as_user(const char *conninfo,
@@ -571,34 +580,37 @@ _set_config(PGconn *conn, const char *config_param, const char *sqlquery)
return true; return true;
} }
bool bool
set_config(PGconn *conn, const char *config_param, const char *config_value) set_config(PGconn *conn, const char *config_param, const char *config_value)
{ {
char sqlquery[MAX_QUERY_LEN]; PQExpBufferData query;
sqlquery_snprintf(sqlquery, initPQExpBuffer(&query);
appendPQExpBuffer(&query,
"SET %s TO '%s'", "SET %s TO '%s'",
config_param, config_param,
config_value); config_value);
log_verbose(LOG_DEBUG, "set_config():\n%s", sqlquery); log_verbose(LOG_DEBUG, "set_config():\n %s", query.data);
return _set_config(conn, config_param, sqlquery); return _set_config(conn, config_param, query.data);
} }
bool bool
set_config_bool(PGconn *conn, const char *config_param, bool state) set_config_bool(PGconn *conn, const char *config_param, bool state)
{ {
char sqlquery[MAX_QUERY_LEN]; PQExpBufferData query;
sqlquery_snprintf(sqlquery, initPQExpBuffer(&query);
appendPQExpBuffer(&query,
"SET %s TO %s", "SET %s TO %s",
config_param, config_param,
state ? "TRUE" : "FALSE"); state ? "TRUE" : "FALSE");
log_verbose(LOG_DEBUG, "set_config_bool():\n%s\n", sqlquery); log_verbose(LOG_DEBUG, "set_config_bool():\n %s", query.data);
return _set_config(conn, config_param, sqlquery); return _set_config(conn, config_param, query.data);
} }
@@ -1276,14 +1288,16 @@ _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info)
*/ */
int primary_node_id = get_master_node_id(conn); int primary_node_id = get_master_node_id(conn);
maxlen_snprintf(upstream_node_id, "%i", primary_node_id); maxlen_snprintf(upstream_node_id, "%i", primary_node_id);
upstream_node_id_ptr = upstream_node_id;
} }
else else if (node_info->upstream_node_id != NO_UPSTREAM_NODE)
{ {
maxlen_snprintf(upstream_node_id, "%i", node_info->upstream_node_id); maxlen_snprintf(upstream_node_id, "%i", node_info->upstream_node_id);
upstream_node_id_ptr = upstream_node_id;
} }
if (node_info->slot_name[0]) if (node_info->slot_name[0])
maxlen_snprintf(slot_name, "'%s'", node_info->slot_name); maxlen_snprintf(slot_name, "%s", node_info->slot_name);
param_values[0] = get_node_type_string(node_info->type); param_values[0] = get_node_type_string(node_info->type);

View File

@@ -123,7 +123,7 @@ typedef struct s_connection_user
/* connection functions */ /* connection functions */
PGconn *establish_db_connection(const char *conninfo, PGconn *establish_db_connection(const char *conninfo,
const bool exit_on_error); const bool exit_on_error);
PGconn *establish_db_connection_quiet(const char *conninfo);
PGconn *establish_db_connection_as_user(const char *conninfo, PGconn *establish_db_connection_as_user(const char *conninfo,
const char *user, const char *user,
const bool exit_on_error); const bool exit_on_error);

View File

@@ -138,7 +138,7 @@ do_master_register(void)
node_info.priority = config_file_options.priority; node_info.priority = config_file_options.priority;
initPQExpBuffer(&event_description); initPQExpBuffer(&event_description);
puts("here");
if (record_found) if (record_found)
{ {
record_created = update_node_record(conn, record_created = update_node_record(conn,

View File

@@ -79,6 +79,7 @@ static char barman_command_buf[MAXLEN] = "";
static void check_barman_config(void); static void check_barman_config(void);
static void check_source_server(void); static void check_source_server(void);
static void check_source_server_via_barman(void); static void check_source_server_via_barman(void);
static void check_master_standby_version_match(PGconn *conn, PGconn *master_conn);
static void initialise_direct_clone(void); static void initialise_direct_clone(void);
@@ -529,6 +530,391 @@ check_barman_config(void)
} }
void
do_standby_register(void)
{
PGconn *conn;
PGconn *master_conn;
int ret;
bool record_created;
t_node_info node_record = T_NODE_INFO_INITIALIZER;
int node_result;
log_info(_("connecting to standby database"));
conn = establish_db_connection_quiet(config_file_options.conninfo);
if (PQstatus(conn) != CONNECTION_OK)
{
if (!runtime_options.force)
{
log_error(_("unable to connect to local node %i (\"%s\"):"),
config_file_options.node_id,
config_file_options.node_name);
log_detail(_("%s"),
PQerrorMessage(conn));
log_hint(_("to register a standby which is not running, provide master connection parameters and use option -F/--force"));
exit(ERR_BAD_CONFIG);
}
if (!runtime_options.connection_param_provided)
{
log_error(_("unable to connect to local node %i (\"%s\") and no master connection parameters provided"),
config_file_options.node_id,
config_file_options.node_name);
exit(ERR_BAD_CONFIG);
}
}
if (PQstatus(conn) == CONNECTION_OK)
{
ret = is_standby(conn);
if (ret == 0 || ret == -1)
{
log_error(_(ret == 0 ? "this node should be a standby (%s)" :
"connection to node (%s) lost"), config_file_options.conninfo);
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
}
/* check if there is a master in this cluster */
log_info(_("connecting to master database"));
/* Normal case - we can connect to the local node */
if (PQstatus(conn) == CONNECTION_OK)
{
master_conn = get_master_connection(conn, NULL, NULL);
}
/* User is forcing a registration and must have supplied master connection info */
else
{
master_conn = establish_db_connection_by_params((const char**)source_conninfo.keywords,
(const char**)source_conninfo.values,
false);
}
/*
* no amount of --force will make it possible to register the standby
* without a master server to connect to
*/
if (PQstatus(master_conn) != CONNECTION_OK)
{
log_error(_("unable to connect to the master database"));
log_hint(_("a master must be configured before registering a standby"));
exit(ERR_BAD_CONFIG);
}
/*
* Verify that standby and master are supported and compatible server
* versions
*
* If the user is registering an inactive standby, we'll trust they know
* what they're doing
*/
if (PQstatus(conn) == CONNECTION_OK)
{
check_master_standby_version_match(conn, master_conn);
}
/*
* Check that an active node with the same node_name doesn't exist already
*/
node_result = get_node_record_by_name(master_conn,
config_file_options.node_name,
&node_record);
if (node_result)
{
if (node_record.active == true)
{
log_error(_("node %i exists already with node_name \"%s\""),
node_record.node_id,
config_file_options.node_name);
PQfinish(master_conn);
if (PQstatus(conn) == CONNECTION_OK)
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
}
/* Check if node record exists */
node_result = get_node_record(master_conn,
config_file_options.node_id,
&node_record);
if (node_result && !runtime_options.force)
{
log_error(_("node %i is already registered"),
config_file_options.node_id);
log_hint(_("use option -F/--force to overwrite an existing node record"));
PQfinish(master_conn);
if (PQstatus(conn) == CONNECTION_OK)
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
/*
* If an upstream node is defined, check if that node exists and is active
* If it doesn't exist, and --force set, create a minimal inactive record
*/
if (config_file_options.upstream_node_id != NO_UPSTREAM_NODE)
{
int upstream_node_result;
upstream_node_result = get_node_record(master_conn,
config_file_options.upstream_node_id,
&node_record);
if (!upstream_node_result)
{
t_node_info upstream_node_record = T_NODE_INFO_INITIALIZER;
if (!runtime_options.force)
{
log_error(_("no record found for upstream node %i"),
config_file_options.upstream_node_id);
/* footgun alert - only do this if you know what you're doing */
log_hint(_("use option -F/--force to create a dummy upstream record"));
PQfinish(master_conn);
if (PQstatus(conn) == CONNECTION_OK)
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
log_notice(_("creating placeholder record for upstream node %i"),
config_file_options.upstream_node_id);
upstream_node_record.node_id = config_file_options.upstream_node_id;
upstream_node_record.type = STANDBY;
upstream_node_record.upstream_node_id = NO_UPSTREAM_NODE;
strncpy(upstream_node_record.conninfo, runtime_options.upstream_conninfo, MAXLEN);
upstream_node_record.active = false;
record_created = create_node_record(master_conn,
"standby register",
&upstream_node_record);
/*
* It's possible, in the kind of scenario this functionality is intended
* to support, that there's a race condition where the node's actual
* record gets inserted, causing the insert of the placeholder record
* to fail. If this is the case, we don't worry about this insert failing;
* if not we bail out.
*
* TODO: teach create_node_record() to use ON CONFLICT DO NOTHING for
* 9.5 and later.
*/
if (record_created == false)
{
upstream_node_result = get_node_record(master_conn,
config_file_options.upstream_node_id,
&node_record);
if (!upstream_node_result)
{
log_error(_("unable to create placeholder record for upstream node %i"),
config_file_options.upstream_node_id);
PQfinish(master_conn);
if (PQstatus(conn) == CONNECTION_OK)
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
log_info(_("a record for upstream node %i was already created"),
config_file_options.upstream_node_id);
}
}
else if (node_record.active == false)
{
/*
* upstream node is inactive and --force not supplied - refuse to register
*/
if (!runtime_options.force)
{
log_error(_("record for upstream node %i is marked as inactive"),
config_file_options.upstream_node_id);
log_hint(_("use option -F/--force to register a standby with an inactive upstream node"));
PQfinish(master_conn);
if (PQstatus(conn) == CONNECTION_OK)
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
/*
* user is using the --force - notify about the potential footgun
*/
log_notice(_("registering node %i with inactive upstream node %i"),
config_file_options.node_id,
config_file_options.upstream_node_id);
}
}
/* populate node record structure */
node_record.node_id = config_file_options.node_id;
node_record.type = STANDBY;
node_record.upstream_node_id = config_file_options.upstream_node_id;
node_record.priority = config_file_options.priority;
node_record.active = true;
strncpy(node_record.node_name, config_file_options.node_name, MAXLEN);
strncpy(node_record.conninfo, config_file_options.conninfo, MAXLEN);
if (repmgr_slot_name_ptr != NULL)
strncpy(node_record.slot_name, repmgr_slot_name_ptr, MAXLEN);
/*
* node record exists - update it
* (at this point we have already established that -F/--force is in use)
*/
if (node_result)
{
record_created = update_node_record(master_conn,
"standby register",
&node_record);
}
else
{
record_created = create_node_record(master_conn,
"standby register",
&node_record);
}
if (record_created == false)
{
/* XXX add event description */
create_event_record(master_conn,
&config_file_options,
config_file_options.node_id,
"standby_register",
false,
NULL);
PQfinish(master_conn);
if (PQstatus(conn) == CONNECTION_OK)
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
/* Log the event */
create_event_record(master_conn,
&config_file_options,
config_file_options.node_id,
"standby_register",
true,
NULL);
/* if --wait-sync option set, wait for the records to synchronise */
if (PQstatus(conn) == CONNECTION_OK &&
runtime_options.wait_register_sync == true &&
runtime_options.wait_register_sync_seconds > 0)
{
bool sync_ok = false;
int timer = 0;
int node_record_result;
t_node_info node_record_on_master = T_NODE_INFO_INITIALIZER;
t_node_info node_record_on_standby = T_NODE_INFO_INITIALIZER;
node_record_result = get_node_record(master_conn,
config_file_options.node_id,
&node_record_on_master);
if (node_record_result != 1)
{
log_error(_("unable to retrieve node record from master"));
PQfinish(master_conn);
PQfinish(conn);
exit(ERR_REGISTRATION_SYNC);
}
for (;;)
{
bool records_match = true;
if (runtime_options.wait_register_sync_seconds && runtime_options.wait_register_sync_seconds == timer)
break;
node_record_result = get_node_record(conn,
config_file_options.node_id,
&node_record_on_standby);
if (node_record_result == 0)
{
/* no record available yet on standby*/
records_match = false;
}
else if (node_record_result == 1)
{
/* compare relevant fields */
if (node_record_on_standby.upstream_node_id != node_record_on_master.upstream_node_id)
records_match = false;
if (node_record_on_standby.type != node_record_on_master.type)
records_match = false;
if (node_record_on_standby.priority != node_record_on_master.priority)
records_match = false;
if (node_record_on_standby.active != node_record_on_master.active)
records_match = false;
if (strcmp(node_record_on_standby.node_name, node_record_on_master.node_name) != 0)
records_match = false;
if (strcmp(node_record_on_standby.conninfo, node_record_on_master.conninfo) != 0)
records_match = false;
if (strcmp(node_record_on_standby.slot_name, node_record_on_master.slot_name) != 0)
records_match = false;
if (records_match == true)
{
sync_ok = true;
break;
}
}
sleep(1);
timer ++;
}
if (sync_ok == false)
{
log_error(_("node record was not synchronised after %i seconds"),
runtime_options.wait_register_sync_seconds);
PQfinish(master_conn);
PQfinish(conn);
exit(ERR_REGISTRATION_SYNC);
}
log_info(_("node record on standby synchronised from master"));
}
PQfinish(master_conn);
if (PQstatus(conn) == CONNECTION_OK)
PQfinish(conn);
log_info(_("standby registration complete"));
log_notice(_("standby node \"%s\" (id: %i) successfully registered"),
config_file_options.node_name, config_file_options.node_id);
return;
}
static void static void
@@ -2309,3 +2695,43 @@ cleanup_data_directory(void)
} }
} }
} }
/*
* check_master_standby_version_match()
*
* Check server versions of supplied connections are compatible for
* replication purposes.
*
* Exits on error.
*/
static void
check_master_standby_version_match(PGconn *conn, PGconn *master_conn)
{
char standby_version[MAXVERSIONSTR];
int standby_version_num = 0;
char master_version[MAXVERSIONSTR];
int master_version_num = 0;
standby_version_num = check_server_version(conn, "standby", true, standby_version);
/* Verify that master is a supported server version */
master_version_num = check_server_version(conn, "master", false, master_version);
if (master_version_num < 0)
{
PQfinish(conn);
PQfinish(master_conn);
exit(ERR_BAD_CONFIG);
}
/* master and standby version should match */
if ((master_version_num / 100) != (standby_version_num / 100))
{
PQfinish(conn);
PQfinish(master_conn);
log_error(_("PostgreSQL versions on master (%s) and standby (%s) must match"),
master_version, standby_version);
exit(ERR_BAD_CONFIG);
}
}

View File

@@ -7,6 +7,7 @@
#define _REPMGR_ACTION_STANDBY_H_ #define _REPMGR_ACTION_STANDBY_H_
extern void do_standby_clone(void); extern void do_standby_clone(void);
extern void do_standby_register(void);
typedef struct typedef struct
{ {

View File

@@ -60,6 +60,10 @@ typedef struct
char wal_keep_segments[MAXLEN]; char wal_keep_segments[MAXLEN];
bool without_barman; bool without_barman;
/* standby register options */
bool wait_register_sync;
int wait_register_sync_seconds;
/* event options */ /* event options */
bool all; bool all;
char event[MAXLEN]; char event[MAXLEN];
@@ -82,6 +86,8 @@ typedef struct
UNKNOWN_NODE_ID, "", "", \ UNKNOWN_NODE_ID, "", "", \
/* standby clone options */ \ /* standby clone options */ \
false, CONFIG_FILE_SAMEPATH, false, false, false, "", "", "", false, "", false, \ false, CONFIG_FILE_SAMEPATH, false, false, false, "", "", "", false, "", false, \
/* standby register options */ \
false, 0, \
/* event options */ \ /* event options */ \
false, "", 20 } false, "", 20 }

View File

@@ -10,7 +10,8 @@
* *
* [ MASTER | PRIMARY ] REGISTER * [ MASTER | PRIMARY ] REGISTER
* *
* STANDBY CLONE (wip) * STANDBY CLONE
* STANDBY REGISTER (wip)
* *
* CLUSTER EVENT * CLUSTER EVENT
*/ */
@@ -344,6 +345,18 @@ main(int argc, char **argv)
runtime_options.without_barman = true; runtime_options.without_barman = true;
break; break;
/* standby register options *
* --------------------- */
case OPT_REGISTER_WAIT:
runtime_options.wait_register_sync = true;
if (optarg != NULL)
{
runtime_options.wait_register_sync_seconds = repmgr_atoi(optarg, "--wait-sync", &cli_errors, false);
}
break;
/* event options * /* event options *
* ------------- */ * ------------- */
@@ -534,6 +547,8 @@ main(int argc, char **argv)
{ {
if (strcasecmp(repmgr_action, "CLONE") == 0) if (strcasecmp(repmgr_action, "CLONE") == 0)
action = STANDBY_CLONE; action = STANDBY_CLONE;
else if(strcasecmp(repmgr_action, "REGISTER") == 0)
action = STANDBY_REGISTER;
} }
else if(strcasecmp(repmgr_node_type, "CLUSTER") == 0) else if(strcasecmp(repmgr_node_type, "CLUSTER") == 0)
@@ -760,12 +775,18 @@ main(int argc, char **argv)
case MASTER_REGISTER: case MASTER_REGISTER:
do_master_register(); do_master_register();
break; break;
case STANDBY_CLONE: case STANDBY_CLONE:
do_standby_clone(); do_standby_clone();
break; break;
case STANDBY_REGISTER:
do_standby_register();
break;
case CLUSTER_EVENT: case CLUSTER_EVENT:
do_cluster_event(); do_cluster_event();
break; break;
default: default:
/* An action will have been determined by this point */ /* An action will have been determined by this point */
break; break;

View File

@@ -103,6 +103,9 @@ static struct option long_options[] =
{"use-recovery-conninfo-password", no_argument, NULL, OPT_USE_RECOVERY_CONNINFO_PASSWORD}, {"use-recovery-conninfo-password", no_argument, NULL, OPT_USE_RECOVERY_CONNINFO_PASSWORD},
{"without-barman", no_argument, NULL, OPT_WITHOUT_BARMAN}, {"without-barman", no_argument, NULL, OPT_WITHOUT_BARMAN},
/* standby register options */
{"wait-sync", optional_argument, NULL, OPT_REGISTER_WAIT},
/* event options */ /* event options */
{"all", no_argument, NULL, OPT_ALL }, {"all", no_argument, NULL, OPT_ALL },
{"event", required_argument, NULL, OPT_EVENT }, {"event", required_argument, NULL, OPT_EVENT },

View File

@@ -10,6 +10,7 @@
#define MAXLEN 1024 #define MAXLEN 1024
#define MAX_QUERY_LEN 8192 #define MAX_QUERY_LEN 8192
#define MAXVERSIONSTR 16
/* same as defined in src/include/replication/walreceiver.h */ /* same as defined in src/include/replication/walreceiver.h */
#define MAXCONNINFO 1024 #define MAXCONNINFO 1024