"""
@author: Frank Brehm
@contact: frank@brehm-online.com
-@copyright: © 2020 by Frank Brehm, Berlin
+@copyright: © 2021 by Frank Brehm, Berlin
@summary: All classes used by ldap_migration package
"""
from __future__ import absolute_import
from fb_tools.app import BaseApplication, DirectoryOptionAction
from fb_tools.config import CfgFileOptionAction
from fb_tools.errors import FbAppError
+from fb_tools.collections import CIStringSet, CIDict
from .config import LDAPMigrationConfiguration
-from .idict import CaseInsensitiveDict
-from .istringset import CaseInsensitiveStringSet
-__version__ = '0.8.12'
+__version__ = '0.9.0'
LOG = logging.getLogger(__name__)
CFG_BASENAME = 'ldap-migration.ini'
self.struct_entries = []
self.all_entries = []
- self.group_entries = CaseInsensitiveDict()
+ self.group_entries = CIDict()
self.limit = 0
self.count_added = 0
self.count_modified = 0
- self.object_classes = CaseInsensitiveDict()
- self.attribute_types = CaseInsensitiveDict()
- self.dns = CaseInsensitiveDict()
- self.struct_dns = CaseInsensitiveDict()
- self.migrated_entries = CaseInsensitiveDict()
- self.integer_attribute_types = CaseInsensitiveStringSet([])
- self.src_items_not_found = CaseInsensitiveStringSet([])
- self.unknown_attributetypes = CaseInsensitiveStringSet([])
- self.unknown_objectclasses = CaseInsensitiveStringSet([])
- self.dn_attr_types = CaseInsensitiveStringSet([])
- self.pure_binary_attr_types = CaseInsensitiveStringSet([])
+ self.object_classes = CIDict()
+ self.attribute_types = CIDict()
+ self.dns = CIDict()
+ self.struct_dns = CIDict()
+ self.migrated_entries = CIDict()
+ self.integer_attribute_types = CIStringSet()
+ self.src_items_not_found = CIStringSet()
+ self.unknown_attributetypes = CIStringSet()
+ self.unknown_objectclasses = CIStringSet()
+ self.dn_attr_types = CIStringSet()
+ self.pure_binary_attr_types = CIStringSet()
super(LDAPMigrationApplication, self).__init__(
LOG.debug("The 'memberOf' attribute will not be migrated.")
continue
- key = self.attribute_types.get_key(attribute, strict=False)
+ key = None
+ if attribute in self.attribute_types:
+ key = self.attribute_types.real_key(attribute)
if not key:
if self.verbose > 1:
msg = (
def lookup_for_attrtype(self, attrtype, silent=True):
at_lc = attrtype.lower()
- canon_attrtype = self.attribute_types.get_key(attrtype, strict=False)
+ canon_attrtype = None
+ if attrtype in self.attribute_types:
+ canon_attrtype = self.attribute_types.real_key(attrtype)
if canon_attrtype is None:
msg = "AttributeType {!r} not found.".format(attrtype)
if silent:
while len(tokens):
token = tokens.pop(0)
if 'childs' not in iter_hash:
- iter_hash['childs'] = CaseInsensitiveDict()
+ iter_hash['childs'] = CIDict()
if token not in iter_hash['childs']:
- iter_hash['childs'][token] = CaseInsensitiveDict()
+ iter_hash['childs'][token] = CIDict()
if 'childs' not in iter_hash['childs'][token]:
- iter_hash['childs'][token]['childs'] = CaseInsensitiveDict()
+ iter_hash['childs'][token]['childs'] = CIDict()
iter_hash = iter_hash['childs'][token]
if not len(tokens):
if self.verbose > 3:
object_classes = []
target_entry = {}
- used_classes = CaseInsensitiveStringSet([])
+ used_classes = CIStringSet()
for attribute_name in src_entry['attributes']:
if attribute_name.lower() == 'objectclass':
LOG.debug(msg)
self.unknown_objectclasses.add(src_oc_name)
continue
- tgt_oc_name = self.object_classes.get_key(src_oc_name)
+ tgt_oc_name = self.object_classes.real_key(src_oc_name)
used_classes.add(tgt_oc_name)
object_classes.append(tgt_oc_name)
continue
msg = msg.format(at=attribute_name, dn=src_dn)
LOG.debug(msg)
continue
- tgt_at_name = self.attribute_types.get_key(attribute_name)
+ tgt_at_name = self.attribute_types.real_key(attribute_name)
target_entry[tgt_at_name] = copy.copy(src_entry['attributes'][attribute_name])
if ('sunservice' in used_classes) or ('sunServiceComponent' in used_classes):
changes = {}
- src_obj_classes = CaseInsensitiveDict()
- src_attributes = CaseInsensitiveDict()
+ src_obj_classes = CIDict()
+ src_attributes = CIDict()
- tgt_obj_classes = CaseInsensitiveDict()
- tgt_attributes = CaseInsensitiveDict()
+ tgt_obj_classes = CIDict()
+ tgt_attributes = CIDict()
for src_at_name in src_entry['attributes']:
msg = msg.format(oc=src_oc_name, dn=src_dn)
LOG.debug(msg)
continue
- tgt_oc_name = self.object_classes.get_key(src_oc_name)
+ tgt_oc_name = self.object_classes.real_key(src_oc_name)
objectclasses_to_add.append(tgt_oc_name)
if objectclasses_to_add:
LOG.debug(msg)
continue
- tgt_at_name = self.attribute_types.get_key(src_at_name)
+ tgt_at_name = self.attribute_types.real_key(src_at_name)
src_value = src_attributes[src_at_name]
do_replace = False
if tgt_at_name in tgt_attributes:
src_entry = self.get_source_item(src_dn, tgt_dn, with_acl=with_acl)
if not migrate_if_group:
- object_classes = CaseInsensitiveStringSet([])
+ object_classes = CIStringSet()
for src_at_name in src_entry['attributes']:
if src_at_name.lower() == 'objectclass':
for src_oc_name in src_entry['attributes'][src_at_name]: