ThreadPool.java
Go to the documentation of this file.
00001 /*
00002  * Licensed to the Apache Software Foundation (ASF) under one
00003  * or more contributor license agreements.  See the NOTICE file
00004  * distributed with this work for additional information
00005  * regarding copyright ownership.  The ASF licenses this file
00006  * to you under the Apache License, Version 2.0 (the
00007  * "License"); you may not use this file except in compliance
00008  * with the License.  You may obtain a copy of the License at
00009  *
00010  *   http://www.apache.org/licenses/LICENSE-2.0
00011  *
00012  * Unless required by applicable law or agreed to in writing,
00013  * software distributed under the License is distributed on an
00014  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
00015  * KIND, either express or implied.  See the License for the
00016  * specific language governing permissions and limitations
00017  * under the License.    
00018  */
00019 package org.apache.xmlrpc.util;
00020 
00021 import java.util.ArrayList;
00022 import java.util.List;
00023 
00024 
00028 public class ThreadPool {
00031         public interface Task {
00035                 void run() throws Throwable;
00036         }
00037 
00040     public interface InterruptableTask extends Task {
00044         void shutdown() throws Throwable;
00045     }
00046 
00047     private class Poolable {
00048         private volatile boolean shuttingDown;
00049         private Task task;
00050         private Thread thread;
00051         Poolable(ThreadGroup pGroup, int pNum) {
00052             thread = new Thread(pGroup, pGroup.getName() + "-" + pNum){
00053                 public void run() {
00054                     while (!shuttingDown) {
00055                         final Task t = getTask();
00056                         if (t == null) {
00057                             try {
00058                                 synchronized (this) {
00059                                     if (!shuttingDown  &&  getTask() == null) {
00060                                         wait();
00061                                     }
00062                                 }
00063                             } catch (InterruptedException e) {
00064                                 // Do nothing
00065                             }
00066                         } else {
00067                             try {
00068                                 t.run();
00069                                 resetTask();
00070                                 repool(Poolable.this);
00071                             } catch (Throwable e) {
00072                                 remove(Poolable.this);
00073                                 Poolable.this.shutdown();
00074                                 resetTask();
00075                             }
00076                         }
00077                     }
00078                 }
00079             };
00080             thread.start();
00081         }
00082         synchronized void shutdown() {
00083             shuttingDown = true;
00084             final Task t = getTask();
00085             if (t != null  &&  t instanceof InterruptableTask) {
00086                 try {
00087                     ((InterruptableTask) t).shutdown();
00088                 } catch (Throwable th) {
00089                     // Ignore me
00090                 }
00091             }
00092             task = null;
00093             synchronized (thread) {
00094                 thread.notify();
00095             }
00096         }
00097         private Task getTask() {
00098             return task;
00099         }
00100         private void resetTask() {
00101             task = null;
00102         }
00103         void start(Task pTask) {
00104             task = pTask;
00105             synchronized (thread) {
00106                 thread.notify();
00107             }
00108         }
00109     }
00110 
00111         private final ThreadGroup threadGroup;
00112         private final int maxSize;
00113         private final List waitingThreads = new ArrayList();
00114         private final List runningThreads = new ArrayList();
00115         private final List waitingTasks = new ArrayList();
00116         private int num;
00117 
00118 
00123         public ThreadPool(int pMaxSize, String pName) {
00124                 maxSize = pMaxSize;
00125                 threadGroup = new ThreadGroup(pName);
00126         }
00127 
00128         private synchronized void remove(Poolable pPoolable) {
00129         runningThreads.remove(pPoolable);
00130         waitingThreads.remove(pPoolable);
00131         }
00132 
00133         void repool(Poolable pPoolable) {
00134             boolean discarding = false;
00135             Task task = null;
00136             Poolable poolable = null;
00137             synchronized (this) {
00138                 if (runningThreads.remove(pPoolable)) {
00139                     if (maxSize != 0  &&  runningThreads.size() + waitingThreads.size() >= maxSize) {
00140                         discarding = true;
00141                     } else {
00142                         waitingThreads.add(pPoolable);
00143                         if (waitingTasks.size() > 0) {
00144                             task = (Task) waitingTasks.remove(waitingTasks.size() - 1);
00145                             poolable = getPoolable(task, false);
00146                         }
00147                     }
00148                 } else {
00149                     discarding = true;
00150                 }
00151                 if (discarding) {
00152                     remove(pPoolable);
00153                 }
00154             }
00155             if (poolable != null) {
00156                 poolable.start(task);
00157             }
00158             if (discarding) {
00159                 pPoolable.shutdown();
00160             }
00161         }
00162 
00169         public boolean startTask(Task pTask) {
00170             final Poolable poolable = getPoolable(pTask, false);
00171             if (poolable == null) {
00172                 return false;
00173             }
00174             poolable.start(pTask);
00175                 return true;
00176         }
00177 
00178         private synchronized Poolable getPoolable(Task pTask, boolean pQueue) {
00179         if (maxSize != 0  &&  runningThreads.size() >= maxSize) {
00180             if (pQueue) {
00181                 waitingTasks.add(pTask);
00182             }
00183             return null;
00184         }
00185         Poolable poolable;
00186         if (waitingThreads.size() > 0) {
00187             poolable = (Poolable) waitingThreads.remove(waitingThreads.size()-1);
00188         } else {
00189             poolable = new Poolable(threadGroup, num++);
00190         }
00191         runningThreads.add(poolable);
00192         return poolable;
00193         }
00194         
00201         public boolean addTask(Task pTask) {
00202             final Poolable poolable = getPoolable(pTask, true);
00203             if (poolable != null) {
00204                 poolable.start(pTask);
00205                 return true;
00206             }
00207             return false;
00208         }
00209 
00212         public synchronized void shutdown() {
00213         while (!waitingThreads.isEmpty()) {
00214             Poolable poolable = (Poolable) waitingThreads.remove(waitingThreads.size()-1);
00215             poolable.shutdown();
00216         }
00217         while (!runningThreads.isEmpty()) {
00218             Poolable poolable = (Poolable) runningThreads.remove(runningThreads.size()-1);
00219             poolable.shutdown();
00220         }
00221         }
00222 
00226         public int getMaxThreads() { return maxSize; }
00227 
00231     public synchronized int getNumThreads() { return num; }
00232 }


rosjava_core
Author(s):
autogenerated on Wed Aug 26 2015 16:06:49