1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 """
35 UDPROS connection protocol.
36 """
37
38
39
40
import logging
import socket
import threading

import roslib.network

import rospy.impl.registration
import rospy.impl.transport
45
49
51 """
52 rospy protocol handler for UDPROS. Stores the datagram server if necessary.
53 """
54
61
def init_server(self):
    """
    Initialize and start the server thread, if not already initialized.

    Creates an IPv4 datagram socket, binds it to
    roslib.network.get_bind_address() on self.port, stores it in
    self.server and starts a background thread running self.run.
    When self.port is 0 it is replaced by the port number the socket
    was actually bound to. Does nothing if self.server is already set.
    """
    if self.server is not None:
        return
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.bind((roslib.network.get_bind_address(), self.port))
    except Exception:
        # do not leak the descriptor when the address/port cannot be bound
        s.close()
        raise
    if self.port == 0:
        # port 0 lets the OS pick; record the port that was really assigned
        self.port = s.getsockname()[1]
    self.server = s
    # The threading module has no start_new_thread(); the previous call
    # raised AttributeError. A daemon Thread keeps the receive loop from
    # blocking interpreter exit.
    receiver = threading.Thread(target=self.run)
    receiver.daemon = True
    receiver.start()
74
def run(self):
    """
    Receive loop for the datagram server.

    Reads datagrams from self.server, at most self.buff_size bytes each,
    until rospy.core.is_shutdown() reports True. Received packets are not
    processed; a notice is printed for each one. Any error raised while
    receiving ends the loop.
    """
    buff_size = self.buff_size
    try:
        while not rospy.core.is_shutdown():
            self.server.recvfrom(buff_size)
            print("received packet")
    except Exception:
        # Best effort: the loop must never take the process down. Errors
        # seen once shutdown has begun are ignored; anything else is
        # recorded instead of being silently dropped.
        if not rospy.core.is_shutdown():
            logging.getLogger(__name__).exception("UDPROS receive loop terminated")
85
# Closes self.sock when it is set.
# NOTE(review): the `def` line that owns these statements is missing from this
# listing. The server-start code in this file stores its socket in
# self.server, not self.sock -- confirm which attribute should be closed here.
87 if self.sock is not None:
88 self.sock.close()
89
# Subscriber-side connection setup: validates protocol_params, starts the
# datagram server via self.init_server(), builds a transport for the topic's
# subscriber implementation and registers it with sub.add_connection().
# NOTE(review): the `def` line that owns this body is missing from this
# listing, so the parameter list cannot be confirmed; the docstring documents
# `resolved_name` while the statements use `topic_name`.
91 """
92 Connect to topic resolved_name on Publisher pub_uri using UDPROS.
93 @param resolved_name str: resolved topic name
94 @type resolved_name: str
95 @param pub_uri: XML-RPC URI of publisher
96 @type pub_uri: str
97 @param protocol_params: protocol parameters to use for connecting
98 @type protocol_params: [XmlRpcLegal]
99 @return: code, message, debug
100 @rtype: (int, str, int)
101 """
102
103
# Anything that is not a 4-element list is rejected with code 0.
104 if type(protocol_params) != list or len(protocol_params) != 4:
105 return 0, "ERROR: invalid UDPROS parameters", 0
106 if protocol_params[0] != UDPROS:
# NOTE(review): `id` is only assigned by the unpacking a few lines below. If
# that makes it a local of this function, formatting it here raises
# UnboundLocalError instead of returning the message -- confirm.
107 return 0, "INTERNAL ERROR: protocol id is not UDPROS: %s"%id, 0
108
109
110 id, dest_addr, dest_port, headers = protocol_params
111
112 self.init_server()
113
114
115
# NOTE(review): the visible imports bring in rospy.impl.registration, not
# rospy.registration -- confirm this attribute path resolves.
116 sub = rospy.registration.get_topic_manager().get_subscriber_impl(topic_name)
117
118
119
120
121
# NOTE(review): `protocol` is not assigned anywhere in the visible statements,
# and `UDPTransport` differs from the `UDPROSTransport` name used by the
# transport constructor later in this file -- confirm both names.
122 transport = UDPTransport(protocol, topic_name, sub.receive_callback)
123
124
# A false result from add_connection() is reported as a duplicate-subscriber
# race; the transport that was just created is closed again.
125 if sub.add_connection(transport):
126 return 1, "Connected topic[%s]. Transport impl[%s]"%(topic_name, transport.__class__.__name__), dest_port
127 else:
128 transport.close()
129 return 0, "ERROR: Race condition failure: duplicate topic subscriber [%s] was created"%(topic_name), 0
130
# Reports whether the given protocol name equals UDPROS (a name defined
# outside this listing).
# NOTE(review): the `def` line that owns this body is missing from this listing.
132 """
133 @param protocol: name of protocol
134 @type protocol: str
135 @return: True if protocol is supported
136 @rtype: bool
137 """
138 return protocol == UDPROS
139
# Returns a list holding a single protocol description, itself a one-element
# list containing UDPROS.
# NOTE(review): the `def` line that owns this body is missing from this listing.
141 """
142 Get supported protocols
143 """
144 return [[UDPROS]]
145
# Publisher-side setup for a new UDPROS connection. The visible statements
# only check protocol_params[0] and unpack the remaining values; nothing is
# opened or sent.
# NOTE(review): the `def` line that owns this body is missing from this
# listing, so the parameter list cannot be confirmed.
147 """
148 Initialize this node to start publishing to a new UDP location.
149
150 @param resolved_name: topic name
151 @type resolved__name: str
152
153 @param protocol_params: requested protocol
154 parameters. protocol[0] must be the string 'UDPROS'
155 @type protocol_params: [str, value*]
156 @return: (code, msg, [UDPROS, addr, port])
157 @rtype: (int, str, list)
158 """
159
160 if protocol_params[0] != UDPROS:
# NOTE(review): `protocol` is not assigned in the visible statements -- confirm
# it is a real name; otherwise this error path raises NameError.
161 return 0, "Internal error: protocol does not match UDPROS: %s"%protocol, []
162
# The unpacking requires exactly five elements; none of the unpacked values
# are used afterwards.
163 _, header, host, port, max_datagram_size = protocol_params
164
# NOTE(review): the docstring promises [UDPROS, addr, port] but only [UDPROS]
# is returned.
165 return 1, "ready", [UDPROS]
166
# Validates the handshake header of an incoming connection against the
# locally published topic and, when it matches, attaches a new transport to
# that topic. Every failure path returns an error string.
# NOTE(review): the `def` line that owns this body is missing from this
# listing. The body uses TCPROS names (TCPROSPub, TCPROSTransport,
# _configure_pub_socket, tcp_nodelay) inside the UDPROS module; none of them
# is defined in the visible listing -- confirm they are intended here.
168 """
169 Process incoming topic connection. Reads in topic name from
170 handshake and creates the appropriate L{TCPROSPub} handler for the
171 connection.
172 @param sock: socket connection
173 @type sock: socket.socket
174 @param client_addr: client address
175 @type client_addr: (str, int)
176 @param header: key/value pairs from handshake header
177 @type header: dict
178 @return: error string or None
179 @rtype: str
180 """
# The header must carry all three of these fields.
181 for required in ['topic', 'md5sum', 'callerid']:
182 if not required in header:
183 return "Missing required '%s' field"%required
# NOTE(review): indentation is lost in this listing, so it is not visible
# whether this `else` pairs with the `for` or with the `if` above.
184 else:
185 resolved_topic_name = header['topic']
186 md5sum = header['md5sum']
187 tm = rospy.registration.get_topic_manager()
188 topic = tm.get_publisher_impl(resolved_topic_name)
189 if not topic:
190 return "[%s] is not a publisher of [%s]. Topics are %s"%(rospy.names.get_caller_id(), resolved_topic_name, tm.get_publications())
# Mismatch path: the requested md5sum is neither rospy.names.TOPIC_ANYTYPE nor
# the md5sum of the published data class.
191 elif md5sum != rospy.names.TOPIC_ANYTYPE and md5sum != topic.data_class._md5sum:
192
193 actual_type = topic.data_class._type
194
195
196
# A 'type' field in the header gives a more specific error when the type
# names differ; without it the published type name is used in the message.
197 if 'type' in header:
198 requested_type = header['type']
199 if requested_type != actual_type:
200 return "topic types do not match: [%s] vs. [%s]"%(requested_type, actual_type)
201 else:
202
203 requested_type = actual_type
204
205 return "Client [%s] wants topic [%s] to have datatype/md5sum [%s/%s], but our version has [%s/%s] Dropping connection."%(header['callerid'], resolved_topic_name, requested_type, md5sum, actual_type, topic.data_class._md5sum)
206
207 else:
208
209
210
# The header value '1' enables tcp_nodelay; when the field is absent the
# per-topic setting in self.tcp_nodelay_map is used, defaulting to False.
211 if 'tcp_nodelay' in header:
212 tcp_nodelay = True if header['tcp_nodelay'].strip() == '1' else False
213 else:
214 tcp_nodelay = self.tcp_nodelay_map.get(resolved_topic_name, False)
215
216 _configure_pub_socket(sock, tcp_nodelay)
217 protocol = TCPROSPub(resolved_topic_name, topic.data_class, is_latch=topic.is_latch, headers=topic.headers)
218 transport = TCPROSTransport(protocol, resolved_topic_name)
219 transport.set_socket(sock, header['callerid'])
220 transport.write_header()
# No return statement is visible after this call on the success path.
221 topic.add_connection(transport)
222
223
224
225
# Label identifying the transport type as 'UDPROS'.
# NOTE(review): presumably a class attribute of the transport class whose
# `class` line is missing from this listing -- confirm.
227 transport_type = 'UDPROS'
228
def __init__(self, protocol, name, header):
    """
    Create a UDPROS transport for a topic.

    @param protocol: protocol implementation; its C{direction} attribute
        is passed to the base transport constructor
    @type  protocol: UDPROSTransportProtocol
    @param name: topic name
    @type  name: str
    @param header: handshake header if transport handshake header was
        already read off of transport.
    @type  header: dict
    @raise TransportInitError: if name is not set (empty or None)
    """
    # Reject a missing name before the base class is initialized so that an
    # unusable transport is never partially constructed.
    if not name:
        raise TransportInitError("Unable to initialize transport: name is not set")
    super(UDPROSTransport, self).__init__(protocol.direction, name=name)

    self.done = False
    self.header = header
247
# Serializes msg with sequence number seq into self.write_buff, hands the
# buffer contents to self.write_data(), then truncates the buffer to length 0.
# NOTE(review): the `def` line that owns this body is missing from this
# listing. Neither serialize_message nor self.write_buff is defined in the
# visible listing -- confirm where they come from.
249 """
250 Convenience routine for services to send a message across a
251 particular connection. NOTE: write_data is much more efficient
252 if same message is being sent to multiple connections. Not
253 threadsafe.
254 @param msg: message to send
255 @type msg: Msg
256 @param seq: sequence number for message
257 @type seq: int
258 @raise TransportException: if error occurred sending message
259 """
260
261 serialize_message(self.write_buff, seq, msg)
262 self.write_data(self.write_buff.getvalue())
263 self.write_buff.truncate(0)
264
# Stub: the only statement is `pass`, so nothing is written and none of the
# exceptions named in the docstring are raised.
# NOTE(review): the `def` line that owns this body is missing from this
# listing. The docstring's "TransportInitialiationError" does not match the
# TransportInitError name raised by the constructor in this file -- confirm
# the intended exception name.
266 """
267 Write raw data to transport
268 @raise TransportInitialiationError: could not be initialized
269 @raise TransportTerminated: no longer open for publishing
270 """
271
272
273
274 pass
275
# Stub: the only statement is `pass`, so nothing is read and no message list
# is returned, despite what the docstring describes.
# NOTE(review): the `def` line that owns this body is missing from this listing.
277 """
278 block until messages are read off of socket
279 @return: list of newly received messages
280 @rtype: [Msg]
281 @raise TransportException: if unable to receive message due to error
282 """
283 pass
284
285
286
287
290
291
296
# UDPROSHandler instance created when this statement runs (constructed with
# no arguments).
# NOTE(review): presumably a module-level singleton -- indentation is lost in
# this listing, confirm.
297 _handler = UDPROSHandler()
298
301