# Own modules
from trace_maillog import magic
-__version__ = '0.3.2'
+__version__ = '0.3.3'
LOG = logging.getLogger(__name__)
self._fh = None
if self.container:
+ LOG.debug("Closing container file.")
self.container.close()
self.container = None
LOG.debug("Got compress type {!r}.".format(compress_type))
if not supported:
raise InvalidCompressionError(compress_type, filename)
- if mime_type in AnyUncompressFile.compression_types['gzip']['mime_types']:
- return _open_gzip(
- filename, text=text, universal_newline=universal_newline,
- encoding=encoding, errors=errors)
- elif mime_type in AnyUncompressFile.compression_types['bz2']['mime_types']:
- return _open_bzip2(
- filename, text=text, buffering=buffering, universal_newline=universal_newline,
- encoding=encoding, errors=errors)
- elif mime_type in AnyUncompressFile.compression_types['zip']['mime_types']:
- return _open_zip(
- filename, archive_file=archive_file, text=text,
- universal_newline=universal_newline)
+ method = type_inf.get('method', None)
+ if method:
+ return method(
+ filename, archive_file=archive_file, text=text, buffering=buffering,
+ universal_newline=universal_newline, encoding=encoding, errors=errors)
+ else:
+ raise RuntimeError(
+ "Did not found an opening method for compression type {!r}.".format(compress_type))
else:
LOG.debug("Did not found appropriate compress type.")
return _open_raw(
# =============================================================================
def _open_gzip(
filename, text=False, universal_newline=False,
- encoding=None, errors=None):
+ encoding=None, errors=None, **kwargs):
open_args = {}
mode = 'rb'
fh = gzip.GzipFile(filename, mode, **open_args)
return AnyUncompressFile(fh, 'gzip')
+
+#------
+AnyUncompressFile.compression_types['gzip']['method'] = _open_gzip
+
+
# =============================================================================
def _open_bzip2(
filename, text=False, buffering=None, universal_newline=False,
- encoding=None, errors=None):
+ encoding=None, errors=None, **kwargs):
+
+ if not HAS_BZIP2:
+ raise InvalidCompressionError(filename, 'bzip2')
open_args = {}
mode = 'r'
fh = bz2.open(filename, mode, **open_args)
return AnyUncompressFile(fh, 'bzip2')
+
+#------
+AnyUncompressFile.compression_types['bz2']['method'] = _open_bzip2
+
+
# =============================================================================
-def _open_zip(filename, archive_file=None, text=False, universal_newline=False):
+def _open_zip(filename, archive_file=None, text=False, universal_newline=False, **kwargs):
if archive_file is None:
raise ValueError("A filename for the file inside the zipfile container is needed.")
return ret
+
+#------
+AnyUncompressFile.compression_types['zip']['method'] = _open_zip
+
+
+# =============================================================================
+def _open_lzma(
+ filename, text=False, universal_newline=False,
+ encoding=None, errors=None, **kwargs):
+
+ if not HAS_LZMA:
+ raise InvalidCompressionError(filename, 'lzma')
+
+ open_args = {}
+ mode = 'rb'
+ if text:
+ mode = 'rt'
+ if universal_newline:
+ open_args['newline'] = os.linesep
+ if encoding is None:
+ open_args['encoding'] = 'utf-8'
+ else:
+ open_args['encoding'] = encoding
+ if errors is None:
+ open_args['errors'] = 'surrogatescape'
+ else:
+ open_args['errors'] = errors
+
+ LOG.debug("Opening lzma/xz file {!r} with mod {!r}, other open arguments: {}".format(
+ filename, mode, pp(open_args)))
+ fh = lzma.open(filename, mode, **open_args)
+ return AnyUncompressFile(fh, 'lzma')
+
+
+#------
+AnyUncompressFile.compression_types['lzma']['method'] = _open_lzma
+AnyUncompressFile.compression_types['xz']['method'] = _open_lzma
+
+
# =============================================================================
def _open_raw(
filename, text=False, buffering=None, universal_newline=False,
- encoding=None, errors=None):
+ encoding=None, errors=None, **kwargs):
open_args = {}
if buffering is not None: