1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 from rosh.impl.exceptions import ROSHException
37 import rosh.impl.proc
38
39
40
41
80
81 -class Mux(TopicTool):
82 """
83 Interface to topic_tools/mux
84 """
85
86 - def __init__(self, ctx, ns_obj_inputs, ns_obj_output, name=None):
87 if not ns_obj_inputs or not type(ns_obj_inputs) in (list, tuple):
88 raise ValueError("ns_obj_inputs must be a non-empty list of input topics")
89 TopicTool.__init__(self, ctx, 'mux', ns_obj_inputs[0], ns_obj_output, name=name)
90 self.initial_topic_inputs = ns_obj_inputs
91 self._history = [x._name for x in ns_obj_inputs]
92
94 args = [self.topic_output._name] + [x._name for x in self.initial_topic_inputs] + \
95 ["mux:=%s"%self.name, "__name:=%s"%self.name]
96 self._start(args)
97
99 return self.switch(input_ns)
100
102 if input_ns._name not in self._history:
103 self._ctx.services[self.name].add(input_ns._name)
104 self._history.append(input_ns._name)
105 res = self._ctx.services[self.name].select(input_ns._name)
106 self.topic_input = input_ns
107 return res
108
110 """
111 Interface to topic_tools/relay
112 """
113
114 - def __init__(self, ctx, ns_obj_input, ns_obj_output, name=None, unreliable=False):
115 TopicTool.__init__(self, ctx, 'relay', ns_obj_input, ns_obj_output, name=name)
116 self.unreliable = unreliable
117
119 param = 'true' if self.unreliable else 'false'
120 args = [self.topic_input._name, self.topic_output._name,
121 "__name:=%s"%self.name, "_unreliable:=%s"%param]
122 self._start(args)
123
124 THROTTLE_MODE_MESSAGES = 'messages'
125 THROTTLE_MODE_BYTES = 'bytes'
127 """
128 Interface to topic_tools/relay
129 """
130
131 - def __init__(self, ctx, mode, ns_obj_input, ns_obj_output, rate, name=None, window=None):
132 if mode == THROTTLE_MODE_MESSAGES:
133 pass
134 elif mode == THROTTLE_MODE_BYTES:
135 if window is None:
136 raise ValueError("window must be specified")
137 else:
138 raise ValueError("invalid throttle mode [%s]"%(mode))
139
140 TopicTool.__init__(self, ctx, 'throttle', ns_obj_input, ns_obj_output, name=name)
141 self.mode = mode
142 self.rate = rate
143 self.window = window
144
146 params = [str(self.rate)]
147 if self.mode == THROTTLE_MODE_BYTES:
148 params.append(str(self.window))
149
150 args = [self.mode, self.topic_input._name] + params + [self.topic_output._name, "__name:=%s"%self.name]
151 self._start(args)
152
153
154
155
156
157
159 def mux(intopic_objs, outtopic_obj, name=None):
160 """
161 Create a Mux. The mux will start off selected to the first of the in topics.
162
163 @param intopic_objs: list of input topics
164 @type intopic_objs: [TopicNS]
165 @param outtopic_obj: output topic
166 @type outtopic_obj: [TopicNS]
167 @return: running Mux
168 @rtype: Mux
169 """
170 m = Mux(ctx, intopic_objs, outtopic_obj, name=name)
171 m.start()
172 return m
173 return mux
174
176 def throttle(intopic_obj, outtopic_obj, msgs_per_sec, name=None):
177 """
178 Create throttled topic controlled by message rate.
179
180 @param intopic_obj: input topic
181 @type intopic_obj: [TopicNS]
182 @param outtopic_obj: output topic
183 @type outtopic_obj: [TopicNS]
184 @param msgs_per_sec: Messages per second to let through
185 @type msgs_per_sec: int
186
187 @return: running Throttle
188 @rtype: Throttle
189 """
190 t = Throttle(ctx, THROTTLE_MODE_MESSAGES, intopic_obj, outtopic_obj, msgs_per_sec, name=name)
191 t.start()
192 return t
193 return throttle
194
196 def throttlebw(intopic_obj, outtopic_obj, bytes_per_sec, window=1.0, name=None):
197 """
198 Create throttled topic controlled by max bandwidth.
199
200 @param intopic_obj: input topic
201 @type intopic_obj: [TopicNS]
202 @param outtopic_obj: output topic
203 @type outtopic_obj: [TopicNS]
204 @param bytes_per_sec: Maximum bytes per second to let through
205 @type bytes_per_sec: int
206 @param window: (optional) window over which to sample bandwidth, default 1.0.
207 @type window: float
208
209 @return: running Throttle
210 @rtype: Throttle
211 """
212 t = Throttle(ctx, THROTTLE_MODE_BYTES, intopic_obj, outtopic_obj, bytes_per_sec, name=name, window=window)
213 t.start()
214 return t
215 return throttlebw
216
218 def relay(intopic_obj, outtopic_obj, unreliable=False, name=None):
219 """
220 Create a relay. This is useful where
221 latency is more important that receiving all packets,
222 e.g. operating over a wireless network.
223
224 @param intopic_obj: input topic
225 @type intopic_obj: [TopicNS]
226 @param outtopic_obj: output topic
227 @type outtopic_obj: [TopicNS]
228 @param unreliable: if True, uses unreliable (UDP)
229 transport. default False
230 @type unreliable: bool
231
232 @return: running Relay
233 @rtype: Relay
234 """
235 r = Relay(ctx, intopic_obj, outtopic_obj, name=name, unreliable=unreliable)
236 r.start()
237 return r
238 return relay
239
241 def udprelay(intopic_obj, outtopic_obj, name=None):
242 """
243 Create a relay with unreliable transport. This is useful where
244 latency is more important that receiving all packets,
245 e.g. operating over a wireless network.
246
247 @param intopic_obj: input topic
248 @type intopic_obj: [TopicNS]
249 @param outtopic_obj: output topic
250 @type outtopic_obj: [TopicNS]
251
252 @return: running Relay
253 @rtype: Relay
254 """
255 r = Relay(ctx, intopic_obj, outtopic_obj, name=name, unreliable=False)
256 r.start()
257 return r
258 return udprelay
259
273