from fb_tools.common import pp, to_bool
from fb_tools.cfg_app import FbConfigApplication
from fb_tools.errors import FbAppError
+from fb_tools.mailaddress import MailAddress
# Own modules
from .. import __version__ as GLOBAL_VERSION
# rom ..config.ldap import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS
from ..config.ldap import DEFAULT_TIMEOUT, MAX_TIMEOUT
-__version__ = '0.3.2'
+__version__ = '0.3.3'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
pattern_re_ldap_dn = r'^{pat}(,{pat})*$'.format(pat=token_ldap_dn)
re_ldap_dn = re.compile(pattern_re_ldap_dn, re.IGNORECASE)
+ pattern_re_uid = r'^[a-z](?:[a-z0-9\-_.]*[a-z0-9])?$'
+ re_uid = re.compile(pattern_re_uid, re.IGNORECASE)
+
# -------------------------------------------------------------------------
def __init__(
self, appname=None, verbose=0, version=GLOBAL_VERSION, base_dir=None,
LOG.debug(_("Disconnecting from LDAP server {!r} ...").format(connect_info.url))
del self.ldap_server[inst]
+ # -------------------------------------------------------------------------
+ def get_user_dn(self, user, inst):
+
+ connect_info = self.cfg.ldap_connection[inst]
+ base_dn = connect_info.base_dn
+
+ if self.verbose > 1:
+ msg = _("Trying to evaluate DN of user {user!r} in LDAP instance {inst} ...").format(
+ user=user, inst=connect_info.url)
+ LOG.debug(msg)
+
+ if MailAddress.valid_address(user, verbose=self.verbose):
+ msg = _("Trying to evaluate DN of user {u!r} as a mail address ...").format(u=user)
+ LOG.debug(msg)
+ return 'mail={u},{b}'.format(u=user, b=base_dn)
+
+ if self.re_ldap_dn.match(user):
+ msg = _("Trying to evaluate DN of user {u!r} as a LDAP DN ...").format(u=user)
+ LOG.debug(msg)
+ return user
+
+ if self.re_uid.match(user):
+ msg = _("Trying to evaluate DN of user {u!r} as a UID (Posix user name) ...").format(
+ u=user)
+ LOG.debug(msg)
+ return 'uid={u},{b}'.format(u=user, b=base_dn)
+
+ usr = user.strip()
+ if usr == '':
+ msg = _("Empty user given.")
+ LOG.warn(msg)
+ return None
+ msg = _("Trying to evaluate DN of user {u!r} as a CN ({c}) ...").format(
+ u=usr, c='common name')
+ LOG.debug(msg)
+ return 'cn={u},{b}'.format(u=usr, b=base_dn)
+
# =============================================================================
if __name__ == "__main__":
# Third party modules
# Own modules
-from fb_tools.common import to_bool
+from fb_tools.common import to_bool, is_sequence, pp
from ..xlate import XLATOR
from ..app.ldap import LdapAppError
from ..app.ldap import BaseLdapApplication
-__version__ = '0.2.2'
+__version__ = '0.3.0'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
self.use_multiple_ldap_connections = True
self.show_cmdline_ldap_timeout = True
+ self.dns = {}
+ self.wrong_users = False
+
self.given_users = []
self.nologin_shell = self.default_nologin_shell
self._deactivate = False
LOG.error(_("No users to remove given."))
self.exit(1)
+ self.given_users = given_users
+
# -------------------------------------------------------------------------
def _run(self):
- LOG.info("Jetzt geht es los, mit ganz grossen Schritten ...")
+ for user in self.given_users:
+ self.eval_user_dns(user)
+
+ if self.wrong_users:
+ self.exit(5)
+
+ if self.verbose > 1:
+ msg = _("Evaluated DNs of user {!r}:").format(user)
+ LOG.debug(msg + ' ' + pp(self.dns))
+
+ # -------------------------------------------------------------------------
+ def eval_user_dns(self, user):
+
+ LOG.debug(_("Evaluating DNs of the user {!r} to remove ...").format(user))
+
+ usr = user.strip()
+ if usr == '':
+ msg = _("Empty user given.")
+ LOG.warn(msg)
+ self.wrong_users = True
+ return
+
+ for inst in self.ldap_instances:
+ connect_info = self.cfg.ldap_connection[inst]
+ dn = self.get_user_dn(usr, inst)
+ if dn:
+ if inst not in self.dns:
+ self.dns[inst] = []
+ if is_sequence(dn):
+ self.dns[inst] += dn
+ if len(dn) > 1:
+ msg = _("Found {nr} entries for user {u!r} in LDAP instance {i}.").format(
+ nr=len(dn), u=usr, i=connect_info.url)
+ LOG.warn(msg)
+ self.wrong_users = True
+ return
+ else:
+ self.dns[inst].append(dn)
+ if self.verbose > 2:
+ msg = _("Got DN {dn!r} for user {user!r} in LDAP instance {inst}.").format(
+ dn=dn, user=usr, inst=connect_info.url)
+ LOG.debug(msg)
+ else:
+ msg = _("Did not found user {user!r} in LDAP instance {inst}.").format(
+ user=usr, inst=connect_info.url)
+ LOG.info(msg)
# =============================================================================