Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 #include "interactive_marker_client.h"
00031 #include <ros/console.h>
00032 
00033 namespace rviz
00034 {
00035 
00036 InteractiveMarkerClient::PublisherContext::PublisherContext()
00037 {
00038   update_seen = false;
00039   init_seen = false;
00040   last_update_seq_num = 0;
00041   last_init_seq_num = 0;
00042   update_time_ok = true;
00043   initialized = false;
00044   last_update_time = ros::Time::now();
00045 }
00046 
00047 
00048 
00049 bool InteractiveMarkerClient::PublisherContext::checkInitWith(const visualization_msgs::InteractiveMarkerUpdate::ConstPtr& update)
00050 {
00051   if( (init_seen &&
00052        update->type == visualization_msgs::InteractiveMarkerUpdate::UPDATE &&
00053        last_init_seq_num + 1 >= update->seq_num)
00054       ||
00055       (init_seen &&
00056        update->type == visualization_msgs::InteractiveMarkerUpdate::KEEP_ALIVE &&
00057        last_init_seq_num >= update->seq_num) )
00058   {
00059     initialized = true;
00060   }
00061   return initialized;
00062 }
00063 
00064 
00065 
00066 bool InteractiveMarkerClient::PublisherContext::checkInitWith(const visualization_msgs::InteractiveMarkerInit::ConstPtr& init)
00067 {
00068   M_InteractiveMarkerUpdate::const_iterator q_it_same_seq = update_queue.find( init->seq_num );
00069   M_InteractiveMarkerUpdate::const_iterator q_it_seq_plus_one = update_queue.find( init->seq_num + 1 );
00070 
00071   if( (update_seen &&
00072        init->seq_num + 1 >= last_update_seq_num + 1) ||
00073       (q_it_seq_plus_one != update_queue.end() && q_it_seq_plus_one->second->type == visualization_msgs::InteractiveMarkerUpdate::UPDATE) ||
00074       (q_it_same_seq != update_queue.end() && q_it_same_seq->second->type == visualization_msgs::InteractiveMarkerUpdate::KEEP_ALIVE) )
00075   {
00076     initialized = true;
00077   }
00078   return initialized;
00079 }
00080 
00081 void InteractiveMarkerClient::PublisherContext::enqueueUpdate(const visualization_msgs::InteractiveMarkerUpdate::ConstPtr& update)
00082 {
00083   if( update->type == visualization_msgs::InteractiveMarkerUpdate::UPDATE )
00084   {
00085     update_queue[ update->seq_num ] = update;
00086   }
00087   else 
00088   {
00089     
00090     
00091     if( update_queue.find( update->seq_num ) == update_queue.end() )
00092     {
00093       update_queue[ update->seq_num ] = update;
00094     }
00095   }
00096   update_seen = true;
00097   last_update_seq_num = update->seq_num;
00098 }
00099 
00100 
00101 InteractiveMarkerClient::InteractiveMarkerClient( InteractiveMarkerReceiver* receiver )
00102   : cleared_( true )
00103   , subscribed_to_init_( false )
00104 {
00105   receiver_ = receiver;
00106 }
00107 
00108 
00109 void InteractiveMarkerClient::processMarkerInit(const visualization_msgs::InteractiveMarkerInit::ConstPtr& marker_init)
00110 {
00111   ROS_DEBUG("InteractiveMarkerClient: %s INIT %lu",
00112             marker_init->server_id.c_str(),
00113             marker_init->seq_num);
00114 
00115   
00116   if ( marker_init->server_id.empty() )
00117   {
00118     receiver_->setStatusError( "Topic", "server_id is empty!");
00119   }
00120 
00121   M_PublisherContext::iterator context_it = publisher_contexts_.find(marker_init->server_id);
00122 
00123   
00124   if ( context_it == publisher_contexts_.end() )
00125   {
00126     PublisherContextPtr pc(new PublisherContext());
00127     context_it = publisher_contexts_.insert( std::make_pair(marker_init->server_id,pc) ).first;
00128   }
00129 
00130   PublisherContextPtr context = context_it->second;
00131 
00132   
00133   
00134   
00135   if( context->initialized )
00136   {
00137     return;
00138   }
00139 
00140   if( context->checkInitWith( marker_init ))
00141   {
00142     receiver_->processMarkerChanges( &marker_init->markers );
00143     cleared_ = false;
00144 
00145     context->last_init_seq_num = marker_init->seq_num;
00146     context->init_seen = true;
00147     context->last_update_time = ros::Time::now();
00148 
00149     
00150     receiver_->setStatusOk( context_it->first, "Initialization complete.");
00151 
00152     
00153     
00154     
00155     
00156     context->last_update_seq_num = context->last_init_seq_num;
00157 
00158     
00159     maybeUnsubscribeFromInit();
00160     
00161     playbackUpdateQueue( context );
00162   }
00163   else if( context->update_queue.empty() )
00164   {
00165     
00166     
00167     
00168     
00169     
00170     
00171     
00172     if( context->init_seen )
00173     {
00174       reinit();
00175     }
00176 
00177     receiver_->processMarkerChanges( &marker_init->markers );
00178     cleared_ = false;
00179 
00180     context->last_init_seq_num = marker_init->seq_num;
00181     context->init_seen = true;
00182     context->last_update_time = ros::Time::now();
00183   }
00184   
00185 }
00186 
00187 
00188 void InteractiveMarkerClient::playbackUpdateQueue( PublisherContextPtr& context )
00189 {
00190   uint64_t next_seq_needed = context->last_init_seq_num + 1;
00191 
00192   M_InteractiveMarkerUpdate::iterator update_it = context->update_queue.begin();
00193   for( ; update_it != context->update_queue.end(); update_it++ )
00194   {
00195     visualization_msgs::InteractiveMarkerUpdate::ConstPtr update = update_it->second;
00196 
00197     if( update->seq_num == next_seq_needed )
00198     {
00199       applyUpdate( update, context );
00200       next_seq_needed++;
00201     }
00202     else if( update->seq_num < next_seq_needed )
00203     {
00204       ROS_DEBUG("Ignoring unneeded queued update number %lu, looking for %lu.", update->seq_num, next_seq_needed);
00205     }
00206     else
00207     {
00208       ROS_ERROR("Found queued update number %lu, missed %lu.", update->seq_num, next_seq_needed);
00209     }
00210   }
00211   context->update_queue.clear();
00212 }
00213 
00214 void InteractiveMarkerClient::processMarkerUpdate(const visualization_msgs::InteractiveMarkerUpdate::ConstPtr& marker_update)
00215 {
00216 
00217 
00218 
00219 
00220 
00221   
00222   if ( marker_update->server_id.empty() )
00223   {
00224     receiver_->setStatusError( "Topic", "server_id is empty!");
00225   }
00226 
00227   M_PublisherContext::iterator context_it = publisher_contexts_.find(marker_update->server_id);
00228 
00229   
00230   
00231   
00232   if ( context_it == publisher_contexts_.end() )
00233   {
00234     PublisherContextPtr pc(new PublisherContext());
00235     
00236     pc->enqueueUpdate(marker_update);
00237 
00238     context_it = publisher_contexts_.insert( std::make_pair(marker_update->server_id,pc) ).first;
00239 
00240     reinit();
00241 
00242     
00243     
00244     return;
00245   }
00246 
00247   PublisherContextPtr context = context_it->second;
00248 
00249   if ( !context->initialized )
00250   {
00251     if( context->checkInitWith( marker_update ))
00252     {
00253       
00254       receiver_->setStatusOk( context_it->first, "Initialization complete.");
00255 
00256       
00257       
00258       
00259       
00260       context->last_update_seq_num = context->last_init_seq_num;
00261 
00262       
00263       maybeUnsubscribeFromInit();
00264     }
00265     else
00266     {
00267       
00268       
00269       receiver_->setStatusWarn( marker_update->server_id, "Received update or keep-alive without previous INIT message. It might be lost.");
00270       context->enqueueUpdate(marker_update);
00271       return;
00272     }
00273   }
00274 
00275   
00276   
00277   applyUpdate( marker_update, context );
00278 }
00279 
00280 void InteractiveMarkerClient::clear()
00281 {
00282   publisher_contexts_.clear();
00283   reinit();
00284 }
00285 
00286 void InteractiveMarkerClient::unsubscribedFromInit()
00287 {
00288   subscribed_to_init_ = false;
00289 }
00290 
00291 void InteractiveMarkerClient::reinit()
00292 {
00293   if( !cleared_ )
00294   {
00295     receiver_->clearMarkers();
00296     cleared_ = true;
00297   }
00298   if( !subscribed_to_init_ )
00299   {
00300     subscribed_to_init_ = receiver_->subscribeToInit();
00301   }
00302 
00303   
00304   M_PublisherContext::iterator context_it;
00305   for( context_it = publisher_contexts_.begin(); context_it != publisher_contexts_.end(); context_it++ )
00306   {
00307     context_it->second->initialized = false;
00308   }
00309 }
00310 
00311 
00312 void InteractiveMarkerClient::maybeUnsubscribeFromInit() {
00313   M_PublisherContext::iterator context_it;
00314   for( context_it = publisher_contexts_.begin(); context_it != publisher_contexts_.end(); context_it++ )
00315   {
00316     if( !context_it->second->initialized )
00317     {
00318       return;
00319     }
00320   }
00321   receiver_->unsubscribeFromInit();
00322   subscribed_to_init_ = false;
00323 }
00324 
00325 void InteractiveMarkerClient::applyUpdate( const visualization_msgs::InteractiveMarkerUpdate::ConstPtr& marker_update,
00326                                            PublisherContextPtr& context )
00327 {
00328   uint64_t expected_seq_num = 0;
00329 
00330   switch ( marker_update->type )
00331   {
00332     case visualization_msgs::InteractiveMarkerUpdate::UPDATE:
00333       expected_seq_num = context->last_update_seq_num + 1;
00334       break;
00335 
00336     case visualization_msgs::InteractiveMarkerUpdate::KEEP_ALIVE:
00337       expected_seq_num = context->last_update_seq_num;
00338       break;
00339   }
00340 
00341   if ( marker_update->seq_num != expected_seq_num )
00342   {
00343     if( marker_update->seq_num < expected_seq_num )
00344     {
00345       ROS_INFO("Received sequence number %lu, less than expected sequence number %lu. Ignoring.",
00346                marker_update->seq_num, expected_seq_num);
00347       return;
00348     }
00349 
00350     
00351     std::ostringstream s;
00352     s << "Detected lost update or server restart. Resetting. Reason: Received wrong sequence number (expected: " <<
00353         expected_seq_num << ", received: " << marker_update->seq_num << ")";
00354     receiver_->setStatusError( marker_update->server_id, s.str());
00355     reinit();
00356     return;
00357   }
00358 
00359   context->last_update_seq_num = marker_update->seq_num;
00360   context->update_seen = true;
00361   context->last_update_time = ros::Time::now();
00362 
00363   if( marker_update->type == visualization_msgs::InteractiveMarkerUpdate::UPDATE )
00364   {
00365     receiver_->processMarkerChanges( &marker_update->markers, &marker_update->poses, &marker_update->erases );
00366     cleared_ = false;
00367   }
00368 }
00369 
00370 bool InteractiveMarkerClient::isPublisherListEmpty()
00371 {
00372   return publisher_contexts_.empty();
00373 }
00374 
00375 void InteractiveMarkerClient::flagLateConnections()
00376 {
00377   M_PublisherContext::iterator it;
00378   for ( it = publisher_contexts_.begin(); it != publisher_contexts_.end(); it++ )
00379   {
00380     PublisherContextPtr& context = it->second;
00381     double time_since_last_update = (ros::Time::now() - context->last_update_time).toSec();
00382     if ( time_since_last_update > 1.0 )
00383     {
00384       std::stringstream s;
00385       s << "No update received for " << (int)time_since_last_update << " seconds. Connection might be lost.";
00386       receiver_->setStatusWarn( it->first, s.str() );
00387       context->update_time_ok = false;
00388     }
00389     if ( !context->update_time_ok && time_since_last_update <= 1.0 )
00390     {
00391       receiver_->setStatusOk( it->first, "OK" );
00392     }
00393   }
00394 }
00395 
00396 }