Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include "interactive_marker_client.h"
00031
00032 namespace rviz
00033 {
00034
00035 InteractiveMarkerClient::PublisherContext::PublisherContext()
00036 {
00037 update_seen = false;
00038 init_seen = false;
00039 last_update_seq_num = 0;
00040 last_init_seq_num = 0;
00041 update_time_ok = true;
00042 initialized = false;
00043 last_update_time = ros::Time::now();
00044 }
00045
00046
00047
00048 bool InteractiveMarkerClient::PublisherContext::checkInitWith(const visualization_msgs::InteractiveMarkerUpdate::ConstPtr& update)
00049 {
00050 if( (init_seen &&
00051 update->type == visualization_msgs::InteractiveMarkerUpdate::UPDATE &&
00052 last_init_seq_num + 1 >= update->seq_num)
00053 ||
00054 (init_seen &&
00055 update->type == visualization_msgs::InteractiveMarkerUpdate::KEEP_ALIVE &&
00056 last_init_seq_num >= update->seq_num) )
00057 {
00058 initialized = true;
00059 }
00060 return initialized;
00061 }
00062
00063
00064
00065 bool InteractiveMarkerClient::PublisherContext::checkInitWith(const visualization_msgs::InteractiveMarkerInit::ConstPtr& init)
00066 {
00067 M_InteractiveMarkerUpdate::const_iterator q_it_same_seq = update_queue.find( init->seq_num );
00068 M_InteractiveMarkerUpdate::const_iterator q_it_seq_plus_one = update_queue.find( init->seq_num + 1 );
00069
00070 if( (update_seen &&
00071 init->seq_num + 1 >= last_update_seq_num + 1) ||
00072 (q_it_seq_plus_one != update_queue.end() && q_it_seq_plus_one->second->type == visualization_msgs::InteractiveMarkerUpdate::UPDATE) ||
00073 (q_it_same_seq != update_queue.end() && q_it_same_seq->second->type == visualization_msgs::InteractiveMarkerUpdate::KEEP_ALIVE) )
00074 {
00075 initialized = true;
00076 }
00077 return initialized;
00078 }
00079
00080 void InteractiveMarkerClient::PublisherContext::enqueueUpdate(const visualization_msgs::InteractiveMarkerUpdate::ConstPtr& update)
00081 {
00082 if( update->type == visualization_msgs::InteractiveMarkerUpdate::UPDATE )
00083 {
00084 update_queue[ update->seq_num ] = update;
00085 }
00086 else
00087 {
00088
00089
00090 if( update_queue.find( update->seq_num ) == update_queue.end() )
00091 {
00092 update_queue[ update->seq_num ] = update;
00093 }
00094 }
00095 update_seen = true;
00096 last_update_seq_num = update->seq_num;
00097 }
00098
00099
00100 InteractiveMarkerClient::InteractiveMarkerClient( InteractiveMarkerReceiver* receiver )
00101 : cleared_( true )
00102 , subscribed_to_init_( false )
00103 {
00104 receiver_ = receiver;
00105 }
00106
00107
00108 void InteractiveMarkerClient::processMarkerInit(const visualization_msgs::InteractiveMarkerInit::ConstPtr& marker_init)
00109 {
00110 ROS_DEBUG("InteractiveMarkerClient: %s INIT %lu",
00111 marker_init->server_id.c_str(),
00112 marker_init->seq_num);
00113
00114
00115 if ( marker_init->server_id.empty() )
00116 {
00117 receiver_->setStatusError( "Topic", "server_id is empty!");
00118 }
00119
00120 M_PublisherContext::iterator context_it = publisher_contexts_.find(marker_init->server_id);
00121
00122
00123 if ( context_it == publisher_contexts_.end() )
00124 {
00125 PublisherContextPtr pc(new PublisherContext());
00126 context_it = publisher_contexts_.insert( std::make_pair(marker_init->server_id,pc) ).first;
00127 }
00128
00129 PublisherContextPtr context = context_it->second;
00130
00131
00132
00133
00134 if( context->initialized )
00135 {
00136 return;
00137 }
00138
00139 if( context->checkInitWith( marker_init ))
00140 {
00141 receiver_->processMarkerChanges( &marker_init->markers );
00142 cleared_ = false;
00143
00144 context->last_init_seq_num = marker_init->seq_num;
00145 context->init_seen = true;
00146 context->last_update_time = ros::Time::now();
00147
00148
00149 receiver_->setStatusOk( context_it->first, "Initialization complete.");
00150
00151
00152
00153
00154
00155 context->last_update_seq_num = context->last_init_seq_num;
00156
00157
00158 maybeUnsubscribeFromInit();
00159
00160 playbackUpdateQueue( context );
00161 }
00162 else if( context->update_queue.empty() )
00163 {
00164
00165
00166
00167
00168
00169
00170
00171 if( context->init_seen )
00172 {
00173 reinit();
00174 }
00175
00176 receiver_->processMarkerChanges( &marker_init->markers );
00177 cleared_ = false;
00178
00179 context->last_init_seq_num = marker_init->seq_num;
00180 context->init_seen = true;
00181 context->last_update_time = ros::Time::now();
00182 }
00183
00184 }
00185
00186
00187 void InteractiveMarkerClient::playbackUpdateQueue( PublisherContextPtr& context )
00188 {
00189 uint64_t next_seq_needed = context->last_init_seq_num + 1;
00190
00191 M_InteractiveMarkerUpdate::iterator update_it = context->update_queue.begin();
00192 for( ; update_it != context->update_queue.end(); update_it++ )
00193 {
00194 visualization_msgs::InteractiveMarkerUpdate::ConstPtr update = update_it->second;
00195
00196 if( update->seq_num == next_seq_needed )
00197 {
00198 applyUpdate( update, context );
00199 next_seq_needed++;
00200 }
00201 else if( update->seq_num < next_seq_needed )
00202 {
00203 ROS_DEBUG("Ignoring unneeded queued update number %lu, looking for %lu.", update->seq_num, next_seq_needed);
00204 }
00205 else
00206 {
00207 ROS_ERROR("Found queued update number %lu, missed %lu.", update->seq_num, next_seq_needed);
00208 }
00209 }
00210 context->update_queue.clear();
00211 }
00212
00213 void InteractiveMarkerClient::processMarkerUpdate(const visualization_msgs::InteractiveMarkerUpdate::ConstPtr& marker_update)
00214 {
00215
00216
00217
00218
00219
00220
00221 if ( marker_update->server_id.empty() )
00222 {
00223 receiver_->setStatusError( "Topic", "server_id is empty!");
00224 }
00225
00226 M_PublisherContext::iterator context_it = publisher_contexts_.find(marker_update->server_id);
00227
00228
00229
00230
00231 if ( context_it == publisher_contexts_.end() )
00232 {
00233 PublisherContextPtr pc(new PublisherContext());
00234
00235 pc->enqueueUpdate(marker_update);
00236
00237 context_it = publisher_contexts_.insert( std::make_pair(marker_update->server_id,pc) ).first;
00238
00239 reinit();
00240
00241
00242
00243 return;
00244 }
00245
00246 PublisherContextPtr context = context_it->second;
00247
00248 if ( !context->initialized )
00249 {
00250 if( context->checkInitWith( marker_update ))
00251 {
00252
00253 receiver_->setStatusOk( context_it->first, "Initialization complete.");
00254
00255
00256
00257
00258
00259 context->last_update_seq_num = context->last_init_seq_num;
00260
00261
00262 maybeUnsubscribeFromInit();
00263 }
00264 else
00265 {
00266
00267
00268 receiver_->setStatusWarn( marker_update->server_id, "Received update or keep-alive without previous INIT message. It might be lost.");
00269 context->enqueueUpdate(marker_update);
00270 return;
00271 }
00272 }
00273
00274
00275
00276 applyUpdate( marker_update, context );
00277 }
00278
00279 void InteractiveMarkerClient::clear()
00280 {
00281 publisher_contexts_.clear();
00282 reinit();
00283 }
00284
00285 void InteractiveMarkerClient::unsubscribedFromInit()
00286 {
00287 subscribed_to_init_ = false;
00288 }
00289
00290 void InteractiveMarkerClient::reinit()
00291 {
00292 if( !cleared_ )
00293 {
00294 receiver_->clearMarkers();
00295 cleared_ = true;
00296 }
00297 if( !subscribed_to_init_ )
00298 {
00299 subscribed_to_init_ = receiver_->subscribeToInit();
00300 }
00301
00302
00303 M_PublisherContext::iterator context_it;
00304 for( context_it = publisher_contexts_.begin(); context_it != publisher_contexts_.end(); context_it++ )
00305 {
00306 context_it->second->initialized = false;
00307 }
00308 }
00309
00310
00311 void InteractiveMarkerClient::maybeUnsubscribeFromInit() {
00312 M_PublisherContext::iterator context_it;
00313 for( context_it = publisher_contexts_.begin(); context_it != publisher_contexts_.end(); context_it++ )
00314 {
00315 if( !context_it->second->initialized )
00316 {
00317 return;
00318 }
00319 }
00320 receiver_->unsubscribeFromInit();
00321 subscribed_to_init_ = false;
00322 }
00323
00324 void InteractiveMarkerClient::applyUpdate( const visualization_msgs::InteractiveMarkerUpdate::ConstPtr& marker_update,
00325 PublisherContextPtr& context )
00326 {
00327 uint64_t expected_seq_num = 0;
00328
00329 switch ( marker_update->type )
00330 {
00331 case visualization_msgs::InteractiveMarkerUpdate::UPDATE:
00332 expected_seq_num = context->last_update_seq_num + 1;
00333 break;
00334
00335 case visualization_msgs::InteractiveMarkerUpdate::KEEP_ALIVE:
00336 expected_seq_num = context->last_update_seq_num;
00337 break;
00338 }
00339
00340 if ( marker_update->seq_num != expected_seq_num )
00341 {
00342 if( marker_update->seq_num < expected_seq_num )
00343 {
00344 ROS_INFO("Received sequence number %lu, less than expected sequence number %lu. Ignoring.",
00345 marker_update->seq_num, expected_seq_num);
00346 return;
00347 }
00348
00349
00350 std::ostringstream s;
00351 s << "Detected lost update or server restart. Resetting. Reason: Received wrong sequence number (expected: " <<
00352 expected_seq_num << ", received: " << marker_update->seq_num << ")";
00353 receiver_->setStatusError( marker_update->server_id, s.str());
00354 reinit();
00355 return;
00356 }
00357
00358 context->last_update_seq_num = marker_update->seq_num;
00359 context->update_seen = true;
00360 context->last_update_time = ros::Time::now();
00361
00362 if( marker_update->type == visualization_msgs::InteractiveMarkerUpdate::UPDATE )
00363 {
00364 receiver_->processMarkerChanges( &marker_update->markers, &marker_update->poses, &marker_update->erases );
00365 cleared_ = false;
00366 }
00367 }
00368
00369 bool InteractiveMarkerClient::isPublisherListEmpty()
00370 {
00371 return publisher_contexts_.empty();
00372 }
00373
00374 void InteractiveMarkerClient::flagLateConnections()
00375 {
00376 M_PublisherContext::iterator it;
00377 for ( it = publisher_contexts_.begin(); it != publisher_contexts_.end(); it++ )
00378 {
00379 PublisherContextPtr& context = it->second;
00380 double time_since_last_update = (ros::Time::now() - context->last_update_time).toSec();
00381 if ( time_since_last_update > 1.0 )
00382 {
00383 std::stringstream s;
00384 s << "No update received for " << (int)time_since_last_update << " seconds. Connection might be lost.";
00385 receiver_->setStatusWarn( it->first, s.str() );
00386 context->update_time_ok = false;
00387 }
00388 if ( !context->update_time_ok && time_since_last_update <= 1.0 )
00389 {
00390 receiver_->setStatusOk( it->first, "OK" );
00391 }
00392 }
00393 }
00394
00395 }