1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 ROS topic API and implementation for ROSH
37 """
38
39 from __future__ import with_statement
40
41
42 import os
43 import sys
44 import time
45 from collections import deque
46 from threading import Lock
47
48 import roslib.message
49 import roslib.names
50 from roslib.packages import get_pkg_dir
51
52 import rosmsg
53 import rostopic
54 import rospy
55
56 import rosh.impl.proc
57 from rosh.impl.exceptions import ROSHException
58 from rosh.impl.namespace import Namespace, Concept, NSResourceList
59
60
61 PLUGIN_SHOW_TOPIC_TYPE = 'show-topic-type'
62
63 LATEST = -1
64
67
68
69 self.count = -1
70 self.idx = idx + 1
71 self.msg = None
72
74 self.count += 1
75 if self.count == self.idx:
76 self.msg = msg
77
79
80 - def __init__(self, ns_obj, start, stop, step, last_msg=None):
81 if step == 0:
82 raise ValueError("slice step cannot be zero")
83
84 self.ns_obj = ns_obj
85
86
87
88 self.step_off = 0
89 self.start = start
90 self.stop = stop
91 self.step = step or 1
92 self.count = 0
93
94 self.done = False
95 self.started = False
96
97 self.buff = deque()
98 self.lock = Lock()
99
100
101
102
103 if self.start == 0 or self.start == None:
104 if last_msg is not None and self.stop > 0:
105 self.started = True
106 self.buff.append(last_msg)
107 self.step_off = 1
108 self.start = 1
109
110 self.step_off = self.step_off - self.start
111 if stop is not None and stop <= self.start:
112
113 self.done = True
114
116 with subscribe(self.ns_obj, self):
117 while not self.done:
118 if self.buff:
119
120 yield self.buff.popleft()
121 time.sleep(0.01)
122
123 while self.buff:
124 yield self.buff.popleft()
125
127 if not self.done:
128 self.count += 1
129 if self.count >= self.start:
130 if not self.started or (self.count + self.step_off) % self.step == 0:
131 self.started = True
132 self.buff.append(msg)
133 if self.stop is not None and self.count >= self.stop - 1:
134 self.done = True
135
137 """
138 Subscribe to topic instance. 'with:' semantics enable auto-unsubscribe.
139 """
140
142 self.ns_obj = ns_obj
143 if cb is not None:
144 self.callbacks = [cb]
145 self.ns_obj._add_subscriber_callback(cb)
146 else:
147 self.callbacks = []
148
151
152 - def __exit__(self, type, value, traceback):
154
156 """
157 Register a new callback on this handle.
158
159 Not threadsafe.
160 """
161 self.ns_obj._add_subscriber_callback(cb)
162 self.callbacks.append(cb)
163
165 """
166 Unregister callback on this handle.
167
168 Not threadsafe.
169 """
170 self.ns_obj._remove_subscriber_callback(cb)
171 self.callbacks.remove(cb)
172
174 for cb in self.callbacks:
175 self.ns_obj._remove_subscriber_callback(cb)
176
177 self.ns_obj = None
178 self.cb = None
179
181 if self.ns_obj is not None:
182 return "subscribe[%s]: %s callback(s)"%(self.ns_obj._name, len(self.callbacks))
183 else:
184 return "<closed>"
185
187 """
188 Get the next message on the topic
189
190 @param ns_obj: TopicNS instance
191 @type ns_obj: L{TopicNS}
192 @param timeout: timeout in millis
193 @type timeout: float
194 @return: message
195 @rtype: roslib.message.Message
196 """
197 cb = _TopicIdxCallback(1)
198
199 if timeout is not None:
200 timeout_t = time.time() + timeout
201 with subscribe(ns_obj, cb):
202 while cb.msg is None and time.time() < timeout_t:
203 time.sleep(0.01)
204 else:
205 with subscribe(ns_obj, cb):
206 while cb.msg is None:
207 time.sleep(0.01)
208 return cb.msg
209
210
212
213 - def __init__(self, config, name, pub_nodes, sub_nodes):
214 self.name, self._pub_nodes, self._sub_nodes = name, pub_nodes, sub_nodes
215 n = config.ctx.nodes
216 node_config = n._config
217 NodeNS = n._nstype
218 self.pub_nodes = NSResourceList('', node_config, pub_nodes, NodeNS)
219 self.sub_nodes = NSResourceList('', node_config, sub_nodes, NodeNS)
220
221 - def _update(self, pub_nodes, sub_nodes):
222 """
223 In-place update of resource lists
224 """
225 self._pub_nodes, _sub_nodes = pub_nodes, sub_nodes
226 self.pub_nodes._set_resources(pub_nodes)
227 self.sub_nodes._set_resources(sub_nodes)
228
231
233 buff ="Topic [%s]\n"%(self.name)
234 if self._pub_nodes:
235 buff += "\nPublishers:\n"
236 buff += '\n'.join([" * %s"%(l) for l in self._pub_nodes]) + '\n'
237 if self._sub_nodes:
238 buff += "\nSubscribers:\n"
239 buff += '\n'.join([" * %s"%(l) for l in self._sub_nodes]) + '\n'
240 return buff
241
243 try:
244 state = config.ctx.master.getSystemState()
245 pubs, subs, _ = state
246 subs = [x for x in subs if x[0] == name]
247 if subs:
248 subs = subs[0][1]
249 pubs = [x for x in pubs if x[0] == name]
250 if pubs:
251 pubs = pubs[0][1]
252 except:
253
254 return None
255 return TopicInfo(config, name, pubs, subs)
256
258 """
259 L{TopicNS} provides lightweight accessors to a ROS topic and its data.
260 """
261
263 """
264 @param config: Namespace configuration instance.
265 @type config: L{NamespaceConfig}
266 """
267 super(TopicNS,self).__init__(ns, config)
268 self._type_name = None
269 self._type = None
270 self._sub = None
271 self._pub = None
272 self._callbacks = []
273 self._last_msg = None
274 self._buffer = None
275 self._bufferlen = None
276 self._buffer_lock = None
277 self._latch = True
278 self._mux = None
279 self._topic_info = None
280
282
283
284
285 pubs, subs, _ = self._config.master.getSystemState()
286 return set([x[0] for x in pubs if x[0].startswith(self._name)] + [x[0] for x in subs if x[0].startswith(self._name)])
287
289 return ['buffer', 'latch']
290
292 if self._latch != val:
293 self._latch = val
294
295 if self._pub is not None:
296 self._pub.impl.is_latch = val
297
299 """
300 Initialize the buffer of this subscriber instance to the
301 specified size. This call can have multiple side-effects:
302
303 * it will start a subscription to the topic
304 * if buffer is already initialized, this call will empty the buffer.
305
306 @param val: length of subscriber callback buffer
307 @type val: int
308 """
309 self._bufferlen = val
310 if val is not None:
311 if self._buffer_lock is None:
312 self._buffer_lock = Lock()
313
314
315 if self._buffer is not None:
316 oldeque = self._buffer
317 self._buffer = deque([])
318 oldeque.clear()
319 else:
320 self._buffer = deque([])
321 self._init_sub()
322
324 if self._topic_info is None:
325 self._topic_info = topic_info(self._config, self._name)
326 return self._topic_info
327
329 """
330 show() API. Visualize this topic/namespace.
331 """
332
333 if self._type is None:
334 self._init_type()
335
336 if self._type_name == 'nav_msgs/OccupancyGrid':
337 if self._type is not None:
338 show_occ_grid(self)
339 elif self._type is None:
340
341 show_graph(self)
342 else:
343 show_rostopic(self)
344 return True
345
346 - def _cb(self, msg):
347 """Message subscription callback"""
348 self._last_msg = msg
349 if self._buffer is not None:
350
351 b = self._buffer
352 with self._buffer_lock:
353 while len(b) > self._bufferlen - 1:
354 b.popleft()
355 self._buffer.append(msg)
356
359
361 if self._type_name is None:
362 self._init_type()
363 if self._type_name is None:
364 return self._ns
365 else:
366 return rosmsg.get_msg_text(self._type_name)
367
369 """
370 rostype() API
371 """
372 if self._type is None:
373 self._init_type()
374 return self._type
375
377 """
378 rostype() API. Must be called with desired topic type. topic
379 type can be a string with the message type name, a
380 roslib.message.Message, or a Msg
381 """
382 if type(t) == str:
383 t = roslib.message.get_message_class(t)
384 if t is None:
385 raise ValueError("cannot load message type: %s"%(t))
386 else:
387 if not type(t) == type:
388 raise ValueError("invalid message type: %s"%(type(t)))
389 if not issubclass(t, roslib.message.Message):
390 raise ValueError("invalid message type: %s"%(t.__class__.__name__))
391
392
393 with self._config.lock:
394 if self._type is not None and t != self._type:
395 self._cleanup()
396 self._type = t
397 self._type_name = t._type
398
400 """
401 Lazy-initialize type based on graph state.
402 """
403 if self._ns == '/':
404 return
405 if self._type is None:
406 self._type_name = rostopic.get_topic_type(self._ns, blocking=False)[0]
407 if self._type_name:
408 self._type = roslib.message.get_message_class(self._type_name)
409 if self._type is None:
410 pkg, base_type = roslib.names.package_resource_name(self._type_name)
411 print >> sys.stderr, "\ncannot retrieve type [%s].\nPlease type 'rosmake %s'"%(self._type_name, pkg)
412
414 """
415 Lazy-init subscriber for topic
416 """
417
418
419
420
421 while self._sub is None:
422 with self._config.lock:
423 if self._sub is None:
424
425 self._init_type()
426 if self._type is None:
427 if not block:
428 raise ROSHException("please init() this instance with the topic type")
429 else:
430 self._sub = rospy.Subscriber(self._ns, self._type, self._cb)
431 return
432 if not block:
433 return
434 time.sleep(0.1)
435
437 with self._config.lock:
438 if self._pub is not None:
439 self._pub.unregister()
440 self._pub = None
441
443 with self._config.lock:
444 if self._sub is not None:
445 self._sub.unregister()
446 self._sub = None
447
449 with self._config.lock:
450
451
452
453 self._cleanup_pub()
454 self._cleanup_sub()
455
458
460 """
461 Lazy-init publisher for topic
462 """
463
464
465
466
467 if self._pub is None:
468 with self._config.lock:
469 if self._pub is None:
470
471 self._init_type()
472 if self._type:
473 self._pub = rospy.Publisher(self._ns, self._type, latch=self._latch)
474 else:
475 raise ROSHException("please init() this instance with the topic type")
476
478 """
479 Add callback to be invoked when new messages arrive
480 """
481 if self._sub is None:
482 self._init_sub()
483 self._sub.impl.add_callback(cb, cb_args)
484
487
489 """
490 Publish with a topic-style namespace object. Implements call
491 functionality for ROSNamespace objects when used with topics.
492
493 @param args: *args
494 @param kwds: **kwds
495 """
496
497 if self._pub is None:
498
499 if len(args) == 1 and isinstance(args[0], roslib.message.Message):
500 self._type = args[0].__class__
501 self._init_pub()
502 self._pub.publish(*args, **kwds)
503
505 self._init_sub(block=True)
506
507 start, stop, step = slice_obj.start, slice_obj.stop, slice_obj.step
508 step = step or 1
509 if step < 1:
510 raise ValueError("step size must be positive")
511
512 if start is None or start >= 0:
513 if stop is not None and start > stop:
514 raise ValueError("stop index must be greater than end index")
515
516 cb = _TopicSliceCallback(self, start, stop, step, self._last_msg)
517 return cb
518 else:
519
520
521 if self._buffer is None:
522 print >> sys.stderr, "This topic is not buffered. Use props(obj, buffer=N) to enable."
523 return []
524
525
526 if stop is not None and stop > 0:
527 raise IndexError("cannot slice from past to future")
528
529
530
531 if stop == 0:
532 stop = None
533
534
535
536
537
538 with self._buffer_lock:
539 retval = list(self._buffer)[start:stop:step]
540 return retval
541
542 - def _idx(self, idx):
543 if idx > 0:
544
545 self._init_sub(block=True)
546 else:
547 self._init_sub(block=False)
548
549 if idx < 0:
550 if idx == LATEST:
551
552 return self._last_msg
553 else:
554
555
556 raise IndexError("[%s] outside of history buffer"%idx)
557 else:
558 cb = _TopicIdxCallback(idx)
559 with subscribe(self, cb):
560 while cb.msg is None:
561 time.sleep(0.01)
562 return cb.msg
563
565 if key.startswith('_'):
566 return object.__setattr__(self, key, value)
567
568 from rosh.impl.top_tools import Mux
569
570 if not type(value) == TopicNS:
571 raise ValueError("value must be a topic")
572
573 output_ns = self.__getitem__(key)
574 if output_ns._mux is None:
575
576
577
578 m = Mux(self._config.ctx, [value], output_ns)
579 output_ns._mux = m
580 m.start()
581 return m
582 else:
583 output_ns._mux.switch(value)
584 return output_ns._mux
585
587 """
588 Dictionary-style accessor for services
589 """
590
591
592
593
594 if type(key) == slice:
595 return self._slice(key)
596 elif type(key) == int:
597 return self._idx(key)
598
599 else:
600 return super(TopicNS, self).__getitem__(key)
601
603
606
609
611 if key.startswith('_'):
612 return object.__setattr__(self, key, value)
613 else:
614 return self._root.__setattr__(key, value)
615
616
618
619 success = rosh.impl.proc.launch(ns_obj._config.ctx, 'rxgraph', 'rxgraph', args=['--topicns', ns_obj._ns])
620 if success:
621 print "showing graph of topics in namespace %s"%ns_obj._ns
622 else:
623 print >> sys.stderr, "failed to launch rxgraph"
624
626
627 ros_root = roslib.rosenv.get_ros_root()
628 success = rosh.impl.proc.run(ns_obj._config, ['xterm', '-e', os.path.join(ros_root, 'bin', 'rostopic'), 'echo', ns_obj._name], stdout=False)
629
630 if success:
631 print "running rostopic echo in a separate window"
632 else:
633 print >> sys.stderr, "unable to run rostopic echo in a separate window"
634