1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """Internal use: Topic-specific extensions for TCPROS support"""
36
37 import socket
38 import threading
39 import time
40
41 try:
42 from xmlrpc.client import ServerProxy
43 except ImportError:
44 from xmlrpclib import ServerProxy
45
46 from rospy.core import logwarn, logerr, logdebug, rospyerr
47 import rospy.exceptions
48 import rospy.names
49
50 import rospy.impl.registration
51 import rospy.impl.transport
52
53 from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, \
54 get_tcpros_server_address, start_tcpros_server,\
55 DEFAULT_BUFF_SIZE, TCPROS
56
58 """
59 Subscription transport implementation for receiving topic data via
60 peer-to-peer TCP/IP sockets
61 """
62
65 """
66 ctor.
67
68 @param resolved_name: resolved subscription name
69 @type resolved_name: str
70
71 @param recv_data_class: class to instantiate to receive
72 messages
73 @type recv_data_class: L{rospy.Message}
74
75 @param queue_size: maximum number of messages to
76 deserialize from newly read data off socket
77 @type queue_size: int
78
79 @param buff_size: recv buffer size
80 @type buff_size: int
81
82 @param tcp_nodelay: If True, request TCP_NODELAY from publisher
83 @type tcp_nodelay: bool
84 """
85 super(TCPROSSub, self).__init__(resolved_name, recv_data_class, queue_size, buff_size)
86 self.direction = rospy.impl.transport.INBOUND
87 self.tcp_nodelay = tcp_nodelay
88
90 """
91 @return: dictionary of subscriber fields
92 @rtype: dict
93 """
94 return {'topic': self.resolved_name,
95 'message_definition': self.recv_data_class._full_text,
96 'tcp_nodelay': '1' if self.tcp_nodelay else '0',
97 'md5sum': self.recv_data_class._md5sum,
98 'type': self.recv_data_class._type,
99 'callerid': rospy.names.get_caller_id()}
100
101
116
117
118
120 """
121 Publisher transport implementation for publishing topic data via
122 peer-to-peer TCP/IP sockets.
123 """
124
125 - def __init__(self, resolved_name, pub_data_class, is_latch=False, headers=None):
126 """
127 ctor.
128 @param resolved_name: resolved topic name
129 @type resolved_name: str
130 @param pub_data_class: class to instance to receive messages
131 @type pub_data_class: L{rospy.Message} class
132 @param is_latch: If True, Publisher is latching
133 @type is_latch: bool
134 """
135
136 super(TCPROSPub, self).__init__(resolved_name, None, queue_size=None, buff_size=128)
137 self.pub_data_class = pub_data_class
138 self.direction = rospy.impl.transport.OUTBOUND
139 self.is_latch = is_latch
140 self.headers = headers if headers else {}
141
143 base = {'topic': self.resolved_name,
144 'type': self.pub_data_class._type,
145 'latching': '1' if self.is_latch else '0',
146 'message_definition': self.pub_data_class._full_text,
147 'md5sum': self.pub_data_class._md5sum,
148 'callerid': rospy.names.get_caller_id() }
149
150
151
152
153 if self.headers:
154 base.update(self.headers)
155 return base
156
158 """
159 Keeps trying to create connection for subscriber. Then passes off to receive_loop once connected.
160 """
161
162
163
164
165 interval = 0.5
166 while conn.socket is None and not conn.done and not rospy.is_shutdown():
167 try:
168 conn.connect(dest_addr, dest_port, pub_uri, timeout=60.)
169 except rospy.exceptions.TransportInitError as e:
170
171
172 if conn.protocol is None:
173 conn.done = True
174 break
175 rospyerr("unable to create subscriber transport: %s. Will try again in %ss", e, interval)
176 if interval < 30.0:
177
178 interval = interval * 2
179 time.sleep(interval)
180
181
182 conn.done = not check_if_still_publisher(resolved_topic_name, pub_uri)
183
184 if not conn.done:
185 conn.receive_loop(receive_cb)
186
188 try:
189 s = ServerProxy(pub_uri)
190 code, msg, val = s.getPublications(rospy.names.get_name())
191 if code == 1:
192 return len([t for t in val if t[0] == resolved_topic_name]) > 0
193 else:
194 return False
195 except:
196 return False
197
199 """
200 ROS Protocol handler for TCPROS. Accepts both TCPROS topic
201 connections as well as ROS service connections over TCP. TCP server
202 socket is run once start_server() is called -- this is implicitly
203 called during init_publisher().
204 """
205
207 """ctor"""
208 self.tcp_nodelay_map = {}
209
211 """
212 @param resolved_name: resolved topic name
213 @type resolved_name: str
214
215 @param tcp_nodelay: If True, sets TCP_NODELAY on publisher's
216 socket (disables Nagle algorithm). This results in lower
217 latency publishing at the cost of efficiency.
218 @type tcp_nodelay: bool
219 """
220 self.tcp_nodelay_map[resolved_name] = tcp_nodelay
221
223 """
224 stops the TCP/IP server responsible for receiving inbound connections
225 """
226 pass
227
229 """
230 Connect to topic resolved_name on Publisher pub_uri using TCPROS.
231 @param resolved_name str: resolved topic name
232 @type resolved_name: str
233 @param pub_uri: XML-RPC URI of publisher
234 @type pub_uri: str
235 @param protocol_params: protocol parameters to use for connecting
236 @type protocol_params: [XmlRpcLegal]
237 @return: code, message, debug
238 @rtype: (int, str, int)
239 """
240
241
242 if type(protocol_params) != list or len(protocol_params) != 3:
243 return 0, "ERROR: invalid TCPROS parameters", 0
244 if protocol_params[0] != TCPROS:
245 return 0, "INTERNAL ERROR: protocol id is not TCPROS: %s"%protocol_params[0], 0
246 id, dest_addr, dest_port = protocol_params
247
248 sub = rospy.impl.registration.get_topic_manager().get_subscriber_impl(resolved_name)
249
250
251 protocol = TCPROSSub(resolved_name, sub.data_class, \
252 queue_size=sub.queue_size, buff_size=sub.buff_size,
253 tcp_nodelay=sub.tcp_nodelay)
254 conn = TCPROSTransport(protocol, resolved_name)
255 conn.set_endpoint_id(pub_uri);
256
257 t = threading.Thread(name=resolved_name, target=robust_connect_subscriber, args=(conn, dest_addr, dest_port, pub_uri, sub.receive_callback,resolved_name))
258
259
260
261
262 if sub.add_connection(conn):
263
264
265
266 t.start()
267 return 1, "Connected topic[%s]. Transport impl[%s]"%(resolved_name, conn.__class__.__name__), dest_port
268 else:
269
270 conn.close()
271 return 0, "ERROR: Race condition failure creating topic subscriber [%s]"%(resolved_name), 0
272
274 """
275 @param protocol: name of protocol
276 @type protocol: str
277 @return: True if protocol is supported
278 @rtype: bool
279 """
280 return protocol == TCPROS
281
283 """
284 Get supported protocols
285 """
286 return [[TCPROS]]
287
289 """
290 Initialize this node to receive an inbound TCP connection,
291 i.e. startup a TCP server if one is not already running.
292
293 @param resolved_name: topic name
294 @type resolved__name: str
295
296 @param protocol: negotiated protocol
297 parameters. protocol[0] must be the string 'TCPROS'
298 @type protocol: [str, value*]
299 @return: (code, msg, [TCPROS, addr, port])
300 @rtype: (int, str, list)
301 """
302 if protocol[0] != TCPROS:
303 return 0, "Internal error: protocol does not match TCPROS: %s"%protocol, []
304 start_tcpros_server()
305 addr, port = get_tcpros_server_address()
306 return 1, "ready on %s:%s"%(addr, port), [TCPROS, addr, port]
307
309 """
310 Process incoming topic connection. Reads in topic name from
311 handshake and creates the appropriate L{TCPROSPub} handler for the
312 connection.
313 @param sock: socket connection
314 @type sock: socket.socket
315 @param client_addr: client address
316 @type client_addr: (str, int)
317 @param header: key/value pairs from handshake header
318 @type header: dict
319 @return: error string or None
320 @rtype: str
321 """
322 if rospy.core.is_shutdown_requested():
323 return "Node is shutting down"
324 for required in ['topic', 'md5sum', 'callerid']:
325 if not required in header:
326 return "Missing required '%s' field"%required
327 else:
328 resolved_topic_name = header['topic']
329 md5sum = header['md5sum']
330 tm = rospy.impl.registration.get_topic_manager()
331 topic = tm.get_publisher_impl(resolved_topic_name)
332 if not topic:
333 return "[%s] is not a publisher of [%s]. Topics are %s"%(rospy.names.get_caller_id(), resolved_topic_name, tm.get_publications())
334 elif not topic.data_class or topic.closed:
335 return "Internal error processing topic [%s]"%(resolved_topic_name)
336 elif md5sum != rospy.names.TOPIC_ANYTYPE and md5sum != topic.data_class._md5sum:
337 data_class = topic.data_class
338 actual_type = data_class._type
339
340
341
342 if 'type' in header:
343 requested_type = header['type']
344 if requested_type != actual_type:
345 return "topic types do not match: [%s] vs. [%s]"%(requested_type, actual_type)
346 else:
347
348 requested_type = actual_type
349
350 return "Client [%s] wants topic [%s] to have datatype/md5sum [%s/%s], but our version has [%s/%s] Dropping connection."%(header['callerid'], resolved_topic_name, requested_type, md5sum, actual_type, data_class._md5sum)
351
352 else:
353
354
355
356 if 'tcp_nodelay' in header:
357 tcp_nodelay = True if header['tcp_nodelay'].strip() == '1' else False
358 else:
359 tcp_nodelay = self.tcp_nodelay_map.get(resolved_topic_name, False)
360
361 _configure_pub_socket(sock, tcp_nodelay)
362 protocol = TCPROSPub(resolved_topic_name, topic.data_class, is_latch=topic.is_latch, headers=topic.headers)
363 transport = TCPROSTransport(protocol, resolved_topic_name)
364 transport.set_socket(sock, header['callerid'])
365 transport.remote_endpoint = client_addr
366 transport.write_header()
367 topic.add_connection(transport)
368
369
371 """
372 It wraps a Transport instance and behaves like one
373 but it queues the data written to it and relays them
374 asynchronously to the wrapped instance.
375 """
376
377 - def __init__(self, connection, queue_size):
378 """
379 ctor.
380 @param connection: the wrapped transport instance
381 @type connection: Transport
382 @param queue_size: the maximum size of the queue, zero means infinite
383 @type queue_size: int
384 """
385 super(QueuedConnection, self).__init__()
386 self._connection = connection
387 self._queue_size = queue_size
388
389 self._lock = threading.Lock()
390 self._cond_data_available = threading.Condition(self._lock)
391 self._connection.set_cleanup_callback(self._closed_connection_callback)
392 self._queue = []
393 self._error = None
394
395 self._thread = threading.Thread(target=self._run)
396 self._thread.start()
397
399 with self._lock:
400 self._cond_data_available.notify()
401
403 if name.startswith('__'):
404 raise AttributeError(name)
405 return getattr(self._connection, name)
406
408 with self._lock:
409
410 if self._error:
411 error = self._error
412 self._error = None
413 raise error
414
415 if self._queue_size > 0 and len(self._queue) == self._queue_size:
416 del self._queue[0]
417 self._queue.append(data)
418 self._cond_data_available.notify()
419
420 time.sleep(0)
421 return True
422
424 while not self._connection.done:
425 queue = []
426 with self._lock:
427
428 while not self._queue and not self._connection.done:
429 self._cond_data_available.wait()
430
431 if self._queue:
432 queue = self._queue
433 self._queue = []
434
435 for data in queue:
436 try:
437 self._connection.write_data(data)
438 except Exception as e:
439 with self._lock:
440 self._error = e
441