Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 package org.ros.internal.node.service;
00018
00019 import com.google.common.base.Preconditions;
00020 import com.google.common.collect.Lists;
00021
00022 import org.jboss.netty.buffer.ChannelBuffer;
00023 import org.ros.exception.RosRuntimeException;
00024 import org.ros.internal.message.MessageBufferPool;
00025 import org.ros.internal.transport.ClientHandshakeListener;
00026 import org.ros.internal.transport.ConnectionHeader;
00027 import org.ros.internal.transport.ConnectionHeaderFields;
00028 import org.ros.internal.transport.tcp.TcpClient;
00029 import org.ros.internal.transport.tcp.TcpClientManager;
00030 import org.ros.message.MessageDeserializer;
00031 import org.ros.message.MessageFactory;
00032 import org.ros.message.MessageSerializer;
00033 import org.ros.namespace.GraphName;
00034 import org.ros.node.service.ServiceClient;
00035 import org.ros.node.service.ServiceResponseListener;
00036
00037 import java.net.InetSocketAddress;
00038 import java.net.URI;
00039 import java.util.Queue;
00040 import java.util.concurrent.CountDownLatch;
00041 import java.util.concurrent.ScheduledExecutorService;
00042 import java.util.concurrent.TimeUnit;
00043
00049 public class DefaultServiceClient<T, S> implements ServiceClient<T, S> {
00050
00051 private final class HandshakeLatch implements ClientHandshakeListener {
00052
00053 private CountDownLatch latch;
00054 private boolean success;
00055 private String errorMessage;
00056
00057 @Override
00058 public void onSuccess(ConnectionHeader outgoingConnectionHeader,
00059 ConnectionHeader incomingConnectionHeader) {
00060 success = true;
00061 latch.countDown();
00062 }
00063
00064 @Override
00065 public void onFailure(ConnectionHeader outgoingConnectionHeader, String errorMessage) {
00066 this.errorMessage = errorMessage;
00067 success = false;
00068 latch.countDown();
00069 }
00070
00071 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
00072 latch.await(timeout, unit);
00073 return success;
00074 }
00075
00076 public String getErrorMessage() {
00077 return errorMessage;
00078 }
00079
00080 public void reset() {
00081 latch = new CountDownLatch(1);
00082 success = false;
00083 errorMessage = null;
00084 }
00085 }
00086
00087 private final ServiceDeclaration serviceDeclaration;
00088 private final MessageSerializer<T> serializer;
00089 private final MessageFactory messageFactory;
00090 private final MessageBufferPool messageBufferPool;
00091 private final Queue<ServiceResponseListener<S>> responseListeners;
00092 private final ConnectionHeader connectionHeader;
00093 private final TcpClientManager tcpClientManager;
00094 private final HandshakeLatch handshakeLatch;
00095
00096 private TcpClient tcpClient;
00097
00098 public static <S, T> DefaultServiceClient<S, T> newDefault(GraphName nodeName,
00099 ServiceDeclaration serviceDeclaration, MessageSerializer<S> serializer,
00100 MessageDeserializer<T> deserializer, MessageFactory messageFactory,
00101 ScheduledExecutorService executorService) {
00102 return new DefaultServiceClient<S, T>(nodeName, serviceDeclaration, serializer, deserializer,
00103 messageFactory, executorService);
00104 }
00105
00106 private DefaultServiceClient(GraphName nodeName, ServiceDeclaration serviceDeclaration,
00107 MessageSerializer<T> serializer, MessageDeserializer<S> deserializer,
00108 MessageFactory messageFactory, ScheduledExecutorService executorService) {
00109 this.serviceDeclaration = serviceDeclaration;
00110 this.serializer = serializer;
00111 this.messageFactory = messageFactory;
00112 messageBufferPool = new MessageBufferPool();
00113 responseListeners = Lists.newLinkedList();
00114 connectionHeader = new ConnectionHeader();
00115 connectionHeader.addField(ConnectionHeaderFields.CALLER_ID, nodeName.toString());
00116
00117 connectionHeader.addField(ConnectionHeaderFields.PERSISTENT, "1");
00118 connectionHeader.merge(serviceDeclaration.toConnectionHeader());
00119 tcpClientManager = new TcpClientManager(executorService);
00120 ServiceClientHandshakeHandler<T, S> serviceClientHandshakeHandler =
00121 new ServiceClientHandshakeHandler<T, S>(connectionHeader, responseListeners, deserializer,
00122 executorService);
00123 handshakeLatch = new HandshakeLatch();
00124 serviceClientHandshakeHandler.addListener(handshakeLatch);
00125 tcpClientManager.addNamedChannelHandler(serviceClientHandshakeHandler);
00126 }
00127
00128 @Override
00129 public void connect(URI uri) {
00130 Preconditions.checkNotNull(uri, "URI must be specified.");
00131 Preconditions.checkArgument(uri.getScheme().equals("rosrpc"), "Invalid service URI.");
00132 Preconditions.checkState(tcpClient == null, "Already connected once.");
00133 InetSocketAddress address = new InetSocketAddress(uri.getHost(), uri.getPort());
00134 handshakeLatch.reset();
00135 tcpClient = tcpClientManager.connect(toString(), address);
00136 try {
00137 if (!handshakeLatch.await(1, TimeUnit.SECONDS)) {
00138 throw new RosRuntimeException(handshakeLatch.getErrorMessage());
00139 }
00140 } catch (InterruptedException e) {
00141 throw new RosRuntimeException("Handshake timed out.");
00142 }
00143 }
00144
00145 @Override
00146 public void shutdown() {
00147 Preconditions.checkNotNull(tcpClient, "Not connected.");
00148 tcpClientManager.shutdown();
00149 }
00150
00151 @Override
00152 public void call(T request, ServiceResponseListener<S> listener) {
00153 ChannelBuffer buffer = messageBufferPool.acquire();
00154 serializer.serialize(request, buffer);
00155 responseListeners.add(listener);
00156 tcpClient.write(buffer).awaitUninterruptibly();
00157 messageBufferPool.release(buffer);
00158 }
00159
00160 @Override
00161 public GraphName getName() {
00162 return serviceDeclaration.getName();
00163 }
00164
00165 @Override
00166 public String toString() {
00167 return "ServiceClient<" + serviceDeclaration + ">";
00168 }
00169
00170 @Override
00171 public T newMessage() {
00172 return messageFactory.newFromType(serviceDeclaration.getType());
00173 }
00174 }