# -*- coding: utf-8 -*-
"""
-@summary: An application module for checking all DN-like attributes in a LDAP instance
+@summary: An application module for checking all DN-like attributes in a LDAP instance.
@author: Frank Brehm
@contact: frank.brehm@pixelpark.com
from __future__ import absolute_import
# Standard modules
-import re
import logging
+import re
import sys
-
from functools import cmp_to_key
# Third party modules
from fb_tools.collections import CIDict, CIStringSet
from fb_tools.xlate import format_list
-from .. import pp
-
-from ..xlate import XLATOR
-
-from ..config.ldap import LdapConfiguration
-
+from .ldap import BaseLdapApplication
# from .ldap import LdapAppError, FatalLDAPError
from .ldap import LdapAppError
-from .ldap import BaseLdapApplication
+from .. import pp
+from ..config.ldap import LdapConfiguration
+from ..xlate import XLATOR
-__version__ = '0.3.4'
+__version__ = '0.3.5'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
# -------------------------------------------------------------------------
def __init__(self, appname=None, base_dir=None):
-
+ """Initialize the CheckLdapDnAttributesApplication object."""
self.ldap = None
self.instance = None
self.connect_info = None
attr_list = format_list(self.check_attributes, do_repr=True)
desc = _(
- "Checking all attributes with a DN syntax ({alist}) in the given instance. "
- "The check is for the DN-syntax of the attributes and whether they are referencing "
- "to existing entries in LDAP.").format(alist=attr_list)
+ 'Checking all attributes with a DN syntax ({alist}) in the given instance. '
+ 'The check is for the DN-syntax of the attributes and whether they are referencing '
+ 'to existing entries in LDAP.').format(alist=attr_list)
super(CheckLdapDnAttributesApplication, self).__init__(
appname=appname, description=desc, base_dir=base_dir,
# -------------------------------------------------------------------------
def init_arg_parser(self):
-
- app_group = self.arg_parser.add_argument_group(_("Script options"))
+ """Initialze the argument parser with some special parameters."""
+ app_group = self.arg_parser.add_argument_group(_('Script options'))
app_group.add_argument(
'-E', '--export', dest='export', metavar=_('FILE'),
help=_(
- "Exportig the faulty entries and attributes into a YAML file, "
- "if there were found some of them."),
+ 'Exportig the faulty entries and attributes into a YAML file, '
+ 'if there were found some of them.'),
)
super(CheckLdapDnAttributesApplication, self).init_arg_parser()
# -------------------------------------------------------------------------
def post_init(self):
"""Execute some actions after initialising."""
-
super(CheckLdapDnAttributesApplication, self).post_init()
self.export_file = getattr(self.args, 'export', None)
if self.export_file:
with open(self.export_file, 'wt', encoding='utf-8', errors='surrogateescape') as fh:
fh.write('---\n\n')
- LOG.debug(_("Created export file {!r}.").format(self.export_file))
+ LOG.debug(_('Created export file {!r}.').format(self.export_file))
msg = _(
- "Start checking all DN-like attributes in in LDAP instance {inst!r} "
- "({url}) ...").format(inst=self.instance, url=ldap_url)
+ 'Start checking all DN-like attributes in in LDAP instance {inst!r} '
+ '({url}) ...').format(inst=self.instance, url=ldap_url)
LOG.debug(msg)
self.get_dns_to_check()
# -------------------------------------------------------------------------
def get_dns_to_check(self):
+ """
+ Collect DNs of all entries with attributes with DN-like syntax.
+ The collected DNs are added to self.all_check_dns.
+ """
ldap_filter = '(|' + ''.join(
map(lambda x: '({}=*)'.format(x), self.check_attributes)) + ')'
nr = len(self.all_check_dns)
if nr:
msg = ngettext(
- "Found one entry to check.",
- "Found {} entries to check.", nr).format(nr)
+ 'Found one entry to check.',
+ 'Found {} entries to check.', nr).format(nr)
else:
- msg = _("Found no to check.")
+ msg = _('Found no to check.')
LOG.debug(msg)
if self.verbose > 2:
- LOG.debug("Found entries to check:\n" + pp(self.all_check_dns.as_list()))
+ LOG.debug('Found entries to check:\n' + pp(self.all_check_dns.as_list()))
# -------------------------------------------------------------------------
def check_entries(self):
-
+ """Check sytax of DN-like attributes in collected entries."""
for dn in self.all_check_dns:
self.check_entry(dn)
for a_val in self.failed_entries[e_dn][attr]:
nr_attr += 1
msg = ngettext(
- "Got an inconsistent entry.", "Got {} inconsistent entries.", nr).format(nr)
+ 'Got an inconsistent entry.', 'Got {} inconsistent entries.', nr).format(nr)
LOG.error(msg)
msg = ngettext(
- "There is one inconsistent attribute.",
- "There are {} inconsistent attributes.", nr_attr).format(nr_attr)
+ 'There is one inconsistent attribute.',
+ 'There are {} inconsistent attributes.', nr_attr).format(nr_attr)
LOG.warn(msg)
if self.export_file:
- LOG.debug(_("Writing export file {!r} ...").format(self.export_file))
+ LOG.debug(_('Writing export file {!r} ...').format(self.export_file))
with open(
self.export_file, 'wt', encoding='utf-8', errors='surrogateescape') as fh:
print('---', file=fh)
self.print_errors(out=fh)
self.exit(5)
- msg = _("Did not found any inconsistent entries.")
+ msg = _('Did not found any inconsistent entries.')
LOG.info(msg)
self.exit(0)
# -------------------------------------------------------------------------
def print_errors(self, out=sys.stdout):
-
- sorted_dns = sorted(
- list(self.failed_entries.keys()), key=cmp_to_key(self.compare_ldap_dns))
+ """Print out results to STDOUT."""
+ sorted_dns = sorted(self.failed_entries.keys(), key=cmp_to_key(self.compare_ldap_dns))
for dn in sorted_dns:
entry = self.failed_entries[dn]
# -------------------------------------------------------------------------
def check_entry(self, dn):
-
+ """Check syntax of DN-like attributes of a particular LDAP entry."""
if self.verbose > 1:
- LOG.debug(_("Checking DN-like attributes of entry {!r} ...").format(dn))
+ LOG.debug(_('Checking DN-like attributes of entry {!r} ...').format(dn))
entry = self.get_entry(dn, self.instance, attributes=self.check_attributes)
attribs = self.normalized_attributes(entry)
if self.verbose > 2:
- LOG.debug(_("Got attributes:") + '\n' + pp(attribs.as_dict()))
+ LOG.debug(_('Got attributes:') + '\n' + pp(attribs.as_dict()))
for attrib in self.check_attributes:
if attrib in attribs:
# -------------------------------------------------------------------------
def check_ref_syntax(self, dn):
-
+ """Check syntax of a DN-like value."""
if self.re_ldap_dn.match(dn):
return True
return False
# -------------------------------------------------------------------------
def check_ref_existence(self, dn, inst):
-
+ """Check existence of referenced entry."""
if dn in self.checked_ref_dn:
return self.checked_ref_dn[dn]
# =============================================================================
-if __name__ == "__main__":
+if __name__ == '__main__':
pass
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
+@summary: A module for the application class for configuring named.
+
@author: Frank Brehm
@contact: frank.brehm@pixelpark.com
@copyright: © 2023 by Frank Brehm, Berlin
-@summary: A module for the application class for configuring named
"""
from __future__ import absolute_import
-import os
+import datetime
+import ipaddress
import logging
import logging.config
-import textwrap
+import os
+import pipes
import re
import shlex
-import datetime
+import shutil
import tempfile
+import textwrap
import time
-import shutil
-import pipes
-import ipaddress
-
-from subprocess import Popen, TimeoutExpired, PIPE
-
from pathlib import Path
+from subprocess import PIPE, Popen, TimeoutExpired
# Third party modules
-import six
-from pytz import timezone, UnknownTimeZoneError
-
-# Own modules
+from fb_tools.app import BaseApplication
from fb_tools.common import to_str
+from fb_tools.pidfile import PidFile, PidFileError
-from fb_tools.app import BaseApplication
+from pytz import UnknownTimeZoneError, timezone
-from fb_tools.pidfile import PidFileError, PidFile
+import six
+# Own modules
+from .pdns import PpPDNSAppError, PpPDNSApplication
from .. import __version__ as GLOBAL_VERSION
from .. import pp
-
-from .pdns import PpPDNSAppError, PpPDNSApplication
-
from ..config.dns_deploy_zones import DnsDeployZonesConfig
-
from ..xlate import XLATOR
-__version__ = '0.8.7'
+__version__ = '0.8.8'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
# =============================================================================
class PpDeployZonesError(PpPDNSAppError):
+ """Special exception class to use in this module."""
+
pass
# =============================================================================
class PpDeployZonesApp(PpPDNSApplication):
"""
- Class for a application 'dns-deploy-zones' for configuring slaves
- of the BIND named daemon.
+ Class for a application 'dns-deploy-zones'.
+
+ It is intended for configuring slaves of the BIND named daemon.
"""
show_simulate_option = True
def __init__(
self, appname=None, base_dir=None, version=GLOBAL_VERSION,
cfg_class=DnsDeployZonesConfig):
-
+ """Initialize the PpDeployZonesApp object."""
self.zones = {}
self.pidfile = None
super(PpDeployZonesApp, self).__init__(
appname=appname, version=version, description=description, base_dir=base_dir,
- cfg_class=cfg_class, initialized=False, instance="public",
+ cfg_class=cfg_class, initialized=False, instance='public',
)
masters = []
# -------------------------------------------
@property
def cmd_named_checkconf(self):
- """The OS command for named-checkconf."""
-
+ """Return the OS command for named-checkconf."""
checkconf = DnsDeployZonesConfig.default_named_checkconf
if self.cfg:
checkconf = self.cfg.named_checkconf
# -------------------------------------------
@property
def cmd_named_reload(self):
- """The OS command to reload the BIND nameserver."""
-
+ """Return the OS command to reload the BIND nameserver."""
rndc = DnsDeployZonesConfig.default_rndc
if self.cfg:
rndc = self.cfg.rndc
- return "{} reload".format(rndc)
+ return '{} reload'.format(rndc)
# -------------------------------------------
@property
def cmd_named_status(self):
- """The OS command to show the status of the BIND nameserver service."""
-
+ """Return the OS command to show the status of the BIND nameserver service."""
systemctl = DnsDeployZonesConfig.default_systemctl
if self.cfg:
systemctl = self.cfg.systemctl
- return "{} status named.service".format(systemctl)
+ return '{} status named.service'.format(systemctl)
# -------------------------------------------
@property
def cmd_named_start(self):
- """The OS command to start the BIND nameserver service."""
-
+ """Return the OS command to start the BIND nameserver service."""
systemctl = DnsDeployZonesConfig.default_systemctl
if self.cfg:
systemctl = self.cfg.systemctl
- return "{} start named.service".format(systemctl)
+ return '{} start named.service'.format(systemctl)
# -------------------------------------------
@property
def cmd_named_restart(self):
- """The OS command to restart the BIND nameserver service."""
-
+ """Return the OS command to restart the BIND nameserver service."""
systemctl = DnsDeployZonesConfig.default_systemctl
if self.cfg:
systemctl = self.cfg.systemctl
- return "{} restart named.service".format(systemctl)
+ return '{} restart named.service'.format(systemctl)
# -------------------------------------------
@property
def named_zones_cfg_file(self):
- """The file for configuration of all own zones."""
-
+ """Return the file for configuration of all own zones."""
conf_dir = DnsDeployZonesConfig.default_named_conf_dir
zones_cfg_file = DnsDeployZonesConfig.default_named_zones_cfg_file
if self.cfg:
# -------------------------------------------
@property
def named_slavedir_rel(self):
- """The directory for zone files of slave zones."""
-
+ """Return the directory for zone files of slave zones."""
if self.cfg:
return self.cfg.named_slavedir
return DnsDeployZonesConfig.default_named_slavedir
# -------------------------------------------
@property
def named_basedir(self):
- """The base directory of named, where all volatile data are stored."""
-
+ """Return the base directory of named, where all volatile data are stored."""
if self.cfg:
return self.cfg.named_basedir
return DnsDeployZonesConfig.default_named_basedir
# -------------------------------------------
@property
def named_slavedir_abs(self):
- """The directory for zone files of slave zones."""
-
+ """Return the directory for zone files of slave zones."""
return (self.named_basedir / self.named_slavedir_rel).resolve()
# -------------------------------------------------------------------------
def as_dict(self, short=True):
"""
- Transforms the elements of the object into a dict
+ Transform the elements of the object into a dict.
@param short: don't include local properties in resulting dict.
@type short: bool
@return: structure as dict
@rtype: dict
"""
-
res = super(PpDeployZonesApp, self).as_dict(short=short)
res['named_slavedir_abs'] = self.named_slavedir_abs
# -------------------------------------------------------------------------
def init_arg_parser(self):
-
- deploy_group = self.arg_parser.add_argument_group(_("Options for {}").format(
+ """Initialize command line parser with some special options."""
+ deploy_group = self.arg_parser.add_argument_group(_('Options for {}').format(
self.appname))
deploy_group.add_argument(
- '-B', '--backup', dest="keep_backup", action='store_true',
- help=_("Keep a backup file for each changed configuration file."),
+ '-B', '--backup', dest='keep_backup', action='store_true',
+ help=_('Keep a backup file for each changed configuration file.'),
)
deploy_group.add_argument(
'-K', '--keep-tempdir', dest='keep_tempdir', action='store_true',
help=_(
- "Keeping the temporary directory instead of removing it at the end "
- "(e.g. for debugging purposes)"),
+ 'Keeping the temporary directory instead of removing it at the end '
+ '(e.g. for debugging purposes)'),
)
super(PpDeployZonesApp, self).init_arg_parser()
# -------------------------------------------------------------------------
def perform_arg_parser(self):
- """
- Public available method to execute some actions after parsing
- the command line parameters.
- """
-
+ """Execute some actions after parsing the command line parameters."""
super(PpDeployZonesApp, self).perform_arg_parser()
if self.args.keep_tempdir:
# -------------------------------------------------------------------------
def post_init(self):
-
+ """Execute some actions at the end of initialization."""
if not self.quiet:
print('')
- LOG.debug(_("Post init phase."))
+ LOG.debug(_('Post init phase.'))
super(PpDeployZonesApp, self).post_init()
- LOG.debug(_("My own post init phase."))
+ LOG.debug(_('My own post init phase.'))
cmd_namedcheckconf = self.get_command('named-checkconf', resolve=True)
if not cmd_namedcheckconf:
try:
self.local_tz = timezone(self.local_tz_name)
except UnknownTimeZoneError:
- LOG.error(_("Unknown time zone: {!r}.").format(self.local_tz_name))
+ LOG.error(_('Unknown time zone: {!r}.').format(self.local_tz_name))
self.exit(6)
# -------------------------------------------------------------------------
def current_timestamp(self):
-
+ """Return formatted current timestamp."""
if self.local_tz:
return datetime.datetime.now(self.local_tz).strftime('%Y-%m-%d %H:%M:%S %Z')
return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# -------------------------------------------------------------------------
def pre_run(self):
- """
- Dummy function to run before the main routine.
- Could be overwritten by descendant classes.
-
- """
-
+ """Execute some actions before starting the underlaying stuff."""
my_uid = os.geteuid()
if my_uid:
- msg = _("You must be root to execute this script.")
+ msg = _('You must be root to execute this script.')
if self.simulate:
- msg += ' ' + _("But in simulation mode we are continuing nevertheless.")
+ msg += ' ' + _('But in simulation mode we are continuing nevertheless.')
LOG.warn(msg)
time.sleep(1)
else:
LOG.error(msg)
self.exit(1)
- LOG.info(_("Starting: {}").format(self.current_timestamp()))
+ LOG.info(_('Starting: {}').format(self.current_timestamp()))
try:
super(PpDeployZonesApp, self).pre_run()
if self.cfg.pdns_instance == 'global':
LOG.error(_(
- "Using the global DNS master is not supported, "
+ 'Using the global DNS master is not supported, '
"please use 'local' or 'public'"))
self.exit(1)
try:
self.pidfile.create()
except PidFileError as e:
- LOG.error(_("Could not occupy pidfile: {}").format(e))
+ LOG.error(_('Could not occupy pidfile: {}').format(e))
self.exit(7)
return
finally:
self.cleanup()
self.pidfile = None
- LOG.info(_("Ending: {}").format(self.current_timestamp()))
+ LOG.info(_('Ending: {}').format(self.current_timestamp()))
# -------------------------------------------------------------------------
def cleanup(self):
-
- LOG.info(_("Cleaning up ..."))
+ """Clean up all temporary stuff after a unexpected exit."""
+ LOG.info(_('Cleaning up ...'))
for tgt_file in self.moved_files.keys():
backup_file = self.moved_files[tgt_file]
- LOG.debug(_("Searching for {!r}.").format(backup_file))
+ LOG.debug(_('Searching for {!r}.').format(backup_file))
if backup_file.exists():
if self.keep_backup:
- LOG.info(_("Keep existing backup file {!r}.").format(str(backup_file)))
+ LOG.info(_('Keep existing backup file {!r}.').format(str(backup_file)))
else:
- LOG.info(_("Removing {!r} ...").format(str(backup_file)))
+ LOG.info(_('Removing {!r} ...').format(str(backup_file)))
if not self.simulate:
backup_file.unlink()
# -----------------------
def emit_rm_err(function, path, excinfo):
- LOG.error(_("Error removing {p!r} - {c}: {e}").format(
+ LOG.error(_('Error removing {p!r} - {c}: {e}').format(
p=str(path), c=excinfo[1].__class__.__name__, e=excinfo[1]))
if self.tempdir:
if self.keep_tempdir:
msg = _(
- "Temporary directory {!r} will not be removed. "
+ 'Temporary directory {!r} will not be removed. '
"It's on yours to remove it manually.").format(str(self.tempdir))
LOG.warn(msg)
else:
- LOG.debug(_("Destroying temporary directory {!r} ...").format(str(self.tempdir)))
+ LOG.debug(_('Destroying temporary directory {!r} ...').format(str(self.tempdir)))
shutil.rmtree(str(self.tempdir), False, emit_rm_err)
self.tempdir = None
# -------------------------------------------------------------------------
def init_temp_objects(self):
"""Init temporary objects and properties."""
-
self.tempdir = Path(tempfile.mkdtemp(prefix=(self.appname + '.'), suffix='.tmp.d'))
- LOG.debug(_("Temporary directory: {!r}.").format(str(self.tempdir)))
+ LOG.debug(_('Temporary directory: {!r}.').format(str(self.tempdir)))
self.temp_zones_cfg_file = self.tempdir / self.cfg.named_zones_cfg_file
if self.verbose > 1:
- LOG.debug(_("Temporary zones conf: {!r}").format(str(self.temp_zones_cfg_file)))
+ LOG.debug(_('Temporary zones conf: {!r}').format(str(self.temp_zones_cfg_file)))
- # -------------------------------------------------------------------------
+ # --------------------------''---------------------------------------------
def get_named_keys(self):
-
- LOG.info(_("Trying to get all keys from named.conf ..."))
+ """Get all keys from output of named-checkconf -s'."""
+ LOG.info(_('Trying to get all keys from named.conf ...'))
cmd = shlex.split(self.cmd_named_checkconf)
cmd.append('-p')
cmd_str = ' '.join(map(lambda x: pipes.quote(x), cmd))
- LOG.debug(_("Executing: {}").format(cmd_str))
+ LOG.debug(_('Executing: {}').format(cmd_str))
result = super(BaseApplication, self).run(
cmd, stdout=PIPE, stderr=PIPE, timeout=10, check=True, may_simulate=False)
if self.verbose > 3:
- LOG.debug(_("Result:") + '\n' + str(result))
+ LOG.debug(_('Result:') + '\n' + str(result))
config = result.stdout
key_name = match[1]
key_data = match[2].strip()
if self.verbose > 2:
- LOG.debug("Found key {!r}:".format(key_name) + '\n' + key_data)
+ LOG.debug('Found key {!r}:'.format(key_name) + '\n' + key_data)
algorithm = None
secret = None
if self.verbose > 1:
if self.named_keys:
- LOG.debug(_("Found named keys:") + '\n' + pp(self.named_keys))
+ LOG.debug(_('Found named keys:') + '\n' + pp(self.named_keys))
else:
- LOG.debug(_("Found named keys:") + ' ' + _('None'))
+ LOG.debug(_('Found named keys:') + ' ' + _('None'))
# -------------------------------------------------------------------------
def generate_slave_cfg_file(self):
-
- LOG.info(_("Generating {} ...").format(self.cfg.named_zones_cfg_file))
+ """Generate configuration file for slave zones."""
+ LOG.info(_('Generating {} ...').format(self.cfg.named_zones_cfg_file))
cur_date = datetime.datetime.now().isoformat(' ')
content += '\n' + zone_config
if self.servers:
- LOG.debug(_("Collected server configuration:") + '\n' + pp(self.servers))
+ LOG.debug(_('Collected server configuration:') + '\n' + pp(self.servers))
else:
- LOG.debug(_("Collected server configuration:") + ' ' + _('None'))
+ LOG.debug(_('Collected server configuration:') + ' ' + _('None'))
if self.servers:
for server in sorted(self.servers.keys()):
if self.verbose > 2:
LOG.debug(
- _("Generated file {!r}:").format(
+ _('Generated file {!r}:').format(
str(self.temp_zones_cfg_file)) + '\n' + content.strip())
# -------------------------------------------------------------------------
def generate_zone_config(self, zone_name):
-
+ """Generate configuration block for the given Nameserver."""
zone = self.zones[zone_name]
zone.update()
prefix = self._get_ipv4_prefix(match.group(1))
if prefix:
if prefix == '127.0.0':
- LOG.debug(_("Pure local zone {!r} will not be considered.").format(prefix))
+ LOG.debug(_('Pure local zone {!r} will not be considered.').format(prefix))
return ''
canonical_name = 'rev.' + prefix
else:
for key_id in zone.master_tsig_key_ids:
if key_id not in self.named_keys:
- msg = _("Key {k!r} for zone {z!r} not found in named configuration.").format(
+ msg = _('Key {k!r} for zone {z!r} not found in named configuration.').format(
k=key_id, z=show_name)
raise PpDeployZonesError(msg)
if t:
tuples.insert(0, t)
if self.verbose > 2:
- LOG.debug(_("Got IPv4 tuples: {}").format(pp(tuples)))
+ LOG.debug(_('Got IPv4 tuples: {}').format(pp(tuples)))
return '.'.join(tuples)
# -------------------------------------------------------------------------
del tuples[0:4]
if self.verbose > 2:
- LOG.debug(_("Got IPv6 tokens: {}").format(pp(tokens)))
+ LOG.debug(_('Got IPv6 tokens: {}').format(pp(tokens)))
return ':'.join(tokens)
# -------------------------------------------------------------------------
def compare_files(self):
-
- LOG.info(_("Comparing generated files with existing ones."))
+ """Compare generated files with existing ones.."""
+ LOG.info(_('Comparing generated files with existing ones.'))
if not self.files_equal_content(self.temp_zones_cfg_file, self.named_zones_cfg_file):
self.reload_necessary = True
self.files2replace[self.named_zones_cfg_file] = self.temp_zones_cfg_file
if self.verbose > 1:
- LOG.debug(_("Files to replace:") + '\n' + pp(self.files2replace))
+ LOG.debug(_('Files to replace:') + '\n' + pp(self.files2replace))
# -------------------------------------------------------------------------
def files_equal_content(self, file_src, file_tgt):
-
+ """Compare two configuration files by ignoring whitespaces and comments."""
if not file_src:
- raise PpDeployZonesError(_("Source file not defined."))
+ raise PpDeployZonesError(_('Source file not defined.'))
if not file_tgt:
- raise PpDeployZonesError(_("Target file not defined."))
+ raise PpDeployZonesError(_('Target file not defined.'))
- LOG.debug(_("Comparing {one!r} with {two!r} ...").format(
+ LOG.debug(_('Comparing {one!r} with {two!r} ...').format(
one=str(file_src), two=str(file_tgt)))
if not file_src.exists():
- msg = _("{what} {f!r} does not exists.").format(
- what=_("Source file"), f=str(file_src))
+ msg = _('{what} {f!r} does not exists.').format(
+ what=_('Source file'), f=str(file_src))
raise PpDeployZonesError(msg)
if not file_src.is_file():
- msg = _("{what} {f!r} is not a regular file.").format(
- what=_("Source file"), f=str(file_src))
+ msg = _('{what} {f!r} is not a regular file.').format(
+ what=_('Source file'), f=str(file_src))
raise PpDeployZonesError(msg)
if not file_tgt.exists():
- msg = _("{what} {f!r} does not exists.").format(
- what=_("Target file"), f=str(file_tgt))
+ msg = _('{what} {f!r} does not exists.').format(
+ what=_('Target file'), f=str(file_tgt))
LOG.debug(msg)
return False
if not file_tgt.is_file():
- msg = _("{what} {f!r} is not a regular file.").format(
- what=_("Target file"), f=str(file_tgt))
+ msg = _('{what} {f!r} is not a regular file.').format(
+ what=_('Target file'), f=str(file_tgt))
raise PpDeployZonesError(msg)
# Reading source file
content_src = ''
if self.verbose > 2:
- LOG.debug(_("Reading {!r} ...").format(str(file_src)))
+ LOG.debug(_('Reading {!r} ...').format(str(file_src)))
content_src = file_src.read_text(**self.open_args)
lines_str_src = self.re_block_comment.sub('', content_src)
lines_str_src = self.re_line_comment.sub('', lines_str_src)
if line:
lines_src.append(line)
if self.verbose > 3:
- msg = _("Cleaned version of {!r}:").format(str(file_src))
+ msg = _('Cleaned version of {!r}:').format(str(file_src))
msg += '\n' + '\n'.join(lines_src)
LOG.debug(msg)
# Reading target file
content_tgt = ''
if self.verbose > 2:
- LOG.debug(_("Reading {!r} ...").format(str(file_tgt)))
+ LOG.debug(_('Reading {!r} ...').format(str(file_tgt)))
content_tgt = file_tgt.read_text(**self.open_args)
lines_str_tgt = self.re_block_comment.sub('', content_tgt)
lines_str_tgt = self.re_line_comment.sub('', lines_str_tgt)
if line:
lines_tgt.append(line)
if self.verbose > 3:
- msg = _("Cleaned version of {!r}:").format(str(file_tgt))
+ msg = _('Cleaned version of {!r}:').format(str(file_tgt))
msg += '\n' + '\n'.join(lines_tgt)
LOG.debug(msg)
if len(lines_src) != len(lines_tgt):
LOG.debug(_(
- "Source file {sf!r} has different number essential lines ({sl}) than "
- "the target file {tf!r} ({tl} lines).").format(
+ 'Source file {sf!r} has different number essential lines ({sl}) than '
+ 'the target file {tf!r} ({tl} lines).').format(
sf=str(file_src), sl=len(lines_src), tf=str(file_tgt), tl=len(lines_tgt)))
return False
while i < len(lines_src):
if lines_src[i] != lines_tgt[i]:
LOG.debug(_(
- "Source file {sf!r} has a different content than "
- "the target file {tf!r}.").format(sf=str(file_src), tf=str(file_tgt)))
+ 'Source file {sf!r} has a different content than '
+ 'the target file {tf!r}.').format(sf=str(file_src), tf=str(file_tgt)))
return False
i += 1
# -------------------------------------------------------------------------
def replace_configfiles(self):
-
+ """Replace the configuration file with the modified one."""
if not self.files2replace:
- LOG.debug(_("No replacement of any config files necessary."))
+ LOG.debug(_('No replacement of any config files necessary.'))
return
- LOG.debug(_("Start replacing of config files ..."))
+ LOG.debug(_('Start replacing of config files ...'))
for tgt_file in self.files2replace.keys():
if tgt_file.exists():
self.moved_files[tgt_file] = backup_file
- LOG.info(_("Copying {frm!r} => {to!r} ...").format(
+ LOG.info(_('Copying {frm!r} => {to!r} ...').format(
frm=str(tgt_file), to=str(backup_file)))
if not self.simulate:
shutil.copy2(str(tgt_file), str(backup_file))
if self.verbose > 1:
- LOG.debug(_("All backuped config files:") + '\n' + pp(self.moved_files))
+ LOG.debug(_('All backuped config files:') + '\n' + pp(self.moved_files))
for tgt_file in self.files2replace.keys():
src_file = self.files2replace[tgt_file]
- LOG.info(_("Copying {frm!r} => {to!r} ...").format(
+ LOG.info(_('Copying {frm!r} => {to!r} ...').format(
frm=str(src_file), to=str(tgt_file)))
if not self.simulate:
shutil.copy2(str(src_file), str(tgt_file))
# -------------------------------------------------------------------------
def restore_configfiles(self):
-
- LOG.error(_("Restoring of original config files because of an exception."))
+ """Restore original config files because of an exception."""
+ LOG.error(_('Restoring of original config files because of an exception.'))
for tgt_file in self.moved_files.keys():
backup_file = self.moved_files[tgt_file]
- LOG.info(_("Moving {frm!r} => {to!r} ...").format(
+ LOG.info(_('Moving {frm!r} => {to!r} ...').format(
frm=str(backup_file), to=str(tgt_file)))
if not self.simulate:
if backup_file.exists():
backup_file.rename(tgt_file)
else:
- LOG.error(_("Could not find backup file {!r}.").format(str(backup_file)))
+ LOG.error(_('Could not find backup file {!r}.').format(str(backup_file)))
# -------------------------------------------------------------------------
def check_namedconf(self):
-
- LOG.info(_("Checking syntax correctness of named.conf ..."))
+ """Chack the syntax correctness of named.conf."""
+ LOG.info(_('Checking syntax correctness of named.conf ...'))
cmd = shlex.split(self.cmd_named_checkconf)
if self.verbose > 2:
cmd.append('-p')
cmd_str = ' '.join(map(lambda x: pipes.quote(x), cmd))
- LOG.debug(_("Executing: {}").format(cmd_str))
+ LOG.debug(_('Executing: {}').format(cmd_str))
result = super(BaseApplication, self).run(
cmd, stdout=PIPE, stderr=PIPE, timeout=10, check=False, may_simulate=False)
if self.verbose > 2:
- LOG.debug(_("Result:") + '\n' + str(result))
+ LOG.debug(_('Result:') + '\n' + str(result))
if result.returncode:
return False
# -------------------------------------------------------------------------
def apply_config(self):
-
+ """Apply the changed config of the BIND nameserver."""
if not self.reload_necessary and not self.restart_necessary:
- LOG.info(_("Reload or restart of named is not necessary."))
+ LOG.info(_('Reload or restart of named is not necessary.'))
return
running = self.named_running()
if not running:
- LOG.warn(_("Named is not running, please start it manually."))
+ LOG.warn(_('Named is not running, please start it manually.'))
return
if self.restart_necessary:
# -------------------------------------------------------------------------
def named_running(self):
-
- LOG.debug(_("Checking, whether named is running ..."))
+ """Chack the running state of the BIND nameserver."""
+ LOG.debug(_('Checking, whether named is running ...'))
cmd = shlex.split(self.cmd_named_status)
cmd_str = ' '.join(map(lambda x: pipes.quote(x), cmd))
- LOG.debug(_("Executing: {}").format(cmd_str))
+ LOG.debug(_('Executing: {}').format(cmd_str))
std_out = None
std_err = None
std_out, std_err = proc.communicate()
ret_val = proc.wait()
- LOG.debug(_("Return value: {!r}").format(ret_val))
+ LOG.debug(_('Return value: {!r}').format(ret_val))
if std_out and std_out.strip():
- LOG.debug(_("Output on {}").format('STDOUT') + '\n' + to_str(std_out.strip()))
+ LOG.debug(_('Output on {}').format('STDOUT') + '\n' + to_str(std_out.strip()))
if std_err and std_err.strip():
- LOG.warn(_("Output on {}").format('STDERR') + ' ' + to_str(std_err.strip()))
+ LOG.warn(_('Output on {}').format('STDERR') + ' ' + to_str(std_err.strip()))
if ret_val:
return False
# -------------------------------------------------------------------------
def start_named(self):
-
- LOG.info(_("Starting {} ...").format('named'))
+ """Start the BIND nameserver."""
+ LOG.info(_('Starting {} ...').format('named'))
cmd = shlex.split(self.cmd_named_start)
cmd_str = ' '.join(map(lambda x: pipes.quote(x), cmd))
- LOG.debug(_("Executing: {}").format(cmd_str))
+ LOG.debug(_('Executing: {}').format(cmd_str))
if self.simulate:
return
std_out, std_err = proc.communicate()
ret_val = proc.wait()
- LOG.debug(_("Return value: {!r}").format(ret_val))
+ LOG.debug(_('Return value: {!r}').format(ret_val))
if std_out and std_out.strip():
- LOG.debug(_("Output on {}").format('STDOUT') + '\n' + to_str(std_out.strip()))
+ LOG.debug(_('Output on {}').format('STDOUT') + '\n' + to_str(std_out.strip()))
if std_err and std_err.strip():
- LOG.error(_("Output on {}").format('STDERR') + ' ' + to_str(std_err.strip()))
+ LOG.error(_('Output on {}').format('STDERR') + ' ' + to_str(std_err.strip()))
if ret_val:
return False
# -------------------------------------------------------------------------
def restart_named(self):
-
- LOG.info(_("Restarting {} ...").format('named'))
+ """Restart the BIND nameserver."""
+ LOG.info(_('Restarting {} ...').format('named'))
cmd = shlex.split(self.cmd_named_restart)
cmd_str = ' '.join(map(lambda x: pipes.quote(x), cmd))
- LOG.debug(_("Executing: {}").format(cmd_str))
+ LOG.debug(_('Executing: {}').format(cmd_str))
if self.simulate:
return
std_out, std_err = proc.communicate()
ret_val = proc.wait()
- LOG.debug(_("Return value: {!r}").format(ret_val))
+ LOG.debug(_('Return value: {!r}').format(ret_val))
if std_out and std_out.strip():
- LOG.debug(_("Output on {}").format('STDOUT') + '\n' + to_str(std_out.strip()))
+ LOG.debug(_('Output on {}').format('STDOUT') + '\n' + to_str(std_out.strip()))
if std_err and std_err.strip():
- LOG.error(_("Output on {}").format('STDERR') + ' ' + to_str(std_err.strip()))
+ LOG.error(_('Output on {}').format('STDERR') + ' ' + to_str(std_err.strip()))
if ret_val:
return False
# -------------------------------------------------------------------------
def reload_named(self):
-
- LOG.info(_("Reloading {} ...").format('named'))
+ """Reload the BIND nameserver."""
+ LOG.info(_('Reloading {} ...').format('named'))
cmd = shlex.split(self.cmd_named_reload)
cmd_str = ' '.join(map(lambda x: pipes.quote(x), cmd))
- LOG.debug(_("Executing: {}").format(cmd_str))
+ LOG.debug(_('Executing: {}').format(cmd_str))
if self.simulate:
return
std_out, std_err = proc.communicate()
ret_val = proc.wait()
- LOG.debug(_("Return value: {!r}").format(ret_val))
+ LOG.debug(_('Return value: {!r}').format(ret_val))
if std_out and std_out.strip():
- LOG.debug(_("Output on {}").format('STDOUT') + '\n' + to_str(std_out.strip()))
+ LOG.debug(_('Output on {}').format('STDOUT') + '\n' + to_str(std_out.strip()))
if std_err and std_err.strip():
- LOG.error(_("Output on {}").format('STDERR') + ' ' + to_str(std_err.strip()))
+ LOG.error(_('Output on {}').format('STDERR') + ' ' + to_str(std_err.strip()))
if ret_val:
return False
# =============================================================================
-if __name__ == "__main__":
+if __name__ == '__main__':
pass