Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #ifndef MICROSRTT_CONN_FACTORY_HPP
00020 #define MICROSRTT_CONN_FACTORY_HPP
00021
00022 #include "ros/ros.h"
00023 #include "micros_rtt/connection_base.h"
00024 #include "micros_rtt/oro/channel_element.hpp"
00025 #include "micros_rtt/oro/mq_channel_element.hpp"
00026 #include "micros_rtt/oro/conn_input_endpoint.hpp"
00027 #include "micros_rtt/oro/conn_output_endpoint.hpp"
00028 #include "micros_rtt/oro/data_lockfree.hpp"
00029
00030 namespace micros_rtt
00031 {
00032
00039 class ConnFactory
00040 {
00041 public:
00042 ConnFactory() {}
00043 ~ConnFactory() {}
00044
00045 template<typename M>
00046 static ChannelElementBase* buildDataStorage(const M& initial_value = M())
00047 {
00048 typename DataObjectLockFree<M>::shared_ptr data_object;
00049 data_object.reset( new DataObjectLockFree<M>(initial_value) );
00050 ChannelDataElement<M>* result = new ChannelDataElement<M>(data_object);
00051 return result;
00052 }
00053
00054 template<typename M>
00055 static ChannelElementBase::shared_ptr buildChannelInput(ConnectionBasePtr publication, ChannelElementBase::shared_ptr output_channel)
00056 {
00057 ChannelElementBase::shared_ptr endpoint = new ConnInputEndpoint<M>(publication);
00058 if (output_channel)
00059 endpoint->setOutput(output_channel);
00060 return endpoint;
00061 }
00062
00063 template<typename M>
00064 static ChannelElementBase::shared_ptr buildChannelOutput(ConnectionBasePtr subscription)
00065 {
00066 ChannelElementBase::shared_ptr endpoint = new ConnOutputEndpoint<M>(subscription);
00067 return endpoint;
00068 }
00069
00070 template<typename M>
00071 static ChannelElementBase::shared_ptr buildBufferedChannelOutput(ConnectionBasePtr subscription, M const& initial_value = M() )
00072 {
00073 ChannelElementBase::shared_ptr endpoint = new ConnOutputEndpoint<M>(subscription);
00074 ChannelElementBase::shared_ptr data_object = buildDataStorage<M>(initial_value);
00075 data_object->setOutput(endpoint);
00076 return data_object;
00077 }
00078
00079 template<typename M>
00080 static bool createConnection(ConnectionBasePtr publication, ConnectionBasePtr subscription)
00081 {
00082
00083 ChannelElementBase::shared_ptr output_half = 0;
00084
00085 output_half = buildBufferedChannelOutput<M>(subscription);
00086
00087 if (!output_half)
00088 {
00089 ROS_WARN("micros connection factory fail to build buffered channel output of topic:%s.", publication->getTopic().c_str());
00090 return false;
00091 }
00092
00093
00094
00095 ChannelElementBase::shared_ptr channel_input =
00096 buildChannelInput<M>(publication, output_half);
00097 ROS_DEBUG("micros connection factory has build all channel elements topic:%s needed, ready to check the connection.", publication->getTopic().c_str());
00098
00099 return createAndCheckConnection(publication, subscription, channel_input);
00100 }
00101
00102 template<class M>
00103 static bool createStream(ConnectionBasePtr connection, bool is_sender)
00104 {
00105 ChannelElementBase::shared_ptr chan;
00106 if (is_sender)
00107 {
00108 ROS_DEBUG("micros connection factory creating publication stream.");
00109 chan = buildChannelInput<M>(connection, ChannelElementBase::shared_ptr() );
00110
00111 ChannelElementBase::shared_ptr chan_stream = createMqStream<M>(connection, true);
00112 if ( !chan_stream )
00113 {
00114 ROS_WARN("micros connection factory failed to create channel stream for publication:%s", connection->getTopic().c_str());
00115 return false;
00116 }
00117
00118 chan->setOutput( chan_stream );
00119 return createAndCheckStream(connection, chan, true);
00120 }
00121 else
00122 {
00123 ROS_DEBUG("micros connection factory creating subscription stream.");
00124 chan = buildChannelOutput<M>(connection);
00125
00126
00127
00128 ChannelElementBase::shared_ptr chan_stream = createMqStream<M>(connection, false);
00129
00130 if ( !chan_stream )
00131 {
00132 ROS_WARN("micros connection factory failed to create channel stream for subscription:%s", connection->getTopic().c_str());
00133 return false;
00134 }
00135
00136 chan_stream->getOutputEndPoint()->setOutput( chan );
00137 return createAndCheckStream(connection, chan_stream, false);
00138 }
00139
00140 }
00141
00142 protected:
00143 static bool createAndCheckConnection(ConnectionBasePtr publication,
00144 ConnectionBasePtr subscription, ChannelElementBase::shared_ptr channel_input);
00145
00146 static bool createAndCheckStream(ConnectionBasePtr connection,
00147 ChannelElementBase::shared_ptr chan, bool is_sender);
00148
00149 template<typename M>
00150 static ChannelElementBase::shared_ptr createMqStream(ConnectionBasePtr connection, bool is_sender)
00151 {
00152 try
00153 {
00154 ChannelElementBase::shared_ptr mq = new MQChannelElement<M>(connection, is_sender);
00155 if (!is_sender)
00156 {
00157
00158
00159
00160
00161 }
00162 return mq;
00163 }
00164 catch(std::exception& e)
00165 {
00166 ROS_INFO("micros connection factory failed to create MQueue Channel element for topic:%s", connection->getTopic().c_str());
00167 }
00168 return ChannelElementBase::shared_ptr();
00169 }
00170
00171 };
00172
00173 }
00174 #endif
00175