--- /dev/null
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2022 by Frank Brehm, Berlin
+@summary: A base module for application classes with LDAP support
+"""
+from __future__ import absolute_import
+
+# Standard modules
+import logging
+import copy
+import pipes
+import os
+import argparse
+
+try:
+ from pathlib import Path
+except ImportError:
+ from pathlib2 import Path
+
+## Third party modules
+from fb_tools.common import pp
+
+from fb_tools.cfg_app import FbConfigApplication
+
+from fb_tools.errors import FbAppError
+
+# Own modules
+from . import __version__ as GLOBAL_VERSION
+
+from .xlate import XLATOR
+
+from . import MAX_PORT_NUMBER, DEFAULT_CONFIG_DIR
+
+from .xlate import XLATOR
+
+from .argparse_actions import PortOptionAction
+
+from .ldap_config import LdapConfigError, LdapConnectionInfo, LdapConfiguration
+from .ldap_config import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS, DEFAULT_TIMEOUT, MAX_TIMEOUT
+
+__version__ = '0.1.0'
+LOG = logging.getLogger(__name__)
+
+_ = XLATOR.gettext
+ngettext = XLATOR.ngettext
+
+
+# =============================================================================
+class LdapAppError(FbAppError):
+ """ Base exception class for all exceptions in all LDAP using application classes."""
+ pass
+
+
+# =============================================================================
+class PasswordFileOptionAction(argparse.Action):
+
+ # -------------------------------------------------------------------------
+ def __init__(self, option_strings, must_exists=True, *args, **kwargs):
+
+ self.must_exists = bool(must_exists)
+
+ super(PasswordFileOptionAction, self).__init__(
+ option_strings=option_strings, *args, **kwargs)
+
+ # -------------------------------------------------------------------------
+ def __call__(self, parser, namespace, given_path, option_string=None):
+
+ path = Path(given_path)
+ if not path.is_absolute():
+ msg = _("The path {!r} must be an absolute path.").format(given_path)
+ raise argparse.ArgumentError(self, msg)
+
+ if self.must_exists:
+
+ if not path.exists():
+ msg = _("The file {!r} does not exists.").format(str(path))
+ raise argparse.ArgumentError(self, msg)
+
+ if not path.is_file():
+ msg = _("The given path {!r} exists, but is not a regular file.").format(str(path))
+ raise argparse.ArgumentError(self, msg)
+
+ if not os.access(str(path), os.R_OK):
+ msg = _("The given file {!r} is not readable.").format(str(path))
+ raise argparse.ArgumentError(self, msg)
+
+ setattr(namespace, self.dest, path)
+
+
+# =============================================================================
+class LdapPortOptionAction(argparse.Action):
+
+ # -------------------------------------------------------------------------
+ def __init__(self, option_strings, *args, **kwargs):
+
+ super(LdapPortOptionAction, self).__init__(
+ option_strings=option_strings, *args, **kwargs)
+
+ # -------------------------------------------------------------------------
+ def __call__(self, parser, namespace, given_port, option_string=None):
+
+ try:
+ port = int(given_port):
+ if port <= 0 or port > MAX_PORT_NUMBER:
+ msg = _(
+ "a port number must be greater than zero and less "
+ "or equal to {}.").format(MAX_PORT_NUMBER)
+ raise ValueError(msg)
+ except (ValueError, TypeError) as e:
+ msg = _("Wrong port number {!r}:").format(given_port)
+ msg += ' ' + str(e)
+ raise argparse.ArgumentError(self, msg)
+
+ setattr(namespace, self.dest, port)
+
+
+# =============================================================================
+class TimeoutOptionAction(argparse.Action):
+
+ # -------------------------------------------------------------------------
+ def __init__(self, option_strings, *args, **kwargs):
+
+ super(TimeoutOptionAction, self).__init__(
+ option_strings=option_strings, *args, **kwargs)
+
+ # -------------------------------------------------------------------------
+ def __call__(self, parser, namespace, given_timeout, option_string=None):
+
+ try:
+ timeout = int(given_timeout):
+ if timeout <= 0 or timeout > MAX_TIMEOUT:
+ msg = _(
+ "a timeout must be greater than zero and less "
+ "or equal to {}.").format(MAX_TIMEOUT)
+ raise ValueError(msg)
+ except (ValueError, TypeError) as e:
+ msg = _("Wrong timeout {!r}:").format(given_timeout)
+ msg += ' ' + str(e)
+ raise argparse.ArgumentError(self, msg)
+
+ setattr(namespace, self.dest, timeout)
+
+
+# =============================================================================
+class BaseLdapApplication(FbConfigApplication):
+ """
+ Base class for all application classes using LDAP.
+ """
+
+ # -------------------------------------------------------------------------
+ def __init__(
+ self, appname=None, verbose=0, version=GLOBAL_VERSION, base_dir=None,
+ cfg_class=LdapConfiguration, initialized=False, usage=None, description=None,
+ argparse_epilog=None, argparse_prefix_chars='-', env_prefix=None,
+ use_default_ldap_connection=True, config_dir=DEFAULT_CONFIG_DIR):
+
+ self._password_file = None
+ self._use_default_ldap_connection = bool(use_default_ldap_connection)
+
+ super(BaseLdapApplication, self).__init__(
+ appname=appname, verbose=verbose, version=version, base_dir=base_dir,
+ description=description, cfg_class=cfg_class, initialized=False,
+ argparse_epilog=argparse_epilog, argparse_prefix_chars=argparse_prefix_chars,
+ env_prefix=env_prefix, config_dir=config_dir
+ )
+
+ # -----------------------------------------------------------
+ @property
+ def use_default_ldap_connection(self):
+ """Should there be command line parameters for the default LDAP connection."""
+ return self._use_default_ldap_connection
+
+ # -----------------------------------------------------------
+ @property
+ def password_file(self):
+ """The file containing the password of the Bind DN of the default LDAP connection."""
+ return self._password_file
+
+ @password_file.setter
+ def password_file(self, value):
+
+ path = Path(value)
+ if not path.is_absolute():
+ msg = _("The path {!r} must be an absolute path.").format(value)
+ raise LdapAppError(msg)
+
+ if not path.exists():
+ msg = _("The file {!r} does not exists.").format(str(path))
+ raise LdapAppError(msg)
+
+ if not path.is_file():
+ msg = _("The given path {!r} exists, but is not a regular file.").format(str(path))
+ raise LdapAppError(msg)
+
+ if not os.access(str(path), os.R_OK):
+ msg = _("The given file {!r} is not readable.").format(str(path))
+ raise LdapAppError(msg)
+
+ self._password_file = path
+
+ # -------------------------------------------------------------------------
+ def as_dict(self, short=True):
+ """
+ Transforms the elements of the object into a dict
+
+ @param short: don't include local properties in resulting dict.
+ @type short: bool
+
+ @return: structure as dict
+ @rtype: dict
+ """
+
+ res = super(BaseLdapApplication, self).as_dict(short=short)
+
+ res['password_file'] = self.password_file
+ res['use_default_ldap_connection'] = self.use_default_ldap_connection
+
+ return res
+
+
+ # -------------------------------------------------------------------------
+ def init_arg_parser(self):
+ """
+ Public available method to initiate the argument parser.
+ """
+
+ super(BaseLdapApplication, self).init_arg_parser()
+
+ ldap_group = self.arg_parser.add_argument_group(_(
+ 'Options for the default LDAP connection'))
+
+ if self.use_default_ldap_connection:
+
+ ldap_host = LdapConfiguration.default_ldap_server
+ ldap_ssl = LdapConfiguration.use_ssl_on_default
+ ldap_ssl_str = _('No')
+ if ldap_ssl:
+ ldap_ssl_str = _('Yes')
+ ldap_port = LdapConfiguration.default_ldap_port
+ ldap_base_dn = LdapConfiguration.default_base_dn
+ ldap_bind_dn = LdapConfiguration.default_bind_dn
+
+ ldap_group.add_argument(
+ '-h', '--ldap-host', metavar=_("HOST"), dest="ldap_host",
+ help=_(
+ "Hostname or address of the LDAP server to use. Default: {!r}").format(
+ ldap_host),
+ )
+
+ ldap_group.add_argument(
+ '--ssl', '--ldaps', '--ldap-ssl', dest="ldap_ssl", action="store_true",
+ help=_("Use ldaps to connect to the LDAP server. Default: {}").format(ldap_ssl_str),
+ )
+
+ ldap_group.add_argument(
+ '-p', '--ldap-port', metavar=_("PORT"), type=int, dest="ldap_port",
+ action=LdapPortOptionAction,
+ help=_("The port number to connect to the LDAP server. Default: {}").format(
+ ldap_port),
+ )
+
+ ldap_group.add_argument(
+ '-b', '--base-dn', metavar="DN", dest="ldap_base_dn",
+ help=_(
+ "The base DN used as the root for the LDAP searches. "
+ "Default: {!r}").format(ldap_base_dn),
+ )
+
+ ldap_group.add_argument(
+ '-D', '--bind-dn', metavar="DN", dest="ldap_bind_dn",
+ help=_(
+ "The Bind DN to use to connect to the LDAP server. Default: {}").format(
+ ldap_bind_dn),
+ )
+
+ pw_group = ldap_group.add_mutually_exclusive_group()
+
+ pw_group.add_argument(
+ '-w', '--bind-pw', '--password', metavar=_("PASSWORD"), dest="ldap_bind_pw",
+ help=_("Use PASSWORD as the password for simple LDAP authentication."),
+ )
+
+ pw_group.add_argument(
+ '-W', '--password-prompt', action="store_true", dest="ldap_pw_prompt",
+ help=_(
+ "Prompt for simple LDAP authentication. This is used instead of "
+ "specifying the password on the command line."),
+ )
+
+ pw_group.add_argument(
+ '-y', '--password-file', metavar=_('PASSWORD_FILE'), dest="ldap_pw_file",
+ action=PasswordFileOptionAction,
+ help=_("Use contents of PASSWORD_FILE as the password for simple authentication."),
+ )
+
+ self.arg_parser.add_argument(
+ '-T', '--timeout', metavar=_('SECONDS'), dest="ldap_timeout",
+ action=TimeoutOptionAction,
+ help=_(
+ "Using the given timeout in seconds for all LDAP operations. "
+ "Default: {}").format(DEFAULT_TIMEOUT),
+ )
+
+ # -------------------------------------------------------------------------
+ def post_init(self):
+ """
+ Method to execute before calling run(). Here could be done some
+ finishing actions after reading in commandline parameters,
+ configuration a.s.o.
+
+ This method could be overwritten by descendant classes, these
+ methhods should allways include a call to post_init() of the
+ parent class.
+
+ """
+
+ self.initialized = False
+
+ super(BaseLdapApplication, self).post_init()
+
+ if not self.use_default_ldap_connection:
+ return
+
+ if 'default' in self.cfg.connection:
+ default_connection = self.cfg.connection['default']
+ else:
+ default_connection = LdapConnectionInfo(
+ appname=self.appname, verbose=self.verbose, base_dir=self.base_dir,
+ host=LdapConfiguration.default_ldap_server,
+ use_ldaps=LdapConfiguration.use_ssl_on_default,
+ port=LdapConfiguration.default_ldap_port,
+ base_dn=LdapConfiguration.default_base_dn,
+ bind_dn=LdapConfiguration.default_bind_dn,
+ initialized=False)
+ self.cfg.connection['default'] = default_connection
+
+ v = getattr(self.args, 'ldap_host', None)
+ if v:
+ default_connection.host = v
+
+ if getattr(self.args, 'ldap_ssl', False):
+ default_connection.use_ldaps = True
+
+ v = getattr(self.args, 'ldap_port', None)
+ if v is not None:
+ default_connection.port = v
+
+ v = getattr(self.args, 'ldap_base_dn', None)
+ if v:
+ default_connection.base_dn = v
+
+ v = getattr(self.args, 'ldap_bind_dn', None)
+ if v:
+ default_connection.bind_dn = v
+
+ v = getattr(self.args, 'ldap_bind_pw', None)
+ if v:
+ default_connection.bind_pw = v
+
+ v = getattr(self.args, 'ldap_timeout', None)
+ if v:
+ self.cfg.timeout = v
+
+
+# =============================================================================
+if __name__ == "__main__":
+
+ pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list