mirror of
https://github.com/EnterpriseDB/repmgr.git
synced 2026-03-22 22:56:29 +00:00
Since b5934bfd6071 in postgresql.git the flag `-Wshadow=compatible-local` is activated. This commit fixes any duplicated declaration made in the same function. References: HL-40
6229 lines
141 KiB
C
6229 lines
141 KiB
C
/*
|
|
* dbutils.c - Database connection/management functions
|
|
*
|
|
* Copyright (c) EnterpriseDB Corporation, 2010-2021
|
|
*
|
|
*
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#include <unistd.h>
|
|
#include <time.h>
|
|
#include <sys/time.h>
|
|
#include <sys/stat.h>
|
|
#include <dirent.h>
|
|
#include <arpa/inet.h>
|
|
|
|
#include "repmgr.h"
|
|
#include "dbutils.h"
|
|
#include "controldata.h"
|
|
#include "dirutil.h"
|
|
|
|
#define NODE_RECORD_PARAM_COUNT 11
|
|
|
|
|
|
static void log_db_error(PGconn *conn, const char *query_text, const char *fmt,...)
|
|
__attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 4)));
|
|
|
|
static bool _is_server_available(const char *conninfo, bool quiet);
|
|
|
|
static PGconn *_establish_db_connection(const char *conninfo,
|
|
const bool exit_on_error,
|
|
const bool log_notice,
|
|
const bool verbose_only);
|
|
|
|
static PGconn * _establish_replication_connection_from_params(PGconn *conn, const char *conninfo, const char *repluser);
|
|
|
|
static PGconn *_get_primary_connection(PGconn *standby_conn, int *primary_id, char *primary_conninfo_out, bool quiet);
|
|
|
|
static bool _set_config(PGconn *conn, const char *config_param, const char *sqlquery);
|
|
static bool _get_pg_setting(PGconn *conn, const char *setting, char *str_output, bool *bool_output, int *int_output);
|
|
|
|
static RecordStatus _get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info, bool init_defaults);
|
|
static void _populate_node_record(PGresult *res, t_node_info *node_info, int row, bool init_defaults);
|
|
|
|
static void _populate_node_records(PGresult *res, NodeInfoList *node_list);
|
|
|
|
static bool _create_update_node_record(PGconn *conn, char *action, t_node_info *node_info);
|
|
|
|
static ReplSlotStatus _verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_msg);
|
|
|
|
static bool _create_event(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info, bool send_notification);
|
|
|
|
static NodeAttached _is_downstream_node_attached(PGconn *conn, char *node_name, char **node_state, bool quiet);
|
|
|
|
/*
|
|
* This provides a standardized way of logging database errors. Note
|
|
* that the provided PGconn can be a normal or a replication connection;
|
|
* no attempt is made to write to the database, only to report the output
|
|
* of PQerrorMessage().
|
|
*/
|
|
void
|
|
log_db_error(PGconn *conn, const char *query_text, const char *fmt,...)
|
|
{
|
|
va_list ap;
|
|
char buf[MAXLEN];
|
|
int retval;
|
|
|
|
va_start(ap, fmt);
|
|
retval = vsnprintf(buf, MAXLEN, fmt, ap);
|
|
va_end(ap);
|
|
|
|
if (retval < MAXLEN)
|
|
log_error("%s", buf);
|
|
|
|
if (conn != NULL)
|
|
{
|
|
log_detail("\n%s", PQerrorMessage(conn));
|
|
}
|
|
|
|
if (query_text != NULL)
|
|
{
|
|
log_detail("query text is:\n%s", query_text);
|
|
}
|
|
}
|
|
|
|
/* ================= */
|
|
/* utility functions */
|
|
/* ================= */
|
|
|
|
XLogRecPtr
|
|
parse_lsn(const char *str)
|
|
{
|
|
XLogRecPtr ptr = InvalidXLogRecPtr;
|
|
uint32 high,
|
|
low;
|
|
|
|
if (sscanf(str, "%x/%x", &high, &low) == 2)
|
|
ptr = (((XLogRecPtr) high) << 32) + (XLogRecPtr) low;
|
|
|
|
return ptr;
|
|
}
|
|
|
|
|
|
/* ==================== */
|
|
/* Connection functions */
|
|
/* ==================== */
|
|
|
|
/*
|
|
* _establish_db_connection()
|
|
*
|
|
* Connect to a database using a conninfo string.
|
|
*
|
|
* NOTE: *do not* use this for replication connections; instead use:
|
|
* establish_db_connection_by_params()
|
|
*/
|
|
|
|
static PGconn *
|
|
_establish_db_connection(const char *conninfo, const bool exit_on_error, const bool log_notice, const bool verbose_only)
|
|
{
|
|
PGconn *conn = NULL;
|
|
char *connection_string = NULL;
|
|
char *errmsg = NULL;
|
|
|
|
t_conninfo_param_list conninfo_params = T_CONNINFO_PARAM_LIST_INITIALIZER;
|
|
bool is_replication_connection = false;
|
|
bool parse_success = false;
|
|
|
|
initialize_conninfo_params(&conninfo_params, false);
|
|
|
|
parse_success = parse_conninfo_string(conninfo, &conninfo_params, &errmsg, false);
|
|
|
|
if (parse_success == false)
|
|
{
|
|
log_error(_("unable to parse provided conninfo string \"%s\""), conninfo);
|
|
log_detail("%s", errmsg);
|
|
free_conninfo_params(&conninfo_params);
|
|
return NULL;
|
|
}
|
|
|
|
/* set some default values if not explicitly provided */
|
|
param_set_ine(&conninfo_params, "connect_timeout", "2");
|
|
param_set_ine(&conninfo_params, "fallback_application_name", "repmgr");
|
|
|
|
if (param_get(&conninfo_params, "replication") != NULL)
|
|
is_replication_connection = true;
|
|
|
|
/* use a secure search_path */
|
|
param_set(&conninfo_params, "options", "-csearch_path=");
|
|
|
|
connection_string = param_list_to_string(&conninfo_params);
|
|
|
|
log_debug(_("connecting to: \"%s\""), connection_string);
|
|
|
|
conn = PQconnectdb(connection_string);
|
|
|
|
/* Check to see that the backend connection was successfully made */
|
|
if ((PQstatus(conn) != CONNECTION_OK))
|
|
{
|
|
bool emit_log = true;
|
|
|
|
if (verbose_only == true && verbose_logging == false)
|
|
emit_log = false;
|
|
|
|
if (emit_log)
|
|
{
|
|
if (log_notice)
|
|
{
|
|
log_notice(_("connection to database failed"));
|
|
log_detail("\n%s", PQerrorMessage(conn));
|
|
}
|
|
else
|
|
{
|
|
log_error(_("connection to database failed"));
|
|
log_detail("\n%s", PQerrorMessage(conn));
|
|
}
|
|
log_detail(_("attempted to connect using:\n %s"),
|
|
connection_string);
|
|
}
|
|
|
|
if (exit_on_error)
|
|
{
|
|
PQfinish(conn);
|
|
free_conninfo_params(&conninfo_params);
|
|
exit(ERR_DB_CONN);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* set "synchronous_commit" to "local" in case synchronous replication is
|
|
* in use
|
|
*
|
|
* XXX set this explicitly before any write operations
|
|
*/
|
|
|
|
else if (is_replication_connection == false &&
|
|
set_config(conn, "synchronous_commit", "local") == false)
|
|
{
|
|
if (exit_on_error)
|
|
{
|
|
PQfinish(conn);
|
|
free_conninfo_params(&conninfo_params);
|
|
exit(ERR_DB_CONN);
|
|
}
|
|
}
|
|
|
|
pfree(connection_string);
|
|
free_conninfo_params(&conninfo_params);
|
|
|
|
return conn;
|
|
}
|
|
|
|
|
|
/*
|
|
* Establish a database connection, optionally exit on error
|
|
*/
|
|
PGconn *
|
|
establish_db_connection(const char *conninfo, const bool exit_on_error)
|
|
{
|
|
return _establish_db_connection(conninfo, exit_on_error, false, false);
|
|
}
|
|
|
|
/*
|
|
* Attempt to establish a database connection, never exit on error, only
|
|
* output error messages if --verbose option used
|
|
*/
|
|
PGconn *
|
|
establish_db_connection_quiet(const char *conninfo)
|
|
{
|
|
return _establish_db_connection(conninfo, false, false, true);
|
|
}
|
|
|
|
|
|
PGconn *
|
|
establish_db_connection_with_replacement_param(const char *conninfo,
|
|
const char *param,
|
|
const char *value,
|
|
const bool exit_on_error)
|
|
{
|
|
t_conninfo_param_list node_conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
|
|
char *errmsg = NULL;
|
|
bool parse_success = false;
|
|
PGconn *conn = NULL;
|
|
|
|
initialize_conninfo_params(&node_conninfo, false);
|
|
|
|
parse_success = parse_conninfo_string(conninfo,
|
|
&node_conninfo,
|
|
&errmsg, false);
|
|
|
|
if (parse_success == false)
|
|
{
|
|
log_error(_("unable to parse conninfo string \"%s\" for local node"),
|
|
conninfo);
|
|
log_detail("%s", errmsg);
|
|
|
|
if (exit_on_error == true)
|
|
exit(ERR_BAD_CONFIG);
|
|
|
|
return NULL;
|
|
}
|
|
|
|
param_set(&node_conninfo,
|
|
param,
|
|
value);
|
|
|
|
conn = establish_db_connection_by_params(&node_conninfo, exit_on_error);
|
|
|
|
free_conninfo_params(&node_conninfo);
|
|
|
|
return conn;
|
|
}
|
|
|
|
PGconn *
|
|
establish_primary_db_connection(PGconn *conn,
|
|
const bool exit_on_error)
|
|
{
|
|
t_node_info primary_node_info = T_NODE_INFO_INITIALIZER;
|
|
bool primary_record_found = get_primary_node_record(conn, &primary_node_info);
|
|
|
|
if (primary_record_found == false)
|
|
{
|
|
return NULL;
|
|
}
|
|
|
|
return establish_db_connection(primary_node_info.conninfo,
|
|
exit_on_error);
|
|
}
|
|
|
|
|
|
PGconn *
|
|
establish_db_connection_by_params(t_conninfo_param_list *param_list,
|
|
const bool exit_on_error)
|
|
{
|
|
PGconn *conn = NULL;
|
|
|
|
/* set some default values if not explicitly provided */
|
|
param_set_ine(param_list, "connect_timeout", "2");
|
|
param_set_ine(param_list, "fallback_application_name", "repmgr");
|
|
|
|
/* use a secure search_path */
|
|
param_set(param_list, "options", "-csearch_path=");
|
|
|
|
/* Connect to the database using the provided parameters */
|
|
conn = PQconnectdbParams((const char **) param_list->keywords, (const char **) param_list->values, true);
|
|
|
|
/* Check to see that the backend connection was successfully made */
|
|
if ((PQstatus(conn) != CONNECTION_OK))
|
|
{
|
|
log_error(_("connection to database failed"));
|
|
log_detail("\n%s", PQerrorMessage(conn));
|
|
|
|
if (exit_on_error)
|
|
{
|
|
PQfinish(conn);
|
|
exit(ERR_DB_CONN);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
bool is_replication_connection = false;
|
|
int i;
|
|
|
|
/*
|
|
* set "synchronous_commit" to "local" in case synchronous replication
|
|
* is in use (provided this is not a replication connection)
|
|
*/
|
|
|
|
for (i = 0; param_list->keywords[i]; i++)
|
|
{
|
|
if (strcmp(param_list->keywords[i], "replication") == 0)
|
|
is_replication_connection = true;
|
|
}
|
|
|
|
if (is_replication_connection == false && set_config(conn, "synchronous_commit", "local") == false)
|
|
{
|
|
if (exit_on_error)
|
|
{
|
|
PQfinish(conn);
|
|
exit(ERR_DB_CONN);
|
|
}
|
|
}
|
|
}
|
|
|
|
return conn;
|
|
}
|
|
|
|
|
|
/*
|
|
* Given an existing active connection and the name of a replication
|
|
* user, extract the connection parameters from that connection and
|
|
* attempt to return a replication connection.
|
|
*/
|
|
PGconn *
|
|
establish_replication_connection_from_conn(PGconn *conn, const char *repluser)
|
|
{
|
|
return _establish_replication_connection_from_params(conn, NULL, repluser);
|
|
}
|
|
|
|
|
|
/*
 * establish_replication_connection_from_conninfo()
 *
 * Given a conninfo string and the name of a replication user, attempt to
 * return a replication connection using the parameters from that string.
 */
PGconn *
establish_replication_connection_from_conninfo(const char *conninfo, const char *repluser)
{
	/* no existing connection: parameters come from the conninfo string */
	PGconn	   *no_conn = NULL;

	return _establish_replication_connection_from_params(no_conn, conninfo, repluser);
}
|
|
|
|
|
|
/*
 * _establish_replication_connection_from_params()
 *
 * Build a replication connection for user "repluser". The base connection
 * parameters are taken from "conn" if provided, otherwise from the
 * "conninfo" string; "user", "replication" and "dbname" are then
 * overridden.
 *
 * Returns NULL if "conninfo" is used and cannot be parsed; in that case no
 * connection attempt is made, as it would otherwise be made with default
 * host settings rather than the intended ones. Otherwise returns the result
 * of establish_db_connection_by_params(), whose status the caller must
 * check.
 */
static PGconn *
_establish_replication_connection_from_params(PGconn *conn, const char *conninfo, const char *repluser)
{
	t_conninfo_param_list repl_conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
	PGconn	   *repl_conn = NULL;

	initialize_conninfo_params(&repl_conninfo, false);

	if (conn != NULL)
	{
		conn_to_param_list(conn, &repl_conninfo);
	}
	else if (conninfo != NULL)
	{
		char	   *errmsg = NULL;

		if (parse_conninfo_string(conninfo, &repl_conninfo, &errmsg, false) == false)
		{
			log_error(_("unable to parse provided conninfo string \"%s\""), conninfo);

			/* the message is allocated by libpq and must be released by us */
			if (errmsg != NULL)
			{
				log_detail("%s", errmsg);
				PQfreemem(errmsg);
			}

			free_conninfo_params(&repl_conninfo);
			return NULL;
		}
	}

	/* Set the provided replication user */
	param_set(&repl_conninfo, "user", repluser);
	param_set(&repl_conninfo, "replication", "1");
	param_set(&repl_conninfo, "dbname", "replication");

	repl_conn = establish_db_connection_by_params(&repl_conninfo, false);
	free_conninfo_params(&repl_conninfo);

	return repl_conn;
}
|
|
|
|
|
|
/*
 * get_primary_connection()
 *
 * Wrapper around _get_primary_connection() with its "quiet" flag unset.
 */
PGconn *
get_primary_connection(PGconn *conn,
					   int *primary_id, char *primary_conninfo_out)
{
	const bool	quiet = false;

	return _get_primary_connection(conn, primary_id, primary_conninfo_out, quiet);
}
|
|
|
|
|
|
/*
 * get_primary_connection_quiet()
 *
 * Wrapper around _get_primary_connection() with its "quiet" flag set.
 */
PGconn *
get_primary_connection_quiet(PGconn *conn,
							 int *primary_id, char *primary_conninfo_out)
{
	const bool	quiet = true;

	return _get_primary_connection(conn, primary_id, primary_conninfo_out, quiet);
}
|
|
|
|
PGconn *
|
|
duplicate_connection(PGconn *conn, const char *user, bool replication)
|
|
{
|
|
t_conninfo_param_list conninfo = T_CONNINFO_PARAM_LIST_INITIALIZER;
|
|
PGconn *duplicate_conn = NULL;
|
|
|
|
initialize_conninfo_params(&conninfo, false);
|
|
conn_to_param_list(conn, &conninfo);
|
|
|
|
if (user != NULL)
|
|
param_set(&conninfo, "user", user);
|
|
|
|
if (replication == true)
|
|
param_set(&conninfo, "replication", "1");
|
|
|
|
duplicate_conn = establish_db_connection_by_params(&conninfo, false);
|
|
|
|
free_conninfo_params(&conninfo);
|
|
|
|
return duplicate_conn;
|
|
}
|
|
|
|
|
|
|
|
void
|
|
close_connection(PGconn **conn)
|
|
{
|
|
if (*conn == NULL)
|
|
return;
|
|
|
|
PQfinish(*conn);
|
|
|
|
*conn = NULL;
|
|
}
|
|
|
|
|
|
/* =============================== */
|
|
/* conninfo manipulation functions */
|
|
/* =============================== */
|
|
|
|
|
|
/*
|
|
* get_conninfo_value()
|
|
*
|
|
* Extract the value represented by 'keyword' in 'conninfo' and copy
|
|
* it to the 'output' buffer.
|
|
*
|
|
* Returns true on success, or false on failure (conninfo string could
|
|
* not be parsed, or provided keyword not found).
|
|
*/
|
|
|
|
bool
|
|
get_conninfo_value(const char *conninfo, const char *keyword, char *output)
|
|
{
|
|
PQconninfoOption *conninfo_options = NULL;
|
|
PQconninfoOption *conninfo_option = NULL;
|
|
|
|
conninfo_options = PQconninfoParse(conninfo, NULL);
|
|
|
|
if (conninfo_options == NULL)
|
|
{
|
|
log_error(_("unable to parse provided conninfo string \"%s\""), conninfo);
|
|
return false;
|
|
}
|
|
|
|
for (conninfo_option = conninfo_options; conninfo_option->keyword != NULL; conninfo_option++)
|
|
{
|
|
if (strcmp(conninfo_option->keyword, keyword) == 0)
|
|
{
|
|
if (conninfo_option->val != NULL && conninfo_option->val[0] != '\0')
|
|
{
|
|
strncpy(output, conninfo_option->val, MAXLEN);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
PQconninfoFree(conninfo_options);
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
/*
|
|
* Get a default conninfo value for the provided parameter, and copy
|
|
* it to the 'output' buffer.
|
|
*
|
|
* Returns true on success, or false on failure (provided keyword not found).
|
|
*
|
|
*/
|
|
bool
|
|
get_conninfo_default_value(const char *param, char *output, int maxlen)
|
|
{
|
|
PQconninfoOption *defs = NULL;
|
|
PQconninfoOption *def = NULL;
|
|
bool found = false;
|
|
|
|
defs = PQconndefaults();
|
|
|
|
for (def = defs; def->keyword; def++)
|
|
{
|
|
if (strncmp(def->keyword, param, maxlen) == 0)
|
|
{
|
|
strncpy(output, def->val, maxlen);
|
|
found = true;
|
|
}
|
|
}
|
|
|
|
PQconninfoFree(defs);
|
|
|
|
return found;
|
|
}
|
|
|
|
|
|
void
|
|
initialize_conninfo_params(t_conninfo_param_list *param_list, bool set_defaults)
|
|
{
|
|
PQconninfoOption *defs = NULL;
|
|
PQconninfoOption *def = NULL;
|
|
int c;
|
|
|
|
defs = PQconndefaults();
|
|
param_list->size = 0;
|
|
|
|
/* Count maximum number of parameters */
|
|
for (def = defs; def->keyword; def++)
|
|
param_list->size++;
|
|
|
|
/* Initialize our internal parameter list */
|
|
param_list->keywords = pg_malloc0(sizeof(char *) * (param_list->size + 1));
|
|
param_list->values = pg_malloc0(sizeof(char *) * (param_list->size + 1));
|
|
|
|
for (c = 0; c < param_list->size; c++)
|
|
{
|
|
param_list->keywords[c] = NULL;
|
|
param_list->values[c] = NULL;
|
|
}
|
|
|
|
if (set_defaults == true)
|
|
{
|
|
/* Pre-set any defaults */
|
|
|
|
for (def = defs; def->keyword; def++)
|
|
{
|
|
if (def->val != NULL && def->val[0] != '\0')
|
|
{
|
|
param_set(param_list, def->keyword, def->val);
|
|
}
|
|
}
|
|
}
|
|
|
|
PQconninfoFree(defs);
|
|
}
|
|
|
|
|
|
void
|
|
free_conninfo_params(t_conninfo_param_list *param_list)
|
|
{
|
|
int c;
|
|
|
|
for (c = 0; c < param_list->size; c++)
|
|
{
|
|
if (param_list->keywords != NULL && param_list->keywords[c] != NULL)
|
|
pfree(param_list->keywords[c]);
|
|
|
|
if (param_list->values != NULL && param_list->values[c] != NULL)
|
|
pfree(param_list->values[c]);
|
|
}
|
|
|
|
if (param_list->keywords != NULL)
|
|
pfree(param_list->keywords);
|
|
|
|
if (param_list->values != NULL)
|
|
pfree(param_list->values);
|
|
}
|
|
|
|
|
|
|
|
void
|
|
copy_conninfo_params(t_conninfo_param_list *dest_list, t_conninfo_param_list *source_list)
|
|
{
|
|
int c;
|
|
|
|
for (c = 0; c < source_list->size && source_list->keywords[c] != NULL; c++)
|
|
{
|
|
if (source_list->values[c] != NULL && source_list->values[c][0] != '\0')
|
|
{
|
|
param_set(dest_list, source_list->keywords[c], source_list->values[c]);
|
|
}
|
|
}
|
|
}
|
|
|
|
void
|
|
param_set(t_conninfo_param_list *param_list, const char *param, const char *value)
|
|
{
|
|
int c;
|
|
int value_len = strlen(value) + 1;
|
|
int param_len;
|
|
|
|
/*
|
|
* Scan array to see if the parameter is already set - if not, replace it
|
|
*/
|
|
for (c = 0; c < param_list->size && param_list->keywords[c] != NULL; c++)
|
|
{
|
|
if (strcmp(param_list->keywords[c], param) == 0)
|
|
{
|
|
if (param_list->values[c] != NULL)
|
|
pfree(param_list->values[c]);
|
|
|
|
param_list->values[c] = pg_malloc0(value_len);
|
|
strncpy(param_list->values[c], value, value_len);
|
|
|
|
return;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Sanity-check that the caller is not trying to overflow the array;
|
|
* in practice this is highly unlikely, and if it ever happens, this means
|
|
* something is highly wrong.
|
|
*/
|
|
Assert(c < param_list->size);
|
|
|
|
/*
|
|
* Parameter not in array - add it and its associated value
|
|
*/
|
|
param_len = strlen(param) + 1;
|
|
|
|
param_list->keywords[c] = pg_malloc0(param_len);
|
|
param_list->values[c] = pg_malloc0(value_len);
|
|
|
|
strncpy(param_list->keywords[c], param, param_len);
|
|
strncpy(param_list->values[c], value, value_len);
|
|
}
|
|
|
|
|
|
/*
|
|
* Like param_set(), but will only set the parameter if it doesn't exist
|
|
*/
|
|
void
|
|
param_set_ine(t_conninfo_param_list *param_list, const char *param, const char *value)
|
|
{
|
|
int c;
|
|
int value_len = strlen(value) + 1;
|
|
int param_len;
|
|
|
|
/*
|
|
* Scan array to see if the parameter is already set - if so, do nothing
|
|
*/
|
|
for (c = 0; c < param_list->size && param_list->keywords[c] != NULL; c++)
|
|
{
|
|
if (strcmp(param_list->keywords[c], param) == 0)
|
|
{
|
|
/* parameter exists, do nothing */
|
|
return;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Sanity-check that the caller is not trying to overflow the array;
|
|
* in practice this is highly unlikely, and if it ever happens, this means
|
|
* something is highly wrong.
|
|
*/
|
|
Assert(c < param_list->size);
|
|
|
|
/*
|
|
* Parameter not in array - add it and its associated value
|
|
*/
|
|
param_len = strlen(param) + 1;
|
|
|
|
param_list->keywords[c] = pg_malloc0(param_len);
|
|
param_list->values[c] = pg_malloc0(value_len);
|
|
|
|
strncpy(param_list->keywords[c], param, param_len);
|
|
strncpy(param_list->values[c], value, value_len);
|
|
}
|
|
|
|
|
|
char *
|
|
param_get(t_conninfo_param_list *param_list, const char *param)
|
|
{
|
|
int c;
|
|
|
|
for (c = 0; c < param_list->size && param_list->keywords[c] != NULL; c++)
|
|
{
|
|
if (strcmp(param_list->keywords[c], param) == 0)
|
|
{
|
|
if (param_list->values[c] != NULL && param_list->values[c][0] != '\0')
|
|
return param_list->values[c];
|
|
else
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
|
|
/*
|
|
* Validate a conninfo string by attempting to parse it.
|
|
*
|
|
* "errmsg": passed to PQconninfoParse(), may be NULL
|
|
*
|
|
* NOTE: PQconninfoParse() verifies the string format and checks for
|
|
* valid options but does not sanity check values.
|
|
*/
|
|
|
|
bool
|
|
validate_conninfo_string(const char *conninfo_str, char **errmsg)
|
|
{
|
|
PQconninfoOption *connOptions = NULL;
|
|
|
|
connOptions = PQconninfoParse(conninfo_str, errmsg);
|
|
|
|
if (connOptions == NULL)
|
|
return false;
|
|
|
|
PQconninfoFree(connOptions);
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
/*
|
|
* Parse a conninfo string into a t_conninfo_param_list
|
|
*
|
|
* See conn_to_param_list() to do the same for a PGconn.
|
|
*
|
|
* "errmsg": passed to PQconninfoParse(), may be NULL
|
|
*
|
|
* "ignore_local_params": ignores those parameters specific
|
|
* to a local installation, i.e. when parsing an upstream
|
|
* node's conninfo string for inclusion into "primary_conninfo",
|
|
* don't copy that node's values
|
|
*/
|
|
bool
|
|
parse_conninfo_string(const char *conninfo_str, t_conninfo_param_list *param_list, char **errmsg, bool ignore_local_params)
|
|
{
|
|
PQconninfoOption *connOptions = NULL;
|
|
PQconninfoOption *option = NULL;
|
|
|
|
connOptions = PQconninfoParse(conninfo_str, errmsg);
|
|
|
|
if (connOptions == NULL)
|
|
return false;
|
|
|
|
for (option = connOptions; option && option->keyword; option++)
|
|
{
|
|
/* Ignore non-set or blank parameter values */
|
|
if (option->val == NULL || option->val[0] == '\0')
|
|
continue;
|
|
|
|
/* Ignore settings specific to the upstream node */
|
|
if (ignore_local_params == true)
|
|
{
|
|
if (strcmp(option->keyword, "application_name") == 0)
|
|
continue;
|
|
if (strcmp(option->keyword, "passfile") == 0)
|
|
continue;
|
|
if (strcmp(option->keyword, "servicefile") == 0)
|
|
continue;
|
|
}
|
|
param_set(param_list, option->keyword, option->val);
|
|
}
|
|
|
|
PQconninfoFree(connOptions);
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
/*
|
|
* Parse a PGconn into a t_conninfo_param_list
|
|
*
|
|
* See parse_conninfo_string() to do the same for a conninfo string
|
|
*
|
|
* NOTE: the current use case for this is to take an active connection,
|
|
* replace the existing username (typically replacing it with the superuser
|
|
* or replication user name), and make a new connection as that user.
|
|
* If the "password" field is set, it will cause any connection made with
|
|
* these parameters to fail (unless of course the password happens to be the
|
|
* same). Therefore we remove the password altogether, and rely on it being
|
|
* available via .pgpass.
|
|
*/
|
|
void
|
|
conn_to_param_list(PGconn *conn, t_conninfo_param_list *param_list)
|
|
{
|
|
PQconninfoOption *connOptions = NULL;
|
|
PQconninfoOption *option = NULL;
|
|
|
|
connOptions = PQconninfo(conn);
|
|
for (option = connOptions; option && option->keyword; option++)
|
|
{
|
|
/* Ignore non-set or blank parameter values */
|
|
if (option->val == NULL || option->val[0] == '\0')
|
|
continue;
|
|
|
|
/* Ignore "password" */
|
|
if (strcmp(option->keyword, "password") == 0)
|
|
continue;
|
|
|
|
param_set(param_list, option->keyword, option->val);
|
|
}
|
|
|
|
PQconninfoFree(connOptions);
|
|
}
|
|
|
|
|
|
/*
|
|
* Converts param list to string; caller must free returned pointer
|
|
*/
|
|
char *
|
|
param_list_to_string(t_conninfo_param_list *param_list)
|
|
{
|
|
int c;
|
|
PQExpBufferData conninfo_buf;
|
|
char *conninfo_str = NULL;
|
|
int len = 0;
|
|
|
|
initPQExpBuffer(&conninfo_buf);
|
|
|
|
for (c = 0; c < param_list->size && param_list->keywords[c] != NULL; c++)
|
|
{
|
|
if (param_list->values[c] != NULL && param_list->values[c][0] != '\0')
|
|
{
|
|
if (c > 0)
|
|
appendPQExpBufferChar(&conninfo_buf, ' ');
|
|
|
|
/* XXX escape value */
|
|
appendPQExpBuffer(&conninfo_buf,
|
|
"%s=%s",
|
|
param_list->keywords[c],
|
|
param_list->values[c]);
|
|
}
|
|
}
|
|
|
|
len = strlen(conninfo_buf.data) + 1;
|
|
conninfo_str = pg_malloc0(len);
|
|
|
|
strncpy(conninfo_str, conninfo_buf.data, len);
|
|
|
|
termPQExpBuffer(&conninfo_buf);
|
|
|
|
return conninfo_str;
|
|
}
|
|
|
|
|
|
/*
|
|
* Run a conninfo string through the parser, and pass it back as a normal
|
|
* conninfo string. This is mainly intended for converting connection URIs
|
|
* to parameter/value conninfo strings.
|
|
*
|
|
* Caller must free returned pointer.
|
|
*/
|
|
|
|
char *
|
|
normalize_conninfo_string(const char *conninfo_str)
|
|
{
|
|
t_conninfo_param_list conninfo_params = T_CONNINFO_PARAM_LIST_INITIALIZER;
|
|
bool parse_success = false;
|
|
char *normalized_string = NULL;
|
|
char *errmsg = NULL;
|
|
|
|
initialize_conninfo_params(&conninfo_params, false);
|
|
|
|
parse_success = parse_conninfo_string(conninfo_str, &conninfo_params, &errmsg, false);
|
|
|
|
if (parse_success == false)
|
|
{
|
|
log_error(_("unable to parse provided conninfo string \"%s\""), conninfo_str);
|
|
log_detail("%s", errmsg);
|
|
free_conninfo_params(&conninfo_params);
|
|
return NULL;
|
|
}
|
|
|
|
|
|
normalized_string = param_list_to_string(&conninfo_params);
|
|
free_conninfo_params(&conninfo_params);
|
|
|
|
return normalized_string;
|
|
}
|
|
|
|
/*
|
|
* check whether the libpq version in use recognizes the "passfile" parameter
|
|
* (should be 9.6 and later)
|
|
*/
|
|
bool
|
|
has_passfile(void)
|
|
{
|
|
PQconninfoOption *defs = PQconndefaults();
|
|
PQconninfoOption *def = NULL;
|
|
bool has_passfile = false;
|
|
|
|
for (def = defs; def->keyword; def++)
|
|
{
|
|
if (strcmp(def->keyword, "passfile") == 0)
|
|
{
|
|
has_passfile = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
PQconninfoFree(defs);
|
|
|
|
return has_passfile;
|
|
}
|
|
|
|
|
|
|
|
/* ===================== */
|
|
/* transaction functions */
|
|
/* ===================== */
|
|
|
|
bool
|
|
begin_transaction(PGconn *conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
|
|
log_verbose(LOG_DEBUG, "begin_transaction()");
|
|
|
|
res = PQexec(conn, "BEGIN");
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_error(_("unable to begin transaction"));
|
|
log_detail("%s", PQerrorMessage(conn));
|
|
|
|
PQclear(res);
|
|
return false;
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
bool
|
|
commit_transaction(PGconn *conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
|
|
log_verbose(LOG_DEBUG, "commit_transaction()");
|
|
|
|
res = PQexec(conn, "COMMIT");
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_error(_("unable to commit transaction"));
|
|
log_detail("%s", PQerrorMessage(conn));
|
|
PQclear(res);
|
|
|
|
return false;
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
bool
|
|
rollback_transaction(PGconn *conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
|
|
log_verbose(LOG_DEBUG, "rollback_transaction()");
|
|
|
|
res = PQexec(conn, "ROLLBACK");
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_error(_("unable to rollback transaction"));
|
|
log_detail("%s", PQerrorMessage(conn));
|
|
PQclear(res);
|
|
|
|
return false;
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
/* ========================== */
|
|
/* GUC manipulation functions */
|
|
/* ========================== */
|
|
|
|
static bool
|
|
_set_config(PGconn *conn, const char *config_param, const char *sqlquery)
|
|
{
|
|
bool success = true;
|
|
PGresult *res = PQexec(conn, sqlquery);
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_db_error(conn, sqlquery, "_set_config(): unable to set \"%s\"", config_param);
|
|
success = false;
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
bool
|
|
set_config(PGconn *conn, const char *config_param, const char *config_value)
|
|
{
|
|
PQExpBufferData query;
|
|
bool result = false;
|
|
|
|
initPQExpBuffer(&query);
|
|
appendPQExpBuffer(&query,
|
|
"SET %s TO '%s'",
|
|
config_param,
|
|
config_value);
|
|
|
|
log_verbose(LOG_DEBUG, "set_config():\n %s", query.data);
|
|
|
|
result = _set_config(conn, config_param, query.data);
|
|
|
|
termPQExpBuffer(&query);
|
|
|
|
return result;
|
|
}
|
|
|
|
bool
|
|
set_config_bool(PGconn *conn, const char *config_param, bool state)
|
|
{
|
|
PQExpBufferData query;
|
|
bool result = false;
|
|
|
|
initPQExpBuffer(&query);
|
|
appendPQExpBuffer(&query,
|
|
"SET %s TO %s",
|
|
config_param,
|
|
state ? "TRUE" : "FALSE");
|
|
|
|
log_verbose(LOG_DEBUG, "set_config_bool():\n %s", query.data);
|
|
|
|
|
|
result = _set_config(conn, config_param, query.data);
|
|
|
|
termPQExpBuffer(&query);
|
|
|
|
return result;
|
|
}
|
|
|
|
|
|
int
|
|
guc_set(PGconn *conn, const char *parameter, const char *op,
|
|
const char *value)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
int retval = 1;
|
|
|
|
char *escaped_parameter = escape_string(conn, parameter);
|
|
char *escaped_value = escape_string(conn, value);
|
|
|
|
initPQExpBuffer(&query);
|
|
appendPQExpBuffer(&query,
|
|
"SELECT true FROM pg_catalog.pg_settings "
|
|
" WHERE name = '%s' AND setting %s '%s'",
|
|
escaped_parameter, op, escaped_value);
|
|
|
|
log_verbose(LOG_DEBUG, "guc_set():\n%s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("guc_set(): unable to execute query"));
|
|
retval = -1;
|
|
}
|
|
else if (PQntuples(res) == 0)
|
|
{
|
|
retval = 0;
|
|
}
|
|
|
|
pfree(escaped_parameter);
|
|
pfree(escaped_value);
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return retval;
|
|
}
|
|
|
|
|
|
bool
|
|
get_pg_setting(PGconn *conn, const char *setting, char *output)
|
|
{
|
|
bool success = _get_pg_setting(conn, setting, output, NULL, NULL);
|
|
|
|
if (success == true)
|
|
{
|
|
log_verbose(LOG_DEBUG, _("get_pg_setting(): returned value is \"%s\""), output);
|
|
}
|
|
|
|
return success;
|
|
}
|
|
|
|
bool
|
|
get_pg_setting_bool(PGconn *conn, const char *setting, bool *output)
|
|
{
|
|
bool success = _get_pg_setting(conn, setting, NULL, output, NULL);
|
|
|
|
if (success == true)
|
|
{
|
|
log_verbose(LOG_DEBUG, _("get_pg_setting(): returned value is \"%s\""),
|
|
*output == true ? "TRUE" : "FALSE");
|
|
}
|
|
|
|
return success;
|
|
}
|
|
|
|
bool
|
|
get_pg_setting_int(PGconn *conn, const char *setting, int *output)
|
|
{
|
|
bool success = _get_pg_setting(conn, setting, NULL, NULL, output);
|
|
|
|
if (success == true)
|
|
{
|
|
log_verbose(LOG_DEBUG, _("get_pg_setting_int(): returned value is \"%i\""), *output);
|
|
}
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
bool
|
|
_get_pg_setting(PGconn *conn, const char *setting, char *str_output, bool *bool_output, int *int_output)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
int i;
|
|
bool success = false;
|
|
|
|
char *escaped_setting = escape_string(conn, setting);
|
|
|
|
if (escaped_setting == NULL)
|
|
{
|
|
log_error(_("unable to escape setting \"%s\""), setting);
|
|
return false;
|
|
}
|
|
|
|
initPQExpBuffer(&query);
|
|
appendPQExpBuffer(&query,
|
|
"SELECT name, setting "
|
|
" FROM pg_catalog.pg_settings WHERE name = '%s'",
|
|
escaped_setting);
|
|
|
|
log_verbose(LOG_DEBUG, "get_pg_setting():\n %s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
pfree(escaped_setting);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("get_pg_setting() - unable to execute query"));
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return false;
|
|
}
|
|
|
|
for (i = 0; i < PQntuples(res); i++)
|
|
{
|
|
if (strcmp(PQgetvalue(res, i, 0), setting) == 0)
|
|
{
|
|
if (str_output != NULL)
|
|
{
|
|
snprintf(str_output, MAXLEN, "%s", PQgetvalue(res, i, 1));
|
|
}
|
|
else if (bool_output != NULL)
|
|
{
|
|
/*
|
|
* Note we assume the caller is sure this is a boolean parameter
|
|
*/
|
|
if (strncmp(PQgetvalue(res, i, 1), "on", MAXLEN) == 0)
|
|
*bool_output = true;
|
|
else
|
|
*bool_output = false;
|
|
}
|
|
else if (int_output != NULL)
|
|
{
|
|
*int_output = atoi(PQgetvalue(res, i, 1));
|
|
}
|
|
|
|
success = true;
|
|
break;
|
|
}
|
|
else
|
|
{
|
|
/* highly unlikely this would ever happen */
|
|
log_error(_("get_pg_setting(): unknown parameter \"%s\""), PQgetvalue(res, i, 0));
|
|
}
|
|
}
|
|
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
|
|
bool
|
|
alter_system_int(PGconn *conn, const char *name, int value)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
initPQExpBuffer(&query);
|
|
appendPQExpBuffer(&query,
|
|
"ALTER SYSTEM SET %s = %i",
|
|
name, value);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("alter_system_int() - unable to execute query"));
|
|
|
|
success = false;
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
bool
|
|
pg_reload_conf(PGconn *conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
res = PQexec(conn, "SELECT pg_catalog.pg_reload_conf()");
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, NULL, _("pg_reload_conf() - unable to execute query"));
|
|
|
|
success = false;
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
/* ============================ */
|
|
/* Server information functions */
|
|
/* ============================ */
|
|
|
|
|
|
bool
|
|
get_cluster_size(PGconn *conn, char *size)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
initPQExpBuffer(&query);
|
|
appendPQExpBufferStr(&query,
|
|
"SELECT pg_catalog.pg_size_pretty(pg_catalog.sum(pg_catalog.pg_database_size(oid))::bigint) "
|
|
" FROM pg_catalog.pg_database ");
|
|
|
|
log_verbose(LOG_DEBUG, "get_cluster_size():\n%s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
success = false;
|
|
else
|
|
snprintf(size, MAXLEN, "%s", PQgetvalue(res, 0, 0));
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
/*
|
|
* Return the server version number for the connection provided
|
|
*/
|
|
int
|
|
get_server_version(PGconn *conn, char *server_version_buf)
|
|
{
|
|
PGresult *res = NULL;
|
|
int _server_version_num = UNKNOWN_SERVER_VERSION_NUM;
|
|
|
|
const char *sqlquery =
|
|
"SELECT pg_catalog.current_setting('server_version_num'), "
|
|
" pg_catalog.current_setting('server_version')";
|
|
|
|
res = PQexec(conn, sqlquery);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, sqlquery, _("unable to determine server version number"));
|
|
PQclear(res);
|
|
|
|
return UNKNOWN_SERVER_VERSION_NUM;
|
|
}
|
|
|
|
_server_version_num = atoi(PQgetvalue(res, 0, 0));
|
|
|
|
if (server_version_buf != NULL)
|
|
{
|
|
int i;
|
|
char _server_version_buf[MAXVERSIONSTR] = "";
|
|
|
|
memset(_server_version_buf, 0, MAXVERSIONSTR);
|
|
|
|
/*
|
|
* Some distributions may add extra info after the actual version number,
|
|
* e.g. "10.4 (Debian 10.4-2.pgdg90+1)", so copy everything up until the
|
|
* first space.
|
|
*/
|
|
|
|
snprintf(_server_version_buf, MAXVERSIONSTR, "%s", PQgetvalue(res, 0, 1));
|
|
|
|
for (i = 0; i < MAXVERSIONSTR; i++)
|
|
{
|
|
if (_server_version_buf[i] == ' ')
|
|
break;
|
|
|
|
*server_version_buf++ = _server_version_buf[i];
|
|
}
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return _server_version_num;
|
|
}
|
|
|
|
|
|
RecoveryType
|
|
get_recovery_type(PGconn *conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
RecoveryType recovery_type = RECTYPE_UNKNOWN;
|
|
|
|
const char *sqlquery = "SELECT pg_catalog.pg_is_in_recovery()";
|
|
|
|
log_verbose(LOG_DEBUG, "get_recovery_type(): %s", sqlquery);
|
|
|
|
res = PQexec(conn, sqlquery);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn,
|
|
sqlquery,
|
|
_("unable to determine if server is in recovery"));
|
|
|
|
recovery_type = RECTYPE_UNKNOWN;
|
|
}
|
|
else if (PQntuples(res) == 1)
|
|
{
|
|
if (strcmp(PQgetvalue(res, 0, 0), "f") == 0)
|
|
{
|
|
recovery_type = RECTYPE_PRIMARY;
|
|
}
|
|
else
|
|
{
|
|
recovery_type = RECTYPE_STANDBY;
|
|
}
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return recovery_type;
|
|
}
|
|
|
|
/*
|
|
* Read the node list from the provided connection and attempt to connect to each node
|
|
* in turn to definitely establish if it's the cluster primary.
|
|
*
|
|
* The node list is returned in the order which makes it likely that the
|
|
* current primary will be returned first, reducing the number of speculative
|
|
* connections which need to be made to other nodes.
|
|
*
|
|
* If primary_conninfo_out points to allocated memory of MAXCONNINFO in length,
|
|
* the primary server's conninfo string will be copied there.
|
|
*/
|
|
|
|
PGconn *
|
|
_get_primary_connection(PGconn *conn,
|
|
int *primary_id, char *primary_conninfo_out, bool quiet)
|
|
{
|
|
PQExpBufferData query;
|
|
|
|
PGconn *remote_conn = NULL;
|
|
PGresult *res = NULL;
|
|
|
|
char remote_conninfo_stack[MAXCONNINFO];
|
|
char *remote_conninfo = &*remote_conninfo_stack;
|
|
|
|
int i,
|
|
node_id;
|
|
|
|
/*
|
|
* If the caller wanted to get a copy of the connection info string, sub
|
|
* out the local stack pointer for the pointer passed by the caller.
|
|
*/
|
|
if (primary_conninfo_out != NULL)
|
|
remote_conninfo = primary_conninfo_out;
|
|
|
|
if (primary_id != NULL)
|
|
{
|
|
*primary_id = NODE_NOT_FOUND;
|
|
}
|
|
|
|
/* find all registered nodes */
|
|
log_verbose(LOG_INFO, _("searching for primary node"));
|
|
|
|
initPQExpBuffer(&query);
|
|
appendPQExpBufferStr(&query,
|
|
" SELECT node_id, conninfo, "
|
|
" CASE WHEN type = 'primary' THEN 1 ELSE 2 END AS type_priority"
|
|
" FROM repmgr.nodes "
|
|
" WHERE active IS TRUE "
|
|
" AND type != 'witness' "
|
|
"ORDER BY active DESC, type_priority, priority, node_id");
|
|
|
|
log_verbose(LOG_DEBUG, "get_primary_connection():\n%s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("_get_primary_connection(): unable to retrieve node records"));
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return NULL;
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
|
|
for (i = 0; i < PQntuples(res); i++)
|
|
{
|
|
RecoveryType recovery_type;
|
|
|
|
/* initialize with the values of the current node being processed */
|
|
node_id = atoi(PQgetvalue(res, i, 0));
|
|
snprintf(remote_conninfo, MAXCONNINFO, "%s", PQgetvalue(res, i, 1));
|
|
|
|
log_verbose(LOG_INFO,
|
|
_("checking if node %i is primary"),
|
|
node_id);
|
|
|
|
if (quiet)
|
|
{
|
|
remote_conn = establish_db_connection_quiet(remote_conninfo);
|
|
}
|
|
else
|
|
{
|
|
remote_conn = establish_db_connection(remote_conninfo, false);
|
|
}
|
|
|
|
if (PQstatus(remote_conn) != CONNECTION_OK)
|
|
{
|
|
PQfinish(remote_conn);
|
|
remote_conn = NULL;
|
|
continue;
|
|
}
|
|
|
|
recovery_type = get_recovery_type(remote_conn);
|
|
|
|
if (recovery_type == RECTYPE_UNKNOWN)
|
|
{
|
|
log_warning(_("unable to retrieve recovery state from node %i"),
|
|
node_id);
|
|
|
|
PQfinish(remote_conn);
|
|
continue;
|
|
}
|
|
|
|
if (recovery_type == RECTYPE_PRIMARY)
|
|
{
|
|
PQclear(res);
|
|
log_verbose(LOG_INFO, _("current primary node is %i"), node_id);
|
|
|
|
if (primary_id != NULL)
|
|
{
|
|
*primary_id = node_id;
|
|
}
|
|
|
|
return remote_conn;
|
|
}
|
|
|
|
PQfinish(remote_conn);
|
|
}
|
|
|
|
PQclear(res);
|
|
return NULL;
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
* Return the id of the active primary node, or NODE_NOT_FOUND if no
|
|
* record available.
|
|
*
|
|
* This reports the value stored in the database only and
|
|
* does not verify whether the node is actually available
|
|
*/
|
|
int
|
|
get_primary_node_id(PGconn *conn)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
int retval = NODE_NOT_FOUND;
|
|
|
|
initPQExpBuffer(&query);
|
|
appendPQExpBufferStr(&query,
|
|
"SELECT node_id "
|
|
" FROM repmgr.nodes "
|
|
" WHERE type = 'primary' "
|
|
" AND active IS TRUE ");
|
|
|
|
log_verbose(LOG_DEBUG, "get_primary_node_id():\n%s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("get_primary_node_id(): unable to execute query"));
|
|
retval = UNKNOWN_NODE_ID;
|
|
}
|
|
else if (PQntuples(res) == 0)
|
|
{
|
|
log_verbose(LOG_WARNING, _("get_primary_node_id(): no active primary found"));
|
|
retval = NODE_NOT_FOUND;
|
|
}
|
|
else
|
|
{
|
|
retval = atoi(PQgetvalue(res, 0, 0));
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return retval;
|
|
}
|
|
|
|
|
|
|
|
|
|
int
|
|
get_ready_archive_files(PGconn *conn, const char *data_directory)
|
|
{
|
|
char archive_status_dir[MAXPGPATH] = "";
|
|
struct stat statbuf;
|
|
struct dirent *arcdir_ent;
|
|
DIR *arcdir;
|
|
|
|
int ready_count = 0;
|
|
|
|
if (PQserverVersion(conn) >= 100000)
|
|
{
|
|
snprintf(archive_status_dir, MAXPGPATH,
|
|
"%s/pg_wal/archive_status",
|
|
data_directory);
|
|
}
|
|
else
|
|
{
|
|
snprintf(archive_status_dir, MAXPGPATH,
|
|
"%s/pg_xlog/archive_status",
|
|
data_directory);
|
|
}
|
|
|
|
/* sanity-check directory path */
|
|
if (stat(archive_status_dir, &statbuf) == -1)
|
|
{
|
|
log_error(_("unable to access archive_status directory \"%s\""),
|
|
archive_status_dir);
|
|
log_detail("%s", strerror(errno));
|
|
|
|
return ARCHIVE_STATUS_DIR_ERROR;
|
|
}
|
|
|
|
arcdir = opendir(archive_status_dir);
|
|
|
|
if (arcdir == NULL)
|
|
{
|
|
log_error(_("unable to open archive directory \"%s\""),
|
|
archive_status_dir);
|
|
log_detail("%s", strerror(errno));
|
|
|
|
return ARCHIVE_STATUS_DIR_ERROR;
|
|
}
|
|
|
|
while ((arcdir_ent = readdir(arcdir)) != NULL)
|
|
{
|
|
struct stat local_statbuf;
|
|
char file_path[MAXPGPATH + sizeof(arcdir_ent->d_name)];
|
|
int basenamelen = 0;
|
|
|
|
snprintf(file_path, sizeof(file_path),
|
|
"%s/%s",
|
|
archive_status_dir,
|
|
arcdir_ent->d_name);
|
|
|
|
/* skip non-files */
|
|
if (stat(file_path, &local_statbuf) == 0 && !S_ISREG(local_statbuf.st_mode))
|
|
{
|
|
continue;
|
|
}
|
|
|
|
basenamelen = (int) strlen(arcdir_ent->d_name) - 6;
|
|
|
|
/*
|
|
* count anything ending in ".ready"; for a more precise
|
|
* implementation see: src/backend/postmaster/pgarch.c
|
|
*/
|
|
if (strcmp(arcdir_ent->d_name + basenamelen, ".ready") == 0)
|
|
ready_count++;
|
|
}
|
|
|
|
closedir(arcdir);
|
|
|
|
return ready_count;
|
|
}
|
|
|
|
|
|
bool
|
|
identify_system(PGconn *repl_conn, t_system_identification *identification)
|
|
{
|
|
PGresult *res = NULL;
|
|
|
|
/* semicolon required here */
|
|
res = PQexec(repl_conn, "IDENTIFY_SYSTEM;");
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res))
|
|
{
|
|
log_db_error(repl_conn, NULL, _("unable to execute IDENTIFY_SYSTEM"));
|
|
|
|
PQclear(res);
|
|
return false;
|
|
}
|
|
|
|
#if defined(__i386__) || defined(__i386)
|
|
identification->system_identifier = atoll(PQgetvalue(res, 0, 0));
|
|
#else
|
|
identification->system_identifier = atol(PQgetvalue(res, 0, 0));
|
|
#endif
|
|
|
|
identification->timeline = atoi(PQgetvalue(res, 0, 1));
|
|
identification->xlogpos = parse_lsn(PQgetvalue(res, 0, 2));
|
|
|
|
PQclear(res);
|
|
return true;
|
|
}
|
|
|
|
|
|
/*
|
|
* Return the system identifier by querying pg_control_system().
|
|
*
|
|
* Note there is a similar function in controldata.c ("get_system_identifier()")
|
|
* which reads the control file.
|
|
*/
|
|
uint64
|
|
system_identifier(PGconn *conn)
|
|
{
|
|
uint64 system_identifier = UNKNOWN_SYSTEM_IDENTIFIER;
|
|
PGresult *res = NULL;
|
|
|
|
/*
|
|
* pg_control_system() was introduced in PostgreSQL 9.6
|
|
*/
|
|
if (PQserverVersion(conn) < 90600)
|
|
{
|
|
return UNKNOWN_SYSTEM_IDENTIFIER;
|
|
}
|
|
|
|
res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, NULL, _("system_identifier(): unable to query pg_control_system()"));
|
|
}
|
|
else
|
|
{
|
|
#if defined(__i386__) || defined(__i386)
|
|
system_identifier = atoll(PQgetvalue(res, 0, 0));
|
|
#else
|
|
system_identifier = atol(PQgetvalue(res, 0, 0));
|
|
#endif
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return system_identifier;
|
|
}
|
|
|
|
|
|
TimeLineHistoryEntry *
|
|
get_timeline_history(PGconn *repl_conn, TimeLineID tli)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
|
|
PQExpBufferData result;
|
|
char *resptr;
|
|
|
|
TimeLineHistoryEntry *history;
|
|
TimeLineID file_tli = UNKNOWN_TIMELINE_ID;
|
|
uint32 switchpoint_hi;
|
|
uint32 switchpoint_lo;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
"TIMELINE_HISTORY %i",
|
|
(int)tli);
|
|
|
|
res = PQexec(repl_conn, query.data);
|
|
log_verbose(LOG_DEBUG, "get_timeline_history():\n%s", query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(repl_conn, query.data, _("get_timeline_history(): unable to execute query"));
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
return NULL;
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
|
|
if (PQntuples(res) != 1 || PQnfields(res) != 2)
|
|
{
|
|
log_error(_("unexpected response to TIMELINE_HISTORY command"));
|
|
log_detail(_("got %i rows and %i fields, expected %i rows and %i fields"),
|
|
PQntuples(res), PQnfields(res), 1, 2);
|
|
PQclear(res);
|
|
return NULL;
|
|
}
|
|
|
|
initPQExpBuffer(&result);
|
|
appendPQExpBufferStr(&result, PQgetvalue(res, 0, 1));
|
|
PQclear(res);
|
|
|
|
resptr = result.data;
|
|
|
|
while (*resptr)
|
|
{
|
|
char buf[MAXLEN];
|
|
char *bufptr = buf;
|
|
|
|
if (*resptr != '\n')
|
|
{
|
|
int len = 0;
|
|
|
|
memset(buf, 0, MAXLEN);
|
|
|
|
while (*resptr && *resptr != '\n' && len < MAXLEN)
|
|
{
|
|
*bufptr++ = *resptr++;
|
|
len++;
|
|
}
|
|
|
|
if (buf[0])
|
|
{
|
|
int nfields = sscanf(buf,
|
|
"%u\t%X/%X",
|
|
&file_tli, &switchpoint_hi, &switchpoint_lo);
|
|
if (nfields == 3 && file_tli == tli - 1)
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (*resptr)
|
|
resptr++;
|
|
}
|
|
|
|
termPQExpBuffer(&result);
|
|
|
|
if (file_tli == UNKNOWN_TIMELINE_ID || file_tli != tli - 1)
|
|
{
|
|
log_error(_("timeline %i not found in timeline history file content"), tli);
|
|
log_detail(_("content is: \"%s\""), result.data);
|
|
return NULL;
|
|
}
|
|
|
|
history = (TimeLineHistoryEntry *) palloc(sizeof(TimeLineHistoryEntry));
|
|
history->tli = file_tli;
|
|
history->begin = InvalidXLogRecPtr; /* we don't care about this */
|
|
history->end = ((uint64) (switchpoint_hi)) << 32 | (uint64) switchpoint_lo;
|
|
|
|
return history;
|
|
}
|
|
|
|
|
|
pid_t
|
|
get_wal_receiver_pid(PGconn *conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
pid_t wal_receiver_pid = UNKNOWN_PID;
|
|
|
|
res = PQexec(conn, "SELECT repmgr.get_wal_receiver_pid()");
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_error(_("unable to execute \"SELECT repmgr.get_wal_receiver_pid()\""));
|
|
log_detail("%s", PQerrorMessage(conn));
|
|
}
|
|
else if (!PQgetisnull(res, 0, 0))
|
|
{
|
|
wal_receiver_pid = atoi(PQgetvalue(res, 0, 0));
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return wal_receiver_pid;
|
|
}
|
|
|
|
|
|
/* =============================== */
|
|
/* user/role information functions */
|
|
/* =============================== */
|
|
|
|
|
|
/*
|
|
* Determine if the user associated with the current connection can execute CHECKPOINT command.
|
|
* User must be a supersuer or a member of the pg_checkpoint default role (available from PostgreSQL 15).
|
|
*/
|
|
bool
|
|
can_execute_checkpoint(PGconn *conn)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res;
|
|
bool has_pg_checkpoint_role = false;
|
|
|
|
/* superusers can do anything, no role check needed */
|
|
if (is_superuser_connection(conn, NULL) == true)
|
|
return true;
|
|
|
|
/* pg_checkpoint available from PostgreSQL 15 */
|
|
if (PQserverVersion(conn) < 150000)
|
|
return false;
|
|
|
|
initPQExpBuffer(&query);
|
|
appendPQExpBufferStr(&query,
|
|
" SELECT pg_catalog.pg_has_role('pg_checkpoint','USAGE') ");
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data,
|
|
_("can_execute_checkpoint(): unable to query user roles"));
|
|
}
|
|
else
|
|
{
|
|
has_pg_checkpoint_role = atobool(PQgetvalue(res, 0, 0));
|
|
}
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return has_pg_checkpoint_role;
|
|
}
|
|
|
|
|
|
/*
|
|
* Determine if the user associated with the current connection
|
|
* has sufficient permissions to use pg_promote function
|
|
*/
|
|
bool
|
|
can_execute_pg_promote(PGconn *conn)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res;
|
|
bool has_pg_promote= false;
|
|
|
|
/* pg_promote() available from PostgreSQL 12 */
|
|
if (PQserverVersion(conn) < 120000)
|
|
return false;
|
|
|
|
initPQExpBuffer(&query);
|
|
appendPQExpBufferStr(&query,
|
|
" SELECT pg_catalog.has_function_privilege( "
|
|
" CURRENT_USER, "
|
|
" 'pg_catalog.pg_promote(bool,int)', "
|
|
" 'execute' "
|
|
" )");
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data,
|
|
_("can_execute_pg_promote(): unable to query user function privilege"));
|
|
}
|
|
else
|
|
{
|
|
has_pg_promote = atobool(PQgetvalue(res, 0, 0));
|
|
}
|
|
termPQExpBuffer(&query);
|
|
|
|
return has_pg_promote;
|
|
}
|
|
|
|
|
|
/*
|
|
* Determine if the user associated with the current connection
|
|
* has sufficient permissions to disable the walsender
|
|
*/
|
|
bool
|
|
can_disable_walsender(PGconn *conn)
|
|
{
|
|
/*
|
|
* Requires PostgreSQL 9.5 or later, because ALTER SYSTEM
|
|
*/
|
|
if (PQserverVersion(conn) < 90500)
|
|
{
|
|
log_warning(_("\"standby_disconnect_on_failover\" specified, but not available for this PostgreSQL version"));
|
|
/* TODO: format server version */
|
|
log_detail(_("available from PostgreSQL 9.5; this PostgreSQL version is %i"), PQserverVersion(conn));
|
|
|
|
return false;
|
|
}
|
|
|
|
/*
|
|
* Superusers can do anything
|
|
*/
|
|
if (is_superuser_connection(conn, NULL) == true)
|
|
return true;
|
|
|
|
PQExpBufferData query;
|
|
PGresult *res;
|
|
bool has_alter_system_priv = false;
|
|
|
|
/* GRANT ALTER SYSTEM available from PostgreSQL 15 */
|
|
if (PQserverVersion(conn) >= 150000)
|
|
{
|
|
initPQExpBuffer(&query);
|
|
appendPQExpBufferStr(&query,
|
|
" SELECT pg_catalog.has_parameter_privilege('wal_retrieve_retry_interval', 'ALTER SYSTEM') ");
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data,
|
|
_("can_disable_walsender(): unable to query user parameter privileges"));
|
|
}
|
|
else
|
|
{
|
|
has_alter_system_priv = atobool(PQgetvalue(res, 0, 0));
|
|
}
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
}
|
|
|
|
if (has_alter_system_priv == false)
|
|
{
|
|
log_warning(_("\"standby_disconnect_on_failover\" specified, but repmgr user is not authorized to perform ALTER SYSTEM wal_retrieve_retry_interval"));
|
|
|
|
if (PQserverVersion(conn) >= 150000)
|
|
{
|
|
log_detail(_("superuser or ALTER SYSTEM wal_retrieve_retry_interval permission required to disable standbys on failover"));
|
|
}
|
|
else
|
|
{
|
|
log_detail(_("superuser permission required to disable standbys on failover"));
|
|
}
|
|
}
|
|
|
|
return has_alter_system_priv;
|
|
}
|
|
|
|
/*
|
|
* Determine if the user associated with the current connection is
|
|
* a member of the "pg_monitor" default role, or optionally one
|
|
* of its three constituent "subroles".
|
|
*/
|
|
bool
|
|
connection_has_pg_monitor_role(PGconn *conn, const char *subrole)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res;
|
|
bool has_pg_monitor_role = false;
|
|
|
|
/* superusers can read anything, no role check needed */
|
|
if (is_superuser_connection(conn, NULL) == true)
|
|
return true;
|
|
|
|
/* pg_monitor and associated "subroles" introduced in PostgreSQL 10 */
|
|
if (PQserverVersion(conn) < 100000)
|
|
return false;
|
|
|
|
initPQExpBuffer(&query);
|
|
appendPQExpBufferStr(&query,
|
|
" SELECT CASE "
|
|
" WHEN pg_catalog.pg_has_role('pg_monitor','USAGE') "
|
|
" THEN TRUE ");
|
|
|
|
if (subrole != NULL)
|
|
{
|
|
appendPQExpBuffer(&query,
|
|
" WHEN pg_catalog.pg_has_role('%s','USAGE') "
|
|
" THEN TRUE ",
|
|
subrole);
|
|
}
|
|
|
|
appendPQExpBufferStr(&query,
|
|
" ELSE FALSE "
|
|
" END AS has_pg_monitor");
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data,
|
|
_("connection_has_pg_monitor_role(): unable to query user roles"));
|
|
}
|
|
else
|
|
{
|
|
has_pg_monitor_role = atobool(PQgetvalue(res, 0, 0));
|
|
}
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return has_pg_monitor_role;
|
|
}
|
|
|
|
|
|
bool
|
|
is_replication_role(PGconn *conn, char *rolname)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res;
|
|
bool is_replication_role = false;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBufferStr(&query,
|
|
" SELECT rolreplication "
|
|
" FROM pg_catalog.pg_roles "
|
|
" WHERE rolname = ");
|
|
|
|
if (rolname != NULL)
|
|
{
|
|
appendPQExpBuffer(&query,
|
|
"'%s'",
|
|
rolname);
|
|
}
|
|
else
|
|
{
|
|
appendPQExpBufferStr(&query,
|
|
"CURRENT_USER");
|
|
}
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data,
|
|
_("is_replication_role(): unable to query user roles"));
|
|
}
|
|
else
|
|
{
|
|
is_replication_role = atobool(PQgetvalue(res, 0, 0));
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return is_replication_role;
|
|
}
|
|
|
|
|
|
bool
|
|
is_superuser_connection(PGconn *conn, t_connection_user *userinfo)
|
|
{
|
|
bool is_superuser = false;
|
|
const char *current_user = PQuser(conn);
|
|
const char *superuser_status = PQparameterStatus(conn, "is_superuser");
|
|
|
|
is_superuser = (strcmp(superuser_status, "on") == 0) ? true : false;
|
|
|
|
if (userinfo != NULL)
|
|
{
|
|
snprintf(userinfo->username,
|
|
sizeof(userinfo->username),
|
|
"%s", current_user);
|
|
userinfo->is_superuser = is_superuser;
|
|
}
|
|
|
|
return is_superuser;
|
|
}
|
|
|
|
|
|
/* =============================== */
|
|
/* repmgrd shared memory functions */
|
|
/* =============================== */
|
|
|
|
bool
|
|
repmgrd_set_local_node_id(PGconn *conn, int local_node_id)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
"SELECT repmgr.set_local_node_id(%i)",
|
|
local_node_id);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("repmgrd_set_local_node_id(): unable to execute query"));
|
|
|
|
success = false;
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
int
|
|
repmgrd_get_local_node_id(PGconn *conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
int local_node_id = UNKNOWN_NODE_ID;
|
|
|
|
const char *sqlquery = "SELECT repmgr.get_local_node_id()";
|
|
|
|
res = PQexec(conn, sqlquery);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, sqlquery, _("repmgrd_get_local_node_id(): unable to execute query"));
|
|
}
|
|
else if (!PQgetisnull(res, 0, 0))
|
|
{
|
|
local_node_id = atoi(PQgetvalue(res, 0, 0));
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return local_node_id;
|
|
}
|
|
|
|
|
|
bool
|
|
repmgrd_check_local_node_id(PGconn *conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
bool node_id_settable = true;
|
|
const char *sqlquery = "SELECT repmgr.get_local_node_id()";
|
|
|
|
res = PQexec(conn, sqlquery);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, sqlquery, _("repmgrd_get_local_node_id(): unable to execute query"));
|
|
}
|
|
|
|
if (PQgetisnull(res, 0, 0))
|
|
{
|
|
node_id_settable = false;
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return node_id_settable;
|
|
}
|
|
|
|
|
|
/*
|
|
* Function that checks if the primary is in exclusive backup mode.
|
|
* We'll use this when executing an action can conflict with an exclusive
|
|
* backup.
|
|
*/
|
|
BackupState
|
|
server_in_exclusive_backup_mode(PGconn *conn)
|
|
{
|
|
BackupState backup_state = BACKUP_STATE_UNKNOWN;
|
|
const char *sqlquery = "SELECT pg_catalog.pg_is_in_backup()";
|
|
PGresult *res = NULL;
|
|
|
|
/* Exclusive backup removed from PostgreSQL 15 */
|
|
if (PQserverVersion(conn) >= 150000)
|
|
return BACKUP_STATE_NO_BACKUP;
|
|
|
|
res = PQexec(conn, sqlquery);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, sqlquery, _("unable to retrieve information regarding backup mode of node"));
|
|
|
|
backup_state = BACKUP_STATE_UNKNOWN;
|
|
}
|
|
else if (atobool(PQgetvalue(res, 0, 0)) == true)
|
|
{
|
|
backup_state = BACKUP_STATE_IN_BACKUP;
|
|
}
|
|
else
|
|
{
|
|
backup_state = BACKUP_STATE_NO_BACKUP;
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return backup_state;
|
|
}
|
|
|
|
|
|
void
|
|
repmgrd_set_pid(PGconn *conn, pid_t repmgrd_pid, const char *pidfile)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
|
|
log_verbose(LOG_DEBUG, "repmgrd_set_pid(): pid is %i", (int) repmgrd_pid);
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
"SELECT repmgr.set_repmgrd_pid(%i, ",
|
|
(int) repmgrd_pid);
|
|
|
|
if (pidfile != NULL)
|
|
{
|
|
appendPQExpBuffer(&query,
|
|
" '%s')",
|
|
pidfile);
|
|
}
|
|
else
|
|
{
|
|
appendPQExpBufferStr(&query,
|
|
" NULL)");
|
|
}
|
|
|
|
res = PQexec(conn, query.data);
|
|
termPQExpBuffer(&query);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_error(_("unable to execute \"SELECT repmgr.set_repmgrd_pid()\""));
|
|
log_detail("%s", PQerrorMessage(conn));
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return;
|
|
}
|
|
|
|
|
|
pid_t
|
|
repmgrd_get_pid(PGconn *conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
pid_t repmgrd_pid = UNKNOWN_PID;
|
|
|
|
res = PQexec(conn, "SELECT repmgr.get_repmgrd_pid()");
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_error(_("unable to execute \"SELECT repmgr.get_repmgrd_pid()\""));
|
|
log_detail("%s", PQerrorMessage(conn));
|
|
}
|
|
else if (!PQgetisnull(res, 0, 0))
|
|
{
|
|
repmgrd_pid = atoi(PQgetvalue(res, 0, 0));
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return repmgrd_pid;
|
|
}
|
|
|
|
|
|
bool
|
|
repmgrd_is_running(PGconn *conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
bool is_running = false;
|
|
|
|
res = PQexec(conn, "SELECT repmgr.repmgrd_is_running()");
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_error(_("unable to execute \"SELECT repmgr.repmgrd_is_running()\""));
|
|
log_detail("%s", PQerrorMessage(conn));
|
|
}
|
|
else if (!PQgetisnull(res, 0, 0))
|
|
{
|
|
is_running = atobool(PQgetvalue(res, 0, 0));
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return is_running;
|
|
}
|
|
|
|
|
|
bool
|
|
repmgrd_is_paused(PGconn *conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
bool is_paused = false;
|
|
|
|
res = PQexec(conn, "SELECT repmgr.repmgrd_is_paused()");
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_error(_("unable to execute \"SELECT repmgr.repmgrd_is_paused()\""));
|
|
log_detail("%s", PQerrorMessage(conn));
|
|
}
|
|
else if (!PQgetisnull(res, 0, 0))
|
|
{
|
|
is_paused = atobool(PQgetvalue(res, 0, 0));
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return is_paused;
|
|
}
|
|
|
|
|
|
bool
|
|
repmgrd_pause(PGconn *conn, bool pause)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
"SELECT repmgr.repmgrd_pause(%s)",
|
|
pause == true ? "TRUE" : "FALSE");
|
|
res = PQexec(conn, query.data);
|
|
termPQExpBuffer(&query);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_error(_("unable to execute \"SELECT repmgr.repmgrd_pause()\""));
|
|
log_detail("%s", PQerrorMessage(conn));
|
|
|
|
success = false;
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
int
|
|
repmgrd_get_upstream_node_id(PGconn *conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
int upstream_node_id = UNKNOWN_NODE_ID;
|
|
|
|
const char *sqlquery = "SELECT repmgr.get_upstream_node_id()";
|
|
|
|
res = PQexec(conn, sqlquery);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, sqlquery, _("repmgrd_get_upstream_node_id(): unable to execute query"));
|
|
}
|
|
else if (!PQgetisnull(res, 0, 0))
|
|
{
|
|
upstream_node_id = atoi(PQgetvalue(res, 0, 0));
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return upstream_node_id;
|
|
}
|
|
|
|
|
|
bool
|
|
repmgrd_set_upstream_node_id(PGconn *conn, int node_id)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
initPQExpBuffer(&query);
|
|
appendPQExpBuffer(&query,
|
|
" SELECT repmgr.set_upstream_node_id(%i) ",
|
|
node_id);
|
|
|
|
log_verbose(LOG_DEBUG, "repmgrd_set_upstream_node_id():\n %s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data,
|
|
_("repmgrd_set_upstream_node_id(): unable to set upstream node ID (provided value: %i)"), node_id);
|
|
success = false;
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
/* ================ */
|
|
/* result functions */
|
|
/* ================ */
|
|
|
|
/*
 * Convert the textual form of a boolean query result to a C bool.
 *
 * Returns true only if "value" is exactly "t". Any other string, and a NULL
 * pointer (which PQgetvalue() returns for an out-of-range row or column),
 * yields false.
 */
bool
atobool(const char *value)
{
	if (value == NULL)
		return false;

	return (strcmp(value, "t") == 0)
		? true
		: false;
}
|
|
|
|
|
|
/* =================== */
|
|
/* extension functions */
|
|
/* =================== */
|
|
|
|
ExtensionStatus
|
|
get_repmgr_extension_status(PGconn *conn, t_extension_versions *extversions)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
ExtensionStatus status = REPMGR_UNKNOWN;
|
|
|
|
/* TODO: check version */
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBufferStr(&query,
|
|
" SELECT ae.name, e.extname, "
|
|
" ae.default_version, "
|
|
" (((FLOOR(ae.default_version::NUMERIC)::INT) * 10000) + (ae.default_version::NUMERIC - FLOOR(ae.default_version::NUMERIC)::INT) * 1000)::INT AS available, "
|
|
" ae.installed_version, "
|
|
" (((FLOOR(ae.installed_version::NUMERIC)::INT) * 10000) + (ae.installed_version::NUMERIC - FLOOR(ae.installed_version::NUMERIC)::INT) * 1000)::INT AS installed "
|
|
" FROM pg_catalog.pg_available_extensions ae "
|
|
"LEFT JOIN pg_catalog.pg_extension e "
|
|
" ON e.extname=ae.name "
|
|
" WHERE ae.name='repmgr' ");
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("get_repmgr_extension_status(): unable to execute extension query"));
|
|
status = REPMGR_UNKNOWN;
|
|
}
|
|
|
|
/* 1. Check extension is actually available */
|
|
else if (PQntuples(res) == 0)
|
|
{
|
|
status = REPMGR_UNAVAILABLE;
|
|
}
|
|
|
|
/* 2. Check if extension installed */
|
|
else if (PQgetisnull(res, 0, 1) == 0)
|
|
{
|
|
int available_version = atoi(PQgetvalue(res, 0, 3));
|
|
int installed_version = atoi(PQgetvalue(res, 0, 5));
|
|
|
|
/* caller wants to know which versions are installed/available */
|
|
if (extversions != NULL)
|
|
{
|
|
snprintf(extversions->default_version,
|
|
sizeof(extversions->default_version),
|
|
"%s", PQgetvalue(res, 0, 2));
|
|
extversions->default_version_num = available_version;
|
|
snprintf(extversions->installed_version,
|
|
sizeof(extversions->installed_version),
|
|
"%s", PQgetvalue(res, 0, 4));
|
|
extversions->installed_version_num = installed_version;
|
|
}
|
|
|
|
if (available_version > installed_version)
|
|
{
|
|
status = REPMGR_OLD_VERSION_INSTALLED;
|
|
}
|
|
else
|
|
{
|
|
status = REPMGR_INSTALLED;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
status = REPMGR_AVAILABLE;
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return status;
|
|
}
|
|
|
|
/* ========================= */
|
|
/* node management functions */
|
|
/* ========================= */
|
|
|
|
/*
|
|
* Assumes the connection can execute CHECKPOINT command.
|
|
* A check can be executed via 'can_execute_checkpoint' function.
|
|
*/
|
|
void
|
|
checkpoint(PGconn *conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
|
|
res = PQexec(conn, "CHECKPOINT");
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_db_error(conn, NULL, _("unable to execute CHECKPOINT"));
|
|
}
|
|
|
|
PQclear(res);
|
|
return;
|
|
}
|
|
|
|
|
|
bool
|
|
vacuum_table(PGconn *primary_conn, const char *table)
|
|
{
|
|
PQExpBufferData query;
|
|
bool success = true;
|
|
PGresult *res = NULL;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query, "VACUUM %s", table);
|
|
|
|
res = PQexec(primary_conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_db_error(primary_conn, NULL,
|
|
_("unable to vacuum table \"%s\""), table);
|
|
success = false;
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
/*
|
|
* For use in PostgreSQL 12 and later
|
|
*/
|
|
bool
|
|
promote_standby(PGconn *conn, bool wait, int wait_seconds)
|
|
{
|
|
PQExpBufferData query;
|
|
bool success = true;
|
|
PGresult *res = NULL;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
"SELECT pg_catalog.pg_promote(wait := %s",
|
|
wait ? "TRUE" : "FALSE");
|
|
|
|
if (wait_seconds > 0)
|
|
{
|
|
appendPQExpBuffer(&query,
|
|
", wait_seconds := %i",
|
|
wait_seconds);
|
|
}
|
|
|
|
appendPQExpBufferStr(&query, ")");
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("unable to execute pg_promote()"));
|
|
success = false;
|
|
}
|
|
else
|
|
{
|
|
/* NOTE: if "wait" is false, pg_promote() will always return true */
|
|
success = atobool(PQgetvalue(res, 0, 0));
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
bool
|
|
resume_wal_replay(PGconn *conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
PQExpBufferData query;
|
|
bool success = true;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
if (PQserverVersion(conn) >= 100000)
|
|
{
|
|
appendPQExpBufferStr(&query,
|
|
"SELECT pg_catalog.pg_wal_replay_resume()");
|
|
}
|
|
else
|
|
{
|
|
appendPQExpBufferStr(&query,
|
|
"SELECT pg_catalog.pg_xlog_replay_resume()");
|
|
}
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("resume_wal_replay(): unable to resume WAL replay"));
|
|
success = false;
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
/* ===================== */
|
|
/* Node record functions */
|
|
/* ===================== */
|
|
|
|
/*
|
|
* Note: init_defaults may only be false when the caller is refreshing a previously
|
|
* populated record.
|
|
*/
|
|
static RecordStatus
|
|
_get_node_record(PGconn *conn, char *sqlquery, t_node_info *node_info, bool init_defaults)
|
|
{
|
|
int ntuples = 0;
|
|
PGresult *res = PQexec(conn, sqlquery);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, sqlquery, _("_get_node_record(): unable to execute query"));
|
|
|
|
PQclear(res);
|
|
return RECORD_ERROR;
|
|
}
|
|
|
|
ntuples = PQntuples(res);
|
|
|
|
if (ntuples == 0)
|
|
{
|
|
PQclear(res);
|
|
return RECORD_NOT_FOUND;
|
|
}
|
|
|
|
_populate_node_record(res, node_info, 0, init_defaults);
|
|
|
|
PQclear(res);
|
|
|
|
return RECORD_FOUND;
|
|
}
|
|
|
|
|
|
/*
|
|
* Note: init_defaults may only be false when the caller is refreshing a previously
|
|
* populated record.
|
|
*/
|
|
static void
|
|
_populate_node_record(PGresult *res, t_node_info *node_info, int row, bool init_defaults)
|
|
{
|
|
node_info->node_id = atoi(PQgetvalue(res, row, 0));
|
|
node_info->type = parse_node_type(PQgetvalue(res, row, 1));
|
|
|
|
if (PQgetisnull(res, row, 2))
|
|
{
|
|
node_info->upstream_node_id = NO_UPSTREAM_NODE;
|
|
}
|
|
else
|
|
{
|
|
node_info->upstream_node_id = atoi(PQgetvalue(res, row, 2));
|
|
}
|
|
|
|
snprintf(node_info->node_name, sizeof(node_info->node_name), "%s", PQgetvalue(res, row, 3));
|
|
snprintf(node_info->conninfo, sizeof(node_info->conninfo), "%s", PQgetvalue(res, row, 4));
|
|
snprintf(node_info->repluser, sizeof(node_info->repluser), "%s", PQgetvalue(res, row, 5));
|
|
snprintf(node_info->slot_name, sizeof(node_info->slot_name), "%s", PQgetvalue(res, row, 6));
|
|
snprintf(node_info->location, sizeof(node_info->location), "%s", PQgetvalue(res, row, 7));
|
|
node_info->priority = atoi(PQgetvalue(res, row, 8));
|
|
node_info->active = atobool(PQgetvalue(res, row, 9));
|
|
snprintf(node_info->config_file, sizeof(node_info->config_file), "%s", PQgetvalue(res, row, 10));
|
|
|
|
/* These are only set by certain queries */
|
|
snprintf(node_info->upstream_node_name, sizeof(node_info->upstream_node_name), "%s", PQgetvalue(res, row, 11));
|
|
|
|
if (PQgetisnull(res, row, 12))
|
|
{
|
|
node_info->attached = NODE_ATTACHED_UNKNOWN;
|
|
}
|
|
else
|
|
{
|
|
node_info->attached = atobool(PQgetvalue(res, row, 12)) ? NODE_ATTACHED : NODE_DETACHED;
|
|
}
|
|
|
|
/* Set remaining struct fields with default values */
|
|
|
|
if (init_defaults == true)
|
|
{
|
|
node_info->node_status = NODE_STATUS_UNKNOWN;
|
|
node_info->recovery_type = RECTYPE_UNKNOWN;
|
|
node_info->last_wal_receive_lsn = InvalidXLogRecPtr;
|
|
node_info->monitoring_state = MS_NORMAL;
|
|
node_info->conn = NULL;
|
|
}
|
|
}
|
|
|
|
|
|
t_server_type
|
|
parse_node_type(const char *type)
|
|
{
|
|
if (strcmp(type, "primary") == 0)
|
|
{
|
|
return PRIMARY;
|
|
}
|
|
else if (strcmp(type, "standby") == 0)
|
|
{
|
|
return STANDBY;
|
|
}
|
|
else if (strcmp(type, "witness") == 0)
|
|
{
|
|
return WITNESS;
|
|
}
|
|
|
|
return UNKNOWN;
|
|
}
|
|
|
|
|
|
const char *
|
|
get_node_type_string(t_server_type type)
|
|
{
|
|
switch (type)
|
|
{
|
|
case PRIMARY:
|
|
return "primary";
|
|
case STANDBY:
|
|
return "standby";
|
|
case WITNESS:
|
|
return "witness";
|
|
/* this should never happen */
|
|
case UNKNOWN:
|
|
default:
|
|
log_error(_("unknown node type %i"), type);
|
|
return "unknown";
|
|
}
|
|
}
|
|
|
|
|
|
RecordStatus
|
|
get_node_record(PGconn *conn, int node_id, t_node_info *node_info)
|
|
{
|
|
PQExpBufferData query;
|
|
RecordStatus result;
|
|
|
|
initPQExpBuffer(&query);
|
|
appendPQExpBuffer(&query,
|
|
"SELECT " REPMGR_NODES_COLUMNS
|
|
" FROM repmgr.nodes n "
|
|
" WHERE n.node_id = %i",
|
|
node_id);
|
|
|
|
log_verbose(LOG_DEBUG, "get_node_record():\n %s", query.data);
|
|
|
|
result = _get_node_record(conn, query.data, node_info, true);
|
|
termPQExpBuffer(&query);
|
|
|
|
if (result == RECORD_NOT_FOUND)
|
|
{
|
|
log_verbose(LOG_DEBUG, "get_node_record(): no record found for node %i", node_id);
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
|
|
RecordStatus
|
|
refresh_node_record(PGconn *conn, int node_id, t_node_info *node_info)
|
|
{
|
|
PQExpBufferData query;
|
|
RecordStatus result;
|
|
|
|
initPQExpBuffer(&query);
|
|
appendPQExpBuffer(&query,
|
|
"SELECT " REPMGR_NODES_COLUMNS
|
|
" FROM repmgr.nodes n "
|
|
" WHERE n.node_id = %i",
|
|
node_id);
|
|
|
|
log_verbose(LOG_DEBUG, "get_node_record():\n %s", query.data);
|
|
|
|
result = _get_node_record(conn, query.data, node_info, false);
|
|
termPQExpBuffer(&query);
|
|
|
|
if (result == RECORD_NOT_FOUND)
|
|
{
|
|
log_verbose(LOG_DEBUG, "refresh_node_record(): no record found for node %i", node_id);
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
|
|
RecordStatus
|
|
get_node_record_with_upstream(PGconn *conn, int node_id, t_node_info *node_info)
|
|
{
|
|
PQExpBufferData query;
|
|
RecordStatus result;
|
|
|
|
initPQExpBuffer(&query);
|
|
appendPQExpBuffer(&query,
|
|
" SELECT " REPMGR_NODES_COLUMNS_WITH_UPSTREAM
|
|
" FROM repmgr.nodes n "
|
|
" LEFT JOIN repmgr.nodes un "
|
|
" ON un.node_id = n.upstream_node_id"
|
|
" WHERE n.node_id = %i",
|
|
node_id);
|
|
|
|
log_verbose(LOG_DEBUG, "get_node_record():\n %s", query.data);
|
|
|
|
result = _get_node_record(conn, query.data, node_info, true);
|
|
termPQExpBuffer(&query);
|
|
|
|
if (result == RECORD_NOT_FOUND)
|
|
{
|
|
log_verbose(LOG_DEBUG, "get_node_record(): no record found for node %i", node_id);
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
|
|
RecordStatus
|
|
get_node_record_by_name(PGconn *conn, const char *node_name, t_node_info *node_info)
|
|
{
|
|
PQExpBufferData query;
|
|
RecordStatus record_status = RECORD_NOT_FOUND;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
"SELECT " REPMGR_NODES_COLUMNS
|
|
" FROM repmgr.nodes n "
|
|
" WHERE n.node_name = '%s' ",
|
|
node_name);
|
|
|
|
log_verbose(LOG_DEBUG, "get_node_record_by_name():\n %s", query.data);
|
|
|
|
record_status = _get_node_record(conn, query.data, node_info, true);
|
|
|
|
termPQExpBuffer(&query);
|
|
|
|
if (record_status == RECORD_NOT_FOUND)
|
|
{
|
|
log_verbose(LOG_DEBUG, "get_node_record_by_name(): no record found for node \"%s\"",
|
|
node_name);
|
|
}
|
|
|
|
return record_status;
|
|
}
|
|
|
|
|
|
t_node_info *
|
|
get_node_record_pointer(PGconn *conn, int node_id)
|
|
{
|
|
t_node_info *node_info = pg_malloc0(sizeof(t_node_info));
|
|
RecordStatus record_status = RECORD_NOT_FOUND;
|
|
|
|
record_status = get_node_record(conn, node_id, node_info);
|
|
|
|
if (record_status != RECORD_FOUND)
|
|
{
|
|
pfree(node_info);
|
|
return NULL;
|
|
}
|
|
|
|
return node_info;
|
|
}
|
|
|
|
|
|
bool
|
|
get_primary_node_record(PGconn *conn, t_node_info *node_info)
|
|
{
|
|
RecordStatus record_status = RECORD_NOT_FOUND;
|
|
|
|
int primary_node_id = get_primary_node_id(conn);
|
|
|
|
if (primary_node_id == UNKNOWN_NODE_ID)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
record_status = get_node_record(conn, primary_node_id, node_info);
|
|
|
|
return record_status == RECORD_FOUND ? true : false;
|
|
}
|
|
|
|
|
|
/*
|
|
* Get the local node record; if this fails, exit. Many operations
|
|
* depend on this being available, so we'll centralize the check
|
|
* and failure messages here.
|
|
*/
|
|
bool
|
|
get_local_node_record(PGconn *conn, int node_id, t_node_info *node_info)
|
|
{
|
|
RecordStatus record_status = get_node_record(conn, node_id, node_info);
|
|
|
|
if (record_status != RECORD_FOUND)
|
|
{
|
|
log_error(_("unable to retrieve record for local node"));
|
|
log_detail(_("local node id is %i"), node_id);
|
|
log_hint(_("check this node was correctly registered"));
|
|
|
|
PQfinish(conn);
|
|
exit(ERR_BAD_CONFIG);
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
static
|
|
void
|
|
_populate_node_records(PGresult *res, NodeInfoList *node_list)
|
|
{
|
|
int i;
|
|
|
|
clear_node_info_list(node_list);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
return;
|
|
}
|
|
|
|
for (i = 0; i < PQntuples(res); i++)
|
|
{
|
|
NodeInfoListCell *cell;
|
|
|
|
cell = (NodeInfoListCell *) pg_malloc0(sizeof(NodeInfoListCell));
|
|
|
|
cell->node_info = pg_malloc0(sizeof(t_node_info));
|
|
|
|
_populate_node_record(res, cell->node_info, i, true);
|
|
|
|
if (node_list->tail)
|
|
node_list->tail->next = cell;
|
|
else
|
|
node_list->head = cell;
|
|
|
|
node_list->tail = cell;
|
|
node_list->node_count++;
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
|
|
bool
|
|
get_all_node_records(PGconn *conn, NodeInfoList *node_list)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBufferStr(&query,
|
|
" SELECT " REPMGR_NODES_COLUMNS
|
|
" FROM repmgr.nodes n "
|
|
"ORDER BY n.node_id ");
|
|
|
|
log_verbose(LOG_DEBUG, "get_all_node_records():\n%s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
/* this will return an empty list if there was an error executing the query */
|
|
_populate_node_records(res, node_list);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("get_all_node_records(): unable to execute query"));
|
|
success = false;
|
|
}
|
|
|
|
PQclear(res);
|
|
termPQExpBuffer(&query);
|
|
|
|
return success;
|
|
}
|
|
|
|
bool
|
|
get_all_nodes_count(PGconn *conn, int *count)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBufferStr(&query,
|
|
" SELECT count(*) "
|
|
" FROM repmgr.nodes n ");
|
|
|
|
log_verbose(LOG_DEBUG, "get_all_nodes_count():\n%s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("get_all_nodes_count(): unable to execute query"));
|
|
success = false;
|
|
}
|
|
else
|
|
{
|
|
*count = atoi(PQgetvalue(res, 0, 0));
|
|
}
|
|
|
|
PQclear(res);
|
|
termPQExpBuffer(&query);
|
|
|
|
return success;
|
|
}
|
|
|
|
void
|
|
get_downstream_node_records(PGconn *conn, int node_id, NodeInfoList *node_list)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
" SELECT " REPMGR_NODES_COLUMNS
|
|
" FROM repmgr.nodes n "
|
|
" WHERE n.upstream_node_id = %i "
|
|
"ORDER BY n.node_id ",
|
|
node_id);
|
|
|
|
log_verbose(LOG_DEBUG, "get_downstream_node_records():\n%s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("get_downstream_node_records(): unable to execute query"));
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
|
|
/* this will return an empty list if there was an error executing the query */
|
|
_populate_node_records(res, node_list);
|
|
|
|
PQclear(res);
|
|
|
|
return;
|
|
}
|
|
|
|
|
|
void
|
|
get_active_sibling_node_records(PGconn *conn, int node_id, int upstream_node_id, NodeInfoList *node_list)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
" SELECT " REPMGR_NODES_COLUMNS
|
|
" FROM repmgr.nodes n "
|
|
" WHERE n.upstream_node_id = %i "
|
|
" AND n.node_id != %i "
|
|
" AND n.active IS TRUE "
|
|
"ORDER BY n.node_id ",
|
|
upstream_node_id,
|
|
node_id);
|
|
|
|
log_verbose(LOG_DEBUG, "get_active_sibling_node_records():\n%s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("get_active_sibling_records(): unable to execute query"));
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
|
|
/* this will return an empty list if there was an error executing the query */
|
|
_populate_node_records(res, node_list);
|
|
|
|
PQclear(res);
|
|
|
|
return;
|
|
}
|
|
|
|
bool
|
|
get_child_nodes(PGconn *conn, int node_id, NodeInfoList *node_list)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
" SELECT n.node_id, n.type, n.upstream_node_id, n.node_name, n.conninfo, n.repluser, "
|
|
" n.slot_name, n.location, n.priority, n.active, n.config_file, "
|
|
" '' AS upstream_node_name, "
|
|
" CASE WHEN sr.application_name IS NULL THEN FALSE ELSE TRUE END AS attached "
|
|
" FROM repmgr.nodes n "
|
|
" LEFT JOIN pg_catalog.pg_stat_replication sr "
|
|
" ON sr.application_name = n.node_name "
|
|
" WHERE n.upstream_node_id = %i ",
|
|
node_id);
|
|
|
|
log_verbose(LOG_DEBUG, "get_child_nodes():\n%s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("get_active_sibling_records(): unable to execute query"));
|
|
success = false;
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
|
|
/* this will return an empty list if there was an error executing the query */
|
|
_populate_node_records(res, node_list);
|
|
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
void
|
|
get_node_records_by_priority(PGconn *conn, NodeInfoList *node_list)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBufferStr(&query,
|
|
" SELECT " REPMGR_NODES_COLUMNS
|
|
" FROM repmgr.nodes n "
|
|
"ORDER BY n.priority DESC, n.node_name ");
|
|
|
|
log_verbose(LOG_DEBUG, "get_node_records_by_priority():\n%s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("get_node_records_by_priority(): unable to execute query"));
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
|
|
/* this will return an empty list if there was an error executing the query */
|
|
_populate_node_records(res, node_list);
|
|
|
|
PQclear(res);
|
|
|
|
return;
|
|
}
|
|
|
|
/*
|
|
* return all node records together with their upstream's node name,
|
|
* if available.
|
|
*/
|
|
bool
|
|
get_all_node_records_with_upstream(PGconn *conn, NodeInfoList *node_list)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBufferStr(&query,
|
|
" SELECT " REPMGR_NODES_COLUMNS_WITH_UPSTREAM
|
|
" FROM repmgr.nodes n "
|
|
" LEFT JOIN repmgr.nodes un "
|
|
" ON un.node_id = n.upstream_node_id"
|
|
" ORDER BY n.node_id ");
|
|
|
|
log_verbose(LOG_DEBUG, "get_all_node_records_with_upstream():\n%s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("get_all_node_records_with_upstream(): unable to retrieve node records"));
|
|
success = false;
|
|
}
|
|
|
|
|
|
/* this will return an empty list if there was an error executing the query */
|
|
_populate_node_records(res, node_list);
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
|
|
bool
|
|
get_downstream_nodes_with_missing_slot(PGconn *conn, int this_node_id, NodeInfoList *node_list)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
" SELECT " REPMGR_NODES_COLUMNS
|
|
" FROM repmgr.nodes n "
|
|
"LEFT JOIN pg_catalog.pg_replication_slots rs "
|
|
" ON rs.slot_name = n.slot_name "
|
|
" WHERE n.slot_name IS NOT NULL"
|
|
" AND rs.slot_name IS NULL "
|
|
" AND n.upstream_node_id = %i "
|
|
" AND n.type = 'standby'",
|
|
this_node_id);
|
|
|
|
log_verbose(LOG_DEBUG, "get_all_node_records_with_missing_slot():\n%s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("get_downstream_nodes_with_missing_slot(): unable to retrieve node records"));
|
|
success = false;
|
|
}
|
|
|
|
/* this will return an empty list if there was an error executing the query */
|
|
_populate_node_records(res, node_list);
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
bool
|
|
create_node_record(PGconn *conn, char *repmgr_action, t_node_info *node_info)
|
|
{
|
|
if (repmgr_action != NULL)
|
|
log_verbose(LOG_DEBUG, "create_node_record(): action is \"%s\"", repmgr_action);
|
|
|
|
return _create_update_node_record(conn, "create", node_info);
|
|
}
|
|
|
|
|
|
bool
|
|
update_node_record(PGconn *conn, char *repmgr_action, t_node_info *node_info)
|
|
{
|
|
if (repmgr_action != NULL)
|
|
log_verbose(LOG_DEBUG, "update_node_record(): action is \"%s\"", repmgr_action);
|
|
|
|
return _create_update_node_record(conn, "update", node_info);
|
|
}
|
|
|
|
|
|
static bool
|
|
_create_update_node_record(PGconn *conn, char *action, t_node_info *node_info)
|
|
{
|
|
PQExpBufferData query;
|
|
char node_id[MAXLEN] = "";
|
|
char priority[MAXLEN] = "";
|
|
|
|
char upstream_node_id[MAXLEN] = "";
|
|
char *upstream_node_id_ptr = NULL;
|
|
|
|
char *slot_name_ptr = NULL;
|
|
|
|
int param_count = NODE_RECORD_PARAM_COUNT;
|
|
const char *param_values[NODE_RECORD_PARAM_COUNT];
|
|
|
|
PGresult *res;
|
|
bool success = true;
|
|
|
|
maxlen_snprintf(node_id, "%i", node_info->node_id);
|
|
maxlen_snprintf(priority, "%i", node_info->priority);
|
|
|
|
if (node_info->upstream_node_id == NO_UPSTREAM_NODE && node_info->type == STANDBY)
|
|
{
|
|
/*
|
|
* No explicit upstream node id provided for standby - attempt to get
|
|
* primary node id
|
|
*/
|
|
int primary_node_id = get_primary_node_id(conn);
|
|
|
|
maxlen_snprintf(upstream_node_id, "%i", primary_node_id);
|
|
upstream_node_id_ptr = upstream_node_id;
|
|
}
|
|
else if (node_info->upstream_node_id != NO_UPSTREAM_NODE)
|
|
{
|
|
maxlen_snprintf(upstream_node_id, "%i", node_info->upstream_node_id);
|
|
upstream_node_id_ptr = upstream_node_id;
|
|
}
|
|
|
|
if (node_info->slot_name[0] != '\0')
|
|
{
|
|
slot_name_ptr = node_info->slot_name;
|
|
}
|
|
|
|
|
|
param_values[0] = get_node_type_string(node_info->type);
|
|
param_values[1] = upstream_node_id_ptr;
|
|
param_values[2] = node_info->node_name;
|
|
param_values[3] = node_info->conninfo;
|
|
param_values[4] = node_info->repluser;
|
|
param_values[5] = slot_name_ptr;
|
|
param_values[6] = node_info->location;
|
|
param_values[7] = priority;
|
|
param_values[8] = node_info->active == true ? "TRUE" : "FALSE";
|
|
param_values[9] = node_info->config_file;
|
|
param_values[10] = node_id;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
if (strcmp(action, "create") == 0)
|
|
{
|
|
appendPQExpBufferStr(&query,
|
|
"INSERT INTO repmgr.nodes "
|
|
" (node_id, type, upstream_node_id, "
|
|
" node_name, conninfo, repluser, slot_name, "
|
|
" location, priority, active, config_file) "
|
|
"VALUES ($11, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ");
|
|
}
|
|
else
|
|
{
|
|
appendPQExpBufferStr(&query,
|
|
"UPDATE repmgr.nodes SET "
|
|
" type = $1, "
|
|
" upstream_node_id = $2, "
|
|
" node_name = $3, "
|
|
" conninfo = $4, "
|
|
" repluser = $5, "
|
|
" slot_name = $6, "
|
|
" location = $7, "
|
|
" priority = $8, "
|
|
" active = $9, "
|
|
" config_file = $10 "
|
|
" WHERE node_id = $11 ");
|
|
}
|
|
|
|
res = PQexecParams(conn,
|
|
query.data,
|
|
param_count,
|
|
NULL,
|
|
param_values,
|
|
NULL,
|
|
NULL,
|
|
0);
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_db_error(conn, query.data,
|
|
_("_create_update_node_record(): unable to %s node record for node \"%s\" (ID: %i)"),
|
|
action,
|
|
node_info->node_name,
|
|
node_info->node_id);
|
|
|
|
success = false;
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
bool
|
|
update_node_record_set_active(PGconn *conn, int this_node_id, bool active)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
"UPDATE repmgr.nodes SET active = %s "
|
|
" WHERE node_id = %i",
|
|
active == true ? "TRUE" : "FALSE",
|
|
this_node_id);
|
|
|
|
log_verbose(LOG_DEBUG, "update_node_record_set_active():\n %s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_db_error(conn, query.data,
|
|
_("update_node_record_set_active(): unable to update node record"));
|
|
success = false;
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
bool
|
|
update_node_record_set_active_standby(PGconn *conn, int this_node_id)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
"UPDATE repmgr.nodes "
|
|
" SET type = 'standby', "
|
|
" active = TRUE "
|
|
" WHERE node_id = %i",
|
|
this_node_id);
|
|
|
|
log_verbose(LOG_DEBUG, "update_node_record_set_active_standby():\n %s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("update_node_record_set_active_standby(): unable to update node record"));
|
|
success = false;
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
bool
|
|
update_node_record_set_primary(PGconn *conn, int this_node_id)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
|
|
log_debug(_("setting node %i as primary and marking existing primary as failed"),
|
|
this_node_id);
|
|
|
|
begin_transaction(conn);
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
" UPDATE repmgr.nodes "
|
|
" SET active = FALSE "
|
|
" WHERE type = 'primary' "
|
|
" AND active IS TRUE "
|
|
" AND node_id != %i ",
|
|
this_node_id);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_db_error(conn, query.data,
|
|
_("update_node_record_set_primary(): unable to set old primary node as inactive"));
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
rollback_transaction(conn);
|
|
|
|
return false;
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
" UPDATE repmgr.nodes"
|
|
" SET type = 'primary', "
|
|
" upstream_node_id = NULL, "
|
|
" active = TRUE "
|
|
" WHERE node_id = %i ",
|
|
this_node_id);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_db_error(conn, query.data,
|
|
_("unable to set current node %i as active primary"),
|
|
this_node_id);
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
rollback_transaction(conn);
|
|
|
|
return false;
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return commit_transaction(conn);
|
|
}
|
|
|
|
|
|
bool
|
|
update_node_record_set_upstream(PGconn *conn, int this_node_id, int new_upstream_node_id)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
log_debug(_("update_node_record_set_upstream(): Updating node %i's upstream node to %i"),
|
|
this_node_id, new_upstream_node_id);
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
" UPDATE repmgr.nodes "
|
|
" SET upstream_node_id = %i "
|
|
" WHERE node_id = %i ",
|
|
new_upstream_node_id,
|
|
this_node_id);
|
|
|
|
log_verbose(LOG_DEBUG, "update_node_record_set_upstream():\n%s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("update_node_record_set_upstream(): unable to set new upstream node id"));
|
|
|
|
success = false;
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
/*
|
|
* Update node record following change of status
|
|
* (e.g. inactive primary converted to standby)
|
|
*/
|
|
bool
|
|
update_node_record_status(PGconn *conn, int this_node_id, char *type, int upstream_node_id, bool active)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
" UPDATE repmgr.nodes "
|
|
" SET type = '%s', "
|
|
" upstream_node_id = %i, "
|
|
" active = %s "
|
|
" WHERE node_id = %i ",
|
|
type,
|
|
upstream_node_id,
|
|
active ? "TRUE" : "FALSE",
|
|
this_node_id);
|
|
|
|
log_verbose(LOG_DEBUG, "update_node_record_status():\n %s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_db_error(conn, query.data,
|
|
_("update_node_record_status(): unable to update node record status for node %i"),
|
|
this_node_id);
|
|
|
|
success = false;
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
/*
|
|
* Update node record's "conninfo" and "priority" fields. Called by repmgrd
|
|
* following a configuration file reload.
|
|
*/
|
|
bool
|
|
update_node_record_conn_priority(PGconn *conn, t_configuration_options *options)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
"UPDATE repmgr.nodes "
|
|
" SET conninfo = '%s', "
|
|
" priority = %d "
|
|
" WHERE node_id = %d ",
|
|
options->conninfo,
|
|
options->priority,
|
|
options->node_id);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("update_node_record_conn_priority(): unable to execute query"));
|
|
success = false;
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
/*
|
|
* Copy node records from primary to witness servers.
|
|
*
|
|
* This is used when initially registering a witness server, and
|
|
* by repmgrd to update the node records when required.
|
|
*/
|
|
|
|
bool
|
|
witness_copy_node_records(PGconn *primary_conn, PGconn *witness_conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
NodeInfoList nodes = T_NODE_INFO_LIST_INITIALIZER;
|
|
NodeInfoListCell *cell = NULL;
|
|
|
|
begin_transaction(witness_conn);
|
|
|
|
/* Defer constraints */
|
|
|
|
res = PQexec(witness_conn, "SET CONSTRAINTS ALL DEFERRED");
|
|
|
|
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_db_error(witness_conn, NULL, ("witness_copy_node_records(): unable to defer constraints"));
|
|
|
|
rollback_transaction(witness_conn);
|
|
PQclear(res);
|
|
|
|
return false;
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
/* truncate existing records */
|
|
|
|
if (truncate_node_records(witness_conn) == false)
|
|
{
|
|
rollback_transaction(witness_conn);
|
|
|
|
return false;
|
|
}
|
|
|
|
if (get_all_node_records(primary_conn, &nodes) == false)
|
|
{
|
|
rollback_transaction(witness_conn);
|
|
|
|
return false;
|
|
}
|
|
|
|
for (cell = nodes.head; cell; cell = cell->next)
|
|
{
|
|
if (create_node_record(witness_conn, NULL, cell->node_info) == false)
|
|
{
|
|
rollback_transaction(witness_conn);
|
|
|
|
return false;
|
|
}
|
|
}
|
|
|
|
/* and done */
|
|
commit_transaction(witness_conn);
|
|
|
|
clear_node_info_list(&nodes);
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
bool
|
|
delete_node_record(PGconn *conn, int node)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
"DELETE FROM repmgr.nodes "
|
|
" WHERE node_id = %i",
|
|
node);
|
|
|
|
log_verbose(LOG_DEBUG, "delete_node_record():\n %s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("delete_node_record(): unable to delete node record"));
|
|
|
|
success = false;
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
bool
|
|
truncate_node_records(PGconn *conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
res = PQexec(conn, "TRUNCATE TABLE repmgr.nodes");
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_db_error(conn, NULL, _("truncate_node_records(): unable to truncate table \"repmgr.nodes\""));
|
|
|
|
success = false;
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
bool
|
|
update_node_record_slot_name(PGconn *primary_conn, int node_id, char *slot_name)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
" UPDATE repmgr.nodes "
|
|
" SET slot_name = '%s' "
|
|
" WHERE node_id = %i ",
|
|
slot_name,
|
|
node_id);
|
|
|
|
res = PQexec(primary_conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_db_error(primary_conn, query.data, _("unable to set node record slot name"));
|
|
|
|
success = false;
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
|
|
|
|
void
|
|
clear_node_info_list(NodeInfoList *nodes)
|
|
{
|
|
NodeInfoListCell *cell = NULL;
|
|
NodeInfoListCell *next_cell = NULL;
|
|
|
|
log_verbose(LOG_DEBUG, "clear_node_info_list() - closing open connections");
|
|
|
|
/* close any open connections */
|
|
for (cell = nodes->head; cell; cell = cell->next)
|
|
{
|
|
|
|
if (PQstatus(cell->node_info->conn) == CONNECTION_OK)
|
|
{
|
|
PQfinish(cell->node_info->conn);
|
|
cell->node_info->conn = NULL;
|
|
}
|
|
}
|
|
|
|
log_verbose(LOG_DEBUG, "clear_node_info_list() - unlinking");
|
|
|
|
cell = nodes->head;
|
|
|
|
while (cell != NULL)
|
|
{
|
|
next_cell = cell->next;
|
|
|
|
if (cell->node_info->replication_info != NULL)
|
|
pfree(cell->node_info->replication_info);
|
|
|
|
pfree(cell->node_info);
|
|
pfree(cell);
|
|
cell = next_cell;
|
|
}
|
|
|
|
nodes->head = NULL;
|
|
nodes->tail = NULL;
|
|
nodes->node_count = 0;
|
|
}
|
|
|
|
|
|
/* ================================================ */
|
|
/* PostgreSQL configuration file location functions */
|
|
/* ================================================ */
|
|
|
|
bool
|
|
get_datadir_configuration_files(PGconn *conn, KeyValueList *list)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
int i;
|
|
bool success = true;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBufferStr(&query,
|
|
"WITH files AS ( "
|
|
" WITH dd AS ( "
|
|
" SELECT setting "
|
|
" FROM pg_catalog.pg_settings "
|
|
" WHERE name = 'data_directory') "
|
|
" SELECT distinct(sourcefile) AS config_file"
|
|
" FROM dd, pg_catalog.pg_settings ps "
|
|
" WHERE ps.sourcefile IS NOT NULL "
|
|
" AND ps.sourcefile ~ ('^' || dd.setting) "
|
|
" UNION "
|
|
" SELECT ps.setting AS config_file"
|
|
" FROM dd, pg_catalog.pg_settings ps "
|
|
" WHERE ps.name IN ('config_file', 'hba_file', 'ident_file') "
|
|
" AND ps.setting ~ ('^' || dd.setting) "
|
|
") "
|
|
" SELECT config_file, "
|
|
" pg_catalog.regexp_replace(config_file, '^.*\\/','') AS filename "
|
|
" FROM files "
|
|
"ORDER BY config_file");
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data,
|
|
_("get_datadir_configuration_files(): unable to retrieve configuration file information"));
|
|
|
|
success = false;
|
|
}
|
|
else
|
|
{
|
|
for (i = 0; i < PQntuples(res); i++)
|
|
{
|
|
key_value_list_set(list,
|
|
PQgetvalue(res, i, 1),
|
|
PQgetvalue(res, i, 0));
|
|
}
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
bool
|
|
get_configuration_file_locations(PGconn *conn, t_configfile_list *list)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
int i;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBufferStr(&query,
|
|
" WITH dd AS ( "
|
|
" SELECT setting AS data_directory"
|
|
" FROM pg_catalog.pg_settings "
|
|
" WHERE name = 'data_directory' "
|
|
" ) "
|
|
" SELECT DISTINCT(sourcefile), "
|
|
" pg_catalog.regexp_replace(sourcefile, '^.*\\/', '') AS filename, "
|
|
" sourcefile ~ ('^' || dd.data_directory) AS in_data_dir "
|
|
" FROM dd, pg_catalog.pg_settings ps "
|
|
" WHERE sourcefile IS NOT NULL "
|
|
" ORDER BY 1 ");
|
|
|
|
log_verbose(LOG_DEBUG, "get_configuration_file_locations():\n %s",
|
|
query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data,
|
|
_("get_configuration_file_locations(): unable to retrieve configuration file locations"));
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return false;
|
|
}
|
|
|
|
/*
|
|
* allocate memory for config file array - number of rows returned from
|
|
* above query + 2 for pg_hba.conf, pg_ident.conf
|
|
*/
|
|
|
|
config_file_list_init(list, PQntuples(res) + 2);
|
|
|
|
for (i = 0; i < PQntuples(res); i++)
|
|
{
|
|
config_file_list_add(list,
|
|
PQgetvalue(res, i, 0),
|
|
PQgetvalue(res, i, 1),
|
|
atobool(PQgetvalue(res, i, 2)));
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
/* Fetch locations of pg_hba.conf and pg_ident.conf */
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBufferStr(&query,
|
|
" WITH dd AS ( "
|
|
" SELECT setting AS data_directory"
|
|
" FROM pg_catalog.pg_settings "
|
|
" WHERE name = 'data_directory' "
|
|
" ) "
|
|
" SELECT ps.setting, "
|
|
" pg_catalog.regexp_replace(setting, '^.*\\/', '') AS filename, "
|
|
" ps.setting ~ ('^' || dd.data_directory) AS in_data_dir "
|
|
" FROM dd, pg_catalog.pg_settings ps "
|
|
" WHERE ps.name IN ('hba_file', 'ident_file') "
|
|
" ORDER BY 1 ");
|
|
|
|
log_verbose(LOG_DEBUG, "get_configuration_file_locations():\n %s",
|
|
query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data,
|
|
_("get_configuration_file_locations(): unable to retrieve configuration file locations"));
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return false;
|
|
}
|
|
|
|
for (i = 0; i < PQntuples(res); i++)
|
|
{
|
|
config_file_list_add(list,
|
|
PQgetvalue(res, i, 0),
|
|
PQgetvalue(res, i, 1),
|
|
atobool(PQgetvalue(res, i, 2)));
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
void
|
|
config_file_list_init(t_configfile_list *list, int max_size)
|
|
{
|
|
list->size = max_size;
|
|
list->entries = 0;
|
|
list->files = pg_malloc0(sizeof(t_configfile_info *) * max_size);
|
|
|
|
if (list->files == NULL)
|
|
{
|
|
log_error(_("config_file_list_init(): unable to allocate memory; terminating"));
|
|
exit(ERR_OUT_OF_MEMORY);
|
|
}
|
|
}
|
|
|
|
|
|
void
|
|
config_file_list_add(t_configfile_list *list, const char *file, const char *filename, bool in_data_dir)
|
|
{
|
|
/* Failsafe to prevent entries being added beyond the end */
|
|
if (list->entries == list->size)
|
|
return;
|
|
|
|
list->files[list->entries] = pg_malloc0(sizeof(t_configfile_info));
|
|
|
|
if (list->files[list->entries] == NULL)
|
|
{
|
|
log_error(_("config_file_list_add(): unable to allocate memory; terminating"));
|
|
exit(ERR_OUT_OF_MEMORY);
|
|
}
|
|
|
|
|
|
snprintf(list->files[list->entries]->filepath,
|
|
sizeof(list->files[list->entries]->filepath),
|
|
"%s", file);
|
|
canonicalize_path(list->files[list->entries]->filepath);
|
|
|
|
snprintf(list->files[list->entries]->filename,
|
|
sizeof(list->files[list->entries]->filename),
|
|
"%s", filename);
|
|
|
|
list->files[list->entries]->in_data_directory = in_data_dir;
|
|
|
|
list->entries++;
|
|
}
|
|
|
|
|
|
/* ====================== */
|
|
/* event record functions */
|
|
/* ====================== */
|
|
|
|
|
|
/*
|
|
* create_event_record()
|
|
*
|
|
* Create a record in the "events" table, but don't execute the
|
|
* "event_notification_command".
|
|
*/
|
|
|
|
bool
|
|
create_event_record(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details)
|
|
{
|
|
/* create dummy t_event_info */
|
|
t_event_info event_info = T_EVENT_INFO_INITIALIZER;
|
|
|
|
return _create_event(conn, options, node_id, event, successful, details, &event_info, false);
|
|
}
|
|
|
|
|
|
/*
|
|
* create_event_notification()
|
|
*
|
|
* If `conn` is not NULL, insert a record into the events table.
|
|
*
|
|
* If configuration parameter "event_notification_command" is set, also
|
|
* attempt to execute that command.
|
|
*
|
|
* Returns true if all operations succeeded, false if one or more failed.
|
|
*
|
|
* Note this function may be called with "conn" set to NULL in cases where
|
|
* the primary node is not available and it's therefore not possible to write
|
|
* an event record. In this case, if `event_notification_command` is set, a
|
|
* user-defined notification to be generated; if not, this function will have
|
|
* no effect.
|
|
*/
|
|
bool
|
|
create_event_notification(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details)
|
|
{
|
|
/* create dummy t_event_info */
|
|
t_event_info event_info = T_EVENT_INFO_INITIALIZER;
|
|
|
|
return _create_event(conn, options, node_id, event, successful, details, &event_info, true);
|
|
}
|
|
|
|
|
|
/*
|
|
* create_event_notification_extended()
|
|
*
|
|
* The caller may need to pass additional parameters to the event notification
|
|
* command (currently only the conninfo string of another node)
|
|
|
|
*/
|
|
bool
|
|
create_event_notification_extended(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info)
|
|
{
|
|
return _create_event(conn, options, node_id, event, successful, details, event_info, true);
|
|
}
|
|
|
|
|
|
static bool
|
|
_create_event(PGconn *conn, t_configuration_options *options, int node_id, char *event, bool successful, char *details, t_event_info *event_info, bool send_notification)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
char event_timestamp[MAXLEN] = "";
|
|
bool success = true;
|
|
|
|
log_verbose(LOG_DEBUG, "_create_event(): event is \"%s\" for node %i", event, node_id);
|
|
|
|
/*
|
|
* Only attempt to write a record if a connection handle was provided,
|
|
* and the connection handle points to a node which is not in recovery.
|
|
*/
|
|
if (conn != NULL && PQstatus(conn) == CONNECTION_OK && get_recovery_type(conn) == RECTYPE_PRIMARY)
|
|
{
|
|
int n_node_id = htonl(node_id);
|
|
char *t_successful = successful ? "TRUE" : "FALSE";
|
|
|
|
const char *values[4] = {(char *) &n_node_id,
|
|
event,
|
|
t_successful,
|
|
details
|
|
};
|
|
|
|
int lengths[4] = {sizeof(n_node_id),
|
|
0,
|
|
0,
|
|
0
|
|
};
|
|
|
|
int binary[4] = {1, 0, 0, 0};
|
|
|
|
initPQExpBuffer(&query);
|
|
appendPQExpBufferStr(&query,
|
|
" INSERT INTO repmgr.events ( "
|
|
" node_id, "
|
|
" event, "
|
|
" successful, "
|
|
" details "
|
|
" ) "
|
|
" VALUES ($1, $2, $3, $4) "
|
|
" RETURNING event_timestamp ");
|
|
|
|
log_verbose(LOG_DEBUG, "_create_event():\n %s", query.data);
|
|
|
|
res = PQexecParams(conn,
|
|
query.data,
|
|
4,
|
|
NULL,
|
|
values,
|
|
lengths,
|
|
binary,
|
|
0);
|
|
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
/* we don't treat this as a fatal error */
|
|
log_warning(_("unable to create event record"));
|
|
log_detail("%s", PQerrorMessage(conn));
|
|
log_detail("%s", query.data);
|
|
|
|
success = false;
|
|
}
|
|
else
|
|
{
|
|
/* Store timestamp to send to the notification command */
|
|
snprintf(event_timestamp, MAXLEN, "%s", PQgetvalue(res, 0, 0));
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
}
|
|
|
|
/*
|
|
* If no database connection provided, or the query failed, generate a
|
|
* current timestamp ourselves. This isn't quite the same format as
|
|
* PostgreSQL, but is close enough for diagnostic use.
|
|
*/
|
|
if (!strlen(event_timestamp))
|
|
{
|
|
time_t now;
|
|
struct tm ts;
|
|
|
|
time(&now);
|
|
ts = *localtime(&now);
|
|
strftime(event_timestamp, MAXLEN, "%Y-%m-%d %H:%M:%S%z", &ts);
|
|
}
|
|
|
|
log_verbose(LOG_DEBUG, "_create_event(): Event timestamp is \"%s\"", event_timestamp);
|
|
|
|
/* an event notification command was provided - parse and execute it */
|
|
if (send_notification == true && strlen(options->event_notification_command))
|
|
{
|
|
char parsed_command[MAXPGPATH] = "";
|
|
const char *src_ptr = NULL;
|
|
char *dst_ptr = NULL;
|
|
char *end_ptr = NULL;
|
|
int r = 0;
|
|
|
|
log_verbose(LOG_DEBUG, "_create_event(): command is '%s'", options->event_notification_command);
|
|
/*
|
|
* If configuration option 'event_notifications' was provided, check
|
|
* if this event is one of the ones listed; if not listed, don't
|
|
* execute the notification script.
|
|
*
|
|
* (If 'event_notifications' was not provided, we assume the script
|
|
* should be executed for all events).
|
|
*/
|
|
if (options->event_notifications.head != NULL)
|
|
{
|
|
EventNotificationListCell *cell = NULL;
|
|
bool notify_ok = false;
|
|
|
|
for (cell = options->event_notifications.head; cell; cell = cell->next)
|
|
{
|
|
if (strcmp(event, cell->event_type) == 0)
|
|
{
|
|
notify_ok = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Event type not found in the 'event_notifications' list - return
|
|
* early
|
|
*/
|
|
if (notify_ok == false)
|
|
{
|
|
log_debug(_("not executing notification script for event type \"%s\""), event);
|
|
return success;
|
|
}
|
|
}
|
|
|
|
dst_ptr = parsed_command;
|
|
end_ptr = parsed_command + MAXPGPATH - 1;
|
|
*end_ptr = '\0';
|
|
|
|
for (src_ptr = options->event_notification_command; *src_ptr; src_ptr++)
|
|
{
|
|
if (*src_ptr == '%')
|
|
{
|
|
switch (src_ptr[1])
|
|
{
|
|
case '%':
|
|
/* %%: replace with % */
|
|
if (dst_ptr < end_ptr)
|
|
{
|
|
src_ptr++;
|
|
*dst_ptr++ = *src_ptr;
|
|
}
|
|
break;
|
|
case 'n':
|
|
/* %n: node id */
|
|
src_ptr++;
|
|
snprintf(dst_ptr, end_ptr - dst_ptr, "%i", node_id);
|
|
dst_ptr += strlen(dst_ptr);
|
|
break;
|
|
case 'a':
|
|
/* %a: node name */
|
|
src_ptr++;
|
|
if (event_info->node_name != NULL)
|
|
{
|
|
log_verbose(LOG_DEBUG, "node_name: %s", event_info->node_name);
|
|
strlcpy(dst_ptr, event_info->node_name, end_ptr - dst_ptr);
|
|
dst_ptr += strlen(dst_ptr);
|
|
}
|
|
break;
|
|
case 'e':
|
|
/* %e: event type */
|
|
src_ptr++;
|
|
strlcpy(dst_ptr, event, end_ptr - dst_ptr);
|
|
dst_ptr += strlen(dst_ptr);
|
|
break;
|
|
case 'd':
|
|
/* %d: details */
|
|
src_ptr++;
|
|
if (details != NULL)
|
|
{
|
|
PQExpBufferData details_escaped;
|
|
initPQExpBuffer(&details_escaped);
|
|
|
|
escape_double_quotes(details, &details_escaped);
|
|
|
|
strlcpy(dst_ptr, details_escaped.data, end_ptr - dst_ptr);
|
|
dst_ptr += strlen(dst_ptr);
|
|
termPQExpBuffer(&details_escaped);
|
|
}
|
|
break;
|
|
case 's':
|
|
/* %s: successful */
|
|
src_ptr++;
|
|
strlcpy(dst_ptr, successful ? "1" : "0", end_ptr - dst_ptr);
|
|
dst_ptr += strlen(dst_ptr);
|
|
break;
|
|
case 't':
|
|
/* %t: timestamp */
|
|
src_ptr++;
|
|
strlcpy(dst_ptr, event_timestamp, end_ptr - dst_ptr);
|
|
dst_ptr += strlen(dst_ptr);
|
|
break;
|
|
case 'c':
|
|
/* %c: conninfo for next available node */
|
|
src_ptr++;
|
|
if (event_info->conninfo_str != NULL)
|
|
{
|
|
log_debug("conninfo: %s", event_info->conninfo_str);
|
|
|
|
strlcpy(dst_ptr, event_info->conninfo_str, end_ptr - dst_ptr);
|
|
dst_ptr += strlen(dst_ptr);
|
|
}
|
|
break;
|
|
case 'p':
|
|
/* %p: primary id ("standby_switchover"/"repmgrd_failover_promote": former primary id) */
|
|
src_ptr++;
|
|
if (event_info->node_id != UNKNOWN_NODE_ID)
|
|
{
|
|
PQExpBufferData node_id;
|
|
initPQExpBuffer(&node_id);
|
|
appendPQExpBuffer(&node_id,
|
|
"%i", event_info->node_id);
|
|
strlcpy(dst_ptr, node_id.data, end_ptr - dst_ptr);
|
|
dst_ptr += strlen(dst_ptr);
|
|
termPQExpBuffer(&node_id);
|
|
}
|
|
break;
|
|
default:
|
|
/* otherwise treat the % as not special */
|
|
if (dst_ptr < end_ptr)
|
|
*dst_ptr++ = *src_ptr;
|
|
break;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (dst_ptr < end_ptr)
|
|
*dst_ptr++ = *src_ptr;
|
|
}
|
|
}
|
|
|
|
*dst_ptr = '\0';
|
|
|
|
log_info(_("executing notification command for event \"%s\""),
|
|
event);
|
|
|
|
log_detail(_("command is:\n %s"), parsed_command);
|
|
r = system(parsed_command);
|
|
if (r != 0)
|
|
{
|
|
log_warning(_("unable to execute event notification command"));
|
|
log_detail(_("parsed event notification command was:\n %s"), parsed_command);
|
|
success = false;
|
|
}
|
|
}
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
PGresult *
|
|
get_event_records(PGconn *conn, int node_id, const char *node_name, const char *event, bool all, int limit)
|
|
{
|
|
PGresult *res;
|
|
|
|
PQExpBufferData query;
|
|
PQExpBufferData where_clause;
|
|
|
|
|
|
initPQExpBuffer(&query);
|
|
initPQExpBuffer(&where_clause);
|
|
|
|
/* LEFT JOIN used here as a node record may have been removed */
|
|
appendPQExpBufferStr(&query,
|
|
" SELECT e.node_id, n.node_name, e.event, e.successful, "
|
|
" pg_catalog.to_char(e.event_timestamp, 'YYYY-MM-DD HH24:MI:SS') AS timestamp, "
|
|
" e.details "
|
|
" FROM repmgr.events e "
|
|
"LEFT JOIN repmgr.nodes n ON e.node_id = n.node_id ");
|
|
|
|
if (node_id != UNKNOWN_NODE_ID)
|
|
{
|
|
append_where_clause(&where_clause,
|
|
"n.node_id=%i", node_id);
|
|
}
|
|
else if (node_name[0] != '\0')
|
|
{
|
|
char *escaped = escape_string(conn, node_name);
|
|
|
|
if (escaped == NULL)
|
|
{
|
|
log_error(_("unable to escape value provided for node name"));
|
|
log_detail(_("node name is: \"%s\""), node_name);
|
|
}
|
|
else
|
|
{
|
|
append_where_clause(&where_clause,
|
|
"n.node_name='%s'",
|
|
escaped);
|
|
pfree(escaped);
|
|
}
|
|
}
|
|
|
|
if (event[0] != '\0')
|
|
{
|
|
char *escaped = escape_string(conn, event);
|
|
|
|
if (escaped == NULL)
|
|
{
|
|
log_error(_("unable to escape value provided for event"));
|
|
log_detail(_("event is: \"%s\""), event);
|
|
}
|
|
else
|
|
{
|
|
append_where_clause(&where_clause,
|
|
"e.event='%s'",
|
|
escaped);
|
|
pfree(escaped);
|
|
}
|
|
}
|
|
|
|
appendPQExpBuffer(&query, "\n%s\n",
|
|
where_clause.data);
|
|
|
|
appendPQExpBufferStr(&query,
|
|
" ORDER BY e.event_timestamp DESC");
|
|
|
|
if (all == false && limit > 0)
|
|
{
|
|
appendPQExpBuffer(&query, " LIMIT %i",
|
|
limit);
|
|
}
|
|
|
|
log_debug("do_cluster_event():\n%s", query.data);
|
|
res = PQexec(conn, query.data);
|
|
|
|
termPQExpBuffer(&query);
|
|
termPQExpBuffer(&where_clause);
|
|
|
|
return res;
|
|
}
|
|
|
|
|
|
/* ========================== */
|
|
/* replication slot functions */
|
|
/* ========================== */
|
|
|
|
|
|
/*
 * create_slot_name()
 *
 * Write the replication slot name for "node_id" into "slot_name", in the
 * form "repmgr_slot_<node_id>".
 *
 * NOTE(review): the buffer size is implied by maxlen_snprintf(); presumably
 * "slot_name" must provide at least MAXLEN bytes - confirm against the
 * macro definition.
 */
void
create_slot_name(char *slot_name, int node_id)
{
	maxlen_snprintf(slot_name, "repmgr_slot_%i", node_id);
}
|
|
|
|
|
|
static ReplSlotStatus
|
|
_verify_replication_slot(PGconn *conn, char *slot_name, PQExpBufferData *error_msg)
|
|
{
|
|
RecordStatus record_status = RECORD_NOT_FOUND;
|
|
t_replication_slot slot_info = T_REPLICATION_SLOT_INITIALIZER;
|
|
|
|
/*
|
|
* Check whether slot exists already; if it exists and is active, that
|
|
* means another active standby is using it, which creates an error
|
|
* situation; if not we can reuse it as-is.
|
|
*/
|
|
record_status = get_slot_record(conn, slot_name, &slot_info);
|
|
|
|
if (record_status == RECORD_FOUND)
|
|
{
|
|
if (strcmp(slot_info.slot_type, "physical") != 0)
|
|
{
|
|
if (error_msg)
|
|
appendPQExpBuffer(error_msg,
|
|
_("slot \"%s\" exists and is not a physical slot\n"),
|
|
slot_name);
|
|
return SLOT_NOT_PHYSICAL;
|
|
}
|
|
|
|
if (slot_info.active == false)
|
|
{
|
|
log_debug("replication slot \"%s\" exists but is inactive; reusing",
|
|
slot_name);
|
|
|
|
return SLOT_INACTIVE;
|
|
}
|
|
|
|
if (error_msg)
|
|
appendPQExpBuffer(error_msg,
|
|
_("slot \"%s\" already exists as an active slot\n"),
|
|
slot_name);
|
|
return SLOT_ACTIVE;
|
|
}
|
|
|
|
return SLOT_NOT_FOUND;
|
|
}
|
|
|
|
|
|
bool
|
|
create_replication_slot_replprot(PGconn *conn, PGconn *repl_conn, char *slot_name, PQExpBufferData *error_msg)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
ReplSlotStatus slot_status = _verify_replication_slot(conn, slot_name, error_msg);
|
|
|
|
/* Replication slot is unusable */
|
|
if (slot_status == SLOT_NOT_PHYSICAL || slot_status == SLOT_ACTIVE)
|
|
return false;
|
|
|
|
/* Replication slot can be reused */
|
|
if (slot_status == SLOT_INACTIVE)
|
|
return true;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
"CREATE_REPLICATION_SLOT %s PHYSICAL",
|
|
slot_name);
|
|
|
|
/* In 9.6 and later, reserve the LSN straight away */
|
|
if (PQserverVersion(conn) >= 90600)
|
|
{
|
|
appendPQExpBufferStr(&query,
|
|
" RESERVE_WAL");
|
|
}
|
|
|
|
appendPQExpBufferChar(&query, ';');
|
|
|
|
res = PQexec(repl_conn, query.data);
|
|
|
|
|
|
if ((PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res)) && error_msg != NULL)
|
|
{
|
|
appendPQExpBuffer(error_msg,
|
|
_("unable to create replication slot \"%s\" on the upstream node: %s\n"),
|
|
slot_name,
|
|
PQerrorMessage(conn));
|
|
success = false;
|
|
}
|
|
|
|
PQclear(res);
|
|
return success;
|
|
}
|
|
|
|
|
|
bool
|
|
create_replication_slot_sql(PGconn *conn, char *slot_name, PQExpBufferData *error_msg)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
ReplSlotStatus slot_status = _verify_replication_slot(conn, slot_name, error_msg);
|
|
|
|
/* Replication slot is unusable */
|
|
if (slot_status == SLOT_NOT_PHYSICAL || slot_status == SLOT_ACTIVE)
|
|
return false;
|
|
|
|
/* Replication slot can be reused */
|
|
if (slot_status == SLOT_INACTIVE)
|
|
return true;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
/* In 9.6 and later, reserve the LSN straight away */
|
|
if (PQserverVersion(conn) >= 90600)
|
|
{
|
|
appendPQExpBuffer(&query,
|
|
"SELECT * FROM pg_catalog.pg_create_physical_replication_slot('%s', TRUE)",
|
|
slot_name);
|
|
}
|
|
else
|
|
{
|
|
appendPQExpBuffer(&query,
|
|
"SELECT * FROM pg_catalog.pg_create_physical_replication_slot('%s')",
|
|
slot_name);
|
|
}
|
|
|
|
log_debug(_("create_replication_slot_sql(): creating slot \"%s\" on upstream"), slot_name);
|
|
log_verbose(LOG_DEBUG, "create_replication_slot_sql():\n%s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
termPQExpBuffer(&query);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK && error_msg != NULL)
|
|
{
|
|
appendPQExpBuffer(error_msg,
|
|
_("unable to create replication slot \"%s\" on the upstream node: %s\n"),
|
|
slot_name,
|
|
PQerrorMessage(conn));
|
|
success = false;
|
|
}
|
|
|
|
PQclear(res);
|
|
return success;
|
|
}
|
|
|
|
|
|
bool
|
|
drop_replication_slot_sql(PGconn *conn, char *slot_name)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
"SELECT pg_catalog.pg_drop_replication_slot('%s')",
|
|
slot_name);
|
|
|
|
log_verbose(LOG_DEBUG, "drop_replication_slot_sql():\n %s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data,
|
|
_("drop_replication_slot_sql(): unable to drop replication slot \"%s\""),
|
|
slot_name);
|
|
|
|
success = false;
|
|
}
|
|
else
|
|
{
|
|
log_verbose(LOG_DEBUG, "replication slot \"%s\" successfully dropped",
|
|
slot_name);
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
bool
|
|
drop_replication_slot_replprot(PGconn *repl_conn, char *slot_name)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
"DROP_REPLICATION_SLOT %s;",
|
|
slot_name);
|
|
|
|
log_verbose(LOG_DEBUG, "drop_replication_slot_replprot():\n %s", query.data);
|
|
|
|
res = PQexec(repl_conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res))
|
|
{
|
|
log_db_error(repl_conn, query.data,
|
|
_("drop_replication_slot_replprot(): unable to drop replication slot \"%s\""),
|
|
slot_name);
|
|
|
|
success = false;
|
|
}
|
|
else
|
|
{
|
|
log_verbose(LOG_DEBUG, "replication slot \"%s\" successfully dropped",
|
|
slot_name);
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
RecordStatus
|
|
get_slot_record(PGconn *conn, char *slot_name, t_replication_slot *record)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
RecordStatus record_status = RECORD_FOUND;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
"SELECT slot_name, slot_type, active "
|
|
" FROM pg_catalog.pg_replication_slots "
|
|
" WHERE slot_name = '%s' ",
|
|
slot_name);
|
|
|
|
log_verbose(LOG_DEBUG, "get_slot_record():\n%s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data,
|
|
_("get_slot_record(): unable to query pg_replication_slots"));
|
|
|
|
record_status = RECORD_ERROR;
|
|
}
|
|
else if (!PQntuples(res))
|
|
{
|
|
record_status = RECORD_NOT_FOUND;
|
|
}
|
|
else
|
|
{
|
|
snprintf(record->slot_name,
|
|
sizeof(record->slot_name),
|
|
"%s", PQgetvalue(res, 0, 0));
|
|
snprintf(record->slot_type,
|
|
sizeof(record->slot_type),
|
|
"%s", PQgetvalue(res, 0, 1));
|
|
record->active = atobool(PQgetvalue(res, 0, 2));
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return record_status;
|
|
}
|
|
|
|
|
|
int
|
|
get_free_replication_slot_count(PGconn *conn, int *max_replication_slots)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
int free_slots = 0;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBufferStr(&query,
|
|
" SELECT pg_catalog.current_setting('max_replication_slots')::INT - "
|
|
" pg_catalog.count(*) "
|
|
" AS free_slots, "
|
|
" pg_catalog.current_setting('max_replication_slots')::INT "
|
|
" AS max_replication_slots "
|
|
" FROM pg_catalog.pg_replication_slots s"
|
|
" WHERE s.slot_type = 'physical'");
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data,
|
|
_("get_free_replication_slot_count(): unable to execute replication slot query"));
|
|
|
|
free_slots = UNKNOWN_VALUE;
|
|
}
|
|
else if (PQntuples(res) == 0)
|
|
{
|
|
free_slots = UNKNOWN_VALUE;
|
|
}
|
|
else
|
|
{
|
|
free_slots = atoi(PQgetvalue(res, 0, 0));
|
|
if (max_replication_slots != NULL)
|
|
*max_replication_slots = atoi(PQgetvalue(res, 0, 1));
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return free_slots;
|
|
}
|
|
|
|
|
|
int
|
|
get_inactive_replication_slots(PGconn *conn, KeyValueList *list)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
int i, inactive_slots = 0;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBufferStr(&query,
|
|
" SELECT slot_name, slot_type "
|
|
" FROM pg_catalog.pg_replication_slots "
|
|
" WHERE active IS FALSE "
|
|
" AND slot_type = 'physical' "
|
|
" ORDER BY slot_name ");
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data,
|
|
_("get_inactive_replication_slots(): unable to execute replication slot query"));
|
|
|
|
inactive_slots = -1;
|
|
}
|
|
else
|
|
{
|
|
inactive_slots = PQntuples(res);
|
|
|
|
for (i = 0; i < inactive_slots; i++)
|
|
{
|
|
key_value_list_set(list,
|
|
PQgetvalue(res, i, 0),
|
|
PQgetvalue(res, i, 1));
|
|
}
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return inactive_slots;
|
|
}
|
|
|
|
|
|
|
|
/* ==================== */
|
|
/* tablespace functions */
|
|
/* ==================== */
|
|
|
|
bool
|
|
get_tablespace_name_by_location(PGconn *conn, const char *location, char *name)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
"SELECT spcname "
|
|
" FROM pg_catalog.pg_tablespace "
|
|
" WHERE pg_catalog.pg_tablespace_location(oid) = '%s'",
|
|
location);
|
|
|
|
log_verbose(LOG_DEBUG, "get_tablespace_name_by_location():\n%s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data,
|
|
_("get_tablespace_name_by_location(): unable to execute tablespace query"));
|
|
success = false;
|
|
}
|
|
else if (PQntuples(res) == 0)
|
|
{
|
|
success = false;
|
|
}
|
|
else
|
|
{
|
|
snprintf(name, MAXLEN,
|
|
"%s", PQgetvalue(res, 0, 0));
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
/* ============================ */
|
|
/* asynchronous query functions */
|
|
/* ============================ */
|
|
|
|
bool
|
|
cancel_query(PGconn *conn, int timeout)
|
|
{
|
|
char errbuf[ERRBUFF_SIZE] = "";
|
|
PGcancel *pgcancel = NULL;
|
|
|
|
if (wait_connection_availability(conn, timeout) != 1)
|
|
return false;
|
|
|
|
pgcancel = PQgetCancel(conn);
|
|
|
|
if (pgcancel == NULL)
|
|
return false;
|
|
|
|
/*
|
|
* PQcancel can only return 0 if socket()/connect()/send() fails, in any
|
|
* of those cases we can assume something bad happened to the connection
|
|
*/
|
|
if (PQcancel(pgcancel, errbuf, ERRBUFF_SIZE) == 0)
|
|
{
|
|
log_warning(_("unable to cancel current query"));
|
|
log_detail("\n%s", errbuf);
|
|
PQfreeCancel(pgcancel);
|
|
return false;
|
|
}
|
|
|
|
PQfreeCancel(pgcancel);
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
/*
|
|
* Wait until current query finishes, ignoring any results.
|
|
* Usually this will be an async query or query cancellation.
|
|
*
|
|
* Returns 1 for success; 0 if any error occurred; -1 if timeout reached.
|
|
*/
|
|
int
|
|
wait_connection_availability(PGconn *conn, int timeout)
|
|
{
|
|
PGresult *res = NULL;
|
|
fd_set read_set;
|
|
int sock = PQsocket(conn);
|
|
struct timeval tmout,
|
|
before,
|
|
after;
|
|
struct timezone tz;
|
|
long long timeout_ms;
|
|
|
|
/* calculate timeout in microseconds */
|
|
timeout_ms = (long long) timeout * 1000000;
|
|
|
|
while (timeout_ms > 0)
|
|
{
|
|
if (PQconsumeInput(conn) == 0)
|
|
{
|
|
log_warning(_("wait_connection_availability(): unable to receive data from connection"));
|
|
log_detail("%s", PQerrorMessage(conn));
|
|
return 0;
|
|
}
|
|
|
|
if (PQisBusy(conn) == 0)
|
|
{
|
|
do
|
|
{
|
|
res = PQgetResult(conn);
|
|
PQclear(res);
|
|
} while (res != NULL);
|
|
|
|
break;
|
|
}
|
|
|
|
tmout.tv_sec = 0;
|
|
tmout.tv_usec = 250000;
|
|
|
|
FD_ZERO(&read_set);
|
|
FD_SET(sock, &read_set);
|
|
|
|
gettimeofday(&before, &tz);
|
|
if (select(sock, &read_set, NULL, NULL, &tmout) == -1)
|
|
{
|
|
log_warning(_("wait_connection_availability(): select() returned with error"));
|
|
log_detail("%s", strerror(errno));
|
|
return -1;
|
|
}
|
|
|
|
gettimeofday(&after, &tz);
|
|
|
|
timeout_ms -= (after.tv_sec * 1000000 + after.tv_usec) -
|
|
(before.tv_sec * 1000000 + before.tv_usec);
|
|
}
|
|
|
|
|
|
if (timeout_ms >= 0)
|
|
{
|
|
return 1;
|
|
}
|
|
|
|
log_warning(_("wait_connection_availability(): timeout (%i secs) reached"), timeout);
|
|
return -1;
|
|
}
|
|
|
|
|
|
/* =========================== */
|
|
/* node availability functions */
|
|
/* =========================== */
|
|
|
|
bool
|
|
is_server_available(const char *conninfo)
|
|
{
|
|
return _is_server_available(conninfo, false);
|
|
}
|
|
|
|
|
|
bool
|
|
is_server_available_quiet(const char *conninfo)
|
|
{
|
|
return _is_server_available(conninfo, true);
|
|
}
|
|
|
|
|
|
static bool
|
|
_is_server_available(const char *conninfo, bool quiet)
|
|
{
|
|
PGPing status = PQping(conninfo);
|
|
|
|
log_verbose(LOG_DEBUG, "is_server_available(): ping status for \"%s\" is %s", conninfo, print_pqping_status(status));
|
|
if (status == PQPING_OK)
|
|
return true;
|
|
|
|
if (quiet == false)
|
|
{
|
|
log_warning(_("unable to ping \"%s\""), conninfo);
|
|
log_detail(_("PQping() returned \"%s\""), print_pqping_status(status));
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
|
|
bool
|
|
is_server_available_params(t_conninfo_param_list *param_list)
|
|
{
|
|
PGPing status = PQpingParams((const char **) param_list->keywords,
|
|
(const char **) param_list->values,
|
|
false);
|
|
|
|
/* deparsing the param_list adds overhead, so only do it if needed */
|
|
if (log_level == LOG_DEBUG || status != PQPING_OK)
|
|
{
|
|
char *conninfo_str = param_list_to_string(param_list);
|
|
log_verbose(LOG_DEBUG, "is_server_available_params(): ping status for \"%s\" is %s", conninfo_str, print_pqping_status(status));
|
|
|
|
if (status != PQPING_OK)
|
|
{
|
|
log_warning(_("unable to ping \"%s\""), conninfo_str);
|
|
log_detail(_("PQping() returned \"%s\""), print_pqping_status(status));
|
|
}
|
|
|
|
pfree(conninfo_str);
|
|
}
|
|
|
|
if (status == PQPING_OK)
|
|
return true;
|
|
|
|
return false;
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
* Simple throw-away query to stop a connection handle going stale.
|
|
*/
|
|
ExecStatusType
|
|
connection_ping(PGconn *conn)
|
|
{
|
|
PGresult *res = PQexec(conn, "SELECT TRUE");
|
|
ExecStatusType ping_result;
|
|
|
|
log_verbose(LOG_DEBUG, "connection_ping(): result is %s", PQresStatus(PQresultStatus(res)));
|
|
|
|
ping_result = PQresultStatus(res);
|
|
PQclear(res);
|
|
|
|
return ping_result;
|
|
}
|
|
|
|
|
|
ExecStatusType
|
|
connection_ping_reconnect(PGconn *conn)
|
|
{
|
|
ExecStatusType ping_result = connection_ping(conn);
|
|
|
|
if (PQstatus(conn) != CONNECTION_OK)
|
|
{
|
|
log_warning(_("connection error, attempting to reset"));
|
|
log_detail("\n%s", PQerrorMessage(conn));
|
|
PQreset(conn);
|
|
ping_result = connection_ping(conn);
|
|
}
|
|
|
|
log_verbose(LOG_DEBUG, "connection_ping_reconnect(): result is %s", PQresStatus(ping_result));
|
|
|
|
return ping_result;
|
|
}
|
|
|
|
|
|
|
|
/* ==================== */
|
|
/* monitoring functions */
|
|
/* ==================== */
|
|
|
|
void
|
|
add_monitoring_record(PGconn *primary_conn,
|
|
PGconn *local_conn,
|
|
int primary_node_id,
|
|
int local_node_id,
|
|
char *monitor_standby_timestamp,
|
|
XLogRecPtr primary_last_wal_location,
|
|
XLogRecPtr last_wal_receive_lsn,
|
|
char *last_xact_replay_timestamp,
|
|
long long unsigned int replication_lag_bytes,
|
|
long long unsigned int apply_lag_bytes
|
|
)
|
|
{
|
|
PQExpBufferData query;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
"INSERT INTO repmgr.monitoring_history "
|
|
" (primary_node_id, "
|
|
" standby_node_id, "
|
|
" last_monitor_time, "
|
|
" last_apply_time, "
|
|
" last_wal_primary_location, "
|
|
" last_wal_standby_location, "
|
|
" replication_lag, "
|
|
" apply_lag ) "
|
|
" VALUES(%i, "
|
|
" %i, "
|
|
" '%s'::TIMESTAMP WITH TIME ZONE, "
|
|
" '%s'::TIMESTAMP WITH TIME ZONE, "
|
|
" '%X/%X', "
|
|
" '%X/%X', "
|
|
" %llu, "
|
|
" %llu) ",
|
|
primary_node_id,
|
|
local_node_id,
|
|
monitor_standby_timestamp,
|
|
last_xact_replay_timestamp,
|
|
format_lsn(primary_last_wal_location),
|
|
format_lsn(last_wal_receive_lsn),
|
|
replication_lag_bytes,
|
|
apply_lag_bytes);
|
|
|
|
log_verbose(LOG_DEBUG, "standby_monitor:()\n%s", query.data);
|
|
|
|
if (PQsendQuery(primary_conn, query.data) == 0)
|
|
{
|
|
log_warning(_("query could not be sent to primary:\n %s"),
|
|
PQerrorMessage(primary_conn));
|
|
}
|
|
else
|
|
{
|
|
PGresult *res = PQexec(local_conn, "SELECT repmgr.standby_set_last_updated()");
|
|
|
|
/* not critical if the above query fails */
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
log_warning(_("add_monitoring_record(): unable to set last_updated:\n %s"),
|
|
PQerrorMessage(local_conn));
|
|
|
|
PQclear(res);
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
|
|
return;
|
|
}
|
|
|
|
|
|
int
|
|
get_number_of_monitoring_records_to_delete(PGconn *primary_conn, int keep_history, int node_id)
|
|
{
|
|
PQExpBufferData query;
|
|
int record_count = -1;
|
|
PGresult *res = NULL;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
"SELECT pg_catalog.count(*) "
|
|
" FROM repmgr.monitoring_history "
|
|
" WHERE pg_catalog.age(pg_catalog.now(), last_monitor_time) >= '%d days'::interval",
|
|
keep_history);
|
|
|
|
if (node_id != UNKNOWN_NODE_ID)
|
|
{
|
|
appendPQExpBuffer(&query,
|
|
" AND standby_node_id = %i", node_id);
|
|
}
|
|
|
|
log_verbose(LOG_DEBUG, "get_number_of_monitoring_records_to_delete():\n %s", query.data);
|
|
|
|
res = PQexec(primary_conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(primary_conn, query.data,
|
|
_("get_number_of_monitoring_records_to_delete(): unable to query number of monitoring records to clean up"));
|
|
}
|
|
else
|
|
{
|
|
record_count = atoi(PQgetvalue(res, 0, 0));
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return record_count;
|
|
}
|
|
|
|
|
|
bool
|
|
delete_monitoring_records(PGconn *primary_conn, int keep_history, int node_id)
|
|
{
|
|
PQExpBufferData query;
|
|
bool success = true;
|
|
PGresult *res = NULL;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
if (keep_history > 0 || node_id != UNKNOWN_NODE_ID)
|
|
{
|
|
appendPQExpBuffer(&query,
|
|
"DELETE FROM repmgr.monitoring_history "
|
|
" WHERE pg_catalog.age(pg_catalog.now(), last_monitor_time) >= '%d days'::INTERVAL ",
|
|
keep_history);
|
|
|
|
if (node_id != UNKNOWN_NODE_ID)
|
|
{
|
|
appendPQExpBuffer(&query,
|
|
" AND standby_node_id = %i", node_id);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
appendPQExpBufferStr(&query,
|
|
"TRUNCATE TABLE repmgr.monitoring_history");
|
|
}
|
|
|
|
res = PQexec(primary_conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_db_error(primary_conn, query.data,
|
|
_("delete_monitoring_records(): unable to delete monitoring records"));
|
|
success = false;
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
/*
|
|
* node voting functions
|
|
*
|
|
* These are intended to run under repmgrd and mainly rely on shared memory
|
|
*/
|
|
|
|
int
|
|
get_current_term(PGconn *conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
int term = VOTING_TERM_NOT_SET;
|
|
|
|
res = PQexec(conn, "SELECT term FROM repmgr.voting_term");
|
|
|
|
/* it doesn't matter if for whatever reason the table has no rows */
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, NULL,
|
|
_("get_current_term(): unable to query \"repmgr.voting_term\""));
|
|
}
|
|
else if (PQntuples(res) > 0)
|
|
{
|
|
term = atoi(PQgetvalue(res, 0, 0));
|
|
}
|
|
|
|
PQclear(res);
|
|
return term;
|
|
}
|
|
|
|
|
|
void
|
|
initialize_voting_term(PGconn *conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
|
|
int current_term = get_current_term(conn);
|
|
|
|
if (current_term == VOTING_TERM_NOT_SET)
|
|
{
|
|
res = PQexec(conn, "INSERT INTO repmgr.voting_term (term) VALUES (1)");
|
|
}
|
|
else
|
|
{
|
|
res = PQexec(conn, "UPDATE repmgr.voting_term SET term = 1");
|
|
}
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_db_error(conn, NULL, _("unable to initialize repmgr.voting_term"));
|
|
}
|
|
|
|
PQclear(res);
|
|
return;
|
|
}
|
|
|
|
|
|
void
|
|
increment_current_term(PGconn *conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
|
|
res = PQexec(conn, "UPDATE repmgr.voting_term SET term = term + 1");
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_db_error(conn, NULL, _("unable to increment repmgr.voting_term"));
|
|
}
|
|
|
|
PQclear(res);
|
|
return;
|
|
}
|
|
|
|
|
|
bool
|
|
announce_candidature(PGconn *conn, t_node_info *this_node, t_node_info *other_node, int electoral_term)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
|
|
bool retval = false;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
"SELECT repmgr.other_node_is_candidate(%i, %i)",
|
|
this_node->node_id,
|
|
electoral_term);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("announce_candidature(): unable to execute repmgr.other_node_is_candidate()"));
|
|
}
|
|
else
|
|
{
|
|
retval = atobool(PQgetvalue(res, 0, 0));
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return retval;
|
|
}
|
|
|
|
|
|
void
|
|
notify_follow_primary(PGconn *conn, int primary_node_id)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
"SELECT repmgr.notify_follow_primary(%i)",
|
|
primary_node_id);
|
|
|
|
log_verbose(LOG_DEBUG, "notify_follow_primary():\n %s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("unable to execute repmgr.notify_follow_primary()"));
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return;
|
|
}
|
|
|
|
|
|
bool
|
|
get_new_primary(PGconn *conn, int *primary_node_id)
|
|
{
|
|
PGresult *res = NULL;
|
|
int new_primary_node_id = UNKNOWN_NODE_ID;
|
|
bool success = true;
|
|
|
|
const char *sqlquery = "SELECT repmgr.get_new_primary()";
|
|
|
|
res = PQexec(conn, sqlquery);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, sqlquery, _("unable to execute repmgr.get_new_primary()"));
|
|
success = false;
|
|
}
|
|
else if (PQgetisnull(res, 0, 0))
|
|
{
|
|
success = false;
|
|
}
|
|
else
|
|
{
|
|
new_primary_node_id = atoi(PQgetvalue(res, 0, 0));
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
/*
|
|
* repmgr.get_new_primary() will return UNKNOWN_NODE_ID if
|
|
* "follow_new_primary" is false
|
|
*/
|
|
if (new_primary_node_id == UNKNOWN_NODE_ID)
|
|
success = false;
|
|
|
|
*primary_node_id = new_primary_node_id;
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
void
|
|
reset_voting_status(PGconn *conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
|
|
const char *sqlquery = "SELECT repmgr.reset_voting_status()";
|
|
|
|
res = PQexec(conn, sqlquery);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, sqlquery, _("unable to execute repmgr.reset_voting_status()"));
|
|
}
|
|
|
|
PQclear(res);
|
|
return;
|
|
}
|
|
|
|
|
|
/* ============================ */
|
|
/* replication status functions */
|
|
/* ============================ */
|
|
|
|
/*
|
|
* Returns the current LSN on the primary.
|
|
*
|
|
* This just executes "pg_current_wal_lsn()".
|
|
*
|
|
* Function "get_node_current_lsn()" below will return the latest
|
|
* LSN regardless of recovery state.
|
|
*/
|
|
XLogRecPtr
|
|
get_primary_current_lsn(PGconn *conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
XLogRecPtr ptr = InvalidXLogRecPtr;
|
|
|
|
if (PQserverVersion(conn) >= 100000)
|
|
{
|
|
res = PQexec(conn, "SELECT pg_catalog.pg_current_wal_lsn()");
|
|
}
|
|
else
|
|
{
|
|
res = PQexec(conn, "SELECT pg_catalog.pg_current_xlog_location()");
|
|
}
|
|
|
|
if (PQresultStatus(res) == PGRES_TUPLES_OK)
|
|
{
|
|
ptr = parse_lsn(PQgetvalue(res, 0, 0));
|
|
}
|
|
else
|
|
{
|
|
log_db_error(conn, NULL, _("unable to execute get_primary_current_lsn()"));
|
|
}
|
|
|
|
|
|
PQclear(res);
|
|
|
|
return ptr;
|
|
}
|
|
|
|
|
|
XLogRecPtr
|
|
get_last_wal_receive_location(PGconn *conn)
|
|
{
|
|
PGresult *res = NULL;
|
|
XLogRecPtr ptr = InvalidXLogRecPtr;
|
|
|
|
if (PQserverVersion(conn) >= 100000)
|
|
{
|
|
res = PQexec(conn, "SELECT pg_catalog.pg_last_wal_receive_lsn()");
|
|
}
|
|
else
|
|
{
|
|
res = PQexec(conn, "SELECT pg_catalog.pg_last_xlog_receive_location()");
|
|
}
|
|
|
|
if (PQresultStatus(res) == PGRES_TUPLES_OK)
|
|
{
|
|
ptr = parse_lsn(PQgetvalue(res, 0, 0));
|
|
}
|
|
else
|
|
{
|
|
log_db_error(conn, NULL, _("unable to execute get_last_wal_receive_location()"));
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return ptr;
|
|
}
|
|
|
|
/*
|
|
* Returns the latest LSN for the node regardless of recovery state.
|
|
*/
|
|
XLogRecPtr
|
|
get_node_current_lsn(PGconn *conn)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
XLogRecPtr ptr = InvalidXLogRecPtr;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
if (PQserverVersion(conn) >= 100000)
|
|
{
|
|
appendPQExpBufferStr(&query,
|
|
" WITH lsn_states AS ( "
|
|
" SELECT "
|
|
" CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
|
|
" THEN pg_catalog.pg_current_wal_lsn() "
|
|
" ELSE NULL "
|
|
" END "
|
|
" AS current_wal_lsn, "
|
|
" CASE WHEN pg_catalog.pg_is_in_recovery() IS TRUE "
|
|
" THEN pg_catalog.pg_last_wal_receive_lsn() "
|
|
" ELSE NULL "
|
|
" END "
|
|
" AS last_wal_receive_lsn, "
|
|
" CASE WHEN pg_catalog.pg_is_in_recovery() IS TRUE "
|
|
" THEN pg_catalog.pg_last_wal_replay_lsn() "
|
|
" ELSE NULL "
|
|
" END "
|
|
" AS last_wal_replay_lsn "
|
|
" ) ");
|
|
}
|
|
else
|
|
{
|
|
appendPQExpBufferStr(&query,
|
|
" WITH lsn_states AS ( "
|
|
" SELECT "
|
|
" CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
|
|
" THEN pg_catalog.pg_current_xlog_location() "
|
|
" ELSE NULL "
|
|
" END "
|
|
" AS current_wal_lsn, "
|
|
" CASE WHEN pg_catalog.pg_is_in_recovery() IS TRUE "
|
|
" THEN pg_catalog.pg_last_xlog_receive_location() "
|
|
" ELSE NULL "
|
|
" END "
|
|
" AS last_wal_receive_lsn, "
|
|
" CASE WHEN pg_catalog.pg_is_in_recovery() IS TRUE "
|
|
" THEN pg_catalog.pg_last_xlog_replay_location() "
|
|
" ELSE NULL "
|
|
" END "
|
|
" AS last_wal_replay_lsn "
|
|
" ) ");
|
|
}
|
|
|
|
appendPQExpBufferStr(&query,
|
|
" SELECT "
|
|
" CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
|
|
" THEN current_wal_lsn "
|
|
" ELSE "
|
|
" CASE WHEN last_wal_receive_lsn IS NULL "
|
|
" THEN last_wal_replay_lsn "
|
|
" ELSE "
|
|
" CASE WHEN last_wal_replay_lsn > last_wal_receive_lsn "
|
|
" THEN last_wal_replay_lsn "
|
|
" ELSE last_wal_receive_lsn "
|
|
" END "
|
|
" END "
|
|
" END "
|
|
" AS current_lsn "
|
|
" FROM lsn_states ");
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("unable to execute get_node_current_lsn()"));
|
|
}
|
|
else if (!PQgetisnull(res, 0, 0))
|
|
{
|
|
ptr = parse_lsn(PQgetvalue(res, 0, 0));
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return ptr;
|
|
}
|
|
|
|
|
|
void
|
|
init_replication_info(ReplInfo *replication_info)
|
|
{
|
|
memset(replication_info->current_timestamp, 0, sizeof(replication_info->current_timestamp));
|
|
replication_info->in_recovery = false;
|
|
replication_info->timeline_id = UNKNOWN_TIMELINE_ID;
|
|
replication_info->last_wal_receive_lsn = InvalidXLogRecPtr;
|
|
replication_info->last_wal_replay_lsn = InvalidXLogRecPtr;
|
|
memset(replication_info->last_xact_replay_timestamp, 0, sizeof(replication_info->last_xact_replay_timestamp));
|
|
replication_info->replication_lag_time = 0;
|
|
replication_info->receiving_streamed_wal = true;
|
|
replication_info->wal_replay_paused = false;
|
|
replication_info->upstream_last_seen = -1;
|
|
replication_info->upstream_node_id = UNKNOWN_NODE_ID;
|
|
}
|
|
|
|
|
|
bool
|
|
get_replication_info(PGconn *conn, t_server_type node_type, ReplInfo *replication_info)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool success = true;
|
|
|
|
initPQExpBuffer(&query);
|
|
appendPQExpBufferStr(&query,
|
|
" SELECT ts, "
|
|
" in_recovery, "
|
|
" last_wal_receive_lsn, "
|
|
" last_wal_replay_lsn, "
|
|
" last_xact_replay_timestamp, "
|
|
" CASE WHEN (last_wal_receive_lsn = last_wal_replay_lsn) "
|
|
" THEN 0::INT "
|
|
" ELSE "
|
|
" CASE WHEN last_xact_replay_timestamp IS NULL "
|
|
" THEN 0::INT "
|
|
" ELSE "
|
|
" EXTRACT(epoch FROM (pg_catalog.clock_timestamp() - last_xact_replay_timestamp))::INT "
|
|
" END "
|
|
" END AS replication_lag_time, "
|
|
" last_wal_receive_lsn >= last_wal_replay_lsn AS receiving_streamed_wal, "
|
|
" wal_replay_paused, "
|
|
" upstream_last_seen, "
|
|
" upstream_node_id "
|
|
" FROM ( "
|
|
" SELECT CURRENT_TIMESTAMP AS ts, "
|
|
" pg_catalog.pg_is_in_recovery() AS in_recovery, "
|
|
" pg_catalog.pg_last_xact_replay_timestamp() AS last_xact_replay_timestamp, ");
|
|
|
|
|
|
if (PQserverVersion(conn) >= 100000)
|
|
{
|
|
appendPQExpBufferStr(&query,
|
|
" COALESCE(pg_catalog.pg_last_wal_receive_lsn(), '0/0'::PG_LSN) AS last_wal_receive_lsn, "
|
|
" COALESCE(pg_catalog.pg_last_wal_replay_lsn(), '0/0'::PG_LSN) AS last_wal_replay_lsn, "
|
|
" CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
|
|
" THEN FALSE "
|
|
" ELSE pg_catalog.pg_is_wal_replay_paused() "
|
|
" END AS wal_replay_paused, ");
|
|
}
|
|
else
|
|
{
|
|
appendPQExpBufferStr(&query,
|
|
" COALESCE(pg_catalog.pg_last_xlog_receive_location(), '0/0'::PG_LSN) AS last_wal_receive_lsn, "
|
|
" COALESCE(pg_catalog.pg_last_xlog_replay_location(), '0/0'::PG_LSN) AS last_wal_replay_lsn, "
|
|
" CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
|
|
" THEN FALSE "
|
|
" ELSE pg_catalog.pg_is_xlog_replay_paused() "
|
|
" END AS wal_replay_paused, ");
|
|
}
|
|
|
|
/* Add information about upstream node from shared memory */
|
|
if (node_type == WITNESS)
|
|
{
|
|
appendPQExpBufferStr(&query,
|
|
" repmgr.get_upstream_last_seen() AS upstream_last_seen, "
|
|
" repmgr.get_upstream_node_id() AS upstream_node_id ");
|
|
}
|
|
else
|
|
{
|
|
appendPQExpBufferStr(&query,
|
|
" CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
|
|
" THEN -1 "
|
|
" ELSE repmgr.get_upstream_last_seen() "
|
|
" END AS upstream_last_seen, ");
|
|
appendPQExpBufferStr(&query,
|
|
" CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
|
|
" THEN -1 "
|
|
" ELSE repmgr.get_upstream_node_id() "
|
|
" END AS upstream_node_id ");
|
|
}
|
|
|
|
appendPQExpBufferStr(&query,
|
|
" ) q ");
|
|
|
|
log_verbose(LOG_DEBUG, "get_replication_info():\n%s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK || !PQntuples(res))
|
|
{
|
|
log_db_error(conn, query.data, _("get_replication_info(): unable to execute query"));
|
|
|
|
success = false;
|
|
}
|
|
else
|
|
{
|
|
snprintf(replication_info->current_timestamp,
|
|
sizeof(replication_info->current_timestamp),
|
|
"%s", PQgetvalue(res, 0, 0));
|
|
replication_info->in_recovery = atobool(PQgetvalue(res, 0, 1));
|
|
replication_info->last_wal_receive_lsn = parse_lsn(PQgetvalue(res, 0, 2));
|
|
replication_info->last_wal_replay_lsn = parse_lsn(PQgetvalue(res, 0, 3));
|
|
snprintf(replication_info->last_xact_replay_timestamp,
|
|
sizeof(replication_info->last_xact_replay_timestamp),
|
|
"%s", PQgetvalue(res, 0, 4));
|
|
replication_info->replication_lag_time = atoi(PQgetvalue(res, 0, 5));
|
|
replication_info->receiving_streamed_wal = atobool(PQgetvalue(res, 0, 6));
|
|
replication_info->wal_replay_paused = atobool(PQgetvalue(res, 0, 7));
|
|
replication_info->upstream_last_seen = atoi(PQgetvalue(res, 0, 8));
|
|
replication_info->upstream_node_id = atoi(PQgetvalue(res, 0, 9));
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return success;
|
|
}
|
|
|
|
|
|
int
|
|
get_replication_lag_seconds(PGconn *conn)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
int lag_seconds = 0;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
if (PQserverVersion(conn) >= 100000)
|
|
{
|
|
appendPQExpBufferStr(&query,
|
|
" SELECT CASE WHEN (pg_catalog.pg_last_wal_receive_lsn() = pg_catalog.pg_last_wal_replay_lsn()) ");
|
|
|
|
}
|
|
else
|
|
{
|
|
appendPQExpBufferStr(&query,
|
|
" SELECT CASE WHEN (pg_catalog.pg_last_xlog_receive_location() = pg_catalog.pg_last_xlog_replay_location()) ");
|
|
}
|
|
|
|
appendPQExpBufferStr(&query,
|
|
" THEN 0 "
|
|
" ELSE EXTRACT(epoch FROM (pg_catalog.clock_timestamp() - pg_catalog.pg_last_xact_replay_timestamp()))::INT "
|
|
" END "
|
|
" AS lag_seconds");
|
|
|
|
res = PQexec(conn, query.data);
|
|
log_verbose(LOG_DEBUG, "get_replication_lag_seconds():\n%s", query.data);
|
|
termPQExpBuffer(&query);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_warning("%s", PQerrorMessage(conn));
|
|
PQclear(res);
|
|
|
|
return UNKNOWN_REPLICATION_LAG;
|
|
}
|
|
|
|
if (!PQntuples(res))
|
|
{
|
|
return UNKNOWN_REPLICATION_LAG;
|
|
}
|
|
|
|
lag_seconds = atoi(PQgetvalue(res, 0, 0));
|
|
|
|
PQclear(res);
|
|
return lag_seconds;
|
|
}
|
|
|
|
|
|
|
|
TimeLineID
|
|
get_node_timeline(PGconn *conn, char *timeline_id_str)
|
|
{
|
|
TimeLineID timeline_id = UNKNOWN_TIMELINE_ID;
|
|
|
|
/*
|
|
* pg_control_checkpoint() was introduced in PostgreSQL 9.6
|
|
*/
|
|
if (PQserverVersion(conn) >= 90600)
|
|
{
|
|
PGresult *res = NULL;
|
|
|
|
res = PQexec(conn, "SELECT timeline_id FROM pg_catalog.pg_control_checkpoint()");
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, NULL, _("get_node_timeline(): unable to query pg_control_system()"));
|
|
}
|
|
else
|
|
{
|
|
timeline_id = atoi(PQgetvalue(res, 0, 0));
|
|
}
|
|
|
|
PQclear(res);
|
|
}
|
|
|
|
/* If requested, format the timeline ID as a string */
|
|
if (timeline_id_str != NULL)
|
|
{
|
|
if (timeline_id == UNKNOWN_TIMELINE_ID)
|
|
{
|
|
strncpy(timeline_id_str, "?", MAXLEN);
|
|
}
|
|
else
|
|
{
|
|
snprintf(timeline_id_str, MAXLEN, "%i", timeline_id);
|
|
}
|
|
}
|
|
|
|
return timeline_id;
|
|
}
|
|
|
|
|
|
void
|
|
get_node_replication_stats(PGconn *conn, t_node_info *node_info)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBufferStr(&query,
|
|
" SELECT pg_catalog.current_setting('max_wal_senders')::INT AS max_wal_senders, "
|
|
" (SELECT pg_catalog.count(*) FROM pg_catalog.pg_stat_replication) AS attached_wal_receivers, "
|
|
" current_setting('max_replication_slots')::INT AS max_replication_slots, "
|
|
" (SELECT pg_catalog.count(*) FROM pg_catalog.pg_replication_slots WHERE slot_type='physical') AS total_replication_slots, "
|
|
" (SELECT pg_catalog.count(*) FROM pg_catalog.pg_replication_slots WHERE active IS TRUE AND slot_type='physical') AS active_replication_slots, "
|
|
" (SELECT pg_catalog.count(*) FROM pg_catalog.pg_replication_slots WHERE active IS FALSE AND slot_type='physical') AS inactive_replication_slots, "
|
|
" pg_catalog.pg_is_in_recovery() AS in_recovery");
|
|
|
|
log_verbose(LOG_DEBUG, "get_node_replication_stats():\n%s", query.data);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_warning(_("unable to retrieve node replication statistics"));
|
|
log_detail("%s", PQerrorMessage(conn));
|
|
log_detail("%s", query.data);
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return;
|
|
}
|
|
|
|
node_info->max_wal_senders = atoi(PQgetvalue(res, 0, 0));
|
|
node_info->attached_wal_receivers = atoi(PQgetvalue(res, 0, 1));
|
|
node_info->max_replication_slots = atoi(PQgetvalue(res, 0, 2));
|
|
node_info->total_replication_slots = atoi(PQgetvalue(res, 0, 3));
|
|
node_info->active_replication_slots = atoi(PQgetvalue(res, 0, 4));
|
|
node_info->inactive_replication_slots = atoi(PQgetvalue(res, 0, 5));
|
|
node_info->recovery_type = strcmp(PQgetvalue(res, 0, 6), "f") == 0 ? RECTYPE_PRIMARY : RECTYPE_STANDBY;
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return;
|
|
}
|
|
|
|
|
|
NodeAttached
|
|
is_downstream_node_attached(PGconn *conn, char *node_name, char **node_state)
|
|
{
|
|
return _is_downstream_node_attached(conn, node_name, node_state, false);
|
|
}
|
|
|
|
NodeAttached
|
|
is_downstream_node_attached_quiet(PGconn *conn, char *node_name, char **node_state)
|
|
{
|
|
return _is_downstream_node_attached(conn, node_name, node_state, true);
|
|
}
|
|
|
|
NodeAttached
|
|
_is_downstream_node_attached(PGconn *conn, char *node_name, char **node_state, bool quiet)
|
|
{
|
|
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
" SELECT pid, state "
|
|
" FROM pg_catalog.pg_stat_replication "
|
|
" WHERE application_name = '%s'",
|
|
node_name);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_verbose(LOG_WARNING, _("unable to query pg_stat_replication"));
|
|
log_detail("%s", PQerrorMessage(conn));
|
|
log_detail("%s", query.data);
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return NODE_ATTACHED_UNKNOWN;
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
|
|
/*
|
|
* If there's more than one entry in pg_stat_application, there's no
|
|
* way we can reliably determine which one belongs to the node we're
|
|
* checking, so there's nothing more we can do.
|
|
*/
|
|
if (PQntuples(res) > 1)
|
|
{
|
|
if (quiet == false)
|
|
{
|
|
log_error(_("multiple entries with \"application_name\" set to \"%s\" found in \"pg_stat_replication\""),
|
|
node_name);
|
|
log_hint(_("verify that a unique node name is configured for each node"));
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return NODE_ATTACHED_UNKNOWN;
|
|
}
|
|
|
|
if (PQntuples(res) == 0)
|
|
{
|
|
if (quiet == false)
|
|
log_warning(_("node \"%s\" not found in \"pg_stat_replication\""), node_name);
|
|
|
|
PQclear(res);
|
|
|
|
return NODE_DETACHED;
|
|
}
|
|
|
|
/*
|
|
* If the connection is not a superuser or member of pg_read_all_stats, we
|
|
* won't be able to retrieve the "state" column, so we'll assume
|
|
* the node is attached.
|
|
*/
|
|
|
|
if (connection_has_pg_monitor_role(conn, "pg_read_all_stats"))
|
|
{
|
|
const char *state = PQgetvalue(res, 0, 1);
|
|
|
|
if (node_state != NULL)
|
|
{
|
|
int state_len = strlen(state);
|
|
*node_state = palloc0(state_len + 1);
|
|
strncpy(*node_state, state, state_len);
|
|
}
|
|
|
|
if (strcmp(state, "streaming") != 0)
|
|
{
|
|
if (quiet == false)
|
|
log_warning(_("node \"%s\" attached in state \"%s\""),
|
|
node_name,
|
|
state);
|
|
|
|
PQclear(res);
|
|
|
|
return NODE_NOT_ATTACHED;
|
|
}
|
|
}
|
|
else if (node_state != NULL)
|
|
{
|
|
*node_state = palloc0(1);
|
|
*node_state[0] = '\0';
|
|
}
|
|
|
|
PQclear(res);
|
|
|
|
return NODE_ATTACHED;
|
|
}
|
|
|
|
|
|
void
|
|
set_upstream_last_seen(PGconn *conn, int upstream_node_id)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBuffer(&query,
|
|
"SELECT repmgr.set_upstream_last_seen(%i)",
|
|
upstream_node_id);
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("unable to execute repmgr.set_upstream_last_seen()"));
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
}
|
|
|
|
|
|
int
|
|
get_upstream_last_seen(PGconn *conn, t_server_type node_type)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
int upstream_last_seen = -1;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
if (node_type == WITNESS)
|
|
{
|
|
appendPQExpBufferStr(&query,
|
|
"SELECT repmgr.get_upstream_last_seen()");
|
|
}
|
|
else
|
|
{
|
|
appendPQExpBufferStr(&query,
|
|
"SELECT CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
|
|
" THEN -1 "
|
|
" ELSE repmgr.get_upstream_last_seen() "
|
|
" END AS upstream_last_seen ");
|
|
}
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("unable to execute repmgr.get_upstream_last_seen()"));
|
|
}
|
|
else
|
|
{
|
|
upstream_last_seen = atoi(PQgetvalue(res, 0, 0));
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return upstream_last_seen;
|
|
}
|
|
|
|
|
|
bool
|
|
is_wal_replay_paused(PGconn *conn, bool check_pending_wal)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
bool is_paused = false;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBufferStr(&query,
|
|
"SELECT paused.wal_replay_paused ");
|
|
|
|
if (PQserverVersion(conn) >= 100000)
|
|
{
|
|
if (check_pending_wal == true)
|
|
{
|
|
appendPQExpBufferStr(&query,
|
|
" AND pg_catalog.pg_last_wal_replay_lsn() < pg_catalog.pg_last_wal_receive_lsn() ");
|
|
}
|
|
|
|
appendPQExpBufferStr(&query,
|
|
" FROM (SELECT CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
|
|
" THEN FALSE "
|
|
" ELSE pg_catalog.pg_is_wal_replay_paused() "
|
|
" END AS wal_replay_paused) paused ");
|
|
}
|
|
else
|
|
{
|
|
if (check_pending_wal == true)
|
|
{
|
|
appendPQExpBufferStr(&query,
|
|
" AND pg_catalog.pg_last_xlog_replay_location() < pg_catalog.pg_last_xlog_receive_location() ");
|
|
}
|
|
|
|
appendPQExpBufferStr(&query,
|
|
" FROM (SELECT CASE WHEN pg_catalog.pg_is_in_recovery() IS FALSE "
|
|
" THEN FALSE "
|
|
" ELSE pg_catalog.pg_is_xlog_replay_paused() "
|
|
" END AS wal_replay_paused) paused ");
|
|
|
|
}
|
|
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("unable to execute WAL replay pause query"));
|
|
}
|
|
else
|
|
{
|
|
is_paused = atobool(PQgetvalue(res, 0, 0));
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
|
|
return is_paused;
|
|
}
|
|
|
|
/* repmgrd status functions */
|
|
|
|
CheckStatus
|
|
get_repmgrd_status(PGconn *conn)
|
|
{
|
|
PQExpBufferData query;
|
|
PGresult *res = NULL;
|
|
CheckStatus repmgrd_status = CHECK_STATUS_CRITICAL;
|
|
|
|
initPQExpBuffer(&query);
|
|
|
|
appendPQExpBufferStr(&query,
|
|
" SELECT "
|
|
" CASE "
|
|
" WHEN repmgr.repmgrd_is_running() "
|
|
" THEN "
|
|
" CASE "
|
|
" WHEN repmgr.repmgrd_is_paused() THEN 1 ELSE 0 "
|
|
" END "
|
|
" ELSE 2 "
|
|
" END AS repmgrd_status");
|
|
res = PQexec(conn, query.data);
|
|
|
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
|
{
|
|
log_db_error(conn, query.data, _("unable to execute repmgrd status query"));
|
|
}
|
|
else
|
|
{
|
|
repmgrd_status = atoi(PQgetvalue(res, 0, 0));
|
|
}
|
|
|
|
termPQExpBuffer(&query);
|
|
PQclear(res);
|
|
return repmgrd_status;
|
|
}
|
|
|
|
|
|
/* miscellaneous debugging functions */
|
|
|
|
const char *
|
|
print_node_status(NodeStatus node_status)
|
|
{
|
|
switch (node_status)
|
|
{
|
|
case NODE_STATUS_UNKNOWN:
|
|
return "UNKNOWN";
|
|
case NODE_STATUS_UP:
|
|
return "UP";
|
|
case NODE_STATUS_SHUTTING_DOWN:
|
|
return "SHUTTING_DOWN";
|
|
case NODE_STATUS_DOWN:
|
|
return "SHUTDOWN";
|
|
case NODE_STATUS_UNCLEAN_SHUTDOWN:
|
|
return "UNCLEAN_SHUTDOWN";
|
|
case NODE_STATUS_REJECTED:
|
|
return "REJECTED";
|
|
}
|
|
|
|
return "UNIDENTIFIED_STATUS";
|
|
}
|
|
|
|
|
|
const char *
|
|
print_pqping_status(PGPing ping_status)
|
|
{
|
|
switch (ping_status)
|
|
{
|
|
case PQPING_OK:
|
|
return "PQPING_OK";
|
|
case PQPING_REJECT:
|
|
return "PQPING_REJECT";
|
|
case PQPING_NO_RESPONSE:
|
|
return "PQPING_NO_RESPONSE";
|
|
case PQPING_NO_ATTEMPT:
|
|
return "PQPING_NO_ATTEMPT";
|
|
}
|
|
|
|
return "PQPING_UNKNOWN_STATUS";
|
|
}
|