Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 package org.ros.rosserial;
00034 
00035 import java.nio.ByteBuffer;
00036 import java.nio.ByteOrder;
00037 import java.util.HashMap;
00038 import java.util.Map;
00039 import java.util.Timer;
00040 import java.util.TimerTask;
00041 
00042 import org.ros.message.Message;
00043 import org.ros.message.MessageDeserializer;
00044 import org.ros.message.rosserial_msgs.Log;
00045 import org.ros.message.rosserial_msgs.TopicInfo;
00046 import org.ros.service.rosserial_msgs.RequestParam;
00047 import org.ros.node.Node;
00048 import org.ros.node.parameter.ParameterTree;
00049 import org.ros.node.topic.Publisher;
00050 import org.ros.node.topic.Subscriber;
00051 
00052 import org.ros.rosserial.TopicRegistrationListener;
00053 
00059 public class Protocol {
00060 
00061         
00062         
00063         static final int TOPIC_PUBLISHERS = 0;
00064         static final int TOPIC_SUBSCRIBERS = 1;
00065         static final int TOPIC_TIME = 10;
00066         
00067         public static final byte[] NEGOTIATE_TOPICS_REQUEST = { (byte) 0, (byte) 0, (byte) 0, (byte) 0 };
00068 
00072         private Node node;
00073 
00077         private Map<Integer, TopicInfo> id_to_topic = new HashMap<Integer, TopicInfo>();
00078 
00082         private Map<String, Integer> topic_to_id = new HashMap<String, Integer>();
00083 
00087         private Map<Integer, Publisher> publishers = new HashMap<Integer, Publisher>();
00088         
00092         private Map<Integer, Subscriber> subscribers = new HashMap<Integer, Subscriber>();
00093         
00097         private Map<Integer, MessageDeserializer> msg_deserializers = new HashMap<Integer, MessageDeserializer>();
00098 
00102         private TopicRegistrationListener newPubListener;
00103 
00107         private TopicRegistrationListener newSubListener;
00108 
00112         private PacketHandler packetHandler;
00113 
00114         public Protocol(Node nh, PacketHandler handler) {
00115                 this.node = nh;
00116                 this.packetHandler = handler;
00117                 this.paramT = nh.newParameterTree();
00118         }
00119 
00125         public void setOnNewPublication(TopicRegistrationListener listener) {
00126                 newPubListener = listener;
00127         }
00128 
00134         public void setOnNewSubcription(TopicRegistrationListener listener) {
00135                 newSubListener = listener;
00136         }
00137 
00141         public void negotiateTopics() {
00142                 packetHandler.send(NEGOTIATE_TOPICS_REQUEST);
00143         }
00144 
00149         public byte[] constructMessage(int id, org.ros.message.Message m) {
00150                 int l = m.serializationLength();
00151                 byte[] data = new byte[l + 4];
00152                 ByteBuffer buff = ByteBuffer.wrap(data, 4, l);
00153 
00154                 data[0] = (byte) id;
00155                 data[1] = (byte) (id >> 8);
00156                 data[2] = (byte) l;
00157                 data[3] = (byte) (l >> 8);
00158 
00159                 m.serialize(buff, 0);
00160 
00161                 return data;
00162         }
00163 
00169         private void addTopic(TopicInfo topic, boolean is_publisher) {
00170                 String name = topic.topic_name;
00171                 String type = topic.message_type;
00172                 Integer id = topic.topic_id;
00173                 
00174                 if (id_to_topic.containsKey(id)) {
00175                         if (id_to_topic.get(id).topic_name.equals(name))
00176                                 return;
00177                 }
00178                 try {
00179                         msg_deserializers.put(topic.topic_id,
00180                                         node.getMessageSerializationFactory()
00181                                                         .newMessageDeserializer(type));
00182                         topic_to_id.put(topic.topic_name, topic.topic_id);
00183                         id_to_topic.put(id, topic);
00184 
00185                         if (is_publisher) {
00186                                 Publisher pub = node.newPublisher(name, type);
00187                                 publishers.put(id, pub);
00188                                 node.getLog().info(
00189                                                 "Adding Publisher " + name + " of type " + type);
00190                                 if (newPubListener != null)
00191                                         newPubListener.onNewTopic(topic);
00192                         } else {
00193                                 Subscriber sub = node.newSubscriber(name, type,
00194                                                 new MessageListenerForwarding(id, this));
00195                                 subscribers.put(id, sub);
00196                                 node.getLog().info(
00197                                                 "Adding Subscriber " + name + " of type " + type);
00198                                 if (newSubListener != null)
00199                                         newSubListener.onNewTopic(topic);
00200                         }
00201                 } catch (Exception e) {
00202                         node.getLog().error("Exception while adding topic", e);
00203                 }
00204         }
00205 
00206         public TopicInfo[] getSubscriptions() {
00207                 TopicInfo[] topics = new TopicInfo[subscribers.size()];
00208 
00209                 int i = 0;
00210                 for (Integer id : subscribers.keySet()) {
00211                         topics[i++] = id_to_topic.get(id);
00212                 }
00213                 return topics;
00214         }
00215 
00216         public TopicInfo[] getPublications() {
00217                 TopicInfo[] topics = new TopicInfo[publishers.size()];
00218 
00219                 int i = 0;
00220                 for (Integer id : publishers.keySet()) {
00221                         topics[i++] = id_to_topic.get(id);
00222                 }
00223                 return topics;
00224         }
00225 
00230         private Timer connection_timer =  new Timer();
00231         static final int CONNECTION_TIMOUT_PERIOD = 10000;
00232         TimerTask timer_cb = new TimerTask() {
00233                 
00234                 @Override
00235                 public void run() {
00236                         
00237                         if (sync_requested){
00238                                 connected = true;
00239                                 sync_requested = false;
00240                                 
00241                         }
00242                         else{
00243                                 node.getLog().info("Connection to client lost. Topic negotiation requested");
00244                                 connected = false;
00245                                 negotiateTopics();
00246                         }
00247                 }
00248         };
00249         
00250         private boolean sync_requested=false;
00251         private boolean connected =false;
00252         
00253         public void start(){
00254                 connection_timer.scheduleAtFixedRate(timer_cb, CONNECTION_TIMOUT_PERIOD,  CONNECTION_TIMOUT_PERIOD);
00255         }
00256         
00267         public boolean parsePacket(int topic_id, byte[] msg_data) {
00268 
00269                 switch (topic_id) {
00270                 case TopicInfo.ID_PUBLISHER:
00271                         TopicInfo pm = new TopicInfo();
00272                         pm.deserialize(msg_data);
00273                         addTopic(pm, true);
00274                         connected = true;
00275                         break;
00276 
00277                 case TopicInfo.ID_SUBSCRIBER:
00278                         TopicInfo sm = new TopicInfo();
00279                         sm.deserialize(msg_data);
00280                         addTopic(sm, false);
00281                         connected =true;
00282                         break;
00283 
00284                 case TopicInfo.ID_SERVICE_SERVER:
00285                         break;
00286                 case TopicInfo.ID_SERVICE_CLIENT:
00287                         break;
00288                 case TopicInfo.ID_PARAMETER_REQUEST:
00289                         handleParameterRequest(msg_data);
00290                         break;
00291                 case TopicInfo.ID_LOG:
00292                         handleLogging(msg_data);
00293                         break;
00294                 case TopicInfo.ID_TIME:
00295                         sync_requested = true;
00296                         org.ros.message.Time t = node.getCurrentTime();
00297                         org.ros.message.std_msgs.Time t_msg = new org.ros.message.std_msgs.Time();
00298                         t_msg.data = t;
00299                         packetHandler.send(constructMessage(TOPIC_TIME, t_msg));
00300                         break;
00301 
00302                 default:
00303                         MessageDeserializer c = msg_deserializers.get(topic_id);
00304                         if (c != null) {
00305                                 
00306                             ByteBuffer bb = ByteBuffer.wrap(msg_data);
00307                             Message msg = (Message) c.deserialize(bb.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN));
00308                                 publishers.get(topic_id).publish(msg);
00309                         } else {
00310                                 node.getLog().info(
00311                                                 "Trying to publish to unregistered ID #" + topic_id);
00312 
00313                                 
00314                                 negotiateTopics();
00315                         }
00316                         break;
00317                 }
00318 
00319                 return false;
00320         }
00321 
00327         private void  handleLogging(byte[] msg_data){
00328                 Log log_msg = new Log();
00329                 log_msg.deserialize(msg_data);
00330                 switch(log_msg.level){
00331                 case Log.DEBUG:
00332                         node.getLog().debug(log_msg.msg);
00333                         break;
00334                 case Log.INFO:
00335                         node.getLog().info(log_msg.msg);
00336                         break;
00337                 case Log.WARN:
00338                         node.getLog().warn(log_msg.msg);
00339                         break;
00340                 case Log.ERROR:
00341                         node.getLog().error(log_msg.msg);
00342                         break;
00343                 case Log.FATAL:
00344                         node.getLog().fatal(log_msg.msg);
00345                         break;
00346                 }
00347         }
00348         
00349         
00350         ParameterTree paramT ;
00351         private void handleParameterRequest(byte[] msg_data){
00352                 RequestParam rp = new RequestParam();
00353                 RequestParam.Request req = rp.createRequest();
00354                 req.deserialize(msg_data);
00355                 
00356                 RequestParam.Response resp = rp.createResponse();               
00357 
00358         }
00359         
00366         private static class MessageListenerForwarding<MessageType> implements
00367                         org.ros.message.MessageListener<MessageType> {
00371                 private Protocol protocol;
00372 
00376                 private int id;
00377 
00378                 public MessageListenerForwarding(int topic_id, Protocol p) {
00379                         protocol = p;
00380                         id = topic_id;
00381                 }
00382 
00383                 @Override
00384                 public void onNewMessage(MessageType t) {
00385                         byte[] data = protocol.constructMessage(id, (Message) t);
00386                         protocol.packetHandler.send(data);
00387                 }
00388         }
00389 
00395         public interface PacketHandler {
00402                 void send(byte[] data);
00403         }
00404 }