mirror of
https://github.com/EnterpriseDB/repmgr.git
synced 2026-03-23 15:16:29 +00:00
Compare commits
15 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0a71123920 | ||
|
|
0ff14a2aa1 | ||
|
|
5215265694 | ||
|
|
e45ac25348 | ||
|
|
a1ce01f033 | ||
|
|
516cde621a | ||
|
|
f0807923a3 | ||
|
|
10ca8037f8 | ||
|
|
0dc46f0dc8 | ||
|
|
c3b58658ad | ||
|
|
18f1fed77f | ||
|
|
d58fd080ca | ||
|
|
c4ac2d3343 | ||
|
|
a72c2296e9 | ||
|
|
5ff1beeea7 |
4
config.c
4
config.c
@@ -191,7 +191,9 @@ trim (char *s)
|
|||||||
++s1;
|
++s1;
|
||||||
|
|
||||||
/* Copy finished string */
|
/* Copy finished string */
|
||||||
strcpy (s, s1);
|
memmove (s, s1, s2 - s1);
|
||||||
|
s[s2 - s1 + 1] = '\0';
|
||||||
|
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
55
dbutils.c
55
dbutils.c
@@ -18,6 +18,8 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
#include <time.h>
|
||||||
|
#include <sys/time.h>
|
||||||
|
|
||||||
#include "repmgr.h"
|
#include "repmgr.h"
|
||||||
#include "strutil.h"
|
#include "strutil.h"
|
||||||
@@ -426,34 +428,65 @@ getMasterConnection(PGconn *standby_conn, char *schema, char *cluster,
|
|||||||
* return 1 if Ok; 0 if any error ocurred; -1 if timeout reached
|
* return 1 if Ok; 0 if any error ocurred; -1 if timeout reached
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
wait_connection_availability(PGconn *conn, int timeout)
|
wait_connection_availability(PGconn *conn, long long timeout)
|
||||||
{
|
{
|
||||||
PGresult *res;
|
PGresult *res;
|
||||||
|
fd_set read_set;
|
||||||
|
int sock = PQsocket(conn);
|
||||||
|
struct timeval tmout, before, after;
|
||||||
|
struct timezone tz;
|
||||||
|
|
||||||
while(timeout-- >= 0)
|
/* recalc to microseconds */
|
||||||
|
timeout *= 1000000;
|
||||||
|
|
||||||
|
while (timeout > 0)
|
||||||
{
|
{
|
||||||
if (PQconsumeInput(conn) == 0)
|
if (PQconsumeInput(conn) == 0)
|
||||||
{
|
{
|
||||||
log_warning(_("wait_connection_availability: could not receive data from connection. %s\n"),
|
log_warning(_("wait_connection_availability: could not receive data from connection. %s\n"),
|
||||||
PQerrorMessage(conn));
|
PQerrorMessage(conn));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (PQisBusy(conn) == 0)
|
if (PQisBusy(conn) == 0)
|
||||||
{
|
{
|
||||||
res = PQgetResult(conn);
|
do {
|
||||||
if (res == NULL)
|
res = PQgetResult(conn);
|
||||||
break;
|
PQclear(res);
|
||||||
PQclear(res);
|
} while(res != NULL);
|
||||||
|
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
sleep(1);
|
|
||||||
|
|
||||||
|
tmout.tv_sec = 0;
|
||||||
|
tmout.tv_usec = 250000;
|
||||||
|
|
||||||
|
FD_ZERO(&read_set);
|
||||||
|
FD_SET(sock, &read_set);
|
||||||
|
|
||||||
|
gettimeofday(&before, &tz);
|
||||||
|
if (select(sock, &read_set, NULL, NULL, &tmout) == -1)
|
||||||
|
{
|
||||||
|
log_warning(
|
||||||
|
_("wait_connection_availability: select() returned with error: %s"),
|
||||||
|
strerror(errno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
gettimeofday(&after, &tz);
|
||||||
|
|
||||||
|
timeout -= (after.tv_sec * 1000000 + after.tv_usec) -
|
||||||
|
(before.tv_sec * 1000000 + before.tv_usec);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (timeout >= 0)
|
if (timeout >= 0)
|
||||||
|
{
|
||||||
return 1;
|
return 1;
|
||||||
else {
|
|
||||||
log_warning(_("wait_connection_availability: timeout reached"));
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log_warning(_("wait_connection_availability: timeout reached"));
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -39,6 +39,6 @@ const char *get_cluster_size(PGconn *conn);
|
|||||||
PGconn *getMasterConnection(PGconn *standby_conn, char *schema, char *cluster,
|
PGconn *getMasterConnection(PGconn *standby_conn, char *schema, char *cluster,
|
||||||
int *master_id, char *master_conninfo_out);
|
int *master_id, char *master_conninfo_out);
|
||||||
|
|
||||||
int wait_connection_availability(PGconn *conn, int timeout);
|
int wait_connection_availability(PGconn *conn, long long timeout);
|
||||||
bool CancelQuery(PGconn *conn, int timeout);
|
bool CancelQuery(PGconn *conn, int timeout);
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
2
log.c
2
log.c
@@ -58,6 +58,8 @@ void stderr_log_with_level(const char *level_name, int level, const char *fmt, .
|
|||||||
vfprintf(stderr, fmt1, ap);
|
vfprintf(stderr, fmt1, ap);
|
||||||
|
|
||||||
va_end(ap);
|
va_end(ap);
|
||||||
|
|
||||||
|
fflush(stderr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
89
repmgr.c
89
repmgr.c
@@ -575,13 +575,15 @@ do_master_register(void)
|
|||||||
repmgr_schema, options.node);
|
repmgr_schema, options.node);
|
||||||
log_debug(_("master register: %s\n"), sqlquery);
|
log_debug(_("master register: %s\n"), sqlquery);
|
||||||
|
|
||||||
if (!PQexec(conn, sqlquery))
|
res = PQexec(conn, sqlquery);
|
||||||
|
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
log_warning(_("Cannot delete node details, %s\n"),
|
log_warning(_("Cannot delete node details, %s\n"),
|
||||||
PQerrorMessage(conn));
|
PQerrorMessage(conn));
|
||||||
PQfinish(conn);
|
PQfinish(conn);
|
||||||
exit(ERR_BAD_CONFIG);
|
exit(ERR_BAD_CONFIG);
|
||||||
}
|
}
|
||||||
|
PQclear(res);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Ensure there isn't any other master already registered */
|
/* Ensure there isn't any other master already registered */
|
||||||
@@ -603,13 +605,15 @@ do_master_register(void)
|
|||||||
options.conninfo, options.priority);
|
options.conninfo, options.priority);
|
||||||
log_debug(_("master register: %s\n"), sqlquery);
|
log_debug(_("master register: %s\n"), sqlquery);
|
||||||
|
|
||||||
if (!PQexec(conn, sqlquery))
|
res = PQexec(conn, sqlquery);
|
||||||
|
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
log_warning(_("Cannot insert node details, %s\n"),
|
log_warning(_("Cannot insert node details, %s\n"),
|
||||||
PQerrorMessage(conn));
|
PQerrorMessage(conn));
|
||||||
PQfinish(conn);
|
PQfinish(conn);
|
||||||
exit(ERR_BAD_CONFIG);
|
exit(ERR_BAD_CONFIG);
|
||||||
}
|
}
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
PQfinish(conn);
|
PQfinish(conn);
|
||||||
log_notice(_("Master node correctly registered for cluster %s with id %d (conninfo: %s)\n"),
|
log_notice(_("Master node correctly registered for cluster %s with id %d (conninfo: %s)\n"),
|
||||||
@@ -735,7 +739,8 @@ do_standby_register(void)
|
|||||||
|
|
||||||
log_debug(_("standby register: %s\n"), sqlquery);
|
log_debug(_("standby register: %s\n"), sqlquery);
|
||||||
|
|
||||||
if (!PQexec(master_conn, sqlquery))
|
res = PQexec(master_conn, sqlquery);
|
||||||
|
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
log_err(_("Cannot delete node details, %s\n"),
|
log_err(_("Cannot delete node details, %s\n"),
|
||||||
PQerrorMessage(master_conn));
|
PQerrorMessage(master_conn));
|
||||||
@@ -743,6 +748,7 @@ do_standby_register(void)
|
|||||||
PQfinish(conn);
|
PQfinish(conn);
|
||||||
exit(ERR_BAD_CONFIG);
|
exit(ERR_BAD_CONFIG);
|
||||||
}
|
}
|
||||||
|
PQclear(res);
|
||||||
}
|
}
|
||||||
|
|
||||||
sqlquery_snprintf(sqlquery, "INSERT INTO %s.repl_nodes(id, cluster, name, conninfo, priority) "
|
sqlquery_snprintf(sqlquery, "INSERT INTO %s.repl_nodes(id, cluster, name, conninfo, priority) "
|
||||||
@@ -1695,7 +1701,8 @@ do_witness_create(void)
|
|||||||
repmgr_schema, options.node, options.cluster_name, options.node_name, options.conninfo, options.priority);
|
repmgr_schema, options.node, options.cluster_name, options.node_name, options.conninfo, options.priority);
|
||||||
|
|
||||||
log_debug(_("witness create: %s"), sqlquery);
|
log_debug(_("witness create: %s"), sqlquery);
|
||||||
if (!PQexec(masterconn, sqlquery))
|
res = PQexec(masterconn, sqlquery);
|
||||||
|
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
log_err(_("Cannot insert node details, %s\n"), PQerrorMessage(masterconn));
|
log_err(_("Cannot insert node details, %s\n"), PQerrorMessage(masterconn));
|
||||||
PQfinish(masterconn);
|
PQfinish(masterconn);
|
||||||
@@ -2068,16 +2075,49 @@ static bool
|
|||||||
create_schema(PGconn *conn)
|
create_schema(PGconn *conn)
|
||||||
{
|
{
|
||||||
char sqlquery[QUERY_STR_LEN];
|
char sqlquery[QUERY_STR_LEN];
|
||||||
|
PGresult *res;
|
||||||
|
|
||||||
sqlquery_snprintf(sqlquery, "CREATE SCHEMA %s", repmgr_schema);
|
sqlquery_snprintf(sqlquery, "CREATE SCHEMA %s", repmgr_schema);
|
||||||
log_debug(_("master register: %s\n"), sqlquery);
|
log_debug(_("master register: %s\n"), sqlquery);
|
||||||
if (!PQexec(conn, sqlquery))
|
res = PQexec(conn, sqlquery);
|
||||||
|
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
log_err(_("Cannot create the schema %s: %s\n"),
|
log_err(_("Cannot create the schema %s: %s\n"),
|
||||||
repmgr_schema, PQerrorMessage(conn));
|
repmgr_schema, PQerrorMessage(conn));
|
||||||
PQfinish(conn);
|
PQfinish(conn);
|
||||||
exit(ERR_BAD_CONFIG);
|
exit(ERR_BAD_CONFIG);
|
||||||
}
|
}
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
|
/* to avoid confusion of the time_lag field and provide a consistent UI we
|
||||||
|
* use these functions for providing the latest update timestamp */
|
||||||
|
sqlquery_snprintf(sqlquery,
|
||||||
|
"CREATE FUNCTION %s.repmgr_update_last_updated() RETURNS TIMESTAMP WITH TIME ZONE "
|
||||||
|
"AS '$libdir/repmgr_funcs', 'repmgr_update_last_updated' "
|
||||||
|
" LANGUAGE C STRICT", repmgr_schema);
|
||||||
|
res = PQexec(conn, sqlquery);
|
||||||
|
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Cannot create the function repmgr_update_last_updated: %s\n",
|
||||||
|
PQerrorMessage(conn));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
|
|
||||||
|
sqlquery_snprintf(sqlquery,
|
||||||
|
"CREATE FUNCTION %s.repmgr_get_last_updated() RETURNS TIMESTAMP WITH TIME ZONE "
|
||||||
|
"AS '$libdir/repmgr_funcs', 'repmgr_get_last_updated' "
|
||||||
|
"LANGUAGE C STRICT", repmgr_schema);
|
||||||
|
res = PQexec(conn, sqlquery);
|
||||||
|
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Cannot create the function repmgr_get_last_updated: %s\n",
|
||||||
|
PQerrorMessage(conn));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
|
|
||||||
/* ... the tables */
|
/* ... the tables */
|
||||||
sqlquery_snprintf(sqlquery, "CREATE TABLE %s.repl_nodes ( "
|
sqlquery_snprintf(sqlquery, "CREATE TABLE %s.repl_nodes ( "
|
||||||
@@ -2088,86 +2128,101 @@ create_schema(PGconn *conn)
|
|||||||
" priority integer not null, "
|
" priority integer not null, "
|
||||||
" witness boolean not null default false)", repmgr_schema);
|
" witness boolean not null default false)", repmgr_schema);
|
||||||
log_debug(_("master register: %s\n"), sqlquery);
|
log_debug(_("master register: %s\n"), sqlquery);
|
||||||
if (!PQexec(conn, sqlquery))
|
res = PQexec(conn, sqlquery);
|
||||||
|
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
log_err(_("Cannot create the table %s.repl_nodes: %s\n"),
|
log_err(_("Cannot create the table %s.repl_nodes: %s\n"),
|
||||||
repmgr_schema, PQerrorMessage(conn));
|
repmgr_schema, PQerrorMessage(conn));
|
||||||
PQfinish(conn);
|
PQfinish(conn);
|
||||||
exit(ERR_BAD_CONFIG);
|
exit(ERR_BAD_CONFIG);
|
||||||
}
|
}
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
sqlquery_snprintf(sqlquery, "CREATE TABLE %s.repl_monitor ( "
|
sqlquery_snprintf(sqlquery, "CREATE TABLE %s.repl_monitor ( "
|
||||||
" primary_node INTEGER NOT NULL, "
|
" primary_node INTEGER NOT NULL, "
|
||||||
" standby_node INTEGER NOT NULL, "
|
" standby_node INTEGER NOT NULL, "
|
||||||
" last_monitor_time TIMESTAMP WITH TIME ZONE NOT NULL, "
|
" last_monitor_time TIMESTAMP WITH TIME ZONE NOT NULL, "
|
||||||
|
" last_apply_time TIMESTAMP WITH TIME ZONE, "
|
||||||
" last_wal_primary_location TEXT NOT NULL, "
|
" last_wal_primary_location TEXT NOT NULL, "
|
||||||
" last_wal_standby_location TEXT, "
|
" last_wal_standby_location TEXT, "
|
||||||
" replication_lag BIGINT NOT NULL, "
|
" replication_lag BIGINT NOT NULL, "
|
||||||
" apply_lag BIGINT NOT NULL) ", repmgr_schema);
|
" apply_lag BIGINT NOT NULL) ", repmgr_schema);
|
||||||
log_debug(_("master register: %s\n"), sqlquery);
|
log_debug(_("master register: %s\n"), sqlquery);
|
||||||
if (!PQexec(conn, sqlquery))
|
res = PQexec(conn, sqlquery);
|
||||||
|
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
log_err(_("Cannot create the table %s.repl_monitor: %s\n"),
|
log_err(_("Cannot create the table %s.repl_monitor: %s\n"),
|
||||||
repmgr_schema, PQerrorMessage(conn));
|
repmgr_schema, PQerrorMessage(conn));
|
||||||
PQfinish(conn);
|
PQfinish(conn);
|
||||||
exit(ERR_BAD_CONFIG);
|
exit(ERR_BAD_CONFIG);
|
||||||
}
|
}
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
/* a view */
|
/* a view */
|
||||||
sqlquery_snprintf(sqlquery, "CREATE VIEW %s.repl_status AS "
|
sqlquery_snprintf(sqlquery, "CREATE VIEW %s.repl_status AS "
|
||||||
" SELECT primary_node, standby_node, name AS standby_name, last_monitor_time, "
|
" SELECT primary_node, standby_node, name AS standby_name, last_monitor_time, "
|
||||||
" last_wal_primary_location, last_wal_standby_location, "
|
" last_wal_primary_location, last_wal_standby_location, "
|
||||||
" pg_size_pretty(replication_lag) replication_lag, "
|
" pg_size_pretty(replication_lag) replication_lag, "
|
||||||
|
" age(now(), last_apply_time) AS replication_time_lag, "
|
||||||
" pg_size_pretty(apply_lag) apply_lag, "
|
" pg_size_pretty(apply_lag) apply_lag, "
|
||||||
" age(now(), last_monitor_time) AS time_lag "
|
" age(now(), CASE WHEN pg_is_in_recovery() THEN %s.repmgr_get_last_updated() ELSE last_monitor_time END) AS communication_time_lag "
|
||||||
" FROM %s.repl_monitor JOIN %s.repl_nodes ON standby_node = id "
|
" FROM %s.repl_monitor JOIN %s.repl_nodes ON standby_node = id "
|
||||||
" WHERE (standby_node, last_monitor_time) IN (SELECT standby_node, MAX(last_monitor_time) "
|
" WHERE (standby_node, last_monitor_time) IN (SELECT standby_node, MAX(last_monitor_time) "
|
||||||
" FROM %s.repl_monitor GROUP BY 1)",
|
" FROM %s.repl_monitor GROUP BY 1)",
|
||||||
repmgr_schema, repmgr_schema, repmgr_schema, repmgr_schema);
|
repmgr_schema, repmgr_schema, repmgr_schema, repmgr_schema, repmgr_schema);
|
||||||
log_debug(_("master register: %s\n"), sqlquery);
|
log_debug(_("master register: %s\n"), sqlquery);
|
||||||
if (!PQexec(conn, sqlquery))
|
|
||||||
|
res = PQexec(conn, sqlquery);
|
||||||
|
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
log_err(_("Cannot create the view %s.repl_status: %s\n"),
|
log_err(_("Cannot create the view %s.repl_status: %s\n"),
|
||||||
repmgr_schema, PQerrorMessage(conn));
|
repmgr_schema, PQerrorMessage(conn));
|
||||||
PQfinish(conn);
|
PQfinish(conn);
|
||||||
exit(ERR_BAD_CONFIG);
|
exit(ERR_BAD_CONFIG);
|
||||||
}
|
}
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
/* an index to improve performance of the view */
|
/* an index to improve performance of the view */
|
||||||
sqlquery_snprintf(sqlquery, "CREATE INDEX idx_repl_status_sort "
|
sqlquery_snprintf(sqlquery, "CREATE INDEX idx_repl_status_sort "
|
||||||
" ON %s.repl_monitor (last_monitor_time, standby_node) ",
|
" ON %s.repl_monitor (last_monitor_time, standby_node) ",
|
||||||
repmgr_schema);
|
repmgr_schema);
|
||||||
log_debug(_("master register: %s\n"), sqlquery);
|
log_debug(_("master register: %s\n"), sqlquery);
|
||||||
if (!PQexec(conn, sqlquery))
|
res = PQexec(conn, sqlquery);
|
||||||
|
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
log_err(_("Cannot indexing table %s.repl_monitor: %s\n"),
|
log_err(_("Can't index table %s.repl_monitor: %s\n"),
|
||||||
repmgr_schema, PQerrorMessage(conn));
|
repmgr_schema, PQerrorMessage(conn));
|
||||||
PQfinish(conn);
|
PQfinish(conn);
|
||||||
exit(ERR_BAD_CONFIG);
|
exit(ERR_BAD_CONFIG);
|
||||||
}
|
}
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
/* XXX Here we MUST try to load the repmgr_function.sql not hardcode it here */
|
/* XXX Here we MUST try to load the repmgr_function.sql not hardcode it here */
|
||||||
sqlquery_snprintf(sqlquery,
|
sqlquery_snprintf(sqlquery,
|
||||||
"CREATE OR REPLACE FUNCTION %s.repmgr_update_standby_location(text) RETURNS boolean "
|
"CREATE OR REPLACE FUNCTION %s.repmgr_update_standby_location(text) RETURNS boolean "
|
||||||
"AS '$libdir/repmgr_funcs', 'repmgr_update_standby_location' "
|
"AS '$libdir/repmgr_funcs', 'repmgr_update_standby_location' "
|
||||||
"LANGUAGE C STRICT ", repmgr_schema);
|
"LANGUAGE C STRICT ", repmgr_schema);
|
||||||
if (!PQexec(conn, sqlquery))
|
res = PQexec(conn, sqlquery);
|
||||||
|
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Cannot create the function repmgr_update_standby_location: %s\n",
|
fprintf(stderr, "Cannot create the function repmgr_update_standby_location: %s\n",
|
||||||
PQerrorMessage(conn));
|
PQerrorMessage(conn));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
sqlquery_snprintf(sqlquery,
|
sqlquery_snprintf(sqlquery,
|
||||||
"CREATE OR REPLACE FUNCTION %s.repmgr_get_last_standby_location() RETURNS text "
|
"CREATE OR REPLACE FUNCTION %s.repmgr_get_last_standby_location() RETURNS text "
|
||||||
"AS '$libdir/repmgr_funcs', 'repmgr_get_last_standby_location' "
|
"AS '$libdir/repmgr_funcs', 'repmgr_get_last_standby_location' "
|
||||||
"LANGUAGE C STRICT ", repmgr_schema);
|
"LANGUAGE C STRICT ", repmgr_schema);
|
||||||
if (!PQexec(conn, sqlquery))
|
res = PQexec(conn, sqlquery);
|
||||||
|
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Cannot create the function repmgr_get_last_standby_location: %s\n",
|
fprintf(stderr, "Cannot create the function repmgr_get_last_standby_location: %s\n",
|
||||||
PQerrorMessage(conn));
|
PQerrorMessage(conn));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@@ -2182,7 +2237,8 @@ copy_configuration(PGconn *masterconn, PGconn *witnessconn)
|
|||||||
|
|
||||||
sqlquery_snprintf(sqlquery, "TRUNCATE TABLE %s.repl_nodes", repmgr_schema);
|
sqlquery_snprintf(sqlquery, "TRUNCATE TABLE %s.repl_nodes", repmgr_schema);
|
||||||
log_debug("copy_configuration: %s\n", sqlquery);
|
log_debug("copy_configuration: %s\n", sqlquery);
|
||||||
if (!PQexec(witnessconn, sqlquery))
|
res = PQexec(witnessconn, sqlquery);
|
||||||
|
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Cannot clean node details in the witness, %s\n",
|
fprintf(stderr, "Cannot clean node details in the witness, %s\n",
|
||||||
PQerrorMessage(witnessconn));
|
PQerrorMessage(witnessconn));
|
||||||
@@ -2208,7 +2264,8 @@ copy_configuration(PGconn *masterconn, PGconn *witnessconn)
|
|||||||
atoi(PQgetvalue(res, i, 3)),
|
atoi(PQgetvalue(res, i, 3)),
|
||||||
PQgetvalue(res, i, 4));
|
PQgetvalue(res, i, 4));
|
||||||
|
|
||||||
if (!PQexec(witnessconn, sqlquery))
|
res = PQexec(witnessconn, sqlquery);
|
||||||
|
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Cannot copy configuration to witness, %s\n",
|
fprintf(stderr, "Cannot copy configuration to witness, %s\n",
|
||||||
PQerrorMessage(witnessconn));
|
PQerrorMessage(witnessconn));
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ reconnect_attempts=6
|
|||||||
reconnect_interval=10
|
reconnect_interval=10
|
||||||
|
|
||||||
# Autofailover options
|
# Autofailover options
|
||||||
failover=automatic
|
failover=manual
|
||||||
priority=-1
|
priority=-1
|
||||||
promote_command='repmgr standby promote -f /path/to/repmgr.conf'
|
promote_command='repmgr standby promote -f /path/to/repmgr.conf'
|
||||||
follow_command='repmgr standby follow -f /path/to/repmgr.conf -W'
|
follow_command='repmgr standby follow -f /path/to/repmgr.conf -W'
|
||||||
|
|||||||
44
repmgrd.c
44
repmgrd.c
@@ -144,17 +144,24 @@ static void terminate(int retval);
|
|||||||
static void setup_event_handlers(void);
|
static void setup_event_handlers(void);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
static void do_daemonize();
|
static void do_daemonize(void);
|
||||||
static void check_and_create_pid_file(const char *pid_file);
|
static void check_and_create_pid_file(const char *pid_file);
|
||||||
|
|
||||||
#define CloseConnections() \
|
static void
|
||||||
if (PQisBusy(primaryConn) == 1) \
|
CloseConnections() {
|
||||||
(void) CancelQuery(primaryConn, local_options.master_response_timeout); \
|
if (primaryConn != NULL && PQisBusy(primaryConn) == 1)
|
||||||
if (myLocalConn != NULL) \
|
CancelQuery(primaryConn, local_options.master_response_timeout);
|
||||||
PQfinish(myLocalConn); \
|
|
||||||
if (primaryConn != NULL && primaryConn != myLocalConn) \
|
if (myLocalConn != NULL)
|
||||||
|
PQfinish(myLocalConn);
|
||||||
|
|
||||||
|
if (primaryConn != NULL && primaryConn != myLocalConn)
|
||||||
PQfinish(primaryConn);
|
PQfinish(primaryConn);
|
||||||
|
|
||||||
|
primaryConn = NULL;
|
||||||
|
myLocalConn = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int
|
int
|
||||||
main(int argc, char **argv)
|
main(int argc, char **argv)
|
||||||
@@ -431,10 +438,6 @@ main(int argc, char **argv)
|
|||||||
|
|
||||||
} while (true);
|
} while (true);
|
||||||
|
|
||||||
/* Prevent a double-free */
|
|
||||||
if (primaryConn == myLocalConn)
|
|
||||||
myLocalConn = NULL;
|
|
||||||
|
|
||||||
/* close the connection to the database and cleanup */
|
/* close the connection to the database and cleanup */
|
||||||
CloseConnections();
|
CloseConnections();
|
||||||
|
|
||||||
@@ -530,6 +533,7 @@ StandbyMonitor(void)
|
|||||||
char last_wal_primary_location[MAXLEN];
|
char last_wal_primary_location[MAXLEN];
|
||||||
char last_wal_standby_received[MAXLEN];
|
char last_wal_standby_received[MAXLEN];
|
||||||
char last_wal_standby_applied[MAXLEN];
|
char last_wal_standby_applied[MAXLEN];
|
||||||
|
char last_wal_standby_applied_timestamp[MAXLEN];
|
||||||
|
|
||||||
unsigned long long int lsn_primary;
|
unsigned long long int lsn_primary;
|
||||||
unsigned long long int lsn_standby_received;
|
unsigned long long int lsn_standby_received;
|
||||||
@@ -546,11 +550,15 @@ StandbyMonitor(void)
|
|||||||
|
|
||||||
if (!CheckConnection(myLocalConn, "standby"))
|
if (!CheckConnection(myLocalConn, "standby"))
|
||||||
{
|
{
|
||||||
|
log_err("Failed to connect to local node, exiting!\n");
|
||||||
terminate(1);
|
terminate(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (PQstatus(primaryConn) != CONNECTION_OK)
|
if (PQstatus(primaryConn) != CONNECTION_OK)
|
||||||
{
|
{
|
||||||
|
PQfinish(primaryConn);
|
||||||
|
primaryConn = NULL;
|
||||||
|
|
||||||
if (local_options.failover == MANUAL_FAILOVER)
|
if (local_options.failover == MANUAL_FAILOVER)
|
||||||
{
|
{
|
||||||
log_err(_("We couldn't reconnect to master. Now checking if another node has been promoted.\n"));
|
log_err(_("We couldn't reconnect to master. Now checking if another node has been promoted.\n"));
|
||||||
@@ -637,7 +645,7 @@ StandbyMonitor(void)
|
|||||||
sqlquery_snprintf(
|
sqlquery_snprintf(
|
||||||
sqlquery,
|
sqlquery,
|
||||||
"SELECT CURRENT_TIMESTAMP, pg_last_xlog_receive_location(), "
|
"SELECT CURRENT_TIMESTAMP, pg_last_xlog_receive_location(), "
|
||||||
"pg_last_xlog_replay_location()");
|
"pg_last_xlog_replay_location(), pg_last_xact_replay_timestamp()");
|
||||||
|
|
||||||
res = PQexec(myLocalConn, sqlquery);
|
res = PQexec(myLocalConn, sqlquery);
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
@@ -651,6 +659,7 @@ StandbyMonitor(void)
|
|||||||
strncpy(monitor_standby_timestamp, PQgetvalue(res, 0, 0), MAXLEN);
|
strncpy(monitor_standby_timestamp, PQgetvalue(res, 0, 0), MAXLEN);
|
||||||
strncpy(last_wal_standby_received , PQgetvalue(res, 0, 1), MAXLEN);
|
strncpy(last_wal_standby_received , PQgetvalue(res, 0, 1), MAXLEN);
|
||||||
strncpy(last_wal_standby_applied , PQgetvalue(res, 0, 2), MAXLEN);
|
strncpy(last_wal_standby_applied , PQgetvalue(res, 0, 2), MAXLEN);
|
||||||
|
strncpy(last_wal_standby_applied_timestamp, PQgetvalue(res, 0, 3), MAXLEN);
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
|
|
||||||
/* Get primary xlog info */
|
/* Get primary xlog info */
|
||||||
@@ -678,9 +687,10 @@ StandbyMonitor(void)
|
|||||||
sqlquery_snprintf(sqlquery,
|
sqlquery_snprintf(sqlquery,
|
||||||
"INSERT INTO %s.repl_monitor "
|
"INSERT INTO %s.repl_monitor "
|
||||||
"VALUES(%d, %d, '%s'::timestamp with time zone, "
|
"VALUES(%d, %d, '%s'::timestamp with time zone, "
|
||||||
" '%s', '%s', "
|
" '%s'::timestamp with time zone, '%s', '%s', "
|
||||||
" %lld, %lld)", repmgr_schema,
|
" %lld, %lld)", repmgr_schema,
|
||||||
primary_options.node, local_options.node, monitor_standby_timestamp,
|
primary_options.node, local_options.node, monitor_standby_timestamp,
|
||||||
|
last_wal_standby_applied_timestamp,
|
||||||
last_wal_primary_location,
|
last_wal_primary_location,
|
||||||
last_wal_standby_received,
|
last_wal_standby_received,
|
||||||
(lsn_primary - lsn_standby_received),
|
(lsn_primary - lsn_standby_received),
|
||||||
@@ -770,7 +780,12 @@ do_failover(void)
|
|||||||
|
|
||||||
/* if we can't see the node just skip it */
|
/* if we can't see the node just skip it */
|
||||||
if (PQstatus(nodeConn) != CONNECTION_OK)
|
if (PQstatus(nodeConn) != CONNECTION_OK)
|
||||||
|
{
|
||||||
|
if (nodeConn != NULL)
|
||||||
|
PQfinish(nodeConn);
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
visible_nodes++;
|
visible_nodes++;
|
||||||
nodes[i].is_visible = true;
|
nodes[i].is_visible = true;
|
||||||
@@ -953,6 +968,7 @@ do_failover(void)
|
|||||||
|
|
||||||
/* Close the connection to this server */
|
/* Close the connection to this server */
|
||||||
PQfinish(myLocalConn);
|
PQfinish(myLocalConn);
|
||||||
|
myLocalConn = NULL;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* determine which one is the best candidate to promote to primary
|
* determine which one is the best candidate to promote to primary
|
||||||
@@ -1269,6 +1285,8 @@ terminate(int retval)
|
|||||||
unlink(pid_file);
|
unlink(pid_file);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log_info("Terminating...\n");
|
||||||
|
|
||||||
exit(retval);
|
exit(retval);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -15,6 +15,7 @@
|
|||||||
#include "storage/shmem.h"
|
#include "storage/shmem.h"
|
||||||
#include "storage/spin.h"
|
#include "storage/spin.h"
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
|
#include "utils/timestamp.h"
|
||||||
|
|
||||||
/* same definition as the one in xlog_internal.h */
|
/* same definition as the one in xlog_internal.h */
|
||||||
#define MAXFNAMELEN 64
|
#define MAXFNAMELEN 64
|
||||||
@@ -28,6 +29,7 @@ typedef struct repmgrSharedState
|
|||||||
{
|
{
|
||||||
LWLockId lock; /* protects search/modification */
|
LWLockId lock; /* protects search/modification */
|
||||||
char location[MAXFNAMELEN]; /* last known xlog location */
|
char location[MAXFNAMELEN]; /* last known xlog location */
|
||||||
|
TimestampTz last_updated;
|
||||||
} repmgrSharedState;
|
} repmgrSharedState;
|
||||||
|
|
||||||
/* Links to shared memory state */
|
/* Links to shared memory state */
|
||||||
@@ -49,6 +51,12 @@ Datum repmgr_get_last_standby_location(PG_FUNCTION_ARGS);
|
|||||||
PG_FUNCTION_INFO_V1(repmgr_update_standby_location);
|
PG_FUNCTION_INFO_V1(repmgr_update_standby_location);
|
||||||
PG_FUNCTION_INFO_V1(repmgr_get_last_standby_location);
|
PG_FUNCTION_INFO_V1(repmgr_get_last_standby_location);
|
||||||
|
|
||||||
|
Datum repmgr_update_last_updated(PG_FUNCTION_ARGS);
|
||||||
|
Datum repmgr_get_last_updated(PG_FUNCTION_ARGS);
|
||||||
|
|
||||||
|
PG_FUNCTION_INFO_V1(repmgr_update_last_updated);
|
||||||
|
PG_FUNCTION_INFO_V1(repmgr_get_last_updated);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Module load callback
|
* Module load callback
|
||||||
@@ -187,3 +195,38 @@ repmgr_update_standby_location(PG_FUNCTION_ARGS)
|
|||||||
|
|
||||||
PG_RETURN_BOOL(repmgr_set_standby_location(locationstr));
|
PG_RETURN_BOOL(repmgr_set_standby_location(locationstr));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* update and return last updated with current timestamp */
|
||||||
|
Datum
|
||||||
|
repmgr_update_last_updated(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
TimestampTz last_updated = GetCurrentTimestamp();
|
||||||
|
|
||||||
|
/* Safety check... */
|
||||||
|
if (!shared_state)
|
||||||
|
PG_RETURN_NULL();
|
||||||
|
|
||||||
|
LWLockAcquire(shared_state->lock, LW_SHARED);
|
||||||
|
shared_state->last_updated = last_updated;
|
||||||
|
LWLockRelease(shared_state->lock);
|
||||||
|
|
||||||
|
PG_RETURN_TIMESTAMPTZ(last_updated);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* get last updated timestamp */
|
||||||
|
Datum
|
||||||
|
repmgr_get_last_updated(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
TimestampTz last_updated;
|
||||||
|
|
||||||
|
/* Safety check... */
|
||||||
|
if (!shared_state)
|
||||||
|
PG_RETURN_NULL();
|
||||||
|
|
||||||
|
LWLockAcquire(shared_state->lock, LW_EXCLUSIVE);
|
||||||
|
last_updated = shared_state->last_updated;
|
||||||
|
LWLockRelease(shared_state->lock);
|
||||||
|
|
||||||
|
PG_RETURN_TIMESTAMPTZ(last_updated);
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
/*
|
/*
|
||||||
* repmgr_function.sql
|
* repmgr_function.sql
|
||||||
* Copyright (c) 2ndQuadrant, 2010
|
* Copyright (c) 2ndQuadrant, 2010-2014
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
@@ -13,3 +13,11 @@ LANGUAGE C STRICT;
|
|||||||
CREATE FUNCTION repmgr_get_last_standby_location() RETURNS text
|
CREATE FUNCTION repmgr_get_last_standby_location() RETURNS text
|
||||||
AS 'MODULE_PATHNAME', 'repmgr_get_last_standby_location'
|
AS 'MODULE_PATHNAME', 'repmgr_get_last_standby_location'
|
||||||
LANGUAGE C STRICT;
|
LANGUAGE C STRICT;
|
||||||
|
|
||||||
|
CREATE FUNCTION repmgr_update_last_updated() RETURNS TIMESTAMP WITH TIME ZONE
|
||||||
|
AS 'MODULE_PATHNAME', 'repmgr_update_last_updated'
|
||||||
|
LANGUAGE C STRICT;
|
||||||
|
|
||||||
|
CREATE FUNCTION repmgr_get_last_updated() RETURNS TIMESTAMP WITH TIME ZONE
|
||||||
|
AS 'MODULE_PATHNAME', 'repmgr_get_last_updated'
|
||||||
|
LANGUAGE C STRICT;
|
||||||
|
|||||||
@@ -1,2 +1,11 @@
|
|||||||
|
/*
|
||||||
|
* uninstall_repmgr_funcs.sql
|
||||||
|
* Copyright (c) 2ndQuadrant, 2010-2014
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
DROP FUNCTION repmgr_update_standby_location(text);
|
DROP FUNCTION repmgr_update_standby_location(text);
|
||||||
DROP FUNCTION repmgr_get_last_standby_location();
|
DROP FUNCTION repmgr_get_last_standby_location();
|
||||||
|
|
||||||
|
DROP FUNCTION repmgr_update_last_updated();
|
||||||
|
DROP FUNCTION repmgr_get_last_updated();
|
||||||
|
|||||||
Reference in New Issue
Block a user