5 ------------------------------------------------------------------------------
6 This file is part of grepros - grep for ROS bag files and live topics.
7 Released under the BSD License.
12 ------------------------------------------------------------------------------
15 from __future__
import absolute_import
24 from ...
import common
25 from ... common
import ConsolePrinter, plural
26 from ... outputs
import Sink
30 """Writes messages to CSV files, each topic to a separate file."""
33 FILE_EXTENSIONS = (
".csv", )
36 DEFAULT_ARGS = dict(EMIT_FIELD=(), META=
False, NOEMIT_FIELD=(), WRITE_OPTIONS={}, VERBOSE=
False)
40 @param args arguments as namespace or dictionary, case-insensitive;
41 or a single path as the base name of CSV files to write
42 @param args.emit_field message fields to emit in output if not all
43 @param args.noemit_field message fields to skip in output
44 @param args.write base name of CSV files to write,
45 will add topic name like "name.__my__topic.csv" for "/my/topic",
46 will add counter like "name.__my__topic.2.csv" if exists
47 @param args.write_options {"overwrite": whether to overwrite existing files
49 @param args.meta whether to emit metainfo
50 @param args.verbose whether to emit debug information
51 @param kwargs any and all arguments as keyword overrides, case-insensitive
53 args = {
"WRITE": str(args)}
if isinstance(args, common.PATH_TYPES)
else args
54 args = common.ensure_namespace(args, CsvSink.DEFAULT_ARGS, **kwargs)
61 self.
_overwrite = (args.WRITE_OPTIONS.get(
"overwrite")
in (
True,
"true"))
64 for key, vals
in [(
"print", args.EMIT_FIELD), (
"noprint", args.NOEMIT_FIELD)]:
65 self.
_patterns[key] = [(tuple(v.split(
".")), common.wildcard_to_regex(v))
for v
in vals]
66 atexit.register(self.
close)
68 def emit(self, topic, msg, stamp=None, match=None, index=None):
69 """Writes message to output file."""
70 if not self.
validate():
raise Exception(
"invalid")
73 metadata = [api.to_sec(stamp), api.to_datetime(stamp), api.get_message_type(msg)]
74 self.
_make_writer(topic, msg).writerow(itertools.chain(metadata, data))
76 super(CsvSink, self).
emit(topic, msg, stamp, match, index)
79 """Returns whether overwrite option is valid and file base is writable."""
82 if self.
args.WRITE_OPTIONS.get(
"overwrite")
not in (
None,
True,
False,
"true",
"false"):
83 ConsolePrinter.error(
"Invalid overwrite option for CSV: %r. "
84 "Choose one of {true, false}.",
85 self.
args.WRITE_OPTIONS[
"overwrite"])
87 if not common.verify_io(self.
args.WRITE,
"w"):
93 """Closes output file(s), if any."""
95 names = {k: f.name
for k, f
in self.
_files.items()}
103 sizes = {k:
None for k
in names.values()}
104 for k, n
in names.items():
105 try: sizes[k] = os.path.getsize(n)
106 except Exception
as e: ConsolePrinter.warn(
"Error getting size of %s: %s", n, e)
107 ConsolePrinter.debug(
"Wrote %s in %s to CSV (%s):",
110 common.format_bytes(sum(filter(bool, sizes.values()))))
111 for topickey, name
in names.items():
112 ConsolePrinter.debug(
"- %s (%s, %s)", name,
113 "error getting size" if sizes[topickey]
is None else
114 common.format_bytes(sizes[topickey]),
116 super(CsvSink, self).
close()
120 Returns a csv.writer for writing topic data.
122 File is populated with header if not created during this session.
124 topickey = api.TypeMeta.make(msg, topic).topickey
126 common.makedirs(os.path.dirname(self.
_filebase))
129 if topickey
not in self.
_files or self.
_files[topickey].closed:
130 name = self.
_files[topickey].name
if topickey
in self.
_files else None
133 base, ext = os.path.splitext(self.
_filebase)
134 name =
"%s.%s%s" % (base, topic.lstrip(
"/").replace(
"/",
"__"), ext)
136 if os.path.isfile(name)
and os.path.getsize(name): action =
"Overwriting"
137 open(name,
"w").
close()
138 else: name = common.unique_path(name)
139 flags = {
"mode":
"ab"}
if six.PY2
else {
"mode":
"a",
"newline":
""}
140 f = open(name, **flags)
142 if topickey
not in self.
_files:
143 if self.
args.VERBOSE:
144 ConsolePrinter.debug(
"%s %s.", action, name)
145 header = (topic +
"/" +
".".join(map(str, p))
for p, _
in self.
_iter_fields(msg))
146 metaheader = [
"__time",
"__datetime",
"__type"]
147 w.writerow(itertools.chain(metaheader, header))
154 Yields ((nested, path), scalar value) from ROS message.
156 Lists are returned as ((nested, path, index), value), e.g. (("data", 0), 666).
159 fieldmap, identity = api.get_message_fields(msg),
lambda x: x
160 fieldmap = api.filter_fields(fieldmap, top, include=prints, exclude=noprints)
161 for k, t
in fieldmap.items()
if fieldmap != msg
else ():
162 v, path, baset = api.get_message_value(msg, k, t), top + (k, ), api.scalar(t)
163 is_sublist = isinstance(v, (list, tuple))
and baset
not in api.ROS_BUILTIN_TYPES
164 cast = api.to_sec
if baset
in api.ROS_TIME_TYPES
else identity
165 if isinstance(v, (list, tuple))
and not is_sublist:
166 for i, lv
in enumerate(v):
167 yield path + (i, ), cast(lv)
169 for i, lmsg
in enumerate(v):
172 elif api.is_ros_message(v, ignore_time=
True):
180 """Wraps csv.writer with bool conversion, iterator support, and lesser memory use."""
182 def __init__(self, csvfile, dialect="excel", **fmtparams):
184 @param csvfile file-like object with `write()` method
185 @param dialect CSV dialect to use, one from `csv.list_dialects()`
186 @param fmtparams override individual format parameters in dialect
189 self.
_buffer = six.BytesIO()
if "b" in csvfile.mode
else six.StringIO()
190 self.
_writer = csv.writer(self.
_buffer, dialect, **dict(fmtparams, lineterminator=
""))
192 self.
_format =
lambda v: int(v)
if isinstance(v, bool)
else v
194 self.
_format =
lambda v: int(v)
if isinstance(v, bool)
else \
195 v.encode(
"utf-8")
if isinstance(v, six.text_type)
else v
199 """A read-only description of the dialect in use by the writer."""
204 Writes the row to the writer’s file object.
206 Fields will be formatted according to the current dialect.
208 @param row iterable of field values
209 @return return value of the call to the write method of the underlying file object
211 def write_columns(cols, inter):
212 """Writes columns to file, returns number of bytes written."""
213 count = len(inter)
if inter
else 0
214 if inter: self.
_file.write(inter)
222 result, chunk, inter, DELIM, STEP = 0, [],
"", self.
dialect.delimiter, 10000
223 if "b" in self.
_file.mode: DELIM = six.binary_type(self.
dialect.delimiter)
226 if len(chunk) >= STEP:
227 result += write_columns(chunk, inter)
228 chunk, inter = [], DELIM
229 if chunk: result += write_columns(chunk, inter)
231 result += len(self.
dialect.lineterminator)
236 Writes the rows to the writer’s file object.
238 Fields will be formatted according to the current dialect.
240 @param rows iterable of iterables of field values
247 """Adds CSV output format support."""
248 from ...
import plugins
249 plugins.add_write_format(
"csv", CsvSink,
"CSV", [
250 (
"overwrite=true|false",
"overwrite existing files in CSV output\n"
251 "instead of appending unique counter (default false)")
253 plugins.add_output_label(
"CSV", [
"--emit-field",
"--no-emit-field"])
256 __all__ = [
"CsvSink",
"CsvWriter",
"init"]