#!/bin/env python3
# -*- coding: utf-8 -*-
-__version__ = '1.4.7'
+__version__ = '1.5.1'
# vim: ts=4 et list
'__mo_file__': __xlate_mo_file__,
'XLATOR-Object-Type': XLATOR.__class__.__name__,
}
- LOG.info("XLATOR information:\n" + pp(xinfo))
+ LOG.debug("XLATOR information:\n" + pp(xinfo))
return
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin
+@summary: A base module for encapsulating all information about a Puppet module
+"""
+from __future__ import absolute_import
+
+# Standard modules
+import logging
+import re
+import copy
+import warnings
+import time
+import datetime
+
+# Third party modules
+import requests
+import pytz
+
+# Own modules
+from fb_tools.common import pp, to_str, to_bool, is_sequence
+from fb_tools.obj import FbBaseObjectError, FbBaseObject
+
+from .xlate import XLATOR
+
+__version__ = '0.1.1'
+
+LOG = logging.getLogger(__name__)
+
+_ = XLATOR.gettext
+ngettext = XLATOR.ngettext
+
+
+# =============================================================================
+class BaseModuleInfoError(FbBaseObjectError):
+
+ pass
+
+
+# =============================================================================
+class BaseModuleInfo(FbBaseObject):
+ """Base class for encapsulating information about a Puppet module."""
+
+ re_split_name = re.compile(r'^\s*([a-z0-9]+)[-/_](\S+)\s*$', re.IGNORECASE)
+
+ # -------------------------------------------------------------------------
+ def __init__(
+ self, appname=None, verbose=0, version=__version__, base_dir=None,
+ initialized=None, name=None, vendor=None, full_name=None):
+
+ self._name = None
+ self._vendor = None
+ self._full_name_orig = None
+
+ super(BaseModuleInfo, self).__init__(
+ appname=appname, verbose=verbose, version=version,
+ base_dir=base_dir, initialized=False,
+ )
+
+ _name = name
+ if name is not None:
+ _name = to_str(name).strip().lower()
+
+ _vendor = vendor
+ if vendor is not None:
+ _vendor = to_str(vendor).strip().lower()
+
+ _full_name = full_name
+ if full_name is not None:
+ _full_name = to_str(full_name).strip().lower()
+
+ if _full_name:
+
+ match = self.re_split_name.match(_full_name)
+ if not match:
+ raise ModuleInfoError(
+ _("Could not analyze given full module name {!r}.").format(
+ full_name))
+
+ self._full_name_orig = _full_name
+
+ if _name:
+ self._name = _name
+ else:
+ self._name = match.group(2).lower()
+
+ if _vendor:
+ self._vendor = _name
+ else:
+ self._vendor = match.group(1).lower()
+
+ else:
+
+ if not _name or not _vendor:
+ msg = _(
+ "There must be given a full module name or the base module name "
+ "and the vendor name (user name on Puppet Forge).")
+ raise BaseModuleInfoError(msg)
+
+ self._name = _name
+ self._vendor = _vendor
+ self._full_name_orig = self.full_name
+
+ # -------------------------------------------------------------------------
+ @property
+ def name(self):
+ """The name of the module without the vendor."""
+ return self._name
+
+ # -------------------------------------------------------------------------
+ @property
+ def vendor(self):
+ """The vendor of the module - the username on Puppet forge."""
+ return self._vendor
+
+ # -------------------------------------------------------------------------
+ @property
+ def full_name(self):
+ """The full name of the module including vandor and name."""
+ if self.name is None:
+ return None
+ if self.vendor is None:
+ return None
+ return "{v}-{n}".format(v=self.vendor, n=self.name)
+
+ # -------------------------------------------------------------------------
+ @property
+ def full_name_orig(self):
+ """The original full name of the module."""
+ if self._full_name_orig:
+ return self._full_name_orig
+ return self.full_name
+
+ @full_name_orig.setter
+ def full_name_orig(self, value):
+ if value is None:
+ self._full_name_orig = None
+ return
+ val = to_str(value).strip().lower()
+ if val:
+ self._full_name_orig = val
+ else:
+ self._full_name_orig = None
+
+ # -------------------------------------------------------------------------
+ def __repr__(self):
+ return str(self)
+
+ # -------------------------------------------------------------------------
+ def __eq__(self, other):
+
+ if self.verbose > 4:
+ LOG.debug("Comparing {}-objects ...".format(self.__class__.__name__))
+
+ if not isinstance(other, BaseModuleInfo):
+ return False
+
+ if self.name != other.name:
+ return False
+
+ if self.vendor != other.vendor:
+ return False
+
+ return True
+
+ # -------------------------------------------------------------------------
+ def as_dict(self, short=True):
+ """
+ Transforms the elements of the object into a dict
+
+ @return: structure as dict
+ @rtype: dict
+ """
+
+ res = super(BaseModuleInfo, self).as_dict(short=short)
+
+ res['name'] = self.name
+ res['vendor'] = self.vendor
+ res['full_name'] = self.full_name
+ res['full_name_orig'] = self.full_name_orig
+
+ return res
+
+ # -------------------------------------------------------------------------
+ def to_data(self):
+ """Returning a dict, which can be used to re-instantiate this module info."""
+
+ res = {}
+
+ res['name'] = self.name
+ res['vendor'] = self.vendor
+ res['full_name'] = self.full_name
+ res['full_name_orig'] = self.full_name_orig
+
+ return res
+
+
+# =============================================================================
+if __name__ == "__main__":
+
+ pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list
from .xlate import XLATOR
-__version__ = '0.1.0'
+from .base_module_info import BaseModuleInfoError, BaseModuleInfo
+
+__version__ = '0.2.0'
LOG = logging.getLogger(__name__)
# =============================================================================
-class ReleasInfoError(FbBaseObjectError):
+class ReleaseInfoError(BaseModuleInfoError):
pass
# =============================================================================
-class ForgeModuleInfoError(FbBaseObjectError):
+class ForgeModuleInfoError(BaseModuleInfoError):
pass
# =============================================================================
-class ModuleReleaseInfo(FbBaseObject):
+class ModuleReleaseInfo(BaseModuleInfo):
"""Class for encapsulating information about a Puppet module release from Puppet Forge."""
+ re_split_name = re.compile(r'^\s*([a-z0-9]+)[-/_](\S+)\s*$', re.IGNORECASE)
+
# -------------------------------------------------------------------------
def __init__(
- self, appname=None, verbose=0, version=__version__, base_dir=None, initialized=None):
+ self, appname=None, verbose=0, version=__version__, base_dir=None,
+ initialized=None, name=None, vendor=None, full_name=None):
+ self._name = None
+ self._vendor = None
+ self._full_name_orig = None
self._created_at = None
self._deleted_at = None
self._file_size = None
self._version = None
super(ModuleReleaseInfo, self).__init__(
- appname=appname, verbose=verbose, version=version,
- base_dir=base_dir, initialized=False,
+ appname=appname, verbose=verbose, version=version, base_dir=base_dir,
+ initialized=False, name=name, vendor=vendor, full_name=full_name
)
+ if initialized is not None:
+ self.initialized = initialized
+
+ # -------------------------------------------------------------------------
+ @classmethod
+ def get_from_forge(
+ cls, full_name, forge_uri, http_timeout=30,
+ appname=None, verbose=0, base_dir=None):
+
+ url = "{url}/{name}".format(url=forge_uri, name=full_name)
+ module_info = None
+
+ LOG.info(_("Trying to get module {m!r} from Puppet forge {u!r} ...").format(
+ m=full_name, u=url))
+
+ session = requests.Session()
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter("always")
+ response = session.request('GET', url, timeout=http_timeout)
+ if w:
+ warn_class = w[-1].category.__name__
+ warn_msg = '{}: {}'.format(
+ warn_class, w[-1].message)
+ if warn_class == 'SubjectAltNameWarning':
+ LOG.debug(warn_msg)
+ else:
+ LOG.warn(warn_msg)
+
+ LOG.debug("Got status code: {}.".format(response.status_code))
+ if not response.ok:
+ LOG.debug("Did not found module {} on Puppet forge.".format(full_name))
+ return None
+
+ if not response.text:
+ LOG.warn(_("No output for URL {!r}.").format(url))
+ return None
+ if verbose > 3:
+ msg = "Output:\n{}".format(response.text)
+ LOG.debug(msg)
+
+ try:
+ release_info = cls(
+ appname=appname, verbose=verbose, base_dir=base_dir,
+ full_name=full_name,
+ )
+ except ForgeModuleInfoError as e:
+ LOG.warn("{c}: {e}".format(c=e.__class__.__name__, e=e))
+ return None
+
+ version = None
+ source = None
+
+# js_info = response.json()
+# if 'current_release' in js_info and js_info['current_release']:
+# if 'version' in js_info['current_release']:
+# version = js_info['current_release']['version']
+# else:
+# msg = _("Did not found version of current release of module {}.").format(full_name)
+# LOG.warn(msg)
+# if 'metadata' in js_info['current_release'] and js_info['current_release']['metadata']:
+# if ('source' in js_info['current_release']['metadata'] and
+# js_info['current_release']['metadata']['source']):
+# source = str(js_info['current_release']['metadata']['source']).strip()
+# if not source:
+# LOG.warn(_("Did not found source information of module {}.").format(full_name))
+# else:
+# msg = _("Did not found current release of module {}.").format(full_name)
+# LOG.warn(msg)
+
+# LOG.debug("Current version of module {name} is {version}.".format(
+# name=full_name, version=version))
+
+# if version:
+# module_info.forge_version = version
+# if source:
+# module_info.forge_source = source
+
+# if 'homepage_url' in js_info and js_info['homepage_url']:
+# module_info.forge_homepage_url = js_info['homepage_url']
+
+# module_info.forge_releases = []
+# if 'releases' in js_info and is_sequence(js_info['releases']):
+# for release in js_info['releases']:
+# if 'version' in release and release['version']:
+# module_info.forge_releases.append(release['version'].strip())
+
+# module_info.set_ts_checked()
+
+ return release_info
+
# =============================================================================
from .base_app import BaseHookError, BaseHookApp
+from .forge_module_info import ModuleReleaseInfo
+
from .module_info import ModuleInfo
from .module_list import ModuleInfoDict
"""Constructor."""
self.environments = []
+ self.env_modules = {}
+ self.forge_modules = None
self.puppet_root_env_dir = self.puppet_envs_dir
self.forge_uri = self.default_forge_uri
self.http_timeout = self.default_http_timeout
appname=appname, base_dir=base_dir, verbose=verbose,
version=version, description=description)
- self.modules = ModuleInfoDict(
+ self.forge_modules = ModuleInfoDict(
appname=self.appname, verbose=self.verbose, base_dir=self.base_dir,
sort_by_name=self.sort_by_name)
LOG.error(str(e))
else:
six.reraise(*sys.exc_info())
+
self.init_puppet_environments()
self.collect_local_modules()
if self.do_forge:
self.get_forge_information()
- if self.verbose > 2:
- LOG.debug("Found modules:\n{}".format(pp(self.modules.as_list())))
+ #if self.verbose > 2:
+ # LOG.debug("Found modules:\n{}".format(pp(self.modules.as_list())))
- self.write_cache_file()
+ #self.write_cache_file()
print()
d = datetime.datetime.now(LOCALTZ)
# -------------------------------------------------------------------------
def collect_local_modules(self):
- self.modules = ModuleInfoDict(
- appname=self.appname, verbose=self.verbose, base_dir=self.base_dir,
- sort_by_name=self.sort_by_name)
-
for env in self.environments:
LOG.debug("Evaluating environment {!r} ..".format(env))
self.read_puppetfile(env)
self.read_metadata_files(env)
- if self.verbose > 2:
- LOG.debug("ModuleInfoDict:\n{}".format(pp(self.modules.as_dict())))
+ if self.verbose > 2:
+ LOG.debug("ModuleInfoDict:\n{}".format(pp(self.env_modules[env].as_dict())))
# -------------------------------------------------------------------------
def get_forge_information(self):
msg = _("Collecting module information from upstream ...")
LOG.info(msg)
self.print_out(msg)
+ modules_done = []
- for full_name in self.modules.keys():
- self.get_forge_module_info(full_name)
+ for env in self.environments:
+
+ for full_name in self.env_modules[env].keys():
+ if full_name in modules_done:
+ continue
+
+ self.get_forge_module_info(full_name)
if not self.verbose:
if self.modules[full_name].forge_avail:
print('.', end='', flush=True)
# -------------------------------------------------------------------------
def get_forge_module_info(self, full_name):
- module_info = self.modules[full_name]
-
- module_info_forge = ModuleInfo.get_from_forge(
+ release_info = ModuleReleaseInfo.get_from_forge(
full_name, forge_uri=self.forge_uri, http_timeout=self.http_timeout,
appname=self.appname, verbose=self.verbose, base_dir=self.base_dir, )
- if module_info_forge:
- module_info.merge_in(module_info_forge)
- else:
- module_info.set_ts_checked()
- module_info.forge_avail = False
+# module_info = self.modules[full_name]
+#
+# module_info_forge = ModuleInfo.get_from_forge(
+# full_name, forge_uri=self.forge_uri, http_timeout=self.http_timeout,
+# appname=self.appname, verbose=self.verbose, base_dir=self.base_dir, )
+#
+# if module_info_forge:
+# module_info.merge_in(module_info_forge)
+# else:
+# module_info.set_ts_checked()
+# module_info.forge_avail = False
# -------------------------------------------------------------------------
def read_metadata_files(self, env):
LOG.debug("Got infos about module {!r}".format(module_info.full_name))
elif self.verbose > 2:
LOG.debug("Got module info:\n{}".format(pp(module_info.as_dict())))
- self.modules.merge(module_info)
+ self.env_modules[env].merge(module_info)
else:
LOG.warn(_("Got no module info from directory {!r}.").format(module_dir))
LOG.debug("Successful read {!r}.".format(pfile.filename))
if pfile_modules:
for module_info in pfile_modules.values():
- self.modules.merge(module_info)
+ self.env_modules[env].append(module_info)
# -------------------------------------------------------------------------
def init_puppet_environments(self):
- LOG.debug("Collecting all Puppet environments in {!r}.".format(
+ LOG.debug(_("Collecting all Puppet environments in {!r}.").format(
str(self.puppet_root_env_dir)))
pattern = os.path.join(str(self.puppet_root_env_dir), '*')
dirs = glob.glob(pattern)
self.environments = []
+ self.env_modules = {}
for rpath in dirs:
path = pathlib.Path(rpath)
if path.is_dir():
env = path.name
self.environments.append(env)
+ self.env_modules[env] = ModuleInfoDict(
+ appname=self.appname, verbose=self.verbose, base_dir=self.base_dir,
+ sort_by_name=self.sort_by_name)
self.environments.sort(key=str.lower)
if self.verbose > 1:
from .xlate import XLATOR
-__version__ = '1.3.1'
+from .base_module_info import BaseModuleInfoError, BaseModuleInfo
+
+__version__ = '1.4.1'
LOG = logging.getLogger(__name__)
# =============================================================================
-class ModuleInfoError(FbBaseObjectError):
+class ModuleInfoError(BaseModuleInfoError):
pass
# =============================================================================
-class ModuleInfo(FbBaseObject):
+class ModuleInfo(BaseModuleInfo):
"""Class for encapsulating information about a Puppet module."""
- re_split_name = re.compile(r'^\s*([a-z0-9]+)[-/_](\S+)\s*$', re.IGNORECASE)
re_mod_pf_line = re.compile(r'\s*mod\s+\'([^\']+)\'\s*,\s*(\S+.*)\s*$', re.IGNORECASE)
re_pf_line_version = re.compile(r"^\s*'([^']+)'")
re_def_token = re.compile(r'^\s*(?:,\s*)?([^,]+)(?:\s*,|$)')
self, appname=None, verbose=0, version=__version__, base_dir=None,
initialized=None, name=None, vendor=None, full_name=None):
- self._name = None
- self._vendor = None
- self._full_name_orig = None
self._forge_version = None
self.local_versions = {}
self.expected_versions = {}
self._ts_checked = None
super(ModuleInfo, self).__init__(
- appname=appname, verbose=verbose, version=version,
- base_dir=base_dir, initialized=False,
+ appname=appname, verbose=verbose, version=version, base_dir=base_dir,
+ initialized=False, name=name, vendor=vendor, full_name=full_name
)
- _name = name
- if name is not None:
- _name = to_str(name).strip().lower()
-
- _vendor = vendor
- if vendor is not None:
- _vendor = to_str(vendor).strip().lower()
-
- _full_name = full_name
- if full_name is not None:
- _full_name = to_str(full_name).strip().lower()
-
- if _full_name:
-
- match = self.re_split_name.match(_full_name)
- if not match:
- raise ModuleInfoError(
- _("Could not analyze given full module name {!r}.").format(
- full_name))
-
- self._full_name_orig = _full_name
-
- if _name:
- self._name = _name
- else:
- self._name = match.group(2).lower()
-
- if _vendor:
- self._vendor = _name
- else:
- self._vendor = match.group(1).lower()
-
- else:
-
- if not _name or not _vendor:
- msg = _(
- "There must be given a full module name or the base module name "
- "and the vendor name (user name on Puppet Forge).")
- raise ModuleInfoError(msg)
-
- self._name = _name
- self._vendor = _vendor
- self._full_name_orig = self.full_name
-
# -------------------------------------------------------------------------
@property
def forge_version(self):
val = None
self._forge_source = val
- # -------------------------------------------------------------------------
- @property
- def name(self):
- """The name of the module without the vendor."""
- return self._name
-
- # -------------------------------------------------------------------------
- @property
- def vendor(self):
- """The vendor of the module - the username on Puppet forge."""
- return self._vendor
-
- # -------------------------------------------------------------------------
- @property
- def full_name(self):
- """The full name of the module including vandor and name."""
- if self.name is None:
- return None
- if self.vendor is None:
- return None
- return "{v}-{n}".format(v=self.vendor, n=self.name)
-
- # -------------------------------------------------------------------------
- @property
- def full_name_orig(self):
- """The original full name of the module."""
- if self._full_name_orig:
- return self._full_name_orig
- return self.full_name
-
- @full_name_orig.setter
- def full_name_orig(self, value):
- if value is None:
- self._full_name_orig = None
- return
- val = to_str(value).strip().lower()
- if val:
- self._full_name_orig = val
- else:
- self._full_name_orig = None
-
# -------------------------------------------------------------------------
@property
def forge_avail(self):
res = super(ModuleInfo, self).as_dict(short=short)
- res['name'] = self.name
- res['vendor'] = self.vendor
- res['full_name'] = self.full_name
- res['full_name_orig'] = self.full_name_orig
res['forge_version'] = self.forge_version
res['forge_avail'] = self.forge_avail
res['forge_homepage_url'] = self.forge_homepage_url
def to_data(self):
"""Returning a dict, which can be used to re-instantiate this module info."""
- res = {}
+ res = super(ModuleInfo, self).to_data()
- res['name'] = self.name
- res['vendor'] = self.vendor
- res['full_name'] = self.full_name
- res['full_name_orig'] = self.full_name_orig
res['forge_version'] = self.forge_version
res['forge_avail'] = self.forge_avail
res['forge_homepage_url'] = self.forge_homepage_url
import os
import shutil
import copy
+import sys
# Third party modules
+import pkgutil
+
# Own modules
+from fb_tools.common import pp
from . import __version__
self.print_out('')
self.output_env()
+ self.output_python_info()
# -------------------------------------------------------------------------
def output_env(self):
self.print_out()
+ # -------------------------------------------------------------------------
+ def output_python_info(self):
+
+ if self.output_type == 'json':
+ return
+
+ installed_packages = []
+ for mod in pkgutil.iter_modules():
+ if not mod.ispkg:
+ continue
+ installed_packages.append(mod.name)
+ installed_packages.sort(key=str.lower)
+
+ LOG.debug(_("Module list:") + '\n' + pp(installed_packages))
+
+ if self.output_type == 'html':
+ self.output_python_info_html(installed_packages)
+ else:
+ self.output_python_info_txt(installed_packages)
+
+ # -------------------------------------------------------------------------
+ def output_python_info_html(self, installed_packages):
+
+ return
+
+ # -------------------------------------------------------------------------
+ def output_python_info_txt(self, installed_packages):
+
+ print()
+
+ pv_label = _("Python version:")
+ label_len = len(pv_label)
+ wrapper = textwrap.TextWrapper()
+ wrapper.width = self.max_line_length - label_len - 1
+ wrapper.initial_indent = ''
+ wrapper.subsequent_indent = ' ' * (label_len + 1)
+ print(pv_label + ' ' + wrapper.fill(sys.version))
+
+ max_len = 1
+
+ for i in sorted(installed_packages):
+ if len(i) > max_len:
+ max_len = len(i)
+
# =============================================================================
if __name__ == "__main__":