]> Frank Brehm's Git Trees - pixelpark/create-vmware-tpl.git/commitdiff
Moving LDAP related methods from lib/cr_vmware_tpl/handler/__init__.py into lib/cr_vm...
authorFrank Brehm <frank.brehm@pixelpark.com>
Tue, 10 Oct 2023 08:50:38 +0000 (10:50 +0200)
committerFrank Brehm <frank.brehm@pixelpark.com>
Tue, 10 Oct 2023 08:50:38 +0000 (10:50 +0200)
lib/cr_vmware_tpl/handler/__init__.py
lib/cr_vmware_tpl/handler/ldap.py [new file with mode: 0644]

index cc6ea2034a2a4e1c882c55dbf4d102054b1f3400..064e68c838ff47ce410a12e23a6283653aec92c6 100644 (file)
@@ -28,11 +28,7 @@ import paramiko
 
 from pyVmomi import vim
 
-import ldap3
-
-# Own modules
-
-from fb_tools.common import pp, to_str, is_sequence
+from fb_tools.common import pp, to_str
 from fb_tools.errors import HandlerError, ExpectedHandlerError
 from fb_tools.handler import BaseHandler
 from fb_tools.xlate import format_list
@@ -46,8 +42,12 @@ from fb_vmware.connect import VsphereConnection
 from fb_vmware.iface import VsphereVmInterface
 from fb_vmware.datastore import VsphereDatastore
 
+
+# Own modules
+
 from .. import print_section_start, print_section_end
-from .. import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS
+
+from .ldap import HandlerLdapMixin
 
 from ..config import CrTplConfiguration
 
@@ -57,7 +57,7 @@ from ..errors import MSG_NO_CLUSTER, TempVmExistsError, NoDatastoreFoundError
 
 from ..xlate import XLATOR
 
-__version__ = '2.4.0'
+__version__ = '2.4.1'
 
 LOG = logging.getLogger(__name__)
 TZ = pytz.timezone('Europe/Berlin')
@@ -66,7 +66,7 @@ _ = XLATOR.gettext
 ngettext = XLATOR.ngettext
 
 # =============================================================================
-class CrTplHandler(BaseHandler):
+class CrTplHandler(BaseHandler, HandlerLdapMixin):
     """
     A handler class for creating a vSphere template.
     """
@@ -187,65 +187,6 @@ class CrTplHandler(BaseHandler):
 
         return res
 
-    # -------------------------------------------------------------------------
-    def connect_ldap(self):
-
-        ldap_config = self.cfg.ldap_connection['default']
-
-        server_opts = {}
-        if ldap_config.use_ldaps:
-            server_opts['use_ssl'] = True
-            if ldap_config.port != DEFAULT_PORT_LDAPS:
-                server_opts['port'] = ldap_config.port
-        else:
-            server_opts['use_ssl'] = False
-            if ldap_config.port != DEFAULT_PORT_LDAP:
-                server_opts['port'] = ldap_config.port
-
-        server_opts['get_info'] = ldap3.DSA
-        server_opts['mode'] = ldap3.IP_V4_PREFERRED
-        server_opts['connect_timeout'] = self.cfg.ldap_timeout
-
-        LOG.info(_("Connecting to LDAP server {!r} ...").format(ldap_config.url))
-
-        if self.verbose > 1:
-            msg = _("Connect options to LDAP server {!r}:").format(ldap_config.url)
-            msg += '\n' + pp(server_opts)
-            LOG.debug(msg)
-
-        self.ldap_server = ldap3.Server(ldap_config.host, **server_opts)
-
-        if self.verbose > 2:
-            LOG.debug("LDAP server {s}: {re}".format(
-                s=ldap_config.host, re=repr(self.ldap_server)))
-
-        self.ldap = ldap3.Connection(
-            self.ldap_server, ldap_config.bind_dn, ldap_config.bind_pw,
-            client_strategy=ldap3.SAFE_SYNC, auto_bind=True)
-
-        if self.verbose > 2:
-            msg = _("Info about LDAP server {}:").format(ldap_config.url)
-            msg += '\n' + repr(self.ldap_server.info)
-            LOG.debug(msg)
-
-    # -------------------------------------------------------------------------
-    def disconnect_ldap(self):
-
-        if 'default' in self.cfg.ldap_connection:
-            ldap_config = self.cfg.ldap_connection['default']
-            ldap_server = ldap_config.url
-        else:
-            ldap_server = 'unknown'
-
-        if self.ldap:
-            LOG.info(_("Unbinding from LDAP server {} ...").format(ldap_server))
-            self.ldap.unbind()
-            self.ldap = None
-
-        if self.ldap_server:
-            LOG.info(_("Disconnecting from LDAP server {} ...").format(ldap_server))
-            self.ldap_server = None
-
     # -------------------------------------------------------------------------
     def __del__(self):
         """Destructor."""
@@ -1309,95 +1250,6 @@ class CrTplHandler(BaseHandler):
         self.auth_keys_file.write_text(auth_keys)
         print_section_end('create_root_authkeys')
 
-    # -------------------------------------------------------------------------
-    def get_ldap_admins(self):
-
-        if not self.ldap:
-            msg = _("No LDAP connection initialized.")
-            raise HandlerError(msg)
-
-        admins = {}
-
-        attrs = ['cn', 'dn', 'mail', 'sshPublicKey', 'uid']
-        ldap_config = self.cfg.ldap_connection['default']
-        fltr = ldap_config.admin_filter
-
-        msg = _("Trying to get a list of all DPX admins with their public SSH keys ...")
-        LOG.debug(msg)
-
-        msg = _("LDAP search starting in {!r} with filter:").format(ldap_config.base_dn)
-        msg += '\n' + fltr
-        LOG.debug(msg)
-
-        status, result, response, request = self.ldap.search(
-            search_base=ldap_config.base_dn, search_scope=ldap3.SUBTREE, search_filter=fltr,
-            attributes=attrs, time_limit=self.cfg.ldap_timeout)
-
-        if not status:
-            msg = _("Error retrieving DPX admin list from LDAP:")
-            msg += ' ' + result
-            raise HandlerError(msg)
-
-        for entry in response:
-
-            uid = None
-            admin = {
-                'cn': None,
-                'dn': None,
-                'mail': None,
-                'keys': [],
-                'uid': None,
-            }
-
-            admin['dn'] = entry['dn']
-
-            for attr in entry['attributes']:
-
-                val = entry['attributes'][attr]
-
-                if attr.lower() == 'uid':
-                    if is_sequence(val):
-                        uid = val[0]
-                    else:
-                        uid = val
-                    admin['uid'] = uid
-
-                if attr.lower() == 'cn':
-                    if is_sequence(val):
-                        admin['cn'] = val[0]
-                    else:
-                        admin['cn'] = val
-
-                if attr.lower() == 'mail':
-                    if is_sequence(val):
-                        admin['mail'] = val[0]
-                    else:
-                        admin['mail'] = val
-
-                if attr.lower() == 'sshpublickey':
-                    if is_sequence(val):
-                        for key in val:
-                            admin['keys'].append(key)
-                    else:
-                        admin['keys'].append(val)
-
-            if self.verbose == 2:
-                msg = _("Got an admin {cn} <{mail}>.").format(cn=admin['cn'], mail=admin['mail'])
-                LOG.debug(msg)
-            elif self.verbose > 2:
-                msg = _("Got an admin:") + '\n' + pp(admin)
-                LOG.debug(msg)
-
-            admins[uid] = admin
-
-        if not admins:
-            msg = _("Did not found any admins below base DN {!r} with filter:")
-            msg = msg.format(self.cfg.ldap_connection['default'].base_dn)
-            msg += '\n' + fltr
-            raise HandlerError(msg)
-
-        return admins
-
 
 # =============================================================================
 if __name__ == "__main__":
diff --git a/lib/cr_vmware_tpl/handler/ldap.py b/lib/cr_vmware_tpl/handler/ldap.py
new file mode 100644 (file)
index 0000000..4ce3b68
--- /dev/null
@@ -0,0 +1,196 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2023 by Frank Brehm, Berlin
+@summary: A mixin module for the CrTplHandler class for LDAP dependend methods.
+"""
+from __future__ import absolute_import, print_function
+
+# Standard modules
+import logging
+
+# Third party modules
+import ldap3
+
+from fb_tools.common import pp, is_sequence
+from fb_tools.errors import HandlerError
+
+# Own modules
+
+# from .. import print_section_start, print_section_end
+from .. import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS
+
+from ..xlate import XLATOR
+
+__version__ = '0.1.1'
+
+LOG = logging.getLogger(__name__)
+
+_ = XLATOR.gettext
+ngettext = XLATOR.ngettext
+
+# =============================================================================
+class HandlerLdapMixin():
+    """
+    A mixin class for extending the CrTplHandler class for LDAP dependend methods.
+    """
+
+    # -------------------------------------------------------------------------
+    def connect_ldap(self):
+
+        ldap_config = self.cfg.ldap_connection['default']
+
+        server_opts = {}
+        if ldap_config.use_ldaps:
+            server_opts['use_ssl'] = True
+            if ldap_config.port != DEFAULT_PORT_LDAPS:
+                server_opts['port'] = ldap_config.port
+        else:
+            server_opts['use_ssl'] = False
+            if ldap_config.port != DEFAULT_PORT_LDAP:
+                server_opts['port'] = ldap_config.port
+
+        server_opts['get_info'] = ldap3.DSA
+        server_opts['mode'] = ldap3.IP_V4_PREFERRED
+        server_opts['connect_timeout'] = self.cfg.ldap_timeout
+
+        LOG.info(_("Connecting to LDAP server {!r} ...").format(ldap_config.url))
+
+        if self.verbose > 1:
+            msg = _("Connect options to LDAP server {!r}:").format(ldap_config.url)
+            msg += '\n' + pp(server_opts)
+            LOG.debug(msg)
+
+        self.ldap_server = ldap3.Server(ldap_config.host, **server_opts)
+
+        if self.verbose > 2:
+            LOG.debug("LDAP server {s}: {re}".format(
+                s=ldap_config.host, re=repr(self.ldap_server)))
+
+        self.ldap = ldap3.Connection(
+            self.ldap_server, ldap_config.bind_dn, ldap_config.bind_pw,
+            client_strategy=ldap3.SAFE_SYNC, auto_bind=True)
+
+        if self.verbose > 2:
+            msg = _("Info about LDAP server {}:").format(ldap_config.url)
+            msg += '\n' + repr(self.ldap_server.info)
+            LOG.debug(msg)
+
+    # -------------------------------------------------------------------------
+    def disconnect_ldap(self):
+
+        if 'default' in self.cfg.ldap_connection:
+            ldap_config = self.cfg.ldap_connection['default']
+            ldap_server = ldap_config.url
+        else:
+            ldap_server = 'unknown'
+
+        if self.ldap:
+            LOG.info(_("Unbinding from LDAP server {} ...").format(ldap_server))
+            self.ldap.unbind()
+            self.ldap = None
+
+        if self.ldap_server:
+            LOG.info(_("Disconnecting from LDAP server {} ...").format(ldap_server))
+            self.ldap_server = None
+
+    # -------------------------------------------------------------------------
+    def get_ldap_admins(self):
+
+        if not self.ldap:
+            msg = _("No LDAP connection initialized.")
+            raise HandlerError(msg)
+
+        admins = {}
+
+        attrs = ['cn', 'dn', 'mail', 'sshPublicKey', 'uid']
+        ldap_config = self.cfg.ldap_connection['default']
+        fltr = ldap_config.admin_filter
+
+        msg = _("Trying to get a list of all DPX admins with their public SSH keys ...")
+        LOG.debug(msg)
+
+        msg = _("LDAP search starting in {!r} with filter:").format(ldap_config.base_dn)
+        msg += '\n' + fltr
+        LOG.debug(msg)
+
+        status, result, response, request = self.ldap.search(
+            search_base=ldap_config.base_dn, search_scope=ldap3.SUBTREE, search_filter=fltr,
+            attributes=attrs, time_limit=self.cfg.ldap_timeout)
+
+        if not status:
+            msg = _("Error retrieving DPX admin list from LDAP:")
+            msg += ' ' + result
+            raise HandlerError(msg)
+
+        for entry in response:
+
+            uid = None
+            admin = {
+                'cn': None,
+                'dn': None,
+                'mail': None,
+                'keys': [],
+                'uid': None,
+            }
+
+            admin['dn'] = entry['dn']
+
+            for attr in entry['attributes']:
+
+                val = entry['attributes'][attr]
+
+                if attr.lower() == 'uid':
+                    if is_sequence(val):
+                        uid = val[0]
+                    else:
+                        uid = val
+                    admin['uid'] = uid
+
+                if attr.lower() == 'cn':
+                    if is_sequence(val):
+                        admin['cn'] = val[0]
+                    else:
+                        admin['cn'] = val
+
+                if attr.lower() == 'mail':
+                    if is_sequence(val):
+                        admin['mail'] = val[0]
+                    else:
+                        admin['mail'] = val
+
+                if attr.lower() == 'sshpublickey':
+                    if is_sequence(val):
+                        for key in val:
+                            admin['keys'].append(key)
+                    else:
+                        admin['keys'].append(val)
+
+            if self.verbose == 2:
+                msg = _("Got an admin {cn} <{mail}>.").format(cn=admin['cn'], mail=admin['mail'])
+                LOG.debug(msg)
+            elif self.verbose > 2:
+                msg = _("Got an admin:") + '\n' + pp(admin)
+                LOG.debug(msg)
+
+            admins[uid] = admin
+
+        if not admins:
+            msg = _("Did not found any admins below base DN {!r} with filter:")
+            msg = msg.format(self.cfg.ldap_connection['default'].base_dn)
+            msg += '\n' + fltr
+            raise HandlerError(msg)
+
+        return admins
+
+
+# =============================================================================
+if __name__ == "__main__":
+
+    pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list