Package rosh :: Package impl :: Module top_tools
[frames] | no frames]

Source Code for Module rosh.impl.top_tools

  1  #!/usr/bin/env python 
  2  # Software License Agreement (BSD License) 
  3  # 
  4  # Copyright (c) 2010, Willow Garage, Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions 
  9  # are met: 
 10  # 
 11  #  * Redistributions of source code must retain the above copyright 
 12  #    notice, this list of conditions and the following disclaimer. 
 13  #  * Redistributions in binary form must reproduce the above 
 14  #    copyright notice, this list of conditions and the following 
 15  #    disclaimer in the documentation and/or other materials provided 
 16  #    with the distribution. 
 17  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 18  #    contributors may be used to endorse or promote products derived 
 19  #    from this software without specific prior written permission. 
 20  # 
 21  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 22  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 23  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 24  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 25  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 26  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 27  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 28  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 29  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 30  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 31  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 32  # POSSIBILITY OF SUCH DAMAGE. 
 33  # 
 34  # Revision $Id: top_tools.py 11681 2010-10-22 07:08:51Z kwc $ 
 35   
 36  from rosh.impl.exceptions import ROSHException 
 37  import rosh.impl.proc 
 38   
 39  #TODO (long term): add remote launch capability, as that's where topic 
 40  #tools are most useful. 
 41   
class TopicTool(object):
    """
    Abstraction of general topic tool API.

    Holds the input/output topic objects and the node name for one
    topic_tools node, and manages the launched process. Subclasses
    build the command-line arguments and call L{_start}.
    """

    def __init__(self, ctx, type_, ns_obj_input, ns_obj_output, name=None):
        """
        @param ctx: context object handed to rosh.impl.proc.launch_node
        @param type_: topic_tools node type to launch (e.g. 'relay')
        @type  type_: str
        @param ns_obj_input: input topic; must expose a C{_name} attribute
        @param ns_obj_output: output topic; must expose a C{_name} attribute
        @param name: (optional) node name. Defaults to
            '/<type_><output topic name>'.
        @type  name: str
        @raise ValueError: if input and output topic have the same name
        """
        if ns_obj_input._name == ns_obj_output._name:
            # Report the tool type that was requested (previously the
            # builtin ``type`` was formatted into the message by mistake).
            raise ValueError("cannot %s to same topic" % (type_,))

        self._ctx = ctx
        self.type = type_
        self.topic_input = ns_obj_input
        self.topic_output = ns_obj_output
        if name is None:
            self.name = '/%s%s' % (self.type, self.topic_output._name)
        else:
            self.name = name

        # Both stay None until _start() launches the node.
        self._proc = self._node = None

    def _start(self, args):
        """
        Launch the topic_tools node of type C{self.type}.

        @param args: command-line arguments for the node
        @type  args: [str]
        @raise ROSHException: if the tool has already been started
        """
        if self._proc is not None:
            raise ROSHException("Already started")
        self._node, self._proc = \
            rosh.impl.proc.launch_node(self._ctx, 'topic_tools', self.type, args=args)

    def kill(self):
        """
        Stop the running tool, if any.
        """
        # kill() API require _kill()
        self._kill()

    def _kill(self):
        """
        Kill the launched process and forget the process/node handles.
        Writes a note to stderr if nothing is running.
        """
        if self._proc is None:
            # Imported locally: this module has no top-level ``import sys``.
            import sys
            sys.stderr.write("nothing to kill\n")
            return

        self._proc._kill()
        self._proc = None
        self._node = None
class Mux(TopicTool):
    """
    Interface to topic_tools/mux
    """

    def __init__(self, ctx, ns_obj_inputs, ns_obj_output, name=None):
        """
        @param ctx: context object handed to the node launcher
        @param ns_obj_inputs: non-empty list/tuple of input topics; the
            first entry is recorded as the current input
        @param ns_obj_output: output topic
        @param name: (optional) node name
        @type  name: str
        @raise ValueError: if ns_obj_inputs is empty or not a list/tuple
        """
        # isinstance (rather than an exact type() match) also accepts
        # list/tuple subclasses such as namedtuples.
        if not ns_obj_inputs or not isinstance(ns_obj_inputs, (list, tuple)):
            raise ValueError("ns_obj_inputs must be a non-empty list of input topics")
        TopicTool.__init__(self, ctx, 'mux', ns_obj_inputs[0], ns_obj_output, name=name)
        self.initial_topic_inputs = ns_obj_inputs
        # Names of every input topic this mux has been told about so far.
        self._history = [x._name for x in ns_obj_inputs]

    def start(self):
        """
        Launch the mux node with the output topic followed by all
        initial input topics, remapping 'mux' and the node name to
        C{self.name}.
        """
        args = [self.topic_output._name] + [x._name for x in self.initial_topic_inputs] + \
            ["mux:=%s" % self.name, "__name:=%s" % self.name]
        self._start(args)

    def __call__(self, input_ns):
        """
        Shorthand for L{switch}.
        """
        return self.switch(input_ns)

    def switch(self, input_ns):
        """
        Select C{input_ns} as the mux input.

        If the topic name has not been seen before, it is first added
        through the C{add} service under C{self.name}; the C{select}
        service is then called.

        @param input_ns: input topic; must expose a C{_name} attribute
        @return: result of the C{select} service call
        """
        if input_ns._name not in self._history:
            self._ctx.services[self.name].add(input_ns._name)
            self._history.append(input_ns._name)
        res = self._ctx.services[self.name].select(input_ns._name)
        self.topic_input = input_ns
        return res
108
class Relay(TopicTool):
    """
    Interface to topic_tools/relay
    """

    def __init__(self, ctx, ns_obj_input, ns_obj_output, name=None, unreliable=False):
        """
        @param ctx: context object handed to the node launcher
        @param ns_obj_input: input topic
        @param ns_obj_output: output topic
        @param name: (optional) node name
        @type  name: str
        @param unreliable: value forwarded to the node as its
            C{_unreliable} private parameter
        @type  unreliable: bool
        """
        super(Relay, self).__init__(ctx, 'relay', ns_obj_input, ns_obj_output, name=name)
        self.unreliable = unreliable

    def start(self):
        """
        Launch the relay node from input topic to output topic.
        """
        if self.unreliable:
            unreliable_flag = 'true'
        else:
            unreliable_flag = 'false'
        launch_args = [
            self.topic_input._name,
            self.topic_output._name,
            "__name:=%s" % self.name,
            "_unreliable:=%s" % unreliable_flag,
        ]
        self._start(launch_args)
# Throttle modes. The selected value is passed verbatim as the first
# argument of the launched throttle node.
THROTTLE_MODE_MESSAGES = 'messages'
THROTTLE_MODE_BYTES = 'bytes'


class Throttle(TopicTool):
    """
    Interface to topic_tools/throttle
    """

    def __init__(self, ctx, mode, ns_obj_input, ns_obj_output, rate, name=None, window=None):
        """
        @param ctx: context object handed to the node launcher
        @param mode: THROTTLE_MODE_MESSAGES or THROTTLE_MODE_BYTES
        @type  mode: str
        @param ns_obj_input: input topic
        @param ns_obj_output: output topic
        @param rate: rate limit, passed to the node as a string
        @param name: (optional) node name
        @type  name: str
        @param window: required in bytes mode, unused in messages mode
        @raise ValueError: if mode is unknown, or if mode is bytes and
            no window was given
        """
        if mode == THROTTLE_MODE_BYTES:
            if window is None:
                raise ValueError("window must be specified")
        elif mode != THROTTLE_MODE_MESSAGES:
            raise ValueError("invalid throttle mode [%s]" % mode)

        super(Throttle, self).__init__(ctx, 'throttle', ns_obj_input, ns_obj_output, name=name)
        self.mode = mode
        self.rate = rate
        self.window = window

    def start(self):
        """
        Launch the throttle node. Arguments are: mode, input topic,
        rate, window (bytes mode only), output topic, node name remap.
        """
        launch_args = [self.mode, self.topic_input._name, str(self.rate)]
        if self.mode == THROTTLE_MODE_BYTES:
            launch_args.append(str(self.window))
        launch_args.extend([self.topic_output._name, "__name:=%s" % self.name])
        self._start(launch_args)
152 153 ################################################################################ 154 # Factories: these functions create function handles with the context 155 # already initialized so that users don't have to retrieve the plugin 156 # context. 157
def mux_factory(ctx):
    """
    Build the user-facing C{mux} function bound to C{ctx}.

    @param ctx: context object passed to every Mux that is created
    @return: function that creates and starts a Mux
    """
    def mux(intopic_objs, outtopic_obj, name=None):
        """
        Create and start a Mux. The first of the input topics is the
        one initially selected.

        @param intopic_objs: list of input topics
        @type  intopic_objs: [TopicNS]
        @param outtopic_obj: output topic
        @type  outtopic_obj: TopicNS
        @param name: (optional) node name
        @type  name: str
        @return: running Mux
        @rtype: Mux
        """
        running = Mux(ctx, intopic_objs, outtopic_obj, name=name)
        running.start()
        return running

    return mux
def throttle_factory(ctx):
    """
    Build the user-facing C{throttle} function bound to C{ctx}.

    @param ctx: context object passed to every Throttle that is created
    @return: function that creates and starts a message-rate Throttle
    """
    def throttle(intopic_obj, outtopic_obj, msgs_per_sec, name=None):
        """
        Create and start a throttled topic limited by message rate.

        @param intopic_obj: input topic
        @type  intopic_obj: TopicNS
        @param outtopic_obj: output topic
        @type  outtopic_obj: TopicNS
        @param msgs_per_sec: messages per second to let through
        @type  msgs_per_sec: int
        @param name: (optional) node name
        @type  name: str

        @return: running Throttle
        @rtype: Throttle
        """
        running = Throttle(ctx, THROTTLE_MODE_MESSAGES, intopic_obj,
                           outtopic_obj, msgs_per_sec, name=name)
        running.start()
        return running

    return throttle
def throttlebw_factory(ctx):
    """
    Build the user-facing C{throttlebw} function bound to C{ctx}.

    @param ctx: context object passed to every Throttle that is created
    @return: function that creates and starts a bandwidth Throttle
    """
    def throttlebw(intopic_obj, outtopic_obj, bytes_per_sec, window=1.0, name=None):
        """
        Create and start a throttled topic limited by maximum bandwidth.

        @param intopic_obj: input topic
        @type  intopic_obj: TopicNS
        @param outtopic_obj: output topic
        @type  outtopic_obj: TopicNS
        @param bytes_per_sec: maximum bytes per second to let through
        @type  bytes_per_sec: int
        @param window: (optional) window over which to sample
            bandwidth, default 1.0
        @type  window: float
        @param name: (optional) node name
        @type  name: str

        @return: running Throttle
        @rtype: Throttle
        """
        running = Throttle(ctx, THROTTLE_MODE_BYTES, intopic_obj, outtopic_obj,
                           bytes_per_sec, name=name, window=window)
        running.start()
        return running

    return throttlebw
def relay_factory(ctx):
    """
    Build the user-facing C{relay} function bound to C{ctx}.

    @param ctx: context object passed to every Relay that is created
    @return: function that creates and starts a Relay
    """
    def relay(intopic_obj, outtopic_obj, unreliable=False, name=None):
        """
        Create and start a relay from the input topic to the output
        topic.

        @param intopic_obj: input topic
        @type  intopic_obj: TopicNS
        @param outtopic_obj: output topic
        @type  outtopic_obj: TopicNS
        @param unreliable: if True, uses unreliable (UDP)
            transport. default False
        @type  unreliable: bool
        @param name: (optional) node name
        @type  name: str

        @return: running Relay
        @rtype: Relay
        """
        running = Relay(ctx, intopic_obj, outtopic_obj,
                        name=name, unreliable=unreliable)
        running.start()
        return running

    return relay
def udprelay_factory(ctx):
    """
    Build the user-facing C{udprelay} function bound to C{ctx}.

    @param ctx: context object passed to every Relay that is created
    @return: function that creates and starts an unreliable Relay
    """
    def udprelay(intopic_obj, outtopic_obj, name=None):
        """
        Create a relay with unreliable transport. This is useful where
        latency is more important than receiving all packets,
        e.g. operating over a wireless network.

        @param intopic_obj: input topic
        @type  intopic_obj: TopicNS
        @param outtopic_obj: output topic
        @type  outtopic_obj: TopicNS
        @param name: (optional) node name
        @type  name: str

        @return: running Relay
        @rtype: Relay
        """
        # unreliable must be True here; passing False made udprelay
        # behave exactly like a plain (reliable) relay.
        r = Relay(ctx, intopic_obj, outtopic_obj, name=name, unreliable=True)
        r.start()
        return r

    return udprelay
def topic_tools_symbols(ctx):
    """
    Generate API for topic tools based on ctx instance.

    Each factory is called once with C{ctx}; the resulting function is
    stored under its user-facing symbol name.

    @param ctx: context object bound into every generated function
    @return: dictionary of symbols for topic tools user-facing API
    """
    symbol_factories = (
        ('relay', relay_factory),
        ('udprelay', udprelay_factory),
        ('mux', mux_factory),
        ('throttle', throttle_factory),
        ('throttlebw', throttlebw_factory),
    )
    return dict((symbol, make(ctx)) for symbol, make in symbol_factories)
273