import copy
import glob
import stat
+import pipes
+
+from subprocess import Popen, PIPE
# Third party modules
import six
from .errors import FunctionNotImplementedError, PpAppError
-from .common import pp, terminal_can_colors, to_bytes, to_bool
+from .common import pp, terminal_can_colors, to_bytes, to_bool, to_str
from .cfg_app import PpCfgAppError, PpConfigApplication
-__version__ = '0.2.1'
+__version__ = '0.4.1'
LOG = logging.getLogger(__name__)
UTC = datetime.timezone.utc
default_status_dir = os.sep + os.path.join('var', 'lib', 'quota-check')
default_statusfile_base = 'quota-check.yaml'
+ du_line_re = re.compile(r'^\s*(\d+)\s+(.*)')
+
# -------------------------------------------------------------------------
def __init__(self, appname=None, version=__version__):
self.passwd_data = {}
self.map_uid = {}
self.now = datetime.datetime.now(UTC)
+ self.du_cmd = self.get_command('du', quiet=True)
description = textwrap.dedent('''\
This checks the utilization of the home directories on the NFS server
cfg_stems='quota-check'
)
+ if not self.du_cmd:
+ LOG.error("Command {!r} not found.".format('du'))
+ self.exit(7)
+
self.initialized = True
# -------------------------------------------------------------------------
self.status_data = self.read_status_data()
self.status_data['last_check'] = self.now
self.read_passwd_data()
+ self.check_homes()
self.write_status_data()
if self.verbose > 2:
LOG.debug("User data in passwd:\n{}".format(pp(self.passwd_data)))
+ # -------------------------------------------------------------------------
+ def get_util_dir_kb(self, directory):
+
+ if not os.path.isdir(directory):
+ return 0
+
+ cmd = [self.du_cmd, '-sk', directory]
+ cmd_str = ' '.join(map(lambda x: pipes.quote(x), cmd))
+ if self.verbose > 2:
+ LOG.debug("Executing: {}".format(cmd_str))
+
+ p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
+ (stdoutdata, stderrdata) = p.communicate()
+ ret = p.wait()
+ if stdoutdata:
+ stdoutdata = to_str(stdoutdata)
+ if stderrdata:
+ stderrdata = to_str(stderrdata)
+
+ if ret:
+ msg = "Return value of \"{c}\": {r}.".format(c=cmd_str, r=ret)
+ if stderrdata:
+ msg += "\nError message: {}".format(stderrdata)
+ LOG.error(msg)
+ return 0
+
+ if not stdoutdata:
+ return 0
+
+ result = 0
+ for line in stdoutdata.splitlines():
+ line = line.strip()
+ match = self.du_line_re.search(line)
+ if not match:
+ continue
+ result = int(match.group(1))
+ break
+
+ return result
+
# -------------------------------------------------------------------------
def check_homes(self):
- LOG.info("Checking for unnecessary home directories ...")
+ LOG.info("Checking utilization of home directories ...")
glob_pattern = os.path.join(self.home_root_real, '*')
all_home_entries = glob.glob(glob_pattern)
+ if 'checks' not in self.status_data:
+ self.status_data['checks'] = {}
+ self.status_data['checks'][self.now] = {}
+ check = self.status_data['checks'][self.now]
+
+ i = 0
+
for path in all_home_entries:
if not os.path.isdir(path):
continue
+ i += 1
home_rel = os.sep + os.path.relpath(path, self.chroot_homedir)
if self.verbose > 2:
LOG.debug("Checking {p!r} ({h!r}) ...".format(
p=path, h=home_rel))
- if home_rel in self.passwd_home_dirs:
- continue
- if home_rel in self.exclude_dirs:
- continue
- LOG.debug("Marking {!r} as unnecessary.".format(home_rel))
- self.unnecessary_dirs.append(home_rel)
-
- self.unnecessary_dirs.sort(key=str.lower)
+ dir_stat = os.stat(path)
+ dir_uid = dir_stat.st_uid
+ dir_owner = str(dir_uid)
+ username = dir_owner
+ if dir_uid in self.map_uid:
+ dir_owner = self.map_uid[dir_uid]
+ username = dir_owner
+ if dir_owner in self.passwd_data and self.passwd_data[dir_owner].pw_gecos:
+ dir_owner = self.passwd_data[dir_owner].pw_gecos
+ util = self.get_util_dir_kb(path)
+ result = {
+ 'home': home_rel,
+ 'util_kb': util,
+ 'uid': dir_uid,
+ 'gid': dir_stat.st_gid,
+ 'user': username,
+ 'gecos': dir_owner,
+ }
+ check[home_rel] = result
+ if i > 10:
+ break
# -------------------------------------------------------------------------
def send_results(self):