# Source code for es_client.schemacheck

"""Schema validation and redaction utilities

This module provides the :class:`SchemaCheck` class to validate configuration
dictionaries against a :class:`voluptuous.Schema` and a function to redact sensitive
data for secure logging. It supports configuration validation for
:class:`~es_client.builder.Builder` and logging in :mod:`~es_client.logging`.

Classes:
    SchemaCheck: Validates a configuration dictionary against a schema.

Functions:
    password_filter: Redact sensitive values from a configuration dictionary.
"""

# pylint: disable=E1101,W0718
import typing as t
import logging
from re import sub
from copy import deepcopy
from voluptuous import Schema
from .debug import debug, begin_end
from .defaults import KEYS_TO_REDACT
from .exceptions import FailedValidation

logger = logging.getLogger(__name__)


def password_filter(data: t.Dict) -> t.Dict:
    """
    Redact sensitive values from a configuration dictionary.

    Args:
        data (dict): Configuration dictionary to process.

    Returns:
        dict: A deep copy of `data` with sensitive values (keys in
            :data:`~es_client.defaults.KEYS_TO_REDACT`) replaced with 'REDACTED'.

    Recursively traverses `data`, including dictionaries nested inside lists
    or tuples, replacing the values of keys listed in
    :data:`~es_client.defaults.KEYS_TO_REDACT` with 'REDACTED' for secure
    logging. A dictionary stored under a sensitive key is not replaced
    wholesale; it is traversed and only its own sensitive keys are redacted.

    Example:
        >>> data = {'user': 'test', 'password': 'secret', 'nested': {'api_key': 'key'}}
        >>> filtered = password_filter(data)
        >>> filtered
        {'user': 'test', 'password': 'REDACTED', 'nested': {'api_key': 'REDACTED'}}
        >>> data['password']  # Original unchanged
        'secret'
    """

    def redact(node):
        if isinstance(node, dict):
            for key, value in node.items():
                if isinstance(value, dict):
                    redact(value)
                elif key in KEYS_TO_REDACT:
                    # Re-assigning an existing key does not resize the dict,
                    # so this is safe while iterating over .items().
                    node[key] = "REDACTED"
                else:
                    # Non-sensitive key: still look for dicts hidden inside
                    # sequences (e.g. a list of per-host settings).
                    redact(value)
        elif isinstance(node, (list, tuple)):
            for item in node:
                redact(item)
        return node

    # Work on a deep copy so the caller's configuration is never modified.
    return redact(deepcopy(data))
class SchemaCheck:
    """
    Validate a configuration dictionary against a voluptuous schema.

    Args:
        config (dict): Configuration dictionary to validate.
        schema (:class:`voluptuous.Schema`): Schema to validate against.
        test_what (str): Description of the configuration block (e.g., 'Client Config').
        location (str): Context of the configuration (e.g., 'elasticsearch.client').

    Attributes:
        config (dict): The configuration dictionary.
        schema (:class:`voluptuous.Schema`): The validation schema.
        test_what (str): Description of the configuration block.
        location (str): Context of the configuration.
        badvalue: The value found in :attr:`config` at the path named by the
            validation error, '(could not determine)' if that lookup failed,
            or 'no bad value yet' before any failure.
        error: The first validation error (or the stringified exception), or
            'No error yet' before any failure.

    Raises:
        :exc:`~es_client.exceptions.FailedValidation`: Raised by :meth:`result`
            if validation fails.

    Example:
        >>> from voluptuous import Schema
        >>> schema = Schema({'host': str})
        >>> config = {'host': 'localhost'}
        >>> check = SchemaCheck(config, schema, 'Test Config', 'test')
        >>> check.result()
        {'host': 'localhost'}
        >>> config = {'host': 123}
        >>> check = SchemaCheck(config, schema, 'Test Config', 'test')
        >>> check.result()
        Traceback (most recent call last):
            ...
        es_client.exceptions.FailedValidation: ...
    """

    def __init__(self, config: t.Dict, schema: Schema, test_what: str, location: str):
        debug.lv2('Starting function...')
        debug.lv5(f'Schema: {schema}')
        if isinstance(config, dict):
            # Only dictionaries can be passed through password_filter
            debug.lv5(f'"{test_what} config: {password_filter(config)}"')
        else:
            debug.lv5(f'"{test_what} config: {config}"')
        self.config = config
        self.schema = schema
        self.test_what = test_what
        self.location = location
        self.badvalue: t.Any = "no bad value yet"
        self.error: t.Any = "No error yet"

    @begin_end()
    def parse_error(self) -> t.Any:
        """
        Extract and report the invalid value causing a validation error.

        Takes the last whitespace-separated token of :attr:`error` (expected
        to look like ``data['key'][0]``), follows that path through
        :attr:`config`, and stores the value found in :attr:`badvalue`. If the
        path cannot be followed, the failure is logged and :attr:`badvalue` is
        set to '(could not determine)'.

        Returns:
            None: Updates :attr:`badvalue` and logs any lookup failure.

        Example:
            >>> from voluptuous import Schema
            >>> schema = Schema({'host': str})
            >>> config = {'host': 123}
            >>> check = SchemaCheck(config, schema, 'Test Config', 'test')
            >>> try:
            ...     check.result()
            ... except FailedValidation:
            ...     check.badvalue
            123
        """

        def get_badvalue(data_string, data):
            debug.lv5('Starting nested function...')
            # "data['a'][0]" -> ['data', 'a', '0']
            elements = sub(r"[\'\]]", "", data_string).split("[")
            elements.pop(0)  # Remove 'data' prefix
            if not elements:
                # No path in the error text: nothing to look up.
                debug.lv5('Exiting nested function, returning None')
                return None
            value = data
            for k in elements:
                try:
                    debug.lv4('TRY: parsing key')
                    key = int(k)  # numeric elements are treated as indexes
                except ValueError:
                    key = k
                # Descend one level per path element so the reported value is
                # the offending leaf, not its whole top-level parent.
                # A failed lookup is caught by the caller below.
                value = value[key]
            debug.lv5(f'Exiting nested function, returning {value}')
            return value

        try:
            debug.lv4('TRY: parsing error')
            self.badvalue = get_badvalue(str(self.error).split()[-1], self.config)
            # NOTE(review): badvalue is not redacted; if the offending key is
            # itself sensitive its value ends up in the error message.
        except Exception as exc:
            # Best effort only: never let error reporting raise on its own.
            logger.error('Unable to extract value: %s', exc)
            self.badvalue = "(could not determine)"

    @begin_end()
    def result(self) -> t.Any:
        """
        Validate the configuration and return the result.

        Returns:
            The value returned by calling :attr:`schema` on :attr:`config`
            (the validated configuration) if validation succeeds.

        Raises:
            :exc:`~es_client.exceptions.FailedValidation`: If validation fails,
                including error details and bad value.

        Calls :meth:`parse_error` to extract the invalid value if validation
        fails.

        Example:
            >>> from voluptuous import Schema
            >>> schema = Schema({'host': str})
            >>> config = {'host': 'localhost'}
            >>> check = SchemaCheck(config, schema, 'Test Config', 'test')
            >>> check.result()
            {'host': 'localhost'}
        """
        try:
            debug.lv4('TRY: validating configuration...')
            return self.schema(self.config)
        except Exception as exc:
            try:
                debug.lv4('TRY: parsing exception...')
                # Report only the first of possibly several collected errors.
                self.error = exc.errors[0]  # type: ignore[attr-defined]
            except Exception as err:
                # The exception carries no usable ``errors`` list; fall back
                # to its string form.
                logger.error('Could not parse exception: %s', err)
                self.error = f"{exc}"
            self.parse_error()
            logger.error('Schema error: %s', self.error)
            msg = (
                f"Configuration: {self.test_what}: Location: {self.location}: "
                f'Bad Value: "{self.badvalue}", {self.error}. Check configuration file.'
            )
            debug.lv3('Exiting function, raising exception')
            debug.lv5(f'Value = "{exc}"')
            logger.error(msg)
            raise FailedValidation(msg) from exc

    def __repr__(self) -> str:
        """
        Return a string representation of the SchemaCheck instance.

        Returns:
            str: Description of the configuration being validated.

        Example:
            >>> from voluptuous import Schema
            >>> check = SchemaCheck(
            ...     {'host': 'localhost'}, Schema({'host': str}), 'Test Config', 'test'
            ... )
            >>> repr(check)
            "<SchemaCheck test_what='Test Config' location='test'>"
        """
        return f"<SchemaCheck test_what='{self.test_what}' location='{self.location}'>"