Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include <rqt_multiplot/MessageSubscriber.h>
00020
00021 #include "rqt_multiplot/CurveDataSequencer.h"
00022
00023 namespace rqt_multiplot {
00024
00025
00026
00027
00028
00029 CurveDataSequencer::CurveDataSequencer(QObject* parent) :
00030 QObject(parent),
00031 config_(0),
00032 broker_(0) {
00033 }
00034
00035 CurveDataSequencer::~CurveDataSequencer() {
00036 }
00037
00038
00039
00040
00041
00042 void CurveDataSequencer::setConfig(CurveConfig* config) {
00043 if (config != config_) {
00044 bool wasSubscribed = isSubscribed();
00045
00046 if (config_) {
00047 disconnect(config_->getAxisConfig(CurveConfig::X),
00048 SIGNAL(changed()), this, SLOT(configAxisConfigChanged()));
00049 disconnect(config_->getAxisConfig(CurveConfig::Y),
00050 SIGNAL(changed()), this, SLOT(configAxisConfigChanged()));
00051 disconnect(config_, SIGNAL(subscriberQueueSizeChanged(size_t)),
00052 this, SLOT(configSubscriberQueueSizeChanged(size_t)));
00053
00054 unsubscribe();
00055 }
00056
00057 config_ = config;
00058
00059 if (config) {
00060 connect(config->getAxisConfig(CurveConfig::X),
00061 SIGNAL(changed()), this, SLOT(configAxisConfigChanged()));
00062 connect(config->getAxisConfig(CurveConfig::Y),
00063 SIGNAL(changed()), this, SLOT(configAxisConfigChanged()));
00064 connect(config, SIGNAL(subscriberQueueSizeChanged(size_t)),
00065 this, SLOT(configSubscriberQueueSizeChanged(size_t)));
00066
00067 if (wasSubscribed)
00068 subscribe();
00069 }
00070 }
00071 }
00072
00073 CurveConfig* CurveDataSequencer::getConfig() const {
00074 return config_;
00075 }
00076
00077 void CurveDataSequencer::setBroker(MessageBroker* broker) {
00078 if (broker != broker_) {
00079 bool wasSubscribed = isSubscribed();
00080
00081 if (broker_)
00082 unsubscribe();
00083
00084 broker_ = broker;
00085
00086 if (broker && wasSubscribed)
00087 subscribe();
00088 }
00089 }
00090
00091 MessageBroker* CurveDataSequencer::getBroker() const {
00092 return broker_;
00093 }
00094
00095 bool CurveDataSequencer::isSubscribed() const {
00096 return !subscribedTopics_.isEmpty();
00097 }
00098
00099
00100
00101
00102
00103 void CurveDataSequencer::subscribe() {
00104 if (isSubscribed())
00105 unsubscribe();
00106
00107 if (config_ && broker_) {
00108 CurveAxisConfig* xAxisConfig = config_->getAxisConfig(CurveConfig::X);
00109 CurveAxisConfig* yAxisConfig = config_->getAxisConfig(CurveConfig::Y);
00110
00111 if (xAxisConfig->getTopic() == yAxisConfig->getTopic()) {
00112 QString topic = xAxisConfig->getTopic();
00113
00114 MessageBroker::PropertyMap properties;
00115 properties[MessageSubscriber::QueueSize] = QVariant::
00116 fromValue<qulonglong>(config_->getSubscriberQueueSize());
00117
00118 if (broker_->subscribe(topic, this,
00119 SLOT(subscriberMessageReceived(const QString&,
00120 const Message&)), properties)) {
00121 subscribedTopics_[CurveConfig::X] = topic;
00122 subscribedTopics_[CurveConfig::Y] = topic;
00123 }
00124 }
00125 else {
00126 QString xTopic = xAxisConfig->getTopic();
00127 QString yTopic = yAxisConfig->getTopic();
00128
00129 MessageBroker::PropertyMap properties;
00130 properties[MessageSubscriber::QueueSize] = QVariant::
00131 fromValue<qulonglong>(config_->getSubscriberQueueSize());
00132
00133 if (broker_->subscribe(xTopic, this,
00134 SLOT(subscriberXAxisMessageReceived(const QString&,
00135 const Message&)), properties))
00136 subscribedTopics_[CurveConfig::X] = xTopic;
00137
00138 if (broker_->subscribe(yTopic, this,
00139 SLOT(subscriberYAxisMessageReceived(const QString&,
00140 const Message&)), properties))
00141 subscribedTopics_[CurveConfig::Y] = yTopic;
00142 }
00143 }
00144
00145 if (!subscribedTopics_.isEmpty())
00146 emit subscribed();
00147 }
00148
00149 void CurveDataSequencer::unsubscribe() {
00150 if (isSubscribed()) {
00151 for (QMap<CurveConfig::Axis, QString>::iterator it = subscribedTopics_.
00152 begin(); it != subscribedTopics_.end(); ++it)
00153 broker_->unsubscribe(it.value(), this);
00154
00155 subscribedTopics_.clear();
00156 timeFields_.clear();
00157 timeValues_.clear();
00158
00159 emit unsubscribed();
00160 }
00161 }
00162
00163 void CurveDataSequencer::processMessage(const Message& message) {
00164 if (!config_)
00165 return;
00166
00167 CurveAxisConfig* xAxisConfig = config_->getAxisConfig(CurveConfig::X);
00168 CurveAxisConfig* yAxisConfig = config_->getAxisConfig(CurveConfig::Y);
00169
00170 QPointF point;
00171
00172 if (xAxisConfig->getFieldType() == CurveAxisConfig::MessageData) {
00173 variant_topic_tools::BuiltinVariant variant = message.getVariant().
00174 getMember(xAxisConfig->getField().toStdString());
00175
00176 point.setX(variant.getNumericValue());
00177 }
00178 else
00179 point.setX(message.getReceiptTime().toSec());
00180
00181 if (yAxisConfig->getFieldType() == CurveAxisConfig::MessageData) {
00182 variant_topic_tools::BuiltinVariant variant = message.getVariant().
00183 getMember(yAxisConfig->getField().toStdString());
00184
00185 point.setY(variant.getNumericValue());
00186 }
00187 else
00188 point.setY(message.getReceiptTime().toSec());
00189
00190 emit pointReceived(point);
00191 }
00192
00193 void CurveDataSequencer::processMessage(CurveConfig::Axis axis, const
00194 Message& message) {
00195 if (!config_)
00196 return;
00197
00198 CurveAxisConfig* axisConfig = config_->getAxisConfig(axis);
00199
00200 if (axisConfig) {
00201 if (!timeFields_.contains(axis)) {
00202 timeFields_[axis] = QString();
00203
00204 if (axisConfig->getFieldType() == CurveAxisConfig::MessageData) {
00205 QStringList fieldParts = axisConfig->getField().split("/");
00206
00207 while (!fieldParts.isEmpty()) {
00208 fieldParts.removeLast();
00209
00210 QString parentField = fieldParts.join("/");
00211 variant_topic_tools::MessageVariant variant;
00212
00213 if (!parentField.isEmpty())
00214 variant = message.getVariant().getMember(fieldParts.join("/").
00215 toStdString());
00216 else
00217 variant = message.getVariant();
00218
00219 variant_topic_tools::MessageDataType type = variant.getType();
00220
00221 if (type.hasHeader()) {
00222 timeFields_[axis] = parentField+"/header/stamp";
00223 break;
00224 }
00225 }
00226 }
00227 }
00228
00229 TimeValue timeValue;
00230
00231 if (!timeFields_[axis].isEmpty()) {
00232 variant_topic_tools::BuiltinVariant variant = message.getVariant().
00233 getMember(timeFields_[axis].toStdString());
00234
00235 timeValue.time_ = variant.getValue<ros::Time>();
00236 }
00237 else
00238 timeValue.time_ = message.getReceiptTime();
00239
00240 if (axisConfig->getFieldType() == CurveAxisConfig::MessageData) {
00241 variant_topic_tools::BuiltinVariant variant = message.getVariant().
00242 getMember(axisConfig->getField().toStdString());
00243
00244 timeValue.value_ = variant.getNumericValue();
00245 }
00246 else
00247 timeValue.value_ = message.getReceiptTime().toSec();
00248
00249 if (timeValues_[axis].isEmpty() ||
00250 (timeValue.time_ > timeValues_[axis].last().time_))
00251 timeValues_[axis].append(timeValue);
00252 }
00253
00254 interpolate();
00255 }
00256
00257 void CurveDataSequencer::interpolate() {
00258 TimeValueList& timeValuesX = timeValues_[CurveConfig::X];
00259 TimeValueList& timeValuesY = timeValues_[CurveConfig::Y];
00260
00261 while ((timeValuesX.count() > 1) && (timeValuesY.count() > 1)) {
00262 while ((timeValuesX.count() > 1) &&
00263 ((++timeValuesX.begin())->time_ < timeValuesY.front().time_))
00264 timeValuesX.removeFirst();
00265
00266 while ((timeValuesY.count() > 1) &&
00267 ((++timeValuesY.begin())->time_ < timeValuesX.front().time_))
00268 timeValuesY.removeFirst();
00269
00270 if ((timeValuesY.front().time_ >= timeValuesX.front().time_) &&
00271 (timeValuesX.count() > 1)) {
00272 QPointF point;
00273
00274 const TimeValue& firstX = timeValuesX.first();
00275 const TimeValue& secondX = *(++timeValuesX.begin());
00276
00277 point.setX(firstX.value_+(secondX.value_-firstX.value_)*
00278 (timeValuesY.front().time_-firstX.time_).toSec()/
00279 (secondX.time_-firstX.time_).toSec());
00280 point.setY(timeValuesY.front().value_);
00281
00282 timeValuesY.removeFirst();
00283
00284 emit pointReceived(point);
00285 }
00286 else if ((timeValuesX.front().time_ >= timeValuesY.front().time_) &&
00287 (timeValuesY.count() > 1)) {
00288 QPointF point;
00289
00290 const TimeValue& firstY = timeValuesY.first();
00291 const TimeValue& secondY = *(++timeValuesY.begin());
00292
00293 point.setX(timeValuesX.front().value_);
00294 point.setY(firstY.value_+(secondY.value_-firstY.value_)*
00295 (timeValuesX.front().time_-firstY.time_).toSec()/
00296 (secondY.time_-firstY.time_).toSec());
00297
00298 timeValuesX.removeFirst();
00299
00300 emit pointReceived(point);
00301 }
00302 }
00303 }
00304
00305
00306
00307
00308
00309 void CurveDataSequencer::configAxisConfigChanged() {
00310 if (isSubscribed()) {
00311 unsubscribe();
00312 subscribe();
00313 }
00314 }
00315
00316 void CurveDataSequencer::configSubscriberQueueSizeChanged(size_t queueSize) {
00317 if (isSubscribed()) {
00318 unsubscribe();
00319 subscribe();
00320 }
00321 }
00322
00323 void CurveDataSequencer::subscriberMessageReceived(const QString& topic,
00324 const Message& message) {
00325 processMessage(message);
00326 }
00327
00328 void CurveDataSequencer::subscriberXAxisMessageReceived(const QString& topic,
00329 const Message& message) {
00330 processMessage(CurveConfig::X, message);
00331 }
00332
00333 void CurveDataSequencer::subscriberYAxisMessageReceived(const QString& topic,
00334 const Message& message) {
00335 processMessage(CurveConfig::Y, message);
00336 }
00337
00338 }