TcpServerHandshakeHandler.java
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2011 Google Inc.
00003  * 
00004  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
00005  * use this file except in compliance with the License. You may obtain a copy of
00006  * the License at
00007  * 
00008  * http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00012  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00013  * License for the specific language governing permissions and limitations under
00014  * the License.
00015  */
00016 
00017 package org.ros.internal.transport.tcp;
00018 
00019 import com.google.common.base.Preconditions;
00020 
00021 import org.jboss.netty.buffer.ChannelBuffer;
00022 import org.jboss.netty.channel.Channel;
00023 import org.jboss.netty.channel.ChannelFuture;
00024 import org.jboss.netty.channel.ChannelHandler;
00025 import org.jboss.netty.channel.ChannelHandlerContext;
00026 import org.jboss.netty.channel.ChannelPipeline;
00027 import org.jboss.netty.channel.MessageEvent;
00028 import org.jboss.netty.channel.SimpleChannelHandler;
00029 import org.ros.exception.RosRuntimeException;
00030 import org.ros.internal.node.server.NodeIdentifier;
00031 import org.ros.internal.node.service.DefaultServiceServer;
00032 import org.ros.internal.node.service.ServiceManager;
00033 import org.ros.internal.node.service.ServiceResponseEncoder;
00034 import org.ros.internal.node.topic.DefaultPublisher;
00035 import org.ros.internal.node.topic.SubscriberIdentifier;
00036 import org.ros.internal.node.topic.TopicIdentifier;
00037 import org.ros.internal.node.topic.TopicParticipantManager;
00038 import org.ros.internal.transport.ConnectionHeader;
00039 import org.ros.internal.transport.ConnectionHeaderFields;
00040 import org.ros.namespace.GraphName;
00041 
00048 public class TcpServerHandshakeHandler extends SimpleChannelHandler {
00049 
00050   private final TopicParticipantManager topicParticipantManager;
00051   private final ServiceManager serviceManager;
00052 
00053   public TcpServerHandshakeHandler(TopicParticipantManager topicParticipantManager,
00054       ServiceManager serviceManager) {
00055     this.topicParticipantManager = topicParticipantManager;
00056     this.serviceManager = serviceManager;
00057   }
00058 
00059   @Override
00060   public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
00061     ChannelBuffer incomingBuffer = (ChannelBuffer) e.getMessage();
00062     ChannelPipeline pipeline = e.getChannel().getPipeline();
00063     ConnectionHeader incomingHeader = ConnectionHeader.decode(incomingBuffer);
00064     if (incomingHeader.hasField(ConnectionHeaderFields.SERVICE)) {
00065       handleServiceHandshake(e, pipeline, incomingHeader);
00066     } else {
00067       handleSubscriberHandshake(ctx, e, pipeline, incomingHeader);
00068     }
00069   }
00070 
00071   private void handleServiceHandshake(MessageEvent e, ChannelPipeline pipeline,
00072       ConnectionHeader incomingHeader) {
00073     GraphName serviceName = GraphName.of(incomingHeader.getField(ConnectionHeaderFields.SERVICE));
00074     Preconditions.checkState(serviceManager.hasServer(serviceName));
00075     DefaultServiceServer<?, ?> serviceServer = serviceManager.getServer(serviceName);
00076     e.getChannel().write(serviceServer.finishHandshake(incomingHeader));
00077     String probe = incomingHeader.getField(ConnectionHeaderFields.PROBE);
00078     if (probe != null && probe.equals("1")) {
00079       e.getChannel().close();
00080     } else {
00081       pipeline.replace(TcpServerPipelineFactory.LENGTH_FIELD_PREPENDER, "ServiceResponseEncoder",
00082           new ServiceResponseEncoder());
00083       pipeline.replace(this, "ServiceRequestHandler", serviceServer.newRequestHandler());
00084     }
00085   }
00086 
00087   private void handleSubscriberHandshake(ChannelHandlerContext ctx, MessageEvent e,
00088       ChannelPipeline pipeline, ConnectionHeader incomingConnectionHeader)
00089       throws InterruptedException, Exception {
00090     Preconditions.checkState(incomingConnectionHeader.hasField(ConnectionHeaderFields.TOPIC),
00091         "Handshake header missing field: " + ConnectionHeaderFields.TOPIC);
00092     GraphName topicName =
00093         GraphName.of(incomingConnectionHeader.getField(ConnectionHeaderFields.TOPIC));
00094     Preconditions.checkState(topicParticipantManager.hasPublisher(topicName),
00095         "No publisher for topic: " + topicName);
00096     DefaultPublisher<?> publisher = topicParticipantManager.getPublisher(topicName);
00097     ChannelBuffer outgoingBuffer = publisher.finishHandshake(incomingConnectionHeader);
00098     Channel channel = ctx.getChannel();
00099     ChannelFuture future = channel.write(outgoingBuffer).await();
00100     if (!future.isSuccess()) {
00101       throw new RosRuntimeException(future.getCause());
00102     }
00103     String nodeName = incomingConnectionHeader.getField(ConnectionHeaderFields.CALLER_ID);
00104     publisher.addSubscriber(new SubscriberIdentifier(NodeIdentifier.forName(nodeName),
00105         new TopicIdentifier(topicName)), channel);
00106 
00107     // Once the handshake is complete, there will be nothing incoming on the
00108     // channel. So, we replace the handshake handler with a handler which will
00109     // drop everything.
00110     pipeline.replace(this, "DiscardHandler", new SimpleChannelHandler());
00111   }
00112 }


rosjava_core
Author(s):
autogenerated on Wed Aug 26 2015 16:06:49