from .app import PpApplication
-__version__ = '0.6.6'
+__version__ = '0.7.1'
LOG = logging.getLogger(__name__)
VALID_MAIL_METHODS = ('smtp', 'sendmail')
def _perform_config(self):
"""Execute some actions after reading the configuration."""
- if 'general' in self.cfg and 'verbose' in self.cfg['general']:
- v = self.cfg['general']['verbose']
+ for section_name in self.cfg.keys():
+
+ section = self.cfg[section_name]
+
+ if section_name.lower() == 'general':
+ self._perform_config_general(section, section_name)
+ continue
+
+ if section_name.lower() == 'mail':
+ self._perform_config_mail(section, section_name)
+ continue
+
+ self.perform_config()
+
+ self._perform_mail_cmdline_options()
+
+ if self.config_has_errors:
+ LOG.error("There are errors in configuration.")
+ self.exit(1)
+ else:
+ LOG.debug("There are no errors in configuration.")
+ self.config_has_errors = False
+
+ # -------------------------------------------------------------------------
+ def _perform_config_general(self, section, section_name):
+
+ if self.verbose > 2:
+ LOG.debug("Evaluating config section {n!r}:\n{s}".format(
+ n=section_name, s=pp(section)))
+
+ if 'verbose' in section:
+ v = section['verbose']
if to_bool(v):
try:
v = int(v)
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
- for section_name in self.cfg.keys():
+ # -------------------------------------------------------------------------
+ def _perform_config_mail(self, section, section_name):
- if self.verbose > 3:
- LOG.debug("Checking config section {!r} ...".format(section_name))
+ if self.verbose > 2:
+ LOG.debug("Evaluating config section {n!r}:\n{s}".format(
+ n=section_name, s=pp(section)))
- if section_name.lower() != "mail":
- continue
- section = self.cfg[section_name]
+ self._perform_config_mail_rcpt(section, section_name)
+ self._perform_config_mail_cc(section, section_name)
+ self._perform_config_mail_reply_to(section, section_name)
+ self._perform_config_mail_method(section, section_name)
+ self._perform_config_mail_server(section, section_name)
+ self._perform_config_smtp_port(section, section_name)
- if 'mail_recipients' in section:
- v = section['mail_recipients'].strip()
- self.mail_recipients = []
- if v:
- tokens = self.whitespace_re.split(v)
- for token in tokens:
- if MailAddress.valid_address(token):
- if token not in self.mail_recipients:
- self.mail_recipients.append(token)
- else:
- msg = (
- "Found invalid recipient mail address {!r} "
- "in configuration.").format(
- token)
- LOG.error(msg)
-
- if 'mail_cc' in section:
- v = section['mail_cc'].strip()
- self.mail_cc = []
- if v:
- tokens = self.whitespace_re.split(v)
- if self.verbose > 1:
- LOG.debug("CC addresses:\n{}".format(pp(tokens)))
- for token in tokens:
- if MailAddress.valid_address(token):
- if token not in self.mail_cc:
- self.mail_cc.append(token)
- else:
- msg = "Found invalid cc mail address {!r} in configuration.".format(
- token)
- LOG.error(msg)
-
- if 'reply_to' in section:
- v = section['reply_to'].strip()
- self.reply_to = None
- if v:
- tokens = self.whitespace_re.split(v)
- if len(tokens):
- if MailAddress.valid_address(tokens[0]):
- self.reply_to = tokens[0]
- else:
- msg = "Found invalid reply mail address {!r} in configuration.".format(
- tokens[0])
- LOG.error(msg)
-
- if 'mail_method' in section:
- v = section['mail_method'].strip().lower()
- if v:
- if v in VALID_MAIL_METHODS:
- self.mail_method = v
- else:
- msg = "Found invalid mail method {!r} in configuration.".format(
- section['mail_method'])
- LOG.error(msg)
+ # -------------------------------------------------------------------------
+ def _perform_config_mail_rcpt(self, section, section_name):
- if 'mail_server' in section:
- v = section['reply_to'].strip()
- if v:
- self.mail_server = v
+ if 'mail_recipients' not in section:
+ return
- if 'smtp_port' in section:
- v = section['smtp_port']
- port = self.smtp_port
- try:
- port = int(v)
- except (ValueError, TypeError):
- msg = "Found invalid SMTP port number {!r} in configuration.".format(v)
+ v = section['mail_recipients'].strip()
+ self.mail_recipients = []
+ if v:
+ tokens = self.whitespace_re.split(v)
+ for token in tokens:
+ if MailAddress.valid_address(token):
+ if token not in self.mail_recipients:
+ self.mail_recipients.append(token)
+ else:
+ msg = (
+ "Found invalid recipient mail address {!r} "
+ "in configuration.").format(
+ token)
LOG.error(msg)
+
+ # -------------------------------------------------------------------------
+ def _perform_config_mail_cc(self, section, section_name):
+
+ if 'mail_cc' not in section:
+ return
+
+ v = section['mail_cc'].strip()
+ self.mail_cc = []
+ if v:
+ tokens = self.whitespace_re.split(v)
+ if self.verbose > 1:
+ LOG.debug("CC addresses:\n{}".format(pp(tokens)))
+ for token in tokens:
+ if MailAddress.valid_address(token):
+ if token not in self.mail_cc:
+ self.mail_cc.append(token)
else:
- if port <= 0:
- msg = "Found invalid SMTP port number {!r} in configuration.".format(port)
- LOG.error(msg)
- else:
- self.smtp_port = port
+ msg = "Found invalid cc mail address {!r} in configuration.".format(
+ token)
+ LOG.error(msg)
- self.perform_config()
+ # -------------------------------------------------------------------------
+ def _perform_config_mail_reply_to(self, section, section_name):
- self._perform_mail_cmdline_options()
+ if 'reply_to' not in section:
+ return
- if self.config_has_errors:
- LOG.error("There are errors in configuration.")
- self.exit(1)
+ v = section['reply_to'].strip()
+ self.reply_to = None
+ if v:
+ tokens = self.whitespace_re.split(v)
+ if len(tokens):
+ if MailAddress.valid_address(tokens[0]):
+ self.reply_to = tokens[0]
+ else:
+ msg = "Found invalid reply mail address {!r} in configuration.".format(
+ tokens[0])
+ LOG.error(msg)
+
+ # -------------------------------------------------------------------------
+ def _perform_config_mail_method(self, section, section_name):
+
+ if 'mail_method' not in section:
+ return
+
+ v = section['mail_method'].strip().lower()
+ if v:
+ if v in VALID_MAIL_METHODS:
+ self.mail_method = v
+ else:
+ msg = "Found invalid mail method {!r} in configuration.".format(
+ section['mail_method'])
+ LOG.error(msg)
+
+ # -------------------------------------------------------------------------
+ def _perform_config_mail_server(self, section, section_name):
+
+ if 'mail_server' not in section:
+ return
+
+ v = section['reply_to'].strip()
+ if v:
+ self.mail_server = v
+
+ # -------------------------------------------------------------------------
+ def _perform_config_smtp_port(self, section, section_name):
+
+ if 'smtp_port' not in section:
+ return
+
+ v = section['smtp_port']
+ port = self.smtp_port
+ try:
+ port = int(v)
+ except (ValueError, TypeError):
+ msg = "Found invalid SMTP port number {!r} in configuration.".format(v)
+ LOG.error(msg)
else:
- LOG.debug("There are no errors in configuration.")
- self.config_has_errors = False
+ if port <= 0:
+ msg = "Found invalid SMTP port number {!r} in configuration.".format(port)
+ LOG.error(msg)
+ else:
+ self.smtp_port = port
# -------------------------------------------------------------------------
def _perform_mail_cmdline_options(self):
+ self._perform_cmdline_mail_rcpt()
+ self._perform_cmdline_mail_cc()
+ self._perform_cmdline_reply_to()
+
+ v = getattr(self.args, 'mail_method', None)
+ if v:
+ self.mail_method = v
+
+ v = getattr(self.args, 'mail_server', None)
+ if v:
+ self.mail_server = v
+
+ v = getattr(self.args, 'smtp_port', None)
+ if v is not None:
+ if v <= 0:
+ msg = "Got invalid SMTP port number {!r}.".format(v)
+ LOG.error(msg)
+ else:
+ self.smtp_port = v
+
+ # -------------------------------------------------------------------------
+ def _perform_cmdline_mail_rcpt(self):
+
v = getattr(self.args, 'mail_recipients', None)
if v is not None:
self.mail_recipients = []
msg = "Did not found any valid recipient mail addresses."
LOG.error(msg)
+ # -------------------------------------------------------------------------
+ def _perform_cmdline_mail_cc(self):
+
v = getattr(self.args, 'mail_cc', None)
- if v is not None:
- self.mail_cc = []
- for addr in v:
- tokens = self.whitespace_re.split(addr)
- for token in tokens:
- if MailAddress.valid_address(token):
- if token not in self.mail_cc:
- self.mail_cc.append(token)
- else:
- msg = "Got invalid CC mail address {!r}.".format(token)
- LOG.error(msg)
+ if v is None:
+ return
- v = getattr(self.args, 'mail_reply_to', None)
- if v:
- tokens = self.whitespace_re.split(v)
- if len(tokens):
- if MailAddress.valid_address(tokens[0]):
- self.reply_to = tokens[0]
+ self.mail_cc = []
+ for addr in v:
+ tokens = self.whitespace_re.split(addr)
+ for token in tokens:
+ if MailAddress.valid_address(token):
+ if token not in self.mail_cc:
+ self.mail_cc.append(token)
else:
- msg = "Got invalid reply mail address {!r}.".format(
- tokens[0])
+ msg = "Got invalid CC mail address {!r}.".format(token)
LOG.error(msg)
- v = getattr(self.args, 'mail_method', None)
- if v:
- self.mail_method = v
+ # -------------------------------------------------------------------------
+ def _perform_cmdline_reply_to(self):
- v = getattr(self.args, 'mail_server', None)
- if v:
- self.mail_server = v
+ v = getattr(self.args, 'mail_reply_to', None)
+ if not v:
+ return
- v = getattr(self.args, 'smtp_port', None)
- if v is not None:
- if v <= 0:
- msg = "Got invalid SMTP port number {!r}.".format(v)
- LOG.error(msg)
+ tokens = self.whitespace_re.split(v)
+ if len(tokens):
+ if MailAddress.valid_address(tokens[0]):
+ self.reply_to = tokens[0]
else:
- self.smtp_port = v
+ msg = "Got invalid reply mail address {!r}.".format(
+ tokens[0])
+ LOG.error(msg)
# -------------------------------------------------------------------------
def perform_config(self):
return False
net_address = '.'.join(tupels) + '/{}'.format(bitmask)
if self.verbose > 2:
- LOG.debug("Got IPv4 network address of zone {!r}: {!r}.".format(zone_name, net_address))
+ LOG.debug(
+ "Got IPv4 network address of zone {!r}: {!r}.".format(
+ zone_name, net_address))
network = ipaddress.ip_network(net_address)
if network.is_global:
if self.verbose > 1:
- LOG.debug("The network {!r} of zone {!r} is allocated for public networks.".format(
- net_address, zone_name))
+ LOG.debug(
+ "The network {!r} of zone {!r} is allocated for public networks.".format(
+ net_address, zone_name))
return False
LOG.debug("The network {!r} of zone {!r} is allocated for local networks.".format(
net_address, zone_name))
return True
if self.verbose > 2:
- LOG.debug("Zone {!r} seems to be a reverse zone for a public network.".format(zone_name))
+ LOG.debug(
+ "Zone {!r} seems to be a reverse zone for a public network.".format(zone_name))
return False