00001 #include "statepublisher_zmq.h" 00002 #include <QTextStream> 00003 #include <QFile> 00004 #include <QMessageBox> 00005 #include <zmq.hpp> 00006 #include <thread> 00007 00008 StatePublisherZMQ::StatePublisherZMQ(): 00009 _prev_dataplot(0), 00010 _thread( std::bind( &StatePublisherZMQ::run_thread, this) ), 00011 _prev_time( 0 ) 00012 { 00013 00014 } 00015 00016 StatePublisherZMQ::~StatePublisherZMQ() 00017 { 00018 00019 } 00020 00021 void StatePublisherZMQ::run_thread() 00022 { 00023 zmq::context_t context(1); 00024 zmq::socket_t socket (context, ZMQ_REP); 00025 socket.bind ("tcp://*:6665"); 00026 00027 while (true) 00028 { 00029 zmq::message_t request; 00030 00031 // Wait for next request from client 00032 socket.recv (&request); 00033 const char* request_data = (const char*)request.data(); 00034 00035 if( strncmp( request_data, "[get_data_names]", 16 ) == 0 ) 00036 { 00037 QString string_reply; 00038 00039 _mutex.lock(); 00040 00041 std::map<QString, double>::iterator it; 00042 for( it = _current_data.begin(); it != _current_data.end(); it++ ) 00043 { 00044 string_reply.append( it->first + QString(" ")); 00045 } 00046 _mutex.unlock(); 00047 00048 zmq::message_t reply ( string_reply.size() ); 00049 socket.send (reply); 00050 } 00051 else if( strncmp( request_data, "[get_data]", 10 ) == 0 ) 00052 { 00053 bool abort = false; 00054 QString string_request = QString::fromUtf8( &request_data [10], request.size() - 10); 00055 QStringList names = string_request.split(';'); 00056 00057 _mutex.lock(); 00058 QString string_reply; 00059 00060 for(int i = 0; i< names.count(); i++ ) 00061 { 00062 std::map<QString, double>::iterator it = _current_data.find( names.at(i) ); 00063 if( it == _current_data.end()) 00064 { 00065 abort = true; 00066 break; 00067 } 00068 else{ 00069 double value = it->second; 00070 string_reply.append( QString::number(value) + QString(" ")); 00071 } 00072 } 00073 _mutex.unlock(); 00074 00075 if( abort ) 00076 { 00077 zmq::message_t reply (5); 00078 memcpy (reply.data (), "Error", 5); 00079 socket.send (reply); 00080 } 00081 else{ 00082 zmq::message_t reply ( string_reply.size() ); 00083 socket.send (reply); 00084 } 00085 } 00086 else{ 00087 zmq::message_t reply (5); 00088 memcpy (reply.data (), "Error", 5); 00089 socket.send (reply); 00090 } 00091 } 00092 00093 } 00094 00095 void StatePublisherZMQ::updateState(PlotDataMap *datamap, double current_time) 00096 { 00097 if( datamap == 0) 00098 { 00099 _prev_dataplot = datamap; 00100 _prev_time = current_time; 00101 _mutex.lock(); 00102 _current_data.clear(); 00103 _mutex.unlock(); 00104 return; 00105 } 00106 00107 PlotDataMap::iterator it; 00108 00109 _mutex.lock(); 00110 if( datamap != _prev_dataplot || current_time != _prev_time) 00111 { 00112 for ( it = datamap->begin(); it != datamap->end(); it++ ) 00113 { 00114 const QString& name = it->first; 00115 PlotDataPtr plotdata = it->second; 00116 _current_data[ name ] = plotdata->getY( current_time ); 00117 } 00118 } 00119 _mutex.unlock(); 00120 00121 _prev_dataplot = datamap; 00122 _prev_time = current_time; 00123 }