from __future__ import absolute_import
# Standard modules
-import sys
+import re
import logging
-import copy
-import time
# Third party modules
-from ldap3 import ALL_ATTRIBUTES
+import yaml
# Own modules
# from fb_tools.common import to_bool, is_sequence
from .ldap import LdapAppError
from .ldap import BaseLdapApplication
-from ..argparse_actions import NonNegativeItegerOptionAction
-from ..argparse_actions import LimitedFloatOptionAction
-
-__version__ = '0.2.2'
+__version__ = '0.3.1'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
show_force_option = False
show_assume_options = False
+ re_yaml_extension = re.compile(r'\.ya?ml$', re.IGNORECASE)
+
check_attributes = ['member', 'uniqueMember', 'owner', 'seeAlso']
# -------------------------------------------------------------------------
self.all_check_dns = CIStringSet()
self.failed_entries = CIDict()
+ self.export_file = None
+
attr_list = format_list(self.check_attributes, do_repr=True)
desc = _(
"to existing entries in LDAP.").format(alist=attr_list)
super(CheckLdapDnAttributesApplication, self).__init__(
- appname=appname, description=desc, base_dir=base_dir,
- cfg_class=LdapConfiguration, initialized=False)
+ appname=appname, description=desc, base_dir=base_dir,
+ cfg_class=LdapConfiguration, initialized=False)
self.initialized = True
super(CheckLdapDnAttributesApplication, self)._verify_instances(is_admin=True)
+ # -------------------------------------------------------------------------
+ def init_arg_parser(self):
+
+ app_group = self.arg_parser.add_argument_group(_("Script options"))
+
+ app_group.add_argument(
+ '-E', '--export', dest='export', metavar=_('FILE'),
+ help=_(
+ "Exportig the faulty entries and attributes into a YAML file, "
+ "if there were found some of them."),
+ )
+
+ super(CheckLdapDnAttributesApplication, self).init_arg_parser()
+
# -------------------------------------------------------------------------
def post_init(self):
"""Execute some actions after initialising."""
super(CheckLdapDnAttributesApplication, self).post_init()
+ self.export_file = getattr(self.args, 'export', None)
+ if self.export_file:
+ if not self.re_yaml_extension.search(self.export_file):
+ self.export_file += '.yaml'
+
self.instance = self.ldap_instances[0]
self.connect_info = self.cfg.ldap_connection[self.instance]
ldap_url = self.cfg.ldap_connection[self.instance].url
+ if self.export_file:
+ with open(self.export_file, 'wt', encoding='utf-8', errors='surrogateescape') as fh:
+ fh.write('---\n\n')
+ LOG.debug(_("Created export file {!r}.").format(self.export_file))
+
msg = _(
"Start checking all DN-like attributes in in LDAP instance {inst!r} "
"({url}) ...").format(inst=self.instance, url=ldap_url)
self.check_entry(dn)
if self.failed_entries:
+ print(pp(self.failed_entries.as_dict(pure=True)))
nr = len(self.failed_entries)
nr_attr = 0
for e_dn in self.failed_entries:
"There is one inconsistent attribute.",
"There are {} inconsistent attributes.", nr_attr).format(nr_attr)
LOG.warn(msg)
- print(pp(self.failed_entries.as_dict(pure=True)))
+ if self.export_file:
+ LOG.debug(_("Writing export file {!r} ...").format(self.export_file))
+ with open(
+ self.export_file, 'wt', encoding='utf-8', errors='surrogateescape') as fh:
+ yaml.dump(
+ self.failed_entries.as_dict(pure=True), fh, explicit_start=True,
+ default_flow_style=False, indent=2)
self.exit(5)
msg = _("Did not found any inconsistent entries.")
self.failed_entries[dn] = {}
if attrib not in self.failed_entries[dn]:
self.failed_entries[dn][attrib] = []
- self.failed_entries[dn][attrib].append({
- 'value': ref,
- 'what': 'syntax',
- })
+ self.failed_entries[dn][attrib].append(ref)
continue
if not self.check_ref_existence(ref, self.instance):
if dn not in self.failed_entries:
self.failed_entries[dn] = {}
if attrib not in self.failed_entries[dn]:
self.failed_entries[dn][attrib] = []
- self.failed_entries[dn][attrib].append({
- 'value': ref,
- 'what': 'existence',
- })
+ self.failed_entries[dn][attrib].append(ref)
# -------------------------------------------------------------------------
def check_ref_syntax(self, dn):
self.checked_ref_dn[dn] = False
return False
+
# =============================================================================
if __name__ == "__main__":