DefaultServiceClient.java
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2011 Google Inc.
00003  * 
00004  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
00005  * use this file except in compliance with the License. You may obtain a copy of
00006  * the License at
00007  * 
00008  * http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00012  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00013  * License for the specific language governing permissions and limitations under
00014  * the License.
00015  */
00016 
00017 package org.ros.internal.node.service;
00018 
00019 import com.google.common.base.Preconditions;
00020 import com.google.common.collect.Lists;
00021 
00022 import org.jboss.netty.buffer.ChannelBuffer;
00023 import org.ros.exception.RosRuntimeException;
00024 import org.ros.internal.message.MessageBufferPool;
00025 import org.ros.internal.transport.ClientHandshakeListener;
00026 import org.ros.internal.transport.ConnectionHeader;
00027 import org.ros.internal.transport.ConnectionHeaderFields;
00028 import org.ros.internal.transport.tcp.TcpClient;
00029 import org.ros.internal.transport.tcp.TcpClientManager;
00030 import org.ros.message.MessageDeserializer;
00031 import org.ros.message.MessageFactory;
00032 import org.ros.message.MessageSerializer;
00033 import org.ros.namespace.GraphName;
00034 import org.ros.node.service.ServiceClient;
00035 import org.ros.node.service.ServiceResponseListener;
00036 
00037 import java.net.InetSocketAddress;
00038 import java.net.URI;
00039 import java.util.Queue;
00040 import java.util.concurrent.CountDownLatch;
00041 import java.util.concurrent.ScheduledExecutorService;
00042 import java.util.concurrent.TimeUnit;
00043 
00049 public class DefaultServiceClient<T, S> implements ServiceClient<T, S> {
00050 
00051   private final class HandshakeLatch implements ClientHandshakeListener {
00052 
00053     private CountDownLatch latch;
00054     private boolean success;
00055     private String errorMessage;
00056 
00057     @Override
00058     public void onSuccess(ConnectionHeader outgoingConnectionHeader,
00059         ConnectionHeader incomingConnectionHeader) {
00060       success = true;
00061       latch.countDown();
00062     }
00063 
00064     @Override
00065     public void onFailure(ConnectionHeader outgoingConnectionHeader, String errorMessage) {
00066       this.errorMessage = errorMessage;
00067       success = false;
00068       latch.countDown();
00069     }
00070 
00071     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
00072       latch.await(timeout, unit);
00073       return success;
00074     }
00075 
00076     public String getErrorMessage() {
00077       return errorMessage;
00078     }
00079 
00080     public void reset() {
00081       latch = new CountDownLatch(1);
00082       success = false;
00083       errorMessage = null;
00084     }
00085   }
00086 
00087   private final ServiceDeclaration serviceDeclaration;
00088   private final MessageSerializer<T> serializer;
00089   private final MessageFactory messageFactory;
00090   private final MessageBufferPool messageBufferPool;
00091   private final Queue<ServiceResponseListener<S>> responseListeners;
00092   private final ConnectionHeader connectionHeader;
00093   private final TcpClientManager tcpClientManager;
00094   private final HandshakeLatch handshakeLatch;
00095 
00096   private TcpClient tcpClient;
00097 
00098   public static <S, T> DefaultServiceClient<S, T> newDefault(GraphName nodeName,
00099       ServiceDeclaration serviceDeclaration, MessageSerializer<S> serializer,
00100       MessageDeserializer<T> deserializer, MessageFactory messageFactory,
00101       ScheduledExecutorService executorService) {
00102     return new DefaultServiceClient<S, T>(nodeName, serviceDeclaration, serializer, deserializer,
00103         messageFactory, executorService);
00104   }
00105 
00106   private DefaultServiceClient(GraphName nodeName, ServiceDeclaration serviceDeclaration,
00107       MessageSerializer<T> serializer, MessageDeserializer<S> deserializer,
00108       MessageFactory messageFactory, ScheduledExecutorService executorService) {
00109     this.serviceDeclaration = serviceDeclaration;
00110     this.serializer = serializer;
00111     this.messageFactory = messageFactory;
00112     messageBufferPool = new MessageBufferPool();
00113     responseListeners = Lists.newLinkedList();
00114     connectionHeader = new ConnectionHeader();
00115     connectionHeader.addField(ConnectionHeaderFields.CALLER_ID, nodeName.toString());
00116     // TODO(damonkohler): Support non-persistent connections.
00117     connectionHeader.addField(ConnectionHeaderFields.PERSISTENT, "1");
00118     connectionHeader.merge(serviceDeclaration.toConnectionHeader());
00119     tcpClientManager = new TcpClientManager(executorService);
00120     ServiceClientHandshakeHandler<T, S> serviceClientHandshakeHandler =
00121         new ServiceClientHandshakeHandler<T, S>(connectionHeader, responseListeners, deserializer,
00122             executorService);
00123     handshakeLatch = new HandshakeLatch();
00124     serviceClientHandshakeHandler.addListener(handshakeLatch);
00125     tcpClientManager.addNamedChannelHandler(serviceClientHandshakeHandler);
00126   }
00127 
00128   @Override
00129   public void connect(URI uri) {
00130     Preconditions.checkNotNull(uri, "URI must be specified.");
00131     Preconditions.checkArgument(uri.getScheme().equals("rosrpc"), "Invalid service URI.");
00132     Preconditions.checkState(tcpClient == null, "Already connected once.");
00133     InetSocketAddress address = new InetSocketAddress(uri.getHost(), uri.getPort());
00134     handshakeLatch.reset();
00135     tcpClient = tcpClientManager.connect(toString(), address);
00136     try {
00137       if (!handshakeLatch.await(1, TimeUnit.SECONDS)) {
00138         throw new RosRuntimeException(handshakeLatch.getErrorMessage());
00139       }
00140     } catch (InterruptedException e) {
00141       throw new RosRuntimeException("Handshake timed out.");
00142     }
00143   }
00144 
00145   @Override
00146   public void shutdown() {
00147     Preconditions.checkNotNull(tcpClient, "Not connected.");
00148     tcpClientManager.shutdown();
00149   }
00150 
00151   @Override
00152   public void call(T request, ServiceResponseListener<S> listener) {
00153     ChannelBuffer buffer = messageBufferPool.acquire();
00154     serializer.serialize(request, buffer);
00155     responseListeners.add(listener);
00156     tcpClient.write(buffer).awaitUninterruptibly();
00157     messageBufferPool.release(buffer);
00158   }
00159 
00160   @Override
00161   public GraphName getName() {
00162     return serviceDeclaration.getName();
00163   }
00164 
00165   @Override
00166   public String toString() {
00167     return "ServiceClient<" + serviceDeclaration + ">";
00168   }
00169 
00170   @Override
00171   public T newMessage() {
00172     return messageFactory.newFromType(serviceDeclaration.getType());
00173   }
00174 }


rosjava_core
Author(s):
autogenerated on Wed Aug 26 2015 16:06:49