from .module_info import ModuleInfo
+from .module_list import ModuleInfoDict
+
from .puppetfile import Puppetfile, PuppetfileError
LOG = logging.getLogger(__name__)
self.puppet_root_env_dir = self.default_puppet_root_env_dir
self.forge_uri = self.default_forge_uri
self.http_timeout = self.default_http_timeout
- self.modules = {}
+ self.modules = None
self.description = textwrap.dedent('''\
Collects information about all used Puppet Modules from {url}
super(GetForgeModulesApp, self).__init__(
appname=appname, verbose=verbose, version=version)
+ self.modules = ModuleInfoDict(
+ appname=self.appname, verbose=self.verbose, base_dir=self.base_dir)
+
# -------------------------------------------------------------------------
def post_init(self):
self.get_forge_information()
if self.verbose > 2:
- LOG.debug("Found modules:\n{}".format(pp(self.modules)))
+ LOG.debug("Found modules:\n{}".format(pp(self.modules.as_list())))
self.write_cache_file()
# -------------------------------------------------------------------------
def collect_local_modules(self):
- self.modules = {}
+ self.modules = ModuleInfoDict(
+ appname=self.appname, verbose=self.verbose, base_dir=self.base_dir)
for env in self.environments:
self.read_puppetfile(env)
self.read_metadata_files(env)
+ if self.verbose > 2:
+ LOG.debug("ModuleInfoDict:\n{}".format(pp(self.modules.as_dict())))
# -------------------------------------------------------------------------
def get_forge_information(self):
self.modules = pfile.read_modules()
except PuppetfileError as e:
LOG.warn(str(e))
- self.modules = {}
+ self.modules = ModuleInfoDict(
+ appname=self.appname, verbose=self.verbose, base_dir=self.base_dir)
else:
if self.verbose > 2:
LOG.debug("Successful read {!r}.".format(pfile.filename))
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin
+@summary: A module for encapsulating all information about a Puppet module
+"""
+from __future__ import absolute_import
+
+# Standard modules
+import logging
+import re
+import copy
+import warnings
+import time
+import datetime
+
+from operator import itemgetter, attrgetter
+
+from collections import MutableMapping
+
+from functools import cmp_to_key
+
+# Third party modules
+
+# Own modules
+from .common import pp, to_str, to_bool, is_sequence
+
+from .obj import BaseObjectError
+from .obj import BaseObject
+
+from .module_info import ModuleInfo
+
+__version__ = '0.1.1'
+
+LOG = logging.getLogger(__name__)
+
+# =============================================================================
+class ModuleInfoDict(MutableMapping, BaseObject):
+ """
+ A dictionary containing ModuleInfo objects.
+ It works like a dict.
+ i.e.:
+ modules = ModuleInfoDict(ModuleInfo(full_name='puppet-mongodb', ...))
+ and
+ modules['puppet-mongodb'] returns a ModuleInfo object for puppet module 'puppet-mongodb'
+ """
+
+ msg_invalid_modinfo_type = "Invalid value type {!r} to set, only ModuleInfo allowed."
+ msg_key_not_name = "The key {k!r} must be equal to the full name {n!r} of the module."
+ msg_none_type_error = "None type as key is not allowed."
+ msg_empty_key_error = "Empty key {!r} is not allowed."
+ msg_no_modinfo_dict = "Object {!r} is not a ModuleInfoDict object."
+
+ # -------------------------------------------------------------------------
+ # __init__() method required to create instance from class.
+ def __init__(
+ self, appname=None, verbose=0, version=__version__, base_dir=None, *args, **kwargs):
+
+ self._map = dict()
+
+ super(ModuleInfoDict, self).__init__(
+ appname=appname, verbose=verbose, version=version,
+ base_dir=base_dir, initialized=False,
+ )
+
+ for arg in args:
+ self.append(arg)
+
+ # -------------------------------------------------------------------------
+ def _set_item(self, key, module_info):
+
+ if not isinstance(module_info, ModuleInfo):
+ raise TypeError(self.msg_invalid_modinfo_type.format(module_info.__class__.__name__))
+
+ full_name = module_info.full_name
+ if full_name != key.lower():
+ raise KeyError(self.msg_key_not_name.format(k=key, n=full_name))
+
+ self._map[full_name] = module_info
+
+ # -------------------------------------------------------------------------
+ def append(self, module_info):
+
+ if not isinstance(module_info, ModuleInfo):
+ raise TypeError(self.msg_invalid_modinfo_type.format(module_info.__class__.__name__))
+ self._set_item(module_info.name, module_info)
+
+ # -------------------------------------------------------------------------
+ def as_dict(self, short=True):
+
+ res = super(ModuleInfoDict, self).as_dict(short=short)
+
+ res['items'] = {}
+ res['keys'] = []
+ for full_name in self.keys():
+ res['items'][full_name] = self._map[full_name].as_dict(short)
+ res['keys'].append(str(full_name))
+
+ return res
+
+ # -------------------------------------------------------------------------
+ def _get_item(self, key):
+
+ if key is None:
+ raise TypeError(self.msg_none_type_error)
+
+ full_name = str(key).lower().strip()
+ if full_name == '':
+ raise ValueError(self.msg_empty_key_error.format(key))
+
+ return self._map[full_name]
+
+ # -------------------------------------------------------------------------
+ def get(self, key):
+ return self._get_item(key)
+
+ # -------------------------------------------------------------------------
+ def _del_item(self, key, strict=True):
+
+ if key is None:
+ raise TypeError(self.msg_none_type_error)
+
+ full_name = str(key).lower().strip()
+ if full_name == '':
+ raise ValueError(self.msg_empty_key_error.format(key))
+
+ if not strict and full_name not in self._map:
+ return
+
+ del self._map[full_name]
+
+ # -------------------------------------------------------------------------
+ # The next five methods are requirements of the ABC.
+ def __setitem__(self, key, value):
+ self._set_item(key, value)
+
+ # -------------------------------------------------------------------------
+ def __getitem__(self, key):
+ return self._get_item(key)
+
+ # -------------------------------------------------------------------------
+ def __delitem__(self, key):
+ self._del_item(key)
+
+ # -------------------------------------------------------------------------
+ def __iter__(self):
+
+ for full_name in self.keys():
+ yield full_name
+
+ # -------------------------------------------------------------------------
+ def __len__(self):
+ return len(self._map)
+
+ # -------------------------------------------------------------------------
+ # The next methods aren't required, but nice for different purposes:
+ def __str__(self):
+ '''returns simple dict representation of the mapping'''
+ return str(self._map)
+
+ # -------------------------------------------------------------------------
+ def __contains__(self, key):
+ if key is None:
+ raise TypeError(self.msg_none_type_error)
+
+ full_name = str(key).lower().strip()
+ if full_name == '':
+ raise ValueError(self.msg_empty_key_error.format(key))
+
+ return full_name in self._map
+
+ # -------------------------------------------------------------------------
+ def keys(self):
+
+ def compare_items(x, y):
+ if self.verbose > 4:
+ LOG.debug("Comparing names {!r} > {!r}".format(x.name, y.name))
+ if x.name != y.name:
+ if x.name > y.name:
+ return 1
+ return -1
+ if self.verbose > 4:
+ LOG.debug("Comparing vendor {!r} > {!r}".format(x.vendor, y.vendor))
+ if x.vendor != y.vendor:
+ if x.vendor > y.vendor:
+ return 1
+ return -1
+ return 0
+
+ return sorted(
+ self._map.keys(),
+ key=lambda x: cmp_to_key(compare_items)(self._map[x]))
+
+ # -------------------------------------------------------------------------
+ def items(self):
+
+ item_list = []
+
+ for full_name in self.keys():
+ item_list.append((full_name, self._map[full_name]))
+
+ return item_list
+
+ # -------------------------------------------------------------------------
+ def values(self):
+
+ value_list = []
+ for full_name in self.keys():
+ value_list.append(self._map[full_name])
+ return value_list
+
+ # -------------------------------------------------------------------------
+ def __eq__(self, other):
+
+ if not isinstance(other, ModuleInfoDict):
+ raise TypeError(self.msg_no_modinfo_dict.format(other))
+
+ return self._map == other._map
+
+ # -------------------------------------------------------------------------
+ def __ne__(self, other):
+
+ if not isinstance(other, ModuleInfoDict):
+ raise TypeError(self.msg_no_modinfo_dict.format(other))
+
+ return self._map != other._map
+
+ # -------------------------------------------------------------------------
+ def pop(self, key, *args):
+
+ if key is None:
+ raise TypeError(self.msg_none_type_error)
+
+ full_name = str(key).lower().strip()
+ if full_name == '':
+ raise ValueError(self.msg_empty_key_error.format(key))
+
+ return self._map.pop(full_name, *args)
+
+ # -------------------------------------------------------------------------
+ def popitem(self):
+
+ if not len(self._map):
+ return None
+
+ full_name = self.keys()[0]
+ zone = self._map[full_name]
+ del self._map[full_name]
+ return (full_name, zone)
+
+ # -------------------------------------------------------------------------
+ def clear(self):
+ self._map = dict()
+
+ # -------------------------------------------------------------------------
+ def setdefault(self, key, default):
+
+ if key is None:
+ raise TypeError(self.msg_none_type_error)
+
+ full_name = str(key).lower().strip()
+ if full_name == '':
+ raise ValueError(self.msg_empty_key_error.format(key))
+
+ if not isinstance(default, ModuleInfo):
+ raise TypeError(self.msg_invalid_modinfo_type.format(default.__class__.__name__))
+
+ if full_name in self._map:
+ return self._map[full_name]
+
+ self._set_item(full_name, default)
+ return default
+
+ # -------------------------------------------------------------------------
+ def update(self, other):
+
+ if isinstance(other, ModuleInfoDict) or isinstance(other, dict):
+ for full_name in other.keys():
+ self._set_item(full_name, other[full_name])
+ return
+
+ for tokens in other:
+ key = tokens[0]
+ value = tokens[1]
+ self._set_item(key, value)
+
+ # -------------------------------------------------------------------------
+ def as_list(self, short=True):
+
+ res = []
+ for full_name in self.keys():
+ res.append(self._map[full_name].as_dict(short))
+ return res
+
+
+# =============================================================================
+
+if __name__ == "__main__":
+
+ pass
+
+# =============================================================================
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list