1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 from __future__ import print_function
36
37 """
38 UDPROS connection protocol.
39 """
40
41
42
43
44 import rosgraph.network
45
46 import rospy.impl.registration
47 import rospy.impl.transport
48
52
54 """
55 rospy protocol handler for UDPROS. Stores the datagram server if necessary.
56 """
57
64
# NOTE(review): the `def` line for this body is absent from the listing; the
# later call `self.init_server()` suggests the signature is init_server(self).
66 """
67 Initialize and start the server thread, if not already initialized.
68 """
# Already initialised: a server socket exists, so there is nothing to do.
69 if self.server is not None:
70 return
# Choose the UDP socket family from rosgraph's IPv6 setting.
71 if rosgraph.network.use_ipv6():
72 s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
73 else:
74 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
75 s.bind((rosgraph.network.get_bind_address(), self.port))
# When bound with port 0, store the port number the socket actually received.
76 if self.port == 0:
77 self.port = s.getsockname()[1]
78 self.server = s
# NOTE(review): Python's `threading` module does not provide
# start_new_thread (it is a `thread`/`_thread` function), and neither `socket`
# nor `threading` appears in the visible import block -- confirm and fix.
79 threading.start_new_thread(self.run, ())
80
# NOTE(review): the `def` line is absent from this listing; this body is
# presumably run(self), the target handed to the thread-start call earlier in
# the file.
# Receive loop: keeps calling recvfrom on the server socket until rospy
# reports shutdown.
# NOTE(review): the local `buff_size` is never read (the loop uses
# self.buff_size directly), and the received `data` is discarded.
82 buff_size = self.buff_size
83 try:
84 while not rospy.core.is_shutdown():
85 data = self.server.recvfrom(self.buff_size)
86 print("received packet")
87
# NOTE(review): bare except silently swallows every error, including
# programming errors inside the loop -- consider logging.
88 except:
89
90 pass
91
# NOTE(review): the `def` line is absent from this listing.
# Closes self.sock when it is set.
# NOTE(review): the server setup code earlier in this file stores its socket
# in self.server, not self.sock -- confirm which attribute is intended.
93 if self.sock is not None:
94 self.sock.close()
95
# NOTE(review): the `def` line is absent from this listing. The docstring names
# the topic parameter `resolved_name`, but the body reads `topic_name` --
# confirm against the real signature.
97 """
98 Connect to topic resolved_name on Publisher pub_uri using UDPROS.
99 @param resolved_name str: resolved topic name
100 @type resolved_name: str
101 @param pub_uri: XML-RPC URI of publisher
102 @type pub_uri: str
103 @param protocol_params: protocol parameters to use for connecting
104 @type protocol_params: [XmlRpcLegal]
105 @return: code, message, debug
106 @rtype: (int, str, int)
107 """
108
109
# Reject anything that is not a 4-element list tagged with the UDPROS id.
110 if type(protocol_params) != list or len(protocol_params) != 4:
111 return 0, "ERROR: invalid UDPROS parameters", 0
112 if protocol_params[0] != UDPROS:
# NOTE(review): `id` is only unpacked further down, so this message formats
# the `id` builtin rather than the received protocol id.
113 return 0, "INTERNAL ERROR: protocol id is not UDPROS: %s"%id, 0
114
115
116 id, dest_addr, dest_port, headers = protocol_params
117
# Start the datagram server (no-op if it already exists) before registering.
118 self.init_server()
119
120
121
# NOTE(review): only `rospy.impl.registration` is imported in the visible
# import block; `rospy.registration` may not resolve -- confirm.
122 sub = rospy.registration.get_topic_manager().get_subscriber_impl(topic_name)
123
124
125
126
127
# NOTE(review): `UDPTransport` and `protocol` are not defined anywhere in the
# visible code (the transport class referenced elsewhere is UDPROSTransport).
128 transport = UDPTransport(protocol, topic_name, sub.receive_callback)
129
130
# add_connection's result decides success; on failure the new transport is
# closed and an error tuple is returned.
131 if sub.add_connection(transport):
132 return 1, "Connected topic[%s]. Transport impl[%s]"%(topic_name, transport.__class__.__name__), dest_port
133 else:
134 transport.close()
135 return 0, "ERROR: Race condition failure: duplicate topic subscriber [%s] was created"%(topic_name), 0
136
# NOTE(review): the `def` line is absent from this listing; the docstring
# implies a single `protocol` argument.
138 """
139 @param protocol: name of protocol
140 @type protocol: str
141 @return: True if protocol is supported
142 @rtype: bool
143 """
# Only the UDPROS identifier is accepted.
# NOTE(review): the `UDPROS` constant is not defined in the visible code.
144 return protocol == UDPROS
145
# NOTE(review): the `def` line is absent from this listing.
147 """
148 Get supported protocols
149 """
# A list of protocol descriptions, each itself a list; here just [UDPROS].
150 return [[UDPROS]]
151
# NOTE(review): the `def` line is absent from this listing.
153 """
154 Initialize this node to start publishing to a new UDP location.
155
156 @param resolved_name: topic name
157 @type resolved__name: str
158
159 @param protocol_params: requested protocol
160 parameters. protocol[0] must be the string 'UDPROS'
161 @type protocol_params: [str, value*]
162 @return: (code, msg, [UDPROS, addr, port])
163 @rtype: (int, str, list)
164 """
165
166 if protocol_params[0] != UDPROS:
# NOTE(review): `protocol` is not assigned anywhere in this body, so building
# this message would fail unless the missing signature supplies it.
167 return 0, "Internal error: protocol does not match UDPROS: %s"%protocol, []
168
# Expects exactly five entries; any other length raises on unpacking.
# NOTE(review): none of the unpacked values are used afterwards.
169 _, header, host, port, max_datagram_size = protocol_params
170
# NOTE(review): the docstring promises [UDPROS, addr, port] but only [UDPROS]
# is returned.
171 return 1, "ready", [UDPROS]
172
# NOTE(review): the `def` line is absent from this listing. This body builds
# TCPROS objects (TCPROSPub, TCPROSTransport, _configure_pub_socket,
# self.tcp_nodelay_map); none of them are defined or imported in the visible
# code, so it looks copied from the TCPROS handler -- confirm intent.
174 """
175 Process incoming topic connection. Reads in topic name from
176 handshake and creates the appropriate L{TCPROSPub} handler for the
177 connection.
178 @param sock: socket connection
179 @type sock: socket.socket
180 @param client_addr: client address
181 @type client_addr: (str, int)
182 @param header: key/value pairs from handshake header
183 @type header: dict
184 @return: error string or None
185 @rtype: str
186 """
# Every handshake must carry these three fields; the first missing one is
# reported back as the error string.
187 for required in ['topic', 'md5sum', 'callerid']:
188 if not required in header:
189 return "Missing required '%s' field"%required
# NOTE(review): indentation is lost in this listing, so it cannot be told
# whether this `else` pairs with the `for` or the `if` above.
190 else:
191 resolved_topic_name = header['topic']
192 md5sum = header['md5sum']
# NOTE(review): only `rospy.impl.registration` is imported in the visible
# import block; `rospy.registration` may not resolve -- confirm.
193 tm = rospy.registration.get_topic_manager()
194 topic = tm.get_publisher_impl(resolved_topic_name)
195 if not topic:
196 return "[%s] is not a publisher of [%s]. Topics are %s"%(rospy.names.get_caller_id(), resolved_topic_name, tm.get_publications())
# A checksum mismatch is an error unless the client sent the wildcard
# TOPIC_ANYTYPE value.
197 elif md5sum != rospy.names.TOPIC_ANYTYPE and md5sum != topic.data_class._md5sum:
198
199 actual_type = topic.data_class._type
200
201
202
# Prefer the more specific "types do not match" message when the client also
# sent a type name that differs from ours.
203 if 'type' in header:
204 requested_type = header['type']
205 if requested_type != actual_type:
206 return "topic types do not match: [%s] vs. [%s]"%(requested_type, actual_type)
207 else:
208
# No type in the header: fall back to our own type name for the message below.
209 requested_type = actual_type
210
211 return "Client [%s] wants topic [%s] to have datatype/md5sum [%s/%s], but our version has [%s/%s] Dropping connection."%(header['callerid'], resolved_topic_name, requested_type, md5sum, actual_type, topic.data_class._md5sum)
212
213 else:
214
215
216
# The header value '1' (after stripping) enables tcp_nodelay; without the
# field, the per-topic map decides, defaulting to False.
217 if 'tcp_nodelay' in header:
218 tcp_nodelay = True if header['tcp_nodelay'].strip() == '1' else False
219 else:
220 tcp_nodelay = self.tcp_nodelay_map.get(resolved_topic_name, False)
221
222 _configure_pub_socket(sock, tcp_nodelay)
223 protocol = TCPROSPub(resolved_topic_name, topic.data_class, is_latch=topic.is_latch, headers=topic.headers)
224 transport = TCPROSTransport(protocol, resolved_topic_name)
225 transport.set_socket(sock, header['callerid'])
226 transport.write_header()
227 topic.add_connection(transport)
228
229
230
231
233 transport_type = 'UDPROS'
234
def __init__(self, protocol, name, header):
    """
    Create a UDPROS transport for a single topic.

    @param protocol: protocol implementation; its C{direction}
        attribute is passed to the base transport
    @type  protocol: UDPROSTransportProtocol
    @param name: topic name
    @type  name: str
    @param header: handshake header if transport handshake header was
        already read off of transport.
    @type  header: dict
    @throws TransportInitError: if transport cannot be initialized according to arguments
    """
    # The visible import block only pulls in rospy.impl.*, so the bare
    # name TransportInitError would be unresolved at raise time; bring it
    # into scope locally.
    from rospy.exceptions import TransportInitError

    # Validate before touching the base class so a rejected construction
    # never leaves a partially initialised transport behind.
    if not name:
        raise TransportInitError("Unable to initialize transport: name is not set")

    super(UDPROSTransport, self).__init__(protocol.direction, name=name)

    # Set once the transport has been closed and must no longer be used.
    self.done = False
    self.header = header
253
# NOTE(review): the `def` line is absent from this listing; the docstring
# implies arguments `msg` and `seq`.
255 """
256 Convenience routine for services to send a message across a
257 particular connection. NOTE: write_data is much more efficient
258 if same message is being sent to multiple connections. Not
259 threadsafe.
260 @param msg: message to send
261 @type msg: Msg
262 @param seq: sequence number for message
263 @type seq: int
264 @raise TransportException: if error occurred sending message
265 """
266
# Serialize into the shared write buffer, hand the bytes to write_data, then
# reset the buffer for the next message.
# NOTE(review): `serialize_message` is not imported and `self.write_buff` is
# not assigned in the visible code -- confirm where they come from.
267 serialize_message(self.write_buff, seq, msg)
268 self.write_data(self.write_buff.getvalue())
269 self.write_buff.truncate(0)
270
# NOTE(review): the `def` line is absent from this listing.
272 """
273 Write raw data to transport
274 @raise TransportInitialiationError: could not be initialized
275 @raise TransportTerminated: no longer open for publishing
276 """
277
278
279
# Placeholder: nothing is written yet, so callers' data is silently dropped.
280 pass
281
# NOTE(review): the `def` line is absent from this listing.
283 """
284 block until messages are read off of socket
285 @return: list of newly received messages
286 @rtype: [Msg]
287 @raise TransportException: if unable to receive message due to error
288 """
# Placeholder: returns None immediately instead of the documented message list.
289 pass
290
291
292
293
296
297
302
# Module-level handler instance, created once when this module is imported.
303 _handler = UDPROSHandler()
304
307