00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 package org.ros.internal.node.server.master;
00018
00019 import com.google.common.annotations.VisibleForTesting;
00020 import com.google.common.collect.Lists;
00021
00022 import org.apache.commons.logging.Log;
00023 import org.apache.commons.logging.LogFactory;
00024 import org.ros.address.AdvertiseAddress;
00025 import org.ros.address.BindAddress;
00026 import org.ros.internal.node.client.SlaveClient;
00027 import org.ros.internal.node.server.NodeIdentifier;
00028 import org.ros.internal.node.server.SlaveServer;
00029 import org.ros.internal.node.server.XmlRpcServer;
00030 import org.ros.internal.node.topic.TopicParticipant;
00031 import org.ros.internal.node.xmlrpc.MasterXmlRpcEndpointImpl;
00032 import org.ros.master.client.TopicSystemState;
00033 import org.ros.namespace.GraphName;
00034 import org.ros.node.Node;
00035 import org.ros.node.service.ServiceServer;
00036 import org.ros.node.topic.Publisher;
00037 import org.ros.node.topic.Subscriber;
00038
00039 import java.net.URI;
00040 import java.util.Collection;
00041 import java.util.List;
00042
00056 public class MasterServer extends XmlRpcServer implements MasterRegistrationListener {
00057
00058 private static final boolean DEBUG = false;
00059 private static final Log log = LogFactory.getLog(MasterServer.class);
00060
00064 public static final int SYSTEM_STATE_PUBLISHERS = 0;
00065
00069 public static final int SYSTEM_STATE_SUBSCRIBERS = 1;
00070
00074 public static final int SYSTEM_STATE_SERVICES = 2;
00075
00080 private static final GraphName MASTER_NODE_NAME = GraphName.of("/master");
00081
00085 private final MasterRegistrationManagerImpl masterRegistrationManager;
00086
00087 public MasterServer(BindAddress bindAddress, AdvertiseAddress advertiseAddress) {
00088 super(bindAddress, advertiseAddress);
00089 masterRegistrationManager = new MasterRegistrationManagerImpl(this);
00090 }
00091
00095 public void start() {
00096 if (DEBUG) {
00097 log.info("Starting master server.");
00098 }
00099 super.start(MasterXmlRpcEndpointImpl.class, new MasterXmlRpcEndpointImpl(this));
00100 }
00101
00114 public void registerService(GraphName nodeName, URI nodeSlaveUri, GraphName serviceName,
00115 URI serviceUri) {
00116 synchronized (masterRegistrationManager) {
00117 masterRegistrationManager.registerService(nodeName, nodeSlaveUri, serviceName, serviceUri);
00118 }
00119 }
00120
00132 public boolean unregisterService(GraphName nodeName, GraphName serviceName, URI serviceUri) {
00133 synchronized (masterRegistrationManager) {
00134 return masterRegistrationManager.unregisterService(nodeName, serviceName, serviceUri);
00135 }
00136 }
00137
00154 public List<URI> registerSubscriber(GraphName nodeName, URI nodeSlaveUri, GraphName topicName,
00155 String topicMessageType) {
00156 if (DEBUG) {
00157 log.info(String.format(
00158 "Registering subscriber %s with message type %s on node %s with URI %s", topicName,
00159 topicMessageType, nodeName, nodeSlaveUri));
00160 }
00161
00162 synchronized (masterRegistrationManager) {
00163 TopicRegistrationInfo topicInfo =
00164 masterRegistrationManager.registerSubscriber(nodeName, nodeSlaveUri, topicName,
00165 topicMessageType);
00166 List<URI> publisherUris = Lists.newArrayList();
00167 for (NodeRegistrationInfo publisherNodeInfo : topicInfo.getPublishers()) {
00168 publisherUris.add(publisherNodeInfo.getNodeSlaveUri());
00169 }
00170 return publisherUris;
00171 }
00172 }
00173
00183 public boolean unregisterSubscriber(GraphName nodeName, GraphName topicName) {
00184 if (DEBUG) {
00185 log.info(String.format("Unregistering subscriber for %s on node %s.", topicName, nodeName));
00186 }
00187 synchronized (masterRegistrationManager) {
00188 return masterRegistrationManager.unregisterSubscriber(nodeName, topicName);
00189 }
00190 }
00191
00207 public List<URI> registerPublisher(GraphName nodeName, URI nodeSlaveUri, GraphName topicName,
00208 String topicMessageType) {
00209 if (DEBUG) {
00210 log.info(String.format(
00211 "Registering publisher %s with message type %s on node %s with URI %s.", topicName,
00212 topicMessageType, nodeName, nodeSlaveUri));
00213 }
00214
00215 synchronized (masterRegistrationManager) {
00216 TopicRegistrationInfo topicInfo =
00217 masterRegistrationManager.registerPublisher(nodeName, nodeSlaveUri, topicName,
00218 topicMessageType);
00219
00220 List<URI> subscriberSlaveUris = Lists.newArrayList();
00221 for (NodeRegistrationInfo publisherNodeInfo : topicInfo.getSubscribers()) {
00222 subscriberSlaveUris.add(publisherNodeInfo.getNodeSlaveUri());
00223 }
00224
00225 publisherUpdate(topicInfo, subscriberSlaveUris);
00226
00227 return subscriberSlaveUris;
00228 }
00229 }
00230
00240 private void publisherUpdate(TopicRegistrationInfo topicInfo, List<URI> subscriberSlaveUris) {
00241 if (DEBUG) {
00242 log.info("Publisher update: " + topicInfo.getTopicName());
00243 }
00244 List<URI> publisherUris = Lists.newArrayList();
00245 for (NodeRegistrationInfo publisherNodeInfo : topicInfo.getPublishers()) {
00246 publisherUris.add(publisherNodeInfo.getNodeSlaveUri());
00247 }
00248
00249 GraphName topicName = topicInfo.getTopicName();
00250 for (URI subscriberSlaveUri : subscriberSlaveUris) {
00251 contactSubscriberForPublisherUpdate(subscriberSlaveUri, topicName, publisherUris);
00252 }
00253 }
00254
00265 @VisibleForTesting
00266 protected void contactSubscriberForPublisherUpdate(URI subscriberSlaveUri, GraphName topicName,
00267 List<URI> publisherUris) {
00268 SlaveClient client = new SlaveClient(MASTER_NODE_NAME, subscriberSlaveUri);
00269 client.publisherUpdate(topicName, publisherUris);
00270 }
00271
00281 public boolean unregisterPublisher(GraphName nodeName, GraphName topicName) {
00282 if (DEBUG) {
00283 log.info(String.format("Unregistering publisher for %s on %s.", topicName, nodeName));
00284 }
00285 synchronized (masterRegistrationManager) {
00286 return masterRegistrationManager.unregisterPublisher(nodeName, topicName);
00287 }
00288 }
00289
00302 public URI lookupNode(GraphName nodeName) {
00303 synchronized (masterRegistrationManager) {
00304 NodeRegistrationInfo node = masterRegistrationManager.getNodeRegistrationInfo(nodeName);
00305 if (node != null) {
00306 return node.getNodeSlaveUri();
00307 } else {
00308 return null;
00309 }
00310 }
00311 }
00312
00321 public List<List<String>> getTopicTypes(GraphName calledId) {
00322 synchronized (masterRegistrationManager) {
00323 List<List<String>> result = Lists.newArrayList();
00324 for (TopicRegistrationInfo topic : masterRegistrationManager.getAllTopics()) {
00325 result.add(Lists.newArrayList(topic.getTopicName().toString(), topic.getMessageType()));
00326 }
00327 return result;
00328 }
00329 }
00330
00339 public List<Object> getSystemState() {
00340 synchronized (masterRegistrationManager) {
00341 List<Object> result = Lists.newArrayList();
00342
00343 Collection<TopicRegistrationInfo> topics = masterRegistrationManager.getAllTopics();
00344 result.add(getSystemStatePublishers(topics));
00345 result.add(getSystemStateSubscribers(topics));
00346 result.add(getSystemStateServices());
00347 return result;
00348 }
00349 }
00350
00361 private List<Object> getSystemStatePublishers(Collection<TopicRegistrationInfo> topics) {
00362 List<Object> result = Lists.newArrayList();
00363 for (TopicRegistrationInfo topic : topics) {
00364 if (topic.hasPublishers()) {
00365 List<Object> topicInfo = Lists.newArrayList();
00366 topicInfo.add(topic.getTopicName().toString());
00367
00368 List<String> publist = Lists.newArrayList();
00369 for (NodeRegistrationInfo node : topic.getPublishers()) {
00370 publist.add(node.getNodeName().toString());
00371 }
00372 topicInfo.add(publist);
00373
00374 result.add(topicInfo);
00375 }
00376 }
00377 return result;
00378 }
00379
00390 private List<Object> getSystemStateSubscribers(Collection<TopicRegistrationInfo> topics) {
00391 List<Object> result = Lists.newArrayList();
00392 for (TopicRegistrationInfo topic : topics) {
00393 if (topic.hasSubscribers()) {
00394 List<Object> topicInfo = Lists.newArrayList();
00395 topicInfo.add(topic.getTopicName().toString());
00396
00397 List<Object> sublist = Lists.newArrayList();
00398 for (NodeRegistrationInfo node : topic.getSubscribers()) {
00399 sublist.add(node.getNodeName().toString());
00400 }
00401 topicInfo.add(sublist);
00402
00403 result.add(topicInfo);
00404 }
00405 }
00406 return result;
00407 }
00408
00416 private List<Object> getSystemStateServices() {
00417 List<Object> result = Lists.newArrayList();
00418
00419 for (ServiceRegistrationInfo service : masterRegistrationManager.getAllServices()) {
00420 List<Object> topicInfo = Lists.newArrayList();
00421 topicInfo.add(service.getServiceName().toString());
00422 topicInfo.add(Lists.newArrayList(service.getServiceName().toString()));
00423
00424 result.add(topicInfo);
00425 }
00426
00427 return result;
00428 }
00429
00438 public URI lookupService(GraphName serviceName) {
00439 synchronized (masterRegistrationManager) {
00440 ServiceRegistrationInfo service =
00441 masterRegistrationManager.getServiceRegistrationInfo(serviceName);
00442 if (service != null) {
00443 return service.getServiceUri();
00444 } else {
00445 return null;
00446 }
00447 }
00448 }
00449
00462 public List<Object> getPublishedTopics(GraphName caller, GraphName subgraph) {
00463 synchronized (masterRegistrationManager) {
00464
00465 List<Object> result = Lists.newArrayList();
00466 for (TopicRegistrationInfo topic : masterRegistrationManager.getAllTopics()) {
00467 if (topic.hasPublishers()) {
00468 result.add(Lists.newArrayList(topic.getTopicName().toString(), topic.getMessageType()));
00469 }
00470 }
00471 return result;
00472 }
00473 }
00474
00475 @Override
00476 public void onNodeReplacement(NodeRegistrationInfo nodeInfo) {
00477
00478
00479 if (log.isWarnEnabled()) {
00480 log.warn(String.format("Existing node %s with slave URI %s will be shutdown.",
00481 nodeInfo.getNodeName(), nodeInfo.getNodeSlaveUri()));
00482 }
00483
00484 SlaveClient client = new SlaveClient(MASTER_NODE_NAME, nodeInfo.getNodeSlaveUri());
00485 client.shutdown("Replaced by new slave");
00486 }
00487 }