r'^\s*from=<(?P<from>[^>]*)>,\s+size=(?P<size>\d+),\s+nrcpt=(?P<nrcpt>\d+)',
re.IGNORECASE)
+ warn_on_parse_error = True
+
# -------------------------------------------------------------------------
def __init__(
self, appname=None, verbose=0, version=__version__, base_dir=None,
terminal_has_colors=terminal_has_colors, initialized=False,
)
+ if not self.warn_on_parse_error:
+ SmtpAction.warn_on_parse_error = False
+ PostfixLogchainInfo.warn_on_parse_error = False
+
self.reset(no_warnings=True)
if initialized:
self, postfix_id, timestamp=None, host=None, pid=None, message=None):
"""Evaluate a postfix/smtp action log line."""
if postfix_id not in self.chain:
- LOG.warn(_('Postfix transaction {!r} does not exists.').format(postfix_id))
+ msg = _('Postfix transaction {!r} does not exists.').format(postfix_id)
+ if self.warn_on_parse_error:
+ LOG.warn(msg)
+ else:
+ LOG.debug(msg)
return
action = SmtpAction.from_log_entry(
# Third party modules
from fb_tools.common import pp
+from fb_tools.common import timeinterval2delta
from fb_tools.common import to_bool
from fb_tools.errors import InvalidMailAddressError
from fb_tools.mailaddress import MailAddress
from fb_tools.obj import FbGenericBaseObject
# Own modules
+from . import DEFAULT_TZ_OFFSET
from .errors import DateFormatError
from .errors import WrongDateIsoformatError
-from .tzlocal import get_localzone
from .xlate import XLATOR
+HAS_GET_LOCALZONE = False
+try:
+ from .tzlocal import get_localzone
+ HAS_GET_LOCALZONE = True
+except ImportError:
+ pass
+
_ = XLATOR.gettext
ngettext = XLATOR.ngettext
-__version__ = '0.7.2'
+__version__ = '0.7.3'
LOG = logging.getLogger(__name__)
RE_ISODATETIME = re.compile(PATTERN_ISODATE + r'[T\s]' + PATTERN_ISOTIME + PATTERN_ISOTIMEZONE)
+PATTERN_DEFAULT_TZ = r'^\s*(?P<sign>[+-])\s*(?P<offset>.*)'
+RE_DEFAULT_TZ = re.compile(PATTERN_DEFAULT_TZ)
+
+RE_TZ = re.compile(r'^\s*(?P<tz_hours>[01]\d)(?::?(?P<tz_mins>[0-5]\d))?')
+
+DEFAULT_TZ = UTC
+
+# =============================================================================
+def get_default_tz_from_offset(offset=None):
+ """Return the given offset in seconds as a datetime.timezone object."""
+ if offset is None:
+ offset = DEFAULT_TZ_OFFSET
+
+ default_tz = datetime.timedelta()
+
+ if not offset:
+ return default_tz
+
+ if isinstance(offset, (int, float)):
+ return datetime.timezone(datetime.timedelta(seconds=float(offset)))
+
+ offset = str(offset)
+ omatch = RE_DEFAULT_TZ.match(offset)
+ if not omatch:
+ msg = _('Could not interprete {!r} as a timezone offset.').format(offset)
+ raise ValueError(msg)
+
+ sign = omatch['sign']
+ offset_abs = omatch['offset']
+
+ tz_match = RE_TZ.match(offset_abs)
+ if tz_match:
+ res_offset = 0
+ if tz_match['tz_mins']:
+ res_offset = int(tz_match['tz_mins']) * 60
+ res_offset += int(tz_match['tz_hours']) * 3600
+ if sign == '-':
+ res_offset *= -1
+ return datetime.timezone(datetime.timedelta(seconds=res_offset))
+
+ delta = timeinterval2delta(offset_abs)
+ if sign == '-':
+ delta *= -1
+ return datetime.timezone(delta)
+
+
+# =============================================================================
+DEFAULT_TZ = get_default_tz_from_offset(DEFAULT_TZ_OFFSET)
+
# =============================================================================
def fromisoformat(datestr):
params['tzinfo'] = datetime.timezone(datetime.timedelta(seconds=offset))
else:
- params['tzinfo'] = get_localzone()
+ if HAS_GET_LOCALZONE:
+ params['tzinfo'] = get_localzone()
+ else:
+ params['tzinfo'] = DEFAULT_TZ
return datetime.datetime(**params)