1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """Internal use: Topic-specific extensions for TCPROS support"""
36
37 import socket
38 import threading
39 import time
40
41 try:
42 from xmlrpc.client import ServerProxy
43 except ImportError:
44 from xmlrpclib import ServerProxy
45
46 from rospy.core import logwarn, logerr, logdebug, rospyerr
47 import rospy.exceptions
48 import rospy.names
49
50 import rospy.impl.registration
51 import rospy.impl.transport
52
53 from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, \
54 get_tcpros_server_address, start_tcpros_server,\
55 DEFAULT_BUFF_SIZE, TCPROS
56
58 """
59 Subscription transport implementation for receiving topic data via
60 peer-to-peer TCP/IP sockets
61 """
62
65 """
66 ctor.
67
68 @param resolved_name: resolved subscription name
69 @type resolved_name: str
70
71 @param recv_data_class: class to instantiate to receive
72 messages
73 @type recv_data_class: L{rospy.Message}
74
75 @param queue_size: maximum number of messages to
76 deserialize from newly read data off socket
77 @type queue_size: int
78
79 @param buff_size: recv buffer size
80 @type buff_size: int
81
82 @param tcp_nodelay: If True, request TCP_NODELAY from publisher
83 @type tcp_nodelay: bool
84 """
85 super(TCPROSSub, self).__init__(resolved_name, recv_data_class, queue_size, buff_size)
86 self.direction = rospy.impl.transport.INBOUND
87 self.tcp_nodelay = tcp_nodelay
88
90 """
91 @return: dictionary of subscriber fields
92 @rtype: dict
93 """
94 return {'topic': self.resolved_name,
95 'message_definition': self.recv_data_class._full_text,
96 'tcp_nodelay': '1' if self.tcp_nodelay else '0',
97 'md5sum': self.recv_data_class._md5sum,
98 'type': self.recv_data_class._type,
99 'callerid': rospy.names.get_caller_id()}
100
101
116
117
118
120 """
121 Publisher transport implementation for publishing topic data via
122 peer-to-peer TCP/IP sockets.
123 """
124
125 - def __init__(self, resolved_name, pub_data_class, is_latch=False, headers=None):
126 """
127 ctor.
128 @param resolved_name: resolved topic name
129 @type resolved_name: str
130 @param pub_data_class: class to instance to receive messages
131 @type pub_data_class: L{rospy.Message} class
132 @param is_latch: If True, Publisher is latching
133 @type is_latch: bool
134 """
135
136 super(TCPROSPub, self).__init__(resolved_name, None, queue_size=None, buff_size=128)
137 self.pub_data_class = pub_data_class
138 self.direction = rospy.impl.transport.OUTBOUND
139 self.is_latch = is_latch
140 self.headers = headers if headers else {}
141
143 base = {'topic': self.resolved_name,
144 'type': self.pub_data_class._type,
145 'latching': '1' if self.is_latch else '0',
146 'message_definition': self.pub_data_class._full_text,
147 'md5sum': self.pub_data_class._md5sum,
148 'callerid': rospy.names.get_caller_id() }
149
150
151
152
153 if self.headers:
154 base.update(self.headers)
155 return base
156
158 """
159 Keeps trying to create connection for subscriber. Then passes off to receive_loop once connected.
160 """
161
162
163
164
165 interval = 0.5
166 while conn.socket is None and not conn.done and not rospy.is_shutdown():
167 try:
168 conn.connect(dest_addr, dest_port, pub_uri, timeout=60.)
169 except rospy.exceptions.TransportInitError as e:
170
171
172 if conn.protocol is None:
173 conn.done = True
174 break
175 rospyerr("unable to create subscriber transport: %s. Will try again in %ss", e, interval)
176 interval = interval * 2
177 time.sleep(interval)
178
179
180 conn.done = not check_if_still_publisher(resolved_topic_name, pub_uri)
181
182 if not conn.done:
183 conn.receive_loop(receive_cb)
184
186 try:
187 s = ServerProxy(pub_uri)
188 code, msg, val = s.getPublications(rospy.names.get_name())
189 if code == 1:
190 return len([t for t in val if t[0] == resolved_topic_name]) > 0
191 else:
192 return False
193 except:
194 return False
195
197 """
198 ROS Protocol handler for TCPROS. Accepts both TCPROS topic
199 connections as well as ROS service connections over TCP. TCP server
200 socket is run once start_server() is called -- this is implicitly
201 called during init_publisher().
202 """
203
205 """ctor"""
206 self.tcp_nodelay_map = {}
207
209 """
210 @param resolved_name: resolved topic name
211 @type resolved_name: str
212
213 @param tcp_nodelay: If True, sets TCP_NODELAY on publisher's
214 socket (disables Nagle algorithm). This results in lower
215 latency publishing at the cost of efficiency.
216 @type tcp_nodelay: bool
217 """
218 self.tcp_nodelay_map[resolved_name] = tcp_nodelay
219
221 """
222 stops the TCP/IP server responsible for receiving inbound connections
223 """
224 pass
225
227 """
228 Connect to topic resolved_name on Publisher pub_uri using TCPROS.
229 @param resolved_name str: resolved topic name
230 @type resolved_name: str
231 @param pub_uri: XML-RPC URI of publisher
232 @type pub_uri: str
233 @param protocol_params: protocol parameters to use for connecting
234 @type protocol_params: [XmlRpcLegal]
235 @return: code, message, debug
236 @rtype: (int, str, int)
237 """
238
239
240 if type(protocol_params) != list or len(protocol_params) != 3:
241 return 0, "ERROR: invalid TCPROS parameters", 0
242 if protocol_params[0] != TCPROS:
243 return 0, "INTERNAL ERROR: protocol id is not TCPROS: %s"%id, 0
244 id, dest_addr, dest_port = protocol_params
245
246 sub = rospy.impl.registration.get_topic_manager().get_subscriber_impl(resolved_name)
247
248
249 protocol = TCPROSSub(resolved_name, sub.data_class, \
250 queue_size=sub.queue_size, buff_size=sub.buff_size,
251 tcp_nodelay=sub.tcp_nodelay)
252 conn = TCPROSTransport(protocol, resolved_name)
253 conn.set_endpoint_id(pub_uri);
254
255 t = threading.Thread(name=resolved_name, target=robust_connect_subscriber, args=(conn, dest_addr, dest_port, pub_uri, sub.receive_callback,resolved_name))
256
257
258
259
260 if sub.add_connection(conn):
261
262
263
264 t.start()
265 return 1, "Connected topic[%s]. Transport impl[%s]"%(resolved_name, conn.__class__.__name__), dest_port
266 else:
267
268 conn.close()
269 return 0, "ERROR: Race condition failure creating topic subscriber [%s]"%(resolved_name), 0
270
272 """
273 @param protocol: name of protocol
274 @type protocol: str
275 @return: True if protocol is supported
276 @rtype: bool
277 """
278 return protocol == TCPROS
279
281 """
282 Get supported protocols
283 """
284 return [[TCPROS]]
285
287 """
288 Initialize this node to receive an inbound TCP connection,
289 i.e. startup a TCP server if one is not already running.
290
291 @param resolved_name: topic name
292 @type resolved__name: str
293
294 @param protocol: negotiated protocol
295 parameters. protocol[0] must be the string 'TCPROS'
296 @type protocol: [str, value*]
297 @return: (code, msg, [TCPROS, addr, port])
298 @rtype: (int, str, list)
299 """
300 if protocol[0] != TCPROS:
301 return 0, "Internal error: protocol does not match TCPROS: %s"%protocol, []
302 start_tcpros_server()
303 addr, port = get_tcpros_server_address()
304 return 1, "ready on %s:%s"%(addr, port), [TCPROS, addr, port]
305
307 """
308 Process incoming topic connection. Reads in topic name from
309 handshake and creates the appropriate L{TCPROSPub} handler for the
310 connection.
311 @param sock: socket connection
312 @type sock: socket.socket
313 @param client_addr: client address
314 @type client_addr: (str, int)
315 @param header: key/value pairs from handshake header
316 @type header: dict
317 @return: error string or None
318 @rtype: str
319 """
320 if rospy.core.is_shutdown_requested():
321 return "Node is shutting down"
322 for required in ['topic', 'md5sum', 'callerid']:
323 if not required in header:
324 return "Missing required '%s' field"%required
325 else:
326 resolved_topic_name = header['topic']
327 md5sum = header['md5sum']
328 tm = rospy.impl.registration.get_topic_manager()
329 topic = tm.get_publisher_impl(resolved_topic_name)
330 if not topic:
331 return "[%s] is not a publisher of [%s]. Topics are %s"%(rospy.names.get_caller_id(), resolved_topic_name, tm.get_publications())
332 elif not topic.data_class or topic.closed:
333 return "Internal error processing topic [%s]"%(resolved_topic_name)
334 elif md5sum != rospy.names.TOPIC_ANYTYPE and md5sum != topic.data_class._md5sum:
335 data_class = topic.data_class
336 actual_type = data_class._type
337
338
339
340 if 'type' in header:
341 requested_type = header['type']
342 if requested_type != actual_type:
343 return "topic types do not match: [%s] vs. [%s]"%(requested_type, actual_type)
344 else:
345
346 requested_type = actual_type
347
348 return "Client [%s] wants topic [%s] to have datatype/md5sum [%s/%s], but our version has [%s/%s] Dropping connection."%(header['callerid'], resolved_topic_name, requested_type, md5sum, actual_type, data_class._md5sum)
349
350 else:
351
352
353
354 if 'tcp_nodelay' in header:
355 tcp_nodelay = True if header['tcp_nodelay'].strip() == '1' else False
356 else:
357 tcp_nodelay = self.tcp_nodelay_map.get(resolved_topic_name, False)
358
359 _configure_pub_socket(sock, tcp_nodelay)
360 protocol = TCPROSPub(resolved_topic_name, topic.data_class, is_latch=topic.is_latch, headers=topic.headers)
361 transport = TCPROSTransport(protocol, resolved_topic_name)
362 transport.set_socket(sock, header['callerid'])
363 transport.remote_endpoint = client_addr
364 transport.write_header()
365 topic.add_connection(transport)
366
367
369 """
370 It wraps a Transport instance and behaves like one
371 but it queues the data written to it and relays them
372 asynchronously to the wrapped instance.
373 """
374
375 - def __init__(self, connection, queue_size):
376 """
377 ctor.
378 @param connection: the wrapped transport instance
379 @type connection: Transport
380 @param queue_size: the maximum size of the queue, zero means infinite
381 @type queue_size: int
382 """
383 super(QueuedConnection, self).__init__()
384 self._connection = connection
385 self._queue_size = queue_size
386
387 self._lock = threading.Lock()
388 self._cond_data_available = threading.Condition(self._lock)
389 self._connection.set_cleanup_callback(self._closed_connection_callback)
390 self._queue = []
391 self._error = None
392
393 self._thread = threading.Thread(target=self._run)
394 self._thread.start()
395
397 with self._lock:
398 self._cond_data_available.notify()
399
401 if name.startswith('__'):
402 raise AttributeError(name)
403 return getattr(self._connection, name)
404
406 with self._lock:
407
408 if self._error:
409 error = self._error
410 self._error = None
411 raise error
412
413 if self._queue_size > 0 and len(self._queue) == self._queue_size:
414 del self._queue[0]
415 self._queue.append(data)
416 self._cond_data_available.notify()
417
418 time.sleep(0)
419 return True
420
422 while not self._connection.done:
423 queue = []
424 with self._lock:
425
426 while not self._queue and not self._connection.done:
427 self._cond_data_available.wait()
428
429 if self._queue:
430 queue = self._queue
431 self._queue = []
432
433 for data in queue:
434 try:
435 self._connection.write_data(data)
436 except Exception as e:
437 with self._lock:
438 self._error = e
439