Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 package org.ros.internal.node.client;
00018
00019 import com.google.common.base.Preconditions;
00020
00021 import org.apache.commons.logging.Log;
00022 import org.apache.commons.logging.LogFactory;
00023 import org.ros.concurrent.Holder;
00024 import org.ros.concurrent.RetryingExecutorService;
00025 import org.ros.exception.RosRuntimeException;
00026 import org.ros.internal.node.response.Response;
00027 import org.ros.internal.node.server.NodeIdentifier;
00028 import org.ros.internal.node.server.SlaveServer;
00029 import org.ros.internal.node.server.master.MasterServer;
00030 import org.ros.internal.node.service.DefaultServiceServer;
00031 import org.ros.internal.node.service.ServiceManagerListener;
00032 import org.ros.internal.node.topic.DefaultPublisher;
00033 import org.ros.internal.node.topic.DefaultSubscriber;
00034 import org.ros.internal.node.topic.PublisherIdentifier;
00035 import org.ros.internal.node.topic.TopicParticipantManagerListener;
00036
00037 import java.net.URI;
00038 import java.util.Collection;
00039 import java.util.List;
00040 import java.util.concurrent.Callable;
00041 import java.util.concurrent.ScheduledExecutorService;
00042 import java.util.concurrent.TimeUnit;
00043
00051 public class Registrar implements TopicParticipantManagerListener, ServiceManagerListener {
00052
00053 private static final boolean DEBUG = true;
00054 private static final Log log = LogFactory.getLog(Registrar.class);
00055
00056 private static final int SHUTDOWN_TIMEOUT = 5;
00057 private static final TimeUnit SHUTDOWN_TIMEOUT_UNITS = TimeUnit.SECONDS;
00058
00059 private final MasterClient masterClient;
00060 private final ScheduledExecutorService executorService;
00061 private final RetryingExecutorService retryingExecutorService;
00062
00063 private NodeIdentifier nodeIdentifier;
00064 private boolean running;
00065
00073 public Registrar(MasterClient masterClient, ScheduledExecutorService executorService) {
00074 this.masterClient = masterClient;
00075 this.executorService = executorService;
00076 retryingExecutorService = new RetryingExecutorService(executorService);
00077 nodeIdentifier = null;
00078 running = false;
00079 if (DEBUG) {
00080 log.info("MasterXmlRpcEndpoint URI: " + masterClient.getRemoteUri());
00081 }
00082 }
00083
00094 public void setRetryDelay(long delay, TimeUnit unit) {
00095 retryingExecutorService.setRetryDelay(delay, unit);
00096 }
00097
00098 private boolean submit(Callable<Boolean> callable) {
00099 if (running) {
00100 retryingExecutorService.submit(callable);
00101 return true;
00102 }
00103 log.warn("Registrar no longer running, request ignored.");
00104 return false;
00105 }
00106
00107 private <T> boolean callMaster(Callable<Response<T>> callable) {
00108 Preconditions.checkNotNull(nodeIdentifier, "Registrar not started.");
00109 boolean success;
00110 try {
00111 Response<T> response = callable.call();
00112 if (DEBUG) {
00113 log.info(response);
00114 }
00115 success = response.isSuccess();
00116 } catch (Exception e) {
00117 if (DEBUG) {
00118 log.error("Exception caught while communicating with master.", e);
00119 } else {
00120 log.error("Exception caught while communicating with master.");
00121 }
00122 success = false;
00123 }
00124 return success;
00125 }
00126
00127 @Override
00128 public void onPublisherAdded(final DefaultPublisher<?> publisher) {
00129 if (DEBUG) {
00130 log.info("Registering publisher: " + publisher);
00131 }
00132 boolean submitted = submit(new Callable<Boolean>() {
00133 @Override
00134 public Boolean call() throws Exception {
00135 boolean success = callMaster(new Callable<Response<List<URI>>>() {
00136 @Override
00137 public Response<List<URI>> call() throws Exception {
00138 return masterClient.registerPublisher(publisher.toDeclaration());
00139 }
00140 });
00141 if (success) {
00142 publisher.signalOnMasterRegistrationSuccess();
00143 } else {
00144 publisher.signalOnMasterRegistrationFailure();
00145 }
00146 return !success;
00147 }
00148 });
00149 if (!submitted) {
00150 executorService.execute(new Runnable() {
00151 @Override
00152 public void run() {
00153 publisher.signalOnMasterRegistrationFailure();
00154 }
00155 });
00156 }
00157 }
00158
00159 @Override
00160 public void onPublisherRemoved(final DefaultPublisher<?> publisher) {
00161 if (DEBUG) {
00162 log.info("Unregistering publisher: " + publisher);
00163 }
00164 boolean submitted = submit(new Callable<Boolean>() {
00165 @Override
00166 public Boolean call() throws Exception {
00167 boolean success = callMaster(new Callable<Response<Integer>>() {
00168 @Override
00169 public Response<Integer> call() throws Exception {
00170 return masterClient.unregisterPublisher(publisher.getIdentifier());
00171 }
00172 });
00173 if (success) {
00174 publisher.signalOnMasterUnregistrationSuccess();
00175 } else {
00176 publisher.signalOnMasterUnregistrationFailure();
00177 }
00178 return !success;
00179 }
00180 });
00181 if (!submitted) {
00182 executorService.execute(new Runnable() {
00183 @Override
00184 public void run() {
00185 publisher.signalOnMasterUnregistrationFailure();
00186 }
00187 });
00188 }
00189 }
00190
00191 @Override
00192 public void onSubscriberAdded(final DefaultSubscriber<?> subscriber) {
00193 if (DEBUG) {
00194 log.info("Registering subscriber: " + subscriber);
00195 }
00196 boolean submitted = submit(new Callable<Boolean>() {
00197 @Override
00198 public Boolean call() throws Exception {
00199 final Holder<Response<List<URI>>> holder = Holder.newEmpty();
00200 boolean success = callMaster(new Callable<Response<List<URI>>>() {
00201 @Override
00202 public Response<List<URI>> call() throws Exception {
00203 return holder.set(masterClient.registerSubscriber(nodeIdentifier, subscriber));
00204 }
00205 });
00206 if (success) {
00207 Collection<PublisherIdentifier> publisherIdentifiers =
00208 PublisherIdentifier.newCollectionFromUris(holder.get().getResult(),
00209 subscriber.getTopicDeclaration());
00210 subscriber.updatePublishers(publisherIdentifiers);
00211 subscriber.signalOnMasterRegistrationSuccess();
00212 } else {
00213 subscriber.signalOnMasterRegistrationFailure();
00214 }
00215 return !success;
00216 }
00217 });
00218 if (!submitted) {
00219 executorService.execute(new Runnable() {
00220 @Override
00221 public void run() {
00222 subscriber.signalOnMasterRegistrationFailure();
00223 }
00224 });
00225 }
00226 }
00227
00228 @Override
00229 public void onSubscriberRemoved(final DefaultSubscriber<?> subscriber) {
00230 if (DEBUG) {
00231 log.info("Unregistering subscriber: " + subscriber);
00232 }
00233 boolean submitted = submit(new Callable<Boolean>() {
00234 @Override
00235 public Boolean call() throws Exception {
00236 boolean success = callMaster(new Callable<Response<Integer>>() {
00237 @Override
00238 public Response<Integer> call() throws Exception {
00239 return masterClient.unregisterSubscriber(nodeIdentifier, subscriber);
00240 }
00241 });
00242 if (success) {
00243 subscriber.signalOnMasterUnregistrationSuccess();
00244 } else {
00245 subscriber.signalOnMasterUnregistrationFailure();
00246 }
00247 return !success;
00248 }
00249 });
00250 if (!submitted) {
00251 executorService.execute(new Runnable() {
00252 @Override
00253 public void run() {
00254 subscriber.signalOnMasterUnregistrationFailure();
00255 }
00256 });
00257 }
00258 }
00259
00260 @Override
00261 public void onServiceServerAdded(final DefaultServiceServer<?, ?> serviceServer) {
00262 if (DEBUG) {
00263 log.info("Registering service: " + serviceServer);
00264 }
00265 boolean submitted = submit(new Callable<Boolean>() {
00266 @Override
00267 public Boolean call() throws Exception {
00268 boolean success = callMaster(new Callable<Response<Void>>() {
00269 @Override
00270 public Response<Void> call() throws Exception {
00271 return masterClient.registerService(nodeIdentifier, serviceServer);
00272 }
00273 });
00274 if (success) {
00275 serviceServer.signalOnMasterRegistrationSuccess();
00276 } else {
00277 serviceServer.signalOnMasterRegistrationFailure();
00278 }
00279 return !success;
00280 }
00281 });
00282 if (!submitted) {
00283 executorService.execute(new Runnable() {
00284 @Override
00285 public void run() {
00286 serviceServer.signalOnMasterRegistrationFailure();
00287 }
00288 });
00289 }
00290 }
00291
00292 @Override
00293 public void onServiceServerRemoved(final DefaultServiceServer<?, ?> serviceServer) {
00294 if (DEBUG) {
00295 log.info("Unregistering service: " + serviceServer);
00296 }
00297 boolean submitted = submit(new Callable<Boolean>() {
00298 @Override
00299 public Boolean call() throws Exception {
00300 boolean success = callMaster(new Callable<Response<Integer>>() {
00301 @Override
00302 public Response<Integer> call() throws Exception {
00303 return masterClient.unregisterService(nodeIdentifier, serviceServer);
00304 }
00305 });
00306 if (success) {
00307 serviceServer.signalOnMasterUnregistrationSuccess();
00308 } else {
00309 serviceServer.signalOnMasterUnregistrationFailure();
00310 }
00311 return !success;
00312 }
00313 });
00314 if (!submitted) {
00315 executorService.execute(new Runnable() {
00316 @Override
00317 public void run() {
00318 serviceServer.signalOnMasterUnregistrationFailure();
00319 }
00320 });
00321 }
00322 }
00323
00332 public void start(NodeIdentifier nodeIdentifier) {
00333 Preconditions.checkNotNull(nodeIdentifier);
00334 Preconditions.checkState(this.nodeIdentifier == null, "Registrar already started.");
00335 this.nodeIdentifier = nodeIdentifier;
00336 running = true;
00337 }
00338
00350 public void shutdown() {
00351 if (!running) {
00352 return;
00353 }
00354 running = false;
00355 try {
00356 retryingExecutorService.shutdown(SHUTDOWN_TIMEOUT, SHUTDOWN_TIMEOUT_UNITS);
00357 } catch (InterruptedException e) {
00358 throw new RosRuntimeException(e);
00359 }
00360 }
00361 }