Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include <rosmatlab/subscriber.h>
00030 #include <rosmatlab/exception.h>
00031 #include <rosmatlab/options.h>
00032 #include <rosmatlab/conversion.h>
00033
00034 #include <introspection/message.h>
00035
00036 namespace rosmatlab {
00037
00038 template <> const char *Object<Subscriber>::class_name_ = "ros.Subscriber";
00039 static const ros::WallDuration DEFAULT_TIMEOUT(1e-3);
00040
00041 class SubscriptionCallbackHelper : public ros::SubscriptionCallbackHelper
00042 {
00043 public:
00044 SubscriptionCallbackHelper(Subscriber *subscriber)
00045 : subscriber_(subscriber) {}
00046 virtual ~SubscriptionCallbackHelper() {}
00047
00048 VoidConstPtr deserialize(const ros::SubscriptionCallbackHelperDeserializeParams&);
00049 void call(ros::SubscriptionCallbackHelperCallParams& params);
00050 const std::type_info& getTypeInfo() { return subscriber_->introspection_->getTypeId(); }
00051 bool isConst() { return false; }
00052
00053 private:
00054 Subscriber *subscriber_;
00055 };
00056
00057 Subscriber::Subscriber()
00058 : Object<Subscriber>(this)
00059 {
00060 timeout_ = DEFAULT_TIMEOUT;
00061 node_handle_.setCallbackQueue(&callback_queue_);
00062 }
00063
00064 Subscriber::Subscriber(int nrhs, const mxArray *prhs[])
00065 : Object<Subscriber>(this)
00066 {
00067 timeout_ = DEFAULT_TIMEOUT;
00068 node_handle_.setCallbackQueue(&callback_queue_);
00069
00070 if (nrhs > 0) subscribe(nrhs, prhs);
00071 }
00072
00073 Subscriber::~Subscriber() {
00074 shutdown();
00075 }
00076
00077 bool Subscriber::subscribe(int nrhs, const mxArray *prhs[]) {
00078 if (nrhs < 2) {
00079 throw Exception("Subscriber: subscribe needs at least two arguments");
00080 }
00081
00082 options_ = ros::SubscribeOptions();
00083 for(int i = 0; i < nrhs; i++) {
00084 switch(i) {
00085 case 0:
00086 if (!Options::isString(prhs[i])) throw Exception("Subscriber: need a topic as 1st argument");
00087 options_.topic = Options::getString(prhs[i]);
00088 break;
00089
00090 case 1:
00091 if (!Options::isString(prhs[i])) throw Exception("Subscriber: need a datatype as 2nd argument");
00092 options_.datatype = Options::getString(prhs[i]);
00093 break;
00094
00095 case 2:
00096 if (!Options::isDoubleScalar(prhs[i])) throw Exception("Subscriber: need a queue size as 3rd argument");
00097 options_.queue_size = Options::getDoubleScalar(prhs[i]);
00098 break;
00099
00100 default:
00101 throw Exception("Subscriber: too many arguments");
00102 }
00103 }
00104
00105 introspection_ = cpp_introspection::messageByDataType(options_.datatype);
00106 if (!introspection_) throw Exception("Subscriber: unknown datatype '" + options_.datatype + "'");
00107 options_.md5sum = introspection_->getMD5Sum();
00108 options_.helper.reset(new SubscriptionCallbackHelper(this));
00109
00110 *this = node_handle_.subscribe(options_);
00111 return *this;
00112 }
00113
00114 mxArray *Subscriber::poll(int nlhs, mxArray *plhs[], int nrhs, const mxArray *prhs[])
00115 {
00116 ros::WallDuration timeout = timeout_;
00117 if (nrhs && mxIsDouble(*prhs) && mxGetPr(*prhs)) { timeout.fromSec(*mxGetPr(*prhs++)); nrhs--; }
00118 callback_queue_.callOne(timeout);
00119
00120 last_event_.reset();
00121 if (!new_event_) {
00122 plhs[0] = mxCreateStructMatrix(0,0,0,0);
00123 return plhs[0];
00124 }
00125
00126 plhs[0] = Conversion(introspect(new_event_->getConstMessage())).toMatlab();
00127 last_event_.swap(new_event_);
00128 return plhs[0];
00129 }
00130
00131 mxArray *Subscriber::getConnectionHeader() const
00132 {
00133 mxArray *header = mxCreateStructMatrix(1, 1, 0, 0);
00134 if (!last_event_) return header;
00135
00136 for(ros::M_string::iterator it = last_event_->getConnectionHeader().begin(); it != last_event_->getConnectionHeader().end(); ++it) {
00137 mxAddField(header, it->first.c_str());
00138 mxSetField(header, 0, it->first.c_str(), mxCreateString(it->second.c_str()));
00139 }
00140
00141 return header;
00142 }
00143
00144 mxArray *Subscriber::getReceiptTime() const
00145 {
00146 mxArray *receiptTime = mxCreateDoubleScalar(0);
00147 if (!last_event_) return receiptTime;
00148 *mxGetPr(receiptTime) = last_event_->getReceiptTime().toSec();
00149 return receiptTime;
00150 }
00151
00152 mxArray *Subscriber::getTopic() const
00153 {
00154 return mxCreateString(ros::Subscriber::getTopic().c_str());
00155 }
00156
00157 mxArray *Subscriber::getNumPublishers() const
00158 {
00159 return mxCreateDoubleScalar(ros::Subscriber::getNumPublishers());
00160 }
00161
00162 MessagePtr Subscriber::introspect(const VoidConstPtr& msg) {
00163 if (!introspection_ || !msg) return MessagePtr();
00164 return introspection_->introspect(msg.get());
00165 }
00166
00167 void Subscriber::callback(const ros::MessageEvent<void>& event)
00168 {
00169 if (new_event_) {
00170 ROS_WARN_NAMED("rosmatlab", "missed a %s message on topic %s, polling is too slow...", options_.datatype.c_str(), options_.topic.c_str());
00171 return;
00172 }
00173 new_event_.reset(new MessageEvent(event));
00174 }
00175
00176 VoidConstPtr SubscriptionCallbackHelper::deserialize(const ros::SubscriptionCallbackHelperDeserializeParams& params)
00177 {
00178 ros::serialization::IStream stream(params.buffer, params.length);
00179 VoidPtr msg = subscriber_->introspection_->deserialize(stream);
00180 if (!msg) ROS_WARN_NAMED("rosmatlab", "deserialization of a message of type %s failed", subscriber_->options_.datatype.c_str());
00181
00182
00183 return VoidConstPtr(msg);
00184 }
00185
00186 void SubscriptionCallbackHelper::call(ros::SubscriptionCallbackHelperCallParams& params)
00187 {
00188 ros::MessageEvent<void> event(params.event, boost::bind(&cpp_introspection::Message::createInstance, subscriber_->introspection_));
00189 subscriber_->callback(event);
00190 }
00191
00192 }