repmgrd: further code ported

This commit is contained in:
Ian Barwick
2017-06-20 09:17:29 +09:00
parent f713f30ef8
commit 94a88326ef
4 changed files with 190 additions and 2 deletions

104
dbutils.c
View File

@@ -2049,3 +2049,107 @@ get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record)
}
/* asynchronous query functions */
bool
cancel_query(PGconn *conn, int timeout)
{
char errbuf[ERRBUFF_SIZE];
PGcancel *pgcancel;
if (wait_connection_availability(conn, timeout) != 1)
return false;
pgcancel = PQgetCancel(conn);
if (pgcancel == NULL)
return false;
/*
* PQcancel can only return 0 if socket()/connect()/send() fails, in any
* of those cases we can assume something bad happened to the connection
*/
if (PQcancel(pgcancel, errbuf, ERRBUFF_SIZE) == 0)
{
log_warning(_("Can't stop current query: %s\n"), errbuf);
PQfreeCancel(pgcancel);
return false;
}
PQfreeCancel(pgcancel);
return true;
}
/*
* Wait until current query finishes, ignoring any results.
* Usually this will be an async query or query cancellation.
*
* Returns 1 for success; 0 if any error ocurred; -1 if timeout reached.
*/
int
wait_connection_availability(PGconn *conn, long long timeout)
{
PGresult *res;
fd_set read_set;
int sock = PQsocket(conn);
struct timeval tmout,
before,
after;
struct timezone tz;
/* recalc to microseconds */
timeout *= 1000000;
while (timeout > 0)
{
if (PQconsumeInput(conn) == 0)
{
log_warning(_("wait_connection_availability(): could not receive data from connection. %s\n"),
PQerrorMessage(conn));
return 0;
}
if (PQisBusy(conn) == 0)
{
do
{
res = PQgetResult(conn);
PQclear(res);
} while (res != NULL);
break;
}
tmout.tv_sec = 0;
tmout.tv_usec = 250000;
FD_ZERO(&read_set);
FD_SET(sock, &read_set);
gettimeofday(&before, &tz);
if (select(sock, &read_set, NULL, NULL, &tmout) == -1)
{
log_warning(
_("wait_connection_availability(): select() returned with error:\n %s"),
strerror(errno));
return -1;
}
gettimeofday(&after, &tz);
timeout -= (after.tv_sec * 1000000 + after.tv_usec) -
(before.tv_sec * 1000000 + before.tv_usec);
}
if (timeout >= 0)
{
return 1;
}
log_warning(_("wait_connection_availability(): timeout reached"));
return -1;
}

View File

@@ -219,5 +219,9 @@ bool create_replication_slot(PGconn *conn, char *slot_name, int server_version_
bool drop_replication_slot(PGconn *conn, char *slot_name);
int get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record);
/* asynchronous query functions */
bool cancel_query(PGconn *conn, int timeout);
int wait_connection_availability(PGconn *conn, long long timeout);
#endif

View File

@@ -47,6 +47,8 @@
#define TABLESPACE_MAP "tablespace_map"
#endif
#define ERRBUFF_SIZE 512
#define WITNESS_DEFAULT_PORT "5499" /* If this value is ever changed, remember
* to update comments and documentation */

View File

@@ -8,11 +8,11 @@
#include "config.h"
#include <stdio.h>
#include <signal.h>
#include <sys/stat.h>
#define OPT_HELP 1
#define OPT_HELP 1
static char *config_file = NULL;
static bool verbose = false;
@@ -22,12 +22,30 @@ static bool daemonize = false;
t_configuration_options config_file_options = T_CONFIGURATION_OPTIONS_INITIALIZER;
static t_node_info local_node_info;
static PGconn *local_conn = NULL;
static PGconn *master_conn = NULL;
/*
* Record receipt SIGHUP; will cause configuration file to be reread at the
* appropriate point in the main loop.
*/
static volatile sig_atomic_t got_SIGHUP = false;
static void show_help(void);
static void show_usage(void);
static void daemonize_process(void);
static void check_and_create_pid_file(const char *pid_file);
#ifndef WIN32
static void setup_event_handlers(void);
static void handle_sighup(SIGNAL_ARGS);
static void handle_sigint(SIGNAL_ARGS);
#endif
static void close_connections();
static void terminate(int retval);
int
main(int argc, char **argv)
{
@@ -171,6 +189,10 @@ main(int argc, char **argv)
check_and_create_pid_file(pid_file);
}
#ifndef WIN32
setup_event_handlers();
#endif
while(1) {
sleep(1);
}
@@ -306,6 +328,30 @@ check_and_create_pid_file(const char *pid_file)
}
#ifndef WIN32
static void
handle_sigint(SIGNAL_ARGS)
{
terminate(SUCCESS);
}
/* SIGHUP: set flag to re-read config file at next convenient time */
static void
handle_sighup(SIGNAL_ARGS)
{
got_SIGHUP = true;
}
static void
setup_event_handlers(void)
{
pqsignal(SIGHUP, handle_sighup);
pqsignal(SIGINT, handle_sigint);
pqsignal(SIGTERM, handle_sigint);
}
#endif
void
show_usage(void)
{
@@ -345,3 +391,35 @@ show_help(void)
printf(_("%s monitors a cluster of servers and optionally performs failover.\n"), progname());
}
static void
close_connections()
{
if (PQstatus(master_conn) == CONNECTION_OK)
{
/* cancel any pending queries to the master */
if (PQisBusy(master_conn) == 1)
cancel_query(master_conn, config_file_options.master_response_timeout);
PQfinish(master_conn);
}
if (PQstatus(local_conn) == CONNECTION_OK)
PQfinish(local_conn);
}
static void
terminate(int retval)
{
close_connections();
logger_shutdown();
if (pid_file)
{
unlink(pid_file);
}
log_info(_("%s terminating...\n"), progname());
exit(retval);
}