--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin
+@summary: A module for encapsulating all information about a Puppet module
+"""
+from __future__ import absolute_import
+
+# Standard modules
+import logging
+import re
+import copy
+import warnings
+import time
+import datetime
+import collections
+
+# Own modules
+from fb_tools.common import pp, to_str, to_bool, is_sequence
+from fb_tools.obj import FbBaseObjectError, FbBaseObject
+
+from ..xlate import XLATOR
+
+from ..base_module_info import BaseModuleInfoError, BaseModuleInfo
+
+from ..module_meta_info import ModuleMetadata
+
+__version__ = '0.1.0'
+
+LOG = logging.getLogger(__name__)
+
+_ = XLATOR.gettext
+ngettext = XLATOR.ngettext
+
+
+# =============================================================================
+def parse_forge_date(dt):
+
+ return datetime.datetime.strptime(dt, '%Y-%m-%d %H:%M:%S %z')
+
+
+# =============================================================================
+class ReleaseInfoError(BaseModuleInfoError):
+
+ pass
+
+
+# =============================================================================
+class ForgeModuleInfoError(BaseModuleInfoError):
+
+ pass
+
+
+# =============================================================================
+class ForgeModuleInfoTypeError(ForgeModuleInfoError, TypeError):
+
+ pass
+
+
+# =============================================================================
+class BaseForgeObject(FbBaseObject):
+ """Base class for the most forge classes."""
+
+ # -------------------------------------------------------------------------
+ def __init__(
+ self, slug=None, uri=None,
+ appname=None, verbose=0, version=__version__, base_dir=None, initialized=None):
+
+ self._slug = None
+ self._uri = None
+
+ super(BaseForgeObject, self).__init__(
+ appname=appname, verbose=verbose, version=version, base_dir=base_dir, initialized=False)
+
+ self.slug = slug
+ self.uri = uri
+
+ if initialized is not None:
+ self.initialized = initialized
+
+ # -------------------------------------------------------------------------
+ def as_dict(self, short=True):
+ """
+ Transforms the elements of the object into a dict
+
+ @return: structure as dict
+ @rtype: dict
+ """
+
+ res = super(BaseForgeObject, self).as_dict(short=short)
+
+ res['slug'] = self.slug
+ res['uri'] = self.uri
+
+ return res
+
+ # -------------------------------------------------------------------------
+ @property
+ def slug(self):
+ """The slug of this owner."""
+ return self._slug
+
+ @slug.setter
+ def slug(self, value):
+ if value is None:
+ self._slug = None
+ return
+ v = str(value).strip()
+ if value == '':
+ self._slug = None
+ return
+ self._slug = v
+
+ # -------------------------------------------------------------------------
+ @property
+ def uri(self):
+ """The URI of this owner."""
+ return self._uri
+
+ @uri.setter
+ def uri(self, value):
+ if value is None:
+ self._uri = None
+ return
+ v = str(value).strip()
+ if value == '':
+ self._uri = None
+ return
+ self._uri = v
+
+ # -------------------------------------------------------------------------
+ def to_data(self):
+ """Returning a dict, which can be used to re-instantiate this owner info."""
+
+ res = {
+ 'slug': self.slug,
+ 'uri': self.uri,
+ }
+
+ return res
+
+ # -------------------------------------------------------------------------
+ def __copy__(self):
+
+ new = self.__class__(
+ appname=self.appname, verbose=self.verbose, base_dir=self.base_dir)
+
+ self.copy_to(new)
+ new.initialized = self.initialized
+
+ return new
+
+ # -------------------------------------------------------------------------
+ def copy_to(self, new):
+
+ if not isinstance(new, BaseForgeObject):
+ msg = _("Parameter {p!r} is not of class {c!r}, but of {e!r} instead.").format(
+ p='new', c='BaseForgeObject', e=new.__class__.__name__)
+ raise TypeError(msg)
+
+ new.slug = self.slug
+ new.uri = self.uri
+
+ # -------------------------------------------------------------------------
+ def __eq__(self, other):
+
+ if self.verbose > 4:
+ LOG.debug(_("Comparing {} objects ...").format(self.__class__.__name__))
+
+ if not isinstance(other, BaseForgeObject):
+ return False
+
+ if self.slug != other.slug:
+ return False
+ if self.uri != other.uri:
+ return False
+
+ return True
+
+ # -------------------------------------------------------------------------
+ @classmethod
+ def from_data(cls, data, appname=None, verbose=0, base_dir=None):
+
+ owner = cls(appname=appname, verbose=verbose, base_dir=base_dir)
+
+ owner.apply_data(data)
+
+ owner.initialized = True
+
+ if verbose > 3:
+ LOG.debug(_("Got {}:").format(cls.__name__) + '\n' + pp(owner.as_dict()))
+
+ return owner
+
+ # -------------------------------------------------------------------------
+ def apply_data(self, data):
+
+ if 'gravatar_id' in data:
+ self.gravatar_id = data['gravatar_id']
+ if 'username' in data:
+ self.username = data['username']
+
+
+# =============================================================================
+if __name__ == "__main__":
+
+ pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin
+@summary: A module for encapsulating all information about a Puppet module
+"""
+from __future__ import absolute_import
+
+# Standard modules
+import logging
+import re
+import copy
+import warnings
+import time
+import datetime
+import collections
+
+
+# Own modules
+from fb_tools.common import pp, to_str, to_bool, is_sequence
+
+from ..xlate import XLATOR
+
+from . import parse_forge_date
+from . import ReleaseInfoError, ForgeModuleInfoError, ForgeModuleInfoTypeError
+from . import BaseForgeObject
+
+from .owner_info import ForgeOwnerInfo
+
+__version__ = '0.1.0'
+
+LOG = logging.getLogger(__name__)
+
+_ = XLATOR.gettext
+ngettext = XLATOR.ngettext
+
+
+# =============================================================================
+class BaseForgeModuleInfo(BaseForgeObject):
+
+ # -------------------------------------------------------------------------
+ def __init__(
+ self, appname=None, verbose=0, version=__version__, base_dir=None,
+ initialized=None):
+
+ self._deprecated_at = None
+ self._name = None
+ self.owner = None
+
+ super(BaseForgeModuleInfo, self).__init__(
+ appname=appname, verbose=verbose, version=version, base_dir=base_dir, initialized=False)
+
+ if initialized is not None:
+ self.initialized = initialized
+
+ # -------------------------------------------------------------------------
+ def as_dict(self, short=True):
+ """
+ Transforms the elements of the object into a dict
+
+ @return: structure as dict
+ @rtype: dict
+ """
+
+ res = super(BaseForgeModuleInfo, self).as_dict(short=short)
+
+ res['deprecated_at'] = self.deprecated_at
+ res['name'] = self.name
+
+ return res
+
+ # -------------------------------------------------------------------------
+ @property
+ def deprecated_at(self):
+ """Date of deprecation of this module."""
+ return self._deprecated_at
+
+ @deprecated_at.setter
+ def deprecated_at(self, value):
+ if value is None:
+ self._deprecated_at = None
+ return
+ if isinstance(value, datetime.datetime):
+ self._deprecated_at = value
+ return
+ v = str(value).strip()
+ if value == '':
+ self._deprecated_at = None
+ return
+ self._deprecated_at = parse_forge_date(v)
+
+ # -------------------------------------------------------------------------
+ @property
+ def name(self):
+ """The name of this module."""
+ return self._name
+
+ @name.setter
+ def name(self, value):
+ if value is None:
+ self._name = None
+ return
+ v = str(value).strip()
+ if value == '':
+ self._name = None
+ return
+ self._name = v
+
+ # -------------------------------------------------------------------------
+ def to_data(self):
+ """Returning a dict, which can be used to re-instantiate this module info."""
+
+ res = super(BaseForgeModuleInfo, self).to_data()
+
+ res['deprecated_at'] = None
+ if self.deprecated_at:
+ res['deprecated_at'] = self.deprecated_at.strftime('%Y-%m-%d %H:%M:%S %z')
+
+ res['owner'] = None
+ if self.owner:
+ res['owner'] = self.owner.to_data()
+
+ res['name'] = self.name
+
+ return res
+
+ # -------------------------------------------------------------------------
+ def copy_to(self, new):
+
+ if not isinstance(new, BaseForgeModuleInfo):
+ msg = _("Parameter {p!r} is not of class {c!r}, but of {e!r} instead.").format(
+ p='new', c='BaseForgeModuleInfo', e=new.__class__.__name__)
+ raise TypeError(msg)
+
+ super(BaseForgeModuleInfo, self).copy_to(new)
+ new.deprecated_at = self.deprecated_at
+ new.name = self.name
+ if self.owner:
+ new.owner = copy.copy(self.owner)
+
+ # -------------------------------------------------------------------------
+ def __eq__(self, other):
+
+ if self.verbose > 4:
+ LOG.debug(_("Comparing {} objects ...").format(self.__class__.__name__))
+
+ if not super(BaseForgeModuleInfo, self).__eq__(other):
+ return False
+
+ if not isinstance(other, BaseForgeModuleInfo):
+ return False
+
+ if self.deprecated_at != other.deprecated_at:
+ return False
+ if self.name != other.name:
+ return False
+ if self.owner != other.owner:
+ return False
+
+ return True
+
+ # -------------------------------------------------------------------------
+ def apply_data(self, data):
+
+ super(BaseForgeModuleInfo, self).apply_data(data)
+
+ if 'deprecated_at' in data:
+ self.deprecated_at = data['deprecated_at']
+ if 'name' in data:
+ self.name = data['name']
+
+ if 'owner' in data and data['owner']:
+ self.owner = ForgeOwnerInfo.from_data(
+ data['owner'], appname=self.appname,
+ verbose=self.verbose, base_dir=self.base_dir)
+
+
+# =============================================================================
+if __name__ == "__main__":
+
+ pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin
+@summary: A module for encapsulating all information about a Puppet module
+"""
+from __future__ import absolute_import
+
+# Standard modules
+import logging
+import re
+import copy
+import warnings
+import time
+import datetime
+import collections
+
+
+# Own modules
+from fb_tools.common import pp, to_str, to_bool, is_sequence
+
+from ..xlate import XLATOR
+
+from . import parse_forge_date
+from . import ReleaseInfoError, ForgeModuleInfoError, ForgeModuleInfoTypeError
+from . import BaseForgeObject
+
+from ..module_meta_info import ModuleMetadata
+
+from .owner_info import ForgeOwnerInfo
+
+from .mod_release_info import ModuleReleaseInfo
+
+from .base_module_info import BaseForgeModuleInfo
+
+__version__ = '0.1.0'
+
+LOG = logging.getLogger(__name__)
+
+_ = XLATOR.gettext
+ngettext = XLATOR.ngettext
+
+
+# =============================================================================
+class CurrentModuleReleaseInfo(ModuleReleaseInfo):
+ """Class for encapsulating information about the current release of a Puppet module
+ from Puppet Forge."""
+
+ # -------------------------------------------------------------------------
+ def __init__(
+ self, appname=None, verbose=0, version=__version__, base_dir=None,
+ initialized=None):
+
+ self._changelog = None
+ self._deleted_for = None
+ self._downloads = None
+ self._file_md5 = None
+ self._license = None
+ self.metadata = None
+ self.module = None
+ self._pdk = False
+ self._readme = None
+ self._reference = None
+ self.tags = []
+ self.tasks = []
+ self._updated_at = None
+ self._validation_score = None
+
+ super(CurrentModuleReleaseInfo, self).__init__(
+ appname=appname, verbose=verbose, version=version, base_dir=base_dir, initialized=False)
+
+ if initialized is not None:
+ self.initialized = initialized
+
+ # -------------------------------------------------------------------------
+ def as_dict(self, short=True):
+ """
+ Transforms the elements of the object into a dict
+
+ @return: structure as dict
+ @rtype: dict
+ """
+
+ res = super(CurrentModuleReleaseInfo, self).as_dict(short=short)
+
+ res['changelog'] = self.changelog
+ res['deleted_for'] = self.deleted_for
+ res['downloads'] = self.downloads
+ res['file_md5'] = self.file_md5
+ res['license'] = self.license
+ res['pdk'] = self.pdk
+ res['readme'] = self.readme
+ res['reference'] = self.reference
+ res['updated_at'] = self.updated_at
+ res['validation_score'] = self.validation_score
+
+ return res
+
+ # -------------------------------------------------------------------------
+ @property
+ def changelog(self):
+ """The change log of this module release."""
+ return self._changelog
+
+ @changelog.setter
+ def changelog(self, value):
+ if value is None:
+ self._changelog = None
+ return
+ v = str(value).strip()
+ if v == '':
+ self._changelog = None
+ return
+ self._changelog = v
+
+ # -------------------------------------------------------------------------
+ @property
+ def deleted_for(self):
+ """The reason, why this module release was deleted."""
+ return self._deleted_for
+
+ @deleted_for.setter
+ def deleted_for(self, value):
+ if value is None:
+ self._deleted_for = None
+ return
+ v = str(value).strip()
+ if v == '':
+ self._deleted_for = None
+ return
+ self._deleted_for = v
+
+ # -------------------------------------------------------------------------
+ @property
+ def downloads(self):
+ "The number of downloads of this module release."""
+ return self._downloads
+
+ @downloads.setter
+ def downloads(self, value):
+ if value is None:
+ self._downloads = None
+ return
+ try:
+ v = int(value)
+ self._downloads = v
+ return
+ except Exception as e:
+ LOG.error(_("Got a {c} setting {w}: {e}").format(
+ c=e.__class__.__name__, w='downloads', e=e))
+
+ # -------------------------------------------------------------------------
+ @property
+ def file_md5(self):
+ """The MD5-sum of the current release package."""
+ return self._file_md5
+
+ @file_md5.setter
+ def file_md5(self, value):
+ if value is None:
+ self._file_md5 = None
+ return
+ v = str(value).strip()
+ if v == '':
+ self._file_md5 = None
+ return
+ self._file_md5 = v
+
+ # -------------------------------------------------------------------------
+ @property
+ def license(self):
+ """The license of module source."""
+ return self._license
+
+ @license.setter
+ def license(self, value):
+ if value is None:
+ self._license = None
+ return
+ v = str(value).strip()
+ if v == '':
+ self._license = None
+ return
+ self._license = v
+
+ # -------------------------------------------------------------------------
+ @property
+ def pdk(self):
+ """The pdk of this release."""
+ return self._pdk
+
+ @pdk.setter
+ def pdk(self, value):
+ if value is None:
+ self._pdk = None
+ return
+ self._pdk = to_bool(value)
+
+ # -------------------------------------------------------------------------
+ @property
+ def readme(self):
+ """The readme of module release."""
+ return self._readme
+
+ @readme.setter
+ def readme(self, value):
+ if value is None:
+ self._readme = None
+ return
+ v = str(value).strip()
+ if v == '':
+ self._readme = None
+ return
+ self._readme = v
+
+ # -------------------------------------------------------------------------
+ @property
+ def reference(self):
+ """The reference of module release."""
+ return self._reference
+
+ @reference.setter
+ def reference(self, value):
+ if value is None:
+ self._reference = None
+ return
+ v = str(value).strip()
+ if v == '':
+ self._reference = None
+ return
+ self._reference = v
+
+ # -------------------------------------------------------------------------
+ @property
+ def updated_at(self):
+ """Update date of this release."""
+ return self._updated_at
+
+ @updated_at.setter
+ def updated_at(self, value):
+ if value is None:
+ self._updated_at = None
+ return
+ if isinstance(value, datetime.datetime):
+ self._updated_at = value
+ return
+ v = str(value).strip()
+ if value == '':
+ self._updated_at = None
+ return
+ self._updated_at = parse_forge_date(v)
+
+ # -------------------------------------------------------------------------
+ @property
+ def validation_score(self):
+ "The validation score of this module release."""
+ return self._validation_score
+
+ @validation_score.setter
+ def validation_score(self, value):
+ if value is None:
+ self._validation_score = None
+ return
+ try:
+ v = int(value)
+ self._validation_score = v
+ return
+ except Exception as e:
+ LOG.error(_("Got a {c} setting {w}: {e}").format(
+ c=e.__class__.__name__, w='validation_score', e=e))
+
+ # -------------------------------------------------------------------------
+ def to_data(self):
+ """Returning a dict, which can be used to re-instantiate this module info."""
+
+ res = super(CurrentModuleReleaseInfo, self).to_data()
+
+ res['changelog'] = self.changelog
+ res['deleted_for'] = self.deleted_for
+ res['downloads'] = self.downloads
+ res['file_md5'] = self.file_md5
+ res['license'] = self.license
+ res['pdk'] = self.pdk
+ res['readme'] = self.readme
+ res['reference'] = self.reference
+ res['validation_score'] = self.validation_score
+
+ res['metadata'] = None
+ if self.metadata:
+ res['metadata'] = self.metadata.to_data()
+
+ res['module'] = None
+ if self.module:
+ res['module'] = self.module.to_data()
+
+ res['updated_at'] = None
+ if self.updated_at:
+ res['updated_at'] = self.updated_at.strftime('%Y-%m-%d %H:%M:%S %z')
+
+ res['tags'] = []
+ for tag in self.tags:
+ res['tags'].append(tag)
+
+ res['tasks'] = []
+ for task in self.tasks:
+ res['tasks'].append(copy.copy(task))
+
+ return res
+
+ # -------------------------------------------------------------------------
+ def copy_to(self, new):
+
+ if not isinstance(new, CurrentModuleReleaseInfo):
+ msg = _("Parameter {p!r} is not of class {c!r}, but of {e!r} instead.").format(
+ p='new', c='CurrentModuleReleaseInfo', e=new.__class__.__name__)
+ raise TypeError(msg)
+
+ super(CurrentModuleReleaseInfo, self).copy_to(new)
+
+ new.changelog = self.changelog
+ new.deleted_for = self.deleted_for
+ new.downloads = self.downloads
+ new.file_md5 = self.file_md5
+ new.license = self.license
+ new.pdk = self.pdk
+ new.readme = self.readme
+ new.reference = self.reference
+ new.updated_at = self.updated_at
+ new.validation_score = self.validation_score
+
+ new.metadata = None
+ if self.metadata:
+ new.metadata = copy.copy(self.metadata)
+
+ new.module = None
+ if self.module:
+ new.module = copy.copy(self.module)
+
+ new.tags = []
+ if self.tags:
+ for tag in self.tags:
+ new.tags.append(tag)
+
+ new.tasks = []
+ if self.tasks:
+ for task in self.tasks:
+ new.tasks.append(copy.copy(task))
+
+ # -------------------------------------------------------------------------
+ def __eq__(self, other):
+
+ if self.verbose > 4:
+ LOG.debug(_("Comparing {} objects ...").format(self.__class__.__name__))
+
+ if not super(CurrentModuleReleaseInfo, self).__eq__(other):
+ return False
+
+ if not isinstance(other, CurrentModuleReleaseInfo):
+ return False
+
+ if self.changelog != other.changelog:
+ return False
+ if self.deleted_for != other.deleted_for:
+ return False
+ if self.downloads != other.downloads:
+ return False
+ if self.file_md5 != other.file_md5:
+ return False
+ if self.license != other.license:
+ return False
+ if self.pdk != other.pdk:
+ return False
+ if self.readme != other.readme:
+ return False
+ if self.reference != other.reference:
+ return False
+ if self.updated_at != other.updated_at:
+ return False
+ if self.validation_score != other.validation_score:
+ return False
+ if self.metadata != other.metadata:
+ return False
+ if self.module != other.module:
+ return False
+ if self.tags != other.tags:
+ return False
+ if self.tasks != other.tasks:
+ return False
+
+ return True
+
+ # -------------------------------------------------------------------------
+ def apply_data(self, data):
+
+ super(CurrentModuleReleaseInfo, self).apply_data(data)
+
+ if 'changelog' in data and data['changelog']:
+ self.changelog = data['changelog']
+ if 'deleted_for' in data and data['deleted_for']:
+ self.deleted_for = data['deleted_for']
+ if 'downloads' in data and data['downloads']:
+ self.downloads = data['downloads']
+ if 'file_md5' in data and data['file_md5']:
+ self.file_md5 = data['file_md5']
+ if 'license' in data and data['license']:
+ self.license = data['license']
+
+ if 'metadata' in data and data['metadata']:
+ self.metadata = ModuleMetadata.from_json_data(
+ data['metadata'], appname=self.appname,
+ verbose=self.verbose, base_dir=self.base_dir)
+
+ if 'module' in data and data['module']:
+ self.module = BaseForgeModuleInfo.from_data(
+ data['module'], appname=self.appname,
+ verbose=self.verbose, base_dir=self.base_dir)
+
+ if 'pdk' in data:
+ self.pdk = data['pdk']
+ if 'readme' in data:
+ self.readme = data['readme']
+ if 'reference' in data:
+ self.reference = data['reference']
+ if 'updated_at' in data:
+ self.reference = data['updated_at']
+ if 'validation_score' in data:
+ self.validation_score = data['validation_score']
+
+ self.tags = []
+ if 'tags' in data and data['tags']:
+ if self.verbose > 2:
+ LOG.debug(
+ "Tags in current release of {!r}:".format(self.slug) + '\n' + pp(data['tags']))
+ for tag in data['tags']:
+ self.tags.append(tag)
+
+ self.tasks = []
+ if 'tasks' in data and data['tasks']:
+ if self.verbose > 3:
+ LOG.debug(
+ "Tasks in current release of {!r}:".format(self.slug) +
+ '\n' + pp(data['tasks']))
+ for task in data['tasks']:
+ self.tasks.append(copy.copy(task))
+
+
+# =============================================================================
+if __name__ == "__main__":
+
+ pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin
+@summary: A module for encapsulating all information about a Puppet module
+"""
+from __future__ import absolute_import
+
+# Standard modules
+import logging
+
+from collections import MutableMapping
+
+from functools import cmp_to_key
+
+# Third party modules
+
+# Own modules
+from fb_tools.common import to_bool
+from fb_tools.obj import FbBaseObject
+
+from .mod_info import ForgeModuleInfo
+
+from ..xlate import XLATOR
+
+__version__ = '0.1.0'
+
+LOG = logging.getLogger(__name__)
+
+_ = XLATOR.gettext
+ngettext = XLATOR.ngettext
+
+
+# =============================================================================
+class ForgeModuleDict(MutableMapping, FbBaseObject):
+ """
+ A dictionary containing ForgeModuleInfo objects.
+ It works like a dict.
+ i.e.:
+ modules = ForgeModuleDict(ForgeModuleInfo(full_name='puppet-mongodb', ...))
+ and
+ modules['puppet-mongodb'] returns a ForgeModuleInfo object for puppet module 'puppet-mongodb'
+ """
+
+ msg_invalid_modinfo_type = _("Invalid value type {{!r}} to set, only {} allowed.").format(
+ 'ForgeModuleInfo')
+ msg_key_not_name = _("The key {k!r} must be equal to the full name {n!r} of the module.")
+ msg_none_type_error = _("None type as key is not allowed.")
+ msg_empty_key_error = _("Empty key {!r} is not allowed.")
+ msg_no_modinfo_dict = _("Object {{!r}} is not a {} object.").format('ForgeModuleDict')
+
+
+ # -------------------------------------------------------------------------
+ # __init__() method required to create instance from class.
+ def __init__(
+ self, appname=None, verbose=0, version=__version__, base_dir=None,
+ sort_by_name=False, *args, **kwargs):
+
+ self._map = dict()
+ self._sort_by_name = False
+
+ super(ForgeModuleDict, self).__init__(
+ appname=appname, verbose=verbose, version=version,
+ base_dir=base_dir, initialized=False,
+ )
+
+ self.sort_by_name = sort_by_name
+
+ for arg in args:
+ self.append(arg)
+
+ # -----------------------------------------------------------
+ @property
+ def sort_by_name(self):
+ """Sorting modules by name and vendor, instead of the full name."""
+ return self._sort_by_name
+
+ @sort_by_name.setter
+ def sort_by_name(self, value):
+ self._sort_by_name = to_bool(value)
+
+ # -------------------------------------------------------------------------
+ def _set_item(self, key, module_info):
+
+ if not isinstance(module_info, ForgeModuleInfo):
+ raise TypeError(self.msg_invalid_modinfo_type.format(module_info.__class__.__name__))
+
+ full_name = module_info.full_name
+ if full_name != key.lower():
+ raise KeyError(self.msg_key_not_name.format(k=key, n=full_name))
+
+ self._map[full_name] = module_info
+
+ # -------------------------------------------------------------------------
+ def append(self, module_info):
+
+ if not isinstance(module_info, ForgeModuleInfo):
+ raise TypeError(self.msg_invalid_modinfo_type.format(module_info.__class__.__name__))
+ self._set_item(module_info.full_name, module_info)
+
+ # -------------------------------------------------------------------------
+ def as_dict(self, short=True):
+
+ res = super(ForgeModuleDict, self).as_dict(short=short)
+
+ res['sort_by_name'] = self.sort_by_name
+ res['items'] = {}
+ res['keys'] = []
+ for full_name in self.keys():
+ res['items'][full_name] = self._map[full_name].as_dict(short)
+ res['keys'].append(str(full_name))
+
+ return res
+
+ # -------------------------------------------------------------------------
+ def _get_item(self, key):
+
+ if key is None:
+ raise TypeError(self.msg_none_type_error)
+
+ full_name = str(key).lower().strip()
+ if full_name == '':
+ raise ValueError(self.msg_empty_key_error.format(key))
+
+ return self._map[full_name]
+
+ # -------------------------------------------------------------------------
+ def get(self, key):
+ return self._get_item(key)
+
+ # -------------------------------------------------------------------------
+ def _del_item(self, key, strict=True):
+
+ if key is None:
+ raise TypeError(self.msg_none_type_error)
+
+ full_name = str(key).lower().strip()
+ if full_name == '':
+ raise ValueError(self.msg_empty_key_error.format(key))
+
+ if not strict and full_name not in self._map:
+ return
+
+ del self._map[full_name]
+
+ # -------------------------------------------------------------------------
+ def merge(self, item):
+
+ if not isinstance(item, ForgeModuleInfo):
+ raise TypeError(self.msg_invalid_modinfo_type.format(item.__class__.__name__))
+
+ full_name = item.full_name
+ if full_name in self._map.keys():
+ if self.verbose > 2:
+ LOG.debug("Merging module {!r}.".format(full_name))
+ self._map[full_name].merge_in(item)
+ else:
+ if self.verbose > 2:
+ LOG.debug("New module {!r}.".format(full_name))
+ self._set_item(full_name, item)
+
+ # -------------------------------------------------------------------------
+ # The next five methods are requirements of the ABC.
+ def __setitem__(self, key, value):
+ self._set_item(key, value)
+
+ # -------------------------------------------------------------------------
+ def __getitem__(self, key):
+ return self._get_item(key)
+
+ # -------------------------------------------------------------------------
+ def __delitem__(self, key):
+ self._del_item(key)
+
+ # -------------------------------------------------------------------------
+ def __iter__(self):
+
+ for full_name in self.keys():
+ yield full_name
+
+ # -------------------------------------------------------------------------
+ def __len__(self):
+ return len(self._map)
+
+ # -------------------------------------------------------------------------
+ # The next methods aren't required, but nice for different purposes:
+ def __str__(self):
+ '''returns simple dict representation of the mapping'''
+ return str(self._map)
+
+ # -------------------------------------------------------------------------
+ def __contains__(self, key):
+ if key is None:
+ raise TypeError(self.msg_none_type_error)
+
+ full_name = str(key).lower().strip()
+ if full_name == '':
+ raise ValueError(self.msg_empty_key_error.format(key))
+
+ if self.verbose > 4:
+ LOG.debug("Searching for key {!r} ...".format(key))
+
+ return full_name in self._map
+
+ # -------------------------------------------------------------------------
+ def keys(self):
+
+ def compare_items(x, y):
+ if self.sort_by_name:
+ if self.verbose > 4:
+ LOG.debug("Comparing names {!r} > {!r}".format(x.name, y.name))
+ if x.name != y.name:
+ if x.name > y.name:
+ return 1
+ return -1
+ if self.verbose > 4:
+ LOG.debug("Comparing vendor {!r} > {!r}".format(x.vendor, y.vendor))
+ if x.vendor != y.vendor:
+ if x.vendor > y.vendor:
+ return 1
+ return -1
+ return 0
+ if self.verbose > 4:
+ LOG.debug("Comparing full names {!r} > {!r}".format(x.full_name, y.full_name))
+ if x.full_name != y.full_name:
+ if x.full_name > y.full_name:
+ return 1
+ return -1
+ return 0
+
+ return sorted(
+ self._map.keys(),
+ key=lambda x: cmp_to_key(compare_items)(self._map[x]))
+
+ # -------------------------------------------------------------------------
+ def items(self):
+
+ item_list = []
+
+ for full_name in self.keys():
+ item_list.append((full_name, self._map[full_name]))
+
+ return item_list
+
+ # -------------------------------------------------------------------------
+ def values(self):
+
+ value_list = []
+ for full_name in self.keys():
+ value_list.append(self._map[full_name])
+ return value_list
+
+ # -------------------------------------------------------------------------
+ def __eq__(self, other):
+
+ if not isinstance(other, ForgeModuleDict):
+ raise TypeError(self.msg_no_modinfo_dict.format(other))
+
+ return self._map == other._map
+
+ # -------------------------------------------------------------------------
+ def __ne__(self, other):
+
+ if not isinstance(other, ForgeModuleDict):
+ raise TypeError(self.msg_no_modinfo_dict.format(other))
+
+ return self._map != other._map
+
+ # -------------------------------------------------------------------------
+ def pop(self, key, *args):
+
+ if key is None:
+ raise TypeError(self.msg_none_type_error)
+
+ full_name = str(key).lower().strip()
+ if full_name == '':
+ raise ValueError(self.msg_empty_key_error.format(key))
+
+ return self._map.pop(full_name, *args)
+
+ # -------------------------------------------------------------------------
+ def popitem(self):
+
+ if not len(self._map):
+ return None
+
+ full_name = self.keys()[0]
+ zone = self._map[full_name]
+ del self._map[full_name]
+ return (full_name, zone)
+
+ # -------------------------------------------------------------------------
+ def clear(self):
+ self._map = dict()
+
+ # -------------------------------------------------------------------------
+ def setdefault(self, key, default):
+
+ if key is None:
+ raise TypeError(self.msg_none_type_error)
+
+ full_name = str(key).lower().strip()
+ if full_name == '':
+ raise ValueError(self.msg_empty_key_error.format(key))
+
+ if not isinstance(default, ForgeModuleInfo):
+ raise TypeError(self.msg_invalid_modinfo_type.format(default.__class__.__name__))
+
+ if full_name in self._map:
+ return self._map[full_name]
+
+ self._set_item(full_name, default)
+ return default
+
+ # -------------------------------------------------------------------------
+ def update(self, other):
+
+ if isinstance(other, ForgeModuleDict) or isinstance(other, dict):
+ for full_name in other.keys():
+ self._set_item(full_name, other[full_name])
+ return
+
+ for tokens in other:
+ key = tokens[0]
+ value = tokens[1]
+ self._set_item(key, value)
+
+ # -------------------------------------------------------------------------
+ def as_list(self, short=True):
+
+ res = []
+ for full_name in self.keys():
+ res.append(self._map[full_name].as_dict(short))
+ return res
+
+
+# =============================================================================
+
+if __name__ == "__main__":
+
+ pass
+
+# =============================================================================
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin
+@summary: A module for encapsulating all information about a Puppet module
+"""
+from __future__ import absolute_import
+
+# Standard modules
+import logging
+import re
+import copy
+import warnings
+import time
+import datetime
+import collections
+
+# Third party modules
+import requests
+import pytz
+
+# Own modules
+from fb_tools.common import pp, to_str, to_bool, is_sequence
+from fb_tools.obj import FbBaseObjectError, FbBaseObject
+
+from ..xlate import XLATOR
+
+from ..base_module_info import BaseModuleInfoError, BaseModuleInfo
+
+from ..module_meta_info import ModuleMetadata
+
+from . import parse_forge_date
+
+from .mod_release_info import ModuleReleaseInfo
+from .mod_release_list import ModuleReleaseList
+from .cur_mod_release_info import CurrentModuleReleaseInfo
+from .owner_info import ForgeOwnerInfo
+
+
+__version__ = '0.1.0'
+
+LOG = logging.getLogger(__name__)
+
+_ = XLATOR.gettext
+ngettext = XLATOR.ngettext
+
+
+# =============================================================================
+class ForgeModuleInfo(BaseModuleInfo):
+ """Class for encapsulating all information about a Puppet module from Puppet Forge."""
+
+ # -------------------------------------------------------------------------
+ def __init__(
+ self, appname=None, verbose=0, version=__version__, base_dir=None,
+ initialized=None, name=None, vendor=None, full_name=None):
+
+ self.current_release = None
+ self.releases = None
+ self.owner = None
+
+ self._created_at = None
+ self._deprecated_at = None
+ self._deprecated_for = None
+ self._downloads = None
+ self._endorsement = None
+ self._feedback_score = None
+ self._homepage_url = None
+ self._issues_url = None
+ self._module_group = None
+ self._slug = None
+ self._superseded_by = None
+ self._supported = None
+ self._updated_at = None
+ self._uri = None
+
+ super(ForgeModuleInfo, self).__init__(
+ appname=appname, verbose=verbose, version=version, base_dir=base_dir,
+ initialized=False, name=name, vendor=vendor, full_name=full_name
+ )
+
+ if initialized is not None:
+ self.initialized = initialized
+
+ # -------------------------------------------------------------------------
+ def as_dict(self, short=True):
+
+ res = super(ForgeModuleInfo, self).as_dict(short=short)
+
+ res['created_at'] = self.created_at
+ res['deprecated_at'] = self.deprecated_at
+ res['deprecated_for'] = self.deprecated_for
+ res['downloads'] = self.downloads
+ res['endorsement'] = self.endorsement
+ res['feedback_score'] = self.feedback_score
+ res['homepage_url'] = self.homepage_url
+ res['issues_url'] = self.issues_url
+ res['module_group'] = self.module_group
+ res['slug'] = self.slug
+ res['superseded_by'] = self.superseded_by
+ res['supported'] = self.supported
+ res['updated_at'] = self.updated_at
+ res['uri'] = self.uri
+
+ return res
+
+ # -------------------------------------------------------------------------
+ @property
+ def created_at(self):
+ """Creation date of this forge module."""
+ return self._created_at
+
+ @created_at.setter
+ def created_at(self, value):
+ if value is None:
+ self._created_at = None
+ return
+ if isinstance(value, datetime.datetime):
+ self._created_at = value
+ return
+ v = str(value).strip()
+ if value == '':
+ self._created_at = None
+ return
+ self._created_at = parse_forge_date(v)
+
+ # -------------------------------------------------------------------------
+ @property
+ def deprecated_at(self):
+ """Date of deprecation of this forge module."""
+ return self._deprecated_at
+
+ @deprecated_at.setter
+ def deprecated_at(self, value):
+ if value is None:
+ self._deprecated_at = None
+ return
+ if isinstance(value, datetime.datetime):
+ self._deprecated_at = value
+ return
+ v = str(value).strip()
+ if value == '':
+ self._deprecated_at = None
+ return
+ self._deprecated_at = parse_forge_date(v)
+
+ # -------------------------------------------------------------------------
+ @property
+ def deprecated_for(self):
+ """The reason of deprecation of this forge module."""
+ return self._deprecated_for
+
+ @deprecated_for.setter
+ def deprecated_for(self, value):
+ if value is None:
+ self._deprecated_for = None
+ return
+ v = str(value).strip()
+ if value == '':
+ self._deprecated_for = None
+ return
+ self._deprecated_for = v
+
+ # -------------------------------------------------------------------------
+ @property
+ def downloads(self):
+ """The number of downloads of this module."""
+ return self._downloads
+
+ @downloads.setter
+ def downloads(self, value):
+ if value is None:
+ self._downloads = None
+ return
+ v = int(value)
+ if v < 0:
+ msg = _(
+ "The number of {w!r} must be greater or equal to zero "
+ "(Given: {v}).").format(w='downloads', v=value)
+ raise ValueError(msg)
+ self._downloads = v
+
+ # -------------------------------------------------------------------------
+ @property
+ def endorsement(self):
+ """The endorsement of this forge module.."""
+ return self._endorsement
+
+ @endorsement.setter
+ def endorsement(self, value):
+ if value is None:
+ self._endorsement = None
+ return
+ v = str(value).strip()
+ if value == '':
+ self._endorsement = None
+ return
+ self._endorsement = v
+
+ # -------------------------------------------------------------------------
+ @property
+ def feedback_score(self):
+ """The number of feedback_scores of this module."""
+ return self._feedback_score
+
+ @feedback_score.setter
+ def feedback_score(self, value):
+ if value is None:
+ self._feedback_score = None
+ return
+ v = int(value)
+ if v < 0:
+ msg = _(
+ "The number of {w!r} must be greater or equal to zero "
+ "(Given: {v}).").format(w='feedback_score', v=value)
+ raise ValueError(msg)
+ self._feedback_score = v
+
+ # -------------------------------------------------------------------------
+ @property
+ def homepage_url(self):
+ """The homepage URL of this forge module."""
+ return self._homepage_url
+
+ @homepage_url.setter
+ def homepage_url(self, value):
+ if value is None:
+ self._homepage_url = None
+ return
+ v = str(value).strip()
+ if value == '':
+ self._homepage_url = None
+ return
+ self._homepage_url = v
+
+ # -------------------------------------------------------------------------
+ @property
+ def issues_url(self):
+ """The issues URL of this forge module."""
+ return self._issues_url
+
+ @issues_url.setter
+ def issues_url(self, value):
+ if value is None:
+ self._issues_url = None
+ return
+ v = str(value).strip()
+ if value == '':
+ self._issues_url = None
+ return
+ self._issues_url = v
+
+ # -------------------------------------------------------------------------
+ @property
+ def module_group(self):
+ """The module group of this forge module."""
+ return self._module_group
+
+ @module_group.setter
+ def module_group(self, value):
+ if value is None:
+ self._module_group = None
+ return
+ v = str(value).strip()
+ if value == '':
+ self._module_group = None
+ return
+ self._module_group = v
+
+ # -------------------------------------------------------------------------
+ @property
+ def slug(self):
+ """The slug of this forge module."""
+ return self._slug
+
+ @slug.setter
+ def slug(self, value):
+ if value is None:
+ self._slug = None
+ return
+ v = str(value).strip()
+ if value == '':
+ self._slug = None
+ return
+ self._slug = v
+
+ # -------------------------------------------------------------------------
+ @property
+ def superseded_by(self):
+ """The name of the superseding module this forge module."""
+ return self._superseded_by
+
+ @superseded_by.setter
+ def superseded_by(self, value):
+ if value is None:
+ self._superseded_by = None
+ return
+ if isinstance(value, collections.Mapping):
+ self._superseded_by = copy.copy(value)
+ return
+
+ v = str(value).strip()
+ if value == '':
+ self._superseded_by = None
+ return
+ self._superseded_by = v
+
+ # -------------------------------------------------------------------------
+ @property
+ def supported(self):
+ """Is this forge module supported by Puppetlabs?."""
+ return self._supported
+
+ @supported.setter
+ def supported(self, value):
+ if value is None:
+ self._supported = None
+ return
+ self._supported = to_bool(value)
+
+ # -------------------------------------------------------------------------
+ @property
+ def supported(self):
+ """Is this forge module supported by Puppetlabs?."""
+ return self._supported
+
+ @supported.setter
+ def supported(self, value):
+ if value is None:
+ self._supported = None
+ return
+ self._supported = to_bool(value)
+
+ # -------------------------------------------------------------------------
+ @property
+ def updated_at(self):
+ """Last update date of this forge module."""
+ return self._updated_at
+
+ @updated_at.setter
+ def updated_at(self, value):
+ if value is None:
+ self._updated_at = None
+ return
+ if isinstance(value, datetime.datetime):
+ self._updated_at = value
+ return
+ v = str(value).strip()
+ if value == '':
+ self._updated_at = None
+ return
+ self._updated_at = parse_forge_date(v)
+
+ # -------------------------------------------------------------------------
+ @property
+ def uri(self):
+ """The URI of this forge module."""
+ return self._uri
+
+ @uri.setter
+ def uri(self, value):
+ if value is None:
+ self._uri = None
+ return
+ v = str(value).strip()
+ if value == '':
+ self._uri = None
+ return
+ self._uri = v
+
+ # -------------------------------------------------------------------------
+ def to_data(self):
+ """Returning a dict, which can be used to re-instantiate this module info."""
+
+ res = {}
+
+ res['deprecated_for'] = self.deprecated_for
+ res['downloads'] = self.downloads
+ res['endorsement'] = self.endorsement
+ res['feedback_score'] = self.feedback_score
+ res['homepage_url'] = self.homepage_url
+ res['issues_url'] = self.issues_url
+ res['module_group'] = self.module_group
+ res['name'] = self.name
+ res['slug'] = self.slug
+ res['superseded_by'] = self.superseded_by
+ res['supported'] = self.supported
+ res['uri'] = self.uri
+
+ res['created_at'] = None
+ if self.created_at:
+ res['created_at'] = self.created_at.strftime('%Y-%m-%d %H:%M:%S %z')
+
+ res['deprecated_at'] = None
+ if self.deprecated_at:
+ res['deprecated_at'] = self.deprecated_at.strftime('%Y-%m-%d %H:%M:%S %z')
+
+ res['updated_at'] = None
+ if self.updated_at:
+ res['updated_at'] = self.updated_at.strftime('%Y-%m-%d %H:%M:%S %z')
+
+ res['releases'] = []
+ for release in self.releases:
+ res['releases'].append(release.to_data())
+
+ res['current_release'] = None
+ if self.current_release:
+ res['current_release'] = self.current_release.to_data()
+
+ res['owner'] = None
+ if self.owner:
+ res['owner'] = self.owner.to_data()
+
+ return res
+
+ # -------------------------------------------------------------------------
+ def apply_data(self, data):
+
+ self.releases = ModuleReleaseList(
+ appname=self.appname, verbose=self.verbose, base_dir=self.base_dir)
+
+ for prop_name in (
+ 'created_at', 'deprecated_at', 'deprecated_for', 'downloads', 'endorsement',
+ 'feedback_score', 'homepage_url', 'issues_url', 'module_group', 'slug',
+ 'superseded_by', 'supported', 'updated_at', 'uri'):
+ if prop_name in data and data[prop_name]:
+ setattr(self, prop_name, data[prop_name])
+
+ if 'current_release' in data and data['current_release']:
+ self.current_release = CurrentModuleReleaseInfo.from_data(
+ data['current_release'], appname=self.appname,
+ verbose=self.verbose, base_dir=self.base_dir)
+
+ if 'releases' in data:
+ for rel in data['releases']:
+ release = ModuleReleaseInfo.from_data(
+ rel, appname=self.appname, verbose=self.verbose, base_dir=self.base_dir)
+ if release:
+ self.releases.append(release)
+
+ self.releases.initialized = True
+
+ if 'owner' in data and data['owner']:
+ self.owner = ForgeOwnerInfo.from_data(
+ data['owner'], appname=self.appname, verbose=self.verbose, base_dir=self.base_dir)
+
+ # -------------------------------------------------------------------------
+ @classmethod
+ def from_data(cls, data, appname=None, verbose=0, base_dir=None):
+
+ if verbose > 3:
+ LOG.debug(_(
+ "Trying to instantiate a {}-object from:").format(
+ cls.__name__) + '\n' + pp(data))
+
+ if 'slug' not in data:
+ msg = _("Did not found {!r}-definition in data for forge module:").format(
+ 'slug') + '\n' + pp(data)
+ LOG.error(msg)
+ return None
+
+ full_name = data['slug'].strip()
+ if full_name == '':
+ msg = _("Found empty {!r}-definition in data for forge module:").format(
+ 'slug') + '\n' + pp(data)
+ LOG.error(msg)
+ return None
+
+ module_info = cls(appname=appname, verbose=verbose, base_dir=base_dir, full_name=full_name)
+ module_info.apply_data(data)
+
+ return module_info
+
+ # -------------------------------------------------------------------------
+ @classmethod
+ def get_from_forge(
+ cls, full_name, forge_uri, http_timeout=30,
+ appname=None, verbose=0, base_dir=None):
+
+ url = "{url}/{name}".format(url=forge_uri, name=full_name)
+ module_info = None
+
+ LOG.info(_("Trying to get module {m!r} from Puppet forge {u!r} ...").format(
+ m=full_name, u=url))
+
+ session = requests.Session()
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter("always")
+ response = session.request('GET', url, timeout=http_timeout)
+ if w:
+ warn_class = w[-1].category.__name__
+ warn_msg = '{}: {}'.format(
+ warn_class, w[-1].message)
+ if warn_class == 'SubjectAltNameWarning':
+ LOG.debug(warn_msg)
+ else:
+ LOG.warn(warn_msg)
+
+ LOG.debug("Got status code: {}.".format(response.status_code))
+ if not response.ok:
+ LOG.debug("Did not found module {} on Puppet forge.".format(full_name))
+ return None
+
+ if not response.text:
+ LOG.warn(_("No output for URL {!r}.").format(url))
+ return None
+ if verbose > 3:
+ msg = "Output:\n{}".format(response.text)
+ LOG.debug(msg)
+
+ try:
+ module_info = cls(
+ appname=appname, verbose=verbose, base_dir=base_dir,
+ full_name=full_name,
+ )
+ except ForgeModuleInfoError as e:
+ LOG.warn("{c}: {e}".format(c=e.__class__.__name__, e=e))
+ return None
+
+ data = response.json()
+ if verbose > 4:
+ LOG.debug("Performing forge data:\n" + pp(data))
+ module_info.apply_data(data)
+
+ if module_info.superseded_by:
+ subst = module_info.superseded_by
+ if verbose > 2:
+ LOG.debug("Superseded info:\n" + pp(subst))
+ if 'slug' in subst:
+ subst = subst['slug']
+ LOG.warning(_(
+ "Module {c!r} is deprecated at Puppet forge and should be substituted "
+ "by module {n!r}.").format(c=module_info.slug, n=subst))
+
+ return module_info
+
+
+# =============================================================================
+if __name__ == "__main__":
+
+ pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin
+@summary: A module for encapsulating all information about a Puppet module
+"""
+from __future__ import absolute_import
+
+# Standard modules
+import logging
+import re
+import copy
+import warnings
+import time
+import datetime
+import collections
+
+
+# Own modules
+from fb_tools.common import pp, to_str, to_bool, is_sequence
+
+from ..xlate import XLATOR
+
+from . import parse_forge_date
+from . import ReleaseInfoError, ForgeModuleInfoError, ForgeModuleInfoTypeError
+from . import BaseForgeObject
+
+__version__ = '0.1.0'
+
+LOG = logging.getLogger(__name__)
+
+_ = XLATOR.gettext
+ngettext = XLATOR.ngettext
+
+
+# =============================================================================
+class ModuleReleaseInfo(BaseForgeObject):
+ """Class for encapsulating information about a Puppet module release from Puppet Forge."""
+
+ # -------------------------------------------------------------------------
+ def __init__(
+ self, appname=None, verbose=0, version=__version__, base_dir=None,
+ initialized=None):
+
+ self._created_at = None
+ self._deleted_at = None
+ self._file_size = None
+ self._file_uri = None
+ self._supported = None
+ self._release_version = None
+
+ super(ModuleReleaseInfo, self).__init__(
+ appname=appname, verbose=verbose, version=version, base_dir=base_dir, initialized=False)
+
+ if initialized is not None:
+ self.initialized = initialized
+
+ # -------------------------------------------------------------------------
+ def as_dict(self, short=True):
+ """
+ Transforms the elements of the object into a dict
+
+ @return: structure as dict
+ @rtype: dict
+ """
+
+ res = super(ModuleReleaseInfo, self).as_dict(short=short)
+
+ res['created_at'] = self.created_at
+ res['deleted_at'] = self.deleted_at
+ res['file_size'] = self.file_size
+ res['file_uri'] = self.file_uri
+ res['supported'] = self.supported
+ res['release_version'] = self.release_version
+
+ return res
+
+ # -------------------------------------------------------------------------
+ @property
+ def created_at(self):
+ """Creation date of this release."""
+ return self._created_at
+
+ @created_at.setter
+ def created_at(self, value):
+ if value is None:
+ self._created_at = None
+ return
+ if isinstance(value, datetime.datetime):
+ self._created_at = value
+ return
+ v = str(value).strip()
+ if value == '':
+ self._created_at = None
+ return
+ self._created_at = parse_forge_date(v)
+
+ # -------------------------------------------------------------------------
+ @property
+ def deleted_at(self):
+ """Deletion date of this release."""
+ return self._deleted_at
+
+ @deleted_at.setter
+ def deleted_at(self, value):
+ if value is None:
+ self._deleted_at = None
+ return
+ if isinstance(value, datetime.datetime):
+ self._deleted_at = value
+ return
+ v = str(value).strip()
+ if value == '':
+ self._deleted_at = None
+ return
+ self._deleted_at = parse_forge_date(v)
+
+ # -------------------------------------------------------------------------
+ @property
+ def file_size(self):
+ """The file size in bytes of this release."""
+ return self._file_size
+
+ @file_size.setter
+ def file_size(self, value):
+ if value is None:
+ self._file_size = None
+ return
+ v = int(value)
+ if v < 0:
+ msg = _(
+ "The file size of a release must be greater or equal to zero "
+ "(Given: {}).").format(value)
+ raise ValueError(msg)
+ self._file_size = v
+
+ # -------------------------------------------------------------------------
+ @property
+ def file_uri(self):
+ """The file URI of this release."""
+ return self._file_uri
+
+ @file_uri.setter
+ def file_uri(self, value):
+ if value is None:
+ self._file_uri = None
+ return
+ v = str(value).strip()
+ if value == '':
+ self._file_uri = None
+ return
+ self._file_uri = v
+
+ # -------------------------------------------------------------------------
+ @property
+ def release_version(self):
+ """The version number of this release."""
+ return self._release_version
+
+ @release_version.setter
+ def release_version(self, value):
+ if value is None:
+ self._release_version = None
+ return
+ v = str(value).strip()
+ if value == '':
+ self._release_version = None
+ return
+ self._release_version = v
+
+ # -------------------------------------------------------------------------
+ @property
+ def supported(self):
+ """The URI of this release."""
+ return self._supported
+
+ @supported.setter
+ def supported(self, value):
+ if value is None:
+ self._supported = None
+ return
+ self._supported = to_bool(value)
+
+ # -------------------------------------------------------------------------
+ def to_data(self):
+ """Returning a dict, which can be used to re-instantiate this module info."""
+
+ res = super(ModuleReleaseInfo, self).to_data()
+
+ res['created_at'] = None
+ if self.created_at:
+ res['created_at'] = self.created_at.strftime('%Y-%m-%d %H:%M:%S %z')
+
+ res['deleted_at'] = None
+ if self.deleted_at:
+ res['deleted_at'] = self.deleted_at.strftime('%Y-%m-%d %H:%M:%S %z')
+
+ res['file_size'] = self.file_size
+ res['file_uri'] = self.file_uri
+ res['supported'] = self.supported
+ res['version'] = self.release_version
+
+ return res
+
+ # -------------------------------------------------------------------------
+ def copy_to(self, new):
+
+ if not isinstance(new, ModuleReleaseInfo):
+ msg = _("Parameter {p!r} is not of class {c!r}, but of {e!r} instead.").format(
+ p='new', c='ModuleReleaseInfo', e=new.__class__.__name__)
+ raise TypeError(msg)
+
+ super(ModuleReleaseInfo, self).copy_to(new)
+ new.created_at = self.created_at
+ new.deleted_at = self.deleted_at
+ new.file_size = self.file_size
+ new.file_uri = self.file_uri
+ new.supported = self.supported
+ new.release_version = self.release_version
+
+ # -------------------------------------------------------------------------
+ def __eq__(self, other):
+
+ if self.verbose > 4:
+ LOG.debug(_("Comparing {} objects ...").format(self.__class__.__name__))
+
+ if not super(ModuleReleaseInfo, self).__eq__(other):
+ return False
+
+ if not isinstance(other, ModuleReleaseInfo):
+ return False
+
+ if self.created_at != other.created_at:
+ return False
+ if self.deleted_at != other.deleted_at:
+ return False
+ if self.file_size != other.file_size:
+ return False
+ if self.file_uri != other.file_uri:
+ return False
+ if self.supported != other.supported:
+ return False
+ if self.release_version != other.release_version:
+ return False
+
+ return True
+
+ # -------------------------------------------------------------------------
+ def apply_data(self, data):
+
+ super(ModuleReleaseInfo, self).apply_data(data)
+
+ if 'created_at' in data and data['created_at']:
+ self.created_at = data['created_at']
+ if 'deleted_at' in data and data['deleted_at']:
+ self.deleted_at = data['deleted_at']
+ if 'file_size' in data and data['file_size']:
+ self.file_size = data['file_size']
+ if 'file_uri' in data and data['file_uri']:
+ self.file_uri = data['file_uri']
+ if 'slug' in data and data['slug']:
+ self.slug = data['slug']
+ if 'supported' in data and data['supported']:
+ self.supported = data['supported']
+ if 'uri' in data and data['uri']:
+ self.uri = data['uri']
+ if 'version' in data and data['version']:
+ self.release_version = data['version']
+
+
+# =============================================================================
+if __name__ == "__main__":
+
+ pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin
+@summary: A module for encapsulating all information about a Puppet module
+"""
+from __future__ import absolute_import
+
+# Standard modules
+import logging
+import re
+import copy
+import warnings
+import time
+import datetime
+import collections
+
+# Own modules
+from fb_tools.common import pp, to_str, to_bool, is_sequence
+from fb_tools.obj import FbBaseObjectError, FbBaseObject
+
+from ..xlate import XLATOR
+
+from ..base_module_info import BaseModuleInfoError, BaseModuleInfo
+
+from .mod_release_info import ModuleReleaseInfo
+
+__version__ = '0.1.0'
+
+LOG = logging.getLogger(__name__)
+
+_ = XLATOR.gettext
+ngettext = XLATOR.ngettext
+
+
+# =============================================================================
+class ModuleReleaseList(collections.MutableSequence, FbBaseObject):
+
+ msg_no_release = _("Invalid type {t!r} as an item of a {c}, only {o} objects are allowed.")
+
+ # -------------------------------------------------------------------------
+ def __init__(
+ self, appname=None, verbose=0, version=__version__, base_dir=None,
+ initialized=None, *releases):
+
+ self._list = []
+
+ super(ModuleReleaseList, self).__init__(
+ appname=appname, verbose=verbose, version=version, base_dir=base_dir, initialized=False)
+
+ for release in releases:
+ self.append(release)
+
+ # -------------------------------------------------------------------------
+ def as_dict(self, short=True):
+
+ res = super(ModuleReleaseList, self).as_dict(short=short)
+
+ res['list'] = []
+ for release in self:
+ res['list'].append(release.as_dict(short=short))
+
+ return res
+
+ # -------------------------------------------------------------------------
+ def to_data(self):
+ """Returning a list, which can be used to re-instantiate this module info."""
+
+ res = []
+ for release in self:
+ res.append(release.to_data())
+
+ return res
+
+ # -------------------------------------------------------------------------
+ def index(self, release, *args):
+
+ i = None
+ j = None
+
+ if len(args) > 0:
+ if len(args) > 2:
+ raise TypeError(_("{m} takes at most {max} arguments ({n} given).").format(
+ m='index()', max=3, n=len(args) + 1))
+ i = int(args[0])
+ if len(args) > 1:
+ j = int(args[1])
+
+ index = 0
+ if i is not None:
+ start = i
+ if i < 0:
+ start = len(self._list) + i
+ wrap = False
+ end = len(self._list)
+ if j is not None:
+ if j < 0:
+ end = len(self._list) + j
+ if end < index:
+ wrap = True
+ else:
+ end = j
+ for index in list(range(len(self._list))):
+ item = self._list[index]
+ if index < start:
+ continue
+ if index >= end and not wrap:
+ break
+ if item == release:
+ return index
+
+ if wrap:
+ for index in list(range(len(self._list))):
+ item = self._list[index]
+ if index >= end:
+ break
+ if item == release:
+ return index
+
+ msg = _("Release {!r} is not in release list.").format(release.release_version)
+ raise ValueError(msg)
+
+ # -------------------------------------------------------------------------
+ def __contains__(self, release):
+
+ if not isinstance(release, ModuleReleaseInfo):
+ raise TypeError(self.msg_no_release.format(
+ t=release.__class__.__name__, c=self.__class__.__name__, o='ModuleReleaseInfo'))
+
+ if not self._list:
+ return False
+
+ for item in self._list:
+ if item == release:
+ return True
+
+ return False
+
+ # -------------------------------------------------------------------------
+ def count(self, release):
+
+ if not isinstance(release, ModuleReleaseInfo):
+ raise TypeError(self.msg_no_release.format(
+ t=release.__class__.__name__, c=self.__class__.__name__, o='ModuleReleaseInfo'))
+
+ if not self._list:
+ return 0
+
+ num = 0
+ for item in self._list:
+ if item == release:
+ num += 1
+ return num
+
+ # -------------------------------------------------------------------------
+ def __len__(self):
+ return len(self._list)
+
+ # -------------------------------------------------------------------------
+ def __getitem__(self, key):
+ return self._list.__getitem__(key)
+
+ # -------------------------------------------------------------------------
+ def __reversed__(self):
+
+ return reversed(self._list)
+
+ # -------------------------------------------------------------------------
+ def __setitem__(self, key, release):
+
+ if not isinstance(release, ModuleReleaseInfo):
+ raise TypeError(self.msg_no_release.format(
+ t=release.__class__.__name__, c=self.__class__.__name__, o='ModuleReleaseInfo'))
+
+ self._list.__setitem__(key, release)
+
+ # -------------------------------------------------------------------------
+ def __delitem__(self, key):
+
+ del self._list[key]
+
+ # -------------------------------------------------------------------------
+ def append(self, release):
+
+ if not isinstance(release, ModuleReleaseInfo):
+ raise TypeError(self.msg_no_release.format(
+ t=release.__class__.__name__, c=self.__class__.__name__, o='ModuleReleaseInfo'))
+
+ self._list.append(release)
+
+ # -------------------------------------------------------------------------
+ def insert(self, index, release):
+
+ if not isinstance(release, ModuleReleaseInfo):
+ raise TypeError(self.msg_no_release.format(
+ t=release.__class__.__name__, c=self.__class__.__name__, o='ModuleReleaseInfo'))
+
+ self._list.insert(index, release)
+
+ # -------------------------------------------------------------------------
+ def __copy__(self):
+
+ new_list = self.__class__()
+ for release in self._list:
+ new_list.append(copy.copy(release))
+ return new_list
+
+
+# =============================================================================
+if __name__ == "__main__":
+
+ pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin
+@summary: A module for encapsulating all information about a Puppet module
+"""
+from __future__ import absolute_import
+
+# Standard modules
+import logging
+import re
+import copy
+import warnings
+import time
+import datetime
+import collections
+
+
+# Own modules
+from fb_tools.common import pp, to_str, to_bool, is_sequence
+
+from ..xlate import XLATOR
+
+from . import parse_forge_date
+from . import ReleaseInfoError, ForgeModuleInfoError, ForgeModuleInfoTypeError
+from . import BaseForgeObject
+
+__version__ = '0.2.0'
+
+LOG = logging.getLogger(__name__)
+
+_ = XLATOR.gettext
+ngettext = XLATOR.ngettext
+
+
+# =============================================================================
+class ForgeOwnerInfo(BaseForgeObject):
+ """Class for encapsulating information about an module owner in Puppet forge."""
+
+ # -------------------------------------------------------------------------
+ def __init__(
+ self, gravatar_id=None, slug=None, uri=None, username=None,
+ appname=None, verbose=0, version=__version__, base_dir=None, initialized=None):
+
+ self._gravatar_id = None
+ self._username = None
+
+ super(ForgeOwnerInfo, self).__init__(
+ slug=slug, uri=uri, appname=appname, verbose=verbose, version=version,
+ base_dir=base_dir, initialized=False)
+
+ self.gravatar_id = gravatar_id
+ self.username = username
+
+ if initialized is not None:
+ self.initialized = initialized
+
+ # -------------------------------------------------------------------------
+ def as_dict(self, short=True):
+ """
+ Transforms the elements of the object into a dict
+
+ @return: structure as dict
+ @rtype: dict
+ """
+
+ res = super(ForgeOwnerInfo, self).as_dict(short=short)
+
+ res['gravatar_id'] = self.gravatar_id
+ res['username'] = self.username
+
+ return res
+
+ # -------------------------------------------------------------------------
+ @property
+ def gravatar_id(self):
+ """The Gravatar-Id of this owner."""
+ return self._gravatar_id
+
+ @gravatar_id.setter
+ def gravatar_id(self, value):
+ if value is None:
+ self._gravatar_id = None
+ return
+ v = str(value).strip()
+ if value == '':
+ self._gravatar_id = None
+ return
+ self._gravatar_id = v
+
+ # -------------------------------------------------------------------------
+ @property
+ def username(self):
+ """The username of this owner."""
+ return self._username
+
+ @username.setter
+ def username(self, value):
+ if value is None:
+ self._username = None
+ return
+ v = str(value).strip()
+ if value == '':
+ self._username = None
+ return
+ self._username = v
+
+ # -------------------------------------------------------------------------
+ def to_data(self):
+ """Returning a dict, which can be used to re-instantiate this owner info."""
+
+ res = super(ForgeOwnerInfo, self).to_data()
+ res['gravatar_id'] = self.gravatar_id
+ res['username'] = self.username
+
+ return res
+
+ # -------------------------------------------------------------------------
+ def copy_to(self, new):
+
+ if not isinstance(new, ForgeOwnerInfo):
+ msg = _("Parameter {p!r} is not of class {c!r}, but of {e!r} instead.").format(
+ p='new', c='ForgeOwnerInfo', e=new.__class__.__name__)
+ raise TypeError(msg)
+
+ super(ForgeOwnerInfo, self).copy_to(new)
+ new.gravatar_id = self.gravatar_id
+ new.username = self.username
+
+ # -------------------------------------------------------------------------
+ def __eq__(self, other):
+
+ if self.verbose > 4:
+ LOG.debug(_("Comparing {} objects ...").format(self.__class__.__name__))
+
+ if not super(ForgeOwnerInfo, self).__eq__(other):
+ return False
+
+ if not isinstance(other, ForgeOwnerInfo):
+ return False
+
+ if self.gravatar_id != other.gravatar_id:
+ return False
+ if self.username != other.username:
+ return False
+
+ return True
+
+ # -------------------------------------------------------------------------
+ def apply_data(self, data):
+
+ super(ForgeOwnerInfo, self).apply_data(data)
+
+ if 'gravatar_id' in data:
+ self.gravatar_id = data['gravatar_id']
+ if 'username' in data:
+ self.username = data['username']
+
+
+# =============================================================================
+if __name__ == "__main__":
+
+ pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list
from .base_app import BaseHookError, BaseHookApp
-from .forge_module_info import ForgeModuleInfo, ForgeModulesList
+#from .forge_module_info import ForgeModuleInfo, ForgeModulesList
from .module_info import ModuleInfo
from .puppetfile import Puppetfile, PuppetfileError
+from .forge.mod_info import ForgeModuleInfo
+from .forge.mod_dict import ForgeModuleDict
+
from .xlate import XLATOR
LOG = logging.getLogger(__name__)
appname=appname, base_dir=base_dir, verbose=verbose,
version=version, description=description)
- self._init_forge_module_list()
+ self._init_forge_module_dict()
# -----------------------------------------------------------
@property
)
# -------------------------------------------------------------------------
- def _init_forge_module_list(self):
+ def _init_forge_module_dict(self):
- self.forge_modules = ForgeModulesList(
+ self.forge_modules = ForgeModuleDict(
appname=self.appname, verbose=self.verbose, base_dir=self.base_dir)
# -------------------------------------------------------------------------
self.print_out(msg)
modules_done = []
- self._init_forge_module_list()
+ self._init_forge_module_dict()
for env in self.environments:
print('.', end='', flush=True)
else:
print('~', end='', flush=True)
+
+ if self.verbose > 1:
+ LOG.debug(_(
+ "Got a {}:").format('ForgeModuleDict') + '\n' + pp(self.forge_modules.keys()))
if not self.verbose:
print()