import ldap3
+# ldap3 classes and objects
+from ldap3 import Server, ServerPool, Connection, Reader
+# ldap3 constants
+from ldap3 import IP_V4_PREFERRED, ROUND_ROBIN, AUTO_BIND_NONE, ALL_ATTRIBUTES
+from ldap3 import SUBTREE
+
from ldap3.core.exceptions import LDAPPasswordIsMandatoryError
# Own modules
from .cfg_app import PpCfgAppError, PpConfigApplication
-__version__ = '0.3.4'
+__version__ = '0.4.1'
LOG = logging.getLogger(__name__)
# ----------------------
def _get_ldap_server(host):
- return ldap3.Server(
+ return Server(
host, port=self.ldap_port, use_ssl=self.ldap_use_ssl,
- mode=ldap3.IP_V4_PREFERRED, connect_timeout=self.ldap_timeout)
+ mode=IP_V4_PREFERRED, connect_timeout=self.ldap_timeout)
# Init LDAP Server objects
if len(self.ldap_hosts) > 1:
- self.ldap_server = ldap3.ServerPool(None, ldap3.ROUND_ROBIN)
+ self.ldap_server = ServerPool(None, ROUND_ROBIN)
for h in self.ldap_hosts:
server = _get_ldap_server(h)
self.ldap_server.add(server)
raise PpLdapAppError(msg)
# Init LDAP connection object
- self.ldap_connection = ldap3.Connection(
+ self.ldap_connection = Connection(
self.ldap_server, user=self.ldap_bind_dn, password=self.ldap_bind_pw,
- auto_bind=ldap3.AUTO_BIND_NONE, lazy=True, auto_range=True
+ auto_bind=AUTO_BIND_NONE, lazy=True, auto_range=True
)
# -------------------------------------------------------------------------
LOG.debug("Executing nothing ...")
# -------------------------------------------------------------------------
- def ldap_search(
- self, query_filter, dn=None, attributes=ldap3.ALL_ATTRIBUTES,
- scope=ldap3.SUBTREE):
+ def ldap_search(self, query_filter, dn=None, attributes=ALL_ATTRIBUTES, scope=SUBTREE):
if self.verbose > 1:
LOG.debug("Query string: {q!r}, attributes: {a}".format(
if dn is None:
dn = self.ldap_base_dn
+ cursor = Reader(
+ self.ldap_connection,
+ query=query_filter, base=dn, attributes=attributes
+ )
+
+ try:
+ cursor.search()
+ except LDAPPasswordIsMandatoryError as e:
+ msg = "Please configure [LDAP]/bind_pw in configuration - " + str(e)
+ LOG.error(msg)
+ return []
+
+ return cursor.entries
+
try:
self.ldap_connection.search(
dn, query_filter, search_scope=scope, attributes=attributes)
entries = self.ldap_connection.entries
return entries
+ # -------------------------------------------------------------------------
+ def ldap_search_subtree(self, obj_def, query=None, base=None):
+
+ if base is None:
+ base = self.ldap_base_dn
+
+ cursor = Reader(
+ self.ldap_connection,
+ object_def=obj_def, query=query, base=base)
+
+ if self.verbose > 1:
+ LOG.debug("LDAP-Reader:\n{}".format(cursor))
+
+ cursor.search()
+ return cursor.entries
+
# -------------------------------------------------------------------------
def post_run(self):
"""
import ldap3
+from ldap3 import ObjectDef, AttrDef
+
+from ldap3.core.exceptions import LDAPKeyError
+
# Own modules
from .global_version import __version__ as __global_version__
from .ldap_app import PpLdapAppError, PpLdapApplication
-__version__ = '0.1.2'
+__version__ = '0.3.1'
LOG = logging.getLogger(__name__)
"""
default_initial_uid = 999999999
- default_chroot_homedir = os.sep + os.path.join('mnt', 'nfs', 'home')
+ default_chroot_homedir = os.sep + os.path.join('mnt', 'nfs')
# -------------------------------------------------------------------------
def __init__(self, appname=None, version=__version__):
self.initial_uid = self.default_initial_uid
self.chroot_homedir = self.default_chroot_homedir
self.simulate = False
+ self.user_entries = []
description = textwrap.dedent('''\
Home Directory and UIDNumber generation - this script will search for
LOG.error(msg)
self.exit(1)
+ if self.simulate:
+ LOG.warn("Simulation mode - nothing is really done.")
+
# -------------------------------------------------------------------------
def _run(self):
- LOG.debug("Executing something ...")
+ self.get_all_user_entries()
+ self.check_numeric_uids()
+ self.check_home_dirs()
- query_filter = (
- '(&'
- '(objectclass=posixAccount)'
- '(objectclass=shadowAccount)'
- '(uid=frank.brehm)'
- ')'
- )
+ # -------------------------------------------------------------------------
+ def get_all_user_entries(self):
- entries = self.ldap_search(query_filter)
+ LOG.info("Reading Accounts from LDAP ...")
+
+ query_filter = '(&(objectclass=posixAccount)(objectclass=shadowAccount))'
+ attributes = ["uid", "uidNumber", "homeDirectory", "gidNumber"]
+
+ person = ObjectDef(['posixAccount', 'shadowAccount'])
+ person += ["uid", "uidNumber", "homeDirectory", "gidNumber"]
+
+ #self.user_entries = self.ldap_search(query_filter, attributes=attributes)
+ self.user_entries = self.ldap_search_subtree(person, query_filter)
+ LOG.debug("Found {} LDAP entries.".format(len(self.user_entries)))
+
+ # -------------------------------------------------------------------------
+ def check_numeric_uids(self):
+
+ LOG.info("Checking UID's for new Users ...")
+
+ # -------------------------------------------------------------------------
+ def check_home_dirs(self):
+
+ LOG.info("Checking home directories ...")
- print("Found {} LDAP entries.".format(len(entries)))
i = 0
- for entry in entries:
+ for entry in self.user_entries:
+
i += 1
- print("\n{}".format(entry))
+ if self.verbose > 3:
+ LOG.debug("Found {c}:\n{e}".format(c=entry.__class__.__name__, e=pp(entry)))
+ dn = entry.entry_get_dn()
+ LOG.debug("Checking home of {!r} ...".format(dn))
+
+ try:
+ home_dir = entry['homeDirectory']
+ except LDAPKeyError as e:
+ LOG.debug("LDAP user {!r} has no home directory.".format(dn))
+
+ home = entry['homeDirectory'][0]
+ LOG.debug("Checking home directory {!r} ...".format(home))
+
if i >= 5:
break
-
# =============================================================================
if __name__ == "__main__":