Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 package org.ros.internal.node.client;
00018 
00019 import com.google.common.base.Preconditions;
00020 
00021 import org.apache.commons.logging.Log;
00022 import org.apache.commons.logging.LogFactory;
00023 import org.ros.concurrent.Holder;
00024 import org.ros.concurrent.RetryingExecutorService;
00025 import org.ros.exception.RosRuntimeException;
00026 import org.ros.internal.node.response.Response;
00027 import org.ros.internal.node.server.NodeIdentifier;
00028 import org.ros.internal.node.server.SlaveServer;
00029 import org.ros.internal.node.server.master.MasterServer;
00030 import org.ros.internal.node.service.DefaultServiceServer;
00031 import org.ros.internal.node.service.ServiceManagerListener;
00032 import org.ros.internal.node.topic.DefaultPublisher;
00033 import org.ros.internal.node.topic.DefaultSubscriber;
00034 import org.ros.internal.node.topic.PublisherIdentifier;
00035 import org.ros.internal.node.topic.TopicParticipantManagerListener;
00036 
00037 import java.net.URI;
00038 import java.util.Collection;
00039 import java.util.List;
00040 import java.util.concurrent.Callable;
00041 import java.util.concurrent.ScheduledExecutorService;
00042 import java.util.concurrent.TimeUnit;
00043 
00051 public class Registrar implements TopicParticipantManagerListener, ServiceManagerListener {
00052 
00053   private static final boolean DEBUG = true;
00054   private static final Log log = LogFactory.getLog(Registrar.class);
00055 
00056   private static final int SHUTDOWN_TIMEOUT = 5;
00057   private static final TimeUnit SHUTDOWN_TIMEOUT_UNITS = TimeUnit.SECONDS;
00058 
00059   private final MasterClient masterClient;
00060   private final ScheduledExecutorService executorService;
00061   private final RetryingExecutorService retryingExecutorService;
00062 
00063   private NodeIdentifier nodeIdentifier;
00064   private boolean running;
00065 
00073   public Registrar(MasterClient masterClient, ScheduledExecutorService executorService) {
00074     this.masterClient = masterClient;
00075     this.executorService = executorService;
00076     retryingExecutorService = new RetryingExecutorService(executorService);
00077     nodeIdentifier = null;
00078     running = false;
00079     if (DEBUG) {
00080       log.info("MasterXmlRpcEndpoint URI: " + masterClient.getRemoteUri());
00081     }
00082   }
00083 
00094   public void setRetryDelay(long delay, TimeUnit unit) {
00095     retryingExecutorService.setRetryDelay(delay, unit);
00096   }
00097 
00098   private boolean submit(Callable<Boolean> callable) {
00099     if (running) {
00100       retryingExecutorService.submit(callable);
00101       return true;
00102     }
00103     log.warn("Registrar no longer running, request ignored.");
00104     return false;
00105   }
00106 
00107   private <T> boolean callMaster(Callable<Response<T>> callable) {
00108     Preconditions.checkNotNull(nodeIdentifier, "Registrar not started.");
00109     boolean success;
00110     try {
00111       Response<T> response = callable.call();
00112       if (DEBUG) {
00113         log.info(response);
00114       }
00115       success = response.isSuccess();
00116     } catch (Exception e) {
00117       if (DEBUG) {
00118         log.error("Exception caught while communicating with master.", e);
00119       } else {
00120         log.error("Exception caught while communicating with master.");
00121       }
00122       success = false;
00123     }
00124     return success;
00125   }
00126 
00127   @Override
00128   public void onPublisherAdded(final DefaultPublisher<?> publisher) {
00129     if (DEBUG) {
00130       log.info("Registering publisher: " + publisher);
00131     }
00132     boolean submitted = submit(new Callable<Boolean>() {
00133       @Override
00134       public Boolean call() throws Exception {
00135         boolean success = callMaster(new Callable<Response<List<URI>>>() {
00136           @Override
00137           public Response<List<URI>> call() throws Exception {
00138             return masterClient.registerPublisher(publisher.toDeclaration());
00139           }
00140         });
00141         if (success) {
00142           publisher.signalOnMasterRegistrationSuccess();
00143         } else {
00144           publisher.signalOnMasterRegistrationFailure();
00145         }
00146         return !success;
00147       }
00148     });
00149     if (!submitted) {
00150       executorService.execute(new Runnable() {
00151         @Override
00152         public void run() {
00153           publisher.signalOnMasterRegistrationFailure();
00154         }
00155       });
00156     }
00157   }
00158 
00159   @Override
00160   public void onPublisherRemoved(final DefaultPublisher<?> publisher) {
00161     if (DEBUG) {
00162       log.info("Unregistering publisher: " + publisher);
00163     }
00164     boolean submitted = submit(new Callable<Boolean>() {
00165       @Override
00166       public Boolean call() throws Exception {
00167         boolean success = callMaster(new Callable<Response<Integer>>() {
00168           @Override
00169           public Response<Integer> call() throws Exception {
00170             return masterClient.unregisterPublisher(publisher.getIdentifier());
00171           }
00172         });
00173         if (success) {
00174           publisher.signalOnMasterUnregistrationSuccess();
00175         } else {
00176           publisher.signalOnMasterUnregistrationFailure();
00177         }
00178         return !success;
00179       }
00180     });
00181     if (!submitted) {
00182       executorService.execute(new Runnable() {
00183         @Override
00184         public void run() {
00185           publisher.signalOnMasterUnregistrationFailure();
00186         }
00187       });
00188     }
00189   }
00190 
00191   @Override
00192   public void onSubscriberAdded(final DefaultSubscriber<?> subscriber) {
00193     if (DEBUG) {
00194       log.info("Registering subscriber: " + subscriber);
00195     }
00196     boolean submitted = submit(new Callable<Boolean>() {
00197       @Override
00198       public Boolean call() throws Exception {
00199         final Holder<Response<List<URI>>> holder = Holder.newEmpty();
00200         boolean success = callMaster(new Callable<Response<List<URI>>>() {
00201           @Override
00202           public Response<List<URI>> call() throws Exception {
00203             return holder.set(masterClient.registerSubscriber(nodeIdentifier, subscriber));
00204           }
00205         });
00206         if (success) {
00207           Collection<PublisherIdentifier> publisherIdentifiers =
00208               PublisherIdentifier.newCollectionFromUris(holder.get().getResult(),
00209                   subscriber.getTopicDeclaration());
00210           subscriber.updatePublishers(publisherIdentifiers);
00211           subscriber.signalOnMasterRegistrationSuccess();
00212         } else {
00213           subscriber.signalOnMasterRegistrationFailure();
00214         }
00215         return !success;
00216       }
00217     });
00218     if (!submitted) {
00219       executorService.execute(new Runnable() {
00220         @Override
00221         public void run() {
00222           subscriber.signalOnMasterRegistrationFailure();
00223         }
00224       });
00225     }
00226   }
00227 
00228   @Override
00229   public void onSubscriberRemoved(final DefaultSubscriber<?> subscriber) {
00230     if (DEBUG) {
00231       log.info("Unregistering subscriber: " + subscriber);
00232     }
00233     boolean submitted = submit(new Callable<Boolean>() {
00234       @Override
00235       public Boolean call() throws Exception {
00236         boolean success = callMaster(new Callable<Response<Integer>>() {
00237           @Override
00238           public Response<Integer> call() throws Exception {
00239             return masterClient.unregisterSubscriber(nodeIdentifier, subscriber);
00240           }
00241         });
00242         if (success) {
00243           subscriber.signalOnMasterUnregistrationSuccess();
00244         } else {
00245           subscriber.signalOnMasterUnregistrationFailure();
00246         }
00247         return !success;
00248       }
00249     });
00250     if (!submitted) {
00251       executorService.execute(new Runnable() {
00252         @Override
00253         public void run() {
00254           subscriber.signalOnMasterUnregistrationFailure();
00255         }
00256       });
00257     }
00258   }
00259 
00260   @Override
00261   public void onServiceServerAdded(final DefaultServiceServer<?, ?> serviceServer) {
00262     if (DEBUG) {
00263       log.info("Registering service: " + serviceServer);
00264     }
00265     boolean submitted = submit(new Callable<Boolean>() {
00266       @Override
00267       public Boolean call() throws Exception {
00268         boolean success = callMaster(new Callable<Response<Void>>() {
00269           @Override
00270           public Response<Void> call() throws Exception {
00271             return masterClient.registerService(nodeIdentifier, serviceServer);
00272           }
00273         });
00274         if (success) {
00275           serviceServer.signalOnMasterRegistrationSuccess();
00276         } else {
00277           serviceServer.signalOnMasterRegistrationFailure();
00278         }
00279         return !success;
00280       }
00281     });
00282     if (!submitted) {
00283       executorService.execute(new Runnable() {
00284         @Override
00285         public void run() {
00286           serviceServer.signalOnMasterRegistrationFailure();
00287         }
00288       });
00289     }
00290   }
00291 
00292   @Override
00293   public void onServiceServerRemoved(final DefaultServiceServer<?, ?> serviceServer) {
00294     if (DEBUG) {
00295       log.info("Unregistering service: " + serviceServer);
00296     }
00297     boolean submitted = submit(new Callable<Boolean>() {
00298       @Override
00299       public Boolean call() throws Exception {
00300         boolean success = callMaster(new Callable<Response<Integer>>() {
00301           @Override
00302           public Response<Integer> call() throws Exception {
00303             return masterClient.unregisterService(nodeIdentifier, serviceServer);
00304           }
00305         });
00306         if (success) {
00307           serviceServer.signalOnMasterUnregistrationSuccess();
00308         } else {
00309           serviceServer.signalOnMasterUnregistrationFailure();
00310         }
00311         return !success;
00312       }
00313     });
00314     if (!submitted) {
00315       executorService.execute(new Runnable() {
00316         @Override
00317         public void run() {
00318           serviceServer.signalOnMasterUnregistrationFailure();
00319         }
00320       });
00321     }
00322   }
00323 
00332   public void start(NodeIdentifier nodeIdentifier) {
00333     Preconditions.checkNotNull(nodeIdentifier);
00334     Preconditions.checkState(this.nodeIdentifier == null, "Registrar already started.");
00335     this.nodeIdentifier = nodeIdentifier;
00336     running = true;
00337   }
00338 
00350   public void shutdown() {
00351     if (!running) {
00352       return;
00353     }
00354     running = false;
00355     try {
00356       retryingExecutorService.shutdown(SHUTDOWN_TIMEOUT, SHUTDOWN_TIMEOUT_UNITS);
00357     } catch (InterruptedException e) {
00358       throw new RosRuntimeException(e);
00359     }
00360   }
00361 }