--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2010 - 2017 by Frank Brehm, Publicies Pixelpark GmbH, Berlin
+"""
+from __future__ import absolute_import
+
+# Standard modules
+import sys
+import os
+import logging
+import locale
+import re
+
+# Third party modules
+import six
+
+# Own modules
+from .common import pp, to_bytes, to_str
+
+from .errors import PpError
+
+from .obj import PpBaseObjectError, PpBaseObject
+
+
+__version__ = '0.3.1'
+
+LOG = logging.getLogger(__name__)
+
+DU_UNITS = ['K', 'k', 'M', 'm', 'G', 'g', 'T', 't', 'H', 'h']
+DU_UNIT_EXP = {
+ 'K': 0,
+ 'M': 1,
+ 'G': 2,
+ 'T': 3,
+}
+
+
+# =============================================================================
+class DuError(PpBaseObjectError):
+ pass
+
+
+# =============================================================================
+class DuParseError(DuError):
+
+ # -------------------------------------------------------------------------
+ def __init__(self, line):
+ self.line = line
+
+ # -------------------------------------------------------------------------
+ def __str__(self):
+
+ msg = "Could not parse line from DU output: {!r}".format(self.line)
+ return msg
+
+
+# =============================================================================
+class DuListError(DuError):
+ pass
+
+
+# =============================================================================
+class DuEntry(PpBaseObject):
+ """
+ Class encapsulating one DU entry.
+ """
+
+ kilo = 1024
+ if six.PY2:
+ kilo = long(1024)
+
+ factor = {}
+ for unit in DU_UNIT_EXP.keys():
+ exp = DU_UNIT_EXP[unit]
+ factor[unit] = kilo ** exp
+
+ locale_conv = locale.localeconv()
+ dp = '.'
+ ts = ','
+ if 'decimal_point' in locale_conv and locale_conv['decimal_point'] != '.':
+ dp = locale_conv['decimal_point']
+ if 'thousands_sep' in locale_conv:
+ ts = locale_conv['thousands_sep']
+
+ parse_pattern = r'^\s*(\d+(?:' + re.escape(dp) + r'\d*)?)([KMGT])?\s+(\S+.*)'
+ parse_re = re.compile(parse_pattern, re.IGNORECASE)
+
+ # -------------------------------------------------------------------------
+ def __init__(
+ self, size_kb, path, appname=None, verbose=0, base_dir=None):
+
+ self._size_kb = None
+ self._path = None
+
+ super(DuEntry, self).__init__(
+ appname=appname, verbose=verbose, version=__version__,
+ base_dir=base_dir, initialized=False)
+
+ self.size_kb = size_kb
+ self.path = path
+
+ self.initialized = True
+
+ # -----------------------------------------------------------
+ @property
+ def size_kb(self):
+ """The size of the entry in KiBytes."""
+ if not hasattr(self, '_size_kb'):
+ if six.PY2:
+ return long(0)
+ return 0
+ return getattr(self, '_size_kb', 0)
+
+ @size_kb.setter
+ def size_kb(self, value):
+ if six.PY2:
+ v = long(value)
+ else:
+ v = int(value)
+ if v >= 0:
+ self._size_kb = v
+ else:
+ LOG.warn("Wrong size for DU entry{!r}, must be >= 0".format(value))
+
+ # -----------------------------------------------------------
+ @property
+ def path(self):
+ """The path name of the DU entry."""
+ return self._path
+
+ @path.setter
+ def path(self, value):
+ if value is not None:
+ self._path = str(to_str(value))
+ else:
+ self._path = None
+
+ # -------------------------------------------------------------------------
+ def __repr__(self):
+ """Typecasting into a string for reproduction."""
+
+ out = "<%s(" % (self.__class__.__name__)
+
+ fields = []
+ fields.append("size_kb={!r}".format(self.size_kb))
+ fields.append("path={!r}".format(self.path))
+ fields.append("appname={!r}".format(self.appname))
+ fields.append("verbose={!r}".format(self.verbose))
+ fields.append("base_dir={!r}".format(self.base_dir))
+ fields.append("initialized={!r}".format(self.initialized))
+
+ out += ", ".join(fields) + ")>"
+ return out
+
+ # -------------------------------------------------------------------------
+ def as_dict(self, short=True):
+ """
+ Transforms the elements of the object into a dict
+
+ @param short: don't include local properties in resulting dict.
+ @type short: bool
+
+ @return: structure as dict
+ @rtype: dict
+ """
+
+ res = super(DuEntry, self).as_dict(short=short)
+ res['size_kb'] = self.size_kb
+ res['path'] = self.path
+ res['dp'] = self.dp
+ res['ts'] = self.ts
+ res['parse_pattern'] = self.parse_pattern
+
+ return res
+
+ # -------------------------------------------------------------------------
+ def __str__(self):
+ return self.to_str()
+
+ # -------------------------------------------------------------------------
+ def size_str(self, unit='K', precision=0):
+
+ u = unit.upper()
+ unit_show = u
+ if u in self.factor:
+ size_float = float(self.size_kb) / float(self.factor[u])
+ if u == 'K':
+ unit_show = ''
+ precision = 0
+ else:
+ unit_show = 'K'
+ size_float = float(self.size_kb)
+ prec = 0
+ for u in ('T', 'G', 'M'):
+ limit = 1.5 * float(self.factor[u])
+ sz_f = float(self.size_kb) / float(self.factor[u])
+ if sz_f >= limit:
+ unit_show = u
+ size_float = sz_f
+ if precision < 1:
+ prec = 1
+ else:
+ prec = precision
+ break
+ if unit_show == 'K':
+ precision = 0
+ else:
+ precision = prec
+
+ template = "{{:,.{:d}f}}".format(precision) + unit_show
+ size_show = template.format(size_float)
+
+ # Localisation
+ if self.dp != '.':
+ size_show = size_show.replace('.', ';').replace(',', self.ts).replace(';', self.dp)
+
+ return size_show
+
+ # -------------------------------------------------------------------------
+ def to_str(self, unit='K', precision=0, size_with=5):
+
+ size_show = self.size_str(unit=unit, precision=precision)
+ line = "{0:>{w}s} {1}".format(size_show, self.path, w=size_with)
+
+ return line
+
+ # -------------------------------------------------------------------------
+ @classmethod
+ def init_locales(cls):
+
+ cls.locale_conv = locale.localeconv()
+ cls.dp = '.'
+ if 'decimal_point' in cls.locale_conv and cls.locale_conv['decimal_point'] != '.':
+ cls.dp = cls.locale_conv['decimal_point']
+ if 'thousands_sep' in cls.locale_conv:
+ cls.ts = cls.locale_conv['thousands_sep']
+
+ cls.parse_pattern = r'^\s*(\d+(?:' + re.escape(cls.dp) + r'\d*)?)([KMGT])?\s+(\S+.*)'
+ cls.parse_re = re.compile(cls.parse_pattern, re.IGNORECASE)
+
+ # -------------------------------------------------------------------------
+ @classmethod
+ def from_line(cls, line, appname=None, verbose=0, base_dir=None):
+
+ match = cls.parse_re.match(line)
+ if not match:
+ raise DuParseError(line)
+
+ if verbose > 3:
+ LOG.debug("Got matching groups: {}.".format(match.groups()))
+
+ sz = match.group(1)
+ if cls.ts:
+ sz = sz.replace(cls.ts, '')
+ if cls.dp != '.':
+ sz = sz.replace(cls.dp, '.')
+ if verbose > 2:
+ LOG.debug("De-localized size: {!r}.".format(sz))
+ size = float(sz)
+ unit = match.group(2)
+ path = match.group(3)
+
+ if unit is not None:
+ unit = unit.upper()
+ if unit in cls.factor:
+ size *= cls.factor[unit]
+
+ entry = cls(
+ size_kb=size, path=path, appname=appname, verbose=verbose, base_dir=base_dir)
+
+ return entry
+
+
+# =============================================================================
+
+if __name__ == "__main__":
+
+ pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
import logging
import textwrap
import sys
+import copy
+import random
# Third party modules
import six
# Own modules
+from .common import pp, to_bytes, to_str
+
from .errors import FunctionNotImplementedError, PpAppError
from .app import PpApplication
+from .du import DuError, DuParseError, DuListError
+from .du import DuEntry
+from .du import DU_UNITS, DU_UNIT_EXP
+
try:
from .local_version import __version__ as my_version
except ImportError:
from .global_version import __version__ as my_version
-__version__ = '0.3.1'
+__version__ = '0.4.1'
LOG = logging.getLogger(__name__)
Application class for the format-du command
"""
- units = ['K', 'k', 'M', 'm', 'G', 'g', 'H', 'h']
-
- unit_exp = {
- 'K': 0,
- 'M': 1,
- 'G': 2,
- }
+ units = copy.copy(DU_UNITS)
+ unit_exp = copy.copy(DU_UNIT_EXP)
# -------------------------------------------------------------------------
def __init__(
)
self.post_init()
+ DuEntry.init_locales()
+ if self.verbose > 2:
+ LOG.debug("Locale conversions:\n{}".format(pp(DuEntry.locale_conv)))
self.initialized = True
# -------------------------------------------------------------------------
line = None
eof = False
+ lnr = 0
while not eof:
+ lnr += 1
line = fh.readline()
if not line:
eof = True
line = line.strip()
if not line:
continue
- self.eval_line(line)
+ self.eval_line(line, lnr)
if self.verbose > 1:
LOG.debug("Finished reading.")
# -------------------------------------------------------------------------
- def eval_line(self, line):
+ def eval_line(self, line, lnr):
if self.verbose > 2:
LOG.debug("Evaluating line {!r} ...".format(line))
+ try:
+ entry = DuEntry.from_line(
+ line=line, appname=self.appname, verbose=self.verbose, base_dir=self.base_dir)
+ except DuParseError as e:
+ LOG.error("Could not parse line {lnr}: {e}".format(lnr=lnr, e=e))
+ LOG.debug("Parsing pattern: {!r}".format(DuEntry.parse_pattern))
+ return
+
+ LOG.debug("Entry:\n{}".format(pp(entry.as_dict())))
+ LOG.debug("Entry: {}".format(entry.to_str(unit='M', precision=1, size_with=10)))
+
# =============================================================================
if __name__ == "__main__":