diff --git a/.gitignore b/.gitignore index 0810037d..4c21bfca 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,9 @@ *~ *.o +*.so repmgr repmgrd README.htm* README.pdf +sql/repmgr_funcs.so +sql/repmgr_funcs.sql diff --git a/Makefile b/Makefile index d225da13..76f62c97 100644 --- a/Makefile +++ b/Makefile @@ -11,9 +11,11 @@ PG_CPPFLAGS = -I$(libpq_srcdir) PG_LIBS = $(libpq_pgport) all: repmgrd repmgr + $(MAKE) -C sql repmgrd: $(repmgrd_OBJS) $(CC) $(CFLAGS) $(repmgrd_OBJS) $(PG_LIBS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o repmgrd + $(MAKE) -C sql repmgr: $(repmgr_OBJS) $(CC) $(CFLAGS) $(repmgr_OBJS) $(PG_LIBS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o repmgr @@ -33,6 +35,7 @@ endif install: $(INSTALL_PROGRAM) repmgrd$(X) '$(DESTDIR)$(bindir)' $(INSTALL_PROGRAM) repmgr$(X) '$(DESTDIR)$(bindir)' + $(MAKE) -C sql install ifneq (,$(DATA)$(DATA_built)) @for file in $(addprefix $(srcdir)/, $(DATA)) $(DATA_built); do \ @@ -45,10 +48,17 @@ clean: rm -f *.o rm -f repmgrd rm -f repmgr + $(MAKE) -C sql clean deb: repmgrd repmgr mkdir -p ./debian/usr/bin cp repmgrd repmgr ./debian/usr/bin/ + mkdir -p ./debian/usr/share/postgresql/9.0/contrib/ + cp sql/repmgr_funcs.sql ./debian/usr/share/postgresql/9.0/contrib/ + cp sql/uninstall_repmgr_funcs.sql ./debian/usr/share/postgresql/9.0/contrib/ + mkdir -p ./debian/usr/lib/postgresql/9.0/lib/ + cp sql/repmgr_funcs.so ./debian/usr/lib/postgresql/9.0/lib/ dpkg-deb --build debian mv debian.deb ../postgresql-repmgr-9.0_1.0.0.deb + rm -rf ./debian/usr diff --git a/check_dir.c b/check_dir.c index ccf94abe..ab25edde 100644 --- a/check_dir.c +++ b/check_dir.c @@ -31,8 +31,6 @@ #include "strutil.h" #include "log.h" -static int mkdir_p(char *path, mode_t omode); - /* * make sure the directory either doesn't exist or is empty * we use this function to check the new data directory and @@ -124,7 +122,7 @@ set_directory_permissions(char *dir) * note that on failure, the path arg has been modified to show the particular * directory level we had problems with. */ -static int +int mkdir_p(char *path, mode_t omode) { struct stat sb; @@ -225,8 +223,85 @@ is_pg_dir(char *dir) const size_t buf_sz = 8192; char path[buf_sz]; struct stat sb; + int r; + // test pgdata xsnprintf(path, buf_sz, "%s/PG_VERSION", dir); + if (stat(path, &sb) == 0) + return true; - return (stat(path, &sb) == 0) ? true : false; + // test tablespace dir + sprintf(path, "ls %s/PG_9.0_*/ -I*", dir); + r = system(path); + if (r == 0) + return true; + + return false; +} + + +bool +create_pgdir(char *dir, bool force) +{ + bool pg_dir = false; + + /* Check this directory could be used as a PGDATA dir */ + switch (check_dir(dir)) + { + case 0: + /* dir not there, must create it */ + log_info(_("creating directory \"%s\"...\n"), dir); + + if (!create_directory(dir)) + { + log_err(_("couldn't create directory \"%s\"...\n"), + dir); + exit(ERR_BAD_CONFIG); + } + break; + case 1: + /* Present but empty, fix permissions and use it */ + log_info(_("checking and correcting permissions on existing directory %s ...\n"), + dir); + + if (!set_directory_permissions(dir)) + { + log_err(_("could not change permissions of directory \"%s\": %s\n"), + dir, strerror(errno)); + exit(ERR_BAD_CONFIG); + } + break; + case 2: + /* Present and not empty */ + log_warning(_("directory \"%s\" exists but is not empty\n"), + dir); + + pg_dir = is_pg_dir(dir); + + /* + * we use force to reduce the time needed to restore a node which + * turn async after a failover or anything else + */ + if (pg_dir && force) + { + /* Let it continue */ + break; + } + else if (pg_dir && !force) + { + log_warning(_("\nThis looks like a PostgreSQL directory.\n" + "If you are sure you want to clone here, " + "please check there is no PostgreSQL server " + "running and use the --force option\n")); + exit(ERR_BAD_CONFIG); + } + + return false; + default: + /* Trouble accessing directory */ + log_err(_("could not access directory \"%s\": %s\n"), + dir, strerror(errno)); + exit(ERR_BAD_CONFIG); + } + return true; } diff --git a/check_dir.h b/check_dir.h index 37a14fe5..d0493dee 100644 --- a/check_dir.h +++ b/check_dir.h @@ -20,9 +20,11 @@ #ifndef _REPMGR_CHECK_DIR_H_ #define _REPMGR_CHECK_DIR_H_ +int mkdir_p(char *path, mode_t omode); int check_dir(char *dir); bool create_directory(char *dir); bool set_directory_permissions(char *dir); bool is_pg_dir(char *dir); +bool create_pgdir(char *dir, bool force); #endif diff --git a/config.c b/config.c index 401c4776..f58c80ab 100644 --- a/config.c +++ b/config.c @@ -35,6 +35,10 @@ parse_config(const char *config_file, t_configuration_options *options) memset(options->cluster_name, 0, sizeof(options->cluster_name)); options->node = -1; memset(options->conninfo, 0, sizeof(options->conninfo)); + options->failover = MANUAL_FAILOVER; + options->priority = 0; + memset(options->promote_command, 0, sizeof(options->promote_command)); + memset(options->follow_command, 0, sizeof(options->follow_command)); memset(options->rsync_options, 0, sizeof(options->rsync_options)); /* @@ -70,6 +74,27 @@ parse_config(const char *config_file, t_configuration_options *options) strncpy (options->loglevel, value, MAXLEN); else if (strcmp(name, "logfacility") == 0) strncpy (options->logfacility, value, MAXLEN); + else if (strcmp(name, "failover") == 0) + { + char failoverstr[MAXLEN]; + strncpy(failoverstr, value, MAXLEN); + + if (strcmp(failoverstr, "manual") == 0) + options->failover = MANUAL_FAILOVER; + else if (strcmp(failoverstr, "automatic") == 0) + options->failover = AUTOMATIC_FAILOVER; + else + { + log_warning(_("value for failover option is incorrect, it should be automatic or manual. Defaulting to manual.\n")); + options->failover = MANUAL_FAILOVER; + } + } + else if (strcmp(name, "priority") == 0) + options->priority = atoi(value); + else if (strcmp(name, "promote_command") == 0) + strncpy(options->promote_command, value, MAXLEN); + else if (strcmp(name, "follow_command") == 0) + strncpy(options->follow_command, value, MAXLEN); else log_warning(_("%s/%s: Unknown name/value pair!\n"), name, value); } @@ -145,3 +170,72 @@ parse_line(char *buff, char *name, char *value) trim(value); } +<<<<<<< HEAD:config.c + +bool +reload_configuration(char *config_file, t_configuration_options *orig_options) +{ + PGconn *conn; + + t_configuration_options new_options; + + /* + * Re-read the configuration file: repmgr.conf + */ + log_info(_("Reloading configuration file and updating repmgr tables\n")); + parse_config(config_file, &new_options); + if (new_options.node == -1) + { + log_warning(_("\nCannot load new configuration, will keep current one.\n")); + return false; + } + + if (strcmp(new_options.cluster_name, orig_options->cluster_name) != 0) + { + log_warning(_("\nCannot change cluster name, will keep current configuration.\n")); + return false; + } + + if (new_options.node != orig_options->node) + { + log_warning(_("\nCannot change node number, will keep current configuration.\n")); + return false; + } + + if (new_options.failover != MANUAL_FAILOVER && new_options.failover != AUTOMATIC_FAILOVER) + { + log_warning(_("\nNew value for failover is not valid. Should be MANUAL or AUTOMATIC.\n")); + return false; + } + + /* Test conninfo string */ + conn = establishDBConnection(new_options.conninfo, false); + if (!conn || (PQstatus(conn) != CONNECTION_OK)) + { + log_warning(_("\nconninfo string is not valid, will keep current configuration.\n")); + return false; + } + PQfinish(conn); + + /* Configuration seems ok, will load new values */ + strcpy(orig_options->cluster_name, new_options.cluster_name); + orig_options->node = new_options.node; + strcpy(orig_options->conninfo, new_options.conninfo); + orig_options->failover = new_options.failover; + orig_options->priority = new_options.priority; + strcpy(orig_options->promote_command, new_options.promote_command); + strcpy(orig_options->follow_command, new_options.follow_command); + strcpy(orig_options->rsync_options, new_options.rsync_options); + /* + * XXX These ones can change with a simple SIGHUP? + + strcpy (orig_options->loglevel, new_options.loglevel); + strcpy (orig_options->logfacility, new_options.logfacility); + + logger_shutdown(); + XXX do we have progname here ? + logger_init(progname, orig_options.loglevel, orig_options.logfacility); + */ + + return true; +} diff --git a/config.h b/config.h index a3dfa8f5..f2efd99a 100644 --- a/config.h +++ b/config.h @@ -28,6 +28,10 @@ typedef struct char cluster_name[MAXLEN]; int node; char conninfo[MAXLEN]; + int failover; + int priority; + char promote_command[MAXLEN]; + char follow_command[MAXLEN]; char loglevel[MAXLEN]; char logfacility[MAXLEN]; char rsync_options[QUERY_STR_LEN]; @@ -36,5 +40,6 @@ typedef struct void parse_config(const char *config_file, t_configuration_options *options); void parse_line(char *buff, char *name, char *value); char *trim(char *s); +bool reload_configuration(char *config_file, t_configuration_options *orig_options); #endif diff --git a/dbutils.c b/dbutils.c index 746bb983..8e64699a 100644 --- a/dbutils.c +++ b/dbutils.c @@ -26,7 +26,7 @@ establishDBConnection(const char *conninfo, const bool exit_on_error) { /* Make a connection to the database */ PGconn *conn = NULL; - char connection_string[MAXLEN]; + char connection_string[MAXLEN]; strcpy(connection_string, conninfo); strcat(connection_string, " fallback_application_name='repmgr'"); @@ -96,6 +96,84 @@ is_standby(PGconn *conn) } + +bool +is_witness(PGconn *conn, char *schema, char *cluster, int node_id) +{ + PGresult *res; + bool result; + char sqlquery[QUERY_STR_LEN]; + + sqlquery_snprintf(sqlquery, "SELECT witness from %s.repl_nodes where cluster = '%s' and id = %d", + schema, cluster, node_id); + res = PQexec(conn, sqlquery); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_err(_("Can't query server mode: %s"), PQerrorMessage(conn)); + PQclear(res); + PQfinish(conn); + exit(ERR_DB_QUERY); + } + + if (strcmp(PQgetvalue(res, 0, 0), "f") == 0) + result = false; + else + result = true; + + PQclear(res); + return result; +} + + +/* check the PQStatus and try to 'select 1' to confirm good connection */ +bool +is_pgup(PGconn *conn) +{ + PGresult *res; + char sqlquery[QUERY_STR_LEN]; + /* Check the connection status twice in case it changes after reset */ + bool twice = false; + + /* Check the connection status twice in case it changes after reset */ + for (;;) + { + if (PQstatus(conn) != CONNECTION_OK) + { + if (twice) + return false; + PQreset(conn); // reconnect + twice = true; + } + else + { + /* + * Send a SELECT 1 just to check if connection is OK + * the PQstatus() won't catch disconnected connection + * XXXX + * the error message can be used by repmgrd + */ + sqlquery_snprintf(sqlquery, "SELECT 1"); + res = PQexec(conn, sqlquery); + // we need to retry, because we might just have loose the connection once + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_err(_("PQexec failed: %s\n"), PQerrorMessage(conn)); + PQclear(res); + if (twice) + return false; + PQreset(conn); // reconnect + twice = true; + } + else + { + PQclear(res); + return true; + } + } + } +} + + /* * If postgreSQL version is 9 or superior returns the major version * if 8 or inferior returns an empty string @@ -207,7 +285,7 @@ get_cluster_size(PGconn *conn) * connection string is placed there. */ PGconn * -getMasterConnection(PGconn *standby_conn, int id, char *cluster, +getMasterConnection(PGconn *standby_conn, char *schema, int id, char *cluster, int *master_id, char *master_conninfo_out) { PGconn *master_conn = NULL; @@ -216,7 +294,6 @@ getMasterConnection(PGconn *standby_conn, int id, char *cluster, char sqlquery[QUERY_STR_LEN]; char master_conninfo_stack[MAXCONNINFO]; char *master_conninfo = &*master_conninfo_stack; - char schema_str[MAXLEN]; char schema_quoted[MAXLEN]; int i; @@ -233,10 +310,9 @@ getMasterConnection(PGconn *standby_conn, int id, char *cluster, * * Assemble the unquoted schema name */ - maxlen_snprintf(schema_str, "repmgr_%s", cluster); { - char *identifier = PQescapeIdentifier(standby_conn, schema_str, - strlen(schema_str)); + char *identifier = PQescapeIdentifier(standby_conn, schema, + strlen(schema)); maxlen_snprintf(schema_quoted, "%s", identifier); PQfreemem(identifier); @@ -247,7 +323,7 @@ getMasterConnection(PGconn *standby_conn, int id, char *cluster, cluster); sqlquery_snprintf(sqlquery, "SELECT * FROM %s.repl_nodes " - " WHERE cluster = '%s' and id <> %d", + " WHERE cluster = '%s' and id <> %d and not witness", schema_quoted, cluster, id); res1 = PQexec(standby_conn, sqlquery); diff --git a/dbutils.h b/dbutils.h index 7eb68ed6..83e15f86 100644 --- a/dbutils.h +++ b/dbutils.h @@ -24,12 +24,14 @@ PGconn *establishDBConnection(const char *conninfo, const bool exit_on_error); PGconn *establishDBConnectionByParams(const char *keywords[], const char *values[], const bool exit_on_error); -bool is_standby(PGconn *conn); +bool is_standby(PGconn *conn); +bool is_witness(PGconn *conn, char *schema, char *cluster, int node_id); +bool is_pgup(PGconn *conn); char *pg_version(PGconn *conn, char* major_version); bool guc_setted(PGconn *conn, const char *parameter, const char *op, const char *value); const char *get_cluster_size(PGconn *conn); -PGconn *getMasterConnection(PGconn *standby_conn, int id, char *cluster, +PGconn *getMasterConnection(PGconn *standby_conn, char *schema, int id, char *cluster, int *master_id, char *master_conninfo_out); #endif diff --git a/debian/DEBIAN/control b/debian/DEBIAN/control index 6fc360d8..4cadb9e1 100644 --- a/debian/DEBIAN/control +++ b/debian/DEBIAN/control @@ -1,4 +1,4 @@ -Package: repmgr +Package: repmgr-auto Version: 1.0-1 Section: database Priority: optional diff --git a/errcode.h b/errcode.h index 49433dae..d103bcb9 100644 --- a/errcode.h +++ b/errcode.h @@ -33,5 +33,6 @@ #define ERR_PROMOTED 8 #define ERR_BAD_PASSWORD 9 #define ERR_STR_OVERFLOW 10 +#define ERR_FAILOVER_FAIL 11 #endif /* _ERRCODE_H_ */ diff --git a/repmgr.c b/repmgr.c index 9da316cc..9ac49145 100644 --- a/repmgr.c +++ b/repmgr.c @@ -6,8 +6,9 @@ * hot standby servers for an HA environment * * Commands implemented are. - * MASTER REGISTER, STANDBY REGISTER, STANDBY CLONE, STANDBY FOLLOW, - * STANDBY PROMOTE + * MASTER REGISTER + * STANDBY REGISTER, STANDBY CLONE, STANDBY FOLLOW, STANDBY PROMOTE + * WITNESS CREATE * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -45,18 +46,23 @@ #define STANDBY_CLONE 3 #define STANDBY_PROMOTE 4 #define STANDBY_FOLLOW 5 +#define WITNESS_CREATE 6 static bool create_recovery_file(const char *data_dir, char *master_conninfo); static int test_ssh_connection(char *host, char *remote_user); static int copy_remote_files(char *host, char *remote_user, char *remote_path, char *local_path, bool is_directory); static bool check_parameters_for_action(const int action); +static bool create_schema(PGconn *conn); +static bool copy_configuration(PGconn *masterconn, PGconn *witnessconn); static void do_master_register(void); static void do_standby_register(void); static void do_standby_clone(void); static void do_standby_promote(void); static void do_standby_follow(void); +static void do_witness_create(void); + static void usage(void); static void help(const char *progname); @@ -72,7 +78,7 @@ bool require_password = false; /* Initialization of runtime options */ t_runtime_options runtime_options = { "", "", "", "", "", "", DEFAULT_WAL_KEEP_SEGMENTS, false, false, false, "" }; -t_configuration_options options = { "", -1, "", "", "" }; +t_configuration_options options = { "", -1, "", MANUAL_FAILOVER, -1, "", "", "", "", "" }; static char *server_mode = NULL; static char *server_cmd = NULL; @@ -87,6 +93,7 @@ main(int argc, char **argv) {"port", required_argument, NULL, 'p'}, {"username", required_argument, NULL, 'U'}, {"dest-dir", required_argument, NULL, 'D'}, + {"local-port", required_argument, NULL, 'l'}, {"config-file", required_argument, NULL, 'f'}, {"remote-user", required_argument, NULL, 'R'}, {"wal-keep-segments", required_argument, NULL, 'w'}, @@ -117,7 +124,7 @@ main(int argc, char **argv) } - while ((c = getopt_long(argc, argv, "d:h:p:U:D:f:R:w:F:I:v", long_options, + while ((c = getopt_long(argc, argv, "d:h:p:U:D:l:f:R:w:F:I:v", long_options, &optindex)) != -1) { switch (c) @@ -138,6 +145,10 @@ main(int argc, char **argv) case 'D': strncpy(runtime_options.dest_dir, optarg, MAXFILENAME); break; + case 'l': + if (atoi(optarg) > 0) + strncpy(runtime_options.localport, optarg, MAXLEN); + break; case 'f': strncpy(runtime_options.config_file, optarg, MAXLEN); break; @@ -166,7 +177,8 @@ main(int argc, char **argv) /* * Now we need to obtain the action, this comes in one of these forms: * MASTER REGISTER | - * STANDBY {REGISTER | CLONE [node] | PROMOTE | FOLLOW [node]} + * STANDBY {REGISTER | CLONE [node] | PROMOTE | FOLLOW [node]} | + * WITNESS CREATE * * the node part is optional, if we receive it then we shouldn't * have received a -h option @@ -174,8 +186,8 @@ main(int argc, char **argv) if (optind < argc) { server_mode = argv[optind++]; - if (strcasecmp(server_mode, "STANDBY") != 0 && - strcasecmp(server_mode, "MASTER") != 0) + if (strcasecmp(server_mode, "STANDBY") != 0 && strcasecmp(server_mode, "MASTER") != 0 && + strcasecmp(server_mode, "WITNESS") != 0) { usage(); exit(ERR_BAD_CONFIG); @@ -185,30 +197,32 @@ main(int argc, char **argv) if (optind < argc) { server_cmd = argv[optind++]; - if (strcasecmp(server_cmd, "REGISTER") == 0) + /* check posibilities for all server modes */ + if (strcasecmp(server_mode, "MASTER") == 0) { - /* - * we don't use this info in any other place so i will - * just execute the compare again instead of having an - * additional variable to hold a value that we will use - * no more - */ - if (strcasecmp(server_mode, "MASTER") == 0) + if (strcasecmp(server_cmd, "REGISTER") == 0) action = MASTER_REGISTER; - else if (strcasecmp(server_mode, "STANDBY") == 0) - action = STANDBY_REGISTER; } - else if (strcasecmp(server_cmd, "CLONE") == 0) - action = STANDBY_CLONE; - else if (strcasecmp(server_cmd, "PROMOTE") == 0) - action = STANDBY_PROMOTE; - else if (strcasecmp(server_cmd, "FOLLOW") == 0) - action = STANDBY_FOLLOW; - else + else if (strcasecmp(server_mode, "STANDBY") == 0) { - usage(); - exit(ERR_BAD_CONFIG); + if (strcasecmp(server_cmd, "REGISTER") == 0) + action = STANDBY_REGISTER; + else if (strcasecmp(server_cmd, "CLONE") == 0) + action = STANDBY_CLONE; + else if (strcasecmp(server_cmd, "PROMOTE") == 0) + action = STANDBY_PROMOTE; + else if (strcasecmp(server_cmd, "FOLLOW") == 0) + action = STANDBY_FOLLOW; } + else if (strcasecmp(server_mode, "WITNESS") == 0) + if (strcasecmp(server_cmd, "CREATE") == 0) + action = WITNESS_CREATE; + } + + if (action == NO_ACTION) + { + usage(); + exit(ERR_BAD_CONFIG); } /* For some actions we still can receive a last argument */ @@ -257,6 +271,10 @@ main(int argc, char **argv) if (runtime_options.verbose) printf(_("Opening configuration file: %s\n"), runtime_options.config_file); + /* + * XXX Do not read config files for action where it is not required (clone + * for example). + */ parse_config(runtime_options.config_file, &options); keywords[2] = "user"; @@ -314,6 +332,9 @@ main(int argc, char **argv) case STANDBY_FOLLOW: do_standby_follow(); break; + case WITNESS_CREATE: + do_witness_create(); + break; default: usage(); exit(ERR_BAD_CONFIG); @@ -399,65 +420,8 @@ do_master_register(void) log_info(_("master register: creating database objects inside the %s schema\n"), repmgr_schema); /* ok, create the schema */ - sqlquery_snprintf(sqlquery, "CREATE SCHEMA %s", repmgr_schema); - log_debug("master register: %s\n", sqlquery); - if (!PQexec(conn, sqlquery)) - { - log_err(_("Cannot create the schema %s: %s\n"), - repmgr_schema, PQerrorMessage(conn)); - PQfinish(conn); - exit(ERR_BAD_CONFIG); - } - - /* ... the tables */ - sqlquery_snprintf(sqlquery, "CREATE TABLE %s.repl_nodes ( " - " id integer primary key, " - " cluster text not null, " - " conninfo text not null)", repmgr_schema); - log_debug("master register: %s\n", sqlquery); - if (!PQexec(conn, sqlquery)) - { - log_err(_("Cannot create the table %s.repl_nodes: %s\n"), - repmgr_schema, PQerrorMessage(conn)); - PQfinish(conn); - exit(ERR_BAD_CONFIG); - } - - sqlquery_snprintf(sqlquery, "CREATE TABLE %s.repl_monitor ( " - " primary_node INTEGER NOT NULL, " - " standby_node INTEGER NOT NULL, " - " last_monitor_time TIMESTAMP WITH TIME ZONE NOT NULL, " - " last_wal_primary_location TEXT NOT NULL, " - " last_wal_standby_location TEXT NOT NULL, " - " replication_lag BIGINT NOT NULL, " - " apply_lag BIGINT NOT NULL) ", repmgr_schema); - log_debug("master register: %s\n", sqlquery); - if (!PQexec(conn, sqlquery)) - { - log_err(_("Cannot create the table %s.repl_monitor: %s\n"), - repmgr_schema, PQerrorMessage(conn)); - PQfinish(conn); - exit(ERR_BAD_CONFIG); - } - - /* and the view */ - sqlquery_snprintf(sqlquery, "CREATE VIEW %s.repl_status AS " - " WITH monitor_info AS (SELECT *, ROW_NUMBER() OVER (PARTITION BY primary_node, standby_node " - " ORDER BY last_monitor_time desc) " - " FROM %s.repl_monitor) " - " SELECT primary_node, standby_node, last_monitor_time, last_wal_primary_location, " - " last_wal_standby_location, pg_size_pretty(replication_lag) replication_lag, " - " pg_size_pretty(apply_lag) apply_lag, age(now(), last_monitor_time) AS time_lag " - " FROM monitor_info a " - " WHERE row_number = 1", repmgr_schema, repmgr_schema); - log_debug("master register: %s\n", sqlquery); - if (!PQexec(conn, sqlquery)) - { - log_err(_("Cannot create the view %s.repl_status: %s\n"), - repmgr_schema, PQerrorMessage(conn)); - PQfinish(conn); - exit(ERR_BAD_CONFIG); - } + if (!create_schema(conn)) + return; } else { @@ -465,7 +429,7 @@ do_master_register(void) int id; /* Ensure there isn't any other master already registered */ - master_conn = getMasterConnection(conn, options.node, + master_conn = getMasterConnection(conn, repmgr_schema, options.node, options.cluster_name, &id,NULL); if (master_conn != NULL) { @@ -492,9 +456,9 @@ do_master_register(void) } } - sqlquery_snprintf(sqlquery, "INSERT INTO %s.repl_nodes " - "VALUES (%d, '%s', '%s')", - repmgr_schema, options.node, options.cluster_name, options.conninfo); + sqlquery_snprintf(sqlquery, "INSERT INTO %s.repl_nodes (id, cluster, conninfo, priority) " + "VALUES (%d, '%s', '%s', %d)", + repmgr_schema, options.node, options.cluster_name, options.conninfo, options.priority); log_debug(_("master register: %s\n"), sqlquery); if (!PQexec(conn, sqlquery)) @@ -585,7 +549,7 @@ do_standby_register(void) /* check if there is a master in this cluster */ log_info(_("%s connecting to master database\n"), progname); - master_conn = getMasterConnection(conn, options.node, options.cluster_name, + master_conn = getMasterConnection(conn, repmgr_schema, options.node, options.cluster_name, &master_id, NULL); if (!master_conn) { @@ -634,10 +598,11 @@ do_standby_register(void) } } - sqlquery_snprintf(sqlquery, "INSERT INTO %s.repl_nodes " - "VALUES (%d, '%s', '%s')", - repmgr_schema, options.node, options.cluster_name, options.conninfo); + sqlquery_snprintf(sqlquery, "INSERT INTO %s.repl_nodes(id, cluster, conninfo, priority) " + "VALUES (%d, '%s', '%s', %d)", + repmgr_schema, options.node, options.cluster_name, options.conninfo, options.priority); log_debug(_("standby register: %s\n"), sqlquery); + if (!PQexec(master_conn, sqlquery)) { log_err(_("Cannot insert node details, %s\n"), @@ -650,6 +615,8 @@ do_standby_register(void) log_info(_("%s registering the standby complete\n"), progname); PQfinish(master_conn); PQfinish(conn); + log_notice(_("Standby node correctly registered for cluster %s with id %d (conninfo: %s)\n"), + options.cluster_name, options.node, options.conninfo); return; } @@ -663,79 +630,42 @@ do_standby_clone(void) int r = 0; int i; - bool pg_dir = false; + bool flag_success = false; + bool test_mode = false; + + char tblspc_dir[MAXFILENAME]; + char master_data_directory[MAXFILENAME]; - char master_config_file[MAXFILENAME]; - char master_hba_file[MAXFILENAME]; - char master_ident_file[MAXFILENAME]; + char local_data_directory[MAXFILENAME]; + char master_xlog_directory[MAXFILENAME]; + char local_xlog_directory[MAXFILENAME]; + char master_stats_temp_directory[MAXFILENAME]; + char local_stats_temp_directory[MAXFILENAME]; char master_control_file[MAXFILENAME]; char local_control_file[MAXFILENAME]; - char tblspc_dir[MAXFILENAME]; + char master_config_file[MAXFILENAME]; + char local_config_file[MAXFILENAME]; + char master_hba_file[MAXFILENAME]; + char local_hba_file[MAXFILENAME]; + char master_ident_file[MAXFILENAME]; + char local_ident_file[MAXFILENAME]; char *first_wal_segment = NULL; const char *last_wal_segment = NULL; - char master_version[MAXVERSIONSTR]; + char master_version[MAXVERSIONSTR]; - /* if dest_dir hasn't been provided, initialize to current directory */ - if (!runtime_options.dest_dir[0]) + /* + * if dest_dir has been provided, we copy everything in the same path + * if dest_dir is set and the master have tablespace, repmgr will stop + * because it is more complex to remap the path for the tablespaces and it + * does not look useful at the moment + */ + if (runtime_options.dest_dir[0]) { - strncpy(runtime_options.dest_dir, DEFAULT_DEST_DIR, MAXFILENAME); - } - - /* Check this directory could be used as a PGDATA dir */ - switch (check_dir(runtime_options.dest_dir)) - { - case 0: - /* dest_dir not there, must create it */ - log_info(_("creating directory %s ...\n"), runtime_options.dest_dir); - - if (!create_directory(runtime_options.dest_dir)) - { - log_err(_("%s: couldn't create directory %s ...\n"), - progname, runtime_options.dest_dir); - exit(ERR_BAD_CONFIG); - } - break; - case 1: - /* Present but empty, fix permissions and use it */ - log_info(_("checking and correcting permissions on existing directory %s ...\n"), - runtime_options.dest_dir); - - if (!set_directory_permissions(runtime_options.dest_dir)) - { - log_err(_("%s: could not change permissions of directory \"%s\": %s\n"), - progname, runtime_options.dest_dir, strerror(errno)); - exit(ERR_BAD_CONFIG); - } - break; - case 2: - /* Present and not empty */ - log_warning( _("%s: directory \"%s\" exists but is not empty\n"), - progname, runtime_options.dest_dir); - - pg_dir = is_pg_dir(runtime_options.dest_dir); - if (pg_dir && !runtime_options.force) - { - log_warning( _("\nThis looks like a PostgreSQL directory.\n" - "If you are sure you want to clone here, " - "please check there is no PostgreSQL server " - "running and use the --force option\n")); - exit(ERR_BAD_CONFIG); - } - else if (pg_dir && runtime_options.force) - { - /* Let it continue */ - break; - } - else - return; - default: - /* Trouble accessing directory */ - log_err( _("%s: could not access directory \"%s\": %s\n"), - progname, runtime_options.dest_dir, strerror(errno)); - exit(ERR_BAD_CONFIG); + test_mode = true; + log_notice(_("%s Destination directory %s provided, try to clone everything in it.\n"), progname, runtime_options.dest_dir); } /* Connection parameters for master only */ @@ -806,51 +736,22 @@ do_standby_clone(void) } for (i = 0; i < PQntuples(res); i++) { - strncpy(tblspc_dir, PQgetvalue(res, i, 0), MAXFILENAME); - /* Check this directory could be used as a PGDATA dir */ - switch (check_dir(tblspc_dir)) + if (test_mode) { - case 0: - /* tblspc_dir not there, must create it */ - log_info(_("creating directory \"%s\"... "), tblspc_dir); + log_err("Can't clone in test mode when master have tablespace\n"); + PQclear(res); + PQfinish(conn); + exit(ERR_BAD_CONFIG); + } - if (!create_directory(tblspc_dir)) - { - log_err(_("%s: couldn't create directory \"%s\"...\n"), - progname, tblspc_dir); - PQclear(res); - PQfinish(conn); - exit(ERR_BAD_CONFIG); - } - break; - case 1: - /* Present but empty, fix permissions and use it */ - log_info(_("fixing permissions on existing directory \"%s\"... "), - tblspc_dir); - - if (!set_directory_permissions(tblspc_dir)) - { - log_err(_("%s: could not change permissions of directory \"%s\": %s\n"), - progname, tblspc_dir, strerror(errno)); - PQclear(res); - PQfinish(conn); - exit(ERR_BAD_CONFIG); - } - break; - case 2: - /* Present and not empty */ - if (!runtime_options.force) - { - log_err(_("%s: directory \"%s\" exists but is not empty\n"), - progname, tblspc_dir); - PQclear(res); - PQfinish(conn); - exit(ERR_BAD_CONFIG); - } - default: - /* Trouble accessing directory */ - log_err(_("%s: could not access directory \"%s\": %s\n"), - progname, tblspc_dir, strerror(errno)); + strncpy(tblspc_dir, PQgetvalue(res, i, 0), MAXFILENAME); + /* + * Check this directory could be used for tablespace + * this will create the directory a bit too early + * XXX build an array of tablespace to create later in the backup + */ + if (!create_pgdir(tblspc_dir, runtime_options.force)) + { PQclear(res); PQfinish(conn); exit(ERR_BAD_CONFIG); @@ -870,7 +771,7 @@ do_standby_clone(void) sqlquery_snprintf(sqlquery, "SELECT name, setting " " FROM pg_settings " - " WHERE name IN ('data_directory', 'config_file', 'hba_file', 'ident_file')"); + " WHERE name IN ('data_directory', 'config_file', 'hba_file', 'ident_file', 'stats_temp_directory')"); log_debug(_("standby clone: %s\n"), sqlquery); res = PQexec(conn, sqlquery); if (PQresultStatus(res) != PGRES_TUPLES_OK) @@ -890,6 +791,8 @@ do_standby_clone(void) strncpy(master_hba_file, PQgetvalue(res, i, 1), MAXFILENAME); else if (strcmp(PQgetvalue(res, i, 0), "ident_file") == 0) strncpy(master_ident_file, PQgetvalue(res, i, 1), MAXFILENAME); + else if (strcmp(PQgetvalue(res, i, 0), "stats_temp_directory") == 0) + strncpy(master_stats_temp_directory, PQgetvalue(res, i, 1), MAXFILENAME); else log_warning(_("unknown parameter: %s\n"), PQgetvalue(res, i, 0)); } @@ -897,6 +800,39 @@ do_standby_clone(void) log_info(_("Succesfully connected to primary. Current installation size is %s\n"), get_cluster_size(conn)); + /* + * XXX master_xlog_directory should be discovered from master configuration + * but it is not possible via SQL. We need to use a command via ssh + */ + maxlen_snprintf(master_xlog_directory, "%s/pg_xlog", master_data_directory); + if (test_mode) + { + strncpy(local_data_directory, runtime_options.dest_dir, MAXFILENAME); + strncpy(local_config_file, runtime_options.dest_dir, MAXFILENAME); + strncpy(local_hba_file, runtime_options.dest_dir, MAXFILENAME); + strncpy(local_ident_file, runtime_options.dest_dir, MAXFILENAME); + maxlen_snprintf(local_stats_temp_directory, "%s/pg_stat_tmp", runtime_options.dest_dir); + maxlen_snprintf(local_xlog_directory, "%s/pg_xlog", runtime_options.dest_dir); + } + else + { + strncpy(local_data_directory, master_data_directory, MAXFILENAME); + strncpy(local_config_file, master_config_file, MAXFILENAME); + strncpy(local_hba_file, master_hba_file, MAXFILENAME); + strncpy(local_ident_file, master_ident_file, MAXFILENAME); + strncpy(local_stats_temp_directory, master_stats_temp_directory, MAXFILENAME); + strncpy(local_xlog_directory, master_xlog_directory, MAXFILENAME); + } + + r = test_ssh_connection(runtime_options.host, runtime_options.remote_user); + if (r != 0) + { + log_err(_("%s: Aborting, remote host %s is not reachable.\n"), progname, runtime_options.host); + goto stop_backup; + } + + log_notice(_("Starting backup...\n")); + /* * inform the master we will start a backup and get the first XLog filename * so we can say to the user we need those files @@ -926,6 +862,14 @@ do_standby_clone(void) PQclear(res); + /* Check the directory could be used as a PGDATA dir */ + if (!create_pgdir(local_data_directory, runtime_options.force)) + { + log_err(_("%s: couldn't use directory %s ...\nUse --force option to force\n"), + progname, local_data_directory); + goto stop_backup; + } + /* * 1) first move global/pg_control * @@ -941,19 +885,20 @@ do_standby_clone(void) */ /* need to create the global sub directory */ - maxlen_snprintf(master_control_file, "%s/global/pg_control", - master_data_directory); - maxlen_snprintf(local_control_file, "%s/global", runtime_options.dest_dir); + maxlen_snprintf(master_control_file, "%s/global/pg_control", master_data_directory); + maxlen_snprintf(local_control_file, "%s/global", local_data_directory); + log_info(_("standby clone: master control file '%s'\n"), master_control_file); if (!create_directory(local_control_file)) { log_err(_("%s: couldn't create directory %s ...\n"), - progname, runtime_options.dest_dir); + progname, local_control_file); goto stop_backup; } log_info(_("standby clone: master control file '%s'\n"), master_control_file); r = copy_remote_files(runtime_options.host, runtime_options.remote_user, - master_control_file, local_control_file, false); + master_control_file, local_control_file, + false); if (r != 0) { log_warning(_("standby clone: failed copying master control file '%s'\n"), master_control_file); @@ -962,7 +907,8 @@ do_standby_clone(void) log_info(_("standby clone: master data directory '%s'\n"), master_data_directory); r = copy_remote_files(runtime_options.host, runtime_options.remote_user, - master_data_directory, runtime_options.dest_dir, true); + master_data_directory, local_data_directory, + true); if (r != 0) { log_warning(_("standby clone: failed copying master data directory '%s'\n"), master_data_directory); @@ -973,6 +919,8 @@ do_standby_clone(void) * Copy tablespace locations, i'm doing this separately because i couldn't * find and appropiate rsync option but besides we could someday make all * these rsync happen concurrently + * XXX We may not do that if we are in test_mode but it does not hurt too much + * (except if a tablespace is created during the test) */ sqlquery_snprintf(sqlquery, "SELECT spclocation " @@ -1002,7 +950,7 @@ do_standby_clone(void) log_info(_("standby clone: master config file '%s'\n"), master_config_file); r = copy_remote_files(runtime_options.host, runtime_options.remote_user, - master_config_file, runtime_options.dest_dir, + master_config_file, local_config_file, false); if (r != 0) { @@ -1012,7 +960,7 @@ do_standby_clone(void) log_info(_("standby clone: master hba file '%s'\n"), master_hba_file); r = copy_remote_files(runtime_options.host, runtime_options.remote_user, - master_hba_file, runtime_options.dest_dir, + master_hba_file, local_hba_file, false); if (r != 0) { @@ -1020,15 +968,19 @@ do_standby_clone(void) goto stop_backup; } - log_info("standby clone: master ident file '%s'\n", master_ident_file); + log_info(_("standby clone: master ident file '%s'\n"), master_ident_file); r = copy_remote_files(runtime_options.host, runtime_options.remote_user, - master_ident_file, runtime_options.dest_dir, false); + master_ident_file, local_ident_file, + false); if (r != 0) { log_warning(_("standby clone: failed copying master ident file '%s'\n"), master_ident_file); goto stop_backup; } + /* we success so far, flag that to allow a better HINT */ + flag_success = true; + stop_backup: /* @@ -1071,28 +1023,44 @@ stop_backup: /* If the rsync failed then exit */ if (r != 0) + { + log_err(_("Couldn't rsync the master...\nYou have to cleanup the destination directory (%s) manually!\n"), + local_data_directory); exit(ERR_BAD_RSYNC); + } /* - * We need to create the pg_xlog sub directory too, I'm reusing a variable - * here. + * We need to create the pg_xlog sub directory too. */ - maxlen_snprintf(local_control_file, "%s/pg_xlog", runtime_options.dest_dir); - if (!create_directory(local_control_file)) + if (!create_directory(local_xlog_directory)) { log_err(_("%s: couldn't create directory %s, you will need to do it manually...\n"), - progname, runtime_options.dest_dir); + progname, local_xlog_directory); r = ERR_NEEDS_XLOG; /* continue, but eventually exit returning error */ } /* Finally, write the recovery.conf file */ - create_recovery_file(runtime_options.dest_dir, NULL); + create_recovery_file(local_data_directory, NULL); /* * We don't start the service yet because we still may want to * move the directory */ log_notice(_("%s standby clone complete\n"), progname); + + /* HINT message : what to do next ? */ + if (flag_success) + { + log_notice("HINT: You can now start your postgresql server\n"); + if (test_mode) + { + log_notice(_("for example : pg_ctl -D %s start\n"), local_data_directory); + } + else + { + log_notice("for example : /etc/init.d/postgresql start\n"); + } + } exit(r); } @@ -1137,7 +1105,7 @@ do_standby_promote(void) } /* we also need to check if there isn't any master already */ - old_master_conn = getMasterConnection(conn, options.node, options.cluster_name, + old_master_conn = getMasterConnection(conn, repmgr_schema, options.node, options.cluster_name, &old_master_id, NULL); if (old_master_conn != NULL) { @@ -1241,7 +1209,7 @@ do_standby_follow(void) /* we also need to check if there is any master in the cluster */ log_info(_("%s connecting to master database\n"), progname); - master_conn = getMasterConnection(conn, options.node, + master_conn = getMasterConnection(conn, repmgr_schema, options.node, options.cluster_name, &master_id,(char *) &master_conninfo); if (master_conn == NULL) { @@ -1325,13 +1293,239 @@ do_standby_follow(void) } -void usage(void) +static void +do_witness_create(void) +{ + PGconn *masterconn; + PGconn *witnessconn; + PGresult *res; + char sqlquery[QUERY_STR_LEN]; + + char script[MAXLEN]; + char buf[MAXLEN]; + FILE *pg_conf = NULL; + + int r = 0; + int i; + + char master_version[MAXVERSIONSTR]; + + char createcommand[MAXLEN]; + char master_hba_file[MAXLEN]; + + /* Check this directory could be used as a PGDATA dir */ + if (!create_pgdir(runtime_options.dest_dir, runtime_options.force)) + { + return; + } + + /* Connection parameters for master only */ + keywords[0] = "host"; + values[0] = runtime_options.host; + keywords[1] = "port"; + values[1] = runtime_options.masterport; + + /* We need to connect to check configuration and copy it */ + masterconn = PQconnectdbParams(keywords, values, true); + if (!masterconn) + { + log_err(_("%s: could not connect to master\n"), progname); + return; + } + + /* primary should be v9 or better */ + pg_version(masterconn, master_version); + if (strcmp(master_version, "") == 0) + { + PQfinish(masterconn); + log_err(_("%s needs master to be PostgreSQL 9.0 or better\n"), progname); + return; + } + + /* Check we are connecting to a primary node */ + if (is_standby(masterconn)) + { + PQfinish(masterconn); + log_err(_("The command should not run on a standby node\n")); + return; + } + + log_info(_("Succesfully connected to primary.\n")); + + r = test_ssh_connection(runtime_options.host, runtime_options.remote_user); + if (r != 0) + { + log_err(_("%s: Aborting, remote host %s is not reachable.\n"), progname, runtime_options.host); + return; + } + + /* + * To create a witness server we need to: + * 1) initialize the cluster + * 2) register the witness in repl_nodes + * 3) copy configuration from master + */ + + /* Create the cluster for witness */ + /* We assume the pg_ctl script is in the PATH */ + sprintf(script, "pg_ctl -D %s init", runtime_options.dest_dir); + log_info("Initialize cluster for witness: %s.\n", script); + + r = system(script); + if (r != 0) + { + log_err("Can't iniatialize cluster for witness server\n"); + return; + } + + /* + * default port for the witness is 5499, + * but user can provide a different one + */ + snprintf(buf, sizeof(buf), "%s/postgresql.conf", runtime_options.dest_dir); + pg_conf = fopen(buf, "a"); + if (pg_conf == NULL) + { + log_err(_("\n%s: could not open \"%s\" for adding extra config: %s\n"), progname, buf, strerror(errno)); + exit(ERR_BAD_CONFIG); + } + + snprintf(buf, sizeof(buf), "\n#Configuration added by %s\n", progname); + fputs(buf, pg_conf); + + if (!runtime_options.localport[0]) + strncpy(runtime_options.localport, "5499", MAXLEN); + snprintf(buf, sizeof(buf), "port = %s\n", runtime_options.localport); + fputs(buf, pg_conf); + + snprintf(buf, sizeof(buf), "shared_preload_libraries = 'repmgr_funcs'\n") ; + fputs(buf, pg_conf); + + snprintf(buf, sizeof(buf), "listen_addresses = '*'\n") ; + fputs(buf, pg_conf); + + fclose(pg_conf); + + /* Get the pg_hba.conf full path */ + sprintf(sqlquery, "SELECT name, setting " + " FROM pg_settings " + " WHERE name IN ('hba_file')"); + log_debug(_("witness create: %s"), sqlquery); + res = PQexec(masterconn, sqlquery); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_err(_("Can't get info about pg_hba.conf: %s\n"), PQerrorMessage(masterconn)); + PQclear(res); + PQfinish(masterconn); + return; + } + for (i = 0; i < PQntuples(res); i++) + { + if (strcmp(PQgetvalue(res, i, 0), "hba_file") == 0) + strcpy(master_hba_file, PQgetvalue(res, i, 1)); + else + log_err(_("uknown parameter: %s"), PQgetvalue(res, i, 0)); + } + PQclear(res); + + r = copy_remote_files(runtime_options.host, runtime_options.remote_user, + master_hba_file, runtime_options.dest_dir, + false); + if (r != 0) + { + log_err(_("Can't rsync the pg_hba.conf file from master\n")); + return; + } + + /* start new instance */ + sprintf(script, "pg_ctl -D %s start", runtime_options.dest_dir); + log_info(_("Start cluster for witness: %s"), script); + r = system(script); + if (r != 0) + { + log_err(_("Can't start cluster for witness server\n")); + return; + } + + /* register ourselves in the master */ + sqlquery_snprintf(sqlquery, "INSERT INTO %s.repl_nodes(id, cluster, conninfo, priority, witness) " + "VALUES (%d, '%s', '%s', %d, true)", + repmgr_schema, options.node, options.cluster_name, options.conninfo); + + log_debug(_("witness create: %s"), sqlquery); + if (!PQexec(masterconn, sqlquery)) + { + log_err(_("Cannot insert node details, %s\n"), PQerrorMessage(masterconn)); + PQfinish(masterconn); + return; + } + + /* Let the server start */ + sleep(2); + + /* + create the local user and local db if it is not the default one + values[2] is the username we use to connect to master, + values[3] is the dbname we use to connect to master, + we suppose it is the same in the repmgr.conf (obviously it is preferable) + FIXME this is fragile and its a temporary solution + */ + if (getenv("USER")) + { + if (!(strcmp(getenv("USER"), values[2]) == 0)) + { + sprintf(createcommand, "createuser -p %s -s %s", runtime_options.localport, values[2]); + log_info("creating user for witness: %s", createcommand); + r = system(createcommand); + if (r != 0) + { + log_err("Can't create local user\n"); + return; + } + sprintf(createcommand, "createdb -p %s -O %s %s", runtime_options.localport, values[2], values[3]); + log_info("creating database for witness: %s", createcommand); + r = system(createcommand); + if (r != 0) + { + log_err("Can't create local db\n"); + return; + } + } + } + /* establish a connection to the witness, and create the schema */ + witnessconn = establishDBConnection(options.conninfo, true); + + log_info(_("Starting copy of configuration from master")); + + if (!create_schema(witnessconn)) + { + PQfinish(masterconn); + PQfinish(witnessconn); + return; + } + + /* copy configuration from master, only repl_nodes is needed */ + if (!copy_configuration(masterconn, witnessconn)) + { + PQfinish(masterconn); + PQfinish(witnessconn); + return; + } + PQfinish(masterconn); + PQfinish(witnessconn); + + log_notice(_("Configuration has been succesfully copied to the witness\n")); +} + +static void +usage(void) { log_err(_("\n\n%s: Replicator manager \n"), progname); log_err(_("Try \"%s --help\" for more information.\n"), progname); } -void help(const char *progname) +static void +help(const char *progname) { printf(_("\n%s: Replicator manager \n"), progname); printf(_("Usage:\n")); @@ -1349,6 +1543,7 @@ void help(const char *progname) printf(_(" -U, --username=USERNAME database user name to connect as\n")); printf(_("\nConfiguration options:\n")); printf(_(" -D, --data-dir=DIR local directory where the files will be copied to\n")); + printf(_(" -l, --local-port=PORT standby or witness server local port\n")); printf(_(" -f, --config_file=PATH path to the configuration file\n")); printf(_(" -R, --remote-user=USERNAME database server username for rsync\n")); printf(_(" -w, --wal-keep-segments=VALUE minimum value for the GUC wal_keep_segments (default: 5000)\n")); @@ -1500,7 +1695,7 @@ copy_remote_files(char *host, char *remote_user, char *remote_path, } else { - maxlen_snprintf(script, "rsync %s %s:%s %s/.", + maxlen_snprintf(script, "rsync %s %s:%s %s", rsync_flags, host_string, remote_path, local_path); } @@ -1658,7 +1853,150 @@ check_parameters_for_action(const int action) } need_a_node = false; break; + case WITNESS_CREATE: + /* allow all parameters to be supplied */ + break; } return ok; } + + + +static bool +create_schema(PGconn *conn) +{ + char sqlquery[QUERY_STR_LEN]; + + sqlquery_snprintf(sqlquery, "CREATE SCHEMA %s", repmgr_schema); + log_debug(_("master register: %s\n"), sqlquery); + if (!PQexec(conn, sqlquery)) + { + log_err(_("Cannot create the schema %s: %s\n"), + repmgr_schema, PQerrorMessage(conn)); + PQfinish(conn); + exit(ERR_BAD_CONFIG); + } + + /* ... the tables */ + sqlquery_snprintf(sqlquery, "CREATE TABLE %s.repl_nodes ( " + " id integer primary key, " + " cluster text not null, " + " conninfo text not null, " + " priority integer not null, " + " witness boolean not null default false)", repmgr_schema); + log_debug(_("master register: %s\n"), sqlquery); + if (!PQexec(conn, sqlquery)) + { + log_err(_("Cannot create the table %s.repl_nodes: %s\n"), + repmgr_schema, PQerrorMessage(conn)); + PQfinish(conn); + exit(ERR_BAD_CONFIG); + } + + sqlquery_snprintf(sqlquery, "CREATE TABLE %s.repl_monitor ( " + " primary_node INTEGER NOT NULL, " + " standby_node INTEGER NOT NULL, " + " last_monitor_time TIMESTAMP WITH TIME ZONE NOT NULL, " + " last_wal_primary_location TEXT NOT NULL, " + " last_wal_standby_location TEXT NOT NULL, " + " replication_lag BIGINT NOT NULL, " + " apply_lag BIGINT NOT NULL) ", repmgr_schema); + log_debug(_("master register: %s\n"), sqlquery); + if (!PQexec(conn, sqlquery)) + { + log_err(_("Cannot create the table %s.repl_monitor: %s\n"), + repmgr_schema, PQerrorMessage(conn)); + PQfinish(conn); + exit(ERR_BAD_CONFIG); + } + + /* and the view */ + sqlquery_snprintf(sqlquery, "CREATE VIEW %s.repl_status AS " + " WITH monitor_info AS (SELECT *, ROW_NUMBER() OVER (PARTITION BY primary_node, standby_node " + " ORDER BY last_monitor_time desc) " + " FROM %s.repl_monitor) " + " SELECT primary_node, standby_node, last_monitor_time, last_wal_primary_location, " + " last_wal_standby_location, pg_size_pretty(replication_lag) replication_lag, " + " pg_size_pretty(apply_lag) apply_lag, age(now(), last_monitor_time) AS time_lag " + " FROM monitor_info a " + " WHERE row_number = 1", repmgr_schema, repmgr_schema); + log_debug(_("master register: %s\n"), sqlquery); + if (!PQexec(conn, sqlquery)) + { + log_err(_("Cannot create the view %s.repl_status: %s\n"), + repmgr_schema, PQerrorMessage(conn)); + PQfinish(conn); + exit(ERR_BAD_CONFIG); + } + + /* XXX Here we MUST try to load the repmgr_function.sql not hardcode it here */ + sprintf(sqlquery, + "CREATE OR REPLACE FUNCTION public.repmgr_update_standby_location(text) RETURNS boolean " + "AS '$libdir/repmgr_funcs', 'repmgr_update_standby_location' " + "LANGUAGE C STRICT "); + if (!PQexec(conn, sqlquery)) + { + fprintf(stderr, "Cannot create the function repmgr_update_standby_location: %s\n", + PQerrorMessage(conn)); + return false; + } + + sprintf(sqlquery, + "CREATE OR REPLACE FUNCTION public.repmgr_get_last_standby_location() RETURNS text " + "AS '$libdir/repmgr_funcs', 'repmgr_get_last_standby_location' " + "LANGUAGE C STRICT "); + if (!PQexec(conn, sqlquery)) + { + fprintf(stderr, "Cannot create the function repmgr_get_last_standby_location: %s\n", + PQerrorMessage(conn)); + return false; + } + + return true; +} + + +static bool +copy_configuration(PGconn *masterconn, PGconn *witnessconn) +{ + char sqlquery[MAXLEN]; + PGresult *res; + int i; + + sprintf(sqlquery, "TRUNCATE TABLE repmgr_%s.repl_nodes", options.cluster_name); + if (!PQexec(witnessconn, sqlquery)) + { + fprintf(stderr, "Cannot clean node details in the witness, %s\n", + PQerrorMessage(witnessconn)); + return false; + } + + sprintf(sqlquery, "SELECT * FROM repmgr_%s.repl_nodes", options.cluster_name); + res = PQexec(masterconn, sqlquery); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, "Can't get configuration from master: %s\n", + PQerrorMessage(masterconn)); + PQclear(res); + return false; + } + for (i = 0; i < PQntuples(res); i++) + { + sqlquery_snprintf(sqlquery, "INSERT INTO %s.repl_nodes(id, cluster, conninfo, priority, witness) " + "VALUES (%d, '%s', '%s', %d, '%s')", + repmgr_schema, atoi(PQgetvalue(res, i, 0)), + options.cluster_name, PQgetvalue(res, i, 2), + atoi(PQgetvalue(res, i, 3)), + PQgetvalue(res, i, 4)); + + if (!PQexec(witnessconn, sqlquery)) + { + fprintf(stderr, "Cannot copy configuration to witness, %s\n", + PQerrorMessage(witnessconn)); + return false; + } + } + + return true; +} diff --git a/repmgr.conf b/repmgr.conf.sample similarity index 83% rename from repmgr.conf rename to repmgr.conf.sample index cf9402f4..e115d837 100644 --- a/repmgr.conf +++ b/repmgr.conf.sample @@ -12,6 +12,12 @@ node=2 conninfo='host=192.168.204.104' rsync_options=--archive --checksum --compress --progress --rsh=ssh +# Autofailover options +failover=automatic +priority=-1 +promote_command='repmgr promote' +follow_command='repmgr follow' + # Log level: possible values are DEBUG, INFO, NOTICE, WARNING, ERR, ALERT, CRIT or EMERG # Default: NOTICE loglevel=NOTICE diff --git a/repmgr.h b/repmgr.h index bd219692..4718af94 100644 --- a/repmgr.h +++ b/repmgr.h @@ -30,6 +30,7 @@ #define PRIMARY_MODE 0 #define STANDBY_MODE 1 +#define WITNESS_MODE 2 #include "config.h" #define MAXFILENAME 1024 @@ -42,6 +43,9 @@ #define DEFAULT_DBNAME "postgres" #define DEFAULT_REPMGR_SCHEMA_PREFIX "repmgr_" +#define MANUAL_FAILOVER 0 +#define AUTOMATIC_FAILOVER 1 + /* Run time options type */ typedef struct { @@ -58,7 +62,14 @@ typedef struct bool ignore_rsync_warn; char masterport[MAXLEN]; + char localport[MAXLEN]; } t_runtime_options; +#define SLEEP_MONITOR 2 +#define SLEEP_RETRY 3 +#define NUM_RETRY 40 + + + #endif diff --git a/repmgrd.c b/repmgrd.c index 99390526..96c05de9 100644 --- a/repmgrd.c +++ b/repmgrd.c @@ -31,8 +31,22 @@ #include "log.h" #include "strutil.h" +#include "access/xlogdefs.h" #include "libpq/pqsignal.h" +/* + * Struct to keep info about the nodes, used in the voting process in + * do_failover() + */ +typedef struct nodeInfo +{ + int nodeId; + XLogRecPtr xlog_location; + bool is_ready; +} nodeInfo; + + +char myClusterName[MAXLEN]; /* Local info */ t_configuration_options local_options; @@ -60,16 +74,28 @@ t_configuration_options config = {}; static void help(const char* progname); static void usage(void); -static void checkClusterConfiguration(PGconn *conn,PGconn *primary); +static void checkClusterConfiguration(PGconn *conn, PGconn *primary); static void checkNodeConfiguration(char *conninfo); static void CancelQuery(void); -static void MonitorExecute(void); +static void StandbyMonitor(void); +static void WitnessMonitor(void); +static bool CheckPrimaryConnection(void); +static void update_shared_memory(char *last_wal_standby_applied); +static void update_registration(void); +static void do_failover(void); static unsigned long long int walLocationToBytes(char *wal_location); +/* + * Flag to mark SIGHUP. Whenever the main loop comes around it + * will reread the configuration file. + */ +static volatile sig_atomic_t got_SIGHUP = false; + +static void handle_sighup(SIGNAL_ARGS); static void handle_sigint(SIGNAL_ARGS); -static void setup_cancel_handler(void); +static void setup_event_handlers(void); #define CloseConnections() \ if (PQisBusy(primaryConn) == 1) \ @@ -79,16 +105,6 @@ static void setup_cancel_handler(void); if (primaryConn != NULL && primaryConn != myLocalConn) \ PQfinish(primaryConn); -/* - * Every 3 seconds, insert monitor info - */ -#define MonitorCheck() \ - for (;;) \ - { \ - MonitorExecute(); \ - sleep(3); \ - } - int main(int argc, char **argv) @@ -137,7 +153,7 @@ main(int argc, char **argv) } } - setup_cancel_handler(); + setup_event_handlers(); /* * Read the configuration file: repmgr.conf @@ -173,38 +189,127 @@ main(int argc, char **argv) * Set my server mode, establish a connection to primary * and start monitor */ - myLocalMode = is_standby(myLocalConn) ? STANDBY_MODE : PRIMARY_MODE; - if (myLocalMode == PRIMARY_MODE) + if (is_witness(myLocalConn, repmgr_schema, local_options.cluster_name, local_options.node)) + myLocalMode = WITNESS_MODE; + else if (is_standby(myLocalConn)) + myLocalMode = STANDBY_MODE; + else /* is the master */ + myLocalMode = PRIMARY_MODE; + + switch (myLocalMode) { + case PRIMARY_MODE: primary_options.node = local_options.node; strncpy(primary_options.conninfo, local_options.conninfo, MAXLEN); primaryConn = myLocalConn; - } - else - { + + checkClusterConfiguration(myLocalConn, primaryConn); + checkNodeConfiguration(local_options.conninfo); + + if (reload_configuration(config_file, &local_options)) + { + PQfinish(myLocalConn); + myLocalConn = establishDBConnection(local_options.conninfo, true); + update_registration(); + } + + log_info(_("%s Starting continuous primary connection check\n"), progname); + /* Check that primary is still alive, and standbies are sending info */ + /* + * Every SLEEP_MONITOR seconds, do master checks + * XXX + * Check that standbies are sending info + */ + for (;;) + { + if (CheckPrimaryConnection()) + { + /* + CheckActiveStandbiesConnections(); + CheckInactiveStandbies(); + */ + sleep(SLEEP_MONITOR); + } + else + { + /* XXX + * May we do something more verbose ? + */ + exit (1); + } + + if (got_SIGHUP) + { + /* if we can reload, then could need to change myLocalConn */ + if (reload_configuration(config_file, &local_options)) + { + PQfinish(myLocalConn); + myLocalConn = establishDBConnection(local_options.conninfo, true); + update_registration(); + } + got_SIGHUP = false; + } + } + break; + case WITNESS_MODE: + case STANDBY_MODE: /* I need the id of the primary as well as a connection to it */ log_info(_("%s Connecting to primary for cluster '%s'\n"), progname, local_options.cluster_name); - primaryConn = getMasterConnection(myLocalConn, local_options.node, + primaryConn = getMasterConnection(myLocalConn, repmgr_schema, local_options.node, local_options.cluster_name, - &primary_options.node,NULL); + &primary_options.node, NULL); if (primaryConn == NULL) { CloseConnections(); exit(ERR_BAD_CONFIG); } - } - checkClusterConfiguration(myLocalConn,primaryConn); - checkNodeConfiguration(local_options.conninfo); - if (myLocalMode == STANDBY_MODE) - { - log_info(_("%s Starting continuous standby node monitoring\n"), progname); - MonitorCheck(); - } - else - { - log_info(_("%s This is a primary node, program not needed here; exiting'\n"), progname); + checkClusterConfiguration(myLocalConn, primaryConn); + checkNodeConfiguration(local_options.conninfo); + + if (reload_configuration(config_file, &local_options)) + { + PQfinish(myLocalConn); + myLocalConn = establishDBConnection(local_options.conninfo, true); + update_registration(); + } + + /* + * Every SLEEP_MONITOR seconds, do checks + */ + if (myLocalMode == WITNESS_MODE) + { + log_info(_("%s Starting continuous witness node monitoring\n"), progname); + } + else if (myLocalMode == STANDBY_MODE) + { + log_info(_("%s Starting continuous standby node monitoring\n"), progname); + } + + for (;;) + { + if (myLocalMode == WITNESS_MODE) + WitnessMonitor(); + else if (myLocalMode == STANDBY_MODE) + StandbyMonitor(); + sleep(SLEEP_MONITOR); + + if (got_SIGHUP) + { + /* if we can reload, then could need to change myLocalConn */ + if (reload_configuration(config_file, &local_options)) + { + PQfinish(myLocalConn); + myLocalConn = establishDBConnection(local_options.conninfo, true); + update_registration(); + } + got_SIGHUP = false; + } + } + break; + default: + log_err(_("%s: Unrecognized mode for node %d\n"), progname, local_options.node); } /* Prevent a double-free */ @@ -220,6 +325,73 @@ main(int argc, char **argv) return 0; } +/* + * + */ +static void +WitnessMonitor(void) +{ + char monitor_witness_timestamp[MAXLEN]; + PGresult *res; + + /* + * Check if the master is still available, if after 5 minutes of retries + * we cannot reconnect, return false. + */ + CheckPrimaryConnection(); // this take up to NUM_RETRY * SLEEP_RETRY seconds + + if (PQstatus(primaryConn) != CONNECTION_OK) + { + /* + * If we can't reconnect, just exit... + * XXX we need to make witness connect to the new master + */ + PQfinish(myLocalConn); + exit(0); + } + + /* + * first check if there is a command being executed, + * and if that is the case, cancel the query so i can + * insert the current record + */ + if (PQisBusy(primaryConn) == 1) + CancelQuery(); + + /* Get local xlog info */ + sqlquery_snprintf(sqlquery, "SELECT CURRENT_TIMESTAMP "); + + res = PQexec(myLocalConn, sqlquery); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_err(_("PQexec failed: %s\n"), PQerrorMessage(myLocalConn)); + PQclear(res); + /* if there is any error just let it be and retry in next loop */ + return; + } + + strcpy(monitor_witness_timestamp, PQgetvalue(res, 0, 0)); + PQclear(res); + + /* + * Build the SQL to execute on primary + */ + sqlquery_snprintf(sqlquery, + "INSERT INTO %s.repl_monitor " + "VALUES(%d, %d, '%s'::timestamp with time zone, " + " pg_current_xlog_location(), null, " + " 0, 0)", + repmgr_schema, primary_options.node, local_options.node, monitor_witness_timestamp); + + /* + * Execute the query asynchronously, but don't check for a result. We + * will check the result next time we pause for a monitor step. + */ + if (PQsendQuery(primaryConn, sqlquery) == 0) + log_warning(_("Query could not be sent to primary. %s\n"), + PQerrorMessage(primaryConn)); +} + /* * Insert monitor info, this is basically the time and xlog replayed, @@ -227,7 +399,7 @@ main(int argc, char **argv) * Also do the math to see how far are we in bytes for being uptodate */ static void -MonitorExecute(void) +StandbyMonitor(void) { PGresult *res; char monitor_standby_timestamp[MAXLEN]; @@ -245,50 +417,45 @@ MonitorExecute(void) * Check if the master is still available, if after 5 minutes of retries * we cannot reconnect, try to get a new master. */ - for (connection_retries = 0; connection_retries < 15; connection_retries++) - { - if (PQstatus(primaryConn) != CONNECTION_OK) - { - log_warning(_("Connection to master has been lost, trying to recover...\n")); - /* wait 20 seconds between retries */ - sleep(20); + CheckPrimaryConnection(); // this take up to NUM_RETRY * SLEEP_RETRY seconds - PQreset(primaryConn); - } - else - { - if (connection_retries > 0) - { - log_notice(_("Connection to master has been restored, continue monitoring.\n")); - } - break; - } - } if (PQstatus(primaryConn) != CONNECTION_OK) { - log_err(_("We couldn't reconnect to master. Now checking if another node has been promoted.\n")); - for (connection_retries = 0; connection_retries < 6; connection_retries++) + if (local_options.failover == MANUAL_FAILOVER) { - primaryConn = getMasterConnection(myLocalConn, local_options.node, - local_options.cluster_name, &primary_options.node,NULL); - if (PQstatus(primaryConn) == CONNECTION_OK) + log_err(_("We couldn't reconnect to master. Now checking if another node has been promoted.\n")); + for (connection_retries = 0; connection_retries < 6; connection_retries++) { - /* Connected, we can continue the process so break the loop */ - log_err(_("Connected to node %d, continue monitoring.\n"), primary_options.node); - break; + primaryConn = getMasterConnection(myLocalConn, repmgr_schema, local_options.node, + local_options.cluster_name, &primary_options.node, NULL); + if (PQstatus(primaryConn) == CONNECTION_OK) + { + /* Connected, we can continue the process so break the loop */ + log_err(_("Connected to node %d, continue monitoring.\n"), primary_options.node); + break; + } + else + { + log_err(_("We haven't found a new master, waiting before retry...\n")); + /* wait 5 minutes before retries, after 6 failures (30 minutes) we stop trying */ + sleep(300); + } } - else + + if (PQstatus(primaryConn) != CONNECTION_OK) { - log_err(_("We haven't found a new master, waiting before retry...\n")); - /* wait 5 minutes before retries, after 6 failures (30 minutes) we stop trying */ - sleep(300); + log_err(_("We couldn't reconnect for long enough, exiting...\n")); + exit(ERR_DB_CON); } } - } - if (PQstatus(primaryConn) != CONNECTION_OK) - { - log_err(_("We couldn't reconnect for long enough, exiting...\n")); - exit(ERR_DB_CON); + else if (local_options.failover == AUTOMATIC_FAILOVER) + { + /* + * When we returns from this function we will have a new primary and + * a new primaryConn + */ + do_failover(); + } } /* Check if we still are a standby, we could have been promoted */ @@ -370,6 +537,241 @@ MonitorExecute(void) } +static void +do_failover(void) +{ + PGresult *res1; + PGresult *res2; + char sqlquery[8192]; + + int total_nodes = 0; + int visible_nodes = 0; + bool find_best = false; + + int i; + int r; + + int node; + char nodeConninfo[MAXLEN]; + + unsigned int uxlogid; + unsigned int uxrecoff; + char last_wal_standby_applied[MAXLEN]; + + PGconn *nodeConn = NULL; + + /* + * will get info about until 50 nodes, + * which seems to be large enough for most scenarios + */ + nodeInfo nodes[50]; + nodeInfo best_candidate; + + /* first we get info about this node, and update shared memory */ + sprintf(sqlquery, "SELECT pg_last_xlog_replay_location()"); + res1 = PQexec(myLocalConn, sqlquery); + if (PQresultStatus(res1) != PGRES_TUPLES_OK) + { + log_err(_("PQexec failed: %s.\nReport an invalid value to not be considered as new primary and exit.\n"), PQerrorMessage(myLocalConn)); + PQclear(res1); + sprintf(last_wal_standby_applied, "'%X/%X'", 0, 0); + update_shared_memory(last_wal_standby_applied); + exit(ERR_DB_QUERY); + } + + /* write last location in shared memory */ + update_shared_memory(PQgetvalue(res1, 0, 0)); + + /* + * we sleep the monitor time + one second + * we bet it should be enough for other repmgrd to update their own data + */ + sleep(SLEEP_MONITOR + 1); + + /* get a list of standby nodes, including myself */ + sprintf(sqlquery, "SELECT * " + " FROM %s.repl_nodes " + " WHERE id IN (SELECT standby_node FROM %s.repl_status) " + " AND cluster = '%s' " + " ORDER BY priority ", + repmgr_schema, repmgr_schema, local_options.cluster_name); + + res1 = PQexec(myLocalConn, sqlquery); + if (PQresultStatus(res1) != PGRES_TUPLES_OK) + { + log_err(_("Can't get nodes info: %s\n"), PQerrorMessage(myLocalConn)); + PQclear(res1); + PQfinish(myLocalConn); + exit(ERR_DB_QUERY); + } + + /* ask for the locations */ + for (i = 0; i < PQntuples(res1); i++) + { + node = atoi(PQgetvalue(res1, i, 0)); + /* Initialize on false so if we can't reach this node we know that later */ + nodes[i].is_ready = false; + strncpy(nodeConninfo, PQgetvalue(res1, i, 2), MAXLEN); + nodeConn = establishDBConnection(nodeConninfo, false); + /* if we can't see the node just skip it */ + if (PQstatus(nodeConn) != CONNECTION_OK) + continue; + + sqlquery_snprintf(sqlquery, "SELECT repmgr_get_last_standby_location()"); + res2 = PQexec(nodeConn, sqlquery); + if (PQresultStatus(res2) != PGRES_TUPLES_OK) + { + log_info(_("Can't get node's last standby location: %s\n"), PQerrorMessage(nodeConn)); + log_info(_("Connection details: %s\n"), nodeConninfo); + PQclear(res2); + PQfinish(nodeConn); + continue; + } + + visible_nodes++; + + if (sscanf(PQgetvalue(res2, 0, 0), "%X/%X", &uxlogid, &uxrecoff) != 2) + log_info(_("could not parse transaction log location \"%s\"\n"), PQgetvalue(res2, 0, 0)); + + nodes[i].nodeId = node; + nodes[i].xlog_location.xlogid = uxlogid; + nodes[i].xlog_location.xrecoff = uxrecoff; + nodes[i].is_ready = true; + + PQclear(res2); + PQfinish(nodeConn); + } + PQclear(res1); + /* Close the connection to this server */ + PQfinish(myLocalConn); + + /* + * total nodes that are registered, include master which is a node but was + * not counted because it's not a standby + */ + total_nodes = i + 1; + + /* + * am i on the group that should keep alive? + * if i see less than half of total_nodes then i should do nothing + */ + if (visible_nodes < (total_nodes / 2.0)) + { + log_err(_("Can't reach most of the nodes.\n" + "Let the other standby servers decide which one will be the primary.\n" + "Manual action will be needed to readd this node to the cluster.\n")); + exit(ERR_FAILOVER_FAIL); + } + + /* + * determine which one is the best candidate to promote to primary + */ + for (i = 0; i < total_nodes - 1; i++) + { + if (!nodes[i].is_ready) + continue; + else if (!find_best) + { + /* start with the first ready node, and then move on to the next one */ + best_candidate.nodeId = nodes[i].nodeId; + best_candidate.xlog_location.xlogid = nodes[i].xlog_location.xlogid; + best_candidate.xlog_location.xrecoff = nodes[i].xlog_location.xrecoff; + best_candidate.is_ready = nodes[i].is_ready; + find_best = true; + } + + /* we use the macros provided by xlogdefs.h to compare XLogPtr */ + /* + * Nodes are retrieved ordered by priority, so if the current + * best candidate is lower or equal to the next node's wal location + * then assign next node as the new best candidate. + */ + if (XLByteLE(best_candidate.xlog_location, nodes[i].xlog_location)) + { + best_candidate.nodeId = nodes[i].nodeId; + best_candidate.xlog_location.xlogid = nodes[i].xlog_location.xlogid; + best_candidate.xlog_location.xrecoff = nodes[i].xlog_location.xrecoff; + best_candidate.is_ready = nodes[i].is_ready; + } + } + + /* once we know who is the best candidate, promote it */ + if (find_best && (best_candidate.nodeId == local_options.node)) + { + if (verbose) + log_info(_("%s: This node is the best candidate to be the new primary, promoting...\n"), + progname); + log_debug(_("promote command is: \"%s\"\n"), local_options.promote_command); + r = system(local_options.promote_command); + if (r != 0) + { + log_err(_("%s: promote command failed. You could check and try it manually.\n"), progname); + exit(ERR_BAD_CONFIG); + } + } + else if (find_best) + { + if (verbose) + log_info(_("%s: Node %d is the best candidate to be the new primary, we should follow it...\n"), + progname, best_candidate.nodeId); + log_debug(_("follow command is: \"%s\"\n"), local_options.follow_command); + /* + * New Primary need some time to be promoted. + * The follow command should take care of that. + */ + r = system(local_options.follow_command); + if (r != 0) + { + log_err(_("%s: follow command failed. You could check and try it manually.\n"), progname); + exit(ERR_BAD_CONFIG); + } + } + else + { + log_err(_("%s: Did not find candidates. You should check and try manually.\n"), progname); + exit(ERR_FAILOVER_FAIL); + } + + /* and reconnect to the local database */ + myLocalConn = establishDBConnection(local_options.conninfo, true); +} + + +static bool +CheckPrimaryConnection(void) +{ + int connection_retries; + + /* + * Check if the master is still available + * if after NUM_RETRY * SLEEP_RETRY seconds of retries + * we cannot reconnect + * return false + */ + for (connection_retries = 0; connection_retries < NUM_RETRY; connection_retries++) + { + if (!is_pgup(primaryConn)) + { + log_warning(_("%s: Connection to master has been lost, trying to recover... %i seconds before failover decision\n"), progname, (SLEEP_RETRY*(NUM_RETRY-connection_retries))); + /* wait SLEEP_RETRY seconds between retries */ + sleep(SLEEP_RETRY); + } + else + { + log_info(_("%s: Connection to master has been restored.\n"), progname); + break; + } + } + if (!is_pgup(primaryConn)) + { + log_err(_("%s: We couldn't reconnect for long enough, exiting...\n"), progname); + /* XXX Anything else to do here? */ + return false; + } + return true; +} + + static void checkClusterConfiguration(PGconn *conn, PGconn *primary) { @@ -433,17 +835,25 @@ checkNodeConfiguration(char *conninfo) /* * If there isn't any results then we have not configured this node yet - * in repmgr, if that is the case we will insert the node to the cluster + * in repmgr, if that is the case we will insert the node to the cluster, + * except if it is a witness */ if (PQntuples(res) == 0) { PQclear(res); + if (myLocalMode == WITNESS_MODE) + { + log_err(_("The witness is not configured\n")); + CloseConnections(); + exit(ERR_BAD_CONFIG); + } + /* Adding the node */ log_info(_("%s Adding node %d to cluster '%s'\n"), progname, local_options.node, local_options.cluster_name); sqlquery_snprintf(sqlquery, "INSERT INTO %s.repl_nodes " - "VALUES (%d, '%s', '%s')", + "VALUES (%d, '%s', '%s', 'f')", repmgr_schema, local_options.node, local_options.cluster_name, local_options.conninfo); @@ -500,12 +910,20 @@ static void handle_sigint(SIGNAL_ARGS) { CloseConnections(); + exit(1); } +/* SIGHUP: set flag to re-read config file at next convenient time */ +static void +handle_sighup(SIGNAL_ARGS) +{ + got_SIGHUP = true; +} static void -setup_cancel_handler(void) +setup_event_handlers(void) { + pqsignal(SIGHUP, handle_sighup); pqsignal(SIGINT, handle_sigint); } #endif @@ -524,3 +942,43 @@ CancelQuery(void) PQfreeCancel(pgcancel); } + + +static void +update_shared_memory(char *last_wal_standby_applied) +{ + PGresult *res; + + sprintf(sqlquery, "SELECT repmgr_update_standby_location('%s')", + last_wal_standby_applied); + + /* If an error happens, just inform about that and continue */ + res = PQexec(myLocalConn, sqlquery); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + log_warning(_("Cannot update this standby's shared memory: %s\n"), PQerrorMessage(myLocalConn)); + /* XXX is this enough reason to terminate this repmgrd? */ + } + PQclear(res); +} + +static void +update_registration(void) +{ + PGresult *res; + + sqlquery_snprintf(sqlquery, "UPDATE %s.repl_nodes " + " SET conninfo = '%s', " + " priority = %d " + " WHERE id = %d", + repmgr_schema, local_options.conninfo, local_options.priority, local_options.node); + + res = PQexec(primaryConn, sqlquery); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + log_err(_("Cannot update registration: %s\n"), PQerrorMessage(primaryConn)); + CloseConnections(); + exit(ERR_DB_CON); + } + PQclear(res); +} diff --git a/sql/Makefile b/sql/Makefile new file mode 100644 index 00000000..f8342c3d --- /dev/null +++ b/sql/Makefile @@ -0,0 +1,19 @@ +# +# Makefile +# Copyright (c) 2ndQuadrant, 2010 +# + +MODULE_big = repmgr_funcs +DATA_built=repmgr_funcs.sql +DATA=uninstall_repmgr_funcs.sql +OBJS=repmgr_funcs.o + +ifdef USE_PGXS +PGXS := $(shell pg_config --pgxs) +include $(PGXS) +else +subdir = contrib/repmgr/sql +top_builddir = ../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/sql/repmgr_funcs.c b/sql/repmgr_funcs.c new file mode 100644 index 00000000..5adfb8b3 --- /dev/null +++ b/sql/repmgr_funcs.c @@ -0,0 +1,189 @@ +/* + * repmgr_funcs.c + * Copyright (c) 2ndQuadrant, 2010 + * + * Shared memory state management and some backend functions in SQL + */ + +#include "postgres.h" +#include "fmgr.h" +#include "access/xlog.h" +#include "miscadmin.h" +#include "storage/ipc.h" +#include "storage/lwlock.h" +#include "storage/procarray.h" +#include "storage/shmem.h" +#include "storage/spin.h" +#include "utils/builtins.h" + +/* same definition as the one in xlog_internal.h */ +#define MAXFNAMELEN 64 + +PG_MODULE_MAGIC; + +/* + * Global shared state + */ +typedef struct repmgrSharedState +{ + LWLockId lock; /* protects search/modification */ + char location[MAXFNAMELEN]; /* last known xlog location */ +} repmgrSharedState; + +/* Links to shared memory state */ +static repmgrSharedState *shared_state = NULL; + +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; + +void _PG_init(void); +void _PG_fini(void); + +static void repmgr_shmem_startup(void); +static Size repmgr_memsize(void); + +static bool repmgr_set_standby_location(char *locationstr); + +Datum repmgr_update_standby_location(PG_FUNCTION_ARGS); +Datum repmgr_get_last_standby_location(PG_FUNCTION_ARGS); + +PG_FUNCTION_INFO_V1(repmgr_update_standby_location); +PG_FUNCTION_INFO_V1(repmgr_get_last_standby_location); + + +/* + * Module load callback + */ +void +_PG_init(void) +{ + /* + * In order to create our shared memory area, we have to be loaded via + * shared_preload_libraries. If not, fall out without hooking into any of + * the main system. (We don't throw error here because it seems useful to + * allow the repmgr functions to be created even when the + * module isn't active. The functions must protect themselves against + * being called then, however.) + */ + if (!process_shared_preload_libraries_in_progress) + return; + + /* + * Request additional shared resources. (These are no-ops if we're not in + * the postmaster process.) We'll allocate or attach to the shared + * resources in repmgr_shmem_startup(). + */ + RequestAddinShmemSpace(repmgr_memsize()); + RequestAddinLWLocks(1); + + /* + * Install hooks. + */ + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = repmgr_shmem_startup; +} + +/* + * Module unload callback + */ +void +_PG_fini(void) +{ + /* Uninstall hooks. */ + shmem_startup_hook = prev_shmem_startup_hook; +} + +/* + * shmem_startup hook: allocate or attach to shared memory, + */ +static void +repmgr_shmem_startup(void) +{ + bool found; + + if (prev_shmem_startup_hook) + prev_shmem_startup_hook(); + + /* reset in case this is a restart within the postmaster */ + shared_state = NULL; + + /* + * Create or attach to the shared memory state, including hash table + */ + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + shared_state = ShmemInitStruct("repmgr shared state", + sizeof(repmgrSharedState), + &found); + + if (!found) + { + /* First time through ... */ + shared_state->lock = LWLockAssign(); + snprintf(shared_state->location, + sizeof(shared_state->location), "%X/%X", 0, 0); + } + + LWLockRelease(AddinShmemInitLock); +} + + +/* + * Estimate shared memory space needed. + */ +static Size +repmgr_memsize(void) +{ + return MAXALIGN(sizeof(repmgrSharedState)); +} + + +static bool +repmgr_set_standby_location(char *locationstr) +{ + /* Safety check... */ + if (!shared_state) + return false; + + LWLockAcquire(shared_state->lock, LW_EXCLUSIVE); + strncpy(shared_state->location, locationstr, MAXFNAMELEN); + LWLockRelease(shared_state->lock); + + return true; +} + + +/* SQL Functions */ + +/* Read last xlog location reported by this standby from shared memory */ +Datum +repmgr_get_last_standby_location(PG_FUNCTION_ARGS) +{ + char location[MAXFNAMELEN]; + + /* Safety check... */ + if (!shared_state) + PG_RETURN_NULL(); + + LWLockAcquire(shared_state->lock, LW_SHARED); + strncpy(location, shared_state->location, MAXFNAMELEN); + LWLockRelease(shared_state->lock); + + PG_RETURN_TEXT_P(cstring_to_text(location)); +} + + +/* Set update last xlog location reported by this standby to shared memory */ +Datum +repmgr_update_standby_location(PG_FUNCTION_ARGS) +{ + text *location = PG_GETARG_TEXT_P(0); + char *locationstr; + + /* Safety check... */ + if (!shared_state) + PG_RETURN_BOOL(false); + + locationstr = text_to_cstring(location); + + PG_RETURN_BOOL(repmgr_set_standby_location(locationstr)); +} diff --git a/sql/repmgr_funcs.sql.in b/sql/repmgr_funcs.sql.in new file mode 100644 index 00000000..b637eda2 --- /dev/null +++ b/sql/repmgr_funcs.sql.in @@ -0,0 +1,15 @@ +/* + * repmgr_function.sql + * Copyright (c) 2ndQuadrant, 2010 + * + */ + +-- SET SEARCH_PATH TO 'repmgr'; + +CREATE FUNCTION repmgr_update_standby_location(text) RETURNS boolean +AS 'MODULE_PATHNAME', 'repmgr_update_standby_location' +LANGUAGE C STRICT; + +CREATE FUNCTION repmgr_get_last_standby_location() RETURNS text +AS 'MODULE_PATHNAME', 'repmgr_get_last_standby_location' +LANGUAGE C STRICT; diff --git a/sql/uninstall_repmgr_funcs.sql b/sql/uninstall_repmgr_funcs.sql new file mode 100644 index 00000000..2215503b --- /dev/null +++ b/sql/uninstall_repmgr_funcs.sql @@ -0,0 +1,2 @@ +DROP FUNCTION repmgr_update_standby_location(text); +DROP FUNCTION repmgr_get_last_standby_location();