Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 from __future__ import print_function
00034
00035 __version__ = '1.7.0'
00036
00037 import os
00038 import sys
00039 import platform
00040 import subprocess
00041
00042 import rospkg
00043
00044 class CleanupException(Exception): pass
00045
00046 def _ask_and_call(cmds, cwd=None):
00047 """
00048 Pretty print cmds, ask if they should be run, and if so, runs
00049 them using subprocess.check_call.
00050
00051 :param cwd: (optional) set cwd of command that is executed, ``str``
00052 :returns: ``True`` if cmds were run.
00053 """
00054
00055 def quote(s):
00056 return '"%s"'%s if ' ' in s else s
00057 sys.stdout.write("Okay to execute:\n\n%s\n(y/n)?\n"%('\n'.join([' '.join([quote(s) for s in c]) for c in cmds])))
00058 while 1:
00059 input = sys.stdin.readline().strip().lower()
00060 if input in ['y', 'n']:
00061 break
00062 accepted = input == 'y'
00063 if accepted:
00064 for c in cmds:
00065 if cwd:
00066 subprocess.check_call(c, cwd=cwd)
00067 else:
00068 subprocess.check_call(c)
00069 return accepted
00070
00071 def _usage():
00072 print("""Usage: rosclean <command>
00073
00074 Commands:
00075 \trosclean check\tCheck usage of log files
00076 \trosclean purge\tRemove log files
00077 """)
00078 sys.exit(getattr(os, 'EX_USAGE', 1))
00079
00080 def _get_check_dirs():
00081 home_dir = rospkg.get_ros_home()
00082 log_dir = rospkg.get_log_dir()
00083 dirs = [ (log_dir, 'ROS node logs'),
00084 (os.path.join(home_dir, 'rosmake'), 'rosmake logs')]
00085 return [x for x in dirs if os.path.isdir(x[0])]
00086
00087 def _rosclean_cmd_check(argv):
00088 dirs = _get_check_dirs()
00089 for d, label in dirs:
00090 desc = get_human_readable_disk_usage(d)
00091 print("%s %s"%(desc, label))
00092
00093 def get_human_readable_disk_usage(d):
00094 """
00095 Get human-readable disk usage for directory
00096
00097 :param d: directory path, ``str`
00098 :returns: human-readable disk usage (du -h), ``str``
00099 """
00100
00101 if platform.system() in ['Linux', 'FreeBSD']:
00102 try:
00103 return subprocess.Popen(['du', '-sh', d], stdout=subprocess.PIPE).communicate()[0].split()[0]
00104 except:
00105 raise CleanupException("rosclean is not supported on this platform")
00106 else:
00107 raise CleanupException("rosclean is not supported on this platform")
00108
00109 def get_disk_usage(d):
00110 """
00111 Get disk usage in bytes for directory
00112 :param d: directory path, ``str``
00113 :returns: disk usage in bytes (du -b) or (du -A) * 1024, ``int``
00114 :raises: :exc:`CleanupException` If get_disk_usage() cannot be used on this platform
00115 """
00116
00117 if platform.system() == 'Linux':
00118 try:
00119 return int(subprocess.Popen(['du', '-sb', d], stdout=subprocess.PIPE).communicate()[0].split()[0])
00120 except:
00121 raise CleanupException("rosclean is not supported on this platform")
00122 elif platform.system() == 'FreeBSD':
00123 try:
00124 return int(subprocess.Popen(['du', '-sA', d], stdout=subprocess.PIPE).communicate()[0].split()[0]) * 1024
00125 except:
00126 raise CleanupException("rosclean is not supported on this platform")
00127 else:
00128 raise CleanupException("rosclean is not supported on this platform")
00129
00130 def _rosclean_cmd_purge(argv):
00131 dirs = _get_check_dirs()
00132
00133 for d, label in dirs:
00134 print("Purging %s.\nPLEASE BE CAREFUL TO VERIFY THE COMMAND BELOW!"%label)
00135 cmds = [['rm', '-rf', d]]
00136 try:
00137 _ask_and_call(cmds)
00138 except:
00139 print("FAILED to execute command", file=sys.stderr)
00140
00141 def rosclean_main(argv=None):
00142 if argv == None:
00143 argv = sys.argv
00144 if len(argv) < 2:
00145 _usage()
00146 command = argv[1]
00147 if command == 'check':
00148 _rosclean_cmd_check(argv)
00149 elif command == 'purge':
00150 _rosclean_cmd_purge(argv)
00151 else:
00152 _usage()
00153
00154 if __name__ == '__main__':
00155 rosclean_main()