# -*- coding: utf-8 -*-
"""
+@summary: A base module for application classes with LDAP support.
+
@author: Frank Brehm
@contact: frank.brehm@pixelpark.com
@copyright: © 2023 by Frank Brehm, Berlin
-@summary: A base module for application classes with LDAP support
"""
from __future__ import absolute_import
# Standard modules
+import argparse
import logging
import os
-import argparse
import re
-
try:
from pathlib import Path
except ImportError:
from pathlib2 import Path
-
from functools import cmp_to_key
# Third party modules
-from ldap3 import Server, Connection, DSA, IP_V4_PREFERRED, SAFE_SYNC
+from fb_tools.argparse_actions import TimeoutOptionAction
+from fb_tools.collections import CIDict, CIStringSet, FrozenCIStringSet
+from fb_tools.common import is_sequence, to_bool
+from fb_tools.mailaddress import MailAddress
+
# from ldap3 import ALL
-from ldap3 import BASE, SUBTREE
+# from ldap3 import ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES
# from ldap3 import BASE, LEVEL, SUBTREE, DEREF_NEVER, DEREF_SEARCH, DEREF_BASE, DEREF_ALWAYS
+# from ldap3.core.exceptions import LDAPInvalidDnError, LDAPInvalidValueError
+# from ldap3.core.exceptions import LDAPException, LDAPBindError
from ldap3 import ALL_ATTRIBUTES
-# from ldap3 import ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES
+from ldap3 import BASE, SUBTREE
+from ldap3 import Connection, DSA, IP_V4_PREFERRED, SAFE_SYNC, Server
from ldap3 import MODIFY_ADD, MODIFY_DELETE, MODIFY_REPLACE
-# from ldap3.core.exceptions import LDAPInvalidDnError, LDAPInvalidValueError
from ldap3.core.exceptions import LDAPException
-# from ldap3.core.exceptions import LDAPException, LDAPBindError
-
-from fb_tools.common import is_sequence, to_bool
-from fb_tools.argparse_actions import TimeoutOptionAction
-from fb_tools.mailaddress import MailAddress
-from fb_tools.collections import FrozenCIStringSet, CIStringSet, CIDict
# Own modules
+from . import BaseDPXApplication, DPXAppError
+from .. import DEFAULT_CONFIG_DIR, MAX_PORT_NUMBER
from .. import __version__ as GLOBAL_VERSION
from .. import pp
-
-from ..xlate import XLATOR, format_list
-
-from .. import MAX_PORT_NUMBER, DEFAULT_CONFIG_DIR
-
-# from ..argparse_actions import PortOptionAction
-
-from . import DPXAppError, BaseDPXApplication
-
-# from ..config.ldap import LdapConfigError
-from ..config.ldap import LdapConnectionInfo, LdapConfiguration
-# rom ..config.ldap import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS
from ..config.ldap import DEFAULT_TIMEOUT
+from ..config.ldap import LdapConfiguration, LdapConnectionInfo
+from ..xlate import XLATOR, format_list
-__version__ = '0.11.4'
+__version__ = '0.11.5'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
# =============================================================================
class LdapAppError(DPXAppError):
- """ Base exception class for all exceptions in all LDAP using application classes."""
+ """Base exception class for all exceptions in all LDAP using application classes."""
+
pass
# =============================================================================
class FatalLDAPError(LdapAppError):
"""Fatal errors leading to interrupt the current application."""
+
pass
# =============================================================================
class LDAPExecutionError(FatalLDAPError):
"""Error class in case, a LDAP operation was not successful."""
+
pass
# =============================================================================
class WriteLDAPItemError(FatalLDAPError):
"""Error class in case, a LDAP item could not be written."""
+
pass
# =============================================================================
class DeleteLDAPItemError(FatalLDAPError):
"""Error class in case, a LDAP item could not be deleted."""
+
pass
# =============================================================================
class LDAPParseError(FatalLDAPError):
"""Error on parsing LDAP stuff."""
+
pass
# =============================================================================
class PasswordFileOptionAction(argparse.Action):
+ """Argparse action for a password file."""
# -------------------------------------------------------------------------
def __init__(self, option_strings, must_exists=True, must_absolute=True, *args, **kwargs):
-
+ """Construct the action object."""
self.must_exists = bool(must_exists)
self.must_absolute = bool(must_absolute)
# -------------------------------------------------------------------------
def __call__(self, parser, namespace, given_path, option_string=None):
-
+ """Call the option action."""
path = Path(given_path)
if self.must_absolute:
if not path.is_absolute():
- msg = _("The path {!r} must be an absolute path.").format(given_path)
+ msg = _('The path {!r} must be an absolute path.').format(given_path)
raise argparse.ArgumentError(self, msg)
if self.must_exists:
if not path.exists():
- msg = _("The file {!r} does not exists.").format(str(path))
+ msg = _('The file {!r} does not exists.').format(str(path))
raise argparse.ArgumentError(self, msg)
if not path.is_file():
- msg = _("The given path {!r} exists, but is not a regular file.").format(str(path))
+ msg = _('The given path {!r} exists, but is not a regular file.').format(str(path))
raise argparse.ArgumentError(self, msg)
if not os.access(str(path), os.R_OK):
- msg = _("The given file {!r} is not readable.").format(str(path))
+ msg = _('The given file {!r} is not readable.').format(str(path))
raise argparse.ArgumentError(self, msg)
setattr(namespace, self.dest, path)
# =============================================================================
class LdapPortOptionAction(argparse.Action):
+ """Argparse action for the LDAP TCP (UDP?) port."""
# -------------------------------------------------------------------------
def __init__(self, option_strings, *args, **kwargs):
-
+ """Construct the action object."""
super(LdapPortOptionAction, self).__init__(
option_strings=option_strings, *args, **kwargs)
# -------------------------------------------------------------------------
def __call__(self, parser, namespace, given_port, option_string=None):
-
+ """Call the option action."""
try:
port = int(given_port)
if port <= 0 or port > MAX_PORT_NUMBER:
msg = _(
- "a port number must be greater than zero and less "
- "or equal to {}.").format(MAX_PORT_NUMBER)
+ 'a port number must be greater than zero and less '
+ 'or equal to {}.').format(MAX_PORT_NUMBER)
raise ValueError(msg)
except (ValueError, TypeError) as e:
- msg = _("Wrong port number {!r}:").format(given_port)
+ msg = _('Wrong port number {!r}:').format(given_port)
msg += ' ' + str(e)
raise argparse.ArgumentError(self, msg)
# =============================================================================
class BaseLdapApplication(BaseDPXApplication):
- """
- Base class for all application classes using LDAP.
- """
+ """Base class for all application classes using LDAP."""
use_default_ldap_connection = True
use_multiple_ldap_connections = False
# -------------------------------------------------------------------------
@classmethod
def compare_ldap_dns(cls, x, y):
+ """
+ Compare two LDAP DN strings.
+ It will be done by splitting it into the particular tokens
+ and comparing the from right to left.
+ """
# Check for empty DNs
if x is None and y is None:
return 0
return -1
# Should never come to here ...
- LOG.warn("Reached a point, where I never should come.")
- LOG.debug("Compared x: {x!r} <=> y: {y!r}".format(x=x, y=y))
+ LOG.warn('Reached a point, where I never should come.')
+ LOG.debug('Compared x: {x!r} <=> y: {y!r}'.format(x=x, y=y))
return 0
# -------------------------------------------------------------------------
cfg_class=LdapConfiguration, initialized=False, usage=None, description=None,
argparse_epilog=None, argparse_prefix_chars='-', env_prefix=None,
config_dir=DEFAULT_CONFIG_DIR):
-
+ """Contrict the application object."""
self._password_file = None
self.ldap_instances = []
self.ldap_server = {}
# -----------------------------------------------------------
@property
def password_file(self):
- """The file containing the password of the Bind DN of the default LDAP connection."""
+ """Return file containing the password of the Bind DN of the default LDAP connection."""
return self._password_file
@password_file.setter
path = Path(value)
if not path.is_absolute():
- msg = _("The path {!r} must be an absolute path.").format(value)
+ msg = _('The path {!r} must be an absolute path.').format(value)
raise LdapAppError(msg)
if not path.exists():
- msg = _("The file {!r} does not exists.").format(str(path))
+ msg = _('The file {!r} does not exists.').format(str(path))
raise LdapAppError(msg)
if not path.is_file():
- msg = _("The given path {!r} exists, but is not a regular file.").format(str(path))
+ msg = _('The given path {!r} exists, but is not a regular file.').format(str(path))
raise LdapAppError(msg)
if not os.access(str(path), os.R_OK):
- msg = _("The given file {!r} is not readable.").format(str(path))
+ msg = _('The given file {!r} is not readable.').format(str(path))
raise LdapAppError(msg)
self._password_file = path
# -------------------------------------------------------------------------
def as_dict(self, short=True):
"""
- Transforms the elements of the object into a dict
+ Transform the elements of the object into a dict.
@param short: don't include local properties in resulting dict.
@type short: bool
@return: structure as dict
@rtype: dict
"""
-
res = super(BaseLdapApplication, self).as_dict(short=short)
res['pattern_re_ldap_dn'] = self.pattern_re_ldap_dn
# -------------------------------------------------------------------------
def init_arg_parser(self):
- """
- Public available method to initiate the argument parser.
- """
-
+ """Public available method to initiate the argument parser."""
group_title = _('LDAP options')
if self.use_default_ldap_connection:
group_title = _('Options for the default LDAP connection')
ldap_bind_dn = LdapConfiguration.default_bind_dn
ldap_group.add_argument(
- '-H', '--ldap-host', metavar=_("HOST"), dest="ldap_host",
+ '-H', '--ldap-host', metavar=_('HOST'), dest='ldap_host',
help=_(
- "Hostname or address of the LDAP server to use. Default: {!r}").format(
+ 'Hostname or address of the LDAP server to use. Default: {!r}').format(
ldap_host),
)
ldap_group.add_argument(
- '--ssl', '--ldaps', '--ldap-ssl', dest="ldap_ssl", action="store_true",
- help=_("Use ldaps to connect to the LDAP server. Default: {}").format(
+ '--ssl', '--ldaps', '--ldap-ssl', dest='ldap_ssl', action='store_true',
+ help=_('Use ldaps to connect to the LDAP server. Default: {}').format(
ldap_ssl_str),
)
ldap_group.add_argument(
- '-p', '--ldap-port', metavar=_("PORT"), type=int, dest="ldap_port",
+ '-p', '--ldap-port', metavar=_('PORT'), type=int, dest='ldap_port',
action=LdapPortOptionAction,
- help=_("The port number to connect to the LDAP server. Default: {}").format(
+ help=_('The port number to connect to the LDAP server. Default: {}').format(
ldap_port),
)
ldap_group.add_argument(
- '-b', '--base-dn', metavar="DN", dest="ldap_base_dn",
+ '-b', '--base-dn', metavar='DN', dest='ldap_base_dn',
help=_(
- "The base DN used as the root for the LDAP searches. "
- "Default: {!r}").format(ldap_base_dn),
+ 'The base DN used as the root for the LDAP searches. '
+ 'Default: {!r}').format(ldap_base_dn),
)
ldap_group.add_argument(
- '-D', '--bind-dn', metavar="DN", dest="ldap_bind_dn",
+ '-D', '--bind-dn', metavar='DN', dest='ldap_bind_dn',
help=_(
- "The Bind DN to use to connect to the LDAP server. Default: {!r}").format(
+ 'The Bind DN to use to connect to the LDAP server. Default: {!r}').format(
ldap_bind_dn),
)
pw_group = ldap_group.add_mutually_exclusive_group()
pw_group.add_argument(
- '-w', '--bind-pw', '--password', metavar=_("PASSWORD"), dest="ldap_bind_pw",
- help=_("Use PASSWORD as the password for simple LDAP authentication."),
+ '-w', '--bind-pw', '--password', metavar=_('PASSWORD'), dest='ldap_bind_pw',
+ help=_('Use PASSWORD as the password for simple LDAP authentication.'),
)
pw_group.add_argument(
- '-W', '--password-prompt', action="store_true", dest="ldap_pw_prompt",
+ '-W', '--password-prompt', action='store_true', dest='ldap_pw_prompt',
help=_(
- "Prompt for simple LDAP authentication. This is used instead of "
- "specifying the password on the command line."),
+ 'Prompt for simple LDAP authentication. This is used instead of '
+ 'specifying the password on the command line.'),
)
pw_group.add_argument(
- '-y', '--password-file', metavar=_('PASSWORD_FILE'), dest="ldap_pw_file",
+ '-y', '--password-file', metavar=_('PASSWORD_FILE'), dest='ldap_pw_file',
action=PasswordFileOptionAction,
- help=_("Use contents of PASSWORD_FILE as the password for simple authentication."),
+ help=_('Use contents of PASSWORD_FILE as the password for simple authentication.'),
)
else:
arg_params = {
- 'dest': "instance",
+ 'dest': 'instance',
'type': str,
'metavar': _('INSTANCE'),
}
help_default_inst = _(
- "If not given, then the instance {!r} will be used.").format(self.default_default_ldap_instance)
+ 'If not given, then the instance {!r} will be used.').format(
+ self.default_default_ldap_instance)
help_single = _(
- "The LDAP instance (LDAP cluster) from configuration, "
- "where to execute this script.")
+ 'The LDAP instance (LDAP cluster) from configuration, '
+ 'where to execute this script.')
help_multi = _(
- "The LDAP instance (LDAP cluster) from configuration, where to execute "
- "this script. It is possible to give here the value {val_all!r}, "
- "then all found LDAP instances except {default!r} are used. "
- "It is alo possible to give the value {val_list!r}, then all configured "
- "LDAP instances are shown, and the application is exiting.").format(
+ 'The LDAP instance (LDAP cluster) from configuration, where to execute '
+ 'this script. It is possible to give here the value {val_all!r}, '
+ 'then all found LDAP instances except {default!r} are used. '
+ 'It is alo possible to give the value {val_list!r}, then all configured '
+ 'LDAP instances are shown, and the application is exiting.').format(
val_all='all', val_list='list', default='default')
if self.use_multiple_ldap_connections:
if self.show_cmdline_ldap_timeout:
ldap_group.add_argument(
- '-T', '--timeout', metavar=_('SECONDS'), dest="ldap_timeout",
+ '-T', '--timeout', metavar=_('SECONDS'), dest='ldap_timeout',
action=TimeoutOptionAction,
help=_(
- "Using the given timeout in seconds for all LDAP operations. "
- "Default: {}").format(DEFAULT_TIMEOUT),
+ 'Using the given timeout in seconds for all LDAP operations. '
+ 'Default: {}').format(DEFAULT_TIMEOUT),
)
super(BaseLdapApplication, self).init_arg_parser()
# -------------------------------------------------------------------------
def post_init(self):
"""
- Method to execute before calling run(). Here could be done some
- finishing actions after reading in commandline parameters,
- configuration a.s.o.
+ Call this method before calling run().
- This method could be overwritten by descendant classes, these
- methhods should allways include a call to post_init() of the
- parent class.
+ Here could be done some finishing actions after reading in commandline parameters,
+ configuration a.s.o.
+ This method could be overwritten by descendant classes, these methods should allways
+ include a call to post_init() of the parent class.
"""
-
self.initialized = False
super(BaseLdapApplication, self).post_init()
self.ldap_instances = [self.default_default_ldap_instance]
return
- LOG.debug(_("Checking given instances."))
+ LOG.debug(_('Checking given instances.'))
insts = getattr(self.args, 'instance', None)
if self.verbose > 1:
- LOG.debug(_("Given insts:") + ' ' + pp(insts))
+ LOG.debug(_('Given insts:') + ' ' + pp(insts))
if insts is None:
insts = []
max_key_len += 1
- title = _("Configured LDAP instances:")
+ title = _('Configured LDAP instances:')
print(title)
print('-' * len(title))
print()
- tpl = "{inst:<{width}} {url:<{url_l}} {bind_dn:<{bind_dn_l}} {tier}"
+ tpl = '{inst:<{width}} {url:<{url_l}} {bind_dn:<{bind_dn_l}} {tier}'
for inst in instances:
cfg = self.cfg.ldap_connection[inst]
url = '{url}/{base}'.format(url=cfg.url, base=cfg.base_dn)
def _verify_instances(self, is_admin=None, readonly=None, tier=None, has_sync_source=False):
if self.verbose > 1:
- LOG.debug(_("Verifying given instances ..."))
+ LOG.debug(_('Verifying given instances ...'))
if self.verbose > 2:
show_filter = []
show_filter.append('tier = {!r}'.format(tier))
if has_sync_source:
show_filter.append('sync_source = *')
- msg = _("Used filter:") + ' ' + format_list(show_filter)
+ msg = _('Used filter:') + ' ' + format_list(show_filter)
LOG.debug(msg)
filtered_instances = []
filtered_instances.append(inst.lower())
if self.verbose > 2:
- LOG.debug(_("Filtered instances:") + ' ' + pp(filtered_instances))
+ LOG.debug(_('Filtered instances:') + ' ' + pp(filtered_instances))
self._validate_given_instances(filtered_instances)
if self.verbose > 1:
- LOG.debug(_("LDAP instances to use:") + ' ' + pp(self.ldap_instances))
+ LOG.debug(_('LDAP instances to use:') + ' ' + pp(self.ldap_instances))
# -------------------------------------------------------------------------
def _validate_given_instances(self, filtered_instances):
all_ok = True
for given_inst in self.ldap_instances:
if given_inst not in filtered_instances:
- msg = _("LDAP instance {!r} not found in configuration or is not usable.").format(
+ msg = _('LDAP instance {!r} not found in configuration or is not usable.').format(
given_inst)
LOG.error(msg)
all_ok = False
# -------------------------------------------------------------------------
def __del__(self):
-
+ """Destruct the object."""
self.disconnect_all()
# -------------------------------------------------------------------------
def pre_run(self):
-
- LOG.debug(_("Preparations ..."))
+ """Execute it immediately before running the underlaying payload."""
+ LOG.debug(_('Preparations ...'))
super(BaseLdapApplication, self).pre_run()
- LOG.debug(_("Open all necessary LDAP connections ..."))
+ LOG.debug(_('Open all necessary LDAP connections ...'))
for inst in self.ldap_instances:
self.connect_instance(inst)
# -------------------------------------------------------------------------
def connect_instance(self, inst):
-
+ """Connect to the given LDAP instance."""
connect_info = self.cfg.ldap_connection[inst]
ldap_server = self.get_ldap_server_obj(inst)
self.ldap_server[inst] = ldap_server
if not connect_info.bind_pw:
- first_prompt = _("Password of user {usr} on LDAP instance {inst}:").format(
+ first_prompt = _('Password of user {usr} on LDAP instance {inst}:').format(
usr=self.colored(connect_info.bind_dn, 'CYAN'),
inst=self.colored(connect_info.url, 'CYAN')) + ' '
connect_info.bind_pw = self.get_password(first_prompt, may_empty=False, repeat=False)
self.ldap_connection[inst] = ldap_connection
if self.verbose > 2:
- msg = _("Info about LDAP server {}:").format(connect_info.url)
+ msg = _('Info about LDAP server {}:').format(connect_info.url)
msg += ' ' + repr(ldap_connection)
LOG.debug(msg)
# -------------------------------------------------------------------------
def get_ldap_server_obj(self, inst):
-
+ """Return the ldap3-Server object for the given instance."""
connect_info = self.cfg.ldap_connection[inst]
if self.verbose > 2:
- msg = _("Trying to get LDAP server object for {} ...").format(connect_info.url)
+ msg = _('Trying to get LDAP server object for {} ...').format(connect_info.url)
LOG.debug(msg)
server_opts = {}
server_opts['mode'] = IP_V4_PREFERRED
server_opts['connect_timeout'] = self.cfg.ldap_timeout
if self.verbose > 2:
- msg = _("Connect options to server {!r}:").format(connect_info.url)
+ msg = _('Connect options to server {!r}:').format(connect_info.url)
msg += ' ' + pp(server_opts)
LOG.debug(msg)
ldap_server = Server(connect_info.host, **server_opts)
if self.verbose > 2:
- LOG.debug(_("LDAP server {s}: {re}").format(s=ldap_server, re=repr(ldap_server)))
+ LOG.debug(_('LDAP server {s}: {re}').format(s=ldap_server, re=repr(ldap_server)))
return ldap_server
# -------------------------------------------------------------------------
def connect_to_ldap_server(self, ldap_server, inst, bind_dn=None, bind_pw=None):
-
+ """Connect to the given LDAP server."""
connect_info = self.cfg.ldap_connection[inst]
if not bind_dn:
bind_dn = connect_info.bind_dn
bind_pw = connect_info.bind_pw
if self.verbose > 1:
- msg = _("Connecting to LDAP server {url} as {dn!r} ...").format(
+ msg = _('Connecting to LDAP server {url} as {dn!r} ...').format(
url=connect_info.url, dn=bind_dn)
LOG.debug(msg)
# -------------------------------------------------------------------------
def post_run(self):
-
- LOG.debug(_("Finishing ..."))
+ """Execute after the the payload action of the application."""
+ LOG.debug(_('Finishing ...'))
super(BaseLdapApplication, self).post_run()
self.disconnect_all()
# -------------------------------------------------------------------------
def disconnect_all(self):
-
+ """Disconnect from all connected LDAP instances."""
if hasattr(self, 'ldap_connection'):
if len(self.ldap_connection) or len(self.ldap_server):
- LOG.debug(_("Disconnecting from all remaining LDAP instances ..."))
+ LOG.debug(_('Disconnecting from all remaining LDAP instances ...'))
for inst in self.ldap_instances:
self.disconnect_instance(inst)
# -------------------------------------------------------------------------
def disconnect_instance(self, inst):
-
+ """Disconnect from the given instance."""
connect_info = self.cfg.ldap_connection[inst]
if inst in self.ldap_connection:
ldap_connection = self.ldap_connection[inst]
if self.verbose > 1:
- LOG.debug(_("Unbinding from LDAP server {!r} ...").format(connect_info.url))
+ LOG.debug(_('Unbinding from LDAP server {!r} ...').format(connect_info.url))
ldap_connection.unbind()
ldap_connection = None
del self.ldap_connection[inst]
if inst in self.ldap_server:
if self.verbose > 1:
- LOG.debug(_("Disconnecting from LDAP server {!r} ...").format(connect_info.url))
+ LOG.debug(_('Disconnecting from LDAP server {!r} ...').format(connect_info.url))
del self.ldap_server[inst]
# -------------------------------------------------------------------------
def get_all_entries(self, inst, base_dn=None, ldap_filter=None, attributes=None):
- """Get all LDAP entries bellow the given BaseDN and the given LDAP filter.
+ """
+ Get all LDAP entries bellow the given BaseDN and the given LDAP filter.
+
If no attributes are given, all attributes are given back.
The result is a hash with the DNs if the resulting entries as keys, and a hash
with the resulting attributes as values.
if self.verbose > 2:
msg = _(
- "Searching in {uri}/{bdn} for all entries with filter {fltr!r}, "
- "giving attributes:").format(uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
+ 'Searching in {uri}/{bdn} for all entries with filter {fltr!r}, '
+ 'giving attributes:').format(uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
msg += ' ' + format_list(attributes, do_repr=True)
LOG.debug(msg)
if req_status:
if self.verbose > 4:
- LOG.debug(_("Result of searching:") + '\n' + pp(req_result))
+ LOG.debug(_('Result of searching:') + '\n' + pp(req_result))
for entry in req_response:
dn = entry['dn']
if self.verbose > 3:
- LOG.debug(_("Found entry {!r}.").format(dn))
+ LOG.debug(_('Found entry {!r}.').format(dn))
result[dn] = self.normalized_attributes(entry)
if self.verbose > 2:
msg = ngettext(
- "Found one entry with filter {fltr!r} in {uri}/{bdn}.",
- "Found {nr} enries with filter {fltr!r} in {uri}/{bdn}.",
+ 'Found one entry with filter {fltr!r} in {uri}/{bdn}.',
+ 'Found {nr} enries with filter {fltr!r} in {uri}/{bdn}.',
len(result)).format(
nr=len(result), uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
LOG.debug(msg)
if self.verbose > 4:
- LOG.debug(_("Got response entries:") + '\n' + pp(result))
+ LOG.debug(_('Got response entries:') + '\n' + pp(result))
else:
if self.verbose > 3:
- msg = _("No entry found with filter {fltr!r} in {uri}/{bdn}.").format(
+ msg = _('No entry found with filter {fltr!r} in {uri}/{bdn}.').format(
uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
LOG.debug(msg)
# -------------------------------------------------------------------------
def get_all_entry_dns(self, inst, ldap_filter=None):
"""Get DNs of all entries in the given LDAP instance and sort them."""
-
connect_info = self.cfg.ldap_connection[inst]
base_dn = connect_info.base_dn
ldap = self.ldap_connection[inst]
ldap_filter = '(objectClass=*)'
if self.verbose > 1:
- LOG.debug(_("Using LDAP filter: {!r}").format(ldap_filter))
+ LOG.debug(_('Using LDAP filter: {!r}').format(ldap_filter))
req_status, req_result, req_response, req_whatever = ldap.search(
search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter,
if req_status:
if self.verbose > 5:
- msg = _("Result of searching for DNs of all entries:")
+ msg = _('Result of searching for DNs of all entries:')
LOG.debug(msg + '\n' + pp(req_result))
for entry in req_response:
if self.verbose > 4:
- LOG.debug(_("Got a response entry:") + ' ' + pp(entry))
+ LOG.debug(_('Got a response entry:') + ' ' + pp(entry))
result.append(entry['dn'])
else:
- LOG.warn("Got no entry DNs.")
+ LOG.warn('Got no entry DNs.')
if result:
result = sorted(result, key=cmp_to_key(self.compare_ldap_dns))
if self.verbose > 3 and result:
- LOG.debug(_("Result:") + '\n' + pp(result))
+ LOG.debug(_('Result:') + '\n' + pp(result))
return result
# -------------------------------------------------------------------------
def get_all_entry_dns_hash(self, inst, ldap_filter=None):
"""Get Object classes and DNs of all entries in the given LDAP instance."""
-
connect_info = self.cfg.ldap_connection[inst]
base_dn = connect_info.base_dn
ldap = self.ldap_connection[inst]
if not ldap_filter:
ldap_filter = '(objectClass=*)'
- LOG.debug(_("Getting all Entry DNs of LDAP instance {i!r} below {b!r}.").format(
+ LOG.debug(_('Getting all Entry DNs of LDAP instance {i!r} below {b!r}.').format(
i=inst, b=base_dn))
if self.verbose > 1:
- LOG.debug(_("Using LDAP filter: {!r}").format(ldap_filter))
+ LOG.debug(_('Using LDAP filter: {!r}').format(ldap_filter))
req_status, req_result, req_response, req_whatever = ldap.search(
search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter,
if req_status:
if self.verbose > 5:
- msg = _("Result of searching for DNs of all entries:")
+ msg = _('Result of searching for DNs of all entries:')
LOG.debug(msg + '\n' + pp(req_result))
for entry in req_response:
if self.verbose > 4:
- LOG.debug(_("Got a response entry:") + ' ' + pp(entry))
+ LOG.debug(_('Got a response entry:') + ' ' + pp(entry))
dn = entry['dn']
object_classes = FrozenCIStringSet(entry['attributes']['objectClass'])
}
else:
- LOG.warn("Got no entry DNs.")
+ LOG.warn('Got no entry DNs.')
return result
# -------------------------------------------------------------------------
def get_user_dn(self, user, inst):
-
+ """Get the DN of the given user in the given LDAP instance somehow."""
connect_info = self.cfg.ldap_connection[inst]
if self.verbose > 1:
- msg = _("Trying to evaluate DN of user {user!r} in LDAP instance {inst} ...").format(
+ msg = _('Trying to evaluate DN of user {user!r} in LDAP instance {inst} ...').format(
user=user, inst=connect_info.url)
LOG.debug(msg)
if MailAddress.valid_address(user, verbose=self.verbose):
- msg = _("Trying to evaluate DN of user {u!r} as a mail address ...").format(u=user)
+ msg = _('Trying to evaluate DN of user {u!r} as a mail address ...').format(u=user)
LOG.debug(msg)
dns = self.get_user_dn_by_mail(user, inst)
if dns:
return dns
if self.re_ldap_dn.match(user):
- msg = _("Trying to evaluate DN of user {u!r} as a LDAP DN ...").format(u=user)
+ msg = _('Trying to evaluate DN of user {u!r} as a LDAP DN ...').format(u=user)
LOG.debug(msg)
dns = self.get_user_dn_by_dn(user, inst)
if dns:
return dns
if self.re_uid.match(user):
- msg = _("Trying to evaluate DN of user {u!r} as a UID (Posix user name) ...").format(
+ msg = _('Trying to evaluate DN of user {u!r} as a UID (Posix user name) ...').format(
u=user)
LOG.debug(msg)
dns = self.get_user_dn_by_uid(user, inst)
usr = user.strip()
if usr == '':
- msg = _("Empty user given.")
+ msg = _('Empty user given.')
LOG.warn(msg)
return None
- msg = _("Trying to evaluate DN of user {u!r} as a CN ({c}) ...").format(
+ msg = _('Trying to evaluate DN of user {u!r} as a CN ({c}) ...').format(
u=usr, c='common name')
LOG.debug(msg)
return self.get_user_dn_by_cn(user, inst)
# -------------------------------------------------------------------------
def get_user_dn_by_mail(self, mail, inst):
-
+ """Get the DN of the user with the given mail address in the given LDAP instance."""
connect_info = self.cfg.ldap_connection[inst]
base_dn = connect_info.base_dn
ldap = self.ldap_connection[inst]
ldap_filter += ')'
if self.verbose > 1:
- msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format(
+ msg = _('Searching in {uri}/{bdn} with filter: {fltr}').format(
uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
LOG.debug(msg)
if req_status:
if self.verbose > 4:
- msg = _("Result of searching for mail address {m!r}:").format(m=mail)
+ msg = _('Result of searching for mail address {m!r}:').format(m=mail)
LOG.debug(msg + ' ' + pp(req_result))
for entry in req_response:
if self.verbose > 4:
- LOG.debug(_("Got a response entry:") + ' ' + pp(entry))
+ LOG.debug(_('Got a response entry:') + ' ' + pp(entry))
result.append(entry['dn'])
if self.verbose > 3:
- LOG.debug(_("Result:") + ' ' + pp(result))
+ LOG.debug(_('Result:') + ' ' + pp(result))
else:
if self.verbose > 3:
- msg = _("User with mail address {m!r} not found in {uri}/{bdn}.").format(
+ msg = _('User with mail address {m!r} not found in {uri}/{bdn}.').format(
m=mail, uri=connect_info.url, bdn=base_dn)
LOG.debug(msg)
# -------------------------------------------------------------------------
def get_user_dn_by_uid(self, uid, inst):
-
+ """Get the DN of the user with the given uid (POSIX name) in the given LDAP instance."""
connect_info = self.cfg.ldap_connection[inst]
base_dn = connect_info.base_dn
ldap = self.ldap_connection[inst]
ldap_filter += ')'
if self.verbose > 1:
- msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format(
+ msg = _('Searching in {uri}/{bdn} with filter: {fltr}').format(
uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
LOG.debug(msg)
if req_status:
if self.verbose > 4:
- msg = _("Result of searching for uid {u!r}:").format(u=uid)
+ msg = _('Result of searching for uid {u!r}:').format(u=uid)
LOG.debug(msg + ' ' + pp(req_result))
for entry in req_response:
if self.verbose > 4:
- LOG.debug(_("Got a response entry:") + ' ' + pp(entry))
+ LOG.debug(_('Got a response entry:') + ' ' + pp(entry))
result.append(entry['dn'])
if self.verbose > 3:
- LOG.debug(_("Result:") + ' ' + pp(result))
+ LOG.debug(_('Result:') + ' ' + pp(result))
else:
if self.verbose > 3:
- msg = _("User with uid {u!r} not found in {uri}/{bdn}.").format(
+ msg = _('User with uid {u!r} not found in {uri}/{bdn}.').format(
u=uid, uri=connect_info.url, bdn=base_dn)
LOG.debug(msg)
# -------------------------------------------------------------------------
def get_user_dn_by_cn(self, cn, inst):
-
+ """Get the DN of the user with the given cn (common name) in the given LDAP instance."""
connect_info = self.cfg.ldap_connection[inst]
base_dn = connect_info.base_dn
ldap = self.ldap_connection[inst]
ldap_filter += ')'
if self.verbose > 1:
- msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format(
+ msg = _('Searching in {uri}/{bdn} with filter: {fltr}').format(
uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
LOG.debug(msg)
if req_status:
if self.verbose > 4:
- msg = _("Result of searching for CN {cn!r}:").format(cn=cn)
+ msg = _('Result of searching for CN {cn!r}:').format(cn=cn)
LOG.debug(msg + ' ' + pp(req_result))
for entry in req_response:
if self.verbose > 4:
- LOG.debug(_("Got a response entry:") + ' ' + pp(entry))
+ LOG.debug(_('Got a response entry:') + ' ' + pp(entry))
result.append(entry['dn'])
if self.verbose > 3:
- LOG.debug(_("Result:") + ' ' + pp(result))
+ LOG.debug(_('Result:') + ' ' + pp(result))
else:
if self.verbose > 3:
- msg = _("User with cn {cn!r} not found in {uri}/{bdn}.").format(
+ msg = _('User with cn {cn!r} not found in {uri}/{bdn}.').format(
cn=cn, uri=connect_info.url, bdn=base_dn)
LOG.debug(msg)
# -------------------------------------------------------------------------
def get_user_dn_by_dn(self, dn, inst, strict=True):
-
+ """Search for an entry with the given DN in the given LDAP instance."""
connect_info = self.cfg.ldap_connection[inst]
attributes = ['objectClass']
if not entry:
if self.verbose > 3:
- msg = _("User with DN {dn!r} not found in {uri}.").format(
+ msg = _('User with DN {dn!r} not found in {uri}.').format(
dn=dn, uri=connect_info.url)
LOG.debug(msg)
return None
object_classes.add(values)
if self.verbose > 3:
- msg = _("ObjectClasses of {dn!r}:").format(dn=found_dn)
+ msg = _('ObjectClasses of {dn!r}:').format(dn=found_dn)
LOG.debug(msg + '\n' + pp(object_classes.as_list()))
is_person = False
break
if not is_person:
- msg = _("Entry {dn!r} in {uri} seems not to be an account.").format(
+ msg = _('Entry {dn!r} in {uri} seems not to be an account.').format(
dn=found_dn, uri=connect_info.url)
LOG.warn(msg)
if strict:
# -------------------------------------------------------------------------
def get_entry(self, dn, inst, attributes=None, operational_attributes=False):
-
+ """Get an complete LDAP entry by the given DN in the given instance."""
connect_info = self.cfg.ldap_connection[inst]
ldap = self.ldap_connection[inst]
result = None
if self.verbose > 1:
- msg = _("Searching DN {dn!r} in {uri}.").format(dn=dn, uri=connect_info.url)
+ msg = _('Searching DN {dn!r} in {uri}.').format(dn=dn, uri=connect_info.url)
LOG.debug(msg)
req_status, req_result, req_response, req_whatever = ldap.search(
if req_status:
if self.verbose > 4:
- msg = _("Result of searching for DN {dn!r}:").format(dn=dn)
+ msg = _('Result of searching for DN {dn!r}:').format(dn=dn)
LOG.debug(msg + ' ' + pp(req_result))
result = req_response[0]
if self.verbose > 3:
- LOG.debug(_("Got a response entry:") + '\n' + pp(result))
+ LOG.debug(_('Got a response entry:') + '\n' + pp(result))
else:
if self.verbose > 3:
- msg = _("Entry with DN {dn!r} not found in {uri}.").format(
+ msg = _('Entry with DN {dn!r} not found in {uri}.').format(
dn=dn, uri=connect_info.url)
LOG.debug(msg)
# -------------------------------------------------------------------------
def normalized_attributes(self, entry, omit_members=False, omit_memberof=False):
-
+ """Transform an LDAP entry into a simple dict."""
attribs = CIDict()
for attrib in entry['attributes']:
# -------------------------------------------------------------------------
def add_entry(self, inst, dn, object_classes, target_entry, ldap=None):
- """Creating a LDAP entry."""
+ """Create a LDAP entry."""
connect_info = self.cfg.ldap_connection[inst]
if not ldap:
ldap = self.ldap_connection[inst]
if self.verbose > 2:
- msg = _("Creating entry {dn!r} on {uri}:").format(
+ msg = _('Creating entry {dn!r} on {uri}:').format(
uri=connect_info.url, dn=dn)
msg += '\nobjectClasses:\n' + pp(object_classes)
- msg += "\nAttributes:\n" + pp(target_entry)
+ msg += '\nAttributes:\n' + pp(target_entry)
if self.simulate:
- LOG.info(_("Simulation mode - entry will not be created."))
+ LOG.info(_('Simulation mode - entry will not be created.'))
return True
try:
req_status, req_result, req_response, req_whatever = ldap.add(
dn, object_class=object_classes, attributes=target_entry)
except LDAPException as e:
- msg = _("Creation of entry {dn!r} was NOT successfull - {c}: {e}").format(
+ msg = _('Creation of entry {dn!r} was NOT successfull - {c}: {e}').format(
dn=dn, c=e.__class__.__name__, e=e)
msg += '\nobjectClasses:\n' + pp(object_classes)
- msg += "\nAttributes:\n" + pp(target_entry)
+ msg += '\nAttributes:\n' + pp(target_entry)
raise WriteLDAPItemError(msg)
# Example result on a not successful modification:
# 'type': 'modifyResponse'}
if self.verbose > 1:
- LOG.debug(_("Creation status: {!r}.").format(req_status))
+ LOG.debug(_('Creation status: {!r}.').format(req_status))
if self.verbose > 2:
- LOG.debug(_("Result of creating:") + '\n' + pp(req_result))
+ LOG.debug(_('Result of creating:') + '\n' + pp(req_result))
if not req_status:
- msg = _("Creation of entry {dn!r} was NOT successful: {desc} - {msg}").format(
+ msg = _('Creation of entry {dn!r} was NOT successful: {desc} - {msg}').format(
dn=dn, desc=req_result['description'], msg=req_result['message'].strip())
msg += '\nobjectClasses:\n' + pp(object_classes)
- msg += "\nAttributes:\n" + pp(target_entry)
+ msg += '\nAttributes:\n' + pp(target_entry)
raise WriteLDAPItemError(msg)
LOG.debug(_('Creation successful.'))
# -------------------------------------------------------------------------
def modify_entry(self, inst, dn, changes, ldap=None):
- """Mofifying an existing LDAP entry."""
+ """Mofify an existing LDAP entry."""
connect_info = self.cfg.ldap_connection[inst]
if not ldap:
ldap = self.ldap_connection[inst]
if self.verbose > 1:
- msg = _("Applying changes on {uri} to DN {dn!r}:").format(
+ msg = _('Applying changes on {uri} to DN {dn!r}:').format(
uri=connect_info.url, dn=dn)
LOG.debug(msg + '\n' + pp(changes))
if self.simulate:
- LOG.info(_("Simulation mode - changes are not applied."))
+ LOG.info(_('Simulation mode - changes are not applied.'))
return True
try:
req_status, req_result, req_response, req_whatever = ldap.modify(dn, changes)
except LDAPException as e:
- msg = _("Modification of {dn!r} was NOT successfull - {c}: {e}").format(
+ msg = _('Modification of {dn!r} was NOT successfull - {c}: {e}').format(
dn=dn, c=e.__class__.__name__, e=e)
- msg += '\n' + _("Changes:") + '\n' + pp(changes)
+ msg += '\n' + _('Changes:') + '\n' + pp(changes)
raise WriteLDAPItemError(msg)
# Example result on a not successful modification:
# 'type': 'modifyResponse'}
if self.verbose > 1:
- LOG.debug(_("Modification status: {!r}.").format(req_status))
+ LOG.debug(_('Modification status: {!r}.').format(req_status))
if self.verbose > 2:
- LOG.debug(_("Result of modifying:") + '\n' + pp(req_result))
+ LOG.debug(_('Result of modifying:') + '\n' + pp(req_result))
if not req_status:
- msg = _("Modification of {dn!r} was NOT successful: {desc} - {msg}").format(
+ msg = _('Modification of {dn!r} was NOT successful: {desc} - {msg}').format(
dn=dn, desc=req_result['description'], msg=req_result['message'].strip())
- msg += '\n' + _("Changes:") + '\n' + pp(changes)
+ msg += '\n' + _('Changes:') + '\n' + pp(changes)
raise WriteLDAPItemError(msg)
LOG.debug(_('Modification successful.'))
# -------------------------------------------------------------------------
def delete_entry(self, inst, dn, ldap=None):
-
+ """Delete a LDAP entry."""
connect_info = self.cfg.ldap_connection[inst]
if not ldap:
ldap = self.ldap_connection[inst]
- msg = _("Deleting LDAP entry {dn!r} on {uri} ...").format(
+ msg = _('Deleting LDAP entry {dn!r} on {uri} ...').format(
uri=connect_info.url, dn=dn)
LOG.info(msg)
if self.simulate:
- LOG.info(_("Simulation mode - deletion will not be executed."))
+ LOG.info(_('Simulation mode - deletion will not be executed.'))
return True
try:
req_status, req_result, req_response, req_whatever = ldap.delete(dn)
except LDAPException as e:
- msg = _("Deletion of {dn!r} was NOT successfull - {c}: {e}").format(
+ msg = _('Deletion of {dn!r} was NOT successfull - {c}: {e}').format(
c=e.__class__.__name__, e=e)
raise DeleteLDAPItemError(msg)
if self.verbose > 1:
- LOG.debug(_("Deletion status: {!r}.").format(req_status))
+ LOG.debug(_('Deletion status: {!r}.').format(req_status))
if self.verbose > 2:
- LOG.debug(_("Result of deletion:") + '\n' + pp(req_result))
+ LOG.debug(_('Result of deletion:') + '\n' + pp(req_result))
if not req_status:
- msg = _("Deletion of {dn!r} was NOT successful: {desc} - {msg}").format(
+ msg = _('Deletion of {dn!r} was NOT successful: {desc} - {msg}').format(
dn=dn, desc=req_result['description'], msg=req_result['message'].strip())
raise DeleteLDAPItemError(msg)
# -------------------------------------------------------------------------
def get_group_memberships(self, inst, dn, base_dn=None):
-
+ """Return a list with DNs of all groups containing the given DN as a member."""
connect_info = self.cfg.ldap_connection[inst]
ldap = self.ldap_connection[inst]
ldap_filter = '(member={})'.format(dn)
if self.verbose > 1:
- msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format(
+ msg = _('Searching in {uri}/{bdn} with filter: {fltr}').format(
uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
LOG.debug(msg)
if req_status:
for entry in req_response:
if self.verbose > 4:
- LOG.debug(_("Got a response entry:") + ' ' + pp(entry))
+ LOG.debug(_('Got a response entry:') + ' ' + pp(entry))
result.append(entry['dn'])
if self.verbose > 3:
- LOG.debug(_("Result:") + ' ' + pp(result))
+ LOG.debug(_('Result:') + ' ' + pp(result))
return result
# -------------------------------------------------------------------------
def get_unique_group_memberships(self, inst, dn, base_dn=None):
-
+ """Return a list with DNs of all groups containing the given DN as a unique member."""
connect_info = self.cfg.ldap_connection[inst]
ldap = self.ldap_connection[inst]
ldap_filter = '(uniqueMember={})'.format(dn)
if self.verbose > 1:
- msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format(
+ msg = _('Searching in {uri}/{bdn} with filter: {fltr}').format(
uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
LOG.debug(msg)
if req_status:
for entry in req_response:
if self.verbose > 4:
- LOG.debug(_("Got a response entry:") + ' ' + pp(entry))
+ LOG.debug(_('Got a response entry:') + ' ' + pp(entry))
result.append(entry['dn'])
if self.verbose > 3:
- LOG.debug(_("Result:") + ' ' + pp(result))
+ LOG.debug(_('Result:') + ' ' + pp(result))
return result
# -------------------------------------------------------------------------
def get_posix_group_memberships(self, inst, uid, base_dn=None):
-
+ """Return a list with DNs of all POSIX groups containing the given UID as a member."""
connect_info = self.cfg.ldap_connection[inst]
ldap = self.ldap_connection[inst]
ldap_filter = '(memberUid={})'.format(uid)
if self.verbose > 1:
- msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format(
+ msg = _('Searching in {uri}/{bdn} with filter: {fltr}').format(
uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
LOG.debug(msg)
if req_status:
for entry in req_response:
if self.verbose > 4:
- LOG.debug(_("Got a response entry:") + ' ' + pp(entry))
+ LOG.debug(_('Got a response entry:') + ' ' + pp(entry))
result.append(entry['dn'])
if self.verbose > 3:
- LOG.debug(_("Result:") + ' ' + pp(result))
+ LOG.debug(_('Result:') + ' ' + pp(result))
return result
# -------------------------------------------------------------------------
def get_sudo_group_memberships(self, inst, uid, base_dn=None):
-
+ """Return a list with DNs of all sudo roles containing the given UID as a member."""
connect_info = self.cfg.ldap_connection[inst]
ldap = self.ldap_connection[inst]
ldap_filter = '(sudoUser={})'.format(uid)
if self.verbose > 1:
- msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format(
+ msg = _('Searching in {uri}/{bdn} with filter: {fltr}').format(
uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
LOG.debug(msg)
if req_status:
for entry in req_response:
if self.verbose > 4:
- LOG.debug(_("Got a response entry:") + ' ' + pp(entry))
+ LOG.debug(_('Got a response entry:') + ' ' + pp(entry))
result.append(entry['dn'])
if self.verbose > 3:
- LOG.debug(_("Result:") + ' ' + pp(result))
+ LOG.debug(_('Result:') + ' ' + pp(result))
return result
# -------------------------------------------------------------------------
def read_password_file(self, pw_file):
- """Reading a password from the given password file. It returns the first
- stripped non empty line."""
+ """
+ Read a password from the given password file.
+ It returns the first stripped non empty line.
+ """
if self.verbose > 1:
abs_path = pw_file.resolve()
- LOG.debug(_("Reading password file {!r} ...").format(str(abs_path)))
+ LOG.debug(_('Reading password file {!r} ...').format(str(abs_path)))
if not pw_file.exists():
- msg = _("The file {!r} does not exists.").format(str(pw_file))
+ msg = _('The file {!r} does not exists.').format(str(pw_file))
LOG.warn(msg)
return None
if not pw_file.is_file():
- msg = _("The given path {!r} exists, but is not a regular file.").format(str(pw_file))
+ msg = _('The given path {!r} exists, but is not a regular file.').format(str(pw_file))
LOG.warn(msg)
return None
if not os.access(str(pw_file), os.R_OK):
- msg = _("The given file {!r} is not readable.").format(str(pw_file))
+ msg = _('The given file {!r} is not readable.').format(str(pw_file))
LOG.warn(msg)
return None
# -------------------------------------------------------------------------
def generate_modify_data(self, dn, src_attribs, tgt_attribs):
-
+ """Generate a dict for using my LDAP-modify by the given source and target attributes."""
changes = {}
first_dn_token = self.re_dn_separator.split(dn)[0]
match = self.re_dntoken.match(first_dn_token)
if not match:
- msg = _("Could not detect RDN from DN {!r}.").format(dn)
+ msg = _('Could not detect RDN from DN {!r}.').format(dn)
raise LDAPParseError(msg)
rdn = match.group(1)
if self.verbose > 2:
- msg = _("Found RDN attribute {!r}.").format(rdn)
+ msg = _('Found RDN attribute {!r}.').format(rdn)
LOG.debug(msg)
for attrib_name in src_attribs:
if attrib_name.lower() == rdn.lower():
if self.verbose > 2:
- msg = _("RDN attribute {!r} will not be touched.").format(rdn)
+ msg = _('RDN attribute {!r} will not be touched.').format(rdn)
LOG.debug(msg)
continue
if attrib_name.lower() == 'memberof':
if self.verbose > 2:
- msg = _("Attribute {!r} will not be touched.").format(attrib_name)
+ msg = _('Attribute {!r} will not be touched.').format(attrib_name)
LOG.debug(msg)
continue
continue
if attrib_name.lower() == rdn.lower():
- msg = "RDN attribute {!r} will not be touched.".format(rdn)
+ msg = 'RDN attribute {!r} will not be touched.'.format(rdn)
LOG.warn(msg)
continue
if attrib_name.lower() == 'memberof':
if self.verbose > 2:
- msg = _("Attribute {!r} will not be touched.").format(attrib_name)
+ msg = _('Attribute {!r} will not be touched.').format(attrib_name)
LOG.debug(msg)
continue
values_del.append(tgt_val)
if self.verbose > 2 and values_add:
- msg = _("Values to add to attribute {!r}:").format(attrib_name)
+ msg = _('Values to add to attribute {!r}:').format(attrib_name)
LOG.debug(msg + '\n' + pp(values_add))
if self.verbose > 2 and values_del:
- msg = _("Values to removed from attribute {!r}:").format(attrib_name)
+ msg = _('Values to removed from attribute {!r}:').format(attrib_name)
LOG.debug(msg + '\n' + pp(values_del))
if len(values_add) == len(src_attrib_values) and len(values_del) == 0:
if self.verbose > 3:
if attr_changes:
- msg = _("Changes for attribute {!r}:").format(attrib_name)
+ msg = _('Changes for attribute {!r}:').format(attrib_name)
msg += '\n' + pp(attr_changes)
else:
- msg = _("No changes to attribute {!r}.").format(attrib_name)
+ msg = _('No changes to attribute {!r}.').format(attrib_name)
LOG.debug(msg)
return attr_changes
# -------------------------------------------------------------------------
def generate_create_entry(self, src_attribs):
+ """
+ Create data for cretating a new LDAP entry.
+ It returns a tuple, consisting of a list of the objec classes,
+ and a dict with all other attributes.
+ """
object_classes = []
target_entry = {}
if attrib_name.lower() == 'memberof':
if self.verbose > 2:
- msg = _("Attribute {!r} will not be touched.").format(attrib_name)
+ msg = _('Attribute {!r} will not be touched.').format(attrib_name)
LOG.debug(msg)
continue
# =============================================================================
-if __name__ == "__main__":
+if __name__ == '__main__':
pass