Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 package org.ros.internal.transport.tcp;
00018
00019 import com.google.common.base.Preconditions;
00020
00021 import org.jboss.netty.buffer.ChannelBuffer;
00022 import org.jboss.netty.channel.Channel;
00023 import org.jboss.netty.channel.ChannelFuture;
00024 import org.jboss.netty.channel.ChannelHandler;
00025 import org.jboss.netty.channel.ChannelHandlerContext;
00026 import org.jboss.netty.channel.ChannelPipeline;
00027 import org.jboss.netty.channel.MessageEvent;
00028 import org.jboss.netty.channel.SimpleChannelHandler;
00029 import org.ros.exception.RosRuntimeException;
00030 import org.ros.internal.node.server.NodeIdentifier;
00031 import org.ros.internal.node.service.DefaultServiceServer;
00032 import org.ros.internal.node.service.ServiceManager;
00033 import org.ros.internal.node.service.ServiceResponseEncoder;
00034 import org.ros.internal.node.topic.DefaultPublisher;
00035 import org.ros.internal.node.topic.SubscriberIdentifier;
00036 import org.ros.internal.node.topic.TopicIdentifier;
00037 import org.ros.internal.node.topic.TopicParticipantManager;
00038 import org.ros.internal.transport.ConnectionHeader;
00039 import org.ros.internal.transport.ConnectionHeaderFields;
00040 import org.ros.namespace.GraphName;
00041
00048 public class TcpServerHandshakeHandler extends SimpleChannelHandler {
00049
00050 private final TopicParticipantManager topicParticipantManager;
00051 private final ServiceManager serviceManager;
00052
00053 public TcpServerHandshakeHandler(TopicParticipantManager topicParticipantManager,
00054 ServiceManager serviceManager) {
00055 this.topicParticipantManager = topicParticipantManager;
00056 this.serviceManager = serviceManager;
00057 }
00058
00059 @Override
00060 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
00061 ChannelBuffer incomingBuffer = (ChannelBuffer) e.getMessage();
00062 ChannelPipeline pipeline = e.getChannel().getPipeline();
00063 ConnectionHeader incomingHeader = ConnectionHeader.decode(incomingBuffer);
00064 if (incomingHeader.hasField(ConnectionHeaderFields.SERVICE)) {
00065 handleServiceHandshake(e, pipeline, incomingHeader);
00066 } else {
00067 handleSubscriberHandshake(ctx, e, pipeline, incomingHeader);
00068 }
00069 }
00070
00071 private void handleServiceHandshake(MessageEvent e, ChannelPipeline pipeline,
00072 ConnectionHeader incomingHeader) {
00073 GraphName serviceName = GraphName.of(incomingHeader.getField(ConnectionHeaderFields.SERVICE));
00074 Preconditions.checkState(serviceManager.hasServer(serviceName));
00075 DefaultServiceServer<?, ?> serviceServer = serviceManager.getServer(serviceName);
00076 e.getChannel().write(serviceServer.finishHandshake(incomingHeader));
00077 String probe = incomingHeader.getField(ConnectionHeaderFields.PROBE);
00078 if (probe != null && probe.equals("1")) {
00079 e.getChannel().close();
00080 } else {
00081 pipeline.replace(TcpServerPipelineFactory.LENGTH_FIELD_PREPENDER, "ServiceResponseEncoder",
00082 new ServiceResponseEncoder());
00083 pipeline.replace(this, "ServiceRequestHandler", serviceServer.newRequestHandler());
00084 }
00085 }
00086
00087 private void handleSubscriberHandshake(ChannelHandlerContext ctx, MessageEvent e,
00088 ChannelPipeline pipeline, ConnectionHeader incomingConnectionHeader)
00089 throws InterruptedException, Exception {
00090 Preconditions.checkState(incomingConnectionHeader.hasField(ConnectionHeaderFields.TOPIC),
00091 "Handshake header missing field: " + ConnectionHeaderFields.TOPIC);
00092 GraphName topicName =
00093 GraphName.of(incomingConnectionHeader.getField(ConnectionHeaderFields.TOPIC));
00094 Preconditions.checkState(topicParticipantManager.hasPublisher(topicName),
00095 "No publisher for topic: " + topicName);
00096 DefaultPublisher<?> publisher = topicParticipantManager.getPublisher(topicName);
00097 ChannelBuffer outgoingBuffer = publisher.finishHandshake(incomingConnectionHeader);
00098 Channel channel = ctx.getChannel();
00099 ChannelFuture future = channel.write(outgoingBuffer).await();
00100 if (!future.isSuccess()) {
00101 throw new RosRuntimeException(future.getCause());
00102 }
00103 String nodeName = incomingConnectionHeader.getField(ConnectionHeaderFields.CALLER_ID);
00104 publisher.addSubscriber(new SubscriberIdentifier(NodeIdentifier.forName(nodeName),
00105 new TopicIdentifier(topicName)), channel);
00106
00107
00108
00109
00110 pipeline.replace(this, "DiscardHandler", new SimpleChannelHandler());
00111 }
00112 }