diff --git a/config.c b/config.c index 1b281a9b..2897992a 100644 --- a/config.c +++ b/config.c @@ -191,7 +191,9 @@ trim (char *s) ++s1; /* Copy finished string */ - strcpy (s, s1); + memmove (s, s1, s2 - s1); + s[s2 - s1 + 1] = '\0'; + return s; } diff --git a/dbutils.c b/dbutils.c index 5e5131fa..03ce950d 100644 --- a/dbutils.c +++ b/dbutils.c @@ -18,6 +18,8 @@ */ #include +#include +#include #include "repmgr.h" #include "strutil.h" @@ -426,34 +428,65 @@ getMasterConnection(PGconn *standby_conn, char *schema, char *cluster, * return 1 if Ok; 0 if any error ocurred; -1 if timeout reached */ int -wait_connection_availability(PGconn *conn, int timeout) +wait_connection_availability(PGconn *conn, long long timeout) { PGresult *res; + fd_set read_set; + int sock = PQsocket(conn); + struct timeval tmout, before, after; + struct timezone tz; - while(timeout-- >= 0) + /* recalc to microseconds */ + timeout *= 1000000; + + while (timeout > 0) { if (PQconsumeInput(conn) == 0) { log_warning(_("wait_connection_availability: could not receive data from connection. %s\n"), - PQerrorMessage(conn)); + PQerrorMessage(conn)); return 0; } if (PQisBusy(conn) == 0) { - res = PQgetResult(conn); - if (res == NULL) - break; - PQclear(res); + do { + res = PQgetResult(conn); + PQclear(res); + } while(res != NULL); + + break; } - sleep(1); + + + tmout.tv_sec = 0; + tmout.tv_usec = 250000; + + FD_ZERO(&read_set); + FD_SET(sock, &read_set); + + gettimeofday(&before, &tz); + if (select(sock, &read_set, NULL, NULL, &tmout) == -1) + { + log_warning( + _("wait_connection_availability: select() returned with error: %s"), + strerror(errno)); + return -1; + } + gettimeofday(&after, &tz); + + timeout -= (after.tv_sec * 1000000 + after.tv_usec) - + (before.tv_sec * 1000000 + before.tv_usec); } + + if (timeout >= 0) + { return 1; - else { - log_warning(_("wait_connection_availability: timeout reached")); - return -1; } + + log_warning(_("wait_connection_availability: timeout reached")); + return -1; } diff --git a/dbutils.h b/dbutils.h index 626ceed1..5fb882bc 100644 --- a/dbutils.h +++ b/dbutils.h @@ -39,6 +39,6 @@ const char *get_cluster_size(PGconn *conn); PGconn *getMasterConnection(PGconn *standby_conn, char *schema, char *cluster, int *master_id, char *master_conninfo_out); -int wait_connection_availability(PGconn *conn, int timeout); +int wait_connection_availability(PGconn *conn, long long timeout); bool CancelQuery(PGconn *conn, int timeout); #endif diff --git a/log.c b/log.c index e268b42a..be083b57 100644 --- a/log.c +++ b/log.c @@ -58,6 +58,8 @@ void stderr_log_with_level(const char *level_name, int level, const char *fmt, . vfprintf(stderr, fmt1, ap); va_end(ap); + + fflush(stderr); } } diff --git a/repmgr.c b/repmgr.c index f8a10808..aaf8b4cf 100644 --- a/repmgr.c +++ b/repmgr.c @@ -575,13 +575,15 @@ do_master_register(void) repmgr_schema, options.node); log_debug(_("master register: %s\n"), sqlquery); - if (!PQexec(conn, sqlquery)) + res = PQexec(conn, sqlquery); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { log_warning(_("Cannot delete node details, %s\n"), PQerrorMessage(conn)); PQfinish(conn); exit(ERR_BAD_CONFIG); } + PQclear(res); } /* Ensure there isn't any other master already registered */ @@ -603,13 +605,15 @@ do_master_register(void) options.conninfo, options.priority); log_debug(_("master register: %s\n"), sqlquery); - if (!PQexec(conn, sqlquery)) + res = PQexec(conn, sqlquery); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { log_warning(_("Cannot insert node details, %s\n"), PQerrorMessage(conn)); PQfinish(conn); exit(ERR_BAD_CONFIG); } + PQclear(res); PQfinish(conn); log_notice(_("Master node correctly registered for cluster %s with id %d (conninfo: %s)\n"), @@ -735,7 +739,8 @@ do_standby_register(void) log_debug(_("standby register: %s\n"), sqlquery); - if (!PQexec(master_conn, sqlquery)) + res = PQexec(master_conn, sqlquery); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { log_err(_("Cannot delete node details, %s\n"), PQerrorMessage(master_conn)); @@ -743,6 +748,7 @@ do_standby_register(void) PQfinish(conn); exit(ERR_BAD_CONFIG); } + PQclear(res); } sqlquery_snprintf(sqlquery, "INSERT INTO %s.repl_nodes(id, cluster, name, conninfo, priority) " @@ -1695,7 +1701,8 @@ do_witness_create(void) repmgr_schema, options.node, options.cluster_name, options.node_name, options.conninfo, options.priority); log_debug(_("witness create: %s"), sqlquery); - if (!PQexec(masterconn, sqlquery)) + res = PQexec(masterconn, sqlquery); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { log_err(_("Cannot insert node details, %s\n"), PQerrorMessage(masterconn)); PQfinish(masterconn); @@ -2068,16 +2075,49 @@ static bool create_schema(PGconn *conn) { char sqlquery[QUERY_STR_LEN]; + PGresult *res; sqlquery_snprintf(sqlquery, "CREATE SCHEMA %s", repmgr_schema); log_debug(_("master register: %s\n"), sqlquery); - if (!PQexec(conn, sqlquery)) + res = PQexec(conn, sqlquery); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { log_err(_("Cannot create the schema %s: %s\n"), repmgr_schema, PQerrorMessage(conn)); PQfinish(conn); exit(ERR_BAD_CONFIG); } + PQclear(res); + + /* to avoid confusion of the time_lag field and provide a consistent UI we + * use these functions for providing the latest update timestamp */ + sqlquery_snprintf(sqlquery, + "CREATE FUNCTION %s.repmgr_update_last_updated() RETURNS TIMESTAMP WITH TIME ZONE " + "AS '$libdir/repmgr_funcs', 'repmgr_update_last_updated' " + " LANGUAGE C STRICT", repmgr_schema); + res = PQexec(conn, sqlquery); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, "Cannot create the function repmgr_update_last_updated: %s\n", + PQerrorMessage(conn)); + return false; + } + PQclear(res); + + + sqlquery_snprintf(sqlquery, + "CREATE FUNCTION %s.repmgr_get_last_updated() RETURNS TIMESTAMP WITH TIME ZONE " + "AS '$libdir/repmgr_funcs', 'repmgr_get_last_updated' " + "LANGUAGE C STRICT", repmgr_schema); + res = PQexec(conn, sqlquery); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, "Cannot create the function repmgr_get_last_updated: %s\n", + PQerrorMessage(conn)); + return false; + } + PQclear(res); + /* ... the tables */ sqlquery_snprintf(sqlquery, "CREATE TABLE %s.repl_nodes ( " @@ -2088,86 +2128,101 @@ create_schema(PGconn *conn) " priority integer not null, " " witness boolean not null default false)", repmgr_schema); log_debug(_("master register: %s\n"), sqlquery); - if (!PQexec(conn, sqlquery)) + res = PQexec(conn, sqlquery); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { log_err(_("Cannot create the table %s.repl_nodes: %s\n"), repmgr_schema, PQerrorMessage(conn)); PQfinish(conn); exit(ERR_BAD_CONFIG); } + PQclear(res); sqlquery_snprintf(sqlquery, "CREATE TABLE %s.repl_monitor ( " " primary_node INTEGER NOT NULL, " " standby_node INTEGER NOT NULL, " " last_monitor_time TIMESTAMP WITH TIME ZONE NOT NULL, " + " last_apply_time TIMESTAMP WITH TIME ZONE, " " last_wal_primary_location TEXT NOT NULL, " " last_wal_standby_location TEXT, " " replication_lag BIGINT NOT NULL, " " apply_lag BIGINT NOT NULL) ", repmgr_schema); log_debug(_("master register: %s\n"), sqlquery); - if (!PQexec(conn, sqlquery)) + res = PQexec(conn, sqlquery); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { log_err(_("Cannot create the table %s.repl_monitor: %s\n"), repmgr_schema, PQerrorMessage(conn)); PQfinish(conn); exit(ERR_BAD_CONFIG); } + PQclear(res); /* a view */ sqlquery_snprintf(sqlquery, "CREATE VIEW %s.repl_status AS " " SELECT primary_node, standby_node, name AS standby_name, last_monitor_time, " " last_wal_primary_location, last_wal_standby_location, " " pg_size_pretty(replication_lag) replication_lag, " + " age(now(), last_apply_time) AS replication_time_lag, " " pg_size_pretty(apply_lag) apply_lag, " - " age(now(), last_monitor_time) AS time_lag " + " age(now(), CASE WHEN pg_is_in_recovery() THEN %s.repmgr_get_last_updated() ELSE last_monitor_time END) AS communication_time_lag " " FROM %s.repl_monitor JOIN %s.repl_nodes ON standby_node = id " " WHERE (standby_node, last_monitor_time) IN (SELECT standby_node, MAX(last_monitor_time) " " FROM %s.repl_monitor GROUP BY 1)", - repmgr_schema, repmgr_schema, repmgr_schema, repmgr_schema); + repmgr_schema, repmgr_schema, repmgr_schema, repmgr_schema, repmgr_schema); log_debug(_("master register: %s\n"), sqlquery); - if (!PQexec(conn, sqlquery)) + + res = PQexec(conn, sqlquery); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { log_err(_("Cannot create the view %s.repl_status: %s\n"), repmgr_schema, PQerrorMessage(conn)); PQfinish(conn); exit(ERR_BAD_CONFIG); } + PQclear(res); /* an index to improve performance of the view */ sqlquery_snprintf(sqlquery, "CREATE INDEX idx_repl_status_sort " " ON %s.repl_monitor (last_monitor_time, standby_node) ", repmgr_schema); log_debug(_("master register: %s\n"), sqlquery); - if (!PQexec(conn, sqlquery)) + res = PQexec(conn, sqlquery); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { - log_err(_("Cannot indexing table %s.repl_monitor: %s\n"), + log_err(_("Can't index table %s.repl_monitor: %s\n"), repmgr_schema, PQerrorMessage(conn)); PQfinish(conn); exit(ERR_BAD_CONFIG); } + PQclear(res); /* XXX Here we MUST try to load the repmgr_function.sql not hardcode it here */ sqlquery_snprintf(sqlquery, "CREATE OR REPLACE FUNCTION %s.repmgr_update_standby_location(text) RETURNS boolean " "AS '$libdir/repmgr_funcs', 'repmgr_update_standby_location' " "LANGUAGE C STRICT ", repmgr_schema); - if (!PQexec(conn, sqlquery)) + res = PQexec(conn, sqlquery); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { fprintf(stderr, "Cannot create the function repmgr_update_standby_location: %s\n", PQerrorMessage(conn)); return false; } + PQclear(res); sqlquery_snprintf(sqlquery, "CREATE OR REPLACE FUNCTION %s.repmgr_get_last_standby_location() RETURNS text " "AS '$libdir/repmgr_funcs', 'repmgr_get_last_standby_location' " "LANGUAGE C STRICT ", repmgr_schema); - if (!PQexec(conn, sqlquery)) + res = PQexec(conn, sqlquery); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { fprintf(stderr, "Cannot create the function repmgr_get_last_standby_location: %s\n", PQerrorMessage(conn)); return false; } + PQclear(res); return true; } @@ -2182,7 +2237,8 @@ copy_configuration(PGconn *masterconn, PGconn *witnessconn) sqlquery_snprintf(sqlquery, "TRUNCATE TABLE %s.repl_nodes", repmgr_schema); log_debug("copy_configuration: %s\n", sqlquery); - if (!PQexec(witnessconn, sqlquery)) + res = PQexec(witnessconn, sqlquery); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { fprintf(stderr, "Cannot clean node details in the witness, %s\n", PQerrorMessage(witnessconn)); @@ -2208,7 +2264,8 @@ copy_configuration(PGconn *masterconn, PGconn *witnessconn) atoi(PQgetvalue(res, i, 3)), PQgetvalue(res, i, 4)); - if (!PQexec(witnessconn, sqlquery)) + res = PQexec(witnessconn, sqlquery); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { fprintf(stderr, "Cannot copy configuration to witness, %s\n", PQerrorMessage(witnessconn)); diff --git a/repmgrd.c b/repmgrd.c index b6ecbe28..93c52f9f 100644 --- a/repmgrd.c +++ b/repmgrd.c @@ -144,17 +144,24 @@ static void terminate(int retval); static void setup_event_handlers(void); #endif -static void do_daemonize(); +static void do_daemonize(void); static void check_and_create_pid_file(const char *pid_file); -#define CloseConnections() \ - if (PQisBusy(primaryConn) == 1) \ - (void) CancelQuery(primaryConn, local_options.master_response_timeout); \ - if (myLocalConn != NULL) \ - PQfinish(myLocalConn); \ - if (primaryConn != NULL && primaryConn != myLocalConn) \ +static void +CloseConnections() { + if (primaryConn != NULL && PQisBusy(primaryConn) == 1) + CancelQuery(primaryConn, local_options.master_response_timeout); + + if (myLocalConn != NULL) + PQfinish(myLocalConn); + + if (primaryConn != NULL && primaryConn != myLocalConn) PQfinish(primaryConn); + primaryConn = NULL; + myLocalConn = NULL; +} + int main(int argc, char **argv) @@ -431,10 +438,6 @@ main(int argc, char **argv) } while (true); - /* Prevent a double-free */ - if (primaryConn == myLocalConn) - myLocalConn = NULL; - /* close the connection to the database and cleanup */ CloseConnections(); @@ -530,6 +533,7 @@ StandbyMonitor(void) char last_wal_primary_location[MAXLEN]; char last_wal_standby_received[MAXLEN]; char last_wal_standby_applied[MAXLEN]; + char last_wal_standby_applied_timestamp[MAXLEN]; unsigned long long int lsn_primary; unsigned long long int lsn_standby_received; @@ -546,11 +550,15 @@ StandbyMonitor(void) if (!CheckConnection(myLocalConn, "standby")) { + log_err("Failed to connect to local node, exiting!\n"); terminate(1); } if (PQstatus(primaryConn) != CONNECTION_OK) { + PQfinish(primaryConn); + primaryConn = NULL; + if (local_options.failover == MANUAL_FAILOVER) { log_err(_("We couldn't reconnect to master. Now checking if another node has been promoted.\n")); @@ -637,7 +645,7 @@ StandbyMonitor(void) sqlquery_snprintf( sqlquery, "SELECT CURRENT_TIMESTAMP, pg_last_xlog_receive_location(), " - "pg_last_xlog_replay_location()"); + "pg_last_xlog_replay_location(), pg_last_xact_replay_timestamp()"); res = PQexec(myLocalConn, sqlquery); if (PQresultStatus(res) != PGRES_TUPLES_OK) @@ -651,6 +659,7 @@ StandbyMonitor(void) strncpy(monitor_standby_timestamp, PQgetvalue(res, 0, 0), MAXLEN); strncpy(last_wal_standby_received , PQgetvalue(res, 0, 1), MAXLEN); strncpy(last_wal_standby_applied , PQgetvalue(res, 0, 2), MAXLEN); + strncpy(last_wal_standby_applied_timestamp, PQgetvalue(res, 0, 3), MAXLEN); PQclear(res); /* Get primary xlog info */ @@ -678,9 +687,10 @@ StandbyMonitor(void) sqlquery_snprintf(sqlquery, "INSERT INTO %s.repl_monitor " "VALUES(%d, %d, '%s'::timestamp with time zone, " - " '%s', '%s', " + " '%s'::timestamp with time zone, '%s', '%s', " " %lld, %lld)", repmgr_schema, primary_options.node, local_options.node, monitor_standby_timestamp, + last_wal_standby_applied_timestamp, last_wal_primary_location, last_wal_standby_received, (lsn_primary - lsn_standby_received), @@ -770,7 +780,12 @@ do_failover(void) /* if we can't see the node just skip it */ if (PQstatus(nodeConn) != CONNECTION_OK) + { + if (nodeConn != NULL) + PQfinish(nodeConn); + continue; + } visible_nodes++; nodes[i].is_visible = true; @@ -953,6 +968,7 @@ do_failover(void) /* Close the connection to this server */ PQfinish(myLocalConn); + myLocalConn = NULL; /* * determine which one is the best candidate to promote to primary @@ -1269,6 +1285,8 @@ terminate(int retval) unlink(pid_file); } + log_info("Terminating...\n"); + exit(retval); } diff --git a/sql/repmgr_funcs.c b/sql/repmgr_funcs.c index 9ff4b6f5..dfeb139b 100644 --- a/sql/repmgr_funcs.c +++ b/sql/repmgr_funcs.c @@ -15,6 +15,7 @@ #include "storage/shmem.h" #include "storage/spin.h" #include "utils/builtins.h" +#include "utils/timestamp.h" /* same definition as the one in xlog_internal.h */ #define MAXFNAMELEN 64 @@ -28,6 +29,7 @@ typedef struct repmgrSharedState { LWLockId lock; /* protects search/modification */ char location[MAXFNAMELEN]; /* last known xlog location */ + TimestampTz last_updated; } repmgrSharedState; /* Links to shared memory state */ @@ -49,6 +51,12 @@ Datum repmgr_get_last_standby_location(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(repmgr_update_standby_location); PG_FUNCTION_INFO_V1(repmgr_get_last_standby_location); +Datum repmgr_update_last_updated(PG_FUNCTION_ARGS); +Datum repmgr_get_last_updated(PG_FUNCTION_ARGS); + +PG_FUNCTION_INFO_V1(repmgr_update_last_updated); +PG_FUNCTION_INFO_V1(repmgr_get_last_updated); + /* * Module load callback @@ -187,3 +195,38 @@ repmgr_update_standby_location(PG_FUNCTION_ARGS) PG_RETURN_BOOL(repmgr_set_standby_location(locationstr)); } + +/* update and return last updated with current timestamp */ +Datum +repmgr_update_last_updated(PG_FUNCTION_ARGS) +{ + TimestampTz last_updated = GetCurrentTimestamp(); + + /* Safety check... */ + if (!shared_state) + PG_RETURN_NULL(); + + LWLockAcquire(shared_state->lock, LW_SHARED); + shared_state->last_updated = last_updated; + LWLockRelease(shared_state->lock); + + PG_RETURN_TIMESTAMPTZ(last_updated); +} + + +/* get last updated timestamp */ +Datum +repmgr_get_last_updated(PG_FUNCTION_ARGS) +{ + TimestampTz last_updated; + + /* Safety check... */ + if (!shared_state) + PG_RETURN_NULL(); + + LWLockAcquire(shared_state->lock, LW_EXCLUSIVE); + last_updated = shared_state->last_updated; + LWLockRelease(shared_state->lock); + + PG_RETURN_TIMESTAMPTZ(last_updated); +} diff --git a/sql/repmgr_funcs.sql.in b/sql/repmgr_funcs.sql.in index b637eda2..fc438cd6 100644 --- a/sql/repmgr_funcs.sql.in +++ b/sql/repmgr_funcs.sql.in @@ -1,6 +1,6 @@ /* * repmgr_function.sql - * Copyright (c) 2ndQuadrant, 2010 + * Copyright (c) 2ndQuadrant, 2010-2014 * */ @@ -13,3 +13,11 @@ LANGUAGE C STRICT; CREATE FUNCTION repmgr_get_last_standby_location() RETURNS text AS 'MODULE_PATHNAME', 'repmgr_get_last_standby_location' LANGUAGE C STRICT; + +CREATE FUNCTION repmgr_update_last_updated() RETURNS TIMESTAMP WITH TIME ZONE +AS 'MODULE_PATHNAME', 'repmgr_update_last_updated' +LANGUAGE C STRICT; + +CREATE FUNCTION repmgr_get_last_updated() RETURNS TIMESTAMP WITH TIME ZONE +AS 'MODULE_PATHNAME', 'repmgr_get_last_updated' +LANGUAGE C STRICT; diff --git a/sql/uninstall_repmgr_funcs.sql b/sql/uninstall_repmgr_funcs.sql index 2215503b..08ecca58 100644 --- a/sql/uninstall_repmgr_funcs.sql +++ b/sql/uninstall_repmgr_funcs.sql @@ -1,2 +1,11 @@ +/* + * uninstall_repmgr_funcs.sql + * Copyright (c) 2ndQuadrant, 2010-2014 + * + */ + DROP FUNCTION repmgr_update_standby_location(text); DROP FUNCTION repmgr_get_last_standby_location(); + +DROP FUNCTION repmgr_update_last_updated(); +DROP FUNCTION repmgr_get_last_updated();