From 7e0d1a75ecd13eb689b1343b519a10d42455382a Mon Sep 17 00:00:00 2001 From: Frank Brehm Date: Thu, 25 May 2023 15:04:21 +0200 Subject: [PATCH] Make the linter happy with lib/pp_admintools/app/dns_deploy_zones.py --- lib/pp_admintools/app/__init__.py | 31 +- .../app/check_ldap_dn_attributes.py | 82 +++-- lib/pp_admintools/app/dns_deploy_zones.py | 320 ++++++++---------- setup.cfg | 2 +- 4 files changed, 205 insertions(+), 230 deletions(-) diff --git a/lib/pp_admintools/app/__init__.py b/lib/pp_admintools/app/__init__.py index 4a850a9..49c5579 100644 --- a/lib/pp_admintools/app/__init__.py +++ b/lib/pp_admintools/app/__init__.py @@ -1,9 +1,10 @@ # -*- coding: utf-8 -*- """ +@summary: A base module for all DPX application classes. + @author: Frank Brehm @contact: frank.brehm@pixelpark.com @copyright: © 2023 by Frank Brehm, Berlin -@summary: A base module for all DPX application classes """ from __future__ import absolute_import @@ -17,10 +18,9 @@ from fb_tools.errors import FbAppError from fb_tools.multi_config import BaseMultiConfig # Own modules -from .. import __version__ as GLOBAL_VERSION from .. import DEFAULT_CONFIG_DIR -from .. import DEFAULT_TERMINAL_WIDTH, DEFAULT_TERMINAL_HEIGHT - +from .. import DEFAULT_TERMINAL_HEIGHT, DEFAULT_TERMINAL_WIDTH +from .. import __version__ as GLOBAL_VERSION from ..xlate import XLATOR LOG = logging.getLogger(__name__) @@ -28,26 +28,26 @@ LOG = logging.getLogger(__name__) _ = XLATOR.gettext ngettext = XLATOR.ngettext -__version__ = '0.6.4' +__version__ = '0.6.5' # ============================================================================= class DPXAppError(FbAppError): - """ Base exception class for all exceptions in all LDAP using application classes.""" + """Base exception class for all exceptions in all LDAP using application classes.""" + pass # ============================================================================= class AbortAppError(DPXAppError): """Special exception class interrupting the application.""" + pass # ============================================================================= class BaseDPXApplication(FbConfigApplication): - """ - Base class for all DPX application classes. - """ + """Base class for all DPX application classes.""" show_assume_options = True show_console_timeout_option = True @@ -60,7 +60,7 @@ class BaseDPXApplication(FbConfigApplication): cfg_class=BaseMultiConfig, initialized=False, usage=None, description=None, argparse_epilog=None, argparse_prefix_chars='-', env_prefix=None, config_dir=DEFAULT_CONFIG_DIR): - + """Initialize the BaseDPXApplication object.""" super(BaseDPXApplication, self).__init__( appname=appname, verbose=verbose, version=version, base_dir=base_dir, description=description, cfg_class=cfg_class, initialized=False, @@ -96,22 +96,21 @@ class BaseDPXApplication(FbConfigApplication): # ------------------------------------------------------------------------- def post_init(self): """ - Method to execute before calling run(). Here could be done some - finishing actions after reading in commandline parameters, - configuration a.s.o. + Execute some actions before calling run(). + + Here could be done some finishing actions after reading in + commandline parameters, configuration a.s.o. This method could be overwritten by descendant classes, these methhods should allways include a call to post_init() of the parent class. - """ - self.initialized = False super(BaseDPXApplication, self).post_init() if self.logfile: - LOG.debug(_("Using logfile {!r}.").format(str(self.logfile))) + LOG.debug(_('Using logfile {!r}.').format(str(self.logfile))) else: LOG.debug(_("Don't using a logfile.")) diff --git a/lib/pp_admintools/app/check_ldap_dn_attributes.py b/lib/pp_admintools/app/check_ldap_dn_attributes.py index 35e045b..06bed7a 100644 --- a/lib/pp_admintools/app/check_ldap_dn_attributes.py +++ b/lib/pp_admintools/app/check_ldap_dn_attributes.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- """ -@summary: An application module for checking all DN-like attributes in a LDAP instance +@summary: An application module for checking all DN-like attributes in a LDAP instance. @author: Frank Brehm @contact: frank.brehm@pixelpark.com @@ -9,10 +9,9 @@ from __future__ import absolute_import # Standard modules -import re import logging +import re import sys - from functools import cmp_to_key # Third party modules @@ -21,17 +20,14 @@ from functools import cmp_to_key from fb_tools.collections import CIDict, CIStringSet from fb_tools.xlate import format_list -from .. import pp - -from ..xlate import XLATOR - -from ..config.ldap import LdapConfiguration - +from .ldap import BaseLdapApplication # from .ldap import LdapAppError, FatalLDAPError from .ldap import LdapAppError -from .ldap import BaseLdapApplication +from .. import pp +from ..config.ldap import LdapConfiguration +from ..xlate import XLATOR -__version__ = '0.3.4' +__version__ = '0.3.5' LOG = logging.getLogger(__name__) _ = XLATOR.gettext @@ -64,7 +60,7 @@ class CheckLdapDnAttributesApplication(BaseLdapApplication): # ------------------------------------------------------------------------- def __init__(self, appname=None, base_dir=None): - + """Initialize the CheckLdapDnAttributesApplication object.""" self.ldap = None self.instance = None self.connect_info = None @@ -80,9 +76,9 @@ class CheckLdapDnAttributesApplication(BaseLdapApplication): attr_list = format_list(self.check_attributes, do_repr=True) desc = _( - "Checking all attributes with a DN syntax ({alist}) in the given instance. " - "The check is for the DN-syntax of the attributes and whether they are referencing " - "to existing entries in LDAP.").format(alist=attr_list) + 'Checking all attributes with a DN syntax ({alist}) in the given instance. ' + 'The check is for the DN-syntax of the attributes and whether they are referencing ' + 'to existing entries in LDAP.').format(alist=attr_list) super(CheckLdapDnAttributesApplication, self).__init__( appname=appname, description=desc, base_dir=base_dir, @@ -97,14 +93,14 @@ class CheckLdapDnAttributesApplication(BaseLdapApplication): # ------------------------------------------------------------------------- def init_arg_parser(self): - - app_group = self.arg_parser.add_argument_group(_("Script options")) + """Initialze the argument parser with some special parameters.""" + app_group = self.arg_parser.add_argument_group(_('Script options')) app_group.add_argument( '-E', '--export', dest='export', metavar=_('FILE'), help=_( - "Exportig the faulty entries and attributes into a YAML file, " - "if there were found some of them."), + 'Exportig the faulty entries and attributes into a YAML file, ' + 'if there were found some of them.'), ) super(CheckLdapDnAttributesApplication, self).init_arg_parser() @@ -112,7 +108,6 @@ class CheckLdapDnAttributesApplication(BaseLdapApplication): # ------------------------------------------------------------------------- def post_init(self): """Execute some actions after initialising.""" - super(CheckLdapDnAttributesApplication, self).post_init() self.export_file = getattr(self.args, 'export', None) @@ -131,11 +126,11 @@ class CheckLdapDnAttributesApplication(BaseLdapApplication): if self.export_file: with open(self.export_file, 'wt', encoding='utf-8', errors='surrogateescape') as fh: fh.write('---\n\n') - LOG.debug(_("Created export file {!r}.").format(self.export_file)) + LOG.debug(_('Created export file {!r}.').format(self.export_file)) msg = _( - "Start checking all DN-like attributes in in LDAP instance {inst!r} " - "({url}) ...").format(inst=self.instance, url=ldap_url) + 'Start checking all DN-like attributes in in LDAP instance {inst!r} ' + '({url}) ...').format(inst=self.instance, url=ldap_url) LOG.debug(msg) self.get_dns_to_check() @@ -143,7 +138,11 @@ class CheckLdapDnAttributesApplication(BaseLdapApplication): # ------------------------------------------------------------------------- def get_dns_to_check(self): + """ + Collect DNs of all entries with attributes with DN-like syntax. + The collected DNs are added to self.all_check_dns. + """ ldap_filter = '(|' + ''.join( map(lambda x: '({}=*)'.format(x), self.check_attributes)) + ')' @@ -154,18 +153,18 @@ class CheckLdapDnAttributesApplication(BaseLdapApplication): nr = len(self.all_check_dns) if nr: msg = ngettext( - "Found one entry to check.", - "Found {} entries to check.", nr).format(nr) + 'Found one entry to check.', + 'Found {} entries to check.', nr).format(nr) else: - msg = _("Found no to check.") + msg = _('Found no to check.') LOG.debug(msg) if self.verbose > 2: - LOG.debug("Found entries to check:\n" + pp(self.all_check_dns.as_list())) + LOG.debug('Found entries to check:\n' + pp(self.all_check_dns.as_list())) # ------------------------------------------------------------------------- def check_entries(self): - + """Check sytax of DN-like attributes in collected entries.""" for dn in self.all_check_dns: self.check_entry(dn) @@ -178,29 +177,28 @@ class CheckLdapDnAttributesApplication(BaseLdapApplication): for a_val in self.failed_entries[e_dn][attr]: nr_attr += 1 msg = ngettext( - "Got an inconsistent entry.", "Got {} inconsistent entries.", nr).format(nr) + 'Got an inconsistent entry.', 'Got {} inconsistent entries.', nr).format(nr) LOG.error(msg) msg = ngettext( - "There is one inconsistent attribute.", - "There are {} inconsistent attributes.", nr_attr).format(nr_attr) + 'There is one inconsistent attribute.', + 'There are {} inconsistent attributes.', nr_attr).format(nr_attr) LOG.warn(msg) if self.export_file: - LOG.debug(_("Writing export file {!r} ...").format(self.export_file)) + LOG.debug(_('Writing export file {!r} ...').format(self.export_file)) with open( self.export_file, 'wt', encoding='utf-8', errors='surrogateescape') as fh: print('---', file=fh) self.print_errors(out=fh) self.exit(5) - msg = _("Did not found any inconsistent entries.") + msg = _('Did not found any inconsistent entries.') LOG.info(msg) self.exit(0) # ------------------------------------------------------------------------- def print_errors(self, out=sys.stdout): - - sorted_dns = sorted( - list(self.failed_entries.keys()), key=cmp_to_key(self.compare_ldap_dns)) + """Print out results to STDOUT.""" + sorted_dns = sorted(self.failed_entries.keys(), key=cmp_to_key(self.compare_ldap_dns)) for dn in sorted_dns: entry = self.failed_entries[dn] @@ -212,14 +210,14 @@ class CheckLdapDnAttributesApplication(BaseLdapApplication): # ------------------------------------------------------------------------- def check_entry(self, dn): - + """Check syntax of DN-like attributes of a particular LDAP entry.""" if self.verbose > 1: - LOG.debug(_("Checking DN-like attributes of entry {!r} ...").format(dn)) + LOG.debug(_('Checking DN-like attributes of entry {!r} ...').format(dn)) entry = self.get_entry(dn, self.instance, attributes=self.check_attributes) attribs = self.normalized_attributes(entry) if self.verbose > 2: - LOG.debug(_("Got attributes:") + '\n' + pp(attribs.as_dict())) + LOG.debug(_('Got attributes:') + '\n' + pp(attribs.as_dict())) for attrib in self.check_attributes: if attrib in attribs: @@ -240,14 +238,14 @@ class CheckLdapDnAttributesApplication(BaseLdapApplication): # ------------------------------------------------------------------------- def check_ref_syntax(self, dn): - + """Check syntax of a DN-like value.""" if self.re_ldap_dn.match(dn): return True return False # ------------------------------------------------------------------------- def check_ref_existence(self, dn, inst): - + """Check existence of referenced entry.""" if dn in self.checked_ref_dn: return self.checked_ref_dn[dn] @@ -264,7 +262,7 @@ class CheckLdapDnAttributesApplication(BaseLdapApplication): # ============================================================================= -if __name__ == "__main__": +if __name__ == '__main__': pass diff --git a/lib/pp_admintools/app/dns_deploy_zones.py b/lib/pp_admintools/app/dns_deploy_zones.py index 08feea2..58e6d95 100644 --- a/lib/pp_admintools/app/dns_deploy_zones.py +++ b/lib/pp_admintools/app/dns_deploy_zones.py @@ -1,51 +1,46 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- """ +@summary: A module for the application class for configuring named. + @author: Frank Brehm @contact: frank.brehm@pixelpark.com @copyright: © 2023 by Frank Brehm, Berlin -@summary: A module for the application class for configuring named """ from __future__ import absolute_import -import os +import datetime +import ipaddress import logging import logging.config -import textwrap +import os +import pipes import re import shlex -import datetime +import shutil import tempfile +import textwrap import time -import shutil -import pipes -import ipaddress - -from subprocess import Popen, TimeoutExpired, PIPE - from pathlib import Path +from subprocess import PIPE, Popen, TimeoutExpired # Third party modules -import six -from pytz import timezone, UnknownTimeZoneError - -# Own modules +from fb_tools.app import BaseApplication from fb_tools.common import to_str +from fb_tools.pidfile import PidFile, PidFileError -from fb_tools.app import BaseApplication +from pytz import UnknownTimeZoneError, timezone -from fb_tools.pidfile import PidFileError, PidFile +import six +# Own modules +from .pdns import PpPDNSAppError, PpPDNSApplication from .. import __version__ as GLOBAL_VERSION from .. import pp - -from .pdns import PpPDNSAppError, PpPDNSApplication - from ..config.dns_deploy_zones import DnsDeployZonesConfig - from ..xlate import XLATOR -__version__ = '0.8.7' +__version__ = '0.8.8' LOG = logging.getLogger(__name__) _ = XLATOR.gettext @@ -53,14 +48,17 @@ _ = XLATOR.gettext # ============================================================================= class PpDeployZonesError(PpPDNSAppError): + """Special exception class to use in this module.""" + pass # ============================================================================= class PpDeployZonesApp(PpPDNSApplication): """ - Class for a application 'dns-deploy-zones' for configuring slaves - of the BIND named daemon. + Class for a application 'dns-deploy-zones'. + + It is intended for configuring slaves of the BIND named daemon. """ show_simulate_option = True @@ -90,7 +88,7 @@ class PpDeployZonesApp(PpPDNSApplication): def __init__( self, appname=None, base_dir=None, version=GLOBAL_VERSION, cfg_class=DnsDeployZonesConfig): - + """Initialize the PpDeployZonesApp object.""" self.zones = {} self.pidfile = None @@ -125,7 +123,7 @@ class PpDeployZonesApp(PpPDNSApplication): super(PpDeployZonesApp, self).__init__( appname=appname, version=version, description=description, base_dir=base_dir, - cfg_class=cfg_class, initialized=False, instance="public", + cfg_class=cfg_class, initialized=False, instance='public', ) masters = [] @@ -140,8 +138,7 @@ class PpDeployZonesApp(PpPDNSApplication): # ------------------------------------------- @property def cmd_named_checkconf(self): - """The OS command for named-checkconf.""" - + """Return the OS command for named-checkconf.""" checkconf = DnsDeployZonesConfig.default_named_checkconf if self.cfg: checkconf = self.cfg.named_checkconf @@ -150,52 +147,47 @@ class PpDeployZonesApp(PpPDNSApplication): # ------------------------------------------- @property def cmd_named_reload(self): - """The OS command to reload the BIND nameserver.""" - + """Return the OS command to reload the BIND nameserver.""" rndc = DnsDeployZonesConfig.default_rndc if self.cfg: rndc = self.cfg.rndc - return "{} reload".format(rndc) + return '{} reload'.format(rndc) # ------------------------------------------- @property def cmd_named_status(self): - """The OS command to show the status of the BIND nameserver service.""" - + """Return the OS command to show the status of the BIND nameserver service.""" systemctl = DnsDeployZonesConfig.default_systemctl if self.cfg: systemctl = self.cfg.systemctl - return "{} status named.service".format(systemctl) + return '{} status named.service'.format(systemctl) # ------------------------------------------- @property def cmd_named_start(self): - """The OS command to start the BIND nameserver service.""" - + """Return the OS command to start the BIND nameserver service.""" systemctl = DnsDeployZonesConfig.default_systemctl if self.cfg: systemctl = self.cfg.systemctl - return "{} start named.service".format(systemctl) + return '{} start named.service'.format(systemctl) # ------------------------------------------- @property def cmd_named_restart(self): - """The OS command to restart the BIND nameserver service.""" - + """Return the OS command to restart the BIND nameserver service.""" systemctl = DnsDeployZonesConfig.default_systemctl if self.cfg: systemctl = self.cfg.systemctl - return "{} restart named.service".format(systemctl) + return '{} restart named.service'.format(systemctl) # ------------------------------------------- @property def named_zones_cfg_file(self): - """The file for configuration of all own zones.""" - + """Return the file for configuration of all own zones.""" conf_dir = DnsDeployZonesConfig.default_named_conf_dir zones_cfg_file = DnsDeployZonesConfig.default_named_zones_cfg_file if self.cfg: @@ -207,8 +199,7 @@ class PpDeployZonesApp(PpPDNSApplication): # ------------------------------------------- @property def named_slavedir_rel(self): - """The directory for zone files of slave zones.""" - + """Return the directory for zone files of slave zones.""" if self.cfg: return self.cfg.named_slavedir return DnsDeployZonesConfig.default_named_slavedir @@ -216,8 +207,7 @@ class PpDeployZonesApp(PpPDNSApplication): # ------------------------------------------- @property def named_basedir(self): - """The base directory of named, where all volatile data are stored.""" - + """Return the base directory of named, where all volatile data are stored.""" if self.cfg: return self.cfg.named_basedir return DnsDeployZonesConfig.default_named_basedir @@ -225,14 +215,13 @@ class PpDeployZonesApp(PpPDNSApplication): # ------------------------------------------- @property def named_slavedir_abs(self): - """The directory for zone files of slave zones.""" - + """Return the directory for zone files of slave zones.""" return (self.named_basedir / self.named_slavedir_rel).resolve() # ------------------------------------------------------------------------- def as_dict(self, short=True): """ - Transforms the elements of the object into a dict + Transform the elements of the object into a dict. @param short: don't include local properties in resulting dict. @type short: bool @@ -240,7 +229,6 @@ class PpDeployZonesApp(PpPDNSApplication): @return: structure as dict @rtype: dict """ - res = super(PpDeployZonesApp, self).as_dict(short=short) res['named_slavedir_abs'] = self.named_slavedir_abs @@ -258,31 +246,27 @@ class PpDeployZonesApp(PpPDNSApplication): # ------------------------------------------------------------------------- def init_arg_parser(self): - - deploy_group = self.arg_parser.add_argument_group(_("Options for {}").format( + """Initialize command line parser with some special options.""" + deploy_group = self.arg_parser.add_argument_group(_('Options for {}').format( self.appname)) deploy_group.add_argument( - '-B', '--backup', dest="keep_backup", action='store_true', - help=_("Keep a backup file for each changed configuration file."), + '-B', '--backup', dest='keep_backup', action='store_true', + help=_('Keep a backup file for each changed configuration file.'), ) deploy_group.add_argument( '-K', '--keep-tempdir', dest='keep_tempdir', action='store_true', help=_( - "Keeping the temporary directory instead of removing it at the end " - "(e.g. for debugging purposes)"), + 'Keeping the temporary directory instead of removing it at the end ' + '(e.g. for debugging purposes)'), ) super(PpDeployZonesApp, self).init_arg_parser() # ------------------------------------------------------------------------- def perform_arg_parser(self): - """ - Public available method to execute some actions after parsing - the command line parameters. - """ - + """Execute some actions after parsing the command line parameters.""" super(PpDeployZonesApp, self).perform_arg_parser() if self.args.keep_tempdir: @@ -293,15 +277,15 @@ class PpDeployZonesApp(PpPDNSApplication): # ------------------------------------------------------------------------- def post_init(self): - + """Execute some actions at the end of initialization.""" if not self.quiet: print('') - LOG.debug(_("Post init phase.")) + LOG.debug(_('Post init phase.')) super(PpDeployZonesApp, self).post_init() - LOG.debug(_("My own post init phase.")) + LOG.debug(_('My own post init phase.')) cmd_namedcheckconf = self.get_command('named-checkconf', resolve=True) if not cmd_namedcheckconf: @@ -317,36 +301,31 @@ class PpDeployZonesApp(PpPDNSApplication): try: self.local_tz = timezone(self.local_tz_name) except UnknownTimeZoneError: - LOG.error(_("Unknown time zone: {!r}.").format(self.local_tz_name)) + LOG.error(_('Unknown time zone: {!r}.').format(self.local_tz_name)) self.exit(6) # ------------------------------------------------------------------------- def current_timestamp(self): - + """Return formatted current timestamp.""" if self.local_tz: return datetime.datetime.now(self.local_tz).strftime('%Y-%m-%d %H:%M:%S %Z') return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') # ------------------------------------------------------------------------- def pre_run(self): - """ - Dummy function to run before the main routine. - Could be overwritten by descendant classes. - - """ - + """Execute some actions before starting the underlaying stuff.""" my_uid = os.geteuid() if my_uid: - msg = _("You must be root to execute this script.") + msg = _('You must be root to execute this script.') if self.simulate: - msg += ' ' + _("But in simulation mode we are continuing nevertheless.") + msg += ' ' + _('But in simulation mode we are continuing nevertheless.') LOG.warn(msg) time.sleep(1) else: LOG.error(msg) self.exit(1) - LOG.info(_("Starting: {}").format(self.current_timestamp())) + LOG.info(_('Starting: {}').format(self.current_timestamp())) try: super(PpDeployZonesApp, self).pre_run() @@ -355,7 +334,7 @@ class PpDeployZonesApp(PpPDNSApplication): if self.cfg.pdns_instance == 'global': LOG.error(_( - "Using the global DNS master is not supported, " + 'Using the global DNS master is not supported, ' "please use 'local' or 'public'")) self.exit(1) @@ -367,7 +346,7 @@ class PpDeployZonesApp(PpPDNSApplication): try: self.pidfile.create() except PidFileError as e: - LOG.error(_("Could not occupy pidfile: {}").format(e)) + LOG.error(_('Could not occupy pidfile: {}').format(e)) self.exit(7) return @@ -392,68 +371,67 @@ class PpDeployZonesApp(PpPDNSApplication): finally: self.cleanup() self.pidfile = None - LOG.info(_("Ending: {}").format(self.current_timestamp())) + LOG.info(_('Ending: {}').format(self.current_timestamp())) # ------------------------------------------------------------------------- def cleanup(self): - - LOG.info(_("Cleaning up ...")) + """Clean up all temporary stuff after a unexpected exit.""" + LOG.info(_('Cleaning up ...')) for tgt_file in self.moved_files.keys(): backup_file = self.moved_files[tgt_file] - LOG.debug(_("Searching for {!r}.").format(backup_file)) + LOG.debug(_('Searching for {!r}.').format(backup_file)) if backup_file.exists(): if self.keep_backup: - LOG.info(_("Keep existing backup file {!r}.").format(str(backup_file))) + LOG.info(_('Keep existing backup file {!r}.').format(str(backup_file))) else: - LOG.info(_("Removing {!r} ...").format(str(backup_file))) + LOG.info(_('Removing {!r} ...').format(str(backup_file))) if not self.simulate: backup_file.unlink() # ----------------------- def emit_rm_err(function, path, excinfo): - LOG.error(_("Error removing {p!r} - {c}: {e}").format( + LOG.error(_('Error removing {p!r} - {c}: {e}').format( p=str(path), c=excinfo[1].__class__.__name__, e=excinfo[1])) if self.tempdir: if self.keep_tempdir: msg = _( - "Temporary directory {!r} will not be removed. " + 'Temporary directory {!r} will not be removed. ' "It's on yours to remove it manually.").format(str(self.tempdir)) LOG.warn(msg) else: - LOG.debug(_("Destroying temporary directory {!r} ...").format(str(self.tempdir))) + LOG.debug(_('Destroying temporary directory {!r} ...').format(str(self.tempdir))) shutil.rmtree(str(self.tempdir), False, emit_rm_err) self.tempdir = None # ------------------------------------------------------------------------- def init_temp_objects(self): """Init temporary objects and properties.""" - self.tempdir = Path(tempfile.mkdtemp(prefix=(self.appname + '.'), suffix='.tmp.d')) - LOG.debug(_("Temporary directory: {!r}.").format(str(self.tempdir))) + LOG.debug(_('Temporary directory: {!r}.').format(str(self.tempdir))) self.temp_zones_cfg_file = self.tempdir / self.cfg.named_zones_cfg_file if self.verbose > 1: - LOG.debug(_("Temporary zones conf: {!r}").format(str(self.temp_zones_cfg_file))) + LOG.debug(_('Temporary zones conf: {!r}').format(str(self.temp_zones_cfg_file))) - # ------------------------------------------------------------------------- + # --------------------------''--------------------------------------------- def get_named_keys(self): - - LOG.info(_("Trying to get all keys from named.conf ...")) + """Get all keys from output of named-checkconf -s'.""" + LOG.info(_('Trying to get all keys from named.conf ...')) cmd = shlex.split(self.cmd_named_checkconf) cmd.append('-p') cmd_str = ' '.join(map(lambda x: pipes.quote(x), cmd)) - LOG.debug(_("Executing: {}").format(cmd_str)) + LOG.debug(_('Executing: {}').format(cmd_str)) result = super(BaseApplication, self).run( cmd, stdout=PIPE, stderr=PIPE, timeout=10, check=True, may_simulate=False) if self.verbose > 3: - LOG.debug(_("Result:") + '\n' + str(result)) + LOG.debug(_('Result:') + '\n' + str(result)) config = result.stdout @@ -471,7 +449,7 @@ class PpDeployZonesApp(PpPDNSApplication): key_name = match[1] key_data = match[2].strip() if self.verbose > 2: - LOG.debug("Found key {!r}:".format(key_name) + '\n' + key_data) + LOG.debug('Found key {!r}:'.format(key_name) + '\n' + key_data) algorithm = None secret = None @@ -494,14 +472,14 @@ class PpDeployZonesApp(PpPDNSApplication): if self.verbose > 1: if self.named_keys: - LOG.debug(_("Found named keys:") + '\n' + pp(self.named_keys)) + LOG.debug(_('Found named keys:') + '\n' + pp(self.named_keys)) else: - LOG.debug(_("Found named keys:") + ' ' + _('None')) + LOG.debug(_('Found named keys:') + ' ' + _('None')) # ------------------------------------------------------------------------- def generate_slave_cfg_file(self): - - LOG.info(_("Generating {} ...").format(self.cfg.named_zones_cfg_file)) + """Generate configuration file for slave zones.""" + LOG.info(_('Generating {} ...').format(self.cfg.named_zones_cfg_file)) cur_date = datetime.datetime.now().isoformat(' ') @@ -525,9 +503,9 @@ class PpDeployZonesApp(PpPDNSApplication): content += '\n' + zone_config if self.servers: - LOG.debug(_("Collected server configuration:") + '\n' + pp(self.servers)) + LOG.debug(_('Collected server configuration:') + '\n' + pp(self.servers)) else: - LOG.debug(_("Collected server configuration:") + ' ' + _('None')) + LOG.debug(_('Collected server configuration:') + ' ' + _('None')) if self.servers: for server in sorted(self.servers.keys()): @@ -548,12 +526,12 @@ class PpDeployZonesApp(PpPDNSApplication): if self.verbose > 2: LOG.debug( - _("Generated file {!r}:").format( + _('Generated file {!r}:').format( str(self.temp_zones_cfg_file)) + '\n' + content.strip()) # ------------------------------------------------------------------------- def generate_zone_config(self, zone_name): - + """Generate configuration block for the given Nameserver.""" zone = self.zones[zone_name] zone.update() @@ -564,7 +542,7 @@ class PpDeployZonesApp(PpPDNSApplication): prefix = self._get_ipv4_prefix(match.group(1)) if prefix: if prefix == '127.0.0': - LOG.debug(_("Pure local zone {!r} will not be considered.").format(prefix)) + LOG.debug(_('Pure local zone {!r} will not be considered.').format(prefix)) return '' canonical_name = 'rev.' + prefix else: @@ -596,7 +574,7 @@ class PpDeployZonesApp(PpPDNSApplication): for key_id in zone.master_tsig_key_ids: if key_id not in self.named_keys: - msg = _("Key {k!r} for zone {z!r} not found in named configuration.").format( + msg = _('Key {k!r} for zone {z!r} not found in named configuration.').format( k=key_id, z=show_name) raise PpDeployZonesError(msg) @@ -626,7 +604,7 @@ class PpDeployZonesApp(PpPDNSApplication): if t: tuples.insert(0, t) if self.verbose > 2: - LOG.debug(_("Got IPv4 tuples: {}").format(pp(tuples))) + LOG.debug(_('Got IPv4 tuples: {}').format(pp(tuples))) return '.'.join(tuples) # ------------------------------------------------------------------------- @@ -650,56 +628,56 @@ class PpDeployZonesApp(PpPDNSApplication): del tuples[0:4] if self.verbose > 2: - LOG.debug(_("Got IPv6 tokens: {}").format(pp(tokens))) + LOG.debug(_('Got IPv6 tokens: {}').format(pp(tokens))) return ':'.join(tokens) # ------------------------------------------------------------------------- def compare_files(self): - - LOG.info(_("Comparing generated files with existing ones.")) + """Compare generated files with existing ones..""" + LOG.info(_('Comparing generated files with existing ones.')) if not self.files_equal_content(self.temp_zones_cfg_file, self.named_zones_cfg_file): self.reload_necessary = True self.files2replace[self.named_zones_cfg_file] = self.temp_zones_cfg_file if self.verbose > 1: - LOG.debug(_("Files to replace:") + '\n' + pp(self.files2replace)) + LOG.debug(_('Files to replace:') + '\n' + pp(self.files2replace)) # ------------------------------------------------------------------------- def files_equal_content(self, file_src, file_tgt): - + """Compare two configuration files by ignoring whitespaces and comments.""" if not file_src: - raise PpDeployZonesError(_("Source file not defined.")) + raise PpDeployZonesError(_('Source file not defined.')) if not file_tgt: - raise PpDeployZonesError(_("Target file not defined.")) + raise PpDeployZonesError(_('Target file not defined.')) - LOG.debug(_("Comparing {one!r} with {two!r} ...").format( + LOG.debug(_('Comparing {one!r} with {two!r} ...').format( one=str(file_src), two=str(file_tgt))) if not file_src.exists(): - msg = _("{what} {f!r} does not exists.").format( - what=_("Source file"), f=str(file_src)) + msg = _('{what} {f!r} does not exists.').format( + what=_('Source file'), f=str(file_src)) raise PpDeployZonesError(msg) if not file_src.is_file(): - msg = _("{what} {f!r} is not a regular file.").format( - what=_("Source file"), f=str(file_src)) + msg = _('{what} {f!r} is not a regular file.').format( + what=_('Source file'), f=str(file_src)) raise PpDeployZonesError(msg) if not file_tgt.exists(): - msg = _("{what} {f!r} does not exists.").format( - what=_("Target file"), f=str(file_tgt)) + msg = _('{what} {f!r} does not exists.').format( + what=_('Target file'), f=str(file_tgt)) LOG.debug(msg) return False if not file_tgt.is_file(): - msg = _("{what} {f!r} is not a regular file.").format( - what=_("Target file"), f=str(file_tgt)) + msg = _('{what} {f!r} is not a regular file.').format( + what=_('Target file'), f=str(file_tgt)) raise PpDeployZonesError(msg) # Reading source file content_src = '' if self.verbose > 2: - LOG.debug(_("Reading {!r} ...").format(str(file_src))) + LOG.debug(_('Reading {!r} ...').format(str(file_src))) content_src = file_src.read_text(**self.open_args) lines_str_src = self.re_block_comment.sub('', content_src) lines_str_src = self.re_line_comment.sub('', lines_str_src) @@ -709,14 +687,14 @@ class PpDeployZonesApp(PpPDNSApplication): if line: lines_src.append(line) if self.verbose > 3: - msg = _("Cleaned version of {!r}:").format(str(file_src)) + msg = _('Cleaned version of {!r}:').format(str(file_src)) msg += '\n' + '\n'.join(lines_src) LOG.debug(msg) # Reading target file content_tgt = '' if self.verbose > 2: - LOG.debug(_("Reading {!r} ...").format(str(file_tgt))) + LOG.debug(_('Reading {!r} ...').format(str(file_tgt))) content_tgt = file_tgt.read_text(**self.open_args) lines_str_tgt = self.re_block_comment.sub('', content_tgt) lines_str_tgt = self.re_line_comment.sub('', lines_str_tgt) @@ -726,14 +704,14 @@ class PpDeployZonesApp(PpPDNSApplication): if line: lines_tgt.append(line) if self.verbose > 3: - msg = _("Cleaned version of {!r}:").format(str(file_tgt)) + msg = _('Cleaned version of {!r}:').format(str(file_tgt)) msg += '\n' + '\n'.join(lines_tgt) LOG.debug(msg) if len(lines_src) != len(lines_tgt): LOG.debug(_( - "Source file {sf!r} has different number essential lines ({sl}) than " - "the target file {tf!r} ({tl} lines).").format( + 'Source file {sf!r} has different number essential lines ({sl}) than ' + 'the target file {tf!r} ({tl} lines).').format( sf=str(file_src), sl=len(lines_src), tf=str(file_tgt), tl=len(lines_tgt))) return False @@ -741,8 +719,8 @@ class PpDeployZonesApp(PpPDNSApplication): while i < len(lines_src): if lines_src[i] != lines_tgt[i]: LOG.debug(_( - "Source file {sf!r} has a different content than " - "the target file {tf!r}.").format(sf=str(file_src), tf=str(file_tgt))) + 'Source file {sf!r} has a different content than ' + 'the target file {tf!r}.').format(sf=str(file_src), tf=str(file_tgt))) return False i += 1 @@ -750,12 +728,12 @@ class PpDeployZonesApp(PpPDNSApplication): # ------------------------------------------------------------------------- def replace_configfiles(self): - + """Replace the configuration file with the modified one.""" if not self.files2replace: - LOG.debug(_("No replacement of any config files necessary.")) + LOG.debug(_('No replacement of any config files necessary.')) return - LOG.debug(_("Start replacing of config files ...")) + LOG.debug(_('Start replacing of config files ...')) for tgt_file in self.files2replace.keys(): @@ -763,51 +741,51 @@ class PpDeployZonesApp(PpPDNSApplication): if tgt_file.exists(): self.moved_files[tgt_file] = backup_file - LOG.info(_("Copying {frm!r} => {to!r} ...").format( + LOG.info(_('Copying {frm!r} => {to!r} ...').format( frm=str(tgt_file), to=str(backup_file))) if not self.simulate: shutil.copy2(str(tgt_file), str(backup_file)) if self.verbose > 1: - LOG.debug(_("All backuped config files:") + '\n' + pp(self.moved_files)) + LOG.debug(_('All backuped config files:') + '\n' + pp(self.moved_files)) for tgt_file in self.files2replace.keys(): src_file = self.files2replace[tgt_file] - LOG.info(_("Copying {frm!r} => {to!r} ...").format( + LOG.info(_('Copying {frm!r} => {to!r} ...').format( frm=str(src_file), to=str(tgt_file))) if not self.simulate: shutil.copy2(str(src_file), str(tgt_file)) # ------------------------------------------------------------------------- def restore_configfiles(self): - - LOG.error(_("Restoring of original config files because of an exception.")) + """Restore original config files because of an exception.""" + LOG.error(_('Restoring of original config files because of an exception.')) for tgt_file in self.moved_files.keys(): backup_file = self.moved_files[tgt_file] - LOG.info(_("Moving {frm!r} => {to!r} ...").format( + LOG.info(_('Moving {frm!r} => {to!r} ...').format( frm=str(backup_file), to=str(tgt_file))) if not self.simulate: if backup_file.exists(): backup_file.rename(tgt_file) else: - LOG.error(_("Could not find backup file {!r}.").format(str(backup_file))) + LOG.error(_('Could not find backup file {!r}.').format(str(backup_file))) # ------------------------------------------------------------------------- def check_namedconf(self): - - LOG.info(_("Checking syntax correctness of named.conf ...")) + """Chack the syntax correctness of named.conf.""" + LOG.info(_('Checking syntax correctness of named.conf ...')) cmd = shlex.split(self.cmd_named_checkconf) if self.verbose > 2: cmd.append('-p') cmd_str = ' '.join(map(lambda x: pipes.quote(x), cmd)) - LOG.debug(_("Executing: {}").format(cmd_str)) + LOG.debug(_('Executing: {}').format(cmd_str)) result = super(BaseApplication, self).run( cmd, stdout=PIPE, stderr=PIPE, timeout=10, check=False, may_simulate=False) if self.verbose > 2: - LOG.debug(_("Result:") + '\n' + str(result)) + LOG.debug(_('Result:') + '\n' + str(result)) if result.returncode: return False @@ -815,14 +793,14 @@ class PpDeployZonesApp(PpPDNSApplication): # ------------------------------------------------------------------------- def apply_config(self): - + """Apply the changed config of the BIND nameserver.""" if not self.reload_necessary and not self.restart_necessary: - LOG.info(_("Reload or restart of named is not necessary.")) + LOG.info(_('Reload or restart of named is not necessary.')) return running = self.named_running() if not running: - LOG.warn(_("Named is not running, please start it manually.")) + LOG.warn(_('Named is not running, please start it manually.')) return if self.restart_necessary: @@ -832,12 +810,12 @@ class PpDeployZonesApp(PpPDNSApplication): # ------------------------------------------------------------------------- def named_running(self): - - LOG.debug(_("Checking, whether named is running ...")) + """Chack the running state of the BIND nameserver.""" + LOG.debug(_('Checking, whether named is running ...')) cmd = shlex.split(self.cmd_named_status) cmd_str = ' '.join(map(lambda x: pipes.quote(x), cmd)) - LOG.debug(_("Executing: {}").format(cmd_str)) + LOG.debug(_('Executing: {}').format(cmd_str)) std_out = None std_err = None @@ -851,11 +829,11 @@ class PpDeployZonesApp(PpPDNSApplication): std_out, std_err = proc.communicate() ret_val = proc.wait() - LOG.debug(_("Return value: {!r}").format(ret_val)) + LOG.debug(_('Return value: {!r}').format(ret_val)) if std_out and std_out.strip(): - LOG.debug(_("Output on {}").format('STDOUT') + '\n' + to_str(std_out.strip())) + LOG.debug(_('Output on {}').format('STDOUT') + '\n' + to_str(std_out.strip())) if std_err and std_err.strip(): - LOG.warn(_("Output on {}").format('STDERR') + ' ' + to_str(std_err.strip())) + LOG.warn(_('Output on {}').format('STDERR') + ' ' + to_str(std_err.strip())) if ret_val: return False @@ -864,12 +842,12 @@ class PpDeployZonesApp(PpPDNSApplication): # ------------------------------------------------------------------------- def start_named(self): - - LOG.info(_("Starting {} ...").format('named')) + """Start the BIND nameserver.""" + LOG.info(_('Starting {} ...').format('named')) cmd = shlex.split(self.cmd_named_start) cmd_str = ' '.join(map(lambda x: pipes.quote(x), cmd)) - LOG.debug(_("Executing: {}").format(cmd_str)) + LOG.debug(_('Executing: {}').format(cmd_str)) if self.simulate: return @@ -886,11 +864,11 @@ class PpDeployZonesApp(PpPDNSApplication): std_out, std_err = proc.communicate() ret_val = proc.wait() - LOG.debug(_("Return value: {!r}").format(ret_val)) + LOG.debug(_('Return value: {!r}').format(ret_val)) if std_out and std_out.strip(): - LOG.debug(_("Output on {}").format('STDOUT') + '\n' + to_str(std_out.strip())) + LOG.debug(_('Output on {}').format('STDOUT') + '\n' + to_str(std_out.strip())) if std_err and std_err.strip(): - LOG.error(_("Output on {}").format('STDERR') + ' ' + to_str(std_err.strip())) + LOG.error(_('Output on {}').format('STDERR') + ' ' + to_str(std_err.strip())) if ret_val: return False @@ -899,12 +877,12 @@ class PpDeployZonesApp(PpPDNSApplication): # ------------------------------------------------------------------------- def restart_named(self): - - LOG.info(_("Restarting {} ...").format('named')) + """Restart the BIND nameserver.""" + LOG.info(_('Restarting {} ...').format('named')) cmd = shlex.split(self.cmd_named_restart) cmd_str = ' '.join(map(lambda x: pipes.quote(x), cmd)) - LOG.debug(_("Executing: {}").format(cmd_str)) + LOG.debug(_('Executing: {}').format(cmd_str)) if self.simulate: return @@ -921,11 +899,11 @@ class PpDeployZonesApp(PpPDNSApplication): std_out, std_err = proc.communicate() ret_val = proc.wait() - LOG.debug(_("Return value: {!r}").format(ret_val)) + LOG.debug(_('Return value: {!r}').format(ret_val)) if std_out and std_out.strip(): - LOG.debug(_("Output on {}").format('STDOUT') + '\n' + to_str(std_out.strip())) + LOG.debug(_('Output on {}').format('STDOUT') + '\n' + to_str(std_out.strip())) if std_err and std_err.strip(): - LOG.error(_("Output on {}").format('STDERR') + ' ' + to_str(std_err.strip())) + LOG.error(_('Output on {}').format('STDERR') + ' ' + to_str(std_err.strip())) if ret_val: return False @@ -934,12 +912,12 @@ class PpDeployZonesApp(PpPDNSApplication): # ------------------------------------------------------------------------- def reload_named(self): - - LOG.info(_("Reloading {} ...").format('named')) + """Reload the BIND nameserver.""" + LOG.info(_('Reloading {} ...').format('named')) cmd = shlex.split(self.cmd_named_reload) cmd_str = ' '.join(map(lambda x: pipes.quote(x), cmd)) - LOG.debug(_("Executing: {}").format(cmd_str)) + LOG.debug(_('Executing: {}').format(cmd_str)) if self.simulate: return @@ -956,11 +934,11 @@ class PpDeployZonesApp(PpPDNSApplication): std_out, std_err = proc.communicate() ret_val = proc.wait() - LOG.debug(_("Return value: {!r}").format(ret_val)) + LOG.debug(_('Return value: {!r}').format(ret_val)) if std_out and std_out.strip(): - LOG.debug(_("Output on {}").format('STDOUT') + '\n' + to_str(std_out.strip())) + LOG.debug(_('Output on {}').format('STDOUT') + '\n' + to_str(std_out.strip())) if std_err and std_err.strip(): - LOG.error(_("Output on {}").format('STDERR') + ' ' + to_str(std_err.strip())) + LOG.error(_('Output on {}').format('STDERR') + ' ' + to_str(std_err.strip())) if ret_val: return False @@ -970,7 +948,7 @@ class PpDeployZonesApp(PpPDNSApplication): # ============================================================================= -if __name__ == "__main__": +if __name__ == '__main__': pass diff --git a/setup.cfg b/setup.cfg index eee552f..6f49cdb 100644 --- a/setup.cfg +++ b/setup.cfg @@ -59,6 +59,6 @@ max-line-length = 99 max-line-length = 99 max-complexity = 20 -ignore = E226,E302,E41,E402 +ignore = E226,E302,E41,E402,B902 # vim: filetype=dosini -- 2.39.5