diff --git a/repmgrd.c b/repmgrd.c
index ad89f28d..b2b8735c 100644
--- a/repmgrd.c
+++ b/repmgrd.c
@@ -7,12 +7,16 @@
  * how far are they from master
  */
 
+#include <signal.h>
+
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>
 
 #include "repmgr.h"
 
+#include "libpq/pqsignal.h"
+
 char myClusterName[MAXLEN];
 
 /* Local info */
@@ -25,6 +29,7 @@ int primaryId;
 char primaryConninfo[MAXLEN];
 PGconn *primaryConn;
 
+char sqlquery[8192];
 
 const char *progname;
 
@@ -37,11 +42,51 @@ static void checkClusterConfiguration(void);
 static void checkNodeConfiguration(char *conninfo);
 static void getPrimaryConnection(void);
 
-static void MonitorCheck(void);
 static void MonitorExecute(void);
 
 static unsigned long long int walLocationToBytes(char *wal_location);
 
+static void CancelQuery(void);
+static void setup_cancel_handler(void);
+#ifndef WIN32
+static void handle_sigint(SIGNAL_ARGS);
+#endif
+
+/* Set by the SIGINT handler; tells the monitor loop to stop. */
+static volatile sig_atomic_t got_sigint = 0;
+
+/*
+ * Cancel any query still running on the primary and close both connections.
+ * The pointers are reset to NULL so that running this twice is harmless.
+ */
+#define CloseConnections() \
+	do { \
+		if (primaryConn != NULL && PQisBusy(primaryConn) == 1) \
+			CancelQuery(); \
+		if (myLocalConn != NULL) \
+		{ \
+			PQfinish(myLocalConn); \
+			myLocalConn = NULL; \
+		} \
+		if (primaryConn != NULL) \
+		{ \
+			PQfinish(primaryConn); \
+			primaryConn = NULL; \
+		} \
+	} while (0)
+
+/*
+ * Every 3 seconds, insert monitor info, until SIGINT asks us to stop
+ */
+#define MonitorCheck() \
+	do { \
+		while (!got_sigint) \
+		{ \
+			MonitorExecute(); \
+			sleep(3); \
+		} \
+	} while (0)
+
 
 int
 main(int argc, char **argv)
@@ -89,6 +134,8 @@ main(int argc, char **argv)
 			exit(1);
 		}
 	}
+
+	setup_cancel_handler();
 
 	if (config_file == NULL)
 		sprintf(config_file, "./%s", CONFIG_FILE);
@@ -125,19 +172,18 @@ main(int argc, char **argv)
 		/* I need the id of the primary as well as a connection to it */
 		getPrimaryConnection();
 		MonitorCheck();
-		PQfinish(primaryConn);
 	}
 
 	/* close the connection to the database and cleanup */
-	PQfinish(myLocalConn);
+	CloseConnections();
 
 	return 0;
 }
 
 
 /*
- * This function ask if we are in recovery, if false we are the primary else
- * we are a standby
+ * get a connection to primary by reading repl_nodes, creating a connection
+ * to each node (one at a time) and finding if it is a primary or a standby
  */
 static void
 getPrimaryConnection(void)
@@ -146,43 +192,56 @@ getPrimaryConnection(void)
 	PGresult *res2;
 	int i;
 
-	res1 = PQexec(myLocalConn, "SELECT * FROM repl_nodes");
+	/* find all nodes belonging to this cluster */
+	sprintf(sqlquery, "SELECT * FROM repl_nodes "
+			" WHERE cluster = '%s' ",
+			myClusterName);
+
+	res1 = PQexec(myLocalConn, sqlquery);
 	if (PQresultStatus(res1) != PGRES_TUPLES_OK)
 	{
 		fprintf(stderr, "Can't get nodes info: %s\n", PQerrorMessage(myLocalConn));
 		PQclear(res1);
-		PQfinish(myLocalConn);
+		CloseConnections();
 		exit(1);
 	}
 
 	for (i = 0; i < PQntuples(res1); i++)
 	{
+		/* initialize with the values of the current node being processed */
 		primaryId = atoi(PQgetvalue(res1, i, 0));
 		strcpy(primaryConninfo, PQgetvalue(res1, i, 2));
 		primaryConn = establishDBConnection(primaryConninfo, false);
 
+		/*
+		 * I can't use the is_standby() function here because on error that
+		 * function closes the connection i pass and exits, but i still need
+		 * to close myLocalConn
+		 */
 		res2 = PQexec(primaryConn, "SELECT pg_is_in_recovery()");
 		if (PQresultStatus(res2) != PGRES_TUPLES_OK)
 		{
 			fprintf(stderr, "Can't get nodes info: %s\n", PQerrorMessage(primaryConn));
 			PQclear(res1);
 			PQclear(res2);
-			PQfinish(primaryConn);
-			PQfinish(myLocalConn);
+			CloseConnections();
 			exit(1);
 		}
 
+		/* if false, this is the primary */
 		if (strcmp(PQgetvalue(res2, 0, 0), "f") == 0)
 		{
 			PQclear(res2);
 			PQclear(res1);
-			/* On the primary the monitor check is asynchronous */
+			/* We turn off synchronous_commit for the monitor info inserts */
 			res1 = PQexec(primaryConn, "SET synchronous_commit TO off");
 			PQclear(res1);
 			return;
 		}
 		else
 		{
+			/* if it is a standby clear info */
 			PQclear(res2);
 			PQfinish(primaryConn);
+			primaryConn = NULL;
 			primaryId = -1;
@@ -194,37 +253,26 @@ getPrimaryConnection(void)
 	 * reached max_connections or superuser_reserved_connections,
 	 * anything else i'm missing?),
 	 * Probably we will need to check the error to know if we need
-	 * to start failover procedure o just fix some situation on the
+	 * to start failover procedure or just fix some situation on the
 	 * standby.
 	 */
-	fprintf(stderr, "There isn't a primary node\n");
+	fprintf(stderr, "There isn't a primary node in the cluster\n");
 	PQclear(res1);
-	PQfinish(myLocalConn);
+	CloseConnections();
 	exit(1);
 }
 
 
-static void
-MonitorCheck(void) {
-	/*
-	 * Every 3 seconds, insert monitor info
-	 */
-	for (;;)
-	{
-		MonitorExecute();
-		sleep(3);
-	}
-}
-
 
 /*
- * Check if its time for next monitor call and if so, do it.
+ * Insert monitor info, this is basically the time and xlog received and
+ * applied on standby and current xlog location in primary.
+ * Also do the math to see how far we are, in bytes, from being up to date
  */
 static void
 MonitorExecute(void)
 {
 	PGresult *res;
-	char sqlquery[8192];
 	char monitor_standby_timestamp[MAXLEN];
 	char last_wal_primary_location[MAXLEN];
 	char last_wal_standby_received[MAXLEN];
@@ -234,6 +282,26 @@ MonitorExecute(void)
 	unsigned long long int lsn_standby_received;
 	unsigned long long int lsn_standby_applied;
 
+	/*
+	 * PQisBusy() only reports fresh state after PQconsumeInput(). If the
+	 * previous asynchronous insert is still running, cancel it so i can
+	 * insert the current record
+	 */
+	if (PQconsumeInput(primaryConn) == 1 && PQisBusy(primaryConn) == 1)
+		CancelQuery();
+
+	/*
+	 * Collect the outcome of the previous insert. PQsendQuery() refuses to
+	 * start a new command until PQgetResult() has returned NULL.
+	 */
+	while ((res = PQgetResult(primaryConn)) != NULL)
+	{
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			fprintf(stderr, "replication monitor insert failed: %s\n",
+					PQerrorMessage(primaryConn));
+		PQclear(res);
+	}
+
 	/* Get local xlog info */
 	sprintf(sqlquery,
 			"SELECT CURRENT_TIMESTAMP, pg_last_xlog_receive_location(), "
@@ -244,6 +312,7 @@ MonitorExecute(void)
 	{
 		fprintf(stderr, "PQexec failed: %s\n", PQerrorMessage(myLocalConn));
 		PQclear(res);
+		/* if there is any error just let it be and retry in next loop */
 		return;
 	}
 
@@ -289,8 +358,8 @@ MonitorExecute(void)
 	 * Execute the query asynchronously, but don't check for a result. We
 	 * will check the result next time we pause for a monitor step.
 	 */
-	if (!PQexec(primaryConn, sqlquery))
-		fprintf(stderr, "replication monitor insert failed: %s\n",
+	if (PQsendQuery(primaryConn, sqlquery) == 0)
+		fprintf(stderr, "Query could not be sent to primary. %s\n",
 				PQerrorMessage(primaryConn));
 }
 
@@ -332,7 +401,6 @@ static void
 checkNodeConfiguration(char *conninfo)
 {
 	PGresult *res;
-	char sqlquery[8192];
 
 	/*
 	 * Check if we have my node information in repl_nodes
@@ -414,3 +482,55 @@ help(const char *progname)
 	printf(_("new master in the event of a failover\n"));
 	printf(_(" standby follow [node] - allows the standby to re-point itself to a new master\n"));
 }
+
+
+
+#ifndef WIN32
+/*
+ * SIGINT handler. Only sets a flag: PQfinish() and friends are not
+ * async-signal-safe, so the real cleanup is done by main() once the
+ * monitor loop notices the flag.
+ */
+static void
+handle_sigint(SIGNAL_ARGS)
+{
+	got_sigint = 1;
+}
+
+static void
+setup_cancel_handler(void)
+{
+	pqsignal(SIGINT, handle_sigint);
+}
+#else
+static void
+setup_cancel_handler(void)
+{
+	/* no SIGINT handling on Windows */
+}
+#endif
+
+
+/*
+ * Ask the primary to cancel the query currently running on primaryConn.
+ * Failures are reported on stderr and otherwise ignored.
+ */
+static void
+CancelQuery(void)
+{
+	char errbuf[256];
+	PGcancel *pgcancel;
+
+	pgcancel = PQgetCancel(primaryConn);
+	if (pgcancel == NULL)
+	{
+		/* errbuf is only filled in by PQcancel(), so don't print it here */
+		fprintf(stderr, "Can't stop current query: could not get cancel handle\n");
+		return;
+	}
+
+	if (PQcancel(pgcancel, errbuf, sizeof(errbuf)) == 0)
+		fprintf(stderr, "Can't stop current query: %s\n", errbuf);
+
+	PQfreeCancel(pgcancel);
+}