src/rosclean/__init__.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2010, Willow Garage, Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following
14 # disclaimer in the documentation and/or other materials provided
15 # with the distribution.
16 # * Neither the name of Willow Garage, Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 
33 from __future__ import print_function
34 
35 import argparse
36 import os
37 import platform
38 import subprocess
39 import sys
40 from distutils.spawn import find_executable
41 
42 import rospkg
43 
44 __version__ = '1.7.0'
45 
46 
47 class CleanupException(Exception):
48  pass
49 
50 
51 def _ask_and_call(cmds, cwd=None):
52  """
53  Pretty print cmds, ask if they should be run, and if so, runs
54  them using _call().
55 
56  :param cmds: a list of commands executed one after another, ``list``
57  :param cwd: (optional) set cwd of command that is executed, ``str``
58  :returns: ``True`` if cmds were run.
59  """
60  # Pretty-print a string version of the commands
61  def quote(s):
62  return '"%s"' % s if ' ' in s else s
63  accepted = _ask('\n'.join([' '.join([quote(s) for s in c]) for c in cmds]))
64  if accepted:
65  _call(cmds, cwd)
66  return accepted
67 
68 
69 def _ask(comment):
70  """
71  ask user with provided comment. If user responds with y, return True
72 
73  :param comment: comment, ``str``
74  :return: ``True`` if user responds with y
75  """
76  sys.stdout.write('Okay to perform:\n\n%s\n(y/n)?\n' % comment)
77  while 1:
78  input = sys.stdin.readline().strip().lower()
79  if input in ['y', 'n']:
80  break
81  return input == 'y'
82 
83 
84 def _call(cmds, cwd=None):
85  """
86  Runs cmds using subprocess.check_call.
87 
88  :param cmds: a list of commands executed one after another, ``list``
89  :param cwd: (optional) set cwd of command that is executed, ``str``
90  """
91  for c in cmds:
92  if cwd:
93  subprocess.check_call(c, cwd=cwd)
94  else:
95  subprocess.check_call(c)
96 
97 
98 def _usage():
99  print("""Usage: rosclean <command>
100 
101 Commands:
102 \trosclean check\tCheck usage of log files
103 \trosclean purge\tRemove log files
104 """)
105  sys.exit(getattr(os, 'EX_USAGE', 1))
106 
107 
109  home_dir = rospkg.get_ros_home()
110  log_dir = rospkg.get_log_dir()
111  dirs = [(log_dir, 'ROS node logs'),
112  (os.path.join(home_dir, 'rosmake'), 'rosmake logs')]
113  return [x for x in dirs if os.path.isdir(x[0])]
114 
115 
117  dirs = _get_check_dirs()
118  for d, label in dirs:
120  print('%s %s' % (desc, label))
121 
122 
124  total_size = 0
125  for dirpath, dirnames, filenames in os.walk(d):
126  for f in filenames:
127  fp = os.path.join(dirpath, f)
128  total_size += os.path.getsize(fp)
129  return total_size
130 
131 
133  """
134  Get human-readable disk usage for directory
135 
136  :param d: directory path, ``str`
137  :returns: human-readable disk usage (du -h), ``str``
138  """
139  # only implemented on Linux and FreeBSD for now. Should work on OS X but need to verify first (du is not identical)
140  if platform.system() in ['Linux', 'FreeBSD']:
141  try:
142  return subprocess.Popen(['du', '-sh', d], stdout=subprocess.PIPE).communicate()[0].split()[0]
143  except Exception:
144  raise CleanupException('rosclean is not supported on this platform')
145  elif platform.system() == 'Windows':
146  total_size = _get_disk_usage_by_walking_tree(d)
147  return 'Total Size: ' + str(total_size) + ' ' + d
148  else:
149  raise CleanupException('rosclean is not supported on this platform')
150 
151 
153  """
154  Get disk usage in bytes for directory
155  :param d: directory path, ``str``
156  :returns: disk usage in bytes (du -b) or (du -A) * 1024, ``int``
157  :raises: :exc:`CleanupException` If get_disk_usage() cannot be used on this platform
158  """
159  if platform.system() == 'Windows':
161 
162  # only implemented on Linux and FreeBSD for now. Should work on OS X but need to verify first (du is not identical)
163  cmd = None
164  unit = 1
165  du = find_executable('du')
166  if du is not None:
167  if platform.system() == 'Linux':
168  cmd = [du, '-sb', d]
169  elif platform.system() == 'FreeBSD':
170  cmd = [du, '-skA', d]
171  unit = 1024
172  try:
173  # detect BusyBox du command by following symlink
174  if os.path.basename(os.readlink(du)) == 'busybox':
175  cmd = [du, '-sk', d]
176  unit = 1024
177  except OSError:
178  # readlink raises OSError if the target is not symlink
179  pass
180 
181  if cmd is None:
182  raise CleanupException('rosclean is not supported on this platform')
183  try:
184  return int(subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0].split()[0]) * unit
185  except Exception:
186  raise CleanupException('rosclean is not supported on this platform')
187 
188 
190  """
191  Get files and directories in specified path sorted by last modified time
192  :param d: directory path, ```str```
193  :return: a list of files and directories sorted by last modified time (old first), ```list```
194  """
195  files = os.listdir(d)
196  files.sort(key=lambda f: os.path.getmtime(os.path.join(d, f)))
197  return files
198 
199 
201  dirs = _get_check_dirs()
202 
203  for d, label in dirs:
204  if not args.size:
205  print('Purging %s.' % label)
206  if platform.system() == 'Windows':
207  cmds = [['cmd', '/c', 'rd', '/s', '/q', d]]
208  else:
209  cmds = [['rm', '-rf', d]]
210  try:
211  if args.y:
212  _call(cmds)
213  else:
214  print('PLEASE BE CAREFUL TO VERIFY THE COMMAND BELOW!')
215  _ask_and_call(cmds)
216  except Exception:
217  print('FAILED to execute command', file=sys.stderr)
218  else:
219  files = _sort_file_by_oldest(d)
220  log_size = get_disk_usage(d)
221  if log_size <= args.size * 1024 * 1024:
222  print('Directory size of %s is %d MB which is already below the requested threshold of %d MB.' % (label, log_size / 1024 / 1024, args.size))
223  continue
224  print('Purging %s until directory size is at most %d MB (currently %d MB).' % (label, args.size, log_size / 1024 / 1024))
225  if not args.y:
226  print('PLEASE BE CAREFUL TO VERIFY THE COMMAND BELOW!')
227  if not _ask('Purge some of old logs in %s' % d):
228  return
229  for f in files:
230  if log_size <= args.size * 1024 * 1024:
231  break
232  path = os.path.join(d, f)
233  log_size -= get_disk_usage(path)
234  if platform.system() == 'Windows':
235  cmds = [['cmd', '/c', 'rd', '/s', '/q', path]]
236  else:
237  cmds = [['rm', '-rf', path]]
238  try:
239  _call(cmds)
240  except Exception:
241  print('FAILED to execute command', file=sys.stderr)
242 
243 
244 def rosclean_main(argv=None):
245  if argv is None:
246  argv = sys.argv
247  parser = argparse.ArgumentParser(prog='rosclean')
248  subparsers = parser.add_subparsers() # help='sub-command help')
249  parser_check = subparsers.add_parser('check', help='Check usage of log files')
250  parser_check.set_defaults(func=_rosclean_cmd_check)
251  parser_purge = subparsers.add_parser('purge', help='Remove log files')
252  parser_purge.set_defaults(func=_rosclean_cmd_purge)
253  parser_purge.add_argument('-y', action='store_true', default=False, help='CAUTION: automatically confirms all questions to delete files')
254  parser_purge.add_argument('--size', action='store', default=None, type=int, help='Maximum total size in MB to keep when deleting old files')
255  args = parser.parse_args(argv[1:])
256  args.func(args)
257 
258 
259 if __name__ == '__main__':
260  rosclean_main()
def rosclean_main(argv=None)
def _ask(comment)
def _rosclean_cmd_check(args)
def _sort_file_by_oldest(d)
def _get_disk_usage_by_walking_tree(d)
def get_human_readable_disk_usage(d)
def _call(cmds, cwd=None)
def _ask_and_call(cmds, cwd=None)
def _rosclean_cmd_purge(args)


rosclean
Author(s): Ken Conley
autogenerated on Thu Apr 30 2020 06:30:14