PollingInputStream.java
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2011 Google Inc.
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
00005  * use this file except in compliance with the License. You may obtain a copy of
00006  * the License at
00007  *
00008  * http://www.apache.org/licenses/LICENSE-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00012  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00013  * License for the specific language governing permissions and limitations under
00014  * the License.
00015  */
00016 
00017 package org.ros.android.acm_serial;
00018 
00019 import org.apache.commons.logging.Log;
00020 import org.apache.commons.logging.LogFactory;
00021 import org.ros.concurrent.CancellableLoop;
00022 import org.ros.exception.RosRuntimeException;
00023 
00024 import java.io.IOException;
00025 import java.io.InputStream;
00026 import java.util.concurrent.ExecutorService;
00027 
00033 public class PollingInputStream extends InputStream {
00034 
00035   private final static boolean DEBUG = false;
00036   private final static Log log = LogFactory.getLog(PollingInputStream.class);
00037 
00038   private final static int BUFFER_CAPACITY = 512 * 1024;
00039   private final static int READ_SIZE = 256;
00040 
00041   private final byte[] readBuffer;
00042 
00043   private int readPosition;
00044   private int writePosition;
00045 
00052   public PollingInputStream(final InputStream inputStream, ExecutorService executorService) {
00053     readBuffer = new byte[BUFFER_CAPACITY];
00054     readPosition = 0;
00055     writePosition = 0;
00056     executorService.execute(new CancellableLoop() {
00057       @Override
00058       protected void loop() throws InterruptedException {
00059         try {
00060           while (remaining() < READ_SIZE) {
00061             if (readPosition < remaining()) {
00062               // There isn't enough room to compact the buffer yet. We will most
00063               // likely start dropping messages.
00064               log.error("Not enough room to compact buffer.");
00065               Thread.yield();
00066               continue;
00067             }
00068             synchronized (readBuffer) {
00069               int remaining = remaining();
00070               System.arraycopy(readBuffer, writePosition, readBuffer, 0, remaining);
00071               writePosition = remaining;
00072               readPosition = 0;
00073               if (DEBUG) {
00074                 log.info(String.format("Buffer compacted. %d bytes remaining.", remaining()));
00075               }
00076             }
00077           }
00078           int bytesRead = inputStream.read(readBuffer, writePosition, READ_SIZE);
00079           if (bytesRead < 0) {
00080             throw new IOException("Stream closed.");
00081           }
00082           writePosition += bytesRead;
00083         } catch (IOException e) {
00084           throw new RosRuntimeException(e);
00085         }
00086       }
00087     });
00088   }
00089 
00090   @Override
00091   public synchronized int read(byte[] buffer, int offset, int length) throws IOException {
00092     int bytesRead = 0;
00093     if (length > 0) {
00094       while (available() == 0) {
00095         // Block until there are bytes to read.
00096         Thread.yield();
00097       }
00098       synchronized (readBuffer) {
00099         bytesRead = Math.min(length, available());
00100         System.arraycopy(readBuffer, readPosition, buffer, offset, bytesRead);
00101         readPosition += bytesRead;
00102       }
00103     }
00104     return bytesRead;
00105   }
00106 
00107   @Override
00108   public int read() throws IOException {
00109     byte[] buffer = new byte[1];
00110     return read(buffer, 0, 1);
00111   }
00112 
00113   @Override
00114   public int available() throws IOException {
00115     return writePosition - readPosition;
00116   }
00117 
00118   private int remaining() {
00119     return BUFFER_CAPACITY - writePosition;
00120   }
00121 }


android_core
Author(s): Damon Kohler
autogenerated on Thu Aug 27 2015 12:11:33