00001 /* 00002 * Licensed to the Apache Software Foundation (ASF) under one 00003 * or more contributor license agreements. See the NOTICE file 00004 * distributed with this work for additional information 00005 * regarding copyright ownership. The ASF licenses this file 00006 * to you under the Apache License, Version 2.0 (the 00007 * "License"); you may not use this file except in compliance 00008 * with the License. You may obtain a copy of the License at 00009 * 00010 * http://www.apache.org/licenses/LICENSE-2.0 00011 * 00012 * Unless required by applicable law or agreed to in writing, 00013 * software distributed under the License is distributed on an 00014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 00015 * KIND, either express or implied. See the License for the 00016 * specific language governing permissions and limitations 00017 * under the License. 00018 */ 00019 package org.apache.xmlrpc.util; 00020 00021 import java.util.ArrayList; 00022 import java.util.List; 00023 00024 00028 public class ThreadPool { 00031 public interface Task { 00035 void run() throws Throwable; 00036 } 00037 00040 public interface InterruptableTask extends Task { 00044 void shutdown() throws Throwable; 00045 } 00046 00047 private class Poolable { 00048 private volatile boolean shuttingDown; 00049 private Task task; 00050 private Thread thread; 00051 Poolable(ThreadGroup pGroup, int pNum) { 00052 thread = new Thread(pGroup, pGroup.getName() + "-" + pNum){ 00053 public void run() { 00054 while (!shuttingDown) { 00055 final Task t = getTask(); 00056 if (t == null) { 00057 try { 00058 synchronized (this) { 00059 if (!shuttingDown && getTask() == null) { 00060 wait(); 00061 } 00062 } 00063 } catch (InterruptedException e) { 00064 // Do nothing 00065 } 00066 } else { 00067 try { 00068 t.run(); 00069 resetTask(); 00070 repool(Poolable.this); 00071 } catch (Throwable e) { 00072 remove(Poolable.this); 00073 Poolable.this.shutdown(); 00074 resetTask(); 00075 } 00076 } 00077 } 00078 } 00079 }; 00080 thread.start(); 00081 } 00082 synchronized void shutdown() { 00083 shuttingDown = true; 00084 final Task t = getTask(); 00085 if (t != null && t instanceof InterruptableTask) { 00086 try { 00087 ((InterruptableTask) t).shutdown(); 00088 } catch (Throwable th) { 00089 // Ignore me 00090 } 00091 } 00092 task = null; 00093 synchronized (thread) { 00094 thread.notify(); 00095 } 00096 } 00097 private Task getTask() { 00098 return task; 00099 } 00100 private void resetTask() { 00101 task = null; 00102 } 00103 void start(Task pTask) { 00104 task = pTask; 00105 synchronized (thread) { 00106 thread.notify(); 00107 } 00108 } 00109 } 00110 00111 private final ThreadGroup threadGroup; 00112 private final int maxSize; 00113 private final List waitingThreads = new ArrayList(); 00114 private final List runningThreads = new ArrayList(); 00115 private final List waitingTasks = new ArrayList(); 00116 private int num; 00117 00118 00123 public ThreadPool(int pMaxSize, String pName) { 00124 maxSize = pMaxSize; 00125 threadGroup = new ThreadGroup(pName); 00126 } 00127 00128 private synchronized void remove(Poolable pPoolable) { 00129 runningThreads.remove(pPoolable); 00130 waitingThreads.remove(pPoolable); 00131 } 00132 00133 void repool(Poolable pPoolable) { 00134 boolean discarding = false; 00135 Task task = null; 00136 Poolable poolable = null; 00137 synchronized (this) { 00138 if (runningThreads.remove(pPoolable)) { 00139 if (maxSize != 0 && runningThreads.size() + waitingThreads.size() >= maxSize) { 00140 discarding = true; 00141 } else { 00142 waitingThreads.add(pPoolable); 00143 if (waitingTasks.size() > 0) { 00144 task = (Task) waitingTasks.remove(waitingTasks.size() - 1); 00145 poolable = getPoolable(task, false); 00146 } 00147 } 00148 } else { 00149 discarding = true; 00150 } 00151 if (discarding) { 00152 remove(pPoolable); 00153 } 00154 } 00155 if (poolable != null) { 00156 poolable.start(task); 00157 } 00158 if (discarding) { 00159 pPoolable.shutdown(); 00160 } 00161 } 00162 00169 public boolean startTask(Task pTask) { 00170 final Poolable poolable = getPoolable(pTask, false); 00171 if (poolable == null) { 00172 return false; 00173 } 00174 poolable.start(pTask); 00175 return true; 00176 } 00177 00178 private synchronized Poolable getPoolable(Task pTask, boolean pQueue) { 00179 if (maxSize != 0 && runningThreads.size() >= maxSize) { 00180 if (pQueue) { 00181 waitingTasks.add(pTask); 00182 } 00183 return null; 00184 } 00185 Poolable poolable; 00186 if (waitingThreads.size() > 0) { 00187 poolable = (Poolable) waitingThreads.remove(waitingThreads.size()-1); 00188 } else { 00189 poolable = new Poolable(threadGroup, num++); 00190 } 00191 runningThreads.add(poolable); 00192 return poolable; 00193 } 00194 00201 public boolean addTask(Task pTask) { 00202 final Poolable poolable = getPoolable(pTask, true); 00203 if (poolable != null) { 00204 poolable.start(pTask); 00205 return true; 00206 } 00207 return false; 00208 } 00209 00212 public synchronized void shutdown() { 00213 while (!waitingThreads.isEmpty()) { 00214 Poolable poolable = (Poolable) waitingThreads.remove(waitingThreads.size()-1); 00215 poolable.shutdown(); 00216 } 00217 while (!runningThreads.isEmpty()) { 00218 Poolable poolable = (Poolable) runningThreads.remove(runningThreads.size()-1); 00219 poolable.shutdown(); 00220 } 00221 } 00222 00226 public int getMaxThreads() { return maxSize; } 00227 00231 public synchronized int getNumThreads() { return num; } 00232 }