00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include "interactive_marker_client.h"
00031
00032 namespace rviz
00033 {
00034
00035 InteractiveMarkerClient::PublisherContext::PublisherContext()
00036 {
00037 update_seen = false;
00038 init_seen = false;
00039 last_update_seq_num = 0;
00040 last_init_seq_num = 0;
00041 update_time_ok = true;
00042 initialized = false;
00043 last_update_time = ros::Time::now();
00044 }
00045
00046
00047
00048 bool InteractiveMarkerClient::PublisherContext::checkInitWith(const visualization_msgs::InteractiveMarkerUpdate::ConstPtr& update)
00049 {
00050 if( (init_seen &&
00051 update->type == visualization_msgs::InteractiveMarkerUpdate::UPDATE &&
00052 last_init_seq_num + 1 >= update->seq_num)
00053 ||
00054 (init_seen &&
00055 update->type == visualization_msgs::InteractiveMarkerUpdate::KEEP_ALIVE &&
00056 last_init_seq_num >= update->seq_num) )
00057 {
00058 initialized = true;
00059 }
00060 return initialized;
00061 }
00062
00063
00064
00065 bool InteractiveMarkerClient::PublisherContext::checkInitWith(const visualization_msgs::InteractiveMarkerInit::ConstPtr& init)
00066 {
00067 M_InteractiveMarkerUpdate::const_iterator q_it_same_seq = update_queue.find( init->seq_num );
00068 M_InteractiveMarkerUpdate::const_iterator q_it_seq_plus_one = update_queue.find( init->seq_num + 1 );
00069
00070 if( (update_seen &&
00071 init->seq_num + 1 >= last_update_seq_num + 1) ||
00072 (q_it_seq_plus_one != update_queue.end() && q_it_seq_plus_one->second->type == visualization_msgs::InteractiveMarkerUpdate::UPDATE) ||
00073 (q_it_same_seq != update_queue.end() && q_it_same_seq->second->type == visualization_msgs::InteractiveMarkerUpdate::KEEP_ALIVE) )
00074 {
00075 initialized = true;
00076 }
00077 return initialized;
00078 }
00079
00080 void InteractiveMarkerClient::PublisherContext::enqueueUpdate(const visualization_msgs::InteractiveMarkerUpdate::ConstPtr& update)
00081 {
00082 if( update->type == visualization_msgs::InteractiveMarkerUpdate::UPDATE )
00083 {
00084 update_queue[ update->seq_num ] = update;
00085 }
00086 else
00087 {
00088
00089
00090 if( update_queue.find( update->seq_num ) == update_queue.end() )
00091 {
00092 update_queue[ update->seq_num ] = update;
00093 }
00094 }
00095 update_seen = true;
00096 last_update_seq_num = update->seq_num;
00097 }
00098
00099
00100 InteractiveMarkerClient::InteractiveMarkerClient( InteractiveMarkerReceiver* receiver )
00101 : cleared_( true )
00102 , subscribed_to_init_( false )
00103 {
00104 receiver_ = receiver;
00105 }
00106
00107
00108 void InteractiveMarkerClient::processMarkerInit(const visualization_msgs::InteractiveMarkerInit::ConstPtr& marker_init)
00109 {
00110 ROS_DEBUG("InteractiveMarkerClient: %s INIT %lu",
00111 marker_init->server_id.c_str(),
00112 marker_init->seq_num);
00113
00114
00115 if ( marker_init->server_id.empty() )
00116 {
00117 receiver_->setStatusError( "Topic", "server_id is empty!");
00118 }
00119
00120 M_PublisherContext::iterator context_it = publisher_contexts_.find(marker_init->server_id);
00121
00122
00123 if ( context_it == publisher_contexts_.end() )
00124 {
00125 PublisherContextPtr pc(new PublisherContext());
00126 context_it = publisher_contexts_.insert( std::make_pair(marker_init->server_id,pc) ).first;
00127 }
00128
00129 PublisherContextPtr context = context_it->second;
00130
00131
00132
00133
00134 if( context->initialized )
00135 {
00136 return;
00137 }
00138
00139 if( context->checkInitWith( marker_init ))
00140 {
00141 receiver_->processMarkerChanges( &marker_init->markers );
00142 cleared_ = false;
00143
00144 context->last_init_seq_num = marker_init->seq_num;
00145 context->init_seen = true;
00146 context->last_update_time = ros::Time::now();
00147
00148
00149 receiver_->setStatusOk( context_it->first, "Initialization complete.");
00150
00151
00152
00153
00154
00155 context->last_update_seq_num = context->last_init_seq_num;
00156
00157
00158 maybeUnsubscribeFromInit();
00159
00160 playbackUpdateQueue( context );
00161 }
00162 else if( context->update_queue.empty() )
00163 {
00164
00165
00166
00167
00168
00169
00170
00171 if( context->init_seen )
00172 {
00173 reinit();
00174 }
00175
00176 receiver_->processMarkerChanges( &marker_init->markers );
00177 cleared_ = false;
00178
00179 context->last_init_seq_num = marker_init->seq_num;
00180 context->init_seen = true;
00181 context->last_update_time = ros::Time::now();
00182 }
00183
00184 }
00185
00186
00187 void InteractiveMarkerClient::playbackUpdateQueue( PublisherContextPtr& context )
00188 {
00189 uint64_t next_seq_needed = context->last_init_seq_num + 1;
00190
00191 M_InteractiveMarkerUpdate::iterator update_it = context->update_queue.begin();
00192 for( ; update_it != context->update_queue.end(); update_it++ )
00193 {
00194 visualization_msgs::InteractiveMarkerUpdate::ConstPtr update = update_it->second;
00195
00196 if( update->seq_num == next_seq_needed )
00197 {
00198 applyUpdate( update, context );
00199 next_seq_needed++;
00200 }
00201 else if( update->seq_num < next_seq_needed )
00202 {
00203 ROS_DEBUG("Ignoring unneeded queued update number %lu, looking for %lu.", update->seq_num, next_seq_needed);
00204 }
00205 else
00206 {
00207 ROS_ERROR("Found queued update number %lu, missed %lu.", update->seq_num, next_seq_needed);
00208 }
00209 }
00210 context->update_queue.clear();
00211 }
00212
00213 void InteractiveMarkerClient::processMarkerUpdate(const visualization_msgs::InteractiveMarkerUpdate::ConstPtr& marker_update)
00214 {
00215
00216
00217
00218
00219
00220
00221 if ( marker_update->server_id.empty() )
00222 {
00223 receiver_->setStatusError( "Topic", "server_id is empty!");
00224 }
00225
00226 M_PublisherContext::iterator context_it = publisher_contexts_.find(marker_update->server_id);
00227
00228
00229
00230
00231 if ( context_it == publisher_contexts_.end() )
00232 {
00233 PublisherContextPtr pc(new PublisherContext());
00234
00235 pc->enqueueUpdate(marker_update);
00236
00237 context_it = publisher_contexts_.insert( std::make_pair(marker_update->server_id,pc) ).first;
00238
00239 reinit();
00240
00241
00242
00243 return;
00244 }
00245
00246 PublisherContextPtr context = context_it->second;
00247
00248 if ( !context->initialized )
00249 {
00250 if( context->checkInitWith( marker_update ))
00251 {
00252
00253 receiver_->setStatusOk( context_it->first, "Initialization complete.");
00254
00255
00256
00257
00258
00259 context->last_update_seq_num = context->last_init_seq_num;
00260
00261
00262 maybeUnsubscribeFromInit();
00263 }
00264 else
00265 {
00266
00267
00268 receiver_->setStatusWarn( marker_update->server_id, "Received update or keep-alive without previous INIT message. It might be lost.");
00269 context->enqueueUpdate(marker_update);
00270 return;
00271 }
00272 }
00273
00274
00275
00276 applyUpdate( marker_update, context );
00277 }
00278
00279 void InteractiveMarkerClient::clear()
00280 {
00281 publisher_contexts_.clear();
00282 reinit();
00283 }
00284
00285 void InteractiveMarkerClient::unsubscribedFromInit()
00286 {
00287 subscribed_to_init_ = false;
00288 }
00289
00290 void InteractiveMarkerClient::reinit()
00291 {
00292 if( !cleared_ )
00293 {
00294 receiver_->clearMarkers();
00295 cleared_ = true;
00296 }
00297 if( !subscribed_to_init_ )
00298 {
00299 subscribed_to_init_ = receiver_->subscribeToInit();
00300 }
00301
00302
00303 M_PublisherContext::iterator context_it;
00304 for( context_it = publisher_contexts_.begin(); context_it != publisher_contexts_.end(); context_it++ )
00305 {
00306 context_it->second->initialized = false;
00307 }
00308 }
00309
00310
00311 void InteractiveMarkerClient::maybeUnsubscribeFromInit() {
00312 M_PublisherContext::iterator context_it;
00313 for( context_it = publisher_contexts_.begin(); context_it != publisher_contexts_.end(); context_it++ )
00314 {
00315 if( !context_it->second->initialized )
00316 {
00317 return;
00318 }
00319 }
00320 receiver_->unsubscribeFromInit();
00321 subscribed_to_init_ = false;
00322 }
00323
00324 void InteractiveMarkerClient::applyUpdate( const visualization_msgs::InteractiveMarkerUpdate::ConstPtr& marker_update,
00325 PublisherContextPtr& context )
00326 {
00327 uint64_t expected_seq_num = 0;
00328
00329 switch ( marker_update->type )
00330 {
00331 case visualization_msgs::InteractiveMarkerUpdate::UPDATE:
00332 expected_seq_num = context->last_update_seq_num + 1;
00333 break;
00334
00335 case visualization_msgs::InteractiveMarkerUpdate::KEEP_ALIVE:
00336 expected_seq_num = context->last_update_seq_num;
00337 break;
00338 }
00339
00340 if ( marker_update->seq_num != expected_seq_num )
00341 {
00342 if( marker_update->seq_num < expected_seq_num )
00343 {
00344 ROS_INFO("Received sequence number %lu, less than expected sequence number %lu. Ignoring.",
00345 marker_update->seq_num, expected_seq_num);
00346 return;
00347 }
00348
00349
00350 std::ostringstream s;
00351 s << "Detected lost update or server restart. Resetting. Reason: Received wrong sequence number (expected: " <<
00352 expected_seq_num << ", received: " << marker_update->seq_num << ")";
00353 receiver_->setStatusError( marker_update->server_id, s.str());
00354 reinit();
00355 return;
00356 }
00357
00358 context->last_update_seq_num = marker_update->seq_num;
00359 context->update_seen = true;
00360 context->last_update_time = ros::Time::now();
00361
00362 if( marker_update->type == visualization_msgs::InteractiveMarkerUpdate::UPDATE )
00363 {
00364 receiver_->processMarkerChanges( &marker_update->markers, &marker_update->poses, &marker_update->erases );
00365 cleared_ = false;
00366 }
00367 }
00368
00369 bool InteractiveMarkerClient::isPublisherListEmpty()
00370 {
00371 return publisher_contexts_.empty();
00372 }
00373
00374 void InteractiveMarkerClient::flagLateConnections()
00375 {
00376 M_PublisherContext::iterator it;
00377 for ( it = publisher_contexts_.begin(); it != publisher_contexts_.end(); it++ )
00378 {
00379 PublisherContextPtr& context = it->second;
00380 double time_since_last_update = (ros::Time::now() - context->last_update_time).toSec();
00381 if ( time_since_last_update > 1.0 )
00382 {
00383 std::stringstream s;
00384 s << "No update received for " << (int)time_since_last_update << " seconds. Connection might be lost.";
00385 receiver_->setStatusWarn( it->first, s.str() );
00386 context->update_time_ok = false;
00387 }
00388 if ( !context->update_time_ok && time_since_last_update <= 1.0 )
00389 {
00390 receiver_->setStatusOk( it->first, "OK" );
00391 }
00392 }
00393 }
00394
00395 }