1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """Internal use: Topic-specific extensions for TCPROS support"""
36
37 import socket
38 import threading
39 import time
40
41 try:
42 from xmlrpc.client import ServerProxy
43 except ImportError:
44 from xmlrpclib import ServerProxy
45
46 from rospy.core import logwarn, logerr, logdebug, rospyerr
47 import rospy.exceptions
48 import rospy.names
49
50 import rospy.impl.registration
51 import rospy.impl.transport
52
53 from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, \
54 get_tcpros_server_address, start_tcpros_server,\
55 DEFAULT_BUFF_SIZE, TCPROS
56
58 """
59 Subscription transport implementation for receiving topic data via
60 peer-to-peer TCP/IP sockets
61 """
62
65 """
66 ctor.
67
68 @param resolved_name: resolved subscription name
69 @type resolved_name: str
70
71 @param recv_data_class: class to instantiate to receive
72 messages
73 @type recv_data_class: L{rospy.Message}
74
75 @param queue_size: maximum number of messages to
76 deserialize from newly read data off socket
77 @type queue_size: int
78
79 @param buff_size: recv buffer size
80 @type buff_size: int
81
82 @param tcp_nodelay: If True, request TCP_NODELAY from publisher
83 @type tcp_nodelay: bool
84 """
85 super(TCPROSSub, self).__init__(resolved_name, recv_data_class, queue_size, buff_size)
86 self.direction = rospy.impl.transport.INBOUND
87 self.tcp_nodelay = tcp_nodelay
88
90 """
91 @return: dictionary of subscriber fields
92 @rtype: dict
93 """
94 return {'topic': self.resolved_name,
95 'message_definition': self.recv_data_class._full_text,
96 'tcp_nodelay': '1' if self.tcp_nodelay else '0',
97 'md5sum': self.recv_data_class._md5sum,
98 'type': self.recv_data_class._type,
99 'callerid': rospy.names.get_caller_id()}
100
101
116
117
118
120 """
121 Publisher transport implementation for publishing topic data via
122 peer-to-peer TCP/IP sockets.
123 """
124
125 - def __init__(self, resolved_name, pub_data_class, is_latch=False, headers=None):
126 """
127 ctor.
128 @param resolved_name: resolved topic name
129 @type resolved_name: str
130 @param pub_data_class: class to instance to receive messages
131 @type pub_data_class: L{rospy.Message} class
132 @param is_latch: If True, Publisher is latching
133 @type is_latch: bool
134 """
135
136 super(TCPROSPub, self).__init__(resolved_name, None, queue_size=None, buff_size=128)
137 self.pub_data_class = pub_data_class
138 self.direction = rospy.impl.transport.OUTBOUND
139 self.is_latch = is_latch
140 self.headers = headers if headers else {}
141
143 base = {'topic': self.resolved_name,
144 'type': self.pub_data_class._type,
145 'latching': '1' if self.is_latch else '0',
146 'message_definition': self.pub_data_class._full_text,
147 'md5sum': self.pub_data_class._md5sum,
148 'callerid': rospy.names.get_caller_id() }
149
150
151
152
153 if self.headers:
154 base.update(self.headers)
155 return base
156
158 """
159 Keeps trying to create connection for subscriber. Then passes off to receive_loop once connected.
160 """
161
162
163
164
165 interval = 0.5
166 while conn.socket is None and not conn.done and not rospy.is_shutdown():
167 try:
168 conn.connect(dest_addr, dest_port, pub_uri, timeout=60.)
169 except rospy.exceptions.TransportInitError as e:
170
171
172 if conn.protocol is None:
173 conn.done = True
174 break
175 rospyerr("unable to create subscriber transport: %s. Will try again in %ss", e, interval)
176 interval = interval * 2
177 time.sleep(interval)
178
179
180 conn.done = not check_if_still_publisher(resolved_topic_name, pub_uri)
181
182 if not conn.done:
183 conn.receive_loop(receive_cb)
184
186 try:
187 s = ServerProxy(pub_uri)
188 code, msg, val = s.getPublications(rospy.names.get_name())
189 if code == 1:
190 return len([t for t in val if t[0] == resolved_topic_name]) > 0
191 else:
192 return False
193 except:
194 return False
195
197 """
198 ROS Protocol handler for TCPROS. Accepts both TCPROS topic
199 connections as well as ROS service connections over TCP. TCP server
200 socket is run once start_server() is called -- this is implicitly
201 called during init_publisher().
202 """
203
205 """ctor"""
206 self.tcp_nodelay_map = {}
207
209 """
210 @param resolved_name: resolved topic name
211 @type resolved_name: str
212
213 @param tcp_nodelay: If True, sets TCP_NODELAY on publisher's
214 socket (disables Nagle algorithm). This results in lower
215 latency publishing at the cost of efficiency.
216 @type tcp_nodelay: bool
217 """
218 self.tcp_nodelay_map[resolved_name] = tcp_nodelay
219
221 """
222 stops the TCP/IP server responsible for receiving inbound connections
223 """
224 pass
225
227 """
228 Connect to topic resolved_name on Publisher pub_uri using TCPROS.
229 @param resolved_name str: resolved topic name
230 @type resolved_name: str
231 @param pub_uri: XML-RPC URI of publisher
232 @type pub_uri: str
233 @param protocol_params: protocol parameters to use for connecting
234 @type protocol_params: [XmlRpcLegal]
235 @return: code, message, debug
236 @rtype: (int, str, int)
237 """
238
239
240 if type(protocol_params) != list or len(protocol_params) != 3:
241 return 0, "ERROR: invalid TCPROS parameters", 0
242 if protocol_params[0] != TCPROS:
243 return 0, "INTERNAL ERROR: protocol id is not TCPROS: %s"%id, 0
244 id, dest_addr, dest_port = protocol_params
245
246 sub = rospy.impl.registration.get_topic_manager().get_subscriber_impl(resolved_name)
247
248
249 protocol = TCPROSSub(resolved_name, sub.data_class, \
250 queue_size=sub.queue_size, buff_size=sub.buff_size,
251 tcp_nodelay=sub.tcp_nodelay)
252 conn = TCPROSTransport(protocol, resolved_name)
253 conn.set_endpoint_id(pub_uri);
254
255 t = threading.Thread(name=resolved_name, target=robust_connect_subscriber, args=(conn, dest_addr, dest_port, pub_uri, sub.receive_callback,resolved_name))
256
257
258 t.start()
259
260
261 if sub.add_connection(conn):
262 return 1, "Connected topic[%s]. Transport impl[%s]"%(resolved_name, conn.__class__.__name__), dest_port
263 else:
264 conn.close()
265 return 0, "ERROR: Race condition failure: duplicate topic subscriber [%s] was created"%(resolved_name), 0
266
268 """
269 @param protocol: name of protocol
270 @type protocol: str
271 @return: True if protocol is supported
272 @rtype: bool
273 """
274 return protocol == TCPROS
275
277 """
278 Get supported protocols
279 """
280 return [[TCPROS]]
281
283 """
284 Initialize this node to receive an inbound TCP connection,
285 i.e. startup a TCP server if one is not already running.
286
287 @param resolved_name: topic name
288 @type resolved__name: str
289
290 @param protocol: negotiated protocol
291 parameters. protocol[0] must be the string 'TCPROS'
292 @type protocol: [str, value*]
293 @return: (code, msg, [TCPROS, addr, port])
294 @rtype: (int, str, list)
295 """
296 if protocol[0] != TCPROS:
297 return 0, "Internal error: protocol does not match TCPROS: %s"%protocol, []
298 start_tcpros_server()
299 addr, port = get_tcpros_server_address()
300 return 1, "ready on %s:%s"%(addr, port), [TCPROS, addr, port]
301
303 """
304 Process incoming topic connection. Reads in topic name from
305 handshake and creates the appropriate L{TCPROSPub} handler for the
306 connection.
307 @param sock: socket connection
308 @type sock: socket.socket
309 @param client_addr: client address
310 @type client_addr: (str, int)
311 @param header: key/value pairs from handshake header
312 @type header: dict
313 @return: error string or None
314 @rtype: str
315 """
316 if rospy.core.is_shutdown_requested():
317 return "Node is shutting down"
318 for required in ['topic', 'md5sum', 'callerid']:
319 if not required in header:
320 return "Missing required '%s' field"%required
321 else:
322 resolved_topic_name = header['topic']
323 md5sum = header['md5sum']
324 tm = rospy.impl.registration.get_topic_manager()
325 topic = tm.get_publisher_impl(resolved_topic_name)
326 if not topic:
327 return "[%s] is not a publisher of [%s]. Topics are %s"%(rospy.names.get_caller_id(), resolved_topic_name, tm.get_publications())
328 elif not topic.data_class or topic.closed:
329 return "Internal error processing topic [%s]"%(resolved_topic_name)
330 elif md5sum != rospy.names.TOPIC_ANYTYPE and md5sum != topic.data_class._md5sum:
331 data_class = topic.data_class
332 actual_type = data_class._type
333
334
335
336 if 'type' in header:
337 requested_type = header['type']
338 if requested_type != actual_type:
339 return "topic types do not match: [%s] vs. [%s]"%(requested_type, actual_type)
340 else:
341
342 requested_type = actual_type
343
344 return "Client [%s] wants topic [%s] to have datatype/md5sum [%s/%s], but our version has [%s/%s] Dropping connection."%(header['callerid'], resolved_topic_name, requested_type, md5sum, actual_type, data_class._md5sum)
345
346 else:
347
348
349
350 if 'tcp_nodelay' in header:
351 tcp_nodelay = True if header['tcp_nodelay'].strip() == '1' else False
352 else:
353 tcp_nodelay = self.tcp_nodelay_map.get(resolved_topic_name, False)
354
355 _configure_pub_socket(sock, tcp_nodelay)
356 protocol = TCPROSPub(resolved_topic_name, topic.data_class, is_latch=topic.is_latch, headers=topic.headers)
357 transport = TCPROSTransport(protocol, resolved_topic_name)
358 transport.set_socket(sock, header['callerid'])
359 transport.write_header()
360 topic.add_connection(transport)
361
362
364 """
365 It wraps a Transport instance and behaves like one
366 but it queues the data written to it and relays them
367 asynchronously to the wrapped instance.
368 """
369
370 - def __init__(self, connection, queue_size):
371 """
372 ctor.
373 @param connection: the wrapped transport instance
374 @type connection: Transport
375 @param queue_size: the maximum size of the queue, zero means infinite
376 @type queue_size: int
377 """
378 super(QueuedConnection, self).__init__()
379 self._connection = connection
380 self._queue_size = queue_size
381
382 self._lock = threading.Lock()
383 self._cond_data_available = threading.Condition(self._lock)
384 self._queue = []
385 self._error = None
386
387 self._thread = threading.Thread(target=self._run)
388 self._thread.start()
389
391 if name.startswith('__'):
392 raise AttributeError(name)
393 return getattr(self._connection, name)
394
396 with self._lock:
397
398 if self._error:
399 error = self._error
400 self._error = None
401 raise error
402
403 if self._queue_size > 0 and len(self._queue) == self._queue_size:
404 del self._queue[0]
405 self._queue.append(data)
406 self._cond_data_available.notify()
407
408 time.sleep(0)
409 return True
410
412 while not self._connection.done:
413 queue = []
414 with self._lock:
415
416 while not self._queue and not self._connection.done:
417 self._cond_data_available.wait(1.0)
418
419 if self._queue:
420 queue = self._queue
421 self._queue = []
422
423 for data in queue:
424 try:
425 self._connection.write_data(data)
426 except Exception as e:
427 with self._lock:
428 self._error = e
429