3 Main outputs for emitting messages.
5 ------------------------------------------------------------------------------
6 This file is part of grepros - grep for ROS bag files and live topics.
7 Released under the BSD License.
12 ------------------------------------------------------------------------------
15 from __future__
import print_function
28 from . common
import ConsolePrinter, MatchMarkers
29 from . inputs
import Source
33 """Output base class."""
39 DEFAULT_ARGS = dict(META=
False)
43 @param args arguments as namespace or dictionary, case-insensitive
44 @param args.meta whether to emit metainfo
45 @param kwargs any and all arguments as keyword overrides, case-insensitive
50 self.
args = common.ensure_namespace(args, Sink.DEFAULT_ARGS, **kwargs)
57 """Context manager entry."""
60 def __exit__(self, exc_type, exc_value, traceback):
61 """Context manager exit, closes sink."""
65 """Outputs source metainfo like bag header as debug stream, if not already emitted."""
66 batch = self.
args.META
and self.
source.get_batch()
69 meta
and ConsolePrinter.debug(meta)
71 def emit(self, topic, msg, stamp=None, match=None, index=None):
75 @param topic full name of ROS topic the message is from
76 @param msg ROS message
77 @param stamp message ROS timestamp, if not current ROS time
78 @param match ROS message with values tagged with match markers if matched, else None
79 @param index message index in topic, if any
81 topickey = api.TypeMeta.make(msg, topic).topickey
85 """Attaches source to sink."""
89 """Returns whether sink prerequisites are met (like ROS environment set if TopicSink)."""
94 """Shuts down output, closing any files or connections."""
99 """Writes out any pending data to disk."""
102 """Handles exception, used by background threads."""
103 ConsolePrinter.error(text)
106 """Returns whether this sink requires highlighted matches."""
111 """Returns true if target is recognizable as output for this sink class."""
112 ext = os.path.splitext(target
or "")[-1].lower()
116 """Returns (stamp, index) populated with current ROS time and topic index if `None`."""
117 if stamp
is None: stamp = api.get_rostime(fallback=
True)
118 if index
is None: index = self.
_counts.get(api.TypeMeta.make(msg, topic).topickey, 0) + 1
123 """Provides message formatting as text."""
126 NOCOLOR_HIGHLIGHT_WRAPPERS =
"**",
"**"
129 DEFAULT_ARGS = dict(COLOR=
True, EMIT_FIELD=(), NOEMIT_FIELD=(), HIGHLIGHT=
True,
130 MAX_FIELD_LINES=
None, START_LINE=
None, END_LINE=
None,
131 MAX_MESSAGE_LINES=
None, LINES_AROUND_MATCH=
None, MATCHED_FIELDS_ONLY=
False,
132 WRAP_WIDTH=
None, MATCH_WRAPPER=
None)
136 @param args arguments as namespace or dictionary, case-insensitive
137 @param args.color False or "never" for not using colors in replacements
138 @param args.highlight highlight matched values (default true)
139 @param args.emit_field message fields to emit if not all
140 @param args.noemit_field message fields to skip in output
141 @param args.max_field_lines maximum number of lines to output per field
142 @param args.start_line message line number to start output from
143 @param args.end_line message line number to stop output at
144 @param args.max_message_lines maximum number of lines to output per message
145 @param args.lines_around_match number of message lines around matched fields to output
146 @param args.matched_fields_only output only the fields where match was found
147 @param args.wrap_width character width to wrap message YAML output at
148 @param args.match_wrapper string to wrap around matched values,
149 both sides if one value, start and end if more than one,
150 or no wrapping if zero values
151 @param kwargs any and all arguments as keyword overrides, case-insensitive
159 self.
_configure(common.ensure_namespace(args, TextSinkMixin.DEFAULT_ARGS, **kwargs))
163 """Returns message as formatted string, optionally highlighted for matches if configured."""
166 highlight = highlight
and self.args.HIGHLIGHT
167 if self.
_prefix or self.args.START_LINE
or self.args.END_LINE \
168 or self.args.MAX_MESSAGE_LINES
or (self.args.LINES_AROUND_MATCH
and highlight):
169 lines = text.splitlines()
171 if self.args.START_LINE
or self.args.END_LINE
or self.args.MAX_MESSAGE_LINES:
172 start = self.args.START_LINE
or 0
173 start = max(start, -len(lines)) - (start > 0)
174 end = self.args.END_LINE
or len(lines)
175 end = max(end, -len(lines)) - (end > 0)
176 if self.args.MAX_MESSAGE_LINES: end = min(end, start + self.args.MAX_MESSAGE_LINES)
177 lines = lines[start:end + 1]
178 lines = lines
and (lines[:-1] + [lines[-1] + self.
_styles[
"rst"]])
180 if self.args.LINES_AROUND_MATCH
and highlight:
181 spans, NUM = [], self.args.LINES_AROUND_MATCH
182 for i, l
in enumerate(lines):
183 if MatchMarkers.START
in l:
184 spans.append([max(0, i - NUM), min(i + NUM + 1, len(lines))])
185 if MatchMarkers.END
in l
and spans:
186 spans[-1][1] = min(i + NUM + 1, len(lines))
187 lines = sum((lines[a:b - 1] + [lines[b - 1] + self.
_styles[
"rst"]]
188 for a, b
in common.merge_spans(spans)), [])
191 lines = [self.
_prefix + l
for l
in lines]
193 text =
"\n".join(lines)
196 text = re.sub(
r"(%s)\1+" % re.escape(a),
r"\1", text)
197 text = text.replace(a, b)
203 """Returns ROS message or other value as YAML."""
205 unquote =
lambda v: v[1:-1]
if v[:1] == v[-1:] ==
'"' else v
207 def retag_match_lines(lines):
208 """Adds match tags to lines where wrapping separated start and end."""
210 for i, l
in enumerate(lines):
211 startpos0, endpos0 = l.find (MatchMarkers.START), l.find (MatchMarkers.END)
212 startpos1, endpos1 = l.rfind(MatchMarkers.START), l.rfind(MatchMarkers.END)
213 if endpos0 >= 0
and (startpos0 < 0
or startpos0 > endpos0):
214 lines[i] = l = re.sub(
r"^(\s*)",
r"\1" + MatchMarkers.START, l)
215 if startpos1 >= 0
and endpos1 < startpos1
and i + 1 < len(lines):
216 lines[i + 1] = re.sub(
r"^(\s*)",
r"\1" + MatchMarkers.START, lines[i + 1])
217 if startpos1 >= 0
and startpos1 > endpos1:
218 CUT, EXTRA = (-len(PH), PH)
if PH
and l.endswith(PH)
else (len(l),
"")
219 lines[i] = l[:CUT] + MatchMarkers.END + EXTRA
223 """Returns text or list/tuple truncated to length used in final output."""
224 if self.args.LINES_AROUND_MATCH \
225 or (
not self.args.MAX_MESSAGE_LINES
and (self.args.END_LINE
or 0) <= 0):
return v
227 MAX_CHAR_LEN = 1 + len(MatchMarkers.START) + len(MatchMarkers.END)
229 if isinstance(v, (list, tuple)): textlen = bytelen = 2 + len(v) * (2 + MAX_CHAR_LEN)
230 else: textlen, bytelen = self.
_wrapper.strlen(v), len(v)
231 if textlen < 10000:
return v
236 MIN_CHARS_PER_LINE = self.
_wrapper.width
237 if MAX_CHAR_LEN != 1:
238 MIN_CHARS_PER_LINE = self.
_wrapper.width // MAX_CHAR_LEN * 2
239 MAX_LINES = self.args.MAX_MESSAGE_LINES
or self.args.END_LINE
240 MAX_CHARS = MAX_LEN = MAX_LINES * MIN_CHARS_PER_LINE * self.
_wrapper.width + 100
241 if bytelen > MAX_CHARS:
242 if isinstance(v, (list, tuple)): MAX_LEN = MAX_CHARS // 3
246 indent =
" " * len(top)
247 if isinstance(val, six.integer_types + (float, bool)):
249 if isinstance(val, common.TEXT_TYPES):
250 if val
in (
"", MatchMarkers.EMPTY):
251 return MatchMarkers.EMPTY_REPL
if val
else "''"
253 return yaml.safe_dump(truncate(val), default_style=
'"', width=sys.maxsize).rstrip(
"\n")
254 if isinstance(val, (list, tuple)):
257 if api.scalar(typename)
in api.ROS_STRING_TYPES:
258 yaml_str = yaml.safe_dump(truncate(val)).rstrip(
'\n')
259 return "\n" +
"\n".join(indent + line
for line
in yaml_str.splitlines())
260 vals = [x
for v
in truncate(val)
for x
in [self.
message_to_yaml(v, top, typename)]
if x]
261 if api.scalar(typename)
in api.ROS_NUMERIC_TYPES:
262 return "[%s]" %
", ".join(unquote(str(v))
for v
in vals)
263 return (
"\n" +
"\n".join(indent +
"- " + v
for v
in vals))
if vals
else ""
264 if api.is_ros_message(val):
265 MATCHED_ONLY = self.args.MATCHED_FIELDS_ONLY
and not self.args.LINES_AROUND_MATCH
266 vals, fieldmap = [], api.get_message_fields(val)
268 fieldmap = api.filter_fields(fieldmap, top, include=prints, exclude=noprints)
269 for k, t
in fieldmap.items():
270 v = self.
message_to_yaml(api.get_message_value(val, k, t), top + (k, ), t)
271 if not v
or MATCHED_ONLY
and MatchMarkers.START
not in v:
274 if t
not in api.ROS_STRING_TYPES: v = unquote(v)
275 if api.scalar(t)
in api.ROS_BUILTIN_TYPES:
276 is_strlist = t.endswith(
"]")
and api.scalar(t)
in api.ROS_STRING_TYPES
277 is_num = api.scalar(t)
in api.ROS_NUMERIC_TYPES
278 extra_indent = indent
if is_strlist
else " " * len(indent + k +
": ")
280 self.
_wrapper.drop_whitespace = t.endswith(
"]")
and not is_strlist
281 self.
_wrapper.break_long_words =
not is_num
282 v = (
"\n" + extra_indent).join(retag_match_lines(self.
_wrapper.wrap(v)))
283 if is_strlist
and self.
_wrapper.strip(v) !=
"[]": v =
"\n" + v
284 vals.append(
"%s%s: %s" % (indent, k, api.format_message_value(val, k, v)))
285 return (
"\n" if indent
and vals
else "") +
"\n".join(vals)
291 """Initializes output settings."""
292 prints, noprints = args.EMIT_FIELD, args.NOEMIT_FIELD
293 for key, vals
in [(
"print", prints), (
"noprint", noprints)]:
294 self.
_patterns[key] = [(tuple(v.split(
".")), common.wildcard_to_regex(v))
for v
in vals]
296 if args.COLOR
not in (
"never",
False):
297 self.
_styles.update({
"hl0": ConsolePrinter.STYLE_HIGHLIGHT
if self.args.HIGHLIGHT
299 "ll0": ConsolePrinter.STYLE_LOWLIGHT,
300 "pfx0": ConsolePrinter.STYLE_SPECIAL,
301 "sep0": ConsolePrinter.STYLE_SPECIAL2})
302 self.
_styles.default_factory =
lambda: ConsolePrinter.STYLE_RESET
304 WRAPS = args.MATCH_WRAPPER
if self.args.HIGHLIGHT
else ""
306 WRAPS = ((WRAPS
or [
""]) * 2)[:2]
310 custom_widths = {MatchMarkers.START: len(WRAPS[0]), MatchMarkers.END: len(WRAPS[1]),
314 wrapargs = dict(max_lines=args.MAX_FIELD_LINES,
315 placeholder=
"%s ...%s" % (self.
_styles[
"ll0"], self.
_styles[
"ll1"]))
316 if args.WRAP_WIDTH
is not None: wrapargs.update(width=args.WRAP_WIDTH)
319 MatchMarkers.END: self.
_styles[
"hl1"]}
324 """Provides output file rollover by size, duration, or message count."""
327 DEFAULT_ARGS = dict(VERBOSE=
False, WRITE=
None, WRITE_OPTIONS={})
330 OPTIONS_TEMPLATES = [
331 (
"rollover-size=NUM",
"size limit for individual files\nin {label} output\n"
332 "as bytes (supports abbreviations like 1K or 2M or 3G)"),
333 (
"rollover-count=NUM",
"message limit for individual files\nin {label} output\n"
334 "(supports abbreviations like 1K or 2M or 3G)"),
335 (
"rollover-duration=INTERVAL",
"message time span limit for individual files\n"
336 "in {label} output\n"
337 "as seconds (supports abbreviations like 60m or 2h or 1d)"),
338 (
"rollover-template=STR",
"output filename template for individual files\n"
339 "in {label} output,\n"
340 'supporting strftime format codes like "%%H-%%M-%%S"\n'
341 'and "%%(index)s" as output file index'),
344 START_META_TEMPLATE =
"{mcount} in {tcount} to "
346 FILE_META_TEMPLATE =
"{name} ({size})"
348 MULTI_META_TEMPLATE =
"\n- {name} ({size}, {mcount}, {tcount})"
353 @param args arguments as namespace or dictionary, case-insensitive
354 @param args.write base name of output file to write if not using rollover-template
355 @param args.write_options {"rollover-size": bytes limit for individual output files,
356 "rollover-count": message limit for individual output files,
357 "rollover-duration": time span limit for individual output files,
358 as ROS duration or convertible seconds,
359 "rollover-template": output filename template, supporting
360 strftime format codes like "%H-%M-%S"
361 and "%(index)s" as output file index,
362 "overwrite": whether to overwrite existing file
364 @param kwargs any and all arguments as keyword overrides, case-insensitive
375 """Returns whether write options are valid, emits error if not, else populates options."""
377 for k
in (
"size",
"count",
"duration"):
378 value = value0 = self.args.WRITE_OPTIONS.get(
"rollover-%s" % k)
379 if value
is None:
continue
380 SUFFIXES = dict(zip(
"smhd", [1, 60, 3600, 24*3600]))
if "duration" == k
else \
381 dict(zip(
"KMGT", [2**10, 2**20, 2**30, 2**40]))
if "size" == k
else \
382 dict(zip(
"KMGT", [10**3, 10**6, 10**9, 10**12]))
384 if isinstance(value, (six.binary_type, six.text_type)):
385 value = common.parse_number(value, SUFFIXES)
386 value = (api.to_duration
if "duration" == k
else int)(value)
387 except Exception:
pass
388 if (value
is None or value < 0)
if "duration" != k \
389 else (k != api.get_ros_time_category(value)
or api.to_sec(value) < 0):
390 ConsolePrinter.error(
"Invalid rollover %s option: %r. "
391 "Value must be a non-negative %s.", k, value0, k)
395 if self.args.WRITE_OPTIONS.get(
"rollover-template"):
396 value = self.args.WRITE_OPTIONS[
"rollover-template"]
397 value = re.sub(
r"(^|[^%])%\(index\)",
r"\1%%(index)", value)
398 try: datetime.datetime.now().strftime(value)
400 ConsolePrinter.error(
"Invalid rollover template option: %r. "
401 "Value must contain valid strftime codes.", value)
406 ConsolePrinter.warn(
"Ignoring rollover template option: "
407 "no rollover limits given.")
413 Closes current output file and prepares new filename if rollover limit reached.
419 stamp = api.time_message(stamp, to_message=
False)
422 props[
"size"] = self.
size
425 do_rollover = (sum(props[
"counts"].values()) >= self.
_rollover_limits[
"count"])
427 stamps = [stamp, props[
"start"]]
428 do_rollover = (max(stamps) - min(stamps) >= self.
_rollover_limits[
"duration"])
431 props[
"size"] = self.
size
435 topickey = api.TypeMeta.make(msg, topic).topickey
436 if not props: props.update({
"counts": {},
"start": stamp,
"size":
None})
437 props[
"start"] = min((props[
"start"], stamp))
438 props[
"counts"][topickey] = props[
"counts"].get(topickey, 0) + 1
442 """Closes output file, if any."""
443 raise NotImplementedError
447 """Returns new filename for output, accounting for rollover template and overwrite."""
448 result = self.args.WRITE
452 except Exception:
pass
453 if self.args.WRITE_OPTIONS.get(
"overwrite")
not in (
True,
"true"):
454 result = common.unique_path(result, empty_ok=
True)
459 """Returns output file metainfo string, with names and sizes and message/topic counts."""
460 if not self._counts:
return ""
461 SIZE_ERROR =
"error getting size"
463 mcount=common.plural(
"message", sum(self._counts.values())),
464 tcount=common.plural(
"topic", self._counts)
468 sizestr = SIZE_ERROR
if sz
is None else common.format_bytes(sz)
472 if props[
"size"]
is None:
473 try: props[
"size"] = os.path.getsize(path)
474 except Exception
as e:
475 ConsolePrinter.warn(
"Error getting size of %s: %s", path, e)
476 sizesum = sum(x[
"size"]
for x
in self.
_rollover_files.values()
if x[
"size"]
is not None)
479 size=common.format_bytes(sizesum)
482 sizestr = SIZE_ERROR
if props[
"size"]
is None else common.format_bytes(props[
"size"])
484 mcount=common.plural(
"message", sum(props[
"counts"].values())),
485 tcount=common.plural(
"topic", props[
"counts"])
492 """Returns current file size in bytes, or None if size lookup failed."""
493 try:
return os.path.getsize(self.
filename)
494 except Exception
as e:
495 ConsolePrinter.warn(
"Error getting size of %s: %s", self.
filename, e)
501 """Returns command-line help texts for rollover options, as [(name, help)]."""
507 """Prints messages to console."""
509 META_LINE_TEMPLATE =
"{ll0}{sep} {line}{ll1}"
510 MESSAGE_SEP_TEMPLATE =
"{ll0}{sep}{ll1}"
511 PREFIX_TEMPLATE =
"{pfx0}{batch}{pfx1}{sep0}{sep}{sep1}"
512 MATCH_PREFIX_SEP =
":"
513 CONTEXT_PREFIX_SEP =
"-"
517 DEFAULT_ARGS = dict(COLOR=
True, EMIT_FIELD=(), NOEMIT_FIELD=(), HIGHLIGHT=
True, META=
False,
518 LINE_PREFIX=
True, MAX_FIELD_LINES=
None, START_LINE=
None,
519 END_LINE=
None, MAX_MESSAGE_LINES=
None, LINES_AROUND_MATCH=
None,
520 MATCHED_FIELDS_ONLY=
False, WRAP_WIDTH=
None, MATCH_WRAPPER=
None)
525 @param args arguments as namespace or dictionary, case-insensitive
526 @param args.color False or "never" for not using colors in replacements
527 @param args.highlight highlight matched values (default true)
528 @param args.meta whether to print metainfo
529 @param args.emit_field message fields to emit if not all
530 @param args.noemit_field message fields to skip in output
531 @param args.line_prefix print source prefix like bag filename on each message line
532 @param args.max_field_lines maximum number of lines to print per field
533 @param args.start_line message line number to start output from
534 @param args.end_line message line number to stop output at
535 @param args.max_message_lines maximum number of lines to output per message
536 @param args.lines_around_match number of message lines around matched fields to output
537 @param args.matched_fields_only output only the fields where match was found
538 @param args.wrap_width character width to wrap message YAML output at
539 @param args.match_wrapper string to wrap around matched values,
540 both sides if one value, start and end if more than one,
541 or no wrapping if zero values
542 @param kwargs any and all arguments as keyword overrides, case-insensitive
544 args = common.ensure_namespace(args, ConsoleSink.DEFAULT_ARGS, **kwargs)
545 if args.WRAP_WIDTH
is None:
546 args = common.structcopy(args)
547 args.WRAP_WIDTH = ConsolePrinter.WIDTH
549 super(ConsoleSink, self).
__init__(args)
550 TextSinkMixin.__init__(self, args)
554 """Prints source metainfo like bag header, if not already printed."""
555 batch = self.
args.META
and self.
source.get_batch()
560 for x
in meta.splitlines())
561 meta
and ConsolePrinter.print(meta)
564 def emit(self, topic, msg, stamp=None, match=None, index=None):
565 """Prints separator line and message text."""
568 if self.
args.LINE_PREFIX
and self.
source.get_batch():
570 kws = dict(self.
_styles, sep=sep, batch=self.
source.get_batch())
574 meta = self.
source.format_message_meta(topic, msg, stamp, index)
576 for x
in meta.splitlines())
577 meta
and ConsolePrinter.print(meta)
580 sep
and ConsolePrinter.print(sep)
581 ConsolePrinter.print(self.
format_message(match
or msg, highlight=bool(match)))
582 super(ConsoleSink, self).
emit(topic, msg, stamp, match, index)
586 """Returns True if sink is configured to highlight matched values."""
587 return bool(self.
args.HIGHLIGHT)
592 """Writes messages to bagfile."""
595 DEFAULT_ARGS = dict(META=
False, WRITE_OPTIONS={}, VERBOSE=
False)
599 @param args arguments as namespace or dictionary, case-insensitive;
600 or a single path as the ROS bagfile to write,
601 or a stream or {@link grepros.api.Bag Bag} instance to write to
602 @param args.write name of ROS bagfile to create or append to,
603 or a stream to write to
604 @param args.write_options {"overwrite": whether to overwrite existing file
606 "rollover-size": bytes limit for individual output files,
607 "rollover-count": message limit for individual output files,
608 "rollover-duration": time span limit for individual output files,
609 as ROS duration or convertible seconds,
610 "rollover-template": output filename template, supporting
611 strftime format codes like "%H-%M-%S"
612 and "%(index)s" as output file index}
613 @param args.meta whether to emit metainfo
614 @param args.verbose whether to emit debug information
615 @param kwargs any and all arguments as keyword overrides, case-insensitive
619 args = {
"WRITE": str(args)}
if isinstance(args, common.PATH_TYPES)
else \
620 {
"WRITE": args}
if common.is_stream(args)
else \
621 {}
if isinstance(args,
api.Bag)
else args
622 args = common.ensure_namespace(args, BagSink.DEFAULT_ARGS, **kwargs)
624 RolloverSinkMixin.__init__(self, args)
626 self.
_overwrite = (args.WRITE_OPTIONS.get(
"overwrite")
in (
"true",
True))
630 atexit.register(self.
close)
632 def emit(self, topic, msg, stamp=None, match=None, index=None):
633 """Writes message to output bagfile."""
634 if not self.
validate():
raise Exception(
"invalid")
636 if self.
_is_pathed: RolloverSinkMixin.ensure_rollover(self, topic, msg, stamp)
638 topickey = api.TypeMeta.make(msg, topic).topickey
639 if topickey
not in self.
_counts and self.
args.VERBOSE:
640 ConsolePrinter.debug(
"Adding topic %s in bag output.", topic)
642 qoses = self.
source.get_message_meta(topic, msg, stamp).get(
"qoses")
643 self.
_bag.write(topic, msg, stamp, qoses=qoses)
644 super(BagSink, self).
emit(topic, msg, stamp, match, index)
647 """Returns whether write options are valid and ROS environment set, emits error if not."""
649 result = RolloverSinkMixin.validate(self)
650 if self.
args.WRITE_OPTIONS.get(
"overwrite")
not in (
None,
True,
False,
"true",
"false"):
651 ConsolePrinter.error(
"Invalid overwrite option for bag: %r. "
652 "Choose one of {true, false}.",
653 self.
args.WRITE_OPTIONS[
"overwrite"])
657 ConsolePrinter.error(
"File not writable.")
659 if not self.
_bag and common.is_stream(self.
args.WRITE) \
660 and not any(c.STREAMABLE
for c
in api.Bag.WRITER_CLASSES):
661 ConsolePrinter.error(
"Bag format does not support writing streams.")
663 if self.
_bag and self.
_bag.mode
not in (
"a",
"w"):
664 ConsolePrinter.error(
"Bag not in write mode.")
666 self.
valid = api.validate()
and result
670 """Closes output bag, if any, emits metainfo."""
675 super(BagSink, self).
close()
678 """Closes output bag, if any."""
684 """Returns current file size in bytes, or None if size lookup failed."""
685 try:
return os.path.getsize(self.
_bag.filename
if self.
_bag else self.
filename) \
686 if not self.
_bag or (self.
_bag.filename
and api.ROS1)
else self.
_bag.size
687 except Exception
as e:
688 ConsolePrinter.warn(
"Error getting size of %s: %s", self.
filename, e)
692 """Opens output file if not already open."""
693 if self.
_bag is not None:
699 if common.is_stream(self.
args.WRITE):
705 if not self.
_overwrite and os.path.isfile(filename)
and os.path.getsize(filename):
706 cls = api.Bag.autodetect(filename)
707 if cls
and "a" not in getattr(cls,
"MODES", (
"a", )):
708 filename = self.
filename = common.unique_path(filename)
if self.args.VERBOSE:
    # `cls` is the autodetected bag class whose MODES lack "a" (append);
    # report it by its class name. The attribute is `__name__`: the previous
    # `__name___` (three trailing underscores) raised AttributeError
    # whenever this verbose branch was reached.
    ConsolePrinter.debug("Making unique filename %r, as %s does not support "
                         "appending.", filename, cls.__name__)
712 if self.
args.VERBOSE:
713 sz = os.path.isfile(filename)
and os.path.getsize(filename)
714 ConsolePrinter.debug(
"%s bag output %s%s.",
716 "Appending to" if sz
else "Creating",
717 filename, (
" (%s)" % common.format_bytes(sz))
if sz
else "")
718 common.makedirs(os.path.dirname(filename))
723 """Returns true if target is recognizable as a ROS bag."""
724 ext = os.path.splitext(target
or "")[-1].lower()
725 return ext
in api.BAG_EXTENSIONS
729 """Publishes messages to ROS topics."""
732 DEFAULT_ARGS = dict(LIVE=
False, META=
False, QUEUE_SIZE_OUT=10, PUBLISH_PREFIX=
"",
733 PUBLISH_SUFFIX=
"", PUBLISH_FIXNAME=
"", VERBOSE=
False)
737 @param args arguments as namespace or dictionary, case-insensitive
738 @param args.live whether reading messages from live ROS topics
739 @param args.queue_size_out publisher queue size (default 10)
740 @param args.publish_prefix output topic prefix, prepended to input topic
741 @param args.publish_suffix output topic suffix, appended to output topic
742 @param args.publish_fixname single output topic name to publish to,
743 overrides prefix and suffix if given
744 @param args.meta whether to emit metainfo
745 @param args.verbose whether to emit debug information
746 @param kwargs any and all arguments as keyword overrides, case-insensitive
748 args = common.ensure_namespace(args, TopicSink.DEFAULT_ARGS, **kwargs)
749 super(TopicSink, self).
__init__(args)
753 def emit(self, topic, msg, stamp=None, match=None, index=None):
754 """Publishes message to output topic."""
755 if not self.
validate():
raise Exception(
"invalid")
757 with api.TypeMeta.make(msg, topic)
as m:
758 topickey, cls = (m.topickey, m.typeclass)
759 if topickey
not in self.
_pubs:
760 topic2 = self.
args.PUBLISH_PREFIX + topic + self.
args.PUBLISH_SUFFIX
761 topic2 = self.
args.PUBLISH_FIXNAME
or topic2
762 if self.
args.VERBOSE:
763 ConsolePrinter.debug(
"Publishing from %s to %s.", topic, topic2)
766 if self.
args.PUBLISH_FIXNAME:
767 pub = next((v
for (_, c), v
in self.
_pubs.items()
if c == cls),
None)
768 pub = pub
or api.create_publisher(topic2, cls, queue_size=self.
args.QUEUE_SIZE_OUT)
769 self.
_pubs[topickey] = pub
771 self.
_pubs[topickey].publish(msg)
773 super(TopicSink, self).
emit(topic, msg, stamp, match, index)
776 """Attaches source to sink and blocks until connected to ROS."""
777 if not self.
validate():
raise Exception(
"invalid")
778 super(TopicSink, self).
bind(source)
783 Returns whether ROS environment is set for publishing,
784 and output topic configuration is valid, emits error if not.
787 result = api.validate(live=
True)
789 if self.
args.LIVE
and not any((self.
args.PUBLISH_PREFIX, self.
args.PUBLISH_SUFFIX,
790 self.
args.PUBLISH_FIXNAME)):
791 ConsolePrinter.error(
"Need topic prefix or suffix or fixname "
792 "when republishing messages from live ROS topics.")
798 """Shuts down publishers."""
801 ConsolePrinter.debug(
"Published %s to %s.",
802 common.plural(
"message", sum(self.
_counts.values())),
803 common.plural(
"topic", self.
_pubs))
804 for k
in list(self.
_pubs):
805 try: self.
_pubs.pop(k).unregister()
806 except Exception
as e:
807 if self.
args.VERBOSE:
808 ConsolePrinter.warn(
"Error closing publisher on topic %r: %s", k[0], e)
809 super(TopicSink, self).
close()
813 """Provides messages to callback function."""
816 DEFAULT_ARGS = dict(EMIT=
None, METAEMIT=
None, HIGHLIGHT=
False)
820 @param args arguments as namespace or dictionary, case-insensitive;
822 @param args.emit callback(topic, msg, stamp, highlighted msg, index in topic), if any
823 @param args.metaemit callback(metadata dict) if any, invoked before first emit from source batch
824 @param args.highlight whether to expect highlighted matching fields from source messages
825 @param kwargs any and all arguments as keyword overrides, case-insensitive
827 if callable(args): args = common.ensure_namespace(
None, emit=args)
828 args = common.ensure_namespace(args, AppSink.DEFAULT_ARGS, **kwargs)
832 """Invokes registered metaemit callback, if any, and not already invoked."""
833 if not self.
source:
return
834 batch = self.
source.get_batch()
if self.
args.METAEMIT
else None
837 self.
args.METAEMIT(meta)
839 def emit(self, topic, msg, stamp=None, match=None, index=None):
840 """Registers message and invokes registered emit callback, if any."""
842 super(AppSink, self).
emit(topic, msg, stamp, match, index)
843 if self.
args.EMIT: self.
args.EMIT(topic, msg, stamp, match, index)
846 """Returns whether emitted matches are highlighted."""
847 return self.
args.HIGHLIGHT
851 """Combines any number of sinks."""
854 FLAG_CLASSES = {
"PUBLISH": TopicSink,
"CONSOLE": ConsoleSink,
"APP": AppSink}
857 FORMAT_CLASSES = {
"bag": BagSink}
861 Accepts more arguments, given to the real sinks constructed.
863 @param args arguments as namespace or dictionary, case-insensitive
864 @param args.console print matches to console
865 @param args.write [[target, format=FORMAT, key=value, ], ]
866 @param args.publish publish matches to live topics
867 @param args.app provide messages to given callback function
868 @param sinks pre-created sinks, arguments will be ignored
869 @param kwargs any and all arguments as keyword overrides, case-insensitive
871 args = common.ensure_namespace(args, **kwargs)
872 super(MultiSink, self).
__init__(args)
877 if getattr(args, flag,
None)]
if not sinks
else list(sinks)
879 for dumpopts
in getattr(args,
"WRITE", [])
if not sinks
else ():
880 kwargs = dict(x.split(
"=", 1)
for x
in dumpopts[1:]
if isinstance(x, common.TEXT_TYPES))
881 kwargs.update(kv
for x
in dumpopts[1:]
if isinstance(x, dict)
for kv
in x.items())
882 target, cls = dumpopts[0], self.
FORMAT_CLASSES.get(kwargs.pop(
"format",
None))
885 key=
lambda x: x
is BagSink)
886 if callable(getattr(c,
"autodetect",
None))
887 and c.autodetect(target)),
None)
889 ConsolePrinter.error(
'Unknown output format in "%s"' %
" ".join(map(str, dumpopts)))
892 clsargs = common.structcopy(args)
893 clsargs.WRITE, clsargs.WRITE_OPTIONS = target, kwargs
894 self.
sinks += [cls(clsargs)]
897 """Outputs source metainfo in one sink, if not already emitted."""
898 sink = next((s
for s
in self.
sinks if isinstance(s, ConsoleSink)),
None)
900 sink = sink
or self.
sinks[0]
if self.
sinks else None
901 sink
and sink.emit_meta()
903 def emit(self, topic, msg, stamp=None, match=None, index=None):
904 """Outputs ROS message to all sinks."""
906 for sink
in self.
sinks:
907 sink.emit(topic, msg, stamp, match, index)
908 super(MultiSink, self).
emit(topic, msg, stamp, match, index)
911 """Attaches source to all sinks, sets thread_excepthook on all sinks."""
912 super(MultiSink, self).
bind(source)
913 for sink
in self.
sinks:
918 """Returns whether prerequisites are met for all sinks."""
920 ConsolePrinter.error(
"No output configured.")
921 return bool(self.
sinks)
and all([sink.validate()
for sink
in self.
sinks])
and self.
valid
924 """Closes all sinks."""
925 for sink
in self.
sinks:
929 """Flushes all sinks."""
930 for sink
in self.
sinks:
934 """Returns whether any sink requires highlighted matches."""
935 return any(s.is_highlighting()
for s
in self.
sinks)
939 "AppSink",
"BagSink",
"ConsoleSink",
"MultiSink",
"RolloverSinkMixin",
"Sink",
"TextSinkMixin",