Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 package org.ros.internal.node.service;
00018
00019 import com.google.common.base.Preconditions;
00020
00021 import org.apache.commons.logging.Log;
00022 import org.apache.commons.logging.LogFactory;
00023 import org.jboss.netty.buffer.ChannelBuffer;
00024 import org.jboss.netty.channel.ChannelHandler;
00025 import org.ros.address.AdvertiseAddress;
00026 import org.ros.concurrent.ListenerGroup;
00027 import org.ros.concurrent.SignalRunnable;
00028 import org.ros.internal.message.service.ServiceDescription;
00029 import org.ros.internal.node.topic.DefaultPublisher;
00030 import org.ros.internal.transport.ConnectionHeader;
00031 import org.ros.internal.transport.ConnectionHeaderFields;
00032 import org.ros.message.MessageDeserializer;
00033 import org.ros.message.MessageFactory;
00034 import org.ros.message.MessageSerializer;
00035 import org.ros.namespace.GraphName;
00036 import org.ros.node.service.DefaultServiceServerListener;
00037 import org.ros.node.service.ServiceResponseBuilder;
00038 import org.ros.node.service.ServiceServer;
00039 import org.ros.node.service.ServiceServerListener;
00040
00041 import java.net.URI;
00042 import java.util.concurrent.ScheduledExecutorService;
00043
00049 public class DefaultServiceServer<T, S> implements ServiceServer<T, S> {
00050
00051 private static final boolean DEBUG = false;
00052 private static final Log log = LogFactory.getLog(DefaultPublisher.class);
00053
00054 private final ServiceDeclaration serviceDeclaration;
00055 private final ServiceResponseBuilder<T, S> serviceResponseBuilder;
00056 private final AdvertiseAddress advertiseAddress;
00057 private final MessageDeserializer<T> messageDeserializer;
00058 private final MessageSerializer<S> messageSerializer;
00059 private final MessageFactory messageFactory;
00060 private final ScheduledExecutorService scheduledExecutorService;
00061 private final ListenerGroup<ServiceServerListener<T, S>> listenerGroup;
00062
00063 public DefaultServiceServer(ServiceDeclaration serviceDeclaration,
00064 ServiceResponseBuilder<T, S> serviceResponseBuilder, AdvertiseAddress advertiseAddress,
00065 MessageDeserializer<T> messageDeserializer, MessageSerializer<S> messageSerializer,
00066 MessageFactory messageFactory, ScheduledExecutorService scheduledExecutorService) {
00067 this.serviceDeclaration = serviceDeclaration;
00068 this.serviceResponseBuilder = serviceResponseBuilder;
00069 this.advertiseAddress = advertiseAddress;
00070 this.messageDeserializer = messageDeserializer;
00071 this.messageSerializer = messageSerializer;
00072 this.messageFactory = messageFactory;
00073 this.scheduledExecutorService = scheduledExecutorService;
00074 listenerGroup = new ListenerGroup<ServiceServerListener<T, S>>(scheduledExecutorService);
00075 listenerGroup.add(new DefaultServiceServerListener<T, S>() {
00076 @Override
00077 public void onMasterRegistrationSuccess(ServiceServer<T, S> registrant) {
00078 log.info("Service registered: " + DefaultServiceServer.this);
00079 }
00080
00081 @Override
00082 public void onMasterRegistrationFailure(ServiceServer<T, S> registrant) {
00083 log.info("Service registration failed: " + DefaultServiceServer.this);
00084 }
00085
00086 @Override
00087 public void onMasterUnregistrationSuccess(ServiceServer<T, S> registrant) {
00088 log.info("Service unregistered: " + DefaultServiceServer.this);
00089 }
00090
00091 @Override
00092 public void onMasterUnregistrationFailure(ServiceServer<T, S> registrant) {
00093 log.info("Service unregistration failed: " + DefaultServiceServer.this);
00094 }
00095 });
00096 }
00097
00098 public ChannelBuffer finishHandshake(ConnectionHeader incomingConnectionHeader) {
00099 if (DEBUG) {
00100 log.info("Client handshake header: " + incomingConnectionHeader);
00101 }
00102 ConnectionHeader connectionHeader = toDeclaration().toConnectionHeader();
00103 String expectedChecksum = connectionHeader.getField(ConnectionHeaderFields.MD5_CHECKSUM);
00104 String incomingChecksum =
00105 incomingConnectionHeader.getField(ConnectionHeaderFields.MD5_CHECKSUM);
00106
00107 Preconditions.checkState(incomingChecksum.equals(expectedChecksum)
00108 || incomingChecksum.equals("*"));
00109 if (DEBUG) {
00110 log.info("Server handshake header: " + connectionHeader);
00111 }
00112 return connectionHeader.encode();
00113 }
00114
00115 @Override
00116 public URI getUri() {
00117 return advertiseAddress.toUri("rosrpc");
00118 }
00119
00120 @Override
00121 public GraphName getName() {
00122 return serviceDeclaration.getName();
00123 }
00124
00129 ServiceDeclaration toDeclaration() {
00130 ServiceIdentifier identifier = new ServiceIdentifier(serviceDeclaration.getName(), getUri());
00131 return new ServiceDeclaration(identifier, new ServiceDescription(serviceDeclaration.getType(),
00132 serviceDeclaration.getDefinition(), serviceDeclaration.getMd5Checksum()));
00133 }
00134
00135 public ChannelHandler newRequestHandler() {
00136 return new ServiceRequestHandler<T, S>(serviceDeclaration, serviceResponseBuilder,
00137 messageDeserializer, messageSerializer, messageFactory, scheduledExecutorService);
00138 }
00139
00147 public void signalOnMasterRegistrationSuccess() {
00148 final ServiceServer<T, S> serviceServer = this;
00149 listenerGroup.signal(new SignalRunnable<ServiceServerListener<T, S>>() {
00150 @Override
00151 public void run(ServiceServerListener<T, S> listener) {
00152 listener.onMasterRegistrationSuccess(serviceServer);
00153 }
00154 });
00155 }
00156
00164 public void signalOnMasterRegistrationFailure() {
00165 final ServiceServer<T, S> serviceServer = this;
00166 listenerGroup.signal(new SignalRunnable<ServiceServerListener<T, S>>() {
00167 @Override
00168 public void run(ServiceServerListener<T, S> listener) {
00169 listener.onMasterRegistrationFailure(serviceServer);
00170 }
00171 });
00172 }
00173
00181 public void signalOnMasterUnregistrationSuccess() {
00182 final ServiceServer<T, S> serviceServer = this;
00183 listenerGroup.signal(new SignalRunnable<ServiceServerListener<T, S>>() {
00184 @Override
00185 public void run(ServiceServerListener<T, S> listener) {
00186 listener.onMasterUnregistrationSuccess(serviceServer);
00187 }
00188 });
00189 }
00190
00198 public void signalOnMasterUnregistrationFailure() {
00199 final ServiceServer<T, S> serviceServer = this;
00200 listenerGroup.signal(new SignalRunnable<ServiceServerListener<T, S>>() {
00201 @Override
00202 public void run(ServiceServerListener<T, S> listener) {
00203 listener.onMasterUnregistrationFailure(serviceServer);
00204 }
00205 });
00206 }
00207
00208 @Override
00209 public void shutdown() {
00210 throw new UnsupportedOperationException();
00211 }
00212
00213 @Override
00214 public void addListener(ServiceServerListener<T, S> listener) {
00215 listenerGroup.add(listener);
00216 }
00217
00218 @Override
00219 public String toString() {
00220 return "ServiceServer<" + toDeclaration() + ">";
00221 }
00222 }