Merge pull request #797 from EnterpriseDB/connect-to-pg-backup-api

Connect to pg backup api
This commit is contained in:
Martín Marqués
2023-03-07 14:34:54 -03:00
committed by GitHub
11 changed files with 347 additions and 13 deletions

View File

@@ -22,7 +22,7 @@ GIT_WORK_TREE=${repmgr_abs_srcdir}
GIT_DIR=${repmgr_abs_srcdir}/.git GIT_DIR=${repmgr_abs_srcdir}/.git
export GIT_DIR export GIT_DIR
export GIT_WORK_TREE export GIT_WORK_TREE
PG_LDFLAGS=-lcurl -ljson-c
include $(PGXS) include $(PGXS)
-include ${repmgr_abs_srcdir}/Makefile.custom -include ${repmgr_abs_srcdir}/Makefile.custom

View File

@@ -66,7 +66,7 @@ REPMGR_CLIENT_OBJS = repmgr-client.o \
repmgr-action-primary.o repmgr-action-standby.o repmgr-action-witness.o \ repmgr-action-primary.o repmgr-action-standby.o repmgr-action-witness.o \
repmgr-action-cluster.o repmgr-action-node.o repmgr-action-service.o repmgr-action-daemon.o \ repmgr-action-cluster.o repmgr-action-node.o repmgr-action-service.o repmgr-action-daemon.o \
configdata.o configfile.o configfile-scan.o log.o strutil.o controldata.o dirutil.o compat.o \ configdata.o configfile.o configfile-scan.o log.o strutil.o controldata.o dirutil.o compat.o \
dbutils.o sysutils.o dbutils.o sysutils.o pgbackupapi.o
REPMGRD_OBJS = repmgrd.o repmgrd-physical.o configdata.o configfile.o configfile-scan.o log.o \ REPMGRD_OBJS = repmgrd.o repmgrd-physical.o configdata.o configfile.o configfile-scan.o log.o \
dbutils.o strutil.o controldata.o compat.o sysutils.o dbutils.o strutil.o controldata.o compat.o sysutils.o

View File

@@ -291,6 +291,46 @@ struct ConfigFileSetting config_file_settings[] =
{}, {},
{} {}
}, },
/* pg_backupapi_backup_id*/
{
"pg_backupapi_backup_id",
CONFIG_STRING,
{ .strptr = config_file_options.pg_backupapi_backup_id },
{ .strdefault = "" },
{},
{ .strmaxlen = sizeof(config_file_options.pg_backupapi_backup_id) },
{}
},
/* pg_backupapi_host*/
{
"pg_backupapi_host",
CONFIG_STRING,
{ .strptr = config_file_options.pg_backupapi_host },
{ .strdefault = "" },
{},
{ .strmaxlen = sizeof(config_file_options.pg_backupapi_host) },
{}
},
/* pg_backupapi_node_name */
{
"pg_backupapi_node_name",
CONFIG_STRING,
{ .strptr = config_file_options.pg_backupapi_node_name },
{ .strdefault = "" },
{},
{ .strmaxlen = sizeof(config_file_options.pg_backupapi_node_name) },
{}
},
/* pg_backupapi_remote_ssh_command */
{
"pg_backupapi_remote_ssh_command",
CONFIG_STRING,
{ .strptr = config_file_options.pg_backupapi_remote_ssh_command },
{ .strdefault = "" },
{},
{ .strmaxlen = sizeof(config_file_options.pg_backupapi_remote_ssh_command) },
{}
},
/* ======================= /* =======================
* standby follow settings * standby follow settings

View File

@@ -164,6 +164,10 @@ typedef struct
char archive_cleanup_command[MAXLEN]; char archive_cleanup_command[MAXLEN];
bool use_primary_conninfo_password; bool use_primary_conninfo_password;
char passfile[MAXPGPATH]; char passfile[MAXPGPATH];
char pg_backupapi_backup_id[NAMEDATALEN];
char pg_backupapi_host[NAMEDATALEN];
char pg_backupapi_node_name[NAMEDATALEN];
char pg_backupapi_remote_ssh_command[MAXLEN];
/* standby promote settings */ /* standby promote settings */
int promote_check_timeout; int promote_check_timeout;

View File

@@ -49,5 +49,6 @@
#define ERR_NODE_STATUS 25 #define ERR_NODE_STATUS 25
#define ERR_REPMGRD_PAUSE 26 #define ERR_REPMGRD_PAUSE 26
#define ERR_REPMGRD_SERVICE 27 #define ERR_REPMGRD_SERVICE 27
#define ERR_PGBACKUPAPI_SERVICE 28
#endif /* _ERRCODE_H_ */ #endif /* _ERRCODE_H_ */

147
pgbackupapi.c Normal file
View File

@@ -0,0 +1,147 @@
/*
* pgbackupapi.c
* Copyright (c) EnterpriseDB Corporation, 2010-2021
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string.h>
#include <curl/curl.h>
#include <json-c/json.h>
#include "repmgr.h"
#include "pgbackupapi.h"
size_t receive_operations_cb(void *content, size_t size, size_t nmemb, char *buffer) {
short int max_chars_to_copy = MAX_BUFFER_LENGTH -2;
short int i = 0;
int operation_length = 0;
json_object *value;
json_object *root = json_tokener_parse(content);
json_object *operations = json_object_object_get(root, "operations");
operation_length = strlen(json_object_get_string(operations));
if (operation_length < max_chars_to_copy) {
max_chars_to_copy = operation_length;
}
strncpy(buffer, json_object_get_string(operations), max_chars_to_copy);
fprintf(stdout, "Success! The following operations were found\n");
for (i=0; i<json_object_array_length(operations); i++) {
value = json_object_array_get_idx(operations, i);
printf("%s\n", json_object_get_string(value));
}
return size * nmemb;
}
char * define_base_url(operation_task *task) {
char *format = "http://%s:80/servers/%s/operations";
char *url = malloc(MAX_BUFFER_LENGTH);
snprintf(url, MAX_BUFFER_LENGTH-1, format, task->host, task->node_name);
//`url` is freed on the function that called this
return url;
}
CURLcode get_operations_on_server(CURL *curl, operation_task *task) {
char buffer[MAX_BUFFER_LENGTH];
char *url = define_base_url(task);
CURLcode ret;
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, receive_operations_cb);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer);
curl_easy_setopt(curl, CURLOPT_URL, url);
ret = curl_easy_perform(curl);
free(url);
return ret;
}
size_t receive_operation_id(void *content, size_t size, size_t nmemb, char *buffer) {
json_object *root = json_tokener_parse(content);
json_object *operation = json_object_object_get(root, "operation_id");
if (operation != NULL) {
strncpy(buffer, json_object_get_string(operation), MAX_BUFFER_LENGTH-2);
}
return size * nmemb;
}
CURLcode create_new_task(CURL *curl, operation_task *task) {
PQExpBufferData payload;
char *url = define_base_url(task);
CURLcode ret;
json_object *root = json_object_new_object();
struct curl_slist *chunk = NULL;
json_object_object_add(root, "operation_type", json_object_new_string(task->operation_type));
json_object_object_add(root, "backup_id", json_object_new_string(task->backup_id));
json_object_object_add(root, "remote_ssh_command", json_object_new_string(task->remote_ssh_command));
json_object_object_add(root, "destination_directory", json_object_new_string(task->destination_directory));
initPQExpBuffer(&payload);
appendPQExpBufferStr(&payload, json_object_to_json_string(root));
chunk = curl_slist_append(chunk, "Content-type: application/json");
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, payload.data);
curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1L);
//curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, receive_operation_id);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, task->operation_id);
ret = curl_easy_perform(curl);
free(url);
termPQExpBuffer(&payload);
return ret;
}
size_t receive_operation_status(void *content, size_t size, size_t nmemb, char *buffer) {
json_object *root = json_tokener_parse(content);
json_object *status = json_object_object_get(root, "status");
if (status != NULL) {
strncpy(buffer, json_object_get_string(status), MAX_BUFFER_LENGTH-2);
}
else {
fprintf(stderr, "Incorrect reply received for that operation ID.\n");
strcpy(buffer, "\0");
}
return size * nmemb;
}
CURLcode get_status_of_operation(CURL *curl, operation_task *task) {
CURLcode ret;
char *url = define_base_url(task);
strcat(url, "/");
strcat(url, task->operation_id);
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, receive_operation_status);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, task->operation_status);
ret = curl_easy_perform(curl);
free(url);
return ret;
}

46
pgbackupapi.h Normal file
View File

@@ -0,0 +1,46 @@
/*
* pgbackupapi.h
* Copyright (c) EnterpriseDB Corporation, 2010-2021
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <curl/curl.h>
#include <json-c/json.h>
typedef struct operation_task {
char *backup_id;
char *destination_directory;
char *operation_type;
char *operation_id;
char *operation_status;
char *remote_ssh_command;
char *host;
char *node_name;
} operation_task;
//Default simplebuffer size in most of operations
#define MAX_BUFFER_LENGTH 72
//Callbacks to send/receive data from pg-backup-api endpoints
size_t receive_operations_cb(void *content, size_t size, size_t nmemb, char *buffer);
size_t receive_operation_id(void *content, size_t size, size_t nmemb, char *buffer);
size_t receive_operation_status(void *content, size_t size, size_t nmemb, char *buffer);
//Functions that implement the logic and know what to do and how to comunnicate wuth the API
CURLcode get_operations_on_server(CURL *curl, operation_task *task);
CURLcode create_new_task(CURL *curl, operation_task *task);
CURLcode get_status_of_operation(CURL *curl, operation_task *task);
//Helper to make simpler to read the handler where we set the URL
char * define_base_url(operation_task *task);

View File

@@ -21,6 +21,7 @@
#include <sys/stat.h> #include <sys/stat.h>
#include <time.h> #include <time.h>
#include <unistd.h>
#include "repmgr.h" #include "repmgr.h"
#include "dirutil.h" #include "dirutil.h"
@@ -29,7 +30,7 @@
#include "repmgr-client-global.h" #include "repmgr-client-global.h"
#include "repmgr-action-standby.h" #include "repmgr-action-standby.h"
#include "pgbackupapi.h"
typedef struct TablespaceDataListCell typedef struct TablespaceDataListCell
{ {
@@ -113,6 +114,7 @@ static void check_recovery_type(PGconn *conn);
static void initialise_direct_clone(t_node_info *local_node_record, t_node_info *upstream_node_record); static void initialise_direct_clone(t_node_info *local_node_record, t_node_info *upstream_node_record);
static int run_basebackup(t_node_info *node_record); static int run_basebackup(t_node_info *node_record);
static int run_file_backup(t_node_info *node_record); static int run_file_backup(t_node_info *node_record);
static int run_pg_backupapi(t_node_info *node_record);
static void copy_configuration_files(bool delete_after_copy); static void copy_configuration_files(bool delete_after_copy);
@@ -687,19 +689,18 @@ do_standby_clone(void)
exit(SUCCESS); exit(SUCCESS);
} }
if (mode != barman)
{
initialise_direct_clone(&local_node_record, &upstream_node_record);
}
switch (mode) switch (mode)
{ {
case pg_basebackup: case pg_basebackup:
initialise_direct_clone(&local_node_record, &upstream_node_record);
log_notice(_("starting backup (using pg_basebackup)...")); log_notice(_("starting backup (using pg_basebackup)..."));
break; break;
case barman: case barman:
log_notice(_("retrieving backup from Barman...")); log_notice(_("retrieving backup from Barman..."));
break; break;
case pg_backupapi:
log_notice(_("starting backup (using pg_backupapi)..."));
break;
default: default:
/* should never reach here */ /* should never reach here */
log_error(_("unknown clone mode")); log_error(_("unknown clone mode"));
@@ -721,6 +722,9 @@ do_standby_clone(void)
case barman: case barman:
r = run_file_backup(&local_node_record); r = run_file_backup(&local_node_record);
break; break;
case pg_backupapi:
r = run_pg_backupapi(&local_node_record);
break;
default: default:
/* should never reach here */ /* should never reach here */
log_error(_("unknown clone mode")); log_error(_("unknown clone mode"));
@@ -814,7 +818,6 @@ do_standby_clone(void)
} }
/* Write the recovery.conf file */ /* Write the recovery.conf file */
if (create_recovery_file(&local_node_record, if (create_recovery_file(&local_node_record,
&recovery_conninfo, &recovery_conninfo,
source_server_version_num, source_server_version_num,
@@ -846,6 +849,9 @@ do_standby_clone(void)
case barman: case barman:
log_notice(_("standby clone (from Barman) complete")); log_notice(_("standby clone (from Barman) complete"));
break; break;
case pg_backupapi:
log_notice(_("standby clone (from pg_backupapi) complete"));
break;
} }
/* /*
@@ -937,6 +943,9 @@ do_standby_clone(void)
case barman: case barman:
appendPQExpBufferStr(&event_details, "barman"); appendPQExpBufferStr(&event_details, "barman");
break; break;
case pg_backupapi:
appendPQExpBufferStr(&event_details, "pg_backupapi");
break;
} }
appendPQExpBuffer(&event_details, appendPQExpBuffer(&event_details,
@@ -7770,6 +7779,86 @@ stop_backup:
} }
/*
* Perform a call to pg_backupapi endpoint to ask barman to write the backup
* for us. This will ensure that no matter the format on-disk of new backups,
* barman will always find a way how to read and write them.
* From repmgr 4 this is only used for Barman backups.
*/
static int
run_pg_backupapi(t_node_info *local_node_record)
{
int r = ERR_PGBACKUPAPI_SERVICE;
long http_return_code = 0;
short seconds_to_sleep = 3;
operation_task *task = malloc(sizeof(operation_task));
CURL *curl = curl_easy_init();
CURLcode ret;
task->host = malloc(strlen(config_file_options.pg_backupapi_host)+1);
task->remote_ssh_command = malloc(strlen(config_file_options.pg_backupapi_remote_ssh_command)+1);
task->node_name = malloc(strlen(config_file_options.pg_backupapi_node_name)+1);
task->operation_type = malloc(strlen(DEFAULT_STANDBY_PG_BACKUPAPI_OP_TYPE)+1);
task->backup_id = malloc(strlen(config_file_options.pg_backupapi_backup_id)+1);
task->destination_directory = malloc(strlen(local_data_directory)+1);
task->operation_id = malloc(MAX_BUFFER_LENGTH);
task->operation_status = malloc(MAX_BUFFER_LENGTH);
strcpy(task->host, config_file_options.pg_backupapi_host);
strcpy(task->remote_ssh_command, config_file_options.pg_backupapi_remote_ssh_command);
strcpy(task->node_name, config_file_options.pg_backupapi_node_name);
strcpy(task->operation_type, DEFAULT_STANDBY_PG_BACKUPAPI_OP_TYPE);
strcpy(task->backup_id, config_file_options.pg_backupapi_backup_id);
strcpy(task->destination_directory, local_data_directory);
strcpy(task->operation_id, "\0");
ret = create_new_task(curl, task);
if ((ret != CURLE_OK) || (strlen(task->operation_id) == 0)) {
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_return_code);
if (499 > http_return_code && http_return_code >= 400) {
log_error("Cannot find backup '%s' for node '%s'.", task->backup_id, task->node_name);
} else {
log_error("whilst reaching out pg_backup service: %s\n", curl_easy_strerror(ret));
}
}
else
{
log_info("Success creating the task: operation id '%s'", task->operation_id);
//We call init again because previous call included POST calls
curl_easy_cleanup(curl);
curl = curl_easy_init();
while (true)
{
ret = get_status_of_operation(curl, task);
if (strlen(task->operation_status) == 0) {
log_info("Retrying...");
}
else
{
log_info("status %s", task->operation_status);
}
if (strcmp(task->operation_status, "FAILED") == 0) {
break;
}
if (strcmp(task->operation_status, "DONE") == 0) {
r = SUCCESS;
break;
}
sleep(seconds_to_sleep);
}
}
curl_easy_cleanup(curl);
free(task);
return r;
}
static char * static char *
make_barman_ssh_command(char *buf) make_barman_ssh_command(char *buf)
{ {

View File

@@ -193,7 +193,8 @@ typedef struct
typedef enum typedef enum
{ {
barman, barman,
pg_basebackup pg_basebackup,
pg_backupapi
} standy_clone_mode; } standy_clone_mode;
typedef enum typedef enum

View File

@@ -3096,9 +3096,14 @@ get_standby_clone_mode(void)
if (*config_file_options.barman_host != '\0' && runtime_options.without_barman == false) if (*config_file_options.barman_host != '\0' && runtime_options.without_barman == false)
mode = barman; mode = barman;
else else {
mode = pg_basebackup; if (*config_file_options.pg_backupapi_host != '\0') {
log_info("Attempting to use `pg_backupapi` new restore mode");
mode = pg_backupapi;
}
else
mode = pg_basebackup;
}
return mode; return mode;
} }

View File

@@ -116,6 +116,7 @@
#define DEFAULT_STANDBY_FOLLOW_TIMEOUT 30 /* seconds */ #define DEFAULT_STANDBY_FOLLOW_TIMEOUT 30 /* seconds */
#define DEFAULT_STANDBY_FOLLOW_RESTART false #define DEFAULT_STANDBY_FOLLOW_RESTART false
#define DEFAULT_SHUTDOWN_CHECK_TIMEOUT 60 /* seconds */ #define DEFAULT_SHUTDOWN_CHECK_TIMEOUT 60 /* seconds */
#define DEFAULT_STANDBY_PG_BACKUPAPI_OP_TYPE "recovery"
#define DEFAULT_STANDBY_RECONNECT_TIMEOUT 60 /* seconds */ #define DEFAULT_STANDBY_RECONNECT_TIMEOUT 60 /* seconds */
#define DEFAULT_NODE_REJOIN_TIMEOUT 60 /* seconds */ #define DEFAULT_NODE_REJOIN_TIMEOUT 60 /* seconds */
#define DEFAULT_ARCHIVE_READY_WARNING 16 /* WAL files */ #define DEFAULT_ARCHIVE_READY_WARNING 16 /* WAL files */