Remove "XL*" macros and rationalize XLogRecPtr handling

As of 9.3, XLogRecPtr is a uint64, which makes it much easier to
handle, so we can retire the legacy macros and work directly
with the values.
This commit is contained in:
Ian Barwick
2015-01-12 17:18:57 +09:00
parent 41a5274b44
commit 93d5d7905f

135
repmgrd.c
View File

@@ -39,16 +39,6 @@
#include "access/xlogdefs.h" #include "access/xlogdefs.h"
#include "libpq/pqsignal.h" #include "libpq/pqsignal.h"
#define XLAssign(a, b) \
a = b
#define XLAssignValue(a, xlogid, xrecoff) \
a = xrecoff
#define XLByteLT(a, b) \
(a < b)
/* /*
* Struct to keep info about the nodes, used in the voting process in * Struct to keep info about the nodes, used in the voting process in
@@ -99,7 +89,7 @@ static void update_shared_memory(char *last_wal_standby_applied);
static void update_registration(void); static void update_registration(void);
static void do_failover(void); static void do_failover(void);
static unsigned long long int wal_location_to_bytes(char *wal_location); static XLogRecPtr lsn_to_xlogrecptr(char *lsn, bool *format_ok);
/* /*
* Flag to mark SIGHUP. Whenever the main loop comes around it * Flag to mark SIGHUP. Whenever the main loop comes around it
@@ -614,9 +604,9 @@ standby_monitor(void)
char last_wal_standby_applied_timestamp[MAXLEN]; char last_wal_standby_applied_timestamp[MAXLEN];
char sqlquery[QUERY_STR_LEN]; char sqlquery[QUERY_STR_LEN];
unsigned long long int lsn_primary; XLogRecPtr lsn_primary;
unsigned long long int lsn_standby_received; XLogRecPtr lsn_standby_received;
unsigned long long int lsn_standby_applied; XLogRecPtr lsn_standby_applied;
int connection_retries, int connection_retries,
ret; ret;
@@ -768,9 +758,9 @@ standby_monitor(void)
PQclear(res); PQclear(res);
/* Calculate the lag */ /* Calculate the lag */
lsn_primary = wal_location_to_bytes(last_wal_primary_location); lsn_primary = lsn_to_xlogrecptr(last_wal_primary_location, NULL);
lsn_standby_received = wal_location_to_bytes(last_wal_standby_received); lsn_standby_received = lsn_to_xlogrecptr(last_wal_standby_received, NULL);
lsn_standby_applied = wal_location_to_bytes(last_wal_standby_applied); lsn_standby_applied = lsn_to_xlogrecptr(last_wal_standby_applied, NULL);
/* /*
* Build the SQL to execute on primary * Build the SQL to execute on primary
@@ -779,14 +769,14 @@ standby_monitor(void)
"INSERT INTO %s.repl_monitor " "INSERT INTO %s.repl_monitor "
"VALUES(%d, %d, '%s'::timestamp with time zone, " "VALUES(%d, %d, '%s'::timestamp with time zone, "
"'%s'::timestamp with time zone, '%s', '%s', " "'%s'::timestamp with time zone, '%s', '%s', "
"%lld, %lld) ", "%llu, %llu) ",
get_repmgr_schema_quoted(primary_conn), get_repmgr_schema_quoted(primary_conn),
primary_options.node, local_options.node, monitor_standby_timestamp, primary_options.node, local_options.node, monitor_standby_timestamp,
last_wal_standby_applied_timestamp, last_wal_standby_applied_timestamp,
last_wal_primary_location, last_wal_primary_location,
last_wal_standby_received, last_wal_standby_received,
(lsn_primary - lsn_standby_received), (long long unsigned int)(lsn_primary - lsn_standby_received),
(lsn_standby_received - lsn_standby_applied)); (long long unsigned int)(lsn_standby_received - lsn_standby_applied));
/* /*
* Execute the query asynchronously, but don't check for a result. We will * Execute the query asynchronously, but don't check for a result. We will
@@ -814,9 +804,8 @@ do_failover(void)
int i; int i;
int r; int r;
uint32 uxlogid;
uint32 uxrecoff;
XLogRecPtr xlog_recptr; XLogRecPtr xlog_recptr;
bool lsn_format_ok;
char last_wal_standby_applied[MAXLEN]; char last_wal_standby_applied[MAXLEN];
@@ -873,7 +862,7 @@ do_failover(void)
nodes[i].is_visible = false; nodes[i].is_visible = false;
nodes[i].is_ready = false; nodes[i].is_ready = false;
XLAssignValue(nodes[i].xlog_location, 0, 0); nodes[i].xlog_location = 0;
log_debug(_("%s: node=%d conninfo=\"%s\" witness=%s\n"), log_debug(_("%s: node=%d conninfo=\"%s\" witness=%s\n"),
progname, nodes[i].node_id, nodes[i].conninfo_str, progname, nodes[i].node_id, nodes[i].conninfo_str,
@@ -936,9 +925,6 @@ do_failover(void)
terminate(ERR_FAILOVER_FAIL); terminate(ERR_FAILOVER_FAIL);
} }
uxlogid = 0;
uxrecoff = 0;
sqlquery_snprintf(sqlquery, "SELECT pg_last_xlog_receive_location()"); sqlquery_snprintf(sqlquery, "SELECT pg_last_xlog_receive_location()");
res = PQexec(node_conn, sqlquery); res = PQexec(node_conn, sqlquery);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -951,23 +937,20 @@ do_failover(void)
terminate(ERR_FAILOVER_FAIL); terminate(ERR_FAILOVER_FAIL);
} }
if (sscanf(PQgetvalue(res, 0, 0), "%X/%X", &uxlogid, &uxrecoff) != 2) xlog_recptr = lsn_to_xlogrecptr(PQgetvalue(res, 0, 0), &lsn_format_ok);
log_info(_("could not parse transaction log location \"%s\"\n"),
PQgetvalue(res, 0, 0));
log_debug("XLog position of node %d: log id=%u (%X), offset=%u (%X)\n", log_debug(_("LSN of node %i is: %s\n"), nodes[i].node_id, PQgetvalue(res, 0, 0));
nodes[i].node_id, uxlogid, uxlogid, uxrecoff, uxrecoff);
/* If position is 0/0, error */ /* If position is 0/0, error */
if (uxlogid == 0 && uxrecoff == 0) if(xlog_recptr == InvalidXLogRecPtr)
{ {
PQclear(res); PQclear(res);
PQfinish(node_conn); PQfinish(node_conn);
log_info(_("InvalidXLogRecPtr detected in a standby\n")); log_info(_("InvalidXLogRecPtr detected on standby node %i\n"), nodes[i].node_id);
terminate(ERR_FAILOVER_FAIL); terminate(ERR_FAILOVER_FAIL);
} }
XLAssignValue(nodes[i].xlog_location, uxlogid, uxrecoff); nodes[i].xlog_location = xlog_recptr;
PQclear(res); PQclear(res);
PQfinish(node_conn); PQfinish(node_conn);
@@ -1033,9 +1016,6 @@ do_failover(void)
break; break;
} }
uxlogid = 0;
uxrecoff = 0;
sqlquery_snprintf(sqlquery, sqlquery_snprintf(sqlquery,
"SELECT %s.repmgr_get_last_standby_location()", "SELECT %s.repmgr_get_last_standby_location()",
get_repmgr_schema_quoted(node_conn)); get_repmgr_schema_quoted(node_conn));
@@ -1050,36 +1030,37 @@ do_failover(void)
terminate(ERR_DB_QUERY); terminate(ERR_DB_QUERY);
} }
if (sscanf(PQgetvalue(res, 0, 0), "%X/%X", &uxlogid, &uxrecoff) != 2) xlog_recptr = lsn_to_xlogrecptr(PQgetvalue(res, 0, 0), &lsn_format_ok);
{
log_info(_("could not parse transaction log location \"%s\"\n"),
PQgetvalue(res, 0, 0));
/* we can't do anything but fail at this point... */ /* If position is 0/0, check for format error, otherwise continue loop */
if (*PQgetvalue(res, 0, 0) == '\0') if(xlog_recptr == InvalidXLogRecPtr)
{
/* Unable to parse value returned by `repmgr_get_last_standby_location()` */
if(lsn_format_ok == false)
{ {
log_crit("Whoops, seems as if shared_preload_libraries=repmgr_funcs is not set!\n"); if(*PQgetvalue(res, 0, 0) == '\0')
exit(ERR_BAD_CONFIG); {
log_crit("Whoops, seems as if shared_preload_libraries=repmgr_funcs is not set!\n");
exit(ERR_BAD_CONFIG);
}
log_warning(_("Unable to parse LSN \"%s\"\n"),
PQgetvalue(res, 0, 0));
} }
PQclear(res);
PQfinish(node_conn);
continue;
} }
if (nodes[i].xlog_location < xlog_recptr)
{
nodes[i].xlog_location = xlog_recptr;
}
log_debug(_("LSN of node %i is: %s\n"), nodes[i].node_id, PQgetvalue(res, 0, 0));
PQclear(res); PQclear(res);
PQfinish(node_conn); PQfinish(node_conn);
/* If position is 0/0, keep checking */
if (uxlogid == 0 && uxrecoff == 0)
continue;
XLAssignValue(xlog_recptr, uxlogid, uxrecoff);
if (XLByteLT(nodes[i].xlog_location, xlog_recptr))
{
XLAssignValue(nodes[i].xlog_location, uxlogid, uxrecoff);
}
log_debug("Last XLog position of node %d: log id=%u (%X), offset=%u (%X)\n",
nodes[i].node_id, uxlogid, uxlogid,
uxrecoff, uxrecoff);
ready_nodes++; ready_nodes++;
nodes[i].is_ready = true; nodes[i].is_ready = true;
@@ -1109,23 +1090,21 @@ do_failover(void)
* one * one
*/ */
best_candidate.node_id = nodes[i].node_id; best_candidate.node_id = nodes[i].node_id;
XLAssign(best_candidate.xlog_location, nodes[i].xlog_location); best_candidate.xlog_location = nodes[i].xlog_location;
best_candidate.is_ready = nodes[i].is_ready; best_candidate.is_ready = nodes[i].is_ready;
best_candidate.is_witness = nodes[i].is_witness; best_candidate.is_witness = nodes[i].is_witness;
find_best = true; find_best = true;
} }
/* we use the macros provided by xlogdefs.h to compare XLogRecPtr */
/* /*
* Nodes are retrieved ordered by priority, so if the current best * Nodes are retrieved ordered by priority, so if the current best
* candidate is lower than the next node's wal location then assign * candidate is lower than the next node's wal location then assign
* next node as the new best candidate. * next node as the new best candidate.
*/ */
if (XLByteLT(best_candidate.xlog_location, nodes[i].xlog_location)) if (best_candidate.xlog_location < nodes[i].xlog_location)
{ {
best_candidate.node_id = nodes[i].node_id; best_candidate.node_id = nodes[i].node_id;
XLAssign(best_candidate.xlog_location, nodes[i].xlog_location); best_candidate.xlog_location = nodes[i].xlog_location;
best_candidate.is_ready = nodes[i].is_ready; best_candidate.is_ready = nodes[i].is_ready;
best_candidate.is_witness = nodes[i].is_witness; best_candidate.is_witness = nodes[i].is_witness;
} }
@@ -1360,20 +1339,32 @@ check_node_configuration(void)
} }
static unsigned long long int /*
wal_location_to_bytes(char *wal_location) * lsn_to_xlogrecptr()
*
* Convert an LSN represented as a string to an XLogRecPtr;
* optionally set a flag to indicated the provided string
* could not be parsed
*/
static XLogRecPtr
lsn_to_xlogrecptr(char *lsn, bool *format_ok)
{ {
unsigned int xlogid; uint32 xlogid;
unsigned int xrecoff; uint32 xrecoff;
if (sscanf(wal_location, "%X/%X", &xlogid, &xrecoff) != 2) if (sscanf(lsn, "%X/%X", &xlogid, &xrecoff) != 2)
{ {
log_err(_("wrong log location format: %s\n"), wal_location); if(format_ok != NULL)
*format_ok = false;
log_err(_("wrong log location format: %s\n"), lsn);
return 0; return 0;
} }
return (((long long) xlogid * 16 * 1024 * 1024 * 255) + xrecoff);
}
if(format_ok != NULL)
*format_ok = true;
return (((XLogRecPtr) xlogid * 16 * 1024 * 1024 * 255) + xrecoff);
}
void void
usage(void) usage(void)