connection_factory.hpp
Go to the documentation of this file.
00001 /* 
00002  *  connection_factory.hpp - micros connection factory
00003  *  Copyright (C) 2015 Zaile Jiang
00004  *  
00005  *  This program is free software; you can redistribute it and/or
00006  *  modify it under the terms of the GNU General Public License
00007  *  as published by the Free Software Foundation; either version 2
00008  *  of the License, or (at your option) any later version.
00009  *  
00010  *  This program is distributed in the hope that it will be useful,
00011  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  *  GNU General Public License for more details.
00014  *  
00015  *  You should have received a copy of the GNU General Public License
00016  *  along with this program; if not, write to the Free Software
00017  *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
00018 */
00019 #ifndef MICROSRTT_CONN_FACTORY_HPP
00020 #define MICROSRTT_CONN_FACTORY_HPP
00021 
00022 #include "ros/ros.h"
00023 #include "micros_rtt/connection_base.h"
00024 #include "micros_rtt/oro/channel_element.hpp"
00025 #include "micros_rtt/oro/mq_channel_element.hpp"
00026 #include "micros_rtt/oro/conn_input_endpoint.hpp"
00027 #include "micros_rtt/oro/conn_output_endpoint.hpp"
00028 #include "micros_rtt/oro/data_lockfree.hpp"
00029 
00030 namespace micros_rtt
00031 {
00032 
00039 class ConnFactory
00040 {
00041 public:
00042   ConnFactory() {}
00043   ~ConnFactory() {}
00044 
00045   template<typename M>
00046   static ChannelElementBase* buildDataStorage(const M& initial_value = M())
00047   {
00048     typename DataObjectLockFree<M>::shared_ptr data_object;
00049     data_object.reset( new DataObjectLockFree<M>(initial_value) );
00050     ChannelDataElement<M>* result = new ChannelDataElement<M>(data_object);
00051     return result;
00052   }
00053 
00054   template<typename M>
00055   static ChannelElementBase::shared_ptr buildChannelInput(ConnectionBasePtr publication, ChannelElementBase::shared_ptr output_channel)
00056   {
00057     ChannelElementBase::shared_ptr endpoint = new ConnInputEndpoint<M>(publication);
00058     if (output_channel)
00059       endpoint->setOutput(output_channel);
00060     return endpoint;
00061   }
00062 
00063   template<typename M>
00064   static ChannelElementBase::shared_ptr buildChannelOutput(ConnectionBasePtr subscription)
00065   {
00066     ChannelElementBase::shared_ptr endpoint = new ConnOutputEndpoint<M>(subscription);
00067     return endpoint;
00068   }
00069   
00070   template<typename M>
00071   static ChannelElementBase::shared_ptr buildBufferedChannelOutput(ConnectionBasePtr subscription, M const& initial_value = M() )
00072   {
00073     ChannelElementBase::shared_ptr endpoint = new ConnOutputEndpoint<M>(subscription);
00074     ChannelElementBase::shared_ptr data_object = buildDataStorage<M>(initial_value);
00075     data_object->setOutput(endpoint);
00076     return data_object;
00077   }
00078 
00079   template<typename M>
00080   static bool createConnection(ConnectionBasePtr publication, ConnectionBasePtr subscription)
00081   {
00082     // This is the input channel element of the output half
00083     ChannelElementBase::shared_ptr output_half = 0;
00084     // local ports, create buffer here.
00085     output_half = buildBufferedChannelOutput<M>(subscription);
00086 
00087     if (!output_half)
00088     {
00089       ROS_WARN("micros connection factory fail to build buffered channel output of topic:%s.", publication->getTopic().c_str());
00090       return false;
00091     }
00092 
00093     // Since output is local, buildChannelInput is local as well.
00094     // This this the input channel element of the whole connection
00095     ChannelElementBase::shared_ptr channel_input =
00096       buildChannelInput<M>(publication, output_half);
00097     ROS_DEBUG("micros connection factory has build all channel elements topic:%s needed, ready to check the connection.", publication->getTopic().c_str());
00098 
00099     return createAndCheckConnection(publication, subscription, channel_input);
00100   }
00101 
00102   template<class M>
00103   static bool createStream(ConnectionBasePtr connection, bool is_sender)
00104   {
00105     ChannelElementBase::shared_ptr chan;
00106     if (is_sender)
00107     {
00108       ROS_DEBUG("micros connection factory creating publication stream.");
00109       chan = buildChannelInput<M>(connection, ChannelElementBase::shared_ptr() );
00110         
00111       ChannelElementBase::shared_ptr chan_stream = createMqStream<M>(connection, true);
00112       if ( !chan_stream ) 
00113       {
00114         ROS_WARN("micros connection factory failed to create channel stream  for publication:%s", connection->getTopic().c_str());
00115         return false;
00116       }
00117       
00118       chan->setOutput( chan_stream );
00119       return createAndCheckStream(connection, chan, true);
00120     }
00121     else
00122     {
00123       ROS_DEBUG("micros connection factory creating subscription stream.");
00124       chan = buildChannelOutput<M>(connection);
00125       
00126       // note: don't refcount this final input chan, because no one will
00127       // take a reference to it. It would be destroyed upon return of this function.
00128       ChannelElementBase::shared_ptr chan_stream = createMqStream<M>(connection, false);
00129       
00130       if ( !chan_stream ) 
00131       {
00132         ROS_WARN("micros connection factory failed to create channel stream for subscription:%s", connection->getTopic().c_str());
00133         return false;
00134       }
00135     
00136       chan_stream->getOutputEndPoint()->setOutput( chan );
00137       return createAndCheckStream(connection, chan_stream, false);
00138     }
00139     
00140   }
00141 
00142 protected:
00143   static bool createAndCheckConnection(ConnectionBasePtr publication, 
00144           ConnectionBasePtr subscription, ChannelElementBase::shared_ptr channel_input);
00145 
00146   static bool createAndCheckStream(ConnectionBasePtr connection, 
00147           ChannelElementBase::shared_ptr chan, bool is_sender);
00148   
00149   template<typename M>
00150   static ChannelElementBase::shared_ptr createMqStream(ConnectionBasePtr connection, bool is_sender) 
00151   {
00152     try
00153     {
00154       ChannelElementBase::shared_ptr mq = new MQChannelElement<M>(connection, is_sender);
00155       if (!is_sender)
00156       {
00157         //now we get message directly from message queue.
00158         //in the future, the receiver will have a buffer to store his messages in.
00159         //ChannelElementBase::shared_ptr buf = buildDataStorage<M>();
00160         //mq->setOutput(buf);
00161       }
00162       return mq;
00163     }
00164     catch(std::exception& e)
00165     {
00166       ROS_INFO("micros connection factory failed to create MQueue Channel element for topic:%s", connection->getTopic().c_str());
00167     }
00168     return ChannelElementBase::shared_ptr();
00169   } 
00170 
00171 };
00172 
00173 }
00174 #endif
00175 


micros_rtt
Author(s): Zaile Jiang , Xiaodong Yi , Minglong Li
autogenerated on Sat Jun 8 2019 19:02:21