Add signals to repmgrd and teach it to clean things up when

signaled before exit
This commit is contained in:
Jaime Casanova
2010-10-03 15:52:49 -05:00
parent 420edca4d1
commit d1d1232bef

126
repmgrd.c
View File

@@ -7,12 +7,16 @@
* how far are they from master * how far are they from master
*/ */
#include <signal.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <unistd.h> #include <unistd.h>
#include "repmgr.h" #include "repmgr.h"
#include "libpq/pqsignal.h"
char myClusterName[MAXLEN]; char myClusterName[MAXLEN];
/* Local info */ /* Local info */
@@ -25,6 +29,7 @@ int primaryId;
char primaryConninfo[MAXLEN]; char primaryConninfo[MAXLEN];
PGconn *primaryConn; PGconn *primaryConn;
char sqlquery[8192];
const char *progname; const char *progname;
@@ -37,11 +42,31 @@ static void checkClusterConfiguration(void);
static void checkNodeConfiguration(char *conninfo); static void checkNodeConfiguration(char *conninfo);
static void getPrimaryConnection(void); static void getPrimaryConnection(void);
static void MonitorCheck(void);
static void MonitorExecute(void); static void MonitorExecute(void);
static unsigned long long int walLocationToBytes(char *wal_location); static unsigned long long int walLocationToBytes(char *wal_location);
static void handle_sigint(SIGNAL_ARGS);
static void setup_cancel_handler(void);
#define CloseConnections() \
if (PQisBusy(primaryConn) == 1) \
CancelQuery(); \
if (myLocalConn != NULL) \
PQfinish(myLocalConn); \
if (primaryConn != NULL) \
PQfinish(primaryConn);
/*
* Every 3 seconds, insert monitor info
*/
#define MonitorCheck() \
for (;;) \
{ \
MonitorExecute(); \
sleep(3); \
}
int int
main(int argc, char **argv) main(int argc, char **argv)
@@ -89,6 +114,8 @@ main(int argc, char **argv)
exit(1); exit(1);
} }
} }
setup_cancel_handler();
if (config_file == NULL) if (config_file == NULL)
sprintf(config_file, "./%s", CONFIG_FILE); sprintf(config_file, "./%s", CONFIG_FILE);
@@ -125,19 +152,18 @@ main(int argc, char **argv)
/* I need the id of the primary as well as a connection to it */ /* I need the id of the primary as well as a connection to it */
getPrimaryConnection(); getPrimaryConnection();
MonitorCheck(); MonitorCheck();
PQfinish(primaryConn);
} }
/* close the connection to the database and cleanup */ /* close the connection to the database and cleanup */
PQfinish(myLocalConn); CloseConnections();
return 0; return 0;
} }
/* /*
* This function ask if we are in recovery, if false we are the primary else * get a connection to primary by reading repl_nodes, creating a connection
* we are a standby * to each node (one at a time) and finding if it is a primary or a standby
*/ */
static void static void
getPrimaryConnection(void) getPrimaryConnection(void)
@@ -146,43 +172,55 @@ getPrimaryConnection(void)
PGresult *res2; PGresult *res2;
int i; int i;
res1 = PQexec(myLocalConn, "SELECT * FROM repl_nodes"); /* find all nodes belonging to this cluster */
sprintf(sqlquery, "SELECT * FROM repl_nodes "
" WHERE cluster = '%s' ",
myClusterName);
res1 = PQexec(myLocalConn, sqlquery);
if (PQresultStatus(res1) != PGRES_TUPLES_OK) if (PQresultStatus(res1) != PGRES_TUPLES_OK)
{ {
fprintf(stderr, "Can't get nodes info: %s\n", PQerrorMessage(myLocalConn)); fprintf(stderr, "Can't get nodes info: %s\n", PQerrorMessage(myLocalConn));
PQclear(res1); PQclear(res1);
PQfinish(myLocalConn); CloseConnections();
exit(1); exit(1);
} }
for (i = 0; i < PQntuples(res1); i++) for (i = 0; i < PQntuples(res1); i++)
{ {
/* initialize with the values of the current node being processed */
primaryId = atoi(PQgetvalue(res1, i, 0)); primaryId = atoi(PQgetvalue(res1, i, 0));
strcpy(primaryConninfo, PQgetvalue(res1, i, 2)); strcpy(primaryConninfo, PQgetvalue(res1, i, 2));
primaryConn = establishDBConnection(primaryConninfo, false); primaryConn = establishDBConnection(primaryConninfo, false);
/*
* I can't use the is_standby() function here because on error that
* function closes the one i pass and exit, but i still need to close
* myLocalConn
*/
res2 = PQexec(primaryConn, "SELECT pg_is_in_recovery()"); res2 = PQexec(primaryConn, "SELECT pg_is_in_recovery()");
if (PQresultStatus(res2) != PGRES_TUPLES_OK) if (PQresultStatus(res2) != PGRES_TUPLES_OK)
{ {
fprintf(stderr, "Can't get nodes info: %s\n", PQerrorMessage(primaryConn)); fprintf(stderr, "Can't get nodes info: %s\n", PQerrorMessage(primaryConn));
PQclear(res1); PQclear(res1);
PQclear(res2); PQclear(res2);
PQfinish(primaryConn); CloseConnections();
PQfinish(myLocalConn);
exit(1); exit(1);
} }
/* if false, this is the primary */
if (strcmp(PQgetvalue(res2, 0, 0), "f") == 0) if (strcmp(PQgetvalue(res2, 0, 0), "f") == 0)
{ {
PQclear(res2); PQclear(res2);
PQclear(res1); PQclear(res1);
/* On the primary the monitor check is asynchronous */ /* We turn off synchronous_commit for the monitor info inserts */
res1 = PQexec(primaryConn, "SET synchronous_commit TO off"); res1 = PQexec(primaryConn, "SET synchronous_commit TO off");
PQclear(res1); PQclear(res1);
return; return;
} }
else else
{ {
/* if it is a standby clear info */
PQclear(res2); PQclear(res2);
PQfinish(primaryConn); PQfinish(primaryConn);
primaryId = -1; primaryId = -1;
@@ -194,37 +232,26 @@ getPrimaryConnection(void)
* reached max_connections or superuser_reserved_connections, * reached max_connections or superuser_reserved_connections,
* anything else i'm missing?), * anything else i'm missing?),
* Probably we will need to check the error to know if we need * Probably we will need to check the error to know if we need
* to start failover procedure o just fix some situation on the * to start failover procedure or just fix some situation on the
* standby. * standby.
*/ */
fprintf(stderr, "There isn't a primary node\n"); fprintf(stderr, "There isn't a primary node the cluster\n");
PQclear(res1); PQclear(res1);
PQfinish(myLocalConn); CloseConnections();
exit(1); exit(1);
} }
static void
MonitorCheck(void) {
/*
* Every 3 seconds, insert monitor info
*/
for (;;)
{
MonitorExecute();
sleep(3);
}
}
/* /*
* Check if its time for next monitor call and if so, do it. * Insert monitor info, this is basically the time and xlog replayed,
* applied on standby and current xlog location in primary.
* Also do the math to see how far are we in bytes for being uptodate
*/ */
static void static void
MonitorExecute(void) MonitorExecute(void)
{ {
PGresult *res; PGresult *res;
char sqlquery[8192];
char monitor_standby_timestamp[MAXLEN]; char monitor_standby_timestamp[MAXLEN];
char last_wal_primary_location[MAXLEN]; char last_wal_primary_location[MAXLEN];
char last_wal_standby_received[MAXLEN]; char last_wal_standby_received[MAXLEN];
@@ -234,6 +261,14 @@ MonitorExecute(void)
unsigned long long int lsn_standby_received; unsigned long long int lsn_standby_received;
unsigned long long int lsn_standby_applied; unsigned long long int lsn_standby_applied;
/*
* first check if there is a command being executed,
* and if that is the case, cancel the query so i can
* insert the current record
*/
if (PQisBusy(primaryConn) == 1)
CancelQuery();
/* Get local xlog info */ /* Get local xlog info */
sprintf(sqlquery, sprintf(sqlquery,
"SELECT CURRENT_TIMESTAMP, pg_last_xlog_receive_location(), " "SELECT CURRENT_TIMESTAMP, pg_last_xlog_receive_location(), "
@@ -244,6 +279,7 @@ MonitorExecute(void)
{ {
fprintf(stderr, "PQexec failed: %s\n", PQerrorMessage(myLocalConn)); fprintf(stderr, "PQexec failed: %s\n", PQerrorMessage(myLocalConn));
PQclear(res); PQclear(res);
/* if there is any error just let it be and retry in next loop */
return; return;
} }
@@ -289,8 +325,8 @@ MonitorExecute(void)
* Execute the query asynchronously, but don't check for a result. We * Execute the query asynchronously, but don't check for a result. We
* will check the result next time we pause for a monitor step. * will check the result next time we pause for a monitor step.
*/ */
if (!PQexec(primaryConn, sqlquery)) if (PQsendQuery(primaryConn, sqlquery) == 0)
fprintf(stderr, "replication monitor insert failed: %s\n", fprintf(stderr, "Query could not be sent to primary. %s\n",
PQerrorMessage(primaryConn)); PQerrorMessage(primaryConn));
} }
@@ -332,7 +368,6 @@ static void
checkNodeConfiguration(char *conninfo) checkNodeConfiguration(char *conninfo)
{ {
PGresult *res; PGresult *res;
char sqlquery[8192];
/* /*
* Check if we have my node information in repl_nodes * Check if we have my node information in repl_nodes
@@ -414,3 +449,34 @@ help(const char *progname)
printf(_("new master in the event of a failover\n")); printf(_("new master in the event of a failover\n"));
printf(_(" standby follow [node] - allows the standby to re-point itself to a new master\n")); printf(_(" standby follow [node] - allows the standby to re-point itself to a new master\n"));
} }
#ifndef WIN32
static void
handle_sigint(SIGNAL_ARGS)
{
CloseConnections();
}
static void
setup_cancel_handler(void)
{
pqsignal(SIGINT, handle_sigint);
}
#endif
static void
CancelQuery(void)
{
char errbuf[256];
PGcancel *pgcancel;
pgcancel = PQgetCancel(primaryConn);
if (!pgcancel || PQcancel(pgcancel, errbuf, 256) == 0)
fprintf(stderr, "Can't stop current query: %s", errbuf);
PQfreeCancel(pgcancel);
}