From: Frank Brehm Date: Thu, 11 May 2017 13:36:32 +0000 (+0200) Subject: Started wit open() in lib/trace_maillog/any_uncompress_file.py X-Git-Url: https://git.uhu-banane.net/?a=commitdiff_plain;h=cd8c87aa2ea8bb232db1740210b7676d01059ebd;p=pixelpark%2Ftrace-maillog.git Started wit open() in lib/trace_maillog/any_uncompress_file.py --- diff --git a/lib/trace_maillog/any_uncompress_file.py b/lib/trace_maillog/any_uncompress_file.py index 5cdd1bd..8d9f616 100644 --- a/lib/trace_maillog/any_uncompress_file.py +++ b/lib/trace_maillog/any_uncompress_file.py @@ -4,7 +4,7 @@ @author: Frank Brehm @contact: frank.brehm@pixelpark.com @copyright: © 2017 by Frank Brehm, Berlin -@summary: The module for a AnyUmpressFile class +@summary: The module for a AnyUncompressFile class $Id: $ """ from __future__ import absolute_import @@ -31,7 +31,12 @@ except ImportError: # Third party modules import six -__version__ = '0.2.1' +# Own modules +from trace_maillog import magic + +__version__ = '0.3.0' + +LOG = logging.getLogger(__name__) # ============================================================================= @@ -39,6 +44,7 @@ class BaseAnyUncompressError(Exception): """Base class for all exceptions defined in this module.""" pass + # ============================================================================= class InvalidCompressionError(BaseAnyUncompressError, NotImplementedError): """Special exception class for not implemented compression types.""" @@ -61,7 +67,7 @@ class InvalidCompressionError(BaseAnyUncompressError, NotImplementedError): # ============================================================================= -class AnyUmpressFile(object): +class AnyUncompressFile(object): compression_types = { '7z': { @@ -106,7 +112,7 @@ class AnyUmpressFile(object): }, 'zip': { 'mime_types': ('application/zip', ), - 'supported': False, + 'supported': True, 'method': None, }, } @@ -125,6 +131,48 @@ class AnyUmpressFile(object): compression_types['xz']['supported'] = True +# ============================================================================= +def open( + filename, archive_file=None, text=False, buffering=None, universal_newline=False, + encoding=None, errors=None): + + if not os.path.exists(filename): + raise ValueError("File {!r} does not exists.".format(filename)) + if not os.path.isfile(filename): + raise ValueError("File {!r} is not a regular file.".format(filename)) + if not os.access(filename, os.R_OK): + raise ValueError("No read access to File {!r}.".format(filename)) + fstat = os.stat(filename) + fsize = fstat.st_size + + mime_type = None + compress_type = None + + if fsize: + LOG.debug("Retreiving MIME type of file {!r}...".format(filename)) + m = magic.Magic(mime=True) + mime_type = m.from_file(filename) + LOG.debug("Got MIME type {!r}.".format(mime_type)) + supported = False + for ctype in AnyUncompressFile.compression_types.keys(): + type_inf = AnyUncompressFile.compression_types[ctype] + if mime_type in type_inf['mime_types']: + compress_type = ctype + supported = type_inf.get('supported', False) + break + if compress_type: + LOG.debug("Got compress type {!r}.".format(compress_type)) + if not supported: + raise InvalidCompressionError(compress_type, filename) + else: + LOG.debug("Did not found appropriate compress type.") + else: + LOG.debug("Could not assign a compress type to {!r}, because file is empty.".format( + filename)) + + return None + + # ============================================================================= if __name__ == "__main__": diff --git a/test/test_any_uncompress.py b/test/test_any_uncompress.py index afa77f7..ed9a76a 100755 --- a/test/test_any_uncompress.py +++ b/test/test_any_uncompress.py @@ -18,6 +18,20 @@ except ImportError: import six +HAS_BZIP2 = False +try: + import bz2 + HAS_BZIP2 = True +except ImportError: + pass + +HAS_LZMA = False +try: + import lzma + HAS_LZMA = True +except ImportError: + pass + from pb_base.common import pp from pb_base.common import bytes2human @@ -41,6 +55,22 @@ class TestAnyUncompress(LogtraceTestcase): self.appname = APPNAME self.testdata_dir = os.path.join(os.path.dirname(__file__), 'testfiles-uncompress') + self.empty_file = os.path.join(self.testdata_dir, 'empty.txt') + self.usable_files = [] + for bname in ('lorem-ipsum.txt', 'lorem-ipsum.txt.gz', 'lorem-ipsum.txt.zip'): + self.usable_files.append(os.path.join(self.testdata_dir, bname)) + if HAS_BZIP2: + self.usable_files.append(os.path.join(self.testdata_dir, 'lorem-ipsum.txt.bz2')) + if HAS_LZMA: + self.usable_files.append(os.path.join(self.testdata_dir, 'lorem-ipsum.txt.lzma')) + self.usable_files.append(os.path.join(self.testdata_dir, 'lorem-ipsum.txt.xz')) + + self.unusable_files = [] + for bname in ( + 'lorem-ipsum.txt.7z', 'lorem-ipsum.txt.lz4', + 'lorem-ipsum.txt.rar', 'lorem-ipsum.txt.Z'): + self.unusable_files.append(os.path.join(self.testdata_dir, bname)) + # ------------------------------------------------------------------------- def tearDown(self): pass @@ -49,10 +79,36 @@ class TestAnyUncompress(LogtraceTestcase): def test_import(self): LOG.info("Testing import of trace_maillog.any_uncompress_file ...") - import trace_maillog.any_uncompress_file # noqa + import trace_maillog.any_uncompress_file # noqa + + LOG.info("Testing import of AnyUncompressFile from trace_maillog.any_uncompress_file ...") + from trace_maillog.any_uncompress_file import AnyUncompressFile # noqa + + # ------------------------------------------------------------------------- + def test_open(self): + + from trace_maillog import any_uncompress_file + from trace_maillog.any_uncompress_file import AnyUncompressFile + from trace_maillog.any_uncompress_file import InvalidCompressionError + + LOG.info("Testing opening of usable files...") + + for fname in self.usable_files: + LOG.debug("Testing {!r} ...".format(fname)) + fh = any_uncompress_file.open(fname) + + LOG.info("Test opening a non file path...") + with self.assertRaises(ValueError) as cm: + fh = any_uncompress_file.open(self.testdata_dir) + e = cm.exception + LOG.debug("%s raised: %s", e.__class__.__name__, e) - LOG.info("Testing import of AnyUmpressFile from trace_maillog.any_uncompress_file ...") - from trace_maillog.any_uncompress_file import AnyUmpressFile # noqa + LOG.info("Test opening unsupported compressed files ...") + for fname in self.unusable_files: + with self.assertRaises(InvalidCompressionError) as cm: + fh = any_uncompress_file.open(fname) + e = cm.exception + LOG.debug("%s raised: %s", e.__class__.__name__, e) # ============================================================================= @@ -71,6 +127,7 @@ if __name__ == '__main__': suite = unittest.TestSuite() suite.addTest(TestAnyUncompress('test_import', verbose)) + suite.addTest(TestAnyUncompress('test_open', verbose)) runner = unittest.TextTestRunner(verbosity=verbose) diff --git a/test/testfiles-uncompress/empty.txt b/test/testfiles-uncompress/empty.txt new file mode 100644 index 0000000..e69de29 diff --git a/test/testfiles-uncompress/emty.txt b/test/testfiles-uncompress/emty.txt deleted file mode 100644 index e69de29..0000000