1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """Internal use: Topic-specific extensions for TCPROS support"""
36
37 import socket
38 import threading
39 import time
40
41 from rospy.core import logwarn, logerr, logdebug, rospyerr
42 import rospy.exceptions
43 import rospy.names
44
45 import rospy.impl.registration
46 import rospy.impl.transport
47
48 from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, \
49 get_tcpros_server_address, start_tcpros_server,\
50 DEFAULT_BUFF_SIZE, TCPROS
51
53 """
54 Subscription transport implementation for receiving topic data via
55 peer-to-peer TCP/IP sockets
56 """
57
60 """
61 ctor.
62
63 @param resolved_name: resolved subscription name
64 @type resolved_name: str
65
66 @param recv_data_class: class to instantiate to receive
67 messages
68 @type recv_data_class: L{rospy.Message}
69
70 @param queue_size: maximum number of messages to
71 deserialize from newly read data off socket
72 @type queue_size: int
73
74 @param buff_size: recv buffer size
75 @type buff_size: int
76
77 @param tcp_nodelay: If True, request TCP_NODELAY from publisher
78 @type tcp_nodelay: bool
79 """
80 super(TCPROSSub, self).__init__(resolved_name, recv_data_class, queue_size, buff_size)
81 self.direction = rospy.impl.transport.INBOUND
82 self.tcp_nodelay = tcp_nodelay
83
85 """
86 @return: dictionary of subscriber fields
87 @rtype: dict
88 """
89 return {'topic': self.resolved_name,
90 'message_definition': self.recv_data_class._full_text,
91 'tcp_nodelay': '1' if self.tcp_nodelay else '0',
92 'md5sum': self.recv_data_class._md5sum,
93 'type': self.recv_data_class._type,
94 'callerid': rospy.names.get_caller_id()}
95
96
111
112
113
115 """
116 Publisher transport implementation for publishing topic data via
117 peer-to-peer TCP/IP sockets.
118 """
119
120 - def __init__(self, resolved_name, pub_data_class, is_latch=False, headers=None):
121 """
122 ctor.
123 @param resolved_name: resolved topic name
124 @type resolved_name: str
125 @param pub_data_class: class to instance to receive messages
126 @type pub_data_class: L{rospy.Message} class
127 @param is_latch: If True, Publisher is latching
128 @type is_latch: bool
129 """
130
131 super(TCPROSPub, self).__init__(resolved_name, None, queue_size=None, buff_size=128)
132 self.pub_data_class = pub_data_class
133 self.direction = rospy.impl.transport.OUTBOUND
134 self.is_latch = is_latch
135 self.headers = headers if headers else {}
136
138 base = {'topic': self.resolved_name,
139 'type': self.pub_data_class._type,
140 'latching': '1' if self.is_latch else '0',
141 'message_definition': self.pub_data_class._full_text,
142 'md5sum': self.pub_data_class._md5sum,
143 'callerid': rospy.names.get_caller_id() }
144
145
146
147
148 if self.headers:
149 base.update(self.headers)
150 return base
151
153 """
154 Keeps trying to create connection for subscriber. Then passes off to receive_loop once connected.
155 """
156
157
158
159
160 interval = 0.5
161 while conn.socket is None and not conn.done and not rospy.is_shutdown():
162 try:
163 conn.connect(dest_addr, dest_port, pub_uri, timeout=60.)
164 except rospy.exceptions.TransportInitError as e:
165 rospyerr("unable to create subscriber transport: %s. Will try again in %ss", e, interval)
166 interval = interval * 2
167 time.sleep(interval)
168
169 conn.receive_loop(receive_cb)
170
172 """
173 ROS Protocol handler for TCPROS. Accepts both TCPROS topic
174 connections as well as ROS service connections over TCP. TCP server
175 socket is run once start_server() is called -- this is implicitly
176 called during init_publisher().
177 """
178
180 """ctor"""
181 self.tcp_nodelay_map = {}
182
184 """
185 @param resolved_name: resolved topic name
186 @type resolved_name: str
187
188 @param tcp_nodelay: If True, sets TCP_NODELAY on publisher's
189 socket (disables Nagle algorithm). This results in lower
190 latency publishing at the cost of efficiency.
191 @type tcp_nodelay: bool
192 """
193 self.tcp_nodelay_map[resolved_name] = tcp_nodelay
194
196 """
197 stops the TCP/IP server responsible for receiving inbound connections
198 """
199 pass
200
202 """
203 Connect to topic resolved_name on Publisher pub_uri using TCPROS.
204 @param resolved_name str: resolved topic name
205 @type resolved_name: str
206 @param pub_uri: XML-RPC URI of publisher
207 @type pub_uri: str
208 @param protocol_params: protocol parameters to use for connecting
209 @type protocol_params: [XmlRpcLegal]
210 @return: code, message, debug
211 @rtype: (int, str, int)
212 """
213
214
215 if type(protocol_params) != list or len(protocol_params) != 3:
216 return 0, "ERROR: invalid TCPROS parameters", 0
217 if protocol_params[0] != TCPROS:
218 return 0, "INTERNAL ERROR: protocol id is not TCPROS: %s"%id, 0
219 id, dest_addr, dest_port = protocol_params
220
221 sub = rospy.impl.registration.get_topic_manager().get_subscriber_impl(resolved_name)
222
223
224 protocol = TCPROSSub(resolved_name, sub.data_class, \
225 queue_size=sub.queue_size, buff_size=sub.buff_size,
226 tcp_nodelay=sub.tcp_nodelay)
227 conn = TCPROSTransport(protocol, resolved_name)
228 t = threading.Thread(name=resolved_name, target=robust_connect_subscriber, args=(conn, dest_addr, dest_port, pub_uri, sub.receive_callback,))
229
230
231 t.start()
232
233
234 if sub.add_connection(conn):
235 return 1, "Connected topic[%s]. Transport impl[%s]"%(resolved_name, conn.__class__.__name__), dest_port
236 else:
237 conn.close()
238 return 0, "ERROR: Race condition failure: duplicate topic subscriber [%s] was created"%(resolved_name), 0
239
241 """
242 @param protocol: name of protocol
243 @type protocol: str
244 @return: True if protocol is supported
245 @rtype: bool
246 """
247 return protocol == TCPROS
248
250 """
251 Get supported protocols
252 """
253 return [[TCPROS]]
254
256 """
257 Initialize this node to receive an inbound TCP connection,
258 i.e. startup a TCP server if one is not already running.
259
260 @param resolved_name: topic name
261 @type resolved__name: str
262
263 @param protocol: negotiated protocol
264 parameters. protocol[0] must be the string 'TCPROS'
265 @type protocol: [str, value*]
266 @return: (code, msg, [TCPROS, addr, port])
267 @rtype: (int, str, list)
268 """
269 if protocol[0] != TCPROS:
270 return 0, "Internal error: protocol does not match TCPROS: %s"%protocol, []
271 start_tcpros_server()
272 addr, port = get_tcpros_server_address()
273 return 1, "ready on %s:%s"%(addr, port), [TCPROS, addr, port]
274
276 """
277 Process incoming topic connection. Reads in topic name from
278 handshake and creates the appropriate L{TCPROSPub} handler for the
279 connection.
280 @param sock: socket connection
281 @type sock: socket.socket
282 @param client_addr: client address
283 @type client_addr: (str, int)
284 @param header: key/value pairs from handshake header
285 @type header: dict
286 @return: error string or None
287 @rtype: str
288 """
289 if rospy.core.is_shutdown_requested():
290 return "Node is shutting down"
291 for required in ['topic', 'md5sum', 'callerid']:
292 if not required in header:
293 return "Missing required '%s' field"%required
294 else:
295 resolved_topic_name = header['topic']
296 md5sum = header['md5sum']
297 tm = rospy.impl.registration.get_topic_manager()
298 topic = tm.get_publisher_impl(resolved_topic_name)
299 if not topic:
300 return "[%s] is not a publisher of [%s]. Topics are %s"%(rospy.names.get_caller_id(), resolved_topic_name, tm.get_publications())
301 elif not topic.data_class or topic.closed:
302 return "Internal error processing topic [%s]"%(resolved_topic_name)
303 elif md5sum != rospy.names.TOPIC_ANYTYPE and md5sum != topic.data_class._md5sum:
304 data_class = topic.data_class
305 actual_type = data_class._type
306
307
308
309 if 'type' in header:
310 requested_type = header['type']
311 if requested_type != actual_type:
312 return "topic types do not match: [%s] vs. [%s]"%(requested_type, actual_type)
313 else:
314
315 requested_type = actual_type
316
317 return "Client [%s] wants topic [%s] to have datatype/md5sum [%s/%s], but our version has [%s/%s] Dropping connection."%(header['callerid'], resolved_topic_name, requested_type, md5sum, actual_type, data_class._md5sum)
318
319 else:
320
321
322
323 if 'tcp_nodelay' in header:
324 tcp_nodelay = True if header['tcp_nodelay'].strip() == '1' else False
325 else:
326 tcp_nodelay = self.tcp_nodelay_map.get(resolved_topic_name, False)
327
328 _configure_pub_socket(sock, tcp_nodelay)
329 protocol = TCPROSPub(resolved_topic_name, topic.data_class, is_latch=topic.is_latch, headers=topic.headers)
330 transport = TCPROSTransport(protocol, resolved_topic_name)
331 transport.set_socket(sock, header['callerid'])
332 transport.write_header()
333 topic.add_connection(transport)
334