"""Builder and associated Classes
This module provides the `Builder` class to construct Elasticsearch client connections
using configuration from dictionaries or YAML files.
Classes:
Builder: Constructs an Elasticsearch client with validated configuration.
SecretStore: Securely stores sensitive fields using Fernet encryption.
"""
# pylint: disable=C0415,R0902,R0913,R0917
import typing as t
import logging
import warnings
from dotmap import DotMap # type: ignore
from cryptography.fernet import Fernet
import tiered_debug as debug
from elastic_transport import ObjectApiResponse
import elasticsearch8
from .debug import debug, begin_end
from .defaults import (
VERSION_MIN,
VERSION_MAX,
CLIENT_SETTINGS,
OTHER_SETTINGS,
ES_DEFAULT,
)
from .exceptions import ConfigurationError, ESClientException, NotMaster
from .schemacheck import password_filter
from .utils import (
check_config,
ensure_list,
file_exists,
get_version,
get_yaml,
parse_apikey_token,
prune_nones,
verify_ssl_paths,
verify_url_schema,
)
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Error message constants.
# Entries containing ``{placeholder}`` fields are templates that callers fill
# in with ``str.format``; the others are used verbatim.
# ---------------------------------------------------------------------------

# Configuration / validation errors
INVALID_HOST_SCHEMA = "Invalid host schema: {host}"
HOSTS_AND_CLOUD_ID_CONFLICT = 'Cannot populate both "hosts" and "cloud_id"'
MUST_PROVIDE_BOTH_AUTH = (
    "Must populate both username and password, or neither"
)
MUST_PROVIDE_BOTH_API_KEY = (
    "Must populate both id and api_key, or neither"
)
FILE_NOT_FOUND = '"{key}: {path}" File not found!'

# Connection-time errors
MULTIPLE_HOSTS_MASTER_ONLY = (
    '"master_only" cannot be True '
    'if multiple hosts are specified. '
    'Hosts = {hosts}'
)
NOT_MASTER_NODE = (
    "master_only is True, "
    "but the client is connected to a non-master node."
)
UNSUPPORTED_VERSION = "Elasticsearch version {version} not supported"
class SecretStore:
    """
    Securely stores secrets using Fernet encryption.

    Each value is serialized with :func:`json.dumps` and then encrypted, so
    only JSON-serializable values can be stored. Because of that JSON round
    trip, a tuple that is stored comes back as a list.

    Args:
        key (bytes, optional): Fernet key for encryption. If None (or empty),
            a new key is generated for this instance.

    Example:
        >>> store = SecretStore()
        >>> store.store_secret("api_key", ("id", "key"))
        >>> store.get_secret("api_key")
        ['id', 'key']
        >>> store.get_secret("unknown") is None
        True
    """

    def __init__(self, key: t.Optional[bytes] = None):
        self._fernet = Fernet(key or Fernet.generate_key())
        # Maps secret name -> Fernet token (encrypted, JSON-encoded bytes).
        self._secrets: t.Dict[str, bytes] = {}

    def store_secret(self, name: str, value: t.Any) -> None:
        """
        Encrypt and store a secret, replacing any previous value for ``name``.

        Args:
            name (str): Key under which the secret is stored.
            value: Any JSON-serializable value (None included).

        Raises:
            TypeError: If ``value`` is not JSON-serializable (propagated from
                :func:`json.dumps`).
        """
        import json

        serialized = json.dumps(value).encode()
        self._secrets[name] = self._fernet.encrypt(serialized)

    def get_secret(self, name: str) -> t.Any:
        """
        Decrypt and return a secret.

        Args:
            name (str): Key the secret was stored under.

        Returns:
            The stored value after the JSON round trip (tuples come back as
            lists), or None if no secret named ``name`` exists. A secret that
            was explicitly stored as None is indistinguishable from a missing
            one.
        """
        import json

        token = self._secrets.get(name)
        if token is None:
            return None
        return json.loads(self._fernet.decrypt(token).decode())

    def __repr__(self) -> str:
        """Return a safe string representation (secret count only)."""
        return f"<SecretStore with {len(self._secrets)} secrets>"
class Builder:
    """
    Constructs an Elasticsearch client connection from configuration.

    The `Builder` class processes configuration from a dictionary or YAML file,
    validates it, and creates an :class:`~elasticsearch8.Elasticsearch` client.
    It supports automatic connection and version checking, with options for
    master-only connections. Sensitive fields are stored securely in a
    :class:`SecretStore`.

    Parameters:
        configdict (dict, optional): Configuration dictionary with an
            'elasticsearch' key containing 'client' and 'other_settings'
            subkeys. Defaults to None.
        configfile (str, optional): Path to a YAML file with the same structure
            as configdict. If both configfile and configdict are given,
            configfile is used. Defaults to None.
        autoconnect (bool, optional): Connect (see :meth:`connect`) and call
            :meth:`test_connection` during initialization. Defaults to False.
        version_min (tuple, optional): Minimum (inclusive) Elasticsearch
            version as (major, minor, patch). Defaults to
            :const:`~es_client.defaults.VERSION_MIN`.
        version_max (tuple, optional): Upper (exclusive) Elasticsearch version
            bound as (major, minor, patch). Defaults to
            :const:`~es_client.defaults.VERSION_MAX`.

    Attributes:
        attributes (DotMap): Storage for configuration and settings.
        client (:class:`~elasticsearch8.Elasticsearch`): The Elasticsearch client
            connection.
        _secrets (:class:`SecretStore`): Secure storage for sensitive fields.

    Raises:
        :exc:`~es_client.exceptions.ConfigurationError`: If configuration is
            invalid, such as an invalid host schema (checked during
            initialization).
        :exc:`~es_client.exceptions.ESClientException`: If the cluster version
            is outside the accepted range (only when autoconnect is True).
        :exc:`~es_client.exceptions.NotMaster`: If `master_only` is True and the
            connected node is not the master (only when autoconnect is True).

    Examples:
        >>> config = {'elasticsearch': {'client': {'hosts': ['http://localhost:9200']}}}
        >>> builder = Builder(configdict=config)
        >>> builder.client_args.hosts
        ['http://localhost:9200']
        >>> builder.master_only = True
        >>> builder.master_only
        True
        >>> cfg = {'elasticsearch': {'client': {'hosts': ['ftp://invalid']}}}
        >>> Builder(configdict=cfg)
        Traceback (most recent call last):
        ...
        es_client.exceptions.ConfigurationError: Invalid host schema: ftp://invalid
    """

    def __init__(
        self,
        configdict: t.Union[t.Dict, None] = None,
        configfile: t.Union[str, None] = None,
        autoconnect: bool = False,
        version_min: t.Tuple = VERSION_MIN,
        version_max: t.Tuple = VERSION_MAX,
    ):
        """Load, validate and (optionally) connect; see the class docstring."""
        debug.lv2('Initializing Builder object...')
        # Backing store for every property defined on this class.
        self.attributes = DotMap()
        self.config = DotMap()
        self.config.client = DotMap()
        self.config.client.hosts = []
        self.config.other_settings = DotMap()
        self._secrets = SecretStore()
        self.set_client_defaults()
        self.set_other_defaults()
        # Placeholder client; connect() replaces it with one built from the
        # validated configuration.
        self.client = elasticsearch8.Elasticsearch(hosts="http://127.0.0.1:9200")
        self.process_config_opts(configdict, configfile)
        # Validate host schemas immediately so a bad URL fails fast.
        if self.config.client.get("hosts"):
            verified_hosts = []
            for host in ensure_list(self.config.client["hosts"]):
                try:
                    debug.lv4(f'TRY: validate host {host}')
                    verified_hosts.append(verify_url_schema(host))
                except ConfigurationError as exc:
                    logger.critical(INVALID_HOST_SCHEMA.format(host=host))
                    debug.lv3('Exiting method, raising exception')
                    debug.lv5(f'Exception = "{exc}"')
                    raise ConfigurationError(
                        INVALID_HOST_SCHEMA.format(host=host)
                    ) from exc
            self.config.client["hosts"] = verified_hosts
        self.version_max = version_max
        self.version_min = version_min
        self.update_config()
        self.validate()
        if autoconnect:
            self.connect()
            self.test_connection()
        debug.lv3('Builder object initialized')

    def __repr__(self) -> str:
        """
        Return a string representation of the Builder instance.

        The string lists the configured hosts (``['None']`` when no hosts are
        set), ``master_only``, ``version_min``, ``cloud_id`` (only when set)
        and the current client object.

        Returns:
            str: A string describing the Builder's configuration and client
            status.

        Example:
            >>> config = {
            ...     'elasticsearch': {
            ...         'client': {'hosts': ['http://localhost:9200']}
            ...     }
            ... }
            >>> builder = Builder(configdict=config)
            >>> repr(builder)  # doctest: +ELLIPSIS
            "Builder(hosts=['http://localhost:9200'], master_only=..., version_min=..., client=...)"
        """
        hosts = self.client_args.hosts or ['None']
        base = (
            f"Builder(hosts={hosts}, master_only={self.master_only}, "
            f"version_min={self.version_min}"
        )
        if self.client_args.cloud_id:
            base += f", cloud_id='{self.client_args.cloud_id}'"
        base += f", client={self.client})"
        return base

    @property
    def master_only(self) -> bool:
        """
        Get or set whether to connect only to the elected master node.

        Returns:
            bool: True if only the master node is allowed, False otherwise.

        Example:
            >>> builder = Builder()
            >>> builder.master_only = True
            >>> builder.master_only
            True
        """
        return self.attributes.master_only

    @master_only.setter
    def master_only(self, value: bool) -> None:
        self.attributes.master_only = value

    @property
    def is_master(self) -> bool:
        """
        Get or set whether the connected node is the elected master.

        Returns:
            bool: True if the connected node is the master, False otherwise.

        Example:
            >>> builder = Builder()
            >>> builder.is_master = False
            >>> builder.is_master
            False
        """
        return self.attributes.is_master

    @is_master.setter
    def is_master(self, value: bool) -> None:
        self.attributes.is_master = value

    @property
    def config(self) -> DotMap:
        """
        Get or set the configuration settings from configfile or configdict.

        The setter wraps the assigned value in a new :class:`DotMap`.

        Returns:
            DotMap: Configuration settings.

        Example:
            >>> config = {
            ...     'elasticsearch': {
            ...         'client': {'hosts': ['http://localhost:9200']}
            ...     }
            ... }
            >>> builder = Builder(configdict=config)
            >>> builder.config.client.hosts
            ['http://localhost:9200']
        """
        return self.attributes.config

    @config.setter
    def config(self, value: t.Dict) -> None:
        self.attributes.config = DotMap(value)

    @property
    def client_args(self) -> DotMap:
        """
        Get or set the client settings.

        The setter wraps the assigned value in a new :class:`DotMap`.

        Returns:
            DotMap: Client configuration settings.

        Example:
            >>> builder = Builder()
            >>> builder.client_args.hosts = ['http://localhost:9200']
            >>> builder.client_args.hosts
            ['http://localhost:9200']
        """
        return self.attributes.client_args

    @client_args.setter
    def client_args(self, value: t.Dict) -> None:
        self.attributes.client_args = DotMap(value)

    @property
    def other_args(self) -> DotMap:
        """
        Get or set the other settings.

        The setter wraps the assigned value in a new :class:`DotMap`.

        Returns:
            DotMap: Other configuration settings.

        Example:
            >>> builder = Builder()
            >>> builder.other_args.master_only = True
            >>> builder.other_args.master_only
            True
        """
        return self.attributes.other_args

    @other_args.setter
    def other_args(self, value: t.Dict) -> None:
        self.attributes.other_args = DotMap(value)

    @property
    def skip_version_test(self) -> bool:
        """
        Get or set whether to skip version compatibility tests.

        Returns:
            bool: True if version tests are skipped, False otherwise.

        Example:
            >>> builder = Builder()
            >>> builder.skip_version_test = True
            >>> builder.skip_version_test
            True
        """
        return self.attributes.skip_version_test

    @skip_version_test.setter
    def skip_version_test(self, value: bool) -> None:
        self.attributes.skip_version_test = value

    @property
    def version_min(self) -> t.Tuple:
        """
        Get or set the minimum (inclusive) acceptable Elasticsearch version.

        Returns:
            tuple: Minimum version as (major, minor, patch).

        Example:
            >>> builder = Builder()
            >>> builder.version_min == VERSION_MIN
            True
        """
        return self.attributes.version_min

    @version_min.setter
    def version_min(self, value: t.Tuple) -> None:
        self.attributes.version_min = value

    @property
    def version_max(self) -> t.Tuple:
        """
        Get or set the upper (exclusive) Elasticsearch version bound.

        A cluster whose version is greater than or equal to this value is
        rejected by :meth:`_check_version`.

        Returns:
            tuple: Version bound as (major, minor, patch).

        Example:
            >>> builder = Builder()
            >>> builder.version_max == VERSION_MAX
            True
        """
        return self.attributes.version_max

    @version_max.setter
    def version_max(self, value: t.Tuple) -> None:
        self.attributes.version_max = value

    @begin_end()
    def set_client_defaults(self) -> None:
        """
        Set default values for client_args.

        Replaces client_args with a DotMap holding None for every key in
        :const:`~es_client.defaults.CLIENT_SETTINGS`.
        """
        self.client_args = DotMap()
        for key in CLIENT_SETTINGS:
            self.client_args[key] = None

    @begin_end()
    def set_other_defaults(self) -> None:
        """
        Set default values for other_args.

        Replaces other_args with a DotMap holding None for every key in
        :const:`~es_client.defaults.OTHER_SETTINGS`.
        """
        self.other_args = DotMap()
        for key in OTHER_SETTINGS:
            self.other_args[key] = None

    @begin_end()
    def process_config_opts(
        self, configdict: t.Union[t.Dict, None], configfile: t.Union[str, None]
    ) -> None:
        """
        Process configuration from configfile or configdict.

        Args:
            configdict (dict, optional): Configuration dictionary with an
                'elasticsearch' key containing 'client' and 'other_settings'
                subkeys.
            configfile (str, optional): Path to a YAML file with the same
                structure as configdict.

        configfile takes precedence: when it is given, configdict is ignored.
        If neither is provided, uses :const:`~es_client.defaults.ES_DEFAULT`,
        which sets hosts to 'http://127.0.0.1:9200'.

        Example:
            >>> builder = Builder()
            >>> builder.config.client.hosts
            ['http://127.0.0.1:9200']
        """
        if configfile:
            debug.lv2(f'Using values from file: {configfile}')
            self.config = check_config(get_yaml(configfile))
        elif configdict:
            debug.lv2(f'Using values from dict: {password_filter(configdict)}')
            self.config = check_config(configdict)
        else:
            debug.lv2("No configuration provided. Using ES_DEFAULT.")
            self.config = check_config(ES_DEFAULT["elasticsearch"])

    @begin_end()
    def update_config(self) -> None:
        """
        Update object with configuration values.

        Copies config.client into client_args and config.other_settings into
        other_args. It then moves sensitive fields into the secret store and
        blanks them in both places. Finally it sets master_only, is_master
        (False) and skip_version_test.
        """
        self.client_args.update(self.config.client)
        self.other_args.update(self.config.other_settings)
        # Move sensitive fields to SecretStore
        sensitive_fields = ['basic_auth', 'api_key', 'bearer_auth']
        for field in sensitive_fields:
            # We are checking in client_args in case someone has manually passed
            # something. We build basic_auth from user/pass, and api_key comes
            # from self.other_args. In other words, this is a failsafe.
            if field in self.client_args and self.client_args[field] is not None:
                self._secrets.store_secret(field, self.client_args[field])
                self.client_args[field] = None
                self.config.client[field] = None
        if 'password' in self.other_args and self.other_args.password is not None:
            self._secrets.store_secret('password', self.other_args.password)
            self.other_args.password = None
            self.config.other_settings.password = None
        self.master_only = self.other_args.master_only
        self.is_master = False
        if "skip_version_test" in self.other_args:
            self.skip_version_test = self.other_args.skip_version_test
        else:
            self.skip_version_test = False

    @begin_end()
    def validate(self) -> None:
        """
        Validate configuration settings.

        Checks basic auth, API key, cloud ID, and SSL settings. Host schemas are
        validated in :meth:`__init__` using
        :func:`~es_client.utils.verify_url_schema`.

        Emits a :class:`DeprecationWarning` when ``ssl_version`` or
        ``ssl_assert_fingerprint`` is set, recommending ``ssl_context``
        instead.

        Raises:
            :exc:`~es_client.exceptions.ConfigurationError`: If validation
                fails, such as incomplete authentication credentials, both
                hosts and cloud_id set, or missing SSL files.
        """
        if self.client_args.ssl_version:
            warnings.warn(
                "ssl_version is experimental; use ssl_context instead",
                DeprecationWarning,
                stacklevel=2,
            )
        if self.client_args.ssl_assert_fingerprint:
            warnings.warn(
                "ssl_assert_fingerprint is experimental on CPython 3.10+; "
                "use ssl_context instead",
                DeprecationWarning,
                stacklevel=2,
            )
        self._check_basic_auth()
        self._check_api_key()
        self._check_cloud_id()
        self._check_ssl()

    @begin_end()
    def connect(self) -> None:
        """
        Establish connection to Elasticsearch.

        Builds the client, checks the cluster version and, when master_only is
        True, verifies there is a single host and that it is the elected
        master. These checks are done by :meth:`~Builder._check_version`,
        :meth:`~Builder._check_multiple_hosts`, :meth:`~Builder._find_master`
        and :meth:`~Builder._check_if_master`.

        Raises:
            :exc:`~es_client.exceptions.ConfigurationError`: If master_only is
                True and multiple hosts are configured.
            :exc:`~es_client.exceptions.NotMaster`: If master_only is True and
                the node is not the master.
            :exc:`~es_client.exceptions.ESClientException`: If the version is
                incompatible.
        """
        self._get_client()
        self._check_version()
        if self.master_only:
            self._check_multiple_hosts()
            self._find_master()
            self._check_if_master()

    @begin_end()
    def _check_basic_auth(self) -> None:
        """
        Validate and set basic authentication credentials.

        Stores a (username, password) basic_auth secret if both are provided.
        The password is read from the secret store, where
        :meth:`update_config` put it.

        Raises:
            :exc:`~es_client.exceptions.ConfigurationError`: If only one of
                username or password is provided.
        """
        if "username" in self.other_args or "password" in self.other_args:
            usr = self.other_args.get("username")
            pwd = self._secrets.get_secret('password')
            if usr is None and pwd is None:
                pass
            elif usr is None or pwd is None:
                debug.lv3('Exiting method, raising exception')
                debug.lv5(f'Exception = "{MUST_PROVIDE_BOTH_AUTH}"')
                raise ConfigurationError(MUST_PROVIDE_BOTH_AUTH)
            else:
                self._secrets.store_secret('basic_auth', (usr, pwd))

    @begin_end()
    def _check_api_key(self) -> None:
        """
        Validate and set API key credentials.

        Processes the API key from a token or an id/api_key pair in
        other_args.api_key. The token takes precedence over id and api_key.
        After storing the secret, the consumed fields are blanked in
        other_args and config.

        When neither a token nor an id/api_key pair is configured, any
        ``api_key`` already held in the secret store is left untouched. For
        example, one passed directly in the client settings and moved there by
        :meth:`update_config` is kept.

        Raises:
            :exc:`~es_client.exceptions.ConfigurationError`: If only one of id
                or api_key is provided.

        Example:
            >>> builder = Builder()
            >>> builder.other_args.api_key = {'id': 'test_id', 'api_key': 'test_key'}
            >>> builder._check_api_key()
            >>> builder._secrets.get_secret('api_key')
            ['test_id', 'test_key']
        """
        if "api_key" not in self.other_args:
            return
        api_key_config = DotMap(self.other_args.api_key)
        if api_key_config.get("token"):
            api_id, api_key = parse_apikey_token(api_key_config.token)
            self._secrets.store_secret('api_key', (api_id, api_key))
            # Clean up sensitive fields from places they could still exist
            self._scrub_api_key_fields("token")
            return
        api_id = api_key_config.get("id")
        api_key = api_key_config.get("api_key")
        if api_id is None and api_key is None:
            # Nothing configured here. Do not overwrite a value update_config()
            # may already have moved into the secret store from client_args.
            return
        if api_id is None or api_key is None:
            debug.lv3('Exiting method, raising exception')
            raise ConfigurationError(MUST_PROVIDE_BOTH_API_KEY)
        self._secrets.store_secret('api_key', (api_id, api_key))
        # Clean up sensitive fields from places they could still exist
        self._scrub_api_key_fields("id", "api_key")

    def _scrub_api_key_fields(self, *keys: str) -> None:
        """Set ``keys`` to None in the api_key section of other_args and config."""
        # Item assignment works for both plain dicts and DotMaps; sections that
        # are missing or None (no api_key configured) are skipped.
        for section in (
            self.other_args.get("api_key"),
            self.config.other_settings.get("api_key"),
        ):
            if isinstance(section, (dict, DotMap)):
                for key in keys:
                    section[key] = None

    @begin_end()
    def _check_cloud_id(self) -> None:
        """
        Validate cloud_id configuration.

        When cloud_id is set, hosts is cleared if it is empty or is only the
        default 'http://127.0.0.1:9200', since hosts and cloud_id are mutually
        exclusive.

        Raises:
            :exc:`~es_client.exceptions.ConfigurationError`: If both hosts and
                cloud_id are specified.
        """
        if self.client_args.get("cloud_id") is None:
            return
        hosts = self.client_args.get("hosts")
        if not hosts or hosts == ["http://127.0.0.1:9200"]:
            # An empty host list or the built-in default host is not an
            # explicit "hosts" setting, so it does not conflict with cloud_id.
            self.client_args.hosts = None
            return
        debug.lv3('Exiting method, raising exception')
        logger.error(HOSTS_AND_CLOUD_ID_CONFLICT)
        raise ConfigurationError(HOSTS_AND_CLOUD_ID_CONFLICT)

    @begin_end()
    def _check_ssl(self) -> None:
        """
        Validate SSL configuration.

        For HTTPS connections (a cloud_id, or an 'https' first host), sets
        ca_certs from certifi when it is not specified. It then checks that
        every configured ca_certs, client_cert and client_key file exists,
        using :func:`~es_client.utils.file_exists`.

        Raises:
            :exc:`~es_client.exceptions.ConfigurationError`: If an SSL
                certificate or key file is not found.
        """
        verify_ssl_paths(self.client_args)
        hosts = self.client_args.get("hosts")
        if self.client_args.get("cloud_id") is not None:
            scheme = "https"
        elif not hosts:
            scheme = None
        else:
            scheme = hosts[0].split(":")[0].lower()
        if scheme != "https":
            return
        if not self.client_args.get("ca_certs"):
            import certifi

            self.client_args.ca_certs = certifi.where()
        # Check every configured file, including client_cert/client_key when
        # ca_certs was just filled in from certifi.
        for key in ("ca_certs", "client_cert", "client_key"):
            path = self.client_args.get(key)
            if path and not file_exists(path):
                msg = FILE_NOT_FOUND.format(key=key, path=path)
                logger.critical(msg)
                debug.lv3('Exiting method, raising exception')
                debug.lv5(f'Exception = "{msg}"')
                raise ConfigurationError(msg)

    @begin_end()
    def _find_master(self) -> None:
        """
        Check if the connected node is the elected master.

        Sets is_master by comparing the local node ID, from
        :meth:`~elasticsearch8.Elasticsearch.nodes.info`, with the
        master_node reported by the cluster state.
        """
        my_node_id = list(self.client.nodes.info(node_id="_local")["nodes"])[0]
        master_node_id = self.client.cluster.state(metric="master_node")["master_node"]
        self.is_master = my_node_id == master_node_id

    @begin_end()
    def _check_multiple_hosts(self) -> None:
        """
        Validate host count for master_only setting.

        Raises:
            :exc:`~es_client.exceptions.ConfigurationError`: If more than one
                host is configured.
        """
        if "hosts" in self.client_args and isinstance(self.client_args.hosts, list):
            if len(self.client_args.hosts) > 1:
                debug.lv3('Exiting method, raising exception')
                hosts = self.client_args.hosts
                msg = MULTIPLE_HOSTS_MASTER_ONLY.format(hosts=hosts)
                logger.error(msg)
                raise ConfigurationError(msg)

    @begin_end()
    def _check_if_master(self) -> None:
        """
        Verify the connected node is the master when master_only is True.

        Raises:
            :exc:`~es_client.exceptions.NotMaster`: If is_master is not True.
        """
        if not self.is_master:
            debug.lv3('Exiting method, raising exception')
            logger.error(NOT_MASTER_NODE)
            raise NotMaster(NOT_MASTER_NODE)

    @begin_end()
    def _check_version(self) -> None:
        """
        Verify Elasticsearch version compatibility.

        Unless skip_version_test is set, the cluster version is fetched with
        :func:`~es_client.utils.get_version`. It must satisfy
        ``version_min <= version < version_max``.

        Raises:
            :exc:`~es_client.exceptions.ESClientException`: If the version is
                outside the acceptable range.
        """
        if self.skip_version_test:
            # Skip entirely: no version API call is made.
            logger.warning("Skipping Elasticsearch version checks")
            return
        v = get_version(self.client)
        debug.lv2(f'Version detected: {".".join(map(str, v))}')
        if v >= self.version_max or v < self.version_min:
            msg = UNSUPPORTED_VERSION.format(version='.'.join(map(str, v)))
            debug.lv3('Exiting method, raising exception')
            logger.error(msg)
            raise ESClientException(msg)

    @begin_end()
    def _get_client(self) -> None:
        """
        Instantiate the :class:`~elasticsearch8.Elasticsearch` client.

        Builds the client from client_args, with None values removed by
        :func:`~es_client.utils.prune_nones`. Any basic_auth, api_key and
        bearer_auth secrets from the secret store are added.
        """
        client_args = prune_nones(self.client_args.toDict())
        # Add sensitive fields from SecretStore
        for field in ('basic_auth', 'api_key', 'bearer_auth'):
            secret = self._secrets.get_secret(field)
            if secret is None:
                continue
            # SecretStore's JSON round trip turns (a, b) tuples into lists;
            # restore the tuple form, but leave string credentials untouched
            # (tuple() on a str would split it into single characters).
            if isinstance(secret, list):
                secret = tuple(secret)
            client_args[field] = secret
        self.client = elasticsearch8.Elasticsearch(**client_args)

    @begin_end()
    def test_connection(self) -> ObjectApiResponse[t.Any]:
        """
        Test the Elasticsearch connection.

        Executes :meth:`~elasticsearch8.Elasticsearch.info` to verify
        connectivity.

        Returns:
            :class:`~elastic_transport.ObjectApiResponse`: Response from the
            Elasticsearch info API.
        """
        retval = self.client.info()
        debug.lv5(f'Return value = "{retval}"')
        return retval