Compare commits

...

15 Commits

Author SHA1 Message Date
Christian Kruse
0a71123920 Merge branch 'master' into REL2_0_STABLE 2014-03-03 09:25:08 +01:00
Christian Kruse
0ff14a2aa1 avoid compiler warnings 2014-02-21 13:47:29 +01:00
Christian Kruse
5215265694 fix: now CloseConnections() is much more safe 2014-02-18 17:06:36 +01:00
Christian Kruse
e45ac25348 fix: progname is const, do not free it
The leak is irrelevant
2014-02-18 16:45:35 +01:00
Christian Kruse
a1ce01f033 fix: fixed some leaks 2014-02-18 16:35:29 +01:00
Christian Kruse
516cde621a fix: strcpy() on overlapping memory regions is invalid 2014-02-18 15:42:20 +01:00
Christian Kruse
f0807923a3 fix: gettimeofday() expects two arguments 2014-02-18 15:33:56 +01:00
Christian Kruse
10ca8037f8 added some more log messages
Now we should be able to distinguish different events more easily
2014-02-18 14:10:12 +01:00
Christian Kruse
0dc46f0dc8 fix: set connection to NULL when finishing it
This will avoid CloseConnections() to try to close an already closed connection.
2014-02-18 13:42:49 +01:00
Christian Kruse
c3b58658ad fixing repmgr repl_status columns
repmgr repl_status had the column time_lag which was documented to be
the time a standby is behind master. In fact it only works like this
when viewed on the standby and not on the master: there it only was the
time of the last status update. We dropped that column and replaced it
by a new column „communication_time_lag“ which is the content of the
repl_status column on the master. On the standby we contain the time of
the last update in shared mem though refer always to the correct time
nonetheless where repl_status is queried. We also added a new column,
„replication_time_lag“, which refers to the apply delay.
2014-02-15 01:35:27 +01:00
Christian Kruse
18f1fed77f fixing wait_connection_availability()
wait_connection_availability() did take at least 2 seconds per call in
the old incarnation. Now we may finish a call without any sleep at all
when the result is already ready at the time called
2014-02-15 01:31:12 +01:00
Christian Kruse
d58fd080ca flush stderr after a log message appears
We had the problem that the log file appeared empty for a long time due
to file buffers. Thus we call fflush() after every log message so the
log file gets written out to disk quickly
2014-02-15 01:29:12 +01:00
Christian Kruse
c4ac2d3343 fixing PQexec() calls
fixing several calls where we did not check the result status but only
the return value; the query may fail nonetheless
2014-02-15 01:27:53 +01:00
Christian Kruse
a72c2296e9 Merge branch 'master' into REL2_0_STABLE 2014-02-11 09:28:40 +01:00
Christian Kruse
5ff1beeea7 do not enable autofailover by default
Autofailover is an experimental feature which should not be enabled by
default. The user has to be aware of what he is doing when enabling it.
2014-02-11 09:27:31 +01:00
10 changed files with 216 additions and 44 deletions

View File

@@ -191,7 +191,9 @@ trim (char *s)
++s1;
/* Copy finished string */
strcpy (s, s1);
memmove (s, s1, s2 - s1);
s[s2 - s1 + 1] = '\0';
return s;
}

View File

@@ -18,6 +18,8 @@
*/
#include <unistd.h>
#include <time.h>
#include <sys/time.h>
#include "repmgr.h"
#include "strutil.h"
@@ -426,34 +428,65 @@ getMasterConnection(PGconn *standby_conn, char *schema, char *cluster,
* return 1 if Ok; 0 if any error ocurred; -1 if timeout reached
*/
int
wait_connection_availability(PGconn *conn, int timeout)
wait_connection_availability(PGconn *conn, long long timeout)
{
PGresult *res;
fd_set read_set;
int sock = PQsocket(conn);
struct timeval tmout, before, after;
struct timezone tz;
while(timeout-- >= 0)
/* recalc to microseconds */
timeout *= 1000000;
while (timeout > 0)
{
if (PQconsumeInput(conn) == 0)
{
log_warning(_("wait_connection_availability: could not receive data from connection. %s\n"),
PQerrorMessage(conn));
PQerrorMessage(conn));
return 0;
}
if (PQisBusy(conn) == 0)
{
res = PQgetResult(conn);
if (res == NULL)
break;
PQclear(res);
do {
res = PQgetResult(conn);
PQclear(res);
} while(res != NULL);
break;
}
sleep(1);
tmout.tv_sec = 0;
tmout.tv_usec = 250000;
FD_ZERO(&read_set);
FD_SET(sock, &read_set);
gettimeofday(&before, &tz);
if (select(sock, &read_set, NULL, NULL, &tmout) == -1)
{
log_warning(
_("wait_connection_availability: select() returned with error: %s"),
strerror(errno));
return -1;
}
gettimeofday(&after, &tz);
timeout -= (after.tv_sec * 1000000 + after.tv_usec) -
(before.tv_sec * 1000000 + before.tv_usec);
}
if (timeout >= 0)
{
return 1;
else {
log_warning(_("wait_connection_availability: timeout reached"));
return -1;
}
log_warning(_("wait_connection_availability: timeout reached"));
return -1;
}

View File

@@ -39,6 +39,6 @@ const char *get_cluster_size(PGconn *conn);
PGconn *getMasterConnection(PGconn *standby_conn, char *schema, char *cluster,
int *master_id, char *master_conninfo_out);
int wait_connection_availability(PGconn *conn, int timeout);
int wait_connection_availability(PGconn *conn, long long timeout);
bool CancelQuery(PGconn *conn, int timeout);
#endif

2
log.c
View File

@@ -58,6 +58,8 @@ void stderr_log_with_level(const char *level_name, int level, const char *fmt, .
vfprintf(stderr, fmt1, ap);
va_end(ap);
fflush(stderr);
}
}

View File

@@ -575,13 +575,15 @@ do_master_register(void)
repmgr_schema, options.node);
log_debug(_("master register: %s\n"), sqlquery);
if (!PQexec(conn, sqlquery))
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_warning(_("Cannot delete node details, %s\n"),
PQerrorMessage(conn));
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
PQclear(res);
}
/* Ensure there isn't any other master already registered */
@@ -603,13 +605,15 @@ do_master_register(void)
options.conninfo, options.priority);
log_debug(_("master register: %s\n"), sqlquery);
if (!PQexec(conn, sqlquery))
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_warning(_("Cannot insert node details, %s\n"),
PQerrorMessage(conn));
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
PQclear(res);
PQfinish(conn);
log_notice(_("Master node correctly registered for cluster %s with id %d (conninfo: %s)\n"),
@@ -735,7 +739,8 @@ do_standby_register(void)
log_debug(_("standby register: %s\n"), sqlquery);
if (!PQexec(master_conn, sqlquery))
res = PQexec(master_conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_err(_("Cannot delete node details, %s\n"),
PQerrorMessage(master_conn));
@@ -743,6 +748,7 @@ do_standby_register(void)
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
PQclear(res);
}
sqlquery_snprintf(sqlquery, "INSERT INTO %s.repl_nodes(id, cluster, name, conninfo, priority) "
@@ -1695,7 +1701,8 @@ do_witness_create(void)
repmgr_schema, options.node, options.cluster_name, options.node_name, options.conninfo, options.priority);
log_debug(_("witness create: %s"), sqlquery);
if (!PQexec(masterconn, sqlquery))
res = PQexec(masterconn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_err(_("Cannot insert node details, %s\n"), PQerrorMessage(masterconn));
PQfinish(masterconn);
@@ -2068,16 +2075,49 @@ static bool
create_schema(PGconn *conn)
{
char sqlquery[QUERY_STR_LEN];
PGresult *res;
sqlquery_snprintf(sqlquery, "CREATE SCHEMA %s", repmgr_schema);
log_debug(_("master register: %s\n"), sqlquery);
if (!PQexec(conn, sqlquery))
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_err(_("Cannot create the schema %s: %s\n"),
repmgr_schema, PQerrorMessage(conn));
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
PQclear(res);
/* to avoid confusion of the time_lag field and provide a consistent UI we
* use these functions for providing the latest update timestamp */
sqlquery_snprintf(sqlquery,
"CREATE FUNCTION %s.repmgr_update_last_updated() RETURNS TIMESTAMP WITH TIME ZONE "
"AS '$libdir/repmgr_funcs', 'repmgr_update_last_updated' "
" LANGUAGE C STRICT", repmgr_schema);
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Cannot create the function repmgr_update_last_updated: %s\n",
PQerrorMessage(conn));
return false;
}
PQclear(res);
sqlquery_snprintf(sqlquery,
"CREATE FUNCTION %s.repmgr_get_last_updated() RETURNS TIMESTAMP WITH TIME ZONE "
"AS '$libdir/repmgr_funcs', 'repmgr_get_last_updated' "
"LANGUAGE C STRICT", repmgr_schema);
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Cannot create the function repmgr_get_last_updated: %s\n",
PQerrorMessage(conn));
return false;
}
PQclear(res);
/* ... the tables */
sqlquery_snprintf(sqlquery, "CREATE TABLE %s.repl_nodes ( "
@@ -2088,86 +2128,101 @@ create_schema(PGconn *conn)
" priority integer not null, "
" witness boolean not null default false)", repmgr_schema);
log_debug(_("master register: %s\n"), sqlquery);
if (!PQexec(conn, sqlquery))
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_err(_("Cannot create the table %s.repl_nodes: %s\n"),
repmgr_schema, PQerrorMessage(conn));
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
PQclear(res);
sqlquery_snprintf(sqlquery, "CREATE TABLE %s.repl_monitor ( "
" primary_node INTEGER NOT NULL, "
" standby_node INTEGER NOT NULL, "
" last_monitor_time TIMESTAMP WITH TIME ZONE NOT NULL, "
" last_apply_time TIMESTAMP WITH TIME ZONE, "
" last_wal_primary_location TEXT NOT NULL, "
" last_wal_standby_location TEXT, "
" replication_lag BIGINT NOT NULL, "
" apply_lag BIGINT NOT NULL) ", repmgr_schema);
log_debug(_("master register: %s\n"), sqlquery);
if (!PQexec(conn, sqlquery))
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_err(_("Cannot create the table %s.repl_monitor: %s\n"),
repmgr_schema, PQerrorMessage(conn));
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
PQclear(res);
/* a view */
sqlquery_snprintf(sqlquery, "CREATE VIEW %s.repl_status AS "
" SELECT primary_node, standby_node, name AS standby_name, last_monitor_time, "
" last_wal_primary_location, last_wal_standby_location, "
" pg_size_pretty(replication_lag) replication_lag, "
" age(now(), last_apply_time) AS replication_time_lag, "
" pg_size_pretty(apply_lag) apply_lag, "
" age(now(), last_monitor_time) AS time_lag "
" age(now(), CASE WHEN pg_is_in_recovery() THEN %s.repmgr_get_last_updated() ELSE last_monitor_time END) AS communication_time_lag "
" FROM %s.repl_monitor JOIN %s.repl_nodes ON standby_node = id "
" WHERE (standby_node, last_monitor_time) IN (SELECT standby_node, MAX(last_monitor_time) "
" FROM %s.repl_monitor GROUP BY 1)",
repmgr_schema, repmgr_schema, repmgr_schema, repmgr_schema);
repmgr_schema, repmgr_schema, repmgr_schema, repmgr_schema, repmgr_schema);
log_debug(_("master register: %s\n"), sqlquery);
if (!PQexec(conn, sqlquery))
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_err(_("Cannot create the view %s.repl_status: %s\n"),
repmgr_schema, PQerrorMessage(conn));
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
PQclear(res);
/* an index to improve performance of the view */
sqlquery_snprintf(sqlquery, "CREATE INDEX idx_repl_status_sort "
" ON %s.repl_monitor (last_monitor_time, standby_node) ",
repmgr_schema);
log_debug(_("master register: %s\n"), sqlquery);
if (!PQexec(conn, sqlquery))
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
log_err(_("Cannot indexing table %s.repl_monitor: %s\n"),
log_err(_("Can't index table %s.repl_monitor: %s\n"),
repmgr_schema, PQerrorMessage(conn));
PQfinish(conn);
exit(ERR_BAD_CONFIG);
}
PQclear(res);
/* XXX Here we MUST try to load the repmgr_function.sql not hardcode it here */
sqlquery_snprintf(sqlquery,
"CREATE OR REPLACE FUNCTION %s.repmgr_update_standby_location(text) RETURNS boolean "
"AS '$libdir/repmgr_funcs', 'repmgr_update_standby_location' "
"LANGUAGE C STRICT ", repmgr_schema);
if (!PQexec(conn, sqlquery))
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Cannot create the function repmgr_update_standby_location: %s\n",
PQerrorMessage(conn));
return false;
}
PQclear(res);
sqlquery_snprintf(sqlquery,
"CREATE OR REPLACE FUNCTION %s.repmgr_get_last_standby_location() RETURNS text "
"AS '$libdir/repmgr_funcs', 'repmgr_get_last_standby_location' "
"LANGUAGE C STRICT ", repmgr_schema);
if (!PQexec(conn, sqlquery))
res = PQexec(conn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Cannot create the function repmgr_get_last_standby_location: %s\n",
PQerrorMessage(conn));
return false;
}
PQclear(res);
return true;
}
@@ -2182,7 +2237,8 @@ copy_configuration(PGconn *masterconn, PGconn *witnessconn)
sqlquery_snprintf(sqlquery, "TRUNCATE TABLE %s.repl_nodes", repmgr_schema);
log_debug("copy_configuration: %s\n", sqlquery);
if (!PQexec(witnessconn, sqlquery))
res = PQexec(witnessconn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Cannot clean node details in the witness, %s\n",
PQerrorMessage(witnessconn));
@@ -2208,7 +2264,8 @@ copy_configuration(PGconn *masterconn, PGconn *witnessconn)
atoi(PQgetvalue(res, i, 3)),
PQgetvalue(res, i, 4));
if (!PQexec(witnessconn, sqlquery))
res = PQexec(witnessconn, sqlquery);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Cannot copy configuration to witness, %s\n",
PQerrorMessage(witnessconn));

View File

@@ -22,7 +22,7 @@ reconnect_attempts=6
reconnect_interval=10
# Autofailover options
failover=automatic
failover=manual
priority=-1
promote_command='repmgr standby promote -f /path/to/repmgr.conf'
follow_command='repmgr standby follow -f /path/to/repmgr.conf -W'

View File

@@ -144,17 +144,24 @@ static void terminate(int retval);
static void setup_event_handlers(void);
#endif
static void do_daemonize();
static void do_daemonize(void);
static void check_and_create_pid_file(const char *pid_file);
#define CloseConnections() \
if (PQisBusy(primaryConn) == 1) \
(void) CancelQuery(primaryConn, local_options.master_response_timeout); \
if (myLocalConn != NULL) \
PQfinish(myLocalConn); \
if (primaryConn != NULL && primaryConn != myLocalConn) \
static void
CloseConnections() {
if (primaryConn != NULL && PQisBusy(primaryConn) == 1)
CancelQuery(primaryConn, local_options.master_response_timeout);
if (myLocalConn != NULL)
PQfinish(myLocalConn);
if (primaryConn != NULL && primaryConn != myLocalConn)
PQfinish(primaryConn);
primaryConn = NULL;
myLocalConn = NULL;
}
int
main(int argc, char **argv)
@@ -431,10 +438,6 @@ main(int argc, char **argv)
} while (true);
/* Prevent a double-free */
if (primaryConn == myLocalConn)
myLocalConn = NULL;
/* close the connection to the database and cleanup */
CloseConnections();
@@ -530,6 +533,7 @@ StandbyMonitor(void)
char last_wal_primary_location[MAXLEN];
char last_wal_standby_received[MAXLEN];
char last_wal_standby_applied[MAXLEN];
char last_wal_standby_applied_timestamp[MAXLEN];
unsigned long long int lsn_primary;
unsigned long long int lsn_standby_received;
@@ -546,11 +550,15 @@ StandbyMonitor(void)
if (!CheckConnection(myLocalConn, "standby"))
{
log_err("Failed to connect to local node, exiting!\n");
terminate(1);
}
if (PQstatus(primaryConn) != CONNECTION_OK)
{
PQfinish(primaryConn);
primaryConn = NULL;
if (local_options.failover == MANUAL_FAILOVER)
{
log_err(_("We couldn't reconnect to master. Now checking if another node has been promoted.\n"));
@@ -637,7 +645,7 @@ StandbyMonitor(void)
sqlquery_snprintf(
sqlquery,
"SELECT CURRENT_TIMESTAMP, pg_last_xlog_receive_location(), "
"pg_last_xlog_replay_location()");
"pg_last_xlog_replay_location(), pg_last_xact_replay_timestamp()");
res = PQexec(myLocalConn, sqlquery);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -651,6 +659,7 @@ StandbyMonitor(void)
strncpy(monitor_standby_timestamp, PQgetvalue(res, 0, 0), MAXLEN);
strncpy(last_wal_standby_received , PQgetvalue(res, 0, 1), MAXLEN);
strncpy(last_wal_standby_applied , PQgetvalue(res, 0, 2), MAXLEN);
strncpy(last_wal_standby_applied_timestamp, PQgetvalue(res, 0, 3), MAXLEN);
PQclear(res);
/* Get primary xlog info */
@@ -678,9 +687,10 @@ StandbyMonitor(void)
sqlquery_snprintf(sqlquery,
"INSERT INTO %s.repl_monitor "
"VALUES(%d, %d, '%s'::timestamp with time zone, "
" '%s', '%s', "
" '%s'::timestamp with time zone, '%s', '%s', "
" %lld, %lld)", repmgr_schema,
primary_options.node, local_options.node, monitor_standby_timestamp,
last_wal_standby_applied_timestamp,
last_wal_primary_location,
last_wal_standby_received,
(lsn_primary - lsn_standby_received),
@@ -770,7 +780,12 @@ do_failover(void)
/* if we can't see the node just skip it */
if (PQstatus(nodeConn) != CONNECTION_OK)
{
if (nodeConn != NULL)
PQfinish(nodeConn);
continue;
}
visible_nodes++;
nodes[i].is_visible = true;
@@ -953,6 +968,7 @@ do_failover(void)
/* Close the connection to this server */
PQfinish(myLocalConn);
myLocalConn = NULL;
/*
* determine which one is the best candidate to promote to primary
@@ -1269,6 +1285,8 @@ terminate(int retval)
unlink(pid_file);
}
log_info("Terminating...\n");
exit(retval);
}

View File

@@ -15,6 +15,7 @@
#include "storage/shmem.h"
#include "storage/spin.h"
#include "utils/builtins.h"
#include "utils/timestamp.h"
/* same definition as the one in xlog_internal.h */
#define MAXFNAMELEN 64
@@ -28,6 +29,7 @@ typedef struct repmgrSharedState
{
LWLockId lock; /* protects search/modification */
char location[MAXFNAMELEN]; /* last known xlog location */
TimestampTz last_updated;
} repmgrSharedState;
/* Links to shared memory state */
@@ -49,6 +51,12 @@ Datum repmgr_get_last_standby_location(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(repmgr_update_standby_location);
PG_FUNCTION_INFO_V1(repmgr_get_last_standby_location);
Datum repmgr_update_last_updated(PG_FUNCTION_ARGS);
Datum repmgr_get_last_updated(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(repmgr_update_last_updated);
PG_FUNCTION_INFO_V1(repmgr_get_last_updated);
/*
* Module load callback
@@ -187,3 +195,38 @@ repmgr_update_standby_location(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(repmgr_set_standby_location(locationstr));
}
/* update and return last updated with current timestamp */
Datum
repmgr_update_last_updated(PG_FUNCTION_ARGS)
{
TimestampTz last_updated = GetCurrentTimestamp();
/* Safety check... */
if (!shared_state)
PG_RETURN_NULL();
LWLockAcquire(shared_state->lock, LW_SHARED);
shared_state->last_updated = last_updated;
LWLockRelease(shared_state->lock);
PG_RETURN_TIMESTAMPTZ(last_updated);
}
/* get last updated timestamp */
Datum
repmgr_get_last_updated(PG_FUNCTION_ARGS)
{
TimestampTz last_updated;
/* Safety check... */
if (!shared_state)
PG_RETURN_NULL();
LWLockAcquire(shared_state->lock, LW_EXCLUSIVE);
last_updated = shared_state->last_updated;
LWLockRelease(shared_state->lock);
PG_RETURN_TIMESTAMPTZ(last_updated);
}

View File

@@ -1,6 +1,6 @@
/*
* repmgr_function.sql
* Copyright (c) 2ndQuadrant, 2010
* Copyright (c) 2ndQuadrant, 2010-2014
*
*/
@@ -13,3 +13,11 @@ LANGUAGE C STRICT;
CREATE FUNCTION repmgr_get_last_standby_location() RETURNS text
AS 'MODULE_PATHNAME', 'repmgr_get_last_standby_location'
LANGUAGE C STRICT;
CREATE FUNCTION repmgr_update_last_updated() RETURNS TIMESTAMP WITH TIME ZONE
AS 'MODULE_PATHNAME', 'repmgr_update_last_updated'
LANGUAGE C STRICT;
CREATE FUNCTION repmgr_get_last_updated() RETURNS TIMESTAMP WITH TIME ZONE
AS 'MODULE_PATHNAME', 'repmgr_get_last_updated'
LANGUAGE C STRICT;

View File

@@ -1,2 +1,11 @@
/*
* uninstall_repmgr_funcs.sql
* Copyright (c) 2ndQuadrant, 2010-2014
*
*/
DROP FUNCTION repmgr_update_standby_location(text);
DROP FUNCTION repmgr_get_last_standby_location();
DROP FUNCTION repmgr_update_last_updated();
DROP FUNCTION repmgr_get_last_updated();