Attack of whitespace pedantry

pgsql conventions (tabs, four-spaces-wide, etc) applied all around.

Also tried to fix some very tiny capitalization errors, auto-fill
problems, and some inter-block vertical whitespacing issues.

Long strings in repmgr.c were left intact, though. They are rather
numerous and are less of a problem than tiny bits of function calls
and comments wrapping over a line; the latter kind of problem has been
mostly fixed.

Signed-off-by: Dan Farina <drfarina@acm.org>
Signed-off-by: Peter van Hardenberg <pvh@heroku.com>
This commit is contained in:
Dan Farina
2010-12-05 23:31:22 -08:00
committed by Peter van Hardenberg
parent 56c65acd99
commit af2edf10a0
8 changed files with 941 additions and 851 deletions

View File

@@ -31,45 +31,45 @@ static int mkdir_p(char *path, mode_t omode);
int int
check_dir(char *dir) check_dir(char *dir)
{ {
DIR *chkdir; DIR *chkdir;
struct dirent *file; struct dirent *file;
int result = 1; int result = 1;
errno = 0; errno = 0;
chkdir = opendir(dir); chkdir = opendir(dir);
if (!chkdir) if (!chkdir)
return (errno == ENOENT) ? 0 : -1; return (errno == ENOENT) ? 0 : -1;
while ((file = readdir(chkdir)) != NULL) while ((file = readdir(chkdir)) != NULL)
{ {
if (strcmp(".", file->d_name) == 0 || if (strcmp(".", file->d_name) == 0 ||
strcmp("..", file->d_name) == 0) strcmp("..", file->d_name) == 0)
{ {
/* skip this and parent directory */ /* skip this and parent directory */
continue; continue;
} }
else else
{ {
result = 2; /* not empty */ result = 2; /* not empty */
break; break;
} }
} }
#ifdef WIN32 #ifdef WIN32
/* /*
* This fix is in mingw cvs (runtime/mingwex/dirent.c rev 1.4), but not in * This fix is in mingw cvs (runtime/mingwex/dirent.c rev 1.4), but not in
* released version * released version
*/ */
if (GetLastError() == ERROR_NO_MORE_FILES) if (GetLastError() == ERROR_NO_MORE_FILES)
errno = 0; errno = 0;
#endif #endif
closedir(chkdir); closedir(chkdir);
if (errno != 0) if (errno != 0)
return -1; /* some kind of I/O error? */ return -1; /* some kind of I/O error? */
return result; return result;
} }
@@ -81,13 +81,13 @@ check_dir(char *dir)
bool bool
create_directory(char *dir) create_directory(char *dir)
{ {
if (mkdir_p(dir, 0700) == 0) if (mkdir_p(dir, 0700) == 0)
return true; return true;
fprintf(stderr, _("Could not create directory \"%s\": %s\n"), fprintf(stderr, _("Could not create directory \"%s\": %s\n"),
dir, strerror(errno)); dir, strerror(errno));
return false; return false;
} }
bool bool
@@ -114,10 +114,10 @@ mkdir_p(char *path, mode_t omode)
{ {
struct stat sb; struct stat sb;
mode_t numask, mode_t numask,
oumask; oumask;
int first, int first,
last, last,
retval; retval;
char *p; char *p;
p = path; p = path;
@@ -212,5 +212,5 @@ is_pg_dir(char *dir)
sprintf(path, "%s/PG_VERSION", dir); sprintf(path, "%s/PG_VERSION", dir);
return (stat(path, &sb) == 0) ? true : false; return (stat(path, &sb) == 0) ? true : false;
} }

View File

@@ -8,13 +8,14 @@
#include "repmgr.h" #include "repmgr.h"
void void
parse_config(const char *config_file, char *cluster_name, int *node, char *conninfo) parse_config(const char *config_file, char *cluster_name, int *node,
char *conninfo)
{ {
char *s, buff[256]; char *s, buff[256];
FILE *fp = fopen (config_file, "r"); FILE *fp = fopen (config_file, "r");
if (fp == NULL) if (fp == NULL)
return; return;
/* Read next line */ /* Read next line */
while ((s = fgets (buff, sizeof buff, fp)) != NULL) while ((s = fgets (buff, sizeof buff, fp)) != NULL)
@@ -22,46 +23,47 @@ parse_config(const char *config_file, char *cluster_name, int *node, char *conni
char name[MAXLEN]; char name[MAXLEN];
char value[MAXLEN]; char value[MAXLEN];
/* Skip blank lines and comments */ /* Skip blank lines and comments */
if (buff[0] == '\n' || buff[0] == '#') if (buff[0] == '\n' || buff[0] == '#')
continue; continue;
/* Parse name/value pair from line */ /* Parse name/value pair from line */
parse_line(buff, name, value); parse_line(buff, name, value);
/* Copy into correct entry in parameters struct */ /* Copy into correct entry in parameters struct */
if (strcmp(name, "cluster") == 0) if (strcmp(name, "cluster") == 0)
strncpy (cluster_name, value, MAXLEN); strncpy (cluster_name, value, MAXLEN);
else if (strcmp(name, "node") == 0) else if (strcmp(name, "node") == 0)
*node = atoi(value); *node = atoi(value);
else if (strcmp(name, "conninfo") == 0) else if (strcmp(name, "conninfo") == 0)
strncpy (conninfo, value, MAXLEN); strncpy (conninfo, value, MAXLEN);
else else
printf ("WARNING: %s/%s: Unknown name/value pair!\n", name, value); printf("WARNING: %s/%s: Unknown name/value pair!\n",
} name, value);
}
/* Close file */ /* Close file */
fclose (fp); fclose (fp);
} }
char * char *
trim (char *s) trim (char *s)
{ {
/* Initialize start, end pointers */ /* Initialize start, end pointers */
char *s1 = s, *s2 = &s[strlen (s) - 1]; char *s1 = s, *s2 = &s[strlen (s) - 1];
/* Trim and delimit right side */ /* Trim and delimit right side */
while ( (isspace (*s2)) && (s2 >= s1) ) while ( (isspace (*s2)) && (s2 >= s1) )
s2--; s2--;
*(s2+1) = '\0'; *(s2+1) = '\0';
/* Trim left side */ /* Trim left side */
while ( (isspace (*s1)) && (s1 < s2) ) while ( (isspace (*s1)) && (s1 < s2) )
s1++; s1++;
/* Copy finished string */ /* Copy finished string */
strcpy (s, s1); strcpy (s, s1);
return s; return s;
} }
void void
@@ -96,5 +98,5 @@ parse_line(char *buff, char *name, char *value)
else else
break; break;
value[j] = '\0'; value[j] = '\0';
trim(value); trim(value);
} }

View File

@@ -4,6 +4,7 @@
* *
*/ */
void parse_config(const char *config_file, char *cluster_name, int *node, char *service); void parse_config(const char *config_file, char *cluster_name, int *node,
char *service);
void parse_line(char *buff, char *name, char *value); void parse_line(char *buff, char *name, char *value);
char *trim(char *s); char *trim(char *s);

160
dbutils.c
View File

@@ -12,19 +12,21 @@ PGconn *
establishDBConnection(const char *conninfo, const bool exit_on_error) establishDBConnection(const char *conninfo, const bool exit_on_error)
{ {
PGconn *conn; PGconn *conn;
/* Make a connection to the database */
conn = PQconnectdb(conninfo); /* Make a connection to the database */
/* Check to see that the backend connection was successfully made */ conn = PQconnectdb(conninfo);
if ((PQstatus(conn) != CONNECTION_OK))
{ /* Check to see that the backend connection was successfully made */
fprintf(stderr, "Connection to database failed: %s", if ((PQstatus(conn) != CONNECTION_OK))
PQerrorMessage(conn)); {
fprintf(stderr, "Connection to database failed: %s",
PQerrorMessage(conn));
if (exit_on_error) if (exit_on_error)
{ {
PQfinish(conn); PQfinish(conn);
exit(1); exit(1);
} }
} }
return conn; return conn;
} }
@@ -34,17 +36,18 @@ establishDBConnection(const char *conninfo, const bool exit_on_error)
bool bool
is_standby(PGconn *conn) is_standby(PGconn *conn)
{ {
PGresult *res; PGresult *res;
bool result; bool result;
res = PQexec(conn, "SELECT pg_is_in_recovery()"); res = PQexec(conn, "SELECT pg_is_in_recovery()");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ if (PQresultStatus(res) != PGRES_TUPLES_OK)
fprintf(stderr, "Can't query server mode: %s", PQerrorMessage(conn)); {
PQclear(res); fprintf(stderr, "Can't query server mode: %s", PQerrorMessage(conn));
PQfinish(conn); PQclear(res);
PQfinish(conn);
exit(1); exit(1);
} }
if (strcmp(PQgetvalue(res, 0, 0), "f") == 0) if (strcmp(PQgetvalue(res, 0, 0), "f") == 0)
result = false; result = false;
@@ -69,20 +72,26 @@ pg_version(PGconn *conn)
int major_version1; int major_version1;
char *major_version2; char *major_version2;
res = PQexec(conn, "WITH pg_version(ver) AS (SELECT split_part(version(), ' ', 2)) " res = PQexec(conn,
"SELECT split_part(ver, '.', 1), split_part(ver, '.', 2) FROM pg_version"); "WITH pg_version(ver) AS "
if (PQresultStatus(res) != PGRES_TUPLES_OK) "(SELECT split_part(version(), ' ', 2)) "
{ "SELECT split_part(ver, '.', 1), split_part(ver, '.', 2) "
"FROM pg_version");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, "PQexec failed: %s", PQerrorMessage(conn)); fprintf(stderr, "PQexec failed: %s", PQerrorMessage(conn));
PQclear(res); PQclear(res);
PQfinish(conn); PQfinish(conn);
exit(1); exit(1);
} }
major_version1 = atoi(PQgetvalue(res, 0, 0));
major_version2 = PQgetvalue(res, 0, 1); major_version1 = atoi(PQgetvalue(res, 0, 0));
PQclear(res); major_version2 = PQgetvalue(res, 0, 1);
PQclear(res);
major_version = malloc(10); major_version = malloc(10);
if (major_version1 >= 9) if (major_version1 >= 9)
{ {
/* form a major version string */ /* form a major version string */
@@ -96,27 +105,28 @@ pg_version(PGconn *conn)
bool bool
guc_setted(PGconn *conn, const char *parameter, const char *op, const char *value) guc_setted(PGconn *conn, const char *parameter, const char *op,
const char *value)
{ {
PGresult *res; PGresult *res;
char sqlquery[8192]; char sqlquery[8192];
sprintf(sqlquery, "SELECT true FROM pg_settings " sprintf(sqlquery, "SELECT true FROM pg_settings "
" WHERE name = '%s' AND setting %s '%s'", " WHERE name = '%s' AND setting %s '%s'",
parameter, op, value); parameter, op, value);
res = PQexec(conn, sqlquery); res = PQexec(conn, sqlquery);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ {
fprintf(stderr, "PQexec failed: %s", PQerrorMessage(conn)); fprintf(stderr, "PQexec failed: %s", PQerrorMessage(conn));
PQclear(res); PQclear(res);
PQfinish(conn); PQfinish(conn);
exit(1); exit(1);
} }
if (PQntuples(res) == 0) if (PQntuples(res) == 0)
{ {
PQclear(res); PQclear(res);
return false; return false;
} }
PQclear(res); PQclear(res);
@@ -131,18 +141,19 @@ get_cluster_size(PGconn *conn)
const char *size; const char *size;
char sqlquery[8192]; char sqlquery[8192];
sprintf(sqlquery, "SELECT pg_size_pretty(SUM(pg_database_size(oid))::bigint) " sprintf(sqlquery,
" FROM pg_database "); "SELECT pg_size_pretty(SUM(pg_database_size(oid))::bigint) "
" FROM pg_database ");
res = PQexec(conn, sqlquery); res = PQexec(conn, sqlquery);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ {
fprintf(stderr, "PQexec failed: %s", PQerrorMessage(conn)); fprintf(stderr, "PQexec failed: %s", PQerrorMessage(conn));
PQclear(res); PQclear(res);
PQfinish(conn); PQfinish(conn);
exit(1); exit(1);
} }
size = PQgetvalue(res, 0, 0); size = PQgetvalue(res, 0, 0);
PQclear(res); PQclear(res);
return size; return size;
} }
@@ -152,51 +163,56 @@ get_cluster_size(PGconn *conn)
* to each node (one at a time) and finding if it is a master or a standby * to each node (one at a time) and finding if it is a master or a standby
*/ */
PGconn * PGconn *
getMasterConnection(PGconn *standby_conn, int id, char *cluster, int *master_id) getMasterConnection(PGconn *standby_conn, int id, char *cluster,
int *master_id)
{ {
PGconn *master_conn = NULL; PGconn *master_conn = NULL;
PGresult *res1; PGresult *res1;
PGresult *res2; PGresult *res2;
char sqlquery[8192]; char sqlquery[8192];
char master_conninfo[8192]; char master_conninfo[8192];
int i; int i;
/* find all nodes belonging to this cluster */ /* find all nodes belonging to this cluster */
sprintf(sqlquery, "SELECT * FROM repmgr_%s.repl_nodes " sprintf(sqlquery, "SELECT * FROM repmgr_%s.repl_nodes "
" WHERE cluster = '%s' and id <> %d", " WHERE cluster = '%s' and id <> %d",
cluster, cluster, id); cluster, cluster, id);
res1 = PQexec(standby_conn, sqlquery); res1 = PQexec(standby_conn, sqlquery);
if (PQresultStatus(res1) != PGRES_TUPLES_OK) if (PQresultStatus(res1) != PGRES_TUPLES_OK)
{ {
fprintf(stderr, "Can't get nodes info: %s\n", PQerrorMessage(standby_conn)); fprintf(stderr, "Can't get nodes info: %s\n",
PQclear(res1); PQerrorMessage(standby_conn));
PQclear(res1);
PQfinish(standby_conn); PQfinish(standby_conn);
exit(1); exit(1);
} }
for (i = 0; i < PQntuples(res1); i++) for (i = 0; i < PQntuples(res1); i++)
{ {
/* initialize with the values of the current node being processed */ /* initialize with the values of the current node being processed */
*master_id = atoi(PQgetvalue(res1, i, 0)); *master_id = atoi(PQgetvalue(res1, i, 0));
strcpy(master_conninfo, PQgetvalue(res1, i, 2)); strcpy(master_conninfo, PQgetvalue(res1, i, 2));
master_conn = establishDBConnection(master_conninfo, false); master_conn = establishDBConnection(master_conninfo, false);
if (PQstatus(master_conn) != CONNECTION_OK) if (PQstatus(master_conn) != CONNECTION_OK)
continue; continue;
/* /*
* I can't use the is_standby() function here because on error that * I can't use the is_standby() function here because on error that
* function closes the connection i pass and exit, but i still need to close * function closes the connection i pass and exit, but i still need to
* standby_conn * close standby_conn
*/ */
res2 = PQexec(master_conn, "SELECT pg_is_in_recovery()"); res2 = PQexec(master_conn, "SELECT pg_is_in_recovery()");
if (PQresultStatus(res2) != PGRES_TUPLES_OK)
{ if (PQresultStatus(res2) != PGRES_TUPLES_OK)
fprintf(stderr, "Can't get recovery state from this node: %s\n", PQerrorMessage(master_conn)); {
PQclear(res2); fprintf(stderr, "Can't get recovery state from this node: %s\n",
PQerrorMessage(master_conn));
PQclear(res2);
PQfinish(master_conn); PQfinish(master_conn);
continue; continue;
} }
/* if false, this is the master */ /* if false, this is the master */
if (strcmp(PQgetvalue(res2, 0, 0), "f") == 0) if (strcmp(PQgetvalue(res2, 0, 0), "f") == 0)
@@ -212,17 +228,17 @@ getMasterConnection(PGconn *standby_conn, int id, char *cluster, int *master_id)
PQfinish(master_conn); PQfinish(master_conn);
*master_id = -1; *master_id = -1;
} }
} }
/* If we finish this loop without finding a master then /* If we finish this loop without finding a master then
* we doesn't have the info or the master has failed (or we * we doesn't have the info or the master has failed (or we
* reached max_connections or superuser_reserved_connections, * reached max_connections or superuser_reserved_connections,
* anything else i'm missing?), * anything else I'm missing?).
*
* Probably we will need to check the error to know if we need * Probably we will need to check the error to know if we need
* to start failover procedure or just fix some situation on the * to start failover procedure or just fix some situation on the
* standby. * standby.
*/ */
PQclear(res1); PQclear(res1);
return NULL; return NULL;
} }

View File

@@ -5,8 +5,10 @@
*/ */
PGconn *establishDBConnection(const char *conninfo, const bool exit_on_error); PGconn *establishDBConnection(const char *conninfo, const bool exit_on_error);
bool is_standby(PGconn *conn); bool is_standby(PGconn *conn);
char *pg_version(PGconn *conn); char *pg_version(PGconn *conn);
bool guc_setted(PGconn *conn, const char *parameter, const char *op, const char *value); bool guc_setted(PGconn *conn, const char *parameter, const char *op,
const char *get_cluster_size(PGconn *conn); const char *value);
PGconn * getMasterConnection(PGconn *standby_conn, int id, char *cluster, int *master_id); const char *get_cluster_size(PGconn *conn);
PGconn * getMasterConnection(PGconn *standby_conn, int id, char *cluster,
int *master_id);

972
repmgr.c

File diff suppressed because it is too large Load Diff

View File

@@ -7,9 +7,9 @@ CREATE SCHEMA repmgr;
*/ */
drop table if exists repl_nodes cascade; drop table if exists repl_nodes cascade;
CREATE TABLE repl_nodes ( CREATE TABLE repl_nodes (
id integer primary key, id integer primary key,
cluster text not null, -- Name to identify the cluster cluster text not null, -- Name to identify the cluster
conninfo text not null conninfo text not null
); );
ALTER TABLE repl_nodes OWNER TO repmgr; ALTER TABLE repl_nodes OWNER TO repmgr;
@@ -21,7 +21,7 @@ drop table if exists repl_monitor cascade;
CREATE TABLE repl_monitor ( CREATE TABLE repl_monitor (
primary_node INTEGER NOT NULL, primary_node INTEGER NOT NULL,
standby_node INTEGER NOT NULL, standby_node INTEGER NOT NULL,
last_monitor_time TIMESTAMP WITH TIME ZONE NOT NULL, last_monitor_time TIMESTAMP WITH TIME ZONE NOT NULL,
last_wal_primary_location TEXT NOT NULL, last_wal_primary_location TEXT NOT NULL,
last_wal_standby_location TEXT NOT NULL, last_wal_standby_location TEXT NOT NULL,
replication_lag BIGINT NOT NULL, replication_lag BIGINT NOT NULL,

267
repmgrd.c
View File

@@ -17,11 +17,11 @@
#include "libpq/pqsignal.h" #include "libpq/pqsignal.h"
char myClusterName[MAXLEN]; char myClusterName[MAXLEN];
/* Local info */ /* Local info */
int myLocalMode = STANDBY_MODE; int myLocalMode = STANDBY_MODE;
int myLocalId = -1; int myLocalId = -1;
PGconn *myLocalConn; PGconn *myLocalConn;
/* Primary info */ /* Primary info */
@@ -49,23 +49,23 @@ static unsigned long long int walLocationToBytes(char *wal_location);
static void handle_sigint(SIGNAL_ARGS); static void handle_sigint(SIGNAL_ARGS);
static void setup_cancel_handler(void); static void setup_cancel_handler(void);
#define CloseConnections() \ #define CloseConnections() \
if (PQisBusy(primaryConn) == 1) \ if (PQisBusy(primaryConn) == 1) \
CancelQuery(); \ CancelQuery(); \
if (myLocalConn != NULL) \ if (myLocalConn != NULL) \
PQfinish(myLocalConn); \ PQfinish(myLocalConn); \
if (primaryConn != NULL) \ if (primaryConn != NULL) \
PQfinish(primaryConn); PQfinish(primaryConn);
/* /*
* Every 3 seconds, insert monitor info * Every 3 seconds, insert monitor info
*/ */
#define MonitorCheck() \ #define MonitorCheck() \
for (;;) \ for (;;) \
{ \ { \
MonitorExecute(); \ MonitorExecute(); \
sleep(3); \ sleep(3); \
} }
int int
@@ -80,7 +80,7 @@ main(int argc, char **argv)
int optindex; int optindex;
int c; int c;
char conninfo[MAXLEN]; char conninfo[MAXLEN];
const char *standby_version = NULL; const char *standby_version = NULL;
progname = get_progname(argv[0]); progname = get_progname(argv[0]);
@@ -111,7 +111,8 @@ main(int argc, char **argv)
verbose = true; verbose = true;
break; break;
default: default:
fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname); fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1); exit(1);
} }
} }
@@ -126,30 +127,31 @@ main(int argc, char **argv)
/* /*
* Read the configuration file: repmgr.conf * Read the configuration file: repmgr.conf
*/ */
parse_config(config_file, myClusterName, &myLocalId, conninfo); parse_config(config_file, myClusterName, &myLocalId, conninfo);
if (myLocalId == -1) if (myLocalId == -1)
{ {
fprintf(stderr, "Node information is missing. " fprintf(stderr, "Node information is missing. "
"Check the configuration file.\n"); "Check the configuration file.\n");
exit(1); exit(1);
} }
myLocalConn = establishDBConnection(conninfo, true); myLocalConn = establishDBConnection(conninfo, true);
/* should be v9 or better */ /* should be v9 or better */
standby_version = pg_version(myLocalConn); standby_version = pg_version(myLocalConn);
if (strcmp(standby_version, "") == 0) if (strcmp(standby_version, "") == 0)
{ {
PQfinish(myLocalConn); PQfinish(myLocalConn);
fprintf(stderr, _("%s needs standby to be PostgreSQL 9.0 or better\n"), progname); fprintf(stderr, _("%s needs standby to be PostgreSQL 9.0 or better\n"),
progname);
exit(1); exit(1);
} }
/* /*
* Set my server mode, establish a connection to primary * Set my server mode, establish a connection to primary
* and start monitor * and start monitor
*/ */
myLocalMode = is_standby(myLocalConn) ? STANDBY_MODE : PRIMARY_MODE; myLocalMode = is_standby(myLocalConn) ? STANDBY_MODE : PRIMARY_MODE;
if (myLocalMode == PRIMARY_MODE) if (myLocalMode == PRIMARY_MODE)
{ {
@@ -160,7 +162,8 @@ main(int argc, char **argv)
else else
{ {
/* I need the id of the primary as well as a connection to it */ /* I need the id of the primary as well as a connection to it */
primaryConn = getMasterConnection(myLocalConn, myLocalId, myClusterName, &primaryId); primaryConn = getMasterConnection(myLocalConn, myLocalId,
myClusterName, &primaryId);
if (primaryConn == NULL) if (primaryConn == NULL)
exit(1); exit(1);
} }
@@ -172,10 +175,10 @@ main(int argc, char **argv)
MonitorCheck(); MonitorCheck();
} }
/* close the connection to the database and cleanup */ /* close the connection to the database and cleanup */
CloseConnections(); CloseConnections();
return 0; return 0;
} }
@@ -187,7 +190,7 @@ main(int argc, char **argv)
static void static void
MonitorExecute(void) MonitorExecute(void)
{ {
PGresult *res; PGresult *res;
char monitor_standby_timestamp[MAXLEN]; char monitor_standby_timestamp[MAXLEN];
char last_wal_primary_location[MAXLEN]; char last_wal_primary_location[MAXLEN];
char last_wal_standby_received[MAXLEN]; char last_wal_standby_received[MAXLEN];
@@ -221,11 +224,14 @@ MonitorExecute(void)
} }
if (PQstatus(primaryConn) != CONNECTION_OK) if (PQstatus(primaryConn) != CONNECTION_OK)
{ {
fprintf(stderr, "\n%s: We couldn't reconnect to master, checking if ", progname); fprintf(stderr, "\n%s: We couldn't reconnect to master, checking if ",
progname);
fprintf(stderr, "%s: another node has been promoted.\n", progname); fprintf(stderr, "%s: another node has been promoted.\n", progname);
for (connection_retries = 0; connection_retries < 6; connection_retries++) for (connection_retries = 0; connection_retries < 6;
connection_retries++)
{ {
primaryConn = getMasterConnection(myLocalConn, myLocalId, myClusterName, &primaryId); primaryConn = getMasterConnection(myLocalConn, myLocalId,
myClusterName, &primaryId);
if (PQstatus(primaryConn) == CONNECTION_OK) if (PQstatus(primaryConn) == CONNECTION_OK)
{ {
/* Connected, we can continue the process so break the loop */ /* Connected, we can continue the process so break the loop */
@@ -258,43 +264,43 @@ MonitorExecute(void)
/* /*
* first check if there is a command being executed, * first check if there is a command being executed,
* and if that is the case, cancel the query so i can * and if that is the case, cancel the query so i can
* insert the current record * insert the current record
*/ */
if (PQisBusy(primaryConn) == 1) if (PQisBusy(primaryConn) == 1)
CancelQuery(); CancelQuery();
/* Get local xlog info */ /* Get local xlog info */
sprintf(sqlquery, sprintf(sqlquery,
"SELECT CURRENT_TIMESTAMP, pg_last_xlog_receive_location(), " "SELECT CURRENT_TIMESTAMP, pg_last_xlog_receive_location(), "
"pg_last_xlog_replay_location()"); "pg_last_xlog_replay_location()");
res = PQexec(myLocalConn, sqlquery); res = PQexec(myLocalConn, sqlquery);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ {
fprintf(stderr, "PQexec failed: %s\n", PQerrorMessage(myLocalConn)); fprintf(stderr, "PQexec failed: %s\n", PQerrorMessage(myLocalConn));
PQclear(res); PQclear(res);
/* if there is any error just let it be and retry in next loop */ /* if there is any error just let it be and retry in next loop */
return; return;
} }
strcpy(monitor_standby_timestamp, PQgetvalue(res, 0, 0)); strcpy(monitor_standby_timestamp, PQgetvalue(res, 0, 0));
strcpy(last_wal_standby_received , PQgetvalue(res, 0, 1)); strcpy(last_wal_standby_received , PQgetvalue(res, 0, 1));
strcpy(last_wal_standby_applied , PQgetvalue(res, 0, 2)); strcpy(last_wal_standby_applied , PQgetvalue(res, 0, 2));
PQclear(res); PQclear(res);
/* Get primary xlog info */ /* Get primary xlog info */
sprintf(sqlquery, "SELECT pg_current_xlog_location() "); sprintf(sqlquery, "SELECT pg_current_xlog_location() ");
res = PQexec(primaryConn, sqlquery); res = PQexec(primaryConn, sqlquery);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ {
fprintf(stderr, "PQexec failed: %s\n", PQerrorMessage(primaryConn)); fprintf(stderr, "PQexec failed: %s\n", PQerrorMessage(primaryConn));
PQclear(res); PQclear(res);
return; return;
} }
strcpy(last_wal_primary_location, PQgetvalue(res, 0, 0)); strcpy(last_wal_primary_location, PQgetvalue(res, 0, 0));
PQclear(res); PQclear(res);
/* Calculate the lag */ /* Calculate the lag */
lsn_primary = walLocationToBytes(last_wal_primary_location); lsn_primary = walLocationToBytes(last_wal_primary_location);
@@ -305,15 +311,15 @@ MonitorExecute(void)
* Build the SQL to execute on primary * Build the SQL to execute on primary
*/ */
sprintf(sqlquery, sprintf(sqlquery,
"INSERT INTO repmgr_%s.repl_monitor " "INSERT INTO repmgr_%s.repl_monitor "
"VALUES(%d, %d, '%s'::timestamp with time zone, " "VALUES(%d, %d, '%s'::timestamp with time zone, "
" '%s', '%s', " " '%s', '%s', "
" %lld, %lld)", myClusterName, " %lld, %lld)", myClusterName,
primaryId, myLocalId, monitor_standby_timestamp, primaryId, myLocalId, monitor_standby_timestamp,
last_wal_primary_location, last_wal_primary_location,
last_wal_standby_received, last_wal_standby_received,
(lsn_primary - lsn_standby_received), (lsn_primary - lsn_standby_received),
(lsn_standby_received - lsn_standby_applied)); (lsn_standby_received - lsn_standby_applied));
/* /*
* Execute the query asynchronously, but don't check for a result. We * Execute the query asynchronously, but don't check for a result. We
@@ -321,39 +327,41 @@ MonitorExecute(void)
*/ */
if (PQsendQuery(primaryConn, sqlquery) == 0) if (PQsendQuery(primaryConn, sqlquery) == 0)
fprintf(stderr, "Query could not be sent to primary. %s\n", fprintf(stderr, "Query could not be sent to primary. %s\n",
PQerrorMessage(primaryConn)); PQerrorMessage(primaryConn));
} }
static void static void
checkClusterConfiguration(void) checkClusterConfiguration(void)
{ {
PGresult *res; PGresult *res;
sprintf(sqlquery, "SELECT oid FROM pg_class " sprintf(sqlquery, "SELECT oid FROM pg_class "
" WHERE oid = 'repmgr_%s.repl_nodes'::regclass", " WHERE oid = 'repmgr_%s.repl_nodes'::regclass",
myClusterName); myClusterName);
res = PQexec(myLocalConn, sqlquery); res = PQexec(myLocalConn, sqlquery);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ {
fprintf(stderr, "PQexec failed: %s\n", PQerrorMessage(myLocalConn)); fprintf(stderr, "PQexec failed: %s\n", PQerrorMessage(myLocalConn));
PQclear(res); PQclear(res);
PQfinish(myLocalConn); PQfinish(myLocalConn);
PQfinish(primaryConn); PQfinish(primaryConn);
exit(1); exit(1);
} }
/* /*
* If there isn't any results then we have not configured a primary node yet * If there isn't any results then we have not configured a primary node
* in repmgr or the connection string is pointing to the wrong database. * yet in repmgr or the connection string is pointing to the wrong
* database.
*
* XXX if we are the primary, should we try to create the tables needed? * XXX if we are the primary, should we try to create the tables needed?
*/ */
if (PQntuples(res) == 0) if (PQntuples(res) == 0)
{ {
fprintf(stderr, "The replication cluster is not configured\n"); fprintf(stderr, "The replication cluster is not configured\n");
PQclear(res); PQclear(res);
PQfinish(myLocalConn); PQfinish(myLocalConn);
PQfinish(primaryConn); PQfinish(primaryConn);
exit(1); exit(1);
} }
PQclear(res); PQclear(res);
@@ -363,24 +371,22 @@ checkClusterConfiguration(void)
static void static void
checkNodeConfiguration(char *conninfo) checkNodeConfiguration(char *conninfo)
{ {
PGresult *res; PGresult *res;
/* /* Check if we have my node information in repl_nodes */
* Check if we have my node information in repl_nodes
*/
sprintf(sqlquery, "SELECT * FROM repmgr_%s.repl_nodes " sprintf(sqlquery, "SELECT * FROM repmgr_%s.repl_nodes "
" WHERE id = %d AND cluster = '%s' ", " WHERE id = %d AND cluster = '%s' ",
myClusterName, myLocalId, myClusterName); myClusterName, myLocalId, myClusterName);
res = PQexec(myLocalConn, sqlquery); res = PQexec(myLocalConn, sqlquery);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ {
fprintf(stderr, "PQexec failed: %s\n", PQerrorMessage(myLocalConn)); fprintf(stderr, "PQexec failed: %s\n", PQerrorMessage(myLocalConn));
PQclear(res); PQclear(res);
PQfinish(myLocalConn); PQfinish(myLocalConn);
PQfinish(primaryConn); PQfinish(primaryConn);
exit(1); exit(1);
} }
/* /*
* If there isn't any results then we have not configured this node yet * If there isn't any results then we have not configured this node yet
@@ -388,16 +394,17 @@ checkNodeConfiguration(char *conninfo)
*/ */
if (PQntuples(res) == 0) if (PQntuples(res) == 0)
{ {
PQclear(res); PQclear(res);
/* Adding the node */ /* Adding the node */
sprintf(sqlquery, "INSERT INTO repmgr_%s.repl_nodes " sprintf(sqlquery, "INSERT INTO repmgr_%s.repl_nodes "
"VALUES (%d, '%s', '%s')", "VALUES (%d, '%s', '%s')",
myClusterName, myLocalId, myClusterName, conninfo); myClusterName, myLocalId, myClusterName, conninfo);
if (!PQexec(primaryConn, sqlquery)) if (!PQexec(primaryConn, sqlquery))
{ {
fprintf(stderr, "Cannot insert node details, %s\n", fprintf(stderr, "Cannot insert node details, %s\n",
PQerrorMessage(primaryConn)); PQerrorMessage(primaryConn));
PQfinish(myLocalConn); PQfinish(myLocalConn);
PQfinish(primaryConn); PQfinish(primaryConn);
exit(1); exit(1);
@@ -410,30 +417,30 @@ checkNodeConfiguration(char *conninfo)
static unsigned long long int static unsigned long long int
walLocationToBytes(char *wal_location) walLocationToBytes(char *wal_location)
{ {
unsigned int xlogid; unsigned int xlogid;
unsigned int xrecoff; unsigned int xrecoff;
if (sscanf(wal_location, "%X/%X", &xlogid, &xrecoff) != 2) if (sscanf(wal_location, "%X/%X", &xlogid, &xrecoff) != 2)
{ {
fprintf(stderr, "wrong log location format: %s\n", wal_location); fprintf(stderr, "wrong log location format: %s\n", wal_location);
return 0; return 0;
} }
return ((xlogid * 16 * 1024 * 1024 * 255) + xrecoff); return ((xlogid * 16 * 1024 * 1024 * 255) + xrecoff);
} }
static void static void
help(const char *progname) help(const char *progname)
{ {
printf(_("\n%s: Replicator manager daemon \n"), progname); printf(_("\n%s: Replicator manager daemon \n"), progname);
printf(_("Usage:\n")); printf(_("Usage:\n"));
printf(_(" %s [OPTIONS]\n"), progname); printf(_(" %s [OPTIONS]\n"), progname);
printf(_("\nOptions:\n")); printf(_("\nOptions:\n"));
printf(_(" --help show this help, then exit\n")); printf(_(" --help show this help, then exit\n"));
printf(_(" --version output version information, then exit\n")); printf(_(" --version output version information, then exit\n"));
printf(_(" --verbose output verbose activity information\n")); printf(_(" --verbose output verbose activity information\n"));
printf(_(" -f, --config_file=PATH database to connect to\n")); printf(_(" -f, --config_file=PATH database to connect to\n"));
printf(_("\n%s monitors a cluster of servers.\n"), progname); printf(_("\n%s monitors a cluster of servers.\n"), progname);
} }
@@ -442,13 +449,13 @@ help(const char *progname)
static void static void
handle_sigint(SIGNAL_ARGS) handle_sigint(SIGNAL_ARGS)
{ {
CloseConnections(); CloseConnections();
} }
static void static void
setup_cancel_handler(void) setup_cancel_handler(void)
{ {
pqsignal(SIGINT, handle_sigint); pqsignal(SIGINT, handle_sigint);
} }
#endif #endif
@@ -456,13 +463,13 @@ setup_cancel_handler(void)
static void static void
CancelQuery(void) CancelQuery(void)
{ {
char errbuf[256]; char errbuf[256];
PGcancel *pgcancel; PGcancel *pgcancel;
pgcancel = PQgetCancel(primaryConn); pgcancel = PQgetCancel(primaryConn);
if (!pgcancel || PQcancel(pgcancel, errbuf, 256) == 0) if (!pgcancel || PQcancel(pgcancel, errbuf, 256) == 0)
fprintf(stderr, "Can't stop current query: %s", errbuf); fprintf(stderr, "Can't stop current query: %s", errbuf);
PQfreeCancel(pgcancel); PQfreeCancel(pgcancel);
} }