From a9b0c16b3c81d5d42db1bac1879fcb9d4eadeba9 Mon Sep 17 00:00:00 2001 From: Ian Barwick Date: Wed, 26 Jul 2017 11:24:33 +0900 Subject: [PATCH] Add "cluster matrix" and "cluster crosscheck" actions --- repmgr-action-cluster.c | 608 +++++++++++++++++++++++++++++++++++++++- repmgr-action-cluster.h | 26 ++ repmgr-client-global.h | 5 +- repmgr-client.c | 124 +++++++- 4 files changed, 757 insertions(+), 6 deletions(-) diff --git a/repmgr-action-cluster.c b/repmgr-action-cluster.c index e6f3f4fa..33785fd6 100644 --- a/repmgr-action-cluster.c +++ b/repmgr-action-cluster.c @@ -7,7 +7,7 @@ */ #include "repmgr.h" - +#include "compat.h" #include "repmgr-client-global.h" #include "repmgr-action-cluster.h" @@ -44,6 +44,13 @@ struct ColHeader { struct ColHeader headers_show[SHOW_HEADER_COUNT]; struct ColHeader headers_event[EVENT_HEADER_COUNT]; + + +static int build_cluster_matrix(t_node_matrix_rec ***matrix_rec_dest, int *name_length); +static int build_cluster_crosscheck(t_node_status_cube ***cube_dest, int *name_length); +static void cube_set_node_status(t_node_status_cube **cube, int n, int node_id, int matrix_node_id, int connection_node_id, int connection_status); + + void do_cluster_show(void) { @@ -393,6 +400,7 @@ do_cluster_event(void) } if (PQntuples(res) == 0) { + /* print this message directly, rather than as a log line */ printf(_("no matching events found\n")); PQclear(res); PQfinish(conn); @@ -474,3 +482,601 @@ do_cluster_event(void) PQfinish(conn); } + + +void +do_cluster_crosscheck(void) +{ + int i, n; + char c; + const char *node_header = "Name"; + int name_length = strlen(node_header); + + t_node_status_cube **cube; + + n = build_cluster_crosscheck(&cube, &name_length); + + printf("%*s | Id ", name_length, node_header); + for (i = 0; i < n; i++) + printf("| %2d ", cube[i]->node_id); + printf("\n"); + + for (i = 0; i < name_length; i++) + printf("-"); + printf("-+----"); + for (i = 0; i < n; i++) + printf("+----"); + printf("\n"); + + for (i = 0; i < n; i++) + 
{ + int column_node_ix; + + printf("%*s | %2d ", name_length, + cube[i]->node_name, + cube[i]->node_id); + + for (column_node_ix = 0; column_node_ix < n; column_node_ix++) + { + int max_node_status = -2; + int node_ix; + + /* + * The value of entry (i,j) is equal to the + * maximum value of all the (i,j,k). Indeed: + * + * - if one of the (i,j,k) is 0 (node up), then 0 + * (the node is up); + * + * - if the (i,j,k) are either -1 (down) or -2 + * (unknown), then -1 (the node is down); + * + * - if all the (i,j,k) are -2 (unknown), then -2 + * (the node is in an unknown state). + */ + + for(node_ix = 0; node_ix < n; node_ix ++) + { + int node_status = cube[node_ix]->matrix_list_rec[i]->node_status_list[column_node_ix]->node_status; + if (node_status > max_node_status) + max_node_status = node_status; + } + + switch (max_node_status) + { + case -2: + c = '?'; + break; + case -1: + c = 'x'; + break; + case 0: + c = '*'; + break; + default: + exit(ERR_INTERNAL); + } + + printf("| %c ", c); + } + + printf("\n"); + } +} + + +void +do_cluster_matrix() +{ + int i, j; + int n; + const char *node_header = "Name"; + int name_length = strlen(node_header); + + t_node_matrix_rec **matrix_rec_list; + + n = build_cluster_matrix(&matrix_rec_list, &name_length); + + if (runtime_options.csv == true) + { + for (i = 0; i < n; i++) + for (j = 0; j < n; j++) + printf("%d,%d,%d\n", + matrix_rec_list[i]->node_id, + matrix_rec_list[i]->node_status_list[j]->node_id, + matrix_rec_list[i]->node_status_list[j]->node_status); + } + else + { + char c; + + printf("%*s | Id ", name_length, node_header); + for (i = 0; i < n; i++) + printf("| %2d ", matrix_rec_list[i]->node_id); + printf("\n"); + + for (i = 0; i < name_length; i++) + printf("-"); + printf("-+----"); + for (i = 0; i < n; i++) + printf("+----"); + printf("\n"); + + for (i = 0; i < n; i++) + { + printf("%*s | %2d ", name_length, + matrix_rec_list[i]->node_name, + matrix_rec_list[i]->node_id); + for (j = 0; j < n; j++) + { + switch 
(matrix_rec_list[i]->node_status_list[j]->node_status) + { + case -2: + c = '?'; + break; + case -1: + c = 'x'; + break; + case 0: + c = '*'; + break; + default: + exit(ERR_INTERNAL); + } + + printf("| %c ", c); + } + printf("\n"); + } + } +} + + +static void +matrix_set_node_status(t_node_matrix_rec **matrix_rec_list, int n, int node_id, int connection_node_id, int connection_status) +{ + int i, j; + + for (i = 0; i < n; i++) + { + if (matrix_rec_list[i]->node_id == node_id) + { + for (j = 0; j < n; j++) + { + if (matrix_rec_list[i]->node_status_list[j]->node_id == connection_node_id) + { + matrix_rec_list[i]->node_status_list[j]->node_status = connection_status; + break; + } + } + break; + } + } +} + + +static int +build_cluster_matrix(t_node_matrix_rec ***matrix_rec_dest, int *name_length) +{ + PGconn *conn; + int i, j; + int local_node_id; + NodeInfoList nodes = T_NODE_INFO_LIST_INITIALIZER; + NodeInfoListCell *cell; + + PQExpBufferData command; + PQExpBufferData command_output; + + t_node_matrix_rec **matrix_rec_list; + + /* obtain node list from the database */ + log_info(_("connecting to database")); + + if (strlen(config_file_options.conninfo)) + { + conn = establish_db_connection(config_file_options.conninfo, true); + local_node_id = config_file_options.node_id; + } + else + { + conn = establish_db_connection_by_params(&source_conninfo, true); + local_node_id = runtime_options.node_id; + } + + get_all_node_records(conn, &nodes); + + PQfinish(conn); + + if (nodes.node_count == 0) + { + log_error(_("unable to retrieve any node records")); + exit(ERR_BAD_CONFIG); + } + + /* + * Allocate an empty matrix record list + * + * -2 == NULL ? 
+ * -1 == Error x + * 0 == OK * + */ + + matrix_rec_list = (t_node_matrix_rec **) pg_malloc0(sizeof(t_node_matrix_rec) * nodes.node_count); + + i = 0; + + /* Initialise matrix structure for each node */ + for (cell = nodes.head; cell; cell = cell->next) + { + int name_length_cur; + NodeInfoListCell *cell_j; + + matrix_rec_list[i] = (t_node_matrix_rec *) pg_malloc0(sizeof(t_node_matrix_rec)); + + matrix_rec_list[i]->node_id = cell->node_info->node_id; + strncpy(matrix_rec_list[i]->node_name, cell->node_info->node_name, MAXLEN); + + /* + * Find the maximum length of a node name + */ + name_length_cur = strlen(matrix_rec_list[i]->node_name); + if (name_length_cur > *name_length) + *name_length = name_length_cur; + + matrix_rec_list[i]->node_status_list = (t_node_status_rec **) pg_malloc0(sizeof(t_node_status_rec) * nodes.node_count); + + j = 0; + + for (cell_j = nodes.head; cell_j; cell_j = cell_j->next) + { + matrix_rec_list[i]->node_status_list[j] = (t_node_status_rec *) pg_malloc0(sizeof(t_node_status_rec)); + matrix_rec_list[i]->node_status_list[j]->node_id = cell_j->node_info->node_id; + matrix_rec_list[i]->node_status_list[j]->node_status = -2; /* default unknown */ + + j++; + } + + i++; + } + + /* Fetch `repmgr cluster show --csv` output for each node */ + i = 0; + + for (cell = nodes.head; cell; cell = cell->next) + { + int connection_status; + t_conninfo_param_list remote_conninfo; + char *host, *p; + int connection_node_id = cell->node_info->node_id; + int x, y; + + initialize_conninfo_params(&remote_conninfo, false); + parse_conninfo_string(cell->node_info->conninfo, + &remote_conninfo, + NULL, + false); + + host = param_get(&remote_conninfo, "host"); + + conn = establish_db_connection(cell->node_info->conninfo, false); + + connection_status = + (PQstatus(conn) == CONNECTION_OK) ? 
0 : -1; + + + matrix_set_node_status(matrix_rec_list, + nodes.node_count, + local_node_id, + connection_node_id, + connection_status); + + + if (connection_status) + continue; + + /* We don't need to issue `cluster show --csv` for the local node */ + if (connection_node_id == local_node_id) + continue; + + initPQExpBuffer(&command); + + /* + * We'll pass cluster name and database connection string to the remote + * repmgr - those are the only values it needs to work, and saves us + * making assumptions about the location of repmgr.conf + */ + appendPQExpBuffer(&command, + "\"%s -d '%s' ", + make_pg_path(progname()), + cell->node_info->conninfo); + + + if (strlen(pg_bindir)) + { + appendPQExpBuffer(&command, + "--pg_bindir="); + appendShellString(&command, + pg_bindir); + appendPQExpBuffer(&command, + " "); + } + + appendPQExpBuffer(&command, + " cluster show --csv\""); + + log_verbose(LOG_DEBUG, "build_cluster_matrix(): executing:\n %s", command.data); + + initPQExpBuffer(&command_output); + + (void)remote_command( + host, + runtime_options.remote_user, + command.data, + &command_output); + + p = command_output.data; + + termPQExpBuffer(&command); + + for (j = 0; j < nodes.node_count; j++) + { + if (sscanf(p, "%d,%d", &x, &y) != 2) + { + fprintf(stderr, _("cannot parse --csv output: %s\n"), p); + PQfinish(conn); + exit(ERR_INTERNAL); + } + + matrix_set_node_status(matrix_rec_list, + nodes.node_count, + connection_node_id, + x, + (y == -1) ? 
-1 : 0 ); + + while (*p && (*p != '\n')) + p++; + if (*p == '\n') + p++; + } + + PQfinish(conn); + } + + *matrix_rec_dest = matrix_rec_list; + + return nodes.node_count; +} + + +static int +build_cluster_crosscheck(t_node_status_cube ***dest_cube, int *name_length) +{ + PGconn *conn; + int h, i, j; + NodeInfoList nodes = T_NODE_INFO_LIST_INITIALIZER; + NodeInfoListCell *cell; + + t_node_status_cube **cube; + + /* We need to connect to get the list of nodes */ + log_info(_("connecting to database\n")); + + if (strlen(config_file_options.conninfo)) + conn = establish_db_connection(config_file_options.conninfo, true); + else + conn = establish_db_connection_by_params(&source_conninfo, true); + + get_all_node_records(conn, &nodes); + + PQfinish(conn); + + if (nodes.node_count == 0) + { + log_error(_("unable to retrieve any node records")); + exit(ERR_BAD_CONFIG); + } + + /* + * Allocate an empty cube matrix structure + * + * -2 == NULL + * -1 == Error + * 0 == OK + */ + + cube = (t_node_status_cube **) pg_malloc(sizeof(t_node_status_cube *) * nodes.node_count); + + h = 0; + + for (cell = nodes.head; cell; cell = cell->next) + { + int name_length_cur; + NodeInfoListCell *cell_i; + + cube[h] = (t_node_status_cube *) pg_malloc(sizeof(t_node_status_cube)); + cube[h]->node_id = cell->node_info->node_id; + strncpy(cube[h]->node_name, cell->node_info->node_name, MAXLEN); + + /* + * Find the maximum length of a node name + */ + name_length_cur = strlen(cube[h]->node_name); + if (name_length_cur > *name_length) + *name_length = name_length_cur; + + cube[h]->matrix_list_rec = (t_node_matrix_rec **) pg_malloc(sizeof(t_node_matrix_rec) * nodes.node_count); + + i = 0; + for (cell_i = nodes.head; cell_i; cell_i = cell_i->next) + { + NodeInfoListCell *cell_j; + + cube[h]->matrix_list_rec[i] = (t_node_matrix_rec *) pg_malloc0(sizeof(t_node_matrix_rec)); + cube[h]->matrix_list_rec[i]->node_id = cell_i->node_info->node_id; + + /* we don't need the name here */ + 
cube[h]->matrix_list_rec[i]->node_name[0] = '\0'; + + cube[h]->matrix_list_rec[i]->node_status_list = (t_node_status_rec **) pg_malloc0(sizeof(t_node_status_rec) * nodes.node_count); + + j = 0; + + for (cell_j = nodes.head; cell_j; cell_j = cell_j->next) + { + cube[h]->matrix_list_rec[i]->node_status_list[j] = (t_node_status_rec *) pg_malloc0(sizeof(t_node_status_rec)); + cube[h]->matrix_list_rec[i]->node_status_list[j]->node_id = cell_j->node_info->node_id; + cube[h]->matrix_list_rec[i]->node_status_list[j]->node_status = -2; /* default unknown */ + + j++; + } + + i++; + } + + h++; + } + + + /* + * Build the connection cube + */ + i = 0; + + for (cell = nodes.head; cell; cell = cell->next) + { + int remote_node_id; + PQExpBufferData command; + PQExpBufferData command_output; + + char *p; + + remote_node_id = cell->node_info->node_id; + + initPQExpBuffer(&command); + + appendPQExpBuffer(&command, + "%s -d '%s' --node-id=%i ", + make_pg_path(progname()), + cell->node_info->conninfo, + remote_node_id); + + if (strlen(pg_bindir)) + { + appendPQExpBuffer(&command, + "--pg_bindir="); + appendShellString(&command, + pg_bindir); + appendPQExpBuffer(&command, + " "); + } + + appendPQExpBuffer(&command, + "cluster matrix --csv 2>/dev/null"); + + initPQExpBuffer(&command_output); + + /* fix to work with --node-id */ + if (cube[i]->node_id == config_file_options.node_id) + { + (void)local_command( + command.data, + &command_output); + } + else + { + t_conninfo_param_list remote_conninfo; + char *host; + PQExpBufferData quoted_command; + + initPQExpBuffer(&quoted_command); + appendPQExpBuffer(&quoted_command, + "\"%s\"", + command.data); + + initialize_conninfo_params(&remote_conninfo, false); + parse_conninfo_string(cell->node_info->conninfo, + &remote_conninfo, + NULL, + false); + + host = param_get(&remote_conninfo, "host"); + + log_verbose(LOG_DEBUG, "build_cluster_crosscheck(): executing\n %s", quoted_command.data); + + (void)remote_command( + host, + runtime_options.remote_user,
+ quoted_command.data, + &command_output); + + termPQExpBuffer(&quoted_command); + } + + p = command_output.data; + + if(!strlen(command_output.data)) + { + continue; + } + + for (j = 0; j < (nodes.node_count * nodes.node_count); j++) + { + int matrix_rec_node_id; + int node_status_node_id; + int node_status; + + if (sscanf(p, "%d,%d,%d", &matrix_rec_node_id, &node_status_node_id, &node_status) != 3) + { + fprintf(stderr, _("cannot parse --csv output: %s\n"), p); + exit(ERR_INTERNAL); + } + + cube_set_node_status(cube, + nodes.node_count, + remote_node_id, + matrix_rec_node_id, + node_status_node_id, + node_status); + + while (*p && (*p != '\n')) + p++; + if (*p == '\n') + p++; + } + + i++; + } + + *dest_cube = cube; + return nodes.node_count; +} + + +static void +cube_set_node_status(t_node_status_cube **cube, int n, int execute_node_id, int matrix_node_id, int connection_node_id, int connection_status) +{ + int h, i, j; + + + for (h = 0; h < n; h++) + { + if (cube[h]->node_id == execute_node_id) + { + for (i = 0; i < n; i++) + { + if (cube[h]->matrix_list_rec[i]->node_id == matrix_node_id) + { + for (j = 0; j < n; j++) + { + if (cube[h]->matrix_list_rec[i]->node_status_list[j]->node_id == connection_node_id) + { + cube[h]->matrix_list_rec[i]->node_status_list[j]->node_status = connection_status; + break; + } + } + break; + } + } + } + } +} diff --git a/repmgr-action-cluster.h b/repmgr-action-cluster.h index cc58ce0d..90934182 100644 --- a/repmgr-action-cluster.h +++ b/repmgr-action-cluster.h @@ -6,8 +6,34 @@ #ifndef _REPMGR_ACTION_CLUSTER_H_ #define _REPMGR_ACTION_CLUSTER_H_ + + +typedef struct +{ + int node_id; + int node_status; +} t_node_status_rec; + +typedef struct +{ + int node_id; + char node_name[MAXLEN]; + t_node_status_rec **node_status_list; +} t_node_matrix_rec; + +typedef struct +{ + int node_id; + char node_name[MAXLEN]; + t_node_matrix_rec **matrix_list_rec; +} t_node_status_cube; + + + extern void do_cluster_show(void); extern void
do_cluster_event(void); +extern void do_cluster_crosscheck(void); +extern void do_cluster_matrix(void); #endif diff --git a/repmgr-client-global.h b/repmgr-client-global.h index bf060abf..78092ecd 100644 --- a/repmgr-client-global.h +++ b/repmgr-client-global.h @@ -137,9 +137,12 @@ extern int copy_remote_files(char *host, char *remote_user, char *remote_path, extern void print_error_list(ItemList *error_list, int log_level); -extern char * make_pg_path(char *file); +extern char *make_pg_path(const char *file); extern bool create_recovery_file(const char *data_dir, t_conninfo_param_list *recovery_conninfo); extern void get_superuser_connection(PGconn **conn, PGconn **superuser_conn, PGconn **privileged_conn); + +extern bool remote_command(const char *host, const char *user, const char *command, PQExpBufferData *outputbuf); + #endif diff --git a/repmgr-client.c b/repmgr-client.c index a5f499be..81b3be90 100644 --- a/repmgr-client.c +++ b/repmgr-client.c @@ -21,6 +21,8 @@ * * CLUSTER SHOW * CLUSTER EVENT + * CLUSTER CROSSCHECK + * CLUSTER MATRIX */ #include @@ -46,7 +48,7 @@ t_runtime_options runtime_options = T_RUNTIME_OPTIONS_INITIALIZER; t_configuration_options config_file_options = T_CONFIGURATION_OPTIONS_INITIALIZER; -/* conninfo params for the node we're cloning from */ +/* conninfo params for the node we're operating on */ t_conninfo_param_list source_conninfo; bool config_file_required = true; @@ -565,6 +567,7 @@ main(int argc, char **argv) * { PRIMARY | MASTER } REGISTER | * STANDBY {REGISTER | UNREGISTER | CLONE [node] | PROMOTE | FOLLOW [node] | SWITCHOVER | REWIND} | * BDR { REGISTER | UNREGISTER } | + * NODE { STATUS } | * CLUSTER { CROSSCHECK | MATRIX | SHOW | CLEANUP | EVENT } * * [node] is an optional hostname, provided instead of the -h/--host optipn @@ -654,7 +657,10 @@ main(int argc, char **argv) /* allow "CLUSTER EVENTS" as synonym for "CLUSTER EVENT" */ else if (strcasecmp(repmgr_action, "EVENTS") == 0) action = CLUSTER_EVENT; - + else if 
(strcasecmp(repmgr_action, "CROSSCHECK") == 0) + action = CLUSTER_CROSSCHECK; + else if (strcasecmp(repmgr_action, "MATRIX") == 0) + action = CLUSTER_MATRIX; } else { @@ -840,7 +846,11 @@ main(int argc, char **argv) RecordStatus record_status; log_verbose(LOG_DEBUG, "connecting to local node to retrieve record for node specified with --node-id or --node-name"); - conn = establish_db_connection(config_file_options.conninfo, true); + + if (strlen(config_file_options.conninfo)) + conn = establish_db_connection(config_file_options.conninfo, true); + else + conn = establish_db_connection_by_params(&source_conninfo, true); if (runtime_options.node_id != UNKNOWN_NODE_ID) { @@ -965,6 +975,12 @@ main(int argc, char **argv) case CLUSTER_EVENT: do_cluster_event(); break; + case CLUSTER_CROSSCHECK: + do_cluster_crosscheck(); + break; + case CLUSTER_MATRIX: + do_cluster_matrix(); + break; default: /* An action will have been determined by this point */ @@ -1078,6 +1094,8 @@ check_cli_parameters(const int action) } case CLUSTER_SHOW: + case CLUSTER_MATRIX: + case CLUSTER_CROSSCHECK: if (runtime_options.connection_param_provided) config_file_required = false; break; @@ -1100,6 +1118,8 @@ check_cli_parameters(const int action) case STANDBY_CLONE: case STANDBY_FOLLOW: case CLUSTER_SHOW: + case CLUSTER_MATRIX: + case CLUSTER_CROSSCHECK: break; default: item_list_append_format(&cli_warnings, @@ -1136,6 +1156,8 @@ check_cli_parameters(const int action) case PRIMARY_UNREGISTER: case STANDBY_UNREGISTER: case CLUSTER_EVENT: + case CLUSTER_MATRIX: + case CLUSTER_CROSSCHECK: break; default: item_list_append_format(&cli_warnings, @@ -1259,14 +1281,34 @@ action_name(const int action) { case PRIMARY_REGISTER: return "PRIMARY REGISTER"; + case PRIMARY_UNREGISTER: + return "PRIMARY UNREGISTER"; + case STANDBY_CLONE: return "STANDBY CLONE"; case STANDBY_REGISTER: return "STANDBY REGISTER"; case STANDBY_UNREGISTER: return "STANDBY UNREGISTER"; + case STANDBY_PROMOTE: + return "STANDBY PROMOTE"; + 
case STANDBY_FOLLOW: + return "STANDBY FOLLOW"; + + case BDR_REGISTER: + return "BDR REGISTER" +; case BDR_UNREGISTER: + return "BDR UNREGISTER"; + + case CLUSTER_SHOW: + return "CLUSTER SHOW"; case CLUSTER_EVENT: return "CLUSTER EVENT"; + case CLUSTER_MATRIX: + return "CLUSTER MATRIX"; + case CLUSTER_CROSSCHECK: + return "CLUSTER CROSSCHECK"; + } return "UNKNOWN ACTION"; @@ -2072,7 +2114,7 @@ get_standby_clone_mode(void) char * -make_pg_path(char *file) +make_pg_path(const char *file) { maxlen_snprintf(path_buf, "%s%s", pg_bindir, file); @@ -2373,3 +2415,77 @@ write_primary_conninfo(char *line, t_conninfo_param_list *param_list) } +/* + * Execute a command via ssh on the remote host. + * + * TODO: implement SSH calls using libssh2. + */ +bool +remote_command(const char *host, const char *user, const char *command, PQExpBufferData *outputbuf) +{ + FILE *fp; + char ssh_command[MAXLEN]; + PQExpBufferData ssh_host; + + char output[MAXLEN]; + + initPQExpBuffer(&ssh_host); + + if (*user != '\0') + { + appendPQExpBuffer(&ssh_host, "%s@", user); + } + + appendPQExpBuffer(&ssh_host, "%s",host); + + maxlen_snprintf(ssh_command, + "ssh -o Batchmode=yes %s %s %s", + config_file_options.ssh_options, + ssh_host.data, + command); + + termPQExpBuffer(&ssh_host); + + log_debug("remote_command(): %s\n", ssh_command); + + fp = popen(ssh_command, "r"); + + if (fp == NULL) + { + log_error(_("unable to execute remote command:\n %s"), ssh_command); + return false; + } + + if (outputbuf != NULL) + { + /* TODO: better error handling */ + while (fgets(output, MAXLEN, fp) != NULL) + { + appendPQExpBuffer(outputbuf, "%s", output); + } + } + else + { + /* + * When executed remotely, repmgr commands which execute pg_ctl (particularly + * `repmgr standby follow`) will see the pg_ctl command appear to fail with a + * non-zero return code when the output from the executed pg_ctl command + * has nowhere to go, even though the command actually succeeds. 
We'll consume an + * arbitrary amount of output and throw it away to work around this. + */ + int i = 0; + while (fgets(output, MAXLEN, fp) != NULL && i < 10) + { + i++; + } + } + + pclose(fp); + + if (outputbuf != NULL) + log_verbose(LOG_DEBUG, "remote_command(): output returned was:\n %s", outputbuf->data); + + return true; +} + +