"standby follow": check node system identifiers match

This commit is contained in:
Ian Barwick
2017-08-14 11:45:08 +09:00
parent 0f31756733
commit eabd56f3be
9 changed files with 114 additions and 37 deletions

View File

@@ -17,6 +17,25 @@
static ControlFileInfo *get_controlfile(const char *DataDir); static ControlFileInfo *get_controlfile(const char *DataDir);
uint64
get_system_identifier(const char *data_directory)
{
ControlFileInfo *control_file_info;
uint64 system_identifier = UNKNOWN_SYSTEM_IDENTIFIER;
control_file_info = get_controlfile(data_directory);
if (control_file_info->control_file_processed == true)
system_identifier = control_file_info->control_file->system_identifier;
else
system_identifier = UNKNOWN_SYSTEM_IDENTIFIER;
pfree(control_file_info->control_file);
pfree(control_file_info);
return system_identifier;
}
DBState DBState
get_db_state(const char *data_directory) get_db_state(const char *data_directory)
{ {
@@ -33,6 +52,7 @@ get_db_state(const char *data_directory)
pfree(control_file_info->control_file); pfree(control_file_info->control_file);
pfree(control_file_info); pfree(control_file_info);
return state; return state;
} }

View File

@@ -18,10 +18,10 @@ typedef struct
ControlFileData *control_file; ControlFileData *control_file;
} ControlFileInfo; } ControlFileInfo;
DBState get_db_state(const char *data_directory); extern DBState get_db_state(const char *data_directory);
const char * describe_db_state(DBState state); extern const char * describe_db_state(DBState state);
int get_data_checksum_version(const char *data_directory); extern int get_data_checksum_version(const char *data_directory);
extern uint64 get_system_identifier(const char *data_directory);
extern XLogRecPtr get_latest_checkpoint_location(const char *data_directory); extern XLogRecPtr get_latest_checkpoint_location(const char *data_directory);
#endif /* _CONTROLDATA_H_ */ #endif /* _CONTROLDATA_H_ */

View File

@@ -115,6 +115,7 @@ _establish_db_connection(const char *conninfo, const bool exit_on_error, const b
if (parse_success == false) if (parse_success == false)
{ {
log_error(_("unable to pass provided conninfo string:\n %s"), errmsg); log_error(_("unable to pass provided conninfo string:\n %s"), errmsg);
free_conninfo_params(&conninfo_params);
return NULL; return NULL;
} }
@@ -155,6 +156,7 @@ _establish_db_connection(const char *conninfo, const bool exit_on_error, const b
if (exit_on_error) if (exit_on_error)
{ {
PQfinish(conn); PQfinish(conn);
free_conninfo_params(&conninfo_params);
exit(ERR_DB_CONN); exit(ERR_DB_CONN);
} }
} }
@@ -170,11 +172,13 @@ _establish_db_connection(const char *conninfo, const bool exit_on_error, const b
if (exit_on_error) if (exit_on_error)
{ {
PQfinish(conn); PQfinish(conn);
free_conninfo_params(&conninfo_params);
exit(ERR_DB_CONN); exit(ERR_DB_CONN);
} }
} }
pfree(connection_string); pfree(connection_string);
free_conninfo_params(&conninfo_params);
return conn; return conn;
} }
@@ -1284,6 +1288,8 @@ get_replication_info(PGconn *conn, ReplInfo *replication_info)
replication_info->last_wal_replay_lsn = parse_lsn(PQgetvalue(res, 0, 1)); replication_info->last_wal_replay_lsn = parse_lsn(PQgetvalue(res, 0, 1));
strncpy(replication_info->replication_lag_time, PQgetvalue(res, 0, 3), MAXLEN); strncpy(replication_info->replication_lag_time, PQgetvalue(res, 0, 3), MAXLEN);
PQclear(res);
return true; return true;
} }
@@ -1478,11 +1484,11 @@ get_replication_lag_seconds(PGconn *conn)
bool bool
identify_system(PGconn *replconn, t_system_identification *identification) identify_system(PGconn *repl_conn, t_system_identification *identification)
{ {
PGresult *res; PGresult *res;
res = PQexec(replconn, "IDENTIFY_SYSTEM;"); res = PQexec(repl_conn, "IDENTIFY_SYSTEM;");
if (PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res)) if (PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res))
{ {
@@ -2406,6 +2412,8 @@ get_node_replication_stats(PGconn *conn, t_node_info *node_info)
node_info->active_replication_slots = atoi(PQgetvalue(res, 0, 3)); node_info->active_replication_slots = atoi(PQgetvalue(res, 0, 3));
node_info->inactive_replication_slots = atoi(PQgetvalue(res, 0, 4)); node_info->inactive_replication_slots = atoi(PQgetvalue(res, 0, 4));
PQclear(res);
return; return;
} }

View File

@@ -345,7 +345,7 @@ RecoveryType get_recovery_type(PGconn *conn);
int get_primary_node_id(PGconn *conn); int get_primary_node_id(PGconn *conn);
bool can_use_pg_rewind(PGconn *conn, const char *data_directory, PQExpBufferData *reason); bool can_use_pg_rewind(PGconn *conn, const char *data_directory, PQExpBufferData *reason);
int get_ready_archive_files(PGconn *conn, const char *data_directory); int get_ready_archive_files(PGconn *conn, const char *data_directory);
bool identify_system(PGconn *replconn, t_system_identification *identification); bool identify_system(PGconn *repl_conn, t_system_identification *identification);
/* extension functions */ /* extension functions */
ExtensionStatus get_repmgr_extension_status(PGconn *conn); ExtensionStatus get_repmgr_extension_status(PGconn *conn);

View File

@@ -96,28 +96,6 @@ do_node_status(void)
get_node_replication_stats(conn, &node_info); get_node_replication_stats(conn, &node_info);
/* get system information */
{
t_conninfo_param_list repl_conninfo;
PGconn *replication_conn;
t_system_identification sysid = T_SYSTEM_IDENTIFICATION_INITIALIZER;
initialize_conninfo_params(&repl_conninfo, false);
conn_to_param_list(conn, &repl_conninfo);
param_set(&repl_conninfo, "replication", "1");
param_set(&repl_conninfo, "user", node_info.repluser);
replication_conn = establish_db_connection_by_params(&repl_conninfo, false);
identify_system(replication_conn, &sysid);
printf("%lu\n", sysid.system_identifier);
exit(0);
}
key_value_list_set( key_value_list_set(
&node_status, &node_status,
"PostgreSQL version", "PostgreSQL version",
@@ -418,6 +396,8 @@ do_node_status(void)
/* log_hint(_("execute \"repmgr node check\" for more details")); */ /* log_hint(_("execute \"repmgr node check\" for more details")); */
} }
key_value_list_free(&node_status);
PQfinish(conn);
} }
/* /*

View File

@@ -1247,8 +1247,12 @@ do_standby_follow(void)
PQExpBufferData follow_output; PQExpBufferData follow_output;
bool success; bool success;
log_verbose(LOG_DEBUG, "do_standby_follow()"); uint64 local_system_identifier = UNKNOWN_SYSTEM_IDENTIFIER;
t_conninfo_param_list repl_conninfo;
PGconn *repl_conn = NULL;
t_system_identification primary_identification = T_SYSTEM_IDENTIFICATION_INITIALIZER;
log_verbose(LOG_DEBUG, "do_standby_follow()");
local_conn = establish_db_connection(config_file_options.conninfo, true); local_conn = establish_db_connection(config_file_options.conninfo, true);
@@ -1263,7 +1267,7 @@ do_standby_follow(void)
* If --wait provided, loop for up `primary_response_timeout` * If --wait provided, loop for up `primary_response_timeout`
* seconds before giving up * seconds before giving up
*/ */
// XXX ??? primary_follow_timeout
for (timer = 0; timer < config_file_options.primary_follow_timeout; timer++) for (timer = 0; timer < config_file_options.primary_follow_timeout; timer++)
{ {
primary_conn = get_primary_connection_quiet(local_conn, primary_conn = get_primary_connection_quiet(local_conn,
@@ -1275,20 +1279,60 @@ do_standby_follow(void)
break; break;
} }
} }
PQfinish(local_conn);
if (PQstatus(primary_conn) != CONNECTION_OK) if (PQstatus(primary_conn) != CONNECTION_OK)
{ {
log_error(_("unable to determine primary node")); log_error(_("unable to determine primary node"));
PQfinish(local_conn);
exit(ERR_BAD_CONFIG); exit(ERR_BAD_CONFIG);
} }
check_primary_standby_version_match(local_conn, primary_conn);
PQfinish(local_conn);
get_node_record(primary_conn, primary_id, &primary_node_record); get_node_record(primary_conn, primary_id, &primary_node_record);
/* check replication connection */
initialize_conninfo_params(&repl_conninfo, false);
conn_to_param_list(primary_conn, &repl_conninfo);
param_set(&repl_conninfo, "replication", "1");
param_set(&repl_conninfo, "user", primary_node_record.repluser);
repl_conn = establish_db_connection_by_params(&repl_conninfo, false);
if (PQstatus(repl_conn) != CONNECTION_OK)
{
log_error(_("unable to establish a replication connection to the primary node"));
PQfinish(primary_conn);
exit(ERR_BAD_CONFIG);
}
/* check system_identifiers match */
local_system_identifier = get_system_identifier(config_file_options.data_directory);
success = identify_system(repl_conn, &primary_identification);
if (success == false)
{
log_error(_("unable to query the primary node's system identification"));
PQfinish(primary_conn);
PQfinish(repl_conn);
exit(ERR_BAD_CONFIG);
}
if (primary_identification.system_identifier != local_system_identifier)
{
log_error(_("this node is not part of the primary node's replication cluster"));
log_detail(_("this node's system identifier is %lu, primary node's system identifier is %lu"),
local_system_identifier,
primary_identification.system_identifier);
PQfinish(primary_conn);
PQfinish(repl_conn);
exit(ERR_BAD_CONFIG);
}
PQfinish(repl_conn);
free_conninfo_params(&repl_conninfo);
initPQExpBuffer(&follow_output); initPQExpBuffer(&follow_output);
success = do_standby_follow_internal( success = do_standby_follow_internal(
@@ -1425,6 +1469,8 @@ do_standby_follow_internal(PGconn *primary_conn, t_node_info *primary_node_recor
{ {
original_upstream_node_id = primary_node_record->node_id; original_upstream_node_id = primary_node_record->node_id;
} }
free_conninfo_params(&local_node_conninfo);
} }
log_info(_("changing node %i's primary to node %i"), log_info(_("changing node %i's primary to node %i"),

View File

@@ -34,7 +34,7 @@
#define UNKNOWN_SERVER_VERSION_NUM -1 #define UNKNOWN_SERVER_VERSION_NUM -1
#define UNKNOWN_TIMELINE_ID -1 #define UNKNOWN_TIMELINE_ID -1
#define UNKNOWN_SYSTEM_IDENTIFIER -1 #define UNKNOWN_SYSTEM_IDENTIFIER 0
#define NODE_NOT_FOUND -1 #define NODE_NOT_FOUND -1
#define NO_UPSTREAM_NODE -1 #define NO_UPSTREAM_NODE -1

View File

@@ -191,6 +191,27 @@ key_value_list_get(KeyValueList *item_list, const char *key)
return NULL; return NULL;
} }
void
key_value_list_free(KeyValueList *item_list)
{
KeyValueListCell *cell;
KeyValueListCell *next_cell;
cell = item_list->head;
while (cell != NULL)
{
next_cell = cell->next;
pfree(cell->key);
pfree(cell->value);
pfree(cell);
cell = next_cell;
}
}
/* /*
* Escape a string for use as a parameter in recovery.conf * Escape a string for use as a parameter in recovery.conf
* Caller must free returned value * Caller must free returned value

View File

@@ -82,6 +82,8 @@ key_value_list_set_output_mode(KeyValueList *item_list, const char *key, OutputM
extern const char * extern const char *
key_value_list_get(KeyValueList *item_list, const char *key); key_value_list_get(KeyValueList *item_list, const char *key);
extern void
key_value_list_free(KeyValueList *item_list);
extern char * extern char *
escape_recovery_conf_value(const char *src); escape_recovery_conf_value(const char *src);