DefaultSubscriber.java
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2011 Google Inc.
00003  * 
00004  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
00005  * use this file except in compliance with the License. You may obtain a copy of
00006  * the License at
00007  * 
00008  * http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00012  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00013  * License for the specific language governing permissions and limitations under
00014  * the License.
00015  */
00016 
00017 package org.ros.internal.node.topic;
00018 
00019 import com.google.common.annotations.VisibleForTesting;
00020 import com.google.common.collect.Sets;
00021 
00022 import org.apache.commons.logging.Log;
00023 import org.apache.commons.logging.LogFactory;
00024 import org.ros.concurrent.ListenerGroup;
00025 import org.ros.concurrent.SignalRunnable;
00026 import org.ros.internal.node.server.NodeIdentifier;
00027 import org.ros.internal.transport.ProtocolNames;
00028 import org.ros.internal.transport.queue.IncomingMessageQueue;
00029 import org.ros.internal.transport.tcp.TcpClientManager;
00030 import org.ros.message.MessageDeserializer;
00031 import org.ros.message.MessageListener;
00032 import org.ros.node.topic.DefaultSubscriberListener;
00033 import org.ros.node.topic.Publisher;
00034 import org.ros.node.topic.Subscriber;
00035 import org.ros.node.topic.SubscriberListener;
00036 
00037 import java.net.InetSocketAddress;
00038 import java.util.Collection;
00039 import java.util.Set;
00040 import java.util.concurrent.ScheduledExecutorService;
00041 import java.util.concurrent.TimeUnit;
00042 
00048 public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Subscriber<T> {
00049 
00050   private static final Log log = LogFactory.getLog(DefaultPublisher.class);
00051 
00057   private static final int DEFAULT_SHUTDOWN_TIMEOUT = 5;
00058   private static final TimeUnit DEFAULT_SHUTDOWN_TIMEOUT_UNITS = TimeUnit.SECONDS;
00059 
00060   private final NodeIdentifier nodeIdentifier;
00061   private final ScheduledExecutorService executorService;
00062   private final IncomingMessageQueue<T> incomingMessageQueue;
00063   private final Set<PublisherIdentifier> knownPublishers;
00064   private final TcpClientManager tcpClientManager;
00065   private final Object mutex;
00066 
00070   private final ListenerGroup<SubscriberListener<T>> subscriberListeners;
00071 
00072   public static <S> DefaultSubscriber<S> newDefault(NodeIdentifier nodeIdentifier,
00073       TopicDeclaration description, ScheduledExecutorService executorService,
00074       MessageDeserializer<S> deserializer) {
00075     return new DefaultSubscriber<S>(nodeIdentifier, description, deserializer, executorService);
00076   }
00077 
00078   private DefaultSubscriber(NodeIdentifier nodeIdentifier, TopicDeclaration topicDeclaration,
00079       MessageDeserializer<T> deserializer, ScheduledExecutorService executorService) {
00080     super(topicDeclaration);
00081     this.nodeIdentifier = nodeIdentifier;
00082     this.executorService = executorService;
00083     incomingMessageQueue = new IncomingMessageQueue<T>(deserializer, executorService);
00084     knownPublishers = Sets.newHashSet();
00085     tcpClientManager = new TcpClientManager(executorService);
00086     mutex = new Object();
00087     SubscriberHandshakeHandler<T> subscriberHandshakeHandler =
00088         new SubscriberHandshakeHandler<T>(toDeclaration().toConnectionHeader(),
00089             incomingMessageQueue, executorService);
00090     tcpClientManager.addNamedChannelHandler(subscriberHandshakeHandler);
00091     subscriberListeners = new ListenerGroup<SubscriberListener<T>>(executorService);
00092     subscriberListeners.add(new DefaultSubscriberListener<T>() {
00093       @Override
00094       public void onMasterRegistrationSuccess(Subscriber<T> registrant) {
00095         log.info("Subscriber registered: " + DefaultSubscriber.this);
00096       }
00097 
00098       @Override
00099       public void onMasterRegistrationFailure(Subscriber<T> registrant) {
00100         log.info("Subscriber registration failed: " + DefaultSubscriber.this);
00101       }
00102 
00103       @Override
00104       public void onMasterUnregistrationSuccess(Subscriber<T> registrant) {
00105         log.info("Subscriber unregistered: " + DefaultSubscriber.this);
00106       }
00107 
00108       @Override
00109       public void onMasterUnregistrationFailure(Subscriber<T> registrant) {
00110         log.info("Subscriber unregistration failed: " + DefaultSubscriber.this);
00111       }
00112     });
00113   }
00114 
00115   public SubscriberIdentifier toIdentifier() {
00116     return new SubscriberIdentifier(nodeIdentifier, getTopicDeclaration().getIdentifier());
00117   }
00118 
00119   public SubscriberDeclaration toDeclaration() {
00120     return new SubscriberDeclaration(toIdentifier(), getTopicDeclaration());
00121   }
00122 
00123   public Collection<String> getSupportedProtocols() {
00124     return ProtocolNames.SUPPORTED;
00125   }
00126 
00127   @Override
00128   public boolean getLatchMode() {
00129     return incomingMessageQueue.getLatchMode();
00130   }
00131 
00132   @Override
00133   public void addMessageListener(MessageListener<T> messageListener, int limit) {
00134     incomingMessageQueue.addListener(messageListener, limit);
00135   }
00136 
00137   @Override
00138   public void addMessageListener(MessageListener<T> messageListener) {
00139     addMessageListener(messageListener, 1);
00140   }
00141 
00142   @VisibleForTesting
00143   public void addPublisher(PublisherIdentifier publisherIdentifier, InetSocketAddress address) {
00144     synchronized (mutex) {
00145       // TODO(damonkohler): If the connection is dropped, knownPublishers should
00146       // be updated.
00147       if (knownPublishers.contains(publisherIdentifier)) {
00148         return;
00149       }
00150       tcpClientManager.connect(toString(), address);
00151       // TODO(damonkohler): knownPublishers is duplicate information that is
00152       // already available to the TopicParticipantManager.
00153       knownPublishers.add(publisherIdentifier);
00154       signalOnNewPublisher(publisherIdentifier);
00155     }
00156   }
00157 
00166   public void updatePublishers(Collection<PublisherIdentifier> publisherIdentifiers) {
00167     for (final PublisherIdentifier publisherIdentifier : publisherIdentifiers) {
00168       executorService.execute(new UpdatePublisherRunnable<T>(this, nodeIdentifier,
00169           publisherIdentifier));
00170     }
00171   }
00172 
00173   @Override
00174   public void shutdown(long timeout, TimeUnit unit) {
00175     signalOnShutdown(timeout, unit);
00176     incomingMessageQueue.shutdown();
00177     tcpClientManager.shutdown();
00178     subscriberListeners.shutdown();
00179   }
00180 
00181   @Override
00182   public void shutdown() {
00183     shutdown(DEFAULT_SHUTDOWN_TIMEOUT, DEFAULT_SHUTDOWN_TIMEOUT_UNITS);
00184   }
00185 
00186   @Override
00187   public void addSubscriberListener(SubscriberListener<T> listener) {
00188     subscriberListeners.add(listener);
00189   }
00190 
00197   @Override
00198   public void signalOnMasterRegistrationSuccess() {
00199     final Subscriber<T> subscriber = this;
00200     subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() {
00201       @Override
00202       public void run(SubscriberListener<T> listener) {
00203         listener.onMasterRegistrationSuccess(subscriber);
00204       }
00205     });
00206   }
00207 
00215   @Override
00216   public void signalOnMasterRegistrationFailure() {
00217     final Subscriber<T> subscriber = this;
00218     subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() {
00219       @Override
00220       public void run(SubscriberListener<T> listener) {
00221         listener.onMasterRegistrationFailure(subscriber);
00222       }
00223     });
00224   }
00225 
00232   @Override
00233   public void signalOnMasterUnregistrationSuccess() {
00234     final Subscriber<T> subscriber = this;
00235     subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() {
00236       @Override
00237       public void run(SubscriberListener<T> listener) {
00238         listener.onMasterUnregistrationSuccess(subscriber);
00239       }
00240     });
00241   }
00242 
00249   @Override
00250   public void signalOnMasterUnregistrationFailure() {
00251     final Subscriber<T> subscriber = this;
00252     subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() {
00253       @Override
00254       public void run(SubscriberListener<T> listener) {
00255         listener.onMasterUnregistrationFailure(subscriber);
00256       }
00257     });
00258   }
00259 
00266   public void signalOnNewPublisher(final PublisherIdentifier publisherIdentifier) {
00267     final Subscriber<T> subscriber = this;
00268     subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() {
00269       @Override
00270       public void run(SubscriberListener<T> listener) {
00271         listener.onNewPublisher(subscriber, publisherIdentifier);
00272       }
00273     });
00274   }
00275 
00282   private void signalOnShutdown(long timeout, TimeUnit unit) {
00283     final Subscriber<T> subscriber = this;
00284     try {
00285       subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() {
00286         @Override
00287         public void run(SubscriberListener<T> listener) {
00288           listener.onShutdown(subscriber);
00289         }
00290       }, timeout, unit);
00291     } catch (InterruptedException e) {
00292       // Ignored since we do not guarantee that all listeners will finish before
00293       // shutdown begins.
00294     }
00295   }
00296 
00297   @Override
00298   public String toString() {
00299     return "Subscriber<" + getTopicDeclaration() + ">";
00300   }
00301 }


rosjava_core
Author(s):
autogenerated on Wed Aug 26 2015 16:06:49