]> Frank Brehm's Git Trees - pixelpark/admin-tools.git/commitdiff
Integrated search for admin users in LDAP
authorFrank Brehm <frank.brehm@pixelpark.com>
Thu, 11 Jan 2018 17:14:38 +0000 (18:14 +0100)
committerFrank Brehm <frank.brehm@pixelpark.com>
Thu, 11 Jan 2018 17:14:38 +0000 (18:14 +0100)
pp_lib/dnsui_users.py

index c1e8bb291bd3b8a9ef55236ccb051695ec4854a0..f81b31de2af680ba6df1d8c87fc23543deb3b957 100644 (file)
@@ -24,7 +24,7 @@ from .common import pp
 
 from .ldap_app import PpLdapAppError, PpLdapApplication
 
-__version__ = '0.2.1'
+__version__ = '0.3.1'
 LOG = logging.getLogger(__name__)
 
 
@@ -40,7 +40,7 @@ class DnsuiUsersApp(PpLdapApplication):
           are existing and have administrator access.
     """
 
-    default_admin_group = "cn=Administratoren Pixelpark Berlin,ou=Groups,o=Pixelpark,o=isp"
+    default_admin_group = "cn=Administratoren Pixelpark Berlin"
 
     # DB data
     default_db_host = 'master.pp-dns.com'
@@ -52,6 +52,7 @@ class DnsuiUsersApp(PpLdapApplication):
     def __init__(self, appname=None, version=__version__):
 
         self.admin_users = []
+        self.admin_user_dns = []
         self.admin_group = self.default_admin_group
 
         self.db_host = self.default_db_host
@@ -196,9 +197,90 @@ class DnsuiUsersApp(PpLdapApplication):
 
             LOG.info("Starting user checks ...")
 
+            self.get_admin_user_dns()
+            self.get_admin_users()
+
         finally:
             self._close_db()
 
+    # -------------------------------------------------------------------------
+    def get_admin_user_dns(self):
+
+        LOG.info("Getting list of admin users.")
+
+        self.admin_user_dns = []
+
+        query_filter = ('(&(|(objectclass=groupOfUniqueNames)(objectclass=groupOfURLs))'
+            '({}))').format(self.admin_group)
+        LOG.debug("Query filter: {!r}".format(query_filter))
+
+        group = ObjectDef(['objectclass', 'groupOfURLs'])
+        group += ['cn', 'memberURL', 'uniqueMember']
+
+        group_entries =  self.ldap_search_subtree(group, query_filter)
+        if self.verbose > 1:
+            LOG.debug("Found {} LDAP entries.".format(len(group_entries)))
+
+        if not group_entries:
+            LOG.warn("Did not found any admin groups.")
+            return
+
+        for entry in group_entries:
+            member_urls = []
+            member_dns = []
+            for url in entry['memberURL']:
+                member_urls.append(url)
+            for dn in entry['uniqueMember']:
+                member_dns.append(dn)
+
+            if self.verbose > 2:
+                LOG.debug("Found memberURL: {}".format(pp(member_urls)))
+                LOG.debug("Found unique members:\n{}".format(pp(member_dns)))
+
+            for dn in member_dns:
+                if 'servicedesk' in dn:
+                    continue
+                if dn not in self.admin_user_dns:
+                    self.admin_user_dns.append(dn)
+
+        self.admin_user_dns.sort()
+        LOG.debug("Found admin user dn's:\n{}".format(pp(self.admin_user_dns)))
+
+    # -------------------------------------------------------------------------
+    def get_admin_users(self):
+
+        if not self.admin_user_dns:
+            LOG.warn("Did not found any admin users.")
+            return
+
+        LOG.info("Getting data of admin users.")
+
+        person = ObjectDef(['posixAccount', 'shadowAccount'])
+        person += ["uid", "givenName", "sn", "mail"]
+
+        for dn in self.admin_user_dns:
+
+            if self.verbose > 1:
+                LOG.debug("Searching for admin user {!r}.".format(dn))
+            entries = self.ldap_search_object(person, dn)
+            if self.verbose >= 2:
+                LOG.debug("Found {} LDAP entries.".format(len(entries)))
+            if not entries:
+                LOG.error("No LDAP entry found for DN {!r}.".format(dn))
+                continue
+
+            entry = entries[0]
+            user = {
+                'dn': dn,
+                'uid': entry['uid'][0],
+                'givenName': entry['givenName'][0],
+                'sn': entry['sn'][0],
+                'mail': entry['mail'][0],
+            }
+            self.admin_users.append(user)
+
+        LOG.debug("Found admin user:\n{}".format(pp(self.admin_users)))
+
     # -------------------------------------------------------------------------
     def _close_db(self):