subscriber.cpp
Go to the documentation of this file.
00001 //=================================================================================================
00002 // Copyright (c) 2012, Johannes Meyer, TU Darmstadt
00003 // All rights reserved.
00004 
00005 // Redistribution and use in source and binary forms, with or without
00006 // modification, are permitted provided that the following conditions are met:
00007 //     * Redistributions of source code must retain the above copyright
00008 //       notice, this list of conditions and the following disclaimer.
00009 //     * Redistributions in binary form must reproduce the above copyright
00010 //       notice, this list of conditions and the following disclaimer in the
00011 //       documentation and/or other materials provided with the distribution.
00012 //     * Neither the name of the Flight Systems and Automatic Control group,
00013 //       TU Darmstadt, nor the names of its contributors may be used to
00014 //       endorse or promote products derived from this software without
00015 //       specific prior written permission.
00016 
00017 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
00018 // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00019 // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00020 // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
00021 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00022 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00023 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
00024 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00025 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00026 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00027 //=================================================================================================
00028 
00029 #include <rosmatlab/subscriber.h>
00030 #include <rosmatlab/exception.h>
00031 #include <rosmatlab/options.h>
00032 #include <rosmatlab/conversion.h>
00033 #include <rosmatlab/log.h>
00034 #include <rosmatlab/connection_header.h>
00035 
00036 #include <introspection/message.h>
00037 
00038 namespace rosmatlab {
00039 
00040 template <> const char *Object<Subscriber>::class_name_ = "ros.Subscriber";
00041 static const ros::WallDuration DEFAULT_TIMEOUT(1e-3);
00042 
00043 class SubscriptionCallbackHelper : public ros::SubscriptionCallbackHelper
00044 {
00045 public:
00046   SubscriptionCallbackHelper(Subscriber *subscriber)
00047     : subscriber_(subscriber) {}
00048   virtual ~SubscriptionCallbackHelper() {}
00049 
00050   VoidConstPtr deserialize(const ros::SubscriptionCallbackHelperDeserializeParams&);
00051   void call(ros::SubscriptionCallbackHelperCallParams& params);
00052   const std::type_info& getTypeInfo() { return subscriber_->introspection_->getTypeId(); }
00053   bool isConst() { return false; }
00054 
00055 private:
00056   Subscriber *subscriber_;
00057 };
00058 
00059 Subscriber::Subscriber()
00060   : Object<Subscriber>(this)
00061 {
00062   timeout_ = DEFAULT_TIMEOUT;
00063   node_handle_.setCallbackQueue(&callback_queue_);
00064 }
00065 
00066 Subscriber::Subscriber(int nrhs, const mxArray *prhs[])
00067   : Object<Subscriber>(this)
00068 {
00069   timeout_ = DEFAULT_TIMEOUT;
00070   node_handle_.setCallbackQueue(&callback_queue_);
00071 
00072   if (nrhs > 0) subscribe(nrhs, prhs);
00073 }
00074 
00075 Subscriber::~Subscriber() {
00076   shutdown();
00077 }
00078 
00079 mxArray *Subscriber::subscribe(int nrhs, const mxArray *prhs[]) {
00080   if (nrhs < 2) {
00081     throw ArgumentException("Subscriber.subscribe", 2);
00082   }
00083 
00084   options_ = ros::SubscribeOptions();
00085   for(int i = 0; i < nrhs; i++) {
00086     switch(i) {
00087       case 0:
00088         if (!Options::isString(prhs[i])) throw Exception("Subscriber.subscribe", "need a topic as 1st argument");
00089         options_.topic = Options::getString(prhs[i]);
00090         break;
00091 
00092       case 1:
00093         if (!Options::isString(prhs[i])) throw Exception("Subscriber.subscribe", "need a datatype as 2nd argument");
00094         options_.datatype = Options::getString(prhs[i]);
00095         break;
00096 
00097       case 2:
00098         if (!Options::isDoubleScalar(prhs[i])) throw Exception("Subscriber.subscribe", "need a queue size as 3rd argument");
00099         options_.queue_size = Options::getDoubleScalar(prhs[i]);
00100         break;
00101 
00102       default:
00103         throw ArgumentException("Subscriber.subscribe", "too many arguments");
00104     }
00105   }
00106 
00107   introspection_ = cpp_introspection::messageByDataType(options_.datatype);
00108   if (!introspection_) throw Exception("Subscriber.subscribe", "unknown datatype '" + options_.datatype + "'");
00109   options_.md5sum = introspection_->getMD5Sum();
00110   options_.helper.reset(new SubscriptionCallbackHelper(this));
00111 
00112   *this = node_handle_.subscribe(options_);
00113   return mxCreateLogicalScalar(*this);
00114 }
00115 
00116 mxArray *Subscriber::poll(int nlhs, mxArray *plhs[], int nrhs, const mxArray *prhs[])
00117 {
00118   ros::WallDuration timeout = timeout_;
00119   if (nrhs && mxIsDouble(*prhs) && mxGetPr(*prhs)) { timeout.fromSec(*mxGetPr(*prhs++)); nrhs--; }
00120   callback_queue_.callOne(timeout);
00121 
00122   last_event_.reset();
00123   if (!new_event_) {
00124     plhs[0] = mxCreateStructMatrix(0,0,0,0);
00125     return plhs[0];
00126   }
00127 
00128   plhs[0] = Conversion(introspect(new_event_->getConstMessage())).toMatlab();
00129   last_event_.swap(new_event_);
00130 
00131   if (nlhs > 1) plhs[1] = getConnectionHeader();
00132   if (nlhs > 2) plhs[2] = getReceiptTime();
00133   return plhs[0];
00134 }
00135 
00136 mxArray *Subscriber::getConnectionHeader() const
00137 {
00138   if (!last_event_) return mxCreateStructMatrix(0, 0, 0, 0);
00139   return ConnectionHeader(last_event_->getConnectionHeaderPtr()).toMatlab();
00140 }
00141 
00142 mxArray *Subscriber::getReceiptTime() const
00143 {
00144   mxArray *receiptTime = mxCreateDoubleScalar(0);
00145   if (!last_event_) return receiptTime;
00146   *mxGetPr(receiptTime) = last_event_->getReceiptTime().toSec();
00147   return receiptTime;
00148 }
00149 
00150 mxArray *Subscriber::getTopic() const
00151 {
00152   return mxCreateString(ros::Subscriber::getTopic().c_str());
00153 }
00154 
00155 mxArray *Subscriber::getDataType() const
00156 {
00157   if (!introspection_) return mxCreateEmpty();
00158   return mxCreateString(introspection_->getDataType());
00159 }
00160 
00161 mxArray *Subscriber::getMD5Sum() const
00162 {
00163   if (!introspection_) return mxCreateEmpty();
00164   return mxCreateString(introspection_->getMD5Sum());
00165 }
00166 
00167 mxArray *Subscriber::getNumPublishers() const
00168 {
00169   return mxCreateDoubleScalar(ros::Subscriber::getNumPublishers());
00170 }
00171 
00172 MessagePtr Subscriber::introspect(const VoidConstPtr& msg) {
00173   if (!introspection_ || !msg) return MessagePtr();
00174   return introspection_->introspect(msg.get());
00175 }
00176 
00177 void Subscriber::callback(const MessageEvent& event)
00178 {
00179   if (new_event_) {
00180     ROSMATLAB_WARN("missed a %s message on topic %s, polling is too slow...", options_.datatype.c_str(), options_.topic.c_str());
00181     return;
00182   }
00183   new_event_.reset(new MessageEvent(event));
00184 }
00185 
00186 VoidConstPtr SubscriptionCallbackHelper::deserialize(const ros::SubscriptionCallbackHelperDeserializeParams& params)
00187 {
00188   ros::serialization::IStream stream(params.buffer, params.length);
00189   VoidPtr msg = subscriber_->introspection_->deserialize(stream);
00190   if (!msg) ROSMATLAB_WARN("deserialization of a message of type %s failed", subscriber_->options_.datatype.c_str());
00191 
00192   return VoidConstPtr(msg);
00193 }
00194 
00195 void SubscriptionCallbackHelper::call(ros::SubscriptionCallbackHelperCallParams& params)
00196 {
00197   Subscriber::MessageEvent event(params.event, boost::bind(&cpp_introspection::Message::createInstance, subscriber_->introspection_));
00198   subscriber_->callback(event);
00199 }
00200 
00201 } // namespace rosmatlab


rosmatlab
Author(s): Johannes Meyer
autogenerated on Fri Jul 25 2014 06:08:36