00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 package org.ros.internal.node;
00018
00019 import com.google.common.annotations.VisibleForTesting;
00020
00021 import org.apache.commons.logging.Log;
00022 import org.ros.Parameters;
00023 import org.ros.concurrent.CancellableLoop;
00024 import org.ros.concurrent.ListenerGroup;
00025 import org.ros.concurrent.SignalRunnable;
00026 import org.ros.exception.RemoteException;
00027 import org.ros.exception.ServiceNotFoundException;
00028 import org.ros.internal.message.service.ServiceDescription;
00029 import org.ros.internal.message.topic.TopicDescription;
00030 import org.ros.internal.node.client.MasterClient;
00031 import org.ros.internal.node.client.Registrar;
00032 import org.ros.internal.node.parameter.DefaultParameterTree;
00033 import org.ros.internal.node.parameter.ParameterManager;
00034 import org.ros.internal.node.response.Response;
00035 import org.ros.internal.node.response.StatusCode;
00036 import org.ros.internal.node.server.NodeIdentifier;
00037 import org.ros.internal.node.server.SlaveServer;
00038 import org.ros.internal.node.service.ServiceDeclaration;
00039 import org.ros.internal.node.service.ServiceFactory;
00040 import org.ros.internal.node.service.ServiceIdentifier;
00041 import org.ros.internal.node.service.ServiceManager;
00042 import org.ros.internal.node.topic.PublisherFactory;
00043 import org.ros.internal.node.topic.SubscriberFactory;
00044 import org.ros.internal.node.topic.TopicDeclaration;
00045 import org.ros.internal.node.topic.TopicParticipantManager;
00046 import org.ros.internal.node.xmlrpc.XmlRpcTimeoutException;
00047 import org.ros.message.MessageDeserializer;
00048 import org.ros.message.MessageFactory;
00049 import org.ros.message.MessageSerializationFactory;
00050 import org.ros.message.MessageSerializer;
00051 import org.ros.message.Time;
00052 import org.ros.namespace.GraphName;
00053 import org.ros.namespace.NameResolver;
00054 import org.ros.namespace.NodeNameResolver;
00055 import org.ros.node.ConnectedNode;
00056 import org.ros.node.DefaultNodeFactory;
00057 import org.ros.node.Node;
00058 import org.ros.node.NodeConfiguration;
00059 import org.ros.node.NodeListener;
00060 import org.ros.node.parameter.ParameterTree;
00061 import org.ros.node.service.ServiceClient;
00062 import org.ros.node.service.ServiceResponseBuilder;
00063 import org.ros.node.service.ServiceServer;
00064 import org.ros.node.topic.DefaultPublisherListener;
00065 import org.ros.node.topic.DefaultSubscriberListener;
00066 import org.ros.node.topic.Publisher;
00067 import org.ros.node.topic.Subscriber;
00068 import org.ros.time.ClockTopicTimeProvider;
00069 import org.ros.time.TimeProvider;
00070
00071 import java.net.InetSocketAddress;
00072 import java.net.URI;
00073 import java.util.Collection;
00074 import java.util.concurrent.CountDownLatch;
00075 import java.util.concurrent.ScheduledExecutorService;
00076 import java.util.concurrent.TimeUnit;
00077
00085 public class DefaultNode implements ConnectedNode {
00086
00087 private static final boolean DEBUG = false;
00088
00089
00095 private static final int MAX_SHUTDOWN_DELAY_DURATION = 5;
00096 private static final TimeUnit MAX_SHUTDOWN_DELAY_UNITS = TimeUnit.SECONDS;
00097
00098 private final NodeConfiguration nodeConfiguration;
00099 private final ListenerGroup<NodeListener> nodeListeners;
00100 private final ScheduledExecutorService scheduledExecutorService;
00101 private final URI masterUri;
00102 private final MasterClient masterClient;
00103 private final TopicParticipantManager topicParticipantManager;
00104 private final ServiceManager serviceManager;
00105 private final ParameterManager parameterManager;
00106 private final GraphName nodeName;
00107 private final NodeNameResolver resolver;
00108 private final SlaveServer slaveServer;
00109 private final ParameterTree parameterTree;
00110 private final PublisherFactory publisherFactory;
00111 private final SubscriberFactory subscriberFactory;
00112 private final ServiceFactory serviceFactory;
00113 private final Registrar registrar;
00114
00115 private RosoutLogger log;
00116 private TimeProvider timeProvider;
00117
00128 public DefaultNode(NodeConfiguration nodeConfiguration, Collection<NodeListener> nodeListeners,
00129 ScheduledExecutorService scheduledExecutorService) {
00130 this.nodeConfiguration = NodeConfiguration.copyOf(nodeConfiguration);
00131 this.nodeListeners = new ListenerGroup<NodeListener>(scheduledExecutorService);
00132 this.nodeListeners.addAll(nodeListeners);
00133 this.scheduledExecutorService = scheduledExecutorService;
00134 masterUri = nodeConfiguration.getMasterUri();
00135 masterClient = new MasterClient(masterUri);
00136 topicParticipantManager = new TopicParticipantManager();
00137 serviceManager = new ServiceManager();
00138 parameterManager = new ParameterManager(scheduledExecutorService);
00139
00140 GraphName basename = nodeConfiguration.getNodeName();
00141 NameResolver parentResolver = nodeConfiguration.getParentResolver();
00142 nodeName = parentResolver.getNamespace().join(basename);
00143 resolver = new NodeNameResolver(nodeName, parentResolver);
00144 slaveServer =
00145 new SlaveServer(nodeName, nodeConfiguration.getTcpRosBindAddress(),
00146 nodeConfiguration.getTcpRosAdvertiseAddress(),
00147 nodeConfiguration.getXmlRpcBindAddress(),
00148 nodeConfiguration.getXmlRpcAdvertiseAddress(), masterClient, topicParticipantManager,
00149 serviceManager, parameterManager, scheduledExecutorService);
00150 slaveServer.start();
00151
00152 NodeIdentifier nodeIdentifier = slaveServer.toNodeIdentifier();
00153
00154 parameterTree =
00155 DefaultParameterTree.newFromNodeIdentifier(nodeIdentifier, masterClient.getRemoteUri(),
00156 resolver, parameterManager);
00157
00158 publisherFactory =
00159 new PublisherFactory(nodeIdentifier, topicParticipantManager,
00160 nodeConfiguration.getTopicMessageFactory(), scheduledExecutorService);
00161 subscriberFactory =
00162 new SubscriberFactory(nodeIdentifier, topicParticipantManager, scheduledExecutorService);
00163 serviceFactory =
00164 new ServiceFactory(nodeName, slaveServer, serviceManager, scheduledExecutorService);
00165
00166 registrar = new Registrar(masterClient, scheduledExecutorService);
00167 topicParticipantManager.setListener(registrar);
00168 serviceManager.setListener(registrar);
00169
00170 scheduledExecutorService.execute(new Runnable() {
00171 @Override
00172 public void run() {
00173 start();
00174 }
00175 });
00176 }
00177
00178 private void start() {
00179
00180
00181 registrar.start(slaveServer.toNodeIdentifier());
00182
00183
00184 final CountDownLatch latch = new CountDownLatch(2);
00185
00186 log = new RosoutLogger(this);
00187 log.getPublisher().addListener(new DefaultPublisherListener<rosgraph_msgs.Log>() {
00188 @Override
00189 public void onMasterRegistrationSuccess(Publisher<rosgraph_msgs.Log> registrant) {
00190 latch.countDown();
00191 }
00192 });
00193
00194 boolean useSimTime = false;
00195 try {
00196 useSimTime =
00197 parameterTree.has(Parameters.USE_SIM_TIME)
00198 && parameterTree.getBoolean(Parameters.USE_SIM_TIME);
00199 } catch (Exception e) {
00200 signalOnError(e);
00201 }
00202 if (useSimTime) {
00203 ClockTopicTimeProvider clockTopicTimeProvider = new ClockTopicTimeProvider(this);
00204 clockTopicTimeProvider.getSubscriber().addSubscriberListener(
00205 new DefaultSubscriberListener<rosgraph_msgs.Clock>() {
00206 @Override
00207 public void onMasterRegistrationSuccess(Subscriber<rosgraph_msgs.Clock> subscriber) {
00208 latch.countDown();
00209 }
00210 });
00211 timeProvider = clockTopicTimeProvider;
00212 } else {
00213 timeProvider = nodeConfiguration.getTimeProvider();
00214 latch.countDown();
00215 }
00216
00217 try {
00218 latch.await();
00219 } catch (InterruptedException e) {
00220 signalOnError(e);
00221 shutdown();
00222 return;
00223 }
00224
00225 signalOnStart();
00226 }
00227
00228 @VisibleForTesting
00229 Registrar getRegistrar() {
00230 return registrar;
00231 }
00232
00233 private <T> org.ros.message.MessageSerializer<T> newMessageSerializer(String messageType) {
00234 return nodeConfiguration.getMessageSerializationFactory().newMessageSerializer(messageType);
00235 }
00236
00237 @SuppressWarnings("unchecked")
00238 private <T> MessageDeserializer<T> newMessageDeserializer(String messageType) {
00239 return (MessageDeserializer<T>) nodeConfiguration.getMessageSerializationFactory()
00240 .newMessageDeserializer(messageType);
00241 }
00242
00243 @SuppressWarnings("unchecked")
00244 private <T> MessageSerializer<T> newServiceResponseSerializer(String serviceType) {
00245 return (MessageSerializer<T>) nodeConfiguration.getMessageSerializationFactory()
00246 .newServiceResponseSerializer(serviceType);
00247 }
00248
00249 @SuppressWarnings("unchecked")
00250 private <T> MessageDeserializer<T> newServiceResponseDeserializer(String serviceType) {
00251 return (MessageDeserializer<T>) nodeConfiguration.getMessageSerializationFactory()
00252 .newServiceResponseDeserializer(serviceType);
00253 }
00254
00255 @SuppressWarnings("unchecked")
00256 private <T> MessageSerializer<T> newServiceRequestSerializer(String serviceType) {
00257 return (MessageSerializer<T>) nodeConfiguration.getMessageSerializationFactory()
00258 .newServiceRequestSerializer(serviceType);
00259 }
00260
00261 @SuppressWarnings("unchecked")
00262 private <T> MessageDeserializer<T> newServiceRequestDeserializer(String serviceType) {
00263 return (MessageDeserializer<T>) nodeConfiguration.getMessageSerializationFactory()
00264 .newServiceRequestDeserializer(serviceType);
00265 }
00266
00267 @Override
00268 public <T> Publisher<T> newPublisher(GraphName topicName, String messageType) {
00269 GraphName resolvedTopicName = resolveName(topicName);
00270 TopicDescription topicDescription =
00271 nodeConfiguration.getTopicDescriptionFactory().newFromType(messageType);
00272 TopicDeclaration topicDeclaration =
00273 TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription);
00274 org.ros.message.MessageSerializer<T> serializer = newMessageSerializer(messageType);
00275 return publisherFactory.newOrExisting(topicDeclaration, serializer);
00276 }
00277
00278 @Override
00279 public <T> Publisher<T> newPublisher(String topicName, String messageType) {
00280 return newPublisher(GraphName.of(topicName), messageType);
00281 }
00282
00283 @Override
00284 public <T> Subscriber<T> newSubscriber(GraphName topicName, String messageType) {
00285 GraphName resolvedTopicName = resolveName(topicName);
00286 TopicDescription topicDescription =
00287 nodeConfiguration.getTopicDescriptionFactory().newFromType(messageType);
00288 TopicDeclaration topicDeclaration =
00289 TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription);
00290 MessageDeserializer<T> deserializer = newMessageDeserializer(messageType);
00291 Subscriber<T> subscriber = subscriberFactory.newOrExisting(topicDeclaration, deserializer);
00292 return subscriber;
00293 }
00294
00295 @Override
00296 public <T> Subscriber<T> newSubscriber(String topicName, String messageType) {
00297 return newSubscriber(GraphName.of(topicName), messageType);
00298 }
00299
00300 @Override
00301 public <T, S> ServiceServer<T, S> newServiceServer(GraphName serviceName, String serviceType,
00302 ServiceResponseBuilder<T, S> responseBuilder) {
00303 GraphName resolvedServiceName = resolveName(serviceName);
00304
00305
00306 ServiceIdentifier identifier = new ServiceIdentifier(resolvedServiceName, null);
00307 ServiceDescription serviceDescription =
00308 nodeConfiguration.getServiceDescriptionFactory().newFromType(serviceType);
00309 ServiceDeclaration definition = new ServiceDeclaration(identifier, serviceDescription);
00310 MessageDeserializer<T> requestDeserializer = newServiceRequestDeserializer(serviceType);
00311 MessageSerializer<S> responseSerializer = newServiceResponseSerializer(serviceType);
00312 return serviceFactory.newServer(definition, responseBuilder, requestDeserializer,
00313 responseSerializer, nodeConfiguration.getServiceResponseMessageFactory());
00314 }
00315
00316 @Override
00317 public <T, S> ServiceServer<T, S> newServiceServer(String serviceName, String serviceType,
00318 ServiceResponseBuilder<T, S> responseBuilder) {
00319 return newServiceServer(GraphName.of(serviceName), serviceType, responseBuilder);
00320 }
00321
00322 @SuppressWarnings("unchecked")
00323 @Override
00324 public <T, S> ServiceServer<T, S> getServiceServer(GraphName serviceName) {
00325 return (ServiceServer<T, S>) serviceManager.getServer(serviceName);
00326 }
00327
00328 @Override
00329 public <T, S> ServiceServer<T, S> getServiceServer(String serviceName) {
00330 return getServiceServer(GraphName.of(serviceName));
00331 }
00332
00333 @Override
00334 public URI lookupServiceUri(GraphName serviceName) {
00335 Response<URI> response =
00336 masterClient.lookupService(slaveServer.toNodeIdentifier().getName(),
00337 resolveName(serviceName).toString());
00338 if (response.getStatusCode() == StatusCode.SUCCESS) {
00339 return response.getResult();
00340 } else {
00341 return null;
00342 }
00343 }
00344
00345 @Override
00346 public URI lookupServiceUri(String serviceName) {
00347 return lookupServiceUri(GraphName.of(serviceName));
00348 }
00349
00350 @Override
00351 public <T, S> ServiceClient<T, S> newServiceClient(GraphName serviceName, String serviceType)
00352 throws ServiceNotFoundException {
00353 GraphName resolvedServiceName = resolveName(serviceName);
00354 URI uri = lookupServiceUri(resolvedServiceName);
00355 if (uri == null) {
00356 throw new ServiceNotFoundException("No such service " + resolvedServiceName + " of type "
00357 + serviceType);
00358 }
00359 ServiceDescription serviceDescription =
00360 nodeConfiguration.getServiceDescriptionFactory().newFromType(serviceType);
00361 ServiceIdentifier serviceIdentifier = new ServiceIdentifier(resolvedServiceName, uri);
00362 ServiceDeclaration definition = new ServiceDeclaration(serviceIdentifier, serviceDescription);
00363 MessageSerializer<T> requestSerializer = newServiceRequestSerializer(serviceType);
00364 MessageDeserializer<S> responseDeserializer = newServiceResponseDeserializer(serviceType);
00365 return serviceFactory.newClient(definition, requestSerializer, responseDeserializer,
00366 nodeConfiguration.getServiceRequestMessageFactory());
00367 }
00368
00369 @Override
00370 public <T, S> ServiceClient<T, S> newServiceClient(String serviceName, String serviceType)
00371 throws ServiceNotFoundException {
00372 return newServiceClient(GraphName.of(serviceName), serviceType);
00373 }
00374
00375 @Override
00376 public Time getCurrentTime() {
00377 return timeProvider.getCurrentTime();
00378 }
00379
00380 @Override
00381 public GraphName getName() {
00382 return nodeName;
00383 }
00384
00385 @Override
00386 public Log getLog() {
00387 return log;
00388 }
00389
00390 @Override
00391 public GraphName resolveName(GraphName name) {
00392 return resolver.resolve(name);
00393 }
00394
00395 @Override
00396 public GraphName resolveName(String name) {
00397 return resolver.resolve(GraphName.of(name));
00398 }
00399
00400 @Override
00401 public void shutdown() {
00402 signalOnShutdown();
00403
00404
00405
00406 for (Publisher<?> publisher : topicParticipantManager.getPublishers()) {
00407 publisher.shutdown();
00408 }
00409 for (Subscriber<?> subscriber : topicParticipantManager.getSubscribers()) {
00410 subscriber.shutdown();
00411 }
00412 for (ServiceServer<?, ?> serviceServer : serviceManager.getServers()) {
00413 try {
00414 Response<Integer> response =
00415 masterClient.unregisterService(slaveServer.toNodeIdentifier(), serviceServer);
00416 if (DEBUG) {
00417 if (response.getResult() == 0) {
00418 System.err.println("Failed to unregister service: " + serviceServer.getName());
00419 }
00420 }
00421 } catch (XmlRpcTimeoutException e) {
00422 log.error(e);
00423 } catch (RemoteException e) {
00424 log.error(e);
00425 }
00426 }
00427 for (ServiceClient<?, ?> serviceClient : serviceManager.getClients()) {
00428 serviceClient.shutdown();
00429 }
00430 registrar.shutdown();
00431 slaveServer.shutdown();
00432 signalOnShutdownComplete();
00433 }
00434
00435 @Override
00436 public URI getMasterUri() {
00437 return masterUri;
00438 }
00439
00440 @Override
00441 public NodeNameResolver getResolver() {
00442 return resolver;
00443 }
00444
00445 @Override
00446 public ParameterTree getParameterTree() {
00447 return parameterTree;
00448 }
00449
00450 @Override
00451 public URI getUri() {
00452 return slaveServer.getUri();
00453 }
00454
00455 @Override
00456 public MessageSerializationFactory getMessageSerializationFactory() {
00457 return nodeConfiguration.getMessageSerializationFactory();
00458 }
00459
00460 @Override
00461 public MessageFactory getTopicMessageFactory() {
00462 return nodeConfiguration.getTopicMessageFactory();
00463 }
00464
00465 @Override
00466 public MessageFactory getServiceRequestMessageFactory() {
00467 return nodeConfiguration.getServiceRequestMessageFactory();
00468 }
00469
00470 @Override
00471 public MessageFactory getServiceResponseMessageFactory() {
00472 return nodeConfiguration.getServiceResponseMessageFactory();
00473 }
00474
00475 @Override
00476 public void addListener(NodeListener listener) {
00477 nodeListeners.add(listener);
00478 }
00479
00486 private void signalOnError(final Throwable throwable) {
00487 final Node node = this;
00488 nodeListeners.signal(new SignalRunnable<NodeListener>() {
00489 @Override
00490 public void run(NodeListener listener) {
00491 listener.onError(node, throwable);
00492 }
00493 });
00494 }
00495
00501 private void signalOnStart() {
00502 final ConnectedNode connectedNode = this;
00503 nodeListeners.signal(new SignalRunnable<NodeListener>() {
00504 @Override
00505 public void run(NodeListener listener) {
00506 listener.onStart(connectedNode);
00507 }
00508 });
00509 }
00510
00517 private void signalOnShutdown() {
00518 final Node node = this;
00519 try {
00520 nodeListeners.signal(new SignalRunnable<NodeListener>() {
00521 @Override
00522 public void run(NodeListener listener) {
00523 listener.onShutdown(node);
00524 }
00525 }, MAX_SHUTDOWN_DELAY_DURATION, MAX_SHUTDOWN_DELAY_UNITS);
00526 } catch (InterruptedException e) {
00527
00528
00529
00530 }
00531 }
00532
00539 private void signalOnShutdownComplete() {
00540 final Node node = this;
00541 nodeListeners.signal(new SignalRunnable<NodeListener>() {
00542 @Override
00543 public void run(NodeListener listener) {
00544 try {
00545 listener.onShutdownComplete(node);
00546 } catch (Throwable e) {
00547 System.out.println(listener);
00548 }
00549 }
00550 });
00551 }
00552
00553 @VisibleForTesting
00554 InetSocketAddress getAddress() {
00555 return slaveServer.getAddress();
00556 }
00557
00558 @Override
00559 public ScheduledExecutorService getScheduledExecutorService() {
00560 return scheduledExecutorService;
00561 }
00562
00563 @Override
00564 public void executeCancellableLoop(final CancellableLoop cancellableLoop) {
00565 scheduledExecutorService.execute(cancellableLoop);
00566 addListener(new NodeListener() {
00567 @Override
00568 public void onStart(ConnectedNode connectedNode) {
00569 }
00570
00571 @Override
00572 public void onShutdown(Node node) {
00573 cancellableLoop.cancel();
00574 }
00575
00576 @Override
00577 public void onShutdownComplete(Node node) {
00578 }
00579
00580 @Override
00581 public void onError(Node node, Throwable throwable) {
00582 cancellableLoop.cancel();
00583 }
00584 });
00585 }
00586 }