Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include "interactive_marker_client.h"
00031 #include <ros/console.h>
00032
00033 namespace rviz
00034 {
00035
00036 InteractiveMarkerClient::PublisherContext::PublisherContext()
00037 {
00038 update_seen = false;
00039 init_seen = false;
00040 last_update_seq_num = 0;
00041 last_init_seq_num = 0;
00042 update_time_ok = true;
00043 initialized = false;
00044 last_update_time = ros::Time::now();
00045 }
00046
00047
00048
00049 bool InteractiveMarkerClient::PublisherContext::checkInitWith(const visualization_msgs::InteractiveMarkerUpdate::ConstPtr& update)
00050 {
00051 if( (init_seen &&
00052 update->type == visualization_msgs::InteractiveMarkerUpdate::UPDATE &&
00053 last_init_seq_num + 1 >= update->seq_num)
00054 ||
00055 (init_seen &&
00056 update->type == visualization_msgs::InteractiveMarkerUpdate::KEEP_ALIVE &&
00057 last_init_seq_num >= update->seq_num) )
00058 {
00059 initialized = true;
00060 }
00061 return initialized;
00062 }
00063
00064
00065
00066 bool InteractiveMarkerClient::PublisherContext::checkInitWith(const visualization_msgs::InteractiveMarkerInit::ConstPtr& init)
00067 {
00068 M_InteractiveMarkerUpdate::const_iterator q_it_same_seq = update_queue.find( init->seq_num );
00069 M_InteractiveMarkerUpdate::const_iterator q_it_seq_plus_one = update_queue.find( init->seq_num + 1 );
00070
00071 if( (update_seen &&
00072 init->seq_num + 1 >= last_update_seq_num + 1) ||
00073 (q_it_seq_plus_one != update_queue.end() && q_it_seq_plus_one->second->type == visualization_msgs::InteractiveMarkerUpdate::UPDATE) ||
00074 (q_it_same_seq != update_queue.end() && q_it_same_seq->second->type == visualization_msgs::InteractiveMarkerUpdate::KEEP_ALIVE) )
00075 {
00076 initialized = true;
00077 }
00078 return initialized;
00079 }
00080
00081 void InteractiveMarkerClient::PublisherContext::enqueueUpdate(const visualization_msgs::InteractiveMarkerUpdate::ConstPtr& update)
00082 {
00083 if( update->type == visualization_msgs::InteractiveMarkerUpdate::UPDATE )
00084 {
00085 update_queue[ update->seq_num ] = update;
00086 }
00087 else
00088 {
00089
00090
00091 if( update_queue.find( update->seq_num ) == update_queue.end() )
00092 {
00093 update_queue[ update->seq_num ] = update;
00094 }
00095 }
00096 update_seen = true;
00097 last_update_seq_num = update->seq_num;
00098 }
00099
00100
00101 InteractiveMarkerClient::InteractiveMarkerClient( InteractiveMarkerReceiver* receiver )
00102 : cleared_( true )
00103 , subscribed_to_init_( false )
00104 {
00105 receiver_ = receiver;
00106 }
00107
00108
00109 void InteractiveMarkerClient::processMarkerInit(const visualization_msgs::InteractiveMarkerInit::ConstPtr& marker_init)
00110 {
00111 ROS_DEBUG("InteractiveMarkerClient: %s INIT %lu",
00112 marker_init->server_id.c_str(),
00113 marker_init->seq_num);
00114
00115
00116 if ( marker_init->server_id.empty() )
00117 {
00118 receiver_->setStatusError( "Topic", "server_id is empty!");
00119 }
00120
00121 M_PublisherContext::iterator context_it = publisher_contexts_.find(marker_init->server_id);
00122
00123
00124 if ( context_it == publisher_contexts_.end() )
00125 {
00126 PublisherContextPtr pc(new PublisherContext());
00127 context_it = publisher_contexts_.insert( std::make_pair(marker_init->server_id,pc) ).first;
00128 }
00129
00130 PublisherContextPtr context = context_it->second;
00131
00132
00133
00134
00135 if( context->initialized )
00136 {
00137 return;
00138 }
00139
00140 if( context->checkInitWith( marker_init ))
00141 {
00142 receiver_->processMarkerChanges( &marker_init->markers );
00143 cleared_ = false;
00144
00145 context->last_init_seq_num = marker_init->seq_num;
00146 context->init_seen = true;
00147 context->last_update_time = ros::Time::now();
00148
00149
00150 receiver_->setStatusOk( context_it->first, "Initialization complete.");
00151
00152
00153
00154
00155
00156 context->last_update_seq_num = context->last_init_seq_num;
00157
00158
00159 maybeUnsubscribeFromInit();
00160
00161 playbackUpdateQueue( context );
00162 }
00163 else if( context->update_queue.empty() )
00164 {
00165
00166
00167
00168
00169
00170
00171
00172 if( context->init_seen )
00173 {
00174 reinit();
00175 }
00176
00177 receiver_->processMarkerChanges( &marker_init->markers );
00178 cleared_ = false;
00179
00180 context->last_init_seq_num = marker_init->seq_num;
00181 context->init_seen = true;
00182 context->last_update_time = ros::Time::now();
00183 }
00184
00185 }
00186
00187
00188 void InteractiveMarkerClient::playbackUpdateQueue( PublisherContextPtr& context )
00189 {
00190 uint64_t next_seq_needed = context->last_init_seq_num + 1;
00191
00192 M_InteractiveMarkerUpdate::iterator update_it = context->update_queue.begin();
00193 for( ; update_it != context->update_queue.end(); update_it++ )
00194 {
00195 visualization_msgs::InteractiveMarkerUpdate::ConstPtr update = update_it->second;
00196
00197 if( update->seq_num == next_seq_needed )
00198 {
00199 applyUpdate( update, context );
00200 next_seq_needed++;
00201 }
00202 else if( update->seq_num < next_seq_needed )
00203 {
00204 ROS_DEBUG("Ignoring unneeded queued update number %lu, looking for %lu.", update->seq_num, next_seq_needed);
00205 }
00206 else
00207 {
00208 ROS_ERROR("Found queued update number %lu, missed %lu.", update->seq_num, next_seq_needed);
00209 }
00210 }
00211 context->update_queue.clear();
00212 }
00213
00214 void InteractiveMarkerClient::processMarkerUpdate(const visualization_msgs::InteractiveMarkerUpdate::ConstPtr& marker_update)
00215 {
00216
00217
00218
00219
00220
00221
00222 if ( marker_update->server_id.empty() )
00223 {
00224 receiver_->setStatusError( "Topic", "server_id is empty!");
00225 }
00226
00227 M_PublisherContext::iterator context_it = publisher_contexts_.find(marker_update->server_id);
00228
00229
00230
00231
00232 if ( context_it == publisher_contexts_.end() )
00233 {
00234 PublisherContextPtr pc(new PublisherContext());
00235
00236 pc->enqueueUpdate(marker_update);
00237
00238 context_it = publisher_contexts_.insert( std::make_pair(marker_update->server_id,pc) ).first;
00239
00240 reinit();
00241
00242
00243
00244 return;
00245 }
00246
00247 PublisherContextPtr context = context_it->second;
00248
00249 if ( !context->initialized )
00250 {
00251 if( context->checkInitWith( marker_update ))
00252 {
00253
00254 receiver_->setStatusOk( context_it->first, "Initialization complete.");
00255
00256
00257
00258
00259
00260 context->last_update_seq_num = context->last_init_seq_num;
00261
00262
00263 maybeUnsubscribeFromInit();
00264 }
00265 else
00266 {
00267
00268
00269 receiver_->setStatusWarn( marker_update->server_id, "Received update or keep-alive without previous INIT message. It might be lost.");
00270 context->enqueueUpdate(marker_update);
00271 return;
00272 }
00273 }
00274
00275
00276
00277 applyUpdate( marker_update, context );
00278 }
00279
00280 void InteractiveMarkerClient::clear()
00281 {
00282 publisher_contexts_.clear();
00283 reinit();
00284 }
00285
00286 void InteractiveMarkerClient::unsubscribedFromInit()
00287 {
00288 subscribed_to_init_ = false;
00289 }
00290
00291 void InteractiveMarkerClient::reinit()
00292 {
00293 if( !cleared_ )
00294 {
00295 receiver_->clearMarkers();
00296 cleared_ = true;
00297 }
00298 if( !subscribed_to_init_ )
00299 {
00300 subscribed_to_init_ = receiver_->subscribeToInit();
00301 }
00302
00303
00304 M_PublisherContext::iterator context_it;
00305 for( context_it = publisher_contexts_.begin(); context_it != publisher_contexts_.end(); context_it++ )
00306 {
00307 context_it->second->initialized = false;
00308 }
00309 }
00310
00311
00312 void InteractiveMarkerClient::maybeUnsubscribeFromInit() {
00313 M_PublisherContext::iterator context_it;
00314 for( context_it = publisher_contexts_.begin(); context_it != publisher_contexts_.end(); context_it++ )
00315 {
00316 if( !context_it->second->initialized )
00317 {
00318 return;
00319 }
00320 }
00321 receiver_->unsubscribeFromInit();
00322 subscribed_to_init_ = false;
00323 }
00324
00325 void InteractiveMarkerClient::applyUpdate( const visualization_msgs::InteractiveMarkerUpdate::ConstPtr& marker_update,
00326 PublisherContextPtr& context )
00327 {
00328 uint64_t expected_seq_num = 0;
00329
00330 switch ( marker_update->type )
00331 {
00332 case visualization_msgs::InteractiveMarkerUpdate::UPDATE:
00333 expected_seq_num = context->last_update_seq_num + 1;
00334 break;
00335
00336 case visualization_msgs::InteractiveMarkerUpdate::KEEP_ALIVE:
00337 expected_seq_num = context->last_update_seq_num;
00338 break;
00339 }
00340
00341 if ( marker_update->seq_num != expected_seq_num )
00342 {
00343 if( marker_update->seq_num < expected_seq_num )
00344 {
00345 ROS_INFO("Received sequence number %lu, less than expected sequence number %lu. Ignoring.",
00346 marker_update->seq_num, expected_seq_num);
00347 return;
00348 }
00349
00350
00351 std::ostringstream s;
00352 s << "Detected lost update or server restart. Resetting. Reason: Received wrong sequence number (expected: " <<
00353 expected_seq_num << ", received: " << marker_update->seq_num << ")";
00354 receiver_->setStatusError( marker_update->server_id, s.str());
00355 reinit();
00356 return;
00357 }
00358
00359 context->last_update_seq_num = marker_update->seq_num;
00360 context->update_seen = true;
00361 context->last_update_time = ros::Time::now();
00362
00363 if( marker_update->type == visualization_msgs::InteractiveMarkerUpdate::UPDATE )
00364 {
00365 receiver_->processMarkerChanges( &marker_update->markers, &marker_update->poses, &marker_update->erases );
00366 cleared_ = false;
00367 }
00368 }
00369
00370 bool InteractiveMarkerClient::isPublisherListEmpty()
00371 {
00372 return publisher_contexts_.empty();
00373 }
00374
00375 void InteractiveMarkerClient::flagLateConnections()
00376 {
00377 M_PublisherContext::iterator it;
00378 for ( it = publisher_contexts_.begin(); it != publisher_contexts_.end(); it++ )
00379 {
00380 PublisherContextPtr& context = it->second;
00381 double time_since_last_update = (ros::Time::now() - context->last_update_time).toSec();
00382 if ( time_since_last_update > 1.0 )
00383 {
00384 std::stringstream s;
00385 s << "No update received for " << (int)time_since_last_update << " seconds. Connection might be lost.";
00386 receiver_->setStatusWarn( it->first, s.str() );
00387 context->update_time_ok = false;
00388 }
00389 if ( !context->update_time_ok && time_since_last_update <= 1.0 )
00390 {
00391 receiver_->setStatusOk( it->first, "OK" );
00392 }
00393 }
00394 }
00395
00396 }