Dropping phase2 from repmgr, it now is an independent tool

called pg_blender
This commit is contained in:
postgres
2010-09-13 10:58:39 -05:00
parent 9f61ea6c5c
commit 7ad17f7f48
5 changed files with 4 additions and 215 deletions

View File

@@ -1,47 +0,0 @@
Phase 2
=======
Idea is that we keep a transaction running on primary with a snapshot
that prevents VACUUM from ever removing records that a query on the
standby can see. So there are never any query conflicts.
Make two connections to primary, P1 and P2. One to Standby, S.
P1:
BEGIN;
SELECT txid_current(); INTO Pn->xid
SELECT pg_sleep(10000000000);
P2:
BEGIN;
SELECT txid_current(); INTO Pn->xid
SELECT pg_sleep(10000000000);
Then every 1sec
S: GetOldestXmin() INTO S->xmin
/*
* If both P1 and P2 are older than oldest xmin on S
* then we cancel the oldest of P1 and P2, and
* start a new xid on that channel. So that P1 and P2
* slowly step forwards, as queries complete on the
* standby.
*/
if (TransactionIdPrecedes(P1->xid, S->xmin) &&
TransactionIdPrecedes(P2->xid, S->xmin))
{
if (TransactionIdPrecedes(P1->xid, P2->xid))
cancel_and_rerun(1)
else
cancel_and_rerun(2)
)
cancel_and_rerun()
{
cancel Pn
BEGIN;
SELECT txid_current(); INTO Pn->xid
SELECT pg_sleep(10000000000);
}

156
main.c
View File

@@ -12,7 +12,6 @@
#include <unistd.h>
#include "repmgr.h"
#include "access/transam.h"
char myClusterName[MAXLEN];
@@ -36,7 +35,6 @@ void getLocalMonitoredInfo(char *currTimestamp, char *xlogLocation,
void MonitorCheck(void);
void MonitorExecute(void);
TransactionId startSleepingTransaction(PGconn * conn, bool stop_current);
int
@@ -199,92 +197,13 @@ getLocalMonitoredInfo(char *currTimestamp, char *xlogLocation, char *xlogTimesta
void
MonitorCheck(void) {
PGconn *p1;
PGconn *p2;
TransactionId p1_xid;
TransactionId p2_xid;
TransactionId local_xmin;
int cicles = 0;
/*
* We are trying to avoid cleanup on primary for records that are still
* visible on standby
*/
p1 = establishDBConnection(primaryConninfo, false);
p2 = establishDBConnection(primaryConninfo, false);
p1_xid = startSleepingTransaction(p1, false);
if (p1_xid == InvalidTransactionId)
{
PQfinish(myLocalConn);
PQfinish(primaryConn);
exit(1);
}
p2_xid = startSleepingTransaction(p2, false);
if (p2_xid == InvalidTransactionId)
{
PQfinish(p1);
PQfinish(myLocalConn);
PQfinish(primaryConn);
exit(1);
}
/*
* We are using two long running transactions on primary to avoid
* cleanup of records that are still visible on this standby.
* Every second check if we can let advance the cleanup to avoid
* bloat on primary.
*
* Every 3 cicles (around 3 seconds), insert monitor info
/*
* Every 3 seconds, insert monitor info
*/
for (;;)
{
PGresult *res;
res = PQexec(myLocalConn, "SELECT get_oldest_xmin()");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, "PQexec failed: %s", PQerrorMessage(myLocalConn));
PQclear(res);
}
local_xmin = atol(PQgetvalue(res, 0, 0));
PQclear(res);
if (TransactionIdPrecedes(p1_xid, local_xmin) &&
TransactionIdPrecedes(p2_xid, local_xmin))
{
if (TransactionIdPrecedes(p1_xid, p2_xid))
{
p1_xid = startSleepingTransaction(p1, true);
if (p1_xid == InvalidTransactionId)
{
PQfinish(p1);
PQfinish(p2);
PQfinish(myLocalConn);
PQfinish(primaryConn);
exit(1);
}
}
else
{
p2_xid = startSleepingTransaction(p2, true);
if (p2_xid == InvalidTransactionId)
{
PQfinish(p1);
PQfinish(p2);
PQfinish(myLocalConn);
PQfinish(primaryConn);
exit(1);
}
}
}
PQclear(res);
if (cicles++ >= 3)
MonitorExecute();
sleep(1);
MonitorExecute();
sleep(3);
}
}
@@ -407,70 +326,3 @@ checkNodeConfiguration(char *conninfo)
}
PQclear(res);
}
TransactionId
startSleepingTransaction(PGconn *conn, bool stop_current) {
TransactionId txid;
PGresult *res;
/*
* if stop_current is set we cancel any query that is currently
* executing on this connection
*/
if (stop_current)
{
char errbuf[256];
if (PQcancel(PQgetCancel(conn), errbuf, 256) == 0)
fprintf(stderr, "Can't stop current query: %s", errbuf);
if (PQisBusy(conn) == 1)
return InvalidTransactionId;
else
PQexec(conn, "ROLLBACK");
}
if (!PQexec(conn, "BEGIN"))
{
fprintf(stderr, "Can't start a transaction on primary. Error: %s",
PQerrorMessage(conn));
return InvalidTransactionId;
}
res = PQexec(conn, "SELECT txid_current()");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, "PQexec failed: %s", PQerrorMessage(conn));
PQclear(res);
return InvalidTransactionId;
}
txid = atol(PQgetvalue(res, 0, 0));
PQclear(res);
/* Let this transaction sleep */
PQsendQuery(conn, "SELECT pg_sleep(10000000000)");
return txid;
}
/*
* TransactionIdPrecedes --- is id1 logically < id2?
* This function was copied from src/backend/access/transam/transam.c
*/
bool
TransactionIdPrecedes(TransactionId id1, TransactionId id2)
{
/*
* If either ID is a permanent XID then we can just do unsigned
* comparison. If both are normal, do a modulo-2^31 comparison.
*/
int32 diff;
if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
return (id1 < id2);
diff = (int32) (id1 - id2);
return (diff < 0);
}

View File

@@ -8,12 +8,10 @@
#include "postgres.h"
#include "fmgr.h"
#include "access/xlog.h"
#include "storage/procarray.h"
PG_MODULE_MAGIC;
Datum last_xlog_replay_timestamp(PG_FUNCTION_ARGS);
Datum oldest_xmin(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(last_xlog_replay_timestamp);
@@ -31,12 +29,3 @@ bool fromSource;
PG_RETURN_TIMESTAMPTZ(rTime);
}
}
PG_FUNCTION_INFO_V1(oldest_xmin);
Datum
oldest_xmin(PG_FUNCTION_ARGS)
{
PG_RETURN_INT64(GetOldestXmin(false, false));
}

View File

@@ -7,7 +7,3 @@
CREATE FUNCTION get_last_xlog_replay_timestamp() RETURNS timestamp with time zone
AS 'MODULE_PATHNAME', 'last_xlog_replay_timestamp'
LANGUAGE C STRICT;
CREATE FUNCTION get_oldest_xmin() RETURNS bigint
AS 'MODULE_PATHNAME', 'oldest_xmin'
LANGUAGE C STRICT;

View File

@@ -1,2 +1 @@
DROP FUNCTION get_last_xlog_replay_timestamp();
DROP FUNCTION get_oldest_xmin();