csv.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 """
3 CSV output plugin.
4 
5 ------------------------------------------------------------------------------
6 This file is part of grepros - grep for ROS bag files and live topics.
7 Released under the BSD License.
8 
9 @author Erki Suurjaak
10 @created 03.12.2021
11 @modified 28.12.2023
12 ------------------------------------------------------------------------------
13 """
14 
15 from __future__ import absolute_import
16 import atexit
17 import csv
18 import itertools
19 import os
20 
21 import six
22 
23 from ... import api
24 from ... import common
25 from ... common import ConsolePrinter, plural
26 from ... outputs import Sink
27 
28 
class CsvSink(Sink):
    """Writes messages to CSV files, each topic to a separate file."""

    # File extensions associated with CSV output
    # (presumably consulted by the base Sink for output format auto-detection).
    FILE_EXTENSIONS = (".csv", )

    # Defaults for constructor arguments, filled in via common.ensure_namespace().
    DEFAULT_ARGS = dict(EMIT_FIELD=(), META=False, NOEMIT_FIELD=(), WRITE_OPTIONS={}, VERBOSE=False)

    def __init__(self, args=None, **kwargs):
        """
        @param   args                 arguments as namespace or dictionary, case-insensitive;
                                      or a single path as the base name of CSV files to write
        @param   args.emit_field      message fields to emit in output if not all
        @param   args.noemit_field    message fields to skip in output
        @param   args.write           base name of CSV files to write,
                                      will add topic name like "name.my__topic.csv" for "/my/topic",
                                      made unique via common.unique_path() if file exists
                                      and overwrite is not set
        @param   args.write_options   {"overwrite": whether to overwrite existing files
                                                    (default false)}
        @param   args.meta            whether to emit metainfo
        @param   args.verbose         whether to emit debug information
        @param   kwargs               any and all arguments as keyword overrides, case-insensitive
        """
        # A bare path argument is shorthand for {"WRITE": path}
        args = {"WRITE": str(args)} if isinstance(args, common.PATH_TYPES) else args
        args = common.ensure_namespace(args, CsvSink.DEFAULT_ARGS, **kwargs)
        super(CsvSink, self).__init__(args)
        self._filebase      = args.WRITE  # Filename base, will be made unique
        self._files         = {}    # {(topic, typename, typehash): file()}
        self._writers       = {}    # {(topic, typename, typehash): CsvWriter}
        self._patterns      = {}    # {key: [(() if any field else ('path', ), re.Pattern), ]}
        self._lasttopickey  = None  # Last (topic, typename, typehash) emitted
        self._overwrite     = (args.WRITE_OPTIONS.get("overwrite") in (True, "true"))
        self._close_printed = False  # Whether close() summary was printed since last emit

        for key, vals in [("print", args.EMIT_FIELD), ("noprint", args.NOEMIT_FIELD)]:
            self._patterns[key] = [(tuple(v.split(".")), common.wildcard_to_regex(v)) for v in vals]
        atexit.register(self.close)  # Ensure files are closed at interpreter exit

    def emit(self, topic, msg, stamp=None, match=None, index=None):
        """
        Writes message to output file.

        Row columns are: stamp as seconds, stamp as datetime, message type name,
        followed by the message's scalar field values.

        @raise   Exception  if sink configuration is invalid
        """
        if not self.validate(): raise Exception("invalid")
        stamp, index = self._ensure_stamp_index(topic, msg, stamp, index)
        data = (v for _, v in self._iter_fields(msg))
        metadata = [api.to_sec(stamp), api.to_datetime(stamp), api.get_message_type(msg)]
        self._make_writer(topic, msg).writerow(itertools.chain(metadata, data))
        self._close_printed = False  # New data written: next close() prints summary again
        super(CsvSink, self).emit(topic, msg, stamp, match, index)

    def validate(self):
        """Returns whether overwrite option is valid and file base is writable."""
        if self.valid is not None: return self.valid
        result = True
        if self.args.WRITE_OPTIONS.get("overwrite") not in (None, True, False, "true", "false"):
            ConsolePrinter.error("Invalid overwrite option for CSV: %r. "
                                 "Choose one of {true, false}.",
                                 self.args.WRITE_OPTIONS["overwrite"])
            result = False
        if not common.verify_io(self.args.WRITE, "w"):
            result = False
        self.valid = result
        return self.valid

    def close(self):
        """
        Closes output file(s), if any.

        On the first close after new data was written, prints a debug summary
        of files written, with file sizes and message counts.
        """
        names = {}  # {topickey: filename}; bound up front so `finally` can always use it
        try:
            names = {k: f.name for k, f in self._files.items()}
            for k in names:
                self._files.pop(k).close()
            self._writers.clear()
            self._lasttopickey = None
        finally:
            if not self._close_printed and self._counts:
                self._close_printed = True
                # Keyed by topickey like `names`, so the lookups below never fail,
                # even when getting a file size fails.
                sizes = {k: None for k in names}
                for k, n in names.items():
                    try: sizes[k] = os.path.getsize(n)
                    except Exception as e: ConsolePrinter.warn("Error getting size of %s: %s", n, e)
                ConsolePrinter.debug("Wrote %s in %s to CSV (%s):",
                                     plural("message", sum(self._counts.values())),
                                     plural("topic", self._counts),
                                     common.format_bytes(sum(filter(bool, sizes.values()))))
                for topickey, name in names.items():
                    ConsolePrinter.debug("- %s (%s, %s)", name,
                                        "error getting size" if sizes[topickey] is None else
                                        common.format_bytes(sizes[topickey]),
                                        plural("message", self._counts[topickey]))
            super(CsvSink, self).close()

    def _make_writer(self, topic, msg):
        """
        Returns a CsvWriter for writing topic data.

        File is populated with header if not created during this session.
        Only one file is kept open at a time: switching topics closes the previous
        topic's file, and returning to a topic reopens its file in append mode.
        """
        topickey = api.TypeMeta.make(msg, topic).topickey
        if not self._lasttopickey:
            common.makedirs(os.path.dirname(self._filebase))
        if self._lasttopickey and topickey != self._lasttopickey:
            self._files[self._lasttopickey].close()  # Avoid hitting ulimit
        if topickey not in self._files or self._files[topickey].closed:
            name = self._files[topickey].name if topickey in self._files else None
            action = "Creating"  # Or "Overwriting"
            if not name:
                base, ext = os.path.splitext(self._filebase)
                name = "%s.%s%s" % (base, topic.lstrip("/").replace("/", "__"), ext)
                if self._overwrite:
                    if os.path.isfile(name) and os.path.getsize(name): action = "Overwriting"
                    open(name, "w").close()  # Truncate existing file
                else: name = common.unique_path(name)
            flags = {"mode": "ab"} if six.PY2 else {"mode": "a", "newline": ""}
            f = open(name, **flags)
            w = CsvWriter(f)
            if topickey not in self._files:  # New file: write header row first
                if self.args.VERBOSE:
                    ConsolePrinter.debug("%s %s.", action, name)
                header = (topic + "/" + ".".join(map(str, p)) for p, _ in self._iter_fields(msg))
                metaheader = ["__time", "__datetime", "__type"]
                w.writerow(itertools.chain(metaheader, header))
            self._files[topickey], self._writers[topickey] = f, w
        self._lasttopickey = topickey
        return self._writers[topickey]

    def _iter_fields(self, msg, top=()):
        """
        Yields ((nested, path), scalar value) from ROS message.

        Lists are returned as ((nested, path, index), value), e.g. (("data", 0), 666).
        Time/duration values are converted to seconds via api.to_sec.
        Fields are filtered by emit-field / no-emit-field patterns.
        """
        prints, noprints = self._patterns["print"], self._patterns["noprint"]
        fieldmap, identity = api.get_message_fields(msg), lambda x: x
        fieldmap = api.filter_fields(fieldmap, top, include=prints, exclude=noprints)
        for k, t in fieldmap.items() if fieldmap != msg else ():
            v, path, baset = api.get_message_value(msg, k, t), top + (k, ), api.scalar(t)
            is_sublist = isinstance(v, (list, tuple)) and baset not in api.ROS_BUILTIN_TYPES
            cast = api.to_sec if baset in api.ROS_TIME_TYPES else identity
            if isinstance(v, (list, tuple)) and not is_sublist:  # List of builtin values
                for i, lv in enumerate(v):
                    yield path + (i, ), cast(lv)
            elif is_sublist:  # List of nested messages
                for i, lmsg in enumerate(v):
                    for lp, lv in self._iter_fields(lmsg, path + (i, )):
                        yield lp, lv
            elif api.is_ros_message(v, ignore_time=True):  # Nested message
                for mp, mv in self._iter_fields(v, path):
                    yield mp, mv
            else:
                yield path, cast(v)
177 
178 
class CsvWriter(object):
    """Wraps csv.writer with bool conversion, iterator support, and lesser memory use."""

    def __init__(self, csvfile, dialect="excel", **fmtparams):
        """
        @param   csvfile    file-like object with `write()` method and `mode` attribute;
                            a binary-mode file ("b" in mode) gets a bytes format buffer
        @param   dialect    CSV dialect to use, one from `csv.list_dialects()`
        @param   fmtparams  override individual format parameters in dialect
        """
        self._file    = csvfile
        self._buffer  = six.BytesIO() if "b" in csvfile.mode else six.StringIO()
        # Formats column slices into the buffer without line terminator,
        # so that several slices can be joined into one output line.
        self._writer  = csv.writer(self._buffer, dialect, **dict(fmtparams, lineterminator=""))
        # Dialect as requested, with the real line terminator (this writer is never written to).
        self._dialect = csv.writer(self._buffer, dialect, **fmtparams).dialect
        self._format  = lambda v: int(v) if isinstance(v, bool) else v
        if six.PY2:  # In Py2, CSV is written in binary mode
            self._format = lambda v: int(v) if isinstance(v, bool) else \
                           v.encode("utf-8") if isinstance(v, six.text_type) else v

    @property
    def dialect(self):
        """A read-only description of the dialect in use by the writer."""
        return self._dialect

    def writerow(self, row):
        """
        Writes the row to the writer's file object.

        Fields will be formatted according to the current dialect.
        Booleans are written as integers. The row is consumed lazily
        and formatted in slices of 10000 columns.

        @param   row  iterable of field values
        @return       total length written to file for this row: formatted columns,
                      delimiters between slices, and the line terminator
        """
        def write_columns(cols, inter):
            """Writes separator (if any) and formatted columns to file, returns length written."""
            count = 0
            if inter:
                self._file.write(inter)
                count += len(inter)
            # Hack: use csv.writer to format a slice at a time; huge rows cause excessive memory use
            self._writer.writerow(cols)
            self._file.write(self._buffer.getvalue())
            count += self._buffer.tell()
            self._buffer.seek(0); self._buffer.truncate()
            return count

        result, chunk, inter, DELIM, STEP = 0, [], "", self.dialect.delimiter, 10000
        if "b" in self._file.mode: DELIM = six.binary_type(self.dialect.delimiter)
        for v in row:
            chunk.append(self._format(v))
            if len(chunk) >= STEP:
                result += write_columns(chunk, inter)
                chunk, inter = [], DELIM  # Subsequent slices are preceded by a delimiter
        if chunk: result += write_columns(chunk, inter)
        self._file.write(self.dialect.lineterminator)
        result += len(self.dialect.lineterminator)
        return result

    def writerows(self, rows):
        """
        Writes the rows to the writer's file object.

        Fields will be formatted according to the current dialect.

        @param   rows  iterable of iterables of field values
        """
        for row in rows: self.writerow(row)
243 
244 
245 
def init(*_, **__):
    """Registers the CSV write format and its command-line output options with plugins."""
    from ... import plugins  # Imported here rather than at top, to avoid a circular import
    write_options = [
        ("overwrite=true|false", "overwrite existing files in CSV output\n"
                                 "instead of appending unique counter (default false)"),
    ]
    plugins.add_write_format("csv", CsvSink, "CSV", write_options)
    output_flags = ["--emit-field", "--no-emit-field"]
    plugins.add_output_label("CSV", output_flags)
253  plugins.add_output_label("CSV", ["--emit-field", "--no-emit-field"])
254 
255 
# Names exported by `from ... import *`
__all__ = [
    "CsvSink",
    "CsvWriter",
    "init",
]
grepros.plugins.auto.csv.CsvSink._patterns
_patterns
Definition: csv.py:59
grepros.plugins.auto.csv.CsvSink.emit
def emit(self, topic, msg, stamp=None, match=None, index=None)
Definition: csv.py:68
grepros.plugins.auto.csv.CsvSink.validate
def validate(self)
Definition: csv.py:78
generate_msgs.plural
def plural(word, items)
Definition: generate_msgs.py:405
grepros.outputs.Sink.args
args
Definition: outputs.py:50
grepros.plugins.auto.csv.CsvWriter.__init__
def __init__(self, csvfile, dialect="excel", **fmtparams)
Definition: csv.py:182
grepros.plugins.auto.csv.CsvSink.__init__
def __init__(self, args=None, **kwargs)
Definition: csv.py:38
grepros.plugins.auto.csv.CsvSink._make_writer
def _make_writer(self, topic, msg)
Definition: csv.py:118
grepros.plugins.auto.csv.CsvWriter._dialect
_dialect
Definition: csv.py:191
grepros.plugins.auto.csv.CsvSink._lasttopickey
_lasttopickey
Definition: csv.py:60
grepros.plugins.auto.csv.CsvSink._overwrite
_overwrite
Definition: csv.py:61
grepros.plugins.auto.csv.CsvWriter
Definition: csv.py:179
grepros.outputs.Sink
Definition: outputs.py:32
grepros.plugins.auto.csv.CsvSink
Definition: csv.py:29
grepros.plugins.auto.csv.CsvSink.close
def close(self)
Definition: csv.py:92
grepros.outputs.Sink.valid
valid
Result of validate()
Definition: outputs.py:52
grepros.plugins.auto.csv.CsvWriter.dialect
def dialect(self)
Definition: csv.py:198
grepros.outputs.Sink._counts
_counts
Definition: outputs.py:48
grepros.plugins.auto.csv.CsvSink._close_printed
_close_printed
Definition: csv.py:62
grepros.plugins.auto.csv.CsvWriter._file
_file
Definition: csv.py:188
grepros.outputs.Sink.validate
def validate(self)
Definition: outputs.py:88
grepros.plugins.auto.csv.CsvWriter._writer
_writer
Definition: csv.py:190
grepros.plugins.auto.csv.CsvWriter._buffer
_buffer
Definition: csv.py:189
grepros.plugins.auto.csv.CsvWriter.writerows
def writerows(self, rows)
Definition: csv.py:234
grepros.plugins.auto.csv.CsvSink._filebase
_filebase
Definition: csv.py:56
grepros.outputs.Sink.close
def close(self)
Definition: outputs.py:93
grepros.plugins.auto.csv.CsvSink._writers
_writers
Definition: csv.py:58
grepros.plugins.auto.csv.CsvSink._iter_fields
def _iter_fields(self, msg, top=())
Definition: csv.py:152
grepros.plugins.auto.csv.CsvWriter.writerow
def writerow(self, row)
Definition: csv.py:202
grepros.plugins.auto.csv.CsvWriter._format
_format
Definition: csv.py:192
grepros.outputs.Sink._ensure_stamp_index
def _ensure_stamp_index(self, topic, msg, stamp=None, index=None)
Definition: outputs.py:115
grepros.plugins.auto.csv.CsvSink._files
_files
Definition: csv.py:57
grepros.plugins.auto.csv.init
def init(*_, **__)
Definition: csv.py:246


grepros
Author(s): Erki Suurjaak
autogenerated on Sat Jan 6 2024 03:11:29