1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 from __future__ import print_function
36
37 """
38 UDPROS connection protocol.
39 """
40
41
42
43
44 import roslib.network
45
46 import rospy.impl.registration
47 import rospy.impl.transport
48
52
54 """
55 rospy protocol handler for UDPROS. Stores the datagram server if necessary.
56 """
57
64
66 """
67 Initialize and start the server thread, if not already initialized.
68 """
69 if self.server is not None:
70 return
71 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
72 s.bind((roslib.network.get_bind_address(), self.port))
73 if self.port == 0:
74 self.port = s.getsockname()[1]
75 self.server = s
76 threading.start_new_thread(self.run, ())
77
79 buff_size = self.buff_size
80 try:
81 while not rospy.core.is_shutdown():
82 data = self.server.recvfrom(self.buff_size)
83 print("received packet")
84
85 except:
86
87 pass
88
90 if self.sock is not None:
91 self.sock.close()
92
94 """
95 Connect to topic resolved_name on Publisher pub_uri using UDPROS.
96 @param resolved_name str: resolved topic name
97 @type resolved_name: str
98 @param pub_uri: XML-RPC URI of publisher
99 @type pub_uri: str
100 @param protocol_params: protocol parameters to use for connecting
101 @type protocol_params: [XmlRpcLegal]
102 @return: code, message, debug
103 @rtype: (int, str, int)
104 """
105
106
107 if type(protocol_params) != list or len(protocol_params) != 4:
108 return 0, "ERROR: invalid UDPROS parameters", 0
109 if protocol_params[0] != UDPROS:
110 return 0, "INTERNAL ERROR: protocol id is not UDPROS: %s"%id, 0
111
112
113 id, dest_addr, dest_port, headers = protocol_params
114
115 self.init_server()
116
117
118
119 sub = rospy.registration.get_topic_manager().get_subscriber_impl(topic_name)
120
121
122
123
124
125 transport = UDPTransport(protocol, topic_name, sub.receive_callback)
126
127
128 if sub.add_connection(transport):
129 return 1, "Connected topic[%s]. Transport impl[%s]"%(topic_name, transport.__class__.__name__), dest_port
130 else:
131 transport.close()
132 return 0, "ERROR: Race condition failure: duplicate topic subscriber [%s] was created"%(topic_name), 0
133
135 """
136 @param protocol: name of protocol
137 @type protocol: str
138 @return: True if protocol is supported
139 @rtype: bool
140 """
141 return protocol == UDPROS
142
144 """
145 Get supported protocols
146 """
147 return [[UDPROS]]
148
150 """
151 Initialize this node to start publishing to a new UDP location.
152
153 @param resolved_name: topic name
154 @type resolved__name: str
155
156 @param protocol_params: requested protocol
157 parameters. protocol[0] must be the string 'UDPROS'
158 @type protocol_params: [str, value*]
159 @return: (code, msg, [UDPROS, addr, port])
160 @rtype: (int, str, list)
161 """
162
163 if protocol_params[0] != UDPROS:
164 return 0, "Internal error: protocol does not match UDPROS: %s"%protocol, []
165
166 _, header, host, port, max_datagram_size = protocol_params
167
168 return 1, "ready", [UDPROS]
169
171 """
172 Process incoming topic connection. Reads in topic name from
173 handshake and creates the appropriate L{TCPROSPub} handler for the
174 connection.
175 @param sock: socket connection
176 @type sock: socket.socket
177 @param client_addr: client address
178 @type client_addr: (str, int)
179 @param header: key/value pairs from handshake header
180 @type header: dict
181 @return: error string or None
182 @rtype: str
183 """
184 for required in ['topic', 'md5sum', 'callerid']:
185 if not required in header:
186 return "Missing required '%s' field"%required
187 else:
188 resolved_topic_name = header['topic']
189 md5sum = header['md5sum']
190 tm = rospy.registration.get_topic_manager()
191 topic = tm.get_publisher_impl(resolved_topic_name)
192 if not topic:
193 return "[%s] is not a publisher of [%s]. Topics are %s"%(rospy.names.get_caller_id(), resolved_topic_name, tm.get_publications())
194 elif md5sum != rospy.names.TOPIC_ANYTYPE and md5sum != topic.data_class._md5sum:
195
196 actual_type = topic.data_class._type
197
198
199
200 if 'type' in header:
201 requested_type = header['type']
202 if requested_type != actual_type:
203 return "topic types do not match: [%s] vs. [%s]"%(requested_type, actual_type)
204 else:
205
206 requested_type = actual_type
207
208 return "Client [%s] wants topic [%s] to have datatype/md5sum [%s/%s], but our version has [%s/%s] Dropping connection."%(header['callerid'], resolved_topic_name, requested_type, md5sum, actual_type, topic.data_class._md5sum)
209
210 else:
211
212
213
214 if 'tcp_nodelay' in header:
215 tcp_nodelay = True if header['tcp_nodelay'].strip() == '1' else False
216 else:
217 tcp_nodelay = self.tcp_nodelay_map.get(resolved_topic_name, False)
218
219 _configure_pub_socket(sock, tcp_nodelay)
220 protocol = TCPROSPub(resolved_topic_name, topic.data_class, is_latch=topic.is_latch, headers=topic.headers)
221 transport = TCPROSTransport(protocol, resolved_topic_name)
222 transport.set_socket(sock, header['callerid'])
223 transport.write_header()
224 topic.add_connection(transport)
225
226
227
228
230 transport_type = 'UDPROS'
231
232 - def __init__(self, protocol, name, header):
233 """
234 ctor
235 @param name: topic name
236 @type name: str:
237 @param protocol: protocol implementation
238 @param protocol: UDPROSTransportProtocol
239 @param header: handshake header if transport handshake header was
240 already read off of transport.
241 @type header: dict
242 @throws TransportInitError: if transport cannot be initialized according to arguments
243 """
244 super(UDPROSTransport, self).__init__(protocol.direction, name=name)
245 if not name:
246 raise TransportInitError("Unable to initialize transport: name is not set")
247
248 self.done = False
249 self.header = header
250
252 """
253 Convenience routine for services to send a message across a
254 particular connection. NOTE: write_data is much more efficient
255 if same message is being sent to multiple connections. Not
256 threadsafe.
257 @param msg: message to send
258 @type msg: Msg
259 @param seq: sequence number for message
260 @type seq: int
261 @raise TransportException: if error occurred sending message
262 """
263
264 serialize_message(self.write_buff, seq, msg)
265 self.write_data(self.write_buff.getvalue())
266 self.write_buff.truncate(0)
267
269 """
270 Write raw data to transport
271 @raise TransportInitialiationError: could not be initialized
272 @raise TransportTerminated: no longer open for publishing
273 """
274
275
276
277 pass
278
280 """
281 block until messages are read off of socket
282 @return: list of newly received messages
283 @rtype: [Msg]
284 @raise TransportException: if unable to receive message due to error
285 """
286 pass
287
288
289
290
293
294
299
300 _handler = UDPROSHandler()
301
304