]> Frank Brehm's Git Trees - pixelpark/pp-admin-tools.git/commitdiff
Adding lib/pp_admintools/dns_deploy_zones_config.py
authorFrank Brehm <frank@brehm-online.com>
Wed, 30 Mar 2022 13:30:06 +0000 (15:30 +0200)
committerFrank Brehm <frank@brehm-online.com>
Wed, 30 Mar 2022 13:30:06 +0000 (15:30 +0200)
lib/pp_admintools/dns_deploy_zones_config.py [new file with mode: 0644]

diff --git a/lib/pp_admintools/dns_deploy_zones_config.py b/lib/pp_admintools/dns_deploy_zones_config.py
new file mode 100644 (file)
index 0000000..3003f2a
--- /dev/null
@@ -0,0 +1,541 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2022 by Frank Brehm, Berlin
+@summary: A module for providing a configuration the dns-deploy-zones applications.
+          It's based on class PdnsConfiguration.
+"""
+from __future__ import absolute_import
+
+# Standard module
+import logging
+import re
+import copy
+import socket
+
+from pathlib import Path
+
+# Third party modules
+
+# Own modules
+
+from fb_tools.common import is_sequence, pp, to_bool
+
+# from .config import ConfigError, BaseConfiguration
+from fb_tools.multi_config import DEFAULT_ENCODING
+
+from . import __version__ as GLOBAL_VERSION
+from . import MAX_TIMEOUT, MAX_PORT_NUMBER
+
+from .pdns_config import PdnsConfigError, PdnsConfiguration
+from .mail_config import DEFAULT_CONFIG_DIR
+
+from .xlate import XLATOR
+
+__version__ = '0.1.0'
+LOG = logging.getLogger(__name__)
+
+_ = XLATOR.gettext
+
+
+# =============================================================================
+class DnsDeployZonesConfigError(PdnsConfigError):
+    """Base error class for all exceptions happened during
+    execution this configured application"""
+
+    pass
+
+
+# =============================================================================
+class DnsDeployZonesConfig(PdnsConfiguration):
+    """
+    A class for providing a configuration for an arbitrary PowerDNS Application
+    and methods to read it from configuration files.
+    """
+
+    default_pidfile = Path('/run/dns-deploy-zones.pid')
+
+    default_named_conf_dir = Path('/etc')
+    default_named_zones_cfg_file = Path('named.zones.conf')
+    default_named_basedir = Path('/var/named')
+    default_named_slavedir = Path('slaves')
+
+    default_zone_masters_local = set('dnsmaster-local.pixelpark.com')
+    default_zone_masters_public = set('dnsmaster-public.pixelpark.com')
+
+    default_rndc = Path('/usr/sbin/rndc')
+    default_systemctl = Path('/usr/bin/systemctl')
+    default_named_checkconf = Path('/usr/sbin/named-checkconf')
+
+    default_named_listen_on_v6 = False
+    default_named_internal = False
+
+    re_split_addresses = re.compile(r'[,;\s]+')
+
+    # -------------------------------------------------------------------------
+    def __init__(
+        self, appname=None, verbose=0, version=__version__, base_dir=None,
+            append_appname_to_stems=True, additional_stems=None, config_dir=DEFAULT_CONFIG_DIR,
+            additional_config_file=None, additional_cfgdirs=None, encoding=DEFAULT_ENCODING,
+            ensure_privacy=True, use_chardet=True, initialized=False):
+
+        self.pidfile = self.default_pidfile
+        self.named_conf_dir = self.default_named_conf_dir
+        self.named_zones_cfg_file = self.default_named_zones_cfg_file
+        self.named_basedir = self.default_named_basedir
+        self.named_slavedir = self.default_named_slavedir
+
+        self.zone_masters_local = []
+        for master in self.default_zone_masters_local:
+            self.zone_masters_local.append(master)
+
+        self.zone_masters_public = []
+        for master in self.default_zone_masters_public:
+            self.zone_masters_public.append(master)
+
+        self.rndc = self.default_rndc
+        self.systemctl = self.default_systemctl
+        self.named_checkconf = self.default_named_checkconf
+
+        self._named_listen_on_v6 = self.default_named_listen_on_v6
+        self._named_internal = self.default_named_internal
+
+        self.masters = set()
+
+        add_stems = []
+        if additional_stems:
+            if is_sequence(additional_stems):
+                for stem in additional_stems:
+                    add_stems.append(stem)
+            else:
+                add_stems.append(additional_stems)
+
+        if 'named' not in add_stems:
+            add_stems.append('named')
+
+        super(DnsDeployZonesConfig, self).__init__(
+            appname=appname, verbose=verbose, version=version, base_dir=base_dir,
+            append_appname_to_stems=append_appname_to_stems, config_dir=config_dir,
+            additional_stems=add_stems, additional_config_file=additional_config_file,
+            additional_cfgdirs=additional_cfgdirs, encoding=encoding, use_chardet=use_chardet,
+            ensure_privacy=ensure_privacy, initialized=False,
+        )
+
+        if initialized:
+            self.initialized = True
+
+    # -------------------------------------------------------------------------
+    @property
+    def named_internal(self):
+        """Is the BIND nameserver on the current host a local resolver (True)
+        or an authoritative nameserver for outside."""
+        return self._named_internal
+
+    @named_internal.setter
+    def named_internal(self, value):
+        self._named_internal = to_bool(value)
+
+    # -------------------------------------------------------------------------
+    @property
+    def named_listen_on_v6(self):
+        """Is the BIND nameserver on the current listening on some IPv6 addresses?"""
+        return self._named_listen_on_v6
+
+    @named_listen_on_v6.setter
+    def named_listen_on_v6(self, value):
+        self._named_listen_on_v6 = to_bool(value)
+
+    # -------------------------------------------------------------------------
+    def as_dict(self, short=True):
+        """
+        Transforms the elements of the object into a dict
+
+        @param short: don't include local properties in resulting dict.
+        @type short: bool
+
+        @return: structure as dict
+        @rtype:  dict
+        """
+
+        res = super(DnsDeployZonesConfig, self).as_dict(short=short)
+
+        res['default_pidfile'] = self.default_pidfile
+        res['default_named_conf_dir'] = self.default_named_conf_dir
+        res['default_named_zones_cfg_file'] = self.default_named_zones_cfg_file
+        res['default_named_basedir'] = self.default_named_basedir
+        res['default_named_slavedir'] = self.default_named_slavedir
+        res['default_zone_masters_local'] = copy.copy(self.default_zone_masters_local)
+        res['default_zone_masters_public'] = copy.copy(self.default_zone_masters_public)
+        res['default_rndc'] = self.default_rndc
+        res['default_systemctl'] = self.default_systemctl
+        res['default_named_checkconf'] = self.default_named_checkconf
+        res['default_named_listen_on_v6'] = self.default_named_listen_on_v6
+        res['default_named_internal'] = self.default_named_internal
+        res['named_listen_on_v6'] = self.named_listen_on_v6
+        res['named_internal'] = self.named_internal
+
+        res['masters'] = copy.copy(self.masters)
+
+        return res
+
+    # -------------------------------------------------------------------------
+    def eval_section(self, section_name):
+
+        super(DnsDeployZonesConfig, self).eval_section(section_name)
+        sn = section_name.lower()
+
+        if sn == 'named':
+            section = self.cfg[section_name]
+            return self._eval_named(section_name, section)
+
+        if sn == self.appname.lower() or sn == 'app':
+            section = self.cfg[section_name]
+            return self._eval_app(section_name, section)
+
+    # -------------------------------------------------------------------------
+    def _eval_named(self, section_name, section):
+
+        if self.verbose > 2:
+            msg = _("Evaluating config section {!r}:").format(section_name)
+            LOG.debug(msg + '\n' + pp(section))
+
+        re_config_dir = re.compile(r'^\s*(?:named[_-]?)?conf(?:ig)?[_-]?dir\s*$', re.IGNORECASE)
+        re_config_file = re.compile(
+            r'^\s*(?:named[_-]?)?zones[_-]?(?:conf(?:ig)?|cfg)[_-]*file\s*$', re.IGNORECASE)
+        re_base_dir = re.compile(r'^\s*(?:named[_-]?)?base[_-]?dir\s*$', re.IGNORECASE)
+        re_slave_dir = re.compile(r'^\s*(?:named[_-]?)?slave[_-]?dir\s*$', re.IGNORECASE)
+        re_named_checkconf = re.compile(r'^named[_-]?checkconf$', re.IGNORECASE)
+        re_internal = re.compile(
+            r'^\s*(?:named[_-]?)?(?:is[_-]?)?intern(?:al)?\s*$', re.IGNORECASE)
+        re_listen_v6 = re.compile(r'^\s*listen[_-](?:on[_-])?(?:ip)v6\s*$', re.IGNORECASE)
+
+        for key in section.keys():
+
+            if key.lower() == 'masters':
+                self._eval_named_masters(section_name, key, section)
+                continue
+
+            if key.lower() == 'rndc':
+                self._eval_named_rndc(section_name, key, section)
+                continue
+
+            if key.lower() == 'systemctl':
+                self._eval_named_systemctl(section_name, key, section)
+                continue
+
+            if re_config_dir.search(key):
+                self._eval_named_configdir(section_name, key, section)
+                continue
+
+            if re_config_file.search(key):
+                self._eval_named_configfile(section_name, key, section)
+                continue
+
+            if re_base_dir.search(key):
+                self._eval_named_basedir(section_name, key, section)
+                continue
+
+            if re_slave_dir.search(key):
+                self._eval_named_slavedir(section_name, key, section)
+                continue
+
+            if re_named_checkconf.search(key):
+                self._eval_named_checkconf(section_name, key, section)
+                continue
+
+            if re_internal.search(key):
+                self._eval_named_internal(section_name, key, section)
+                continue
+
+            if re_listen_v6.search(key):
+                self._eval_named_listen_v6(section_name, key, section)
+                continue
+
+    # -------------------------------------------------------------------------
+    def _eval_named_masters(self, section_name, key, section):
+
+        val = section[key]
+
+        if not val:
+            return
+
+        master_list = set()
+
+        if is_sequence(val):
+            for value in val:
+                masters = self._eval_named_master_list(value)
+                if masters:
+                    master_list |= masters
+        else:
+            masters = self._eval_named_master_list(val)
+            if masters:
+                master_list |= masters
+
+        self.masters = master_list
+
+    # -------------------------------------------------------------------------
+    def _eval_named_master_list(self, value):
+
+        masters = set()
+
+        for m in self.re_split_addresses.split(value):
+            if not m:
+                continue
+
+            m = m.strip().lower()
+            if self.verbose > 1:
+                LOG.debug(_("Checking given master address {!r} ...").format(m))
+            addr_list = self.get_addresses(m)
+            masters |= addr_list
+
+        return masters
+
+    # -------------------------------------------------------------------------
+    def get_addresses(self, host):
+
+        addr_list = set()
+
+        try:
+            addr_infos = socket.getaddrinfo(host, 53, proto=socket.IPPROTO_TCP)
+            for addr_info in addr_infos:
+                addr = addr_info[4][0]
+                addr_list.add(addr)
+        except socket.gaierror as e:
+            msg = _("Invalid hostname or address {a!r} found in masters: {e}")
+            msg = msg.format(a=host, e=e)
+            if self.raise_on_error:
+                raise DnsDeployZonesConfigError(msg)
+            else:
+                LOG.error(msg)
+                return set()
+        return addr_list
+
+
+    # -------------------------------------------------------------------------
+    def _eval_named_rndc(self, iname, section):
+
+        val = section[key].strip()
+        if not val:
+            return
+
+        path = Path(val)
+        if not path.is_absolute()
+            msg = _("The path to {what} must be an absolute path, found {path!r}.")
+            msg = msg.format(what='rndc', path=val)
+            if self.raise_on_error:
+                raise DnsDeployZonesConfigError(msg)
+            else:
+                LOG.error(msg)
+                return
+
+        if self.verbose > 2:
+            msg = _("Found path to {what}: {path!r}.").format(what='rndc', path=val)
+            LOG.debug(msg)
+
+        self.rndc = path
+
+    # -------------------------------------------------------------------------
+    def _eval_named_systemctl(self, iname, section):
+
+        val = section[key].strip()
+        if not val:
+            return
+
+        path = Path(val)
+        if not path.is_absolute()
+            msg = _("The path to {what} must be an absolute path, found {path!r}.")
+            msg = msg.format(what='systemctl', path=val)
+            if self.raise_on_error:
+                raise DnsDeployZonesConfigError(msg)
+            else:
+                LOG.error(msg)
+                return
+
+        if self.verbose > 2:
+            msg = _("Found path to {what}: {path!r}.").format(what='systemctl', path=val)
+            LOG.debug(msg)
+
+        self.systemctl = path
+
+    # -------------------------------------------------------------------------
+    def _eval_named_configdir(self, iname, section):
+
+        val = section[key].strip()
+        if not val:
+            return
+
+        what = _("the named config directory")
+        path = Path(val)
+
+        if not path.is_absolute()
+            msg = _("The path to {what} must be an absolute path, found {path!r}.")
+            msg = msg.format(what=what, path=val)
+            if self.raise_on_error:
+                raise DnsDeployZonesConfigError(msg)
+            else:
+                LOG.error(msg)
+                return
+
+        if self.verbose > 2:
+            msg = _("Found path to {what}: {path!r}.").format(what=what, path=val)
+            LOG.debug(msg)
+
+        self.named_config_dir = path
+
+    # -------------------------------------------------------------------------
+    def _eval_named_configfile(self, iname, section):
+
+        val = section[key].strip()
+        if not val:
+            return
+
+        what = _("the named config file for zones")
+        path = Path(val)
+
+        if path.is_absolute()
+            msg = _("The path to {what} must not be an absolute path, found {path!r}.")
+            msg = msg.format(what=what, path=val)
+            if self.raise_on_error:
+                raise DnsDeployZonesConfigError(msg)
+            else:
+                LOG.error(msg)
+                return
+
+        if self.verbose > 2:
+            msg = _("Found path to {what}: {path!r}.").format(what=what, path=val)
+            LOG.debug(msg)
+
+        self.named_zones_cfg_file = path
+
+    # -------------------------------------------------------------------------
+    def _eval_named_basedir(self, iname, section):
+
+        val = section[key].strip()
+        if not val:
+            return
+
+        what = _("the named base directory")
+        path = Path(val)
+        if not path.is_absolute()
+            msg = _("The path to {what} must be an absolute path, found {path!r}.")
+            msg = msg.format(what=what, path=val)
+            if self.raise_on_error:
+                raise DnsDeployZonesConfigError(msg)
+            else:
+                LOG.error(msg)
+                return
+
+        if self.verbose > 2:
+            msg = _("Found path to {what}: {path!r}.").format(what=what, path=val)
+            LOG.debug(msg)
+
+        self.named_basedir = path
+
+    # -------------------------------------------------------------------------
+    def _eval_named_slavedir(self, iname, section):
+
+        val = section[key].strip()
+        if not val:
+            return
+
+        what = _("the directory for slave zones of named")
+        path = Path(val)
+
+        if path.is_absolute()
+            msg = _("The path to {what} must not be an absolute path, found {path!r}.")
+            msg = msg.format(what=what, path=val)
+            if self.raise_on_error:
+                raise DnsDeployZonesConfigError(msg)
+            else:
+                LOG.error(msg)
+                return
+
+        if self.verbose > 2:
+            msg = _("Found path to {what}: {path!r}.").format(what=what, path=val)
+            LOG.debug(msg)
+
+        self.named_slavedir = path
+
+    # -------------------------------------------------------------------------
+    def _eval_named_checkconf(self, iname, section):
+
+        val = section[key].strip()
+        if not val:
+            return
+
+        what = "named-checkconf"
+        path = Path(val)
+        if not path.is_absolute()
+            msg = _("The path to {what} must be an absolute path, found {path!r}.")
+            msg = msg.format(what=what, path=val)
+            if self.raise_on_error:
+                raise DnsDeployZonesConfigError(msg)
+            else:
+                LOG.error(msg)
+                return
+
+        if self.verbose > 2:
+            msg = _("Found path to {what}: {path!r}.").format(what=what, path=val)
+            LOG.debug(msg)
+
+        self.named_checkconf = path
+
+    # -------------------------------------------------------------------------
+    def _eval_named_internal(self, iname, section):
+
+        val = section[key]
+        if val is None:
+            return
+
+        self.named_internal = val
+
+    # -------------------------------------------------------------------------
+    def _eval_named_listen_v6(self, iname, section):
+
+        val = section[key]
+        if val is None:
+            return
+
+        self.named_listen_on_v6 = val
+
+    # -------------------------------------------------------------------------
+    def eval(self):
+        """Evaluating read configuration and storing them in object properties."""
+
+        super(DnsDeployZonesConfig, self).eval()
+
+        addr_list = set()
+        if self.named_internal:
+            for host in self.default_zone_masters_local:
+                addr_list |= self.get_addresses(host)
+        else:
+            for host in self.default_zone_masters_public:
+                addr_list |= self.get_addresses(host)
+
+        self.masters |= addr_list
+
+        if not self.named_listen_on_v6:
+
+            addresses = set()
+            for addr in self.masters:
+                if ':' not in addr:
+                    addresses.add(addr)
+            self.masters = addresses
+
+        if self.masters:
+            if self.verbose > 2:
+                LOG.debug(_("Using configured masters:") + ' ' + pp(self.masters))
+        else:
+            LOG.warn(_("No valid masters found in configuration."))
+
+
+# =============================================================================
+if __name__ == "__main__":
+
+    pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list