diff --git a/config.c b/config.c index 8c6d50cf..03623eae 100644 --- a/config.c +++ b/config.c @@ -41,6 +41,7 @@ parse_config(const char *config_file, t_configuration_options *options) memset(options->promote_command, 0, sizeof(options->promote_command)); memset(options->follow_command, 0, sizeof(options->follow_command)); memset(options->rsync_options, 0, sizeof(options->rsync_options)); + options->master_response_timeout = 0; /* * Since some commands don't require a config file at all, not @@ -98,6 +99,8 @@ parse_config(const char *config_file, t_configuration_options *options) strncpy(options->promote_command, value, MAXLEN); else if (strcmp(name, "follow_command") == 0) strncpy(options->follow_command, value, MAXLEN); + else if (strcmp(name, "master_response_timeout") == 0) + options->master_response_timeout = atoi(value); else log_warning(_("%s/%s: Unknown name/value pair!\n"), name, value); } @@ -215,6 +218,12 @@ reload_configuration(char *config_file, t_configuration_options *orig_options) return false; } + if (new_options.master_response_timeout <= 0) + { + log_warning(_("\nNew value for master_response_timeout is not valid. Should be greater than zero.\n")); + return false; + } + /* Test conninfo string */ conn = establishDBConnection(new_options.conninfo, false); if (!conn || (PQstatus(conn) != CONNECTION_OK)) @@ -234,6 +243,7 @@ reload_configuration(char *config_file, t_configuration_options *orig_options) strcpy(orig_options->promote_command, new_options.promote_command); strcpy(orig_options->follow_command, new_options.follow_command); strcpy(orig_options->rsync_options, new_options.rsync_options); + orig_options->master_response_timeout = new_options.master_response_timeout; /* * XXX These ones can change with a simple SIGHUP? diff --git a/config.h b/config.h index c3a379b5..90e3cc2f 100644 --- a/config.h +++ b/config.h @@ -36,6 +36,7 @@ typedef struct char loglevel[MAXLEN]; char logfacility[MAXLEN]; char rsync_options[QUERY_STR_LEN]; + int master_response_timeout; } t_configuration_options; void parse_config(const char *config_file, t_configuration_options *options); diff --git a/dbutils.c b/dbutils.c index 7458b822..5cdac381 100644 --- a/dbutils.c +++ b/dbutils.c @@ -17,6 +17,8 @@ * */ +#include + #include "repmgr.h" #include "strutil.h" #include "log.h" @@ -123,9 +125,8 @@ is_witness(PGconn *conn, char *schema, char *cluster, int node_id) /* check the PQStatus and try to 'select 1' to confirm good connection */ bool -is_pgup(PGconn *conn) +is_pgup(PGconn *conn, int timeout) { - PGresult *res; char sqlquery[QUERY_STR_LEN]; /* Check the connection status twice in case it changes after reset */ bool twice = false; @@ -143,28 +144,30 @@ is_pgup(PGconn *conn) else { /* - * Send a SELECT 1 just to check if connection is OK - * the PQstatus() won't catch disconnected connection - * XXXX - * the error message can be used by repmgrd + * Send a SELECT 1 just to check if the connection is OK */ + CancelQuery(conn); + if (wait_connection_availability(conn, timeout) != 1) + goto failed; + sqlquery_snprintf(sqlquery, "SELECT 1"); - res = PQexec(conn, sqlquery); + if (PQsendQuery(conn, sqlquery) == 0) + { + log_warning(_("PQsendQuery: Query could not be sent to primary. %s\n"), + PQerrorMessage(conn)); + goto failed; + } + if (wait_connection_availability(conn, timeout) != 1) + goto failed; + + continue; + +failed: // we need to retry, because we might just have loose the connection once - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - log_err(_("PQexec failed: %s\n"), PQerrorMessage(conn)); - PQclear(res); - if (twice) - return false; - PQreset(conn); // reconnect - twice = true; - } - else - { - PQclear(res); - return true; - } + if (twice) + return false; + PQreset(conn); // reconnect + twice = true; } } } @@ -388,3 +391,55 @@ getMasterConnection(PGconn *standby_conn, char *schema, int id, char *cluster, PQclear(res1); return NULL; } + + +/* + * wait until current query finishes ignoring any results, this could be an async command + * or a cancelation of a query + * return 1 if Ok; 0 if any error ocurred; -1 if timeout reached + */ +int +wait_connection_availability(PGconn *conn, int timeout) +{ + PGresult *res; + + while(timeout-- >= 0) + { + if (PQconsumeInput(conn) == 0) + { + log_warning(_("PQconsumeInput: Query could not be sent to primary. %s\n"), + PQerrorMessage(conn)); + return 0; + } + + if (PQisBusy(conn) == 0) + { + res = PQgetResult(conn); + if (res == NULL) + break; + PQclear(res); + } + sleep(1); + } + if (timeout >= 0) + return 1; + else + return -1; +} + + +void +CancelQuery(PGconn *conn) +{ + char errbuf[ERRBUFF_SIZE]; + PGcancel *pgcancel; + + pgcancel = PQgetCancel(conn); + + if (!pgcancel || PQcancel(pgcancel, errbuf, ERRBUFF_SIZE) == 0) + log_warning(_("Can't stop current query: %s\n"), errbuf); + + PQfreeCancel(pgcancel); +} + + diff --git a/dbutils.h b/dbutils.h index 0b987e64..5ed35dec 100644 --- a/dbutils.h +++ b/dbutils.h @@ -28,7 +28,7 @@ PGconn *establishDBConnectionByParams(const char *keywords[], const bool exit_on_error); bool is_standby(PGconn *conn); bool is_witness(PGconn *conn, char *schema, char *cluster, int node_id); -bool is_pgup(PGconn *conn); +bool is_pgup(PGconn *conn, int timeout); char *pg_version(PGconn *conn, char* major_version); bool guc_setted(PGconn *conn, const char *parameter, const char *op, const char *value); @@ -36,4 +36,6 @@ const char *get_cluster_size(PGconn *conn); PGconn *getMasterConnection(PGconn *standby_conn, char *schema, int id, char *cluster, int *master_id, char *master_conninfo_out); +int wait_connection_availability(PGconn *conn, int timeout); +void CancelQuery(PGconn *conn); #endif diff --git a/repmgr.c b/repmgr.c index e2e733e2..ef6e7944 100644 --- a/repmgr.c +++ b/repmgr.c @@ -81,7 +81,7 @@ bool require_password = false; /* Initialization of runtime options */ t_runtime_options runtime_options = { "", "", "", "", "", "", DEFAULT_WAL_KEEP_SEGMENTS, false, false, false, "" }; -t_configuration_options options = { "", -1, "", MANUAL_FAILOVER, -1, "", "", "", "", "" }; +t_configuration_options options = { "", -1, "", MANUAL_FAILOVER, -1, "", "", "", "", "", "", -1 }; static char *server_mode = NULL; static char *server_cmd = NULL; diff --git a/repmgr.conf.sample b/repmgr.conf.sample index cedf1baf..73d8ed40 100644 --- a/repmgr.conf.sample +++ b/repmgr.conf.sample @@ -13,6 +13,9 @@ node_name=standby2 conninfo='host=192.168.204.104' rsync_options=--archive --checksum --compress --progress --rsh=ssh +# How many seconds we wait for master response before declaring master failure +master_response_timeout=60 + # Autofailover options failover=automatic priority=-1 diff --git a/repmgrd.c b/repmgrd.c index 5d1bd6da..5a01b0da 100644 --- a/repmgrd.c +++ b/repmgrd.c @@ -76,7 +76,6 @@ static void help(const char* progname); static void usage(void); static void checkClusterConfiguration(PGconn *conn, PGconn *primary); static void checkNodeConfiguration(char *conninfo); -static void CancelQuery(void); static void StandbyMonitor(void); static void WitnessMonitor(void); @@ -99,7 +98,7 @@ static void setup_event_handlers(void); #define CloseConnections() \ if (PQisBusy(primaryConn) == 1) \ - CancelQuery(); \ + CancelQuery(primaryConn); \ if (myLocalConn != NULL) \ PQfinish(myLocalConn); \ if (primaryConn != NULL && primaryConn != myLocalConn) \ @@ -353,12 +352,12 @@ WitnessMonitor(void) } /* - * first check if there is a command being executed, - * and if that is the case, cancel the query so i can - * insert the current record + * Cancel any query that is still being executed, + * so i can insert the current record */ - if (PQisBusy(primaryConn) == 1) - CancelQuery(); + CancelQuery(primaryConn); + if (wait_connection_availability(primaryConn, local_options.master_response_timeout) != 1) + return; /* Get local xlog info */ sqlquery_snprintf(sqlquery, "SELECT CURRENT_TIMESTAMP "); @@ -469,12 +468,12 @@ StandbyMonitor(void) } /* - * first check if there is a command being executed, - * and if that is the case, cancel the query so i can - * insert the current record + * Cancel any query that is still being executed, + * so i can insert the current record */ - if (PQisBusy(primaryConn) == 1) - CancelQuery(); + CancelQuery(primaryConn); + if (wait_connection_availability(primaryConn, local_options.master_response_timeout) != 1) + return; /* Get local xlog info */ sqlquery_snprintf( @@ -753,7 +752,7 @@ CheckPrimaryConnection(void) */ for (connection_retries = 0; connection_retries < NUM_RETRY; connection_retries++) { - if (!is_pgup(primaryConn)) + if (!is_pgup(primaryConn, local_options.master_response_timeout)) { log_warning(_("%s: Connection to master has been lost, trying to recover... %i seconds before failover decision\n"), progname, (SLEEP_RETRY*(NUM_RETRY-connection_retries))); /* wait SLEEP_RETRY seconds between retries */ @@ -768,7 +767,7 @@ CheckPrimaryConnection(void) break; } } - if (!is_pgup(primaryConn)) + if (!is_pgup(primaryConn, local_options.master_response_timeout)) { log_err(_("%s: We couldn't reconnect for long enough, exiting...\n"), progname); /* XXX Anything else to do here? */ @@ -939,21 +938,6 @@ setup_event_handlers(void) #endif -static void -CancelQuery(void) -{ - char errbuf[ERRBUFF_SIZE]; - PGcancel *pgcancel; - - pgcancel = PQgetCancel(primaryConn); - - if (!pgcancel || PQcancel(pgcancel, errbuf, ERRBUFF_SIZE) == 0) - log_warning(_("Can't stop current query: %s\n"), errbuf); - - PQfreeCancel(pgcancel); -} - - static void update_shared_memory(char *last_wal_standby_applied) {