From 711d2c1ebd955f4e091f03017e2ca6de2d6fef7d Mon Sep 17 00:00:00 2001
From: Frank Brehm
Date: Thu, 11 Jan 2018 18:14:38 +0100
Subject: [PATCH] Integrated search for admin users in LDAP
---
pp_lib/dnsui_users.py | 86 ++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 84 insertions(+), 2 deletions(-)
diff --git a/pp_lib/dnsui_users.py b/pp_lib/dnsui_users.py
index c1e8bb2..f81b31d 100644
--- a/pp_lib/dnsui_users.py
+++ b/pp_lib/dnsui_users.py
@@ -24,7 +24,7 @@ from .common import pp
 from .ldap_app import PpLdapAppError, PpLdapApplication
-__version__ = '0.2.1'
+__version__ = '0.3.1'
 LOG = logging.getLogger(__name__)
@@ -40,7 +40,7 @@ class DnsuiUsersApp(PpLdapApplication):
 are existing and have administrator access.
 """
- default_admin_group = "cn=Administratoren Pixelpark Berlin,ou=Groups,o=Pixelpark,o=isp"
+ default_admin_group = "cn=Administratoren Pixelpark Berlin"
 # DB data
 default_db_host = 'master.pp-dns.com'
@@ -52,6 +52,7 @@ class DnsuiUsersApp(PpLdapApplication):
 def __init__(self, appname=None, version=__version__):
 self.admin_users = []
+ self.admin_user_dns = []
 self.admin_group = self.default_admin_group
 self.db_host = self.default_db_host 
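Review bot: The new `default_admin_group` value is no longer a DN but a bare `cn=...` assertion, and in the `@@ -196` hunk it is interpolated verbatim into the group search filter (`'({}))').format(self.admin_group)`) and searched with `ldap_search_subtree`, so any `groupOfUniqueNames`/`groupOfURLs` entry with that cn anywhere under the search base now qualifies as the admin group. The attribute name still reads like a DN; a comment on this line would prevent the next maintainer from putting a DN back: `# LDAP filter assertion (not a DN) selecting the admin group(s); interpolated unescaped into the search filter in get_admin_user_dns()`. If `admin_group` can be overridden from configuration (its source is outside this diff), the interpolation should escape filter metacharacters, e.g. with ldap3's `escape_filter_chars`, rather than trusting the value as a filter fragment.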
@@ -196,9 +197,90 @@ class DnsuiUsersApp(PpLdapApplication):
 LOG.info("Starting user checks ...")
+ self.get_admin_user_dns()
+ self.get_admin_users()
+
 finally:
 self._close_db()
+ # -------------------------------------------------------------------------
+ def get_admin_user_dns(self):
+
+ LOG.info("Getting list of admin users.")
+
+ self.admin_user_dns = []
+
+ query_filter = ('(&(|(objectclass=groupOfUniqueNames)(objectclass=groupOfURLs))' + '({}))').format(self.admin_group)
+ LOG.debug("Query filter: {!r}".format(query_filter))
+
+ group = ObjectDef(['objectclass', 'groupOfURLs'])
+ group += ['cn', 'memberURL', 'uniqueMember']
+
+ group_entries = self.ldap_search_subtree(group, query_filter)
+ if self.verbose > 1:
+ LOG.debug("Found {} LDAP entries.".format(len(group_entries)))
+
+ if not group_entries:
+ LOG.warn("Did not found any admin groups.")
+ return
+
+ for entry in group_entries:
+ member_urls = []
+ member_dns = []
+ for url in entry['memberURL']:
+ member_urls.append(url)
+ for dn in entry['uniqueMember']:
+ member_dns.append(dn)
+
+ if self.verbose > 2:
+ LOG.debug("Found memberURL: {}".format(pp(member_urls)))
+ LOG.debug("Found unique members:\n{}".format(pp(member_dns)))
+
+ for dn in member_dns:
+ if 'servicedesk' in dn:
+ continue
+ if dn not in self.admin_user_dns:
+ self.admin_user_dns.append(dn)
+
+ self.admin_user_dns.sort()
+ LOG.debug("Found admin user dn's:\n{}".format(pp(self.admin_user_dns)))
+
+ # -------------------------------------------------------------------------
+ def get_admin_users(self):
+
+ if not self.admin_user_dns:
+ LOG.warn("Did not found any admin users.")
+ return
+
+ LOG.info("Getting data of admin users.")
+
+ person = ObjectDef(['posixAccount', 'shadowAccount'])
+ person += ["uid", "givenName", "sn", "mail"]
+
+ for dn in self.admin_user_dns:
+
+ if self.verbose > 1:
+ LOG.debug("Searching for admin user {!r}.".format(dn))
+ entries = self.ldap_search_object(person, dn)
+ if self.verbose >= 2:
+ LOG.debug("Found {} LDAP entries.".format(len(entries)))
+ if not entries:
+ LOG.error("No LDAP entry found for DN {!r}.".format(dn))
+ continue
+
+ entry = entries[0]
+ user = {
+ 'dn': dn,
+ 'uid': entry['uid'][0],
+ 'givenName': entry['givenName'][0],
+ 'sn': entry['sn'][0],
+ 'mail': entry['mail'][0],
+ }
+ self.admin_users.append(user)
+
+ LOG.debug("Found admin user:\n{}".format(pp(self.admin_users)))
+
+ # -------------------------------------------------------------------------
 def _close_db(self):
-- 2.39.5
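Review bot: `get_admin_users` appends to `self.admin_users` without clearing it, whereas `get_admin_user_dns` starts with `self.admin_user_dns = []`, so running the check twice on one instance duplicates every admin entry; add `self.admin_users = []` right after the early-return guard. In `get_admin_user_dns` the `memberURL` values of dynamic groups (`groupOfURLs`) are collected into `member_urls` but only logged, so a member that exists solely through a memberURL never reaches `admin_user_dns` — either resolve those URLs or state the limitation in a comment such as `# memberURL (dynamic group) members are not resolved; only uniqueMember DNs are counted`. `LOG.warn` is the deprecated alias of `LOG.warning` and should be replaced in both new methods. The `'servicedesk' in dn` test excludes any DN containing that substring anywhere, which is broader than skipping one specific account; a comment naming the intended exclusion would make the rule reviewable. Whether `entry['mail'][0]` raises or yields a placeholder for an account without a mail attribute depends on the ldap3 Entry semantics, which this diff does not show, so that lookup (and the other four) would be safer behind a check.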