1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 from __future__ import print_function
36
37 """
38 UDPROS connection protocol.
39 """
40
41
42
43
44 import socket
45 import threading
46 import rosgraph.network
47
48 import rospy.impl.registration
49 import rospy.impl.transport
50
54
56 """
57 rospy protocol handler for UDPROS. Stores the datagram server if necessary.
58 """
59
66
68 """
69 Initialize and start the server thread, if not already initialized.
70 """
71 if self.server is not None:
72 return
73 if rosgraph.network.use_ipv6():
74 s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
75 else:
76 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
77 s.bind((rosgraph.network.get_bind_address(), self.port))
78 if self.port == 0:
79 self.port = s.getsockname()[1]
80 self.server = s
81 threading.start_new_thread(self.run, ())
82
84 buff_size = self.buff_size
85 try:
86 while not rospy.core.is_shutdown():
87 data = self.server.recvfrom(self.buff_size)
88 print("received packet")
89
90 except:
91
92 pass
93
95 if self.sock is not None:
96 self.sock.close()
97
99 """
100 Connect to topic resolved_name on Publisher pub_uri using UDPROS.
101 @param resolved_name str: resolved topic name
102 @type resolved_name: str
103 @param pub_uri: XML-RPC URI of publisher
104 @type pub_uri: str
105 @param protocol_params: protocol parameters to use for connecting
106 @type protocol_params: [XmlRpcLegal]
107 @return: code, message, debug
108 @rtype: (int, str, int)
109 """
110
111
112 if type(protocol_params) != list or len(protocol_params) != 4:
113 return 0, "ERROR: invalid UDPROS parameters", 0
114 if protocol_params[0] != UDPROS:
115 return 0, "INTERNAL ERROR: protocol id is not UDPROS: %s"%id, 0
116
117
118 id, dest_addr, dest_port, headers = protocol_params
119
120 self.init_server()
121
122
123
124 sub = rospy.registration.get_topic_manager().get_subscriber_impl(topic_name)
125
126
127
128
129
130 transport = UDPTransport(protocol, topic_name, sub.receive_callback)
131
132
133 if sub.add_connection(transport):
134 return 1, "Connected topic[%s]. Transport impl[%s]"%(topic_name, transport.__class__.__name__), dest_port
135 else:
136 transport.close()
137 return 0, "ERROR: Race condition failure: duplicate topic subscriber [%s] was created"%(topic_name), 0
138
140 """
141 @param protocol: name of protocol
142 @type protocol: str
143 @return: True if protocol is supported
144 @rtype: bool
145 """
146 return protocol == UDPROS
147
149 """
150 Get supported protocols
151 """
152 return [[UDPROS]]
153
155 """
156 Initialize this node to start publishing to a new UDP location.
157
158 @param resolved_name: topic name
159 @type resolved__name: str
160
161 @param protocol_params: requested protocol
162 parameters. protocol[0] must be the string 'UDPROS'
163 @type protocol_params: [str, value*]
164 @return: (code, msg, [UDPROS, addr, port])
165 @rtype: (int, str, list)
166 """
167
168 if protocol_params[0] != UDPROS:
169 return 0, "Internal error: protocol does not match UDPROS: %s"%protocol, []
170
171 _, header, host, port, max_datagram_size = protocol_params
172
173 return 1, "ready", [UDPROS]
174
176 """
177 Process incoming topic connection. Reads in topic name from
178 handshake and creates the appropriate L{TCPROSPub} handler for the
179 connection.
180 @param sock: socket connection
181 @type sock: socket.socket
182 @param client_addr: client address
183 @type client_addr: (str, int)
184 @param header: key/value pairs from handshake header
185 @type header: dict
186 @return: error string or None
187 @rtype: str
188 """
189 for required in ['topic', 'md5sum', 'callerid']:
190 if not required in header:
191 return "Missing required '%s' field"%required
192 else:
193 resolved_topic_name = header['topic']
194 md5sum = header['md5sum']
195 tm = rospy.registration.get_topic_manager()
196 topic = tm.get_publisher_impl(resolved_topic_name)
197 if not topic:
198 return "[%s] is not a publisher of [%s]. Topics are %s"%(rospy.names.get_caller_id(), resolved_topic_name, tm.get_publications())
199 elif md5sum != rospy.names.TOPIC_ANYTYPE and md5sum != topic.data_class._md5sum:
200
201 actual_type = topic.data_class._type
202
203
204
205 if 'type' in header:
206 requested_type = header['type']
207 if requested_type != actual_type:
208 return "topic types do not match: [%s] vs. [%s]"%(requested_type, actual_type)
209 else:
210
211 requested_type = actual_type
212
213 return "Client [%s] wants topic [%s] to have datatype/md5sum [%s/%s], but our version has [%s/%s] Dropping connection."%(header['callerid'], resolved_topic_name, requested_type, md5sum, actual_type, topic.data_class._md5sum)
214
215 else:
216
217
218
219 if 'tcp_nodelay' in header:
220 tcp_nodelay = True if header['tcp_nodelay'].strip() == '1' else False
221 else:
222 tcp_nodelay = self.tcp_nodelay_map.get(resolved_topic_name, False)
223
224 _configure_pub_socket(sock, tcp_nodelay)
225 protocol = TCPROSPub(resolved_topic_name, topic.data_class, is_latch=topic.is_latch, headers=topic.headers)
226 transport = TCPROSTransport(protocol, resolved_topic_name)
227 transport.set_socket(sock, header['callerid'])
228 transport.write_header()
229 topic.add_connection(transport)
230
231
232
233
235 transport_type = 'UDPROS'
236
237 - def __init__(self, protocol, name, header):
238 """
239 ctor
240 @param name: topic name
241 @type name: str:
242 @param protocol: protocol implementation
243 @param protocol: UDPROSTransportProtocol
244 @param header: handshake header if transport handshake header was
245 already read off of transport.
246 @type header: dict
247 @throws TransportInitError: if transport cannot be initialized according to arguments
248 """
249 super(UDPROSTransport, self).__init__(protocol.direction, name=name)
250 if not name:
251 raise TransportInitError("Unable to initialize transport: name is not set")
252
253 self.done = False
254 self.header = header
255
257 """
258 Convenience routine for services to send a message across a
259 particular connection. NOTE: write_data is much more efficient
260 if same message is being sent to multiple connections. Not
261 threadsafe.
262 @param msg: message to send
263 @type msg: Msg
264 @param seq: sequence number for message
265 @type seq: int
266 @raise TransportException: if error occurred sending message
267 """
268
269 serialize_message(self.write_buff, seq, msg)
270 self.write_data(self.write_buff.getvalue())
271 self.write_buff.truncate(0)
272 self.write_buff.seek(0)
273
275 """
276 Write raw data to transport
277 @raise TransportInitialiationError: could not be initialized
278 @raise TransportTerminated: no longer open for publishing
279 """
280
281
282
283 pass
284
286 """
287 block until messages are read off of socket
288 @return: list of newly received messages
289 @rtype: [Msg]
290 @raise TransportException: if unable to receive message due to error
291 """
292 pass
293
294
295
296
299
300
305
306 _handler = UDPROSHandler()
307
310