Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 package org.ros.concurrent;
00018
import static org.junit.Assert.assertTrue;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
00029
00033 public class ListenerGroupTest {
00034
00035 private ExecutorService executorService;
00036 private ListenerGroup<Runnable> listenerGroup;
00037
00038 @Before
00039 public void before() {
00040 executorService = Executors.newCachedThreadPool();
00041 listenerGroup = new ListenerGroup<Runnable>(executorService);
00042 }
00043
00044 @Test
00045 public void testOneListenerMultipleSignals() throws InterruptedException {
00046 int numberOfSignals = 10;
00047 final CountDownLatch latch = new CountDownLatch(numberOfSignals);
00048 listenerGroup.add(new Runnable() {
00049 @Override
00050 public void run() {
00051 latch.countDown();
00052 }
00053 });
00054 for (int i = 0; i < numberOfSignals; i++) {
00055 listenerGroup.signal(new SignalRunnable<Runnable>() {
00056 @Override
00057 public void run(Runnable listener) {
00058 listener.run();
00059 }
00060 });
00061 }
00062 assertTrue(latch.await(1, TimeUnit.SECONDS));
00063 }
00064
00065 @Test
00066 public void testMultipleListenersMultipleSignals() throws InterruptedException {
00067 int numberOfSignals = 10;
00068 final CountDownLatch latch1 = new CountDownLatch(numberOfSignals);
00069 final CountDownLatch latch2 = new CountDownLatch(numberOfSignals);
00070 listenerGroup.add(new Runnable() {
00071 @Override
00072 public void run() {
00073 latch1.countDown();
00074 }
00075 });
00076 listenerGroup.add(new Runnable() {
00077 @Override
00078 public void run() {
00079 latch2.countDown();
00080 }
00081 });
00082 for (int i = 0; i < numberOfSignals; i++) {
00083 listenerGroup.signal(new SignalRunnable<Runnable>() {
00084 @Override
00085 public void run(Runnable listener) {
00086 listener.run();
00087 }
00088 });
00089 }
00090 assertTrue(latch1.await(1, TimeUnit.SECONDS));
00091 assertTrue(latch2.await(1, TimeUnit.SECONDS));
00092 }
00093
00094 private interface CountingListener {
00095 void run(int count);
00096 }
00097
00098 @Test
00099 public void testSignalOrder() throws InterruptedException {
00100 int numberOfSignals = 100;
00101 final CountDownLatch latch = new CountDownLatch(numberOfSignals);
00102
00103 ListenerGroup<CountingListener> listenerGroup =
00104 new ListenerGroup<CountingListener>(executorService);
00105 listenerGroup.add(new CountingListener() {
00106 private AtomicInteger count = new AtomicInteger();
00107
00108 @Override
00109 public void run(int count) {
00110 if (this.count.compareAndSet(count, count + 1)) {
00111 latch.countDown();
00112 }
00113 try {
00114
00115
00116 Thread.sleep(5);
00117 } catch (InterruptedException e) {
00118 }
00119 }
00120 });
00121
00122 for (int i = 0; i < numberOfSignals; i++) {
00123 final int count = i;
00124 listenerGroup.signal(new SignalRunnable<CountingListener>() {
00125 @Override
00126 public void run(CountingListener listener) {
00127 listener.run(count);
00128 }
00129 });
00130 }
00131
00132 assertTrue(latch.await(1, TimeUnit.SECONDS));
00133 }
00134 }