repmgrd: add option "connection_check_type"

This enables selection of the method repmgrd uses to check whether the upstream
node is available. Possible values are:

 - "ping" (default): uses PQping() to check server availability
 - "connection":  executes a query on the connection to check server
   availability (similar to repmgr3.x).
This commit is contained in:
Ian Barwick
2019-03-06 12:07:30 +09:00
parent 4f83111033
commit 63f7ad546e
11 changed files with 150 additions and 23 deletions

View File

@@ -27,8 +27,9 @@
repmgrd: check binary and extension major versions match; GitHub #515 (Ian) repmgrd: check binary and extension major versions match; GitHub #515 (Ian)
repmgrd: on a cascaded standby, don't fail over if "failover=manual"; repmgrd: on a cascaded standby, don't fail over if "failover=manual";
GitHub #531 (Ian) GitHub #531 (Ian)
repmgrd: don't consider nodes where repmgrd is not running as promotion repmgrd: don't consider nodes where repmgrd is not running as promotion
candidates (Ian) candidates (Ian)
repmgrd: add option "connection_check_type" (Ian)
4.2.1 2018-??-?? 4.2.1 2018-??-??
repmgr: add sanity check for correct extension version (Ian) repmgr: add sanity check for correct extension version (Ian)

View File

@@ -358,6 +358,7 @@ _parse_config(t_configuration_options *options, ItemList *error_list, ItemList *
options->primary_notification_timeout = DEFAULT_PRIMARY_NOTIFICATION_TIMEOUT; options->primary_notification_timeout = DEFAULT_PRIMARY_NOTIFICATION_TIMEOUT;
options->repmgrd_standby_startup_timeout = -1; /* defaults to "standby_reconnect_timeout" if not set */ options->repmgrd_standby_startup_timeout = -1; /* defaults to "standby_reconnect_timeout" if not set */
memset(options->repmgrd_pid_file, 0, sizeof(options->repmgrd_pid_file)); memset(options->repmgrd_pid_file, 0, sizeof(options->repmgrd_pid_file));
options->connection_check_type = CHECK_PING;
/*------------- /*-------------
* witness settings * witness settings
@@ -618,6 +619,22 @@ _parse_config(t_configuration_options *options, ItemList *error_list, ItemList *
options->repmgrd_standby_startup_timeout = repmgr_atoi(value, name, error_list, 0); options->repmgrd_standby_startup_timeout = repmgr_atoi(value, name, error_list, 0);
else if (strcmp(name, "repmgrd_pid_file") == 0) else if (strcmp(name, "repmgrd_pid_file") == 0)
strncpy(options->repmgrd_pid_file, value, MAXPGPATH); strncpy(options->repmgrd_pid_file, value, MAXPGPATH);
else if (strcmp(name, "connection_check_type") == 0)
{
	/* the value is matched case-insensitively against the supported methods */
	if (strcasecmp(value, "ping") == 0)
	{
		options->connection_check_type = CHECK_PING;
	}
	else if (strcasecmp(value, "connection") == 0)
	{
		options->connection_check_type = CHECK_CONNECTION;
	}
	else
	{
		/* the message must name exactly the values accepted above */
		item_list_append(error_list,
						 _("value for \"connection_check_type\" must be \"ping\" or \"connection\"\n"));
	}
}
/* witness settings */ /* witness settings */
else if (strcmp(name, "witness_sync_interval") == 0) else if (strcmp(name, "witness_sync_interval") == 0)
@@ -1155,7 +1172,6 @@ reload_config(t_configuration_options *orig_options, t_server_type server_type)
return false; return false;
} }
/* /*
* No configuration problems detected - copy any changed values * No configuration problems detected - copy any changed values
* *
@@ -1330,6 +1346,14 @@ reload_config(t_configuration_options *orig_options, t_server_type server_type)
config_changed = true; config_changed = true;
} }
if (orig_options->connection_check_type != new_options.connection_check_type)
{
orig_options->connection_check_type = new_options.connection_check_type;
log_info(_("\"connection_check_type\" is now \"%s\""),
new_options.connection_check_type == CHECK_PING ? "ping" : "connection");
config_changed = true;
}
/* /*
* Handle changes to logging configuration * Handle changes to logging configuration
*/ */

View File

@@ -37,6 +37,12 @@ typedef enum
FAILOVER_AUTOMATIC FAILOVER_AUTOMATIC
} failover_mode_opt; } failover_mode_opt;
/*
 * Method used to check whether the upstream node is available; selected
 * with the configuration option "connection_check_type".
 */
typedef enum
{
	CHECK_PING = 0,				/* "ping": check with PQping() (default) */
	CHECK_CONNECTION			/* "connection": run a query on the connection */
} ConnectionCheckType;
typedef struct EventNotificationListCell typedef struct EventNotificationListCell
{ {
struct EventNotificationListCell *next; struct EventNotificationListCell *next;
@@ -135,6 +141,7 @@ typedef struct
int primary_notification_timeout; int primary_notification_timeout;
int repmgrd_standby_startup_timeout; int repmgrd_standby_startup_timeout;
char repmgrd_pid_file[MAXPGPATH]; char repmgrd_pid_file[MAXPGPATH];
ConnectionCheckType connection_check_type;
/* BDR settings */ /* BDR settings */
bool bdr_local_monitoring_only; bool bdr_local_monitoring_only;
@@ -206,7 +213,7 @@ typedef struct
false, -1, \ false, -1, \
DEFAULT_ASYNC_QUERY_TIMEOUT, \ DEFAULT_ASYNC_QUERY_TIMEOUT, \
DEFAULT_PRIMARY_NOTIFICATION_TIMEOUT, \ DEFAULT_PRIMARY_NOTIFICATION_TIMEOUT, \
-1, "", \ -1, "", CHECK_PING, \
/* BDR settings */ \ /* BDR settings */ \
false, DEFAULT_BDR_RECOVERY_TIMEOUT, \ false, DEFAULT_BDR_RECOVERY_TIMEOUT, \
/* service settings */ \ /* service settings */ \

View File

@@ -4132,7 +4132,8 @@ cancel_query(PGconn *conn, int timeout)
*/ */
if (PQcancel(pgcancel, errbuf, ERRBUFF_SIZE) == 0) if (PQcancel(pgcancel, errbuf, ERRBUFF_SIZE) == 0)
{ {
log_warning(_("unable to stop current query:\n %s"), errbuf); log_warning(_("unable to cancel current query"));
log_detail("%s", errbuf);
PQfreeCancel(pgcancel); PQfreeCancel(pgcancel);
return false; return false;
} }
@@ -4150,7 +4151,7 @@ cancel_query(PGconn *conn, int timeout)
* Returns 1 for success; 0 if any error occurred; -1 if timeout reached. * Returns 1 for success; 0 if any error occurred; -1 if timeout reached.
*/ */
int int
wait_connection_availability(PGconn *conn, long long timeout) wait_connection_availability(PGconn *conn, int timeout)
{ {
PGresult *res = NULL; PGresult *res = NULL;
fd_set read_set; fd_set read_set;
@@ -4159,16 +4160,17 @@ wait_connection_availability(PGconn *conn, long long timeout)
before, before,
after; after;
struct timezone tz; struct timezone tz;
long long timeout_ms;
/* recalc to microseconds */ /* calculate timeout in microseconds */
timeout *= 1000000; timeout_ms = timeout * 1000000;
while (timeout > 0) while (timeout_ms > 0)
{ {
if (PQconsumeInput(conn) == 0) if (PQconsumeInput(conn) == 0)
{ {
log_warning(_("wait_connection_availability(): could not receive data from connection:\n %s"), log_warning(_("wait_connection_availability(): unable to receive data from connection"));
PQerrorMessage(conn)); log_detail("%s", PQerrorMessage(conn));
return 0; return 0;
} }
@@ -4199,17 +4201,17 @@ wait_connection_availability(PGconn *conn, long long timeout)
gettimeofday(&after, &tz); gettimeofday(&after, &tz);
timeout -= (after.tv_sec * 1000000 + after.tv_usec) - timeout_ms -= (after.tv_sec * 1000000 + after.tv_usec) -
(before.tv_sec * 1000000 + before.tv_usec); (before.tv_sec * 1000000 + before.tv_usec);
} }
if (timeout >= 0) if (timeout_ms >= 0)
{ {
return 1; return 1;
} }
log_warning(_("wait_connection_availability(): timeout reached")); log_warning(_("wait_connection_availability(): timeout (%i secs) reached"), timeout);
return -1; return -1;
} }

View File

@@ -510,7 +510,7 @@ bool get_tablespace_name_by_location(PGconn *conn, const char *location, char *
/* asynchronous query functions */ /* asynchronous query functions */
bool cancel_query(PGconn *conn, int timeout); bool cancel_query(PGconn *conn, int timeout);
int wait_connection_availability(PGconn *conn, long long timeout); int wait_connection_availability(PGconn *conn, int timeout);
/* node availability functions */ /* node availability functions */
bool is_server_available(const char *conninfo); bool is_server_available(const char *conninfo);

View File

@@ -160,7 +160,17 @@ REPMGRD_OPTS="--daemonize=false"</programlisting>
the absence of a running <application>repmgrd</application>. the absence of a running <application>repmgrd</application>.
</para> </para>
</listitem> </listitem>
<listitem>
<para>
Add option <option>connection_check_type</option> to enable selection of the method
<application>repmgrd</application> uses to determine whether the upstream node is available.
</para>
<para>
Possible values are <literal>ping</literal> (default; uses <command>PQping()</command> to
determine server availability); and <literal>connection</literal> (determines server availability
by executing an SQL statement on the node via the existing connection).
</para>
</listitem>
</itemizedlist> </itemizedlist>
</para> </para>
</sect2> </sect2>

View File

@@ -101,6 +101,32 @@
<command>repmgr standby follow</command> will result in the node continuing to follow <command>repmgr standby follow</command> will result in the node continuing to follow
the original primary. the original primary.
</para> </para>
<para>
<indexterm>
<primary>connection_check_type</primary>
</indexterm>
Additionally, the option <option>connection_check_type</option> can be used to select the method
<application>repmgrd</application> uses to determine whether the upstream node is available.
</para>
<para>
Possible values are:
<itemizedlist spacing="compact" mark="bullet">
<listitem>
<simpara>
<literal>ping</literal> (default) - uses <command>PQping()</command> to
determine server availability
</simpara>
</listitem>
<listitem>
<simpara>
<literal>connection</literal> - determines server availability
by executing an SQL statement on the node via the existing connection
</simpara>
</listitem>
</itemizedlist>
</para>
</sect2> </sect2>
<sect2 id="postgresql-service-configuration"> <sect2 id="postgresql-service-configuration">

View File

@@ -285,6 +285,9 @@ ssh_options='-q -o ConnectTimeout=10' # Options to append to "ssh"
# a value of zero prevents the node being promoted to primary # a value of zero prevents the node being promoted to primary
# (default: 100) # (default: 100)
#connection_check_type=ping # How to check availability of the upstream node; valid options:
# 'ping': use PQping() to check if the node is accepting connections
# 'connection': execute a throwaway query on the current connection
#reconnect_attempts=6 # Number of attempts which will be made to reconnect to an unreachable #reconnect_attempts=6 # Number of attempts which will be made to reconnect to an unreachable
# primary (or other upstream node) # primary (or other upstream node)
#reconnect_interval=10 # Interval between attempts to reconnect to an unreachable #reconnect_interval=10 # Interval between attempts to reconnect to an unreachable

View File

@@ -831,7 +831,7 @@ monitor_streaming_standby(void)
while (true) while (true)
{ {
log_verbose(LOG_DEBUG, "checking %s", upstream_node_info.conninfo); log_verbose(LOG_DEBUG, "checking %s", upstream_node_info.conninfo);
if (is_server_available(upstream_node_info.conninfo) == true) if (check_upstream_connection(upstream_conn, upstream_node_info.conninfo) == true)
{ {
set_upstream_last_seen(local_conn); set_upstream_last_seen(local_conn);
} }
@@ -1030,9 +1030,10 @@ monitor_streaming_standby(void)
upstream_node_info.node_id, upstream_node_info.node_id,
degraded_monitoring_elapsed); degraded_monitoring_elapsed);
if (is_server_available(upstream_node_info.conninfo) == true) if (check_upstream_connection(upstream_conn, upstream_node_info.conninfo) == true)
{ {
upstream_conn = establish_db_connection(upstream_node_info.conninfo, false); if (config_file_options.connection_check_type == CHECK_PING)
upstream_conn = establish_db_connection(upstream_node_info.conninfo, false);
if (PQstatus(upstream_conn) == CONNECTION_OK) if (PQstatus(upstream_conn) == CONNECTION_OK)
{ {
@@ -1604,7 +1605,7 @@ monitor_streaming_witness(void)
while (true) while (true)
{ {
if (is_server_available(upstream_node_info.conninfo) == false) if (check_upstream_connection(upstream_conn, upstream_node_info.conninfo) == false)
{ {
if (upstream_node_info.node_status == NODE_STATUS_UP) if (upstream_node_info.node_status == NODE_STATUS_UP)
{ {
@@ -1693,9 +1694,10 @@ monitor_streaming_witness(void)
upstream_node_info.node_id, upstream_node_info.node_id,
degraded_monitoring_elapsed); degraded_monitoring_elapsed);
if (is_server_available(upstream_node_info.conninfo) == true) if (check_upstream_connection(primary_conn, upstream_node_info.conninfo) == true)
{ {
primary_conn = establish_db_connection(upstream_node_info.conninfo, false); if (config_file_options.connection_check_type == CHECK_PING)
primary_conn = establish_db_connection(upstream_node_info.conninfo, false);
if (PQstatus(primary_conn) == CONNECTION_OK) if (PQstatus(primary_conn) == CONNECTION_OK)
{ {

View File

@@ -818,6 +818,58 @@ show_help(void)
} }
/*
 * check_upstream_connection()
 *
 * Check whether the upstream node is available, using the method selected
 * by "connection_check_type".
 *
 * With CHECK_PING, "conn" is not used and the result of
 * is_server_available() for "conninfo" is returned.
 *
 * Otherwise the check is made on "conn" itself: cancel_query() is called,
 * then "SELECT 1" is sent, waiting with wait_connection_availability()
 * (timeout "async_query_timeout") after each step. If the connection is
 * not in state CONNECTION_OK, or any step fails, the connection is reset
 * with PQreset() and the whole check is repeated once; if it fails again,
 * false is returned.
 */
bool
check_upstream_connection(PGconn *conn, const char *conninfo)
{
	/* set once PQreset() has been tried; any later failure is final */
	bool		reset_attempted = false;

	if (config_file_options.connection_check_type == CHECK_PING)
		return is_server_available(conninfo);

	while (true)
	{
		bool		probe_ok = false;

		if (PQstatus(conn) == CONNECTION_OK)
		{
			if (cancel_query(conn, config_file_options.async_query_timeout)
				&& wait_connection_availability(conn, config_file_options.async_query_timeout) == 1)
			{
				/* execute a simple query to verify connection availability */
				if (PQsendQuery(conn, "SELECT 1") == 0)
				{
					log_warning(_("unable to send query to upstream"));
					log_detail("%s", PQerrorMessage(conn));
				}
				else if (wait_connection_availability(conn, config_file_options.async_query_timeout) == 1)
				{
					probe_ok = true;
				}
			}
		}

		if (probe_ok)
			return true;

		/* bad connection status and failed probe share one retry path */
		if (reset_attempted)
			return false;

		PQreset(conn);			/* reconnect */
		reset_attempted = true;
	}
}
void void
try_reconnect(PGconn **conn, t_node_info *node_info) try_reconnect(PGconn **conn, t_node_info *node_info)
{ {

View File

@@ -23,6 +23,7 @@ extern PGconn *local_conn;
extern bool startup_event_logged; extern bool startup_event_logged;
extern char pid_file[MAXPGPATH]; extern char pid_file[MAXPGPATH];
bool check_upstream_connection(PGconn *conn, const char *conninfo);
void try_reconnect(PGconn **conn, t_node_info *node_info); void try_reconnect(PGconn **conn, t_node_info *node_info);
int calculate_elapsed(instr_time start_time); int calculate_elapsed(instr_time start_time);
@@ -31,5 +32,4 @@ const char *print_monitoring_state(MonitoringState monitoring_state);
void update_registration(PGconn *conn); void update_registration(PGconn *conn);
void terminate(int retval); void terminate(int retval);
#endif /* _REPMGRD_H_ */ #endif /* _REPMGRD_H_ */