Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 package org.ros.android.acm_serial;
00018
00019 import org.apache.commons.logging.Log;
00020 import org.apache.commons.logging.LogFactory;
00021 import org.ros.concurrent.CancellableLoop;
00022 import org.ros.exception.RosRuntimeException;
00023
00024 import java.io.IOException;
00025 import java.io.InputStream;
00026 import java.util.concurrent.ExecutorService;
00027
00033 public class PollingInputStream extends InputStream {
00034
00035 private final static boolean DEBUG = false;
00036 private final static Log log = LogFactory.getLog(PollingInputStream.class);
00037
00038 private final static int BUFFER_CAPACITY = 512 * 1024;
00039 private final static int READ_SIZE = 256;
00040
00041 private final byte[] readBuffer;
00042
00043 private int readPosition;
00044 private int writePosition;
00045
00052 public PollingInputStream(final InputStream inputStream, ExecutorService executorService) {
00053 readBuffer = new byte[BUFFER_CAPACITY];
00054 readPosition = 0;
00055 writePosition = 0;
00056 executorService.execute(new CancellableLoop() {
00057 @Override
00058 protected void loop() throws InterruptedException {
00059 try {
00060 while (remaining() < READ_SIZE) {
00061 if (readPosition < remaining()) {
00062
00063
00064 log.error("Not enough room to compact buffer.");
00065 Thread.yield();
00066 continue;
00067 }
00068 synchronized (readBuffer) {
00069 int remaining = remaining();
00070 System.arraycopy(readBuffer, writePosition, readBuffer, 0, remaining);
00071 writePosition = remaining;
00072 readPosition = 0;
00073 if (DEBUG) {
00074 log.info(String.format("Buffer compacted. %d bytes remaining.", remaining()));
00075 }
00076 }
00077 }
00078 int bytesRead = inputStream.read(readBuffer, writePosition, READ_SIZE);
00079 if (bytesRead < 0) {
00080 throw new IOException("Stream closed.");
00081 }
00082 writePosition += bytesRead;
00083 } catch (IOException e) {
00084 throw new RosRuntimeException(e);
00085 }
00086 }
00087 });
00088 }
00089
00090 @Override
00091 public synchronized int read(byte[] buffer, int offset, int length) throws IOException {
00092 int bytesRead = 0;
00093 if (length > 0) {
00094 while (available() == 0) {
00095
00096 Thread.yield();
00097 }
00098 synchronized (readBuffer) {
00099 bytesRead = Math.min(length, available());
00100 System.arraycopy(readBuffer, readPosition, buffer, offset, bytesRead);
00101 readPosition += bytesRead;
00102 }
00103 }
00104 return bytesRead;
00105 }
00106
00107 @Override
00108 public int read() throws IOException {
00109 byte[] buffer = new byte[1];
00110 return read(buffer, 0, 1);
00111 }
00112
00113 @Override
00114 public int available() throws IOException {
00115 return writePosition - readPosition;
00116 }
00117
00118 private int remaining() {
00119 return BUFFER_CAPACITY - writePosition;
00120 }
00121 }