Merge branch 'master' into REL2_0_STABLE

This commit is contained in:
Christian Kruse
2014-03-03 09:25:08 +01:00
9 changed files with 215 additions and 43 deletions

View File

@@ -191,7 +191,9 @@ trim (char *s)
++s1;
/* Copy finished string */
strcpy (s, s1);
memmove (s, s1, s2 - s1);
s[s2 - s1 + 1] = '\0';
return s;
}

View File

@@ -18,6 +18,8 @@
*/
#include <unistd.h>
#include <time.h>
#include <sys/time.h>
#include "repmgr.h"
#include "strutil.h"
@@ -426,34 +428,65 @@ getMasterConnection(PGconn *standby_conn, char *schema, char *cluster,
* return 1 if Ok; 0 if any error ocurred; -1 if timeout reached
*/
int
wait_connection_availability(PGconn *conn, int timeout)
wait_connection_availability(PGconn *conn, long long timeout)
{
PGresult *res;
fd_set read_set;
int sock = PQsocket(conn);
struct timeval tmout, before, after;
struct timezone tz;
while(timeout-- >= 0)
/* recalc to microseconds */
timeout *= 1000000;
while (timeout > 0)
{
if (PQconsumeInput(conn) == 0)
{
log_warning(_("wait_connection_availability: could not receive data from connection. %s\n"),
PQerrorMessage(conn));
PQerrorMessage(conn));
return 0;
}
if (PQisBusy(conn) == 0)
{
res = PQgetResult(conn);
if (res == NULL)
break;
PQclear(res);
do {
res = PQgetResult(conn);
PQclear(res);
} while(res != NULL);
break;
}
sleep(1);
tmout.tv_sec = 0;
tmout.tv_usec = 250000;
FD_ZERO(&read_set);
FD_SET(sock, &read_set);
gettimeofday(&before, &tz);
if (select(sock, &read_set, NULL, NULL, &tmout) == -1)
{
log_warning(
_("wait_connection_availability: select() returned with error: %s"),
strerror(errno));
return -1;
}
gettimeofday(&after, &tz);
timeout -= (after.tv_sec * 1000000 + after.tv_usec) -
(before.tv_sec * 1000000 + before.tv_usec);
}
if (timeout >= 0)
{
return 1;
else {
log_warning(_("wait_connection_availability: timeout reached"));
return -1;
}
log_warning(_("wait_connection_availability: timeout reached"));
return -1;
}

View File

@@ -39,6 +39,6 @@ const char *get_cluster_size(PGconn *conn);
PGconn *getMasterConnection(PGconn *standby_conn, char *schema, char *cluster,
int *master_id, char *master_conninfo_out);
int wait_connection_availability(PGconn *conn, int timeout);
int wait_connection_availability(PGconn *conn, long long timeout);
bool CancelQuery(PGconn *conn, int timeout);
#endif

2
log.c
View File

@@ -58,6 +58,8 @@ void stderr_log_with_level(const char *level_name, int level, const char *fmt, .
vfprintf(stderr, fmt1, ap);
va_end(ap);
fflush(stderr);
}
}

View File

@@ -575,13 +575,15 @@ do_master_register(void)
repmgr_schema, options.node);
log_debug(_("master register: %s\n"), sqlquery);
if (!PQexec(conn, sqlquery))
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_warning(_("Cannot delete node details, %s\n"),
PQerrorMessage(conn));
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
PQclear(res);
}
/* Ensure there isn't any other master already registered */
@@ -603,13 +605,15 @@ do_master_register(void)
options.conninfo, options.priority);
log_debug(_("master register: %s\n"), sqlquery);
if (!PQexec(conn, sqlquery))
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_warning(_("Cannot insert node details, %s\n"),
PQerrorMessage(conn));
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
PQclear(res);
PQfinish(conn);
log_notice(_("Master node correctly registered for cluster %s with id %d (conninfo: %s)\n"),
@@ -735,7 +739,8 @@ do_standby_register(void)
log_debug(_("standby register: %s\n"), sqlquery);
if (!PQexec(master_conn, sqlquery))
res = PQexec(master_conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_err(_("Cannot delete node details, %s\n"),
PQerrorMessage(master_conn));
@@ -743,6 +748,7 @@ do_standby_register(void)
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
PQclear(res);
}
sqlquery_snprintf(sqlquery, "INSERT INTO %s.repl_nodes(id, cluster, name, conninfo, priority) "
@@ -1695,7 +1701,8 @@ do_witness_create(void)
repmgr_schema, options.node, options.cluster_name, options.node_name, options.conninfo, options.priority);
log_debug(_("witness create: %s"), sqlquery);
if (!PQexec(masterconn, sqlquery))
res = PQexec(masterconn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_err(_("Cannot insert node details, %s\n"), PQerrorMessage(masterconn));
PQfinish(masterconn);
@@ -2068,16 +2075,49 @@ static bool
create_schema(PGconn *conn)
{
char sqlquery[QUERY_STR_LEN];
PGresult *res;
sqlquery_snprintf(sqlquery, "CREATE SCHEMA %s", repmgr_schema);
log_debug(_("master register: %s\n"), sqlquery);
if (!PQexec(conn, sqlquery))
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_err(_("Cannot create the schema %s: %s\n"),
repmgr_schema, PQerrorMessage(conn));
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
PQclear(res);
/* to avoid confusion of the time_lag field and provide a consistent UI we
* use these functions for providing the latest update timestamp */
sqlquery_snprintf(sqlquery,
"CREATE FUNCTION %s.repmgr_update_last_updated() RETURNS TIMESTAMP WITH TIME ZONE "
"AS '$libdir/repmgr_funcs', 'repmgr_update_last_updated' "
" LANGUAGE C STRICT", repmgr_schema);
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Cannot create the function repmgr_update_last_updated: %s\n",
PQerrorMessage(conn));
return false;
}
PQclear(res);
sqlquery_snprintf(sqlquery,
"CREATE FUNCTION %s.repmgr_get_last_updated() RETURNS TIMESTAMP WITH TIME ZONE "
"AS '$libdir/repmgr_funcs', 'repmgr_get_last_updated' "
"LANGUAGE C STRICT", repmgr_schema);
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Cannot create the function repmgr_get_last_updated: %s\n",
PQerrorMessage(conn));
return false;
}
PQclear(res);
/* ... the tables */
sqlquery_snprintf(sqlquery, "CREATE TABLE %s.repl_nodes ( "
@@ -2088,86 +2128,101 @@ create_schema(PGconn *conn)
" priority integer not null, "
" witness boolean not null default false)", repmgr_schema);
log_debug(_("master register: %s\n"), sqlquery);
if (!PQexec(conn, sqlquery))
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_err(_("Cannot create the table %s.repl_nodes: %s\n"),
repmgr_schema, PQerrorMessage(conn));
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
PQclear(res);
sqlquery_snprintf(sqlquery, "CREATE TABLE %s.repl_monitor ( "
" primary_node INTEGER NOT NULL, "
" standby_node INTEGER NOT NULL, "
" last_monitor_time TIMESTAMP WITH TIME ZONE NOT NULL, "
" last_apply_time TIMESTAMP WITH TIME ZONE, "
" last_wal_primary_location TEXT NOT NULL, "
" last_wal_standby_location TEXT, "
" replication_lag BIGINT NOT NULL, "
" apply_lag BIGINT NOT NULL) ", repmgr_schema);
log_debug(_("master register: %s\n"), sqlquery);
if (!PQexec(conn, sqlquery))
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_err(_("Cannot create the table %s.repl_monitor: %s\n"),
repmgr_schema, PQerrorMessage(conn));
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
PQclear(res);
/* a view */
sqlquery_snprintf(sqlquery, "CREATE VIEW %s.repl_status AS "
" SELECT primary_node, standby_node, name AS standby_name, last_monitor_time, "
" last_wal_primary_location, last_wal_standby_location, "
" pg_size_pretty(replication_lag) replication_lag, "
" age(now(), last_apply_time) AS replication_time_lag, "
" pg_size_pretty(apply_lag) apply_lag, "
" age(now(), last_monitor_time) AS time_lag "
" age(now(), CASE WHEN pg_is_in_recovery() THEN %s.repmgr_get_last_updated() ELSE last_monitor_time END) AS communication_time_lag "
" FROM %s.repl_monitor JOIN %s.repl_nodes ON standby_node = id "
" WHERE (standby_node, last_monitor_time) IN (SELECT standby_node, MAX(last_monitor_time) "
" FROM %s.repl_monitor GROUP BY 1)",
repmgr_schema, repmgr_schema, repmgr_schema, repmgr_schema);
repmgr_schema, repmgr_schema, repmgr_schema, repmgr_schema, repmgr_schema);
log_debug(_("master register: %s\n"), sqlquery);
if (!PQexec(conn, sqlquery))
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_err(_("Cannot create the view %s.repl_status: %s\n"),
repmgr_schema, PQerrorMessage(conn));
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
PQclear(res);
/* an index to improve performance of the view */
sqlquery_snprintf(sqlquery, "CREATE INDEX idx_repl_status_sort "
" ON %s.repl_monitor (last_monitor_time, standby_node) ",
repmgr_schema);
log_debug(_("master register: %s\n"), sqlquery);
if (!PQexec(conn, sqlquery))
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_err(_("Cannot indexing table %s.repl_monitor: %s\n"),
log_err(_("Can't index table %s.repl_monitor: %s\n"),
repmgr_schema, PQerrorMessage(conn));
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
PQclear(res);
/* XXX Here we MUST try to load the repmgr_function.sql not hardcode it here */
sqlquery_snprintf(sqlquery,
"CREATE OR REPLACE FUNCTION %s.repmgr_update_standby_location(text) RETURNS boolean "
"AS '$libdir/repmgr_funcs', 'repmgr_update_standby_location' "
"LANGUAGE C STRICT ", repmgr_schema);
if (!PQexec(conn, sqlquery))
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Cannot create the function repmgr_update_standby_location: %s\n",
PQerrorMessage(conn));
return false;
}
PQclear(res);
sqlquery_snprintf(sqlquery,
"CREATE OR REPLACE FUNCTION %s.repmgr_get_last_standby_location() RETURNS text "
"AS '$libdir/repmgr_funcs', 'repmgr_get_last_standby_location' "
"LANGUAGE C STRICT ", repmgr_schema);
if (!PQexec(conn, sqlquery))
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Cannot create the function repmgr_get_last_standby_location: %s\n",
PQerrorMessage(conn));
return false;
}
PQclear(res);
return true;
}
@@ -2182,7 +2237,8 @@ copy_configuration(PGconn *masterconn, PGconn *witnessconn)
sqlquery_snprintf(sqlquery, "TRUNCATE TABLE %s.repl_nodes", repmgr_schema);
log_debug("copy_configuration: %s\n", sqlquery);
if (!PQexec(witnessconn, sqlquery))
res = PQexec(witnessconn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Cannot clean node details in the witness, %s\n",
PQerrorMessage(witnessconn));
@@ -2208,7 +2264,8 @@ copy_configuration(PGconn *masterconn, PGconn *witnessconn)
atoi(PQgetvalue(res, i, 3)),
PQgetvalue(res, i, 4));
if (!PQexec(witnessconn, sqlquery))
res = PQexec(witnessconn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Cannot copy configuration to witness, %s\n",
PQerrorMessage(witnessconn));

View File

@@ -144,17 +144,24 @@ static void terminate(int retval);
static void setup_event_handlers(void);
#endif
static void do_daemonize();
static void do_daemonize(void);
static void check_and_create_pid_file(const char *pid_file);
#define CloseConnections() \
if (PQisBusy(primaryConn) == 1) \
(void) CancelQuery(primaryConn, local_options.master_response_timeout); \
if (myLocalConn != NULL) \
PQfinish(myLocalConn); \
if (primaryConn != NULL && primaryConn != myLocalConn) \
static void
CloseConnections() {
if (primaryConn != NULL && PQisBusy(primaryConn) == 1)
CancelQuery(primaryConn, local_options.master_response_timeout);
if (myLocalConn != NULL)
PQfinish(myLocalConn);
if (primaryConn != NULL && primaryConn != myLocalConn)
PQfinish(primaryConn);
primaryConn = NULL;
myLocalConn = NULL;
}
int
main(int argc, char **argv)
@@ -431,10 +438,6 @@ main(int argc, char **argv)
} while (true);
/* Prevent a double-free */
if (primaryConn == myLocalConn)
myLocalConn = NULL;
/* close the connection to the database and cleanup */
CloseConnections();
@@ -530,6 +533,7 @@ StandbyMonitor(void)
char last_wal_primary_location[MAXLEN];
char last_wal_standby_received[MAXLEN];
char last_wal_standby_applied[MAXLEN];
char last_wal_standby_applied_timestamp[MAXLEN];
unsigned long long int lsn_primary;
unsigned long long int lsn_standby_received;
@@ -546,11 +550,15 @@ StandbyMonitor(void)
if (!CheckConnection(myLocalConn, "standby"))
{
log_err("Failed to connect to local node, exiting!\n");
terminate(1);
}
if (PQstatus(primaryConn) != CONNECTION_OK)
{
PQfinish(primaryConn);
primaryConn = NULL;
if (local_options.failover == MANUAL_FAILOVER)
{
log_err(_("We couldn't reconnect to master. Now checking if another node has been promoted.\n"));
@@ -637,7 +645,7 @@ StandbyMonitor(void)
sqlquery_snprintf(
sqlquery,
"SELECT CURRENT_TIMESTAMP, pg_last_xlog_receive_location(), "
"pg_last_xlog_replay_location()");
"pg_last_xlog_replay_location(), pg_last_xact_replay_timestamp()");
res = PQexec(myLocalConn, sqlquery);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -651,6 +659,7 @@ StandbyMonitor(void)
strncpy(monitor_standby_timestamp, PQgetvalue(res, 0, 0), MAXLEN);
strncpy(last_wal_standby_received , PQgetvalue(res, 0, 1), MAXLEN);
strncpy(last_wal_standby_applied , PQgetvalue(res, 0, 2), MAXLEN);
strncpy(last_wal_standby_applied_timestamp, PQgetvalue(res, 0, 3), MAXLEN);
PQclear(res);
/* Get primary xlog info */
@@ -678,9 +687,10 @@ StandbyMonitor(void)
sqlquery_snprintf(sqlquery,
"INSERT INTO %s.repl_monitor "
"VALUES(%d, %d, '%s'::timestamp with time zone, "
" '%s', '%s', "
" '%s'::timestamp with time zone, '%s', '%s', "
" %lld, %lld)", repmgr_schema,
primary_options.node, local_options.node, monitor_standby_timestamp,
last_wal_standby_applied_timestamp,
last_wal_primary_location,
last_wal_standby_received,
(lsn_primary - lsn_standby_received),
@@ -770,7 +780,12 @@ do_failover(void)
/* if we can't see the node just skip it */
if (PQstatus(nodeConn) != CONNECTION_OK)
{
if (nodeConn != NULL)
PQfinish(nodeConn);
continue;
}
visible_nodes++;
nodes[i].is_visible = true;
@@ -953,6 +968,7 @@ do_failover(void)
/* Close the connection to this server */
PQfinish(myLocalConn);
myLocalConn = NULL;
/*
* determine which one is the best candidate to promote to primary
@@ -1269,6 +1285,8 @@ terminate(int retval)
unlink(pid_file);
}
log_info("Terminating...\n");
exit(retval);
}

View File

@@ -15,6 +15,7 @@
#include "storage/shmem.h"
#include "storage/spin.h"
#include "utils/builtins.h"
#include "utils/timestamp.h"
/* same definition as the one in xlog_internal.h */
#define MAXFNAMELEN 64
@@ -28,6 +29,7 @@ typedef struct repmgrSharedState
{
LWLockId lock; /* protects search/modification */
char location[MAXFNAMELEN]; /* last known xlog location */
TimestampTz last_updated;
} repmgrSharedState;
/* Links to shared memory state */
@@ -49,6 +51,12 @@ Datum repmgr_get_last_standby_location(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(repmgr_update_standby_location);
PG_FUNCTION_INFO_V1(repmgr_get_last_standby_location);
Datum repmgr_update_last_updated(PG_FUNCTION_ARGS);
Datum repmgr_get_last_updated(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(repmgr_update_last_updated);
PG_FUNCTION_INFO_V1(repmgr_get_last_updated);
/*
* Module load callback
@@ -187,3 +195,38 @@ repmgr_update_standby_location(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(repmgr_set_standby_location(locationstr));
}
/* update and return last updated with current timestamp */
Datum
repmgr_update_last_updated(PG_FUNCTION_ARGS)
{
TimestampTz last_updated = GetCurrentTimestamp();
/* Safety check... */
if (!shared_state)
PG_RETURN_NULL();
LWLockAcquire(shared_state->lock, LW_SHARED);
shared_state->last_updated = last_updated;
LWLockRelease(shared_state->lock);
PG_RETURN_TIMESTAMPTZ(last_updated);
}
/* get last updated timestamp */
Datum
repmgr_get_last_updated(PG_FUNCTION_ARGS)
{
TimestampTz last_updated;
/* Safety check... */
if (!shared_state)
PG_RETURN_NULL();
LWLockAcquire(shared_state->lock, LW_EXCLUSIVE);
last_updated = shared_state->last_updated;
LWLockRelease(shared_state->lock);
PG_RETURN_TIMESTAMPTZ(last_updated);
}

View File

@@ -1,6 +1,6 @@
/*
* repmgr_function.sql
* Copyright (c) 2ndQuadrant, 2010
* Copyright (c) 2ndQuadrant, 2010-2014
*
*/
@@ -13,3 +13,11 @@ LANGUAGE C STRICT;
CREATE FUNCTION repmgr_get_last_standby_location() RETURNS text
AS 'MODULE_PATHNAME', 'repmgr_get_last_standby_location'
LANGUAGE C STRICT;
CREATE FUNCTION repmgr_update_last_updated() RETURNS TIMESTAMP WITH TIME ZONE
AS 'MODULE_PATHNAME', 'repmgr_update_last_updated'
LANGUAGE C STRICT;
CREATE FUNCTION repmgr_get_last_updated() RETURNS TIMESTAMP WITH TIME ZONE
AS 'MODULE_PATHNAME', 'repmgr_get_last_updated'
LANGUAGE C STRICT;

View File

@@ -1,2 +1,11 @@
/*
* uninstall_repmgr_funcs.sql
* Copyright (c) 2ndQuadrant, 2010-2014
*
*/
DROP FUNCTION repmgr_update_standby_location(text);
DROP FUNCTION repmgr_get_last_standby_location();
DROP FUNCTION repmgr_update_last_updated();
DROP FUNCTION repmgr_get_last_updated();