Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 package org.ros.internal.node.topic;
00018
00019 import com.google.common.base.Preconditions;
00020
00021 import org.apache.commons.logging.Log;
00022 import org.apache.commons.logging.LogFactory;
00023 import org.jboss.netty.buffer.ChannelBuffer;
00024 import org.jboss.netty.channel.Channel;
00025 import org.ros.concurrent.ListenerGroup;
00026 import org.ros.concurrent.SignalRunnable;
00027 import org.ros.internal.node.server.NodeIdentifier;
00028 import org.ros.internal.transport.ConnectionHeader;
00029 import org.ros.internal.transport.ConnectionHeaderFields;
00030 import org.ros.internal.transport.queue.OutgoingMessageQueue;
00031 import org.ros.message.MessageFactory;
00032 import org.ros.message.MessageSerializer;
00033 import org.ros.node.topic.DefaultPublisherListener;
00034 import org.ros.node.topic.Publisher;
00035 import org.ros.node.topic.PublisherListener;
00036 import org.ros.node.topic.Subscriber;
00037
00038 import java.util.concurrent.ScheduledExecutorService;
00039 import java.util.concurrent.TimeUnit;
00040
00046 public class DefaultPublisher<T> extends DefaultTopicParticipant implements Publisher<T> {
00047
00048 private static final boolean DEBUG = false;
00049 private static final Log log = LogFactory.getLog(DefaultPublisher.class);
00050
00056 private static final long DEFAULT_SHUTDOWN_TIMEOUT = 5;
00057 private static final TimeUnit DEFAULT_SHUTDOWN_TIMEOUT_UNITS = TimeUnit.SECONDS;
00058
00062 private final OutgoingMessageQueue<T> outgoingMessageQueue;
00063 private final ListenerGroup<PublisherListener<T>> listeners;
00064 private final NodeIdentifier nodeIdentifier;
00065 private final MessageFactory messageFactory;
00066
00067 public DefaultPublisher(NodeIdentifier nodeIdentifier, TopicDeclaration topicDeclaration,
00068 MessageSerializer<T> serializer, MessageFactory messageFactory,
00069 ScheduledExecutorService executorService) {
00070 super(topicDeclaration);
00071 this.nodeIdentifier = nodeIdentifier;
00072 this.messageFactory = messageFactory;
00073 outgoingMessageQueue = new OutgoingMessageQueue<T>(serializer, executorService);
00074 listeners = new ListenerGroup<PublisherListener<T>>(executorService);
00075 listeners.add(new DefaultPublisherListener<T>() {
00076 @Override
00077 public void onMasterRegistrationSuccess(Publisher<T> registrant) {
00078 log.info("Publisher registered: " + DefaultPublisher.this);
00079 }
00080
00081 @Override
00082 public void onMasterRegistrationFailure(Publisher<T> registrant) {
00083 log.info("Publisher registration failed: " + DefaultPublisher.this);
00084 }
00085
00086 @Override
00087 public void onMasterUnregistrationSuccess(Publisher<T> registrant) {
00088 log.info("Publisher unregistered: " + DefaultPublisher.this);
00089 }
00090
00091 @Override
00092 public void onMasterUnregistrationFailure(Publisher<T> registrant) {
00093 log.info("Publisher unregistration failed: " + DefaultPublisher.this);
00094 }
00095 });
00096 }
00097
00098 @Override
00099 public void setLatchMode(boolean enabled) {
00100 outgoingMessageQueue.setLatchMode(enabled);
00101 }
00102
00103 @Override
00104 public boolean getLatchMode() {
00105 return outgoingMessageQueue.getLatchMode();
00106 }
00107
00108 @Override
00109 public void shutdown(long timeout, TimeUnit unit) {
00110 signalOnShutdown(timeout, unit);
00111 outgoingMessageQueue.shutdown();
00112 }
00113
00114 @Override
00115 public void shutdown() {
00116 shutdown(DEFAULT_SHUTDOWN_TIMEOUT, DEFAULT_SHUTDOWN_TIMEOUT_UNITS);
00117 }
00118
00119 public PublisherIdentifier getIdentifier() {
00120 return new PublisherIdentifier(nodeIdentifier, getTopicDeclaration().getIdentifier());
00121 }
00122
00123 public PublisherDeclaration toDeclaration() {
00124 return PublisherDeclaration.newFromNodeIdentifier(nodeIdentifier, getTopicDeclaration());
00125 }
00126
00127 @Override
00128 public boolean hasSubscribers() {
00129 return outgoingMessageQueue.getNumberOfChannels() > 0;
00130 }
00131
00132 @Override
00133 public int getNumberOfSubscribers() {
00134 return outgoingMessageQueue.getNumberOfChannels();
00135 }
00136
00137 @Override
00138 public T newMessage() {
00139 return messageFactory.newFromType(getTopicDeclaration().getMessageType());
00140 }
00141
00142 @Override
00143 public void publish(T message) {
00144 if (DEBUG) {
00145 log.info(String.format("Publishing message %s on topic %s.", message, getTopicName()));
00146 }
00147 outgoingMessageQueue.add(message);
00148 }
00149
00157 public ChannelBuffer finishHandshake(ConnectionHeader incomingHeader) {
00158 ConnectionHeader topicDefinitionHeader = getTopicDeclarationHeader();
00159 if (DEBUG) {
00160 log.info("Subscriber handshake header: " + incomingHeader);
00161 log.info("Publisher handshake header: " + topicDefinitionHeader);
00162 }
00163
00164 String incomingType = incomingHeader.getField(ConnectionHeaderFields.TYPE);
00165 String expectedType = topicDefinitionHeader.getField(ConnectionHeaderFields.TYPE);
00166 boolean messageTypeMatches =
00167 incomingType.equals(expectedType)
00168 || incomingType.equals(Subscriber.TOPIC_MESSAGE_TYPE_WILDCARD);
00169 Preconditions.checkState(messageTypeMatches, "Unexpected message type " + incomingType + " != "
00170 + expectedType);
00171 String incomingChecksum = incomingHeader.getField(ConnectionHeaderFields.MD5_CHECKSUM);
00172 String expectedChecksum = topicDefinitionHeader.getField(ConnectionHeaderFields.MD5_CHECKSUM);
00173 boolean checksumMatches =
00174 incomingChecksum.equals(expectedChecksum)
00175 || incomingChecksum.equals(Subscriber.TOPIC_MESSAGE_TYPE_WILDCARD);
00176 Preconditions.checkState(checksumMatches, "Unexpected message MD5 " + incomingChecksum + " != "
00177 + expectedChecksum);
00178 ConnectionHeader outgoingConnectionHeader = toDeclaration().toConnectionHeader();
00179
00180
00181 outgoingConnectionHeader.addField(ConnectionHeaderFields.LATCHING, getLatchMode() ? "1" : "0");
00182 return outgoingConnectionHeader.encode();
00183 }
00184
00193 public void addSubscriber(SubscriberIdentifier subscriberIdentifer, Channel channel) {
00194 if (DEBUG) {
00195 log.info(String.format("Adding subscriber %s channel %s to publisher %s.",
00196 subscriberIdentifer, channel, this));
00197 }
00198 outgoingMessageQueue.addChannel(channel);
00199 signalOnNewSubscriber(subscriberIdentifer);
00200 }
00201
00202 @Override
00203 public void addListener(PublisherListener<T> listener) {
00204 listeners.add(listener);
00205 }
00206
00213 @Override
00214 public void signalOnMasterRegistrationSuccess() {
00215 final Publisher<T> publisher = this;
00216 listeners.signal(new SignalRunnable<PublisherListener<T>>() {
00217 @Override
00218 public void run(PublisherListener<T> listener) {
00219 listener.onMasterRegistrationSuccess(publisher);
00220 }
00221 });
00222 }
00223
00230 @Override
00231 public void signalOnMasterRegistrationFailure() {
00232 final Publisher<T> publisher = this;
00233 listeners.signal(new SignalRunnable<PublisherListener<T>>() {
00234 @Override
00235 public void run(PublisherListener<T> listener) {
00236 listener.onMasterRegistrationFailure(publisher);
00237 }
00238 });
00239 }
00240
00247 @Override
00248 public void signalOnMasterUnregistrationSuccess() {
00249 final Publisher<T> publisher = this;
00250 listeners.signal(new SignalRunnable<PublisherListener<T>>() {
00251 @Override
00252 public void run(PublisherListener<T> listener) {
00253 listener.onMasterUnregistrationSuccess(publisher);
00254 }
00255 });
00256 }
00257
00264 @Override
00265 public void signalOnMasterUnregistrationFailure() {
00266 final Publisher<T> publisher = this;
00267 listeners.signal(new SignalRunnable<PublisherListener<T>>() {
00268 @Override
00269 public void run(PublisherListener<T> listener) {
00270 listener.onMasterUnregistrationFailure(publisher);
00271 }
00272 });
00273 }
00274
00284 private void signalOnNewSubscriber(final SubscriberIdentifier subscriberIdentifier) {
00285 final Publisher<T> publisher = this;
00286 listeners.signal(new SignalRunnable<PublisherListener<T>>() {
00287 @Override
00288 public void run(PublisherListener<T> listener) {
00289 listener.onNewSubscriber(publisher, subscriberIdentifier);
00290 }
00291 });
00292 }
00293
00303 private void signalOnShutdown(long timeout, TimeUnit unit) {
00304 final Publisher<T> publisher = this;
00305 try {
00306 listeners.signal(new SignalRunnable<PublisherListener<T>>() {
00307 @Override
00308 public void run(PublisherListener<T> listener) {
00309 listener.onShutdown(publisher);
00310 }
00311 }, timeout, unit);
00312 } catch (InterruptedException e) {
00313
00314
00315 }
00316 }
00317
00318 @Override
00319 public String toString() {
00320 return "Publisher<" + toDeclaration() + ">";
00321 }
00322 }