single_client.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2011, Willow Garage, Inc.
00003  * All rights reserved.
00004  *
00005  * Redistribution and use in source and binary forms, with or without
00006  * modification, are permitted provided that the following conditions are met:
00007  *
00008  *     * Redistributions of source code must retain the above copyright
00009  *       notice, this list of conditions and the following disclaimer.
00010  *     * Redistributions in binary form must reproduce the above copyright
00011  *       notice, this list of conditions and the following disclaimer in the
00012  *       documentation and/or other materials provided with the distribution.
00013  *     * Neither the name of the Willow Garage, Inc. nor the names of its
00014  *       contributors may be used to endorse or promote products derived from
00015  *       this software without specific prior written permission.
00016  *
00017  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027  * POSSIBILITY OF SUCH DAMAGE.
00028  *
00029  * Author: David Gossow
00030  */
00031 
00032 #include "interactive_markers/detail/single_client.h"
00033 
00034 #include <boost/bind.hpp>
00035 #include <boost/make_shared.hpp>
00036 
00037 #define DBG_MSG( ... ) ROS_DEBUG( __VA_ARGS__ );
00038 //#define DBG_MSG( ... ) printf("   "); printf( __VA_ARGS__ ); printf("\n");
00039 
00040 namespace interactive_markers
00041 {
00042 
00043 SingleClient::SingleClient(
00044     const std::string& server_id,
00045     tf::Transformer& tf,
00046     const std::string& target_frame,
00047     const InteractiveMarkerClient::CbCollection& callbacks
00048 )
00049 : state_(server_id,INIT)
00050 , first_update_seq_num_(-1)
00051 , last_update_seq_num_(-1)
00052 , tf_(tf)
00053 , target_frame_(target_frame)
00054 , callbacks_(callbacks)
00055 , server_id_(server_id)
00056 , warn_keepalive_(false)
00057 {
00058   callbacks_.statusCb( InteractiveMarkerClient::OK, server_id_, "Waiting for init message." );
00059 }
00060 
00061 SingleClient::~SingleClient()
00062 {
00063   callbacks_.resetCb( server_id_ );
00064 }
00065 
00066 void SingleClient::process(const visualization_msgs::InteractiveMarkerInit::ConstPtr& msg, bool enable_autocomplete_transparency)
00067 {
00068   DBG_MSG( "%s: received init #%lu", server_id_.c_str(), msg->seq_num );
00069 
00070   switch (state_)
00071   {
00072   case INIT:
00073     if ( init_queue_.size() > 5 )
00074     {
00075       DBG_MSG( "Init queue too large. Erasing init message with id %lu.", init_queue_.begin()->msg->seq_num );
00076       init_queue_.pop_back();
00077     }
00078     init_queue_.push_front( InitMessageContext(tf_, target_frame_, msg, enable_autocomplete_transparency ) );
00079     callbacks_.statusCb( InteractiveMarkerClient::OK, server_id_, "Init message received." );
00080     break;
00081 
00082   case RECEIVING:
00083   case TF_ERROR:
00084     break;
00085   }
00086 }
00087 
00088 void SingleClient::process(const visualization_msgs::InteractiveMarkerUpdate::ConstPtr& msg, bool enable_autocomplete_transparency)
00089 {
00090   if ( first_update_seq_num_ == (uint64_t)-1 )
00091   {
00092     first_update_seq_num_ = msg->seq_num;
00093   }
00094 
00095   last_update_time_ = ros::Time::now();
00096 
00097   if ( msg->type == msg->KEEP_ALIVE )
00098   {
00099     DBG_MSG( "%s: received keep-alive #%lu", server_id_.c_str(), msg->seq_num );
00100     if (last_update_seq_num_ != (uint64_t)-1 && msg->seq_num != last_update_seq_num_ )
00101     {
00102       std::ostringstream s;
00103       s << "Sequence number of update is out of order. Expected: " << last_update_seq_num_ << " Received: " << msg->seq_num;
00104       errorReset( s.str() );
00105       return;
00106     }
00107     last_update_seq_num_ = msg->seq_num;
00108     return;
00109   }
00110   else
00111   {
00112     DBG_MSG( "%s: received update #%lu", server_id_.c_str(), msg->seq_num );
00113     if (last_update_seq_num_ != (uint64_t)-1 && msg->seq_num != last_update_seq_num_+1 )
00114     {
00115       std::ostringstream s;
00116       s << "Sequence number of update is out of order. Expected: " << last_update_seq_num_+1 << " Received: " << msg->seq_num;
00117       errorReset( s.str() );
00118       return;
00119     }
00120     last_update_seq_num_ = msg->seq_num;
00121   }
00122 
00123   switch (state_)
00124   {
00125   case INIT:
00126     if ( update_queue_.size() > 100 )
00127     {
00128       DBG_MSG( "Update queue too large. Erasing update message with id %lu.", update_queue_.begin()->msg->seq_num );
00129       update_queue_.pop_back();
00130     }
00131     update_queue_.push_front( UpdateMessageContext(tf_, target_frame_, msg, enable_autocomplete_transparency) );
00132     break;
00133 
00134   case RECEIVING:
00135     update_queue_.push_front( UpdateMessageContext(tf_, target_frame_, msg, enable_autocomplete_transparency) );
00136     break;
00137 
00138   case TF_ERROR:
00139     break;
00140   }
00141 }
00142 
00143 void SingleClient::update()
00144 {
00145   switch (state_)
00146   {
00147   case INIT:
00148     transformInitMsgs();
00149     transformUpdateMsgs();
00150     checkInitFinished();
00151     break;
00152 
00153   case RECEIVING:
00154     transformUpdateMsgs();
00155     pushUpdates();
00156     checkKeepAlive();
00157     if ( update_queue_.size() > 100 )
00158     {
00159       errorReset( "Update queue overflow. Resetting connection." );
00160     }
00161     break;
00162 
00163   case TF_ERROR:
00164     if ( state_.getDuration().toSec() > 1.0 )
00165     {
00166       callbacks_.statusCb( InteractiveMarkerClient::ERROR, server_id_, "1 second has passed. Re-initializing." );
00167       state_ = INIT;
00168     }
00169     break;
00170   }
00171 }
00172 
00173 void SingleClient::checkKeepAlive()
00174 {
00175   double time_since_upd = (ros::Time::now() - last_update_time_).toSec();
00176   if ( time_since_upd > 2.0 )
00177   {
00178     std::ostringstream s;
00179     s << "No update received for " << round(time_since_upd) << " seconds.";
00180     callbacks_.statusCb( InteractiveMarkerClient::WARN, server_id_, s.str() );
00181     warn_keepalive_ = true;
00182   }
00183   else if ( warn_keepalive_ )
00184   {
00185     warn_keepalive_ = false;
00186     callbacks_.statusCb( InteractiveMarkerClient::OK, server_id_, "OK" );
00187   }
00188 }
00189 
00190 void SingleClient::checkInitFinished()
00191 {
00192   // check for all init messages received so far if tf info is ready
00193   // and the consecutive update exists.
00194   // If so, omit all updates with lower sequence number,
00195   // switch to RECEIVING mode and treat the init message like a regular update.
00196 
00197   if (last_update_seq_num_ == (uint64_t)-1)
00198   {
00199     callbacks_.statusCb( InteractiveMarkerClient::OK, server_id_, "Initialization: Waiting for first update/keep-alive message." );
00200     return;
00201   }
00202 
00203   M_InitMessageContext::iterator init_it;
00204   for ( init_it = init_queue_.begin(); init_it!=init_queue_.end(); ++init_it )
00205   {
00206     uint64_t init_seq_num = init_it->msg->seq_num;
00207     bool next_up_exists = init_seq_num >= first_update_seq_num_ && init_seq_num <= last_update_seq_num_;
00208 
00209     if ( !init_it->isReady() )
00210     {
00211       callbacks_.statusCb( InteractiveMarkerClient::OK, server_id_, "Initialization: Waiting for tf info." );
00212     }
00213     else if ( next_up_exists )
00214     {
00215       DBG_MSG( "Init message with seq_id=%lu is ready & in line with updates. Switching to receive mode.", init_seq_num );
00216       while ( !update_queue_.empty() && update_queue_.back().msg->seq_num <= init_seq_num )
00217       {
00218         DBG_MSG( "Omitting update with seq_id=%lu", update_queue_.back().msg->seq_num );
00219         update_queue_.pop_back();
00220       }
00221 
00222       callbacks_.initCb( init_it->msg );
00223       callbacks_.statusCb( InteractiveMarkerClient::OK, server_id_, "Receiving updates." );
00224 
00225       init_queue_.clear();
00226       state_ = RECEIVING;
00227 
00228       pushUpdates();
00229       break;
00230     }
00231   }
00232 }
00233 
00234 void SingleClient::transformInitMsgs()
00235 {
00236   M_InitMessageContext::iterator it;
00237   for ( it = init_queue_.begin(); it!=init_queue_.end(); )
00238   {
00239     try
00240     {
00241       it->getTfTransforms();
00242     }
00243     catch ( std::runtime_error& e )
00244     {
00245       // we want to notify the user, but also keep the init message
00246       // in case it is the only one we will receive.
00247       std::ostringstream s;
00248       s << "Cannot get tf info for init message with sequence number " << it->msg->seq_num << ". Error: " << e.what();
00249       callbacks_.statusCb( InteractiveMarkerClient::WARN, server_id_, s.str() );
00250     }
00251     ++it;
00252   }
00253 }
00254 
00255 void SingleClient::transformUpdateMsgs( )
00256 {
00257   M_UpdateMessageContext::iterator it;
00258   for ( it = update_queue_.begin(); it!=update_queue_.end(); ++it )
00259   {
00260     try
00261     {
00262       it->getTfTransforms();
00263     }
00264     catch ( std::runtime_error& e )
00265     {
00266       std::ostringstream s;
00267       s << "Resetting due to tf error: " << e.what();
00268       errorReset( s.str() );
00269       return;
00270     }
00271     catch ( ... )
00272     {
00273       std::ostringstream s;
00274       s << "Resetting due to unknown exception";
00275       errorReset( s.str() );
00276     }
00277   }
00278 }
00279 
00280 void SingleClient::errorReset( std::string error_msg )
00281 {
00282   // if we get an error here, we re-initialize everything
00283   state_ = TF_ERROR;
00284   update_queue_.clear();
00285   init_queue_.clear();
00286   first_update_seq_num_ = -1;
00287   last_update_seq_num_ = -1;
00288   warn_keepalive_ = false;
00289 
00290   callbacks_.statusCb( InteractiveMarkerClient::ERROR, server_id_, error_msg );
00291   callbacks_.resetCb( server_id_ );
00292 }
00293 
00294 void SingleClient::pushUpdates()
00295 {
00296   if( !update_queue_.empty() && update_queue_.back().isReady() )
00297   {
00298     callbacks_.statusCb( InteractiveMarkerClient::OK, server_id_, "OK" );
00299   }
00300   while( !update_queue_.empty() && update_queue_.back().isReady() )
00301   {
00302     DBG_MSG("Pushing out update #%lu.", update_queue_.back().msg->seq_num );
00303     callbacks_.updateCb( update_queue_.back().msg );
00304     update_queue_.pop_back();
00305   }
00306 }
00307 
00308 bool SingleClient::isInitialized()
00309 {
00310   return (state_ != INIT);
00311 }
00312 
00313 }
00314 


interactive_markers
Author(s): David Gossow
autogenerated on Fri Aug 28 2015 11:11:26