# Third party modules
from fb_tools.collections import CIStringSet
-from fb_tools.common import is_sequence, to_bool
+# from fb_tools.common import is_sequence
+# from fb_tools.common import to_bool
from fb_tools.xlate import format_list
-from ldap3 import MODIFY_ADD, MODIFY_DELETE, MODIFY_REPLACE
+# from ldap3 import MODIFY_ADD
+# from ldap3 import MODIFY_DELETE
+# from ldap3 import MODIFY_REPLACE
# Own modules
from .ldap import BaseLdapApplication
-from .ldap import FatalLDAPError, LdapAppError
+# from .ldap import FatalLDAPError
+from .ldap import LdapAppError
from .. import pp
from ..argparse_actions import LimitedIntegerOptionAction
from ..xlate import XLATOR
-__version__ = '0.1.0'
+__version__ = '0.2.0'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
default_max_cycles = 20
max_max_cycles = 1000
- group_object_classes = ('posixGroup', 'groupOfNames', 'uniqueMember')
- member_attributes = ('member', 'uniqueMember', 'memberUid', 'mgrpRFC822MailMember')
+ group_object_classes = ('groupOfNames', 'groupOfUniqueNames', 'groupOfURLs', 'posixGroup')
+ member_attributes = (
+ 'member', 'memberUid', 'memberURL', 'mgrpRFC822MailMember', 'uniqueMember')
# -------------------------------------------------------------------------
def __init__(self, appname=None, base_dir=None):
self.current_cycle = 0
self.last_nr_groups_done = 0
+ self._base_dn = None
self._max_cycles = self.default_max_cycles
+ list_oc = format_list(self.group_object_classes, do_repr=True)
+ list_attr = format_list(self.member_attributes, do_repr=True)
+
desc = _(
'Removes all LDAP groups, which does not have any members, that means, they are one '
- 'of the following objectClasses:')
- desc += ' ' + format_list(self.group_object_classes) + ', '
- desc += _('and they have none of the following attributes:')
- desc += ' ' + format_list(self.member_attributes, do_repr=True)
+ 'of the following objectClasses: {ocl}, and they have none of the following '
+ 'attributes: {atl}.').format(ocl=list_oc, atl=list_attr)
super(CleanEmptyLdapGroupsApplication, self).__init__(
appname=appname, description=desc, base_dir=base_dir, initialized=False)
self.initialized = True
+ # -------------------------------------------
+ @property
+ def base_dn(self):
+ """Return the base dn for the groups to search."""
+ return self._base_dn
+
+ @base_dn.setter
+ def base_dn(self, value):
+ if value is None:
+ self._base_dn = None
+ return
+ v = str(value).strip()
+ if v:
+ self._base_dn = v
+ else:
+ self._base_dn = None
+
# -------------------------------------------
@property
def max_cycles(self):
"""
res = super(CleanEmptyLdapGroupsApplication, self).as_dict(short=short)
+ res['base_dn'] = self.base_dn
res['default_max_cycles'] = self.default_max_cycles
res['group_object_classes'] = copy.copy(self.group_object_classes)
res['max_cycles'] = self.max_cycles
"""Initialize specific command line parameters for this application."""
remove_group = self.arg_parser.add_argument_group(_('Removing options'))
+ remove_group.add_argument(
+ '-b', '--base-dn', metavar='DN', dest='ldap_base_dn',
+ help=_('The base DN used as the root for the LDAP search.')
+ )
+
cycles_help = _(
'The maximum number of iteration cycles for searching for empty groups. '
'It must not be less or equal to zero and must not be greater than {}.')
if cycles:
self.max_cycles = cycles
+ self.base_dn = getattr(self.args, 'ldap_base_dn', None)
+
self.check_instances()
# -------------------------------------------------------------------------
LOG.debug(_('Searching for empty groups ...'))
+ inst = self.ldap_instances[0]
+ self.get_empty_groups(inst)
+
+ # -------------------------------------------------------------------------
+ def get_empty_groups(self, inst):
+ """Retrieve all LDAP groups without members."""
+ self.dns_todo = CIStringSet()
+
+ ldap_filter = '(&(|'
+ ldap_filter += ''.join('(objectClass={})'.format(x) for x in self.group_object_classes)
+ ldap_filter += ')'
+ ldap_filter += ''.join('(!({}=*))'.format(x) for x in self.member_attributes)
+ ldap_filter += ')'
+
+ # LOG.debug('LDAP filter: ' + ldap_filter)
+ dns = self.get_all_entry_dns(
+ inst=inst, ldap_filter=ldap_filter, base_dn=self.base_dn, no_complain=True)
+ for dn in dns:
+ if dn not in self.dns_done:
+ self.dns_todo.add(dn)
+ if self.verbose > 1:
+ LOG.debug('Found Group-DNs:\n' + pp(self.dns_todo.as_list()))
+
# =============================================================================
if __name__ == '__main__':