--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+@author: Frank Brehm
+@contact: frank@brehm-online.com
+@copyright: © 2019 by Frank Brehm, Berlin
+@summary: A module for providing a configuration for the ldap_migration application
+from __future__ import absolute_import
+# Standard module
+import logging
+import re
+from pathlib import Path
+# Third party modules
+# Own modules
+from fb_tools.common import to_bool
+from fb_tools.config import ConfigError, BaseConfiguration
+__version__ = '0.1.0'
+LOG = logging.getLogger(__name__)
+# =============================================================================
+class LDAPMigrationConfigError(ConfigError):
+ """Base error class for all exceptions happened during
+ execution this configured application"""
+ pass
+# =============================================================================
+class LDAPMigrationConfiguration(BaseConfiguration):
+ """
+ A class for providing a configuration for the LDAPMigrationApplication class
+ and methods to read it from configuration files.
+ """
+ default_src_server = 'ldap.pixelpark.com'
+ default_src_use_ssl = False
+ default_src_port = 389
+ default_src_bind_dn = 'cn=admin'
+ default_tgt_server = 'ldap2.pixelpark.com'
+ default_tgt_use_ssl = True
+ default_tgt_port = 636
+ default_tgt_bind_dn = 'cn=admin,o=isp'
+ default_suffix = 'o=isp'
+ default_timeout = 20
+ max_tcp_port = (2**16 - 1)
+ re_bind_dn = re.compile(r'^\s*bind[_-]?dn\s*$', re.IGNORECASE)
+ re_bind_pw = re.compile(r'^\s*bind[_-]?(?:pw|passwd|passsword)\s*$', re.IGNORECASE)
+ # -------------------------------------------------------------------------
+ def __init__(
+ self, appname=None, verbose=0, version=__version__, base_dir=None,
+ encoding=None, config_dir=None, config_file=None, initialized=False):
+ self.src_server = self.default_src_server
+ self.src_use_ssl = self.default_src_use_ssl
+ self._src_port = self.default_src_port
+ self.src_bind_dn = self.default_src_bind_dn
+ self.src_bind_pw = None
+ self.tgt_server = self.default_tgt_server
+ self.tgt_use_ssl = self.default_tgt_use_ssl
+ self._tgt_port = self.default_tgt_port
+ self.tgt_bind_dn = self.default_tgt_bind_dn
+ self.tgt_bind_pw = None
+ self.suffix = self.default_suffix
+ self._timeout = self.default_timeout
+ super(LDAPMigrationConfiguration, self).__init__(
+ appname=appname, verbose=verbose, version=version, base_dir=base_dir,
+ encoding=encoding, config_dir=config_dir, config_file=config_file, initialized=False,
+ )
+ if initialized:
+ self.initialized = True
+ # -------------------------------------------------------------------------
+ @property
+ def src_port(self):
+ """The TCP port number of the source LDAP host."""
+ return self._src_port
+ @src_port.setter
+ def src_port(self, value):
+ if value is None:
+ self._src_port = self.default_src_port
+ return
+ val = int(value)
+ err_msg = "Invalid port number {port!r} for connecting to source server, "
+ err_msg += "must be 0 < PORT < {max}."
+ if val <= 1 or val > self.max_tcp_port:
+ msg = err_msg.format(port=value, max=self.max_tcp_port)
+ raise ValueError(msg)
+ self._src_port = val
+ # -------------------------------------------------------------------------
+ @property
+ def tgt_port(self):
+ """The TCP port number of the source LDAP host."""
+ return self._tgt_port
+ @tgt_port.setter
+ def tgt_port(self, value):
+ if value is None:
+ self._tgt_port = self.default_tgt_port
+ return
+ val = int(value)
+ err_msg = "Invalid port number {port!r} for connecting to source server, "
+ err_msg += "must be 0 < PORT < {max}."
+ if val <= 1 or val > self.max_tcp_port:
+ msg = err_msg.format(port=value, max=self.max_tcp_port)
+ raise ValueError(msg)
+ self._tgt_port = val
+ # -------------------------------------------------------------------------
+ @property
+ def timeout(self):
+ """The timeout in seconds for Web requests."""
+ return self._timeout
+ @timeout.setter
+ def timeout(self, value):
+ if value is None:
+ self._timeout = self.default_timeout
+ return
+ val = int(value)
+ err_msg = "Invalid timeout {!r} for LDAP operations, must be 0 < SECONDS < 3600."
+ if val <= 0 or val > 3600:
+ msg = err_msg.format(value)
+ raise ValueError(msg)
+ self._timeout = val
+ # -------------------------------------------------------------------------
+ def as_dict(self, short=True):
+ """
+ Transforms the elements of the object into a dict
+ @param short: don't include local properties in resulting dict.
+ @type short: bool
+ @return: structure as dict
+ @rtype: dict
+ """
+ res = super(LDAPMigrationConfiguration, self).as_dict(short=short)
+ res['src_bind_pw'] = None
+ if self.src_bind_pw:
+ if self.verbose > 4:
+ res['src_bind_pw'] = self.src_bind_pw
+ else:
+ res['src_bind_pw'] = '*******'
+ res['tgt_bind_pw'] = None
+ if self.tgt_bind_pw:
+ if self.verbose > 4:
+ res['tgt_bind_pw'] = self.tgt_bind_pw
+ else:
+ res['tgt_bind_pw'] = '*******'
+ res['timeout'] = self.timeout
+ res['src_port'] = self.src_port
+ res['tgt_port'] = self.tgt_port
+ return res
+ # -------------------------------------------------------------------------
+ def eval_config_section(self, config, section_name):
+ super(LDAPMigrationConfiguration,, self).eval_config_section(config, section_name)
+ if section_name.lower() == 'source' or section_name.lower() == 'src':
+ self._eval_config_source(config, section_name)
+ return
+ if section_name.lower() == 'target' or section_name.lower() == 'tgt':
+ self._eval_config_target(config, section_name)
+ return
+ if section_name.lower() == 'common':
+ self._eval_config_common(config, section_name)
+ if self.verbose > 1:
+ LOG.debug("Unhandled configuration section {!r}.".format(section_name))
+ # -------------------------------------------------------------------------
+ def _eval_config_source(self, config, section_name):
+ if self.verbose > 1:
+ LOG.debug("Checking config section {!r} ...".format(section_name))
+ for (key, value) in config.items(section_name):
+ if (key.lower() == 'server' or key.lower() == 'host') and and value.strip():
+ self.src_server = value.strip().lower()
+ continue
+ if key.lower() == 'ssl':
+ self.src_use_ssl = to_bool(value)
+ continue
+ if key.lower() == 'port':
+ try:
+ self.src_port = value
+ except (ValueError, KeyError) as e:
+ msg = "Invalid value {!r} as source port:".format(value) + ' ' + str(e)
+ LOG.error(msg)
+ continue
+ if self.re_bind_dn.match(key) and value.strip():
+ self.src_bind_dn = value.strip()
+ continue
+ if self.re_bind_pw.match():
+ self.src_bind_pw = value
+ continue
+ LOG.warning((
+ "Unknown configuration option {o!r} with value {v!r} in "
+ "section {s!r}.").format(o=key, v=value, s=section_name))
+ return
+ # -------------------------------------------------------------------------
+ def _eval_config_target(self, config, section_name):
+ if self.verbose > 1:
+ LOG.debug("Checking config section {!r} ...".format(section_name))
+ for (key, value) in config.items(section_name):
+ if (key.lower() == 'server' or key.lower() == 'host') and and value.strip():
+ self.tgt_server = value.strip().lower()
+ continue
+ if key.lower() == 'ssl':
+ self.tgt_use_ssl = to_bool(value)
+ continue
+ if key.lower() == 'port':
+ try:
+ self.tgt_port = value
+ except (ValueError, KeyError) as e:
+ msg = "Invalid value {!r} as target port:".format(value) + ' ' + str(e)
+ LOG.error(msg)
+ continue
+ if self.re_bind_dn.match(key) and value.strip():
+ self.tgt_bind_dn = value.strip()
+ continue
+ if self.re_bind_pw.match():
+ self.tgt_bind_pw = value
+ continue
+ LOG.warning((
+ "Unknown configuration option {o!r} with value {v!r} in "
+ "section {s!r}.").format(o=key, v=value, s=section_name))
+ return
+ # -------------------------------------------------------------------------
+ def _eval_config_common(self, config, section_name):
+ if self.verbose > 1:
+ LOG.debug("Checking config section {!r} ...".format(section_name))
+ for (key, value) in config.items(section_name):
+ if key.lower() == 'suffix' and value.strip():
+ self.suffix = value.strip()
+ continue
+ if key.lower() == 'timeout':
+ try:
+ self.timeout = value
+ except (ValueError, KeyError) as e:
+ msg = "Invalid value {!r} as timeout:".format(value) + ' ' + str(e)
+ LOG.error(msg)
+ continue
+ LOG.warning((
+ "Unknown configuration option {o!r} with value {v!r} in "
+ "section {s!r}.").format(o=key, v=value, s=section_name))
+ return
+# =============================================================================
+if __name__ == "__main__":
+ pass
+# =============================================================================
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list