#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
+@summary: A module for providing a configuration for LDAP based applications.
+
@author: Frank Brehm
@contact: frank.brehm@pixelpark.com
@copyright: © 2023 by Frank Brehm, Berlin
-@summary: A module for providing a configuration for applications,
- which are performing LDAP actions, like search a.s.o.
"""
from __future__ import absolute_import
# Standard module
-import logging
import copy
+import logging
import re
# Third party modules
-
-# Own modules
-
from fb_tools import MAX_TIMEOUT
-from fb_tools.common import is_sequence, to_bool
+from fb_tools.common import is_sequence
+from fb_tools.common import to_bool
from fb_tools.multi_config import DEFAULT_ENCODING
-from fb_tools.obj import FbGenericBaseObject, FbBaseObject
-
-from . import PpConfigurationError, PpBaseConfiguration
-
-from .. import MAX_PORT_NUMBER, DEFAULT_CONFIG_DIR
-# from .. import pp
-
-from . import VALID_TIERS, DEFAULT_TIER
+from fb_tools.obj import FbBaseObject
+from fb_tools.obj import FbGenericBaseObject
+# Own modules
+from . import DEFAULT_TIER
+from . import PpBaseConfiguration
+from . import PpConfigurationError
+from . import VALID_TIERS
+from .. import DEFAULT_CONFIG_DIR
+from .. import MAX_PORT_NUMBER
from ..xlate import XLATOR
-__version__ = '0.7.0'
+__version__ = '0.7.1'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
# =============================================================================
class LdapConfigError(PpConfigurationError):
- """Base error class for all exceptions happened during
- execution this configured application"""
+ """Base error class for all exceptions happened during execution a configured application."""
pass
host=None, use_ldaps=False, port=DEFAULT_PORT_LDAP, base_dn=None,
bind_dn=None, bind_pw=None, is_admin=None, readonly=None, tier=None,
sync_source=None, admin_dn=None, initialized=False):
-
+ """Initialize the LdapConnectionInfo object."""
self._host = None
self._use_ldaps = False
self._port = DEFAULT_PORT_LDAP
# -------------------------------------------------------------------------
def as_dict(self, short=True):
"""
- Transforms the elements of the object into a dict
+ Transform the elements of the object into a dict.
@param short: don't include local properties in resulting dict.
@type short: bool
@return: structure as dict
@rtype: dict
"""
-
res = super(LdapConnectionInfo, self).as_dict(short=short)
res['base_dn'] = self.base_dn
# -----------------------------------------------------------
@property
def host(self):
- """The host name (or IP address) of the LDAP server."""
+ """Return the host name (or IP address) of the LDAP server."""
return self._host
@host.setter
# -----------------------------------------------------------
@property
def use_ldaps(self):
- """Should there be used LDAPS for communicating with the LDAP server?"""
+ """Return whether to use LDAPS for communicating with the LDAP server."""
return self._use_ldaps
@use_ldaps.setter
# -----------------------------------------------------------
@property
def port(self):
- "The TCP port number of the LDAP server."
+ """Return the TCP port number of the LDAP server."""
return self._port
@port.setter
def port(self, value):
v = int(value)
if v < 1 or v > MAX_PORT_NUMBER:
- raise LdapConfigError(_("Invalid port {!r} for LDAP server given.").format(value))
+ raise LdapConfigError(_('Invalid port {!r} for LDAP server given.').format(value))
self._port = v
# -----------------------------------------------------------
@property
def base_dn(self):
- """The DN used to connect to the LDAP server, anonymous bind is used, if
- this DN is empty or None."""
+ """Return the base DN for all LDAP searches, if no explicite base DN is given."""
return self._base_dn
@base_dn.setter
def base_dn(self, value):
if value is None or str(value).strip() == '':
- msg = _("An empty Base DN for LDAP searches is not allowed.")
+ msg = _('An empty Base DN for LDAP searches is not allowed.')
raise LdapConfigError(msg)
self._base_dn = str(value).strip()
# -----------------------------------------------------------
@property
def bind_dn(self):
- """The DN used to connect to the LDAP server, anonymous bind is used, if
- this DN is empty or None."""
+ """
+ Return the DN used to connect to the LDAP server.
+
+ If this DN is empty or None, then anonymous bind is used.
+ """
return self._bind_dn
@bind_dn.setter
# -----------------------------------------------------------
@property
def bind_pw(self):
- """The password of the DN used to connect to the LDAP server."""
+ """Return the password of the DN used to connect to the LDAP server."""
return self._bind_pw
@bind_pw.setter
# -----------------------------------------------------------
@property
def schema(self):
- """The schema as part of the URL to connect to the LDAP server."""
+ """Return the schema as part of the URL to connect to the LDAP server."""
if self.use_ldaps:
return 'ldaps'
return 'ldap'
# -----------------------------------------------------------
@property
def url(self):
- """The URL, which ca be used to connect to the LDAP server."""
+ """Return the URL, which ca be used to connect to the LDAP server."""
if not self.host:
return None
# -----------------------------------------------------------
@property
def is_admin(self):
- """Is this an admin connection with all permissions."""
+ """Return, whether this is an admin connection with all permissions."""
return self._is_admin
@is_admin.setter
# -----------------------------------------------------------
@property
def readonly(self):
- """Is this a readonly connection."""
+ """Return, whether this is a readonly connection."""
return self._readonly
@readonly.setter
# -----------------------------------------------------------
@property
def tier(self):
- """The tier of production level of the LDAP instance (prod, test or dev)."""
+ """Return the tier of production level of the LDAP instance (prod, test or dev)."""
return self._tier
@tier.setter
return
val_lc = str(value).strip().lower()
if val_lc not in VALID_TIERS:
- msg = _("Invalid production tier {!r} given.").format(value)
+ msg = _('Invalid production tier {!r} given.').format(value)
raise LdapConfigError(msg)
self._tier = val_lc
# -----------------------------------------------------------
@property
def sync_source(self):
- """The host name (or IP address) of the LDAP server."""
+ """Return the host name (or IP address) of the LDAP server."""
return self._sync_source
@sync_source.setter
# -----------------------------------------------------------
@property
def admin_dn(self):
- """The DN of the adminitrator of the LDAP instance."""
+ """Return the DN of the administrator of the LDAP instance."""
return self._admin_dn
@admin_dn.setter
# -------------------------------------------------------------------------
def __repr__(self):
- """Typecasting into a string for reproduction."""
-
- out = "<%s(" % (self.__class__.__name__)
+ """Cast the type into a string for reproduction."""
+ out = '<%s(' % (self.__class__.__name__)
fields = []
- fields.append("appname={!r}".format(self.appname))
- fields.append("host={!r}".format(self.host))
- fields.append("use_ldaps={!r}".format(self.use_ldaps))
- fields.append("port={!r}".format(self.port))
- fields.append("base_dn={!r}".format(self.base_dn))
- fields.append("bind_dn={!r}".format(self.bind_dn))
- fields.append("bind_pw={!r}".format(self.bind_pw))
- fields.append("is_admin={!r}".format(self.is_admin))
- fields.append("readonly={!r}".format(self.readonly))
- fields.append("tier={!r}".format(self.tier))
- fields.append("sync_source={!r}".format(self.sync_source))
- fields.append("admin_dn={!r}".format(self.admin_dn))
- fields.append("initialized={!r}".format(self.initialized))
-
- out += ", ".join(fields) + ")>"
+ fields.append('appname={!r}'.format(self.appname))
+ fields.append('host={!r}'.format(self.host))
+ fields.append('use_ldaps={!r}'.format(self.use_ldaps))
+ fields.append('port={!r}'.format(self.port))
+ fields.append('base_dn={!r}'.format(self.base_dn))
+ fields.append('bind_dn={!r}'.format(self.bind_dn))
+ fields.append('bind_pw={!r}'.format(self.bind_pw))
+ fields.append('is_admin={!r}'.format(self.is_admin))
+ fields.append('readonly={!r}'.format(self.readonly))
+ fields.append('tier={!r}'.format(self.tier))
+ fields.append('sync_source={!r}'.format(self.sync_source))
+ fields.append('admin_dn={!r}'.format(self.admin_dn))
+ fields.append('initialized={!r}'.format(self.initialized))
+
+ out += ', '.join(fields) + ')>'
return out
# -------------------------------------------------------------------------
def __copy__(self):
-
+ """Return a deep copy of the current object."""
new = self.__class__(
appname=self.appname, verbose=self.verbose, base_dir=self.base_dir, host=self.host,
use_ldaps=self.use_ldaps, port=self.port, base_dn=self.base_dn, bind_dn=self.bind_dn,
# -------------------------------------------------------------------------
@classmethod
def from_config(cls, section, connection_name, appname=None, verbose=0, base_dir=None):
-
- msg_invalid = _("Invalid value {val!r} in section {sec!r} for a LDAP {what}.")
- section_name = "ldap:" + connection_name
+ """Create a new LdapConnectionInfo object based on the given configuration data."""
+ msg_invalid = _('Invalid value {val!r} in section {sec!r} for a LDAP {what}.')
+ section_name = 'ldap:' + connection_name
new = cls(appname=appname, verbose=verbose, base_dir=base_dir, initialized=False)
new.admin_dn = value
continue
- msg = _("Unknown LDAP configuration key {key} found in section {sec!r}.").format(
+ msg = _('Unknown LDAP configuration key {key} found in section {sec!r}.').format(
key=key, sec=section_name)
LOG.error(msg)
# -------------------------------------------------------------------------
def as_dict(self, short=True):
"""
- Transforms the elements of the object into a dict
+ Transform the elements of the object into a dict.
@param short: don't include local properties in resulting dict.
@type short: bool
@return: structure as dict
@rtype: dict
"""
-
res = super(LdapConnectionDict, self).as_dict(short=short)
for key in self.keys():
# -------------------------------------------------------------------------
def __copy__(self):
-
+ """Return a deep copy of the current object."""
new = self.__class__()
for key in self.keys():
# =============================================================================
class LdapConfiguration(PpBaseConfiguration):
"""
- A class for providing a configuration for an arbitrary Application working
- with one or more LDAP connections, and methods to read it from configuration files.
+ A class for providing a configuration for LDAP vased applications.
+
+ It provides also methods to read from configuration files.
"""
default_ldap_server = 'prd-ds.pixelpark.com'
append_appname_to_stems=True, additional_stems=None, config_dir=DEFAULT_CONFIG_DIR,
additional_config_file=None, additional_cfgdirs=None, encoding=DEFAULT_ENCODING,
ensure_privacy=False, use_chardet=True, initialized=False):
-
+ """Initialize the LdapConfiguration object."""
add_stems = []
if additional_stems:
if is_sequence(additional_stems):
# -------------------------------------------------------------------------
def eval_section(self, section_name):
-
+ """Evaluate a particular configuration section."""
super(LdapConfiguration, self).eval_section(section_name)
sn = section_name.lower()
section = self.cfg[section_name]
if sn == 'ldap':
- LOG.debug(_("Evaluating LDAP config ..."))
+ LOG.debug(_('Evaluating LDAP config ...'))
for key in section.keys():
if self.verbose > 1:
- LOG.debug(_("Evaluating LDAP section {!r} ...").format(key))
+ LOG.debug(_('Evaluating LDAP section {!r} ...').format(key))
sub = section[key]
if key.lower().strip() == 'timeout':
self._eval_ldap_timeout(sub)
def _eval_ldap_timeout(self, value):
timeout = DEFAULT_TIMEOUT
- msg_invalid = _("Value {!r} for a timeout is invalid.")
+ msg_invalid = _('Value {!r} for a timeout is invalid.')
try:
timeout = int(value)
def _eval_ldap_connection(self, connection_name, section):
if self.verbose > 2:
- msg = _("Reading configuration of LDAP instance {!r} ...").format(connection_name)
+ msg = _('Reading configuration of LDAP instance {!r} ...').format(connection_name)
LOG.debug(msg)
connection = LdapConnectionInfo.from_config(
# =============================================================================
-if __name__ == "__main__":
+if __name__ == '__main__':
pass