from . import __version__ as __pkg_version__
+from .errors import BaseHookError, UncriticalHookError
+
from .module_info import ModuleInfo
from .module_list import ModuleInfoDict
from .puppetfile import PuppetfileError, Puppetfile
+from .forge.mod_dict import ForgeModuleDict
+
from .xlate import __module_dir__ as __xlate_module_dir__
from .xlate import __base_dir__ as __xlate_base_dir__
from .xlate import __mo_file__ as __xlate_mo_file__
ngettext = XLATOR.ngettext
-# =============================================================================
-class BaseHookError(FbAppError):
- """
- Base error class useable by all descendand objects.
- """
-
- pass
-
-
-# =============================================================================
-class UncriticalHookError(BaseHookError):
- """
- Class for uncritical errors, which could and should be caught.
- """
-
- pass
-
-
# =============================================================================
class DataDirOptionAction(argparse.Action, UncriticalHookError):
self._output_type = self.default_output_type
self._mime_type = self.default_mime_type
+ self.environments = []
+ self.env_modules = {}
+ self.forge_modules = None
+
self._mime_type = self.valid_output_types[self.output_type]
self.data = None
lh_file.setFormatter(formatter)
root_log.addHandler(lh_file)
+ # -------------------------------------------------------------------------
+ def init_forge_module_dict(self):
+
+ self.forge_modules = ForgeModuleDict(
+ appname=self.appname, verbose=self.verbose, base_dir=self.base_dir,
+ data_dir=self.data_dir, simulate=self.simulate)
+
# -------------------------------------------------------------------------
def print_err(self, *objects, sep=' ', end='\n', file=sys.stderr.buffer, flush=True):
self.print_out(*objects, sep=sep, end=end, file=file, flush=flush)
return True
+ # -------------------------------------------------------------------------
+ def init_puppet_environments(self):
+
+ LOG.debug(_("Collecting all Puppet environments in {!r}.").format(
+ str(self.puppet_root_env_dir)))
+
+ self.environments = []
+ self.env_modules = {}
+
+ dirs = self.puppet_root_env_dir.glob('*')
+ for path in dirs:
+ if path.is_dir():
+ env = path.name
+ self.environments.append(env)
+ self.env_modules[env] = ModuleInfoDict(
+ appname=self.appname, verbose=self.verbose, base_dir=self.base_dir,
+ sort_by_name=self.sort_by_name)
+
+ self.environments.sort(key=str.lower)
+ if self.verbose > 1:
+ LOG.debug("Found Puppet environments:\n{}".format(pp(self.environments)))
+ if not self.environments:
+ raise BaseHookError(
+ _("Did not found environments in {!r}.").format(self.puppet_root_env_dir))
+
# =============================================================================
if __name__ == "__main__":
from . import __version__
-from .base_app import BaseHookError, BaseHookApp
+from .errors import BaseHookError
+from .base_app import BaseHookApp
from .module_info import ModuleInfo
from .module_meta_info import ModuleMetadata
from .forge.mod_info import ForgeModuleInfo
-from .forge.mod_dict import ForgeModuleDict
+# from .forge.mod_dict import ForgeModuleDict
from .xlate import XLATOR
def __init__(self, appname=None, base_dir=None, verbose=0, version=__version__):
"""Constructor."""
- self.environments = []
- self.env_modules = {}
- self.forge_modules = None
self.forge_uri = self.default_forge_uri
self.http_timeout = self.default_http_timeout
- self.modules = None
self._do_forge = True
description = _(
appname=appname, base_dir=base_dir, verbose=verbose,
version=version, description=description)
- self._init_forge_module_dict()
+ self.init_forge_module_dict()
# -----------------------------------------------------------
@property
help=_("Don't get module information from Puppet forge."),
)
- # -------------------------------------------------------------------------
- def _init_forge_module_dict(self):
-
- self.forge_modules = ForgeModuleDict(
- appname=self.appname, verbose=self.verbose, base_dir=self.base_dir,
- data_dir=self.data_dir, simulate=self.simulate)
-
# -------------------------------------------------------------------------
def perform_arg_parser(self):
self.print_out(msg)
modules_done = []
- self._init_forge_module_dict()
+ self.init_forge_module_dict()
for env in self.environments:
return ModuleInfo.init_from_json(
json_data, env, appname=self.appname, verbose=self.verbose, base_dir=self.base_dir)
- # -------------------------------------------------------------------------
- def init_puppet_environments(self):
-
- LOG.debug(_("Collecting all Puppet environments in {!r}.").format(
- str(self.puppet_root_env_dir)))
- pattern = os.path.join(str(self.puppet_root_env_dir), '*')
- dirs = glob.glob(pattern)
-
- self.environments = []
- self.env_modules = {}
- for rpath in dirs:
- path = pathlib.Path(rpath)
- if path.is_dir():
- env = path.name
- self.environments.append(env)
- self.env_modules[env] = ModuleInfoDict(
- appname=self.appname, verbose=self.verbose, base_dir=self.base_dir,
- sort_by_name=self.sort_by_name)
-
- self.environments.sort(key=str.lower)
- if self.verbose > 1:
- LOG.debug("Found Puppet environments:\n{}".format(pp(self.environments)))
- if not self.environments:
- raise GetForgeModulesError(
- _("Did not found environments in {!r}.").format(self.puppet_root_env_dir))
-
- # -------------------------------------------------------------------------
- def write_cache_file(self):
-
- output_file = self.cachefile
- tmp_file = pathlib.Path(str(output_file) + '.new')
- fd = None
-
- LOG.debug("Sorting found modules ...")
-
- data = []
- for full_name in self.modules.keys():
- module_info = self.modules[full_name]
- data.append(module_info.to_data())
- data.sort(key=itemgetter('name', 'vendor'))
- dump = yaml.dump(data, default_flow_style=False)
-
- LOG.info(_("Trying to open {!r} exclusive ...").format(tmp_file))
-
- if self.simulate:
- LOG.info(_("Simulation mode, cache file will not be written."))
- return
-
- try:
- fd = os.open(str(tmp_file), os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
- fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
- except OSError as e:
- msg = _("Error on creating file {f!r}: {e}").format(f=tmp_file, e=e)
- if e.errno == errno.EEXIST:
- LOG.warn(msg)
- return None
- else:
- error_tuple = sys.exc_info()
- reraise(GetForgeModulesError, msg, error_tuple[2])
-
- try:
- os.write(fd, to_bytes('---\n'))
- os.write(fd, to_bytes(dump))
- finally:
- os.close(fd)
-
- if output_file.exists() and not os.geteuid():
- fstat = output_file.stat()
- user = fstat.st_uid
- try:
- user = pwd.getpwuid(fstat.st_uid).pw_name
- except KeyError:
- user = fstat.st_uid
- group = fstat.st_gid
- try:
- group = grp.getgrgid(fstat.st_gid).gr_name
- except KeyError:
- group = fstat.st_gid
- LOG.debug("Chowning {f!r} to {u}:{g} ...".format(
- f=tmp_file, u=user, g=group))
- os.chown(str(tmp_file), fstat.st_uid, fstat.st_gid)
-
- LOG.info(_("Renaming {src!r} => {tgt!r}.").format(
- src=str(tmp_file), tgt=str(output_file)))
- tmp_file.rename(output_file)
-
# =============================================================================