From 78a16d746dcc544624980cae48575df17f010229 Mon Sep 17 00:00:00 2001 From: Ian Barwick Date: Tue, 27 Jun 2017 00:15:29 +0900 Subject: [PATCH] Initial primary node monitoring --- dbutils.c | 12 ++++++++++ dbutils.h | 7 ++++++ repmgrd.c | 69 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 85 insertions(+), 3 deletions(-) diff --git a/dbutils.c b/dbutils.c index 4f81e579..59b844e5 100644 --- a/dbutils.c +++ b/dbutils.c @@ -2177,3 +2177,15 @@ wait_connection_availability(PGconn *conn, long long timeout) return -1; } +/* node availability functions */ + +bool +is_server_available(const char *conninfo) +{ + PGPing status = PQping(conninfo); + + if (status == PQPING_OK) + return true; + + return false; +} diff --git a/dbutils.h b/dbutils.h index 0e3dfc96..03979121 100644 --- a/dbutils.h +++ b/dbutils.h @@ -42,6 +42,9 @@ typedef enum { RECORD_NOT_FOUND } RecordStatus; + + + /* * Struct to store node information */ @@ -227,5 +230,9 @@ RecordStatus get_slot_record(PGconn *conn, char *slot_name, t_replication_slot * bool cancel_query(PGconn *conn, int timeout); int wait_connection_availability(PGconn *conn, long long timeout); +/* node availability functions */ +bool is_server_available(const char *conninfo); + + #endif diff --git a/repmgrd.c b/repmgrd.c index 0cd1e059..575900e2 100644 --- a/repmgrd.c +++ b/repmgrd.c @@ -14,6 +14,13 @@ #define OPT_HELP 1 +typedef enum { + NODE_STATUS_UNKNOWN = -1, + NODE_STATUS_UP, + NODE_STATUS_DOWN +} NodeStatus; + + static char *config_file = NULL; static bool verbose = false; static char *pid_file = NULL; @@ -377,7 +384,7 @@ static void start_monitoring(void) { - log_notice(_("starting monitoring of node %s (ID: %i)"), + log_notice(_("starting monitoring of node \"%s\" (ID: %i)"), local_node_info.node_name, local_node_info.node_id); @@ -405,6 +412,8 @@ start_monitoring(void) static void monitor_streaming_primary(void) { + NodeStatus node_status = NODE_STATUS_UP; + /* Log startup event */ if (startup_event_logged == false) { @@ -416,14 +425,63 @@ monitor_streaming_primary(void) NULL); startup_event_logged = true; - // XXX add more detail - log_notice(_("monitoring cluster master")); + log_notice(_("monitoring cluster primary \"%s\" (node ID: %i)"), + local_node_info.node_name, + local_node_info.node_id); } while (true) { + // cache node list here, refresh at `node_list_refresh_interval` + if (is_server_available(local_node_info.conninfo) == false) + { + if (node_status == NODE_STATUS_UP) + { + int i; + int max_attempts = 30; + + node_status = NODE_STATUS_UNKNOWN; + + log_warning(_("unable to connect to local node")); + + PQfinish(local_conn); + for (i = 0; i < max_attempts; i++) + { + log_info(_("checking state of local node, %i of %i attempts"), i, max_attempts); + if (is_server_available(local_node_info.conninfo) == true) + { + log_notice(_("local node has recovered, reconnecting")); + + local_conn = establish_db_connection(local_node_info.conninfo, true); + + if (PQstatus(local_conn) == CONNECTION_OK) + { + // log reconnect event + node_status = NODE_STATUS_UP; + + goto loop; + } + + PQfinish(local_conn); + log_notice(_("unable to reconnect to local node")); + } + sleep(1); + } + + log_warning(_("unable to reconnect to local node after %i attempts"), max_attempts); + node_status = NODE_STATUS_DOWN; + } + + if (node_status == NODE_STATUS_DOWN) + { + // attempt to find another node from cached list + } + } + + + loop: sleep(1); } } @@ -432,6 +490,11 @@ monitor_streaming_primary(void) static void monitor_streaming_standby(void) { + t_node_info upstream_node_info = T_NODE_INFO_INITIALIZER; + + // check result + (void) get_node_record(local_conn, local_node_info.upstream_node_id, &upstream_node_info); + while (true) { sleep(1);