$search
00001 //================================================================================================= 00002 // Copyright (c) 2012, Johannes Meyer, TU Darmstadt 00003 // All rights reserved. 00004 00005 // Redistribution and use in source and binary forms, with or without 00006 // modification, are permitted provided that the following conditions are met: 00007 // * Redistributions of source code must retain the above copyright 00008 // notice, this list of conditions and the following disclaimer. 00009 // * Redistributions in binary form must reproduce the above copyright 00010 // notice, this list of conditions and the following disclaimer in the 00011 // documentation and/or other materials provided with the distribution. 00012 // * Neither the name of the Flight Systems and Automatic Control group, 00013 // TU Darmstadt, nor the names of its contributors may be used to 00014 // endorse or promote products derived from this software without 00015 // specific prior written permission. 00016 00017 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 00018 // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 00019 // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 00020 // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY 00021 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 00022 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00023 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 00024 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 00025 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00026 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00027 //================================================================================================= 00028 00029 #include <rosmatlab/subscriber.h> 00030 #include <rosmatlab/exception.h> 00031 #include <rosmatlab/options.h> 00032 #include <rosmatlab/conversion.h> 00033 00034 #include <introspection/message.h> 00035 00036 namespace rosmatlab { 00037 00038 template <> const char *Object<Subscriber>::class_name_ = "ros.Subscriber"; 00039 static const ros::WallDuration DEFAULT_TIMEOUT(1e-3); 00040 00041 class SubscriptionCallbackHelper : public ros::SubscriptionCallbackHelper 00042 { 00043 public: 00044 SubscriptionCallbackHelper(Subscriber *subscriber) 00045 : subscriber_(subscriber) {} 00046 virtual ~SubscriptionCallbackHelper() {} 00047 00048 VoidConstPtr deserialize(const ros::SubscriptionCallbackHelperDeserializeParams&); 00049 void call(ros::SubscriptionCallbackHelperCallParams& params); 00050 const std::type_info& getTypeInfo() { return subscriber_->introspection_->getTypeId(); } 00051 bool isConst() { return false; } 00052 00053 private: 00054 Subscriber *subscriber_; 00055 }; 00056 00057 Subscriber::Subscriber() 00058 : Object<Subscriber>(this) 00059 { 00060 timeout_ = DEFAULT_TIMEOUT; 00061 node_handle_.setCallbackQueue(&callback_queue_); 00062 } 00063 00064 Subscriber::Subscriber(int nrhs, const mxArray *prhs[]) 00065 : Object<Subscriber>(this) 00066 { 00067 timeout_ = DEFAULT_TIMEOUT; 00068 node_handle_.setCallbackQueue(&callback_queue_); 00069 00070 if (nrhs > 0) subscribe(nrhs, prhs); 00071 } 00072 00073 Subscriber::~Subscriber() { 00074 shutdown(); 00075 } 00076 00077 bool Subscriber::subscribe(int nrhs, const mxArray *prhs[]) { 00078 if (nrhs < 2) { 00079 throw Exception("Subscriber: subscribe needs at least two arguments"); 00080 } 00081 00082 options_ = ros::SubscribeOptions(); 00083 for(int i = 0; i < nrhs; i++) { 00084 switch(i) { 00085 case 0: 00086 if (!Options::isString(prhs[i])) throw Exception("Subscriber: need a topic as 1st argument"); 00087 options_.topic = Options::getString(prhs[i]); 00088 break; 00089 00090 case 1: 00091 if (!Options::isString(prhs[i])) throw Exception("Subscriber: need a datatype as 2nd argument"); 00092 options_.datatype = Options::getString(prhs[i]); 00093 break; 00094 00095 case 2: 00096 if (!Options::isDoubleScalar(prhs[i])) throw Exception("Subscriber: need a queue size as 3rd argument"); 00097 options_.queue_size = Options::getDoubleScalar(prhs[i]); 00098 break; 00099 00100 default: 00101 throw Exception("Subscriber: too many arguments"); 00102 } 00103 } 00104 00105 introspection_ = cpp_introspection::messageByDataType(options_.datatype); 00106 if (!introspection_) throw Exception("Subscriber: unknown datatype '" + options_.datatype + "'"); 00107 options_.md5sum = introspection_->getMD5Sum(); 00108 options_.helper.reset(new SubscriptionCallbackHelper(this)); 00109 00110 *this = node_handle_.subscribe(options_); 00111 return *this; 00112 } 00113 00114 mxArray *Subscriber::poll(int nlhs, mxArray *plhs[], int nrhs, const mxArray *prhs[]) 00115 { 00116 ros::WallDuration timeout = timeout_; 00117 if (nrhs && mxIsDouble(*prhs) && mxGetPr(*prhs)) { timeout.fromSec(*mxGetPr(*prhs++)); nrhs--; } 00118 callback_queue_.callOne(timeout); 00119 00120 last_event_.reset(); 00121 if (!new_event_) { 00122 plhs[0] = mxCreateStructMatrix(0,0,0,0); 00123 return plhs[0]; 00124 } 00125 00126 plhs[0] = Conversion(introspect(new_event_->getConstMessage())).toMatlab(); 00127 last_event_.swap(new_event_); 00128 return plhs[0]; 00129 } 00130 00131 mxArray *Subscriber::getConnectionHeader() const 00132 { 00133 mxArray *header = mxCreateStructMatrix(1, 1, 0, 0); 00134 if (!last_event_) return header; 00135 00136 for(ros::M_string::iterator it = last_event_->getConnectionHeader().begin(); it != last_event_->getConnectionHeader().end(); ++it) { 00137 mxAddField(header, it->first.c_str()); 00138 mxSetField(header, 0, it->first.c_str(), mxCreateString(it->second.c_str())); 00139 } 00140 00141 return header; 00142 } 00143 00144 mxArray *Subscriber::getReceiptTime() const 00145 { 00146 mxArray *receiptTime = mxCreateDoubleScalar(0); 00147 if (!last_event_) return receiptTime; 00148 *mxGetPr(receiptTime) = last_event_->getReceiptTime().toSec(); 00149 return receiptTime; 00150 } 00151 00152 mxArray *Subscriber::getTopic() const 00153 { 00154 return mxCreateString(ros::Subscriber::getTopic().c_str()); 00155 } 00156 00157 mxArray *Subscriber::getNumPublishers() const 00158 { 00159 return mxCreateDoubleScalar(ros::Subscriber::getNumPublishers()); 00160 } 00161 00162 MessagePtr Subscriber::introspect(const VoidConstPtr& msg) { 00163 if (!introspection_ || !msg) return MessagePtr(); 00164 return introspection_->introspect(msg.get()); 00165 } 00166 00167 void Subscriber::callback(const ros::MessageEvent<void>& event) 00168 { 00169 if (new_event_) { 00170 ROS_WARN_NAMED("rosmatlab", "missed a %s message on topic %s, polling is too slow...", options_.datatype.c_str(), options_.topic.c_str()); 00171 return; 00172 } 00173 new_event_.reset(new MessageEvent(event)); 00174 } 00175 00176 VoidConstPtr SubscriptionCallbackHelper::deserialize(const ros::SubscriptionCallbackHelperDeserializeParams& params) 00177 { 00178 ros::serialization::IStream stream(params.buffer, params.length); 00179 VoidPtr msg = subscriber_->introspection_->deserialize(stream); 00180 if (!msg) ROS_WARN_NAMED("rosmatlab", "deserialization of a message of type %s failed", subscriber_->options_.datatype.c_str()); 00181 00182 // TODO: setConnectionHeader 00183 return VoidConstPtr(msg); 00184 } 00185 00186 void SubscriptionCallbackHelper::call(ros::SubscriptionCallbackHelperCallParams& params) 00187 { 00188 ros::MessageEvent<void> event(params.event, boost::bind(&cpp_introspection::Message::createInstance, subscriber_->introspection_)); 00189 subscriber_->callback(event); 00190 } 00191 00192 } // namespace rosmatlab