zipfile_interface.py
Go to the documentation of this file.
00001 import os
00002 import yaml
00003 import shutil
00004 import zipfile
00005 import tempfile
00006 import collections
00007 from roscompile.util import make_executable
00008 
00009 
00010 class ROSCompilePackageFiles:
00011     def __init__(self, package_name, pkg_files, executables):
00012         self.package_name = package_name
00013         self.is_written = False
00014         self.root = self.get_input_root()
00015         self.pkg_files = pkg_files
00016         self.executables = executables
00017 
00018     def copy(self):
00019         return ROSCompilePackageFiles(self.package_name, self.pkg_files, self.executables)
00020 
00021     def get_input_root(self):
00022         return os.path.join(tempfile.gettempdir(), self.package_name)
00023 
00024     def __enter__(self):
00025         self.write()
00026         self.is_written = True
00027         return self
00028 
00029     def __exit__(self, type, value, traceback):
00030         self.is_written = False
00031         self.clear()
00032 
00033     def get_filenames(self):
00034         if self.is_written:
00035             the_files = []
00036             for folder, _, files in os.walk(self.root):
00037                 short_folder = folder.replace(self.root, '')
00038                 if len(short_folder) > 0 and short_folder[0] == '/':
00039                     short_folder = short_folder[1:]
00040                 for fn in files:
00041                     the_files.append(os.path.join(short_folder, fn))
00042             return set(the_files)
00043         else:
00044             return set(self.pkg_files.keys())
00045 
00046     def get_contents(self, filename):
00047         if self.is_written:
00048             full_path = os.path.join(self.root, filename)
00049             if os.path.exists(full_path):
00050                 return open(full_path).read()
00051         elif filename in self.pkg_files:
00052             return self.pkg_files[filename]
00053 
00054     def compare_filesets(self, other_package):
00055         in_keys = self.get_filenames()
00056         out_keys = other_package.get_filenames()
00057         matches = in_keys.intersection(out_keys)
00058         missed_deletes = in_keys - out_keys
00059         missed_generations = out_keys - in_keys
00060         return matches, missed_deletes, missed_generations
00061 
00062     def write(self):
00063         self.clear()
00064         os.mkdir(self.root)
00065         for fn, contents in self.pkg_files.iteritems():
00066             outfile = os.path.join(self.root, fn)
00067             parts = outfile.split(os.sep)
00068             # Parts will be '', tmp, pkg_name, possible_folders, actual_filename
00069             # Create the possible_folders as needed
00070             for i in range(4, len(parts)):
00071                 new_folder = os.sep.join(parts[:i])
00072                 if not os.path.exists(new_folder):
00073                     os.mkdir(new_folder)
00074             with open(outfile, 'w') as f:
00075                 f.write(contents)
00076             if fn in self.executables:
00077                 make_executable(outfile)
00078 
00079     def clear(self):
00080         if os.path.exists(self.root):
00081             shutil.rmtree(self.root)
00082 
00083     def __repr__(self):
00084         return self.package_name
00085 
00086 
00087 def get_test_cases(zip_filename):
00088     file_data = collections.defaultdict(dict)
00089     zf = zipfile.ZipFile(zip_filename)
00090     config = None
00091     executables = set()
00092     for file in zf.filelist:
00093         if file.filename[-1] == '/':
00094             continue
00095         if file.filename == 'list_o_tests.yaml':
00096             config = yaml.load(zf.read(file))
00097             continue
00098         parts = file.filename.split(os.path.sep)
00099         package = parts[0]
00100         path = os.path.join(*parts[1:])
00101         file_data[package][path] = zf.read(file)
00102         if (file.external_attr >> 16) & 0o111:
00103             executables.add(path)
00104 
00105     test_data = {}
00106     for package, D in file_data.iteritems():
00107         test_data[package] = ROSCompilePackageFiles(package, D, executables)
00108     for D in config:
00109         if 'function' in D:
00110             D['functions'] = [D['function']]
00111             del D['function']
00112     return config, test_data


roscompile
Author(s):
autogenerated on Wed Jun 19 2019 19:21:36