from ..argparse_actions import NonNegativeItegerOptionAction
from ..argparse_actions import LimitedFloatOptionAction
-__version__ = '0.1.0'
+__version__ = '0.2.0'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
show_cmdline_ldap_timeout = True
apply_default_ldap_instance_if_not_given = False
show_force_option = False
+ show_assume_options = False
check_attributes = ['member', 'uniqueMember', 'owner', 'seeAlso']
"({url}) ...").format(inst=self.instance, url=ldap_url)
LOG.debug(msg)
+ self.get_dns_to_check()
+ self.check_entries()
+
+ # -------------------------------------------------------------------------
+ def get_dns_to_check(self):
+
+ ldap_filter = '(|' + ''.join(
+ map(lambda x: '({}=*)'.format(x), self.check_attributes)) + ')'
+
+ for dn in self.get_all_entry_dns(self.instance, ldap_filter=ldap_filter):
+ self.all_check_dns.add(dn)
+
+ if self.verbose:
+ nr = len(self.all_check_dns)
+ if nr:
+ msg = ngettext(
+ "Found one entry to check.",
+ "Found {} entries to check.".format(nr), nr)
+ else:
+ msg = _("Found no to check.")
+ LOG.debug(msg)
+
+ if self.verbose > 2:
+ LOG.debug("Found entries to check:\n" + pp(self.all_check_dns.as_list()))
+
+ # -------------------------------------------------------------------------
+ def check_entries(self):
+
+ for dn in self.all_check_dns:
+ self.check_entry(dn)
+
+ if self.failed_entries:
+ nr = len(self.failed_entries)
+ msg = ngettext(
+ "Got an inconsistent entry:", "Got {} inconsistent entries:".format(nr), nr)
+ LOG.error(msg)
+ print(pp(self.failed_entries.as_dict(pure=True)))
+ self.exit(5)
+
+ msg = _("Did not found any inconsistent entries.")
+ LOG.info(msg)
+ self.exit(0)
+
+ # -------------------------------------------------------------------------
+ def check_entry(self, dn):
+
+ if self.verbose > 1:
+ LOG.debug(_("Checking DN-like attributes of entry {!r} ...").format(dn))
+
+ entry = self.get_entry(dn, self.instance, attributes=self.check_attributes)
+ attribs = self.normalized_attributes(entry)
+ if self.verbose > 2:
+ LOG.debug(_("Got attributes:") + '\n' + pp(attribs.as_dict()))
+
+ for attrib in self.check_attributes:
+ if attrib in attribs:
+ for ref in attribs[attrib]:
+ if not self.check_ref_syntax(ref):
+ if dn not in self.failed_entries:
+ self.failed_entries[dn] = {}
+ if attrib not in self.failed_entries[dn]:
+ self.failed_entries[dn][attrib] = []
+ self.failed_entries[dn][attrib].append({
+ 'value': ref,
+ 'what': 'syntax',
+ })
+ continue
+ if not self.check_ref_existence(ref, self.instance):
+ if dn not in self.failed_entries:
+ self.failed_entries[dn] = {}
+ if attrib not in self.failed_entries[dn]:
+ self.failed_entries[dn][attrib] = []
+ self.failed_entries[dn][attrib].append({
+ 'value': ref,
+ 'what': 'existence',
+ })
+
+ # -------------------------------------------------------------------------
+ def check_ref_syntax(self, dn):
+
+ if self.re_ldap_dn.match(dn):
+ return True
+ return False
+
+ # -------------------------------------------------------------------------
+ def check_ref_existence(self, dn, inst):
+
+ if dn in self.checked_ref_dn:
+ return self.checked_ref_dn[dn]
+
+ if dn == self.connect_info.admin_dn:
+ self.checked_ref_dn[dn] = True
+ return True
+
+ if self.get_entry(dn, inst, attributes=['dn']):
+ self.checked_ref_dn[dn] = True
+ return True
+
+ self.checked_ref_dn[dn] = False
+ return False
+
# =============================================================================
if __name__ == "__main__":
# rom ..config.ldap import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS
from ..config.ldap import DEFAULT_TIMEOUT
-__version__ = '0.10.7'
+__version__ = '0.11.0'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
del self.ldap_server[inst]
# -------------------------------------------------------------------------
- def get_all_entry_dns(self, inst):
+ def get_all_entry_dns(self, inst, ldap_filter=None):
"""Get DNs of all entries in the given LDAP instance and sort them."""
connect_info = self.cfg.ldap_connection[inst]
result = []
attributes = ['dn']
- ldap_filter = '(objectClass=*)'
+ if not ldap_filter:
+ ldap_filter = '(objectClass=*)'
+
+ if self.verbose > 1:
+ LOG.debug(_("Using LDAP filter: {!r}").format(ldap_filter))
req_status, req_result, req_response, req_whatever = ldap.search(
search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter,
return result
# -------------------------------------------------------------------------
- def get_all_entry_dns_hash(self, inst):
+ def get_all_entry_dns_hash(self, inst, ldap_filter=None):
"""Get Object classes and DNs of all entries in the given LDAP instance."""
connect_info = self.cfg.ldap_connection[inst]
result = CIDict()
attributes = ['objectClass']
- ldap_filter = '(objectClass=*)'
+ if not ldap_filter:
+ ldap_filter = '(objectClass=*)'
LOG.debug(_("Getting all Entry DNs of LDAP instance {i!r} below {b!r}.").format(
i=inst, b=base_dn))
+ if self.verbose > 1:
+ LOG.debug(_("Using LDAP filter: {!r}").format(ldap_filter))
+
req_status, req_result, req_response, req_whatever = ldap.search(
search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter,
get_operational_attributes=False, attributes=attributes,
from ..xlate import XLATOR
-__version__ = '0.6.2'
+__version__ = '0.7.0'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
re_ldap_is_admin_key = re.compile(r'^\s*(?:is[_-]*)?admin\s*$', re.IGNORECASE)
re_ldap_readonly_key = re.compile(r'^\s*read[_-]*only\s*$', re.IGNORECASE)
re_ldap_sync_source_key = re.compile(r'^\s*sync[_-]*source\s*$', re.IGNORECASE)
+ re_ldap_admin_dn_key = re.compile(r'^\s*admin[_-]*dn\s*$', re.IGNORECASE)
+
+ default_admin_dn = 'cn=admin'
# -------------------------------------------------------------------------
def __init__(
self, appname=None, verbose=0, version=__version__, base_dir=None,
host=None, use_ldaps=False, port=DEFAULT_PORT_LDAP, base_dn=None,
bind_dn=None, bind_pw=None, is_admin=None, readonly=None, tier=None,
- sync_source=None, initialized=False):
+ sync_source=None, admin_dn=None, initialized=False):
self._host = None
self._use_ldaps = False
self._readonly = True
self._tier = DEFAULT_TIER
self._sync_source = None
+ self._admin_dn = self.default_admin_dn
super(LdapConnectionInfo, self).__init__(
appname=appname, verbose=verbose, version=version, base_dir=base_dir,
self.readonly = readonly
self.tier = tier
self.sync_source = sync_source
+ self.admin_dn = admin_dn
if initialized:
self.initialized = True
res['base_dn'] = self.base_dn
res['bind_dn'] = self.bind_dn
res['bind_pw'] = None
+ res['admin_dn'] = self.admin_dn
res['is_admin'] = self.is_admin
res['host'] = self.host
res['port'] = self.port
return
self._sync_source = str(value).strip()
+ # -----------------------------------------------------------
+ @property
+ def admin_dn(self):
+ """The DN of the adminitrator of the LDAP instance."""
+ return self._admin_dn
+
+ @admin_dn.setter
+ def admin_dn(self, value):
+ if value is None or str(value).strip() == '':
+ return
+ self._admin_dn = str(value).strip()
+
# -------------------------------------------------------------------------
def __repr__(self):
"""Typecasting into a string for reproduction."""
fields.append("readonly={!r}".format(self.readonly))
fields.append("tier={!r}".format(self.tier))
fields.append("sync_source={!r}".format(self.sync_source))
+ fields.append("admin_dn={!r}".format(self.admin_dn))
fields.append("initialized={!r}".format(self.initialized))
out += ", ".join(fields) + ")>"
appname=self.appname, verbose=self.verbose, base_dir=self.base_dir, host=self.host,
use_ldaps=self.use_ldaps, port=self.port, base_dn=self.base_dn, bind_dn=self.bind_dn,
bind_pw=self.bind_pw, is_admin=self.is_admin, readonly=self.readonly, tier=self.tier,
- sync_source=self.sync_source, initialized=self.initialized)
+ sync_source=self.sync_source, admin_dn=self.admin_dn, initialized=self.initialized)
return new
new.sync_source = value
continue
+ if cls.re_ldap_admin_dn_key.match(key):
+ new.admin_dn = value
+ continue
+
msg = _("Unknown LDAP configuration key {key} found in section {sec!r}.").format(
key=key, sec=section_name)
LOG.error(msg)