From 7e6c4bfff06614b7969ce22b977014156a114085 Mon Sep 17 00:00:00 2001 From: Frank Brehm Date: Wed, 27 Mar 2024 18:04:42 +0100 Subject: [PATCH] Adding and testing class SmtpAction. --- lib/pp_admintools/postfix_chain.py | 629 +++++++++++++++++++++++++++-- test/test_20_postfix_chain.py | 113 +++++- 2 files changed, 704 insertions(+), 38 deletions(-) diff --git a/lib/pp_admintools/postfix_chain.py b/lib/pp_admintools/postfix_chain.py index 72c51ca..ffef626 100644 --- a/lib/pp_admintools/postfix_chain.py +++ b/lib/pp_admintools/postfix_chain.py @@ -33,7 +33,7 @@ from .xlate import XLATOR _ = XLATOR.gettext ngettext = XLATOR.ngettext -__version__ = '0.6.1' +__version__ = '0.7.0' LOG = logging.getLogger(__name__) @@ -153,7 +153,596 @@ class DataPair(object): return self.__class__(self.value, total=self.total) -# ============================================================================= +# ============================================================================== +class SmtpAction(FbGenericBaseObject): + """A class encapsulating the logged action of postfix/smtp.""" + + warn_on_parse_error = True + + attributes = ( + 'to_address', 'origin_to', 'smtp_pid', 'date', 'relay', 'delay_total', + 'time_before_queue', 'time_in_queue', 'time_conn_setup', 'time_xmission', + 'dsn', 'status', 'message', 'remote_id' + ) + + re_to_address = re.compile(r'\bto=<(?P[^>]*)>(?:,\s+)', re.IGNORECASE) + re_orig_to_address = re.compile(r'orig_to=<(?P[^>]*)>(?:,\s+)', re.IGNORECASE) + re_relay = re.compile(r'relay=(?P\S+)(?:,\s)', re.IGNORECASE) + re_delay = re.compile(r'delay=(?P\d+(?:\.\d*)?)(?:,\s+)', re.IGNORECASE) + + pat_delays = r'delays=' + pat_delays += r'(?P\d+(?:\.\d*)?)/(?P\d+(?:\.\d*)?)' + pat_delays += r'/(?P\d+(?:\.\d*)?)/(?P\d+(?:\.\d*)?)' + pat_delays += r'(?:,\s+)' + re_delays = re.compile(pat_delays, re.IGNORECASE) + + re_dsn = re.compile(r'dsn=(?P\S+)(?:,\s+)', re.IGNORECASE) + re_status = re.compile(r'status=(?P\S+)\s+', re.IGNORECASE) + + pat_remote_id = r'(?:queued\s+as\s+(?P[0-9a-f]+))' + pat_remote_id += r'|(?:InternalId=(?P\d+))' + pat_remote_id += r'|(?:\sid=(?P\S+))' + pat_remote_id += r'|(?:Message\s+(?P[0-9a-f]+)\s+accepted)' + pat_remote_id += r'|(?:\s(?P\S+)\s+(?:Message|mail)\s+accepted)' + pat_remote_id += r'|(?:\sOK\s+\d+\s+(?PS+)\s+-\s+gsmtp)' + pat_remote_id += r'|(?:\sok\s+\((?P\S+)\))' + pat_remote_id += r'|(?:\sok\s+(?P\S+))' + + re_remote_id = re.compile(pat_remote_id, re.IGNORECASE) + + # ------------------------------------------------------------------------- + def __init__(self, **kwargs): + """Initialize this object.""" + for attr in self.attributes: + priv_name = '_' + attr + setattr(self, priv_name, None) + + for attr in kwargs.keys(): + if attr not in self.attributes: + msg = _('Unknown parameter {p!r} on calling {c}.__init__().').format( + p=attr, c=self.__class__.__name__) + raise AttributeError(msg) + setattr(self, attr, kwargs[attr]) + + # ----------------------------------------------------------- + @property + def date(self): + """Return the timestamp of the SMTP action.""" + return self._date + + @date.setter + def date(self, value): + if value is None: + self._date = None + return + + if isinstance(value, datetime.datetime): + self._date = value + return + + val = str(value).strip() + if val == '': + self._date = None + return + + try: + date = fromisoformat(val) + except DateFormatError as e: + msg = _('Could not interprete date {!r}:').format(val) + ' ' + str(e) + if self.warn_on_parse_error: + LOG.warn(msg) + else: + LOG.debug(msg) + date = None + if date: + self._date = date + return + self._date = val + + # ---------------------------------------------------------- + @property + def delay_total(self): + """Return the total delay of the sended mail ion the current postfix.""" + return self._delay_total + + @delay_total.setter + def delay_total(self, value): + if value is None: + self._delay_total = None + return + + if isinstance(value, (float, int)): + self._delay_total = float(value) + return + val = str(value).strip() + if val == '': + self._delay_total = None + return + + try: + self._delay_total = float(val) + except ValueError as e: + msg = _('Could not interprete total delay {a!r}: {e}').format(a=value, e=e) + if self.warn_on_parse_error: + LOG.warn(msg) + else: + LOG.debug(msg) + self._delay_total = val + + # ----------------------------------------------------------- + @property + def dsn(self): + """Return the Delivery Status Notification (DSN) information.""" + return self._dsn + + @dsn.setter + def dsn(self, value): + if value is None: + self._dsn = None + return + + val = str(value).strip() + if val == '': + self._dsn = None + return + self._dsn = val + + # ----------------------------------------------------------- + @property + def message(self): + """Return the verbose message of the smtp transaction.""" + return self._message + + @message.setter + def message(self, value): + if value is None: + self._message = None + return + + val = str(value).strip() + if val == '': + self._message = None + return + self._message = val + + # ----------------------------------------------------------- + @property + def origin_to(self): + """Return the original RCPT TO address in the SMTP dialogue envelope.""" + return self._origin_to + + @origin_to.setter + def origin_to(self, value): + if value is None: + self._origin_to = None + return + + if isinstance(value, MailAddress): + self._origin_to = copy.copy(value) + return + val = str(value).strip() + if val == '': + self._origin_to = None + return + + try: + self._origin_to = MailAddress(val) + except InvalidMailAddressError as e: + msg = _('Could not interprete to address {a!r}: {e}').format(a=val, e=e) + if self.warn_on_parse_error: + LOG.warn(msg) + else: + LOG.debug(msg) + self._origin_to = val + + # ----------------------------------------------------------- + @property + def relay(self): + """Return the socket address of the receiving relay MTA.""" + return self._relay + + @relay.setter + def relay(self, value): + if value is None: + self._relay = None + return + + val = str(value).strip() + if val == '': + self._relay = None + return + self._relay = val + + # ----------------------------------------------------------- + @property + def remote_id(self): + """Return the the Mail ID of the remote (receiving) MTA.""" + return self._remote_id + + @remote_id.setter + def remote_id(self, value): + if value is None: + self._remote_id = None + return + + val = str(value).strip() + if val == '': + self._remote_id = None + return + self._remote_id = val + + # ---------------------------------------------------------- + @property + def smtp_pid(self): + """Return the process ID (PID) of the sending smtp process.""" + return self._smtp_pid + + @smtp_pid.setter + def smtp_pid(self, value): + if value is None: + self._smtp_pid = None + return + + if isinstance(value, (float, int)): + self._smtp_pid = int(value) + return + val = str(value).strip() + if val == '': + self._smtp_pid = None + return + + try: + self._smtp_pid = int(val) + except ValueError as e: + msg = _('Could not interprete PID of smtp {a!r}: {e}').format(a=value, e=e) + if self.warn_on_parse_error: + LOG.warn(msg) + else: + LOG.debug(msg) + self._smtp_pid = val + + # ----------------------------------------------------------- + @property + def status(self): + """Return the final status of the smtp transaction.""" + return self._status + + @status.setter + def status(self, value): + if value is None: + self._status = None + return + + val = str(value).strip() + if val == '': + self._status = None + return + self._status = val + + # ---------------------------------------------------------- + @property + def time_before_queue(self): + """Return the used time before the mail was queued.""" + return self._time_before_queue + + @time_before_queue.setter + def time_before_queue(self, value): + if value is None: + self._time_before_queue = None + return + + if isinstance(value, (float, int)): + self._time_before_queue = float(value) + return + val = str(value).strip() + if val == '': + self._time_before_queue = None + return + + try: + self._time_before_queue = float(val) + except ValueError as e: + msg = _('Could not interprete time before queueing {a!r}: {e}').format(a=value, e=e) + if self.warn_on_parse_error: + LOG.warn(msg) + else: + LOG.debug(msg) + self._time_before_queue = val + + # ---------------------------------------------------------- + @property + def time_conn_setup(self): + """ + Return the time the smtp process needed to establish a SMTP connection. + + This is including DNS, HELO and TLS. + """ + return self._time_conn_setup + + @time_conn_setup.setter + def time_conn_setup(self, value): + if value is None: + self._time_conn_setup = None + return + + if isinstance(value, (float, int)): + self._time_conn_setup = float(value) + return + val = str(value).strip() + if val == '': + self._time_conn_setup = None + return + + try: + self._time_conn_setup = float(val) + except ValueError as e: + msg = _('Could not interprete smtp connection setup time {a!r}: {e}').format( + a=value, e=e) + if self.warn_on_parse_error: + LOG.warn(msg) + else: + LOG.debug(msg) + self._time_conn_setup = val + + # ---------------------------------------------------------- + @property + def time_in_queue(self): + """Return the time the mail was held in the queue.""" + return self._time_in_queue + + @time_in_queue.setter + def time_in_queue(self, value): + if value is None: + self._time_in_queue = None + return + + if isinstance(value, (float, int)): + self._time_in_queue = float(value) + return + val = str(value).strip() + if val == '': + self._time_in_queue = None + return + + try: + self._time_in_queue = float(val) + except ValueError as e: + msg = _('Could not interprete time in queue {a!r}: {e}').format(a=value, e=e) + if self.warn_on_parse_error: + LOG.warn(msg) + else: + LOG.debug(msg) + self._time_in_queue = val + + # ---------------------------------------------------------- + @property + def time_xmission(self): + """Return the time the smtp process needed to transmit the mail.""" + return self._time_xmission + + @time_xmission.setter + def time_xmission(self, value): + if value is None: + self._time_xmission = None + return + + if isinstance(value, (float, int)): + self._time_xmission = float(value) + return + val = str(value).strip() + if val == '': + self._time_xmission = None + return + + try: + self._time_xmission = float(val) + except ValueError as e: + msg = _('Could not interprete smtp transmission time {a!r}: {e}').format(a=value, e=e) + if self.warn_on_parse_error: + LOG.warn(msg) + else: + LOG.debug(msg) + self._time_xmission = val + + # ----------------------------------------------------------- + @property + def to_address(self): + """Return the RCPT TO address in the SMTP dialogue envelope.""" + return self._to_address + + @to_address.setter + def to_address(self, value): + if value is None: + self._to_address = None + return + + if isinstance(value, MailAddress): + self._to_address = copy.copy(value) + return + val = str(value).strip() + if val == '': + self._to_address = None + return + + try: + self._to_address = MailAddress(val) + except InvalidMailAddressError as e: + msg = _('Could not interprete to address {a!r}: {e}').format(a=val, e=e) + if self.warn_on_parse_error: + LOG.warn(msg) + else: + LOG.debug(msg) + self._to_address = val + + # ------------------------------------------------------------------------- + def __str__(self): + """Typecast into a string object. + + @return: structure as string + @rtype: str + """ + return pp(self.as_dict(exportable=True)) + + # ------------------------------------------------------------------------- + def __repr__(self): + """Typecast into a string for reproduction.""" + out = '<%s(' % (self.__class__.__name__) + + fields = [] + + attr_dict = self.as_dict(exportable=True) + for attr in attr_dict.keys(): + value = attr_dict[attr] + if value is None: + continue + fields.append(f'{attr}={value!r}') + + if fields: + out += ', '.join(fields) + + out += ')>' + return out + + # ------------------------------------------------------------------------- + def as_dict(self, short=True, exportable=False): + """ + Transform the elements of the object into a dict. + + @param short: don't include local properties in resulting dict. + @type short: bool + + @return: structure as dict + @rtype: dict + """ + if exportable: + res = OrderedDict() + else: + res = super(SmtpAction, self).as_dict(short=short) + + for attrib in self.attributes: + if not hasattr(self, attrib): + continue + value = getattr(self, attrib, None) + if value is None: + res[attrib] = None + continue + if isinstance(value, ipaddress._BaseAddress): + res[attrib] = str(value) if exportable else value + continue + if isinstance(value, datetime.datetime): + res[attrib] = value.isoformat(' ') if exportable else value + continue + if isinstance(value, (DataPair, MailAddress)): + res[attrib] = str(value) if exportable else value + continue + + # Catch all + res[attrib] = value + + return res + + # ------------------------------------------------------------------------- + def __copy__(self): + """Copy the current chain data into a new object.""" + params = {} + + for attrib in self.attributes: + value = getattr(self, attrib, None) + if value is None: + continue + params[attrib] = copy.copy(value) + + return self.__class__(**params) + + # ------------------------------------------------------------------------- + @classmethod + def _get_remote_mailid(cls, message): + rmatch = cls.re_remote_id.search(message) + if not rmatch: + return None + + if rmatch['pf_id'] is not None: + return rmatch['pf_id'] + elif rmatch['jira_id'] is not None: + return rmatch['jira_id'] + elif rmatch['raw_id'] is not None: + return rmatch['raw_id'] + elif rmatch['msg1_id'] is not None: + return rmatch['msg1_id'] + elif rmatch['msg2_id'] is not None: + return rmatch['msg2_id'] + elif rmatch['gsmtp_id'] is not None: + return rmatch['gsmtp_id'] + elif rmatch['msg3_id'] is not None: + return rmatch['msg3_id'] + elif rmatch['msg4_id'] is not None: + return rmatch['msg4_id'] + + return None + + # ------------------------------------------------------------------------- + @classmethod + def from_log_entry(cls, timestamp, pid, message, verbose=0): + """Try to create a SmtpAction from the given Postfix log entry.""" + action = cls() + action.date = timestamp + action.smtp_pid = pid + cur_msg = message + + if verbose > 2: + LOG.debug(f'Parsing postfix/smtp line: {message}') + + rmatch = cls.re_to_address.search(cur_msg) + if rmatch: + action.to_address = rmatch['to'] + cur_msg = cls.re_to_address.sub('', cur_msg) + + rmatch = cls.re_orig_to_address.search(cur_msg) + if rmatch: + action.origin_to = rmatch['orig'] + cur_msg = cls.re_orig_to_address.sub('', cur_msg) + + rmatch = cls.re_relay.search(cur_msg) + if rmatch: + action.relay = rmatch['relay'] + cur_msg = cls.re_relay.sub('', cur_msg) + + rmatch = cls.re_delay.search(cur_msg) + if rmatch: + action.delay_total = rmatch['delay'] + cur_msg = cls.re_delay.sub('', cur_msg) + + rmatch = cls.re_delays.search(cur_msg) + if rmatch: + action.time_before_queue = rmatch['p1'] + action.time_in_queue = rmatch['p2'] + action.time_conn_setup = rmatch['p3'] + action.time_xmission = rmatch['p4'] + cur_msg = cls.re_delays.sub('', cur_msg) + + rmatch = cls.re_dsn.search(cur_msg) + if rmatch: + action.dsn = rmatch['dsn'] + cur_msg = cls.re_dsn.sub('', cur_msg) + + rmatch = cls.re_status.search(cur_msg) + if rmatch: + action.status = rmatch['status'] + cur_msg = cls.re_status.sub('', cur_msg) + + cur_msg = cur_msg.strip() + if cur_msg: + if cur_msg.startswith('(') and cur_msg.endswith(')'): + cur_msg = cur_msg[1:-1] + if cur_msg: + action.message = cur_msg + remote_id = cls._get_remote_mailid(cur_msg) + if remote_id: + action.remote_id = remote_id + + return action + +# ============================================================================== class PostfixLogchainInfo(FbGenericBaseObject): """A class for encapsulating the information from a chain of Postfix log entries.""" @@ -162,7 +751,7 @@ class PostfixLogchainInfo(FbGenericBaseObject): attributes = ( 'client_host', 'client_addr', 'start', 'end', 'message_id', 'postfix_id', 'ehlo', 'starttls', 'sent_quit', 'auth', 'commands', 'rcpt', 'data', 'mail', 'from_address', - 'origin_to', 'smtpd_pid', 'mailhost', 'tls_version', 'tls_cipher', 'size', 'nr_rcpt', + 'smtpd_pid', 'mailhost', 'tls_version', 'tls_cipher', 'size', 'nr_rcpt', ) # ------------------------------------------------------------------------- @@ -554,10 +1143,10 @@ class PostfixLogchainInfo(FbGenericBaseObject): else: self._sent_quit = DataPair.from_str(val) - # ----------------------------------------------------------- + # ---------------------------------------------------------- @property def smtpd_pid(self): - """Return statistics about the quit command in SMTP dialogue.""" + """Return the process ID (PID) of the receiving smtpd process.""" return self._smtpd_pid @smtpd_pid.setter @@ -644,36 +1233,6 @@ class PostfixLogchainInfo(FbGenericBaseObject): else: self._starttls = DataPair.from_str(val) - # ----------------------------------------------------------- - @property - def origin_to(self): - """Return the original RCPT TO address in the SMTP dialogue envelope.""" - return self._origin_to - - @origin_to.setter - def origin_to(self, value): - if value is None: - self._origin_to = None - return - - if isinstance(value, MailAddress): - self._origin_to = copy.copy(value) - return - val = str(value).strip() - if val == '': - self._origin_to = None - return - - try: - self._origin_to = MailAddress(val) - except InvalidMailAddressError as e: - msg = _('Could not interprete to address {a!r}: {e}').format(a=val, e=e) - if self.warn_on_parse_error: - LOG.warn(msg) - else: - LOG.debug(msg) - self._origin_to = val - # ------------------------------------------------------- @property def tls_version(self): diff --git a/test/test_20_postfix_chain.py b/test/test_20_postfix_chain.py index d78a069..d75e7a8 100755 --- a/test/test_20_postfix_chain.py +++ b/test/test_20_postfix_chain.py @@ -11,8 +11,10 @@ import logging import os +import re import sys import textwrap +from pathlib import Path try: import unittest2 as unittest @@ -43,6 +45,9 @@ class TestPostfixChain(PpAdminToolsTestcase): if self.verbose >= 1: print() + self.test_dir = Path(__file__).parent.resolve() + self.logfile = self.test_dir / 'postfix.mail01.log' + self._appname = APPNAME # ------------------------------------------------------------------------- @@ -69,15 +74,116 @@ class TestPostfixChain(PpAdminToolsTestcase): """Test init of an empty PostfixLogchainInfo object.""" LOG.info(self.get_method_doc()) + from pp_admintools.postfix_chain import SmtpAction from pp_admintools.postfix_chain import PostfixLogchainInfo + action = SmtpAction() + LOG.debug('SmtpAction %r: {!r}'.format(action)) + LOG.debug('SmtpAction %s:\n{}'.format(action)) + LOG.debug('SmtpAction as_dict():\n{}'.format(pp(action.as_dict()))) + chain = PostfixLogchainInfo() LOG.debug('PostfixLogchainInfo %r: {!r}'.format(chain)) LOG.debug('PostfixLogchainInfo %s:\n{}'.format(chain)) LOG.debug('PostfixLogchainInfo as_dict():\n{}'.format(pp(chain.as_dict()))) # ------------------------------------------------------------------------- - def test_filled_object(self): + def test_filled_smtp_action(self): + """Test init of a filled SmtpAction object.""" + LOG.info(self.get_method_doc()) + + from pp_admintools.postfix_chain import SmtpAction + + action = SmtpAction( + to_address='simon.heger99+noperm@gmail.com', + origin_to='simon.heger99+noperm@gmail.com', + smtp_pid='12345', + date='2024-03-15T17:12:01.322245+01:00', + relay='gmail-smtp-in.l.google.com[173.194.76.26]:25', + delay_total='1.9', + time_before_queue='0.95', + time_in_queue='0.22', + time_conn_setup='0.22', + time_xmission='0.53', + dsn='2.0.0', + status='sent', + message='250 2.0.0 OK 1710519121 k16-20020adff50033e2227abd5si1856173wrp.280 - gsmtp', + remote_id='k16-20020adff5d0000000b0033e2227abd5si1856173wrp.280', + ) + + LOG.debug('SmtpAction %r: {!r}'.format(action)) + LOG.debug('SmtpAction %s:\n{}'.format(action)) + LOG.debug('SmtpAction as_dict():\n{}'.format(pp(action.as_dict()))) + + # ------------------------------------------------------------------------- + def test_parsing_smtp(self): + """Test parsing a postfix logfile for postfix/smtp entries.""" + LOG.info(self.get_method_doc()) + + from pp_admintools.postfix_chain import SmtpAction + + pattern_logentry = r'^(?P\S+)\s+(?P\S+)\s+' + pattern_logentry += r'(?P[^\[\s]+)\[(?P\d+)\]:\s+' + pattern_logentry += r'(?P.*)\s*$' + + re_logentry = re.compile(pattern_logentry) + re_postfix_id = re.compile(r'^(?P[0-9a-f]+):\s+', re.IGNORECASE) + + re_tls_line = re.compile(r'Trusted\s+TLS\s+connection\s+established', re.IGNORECASE) + + maillog = self.logfile + if 'MAILLOG' in os.environ: + env_maillog = Path(os.environ['MAILLOG']) + if env_maillog.exists() and env_maillog.is_file() and os.access(env_maillog, os.R_OK): + maillog = env_maillog + + limit = None + if 'SMTP_LIMIT' in os.environ: + limit = int(os.environ['SMTP_LIMIT']) + + LOG.debug('Parsing maillog {!r} for postfix/smtp entries ...'.format(str(maillog))) + i = 0 + with maillog.open('rt') as fh: + for line in fh.readlines(): + rmatch = re_logentry.match(line) + if not rmatch: + continue + + timestamp = rmatch['timestamp'] + # host = rmatch['host'] + command = rmatch['command'] + pid = int(rmatch['pid']) + message = rmatch['msg'] + + if command != 'postfix/smtp': + continue + + if re_tls_line.search(message): + continue + + m_id = re_postfix_id.match(message) + if m_id: + # postfix_id = m_id['id'].upper() + message = re_postfix_id.sub('', message) + + i += 1 + action = SmtpAction.from_log_entry( + timestamp=timestamp, pid=pid, message=message, + verbose=self.verbose) + + if self.verbose > 3: + LOG.debug('SmtpAction %r: {!r}'.format(action)) + LOG.debug('SmtpAction %s:\n{}'.format(action)) + if self.verbose > 2: + LOG.debug('SmtpAction as_dict():\n{}'.format(pp(action.as_dict()))) + + if limit and i >= limit: + break + + LOG.debug(f'Parsed {i} postfix/smtp log entries.') + + # ------------------------------------------------------------------------- + def test_filled_chain(self): """Test init of a filled PostfixLogchainInfo object.""" LOG.info(self.get_method_doc()) @@ -104,7 +210,6 @@ class TestPostfixChain(PpAdminToolsTestcase): data='3', mail=1, from_address='frank.brehm@pixelpark.com', - origin_to='solution@pixelpark.com', smtpd_pid='23456', mailhost='prd-mail01.pixelpark.com', tls_version='TLSv1.2', @@ -133,7 +238,9 @@ if __name__ == '__main__': suite.addTest(TestPostfixChain('test_import', verbose)) suite.addTest(TestPostfixChain('test_empty_object', verbose)) - suite.addTest(TestPostfixChain('test_filled_object', verbose)) + suite.addTest(TestPostfixChain('test_filled_smtp_action', verbose)) + suite.addTest(TestPostfixChain('test_parsing_smtp', verbose)) + suite.addTest(TestPostfixChain('test_filled_chain', verbose)) runner = unittest.TextTestRunner(verbosity=verbose) -- 2.39.5