1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17   
 18   
 19   
 20   
 21   
 22   
 23   
 24   
 25   
 26   
 27   
 28   
 29   
 30   
 31   
 32   
 33   
 34   
 35  from __future__ import print_function 
 36   
 37  """ 
 38  UDPROS connection protocol. 
 39  """ 
 40   
 41   
 42   
 43   
 44  import rosgraph.network 
 45   
 46  import rospy.impl.registration 
 47  import rospy.impl.transport 
 48   
 52   
 54      """ 
 55      rospy protocol handler for UDPROS. Stores the datagram server if necessary. 
 56      """ 
 57       
 64           
 66          """ 
 67          Initialize and start the server thread, if not already initialized. 
 68          """ 
 69          if self.server is not None: 
 70              return 
 71          if rosgraph.network.use_ipv6(): 
 72              s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) 
 73          else: 
 74              s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  
 75          s.bind((rosgraph.network.get_bind_address(), self.port)) 
 76          if self.port == 0: 
 77              self.port = s.getsockname()[1] 
 78          self.server = s 
 79          threading.start_new_thread(self.run, ()) 
  80   
 82          buff_size = self.buff_size 
 83          try: 
 84              while not rospy.core.is_shutdown(): 
 85                  data = self.server.recvfrom(self.buff_size) 
 86                  print("received packet") 
 87                   
 88          except: 
 89               
 90              pass 
  91   
 93          if self.sock is not None: 
 94              self.sock.close() 
  95   
 97          """ 
 98          Connect to topic resolved_name on Publisher pub_uri using UDPROS. 
 99          @param resolved_name: resolved topic name 
100          @type  resolved_name: str 
101          @param pub_uri: XML-RPC URI of publisher  
102          @type  pub_uri: str 
103          @param protocol_params: protocol parameters to use for connecting 
104          @type protocol_params: [XmlRpcLegal] 
105          @return: code, message, debug 
106          @rtype: (int, str, int) 
107          """ 
108   
109           
110          if type(protocol_params) != list or len(protocol_params) != 4: 
111              return 0, "ERROR: invalid UDPROS parameters", 0 
112          if protocol_params[0] != UDPROS: 
113              return 0, "INTERNAL ERROR: protocol id is not UDPROS: %s"%id, 0 
114   
115           
116          id, dest_addr, dest_port, headers = protocol_params 
117   
118          self.init_server() 
119           
120           
121   
122          sub = rospy.registration.get_topic_manager().get_subscriber_impl(topic_name) 
123           
124           
125           
126           
127           
128          transport = UDPTransport(protocol, topic_name, sub.receive_callback)  
129           
130           
131          if sub.add_connection(transport):  
132              return 1, "Connected topic[%s]. Transport impl[%s]"%(topic_name, transport.__class__.__name__), dest_port 
133          else: 
134              transport.close() 
135              return 0, "ERROR: Race condition failure: duplicate topic subscriber [%s] was created"%(topic_name), 0 
 136   
138          """ 
139          @param protocol: name of protocol 
140          @type protocol: str 
141          @return: True if protocol is supported 
142          @rtype: bool 
143          """ 
144          return protocol == UDPROS 
 145       
147          """ 
148          Get supported protocols 
149          """ 
150          return [[UDPROS]] 
 151           
153          """ 
154          Initialize this node to start publishing to a new UDP location. 
155           
156          @param resolved_name: topic name 
157          @type  resolved_name: str 
158           
159          @param protocol_params: requested protocol 
160            parameters. protocol_params[0] must be the string 'UDPROS' 
161          @type  protocol_params: [str, value*] 
162          @return: (code, msg, [UDPROS, addr, port]) 
163          @rtype: (int, str, list) 
164          """ 
165   
166          if protocol_params[0] != UDPROS: 
167              return 0, "Internal error: protocol does not match UDPROS: %s"%protocol, [] 
168           
169          _, header, host, port, max_datagram_size = protocol_params 
170           
171          return 1, "ready", [UDPROS] 
 172   
174          """ 
175          Process incoming topic connection. Reads in topic name from 
176          handshake and creates the appropriate L{TCPROSPub} handler for the 
177          connection. 
178          @param sock: socket connection 
179          @type sock: socket.socket 
180          @param client_addr: client address 
181          @type client_addr: (str, int) 
182          @param header: key/value pairs from handshake header 
183          @type header: dict 
184          @return: error string or None  
185          @rtype: str 
186          """ 
187          for required in ['topic', 'md5sum', 'callerid']: 
188              if not required in header: 
189                  return "Missing required '%s' field"%required 
190          else: 
191              resolved_topic_name = header['topic'] 
192              md5sum = header['md5sum'] 
193              tm = rospy.registration.get_topic_manager() 
194              topic = tm.get_publisher_impl(resolved_topic_name) 
195              if not topic: 
196                  return "[%s] is not a publisher of  [%s]. Topics are %s"%(rospy.names.get_caller_id(), resolved_topic_name, tm.get_publications()) 
197              elif md5sum != rospy.names.TOPIC_ANYTYPE and md5sum != topic.data_class._md5sum: 
198   
199                  actual_type = topic.data_class._type 
200   
201                   
202                   
203                  if 'type' in header: 
204                      requested_type = header['type'] 
205                      if requested_type != actual_type: 
206                          return "topic types do not match: [%s] vs. [%s]"%(requested_type, actual_type) 
207                  else: 
208                       
209                      requested_type = actual_type 
210   
211                  return "Client [%s] wants topic [%s] to have datatype/md5sum [%s/%s], but our version has [%s/%s] Dropping connection."%(header['callerid'], resolved_topic_name, requested_type, md5sum, actual_type, topic.data_class._md5sum) 
212   
213              else: 
214                   
215   
216                   
217                  if 'tcp_nodelay' in header: 
218                      tcp_nodelay = True if header['tcp_nodelay'].strip() == '1' else False 
219                  else: 
220                      tcp_nodelay = self.tcp_nodelay_map.get(resolved_topic_name, False) 
221   
222                  _configure_pub_socket(sock, tcp_nodelay) 
223                  protocol = TCPROSPub(resolved_topic_name, topic.data_class, is_latch=topic.is_latch, headers=topic.headers) 
224                  transport = TCPROSTransport(protocol, resolved_topic_name) 
225                  transport.set_socket(sock, header['callerid']) 
226                  transport.write_header() 
227                  topic.add_connection(transport) 
 228               
229       
230   
231   
233      transport_type = 'UDPROS' 
234       
235 -    def __init__(self, protocol, name, header): 
 236          """ 
237          Create a transport for topic C{name}: stores C{header} and marks the transport as not done. 
238          @param name: topic name     
239          @type  name: str 
240          @param protocol: protocol implementation; its C{direction} is passed to the base class     
241          @type  protocol: UDPROSTransportProtocol  
242          @param header: handshake header if transport handshake header was 
243          already read off of transport. 
244          @type  header: dict 
245          @raise TransportInitError: if C{name} is empty or None 
246          """ 
247          super(UDPROSTransport, self).__init__(protocol.direction, name=name) 
248          if not name:  # NOTE(review): name is validated only after the base-class __init__ has already run
249              raise TransportInitError("Unable to initialize transport: name is not set") 
250   
251          self.done = False 
252          self.header = header 
 253               
255          """ 
256          Convenience routine for services to send a message across a 
257          particular connection. NOTE: write_data is much more efficient 
258          if same message is being sent to multiple connections. Not 
259          threadsafe. 
260          @param msg: message to send 
261          @type  msg: Msg 
262          @param seq: sequence number for message 
263          @type  seq: int 
264          @raise TransportException: if error occurred sending message 
265          """ 
266           
267          serialize_message(self.write_buff, seq, msg) 
268          self.write_data(self.write_buff.getvalue()) 
269          self.write_buff.truncate(0) 
 270   
272          """ 
273          Write raw data to transport 
274          @raise TransportInitError: could not be initialized 
275          @raise TransportTerminated: no longer open for publishing 
276          """ 
277           
278           
279           
280          pass 
 281       
283          """ 
284          block until messages are read off of socket 
285          @return: list of newly received messages 
286          @rtype: [Msg] 
287          @raise TransportException: if unable to receive message due to error 
288          """ 
289          pass 
 290   
291       
292       
293       
296       
297       
 302       
303  _handler = UDPROSHandler()  # module-level UDPROSHandler instance, created when this module is imported
304   
307