mirror of
https://github.com/EnterpriseDB/repmgr.git
synced 2026-03-27 17:06:29 +00:00
Add a master_response_timeout parameter and use it to limit the amount
of time we spent a reponse from master before declaring the failure. Also, change is_pgup() so it use PQsendQuery() instead of PQexec to execute the check of master
This commit is contained in:
10
config.c
10
config.c
@@ -41,6 +41,7 @@ parse_config(const char *config_file, t_configuration_options *options)
|
|||||||
memset(options->promote_command, 0, sizeof(options->promote_command));
|
memset(options->promote_command, 0, sizeof(options->promote_command));
|
||||||
memset(options->follow_command, 0, sizeof(options->follow_command));
|
memset(options->follow_command, 0, sizeof(options->follow_command));
|
||||||
memset(options->rsync_options, 0, sizeof(options->rsync_options));
|
memset(options->rsync_options, 0, sizeof(options->rsync_options));
|
||||||
|
options->master_response_timeout = 0;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Since some commands don't require a config file at all, not
|
* Since some commands don't require a config file at all, not
|
||||||
@@ -98,6 +99,8 @@ parse_config(const char *config_file, t_configuration_options *options)
|
|||||||
strncpy(options->promote_command, value, MAXLEN);
|
strncpy(options->promote_command, value, MAXLEN);
|
||||||
else if (strcmp(name, "follow_command") == 0)
|
else if (strcmp(name, "follow_command") == 0)
|
||||||
strncpy(options->follow_command, value, MAXLEN);
|
strncpy(options->follow_command, value, MAXLEN);
|
||||||
|
else if (strcmp(name, "master_response_timeout") == 0)
|
||||||
|
options->master_response_timeout = atoi(value);
|
||||||
else
|
else
|
||||||
log_warning(_("%s/%s: Unknown name/value pair!\n"), name, value);
|
log_warning(_("%s/%s: Unknown name/value pair!\n"), name, value);
|
||||||
}
|
}
|
||||||
@@ -215,6 +218,12 @@ reload_configuration(char *config_file, t_configuration_options *orig_options)
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (new_options.master_response_timeout <= 0)
|
||||||
|
{
|
||||||
|
log_warning(_("\nNew value for master_response_timeout is not valid. Should be greater than zero.\n"));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/* Test conninfo string */
|
/* Test conninfo string */
|
||||||
conn = establishDBConnection(new_options.conninfo, false);
|
conn = establishDBConnection(new_options.conninfo, false);
|
||||||
if (!conn || (PQstatus(conn) != CONNECTION_OK))
|
if (!conn || (PQstatus(conn) != CONNECTION_OK))
|
||||||
@@ -234,6 +243,7 @@ reload_configuration(char *config_file, t_configuration_options *orig_options)
|
|||||||
strcpy(orig_options->promote_command, new_options.promote_command);
|
strcpy(orig_options->promote_command, new_options.promote_command);
|
||||||
strcpy(orig_options->follow_command, new_options.follow_command);
|
strcpy(orig_options->follow_command, new_options.follow_command);
|
||||||
strcpy(orig_options->rsync_options, new_options.rsync_options);
|
strcpy(orig_options->rsync_options, new_options.rsync_options);
|
||||||
|
orig_options->master_response_timeout = new_options.master_response_timeout;
|
||||||
/*
|
/*
|
||||||
* XXX These ones can change with a simple SIGHUP?
|
* XXX These ones can change with a simple SIGHUP?
|
||||||
|
|
||||||
|
|||||||
1
config.h
1
config.h
@@ -36,6 +36,7 @@ typedef struct
|
|||||||
char loglevel[MAXLEN];
|
char loglevel[MAXLEN];
|
||||||
char logfacility[MAXLEN];
|
char logfacility[MAXLEN];
|
||||||
char rsync_options[QUERY_STR_LEN];
|
char rsync_options[QUERY_STR_LEN];
|
||||||
|
int master_response_timeout;
|
||||||
} t_configuration_options;
|
} t_configuration_options;
|
||||||
|
|
||||||
void parse_config(const char *config_file, t_configuration_options *options);
|
void parse_config(const char *config_file, t_configuration_options *options);
|
||||||
|
|||||||
89
dbutils.c
89
dbutils.c
@@ -17,6 +17,8 @@
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
#include "repmgr.h"
|
#include "repmgr.h"
|
||||||
#include "strutil.h"
|
#include "strutil.h"
|
||||||
#include "log.h"
|
#include "log.h"
|
||||||
@@ -123,9 +125,8 @@ is_witness(PGconn *conn, char *schema, char *cluster, int node_id)
|
|||||||
|
|
||||||
/* check the PQStatus and try to 'select 1' to confirm good connection */
|
/* check the PQStatus and try to 'select 1' to confirm good connection */
|
||||||
bool
|
bool
|
||||||
is_pgup(PGconn *conn)
|
is_pgup(PGconn *conn, int timeout)
|
||||||
{
|
{
|
||||||
PGresult *res;
|
|
||||||
char sqlquery[QUERY_STR_LEN];
|
char sqlquery[QUERY_STR_LEN];
|
||||||
/* Check the connection status twice in case it changes after reset */
|
/* Check the connection status twice in case it changes after reset */
|
||||||
bool twice = false;
|
bool twice = false;
|
||||||
@@ -143,29 +144,31 @@ is_pgup(PGconn *conn)
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Send a SELECT 1 just to check if connection is OK
|
* Send a SELECT 1 just to check if the connection is OK
|
||||||
* the PQstatus() won't catch disconnected connection
|
|
||||||
* XXXX
|
|
||||||
* the error message can be used by repmgrd
|
|
||||||
*/
|
*/
|
||||||
|
CancelQuery(conn);
|
||||||
|
if (wait_connection_availability(conn, timeout) != 1)
|
||||||
|
goto failed;
|
||||||
|
|
||||||
sqlquery_snprintf(sqlquery, "SELECT 1");
|
sqlquery_snprintf(sqlquery, "SELECT 1");
|
||||||
res = PQexec(conn, sqlquery);
|
if (PQsendQuery(conn, sqlquery) == 0)
|
||||||
// we need to retry, because we might just have loose the connection once
|
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
||||||
{
|
{
|
||||||
log_err(_("PQexec failed: %s\n"), PQerrorMessage(conn));
|
log_warning(_("PQsendQuery: Query could not be sent to primary. %s\n"),
|
||||||
PQclear(res);
|
PQerrorMessage(conn));
|
||||||
|
goto failed;
|
||||||
|
}
|
||||||
|
if (wait_connection_availability(conn, timeout) != 1)
|
||||||
|
goto failed;
|
||||||
|
|
||||||
|
continue;
|
||||||
|
|
||||||
|
failed:
|
||||||
|
// we need to retry, because we might just have loose the connection once
|
||||||
if (twice)
|
if (twice)
|
||||||
return false;
|
return false;
|
||||||
PQreset(conn); // reconnect
|
PQreset(conn); // reconnect
|
||||||
twice = true;
|
twice = true;
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
|
||||||
PQclear(res);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -388,3 +391,55 @@ getMasterConnection(PGconn *standby_conn, char *schema, int id, char *cluster,
|
|||||||
PQclear(res1);
|
PQclear(res1);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* wait until current query finishes ignoring any results, this could be an async command
|
||||||
|
* or a cancelation of a query
|
||||||
|
* return 1 if Ok; 0 if any error ocurred; -1 if timeout reached
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
wait_connection_availability(PGconn *conn, int timeout)
|
||||||
|
{
|
||||||
|
PGresult *res;
|
||||||
|
|
||||||
|
while(timeout-- >= 0)
|
||||||
|
{
|
||||||
|
if (PQconsumeInput(conn) == 0)
|
||||||
|
{
|
||||||
|
log_warning(_("PQconsumeInput: Query could not be sent to primary. %s\n"),
|
||||||
|
PQerrorMessage(conn));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (PQisBusy(conn) == 0)
|
||||||
|
{
|
||||||
|
res = PQgetResult(conn);
|
||||||
|
if (res == NULL)
|
||||||
|
break;
|
||||||
|
PQclear(res);
|
||||||
|
}
|
||||||
|
sleep(1);
|
||||||
|
}
|
||||||
|
if (timeout >= 0)
|
||||||
|
return 1;
|
||||||
|
else
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void
|
||||||
|
CancelQuery(PGconn *conn)
|
||||||
|
{
|
||||||
|
char errbuf[ERRBUFF_SIZE];
|
||||||
|
PGcancel *pgcancel;
|
||||||
|
|
||||||
|
pgcancel = PQgetCancel(conn);
|
||||||
|
|
||||||
|
if (!pgcancel || PQcancel(pgcancel, errbuf, ERRBUFF_SIZE) == 0)
|
||||||
|
log_warning(_("Can't stop current query: %s\n"), errbuf);
|
||||||
|
|
||||||
|
PQfreeCancel(pgcancel);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ PGconn *establishDBConnectionByParams(const char *keywords[],
|
|||||||
const bool exit_on_error);
|
const bool exit_on_error);
|
||||||
bool is_standby(PGconn *conn);
|
bool is_standby(PGconn *conn);
|
||||||
bool is_witness(PGconn *conn, char *schema, char *cluster, int node_id);
|
bool is_witness(PGconn *conn, char *schema, char *cluster, int node_id);
|
||||||
bool is_pgup(PGconn *conn);
|
bool is_pgup(PGconn *conn, int timeout);
|
||||||
char *pg_version(PGconn *conn, char* major_version);
|
char *pg_version(PGconn *conn, char* major_version);
|
||||||
bool guc_setted(PGconn *conn, const char *parameter, const char *op,
|
bool guc_setted(PGconn *conn, const char *parameter, const char *op,
|
||||||
const char *value);
|
const char *value);
|
||||||
@@ -36,4 +36,6 @@ const char *get_cluster_size(PGconn *conn);
|
|||||||
PGconn *getMasterConnection(PGconn *standby_conn, char *schema, int id, char *cluster,
|
PGconn *getMasterConnection(PGconn *standby_conn, char *schema, int id, char *cluster,
|
||||||
int *master_id, char *master_conninfo_out);
|
int *master_id, char *master_conninfo_out);
|
||||||
|
|
||||||
|
int wait_connection_availability(PGconn *conn, int timeout);
|
||||||
|
void CancelQuery(PGconn *conn);
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
2
repmgr.c
2
repmgr.c
@@ -81,7 +81,7 @@ bool require_password = false;
|
|||||||
|
|
||||||
/* Initialization of runtime options */
|
/* Initialization of runtime options */
|
||||||
t_runtime_options runtime_options = { "", "", "", "", "", "", DEFAULT_WAL_KEEP_SEGMENTS, false, false, false, "" };
|
t_runtime_options runtime_options = { "", "", "", "", "", "", DEFAULT_WAL_KEEP_SEGMENTS, false, false, false, "" };
|
||||||
t_configuration_options options = { "", -1, "", MANUAL_FAILOVER, -1, "", "", "", "", "" };
|
t_configuration_options options = { "", -1, "", MANUAL_FAILOVER, -1, "", "", "", "", "", "", -1 };
|
||||||
|
|
||||||
static char *server_mode = NULL;
|
static char *server_mode = NULL;
|
||||||
static char *server_cmd = NULL;
|
static char *server_cmd = NULL;
|
||||||
|
|||||||
@@ -13,6 +13,9 @@ node_name=standby2
|
|||||||
conninfo='host=192.168.204.104'
|
conninfo='host=192.168.204.104'
|
||||||
rsync_options=--archive --checksum --compress --progress --rsh=ssh
|
rsync_options=--archive --checksum --compress --progress --rsh=ssh
|
||||||
|
|
||||||
|
# How many seconds we wait for master response before declaring master failure
|
||||||
|
master_response_timeout=60
|
||||||
|
|
||||||
# Autofailover options
|
# Autofailover options
|
||||||
failover=automatic
|
failover=automatic
|
||||||
priority=-1
|
priority=-1
|
||||||
|
|||||||
42
repmgrd.c
42
repmgrd.c
@@ -76,7 +76,6 @@ static void help(const char* progname);
|
|||||||
static void usage(void);
|
static void usage(void);
|
||||||
static void checkClusterConfiguration(PGconn *conn, PGconn *primary);
|
static void checkClusterConfiguration(PGconn *conn, PGconn *primary);
|
||||||
static void checkNodeConfiguration(char *conninfo);
|
static void checkNodeConfiguration(char *conninfo);
|
||||||
static void CancelQuery(void);
|
|
||||||
|
|
||||||
static void StandbyMonitor(void);
|
static void StandbyMonitor(void);
|
||||||
static void WitnessMonitor(void);
|
static void WitnessMonitor(void);
|
||||||
@@ -99,7 +98,7 @@ static void setup_event_handlers(void);
|
|||||||
|
|
||||||
#define CloseConnections() \
|
#define CloseConnections() \
|
||||||
if (PQisBusy(primaryConn) == 1) \
|
if (PQisBusy(primaryConn) == 1) \
|
||||||
CancelQuery(); \
|
CancelQuery(primaryConn); \
|
||||||
if (myLocalConn != NULL) \
|
if (myLocalConn != NULL) \
|
||||||
PQfinish(myLocalConn); \
|
PQfinish(myLocalConn); \
|
||||||
if (primaryConn != NULL && primaryConn != myLocalConn) \
|
if (primaryConn != NULL && primaryConn != myLocalConn) \
|
||||||
@@ -353,12 +352,12 @@ WitnessMonitor(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* first check if there is a command being executed,
|
* Cancel any query that is still being executed,
|
||||||
* and if that is the case, cancel the query so i can
|
* so i can insert the current record
|
||||||
* insert the current record
|
|
||||||
*/
|
*/
|
||||||
if (PQisBusy(primaryConn) == 1)
|
CancelQuery(primaryConn);
|
||||||
CancelQuery();
|
if (wait_connection_availability(primaryConn, local_options.master_response_timeout) != 1)
|
||||||
|
return;
|
||||||
|
|
||||||
/* Get local xlog info */
|
/* Get local xlog info */
|
||||||
sqlquery_snprintf(sqlquery, "SELECT CURRENT_TIMESTAMP ");
|
sqlquery_snprintf(sqlquery, "SELECT CURRENT_TIMESTAMP ");
|
||||||
@@ -469,12 +468,12 @@ StandbyMonitor(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* first check if there is a command being executed,
|
* Cancel any query that is still being executed,
|
||||||
* and if that is the case, cancel the query so i can
|
* so i can insert the current record
|
||||||
* insert the current record
|
|
||||||
*/
|
*/
|
||||||
if (PQisBusy(primaryConn) == 1)
|
CancelQuery(primaryConn);
|
||||||
CancelQuery();
|
if (wait_connection_availability(primaryConn, local_options.master_response_timeout) != 1)
|
||||||
|
return;
|
||||||
|
|
||||||
/* Get local xlog info */
|
/* Get local xlog info */
|
||||||
sqlquery_snprintf(
|
sqlquery_snprintf(
|
||||||
@@ -753,7 +752,7 @@ CheckPrimaryConnection(void)
|
|||||||
*/
|
*/
|
||||||
for (connection_retries = 0; connection_retries < NUM_RETRY; connection_retries++)
|
for (connection_retries = 0; connection_retries < NUM_RETRY; connection_retries++)
|
||||||
{
|
{
|
||||||
if (!is_pgup(primaryConn))
|
if (!is_pgup(primaryConn, local_options.master_response_timeout))
|
||||||
{
|
{
|
||||||
log_warning(_("%s: Connection to master has been lost, trying to recover... %i seconds before failover decision\n"), progname, (SLEEP_RETRY*(NUM_RETRY-connection_retries)));
|
log_warning(_("%s: Connection to master has been lost, trying to recover... %i seconds before failover decision\n"), progname, (SLEEP_RETRY*(NUM_RETRY-connection_retries)));
|
||||||
/* wait SLEEP_RETRY seconds between retries */
|
/* wait SLEEP_RETRY seconds between retries */
|
||||||
@@ -768,7 +767,7 @@ CheckPrimaryConnection(void)
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!is_pgup(primaryConn))
|
if (!is_pgup(primaryConn, local_options.master_response_timeout))
|
||||||
{
|
{
|
||||||
log_err(_("%s: We couldn't reconnect for long enough, exiting...\n"), progname);
|
log_err(_("%s: We couldn't reconnect for long enough, exiting...\n"), progname);
|
||||||
/* XXX Anything else to do here? */
|
/* XXX Anything else to do here? */
|
||||||
@@ -939,21 +938,6 @@ setup_event_handlers(void)
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
static void
|
|
||||||
CancelQuery(void)
|
|
||||||
{
|
|
||||||
char errbuf[ERRBUFF_SIZE];
|
|
||||||
PGcancel *pgcancel;
|
|
||||||
|
|
||||||
pgcancel = PQgetCancel(primaryConn);
|
|
||||||
|
|
||||||
if (!pgcancel || PQcancel(pgcancel, errbuf, ERRBUFF_SIZE) == 0)
|
|
||||||
log_warning(_("Can't stop current query: %s\n"), errbuf);
|
|
||||||
|
|
||||||
PQfreeCancel(pgcancel);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
update_shared_memory(char *last_wal_standby_applied)
|
update_shared_memory(char *last_wal_standby_applied)
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user