Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 from __future__ import print_function
00034
00035 __version__ = '1.7.0'
00036
00037 import argparse
00038 import os
00039 import sys
00040 import platform
00041 import subprocess
00042
00043 import rospkg
00044
00045 class CleanupException(Exception): pass
00046
00047 def _ask_and_call(cmds, cwd=None):
00048 """
00049 Pretty print cmds, ask if they should be run, and if so, runs
00050 them using _call().
00051
00052 :param cmds: a list of commands executed one after another, ``list``
00053 :param cwd: (optional) set cwd of command that is executed, ``str``
00054 :returns: ``True`` if cmds were run.
00055 """
00056
00057 def quote(s):
00058 return '"%s"'%s if ' ' in s else s
00059 sys.stdout.write("Okay to execute:\n\n%s\n(y/n)?\n"%('\n'.join([' '.join([quote(s) for s in c]) for c in cmds])))
00060 while 1:
00061 input = sys.stdin.readline().strip().lower()
00062 if input in ['y', 'n']:
00063 break
00064 accepted = input == 'y'
00065 if accepted:
00066 _call(cmds, cwd)
00067 return accepted
00068
00069 def _call(cmds, cwd=None):
00070 """
00071 Runs cmds using subprocess.check_call.
00072
00073 :param cmds: a list of commands executed one after another, ``list``
00074 :param cwd: (optional) set cwd of command that is executed, ``str``
00075 """
00076 for c in cmds:
00077 if cwd:
00078 subprocess.check_call(c, cwd=cwd)
00079 else:
00080 subprocess.check_call(c)
00081
00082 def _usage():
00083 print("""Usage: rosclean <command>
00084
00085 Commands:
00086 \trosclean check\tCheck usage of log files
00087 \trosclean purge\tRemove log files
00088 """)
00089 sys.exit(getattr(os, 'EX_USAGE', 1))
00090
00091 def _get_check_dirs():
00092 home_dir = rospkg.get_ros_home()
00093 log_dir = rospkg.get_log_dir()
00094 dirs = [ (log_dir, 'ROS node logs'),
00095 (os.path.join(home_dir, 'rosmake'), 'rosmake logs')]
00096 return [x for x in dirs if os.path.isdir(x[0])]
00097
00098 def _rosclean_cmd_check(args):
00099 dirs = _get_check_dirs()
00100 for d, label in dirs:
00101 desc = get_human_readable_disk_usage(d)
00102 print("%s %s"%(desc, label))
00103
00104 def get_human_readable_disk_usage(d):
00105 """
00106 Get human-readable disk usage for directory
00107
00108 :param d: directory path, ``str`
00109 :returns: human-readable disk usage (du -h), ``str``
00110 """
00111
00112 if platform.system() in ['Linux', 'FreeBSD']:
00113 try:
00114 return subprocess.Popen(['du', '-sh', d], stdout=subprocess.PIPE).communicate()[0].split()[0]
00115 except:
00116 raise CleanupException("rosclean is not supported on this platform")
00117 else:
00118 raise CleanupException("rosclean is not supported on this platform")
00119
00120 def get_disk_usage(d):
00121 """
00122 Get disk usage in bytes for directory
00123 :param d: directory path, ``str``
00124 :returns: disk usage in bytes (du -b) or (du -A) * 1024, ``int``
00125 :raises: :exc:`CleanupException` If get_disk_usage() cannot be used on this platform
00126 """
00127
00128 if platform.system() == 'Linux':
00129 try:
00130 return int(subprocess.Popen(['du', '-sb', d], stdout=subprocess.PIPE).communicate()[0].split()[0])
00131 except:
00132 raise CleanupException("rosclean is not supported on this platform")
00133 elif platform.system() == 'FreeBSD':
00134 try:
00135 return int(subprocess.Popen(['du', '-sA', d], stdout=subprocess.PIPE).communicate()[0].split()[0]) * 1024
00136 except:
00137 raise CleanupException("rosclean is not supported on this platform")
00138 else:
00139 raise CleanupException("rosclean is not supported on this platform")
00140
00141 def _rosclean_cmd_purge(args):
00142 dirs = _get_check_dirs()
00143
00144 for d, label in dirs:
00145 print("Purging %s."%label)
00146 cmds = [['rm', '-rf', d]]
00147 try:
00148 if args.y:
00149 _call(cmds)
00150 else:
00151 print("PLEASE BE CAREFUL TO VERIFY THE COMMAND BELOW!")
00152 _ask_and_call(cmds)
00153 except:
00154 print("FAILED to execute command", file=sys.stderr)
00155
00156 def rosclean_main(argv=None):
00157 if argv is None:
00158 argv = sys.argv
00159 parser = argparse.ArgumentParser(prog='rosclean')
00160 subparsers = parser.add_subparsers()
00161 parser_check = subparsers.add_parser('check', help='Check usage of log files')
00162 parser_check.set_defaults(func=_rosclean_cmd_check)
00163 parser_purge = subparsers.add_parser('purge', help='Remove log files')
00164 parser_purge.set_defaults(func=_rosclean_cmd_purge)
00165 parser_purge.add_argument('-y', action='store_true', default=False, help='CAUTION: automatically confirms all questions to delete files')
00166 args = parser.parse_args(argv[1:])
00167 args.func(args)
00168
00169 if __name__ == '__main__':
00170 rosclean_main()