Add dns_cache for server addresses as in pgbouncer (#249)

* Add dns_cache so server addresses are cached and invalidated when DNS changes.

Adds a module to deal with dns_cache feature. It's
main struct is CachedResolver, which is a simple thread safe
hostname <-> Ips cache with the ability to refresh resolutions
every `dns_max_ttl` seconds. This way, a client can check whether its
ip address has changed.

* Allow reloading dns cached

* Add documentation for dns_cached
This commit is contained in:
Jose Fernández
2023-05-02 10:26:40 +02:00
committed by GitHub
parent 3601130ba1
commit 7dfbd993f2
10 changed files with 794 additions and 3 deletions

View File

@@ -7,6 +7,7 @@ use parking_lot::{Mutex, RwLock};
use postgres_protocol::message;
use std::collections::HashMap;
use std::io::Read;
use std::net::IpAddr;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, BufStream};
@@ -16,6 +17,7 @@ use tokio_rustls::{client::TlsStream, TlsConnector};
use crate::config::{get_config, Address, User};
use crate::constants::*;
use crate::dns_cache::{AddrSet, CACHED_RESOLVER};
use crate::errors::{Error, ServerIdentifier};
use crate::messages::*;
use crate::mirrors::MirroringManager;
@@ -148,6 +150,9 @@ pub struct Server {
last_activity: SystemTime,
mirror_manager: Option<MirroringManager>,
// Associated addresses used
addr_set: Option<AddrSet>,
}
impl Server {
@@ -161,6 +166,24 @@ impl Server {
stats: Arc<ServerStats>,
auth_hash: Arc<RwLock<Option<String>>>,
) -> Result<Server, Error> {
let cached_resolver = CACHED_RESOLVER.load();
let mut addr_set: Option<AddrSet> = None;
// If we are caching addresses and hostname is not an IP
if cached_resolver.enabled() && address.host.parse::<IpAddr>().is_err() {
debug!("Resolving {}", &address.host);
addr_set = match cached_resolver.lookup_ip(&address.host).await {
Ok(ok) => {
debug!("Obtained: {:?}", ok);
Some(ok)
}
Err(err) => {
warn!("Error trying to resolve {}, ({:?})", &address.host, err);
None
}
}
};
let mut stream =
match TcpStream::connect(&format!("{}:{}", &address.host, address.port)).await {
Ok(stream) => stream,
@@ -609,6 +632,7 @@ impl Server {
bad: false,
needs_cleanup: false,
client_server_map,
addr_set,
connected_at: chrono::offset::Utc::now().naive_utc(),
stats,
application_name: String::new(),
@@ -849,7 +873,23 @@ impl Server {
/// Server & client are out of sync, we must discard this connection.
/// This happens with clients that misbehave.
pub fn is_bad(&self) -> bool {
self.bad
if self.bad {
return self.bad;
};
let cached_resolver = CACHED_RESOLVER.load();
if cached_resolver.enabled() {
if let Some(addr_set) = &self.addr_set {
if cached_resolver.has_changed(self.address.host.as_str(), addr_set) {
warn!(
"DNS changed for {}, it was {:?}. Dropping server connection.",
self.address.host.as_str(),
addr_set
);
return true;
}
}
}
false
}
/// Get server startup information to forward it to the client.