00001 template<typename Msg> 00002 void TypedSubscriber<Msg>::start(const std::string& _topic){ 00003 unique_recursive_lock lock(generalMutex); 00004 if (running && (topic==_topic)) return; 00005 if (running) stop(); 00006 topic=_topic; 00007 sub= node.subscribe(topic, 1000, &Self::msgCallback,this); 00008 running=true; 00009 } 00010 00011 template<typename Msg> 00012 void TypedSubscriber<Msg>::stop(){ 00013 unique_recursive_lock lock(generalMutex); 00014 if (!running) return; 00015 running=false; 00016 sub.shutdown(); 00017 } 00018 00019 template<typename Msg> 00020 void TypedSubscriber<Msg>::setActive(bool flag) 00021 { 00022 unique_recursive_lock lock(generalMutex); 00023 if (subscriberActive == flag) return; 00024 if (flag && !running) 00025 { 00026 ROS_ERROR("Cannot activate TypedSubscriber if it is not running"); 00027 return; 00028 } 00029 subscriberActive = flag; 00030 } 00031 00032 template<typename Msg> 00033 bool TypedSubscriber<Msg>::isActive() const 00034 { 00035 unique_recursive_lock lock(generalMutex); 00036 return subscriberActive; 00037 } 00038 00039 template<typename Msg> 00040 bool TypedSubscriber<Msg>::isRunning() const 00041 { 00042 unique_recursive_lock lock(generalMutex); 00043 return running; 00044 } 00045 00046 template<typename Msg> 00047 bool TypedSubscriber<Msg>::getLastMessage(Msg& msg) const 00048 { 00049 unique_recursive_lock lock(generalMutex); 00050 if (getLastUpdateTime() < 1e-03) return false; 00051 msg=lastArrivedMessage; 00052 return true; 00053 } 00054 00055 template<typename Msg> 00056 bool TypedSubscriber<Msg>::waitForNextMessage(Msg& msg, float timeout, float wait_step) const 00057 { 00058 if (!isActive()) 00059 { 00060 ROS_ERROR("Called TypedSubscriber<Msg>::waitForNextMessage() without calling TypedSubscriber<Msg>::setActive(true) first"); 00061 return false; 00062 } 00063 00064 ros::Time start_time = ros::Time::now(); 00065 float time_waited=0; 00066 00067 bool msgArrived=false; 00068 while (!msgArrived) 00069 { 00070 // ROS_INFO("TypedSubscriber: Waiting..."); 00071 { 00072 unique_lock lock(messageArrivedMutex); 00073 // Unlocks the mutex and waits for a notification. 00074 // msgArrived=this->messageArrivedCondition.timed_wait(lock, baselib_binding::get_duration_secs(wait_step)); 00075 msgArrived = COND_WAIT(this->messageArrivedCondition, lock, wait_step); 00076 } 00077 if (!msgArrived) 00078 { // it is still possible that a message arrived in-between the timed_wait calls, but we never 00079 // got the timing right for the condition trigger, especially if we use a low wait_step. 00080 // So do a separate check here. 00081 ros::Time last_upd = getLastUpdateTime(); 00082 if (last_upd > start_time) msgArrived = true; 00083 } 00084 if (msgArrived) 00085 { 00086 // ROS_INFO("Message arrived!"); 00087 unique_recursive_lock lock(generalMutex); 00088 msg=lastArrivedMessage; 00089 break; 00090 } 00091 // spin once so that msgCallback() may still be called by the subscriber 00092 // in case the node is not running in multi-threaded mode. 00093 ros::spinOnce(); 00094 ros::Time curr_time = ros::Time::now(); 00095 time_waited = (curr_time - start_time).toSec(); 00096 if ((timeout > 0) && (time_waited > timeout)) 00097 { // timeout reached 00098 // ROS_INFO("Timeout reached"); 00099 return false; 00100 } 00101 } 00102 return msgArrived; 00103 } 00104 00105 template<typename Msg> 00106 ros::Time TypedSubscriber<Msg>::getLastUpdateTime() const 00107 { 00108 unique_recursive_lock glock(generalMutex); 00109 return lastUpdateTime; 00110 } 00111 00112 template<typename Msg> 00113 void TypedSubscriber<Msg>::msgCallback(const Msg& _msg) 00114 { 00115 //ROS_INFO_STREAM("TYPED CALLBACK "<<_msg); 00116 unique_lock mlock(messageArrivedMutex); 00117 lastArrivedMessage=_msg; 00118 messageArrivedCondition.notify_all(); 00119 unique_recursive_lock glock(generalMutex); 00120 lastUpdateTime = ros::Time::now(); 00121 } 00122