CurveDataSequencer.cpp
Go to the documentation of this file.
00001 /******************************************************************************
00002  * Copyright (C) 2015 by Ralf Kaestner                                        *
00003  * ralf.kaestner@gmail.com                                                    *
00004  *                                                                            *
00005  * This program is free software; you can redistribute it and/or modify       *
00006  * it under the terms of the Lesser GNU General Public License as published by*
00007  * the Free Software Foundation; either version 3 of the License, or          *
00008  * (at your option) any later version.                                        *
00009  *                                                                            *
00010  * This program is distributed in the hope that it will be useful,            *
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the               *
00013  * Lesser GNU General Public License for more details.                        *
00014  *                                                                            *
00015  * You should have received a copy of the Lesser GNU General Public License   *
00016  * along with this program. If not, see <http://www.gnu.org/licenses/>.       *
00017  ******************************************************************************/
00018 
00019 #include <rqt_multiplot/MessageSubscriber.h>
00020 
00021 #include "rqt_multiplot/CurveDataSequencer.h"
00022 
00023 namespace rqt_multiplot {
00024 
00025 /*****************************************************************************/
00026 /* Constructors and Destructor                                               */
00027 /*****************************************************************************/
00028 
00029 CurveDataSequencer::CurveDataSequencer(QObject* parent) :
00030   QObject(parent),
00031   config_(0),
00032   broker_(0) {
00033 }
00034 
00035 CurveDataSequencer::~CurveDataSequencer() {
00036 }
00037 
00038 /*****************************************************************************/
00039 /* Accessors                                                                 */
00040 /*****************************************************************************/
00041 
00042 void CurveDataSequencer::setConfig(CurveConfig* config) {
00043   if (config != config_) {
00044     bool wasSubscribed = isSubscribed();
00045     
00046     if (config_) {
00047       disconnect(config_->getAxisConfig(CurveConfig::X),
00048         SIGNAL(changed()), this, SLOT(configAxisConfigChanged()));
00049       disconnect(config_->getAxisConfig(CurveConfig::Y),
00050         SIGNAL(changed()), this, SLOT(configAxisConfigChanged()));
00051       disconnect(config_, SIGNAL(subscriberQueueSizeChanged(size_t)),
00052         this, SLOT(configSubscriberQueueSizeChanged(size_t)));
00053       
00054       unsubscribe();
00055     }
00056     
00057     config_ = config;
00058     
00059     if (config) {
00060       connect(config->getAxisConfig(CurveConfig::X),
00061         SIGNAL(changed()), this, SLOT(configAxisConfigChanged()));
00062       connect(config->getAxisConfig(CurveConfig::Y),
00063         SIGNAL(changed()), this, SLOT(configAxisConfigChanged()));
00064       connect(config, SIGNAL(subscriberQueueSizeChanged(size_t)),
00065         this, SLOT(configSubscriberQueueSizeChanged(size_t)));
00066       
00067       if (wasSubscribed)
00068         subscribe();
00069     }
00070   }
00071 }
00072 
00073 CurveConfig* CurveDataSequencer::getConfig() const {
00074   return config_;
00075 }
00076 
00077 void CurveDataSequencer::setBroker(MessageBroker* broker) {
00078   if (broker != broker_) {
00079     bool wasSubscribed = isSubscribed();
00080     
00081     if (broker_)
00082       unsubscribe();
00083     
00084     broker_ = broker;
00085     
00086     if (broker && wasSubscribed)
00087       subscribe();
00088   }
00089 }
00090 
00091 MessageBroker* CurveDataSequencer::getBroker() const {
00092   return broker_;
00093 }
00094 
00095 bool CurveDataSequencer::isSubscribed() const {
00096   return !subscribedTopics_.isEmpty();
00097 }
00098     
00099 /*****************************************************************************/
00100 /* Methods                                                                   */
00101 /*****************************************************************************/
00102 
00103 void CurveDataSequencer::subscribe() {
00104   if (isSubscribed())
00105     unsubscribe();
00106   
00107   if (config_ && broker_) {
00108     CurveAxisConfig* xAxisConfig = config_->getAxisConfig(CurveConfig::X);
00109     CurveAxisConfig* yAxisConfig = config_->getAxisConfig(CurveConfig::Y);
00110     
00111     if (xAxisConfig->getTopic() == yAxisConfig->getTopic()) {
00112       QString topic = xAxisConfig->getTopic();
00113       
00114       MessageBroker::PropertyMap properties;
00115       properties[MessageSubscriber::QueueSize] = QVariant::
00116         fromValue<qulonglong>(config_->getSubscriberQueueSize());
00117       
00118       if (broker_->subscribe(topic, this,
00119           SLOT(subscriberMessageReceived(const QString&,
00120           const Message&)), properties)) {
00121         subscribedTopics_[CurveConfig::X] = topic;
00122         subscribedTopics_[CurveConfig::Y] = topic;
00123       }
00124     }
00125     else {
00126       QString xTopic = xAxisConfig->getTopic();
00127       QString yTopic = yAxisConfig->getTopic();
00128       
00129       MessageBroker::PropertyMap properties;
00130       properties[MessageSubscriber::QueueSize] = QVariant::
00131         fromValue<qulonglong>(config_->getSubscriberQueueSize());
00132       
00133       if (broker_->subscribe(xTopic, this,
00134           SLOT(subscriberXAxisMessageReceived(const QString&,
00135           const Message&)), properties))
00136         subscribedTopics_[CurveConfig::X] = xTopic;
00137       
00138       if (broker_->subscribe(yTopic, this,
00139           SLOT(subscriberYAxisMessageReceived(const QString&,
00140           const Message&)), properties))
00141         subscribedTopics_[CurveConfig::Y] = yTopic;
00142     }
00143   }
00144   
00145   if (!subscribedTopics_.isEmpty())
00146     emit subscribed();
00147 }
00148 
00149 void CurveDataSequencer::unsubscribe() {
00150   if (isSubscribed()) {
00151     for (QMap<CurveConfig::Axis, QString>::iterator it = subscribedTopics_.
00152         begin(); it != subscribedTopics_.end(); ++it)
00153       broker_->unsubscribe(it.value(), this);
00154       
00155     subscribedTopics_.clear();
00156     timeFields_.clear();
00157     timeValues_.clear();
00158     
00159     emit unsubscribed();
00160   }
00161 }
00162 
00163 void CurveDataSequencer::processMessage(const Message& message) {
00164   if (!config_)
00165     return;
00166   
00167   CurveAxisConfig* xAxisConfig = config_->getAxisConfig(CurveConfig::X);
00168   CurveAxisConfig* yAxisConfig = config_->getAxisConfig(CurveConfig::Y);
00169   
00170   QPointF point;
00171   
00172   if (xAxisConfig->getFieldType() == CurveAxisConfig::MessageData) {
00173     variant_topic_tools::BuiltinVariant variant = message.getVariant().
00174       getMember(xAxisConfig->getField().toStdString());
00175       
00176     point.setX(variant.getNumericValue());
00177   }
00178   else
00179     point.setX(message.getReceiptTime().toSec());
00180   
00181   if (yAxisConfig->getFieldType() == CurveAxisConfig::MessageData) {
00182     variant_topic_tools::BuiltinVariant variant = message.getVariant().
00183       getMember(yAxisConfig->getField().toStdString());
00184       
00185     point.setY(variant.getNumericValue());
00186   }
00187   else
00188     point.setY(message.getReceiptTime().toSec());
00189   
00190   emit pointReceived(point);
00191 }
00192 
00193 void CurveDataSequencer::processMessage(CurveConfig::Axis axis, const
00194     Message& message) {
00195   if (!config_)
00196     return;
00197   
00198   CurveAxisConfig* axisConfig = config_->getAxisConfig(axis);
00199   
00200   if (axisConfig) {
00201     if (!timeFields_.contains(axis)) {
00202       timeFields_[axis] = QString();
00203       
00204       if (axisConfig->getFieldType() == CurveAxisConfig::MessageData) {
00205         QStringList fieldParts = axisConfig->getField().split("/");
00206         
00207         while (!fieldParts.isEmpty()) {
00208           fieldParts.removeLast();
00209           
00210           QString parentField = fieldParts.join("/");
00211           variant_topic_tools::MessageVariant variant;
00212           
00213           if (!parentField.isEmpty())
00214             variant = message.getVariant().getMember(fieldParts.join("/").
00215               toStdString());
00216           else
00217             variant = message.getVariant();
00218             
00219           variant_topic_tools::MessageDataType type = variant.getType();
00220             
00221           if (type.hasHeader()) {
00222             timeFields_[axis] = parentField+"/header/stamp";
00223             break;
00224           }
00225         }
00226       }
00227     }
00228 
00229     TimeValue timeValue;
00230   
00231     if (!timeFields_[axis].isEmpty()) {
00232       variant_topic_tools::BuiltinVariant variant = message.getVariant().
00233         getMember(timeFields_[axis].toStdString());
00234         
00235       timeValue.time_ = variant.getValue<ros::Time>();
00236     }
00237     else
00238       timeValue.time_ = message.getReceiptTime();
00239     
00240     if (axisConfig->getFieldType() == CurveAxisConfig::MessageData) {
00241       variant_topic_tools::BuiltinVariant variant = message.getVariant().
00242         getMember(axisConfig->getField().toStdString());
00243         
00244       timeValue.value_ = variant.getNumericValue();
00245     }
00246     else
00247       timeValue.value_ = message.getReceiptTime().toSec();
00248       
00249     if (timeValues_[axis].isEmpty() ||
00250         (timeValue.time_ > timeValues_[axis].last().time_))
00251       timeValues_[axis].append(timeValue);
00252   }
00253 
00254   interpolate();
00255 }
00256 
00257 void CurveDataSequencer::interpolate() {
00258   TimeValueList& timeValuesX = timeValues_[CurveConfig::X];
00259   TimeValueList& timeValuesY = timeValues_[CurveConfig::Y];
00260   
00261   while ((timeValuesX.count() > 1) && (timeValuesY.count() > 1)) {
00262     while ((timeValuesX.count() > 1) &&
00263         ((++timeValuesX.begin())->time_ < timeValuesY.front().time_))
00264       timeValuesX.removeFirst();
00265     
00266     while ((timeValuesY.count() > 1) &&
00267         ((++timeValuesY.begin())->time_ < timeValuesX.front().time_))
00268       timeValuesY.removeFirst();
00269   
00270     if ((timeValuesY.front().time_ >= timeValuesX.front().time_) &&
00271         (timeValuesX.count() > 1)) {
00272       QPointF point;
00273       
00274       const TimeValue& firstX = timeValuesX.first();
00275       const TimeValue& secondX = *(++timeValuesX.begin());
00276       
00277       point.setX(firstX.value_+(secondX.value_-firstX.value_)*
00278         (timeValuesY.front().time_-firstX.time_).toSec()/
00279         (secondX.time_-firstX.time_).toSec());
00280       point.setY(timeValuesY.front().value_);
00281       
00282       timeValuesY.removeFirst();
00283   
00284       emit pointReceived(point); 
00285     }    
00286     else if ((timeValuesX.front().time_ >= timeValuesY.front().time_) &&
00287         (timeValuesY.count() > 1)) {
00288       QPointF point;
00289       
00290       const TimeValue& firstY = timeValuesY.first();
00291       const TimeValue& secondY = *(++timeValuesY.begin());
00292       
00293       point.setX(timeValuesX.front().value_);
00294       point.setY(firstY.value_+(secondY.value_-firstY.value_)*
00295         (timeValuesX.front().time_-firstY.time_).toSec()/
00296         (secondY.time_-firstY.time_).toSec());
00297       
00298       timeValuesX.removeFirst();
00299       
00300       emit pointReceived(point); 
00301     }
00302   }
00303 }
00304 
00305 /*****************************************************************************/
00306 /* Slots                                                                     */
00307 /*****************************************************************************/
00308 
00309 void CurveDataSequencer::configAxisConfigChanged() {
00310   if (isSubscribed()) {
00311     unsubscribe();
00312     subscribe();
00313   }
00314 }
00315 
00316 void CurveDataSequencer::configSubscriberQueueSizeChanged(size_t queueSize) {
00317   if (isSubscribed()) {
00318     unsubscribe();
00319     subscribe();
00320   }
00321 }
00322 
00323 void CurveDataSequencer::subscriberMessageReceived(const QString& topic,
00324     const Message& message) {
00325   processMessage(message);
00326 }
00327 
00328 void CurveDataSequencer::subscriberXAxisMessageReceived(const QString& topic,
00329     const Message& message) {      
00330   processMessage(CurveConfig::X, message);
00331 }
00332 
00333 void CurveDataSequencer::subscriberYAxisMessageReceived(const QString& topic,
00334     const Message& message) {
00335   processMessage(CurveConfig::Y, message);
00336 }
00337 
00338 }


rqt_multiplot
Author(s): Ralf Kaestner
autogenerated on Tue May 9 2017 02:16:02