DefaultPublisher.java
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2011 Google Inc.
00003  * 
00004  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
00005  * use this file except in compliance with the License. You may obtain a copy of
00006  * the License at
00007  * 
00008  * http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00012  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00013  * License for the specific language governing permissions and limitations under
00014  * the License.
00015  */
00016 
00017 package org.ros.internal.node.topic;
00018 
00019 import com.google.common.base.Preconditions;
00020 
00021 import org.apache.commons.logging.Log;
00022 import org.apache.commons.logging.LogFactory;
00023 import org.jboss.netty.buffer.ChannelBuffer;
00024 import org.jboss.netty.channel.Channel;
00025 import org.ros.concurrent.ListenerGroup;
00026 import org.ros.concurrent.SignalRunnable;
00027 import org.ros.internal.node.server.NodeIdentifier;
00028 import org.ros.internal.transport.ConnectionHeader;
00029 import org.ros.internal.transport.ConnectionHeaderFields;
00030 import org.ros.internal.transport.queue.OutgoingMessageQueue;
00031 import org.ros.message.MessageFactory;
00032 import org.ros.message.MessageSerializer;
00033 import org.ros.node.topic.DefaultPublisherListener;
00034 import org.ros.node.topic.Publisher;
00035 import org.ros.node.topic.PublisherListener;
00036 import org.ros.node.topic.Subscriber;
00037 
00038 import java.util.concurrent.ScheduledExecutorService;
00039 import java.util.concurrent.TimeUnit;
00040 
00046 public class DefaultPublisher<T> extends DefaultTopicParticipant implements Publisher<T> {
00047 
00048   private static final boolean DEBUG = false;
00049   private static final Log log = LogFactory.getLog(DefaultPublisher.class);
00050 
00056   private static final long DEFAULT_SHUTDOWN_TIMEOUT = 5;
00057   private static final TimeUnit DEFAULT_SHUTDOWN_TIMEOUT_UNITS = TimeUnit.SECONDS;
00058 
00062   private final OutgoingMessageQueue<T> outgoingMessageQueue;
00063   private final ListenerGroup<PublisherListener<T>> listeners;
00064   private final NodeIdentifier nodeIdentifier;
00065   private final MessageFactory messageFactory;
00066 
00067   public DefaultPublisher(NodeIdentifier nodeIdentifier, TopicDeclaration topicDeclaration,
00068       MessageSerializer<T> serializer, MessageFactory messageFactory,
00069       ScheduledExecutorService executorService) {
00070     super(topicDeclaration);
00071     this.nodeIdentifier = nodeIdentifier;
00072     this.messageFactory = messageFactory;
00073     outgoingMessageQueue = new OutgoingMessageQueue<T>(serializer, executorService);
00074     listeners = new ListenerGroup<PublisherListener<T>>(executorService);
00075     listeners.add(new DefaultPublisherListener<T>() {
00076       @Override
00077       public void onMasterRegistrationSuccess(Publisher<T> registrant) {
00078         log.info("Publisher registered: " + DefaultPublisher.this);
00079       }
00080 
00081       @Override
00082       public void onMasterRegistrationFailure(Publisher<T> registrant) {
00083         log.info("Publisher registration failed: " + DefaultPublisher.this);
00084       }
00085 
00086       @Override
00087       public void onMasterUnregistrationSuccess(Publisher<T> registrant) {
00088         log.info("Publisher unregistered: " + DefaultPublisher.this);
00089       }
00090 
00091       @Override
00092       public void onMasterUnregistrationFailure(Publisher<T> registrant) {
00093         log.info("Publisher unregistration failed: " + DefaultPublisher.this);
00094       }
00095     });
00096   }
00097 
00098   @Override
00099   public void setLatchMode(boolean enabled) {
00100     outgoingMessageQueue.setLatchMode(enabled);
00101   }
00102 
00103   @Override
00104   public boolean getLatchMode() {
00105     return outgoingMessageQueue.getLatchMode();
00106   }
00107 
00108   @Override
00109   public void shutdown(long timeout, TimeUnit unit) {
00110     signalOnShutdown(timeout, unit);
00111     outgoingMessageQueue.shutdown();
00112   }
00113 
00114   @Override
00115   public void shutdown() {
00116     shutdown(DEFAULT_SHUTDOWN_TIMEOUT, DEFAULT_SHUTDOWN_TIMEOUT_UNITS);
00117   }
00118 
00119   public PublisherIdentifier getIdentifier() {
00120     return new PublisherIdentifier(nodeIdentifier, getTopicDeclaration().getIdentifier());
00121   }
00122 
00123   public PublisherDeclaration toDeclaration() {
00124     return PublisherDeclaration.newFromNodeIdentifier(nodeIdentifier, getTopicDeclaration());
00125   }
00126 
00127   @Override
00128   public boolean hasSubscribers() {
00129     return outgoingMessageQueue.getNumberOfChannels() > 0;
00130   }
00131 
00132   @Override
00133   public int getNumberOfSubscribers() {
00134     return outgoingMessageQueue.getNumberOfChannels();
00135   }
00136 
00137   @Override
00138   public T newMessage() {
00139     return messageFactory.newFromType(getTopicDeclaration().getMessageType());
00140   }
00141 
00142   @Override
00143   public void publish(T message) {
00144     if (DEBUG) {
00145       log.info(String.format("Publishing message %s on topic %s.", message, getTopicName()));
00146     }
00147     outgoingMessageQueue.add(message);
00148   }
00149 
00157   public ChannelBuffer finishHandshake(ConnectionHeader incomingHeader) {
00158     ConnectionHeader topicDefinitionHeader = getTopicDeclarationHeader();
00159     if (DEBUG) {
00160       log.info("Subscriber handshake header: " + incomingHeader);
00161       log.info("Publisher handshake header: " + topicDefinitionHeader);
00162     }
00163     // TODO(damonkohler): Return errors to the subscriber over the wire.
00164     String incomingType = incomingHeader.getField(ConnectionHeaderFields.TYPE);
00165     String expectedType = topicDefinitionHeader.getField(ConnectionHeaderFields.TYPE);
00166     boolean messageTypeMatches =
00167         incomingType.equals(expectedType)
00168             || incomingType.equals(Subscriber.TOPIC_MESSAGE_TYPE_WILDCARD);
00169     Preconditions.checkState(messageTypeMatches, "Unexpected message type " + incomingType + " != "
00170         + expectedType);
00171     String incomingChecksum = incomingHeader.getField(ConnectionHeaderFields.MD5_CHECKSUM);
00172     String expectedChecksum = topicDefinitionHeader.getField(ConnectionHeaderFields.MD5_CHECKSUM);
00173     boolean checksumMatches =
00174         incomingChecksum.equals(expectedChecksum)
00175             || incomingChecksum.equals(Subscriber.TOPIC_MESSAGE_TYPE_WILDCARD);
00176     Preconditions.checkState(checksumMatches, "Unexpected message MD5 " + incomingChecksum + " != "
00177         + expectedChecksum);
00178     ConnectionHeader outgoingConnectionHeader = toDeclaration().toConnectionHeader();
00179     // TODO(damonkohler): Force latch mode to be consistent throughout the life
00180     // of the publisher.
00181     outgoingConnectionHeader.addField(ConnectionHeaderFields.LATCHING, getLatchMode() ? "1" : "0");
00182     return outgoingConnectionHeader.encode();
00183   }
00184 
00193   public void addSubscriber(SubscriberIdentifier subscriberIdentifer, Channel channel) {
00194     if (DEBUG) {
00195       log.info(String.format("Adding subscriber %s channel %s to publisher %s.",
00196           subscriberIdentifer, channel, this));
00197     }
00198     outgoingMessageQueue.addChannel(channel);
00199     signalOnNewSubscriber(subscriberIdentifer);
00200   }
00201 
00202   @Override
00203   public void addListener(PublisherListener<T> listener) {
00204     listeners.add(listener);
00205   }
00206 
00213   @Override
00214   public void signalOnMasterRegistrationSuccess() {
00215     final Publisher<T> publisher = this;
00216     listeners.signal(new SignalRunnable<PublisherListener<T>>() {
00217       @Override
00218       public void run(PublisherListener<T> listener) {
00219         listener.onMasterRegistrationSuccess(publisher);
00220       }
00221     });
00222   }
00223 
00230   @Override
00231   public void signalOnMasterRegistrationFailure() {
00232     final Publisher<T> publisher = this;
00233     listeners.signal(new SignalRunnable<PublisherListener<T>>() {
00234       @Override
00235       public void run(PublisherListener<T> listener) {
00236         listener.onMasterRegistrationFailure(publisher);
00237       }
00238     });
00239   }
00240 
00247   @Override
00248   public void signalOnMasterUnregistrationSuccess() {
00249     final Publisher<T> publisher = this;
00250     listeners.signal(new SignalRunnable<PublisherListener<T>>() {
00251       @Override
00252       public void run(PublisherListener<T> listener) {
00253         listener.onMasterUnregistrationSuccess(publisher);
00254       }
00255     });
00256   }
00257 
00264   @Override
00265   public void signalOnMasterUnregistrationFailure() {
00266     final Publisher<T> publisher = this;
00267     listeners.signal(new SignalRunnable<PublisherListener<T>>() {
00268       @Override
00269       public void run(PublisherListener<T> listener) {
00270         listener.onMasterUnregistrationFailure(publisher);
00271       }
00272     });
00273   }
00274 
00284   private void signalOnNewSubscriber(final SubscriberIdentifier subscriberIdentifier) {
00285     final Publisher<T> publisher = this;
00286     listeners.signal(new SignalRunnable<PublisherListener<T>>() {
00287       @Override
00288       public void run(PublisherListener<T> listener) {
00289         listener.onNewSubscriber(publisher, subscriberIdentifier);
00290       }
00291     });
00292   }
00293 
00303   private void signalOnShutdown(long timeout, TimeUnit unit) {
00304     final Publisher<T> publisher = this;
00305     try {
00306       listeners.signal(new SignalRunnable<PublisherListener<T>>() {
00307         @Override
00308         public void run(PublisherListener<T> listener) {
00309           listener.onShutdown(publisher);
00310         }
00311       }, timeout, unit);
00312     } catch (InterruptedException e) {
00313       // Ignored since we do not guarantee that all listeners will finish before
00314       // shutdown begins.
00315     }
00316   }
00317 
00318   @Override
00319   public String toString() {
00320     return "Publisher<" + toDeclaration() + ">";
00321   }
00322 }


rosjava_core
Author(s):
autogenerated on Wed Aug 26 2015 16:06:49