# Standard modules
# import copy
+import bz2
+import gzip
import logging
+import lzma
+import os
import re
+from pathlib import Path
# Third party modules
+from fb_tools.common import is_sequence
from fb_tools.common import pp
from fb_tools.handling_obj import HandlingObject
_ = XLATOR.gettext
ngettext = XLATOR.ngettext
-__version__ = '0.5.5'
+__version__ = '0.6.0'
# =============================================================================
'postfix/pipe', 'postfix/qmgr', 'postfix/smtp'
)
+ open_args = {
+ 'encoding': 'utf-8',
+ 'errors': 'surrogateescape',
+ }
+
# -------------------------------------------------------------------------
def __init__(
self, appname=None, verbose=0, version=__version__, base_dir=None,
self.active_smtpd_pid = {}
self.connect = {}
+ # -------------------------------------------------------------------------
+ def evaluate_logfiles(self, logfiles):
+ """Evaluate and parse all given logfiles."""
+ if not is_sequence(logfiles):
+ msg = _('Given parameter {p!r} for {f}() is not a sequential objects: {v!r}.').format(
+ p='logfiles', f='evaluate_logfiles', v=logfiles)
+ raise TypeError(msg)
+
+ self.reset()
+
+ for logfile in logfiles:
+ self.evaluate_logfile(logfile)
+
+ # -------------------------------------------------------------------------
+ def evaluate_logfile(self, logfile):
+ """Evaluate the log entries from the given logfile."""
+ if not isinstance(logfile, Path):
+ msg = _('Parameter {p!r} for {f}() is not a {what} object: {v!r}.').format(
+ p='logfile', f='evaluate_logfile', what='Path', v=logfile)
+ raise TypeError(msg)
+
+ if not logfile.exists():
+ LOG.error(_('Logfile {!r} does not exists.').format(str(logfile)))
+ return
+
+ if not logfile.is_file():
+ LOG.error(_('Logfile {!r} is not a regular file.').format(str(logfile)))
+ return
+
+ if not os.access(logfile, os.R_OK):
+ LOG.error(_('No read access to logfile {!r}.').format(str(logfile)))
+ return
+
+ LOG.info(_('Evaluating logfile {!r} ...').format(str(logfile)))
+
+ try:
+ if logfile.name.endswith('.gz'):
+ LOG.debug(_('Reading file {f!r} as a {what} compressed file.').format(
+ f=str(logfile), what='gzip'))
+ fh = gzip.open(str(logfile), 'rt', **self.open_args)
+
+ elif logfile.name.endswith('.xz'):
+ LOG.debug(_('Reading file {f!r} as a {what} compressed file.').format(
+ f=str(logfile), what='xz/lzma'))
+ fh = lzma.open(str(logfile), 'rt', **self.open_args)
+
+ elif logfile.name.endswith('.bz2') or logfile.name.endswith('.bzip2'):
+ LOG.debug(_('Reading file {f!r} as a {what} compressed file.').format(
+ f=str(logfile), what='bzip2'))
+ fh = bz2.open(str(logfile), 'rt', **self.open_args)
+
+ else:
+ LOG.debug(_('Reading file {!r} as an uncompressed file.').format(str(logfile)))
+ fh = logfile.open('rt', **self.open_args)
+
+# self,parse(fh)
+
+ finally:
+ if fh:
+ LOG.debug(_('Closing file {!r}.').format(str(logfile)))
+ fh.close()
+
# -------------------------------------------------------------------------
def parse(self, fh):
"""Parse the content of a given logfile and store the results in self.chain."""
LOG.debug(_('Start parsing postfix logfile ...'))
- try:
- for line in fh.readlines():
- self.evaluate_logline(line)
- finally:
- self.active_smtpd_pid = {}
+ for line in fh.readlines():
+ self.evaluate_logline(line)
# -------------------------------------------------------------------------
def evaluate_logline(self, line):