/*
 * Copyright (C) 2011 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package org.ros.internal.node.topic;

import org.ros.internal.node.server.NodeIdentifier;
import org.ros.message.MessageDeserializer;
import org.ros.namespace.GraphName;
import org.ros.node.topic.DefaultSubscriberListener;
import org.ros.node.topic.Subscriber;

import java.util.concurrent.ScheduledExecutorService;

/**
 * Hands out {@link Subscriber} instances, reusing the one already registered
 * with the {@link TopicParticipantManager} for a topic name when there is one
 * and creating (and registering) a new {@link DefaultSubscriber} otherwise.
 */
public class SubscriberFactory {

  private final NodeIdentifier nodeIdentifier;
  private final TopicParticipantManager topicParticipantManager;
  private final ScheduledExecutorService executorService;

  /** Guards the lookup-then-create sequence in {@link #newOrExisting}. */
  private final Object mutex;

  /**
   * @param nodeIdentifier
   *          passed to every {@link DefaultSubscriber} this factory creates
   * @param topicParticipantManager
   *          registry that is queried for existing subscribers and updated
   *          with new ones
   * @param executorService
   *          passed to every {@link DefaultSubscriber} this factory creates
   */
  public SubscriberFactory(NodeIdentifier nodeIdentifier,
      TopicParticipantManager topicParticipantManager, ScheduledExecutorService executorService) {
    this.nodeIdentifier = nodeIdentifier;
    this.topicParticipantManager = topicParticipantManager;
    this.executorService = executorService;
    this.mutex = new Object();
  }

  /**
   * Returns the subscriber registered for the declared topic's name, creating
   * and registering one first if the {@link TopicParticipantManager} has none.
   *
   * <p>
   * The lookup and the creation happen while holding this factory's private
   * mutex. When an existing subscriber is returned it is cast to the requested
   * message type without a runtime check, and {@code messageDeserializer} is
   * not used.
   *
   * @param <T>
   *          the message type of the requested {@link Subscriber}
   * @param topicDeclaration
   *          declaration of the topic; its name is the lookup key
   * @param messageDeserializer
   *          handed to the new {@link DefaultSubscriber} when one is created
   * @return the existing or newly created {@link Subscriber}
   */
  @SuppressWarnings("unchecked")
  public <T> Subscriber<T> newOrExisting(TopicDeclaration topicDeclaration,
      MessageDeserializer<T> messageDeserializer) {
    synchronized (mutex) {
      final GraphName name = topicDeclaration.getName();

      // Reuse path: a subscriber is already registered under this name.
      if (topicParticipantManager.hasSubscriber(name)) {
        return (DefaultSubscriber<T>) topicParticipantManager.getSubscriber(name);
      }

      final DefaultSubscriber<T> created =
          DefaultSubscriber.newDefault(nodeIdentifier, topicDeclaration, executorService,
              messageDeserializer);

      // Mirror the subscriber's lifecycle events into the participant manager.
      created.addSubscriberListener(new DefaultSubscriberListener<T>() {
        @Override
        public void onNewPublisher(Subscriber<T> source,
            PublisherIdentifier publisherIdentifier) {
          topicParticipantManager.addSubscriberConnection((DefaultSubscriber<T>) source,
              publisherIdentifier);
        }

        @Override
        public void onShutdown(Subscriber<T> source) {
          topicParticipantManager.removeSubscriber((DefaultSubscriber<T>) source);
        }
      });

      topicParticipantManager.addSubscriber(created);
      return created;
    }
  }
}