diff --git a/dbutils.c b/dbutils.c index 34e4396a..a2f31af1 100644 --- a/dbutils.c +++ b/dbutils.c @@ -2049,3 +2049,107 @@ get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record) } +/* asynchronous query functions */ + +bool +cancel_query(PGconn *conn, int timeout) +{ + char errbuf[ERRBUFF_SIZE]; + PGcancel *pgcancel; + + if (wait_connection_availability(conn, timeout) != 1) + return false; + + pgcancel = PQgetCancel(conn); + + if (pgcancel == NULL) + return false; + + /* + * PQcancel can only return 0 if socket()/connect()/send() fails, in any + * of those cases we can assume something bad happened to the connection + */ + if (PQcancel(pgcancel, errbuf, ERRBUFF_SIZE) == 0) + { + log_warning(_("Can't stop current query: %s\n"), errbuf); + PQfreeCancel(pgcancel); + return false; + } + + PQfreeCancel(pgcancel); + + return true; +} + + +/* + * Wait until current query finishes, ignoring any results. + * Usually this will be an async query or query cancellation. + * + * Returns 1 for success; 0 if any error ocurred; -1 if timeout reached. + */ +int +wait_connection_availability(PGconn *conn, long long timeout) +{ + PGresult *res; + fd_set read_set; + int sock = PQsocket(conn); + struct timeval tmout, + before, + after; + struct timezone tz; + + /* recalc to microseconds */ + timeout *= 1000000; + + while (timeout > 0) + { + if (PQconsumeInput(conn) == 0) + { + log_warning(_("wait_connection_availability(): could not receive data from connection. %s\n"), + PQerrorMessage(conn)); + return 0; + } + + if (PQisBusy(conn) == 0) + { + do + { + res = PQgetResult(conn); + PQclear(res); + } while (res != NULL); + + break; + } + + tmout.tv_sec = 0; + tmout.tv_usec = 250000; + + FD_ZERO(&read_set); + FD_SET(sock, &read_set); + + gettimeofday(&before, &tz); + if (select(sock, &read_set, NULL, NULL, &tmout) == -1) + { + log_warning( + _("wait_connection_availability(): select() returned with error:\n %s"), + strerror(errno)); + return -1; + } + + gettimeofday(&after, &tz); + + timeout -= (after.tv_sec * 1000000 + after.tv_usec) - + (before.tv_sec * 1000000 + before.tv_usec); + } + + + if (timeout >= 0) + { + return 1; + } + + log_warning(_("wait_connection_availability(): timeout reached")); + return -1; +} + diff --git a/dbutils.h b/dbutils.h index d671e0a0..8f2e2056 100644 --- a/dbutils.h +++ b/dbutils.h @@ -219,5 +219,9 @@ bool create_replication_slot(PGconn *conn, char *slot_name, int server_version_ bool drop_replication_slot(PGconn *conn, char *slot_name); int get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record); +/* asynchronous query functions */ +bool cancel_query(PGconn *conn, int timeout); +int wait_connection_availability(PGconn *conn, long long timeout); + #endif diff --git a/repmgr.h b/repmgr.h index 6e1ddd9c..8021dc00 100644 --- a/repmgr.h +++ b/repmgr.h @@ -47,6 +47,8 @@ #define TABLESPACE_MAP "tablespace_map" #endif +#define ERRBUFF_SIZE 512 + #define WITNESS_DEFAULT_PORT "5499" /* If this value is ever changed, remember * to update comments and documentation */ diff --git a/repmgrd.c b/repmgrd.c index 7bba6efe..6724184e 100644 --- a/repmgrd.c +++ b/repmgrd.c @@ -8,11 +8,11 @@ #include "config.h" #include - +#include #include -#define OPT_HELP 1 +#define OPT_HELP 1 static char *config_file = NULL; static bool verbose = false; @@ -22,12 +22,30 @@ static bool daemonize = false; t_configuration_options config_file_options = T_CONFIGURATION_OPTIONS_INITIALIZER; static t_node_info local_node_info; +static PGconn *local_conn = NULL; + +static PGconn *master_conn = NULL; + +/* + * Record receipt SIGHUP; will cause configuration file to be reread at the + * appropriate point in the main loop. + */ +static volatile sig_atomic_t got_SIGHUP = false; static void show_help(void); static void show_usage(void); static void daemonize_process(void); static void check_and_create_pid_file(const char *pid_file); +#ifndef WIN32 +static void setup_event_handlers(void); +static void handle_sighup(SIGNAL_ARGS); +static void handle_sigint(SIGNAL_ARGS); +#endif + +static void close_connections(); +static void terminate(int retval); + int main(int argc, char **argv) { @@ -171,6 +189,10 @@ main(int argc, char **argv) check_and_create_pid_file(pid_file); } +#ifndef WIN32 + setup_event_handlers(); +#endif + while(1) { sleep(1); } @@ -306,6 +328,30 @@ check_and_create_pid_file(const char *pid_file) } +#ifndef WIN32 +static void +handle_sigint(SIGNAL_ARGS) +{ + terminate(SUCCESS); +} + +/* SIGHUP: set flag to re-read config file at next convenient time */ +static void +handle_sighup(SIGNAL_ARGS) +{ + got_SIGHUP = true; +} + +static void +setup_event_handlers(void) +{ + pqsignal(SIGHUP, handle_sighup); + pqsignal(SIGINT, handle_sigint); + pqsignal(SIGTERM, handle_sigint); +} +#endif + + void show_usage(void) { @@ -345,3 +391,35 @@ show_help(void) printf(_("%s monitors a cluster of servers and optionally performs failover.\n"), progname()); } + +static void +close_connections() +{ + if (PQstatus(master_conn) == CONNECTION_OK) + { + /* cancel any pending queries to the master */ + if (PQisBusy(master_conn) == 1) + cancel_query(master_conn, config_file_options.master_response_timeout); + PQfinish(master_conn); + } + + if (PQstatus(local_conn) == CONNECTION_OK) + PQfinish(local_conn); +} + + +static void +terminate(int retval) +{ + close_connections(); + logger_shutdown(); + + if (pid_file) + { + unlink(pid_file); + } + + log_info(_("%s terminating...\n"), progname()); + + exit(retval); +}