Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 package org.ros.rosserial;
00034
00035 import java.io.BufferedInputStream;
00036 import java.io.IOException;
00037 import java.io.InputStream;
00038 import java.io.OutputStream;
00039 import java.util.Timer;
00040
00041 import org.ros.message.rosserial_msgs.TopicInfo;
00042 import org.ros.node.Node;
00043
00049 public class ROSSerial implements Runnable {
00053 public static final byte[] FLAGS = { (byte) 0xff, (byte) 0xff };
00054
00059 private static final int MAX_MSG_DATA_SIZE =512;
00060
00064 private OutputStream ostream;
00065
00069 private InputStream istream;
00070
00074 private Node node;
00075
00079 private Protocol protocol;
00080
00086 public void setOnNewPublication(TopicRegistrationListener listener) {
00087 protocol.setOnNewPublication(listener);
00088 }
00089
00095 public void setOnNewSubcription(TopicRegistrationListener listener) {
00096 protocol.setOnNewSubcription(listener);
00097 }
00098
00099
00100 public TopicInfo[] getSubscriptions() {
00101 return protocol.getSubscriptions();
00102 }
00103
00104 public TopicInfo[] getPublications() {
00105 return protocol.getPublications();
00106 }
00107
00111 private boolean running = false;
00112
00113
00114 private enum PACKET_STATE {
00115 FLAGA, FLAGB, HEADER, DATA, CHECKSUM
00116 };
00117
00118 private PACKET_STATE packet_state;
00119 private byte[] header = new byte[4];
00120 private byte[] data = new byte[MAX_MSG_DATA_SIZE];
00121 private int data_len = 0;
00122 private int byte_index = 0;;
00123
00127 Protocol.PacketHandler sendHandler = new Protocol.PacketHandler() {
00128
00129 @Override
00130 public void send(byte[] data) {
00131
00132 int chk = 0;
00133 for (int i = 0; i < data.length; i++)
00134 chk += 0xff & data[i];
00135 chk = 255 - chk % 256;
00136
00137 try {
00138 ostream.write(FLAGS);
00139 ostream.write(data);
00140 ostream.write((byte) chk);
00141 } catch (IOException e) {
00142 System.out.println("Exception sending :"
00143 + BinaryUtils.byteArrayToHexString(data));
00144 e.printStackTrace();
00145 }
00146 }
00147 };
00148
00149 public ROSSerial(Node nh, InputStream input, OutputStream output) {
00150 ostream = output;
00151 istream = input;
00152 node = nh;
00153
00154 protocol = new Protocol(node, sendHandler);
00155
00156 }
00157
00161 public void shutdown() {
00162 running = false;
00163 }
00164
00170 Timer packet_timeout_timer;
00171
00175 public void run() {
00176
00177 protocol.negotiateTopics();
00178 protocol.start();
00179
00180 resetPacket();
00181
00182 running = true;
00183
00184
00185
00186
00187
00188 byte[] buffer = new byte[500];
00189 while (running) {
00190 try {
00191 int ret = istream.read(buffer);
00192 for(int i=0; i<ret;i++) {
00193 handleByte((byte) (0xff & buffer[i]));
00194 }
00195 } catch (IOException e) {
00196 node.getLog().error("Unable to read input stream", e);
00197 System.out.println("Unable to read input stream");
00198
00199 if (e.toString().equals("java.io.IOException: No such device")) {
00200 node.getLog()
00201 .error("Total IO Failure. Now exiting ROSSerial iothread.");
00202 break;
00203 }
00204 resetPacket();
00205 } catch (Exception e) {
00206 node.getLog().error("Unable to read input stream", e);
00207 }
00208 try {
00209
00210
00211 Thread.sleep(10);
00212 } catch (InterruptedException e) {
00213 e.printStackTrace();
00214 break;
00215 }
00216 }
00217 node.getLog().info("Finished ROSSerial IO Thread");
00218 }
00219
00220
00221
00222
00223 private void resetPacket() {
00224 byte_index = 0;
00225 data_len = 0;
00226 packet_state = PACKET_STATE.FLAGA;
00227 }
00228
00229
00230
00231
00232
00233
00234 private boolean handleByte(byte b) {
00235 switch (packet_state) {
00236 case FLAGA:
00237 if (b == (byte) 0xff)
00238 packet_state = PACKET_STATE.FLAGB;
00239 break;
00240 case FLAGB:
00241 if (b == (byte) 0xff) {
00242 packet_state = PACKET_STATE.HEADER;
00243 } else {
00244 resetPacket();
00245 return false;
00246 }
00247 break;
00248 case HEADER:
00249 header[byte_index] = b;
00250 byte_index++;
00251 if (byte_index == 4) {
00252 int len = (int) (header[3] << 8) | (int) (header[2]);
00253 data_len = len;
00254 byte_index = 0;
00255 packet_state = PACKET_STATE.DATA;
00256 }
00257 break;
00258 case DATA:
00259 data[byte_index] = b;
00260 byte_index++;
00261 if (byte_index == data_len) {
00262 packet_state = PACKET_STATE.CHECKSUM;
00263 }
00264 break;
00265 case CHECKSUM:
00266 int chk = (int) (0xff & b);
00267 for (int i = 0; i < 4; i++)
00268 chk += (int) (0xff & header[i]);
00269 for (int i = 0; i < data_len; i++) {
00270 chk += (int) (0xff & data[i]);
00271 }
00272 if (chk % 256 != 255) {
00273 resetPacket();
00274 System.out.println("Checksum failed!");
00275 return false;
00276 } else {
00277 int topic_id = (int) header[0] | (int) (header[1]) << 8;
00278 resetPacket();
00279 protocol.parsePacket(topic_id, data);
00280 }
00281 break;
00282 }
00283 return true;
00284 }
00285
00286 }