DefaultNode.java
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2011 Google Inc.
00003  * 
00004  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
00005  * use this file except in compliance with the License. You may obtain a copy of
00006  * the License at
00007  * 
00008  * http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00012  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
00013  * License for the specific language governing permissions and limitations under
00014  * the License.
00015  */
00016 
00017 package org.ros.internal.node;
00018 
00019 import com.google.common.annotations.VisibleForTesting;
00020 
00021 import org.apache.commons.logging.Log;
00022 import org.ros.Parameters;
00023 import org.ros.concurrent.CancellableLoop;
00024 import org.ros.concurrent.ListenerGroup;
00025 import org.ros.concurrent.SignalRunnable;
00026 import org.ros.exception.RemoteException;
00027 import org.ros.exception.ServiceNotFoundException;
00028 import org.ros.internal.message.service.ServiceDescription;
00029 import org.ros.internal.message.topic.TopicDescription;
00030 import org.ros.internal.node.client.MasterClient;
00031 import org.ros.internal.node.client.Registrar;
00032 import org.ros.internal.node.parameter.DefaultParameterTree;
00033 import org.ros.internal.node.parameter.ParameterManager;
00034 import org.ros.internal.node.response.Response;
00035 import org.ros.internal.node.response.StatusCode;
00036 import org.ros.internal.node.server.NodeIdentifier;
00037 import org.ros.internal.node.server.SlaveServer;
00038 import org.ros.internal.node.service.ServiceDeclaration;
00039 import org.ros.internal.node.service.ServiceFactory;
00040 import org.ros.internal.node.service.ServiceIdentifier;
00041 import org.ros.internal.node.service.ServiceManager;
00042 import org.ros.internal.node.topic.PublisherFactory;
00043 import org.ros.internal.node.topic.SubscriberFactory;
00044 import org.ros.internal.node.topic.TopicDeclaration;
00045 import org.ros.internal.node.topic.TopicParticipantManager;
00046 import org.ros.internal.node.xmlrpc.XmlRpcTimeoutException;
00047 import org.ros.message.MessageDeserializer;
00048 import org.ros.message.MessageFactory;
00049 import org.ros.message.MessageSerializationFactory;
00050 import org.ros.message.MessageSerializer;
00051 import org.ros.message.Time;
00052 import org.ros.namespace.GraphName;
00053 import org.ros.namespace.NameResolver;
00054 import org.ros.namespace.NodeNameResolver;
00055 import org.ros.node.ConnectedNode;
00056 import org.ros.node.DefaultNodeFactory;
00057 import org.ros.node.Node;
00058 import org.ros.node.NodeConfiguration;
00059 import org.ros.node.NodeListener;
00060 import org.ros.node.parameter.ParameterTree;
00061 import org.ros.node.service.ServiceClient;
00062 import org.ros.node.service.ServiceResponseBuilder;
00063 import org.ros.node.service.ServiceServer;
00064 import org.ros.node.topic.DefaultPublisherListener;
00065 import org.ros.node.topic.DefaultSubscriberListener;
00066 import org.ros.node.topic.Publisher;
00067 import org.ros.node.topic.Subscriber;
00068 import org.ros.time.ClockTopicTimeProvider;
00069 import org.ros.time.TimeProvider;
00070 
00071 import java.net.InetSocketAddress;
00072 import java.net.URI;
00073 import java.util.Collection;
00074 import java.util.concurrent.CountDownLatch;
00075 import java.util.concurrent.ScheduledExecutorService;
00076 import java.util.concurrent.TimeUnit;
00077 
00085 public class DefaultNode implements ConnectedNode {
00086 
00087   private static final boolean DEBUG = false;
00088 
00089   // TODO(damonkohler): Move to NodeConfiguration.
00095   private static final int MAX_SHUTDOWN_DELAY_DURATION = 5;
00096   private static final TimeUnit MAX_SHUTDOWN_DELAY_UNITS = TimeUnit.SECONDS;
00097 
00098   private final NodeConfiguration nodeConfiguration;
00099   private final ListenerGroup<NodeListener> nodeListeners;
00100   private final ScheduledExecutorService scheduledExecutorService;
00101   private final URI masterUri;
00102   private final MasterClient masterClient;
00103   private final TopicParticipantManager topicParticipantManager;
00104   private final ServiceManager serviceManager;
00105   private final ParameterManager parameterManager;
00106   private final GraphName nodeName;
00107   private final NodeNameResolver resolver;
00108   private final SlaveServer slaveServer;
00109   private final ParameterTree parameterTree;
00110   private final PublisherFactory publisherFactory;
00111   private final SubscriberFactory subscriberFactory;
00112   private final ServiceFactory serviceFactory;
00113   private final Registrar registrar;
00114 
00115   private RosoutLogger log;
00116   private TimeProvider timeProvider;
00117 
00128   public DefaultNode(NodeConfiguration nodeConfiguration, Collection<NodeListener> nodeListeners,
00129       ScheduledExecutorService scheduledExecutorService) {
00130     this.nodeConfiguration = NodeConfiguration.copyOf(nodeConfiguration);
00131     this.nodeListeners = new ListenerGroup<NodeListener>(scheduledExecutorService);
00132     this.nodeListeners.addAll(nodeListeners);
00133     this.scheduledExecutorService = scheduledExecutorService;
00134     masterUri = nodeConfiguration.getMasterUri();
00135     masterClient = new MasterClient(masterUri);
00136     topicParticipantManager = new TopicParticipantManager();
00137     serviceManager = new ServiceManager();
00138     parameterManager = new ParameterManager(scheduledExecutorService);
00139 
00140     GraphName basename = nodeConfiguration.getNodeName();
00141     NameResolver parentResolver = nodeConfiguration.getParentResolver();
00142     nodeName = parentResolver.getNamespace().join(basename);
00143     resolver = new NodeNameResolver(nodeName, parentResolver);
00144     slaveServer =
00145         new SlaveServer(nodeName, nodeConfiguration.getTcpRosBindAddress(),
00146             nodeConfiguration.getTcpRosAdvertiseAddress(),
00147             nodeConfiguration.getXmlRpcBindAddress(),
00148             nodeConfiguration.getXmlRpcAdvertiseAddress(), masterClient, topicParticipantManager,
00149             serviceManager, parameterManager, scheduledExecutorService);
00150     slaveServer.start();
00151 
00152     NodeIdentifier nodeIdentifier = slaveServer.toNodeIdentifier();
00153 
00154     parameterTree =
00155         DefaultParameterTree.newFromNodeIdentifier(nodeIdentifier, masterClient.getRemoteUri(),
00156             resolver, parameterManager);
00157 
00158     publisherFactory =
00159         new PublisherFactory(nodeIdentifier, topicParticipantManager,
00160             nodeConfiguration.getTopicMessageFactory(), scheduledExecutorService);
00161     subscriberFactory =
00162         new SubscriberFactory(nodeIdentifier, topicParticipantManager, scheduledExecutorService);
00163     serviceFactory =
00164         new ServiceFactory(nodeName, slaveServer, serviceManager, scheduledExecutorService);
00165 
00166     registrar = new Registrar(masterClient, scheduledExecutorService);
00167     topicParticipantManager.setListener(registrar);
00168     serviceManager.setListener(registrar);
00169 
00170     scheduledExecutorService.execute(new Runnable() {
00171       @Override
00172       public void run() {
00173         start();
00174       }
00175     });
00176   }
00177 
00178   private void start() {
00179     // The Registrar must be started first so that master registration is
00180     // possible during startup.
00181     registrar.start(slaveServer.toNodeIdentifier());
00182 
00183     // During startup, we wait for 1) the RosoutLogger and 2) the TimeProvider.
00184     final CountDownLatch latch = new CountDownLatch(2);
00185 
00186     log = new RosoutLogger(this);
00187     log.getPublisher().addListener(new DefaultPublisherListener<rosgraph_msgs.Log>() {
00188       @Override
00189       public void onMasterRegistrationSuccess(Publisher<rosgraph_msgs.Log> registrant) {
00190         latch.countDown();
00191       }
00192     });
00193 
00194     boolean useSimTime = false;
00195     try {
00196       useSimTime =
00197           parameterTree.has(Parameters.USE_SIM_TIME)
00198               && parameterTree.getBoolean(Parameters.USE_SIM_TIME);
00199     } catch (Exception e) {
00200       signalOnError(e);
00201     }
00202     if (useSimTime) {
00203       ClockTopicTimeProvider clockTopicTimeProvider = new ClockTopicTimeProvider(this);
00204       clockTopicTimeProvider.getSubscriber().addSubscriberListener(
00205           new DefaultSubscriberListener<rosgraph_msgs.Clock>() {
00206             @Override
00207             public void onMasterRegistrationSuccess(Subscriber<rosgraph_msgs.Clock> subscriber) {
00208               latch.countDown();
00209             }
00210           });
00211       timeProvider = clockTopicTimeProvider;
00212     } else {
00213       timeProvider = nodeConfiguration.getTimeProvider();
00214       latch.countDown();
00215     }
00216 
00217     try {
00218       latch.await();
00219     } catch (InterruptedException e) {
00220       signalOnError(e);
00221       shutdown();
00222       return;
00223     }
00224 
00225     signalOnStart();
00226   }
00227 
00228   @VisibleForTesting
00229   Registrar getRegistrar() {
00230     return registrar;
00231   }
00232 
00233   private <T> org.ros.message.MessageSerializer<T> newMessageSerializer(String messageType) {
00234     return nodeConfiguration.getMessageSerializationFactory().newMessageSerializer(messageType);
00235   }
00236 
00237   @SuppressWarnings("unchecked")
00238   private <T> MessageDeserializer<T> newMessageDeserializer(String messageType) {
00239     return (MessageDeserializer<T>) nodeConfiguration.getMessageSerializationFactory()
00240         .newMessageDeserializer(messageType);
00241   }
00242 
00243   @SuppressWarnings("unchecked")
00244   private <T> MessageSerializer<T> newServiceResponseSerializer(String serviceType) {
00245     return (MessageSerializer<T>) nodeConfiguration.getMessageSerializationFactory()
00246         .newServiceResponseSerializer(serviceType);
00247   }
00248 
00249   @SuppressWarnings("unchecked")
00250   private <T> MessageDeserializer<T> newServiceResponseDeserializer(String serviceType) {
00251     return (MessageDeserializer<T>) nodeConfiguration.getMessageSerializationFactory()
00252         .newServiceResponseDeserializer(serviceType);
00253   }
00254 
00255   @SuppressWarnings("unchecked")
00256   private <T> MessageSerializer<T> newServiceRequestSerializer(String serviceType) {
00257     return (MessageSerializer<T>) nodeConfiguration.getMessageSerializationFactory()
00258         .newServiceRequestSerializer(serviceType);
00259   }
00260 
00261   @SuppressWarnings("unchecked")
00262   private <T> MessageDeserializer<T> newServiceRequestDeserializer(String serviceType) {
00263     return (MessageDeserializer<T>) nodeConfiguration.getMessageSerializationFactory()
00264         .newServiceRequestDeserializer(serviceType);
00265   }
00266 
00267   @Override
00268   public <T> Publisher<T> newPublisher(GraphName topicName, String messageType) {
00269     GraphName resolvedTopicName = resolveName(topicName);
00270     TopicDescription topicDescription =
00271         nodeConfiguration.getTopicDescriptionFactory().newFromType(messageType);
00272     TopicDeclaration topicDeclaration =
00273         TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription);
00274     org.ros.message.MessageSerializer<T> serializer = newMessageSerializer(messageType);
00275     return publisherFactory.newOrExisting(topicDeclaration, serializer);
00276   }
00277 
00278   @Override
00279   public <T> Publisher<T> newPublisher(String topicName, String messageType) {
00280     return newPublisher(GraphName.of(topicName), messageType);
00281   }
00282 
00283   @Override
00284   public <T> Subscriber<T> newSubscriber(GraphName topicName, String messageType) {
00285     GraphName resolvedTopicName = resolveName(topicName);
00286     TopicDescription topicDescription =
00287         nodeConfiguration.getTopicDescriptionFactory().newFromType(messageType);
00288     TopicDeclaration topicDeclaration =
00289         TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription);
00290     MessageDeserializer<T> deserializer = newMessageDeserializer(messageType);
00291     Subscriber<T> subscriber = subscriberFactory.newOrExisting(topicDeclaration, deserializer);
00292     return subscriber;
00293   }
00294 
00295   @Override
00296   public <T> Subscriber<T> newSubscriber(String topicName, String messageType) {
00297     return newSubscriber(GraphName.of(topicName), messageType);
00298   }
00299 
00300   @Override
00301   public <T, S> ServiceServer<T, S> newServiceServer(GraphName serviceName, String serviceType,
00302       ServiceResponseBuilder<T, S> responseBuilder) {
00303     GraphName resolvedServiceName = resolveName(serviceName);
00304     // TODO(damonkohler): It's rather non-obvious that the URI will be
00305     // created later on the fly.
00306     ServiceIdentifier identifier = new ServiceIdentifier(resolvedServiceName, null);
00307     ServiceDescription serviceDescription =
00308         nodeConfiguration.getServiceDescriptionFactory().newFromType(serviceType);
00309     ServiceDeclaration definition = new ServiceDeclaration(identifier, serviceDescription);
00310     MessageDeserializer<T> requestDeserializer = newServiceRequestDeserializer(serviceType);
00311     MessageSerializer<S> responseSerializer = newServiceResponseSerializer(serviceType);
00312     return serviceFactory.newServer(definition, responseBuilder, requestDeserializer,
00313         responseSerializer, nodeConfiguration.getServiceResponseMessageFactory());
00314   }
00315 
00316   @Override
00317   public <T, S> ServiceServer<T, S> newServiceServer(String serviceName, String serviceType,
00318       ServiceResponseBuilder<T, S> responseBuilder) {
00319     return newServiceServer(GraphName.of(serviceName), serviceType, responseBuilder);
00320   }
00321 
00322   @SuppressWarnings("unchecked")
00323   @Override
00324   public <T, S> ServiceServer<T, S> getServiceServer(GraphName serviceName) {
00325     return (ServiceServer<T, S>) serviceManager.getServer(serviceName);
00326   }
00327 
00328   @Override
00329   public <T, S> ServiceServer<T, S> getServiceServer(String serviceName) {
00330     return getServiceServer(GraphName.of(serviceName));
00331   }
00332 
00333   @Override
00334   public URI lookupServiceUri(GraphName serviceName) {
00335     Response<URI> response =
00336         masterClient.lookupService(slaveServer.toNodeIdentifier().getName(),
00337             resolveName(serviceName).toString());
00338     if (response.getStatusCode() == StatusCode.SUCCESS) {
00339       return response.getResult();
00340     } else {
00341       return null;
00342     }
00343   }
00344 
00345   @Override
00346   public URI lookupServiceUri(String serviceName) {
00347     return lookupServiceUri(GraphName.of(serviceName));
00348   }
00349 
00350   @Override
00351   public <T, S> ServiceClient<T, S> newServiceClient(GraphName serviceName, String serviceType)
00352       throws ServiceNotFoundException {
00353     GraphName resolvedServiceName = resolveName(serviceName);
00354     URI uri = lookupServiceUri(resolvedServiceName);
00355     if (uri == null) {
00356       throw new ServiceNotFoundException("No such service " + resolvedServiceName + " of type "
00357           + serviceType);
00358     }
00359     ServiceDescription serviceDescription =
00360         nodeConfiguration.getServiceDescriptionFactory().newFromType(serviceType);
00361     ServiceIdentifier serviceIdentifier = new ServiceIdentifier(resolvedServiceName, uri);
00362     ServiceDeclaration definition = new ServiceDeclaration(serviceIdentifier, serviceDescription);
00363     MessageSerializer<T> requestSerializer = newServiceRequestSerializer(serviceType);
00364     MessageDeserializer<S> responseDeserializer = newServiceResponseDeserializer(serviceType);
00365     return serviceFactory.newClient(definition, requestSerializer, responseDeserializer,
00366         nodeConfiguration.getServiceRequestMessageFactory());
00367   }
00368 
00369   @Override
00370   public <T, S> ServiceClient<T, S> newServiceClient(String serviceName, String serviceType)
00371       throws ServiceNotFoundException {
00372     return newServiceClient(GraphName.of(serviceName), serviceType);
00373   }
00374 
00375   @Override
00376   public Time getCurrentTime() {
00377     return timeProvider.getCurrentTime();
00378   }
00379 
00380   @Override
00381   public GraphName getName() {
00382     return nodeName;
00383   }
00384 
00385   @Override
00386   public Log getLog() {
00387     return log;
00388   }
00389 
00390   @Override
00391   public GraphName resolveName(GraphName name) {
00392     return resolver.resolve(name);
00393   }
00394 
00395   @Override
00396   public GraphName resolveName(String name) {
00397     return resolver.resolve(GraphName.of(name));
00398   }
00399 
00400   @Override
00401   public void shutdown() {
00402     signalOnShutdown();
00403     // NOTE(damonkohler): We don't want to raise potentially spurious
00404     // exceptions during shutdown that would interrupt the process. This is
00405     // simply best effort cleanup.
00406     for (Publisher<?> publisher : topicParticipantManager.getPublishers()) {
00407       publisher.shutdown();
00408     }
00409     for (Subscriber<?> subscriber : topicParticipantManager.getSubscribers()) {
00410       subscriber.shutdown();
00411     }
00412     for (ServiceServer<?, ?> serviceServer : serviceManager.getServers()) {
00413       try {
00414         Response<Integer> response =
00415             masterClient.unregisterService(slaveServer.toNodeIdentifier(), serviceServer);
00416         if (DEBUG) {
00417           if (response.getResult() == 0) {
00418             System.err.println("Failed to unregister service: " + serviceServer.getName());
00419           }
00420         }
00421       } catch (XmlRpcTimeoutException e) {
00422         log.error(e);
00423       } catch (RemoteException e) {
00424         log.error(e);
00425       }
00426     }
00427     for (ServiceClient<?, ?> serviceClient : serviceManager.getClients()) {
00428       serviceClient.shutdown();
00429     }
00430     registrar.shutdown();
00431     slaveServer.shutdown();
00432     signalOnShutdownComplete();
00433   }
00434 
00435   @Override
00436   public URI getMasterUri() {
00437     return masterUri;
00438   }
00439 
00440   @Override
00441   public NodeNameResolver getResolver() {
00442     return resolver;
00443   }
00444 
00445   @Override
00446   public ParameterTree getParameterTree() {
00447     return parameterTree;
00448   }
00449 
00450   @Override
00451   public URI getUri() {
00452     return slaveServer.getUri();
00453   }
00454 
00455   @Override
00456   public MessageSerializationFactory getMessageSerializationFactory() {
00457     return nodeConfiguration.getMessageSerializationFactory();
00458   }
00459 
00460   @Override
00461   public MessageFactory getTopicMessageFactory() {
00462     return nodeConfiguration.getTopicMessageFactory();
00463   }
00464 
00465   @Override
00466   public MessageFactory getServiceRequestMessageFactory() {
00467     return nodeConfiguration.getServiceRequestMessageFactory();
00468   }
00469 
00470   @Override
00471   public MessageFactory getServiceResponseMessageFactory() {
00472     return nodeConfiguration.getServiceResponseMessageFactory();
00473   }
00474 
00475   @Override
00476   public void addListener(NodeListener listener) {
00477     nodeListeners.add(listener);
00478   }
00479 
00486   private void signalOnError(final Throwable throwable) {
00487     final Node node = this;
00488     nodeListeners.signal(new SignalRunnable<NodeListener>() {
00489       @Override
00490       public void run(NodeListener listener) {
00491         listener.onError(node, throwable);
00492       }
00493     });
00494   }
00495 
00501   private void signalOnStart() {
00502     final ConnectedNode connectedNode = this;
00503     nodeListeners.signal(new SignalRunnable<NodeListener>() {
00504       @Override
00505       public void run(NodeListener listener) {
00506         listener.onStart(connectedNode);
00507       }
00508     });
00509   }
00510 
00517   private void signalOnShutdown() {
00518     final Node node = this;
00519     try {
00520       nodeListeners.signal(new SignalRunnable<NodeListener>() {
00521         @Override
00522         public void run(NodeListener listener) {
00523           listener.onShutdown(node);
00524         }
00525       }, MAX_SHUTDOWN_DELAY_DURATION, MAX_SHUTDOWN_DELAY_UNITS);
00526     } catch (InterruptedException e) {
00527       // Ignored since we do not guarantee that all listeners will finish
00528       // before
00529       // shutdown begins.
00530     }
00531   }
00532 
00539   private void signalOnShutdownComplete() {
00540     final Node node = this;
00541     nodeListeners.signal(new SignalRunnable<NodeListener>() {
00542       @Override
00543       public void run(NodeListener listener) {
00544         try {
00545           listener.onShutdownComplete(node);
00546         } catch (Throwable e) {
00547           System.out.println(listener);
00548         }
00549       }
00550     });
00551   }
00552 
00553   @VisibleForTesting
00554   InetSocketAddress getAddress() {
00555     return slaveServer.getAddress();
00556   }
00557 
00558   @Override
00559   public ScheduledExecutorService getScheduledExecutorService() {
00560     return scheduledExecutorService;
00561   }
00562 
00563   @Override
00564   public void executeCancellableLoop(final CancellableLoop cancellableLoop) {
00565     scheduledExecutorService.execute(cancellableLoop);
00566     addListener(new NodeListener() {
00567       @Override
00568       public void onStart(ConnectedNode connectedNode) {
00569       }
00570 
00571       @Override
00572       public void onShutdown(Node node) {
00573         cancellableLoop.cancel();
00574       }
00575 
00576       @Override
00577       public void onShutdownComplete(Node node) {
00578       }
00579 
00580       @Override
00581       public void onError(Node node, Throwable throwable) {
00582         cancellableLoop.cancel();
00583       }
00584     });
00585   }
00586 }


rosjava_core
Author(s):
autogenerated on Wed Aug 26 2015 16:06:49