Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 package org.ros.rosserial;
00034
00035 import java.nio.ByteBuffer;
00036 import java.nio.ByteOrder;
00037 import java.util.HashMap;
00038 import java.util.Map;
00039 import java.util.Timer;
00040 import java.util.TimerTask;
00041
00042 import org.ros.message.Message;
00043 import org.ros.message.MessageDeserializer;
00044 import org.ros.message.rosserial_msgs.Log;
00045 import org.ros.message.rosserial_msgs.TopicInfo;
00046 import org.ros.service.rosserial_msgs.RequestParam;
00047 import org.ros.node.Node;
00048 import org.ros.node.parameter.ParameterTree;
00049 import org.ros.node.topic.Publisher;
00050 import org.ros.node.topic.Subscriber;
00051
00052 import org.ros.rosserial.TopicRegistrationListener;
00053
00059 public class Protocol {
00060
00061
00062
00063 static final int TOPIC_PUBLISHERS = 0;
00064 static final int TOPIC_SUBSCRIBERS = 1;
00065 static final int TOPIC_TIME = 10;
00066
00067 public static final byte[] NEGOTIATE_TOPICS_REQUEST = { (byte) 0, (byte) 0, (byte) 0, (byte) 0 };
00068
00072 private Node node;
00073
00077 private Map<Integer, TopicInfo> id_to_topic = new HashMap<Integer, TopicInfo>();
00078
00082 private Map<String, Integer> topic_to_id = new HashMap<String, Integer>();
00083
00087 private Map<Integer, Publisher> publishers = new HashMap<Integer, Publisher>();
00088
00092 private Map<Integer, Subscriber> subscribers = new HashMap<Integer, Subscriber>();
00093
00097 private Map<Integer, MessageDeserializer> msg_deserializers = new HashMap<Integer, MessageDeserializer>();
00098
00102 private TopicRegistrationListener newPubListener;
00103
00107 private TopicRegistrationListener newSubListener;
00108
00112 private PacketHandler packetHandler;
00113
00114 public Protocol(Node nh, PacketHandler handler) {
00115 this.node = nh;
00116 this.packetHandler = handler;
00117 this.paramT = nh.newParameterTree();
00118 }
00119
00125 public void setOnNewPublication(TopicRegistrationListener listener) {
00126 newPubListener = listener;
00127 }
00128
00134 public void setOnNewSubcription(TopicRegistrationListener listener) {
00135 newSubListener = listener;
00136 }
00137
00141 public void negotiateTopics() {
00142 packetHandler.send(NEGOTIATE_TOPICS_REQUEST);
00143 }
00144
00149 public byte[] constructMessage(int id, org.ros.message.Message m) {
00150 int l = m.serializationLength();
00151 byte[] data = new byte[l + 4];
00152 ByteBuffer buff = ByteBuffer.wrap(data, 4, l);
00153
00154 data[0] = (byte) id;
00155 data[1] = (byte) (id >> 8);
00156 data[2] = (byte) l;
00157 data[3] = (byte) (l >> 8);
00158
00159 m.serialize(buff, 0);
00160
00161 return data;
00162 }
00163
00169 private void addTopic(TopicInfo topic, boolean is_publisher) {
00170 String name = topic.topic_name;
00171 String type = topic.message_type;
00172 Integer id = topic.topic_id;
00173
00174 if (id_to_topic.containsKey(id)) {
00175 if (id_to_topic.get(id).topic_name.equals(name))
00176 return;
00177 }
00178 try {
00179 msg_deserializers.put(topic.topic_id,
00180 node.getMessageSerializationFactory()
00181 .newMessageDeserializer(type));
00182 topic_to_id.put(topic.topic_name, topic.topic_id);
00183 id_to_topic.put(id, topic);
00184
00185 if (is_publisher) {
00186 Publisher pub = node.newPublisher(name, type);
00187 publishers.put(id, pub);
00188 node.getLog().info(
00189 "Adding Publisher " + name + " of type " + type);
00190 if (newPubListener != null)
00191 newPubListener.onNewTopic(topic);
00192 } else {
00193 Subscriber sub = node.newSubscriber(name, type,
00194 new MessageListenerForwarding(id, this));
00195 subscribers.put(id, sub);
00196 node.getLog().info(
00197 "Adding Subscriber " + name + " of type " + type);
00198 if (newSubListener != null)
00199 newSubListener.onNewTopic(topic);
00200 }
00201 } catch (Exception e) {
00202 node.getLog().error("Exception while adding topic", e);
00203 }
00204 }
00205
00206 public TopicInfo[] getSubscriptions() {
00207 TopicInfo[] topics = new TopicInfo[subscribers.size()];
00208
00209 int i = 0;
00210 for (Integer id : subscribers.keySet()) {
00211 topics[i++] = id_to_topic.get(id);
00212 }
00213 return topics;
00214 }
00215
00216 public TopicInfo[] getPublications() {
00217 TopicInfo[] topics = new TopicInfo[publishers.size()];
00218
00219 int i = 0;
00220 for (Integer id : publishers.keySet()) {
00221 topics[i++] = id_to_topic.get(id);
00222 }
00223 return topics;
00224 }
00225
00230 private Timer connection_timer = new Timer();
00231 static final int CONNECTION_TIMOUT_PERIOD = 10000;
00232 TimerTask timer_cb = new TimerTask() {
00233
00234 @Override
00235 public void run() {
00236
00237 if (sync_requested){
00238 connected = true;
00239 sync_requested = false;
00240
00241 }
00242 else{
00243 node.getLog().info("Connection to client lost. Topic negotiation requested");
00244 connected = false;
00245 negotiateTopics();
00246 }
00247 }
00248 };
00249
00250 private boolean sync_requested=false;
00251 private boolean connected =false;
00252
00253 public void start(){
00254 connection_timer.scheduleAtFixedRate(timer_cb, CONNECTION_TIMOUT_PERIOD, CONNECTION_TIMOUT_PERIOD);
00255 }
00256
00267 public boolean parsePacket(int topic_id, byte[] msg_data) {
00268
00269 switch (topic_id) {
00270 case TopicInfo.ID_PUBLISHER:
00271 TopicInfo pm = new TopicInfo();
00272 pm.deserialize(msg_data);
00273 addTopic(pm, true);
00274 connected = true;
00275 break;
00276
00277 case TopicInfo.ID_SUBSCRIBER:
00278 TopicInfo sm = new TopicInfo();
00279 sm.deserialize(msg_data);
00280 addTopic(sm, false);
00281 connected =true;
00282 break;
00283
00284 case TopicInfo.ID_SERVICE_SERVER:
00285 break;
00286 case TopicInfo.ID_SERVICE_CLIENT:
00287 break;
00288 case TopicInfo.ID_PARAMETER_REQUEST:
00289 handleParameterRequest(msg_data);
00290 break;
00291 case TopicInfo.ID_LOG:
00292 handleLogging(msg_data);
00293 break;
00294 case TopicInfo.ID_TIME:
00295 sync_requested = true;
00296 org.ros.message.Time t = node.getCurrentTime();
00297 org.ros.message.std_msgs.Time t_msg = new org.ros.message.std_msgs.Time();
00298 t_msg.data = t;
00299 packetHandler.send(constructMessage(TOPIC_TIME, t_msg));
00300 break;
00301
00302 default:
00303 MessageDeserializer c = msg_deserializers.get(topic_id);
00304 if (c != null) {
00305
00306 ByteBuffer bb = ByteBuffer.wrap(msg_data);
00307 Message msg = (Message) c.deserialize(bb.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN));
00308 publishers.get(topic_id).publish(msg);
00309 } else {
00310 node.getLog().info(
00311 "Trying to publish to unregistered ID #" + topic_id);
00312
00313
00314 negotiateTopics();
00315 }
00316 break;
00317 }
00318
00319 return false;
00320 }
00321
00327 private void handleLogging(byte[] msg_data){
00328 Log log_msg = new Log();
00329 log_msg.deserialize(msg_data);
00330 switch(log_msg.level){
00331 case Log.DEBUG:
00332 node.getLog().debug(log_msg.msg);
00333 break;
00334 case Log.INFO:
00335 node.getLog().info(log_msg.msg);
00336 break;
00337 case Log.WARN:
00338 node.getLog().warn(log_msg.msg);
00339 break;
00340 case Log.ERROR:
00341 node.getLog().error(log_msg.msg);
00342 break;
00343 case Log.FATAL:
00344 node.getLog().fatal(log_msg.msg);
00345 break;
00346 }
00347 }
00348
00349
00350 ParameterTree paramT ;
00351 private void handleParameterRequest(byte[] msg_data){
00352 RequestParam rp = new RequestParam();
00353 RequestParam.Request req = rp.createRequest();
00354 req.deserialize(msg_data);
00355
00356 RequestParam.Response resp = rp.createResponse();
00357
00358 }
00359
00366 private static class MessageListenerForwarding<MessageType> implements
00367 org.ros.message.MessageListener<MessageType> {
00371 private Protocol protocol;
00372
00376 private int id;
00377
00378 public MessageListenerForwarding(int topic_id, Protocol p) {
00379 protocol = p;
00380 id = topic_id;
00381 }
00382
00383 @Override
00384 public void onNewMessage(MessageType t) {
00385 byte[] data = protocol.constructMessage(id, (Message) t);
00386 protocol.packetHandler.send(data);
00387 }
00388 }
00389
00395 public interface PacketHandler {
00402 void send(byte[] data);
00403 }
00404 }