1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """Internal use: Topic-specific extensions for TCPROS support"""
36
37 import socket
38 import threading
39 import time
40
41 try:
42 from xmlrpc.client import ServerProxy
43 except ImportError:
44 from xmlrpclib import ServerProxy
45
46 from rospy.core import logwarn, logerr, logdebug, rospyerr
47 import rospy.exceptions
48 import rospy.names
49
50 import rospy.impl.registration
51 import rospy.impl.transport
52
53 from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, \
54 get_tcpros_server_address, start_tcpros_server,\
55 DEFAULT_BUFF_SIZE, TCPROS
56
58 """
59 Subscription transport implementation for receiving topic data via
60 peer-to-peer TCP/IP sockets
61 """
62
65 """
66 ctor.
67
68 @param resolved_name: resolved subscription name
69 @type resolved_name: str
70
71 @param recv_data_class: class to instantiate to receive
72 messages
73 @type recv_data_class: L{rospy.Message}
74
75 @param queue_size: maximum number of messages to
76 deserialize from newly read data off socket
77 @type queue_size: int
78
79 @param buff_size: recv buffer size
80 @type buff_size: int
81
82 @param tcp_nodelay: If True, request TCP_NODELAY from publisher
83 @type tcp_nodelay: bool
84 """
85 super(TCPROSSub, self).__init__(resolved_name, recv_data_class, queue_size, buff_size)
86 self.direction = rospy.impl.transport.INBOUND
87 self.tcp_nodelay = tcp_nodelay
88
90 """
91 @return: dictionary of subscriber fields
92 @rtype: dict
93 """
94 return {'topic': self.resolved_name,
95 'message_definition': self.recv_data_class._full_text,
96 'tcp_nodelay': '1' if self.tcp_nodelay else '0',
97 'md5sum': self.recv_data_class._md5sum,
98 'type': self.recv_data_class._type,
99 'callerid': rospy.names.get_caller_id()}
100
101
116
117
118
120 """
121 Publisher transport implementation for publishing topic data via
122 peer-to-peer TCP/IP sockets.
123 """
124
125 - def __init__(self, resolved_name, pub_data_class, is_latch=False, headers=None):
126 """
127 ctor.
128 @param resolved_name: resolved topic name
129 @type resolved_name: str
130 @param pub_data_class: class to instance to receive messages
131 @type pub_data_class: L{rospy.Message} class
132 @param is_latch: If True, Publisher is latching
133 @type is_latch: bool
134 """
135
136 super(TCPROSPub, self).__init__(resolved_name, None, queue_size=None, buff_size=128)
137 self.pub_data_class = pub_data_class
138 self.direction = rospy.impl.transport.OUTBOUND
139 self.is_latch = is_latch
140 self.headers = headers if headers else {}
141
143 base = {'topic': self.resolved_name,
144 'type': self.pub_data_class._type,
145 'latching': '1' if self.is_latch else '0',
146 'message_definition': self.pub_data_class._full_text,
147 'md5sum': self.pub_data_class._md5sum,
148 'callerid': rospy.names.get_caller_id() }
149
150
151
152
153 if self.headers:
154 base.update(self.headers)
155 return base
156
158 """
159 Keeps trying to create connection for subscriber. Then passes off to receive_loop once connected.
160 """
161
162
163
164
165 interval = 0.5
166 while conn.socket is None and not conn.done and not rospy.is_shutdown():
167 try:
168 conn.connect(dest_addr, dest_port, pub_uri, timeout=60.)
169 except rospy.exceptions.TransportInitError as e:
170 rospyerr("unable to create subscriber transport: %s. Will try again in %ss", e, interval)
171 interval = interval * 2
172 time.sleep(interval)
173
174
175 conn.done = not check_if_still_publisher(resolved_topic_name, pub_uri)
176
177 if not conn.done:
178 conn.receive_loop(receive_cb)
179
181 try:
182 s = ServerProxy(pub_uri)
183 code, msg, val = s.getPublications(rospy.names.get_name())
184 if code == 1:
185 return len([t for t in val if t[0] == resolved_topic_name]) > 0
186 else:
187 return False
188 except:
189 return False
190
192 """
193 ROS Protocol handler for TCPROS. Accepts both TCPROS topic
194 connections as well as ROS service connections over TCP. TCP server
195 socket is run once start_server() is called -- this is implicitly
196 called during init_publisher().
197 """
198
200 """ctor"""
201 self.tcp_nodelay_map = {}
202
204 """
205 @param resolved_name: resolved topic name
206 @type resolved_name: str
207
208 @param tcp_nodelay: If True, sets TCP_NODELAY on publisher's
209 socket (disables Nagle algorithm). This results in lower
210 latency publishing at the cost of efficiency.
211 @type tcp_nodelay: bool
212 """
213 self.tcp_nodelay_map[resolved_name] = tcp_nodelay
214
216 """
217 stops the TCP/IP server responsible for receiving inbound connections
218 """
219 pass
220
222 """
223 Connect to topic resolved_name on Publisher pub_uri using TCPROS.
224 @param resolved_name str: resolved topic name
225 @type resolved_name: str
226 @param pub_uri: XML-RPC URI of publisher
227 @type pub_uri: str
228 @param protocol_params: protocol parameters to use for connecting
229 @type protocol_params: [XmlRpcLegal]
230 @return: code, message, debug
231 @rtype: (int, str, int)
232 """
233
234
235 if type(protocol_params) != list or len(protocol_params) != 3:
236 return 0, "ERROR: invalid TCPROS parameters", 0
237 if protocol_params[0] != TCPROS:
238 return 0, "INTERNAL ERROR: protocol id is not TCPROS: %s"%id, 0
239 id, dest_addr, dest_port = protocol_params
240
241 sub = rospy.impl.registration.get_topic_manager().get_subscriber_impl(resolved_name)
242
243
244 protocol = TCPROSSub(resolved_name, sub.data_class, \
245 queue_size=sub.queue_size, buff_size=sub.buff_size,
246 tcp_nodelay=sub.tcp_nodelay)
247 conn = TCPROSTransport(protocol, resolved_name)
248 t = threading.Thread(name=resolved_name, target=robust_connect_subscriber, args=(conn, dest_addr, dest_port, pub_uri, sub.receive_callback,resolved_name))
249
250
251 t.start()
252
253
254 if sub.add_connection(conn):
255 return 1, "Connected topic[%s]. Transport impl[%s]"%(resolved_name, conn.__class__.__name__), dest_port
256 else:
257 conn.close()
258 return 0, "ERROR: Race condition failure: duplicate topic subscriber [%s] was created"%(resolved_name), 0
259
261 """
262 @param protocol: name of protocol
263 @type protocol: str
264 @return: True if protocol is supported
265 @rtype: bool
266 """
267 return protocol == TCPROS
268
270 """
271 Get supported protocols
272 """
273 return [[TCPROS]]
274
276 """
277 Initialize this node to receive an inbound TCP connection,
278 i.e. startup a TCP server if one is not already running.
279
280 @param resolved_name: topic name
281 @type resolved__name: str
282
283 @param protocol: negotiated protocol
284 parameters. protocol[0] must be the string 'TCPROS'
285 @type protocol: [str, value*]
286 @return: (code, msg, [TCPROS, addr, port])
287 @rtype: (int, str, list)
288 """
289 if protocol[0] != TCPROS:
290 return 0, "Internal error: protocol does not match TCPROS: %s"%protocol, []
291 start_tcpros_server()
292 addr, port = get_tcpros_server_address()
293 return 1, "ready on %s:%s"%(addr, port), [TCPROS, addr, port]
294
296 """
297 Process incoming topic connection. Reads in topic name from
298 handshake and creates the appropriate L{TCPROSPub} handler for the
299 connection.
300 @param sock: socket connection
301 @type sock: socket.socket
302 @param client_addr: client address
303 @type client_addr: (str, int)
304 @param header: key/value pairs from handshake header
305 @type header: dict
306 @return: error string or None
307 @rtype: str
308 """
309 if rospy.core.is_shutdown_requested():
310 return "Node is shutting down"
311 for required in ['topic', 'md5sum', 'callerid']:
312 if not required in header:
313 return "Missing required '%s' field"%required
314 else:
315 resolved_topic_name = header['topic']
316 md5sum = header['md5sum']
317 tm = rospy.impl.registration.get_topic_manager()
318 topic = tm.get_publisher_impl(resolved_topic_name)
319 if not topic:
320 return "[%s] is not a publisher of [%s]. Topics are %s"%(rospy.names.get_caller_id(), resolved_topic_name, tm.get_publications())
321 elif not topic.data_class or topic.closed:
322 return "Internal error processing topic [%s]"%(resolved_topic_name)
323 elif md5sum != rospy.names.TOPIC_ANYTYPE and md5sum != topic.data_class._md5sum:
324 data_class = topic.data_class
325 actual_type = data_class._type
326
327
328
329 if 'type' in header:
330 requested_type = header['type']
331 if requested_type != actual_type:
332 return "topic types do not match: [%s] vs. [%s]"%(requested_type, actual_type)
333 else:
334
335 requested_type = actual_type
336
337 return "Client [%s] wants topic [%s] to have datatype/md5sum [%s/%s], but our version has [%s/%s] Dropping connection."%(header['callerid'], resolved_topic_name, requested_type, md5sum, actual_type, data_class._md5sum)
338
339 else:
340
341
342
343 if 'tcp_nodelay' in header:
344 tcp_nodelay = True if header['tcp_nodelay'].strip() == '1' else False
345 else:
346 tcp_nodelay = self.tcp_nodelay_map.get(resolved_topic_name, False)
347
348 _configure_pub_socket(sock, tcp_nodelay)
349 protocol = TCPROSPub(resolved_topic_name, topic.data_class, is_latch=topic.is_latch, headers=topic.headers)
350 transport = TCPROSTransport(protocol, resolved_topic_name)
351 transport.set_socket(sock, header['callerid'])
352 transport.write_header()
353 topic.add_connection(transport)
354