Package rosh :: Package impl :: Module topic
[frames] | no frames]

Source Code for Module rosh.impl.topic

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2010, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id: topic.py 13210 2011-02-14 01:05:08Z kwc $ 
 34   
 35  """ 
 36  ROS topic API and implementation for ROSH 
 37  """ 
 38   
 39  from __future__ import with_statement 
 40   
 41   
 42  import os 
 43  import sys 
 44  import time 
 45  from collections import deque 
 46  from threading import Lock 
 47   
 48  import roslib.message 
 49  import roslib.names 
 50  from roslib.packages import get_pkg_dir 
 51   
 52  import rosmsg 
 53  import rostopic 
 54  import rospy 
 55   
 56  import rosh.impl.proc 
 57  from rosh.impl.exceptions import ROSHException 
 58  from rosh.impl.namespace import Namespace, Concept, NSResourceList 
 59   
 60  # key for adding plugin to show topic type 
 61  PLUGIN_SHOW_TOPIC_TYPE = 'show-topic-type' 
 62   
 63  LATEST = -1 # per API review, index of the buffered message 
 64   
65 -class _TopicIdxCallback(object):
66 - def __init__(self, idx):
67 # per API review, topic[0] is really equal to the next 68 # message, so start count at -1 69 self.count = -1 70 self.idx = idx + 1 71 self.msg = None
72
73 - def __call__(self, msg):
74 self.count += 1 75 if self.count == self.idx: 76 self.msg = msg
77
78 -class _TopicSliceCallback(object):
79
80 - def __init__(self, ns_obj, start, stop, step, last_msg=None):
81 if step == 0: 82 raise ValueError("slice step cannot be zero") 83 84 self.ns_obj = ns_obj 85 86 # step_off is necessary for doing our step calculation 87 # with the quirkiness of last_msg 88 self.step_off = 0 89 self.start = start 90 self.stop = stop 91 self.step = step or 1 92 self.count = 0 93 94 self.done = False 95 self.started = False 96 97 self.buff = deque() 98 self.lock = Lock() 99 100 # Python's behavior with out-of-bounds slices is to silently 101 # succeed. This seems to mean that if last_msg is not 102 # initialized and the start is 0, we ignore. 103 if self.start == 0 or self.start == None: 104 if last_msg is not None and self.stop > 0: 105 self.started = True 106 self.buff.append(last_msg) 107 self.step_off = 1 108 self.start = 1 109 110 self.step_off = self.step_off - self.start 111 if stop is not None and stop <= self.start: 112 # note that we switch to the original start idx here to include last_msg 113 self.done = True
114
115 - def __iter__(self):
116 with subscribe(self.ns_obj, self): 117 while not self.done: 118 if self.buff: 119 # deques are threadsafe 120 yield self.buff.popleft() 121 time.sleep(0.01) 122 # yield remainder 123 while self.buff: 124 yield self.buff.popleft()
125
126 - def __call__(self, msg):
127 if not self.done: 128 self.count += 1 129 if self.count >= self.start: 130 if not self.started or (self.count + self.step_off) % self.step == 0: 131 self.started = True 132 self.buff.append(msg) 133 if self.stop is not None and self.count >= self.stop - 1: 134 self.done = True
135
136 -class subscribe(object):
137 """ 138 Subscribe to topic instance. 'with:' semantics enable auto-unsubscribe. 139 """ 140
141 - def __init__(self, ns_obj, cb=None):
142 self.ns_obj = ns_obj 143 if cb is not None: 144 self.callbacks = [cb] 145 self.ns_obj._add_subscriber_callback(cb) 146 else: 147 self.callbacks = []
148
149 - def __enter__(self):
150 pass
151
152 - def __exit__(self, type, value, traceback):
153 self.close()
154
155 - def add_callback(self, cb):
156 """ 157 Register a new callback on this handle. 158 159 Not threadsafe. 160 """ 161 self.ns_obj._add_subscriber_callback(cb) 162 self.callbacks.append(cb)
163
164 - def remove_callback(self, cb):
165 """ 166 Unregister callback on this handle. 167 168 Not threadsafe. 169 """ 170 self.ns_obj._remove_subscriber_callback(cb) 171 self.callbacks.remove(cb)
172
173 - def close(self):
174 for cb in self.callbacks: 175 self.ns_obj._remove_subscriber_callback(cb) 176 177 self.ns_obj = None 178 self.cb = None
179
180 - def __repr__(self):
181 if self.ns_obj is not None: 182 return "subscribe[%s]: %s callback(s)"%(self.ns_obj._name, len(self.callbacks)) 183 else: 184 return "<closed>"
185
186 -def next_msg(ns_obj, timeout=None):
187 """ 188 Get the next message on the topic 189 190 @param ns_obj: TopicNS instance 191 @type ns_obj: L{TopicNS} 192 @param timeout: timeout in millis 193 @type timeout: float 194 @return: message 195 @rtype: roslib.message.Message 196 """ 197 cb = _TopicIdxCallback(1) 198 199 if timeout is not None: 200 timeout_t = time.time() + timeout 201 with subscribe(ns_obj, cb): 202 while cb.msg is None and time.time() < timeout_t: 203 time.sleep(0.01) # yield 204 else: 205 with subscribe(ns_obj, cb): 206 while cb.msg is None: 207 time.sleep(0.01) # yield 208 return cb.msg
209 210
211 -class TopicInfo(object):
212
213 - def __init__(self, config, name, pub_nodes, sub_nodes):
214 self.name, self._pub_nodes, self._sub_nodes = name, pub_nodes, sub_nodes 215 n = config.ctx.nodes 216 node_config = n._config 217 NodeNS = n._nstype 218 self.pub_nodes = NSResourceList('', node_config, pub_nodes, NodeNS) 219 self.sub_nodes = NSResourceList('', node_config, sub_nodes, NodeNS)
220
221 - def _update(self, pub_nodes, sub_nodes):
222 """ 223 In-place update of resource lists 224 """ 225 self._pub_nodes, _sub_nodes = pub_nodes, sub_nodes 226 self.pub_nodes._set_resources(pub_nodes) 227 self.sub_nodes._set_resources(sub_nodes)
228
229 - def __repr__(self):
230 return self.__str__()
231
232 - def __str__(self):
233 buff ="Topic [%s]\n"%(self.name) 234 if self._pub_nodes: 235 buff += "\nPublishers:\n" 236 buff += '\n'.join([" * %s"%(l) for l in self._pub_nodes]) + '\n' 237 if self._sub_nodes: 238 buff += "\nSubscribers:\n" 239 buff += '\n'.join([" * %s"%(l) for l in self._sub_nodes]) + '\n' 240 return buff
241
242 -def topic_info(config, name):
243 try: 244 state = config.ctx.master.getSystemState() 245 pubs, subs, _ = state 246 subs = [x for x in subs if x[0] == name] 247 if subs: 248 subs = subs[0][1] 249 pubs = [x for x in pubs if x[0] == name] 250 if pubs: 251 pubs = pubs[0][1] 252 except: 253 #TODO: log 254 return None 255 return TopicInfo(config, name, pubs, subs)
256
257 -class TopicNS(Namespace):
258 """ 259 L{TopicNS} provides lightweight accessors to a ROS topic and its data. 260 """ 261
262 - def __init__(self, ns, config):
263 """ 264 @param config: Namespace configuration instance. 265 @type config: L{NamespaceConfig} 266 """ 267 super(TopicNS,self).__init__(ns, config) 268 self._type_name = None 269 self._type = None 270 self._sub = None 271 self._pub = None 272 self._callbacks = [] 273 self._last_msg = None 274 self._buffer = None 275 self._bufferlen = None 276 self._buffer_lock = None 277 self._latch = True 278 self._mux = None 279 self._topic_info = None
280
281 - def _list(self):
282 # TODO: add getTopics() to master? This is not very 283 # efficient. At the very least, need to start caching in 284 # config.master. 285 pubs, subs, _ = self._config.master.getSystemState() 286 return set([x[0] for x in pubs if x[0].startswith(self._name)] + [x[0] for x in subs if x[0].startswith(self._name)])
287
288 - def _props(self):
289 return ['buffer', 'latch']
290
291 - def _set_latch(self, val):
292 if self._latch != val: 293 self._latch = val 294 # TODO: it would be good to abstract this in rospy better 295 if self._pub is not None: 296 self._pub.impl.is_latch = val
297
298 - def _set_buffer(self, val):
299 """ 300 Initialize the buffer of this subscriber instance to the 301 specified size. This call can have multiple side-effects: 302 303 * it will start a subscription to the topic 304 * if buffer is already initialized, this call will empty the buffer. 305 306 @param val: length of subscriber callback buffer 307 @type val: int 308 """ 309 self._bufferlen = val 310 if val is not None: 311 if self._buffer_lock is None: 312 self._buffer_lock = Lock() 313 314 # - unfortunately 'maxlen' is only supported in Python 2.6 315 if self._buffer is not None: 316 oldeque = self._buffer 317 self._buffer = deque([]) 318 oldeque.clear() 319 else: 320 self._buffer = deque([]) 321 self._init_sub()
322
323 - def _info(self):
324 if self._topic_info is None: 325 self._topic_info = topic_info(self._config, self._name) 326 return self._topic_info
327
328 - def _show(self):
329 """ 330 show() API. Visualize this topic/namespace. 331 """ 332 #TODO: make this pluggable so that visualizers can be attached to any type name 333 if self._type is None: 334 self._init_type() 335 # any type_name checks must occur before type is None checks 336 if self._type_name == 'nav_msgs/OccupancyGrid': 337 if self._type is not None: 338 show_occ_grid(self) 339 elif self._type is None: 340 # namespace 341 show_graph(self) 342 else: 343 show_rostopic(self) 344 return True
345
346 - def _cb(self, msg):
347 """Message subscription callback""" 348 self._last_msg = msg 349 if self._buffer is not None: 350 # have to lock due to potential to slice in separate thread 351 b = self._buffer 352 with self._buffer_lock: 353 while len(b) > self._bufferlen - 1: 354 b.popleft() 355 self._buffer.append(msg)
356
357 - def __repr__(self):
358 return self.__str__()
359
360 - def __str__(self):
361 if self._type_name is None: 362 self._init_type() 363 if self._type_name is None: 364 return self._ns 365 else: 366 return rosmsg.get_msg_text(self._type_name)
367
368 - def _get_type(self):
369 """ 370 rostype() API 371 """ 372 if self._type is None: 373 self._init_type() 374 return self._type
375
376 - def _set_type(self, t):
377 """ 378 rostype() API. Must be called with desired topic type. topic 379 type can be a string with the message type name, a 380 roslib.message.Message, or a Msg 381 """ 382 if type(t) == str: 383 t = roslib.message.get_message_class(t) 384 if t is None: 385 raise ValueError("cannot load message type: %s"%(t)) 386 else: 387 if not type(t) == type: 388 raise ValueError("invalid message type: %s"%(type(t))) 389 if not issubclass(t, roslib.message.Message): 390 raise ValueError("invalid message type: %s"%(t.__class__.__name__)) 391 392 # finish initalization 393 with self._config.lock: 394 if self._type is not None and t != self._type: 395 self._cleanup() 396 self._type = t 397 self._type_name = t._type
398
399 - def _init_type(self):
400 """ 401 Lazy-initialize type based on graph state. 402 """ 403 if self._ns == '/': 404 return 405 if self._type is None: 406 self._type_name = rostopic.get_topic_type(self._ns, blocking=False)[0] 407 if self._type_name: 408 self._type = roslib.message.get_message_class(self._type_name) 409 if self._type is None: 410 pkg, base_type = roslib.names.package_resource_name(self._type_name) 411 print >> sys.stderr, "\ncannot retrieve type [%s].\nPlease type 'rosmake %s'"%(self._type_name, pkg)
412
413 - def _init_sub(self, block=False):
414 """ 415 Lazy-init subscriber for topic 416 """ 417 # This initialization is fairly fragile as it depends on the 418 # type being introspectable. I'll need to think of a way to 419 # allow users to declare the type of a topic, but allow lazy 420 # behavior as well. 421 while self._sub is None: 422 with self._config.lock: 423 if self._sub is None: 424 #print "init_sub", self._ns 425 self._init_type() 426 if self._type is None: 427 if not block: 428 raise ROSHException("please init() this instance with the topic type") 429 else: 430 self._sub = rospy.Subscriber(self._ns, self._type, self._cb) 431 return 432 if not block: 433 return 434 time.sleep(0.1)
435
436 - def _cleanup_pub(self):
437 with self._config.lock: 438 if self._pub is not None: 439 self._pub.unregister() 440 self._pub = None
441
442 - def _cleanup_sub(self):
443 with self._config.lock: 444 if self._sub is not None: 445 self._sub.unregister() 446 self._sub = None
447
448 - def _cleanup(self):
449 with self._config.lock: 450 # TODO: this should probably check to see if our the impl 451 # is still present, and, if so, either tear down the impl 452 # or error about type inconsistency. 453 self._cleanup_pub() 454 self._cleanup_sub()
455
456 - def __delattr__(self):
457 self._cleanup()
458
459 - def _init_pub(self):
460 """ 461 Lazy-init publisher for topic 462 """ 463 # This initialization is fairly fragile as it depends on the 464 # type being introspectable. I'll need to think of a way to 465 # allow users to declare the type of a topic, but allow lazy 466 # behavior as well. 467 if self._pub is None: 468 with self._config.lock: 469 if self._pub is None: 470 #print "init_pub", self._ns 471 self._init_type() 472 if self._type: 473 self._pub = rospy.Publisher(self._ns, self._type, latch=self._latch) 474 else: 475 raise ROSHException("please init() this instance with the topic type")
476
477 - def _add_subscriber_callback(self, cb, cb_args=None):
478 """ 479 Add callback to be invoked when new messages arrive 480 """ 481 if self._sub is None: 482 self._init_sub() 483 self._sub.impl.add_callback(cb, cb_args)
484
485 - def _remove_subscriber_callback(self, cb, cb_args=None):
486 self._sub.impl.remove_callback(cb, cb_args)
487
488 - def __call__(self, *args, **kwds):
489 """ 490 Publish with a topic-style namespace object. Implements call 491 functionality for ROSNamespace objects when used with topics. 492 493 @param args: *args 494 @param kwds: **kwds 495 """ 496 # lazy-init ns_obj 497 if self._pub is None: 498 # lazy-init type if user calls with a message class arg 499 if len(args) == 1 and isinstance(args[0], roslib.message.Message): 500 self._type = args[0].__class__ 501 self._init_pub() 502 self._pub.publish(*args, **kwds)
503
504 - def _slice(self, slice_obj):
505 self._init_sub(block=True) 506 507 start, stop, step = slice_obj.start, slice_obj.stop, slice_obj.step 508 step = step or 1 509 if step < 1: 510 raise ValueError("step size must be positive") 511 512 if start is None or start >= 0: 513 if stop is not None and start > stop: 514 raise ValueError("stop index must be greater than end index") 515 516 cb = _TopicSliceCallback(self, start, stop, step, self._last_msg) 517 return cb 518 else: 519 520 # print to console 521 if self._buffer is None: 522 print >> sys.stderr, "This topic is not buffered. Use props(obj, buffer=N) to enable." 523 return [] 524 525 # we don't allow this because there are race conditions that are hard to protect against 526 if stop is not None and stop > 0: 527 raise IndexError("cannot slice from past to future") 528 529 # we interpret 0 to mean the last message, so, for the 530 # purposes of a slice, it's the end of the history buffer 531 if stop == 0: 532 stop = None 533 534 # we use python's interpretation of start being 535 # out-of-bounds, which is to just silently succeed 536 # - do this under lock as cb is happening in separate 537 # thread 538 with self._buffer_lock: 539 retval = list(self._buffer)[start:stop:step] 540 return retval
541
542 - def _idx(self, idx):
543 if idx > 0: 544 # if user asks for future message, block until topic exists 545 self._init_sub(block=True) 546 else: 547 self._init_sub(block=False) 548 549 if idx < 0: 550 if idx == LATEST: # -1 551 # per API review, return buffered message 552 return self._last_msg 553 else: 554 #TODO 555 #check history buffer 556 raise IndexError("[%s] outside of history buffer"%idx) 557 else: 558 cb = _TopicIdxCallback(idx) 559 with subscribe(self, cb): 560 while cb.msg is None: 561 time.sleep(0.01) # yield 562 return cb.msg
563
564 - def __setattr__(self, key, value):
565 if key.startswith('_'): 566 return object.__setattr__(self, key, value) 567 568 from rosh.impl.top_tools import Mux 569 #TODO: allow assignment to string topic spec 570 if not type(value) == TopicNS: 571 raise ValueError("value must be a topic") 572 573 output_ns = self.__getitem__(key) 574 if output_ns._mux is None: 575 # implement as mux so that it can be reconfigurable. With 576 # a relay we have to start/stop a process each time 577 # reassignment occurs. 578 m = Mux(self._config.ctx, [value], output_ns) 579 output_ns._mux = m 580 m.start() 581 return m 582 else: 583 output_ns._mux.switch(value) 584 return output_ns._mux
585
586 - def __getitem__(self, key):
587 """ 588 Dictionary-style accessor for services 589 """ 590 # This used to allow fetching fields directly, but I think 591 # that confuses the API too much as the user can't tell when 592 # they've bottomed out in a topic and the semantics of which 593 # message you're looking at are unclear. 594 if type(key) == slice: 595 return self._slice(key) 596 elif type(key) == int: 597 return self._idx(key) 598 # - get namespace 599 else: 600 return super(TopicNS, self).__getitem__(key)
601
602 -class Topics(Concept):
603
604 - def __init__(self, ctx, lock):
605 super(Topics, self).__init__(ctx, lock, TopicNS)
606
607 - def _show(self):
608 show_graph(self._root)
609
610 - def __setattr__(self, key, value):
611 if key.startswith('_'): 612 return object.__setattr__(self, key, value) 613 else: 614 return self._root.__setattr__(key, value)
615 616
617 -def show_graph(ns_obj):
618 #TODO: in ROS 1.3, rxgraph will be a node within rxgraph so we can use launch instead of run() 619 success = rosh.impl.proc.launch(ns_obj._config.ctx, 'rxgraph', 'rxgraph', args=['--topicns', ns_obj._ns]) 620 if success: 621 print "showing graph of topics in namespace %s"%ns_obj._ns 622 else: 623 print >> sys.stderr, "failed to launch rxgraph"
624
625 -def show_rostopic(ns_obj):
626 # rostopic echo in a separate window 627 ros_root = roslib.rosenv.get_ros_root() 628 success = rosh.impl.proc.run(ns_obj._config, ['xterm', '-e', os.path.join(ros_root, 'bin', 'rostopic'), 'echo', ns_obj._name], stdout=False) 629 # TODO: return proc object instead 630 if success: 631 print "running rostopic echo in a separate window" 632 else: 633 print >> sys.stderr, "unable to run rostopic echo in a separate window"
634