Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 package org.ros.internal.transport;
00018
00019 import org.jboss.netty.buffer.ChannelBuffer;
00020 import org.jboss.netty.channel.ChannelHandlerContext;
00021 import org.jboss.netty.channel.ChannelStateEvent;
00022 import org.jboss.netty.channel.MessageEvent;
00023 import org.ros.concurrent.ListenerGroup;
00024 import org.ros.concurrent.SignalRunnable;
00025 import org.ros.internal.transport.tcp.AbstractNamedChannelHandler;
00026
00027 import java.util.concurrent.ExecutorService;
00028
00034 public abstract class BaseClientHandshakeHandler extends AbstractNamedChannelHandler {
00035
00036 private final ClientHandshake clientHandshake;
00037 private final ListenerGroup<ClientHandshakeListener> clientHandshakeListeners;
00038
00039 public BaseClientHandshakeHandler(ClientHandshake clientHandshake, ExecutorService executorService) {
00040 this.clientHandshake = clientHandshake;
00041 clientHandshakeListeners = new ListenerGroup<ClientHandshakeListener>(executorService);
00042 }
00043
00044 public void addListener(ClientHandshakeListener clientHandshakeListener) {
00045 clientHandshakeListeners.add(clientHandshakeListener);
00046 }
00047
00048 @Override
00049 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
00050 super.channelConnected(ctx, e);
00051 e.getChannel().write(clientHandshake.getOutgoingConnectionHeader().encode());
00052 }
00053
00054 @Override
00055 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
00056 ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
00057 ConnectionHeader connectionHeader = ConnectionHeader.decode(buffer);
00058 if (clientHandshake.handshake(connectionHeader)) {
00059 onSuccess(connectionHeader, ctx, e);
00060 signalOnSuccess(connectionHeader);
00061 } else {
00062 onFailure(clientHandshake.getErrorMessage(), ctx, e);
00063 signalOnFailure(clientHandshake.getErrorMessage());
00064 }
00065 }
00066
00078 protected abstract void onSuccess(ConnectionHeader incommingConnectionHeader,
00079 ChannelHandlerContext ctx, MessageEvent e);
00080
00081 private void signalOnSuccess(final ConnectionHeader incommingConnectionHeader) {
00082 clientHandshakeListeners.signal(new SignalRunnable<ClientHandshakeListener>() {
00083 @Override
00084 public void run(ClientHandshakeListener listener) {
00085 listener.onSuccess(clientHandshake.getOutgoingConnectionHeader(), incommingConnectionHeader);
00086 }
00087 });
00088 }
00089
00090 protected abstract void onFailure(String errorMessage, ChannelHandlerContext ctx, MessageEvent e);
00091
00092 private void signalOnFailure(final String errorMessage) {
00093 clientHandshakeListeners.signal(new SignalRunnable<ClientHandshakeListener>() {
00094 @Override
00095 public void run(ClientHandshakeListener listener) {
00096 listener.onFailure(clientHandshake.getOutgoingConnectionHeader(), errorMessage);
00097 }
00098 });
00099 }
00100 }