Add "repmgr node check --downstream"

This commit is contained in:
Ian Barwick
2017-08-15 15:50:46 +09:00
parent 10ef30096c
commit 554673e83e
8 changed files with 169 additions and 11 deletions

View File

@@ -20,7 +20,7 @@ static ControlFileInfo *get_controlfile(const char *DataDir);
uint64 uint64
get_system_identifier(const char *data_directory) get_system_identifier(const char *data_directory)
{ {
ControlFileInfo *control_file_info = T_CONTROLFILEINFO_INITIALIZER; ControlFileInfo *control_file_info = NULL;
uint64 system_identifier = UNKNOWN_SYSTEM_IDENTIFIER; uint64 system_identifier = UNKNOWN_SYSTEM_IDENTIFIER;
control_file_info = get_controlfile(data_directory); control_file_info = get_controlfile(data_directory);
@@ -39,7 +39,7 @@ get_system_identifier(const char *data_directory)
DBState DBState
get_db_state(const char *data_directory) get_db_state(const char *data_directory)
{ {
ControlFileInfo *control_file_info = T_CONTROLFILEINFO_INITIALIZER; ControlFileInfo *control_file_info = NULL;
DBState state; DBState state;
control_file_info = get_controlfile(data_directory); control_file_info = get_controlfile(data_directory);
@@ -60,7 +60,7 @@ get_db_state(const char *data_directory)
extern XLogRecPtr extern XLogRecPtr
get_latest_checkpoint_location(const char *data_directory) get_latest_checkpoint_location(const char *data_directory)
{ {
ControlFileInfo *control_file_info = T_CONTROLFILEINFO_INITIALIZER; ControlFileInfo *control_file_info = NULL;
XLogRecPtr checkPoint = InvalidXLogRecPtr; XLogRecPtr checkPoint = InvalidXLogRecPtr;
control_file_info = get_controlfile(data_directory); control_file_info = get_controlfile(data_directory);
@@ -80,7 +80,7 @@ get_latest_checkpoint_location(const char *data_directory)
int int
get_data_checksum_version(const char *data_directory) get_data_checksum_version(const char *data_directory)
{ {
ControlFileInfo *control_file_info = T_CONTROLFILEINFO_INITIALIZER; ControlFileInfo *control_file_info = NULL;
int data_checksum_version = -1; int data_checksum_version = -1;
control_file_info = get_controlfile(data_directory); control_file_info = get_controlfile(data_directory);
@@ -132,7 +132,7 @@ describe_db_state(DBState state)
static ControlFileInfo * static ControlFileInfo *
get_controlfile(const char *DataDir) get_controlfile(const char *DataDir)
{ {
ControlFileInfo *control_file_info = T_CONTROLFILEINFO_INITIALIZER; ControlFileInfo *control_file_info;
int fd; int fd;
char ControlFilePath[MAXPGPATH] = ""; char ControlFilePath[MAXPGPATH] = "";

View File

@@ -18,8 +18,6 @@ typedef struct
ControlFileData *control_file; ControlFileData *control_file;
} ControlFileInfo; } ControlFileInfo;
#define T_CONTROLFILEINFO_INITIALIZER { false, NULL }
extern DBState get_db_state(const char *data_directory); extern DBState get_db_state(const char *data_directory);
extern const char * describe_db_state(DBState state); extern const char * describe_db_state(DBState state);
extern int get_data_checksum_version(const char *data_directory); extern int get_data_checksum_version(const char *data_directory);

View File

@@ -2426,6 +2426,53 @@ get_node_replication_stats(PGconn *conn, t_node_info *node_info)
} }
/*
 * is_downstream_node_attached()
 *
 * Check whether "pg_catalog.pg_stat_replication" on the server behind "conn"
 * contains an entry whose "application_name" equals "node_name".
 *
 * Returns true if at least one matching row exists (a warning is emitted via
 * log_verbose() when more than one row matches). Returns false if no row
 * matches, if the query fails, or if it returns an unexpected number of
 * tuples.
 */
bool
is_downstream_node_attached(PGconn *conn, char *node_name)
{
	const char *param_values[1] = { node_name };
	PGresult   *res = NULL;
	int			c = 0;

	/*
	 * Bind the node name as a query parameter rather than interpolating it
	 * into the SQL text, so a name containing quote characters can neither
	 * break nor alter the statement.
	 */
	res = PQexecParams(conn,
					   " SELECT COUNT(*) FROM pg_catalog.pg_stat_replication "
					   "  WHERE application_name = $1",
					   1,
					   NULL,
					   param_values,
					   NULL,
					   NULL,
					   0);

	if (PQresultStatus(res) != PGRES_TUPLES_OK)
	{
		log_verbose(LOG_WARNING, _("unable to query pg_stat_replication"));
		log_detail("%s", PQerrorMessage(conn));
		PQclear(res);
		return false;
	}

	/* COUNT(*) without GROUP BY should always yield exactly one row */
	if (PQntuples(res) != 1)
	{
		log_verbose(LOG_WARNING, _("unexpected number of tuples (%i) returned"), PQntuples(res));
		PQclear(res);
		return false;
	}

	c = atoi(PQgetvalue(res, 0, 0));

	PQclear(res);

	if (c == 0)
	{
		log_verbose(LOG_WARNING, _("node \"%s\" not found in \"pg_stat_replication\""), node_name);
		return false;
	}

	/* duplicate application names are suspicious, but the node is still attached */
	if (c > 1)
	{
		log_verbose(LOG_WARNING, _("multiple entries with \"application_name\" set to \"%s\" found in \"pg_stat_replication\""),
					node_name);
	}

	return true;
}
void void
clear_node_info_list(NodeInfoList *nodes) clear_node_info_list(NodeInfoList *nodes)

View File

@@ -97,6 +97,7 @@ typedef struct s_node_info
/* for ad-hoc use e.g. when working with a list of nodes */ /* for ad-hoc use e.g. when working with a list of nodes */
char details[MAXLEN]; char details[MAXLEN];
bool reachable; bool reachable;
bool attached;
/* various statistics */ /* various statistics */
int max_wal_senders; int max_wal_senders;
int attached_wal_receivers; int attached_wal_receivers;
@@ -127,7 +128,7 @@ typedef struct s_node_info
MS_NORMAL, \ MS_NORMAL, \
NULL, \ NULL, \
/* for ad-hoc use e.g. when working with a list of nodes */ \ /* for ad-hoc use e.g. when working with a list of nodes */ \
"", true \ "", true, true \
/* various statistics */ \ /* various statistics */ \
-1, -1, -1, -1, -1 \ -1, -1, -1, -1, -1 \
} }
@@ -437,6 +438,7 @@ XLogRecPtr get_last_wal_receive_location(PGconn *conn);
bool get_replication_info(PGconn *conn, ReplInfo *replication_info); bool get_replication_info(PGconn *conn, ReplInfo *replication_info);
int get_replication_lag_seconds(PGconn *conn); int get_replication_lag_seconds(PGconn *conn);
void get_node_replication_stats(PGconn *conn, t_node_info *node_info); void get_node_replication_stats(PGconn *conn, t_node_info *node_info);
bool is_downstream_node_attached(PGconn *conn, char *node_name);
/* BDR functions */ /* BDR functions */
void get_all_bdr_node_records(PGconn *conn, BdrNodeInfoList *node_list); void get_all_bdr_node_records(PGconn *conn, BdrNodeInfoList *node_list);

View File

@@ -32,6 +32,7 @@ static void _do_node_restore_config(void);
static CheckStatus do_node_check_role(PGconn *conn, OutputMode mode, t_node_info *node_info, CheckStatusList *list_output); static CheckStatus do_node_check_role(PGconn *conn, OutputMode mode, t_node_info *node_info, CheckStatusList *list_output);
static CheckStatus do_node_check_archiver(PGconn *conn, OutputMode mode, CheckStatusList *list_output); static CheckStatus do_node_check_archiver(PGconn *conn, OutputMode mode, CheckStatusList *list_output);
static CheckStatus do_node_check_replication_lag(PGconn *conn, OutputMode mode, CheckStatusList *list_output); static CheckStatus do_node_check_replication_lag(PGconn *conn, OutputMode mode, CheckStatusList *list_output);
static CheckStatus do_node_check_downstream(PGconn *conn, OutputMode mode, CheckStatusList *list_output);
void void
@@ -558,6 +559,12 @@ do_node_check(void)
return; return;
} }
if (runtime_options.downstream == true)
{
(void) do_node_check_downstream(conn, runtime_options.output_mode, NULL);
PQfinish(conn);
return;
}
/* output general overview */ /* output general overview */
initPQExpBuffer(&output); initPQExpBuffer(&output);
@@ -565,7 +572,7 @@ do_node_check(void)
(void) do_node_check_role(conn, runtime_options.output_mode, &node_info, &status_list); (void) do_node_check_role(conn, runtime_options.output_mode, &node_info, &status_list);
(void) do_node_check_replication_lag(conn, runtime_options.output_mode, &status_list); (void) do_node_check_replication_lag(conn, runtime_options.output_mode, &status_list);
(void) do_node_check_archiver(conn, runtime_options.output_mode, &status_list); (void) do_node_check_archiver(conn, runtime_options.output_mode, &status_list);
(void) do_node_check_downstream(conn, runtime_options.output_mode, &status_list);
if (runtime_options.output_mode == OM_CSV) if (runtime_options.output_mode == OM_CSV)
{ {
@@ -1016,6 +1023,103 @@ do_node_check_replication_lag(PGconn *conn, OutputMode mode, CheckStatusList *li
return status; return status;
} }
/*
 * do_node_check_downstream()
 *
 * Fetch the downstream node records registered for this node
 * (config_file_options.node_id) and check each one with
 * is_downstream_node_attached().
 *
 * The status is CHECK_STATUS_OK when no downstream node is missing (including
 * the case where there are no downstream nodes at all), and
 * CHECK_STATUS_CRITICAL otherwise; in the latter case the detail text lists
 * each missing node as "name (ID: n)".
 *
 * Output depends on "mode":
 *  - OM_NAGIOS: a "PG_DOWNSTREAM_SERVERS ..." line is printed;
 *  - OM_TEXT: the result is stored in "list_output" if that is not NULL,
 *    otherwise it is printed;
 *  - any other mode: nothing is emitted.
 *
 * Returns the computed status.
 *
 * TODO: ensure only runs on streaming replication nodes
 */
static CheckStatus
do_node_check_downstream(PGconn *conn, OutputMode mode, CheckStatusList *list_output)
{
	NodeInfoList downstream_nodes = T_NODE_INFO_LIST_INITIALIZER;
	NodeInfoListCell *cell = NULL;
	int			missing_nodes_count = 0;
	CheckStatus status = CHECK_STATUS_OK;
	PQExpBufferData missing_nodes;
	PQExpBufferData details;

	initPQExpBuffer(&missing_nodes);
	initPQExpBuffer(&details);

	get_downstream_node_records(conn, config_file_options.node_id, &downstream_nodes);

	/* collect a comma-separated list of the nodes which are not attached */
	for (cell = downstream_nodes.head; cell; cell = cell->next)
	{
		if (is_downstream_node_attached(conn, cell->node_info->node_name) == false)
		{
			if (missing_nodes_count > 0)
				appendPQExpBuffer(&missing_nodes, ",");

			appendPQExpBuffer(&missing_nodes,
							  "%s (ID: %i)",
							  cell->node_info->node_name,
							  cell->node_info->node_id);

			missing_nodes_count++;
		}
	}

	if (missing_nodes_count == 0)
	{
		if (downstream_nodes.node_count == 0)
		{
			appendPQExpBuffer(&details,
							  "this node has no downstream nodes");
		}
		else
		{
			appendPQExpBuffer(&details,
							  "%i of %i downstream nodes attached",
							  downstream_nodes.node_count,
							  downstream_nodes.node_count);
		}
	}
	else
	{
		status = CHECK_STATUS_CRITICAL;

		/* note the closing parenthesis, so the detail text is balanced */
		appendPQExpBuffer(&details,
						  "%i of %i downstream nodes not attached (missing: %s)",
						  missing_nodes_count,
						  downstream_nodes.node_count,
						  missing_nodes.data);
	}

	switch (mode)
	{
		case OM_NAGIOS:
			printf("PG_DOWNSTREAM_SERVERS %s: %s\n",
				   output_check_status(status),
				   details.data);
			break;

		case OM_TEXT:
			if (list_output != NULL)
			{
				check_status_list_set(list_output,
									  "Downstream servers",
									  status,
									  details.data);
			}
			else
			{
				printf("%s (%s)\n",
					   output_check_status(status),
					   details.data);
			}
			break;

		default:
			break;
	}

	/* release everything allocated while building the report */
	termPQExpBuffer(&missing_nodes);
	termPQExpBuffer(&details);
	clear_node_info_list(&downstream_nodes);

	return status;
}
// --action=... // --action=...
// --check // --check

View File

@@ -85,6 +85,7 @@ typedef struct
/* "node check" options */ /* "node check" options */
bool archiver; bool archiver;
bool downstream;
bool replication_lag; bool replication_lag;
bool role; bool role;
@@ -132,7 +133,7 @@ typedef struct
/* "node status" options */ \ /* "node status" options */ \
false, \ false, \
/* "node check" options */ \ /* "node check" options */ \
false, false, false, \ false, false, false, false,\
/* "node join" options */ \ /* "node join" options */ \
"", \ "", \
/* "node service" options */ \ /* "node service" options */ \

View File

@@ -426,6 +426,10 @@ main(int argc, char **argv)
runtime_options.archiver = true; runtime_options.archiver = true;
break; break;
case OPT_DOWNSTREAM:
runtime_options.downstream = true;
break;
case OPT_REPLICATION_LAG: case OPT_REPLICATION_LAG:
runtime_options.replication_lag = true; runtime_options.replication_lag = true;
break; break;
@@ -3001,7 +3005,7 @@ init_node_record(t_node_info *node_record)
if (config_file_options.replication_user[0] != '\0') if (config_file_options.replication_user[0] != '\0')
{ {
/* Replication user explicitly provided */ /* replication user explicitly provided */
strncpy(node_record->repluser, config_file_options.replication_user, NAMEDATALEN); strncpy(node_record->repluser, config_file_options.replication_user, NAMEDATALEN);
} }
else else

View File

@@ -71,6 +71,7 @@
#define OPT_CONFIG_FILES 1035 #define OPT_CONFIG_FILES 1035
#define OPT_SIBLINGS_FOLLOW 1036 #define OPT_SIBLINGS_FOLLOW 1036
#define OPT_ROLE 1037 #define OPT_ROLE 1037
#define OPT_DOWNSTREAM 1038
/* deprecated since 3.3 */ /* deprecated since 3.3 */
#define OPT_DATA_DIR 999 #define OPT_DATA_DIR 999
#define OPT_NO_CONNINFO_PASSWORD 998 #define OPT_NO_CONNINFO_PASSWORD 998
@@ -139,6 +140,7 @@ static struct option long_options[] =
/* "node check" options */ /* "node check" options */
{"archiver", no_argument, NULL, OPT_ARCHIVER }, {"archiver", no_argument, NULL, OPT_ARCHIVER },
{"downstream", no_argument, NULL, OPT_DOWNSTREAM },
{"replication-lag", no_argument, NULL, OPT_REPLICATION_LAG }, {"replication-lag", no_argument, NULL, OPT_REPLICATION_LAG },
{"role", no_argument, NULL, OPT_ROLE }, {"role", no_argument, NULL, OPT_ROLE },