Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 package org.ros.internal.node.topic;
00018
00019 import com.google.common.annotations.VisibleForTesting;
00020 import com.google.common.collect.Sets;
00021
00022 import org.apache.commons.logging.Log;
00023 import org.apache.commons.logging.LogFactory;
00024 import org.ros.concurrent.ListenerGroup;
00025 import org.ros.concurrent.SignalRunnable;
00026 import org.ros.internal.node.server.NodeIdentifier;
00027 import org.ros.internal.transport.ProtocolNames;
00028 import org.ros.internal.transport.queue.IncomingMessageQueue;
00029 import org.ros.internal.transport.tcp.TcpClientManager;
00030 import org.ros.message.MessageDeserializer;
00031 import org.ros.message.MessageListener;
00032 import org.ros.node.topic.DefaultSubscriberListener;
00033 import org.ros.node.topic.Publisher;
00034 import org.ros.node.topic.Subscriber;
00035 import org.ros.node.topic.SubscriberListener;
00036
00037 import java.net.InetSocketAddress;
00038 import java.util.Collection;
00039 import java.util.Set;
00040 import java.util.concurrent.ScheduledExecutorService;
00041 import java.util.concurrent.TimeUnit;
00042
00048 public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Subscriber<T> {
00049
00050 private static final Log log = LogFactory.getLog(DefaultPublisher.class);
00051
00057 private static final int DEFAULT_SHUTDOWN_TIMEOUT = 5;
00058 private static final TimeUnit DEFAULT_SHUTDOWN_TIMEOUT_UNITS = TimeUnit.SECONDS;
00059
00060 private final NodeIdentifier nodeIdentifier;
00061 private final ScheduledExecutorService executorService;
00062 private final IncomingMessageQueue<T> incomingMessageQueue;
00063 private final Set<PublisherIdentifier> knownPublishers;
00064 private final TcpClientManager tcpClientManager;
00065 private final Object mutex;
00066
00070 private final ListenerGroup<SubscriberListener<T>> subscriberListeners;
00071
00072 public static <S> DefaultSubscriber<S> newDefault(NodeIdentifier nodeIdentifier,
00073 TopicDeclaration description, ScheduledExecutorService executorService,
00074 MessageDeserializer<S> deserializer) {
00075 return new DefaultSubscriber<S>(nodeIdentifier, description, deserializer, executorService);
00076 }
00077
00078 private DefaultSubscriber(NodeIdentifier nodeIdentifier, TopicDeclaration topicDeclaration,
00079 MessageDeserializer<T> deserializer, ScheduledExecutorService executorService) {
00080 super(topicDeclaration);
00081 this.nodeIdentifier = nodeIdentifier;
00082 this.executorService = executorService;
00083 incomingMessageQueue = new IncomingMessageQueue<T>(deserializer, executorService);
00084 knownPublishers = Sets.newHashSet();
00085 tcpClientManager = new TcpClientManager(executorService);
00086 mutex = new Object();
00087 SubscriberHandshakeHandler<T> subscriberHandshakeHandler =
00088 new SubscriberHandshakeHandler<T>(toDeclaration().toConnectionHeader(),
00089 incomingMessageQueue, executorService);
00090 tcpClientManager.addNamedChannelHandler(subscriberHandshakeHandler);
00091 subscriberListeners = new ListenerGroup<SubscriberListener<T>>(executorService);
00092 subscriberListeners.add(new DefaultSubscriberListener<T>() {
00093 @Override
00094 public void onMasterRegistrationSuccess(Subscriber<T> registrant) {
00095 log.info("Subscriber registered: " + DefaultSubscriber.this);
00096 }
00097
00098 @Override
00099 public void onMasterRegistrationFailure(Subscriber<T> registrant) {
00100 log.info("Subscriber registration failed: " + DefaultSubscriber.this);
00101 }
00102
00103 @Override
00104 public void onMasterUnregistrationSuccess(Subscriber<T> registrant) {
00105 log.info("Subscriber unregistered: " + DefaultSubscriber.this);
00106 }
00107
00108 @Override
00109 public void onMasterUnregistrationFailure(Subscriber<T> registrant) {
00110 log.info("Subscriber unregistration failed: " + DefaultSubscriber.this);
00111 }
00112 });
00113 }
00114
00115 public SubscriberIdentifier toIdentifier() {
00116 return new SubscriberIdentifier(nodeIdentifier, getTopicDeclaration().getIdentifier());
00117 }
00118
00119 public SubscriberDeclaration toDeclaration() {
00120 return new SubscriberDeclaration(toIdentifier(), getTopicDeclaration());
00121 }
00122
00123 public Collection<String> getSupportedProtocols() {
00124 return ProtocolNames.SUPPORTED;
00125 }
00126
00127 @Override
00128 public boolean getLatchMode() {
00129 return incomingMessageQueue.getLatchMode();
00130 }
00131
00132 @Override
00133 public void addMessageListener(MessageListener<T> messageListener, int limit) {
00134 incomingMessageQueue.addListener(messageListener, limit);
00135 }
00136
00137 @Override
00138 public void addMessageListener(MessageListener<T> messageListener) {
00139 addMessageListener(messageListener, 1);
00140 }
00141
00142 @VisibleForTesting
00143 public void addPublisher(PublisherIdentifier publisherIdentifier, InetSocketAddress address) {
00144 synchronized (mutex) {
00145
00146
00147 if (knownPublishers.contains(publisherIdentifier)) {
00148 return;
00149 }
00150 tcpClientManager.connect(toString(), address);
00151
00152
00153 knownPublishers.add(publisherIdentifier);
00154 signalOnNewPublisher(publisherIdentifier);
00155 }
00156 }
00157
00166 public void updatePublishers(Collection<PublisherIdentifier> publisherIdentifiers) {
00167 for (final PublisherIdentifier publisherIdentifier : publisherIdentifiers) {
00168 executorService.execute(new UpdatePublisherRunnable<T>(this, nodeIdentifier,
00169 publisherIdentifier));
00170 }
00171 }
00172
00173 @Override
00174 public void shutdown(long timeout, TimeUnit unit) {
00175 signalOnShutdown(timeout, unit);
00176 incomingMessageQueue.shutdown();
00177 tcpClientManager.shutdown();
00178 subscriberListeners.shutdown();
00179 }
00180
00181 @Override
00182 public void shutdown() {
00183 shutdown(DEFAULT_SHUTDOWN_TIMEOUT, DEFAULT_SHUTDOWN_TIMEOUT_UNITS);
00184 }
00185
00186 @Override
00187 public void addSubscriberListener(SubscriberListener<T> listener) {
00188 subscriberListeners.add(listener);
00189 }
00190
00197 @Override
00198 public void signalOnMasterRegistrationSuccess() {
00199 final Subscriber<T> subscriber = this;
00200 subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() {
00201 @Override
00202 public void run(SubscriberListener<T> listener) {
00203 listener.onMasterRegistrationSuccess(subscriber);
00204 }
00205 });
00206 }
00207
00215 @Override
00216 public void signalOnMasterRegistrationFailure() {
00217 final Subscriber<T> subscriber = this;
00218 subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() {
00219 @Override
00220 public void run(SubscriberListener<T> listener) {
00221 listener.onMasterRegistrationFailure(subscriber);
00222 }
00223 });
00224 }
00225
00232 @Override
00233 public void signalOnMasterUnregistrationSuccess() {
00234 final Subscriber<T> subscriber = this;
00235 subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() {
00236 @Override
00237 public void run(SubscriberListener<T> listener) {
00238 listener.onMasterUnregistrationSuccess(subscriber);
00239 }
00240 });
00241 }
00242
00249 @Override
00250 public void signalOnMasterUnregistrationFailure() {
00251 final Subscriber<T> subscriber = this;
00252 subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() {
00253 @Override
00254 public void run(SubscriberListener<T> listener) {
00255 listener.onMasterUnregistrationFailure(subscriber);
00256 }
00257 });
00258 }
00259
00266 public void signalOnNewPublisher(final PublisherIdentifier publisherIdentifier) {
00267 final Subscriber<T> subscriber = this;
00268 subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() {
00269 @Override
00270 public void run(SubscriberListener<T> listener) {
00271 listener.onNewPublisher(subscriber, publisherIdentifier);
00272 }
00273 });
00274 }
00275
00282 private void signalOnShutdown(long timeout, TimeUnit unit) {
00283 final Subscriber<T> subscriber = this;
00284 try {
00285 subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() {
00286 @Override
00287 public void run(SubscriberListener<T> listener) {
00288 listener.onShutdown(subscriber);
00289 }
00290 }, timeout, unit);
00291 } catch (InterruptedException e) {
00292
00293
00294 }
00295 }
00296
00297 @Override
00298 public String toString() {
00299 return "Subscriber<" + getTopicDeclaration() + ">";
00300 }
00301 }