Go to the documentation of this file.00001 import os
00002 import yaml
00003 import shutil
00004 import zipfile
00005 import tempfile
00006 import collections
00007 from roscompile.util import make_executable
00008
00009
00010 class ROSCompilePackageFiles:
00011 def __init__(self, package_name, pkg_files, executables):
00012 self.package_name = package_name
00013 self.is_written = False
00014 self.root = self.get_input_root()
00015 self.pkg_files = pkg_files
00016 self.executables = executables
00017
00018 def copy(self):
00019 return ROSCompilePackageFiles(self.package_name, self.pkg_files, self.executables)
00020
00021 def get_input_root(self):
00022 return os.path.join(tempfile.gettempdir(), self.package_name)
00023
00024 def __enter__(self):
00025 self.write()
00026 self.is_written = True
00027 return self
00028
00029 def __exit__(self, type, value, traceback):
00030 self.is_written = False
00031 self.clear()
00032
00033 def get_filenames(self):
00034 if self.is_written:
00035 the_files = []
00036 for folder, _, files in os.walk(self.root):
00037 short_folder = folder.replace(self.root, '')
00038 if len(short_folder) > 0 and short_folder[0] == '/':
00039 short_folder = short_folder[1:]
00040 for fn in files:
00041 the_files.append(os.path.join(short_folder, fn))
00042 return set(the_files)
00043 else:
00044 return set(self.pkg_files.keys())
00045
00046 def get_contents(self, filename):
00047 if self.is_written:
00048 full_path = os.path.join(self.root, filename)
00049 if os.path.exists(full_path):
00050 return open(full_path).read()
00051 elif filename in self.pkg_files:
00052 return self.pkg_files[filename]
00053
00054 def compare_filesets(self, other_package):
00055 in_keys = self.get_filenames()
00056 out_keys = other_package.get_filenames()
00057 matches = in_keys.intersection(out_keys)
00058 missed_deletes = in_keys - out_keys
00059 missed_generations = out_keys - in_keys
00060 return matches, missed_deletes, missed_generations
00061
00062 def write(self):
00063 self.clear()
00064 os.mkdir(self.root)
00065 for fn, contents in self.pkg_files.iteritems():
00066 outfile = os.path.join(self.root, fn)
00067 parts = outfile.split(os.sep)
00068
00069
00070 for i in range(4, len(parts)):
00071 new_folder = os.sep.join(parts[:i])
00072 if not os.path.exists(new_folder):
00073 os.mkdir(new_folder)
00074 with open(outfile, 'w') as f:
00075 f.write(contents)
00076 if fn in self.executables:
00077 make_executable(outfile)
00078
00079 def clear(self):
00080 if os.path.exists(self.root):
00081 shutil.rmtree(self.root)
00082
00083 def __repr__(self):
00084 return self.package_name
00085
00086
00087 def get_test_cases(zip_filename):
00088 file_data = collections.defaultdict(dict)
00089 zf = zipfile.ZipFile(zip_filename)
00090 config = None
00091 executables = set()
00092 for file in zf.filelist:
00093 if file.filename[-1] == '/':
00094 continue
00095 if file.filename == 'list_o_tests.yaml':
00096 config = yaml.load(zf.read(file))
00097 continue
00098 parts = file.filename.split(os.path.sep)
00099 package = parts[0]
00100 path = os.path.join(*parts[1:])
00101 file_data[package][path] = zf.read(file)
00102 if (file.external_attr >> 16) & 0o111:
00103 executables.add(path)
00104
00105 test_data = {}
00106 for package, D in file_data.iteritems():
00107 test_data[package] = ROSCompilePackageFiles(package, D, executables)
00108 for D in config:
00109 if 'function' in D:
00110 D['functions'] = [D['function']]
00111 del D['function']
00112 return config, test_data