# Software License Agreement (BSD License)
#
# Copyright (c) 2013, Open Source Robotics Foundation, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Open Source Robotics Foundation, Inc. nor
# the names of its contributors may be used to endorse or promote
# products derived from this software without specific prior
# written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
'''
Extract log information from repositories.
'''
import os
import re
import shutil
import subprocess
import tempfile
class Tag(object):
    '''
    A named tag of a repository with an optional timestamp.

    :param name: the name of the tag
    :param timestamp: the timestamp associated with the tag, or ``None``
      (the default) when it is not known.  ``VcsClientBase._truncate_timestamps``
      slices this value, so when set it is expected to be a string.
    '''

    def __init__(self, name, timestamp=None):
        self.name = name
        self.timestamp = timestamp

    def __repr__(self):
        # Purely a debugging aid; no code in this module relies on it.
        return '%s(name=%r, timestamp=%r)' % (type(self).__name__, self.name, self.timestamp)
class LogEntry(object):
    '''
    A single changeset: its message, its author and the paths it touched.

    :param msg: the commit message
    :param affected_paths: iterable of paths changed by the commit; falsy
      items (e.g. empty lines from command output) are dropped
    :param author: the author of the commit
    '''

    def __init__(self, msg, affected_paths, author):
        self.msg = msg
        self.author = author
        self._affected_paths = [p for p in affected_paths if p]

    def affects_path(self, path):
        '''
        Check whether this changeset touched anything below ``path``.

        :param path: a directory path, ``'.'`` denotes the repository root
        :returns: ``True`` if any affected path lies inside ``path``.  For
          ``'.'`` every changeset with at least one affected path matches;
          a changeset without any affected path never matches.
        '''
        if path == '.':
            # the root of the repository is affected by all changes
            # (as long as the changeset lists at least one path)
            return bool(self._affected_paths)
        # Append a trailing separator so that 'pkg' does not match 'pkg2/...'.
        # The affected paths come from VCS output which uses forward slashes,
        # so the platform separator (a backslash on Windows) is normalised.
        prefix = os.path.join(path, '').replace(os.sep, '/')
        return any(apath.startswith(prefix) for apath in self._affected_paths)
class VcsClientBase(object):
    '''
    Base class of the VCS clients used to extract log information.

    Subclasses implement :meth:`get_latest_tag_name` and
    :meth:`get_log_entries`; the helpers defined here locate and run the
    VCS executable.

    :param path: the path of the working copy; commands are run in it
    '''

    def __init__(self, path):
        self.path = path

    def get_latest_tag_name(self):
        '''Return the name of the latest tag; must be overridden.'''
        raise NotImplementedError()

    def get_log_entries(self, from_tag, to_tag):
        '''Return the log entries between two tags; must be overridden.'''
        raise NotImplementedError()

    def replace_repository_references(self, line):
        '''
        Replace hosting specific references in ``line``.

        The base implementation returns ``line`` unchanged.
        '''
        return line

    def _find_executable(self, file_name):
        '''
        Search the directories listed in ``PATH`` for ``file_name``.

        :returns: the path of the first matching regular file, or ``None``
          if there is none or ``PATH`` is not set
        '''
        search_path = os.getenv('PATH')
        if search_path is None:
            # os.getenv() returns None for an unset variable
            return None
        for path in search_path.split(os.path.pathsep):
            file_path = os.path.join(path, file_name)
            if os.path.isfile(file_path):
                return file_path
        return None

    def _run_command(self, cmd, env=None):
        '''
        Run ``cmd`` in the working copy and capture its output.

        :param cmd: the command as a list of strings
        :param env: optional environment passed to the child process
        :returns: dict with the keys ``cmd`` (space-joined command), ``cwd``,
          ``output`` (stdout and stderr combined, trailing whitespace
          stripped, as text) and ``returncode``
        '''
        cwd = os.path.abspath(self.path)
        result = {'cmd': ' '.join(cmd), 'cwd': cwd}
        try:
            proc = subprocess.Popen(cmd, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env)
            output, _ = proc.communicate()
            output = output.rstrip()
            # On Python 3 the pipe yields bytes, but all callers compare the
            # output with and format it into text strings.  On Python 2 the
            # output already is a ``str`` and is left untouched.
            if not isinstance(output, str):
                output = output.decode('utf-8', 'replace')
            result['output'] = output
            result['returncode'] = proc.returncode
        except subprocess.CalledProcessError as e:
            # NOTE(review): Popen()/communicate() are not documented to raise
            # this exception; the handler is kept for backward compatibility.
            result['output'] = e.output
            result['returncode'] = e.returncode
        return result

    def _truncate_timestamps(self, tags):
        '''
        Shorten the timestamps of ``tags`` in place.

        Each timestamp is cut to the shortest of the following
        representations which is unique among the given tags:

        - date only (10 characters)
        - date including hours and minutes (16 characters)
        - date including hours, minutes and seconds (19 characters)

        Tags without a timestamp (``None``) are ignored.
        '''
        lengths = [10, 16, 19]
        for length in lengths:
            # group the tags which have not been truncated yet by the
            # truncated representation of their timestamp
            grouped_by_timestamp = {}
            for tag in tags:
                if tag.timestamp is None or len(tag.timestamp) <= length:
                    continue
                grouped_by_timestamp.setdefault(tag.timestamp[:length], []).append(tag)
            # truncate timestamp of tags which are unique
            for truncated_timestamp, similar_tags in grouped_by_timestamp.items():
                if len(similar_tags) == 1:
                    similar_tags[0].timestamp = truncated_timestamp
class GitClient(VcsClientBase):
    '''
    Extract log information from a git working copy.

    All information is obtained by running the ``git`` executable found on
    the ``PATH`` inside the working copy.
    '''

    type = 'git'

    # Matches ``#123`` optionally preceded by ``owner/repo``.
    # Group 1 is the optional repository path, group 2 the issue number.
    _GITHUB_ISSUE_PATTERN = re.compile(r'([\w._-]+/[\w._-]+)?#(\d+)')

    def __init__(self, path):
        super(GitClient, self).__init__(path)
        self._executable = self._find_executable('git')
        # None: not determined yet, False: unknown hosting, 'github': GitHub
        self._repo_hosting = None
        self._github_base_url = 'https://github.com/'
        # '<owner>/<repo>' part of the origin url, set for GitHub hosting
        self._github_path = None

    def _get_author(self, hash_):
        '''
        Query the author name of a commit.

        :raises RuntimeError: if the git command fails
        '''
        cmd = [self._executable, 'log', hash_, '-n', '1', '--format=format:%an']
        result = self._run_command(cmd)
        if result['returncode']:
            raise RuntimeError('Could not fetch author:\n%s' % result['output'])
        return result['output']

    def get_latest_tag_name(self):
        '''
        Return the output of ``git describe --abbrev=0 --tags``.

        :raises RuntimeError: if the git command fails
        '''
        cmd_describe = [self._executable, 'describe', '--abbrev=0', '--tags']
        result_describe = self._run_command(cmd_describe)
        if result_describe['returncode']:
            raise RuntimeError('Could not fetch latest tag:\n%s' % result_describe['output'])
        return result_describe['output']

    def get_log_entries(self, from_tag, to_tag):
        '''
        Collect the log entries of the revision range ``to_tag..from_tag``.

        Either tag may be empty / ``None``, in which case the respective side
        of the range is left out; without any tag the whole log is used.
        Commits whose message equals ``from_tag`` are skipped.

        :returns: list of :class:`LogEntry` in the order reported by
          ``git log``
        :raises RuntimeError: if any git command fails
        '''
        # query all hashes in the range
        cmd = [self._executable, 'log']
        if from_tag or to_tag:
            cmd.append(('%s..' % to_tag if to_tag else '') + (from_tag or ''))
        cmd.append('--format=format:%H')
        result = self._run_command(cmd)
        if result['returncode']:
            raise RuntimeError('Could not fetch commit hashes:\n%s' % result['output'])

        log_entries = []
        # query further information for each changeset
        for hash_ in result['output'].splitlines():
            # query commit message
            cmd = [self._executable, 'log', hash_, '-n', '1', '--format=format:%B']
            result = self._run_command(cmd)
            if result['returncode']:
                raise RuntimeError('Could not fetch commit message:\n%s' % result['output'])
            msg = result['output']
            if msg == from_tag:
                # presumably the commit which only announces the tagged
                # version - TODO confirm
                continue
            # query affected paths
            # NOTE(review): no shell is involved, so the two quote characters
            # are passed to git verbatim as the format - confirm the effect
            # on the output before changing it.
            cmd = [self._executable, 'show', hash_, '--name-only', '--format=format:""']
            result = self._run_command(cmd)
            if result['returncode']:
                raise RuntimeError('Could not fetch affected paths:\n%s' % result['output'])
            affected_paths = result['output'].splitlines()
            log_entries.append(LogEntry(msg, affected_paths, self._get_author(hash_)))
        return log_entries

    def replace_repository_references(self, line):
        '''
        Turn issue references in ``line`` into links if hosted on GitHub.

        The hosting is determined once on the first call.  If that fails
        with a ``RuntimeError`` the line is returned unchanged, now and on
        all later calls.
        '''
        if self._repo_hosting is None:
            # mark as determined first so that a failure is not retried
            self._repo_hosting = False
            try:
                self._determine_repo_hosting()
            except RuntimeError:
                # best effort: without a known hosting nothing is replaced
                pass
        if self._repo_hosting == 'github':
            line = self._replace_github_issue_references(line)
        return line

    def _determine_repo_hosting(self):
        '''
        Detect GitHub hosting from the url of the remote ``origin``.

        On a match ``_repo_hosting`` is set to ``'github'`` and
        ``_github_path`` to the url without the host prefix and without a
        trailing ``.git``.

        :raises RuntimeError: if the remote url can not be queried
        '''
        cmd = [self._executable, 'config', '--get', 'remote.origin.url']
        result = self._run_command(cmd)
        if result['returncode']:
            raise RuntimeError('Could not fetch remote url:\n%s' % result['output'])

        # detect github hosting
        prefixes = ['git@github.com:', 'https://github.com/', 'git://github.com/']
        for prefix in prefixes:
            if result['output'].startswith(prefix):
                self._repo_hosting = 'github'
                path = result['output'][len(prefix):]
                if path.endswith('.git'):
                    path = path[:-4]
                self._github_path = path
                break

    def _replace_github_issue_references(self, line):
        '''
        Replace ``#N`` and ``owner/repo#N`` with reStructuredText links.

        References without an explicit repository link to the issue tracker
        of the repository stored in ``_github_path``.
        '''
        def replace_issue_number(match):
            issue_url = self._github_base_url
            if match.group(1):
                # reference into an explicitly named repository
                path = match.group(1)
                issue_url += path
            else:
                path = ''
                issue_url += self._github_path
            issue_number = match.group(2)
            issue_url += '/issues/' + issue_number
            return '`%s#%s <%s>`_' % (path, issue_number, issue_url)

        return self._GITHUB_ISSUE_PATTERN.sub(replace_issue_number, line)
class HgClient(VcsClientBase):
    '''
    Extract log information from a Mercurial working copy.

    All information is obtained by running the ``hg`` executable found on
    the ``PATH`` inside the working copy.
    '''

    type = 'hg'

    def __init__(self, path):
        super(HgClient, self).__init__(path)
        self._executable = self._find_executable('hg')

    def _get_author(self, hash_):
        '''
        Query the author of a changeset.

        :raises RuntimeError: if the hg command fails
        '''
        cmd = [self._executable, 'log', '-r', hash_, '--template', '{author}']
        result = self._run_command(cmd)
        if result['returncode']:
            raise RuntimeError('Could not fetch author:\n%s' % result['output'])
        return result['output']

    def get_latest_tag_name(self):
        '''
        Return the latest tag reachable from the working directory parent.

        :raises RuntimeError: if the hg command fails or no tag exists
        '''
        # '{latesttag}' yields the tag name; '{latesttagdistance}' would only
        # yield the number of changesets since that tag
        cmd_log = [self._executable, 'log', '--rev', '.', '--template', '{latesttag}']
        result_log = self._run_command(cmd_log)
        if result_log['returncode']:
            raise RuntimeError('Could not fetch latest tag:\n%s' % result_log['output'])
        tag_name = result_log['output']
        # Mercurial documents the string "null" as the value of the keyword
        # when there is no tagged ancestor
        if not tag_name or tag_name == 'null':
            raise RuntimeError('Could not fetch latest tag:\n%s' % tag_name)
        return tag_name

    def get_log_entries(self, from_tag, to_tag):
        '''
        Collect the log entries between ``to_tag`` and ``from_tag``.

        The revisions ``<to_tag>:<from_tag>`` are queried (an empty
        ``from_tag`` is replaced by ``tip``) with both tagged revisions
        subtracted from the range.  Changesets whose description equals
        ``from_tag`` are skipped.

        :returns: list of :class:`LogEntry`, in reverse order of the
          revisions reported by ``hg log``
        :raises RuntimeError: if any hg command fails
        '''
        # query all hashes in the range
        # ascending chronological order since then it is easier to handle empty tag names
        revrange = '%s:%s' % ((to_tag if to_tag else ''), (from_tag if from_tag else 'tip'))
        if to_tag:
            revrange += '-%s' % to_tag
        if from_tag:
            revrange += '-%s' % from_tag
        cmd = [self._executable, 'log', '-r', revrange, '--template', '{rev}\n']
        result = self._run_command(cmd)
        if result['returncode']:
            raise RuntimeError('Could not fetch commit hashes:\n%s' % result['output'])

        log_entries = []
        # a temporary style file is used to list the files of a changeset
        # one per line; the directory is removed again in any case
        tmp_base = tempfile.mkdtemp('-hg-style')
        try:
            style_file = os.path.join(tmp_base, 'hg-changeset-files-per-line.style')
            with open(style_file, 'w') as f:
                f.write("changeset = '{files}'\n")
                f.write("file = '{file}\\n'\n")

            # query further information for each changeset
            for rev in reversed(result['output'].splitlines()):
                # query commit message
                cmd = [self._executable, 'log', '-r', rev, '-l', '1', '--template', '{desc}']
                result = self._run_command(cmd)
                if result['returncode']:
                    raise RuntimeError('Could not fetch commit message:\n%s' % result['output'])
                msg = result['output']
                if msg == from_tag:
                    # presumably the changeset which only announces the
                    # tagged version - TODO confirm
                    continue
                # query affected paths
                cmd = [self._executable, 'log', '-r', rev, '-l', '1', '--style', style_file]
                result = self._run_command(cmd)
                if result['returncode']:
                    raise RuntimeError('Could not fetch affected paths:\n%s' % result['output'])
                affected_paths = result['output'].splitlines()
                log_entries.append(LogEntry(msg, affected_paths, self._get_author(rev)))
        finally:
            shutil.rmtree(tmp_base)
        return log_entries
def get_vcs_client(base_path):
    '''
    Create the VCS client matching the repository at ``base_path``.

    The repository type is detected by the existence of a ``.<type>`` entry
    (``.git`` or ``.hg``) directly inside ``base_path``; the first matching
    client class is instantiated.

    :param base_path: the path of the working copy
    :returns: an instance of the matching client class
    :raises RuntimeError: if two client classes declare the same type or if
      no supported repository is found
    '''
    candidates = [GitClient, HgClient]

    # sanity check: every client class must declare a distinct type
    type_names = [candidate.type for candidate in candidates]
    if len(set(type_names)) != len(type_names):
        raise RuntimeError('Multiple vcs clients share the same type: %s' % ', '.join(sorted(type_names)))

    for candidate in candidates:
        marker = os.path.join(base_path, '.%s' % candidate.type)
        if os.path.exists(marker):
            return candidate(base_path)

    raise RuntimeError('Could not detect repository type - currently supports: %s' % ', '.join(type_names))