From: Frank Brehm Date: Wed, 27 Feb 2019 15:08:43 +0000 (+0100) Subject: Splitting lib/webhooks/forge_module_info.py in different modules X-Git-Tag: 1.6.4^2~39 X-Git-Url: https://git.uhu-banane.net/?a=commitdiff_plain;h=b956347606de0ab353e5d98577af7a5bef1660c3;p=pixelpark%2Fpuppetmaster-webhooks.git Splitting lib/webhooks/forge_module_info.py in different modules --- diff --git a/lib/webhooks/forge/__init__.py b/lib/webhooks/forge/__init__.py new file mode 100644 index 0000000..364c3ec --- /dev/null +++ b/lib/webhooks/forge/__init__.py @@ -0,0 +1,213 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +@author: Frank Brehm +@contact: frank.brehm@pixelpark.com +@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin +@summary: A module for encapsulating all information about a Puppet module +""" +from __future__ import absolute_import + +# Standard modules +import logging +import re +import copy +import warnings +import time +import datetime +import collections + +# Own modules +from fb_tools.common import pp, to_str, to_bool, is_sequence +from fb_tools.obj import FbBaseObjectError, FbBaseObject + +from ..xlate import XLATOR + +from ..base_module_info import BaseModuleInfoError, BaseModuleInfo + +from ..module_meta_info import ModuleMetadata + +__version__ = '0.1.0' + +LOG = logging.getLogger(__name__) + +_ = XLATOR.gettext +ngettext = XLATOR.ngettext + + +# ============================================================================= +def parse_forge_date(dt): + + return datetime.datetime.strptime(dt, '%Y-%m-%d %H:%M:%S %z') + + +# ============================================================================= +class ReleaseInfoError(BaseModuleInfoError): + + pass + + +# ============================================================================= +class ForgeModuleInfoError(BaseModuleInfoError): + + pass + + +# ============================================================================= +class ForgeModuleInfoTypeError(ForgeModuleInfoError, TypeError): + + pass + + +# ============================================================================= +class BaseForgeObject(FbBaseObject): + """Base class for the most forge classes.""" + + # ------------------------------------------------------------------------- + def __init__( + self, slug=None, uri=None, + appname=None, verbose=0, version=__version__, base_dir=None, initialized=None): + + self._slug = None + self._uri = None + + super(BaseForgeObject, self).__init__( + appname=appname, verbose=verbose, version=version, base_dir=base_dir, initialized=False) + + self.slug = slug + self.uri = uri + + if initialized is not None: + self.initialized = initialized + + # ------------------------------------------------------------------------- + def as_dict(self, short=True): + """ + Transforms the elements of the object into a dict + + @return: structure as dict + @rtype: dict + """ + + res = super(BaseForgeObject, self).as_dict(short=short) + + res['slug'] = self.slug + res['uri'] = self.uri + + return res + + # ------------------------------------------------------------------------- + @property + def slug(self): + """The slug of this owner.""" + return self._slug + + @slug.setter + def slug(self, value): + if value is None: + self._slug = None + return + v = str(value).strip() + if value == '': + self._slug = None + return + self._slug = v + + # ------------------------------------------------------------------------- + @property + def uri(self): + """The URI of this owner.""" + return self._uri + + @uri.setter + def uri(self, value): + if value is None: + self._uri = None + return + v = str(value).strip() + if value == '': + self._uri = None + return + self._uri = v + + # ------------------------------------------------------------------------- + def to_data(self): + """Returning a dict, which can be used to re-instantiate this owner info.""" + + res = { + 'slug': self.slug, + 'uri': self.uri, + } + + return res + + # ------------------------------------------------------------------------- + def __copy__(self): + + new = self.__class__( + appname=self.appname, verbose=self.verbose, base_dir=self.base_dir) + + self.copy_to(new) + new.initialized = self.initialized + + return new + + # ------------------------------------------------------------------------- + def copy_to(self, new): + + if not isinstance(new, BaseForgeObject): + msg = _("Parameter {p!r} is not of class {c!r}, but of {e!r} instead.").format( + p='new', c='BaseForgeObject', e=new.__class__.__name__) + raise TypeError(msg) + + new.slug = self.slug + new.uri = self.uri + + # ------------------------------------------------------------------------- + def __eq__(self, other): + + if self.verbose > 4: + LOG.debug(_("Comparing {} objects ...").format(self.__class__.__name__)) + + if not isinstance(other, BaseForgeObject): + return False + + if self.slug != other.slug: + return False + if self.uri != other.uri: + return False + + return True + + # ------------------------------------------------------------------------- + @classmethod + def from_data(cls, data, appname=None, verbose=0, base_dir=None): + + owner = cls(appname=appname, verbose=verbose, base_dir=base_dir) + + owner.apply_data(data) + + owner.initialized = True + + if verbose > 3: + LOG.debug(_("Got {}:").format(cls.__name__) + '\n' + pp(owner.as_dict())) + + return owner + + # ------------------------------------------------------------------------- + def apply_data(self, data): + + if 'gravatar_id' in data: + self.gravatar_id = data['gravatar_id'] + if 'username' in data: + self.username = data['username'] + + +# ============================================================================= +if __name__ == "__main__": + + pass + +# ============================================================================= + +# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list diff --git a/lib/webhooks/forge/base_module_info.py b/lib/webhooks/forge/base_module_info.py new file mode 100644 index 0000000..5f81f78 --- /dev/null +++ b/lib/webhooks/forge/base_module_info.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +@author: Frank Brehm +@contact: frank.brehm@pixelpark.com +@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin +@summary: A module for encapsulating all information about a Puppet module +""" +from __future__ import absolute_import + +# Standard modules +import logging +import re +import copy +import warnings +import time +import datetime +import collections + + +# Own modules +from fb_tools.common import pp, to_str, to_bool, is_sequence + +from ..xlate import XLATOR + +from . import parse_forge_date +from . import ReleaseInfoError, ForgeModuleInfoError, ForgeModuleInfoTypeError +from . import BaseForgeObject + +from .owner_info import ForgeOwnerInfo + +__version__ = '0.1.0' + +LOG = logging.getLogger(__name__) + +_ = XLATOR.gettext +ngettext = XLATOR.ngettext + + +# ============================================================================= +class BaseForgeModuleInfo(BaseForgeObject): + + # ------------------------------------------------------------------------- + def __init__( + self, appname=None, verbose=0, version=__version__, base_dir=None, + initialized=None): + + self._deprecated_at = None + self._name = None + self.owner = None + + super(BaseForgeModuleInfo, self).__init__( + appname=appname, verbose=verbose, version=version, base_dir=base_dir, initialized=False) + + if initialized is not None: + self.initialized = initialized + + # ------------------------------------------------------------------------- + def as_dict(self, short=True): + """ + Transforms the elements of the object into a dict + + @return: structure as dict + @rtype: dict + """ + + res = super(BaseForgeModuleInfo, self).as_dict(short=short) + + res['deprecated_at'] = self.deprecated_at + res['name'] = self.name + + return res + + # ------------------------------------------------------------------------- + @property + def deprecated_at(self): + """Date of deprecation of this module.""" + return self._deprecated_at + + @deprecated_at.setter + def deprecated_at(self, value): + if value is None: + self._deprecated_at = None + return + if isinstance(value, datetime.datetime): + self._deprecated_at = value + return + v = str(value).strip() + if value == '': + self._deprecated_at = None + return + self._deprecated_at = parse_forge_date(v) + + # ------------------------------------------------------------------------- + @property + def name(self): + """The name of this module.""" + return self._name + + @name.setter + def name(self, value): + if value is None: + self._name = None + return + v = str(value).strip() + if value == '': + self._name = None + return + self._name = v + + # ------------------------------------------------------------------------- + def to_data(self): + """Returning a dict, which can be used to re-instantiate this module info.""" + + res = super(BaseForgeModuleInfo, self).to_data() + + res['deprecated_at'] = None + if self.deprecated_at: + res['deprecated_at'] = self.deprecated_at.strftime('%Y-%m-%d %H:%M:%S %z') + + res['owner'] = None + if self.owner: + res['owner'] = self.owner.to_data() + + res['name'] = self.name + + return res + + # ------------------------------------------------------------------------- + def copy_to(self, new): + + if not isinstance(new, BaseForgeModuleInfo): + msg = _("Parameter {p!r} is not of class {c!r}, but of {e!r} instead.").format( + p='new', c='BaseForgeModuleInfo', e=new.__class__.__name__) + raise TypeError(msg) + + super(BaseForgeModuleInfo, self).copy_to(new) + new.deprecated_at = self.deprecated_at + new.name = self.name + if self.owner: + new.owner = copy.copy(self.owner) + + # ------------------------------------------------------------------------- + def __eq__(self, other): + + if self.verbose > 4: + LOG.debug(_("Comparing {} objects ...").format(self.__class__.__name__)) + + if not super(BaseForgeModuleInfo, self).__eq__(other): + return False + + if not isinstance(other, BaseForgeModuleInfo): + return False + + if self.deprecated_at != other.deprecated_at: + return False + if self.name != other.name: + return False + if self.owner != other.owner: + return False + + return True + + # ------------------------------------------------------------------------- + def apply_data(self, data): + + super(BaseForgeModuleInfo, self).apply_data(data) + + if 'deprecated_at' in data: + self.deprecated_at = data['deprecated_at'] + if 'name' in data: + self.name = data['name'] + + if 'owner' in data and data['owner']: + self.owner = ForgeOwnerInfo.from_data( + data['owner'], appname=self.appname, + verbose=self.verbose, base_dir=self.base_dir) + + +# ============================================================================= +if __name__ == "__main__": + + pass + +# ============================================================================= + +# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list diff --git a/lib/webhooks/forge/cur_mod_release_info.py b/lib/webhooks/forge/cur_mod_release_info.py new file mode 100644 index 0000000..c8b8e73 --- /dev/null +++ b/lib/webhooks/forge/cur_mod_release_info.py @@ -0,0 +1,456 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +@author: Frank Brehm +@contact: frank.brehm@pixelpark.com +@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin +@summary: A module for encapsulating all information about a Puppet module +""" +from __future__ import absolute_import + +# Standard modules +import logging +import re +import copy +import warnings +import time +import datetime +import collections + + +# Own modules +from fb_tools.common import pp, to_str, to_bool, is_sequence + +from ..xlate import XLATOR + +from . import parse_forge_date +from . import ReleaseInfoError, ForgeModuleInfoError, ForgeModuleInfoTypeError +from . import BaseForgeObject + +from ..module_meta_info import ModuleMetadata + +from .owner_info import ForgeOwnerInfo + +from .mod_release_info import ModuleReleaseInfo + +from .base_module_info import BaseForgeModuleInfo + +__version__ = '0.1.0' + +LOG = logging.getLogger(__name__) + +_ = XLATOR.gettext +ngettext = XLATOR.ngettext + + +# ============================================================================= +class CurrentModuleReleaseInfo(ModuleReleaseInfo): + """Class for encapsulating information about the current release of a Puppet module + from Puppet Forge.""" + + # ------------------------------------------------------------------------- + def __init__( + self, appname=None, verbose=0, version=__version__, base_dir=None, + initialized=None): + + self._changelog = None + self._deleted_for = None + self._downloads = None + self._file_md5 = None + self._license = None + self.metadata = None + self.module = None + self._pdk = False + self._readme = None + self._reference = None + self.tags = [] + self.tasks = [] + self._updated_at = None + self._validation_score = None + + super(CurrentModuleReleaseInfo, self).__init__( + appname=appname, verbose=verbose, version=version, base_dir=base_dir, initialized=False) + + if initialized is not None: + self.initialized = initialized + + # ------------------------------------------------------------------------- + def as_dict(self, short=True): + """ + Transforms the elements of the object into a dict + + @return: structure as dict + @rtype: dict + """ + + res = super(CurrentModuleReleaseInfo, self).as_dict(short=short) + + res['changelog'] = self.changelog + res['deleted_for'] = self.deleted_for + res['downloads'] = self.downloads + res['file_md5'] = self.file_md5 + res['license'] = self.license + res['pdk'] = self.pdk + res['readme'] = self.readme + res['reference'] = self.reference + res['updated_at'] = self.updated_at + res['validation_score'] = self.validation_score + + return res + + # ------------------------------------------------------------------------- + @property + def changelog(self): + """The change log of this module release.""" + return self._changelog + + @changelog.setter + def changelog(self, value): + if value is None: + self._changelog = None + return + v = str(value).strip() + if v == '': + self._changelog = None + return + self._changelog = v + + # ------------------------------------------------------------------------- + @property + def deleted_for(self): + """The reason, why this module release was deleted.""" + return self._deleted_for + + @deleted_for.setter + def deleted_for(self, value): + if value is None: + self._deleted_for = None + return + v = str(value).strip() + if v == '': + self._deleted_for = None + return + self._deleted_for = v + + # ------------------------------------------------------------------------- + @property + def downloads(self): + "The number of downloads of this module release.""" + return self._downloads + + @downloads.setter + def downloads(self, value): + if value is None: + self._downloads = None + return + try: + v = int(value) + self._downloads = v + return + except Exception as e: + LOG.error(_("Got a {c} setting {w}: {e}").format( + c=e.__class__.__name__, w='downloads', e=e)) + + # ------------------------------------------------------------------------- + @property + def file_md5(self): + """The MD5-sum of the current release package.""" + return self._file_md5 + + @file_md5.setter + def file_md5(self, value): + if value is None: + self._file_md5 = None + return + v = str(value).strip() + if v == '': + self._file_md5 = None + return + self._file_md5 = v + + # ------------------------------------------------------------------------- + @property + def license(self): + """The license of module source.""" + return self._license + + @license.setter + def license(self, value): + if value is None: + self._license = None + return + v = str(value).strip() + if v == '': + self._license = None + return + self._license = v + + # ------------------------------------------------------------------------- + @property + def pdk(self): + """The pdk of this release.""" + return self._pdk + + @pdk.setter + def pdk(self, value): + if value is None: + self._pdk = None + return + self._pdk = to_bool(value) + + # ------------------------------------------------------------------------- + @property + def readme(self): + """The readme of module release.""" + return self._readme + + @readme.setter + def readme(self, value): + if value is None: + self._readme = None + return + v = str(value).strip() + if v == '': + self._readme = None + return + self._readme = v + + # ------------------------------------------------------------------------- + @property + def reference(self): + """The reference of module release.""" + return self._reference + + @reference.setter + def reference(self, value): + if value is None: + self._reference = None + return + v = str(value).strip() + if v == '': + self._reference = None + return + self._reference = v + + # ------------------------------------------------------------------------- + @property + def updated_at(self): + """Update date of this release.""" + return self._updated_at + + @updated_at.setter + def updated_at(self, value): + if value is None: + self._updated_at = None + return + if isinstance(value, datetime.datetime): + self._updated_at = value + return + v = str(value).strip() + if value == '': + self._updated_at = None + return + self._updated_at = parse_forge_date(v) + + # ------------------------------------------------------------------------- + @property + def validation_score(self): + "The validation score of this module release.""" + return self._validation_score + + @validation_score.setter + def validation_score(self, value): + if value is None: + self._validation_score = None + return + try: + v = int(value) + self._validation_score = v + return + except Exception as e: + LOG.error(_("Got a {c} setting {w}: {e}").format( + c=e.__class__.__name__, w='validation_score', e=e)) + + # ------------------------------------------------------------------------- + def to_data(self): + """Returning a dict, which can be used to re-instantiate this module info.""" + + res = super(CurrentModuleReleaseInfo, self).to_data() + + res['changelog'] = self.changelog + res['deleted_for'] = self.deleted_for + res['downloads'] = self.downloads + res['file_md5'] = self.file_md5 + res['license'] = self.license + res['pdk'] = self.pdk + res['readme'] = self.readme + res['reference'] = self.reference + res['validation_score'] = self.validation_score + + res['metadata'] = None + if self.metadata: + res['metadata'] = self.metadata.to_data() + + res['module'] = None + if self.module: + res['module'] = self.module.to_data() + + res['updated_at'] = None + if self.updated_at: + res['updated_at'] = self.updated_at.strftime('%Y-%m-%d %H:%M:%S %z') + + res['tags'] = [] + for tag in self.tags: + res['tags'].append(tag) + + res['tasks'] = [] + for task in self.tasks: + res['tasks'].append(copy.copy(task)) + + return res + + # ------------------------------------------------------------------------- + def copy_to(self, new): + + if not isinstance(new, CurrentModuleReleaseInfo): + msg = _("Parameter {p!r} is not of class {c!r}, but of {e!r} instead.").format( + p='new', c='CurrentModuleReleaseInfo', e=new.__class__.__name__) + raise TypeError(msg) + + super(CurrentModuleReleaseInfo, self).copy_to(new) + + new.changelog = self.changelog + new.deleted_for = self.deleted_for + new.downloads = self.downloads + new.file_md5 = self.file_md5 + new.license = self.license + new.pdk = self.pdk + new.readme = self.readme + new.reference = self.reference + new.updated_at = self.updated_at + new.validation_score = self.validation_score + + new.metadata = None + if self.metadata: + new.metadata = copy.copy(self.metadata) + + new.module = None + if self.module: + new.module = copy.copy(self.module) + + new.tags = [] + if self.tags: + for tag in self.tags: + new.tags.append(tag) + + new.tasks = [] + if self.tasks: + for task in self.tasks: + new.tasks.append(copy.copy(task)) + + # ------------------------------------------------------------------------- + def __eq__(self, other): + + if self.verbose > 4: + LOG.debug(_("Comparing {} objects ...").format(self.__class__.__name__)) + + if not super(CurrentModuleReleaseInfo, self).__eq__(other): + return False + + if not isinstance(other, CurrentModuleReleaseInfo): + return False + + if self.changelog != other.changelog: + return False + if self.deleted_for != other.deleted_for: + return False + if self.downloads != other.downloads: + return False + if self.file_md5 != other.file_md5: + return False + if self.license != other.license: + return False + if self.pdk != other.pdk: + return False + if self.readme != other.readme: + return False + if self.reference != other.reference: + return False + if self.updated_at != other.updated_at: + return False + if self.validation_score != other.validation_score: + return False + if self.metadata != other.metadata: + return False + if self.module != other.module: + return False + if self.tags != other.tags: + return False + if self.tasks != other.tasks: + return False + + return True + + # ------------------------------------------------------------------------- + def apply_data(self, data): + + super(CurrentModuleReleaseInfo, self).apply_data(data) + + if 'changelog' in data and data['changelog']: + self.changelog = data['changelog'] + if 'deleted_for' in data and data['deleted_for']: + self.deleted_for = data['deleted_for'] + if 'downloads' in data and data['downloads']: + self.downloads = data['downloads'] + if 'file_md5' in data and data['file_md5']: + self.file_md5 = data['file_md5'] + if 'license' in data and data['license']: + self.license = data['license'] + + if 'metadata' in data and data['metadata']: + self.metadata = ModuleMetadata.from_json_data( + data['metadata'], appname=self.appname, + verbose=self.verbose, base_dir=self.base_dir) + + if 'module' in data and data['module']: + self.module = BaseForgeModuleInfo.from_data( + data['module'], appname=self.appname, + verbose=self.verbose, base_dir=self.base_dir) + + if 'pdk' in data: + self.pdk = data['pdk'] + if 'readme' in data: + self.readme = data['readme'] + if 'reference' in data: + self.reference = data['reference'] + if 'updated_at' in data: + self.reference = data['updated_at'] + if 'validation_score' in data: + self.validation_score = data['validation_score'] + + self.tags = [] + if 'tags' in data and data['tags']: + if self.verbose > 2: + LOG.debug( + "Tags in current release of {!r}:".format(self.slug) + '\n' + pp(data['tags'])) + for tag in data['tags']: + self.tags.append(tag) + + self.tasks = [] + if 'tasks' in data and data['tasks']: + if self.verbose > 3: + LOG.debug( + "Tasks in current release of {!r}:".format(self.slug) + + '\n' + pp(data['tasks'])) + for task in data['tasks']: + self.tasks.append(copy.copy(task)) + + +# ============================================================================= +if __name__ == "__main__": + + pass + +# ============================================================================= + +# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list diff --git a/lib/webhooks/forge/mod_dict.py b/lib/webhooks/forge/mod_dict.py new file mode 100644 index 0000000..daec854 --- /dev/null +++ b/lib/webhooks/forge/mod_dict.py @@ -0,0 +1,346 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +@author: Frank Brehm +@contact: frank.brehm@pixelpark.com +@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin +@summary: A module for encapsulating all information about a Puppet module +""" +from __future__ import absolute_import + +# Standard modules +import logging + +from collections import MutableMapping + +from functools import cmp_to_key + +# Third party modules + +# Own modules +from fb_tools.common import to_bool +from fb_tools.obj import FbBaseObject + +from .mod_info import ForgeModuleInfo + +from ..xlate import XLATOR + +__version__ = '0.1.0' + +LOG = logging.getLogger(__name__) + +_ = XLATOR.gettext +ngettext = XLATOR.ngettext + + +# ============================================================================= +class ForgeModuleDict(MutableMapping, FbBaseObject): + """ + A dictionary containing ForgeModuleInfo objects. + It works like a dict. + i.e.: + modules = ForgeModuleDict(ForgeModuleInfo(full_name='puppet-mongodb', ...)) + and + modules['puppet-mongodb'] returns a ForgeModuleInfo object for puppet module 'puppet-mongodb' + """ + + msg_invalid_modinfo_type = _("Invalid value type {{!r}} to set, only {} allowed.").format( + 'ForgeModuleInfo') + msg_key_not_name = _("The key {k!r} must be equal to the full name {n!r} of the module.") + msg_none_type_error = _("None type as key is not allowed.") + msg_empty_key_error = _("Empty key {!r} is not allowed.") + msg_no_modinfo_dict = _("Object {{!r}} is not a {} object.").format('ForgeModuleDict') + + + # ------------------------------------------------------------------------- + # __init__() method required to create instance from class. + def __init__( + self, appname=None, verbose=0, version=__version__, base_dir=None, + sort_by_name=False, *args, **kwargs): + + self._map = dict() + self._sort_by_name = False + + super(ForgeModuleDict, self).__init__( + appname=appname, verbose=verbose, version=version, + base_dir=base_dir, initialized=False, + ) + + self.sort_by_name = sort_by_name + + for arg in args: + self.append(arg) + + # ----------------------------------------------------------- + @property + def sort_by_name(self): + """Sorting modules by name and vendor, instead of the full name.""" + return self._sort_by_name + + @sort_by_name.setter + def sort_by_name(self, value): + self._sort_by_name = to_bool(value) + + # ------------------------------------------------------------------------- + def _set_item(self, key, module_info): + + if not isinstance(module_info, ForgeModuleInfo): + raise TypeError(self.msg_invalid_modinfo_type.format(module_info.__class__.__name__)) + + full_name = module_info.full_name + if full_name != key.lower(): + raise KeyError(self.msg_key_not_name.format(k=key, n=full_name)) + + self._map[full_name] = module_info + + # ------------------------------------------------------------------------- + def append(self, module_info): + + if not isinstance(module_info, ForgeModuleInfo): + raise TypeError(self.msg_invalid_modinfo_type.format(module_info.__class__.__name__)) + self._set_item(module_info.full_name, module_info) + + # ------------------------------------------------------------------------- + def as_dict(self, short=True): + + res = super(ForgeModuleDict, self).as_dict(short=short) + + res['sort_by_name'] = self.sort_by_name + res['items'] = {} + res['keys'] = [] + for full_name in self.keys(): + res['items'][full_name] = self._map[full_name].as_dict(short) + res['keys'].append(str(full_name)) + + return res + + # ------------------------------------------------------------------------- + def _get_item(self, key): + + if key is None: + raise TypeError(self.msg_none_type_error) + + full_name = str(key).lower().strip() + if full_name == '': + raise ValueError(self.msg_empty_key_error.format(key)) + + return self._map[full_name] + + # ------------------------------------------------------------------------- + def get(self, key): + return self._get_item(key) + + # ------------------------------------------------------------------------- + def _del_item(self, key, strict=True): + + if key is None: + raise TypeError(self.msg_none_type_error) + + full_name = str(key).lower().strip() + if full_name == '': + raise ValueError(self.msg_empty_key_error.format(key)) + + if not strict and full_name not in self._map: + return + + del self._map[full_name] + + # ------------------------------------------------------------------------- + def merge(self, item): + + if not isinstance(item, ForgeModuleInfo): + raise TypeError(self.msg_invalid_modinfo_type.format(item.__class__.__name__)) + + full_name = item.full_name + if full_name in self._map.keys(): + if self.verbose > 2: + LOG.debug("Merging module {!r}.".format(full_name)) + self._map[full_name].merge_in(item) + else: + if self.verbose > 2: + LOG.debug("New module {!r}.".format(full_name)) + self._set_item(full_name, item) + + # ------------------------------------------------------------------------- + # The next five methods are requirements of the ABC. + def __setitem__(self, key, value): + self._set_item(key, value) + + # ------------------------------------------------------------------------- + def __getitem__(self, key): + return self._get_item(key) + + # ------------------------------------------------------------------------- + def __delitem__(self, key): + self._del_item(key) + + # ------------------------------------------------------------------------- + def __iter__(self): + + for full_name in self.keys(): + yield full_name + + # ------------------------------------------------------------------------- + def __len__(self): + return len(self._map) + + # ------------------------------------------------------------------------- + # The next methods aren't required, but nice for different purposes: + def __str__(self): + '''returns simple dict representation of the mapping''' + return str(self._map) + + # ------------------------------------------------------------------------- + def __contains__(self, key): + if key is None: + raise TypeError(self.msg_none_type_error) + + full_name = str(key).lower().strip() + if full_name == '': + raise ValueError(self.msg_empty_key_error.format(key)) + + if self.verbose > 4: + LOG.debug("Searching for key {!r} ...".format(key)) + + return full_name in self._map + + # ------------------------------------------------------------------------- + def keys(self): + + def compare_items(x, y): + if self.sort_by_name: + if self.verbose > 4: + LOG.debug("Comparing names {!r} > {!r}".format(x.name, y.name)) + if x.name != y.name: + if x.name > y.name: + return 1 + return -1 + if self.verbose > 4: + LOG.debug("Comparing vendor {!r} > {!r}".format(x.vendor, y.vendor)) + if x.vendor != y.vendor: + if x.vendor > y.vendor: + return 1 + return -1 + return 0 + if self.verbose > 4: + LOG.debug("Comparing full names {!r} > {!r}".format(x.full_name, y.full_name)) + if x.full_name != y.full_name: + if x.full_name > y.full_name: + return 1 + return -1 + return 0 + + return sorted( + self._map.keys(), + key=lambda x: cmp_to_key(compare_items)(self._map[x])) + + # ------------------------------------------------------------------------- + def items(self): + + item_list = [] + + for full_name in self.keys(): + item_list.append((full_name, self._map[full_name])) + + return item_list + + # ------------------------------------------------------------------------- + def values(self): + + value_list = [] + for full_name in self.keys(): + value_list.append(self._map[full_name]) + return value_list + + # ------------------------------------------------------------------------- + def __eq__(self, other): + + if not isinstance(other, ForgeModuleDict): + raise TypeError(self.msg_no_modinfo_dict.format(other)) + + return self._map == other._map + + # ------------------------------------------------------------------------- + def __ne__(self, other): + + if not isinstance(other, ForgeModuleDict): + raise TypeError(self.msg_no_modinfo_dict.format(other)) + + return self._map != other._map + + # ------------------------------------------------------------------------- + def pop(self, key, *args): + + if key is None: + raise TypeError(self.msg_none_type_error) + + full_name = str(key).lower().strip() + if full_name == '': + raise ValueError(self.msg_empty_key_error.format(key)) + + return self._map.pop(full_name, *args) + + # ------------------------------------------------------------------------- + def popitem(self): + + if not len(self._map): + return None + + full_name = self.keys()[0] + zone = self._map[full_name] + del self._map[full_name] + return (full_name, zone) + + # ------------------------------------------------------------------------- + def clear(self): + self._map = dict() + + # ------------------------------------------------------------------------- + def setdefault(self, key, default): + + if key is None: + raise TypeError(self.msg_none_type_error) + + full_name = str(key).lower().strip() + if full_name == '': + raise ValueError(self.msg_empty_key_error.format(key)) + + if not isinstance(default, ForgeModuleInfo): + raise TypeError(self.msg_invalid_modinfo_type.format(default.__class__.__name__)) + + if full_name in self._map: + return self._map[full_name] + + self._set_item(full_name, default) + return default + + # ------------------------------------------------------------------------- + def update(self, other): + + if isinstance(other, ForgeModuleDict) or isinstance(other, dict): + for full_name in other.keys(): + self._set_item(full_name, other[full_name]) + return + + for tokens in other: + key = tokens[0] + value = tokens[1] + self._set_item(key, value) + + # ------------------------------------------------------------------------- + def as_list(self, short=True): + + res = [] + for full_name in self.keys(): + res.append(self._map[full_name].as_dict(short)) + return res + + +# ============================================================================= + +if __name__ == "__main__": + + pass + +# ============================================================================= +# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list diff --git a/lib/webhooks/forge/mod_info.py b/lib/webhooks/forge/mod_info.py new file mode 100644 index 0000000..025ff5f --- /dev/null +++ b/lib/webhooks/forge/mod_info.py @@ -0,0 +1,546 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +@author: Frank Brehm +@contact: frank.brehm@pixelpark.com +@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin +@summary: A module for encapsulating all information about a Puppet module +""" +from __future__ import absolute_import + +# Standard modules +import logging +import re +import copy +import warnings +import time +import datetime +import collections + +# Third party modules +import requests +import pytz + +# Own modules +from fb_tools.common import pp, to_str, to_bool, is_sequence +from fb_tools.obj import FbBaseObjectError, FbBaseObject + +from ..xlate import XLATOR + +from ..base_module_info import BaseModuleInfoError, BaseModuleInfo + +from ..module_meta_info import ModuleMetadata + +from . import parse_forge_date + +from .mod_release_info import ModuleReleaseInfo +from .mod_release_list import ModuleReleaseList +from .cur_mod_release_info import CurrentModuleReleaseInfo +from .owner_info import ForgeOwnerInfo + + +__version__ = '0.1.0' + +LOG = logging.getLogger(__name__) + +_ = XLATOR.gettext +ngettext = XLATOR.ngettext + + +# ============================================================================= +class ForgeModuleInfo(BaseModuleInfo): + """Class for encapsulating all information about a Puppet module from Puppet Forge.""" + + # ------------------------------------------------------------------------- + def __init__( + self, appname=None, verbose=0, version=__version__, base_dir=None, + initialized=None, name=None, vendor=None, full_name=None): + + self.current_release = None + self.releases = None + self.owner = None + + self._created_at = None + self._deprecated_at = None + self._deprecated_for = None + self._downloads = None + self._endorsement = None + self._feedback_score = None + self._homepage_url = None + self._issues_url = None + self._module_group = None + self._slug = None + self._superseded_by = None + self._supported = None + self._updated_at = None + self._uri = None + + super(ForgeModuleInfo, self).__init__( + appname=appname, verbose=verbose, version=version, base_dir=base_dir, + initialized=False, name=name, vendor=vendor, full_name=full_name + ) + + if initialized is not None: + self.initialized = initialized + + # ------------------------------------------------------------------------- + def as_dict(self, short=True): + + res = super(ForgeModuleInfo, self).as_dict(short=short) + + res['created_at'] = self.created_at + res['deprecated_at'] = self.deprecated_at + res['deprecated_for'] = self.deprecated_for + res['downloads'] = self.downloads + res['endorsement'] = self.endorsement + res['feedback_score'] = self.feedback_score + res['homepage_url'] = self.homepage_url + res['issues_url'] = self.issues_url + res['module_group'] = self.module_group + res['slug'] = self.slug + res['superseded_by'] = self.superseded_by + res['supported'] = self.supported + res['updated_at'] = self.updated_at + res['uri'] = self.uri + + return res + + # ------------------------------------------------------------------------- + @property + def created_at(self): + """Creation date of this forge module.""" + return self._created_at + + @created_at.setter + def created_at(self, value): + if value is None: + self._created_at = None + return + if isinstance(value, datetime.datetime): + self._created_at = value + return + v = str(value).strip() + if value == '': + self._created_at = None + return + self._created_at = parse_forge_date(v) + + # ------------------------------------------------------------------------- + @property + def deprecated_at(self): + """Date of deprecation of this forge module.""" + return self._deprecated_at + + @deprecated_at.setter + def deprecated_at(self, value): + if value is None: + self._deprecated_at = None + return + if isinstance(value, datetime.datetime): + self._deprecated_at = value + return + v = str(value).strip() + if value == '': + self._deprecated_at = None + return + self._deprecated_at = parse_forge_date(v) + + # ------------------------------------------------------------------------- + @property + def deprecated_for(self): + """The reason of deprecation of this forge module.""" + return self._deprecated_for + + @deprecated_for.setter + def deprecated_for(self, value): + if value is None: + self._deprecated_for = None + return + v = str(value).strip() + if value == '': + self._deprecated_for = None + return + self._deprecated_for = v + + # ------------------------------------------------------------------------- + @property + def downloads(self): + """The number of downloads of this module.""" + return self._downloads + + @downloads.setter + def downloads(self, value): + if value is None: + self._downloads = None + return + v = int(value) + if v < 0: + msg = _( + "The number of {w!r} must be greater or equal to zero " + "(Given: {v}).").format(w='downloads', v=value) + raise ValueError(msg) + self._downloads = v + + # ------------------------------------------------------------------------- + @property + def endorsement(self): + """The endorsement of this forge module..""" + return self._endorsement + + @endorsement.setter + def endorsement(self, value): + if value is None: + self._endorsement = None + return + v = str(value).strip() + if value == '': + self._endorsement = None + return + self._endorsement = v + + # ------------------------------------------------------------------------- + @property + def feedback_score(self): + """The number of feedback_scores of this module.""" + return self._feedback_score + + @feedback_score.setter + def feedback_score(self, value): + if value is None: + self._feedback_score = None + return + v = int(value) + if v < 0: + msg = _( + "The number of {w!r} must be greater or equal to zero " + "(Given: {v}).").format(w='feedback_score', v=value) + raise ValueError(msg) + self._feedback_score = v + + # ------------------------------------------------------------------------- + @property + def homepage_url(self): + """The homepage URL of this forge module.""" + return self._homepage_url + + @homepage_url.setter + def homepage_url(self, value): + if value is None: + self._homepage_url = None + return + v = str(value).strip() + if value == '': + self._homepage_url = None + return + self._homepage_url = v + + # ------------------------------------------------------------------------- + @property + def issues_url(self): + """The issues URL of this forge module.""" + return self._issues_url + + @issues_url.setter + def issues_url(self, value): + if value is None: + self._issues_url = None + return + v = str(value).strip() + if value == '': + self._issues_url = None + return + self._issues_url = v + + # ------------------------------------------------------------------------- + @property + def module_group(self): + """The module group of this forge module.""" + return self._module_group + + @module_group.setter + def module_group(self, value): + if value is None: + self._module_group = None + return + v = str(value).strip() + if value == '': + self._module_group = None + return + self._module_group = v + + # ------------------------------------------------------------------------- + @property + def slug(self): + """The slug of this forge module.""" + return self._slug + + @slug.setter + def slug(self, value): + if value is None: + self._slug = None + return + v = str(value).strip() + if value == '': + self._slug = None + return + self._slug = v + + # ------------------------------------------------------------------------- + @property + def superseded_by(self): + """The name of the superseding module this forge module.""" + return self._superseded_by + + @superseded_by.setter + def superseded_by(self, value): + if value is None: + self._superseded_by = None + return + if isinstance(value, collections.Mapping): + self._superseded_by = copy.copy(value) + return + + v = str(value).strip() + if value == '': + self._superseded_by = None + return + self._superseded_by = v + + # ------------------------------------------------------------------------- + @property + def supported(self): + """Is this forge module supported by Puppetlabs?.""" + return self._supported + + @supported.setter + def supported(self, value): + if value is None: + self._supported = None + return + self._supported = to_bool(value) + + # ------------------------------------------------------------------------- + @property + def supported(self): + """Is this forge module supported by Puppetlabs?.""" + return self._supported + + @supported.setter + def supported(self, value): + if value is None: + self._supported = None + return + self._supported = to_bool(value) + + # ------------------------------------------------------------------------- + @property + def updated_at(self): + """Last update date of this forge module.""" + return self._updated_at + + @updated_at.setter + def updated_at(self, value): + if value is None: + self._updated_at = None + return + if isinstance(value, datetime.datetime): + self._updated_at = value + return + v = str(value).strip() + if value == '': + self._updated_at = None + return + self._updated_at = parse_forge_date(v) + + # ------------------------------------------------------------------------- + @property + def uri(self): + """The URI of this forge module.""" + return self._uri + + @uri.setter + def uri(self, value): + if value is None: + self._uri = None + return + v = str(value).strip() + if value == '': + self._uri = None + return + self._uri = v + + # ------------------------------------------------------------------------- + def to_data(self): + """Returning a dict, which can be used to re-instantiate this module info.""" + + res = {} + + res['deprecated_for'] = self.deprecated_for + res['downloads'] = self.downloads + res['endorsement'] = self.endorsement + res['feedback_score'] = self.feedback_score + res['homepage_url'] = self.homepage_url + res['issues_url'] = self.issues_url + res['module_group'] = self.module_group + res['name'] = self.name + res['slug'] = self.slug + res['superseded_by'] = self.superseded_by + res['supported'] = self.supported + res['uri'] = self.uri + + res['created_at'] = None + if self.created_at: + res['created_at'] = self.created_at.strftime('%Y-%m-%d %H:%M:%S %z') + + res['deprecated_at'] = None + if self.deprecated_at: + res['deprecated_at'] = self.deprecated_at.strftime('%Y-%m-%d %H:%M:%S %z') + + res['updated_at'] = None + if self.updated_at: + res['updated_at'] = self.updated_at.strftime('%Y-%m-%d %H:%M:%S %z') + + res['releases'] = [] + for release in self.releases: + res['releases'].append(release.to_data()) + + res['current_release'] = None + if self.current_release: + res['current_release'] = self.current_release.to_data() + + res['owner'] = None + if self.owner: + res['owner'] = self.owner.to_data() + + return res + + # ------------------------------------------------------------------------- + def apply_data(self, data): + + self.releases = ModuleReleaseList( + appname=self.appname, verbose=self.verbose, base_dir=self.base_dir) + + for prop_name in ( + 'created_at', 'deprecated_at', 'deprecated_for', 'downloads', 'endorsement', + 'feedback_score', 'homepage_url', 'issues_url', 'module_group', 'slug', + 'superseded_by', 'supported', 'updated_at', 'uri'): + if prop_name in data and data[prop_name]: + setattr(self, prop_name, data[prop_name]) + + if 'current_release' in data and data['current_release']: + self.current_release = CurrentModuleReleaseInfo.from_data( + data['current_release'], appname=self.appname, + verbose=self.verbose, base_dir=self.base_dir) + + if 'releases' in data: + for rel in data['releases']: + release = ModuleReleaseInfo.from_data( + rel, appname=self.appname, verbose=self.verbose, base_dir=self.base_dir) + if release: + self.releases.append(release) + + self.releases.initialized = True + + if 'owner' in data and data['owner']: + self.owner = ForgeOwnerInfo.from_data( + data['owner'], appname=self.appname, verbose=self.verbose, base_dir=self.base_dir) + + # ------------------------------------------------------------------------- + @classmethod + def from_data(cls, data, appname=None, verbose=0, base_dir=None): + + if verbose > 3: + LOG.debug(_( + "Trying to instantiate a {}-object from:").format( + cls.__name__) + '\n' + pp(data)) + + if 'slug' not in data: + msg = _("Did not found {!r}-definition in data for forge module:").format( + 'slug') + '\n' + pp(data) + LOG.error(msg) + return None + + full_name = data['slug'].strip() + if full_name == '': + msg = _("Found empty {!r}-definition in data for forge module:").format( + 'slug') + '\n' + pp(data) + LOG.error(msg) + return None + + module_info = cls(appname=appname, verbose=verbose, base_dir=base_dir, full_name=full_name) + module_info.apply_data(data) + + return module_info + + # ------------------------------------------------------------------------- + @classmethod + def get_from_forge( + cls, full_name, forge_uri, http_timeout=30, + appname=None, verbose=0, base_dir=None): + + url = "{url}/{name}".format(url=forge_uri, name=full_name) + module_info = None + + LOG.info(_("Trying to get module {m!r} from Puppet forge {u!r} ...").format( + m=full_name, u=url)) + + session = requests.Session() + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + response = session.request('GET', url, timeout=http_timeout) + if w: + warn_class = w[-1].category.__name__ + warn_msg = '{}: {}'.format( + warn_class, w[-1].message) + if warn_class == 'SubjectAltNameWarning': + LOG.debug(warn_msg) + else: + LOG.warn(warn_msg) + + LOG.debug("Got status code: {}.".format(response.status_code)) + if not response.ok: + LOG.debug("Did not found module {} on Puppet forge.".format(full_name)) + return None + + if not response.text: + LOG.warn(_("No output for URL {!r}.").format(url)) + return None + if verbose > 3: + msg = "Output:\n{}".format(response.text) + LOG.debug(msg) + + try: + module_info = cls( + appname=appname, verbose=verbose, base_dir=base_dir, + full_name=full_name, + ) + except ForgeModuleInfoError as e: + LOG.warn("{c}: {e}".format(c=e.__class__.__name__, e=e)) + return None + + data = response.json() + if verbose > 4: + LOG.debug("Performing forge data:\n" + pp(data)) + module_info.apply_data(data) + + if module_info.superseded_by: + subst = module_info.superseded_by + if verbose > 2: + LOG.debug("Superseded info:\n" + pp(subst)) + if 'slug' in subst: + subst = subst['slug'] + LOG.warning(_( + "Module {c!r} is deprecated at Puppet forge and should be substituted " + "by module {n!r}.").format(c=module_info.slug, n=subst)) + + return module_info + + +# ============================================================================= +if __name__ == "__main__": + + pass + +# ============================================================================= + +# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list diff --git a/lib/webhooks/forge/mod_release_info.py b/lib/webhooks/forge/mod_release_info.py new file mode 100644 index 0000000..b8238fd --- /dev/null +++ b/lib/webhooks/forge/mod_release_info.py @@ -0,0 +1,280 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +@author: Frank Brehm +@contact: frank.brehm@pixelpark.com +@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin +@summary: A module for encapsulating all information about a Puppet module +""" +from __future__ import absolute_import + +# Standard modules +import logging +import re +import copy +import warnings +import time +import datetime +import collections + + +# Own modules +from fb_tools.common import pp, to_str, to_bool, is_sequence + +from ..xlate import XLATOR + +from . import parse_forge_date +from . import ReleaseInfoError, ForgeModuleInfoError, ForgeModuleInfoTypeError +from . import BaseForgeObject + +__version__ = '0.1.0' + +LOG = logging.getLogger(__name__) + +_ = XLATOR.gettext +ngettext = XLATOR.ngettext + + +# ============================================================================= +class ModuleReleaseInfo(BaseForgeObject): + """Class for encapsulating information about a Puppet module release from Puppet Forge.""" + + # ------------------------------------------------------------------------- + def __init__( + self, appname=None, verbose=0, version=__version__, base_dir=None, + initialized=None): + + self._created_at = None + self._deleted_at = None + self._file_size = None + self._file_uri = None + self._supported = None + self._release_version = None + + super(ModuleReleaseInfo, self).__init__( + appname=appname, verbose=verbose, version=version, base_dir=base_dir, initialized=False) + + if initialized is not None: + self.initialized = initialized + + # ------------------------------------------------------------------------- + def as_dict(self, short=True): + """ + Transforms the elements of the object into a dict + + @return: structure as dict + @rtype: dict + """ + + res = super(ModuleReleaseInfo, self).as_dict(short=short) + + res['created_at'] = self.created_at + res['deleted_at'] = self.deleted_at + res['file_size'] = self.file_size + res['file_uri'] = self.file_uri + res['supported'] = self.supported + res['release_version'] = self.release_version + + return res + + # ------------------------------------------------------------------------- + @property + def created_at(self): + """Creation date of this release.""" + return self._created_at + + @created_at.setter + def created_at(self, value): + if value is None: + self._created_at = None + return + if isinstance(value, datetime.datetime): + self._created_at = value + return + v = str(value).strip() + if value == '': + self._created_at = None + return + self._created_at = parse_forge_date(v) + + # ------------------------------------------------------------------------- + @property + def deleted_at(self): + """Deletion date of this release.""" + return self._deleted_at + + @deleted_at.setter + def deleted_at(self, value): + if value is None: + self._deleted_at = None + return + if isinstance(value, datetime.datetime): + self._deleted_at = value + return + v = str(value).strip() + if value == '': + self._deleted_at = None + return + self._deleted_at = parse_forge_date(v) + + # ------------------------------------------------------------------------- + @property + def file_size(self): + """The file size in bytes of this release.""" + return self._file_size + + @file_size.setter + def file_size(self, value): + if value is None: + self._file_size = None + return + v = int(value) + if v < 0: + msg = _( + "The file size of a release must be greater or equal to zero " + "(Given: {}).").format(value) + raise ValueError(msg) + self._file_size = v + + # ------------------------------------------------------------------------- + @property + def file_uri(self): + """The file URI of this release.""" + return self._file_uri + + @file_uri.setter + def file_uri(self, value): + if value is None: + self._file_uri = None + return + v = str(value).strip() + if value == '': + self._file_uri = None + return + self._file_uri = v + + # ------------------------------------------------------------------------- + @property + def release_version(self): + """The version number of this release.""" + return self._release_version + + @release_version.setter + def release_version(self, value): + if value is None: + self._release_version = None + return + v = str(value).strip() + if value == '': + self._release_version = None + return + self._release_version = v + + # ------------------------------------------------------------------------- + @property + def supported(self): + """The URI of this release.""" + return self._supported + + @supported.setter + def supported(self, value): + if value is None: + self._supported = None + return + self._supported = to_bool(value) + + # ------------------------------------------------------------------------- + def to_data(self): + """Returning a dict, which can be used to re-instantiate this module info.""" + + res = super(ModuleReleaseInfo, self).to_data() + + res['created_at'] = None + if self.created_at: + res['created_at'] = self.created_at.strftime('%Y-%m-%d %H:%M:%S %z') + + res['deleted_at'] = None + if self.deleted_at: + res['deleted_at'] = self.deleted_at.strftime('%Y-%m-%d %H:%M:%S %z') + + res['file_size'] = self.file_size + res['file_uri'] = self.file_uri + res['supported'] = self.supported + res['version'] = self.release_version + + return res + + # ------------------------------------------------------------------------- + def copy_to(self, new): + + if not isinstance(new, ModuleReleaseInfo): + msg = _("Parameter {p!r} is not of class {c!r}, but of {e!r} instead.").format( + p='new', c='ModuleReleaseInfo', e=new.__class__.__name__) + raise TypeError(msg) + + super(ModuleReleaseInfo, self).copy_to(new) + new.created_at = self.created_at + new.deleted_at = self.deleted_at + new.file_size = self.file_size + new.file_uri = self.file_uri + new.supported = self.supported + new.release_version = self.release_version + + # ------------------------------------------------------------------------- + def __eq__(self, other): + + if self.verbose > 4: + LOG.debug(_("Comparing {} objects ...").format(self.__class__.__name__)) + + if not super(ModuleReleaseInfo, self).__eq__(other): + return False + + if not isinstance(other, ModuleReleaseInfo): + return False + + if self.created_at != other.created_at: + return False + if self.deleted_at != other.deleted_at: + return False + if self.file_size != other.file_size: + return False + if self.file_uri != other.file_uri: + return False + if self.supported != other.supported: + return False + if self.release_version != other.release_version: + return False + + return True + + # ------------------------------------------------------------------------- + def apply_data(self, data): + + super(ModuleReleaseInfo, self).apply_data(data) + + if 'created_at' in data and data['created_at']: + self.created_at = data['created_at'] + if 'deleted_at' in data and data['deleted_at']: + self.deleted_at = data['deleted_at'] + if 'file_size' in data and data['file_size']: + self.file_size = data['file_size'] + if 'file_uri' in data and data['file_uri']: + self.file_uri = data['file_uri'] + if 'slug' in data and data['slug']: + self.slug = data['slug'] + if 'supported' in data and data['supported']: + self.supported = data['supported'] + if 'uri' in data and data['uri']: + self.uri = data['uri'] + if 'version' in data and data['version']: + self.release_version = data['version'] + + +# ============================================================================= +if __name__ == "__main__": + + pass + +# ============================================================================= + +# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list diff --git a/lib/webhooks/forge/mod_release_list.py b/lib/webhooks/forge/mod_release_list.py new file mode 100644 index 0000000..b9118de --- /dev/null +++ b/lib/webhooks/forge/mod_release_list.py @@ -0,0 +1,218 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +@author: Frank Brehm +@contact: frank.brehm@pixelpark.com +@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin +@summary: A module for encapsulating all information about a Puppet module +""" +from __future__ import absolute_import + +# Standard modules +import logging +import re +import copy +import warnings +import time +import datetime +import collections + +# Own modules +from fb_tools.common import pp, to_str, to_bool, is_sequence +from fb_tools.obj import FbBaseObjectError, FbBaseObject + +from ..xlate import XLATOR + +from ..base_module_info import BaseModuleInfoError, BaseModuleInfo + +from .mod_release_info import ModuleReleaseInfo + +__version__ = '0.1.0' + +LOG = logging.getLogger(__name__) + +_ = XLATOR.gettext +ngettext = XLATOR.ngettext + + +# ============================================================================= +class ModuleReleaseList(collections.MutableSequence, FbBaseObject): + + msg_no_release = _("Invalid type {t!r} as an item of a {c}, only {o} objects are allowed.") + + # ------------------------------------------------------------------------- + def __init__( + self, appname=None, verbose=0, version=__version__, base_dir=None, + initialized=None, *releases): + + self._list = [] + + super(ModuleReleaseList, self).__init__( + appname=appname, verbose=verbose, version=version, base_dir=base_dir, initialized=False) + + for release in releases: + self.append(release) + + # ------------------------------------------------------------------------- + def as_dict(self, short=True): + + res = super(ModuleReleaseList, self).as_dict(short=short) + + res['list'] = [] + for release in self: + res['list'].append(release.as_dict(short=short)) + + return res + + # ------------------------------------------------------------------------- + def to_data(self): + """Returning a list, which can be used to re-instantiate this module info.""" + + res = [] + for release in self: + res.append(release.to_data()) + + return res + + # ------------------------------------------------------------------------- + def index(self, release, *args): + + i = None + j = None + + if len(args) > 0: + if len(args) > 2: + raise TypeError(_("{m} takes at most {max} arguments ({n} given).").format( + m='index()', max=3, n=len(args) + 1)) + i = int(args[0]) + if len(args) > 1: + j = int(args[1]) + + index = 0 + if i is not None: + start = i + if i < 0: + start = len(self._list) + i + wrap = False + end = len(self._list) + if j is not None: + if j < 0: + end = len(self._list) + j + if end < index: + wrap = True + else: + end = j + for index in list(range(len(self._list))): + item = self._list[index] + if index < start: + continue + if index >= end and not wrap: + break + if item == release: + return index + + if wrap: + for index in list(range(len(self._list))): + item = self._list[index] + if index >= end: + break + if item == release: + return index + + msg = _("Release {!r} is not in release list.").format(release.release_version) + raise ValueError(msg) + + # ------------------------------------------------------------------------- + def __contains__(self, release): + + if not isinstance(release, ModuleReleaseInfo): + raise TypeError(self.msg_no_release.format( + t=release.__class__.__name__, c=self.__class__.__name__, o='ModuleReleaseInfo')) + + if not self._list: + return False + + for item in self._list: + if item == release: + return True + + return False + + # ------------------------------------------------------------------------- + def count(self, release): + + if not isinstance(release, ModuleReleaseInfo): + raise TypeError(self.msg_no_release.format( + t=release.__class__.__name__, c=self.__class__.__name__, o='ModuleReleaseInfo')) + + if not self._list: + return 0 + + num = 0 + for item in self._list: + if item == release: + num += 1 + return num + + # ------------------------------------------------------------------------- + def __len__(self): + return len(self._list) + + # ------------------------------------------------------------------------- + def __getitem__(self, key): + return self._list.__getitem__(key) + + # ------------------------------------------------------------------------- + def __reversed__(self): + + return reversed(self._list) + + # ------------------------------------------------------------------------- + def __setitem__(self, key, release): + + if not isinstance(release, ModuleReleaseInfo): + raise TypeError(self.msg_no_release.format( + t=release.__class__.__name__, c=self.__class__.__name__, o='ModuleReleaseInfo')) + + self._list.__setitem__(key, release) + + # ------------------------------------------------------------------------- + def __delitem__(self, key): + + del self._list[key] + + # ------------------------------------------------------------------------- + def append(self, release): + + if not isinstance(release, ModuleReleaseInfo): + raise TypeError(self.msg_no_release.format( + t=release.__class__.__name__, c=self.__class__.__name__, o='ModuleReleaseInfo')) + + self._list.append(release) + + # ------------------------------------------------------------------------- + def insert(self, index, release): + + if not isinstance(release, ModuleReleaseInfo): + raise TypeError(self.msg_no_release.format( + t=release.__class__.__name__, c=self.__class__.__name__, o='ModuleReleaseInfo')) + + self._list.insert(index, release) + + # ------------------------------------------------------------------------- + def __copy__(self): + + new_list = self.__class__() + for release in self._list: + new_list.append(copy.copy(release)) + return new_list + + +# ============================================================================= +if __name__ == "__main__": + + pass + +# ============================================================================= + +# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list diff --git a/lib/webhooks/forge/owner_info.py b/lib/webhooks/forge/owner_info.py new file mode 100644 index 0000000..201a6e7 --- /dev/null +++ b/lib/webhooks/forge/owner_info.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +@author: Frank Brehm +@contact: frank.brehm@pixelpark.com +@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin +@summary: A module for encapsulating all information about a Puppet module +""" +from __future__ import absolute_import + +# Standard modules +import logging +import re +import copy +import warnings +import time +import datetime +import collections + + +# Own modules +from fb_tools.common import pp, to_str, to_bool, is_sequence + +from ..xlate import XLATOR + +from . import parse_forge_date +from . import ReleaseInfoError, ForgeModuleInfoError, ForgeModuleInfoTypeError +from . import BaseForgeObject + +__version__ = '0.2.0' + +LOG = logging.getLogger(__name__) + +_ = XLATOR.gettext +ngettext = XLATOR.ngettext + + +# ============================================================================= +class ForgeOwnerInfo(BaseForgeObject): + """Class for encapsulating information about an module owner in Puppet forge.""" + + # ------------------------------------------------------------------------- + def __init__( + self, gravatar_id=None, slug=None, uri=None, username=None, + appname=None, verbose=0, version=__version__, base_dir=None, initialized=None): + + self._gravatar_id = None + self._username = None + + super(ForgeOwnerInfo, self).__init__( + slug=slug, uri=uri, appname=appname, verbose=verbose, version=version, + base_dir=base_dir, initialized=False) + + self.gravatar_id = gravatar_id + self.username = username + + if initialized is not None: + self.initialized = initialized + + # ------------------------------------------------------------------------- + def as_dict(self, short=True): + """ + Transforms the elements of the object into a dict + + @return: structure as dict + @rtype: dict + """ + + res = super(ForgeOwnerInfo, self).as_dict(short=short) + + res['gravatar_id'] = self.gravatar_id + res['username'] = self.username + + return res + + # ------------------------------------------------------------------------- + @property + def gravatar_id(self): + """The Gravatar-Id of this owner.""" + return self._gravatar_id + + @gravatar_id.setter + def gravatar_id(self, value): + if value is None: + self._gravatar_id = None + return + v = str(value).strip() + if value == '': + self._gravatar_id = None + return + self._gravatar_id = v + + # ------------------------------------------------------------------------- + @property + def username(self): + """The username of this owner.""" + return self._username + + @username.setter + def username(self, value): + if value is None: + self._username = None + return + v = str(value).strip() + if value == '': + self._username = None + return + self._username = v + + # ------------------------------------------------------------------------- + def to_data(self): + """Returning a dict, which can be used to re-instantiate this owner info.""" + + res = super(ForgeOwnerInfo, self).to_data() + res['gravatar_id'] = self.gravatar_id + res['username'] = self.username + + return res + + # ------------------------------------------------------------------------- + def copy_to(self, new): + + if not isinstance(new, ForgeOwnerInfo): + msg = _("Parameter {p!r} is not of class {c!r}, but of {e!r} instead.").format( + p='new', c='ForgeOwnerInfo', e=new.__class__.__name__) + raise TypeError(msg) + + super(ForgeOwnerInfo, self).copy_to(new) + new.gravatar_id = self.gravatar_id + new.username = self.username + + # ------------------------------------------------------------------------- + def __eq__(self, other): + + if self.verbose > 4: + LOG.debug(_("Comparing {} objects ...").format(self.__class__.__name__)) + + if not super(ForgeOwnerInfo, self).__eq__(other): + return False + + if not isinstance(other, ForgeOwnerInfo): + return False + + if self.gravatar_id != other.gravatar_id: + return False + if self.username != other.username: + return False + + return True + + # ------------------------------------------------------------------------- + def apply_data(self, data): + + super(ForgeOwnerInfo, self).apply_data(data) + + if 'gravatar_id' in data: + self.gravatar_id = data['gravatar_id'] + if 'username' in data: + self.username = data['username'] + + +# ============================================================================= +if __name__ == "__main__": + + pass + +# ============================================================================= + +# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list diff --git a/lib/webhooks/get_forge_modules.py b/lib/webhooks/get_forge_modules.py index ef2b89a..89d639e 100644 --- a/lib/webhooks/get_forge_modules.py +++ b/lib/webhooks/get_forge_modules.py @@ -38,7 +38,7 @@ from . import __version__ from .base_app import BaseHookError, BaseHookApp -from .forge_module_info import ForgeModuleInfo, ForgeModulesList +#from .forge_module_info import ForgeModuleInfo, ForgeModulesList from .module_info import ModuleInfo @@ -48,6 +48,9 @@ from .module_meta_info import ModuleMetadata from .puppetfile import Puppetfile, PuppetfileError +from .forge.mod_info import ForgeModuleInfo +from .forge.mod_dict import ForgeModuleDict + from .xlate import XLATOR LOG = logging.getLogger(__name__) @@ -94,7 +97,7 @@ class GetForgeModulesApp(BaseHookApp): appname=appname, base_dir=base_dir, verbose=verbose, version=version, description=description) - self._init_forge_module_list() + self._init_forge_module_dict() # ----------------------------------------------------------- @property @@ -117,9 +120,9 @@ class GetForgeModulesApp(BaseHookApp): ) # ------------------------------------------------------------------------- - def _init_forge_module_list(self): + def _init_forge_module_dict(self): - self.forge_modules = ForgeModulesList( + self.forge_modules = ForgeModuleDict( appname=self.appname, verbose=self.verbose, base_dir=self.base_dir) # ------------------------------------------------------------------------- @@ -242,7 +245,7 @@ class GetForgeModulesApp(BaseHookApp): self.print_out(msg) modules_done = [] - self._init_forge_module_list() + self._init_forge_module_dict() for env in self.environments: @@ -258,6 +261,10 @@ class GetForgeModulesApp(BaseHookApp): print('.', end='', flush=True) else: print('~', end='', flush=True) + + if self.verbose > 1: + LOG.debug(_( + "Got a {}:").format('ForgeModuleDict') + '\n' + pp(self.forge_modules.keys())) if not self.verbose: print()