Initial primary node monitoring

This commit is contained in:
Ian Barwick
2017-06-27 00:15:29 +09:00
parent 7845a1fb47
commit 78a16d746d
3 changed files with 85 additions and 3 deletions

View File

@@ -2177,3 +2177,15 @@ wait_connection_availability(PGconn *conn, long long timeout)
return -1; return -1;
} }
/* node availability functions */
/*
 * is_server_available()
 *
 * Ping the server identified by "conninfo" using PQping() and report
 * whether the ping result was PQPING_OK. Every other ping result is
 * reported as "not available".
 */
bool
is_server_available(const char *conninfo)
{
	return PQping(conninfo) == PQPING_OK;
}

View File

@@ -42,6 +42,9 @@ typedef enum {
RECORD_NOT_FOUND RECORD_NOT_FOUND
} RecordStatus; } RecordStatus;
/* /*
* Struct to store node information * Struct to store node information
*/ */
@@ -227,5 +230,9 @@ RecordStatus get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *
bool cancel_query(PGconn *conn, int timeout); bool cancel_query(PGconn *conn, int timeout);
int wait_connection_availability(PGconn *conn, long long timeout); int wait_connection_availability(PGconn *conn, long long timeout);
/* node availability functions */
bool is_server_available(const char *conninfo);
#endif #endif

View File

@@ -14,6 +14,13 @@
#define OPT_HELP 1 #define OPT_HELP 1
/*
 * Availability state tracked for the node being monitored.
 */
typedef enum
{
	NODE_STATUS_UNKNOWN = -1,	/* connection check failed, still retrying */
	NODE_STATUS_UP = 0,			/* node reachable */
	NODE_STATUS_DOWN = 1		/* reconnection attempts exhausted */
} NodeStatus;
static char *config_file = NULL; static char *config_file = NULL;
static bool verbose = false; static bool verbose = false;
static char *pid_file = NULL; static char *pid_file = NULL;
@@ -377,7 +384,7 @@ static void
start_monitoring(void) start_monitoring(void)
{ {
log_notice(_("starting monitoring of node %s (ID: %i)"), log_notice(_("starting monitoring of node \"%s\" (ID: %i)"),
local_node_info.node_name, local_node_info.node_name,
local_node_info.node_id); local_node_info.node_id);
@@ -405,6 +412,8 @@ start_monitoring(void)
static void static void
monitor_streaming_primary(void) monitor_streaming_primary(void)
{ {
NodeStatus node_status = NODE_STATUS_UP;
/* Log startup event */ /* Log startup event */
if (startup_event_logged == false) if (startup_event_logged == false)
{ {
@@ -416,14 +425,63 @@ monitor_streaming_primary(void)
NULL); NULL);
startup_event_logged = true; startup_event_logged = true;
// XXX add more detail log_notice(_("monitoring cluster primary \"%s\" (node ID: %i)"),
log_notice(_("monitoring cluster master")); local_node_info.node_name,
local_node_info.node_id);
} }
while (true) while (true)
{ {
// cache node list here, refresh at `node_list_refresh_interval`
if (is_server_available(local_node_info.conninfo) == false)
{
if (node_status == NODE_STATUS_UP)
{
int i;
int max_attempts = 30;
node_status = NODE_STATUS_UNKNOWN;
log_warning(_("unable to connect to local node"));
PQfinish(local_conn);
for (i = 0; i < max_attempts; i++)
{
log_info(_("checking state of local node, %i of %i attempts"), i, max_attempts);
if (is_server_available(local_node_info.conninfo) == true)
{
log_notice(_("local node has recovered, reconnecting"));
local_conn = establish_db_connection(local_node_info.conninfo, true);
if (PQstatus(local_conn) == CONNECTION_OK)
{
// log reconnect event
node_status = NODE_STATUS_UP;
goto loop;
}
PQfinish(local_conn);
log_notice(_("unable to reconnect to local node"));
}
sleep(1);
}
log_warning(_("unable to reconnect to local node after %i attempts"), max_attempts);
node_status = NODE_STATUS_DOWN;
}
if (node_status == NODE_STATUS_DOWN)
{
// attempt to find another node from cached list
}
}
loop:
sleep(1); sleep(1);
} }
} }
@@ -432,6 +490,11 @@ monitor_streaming_primary(void)
static void static void
monitor_streaming_standby(void) monitor_streaming_standby(void)
{ {
t_node_info upstream_node_info = T_NODE_INFO_INITIALIZER;
// check result
(void) get_node_record(local_conn, local_node_info.upstream_node_id, &upstream_node_info);
while (true) while (true)
{ {
sleep(1); sleep(1);