ConnFactory.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Oct 22 11:59:08 CEST 2009 ConnFactory.cpp
3 
4  ConnFactory.cpp - description
5  -------------------
6  begin : Thu October 22 2009
7  copyright : (C) 2009 Peter Soetens
8  email : peter@thesourcworks.com
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 #include "../Port.hpp"
40 #include "ConnFactory.hpp"
41 #include "../base/InputPortInterface.hpp"
42 #include "../DataFlowInterface.hpp"
43 #include "../types/TypeMarshaller.hpp"
44 
45 using namespace std;
46 using namespace RTT;
47 using namespace RTT::internal;
48 
49 bool LocalConnID::isSameID(ConnID const& id) const
50 {
51  LocalConnID const* real_id = dynamic_cast<LocalConnID const*>(&id);
52  if (!real_id)
53  return false;
54  else return real_id->ptr == this->ptr;
55 }
56 
57 ConnID* LocalConnID::clone() const {
58  return new LocalConnID(this->ptr);
59 }
60 
61 bool StreamConnID::isSameID(ConnID const& id) const
62 {
63  StreamConnID const* real_id = dynamic_cast<StreamConnID const*>(&id);
64  if (!real_id)
65  return false;
66  else return real_id->name_id == this->name_id;
67 }
68 
69 ConnID* StreamConnID::clone() const {
70  return new StreamConnID(this->name_id);
71 }
72 
74 {
75  // Remote connection
76  // if the policy's transport is set to zero, use the input ports server protocol,
77  // otherwise, use the policy's protocol
78  int transport = policy.transport == 0 ? input_port.serverProtocol() : policy.transport;
79  types::TypeInfo const* type_info = output_port.getTypeInfo();
80  if (!type_info || input_port.getTypeInfo() != type_info)
81  {
82  log(Error) << "Type of port " << output_port.getName() << " is not registered into the type system, cannot marshal it into the right transporter" << endlog();
83  // There is no type info registered for this type
85  }
86  else if ( !type_info->getProtocol( transport ) )
87  {
88  log(Error) << "Type " << type_info->getTypeName() << " cannot be marshalled into the requested transporter (id:"<< transport<<")." << endlog();
89  // This type cannot be marshalled into the right transporter
91  }
92  else
93  {
94  return input_port.
95  buildRemoteChannelOutput(output_port, type_info, input_port, policy);
96  }
98 }
99 
100 bool ConnFactory::createAndCheckConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, base::ChannelElementBase::shared_ptr channel_input, base::ChannelElementBase::shared_ptr channel_output, ConnPolicy const& policy) {
101  // connect channel input to channel output
102  if (!channel_input->connectTo(channel_output, policy.mandatory)) {
103  channel_input->disconnect(channel_output, true);
104  channel_output->disconnect(channel_input, false);
105  return false;
106  }
107 
108  // Register the channel's input to the output port.
109  // This is a bit hacky. We have to find the next channel element in the pipeline as seen from the ConnOutputEndpoint:
110  base::ChannelElementBase::shared_ptr next_hop = channel_output;
111  if (channel_input != output_port.getEndpoint()) {
112  next_hop = channel_input;
113  while(next_hop->getInput() && next_hop->getInput() != output_port.getEndpoint()) {
114  next_hop = next_hop->getInput();
115  }
116  }
117  if ( !output_port.addConnection( input_port.getPortID(), next_hop, policy ) ) {
118  // setup failed.
119  log(Error) << "The output port "<< output_port.getName()
120  << " could not successfully use the connection to input port " << input_port.getName() <<endlog();
121  channel_input->disconnect(channel_output, true);
122  return false;
123  }
124 
125  // Notify input that the connection is now complete and test the connection
126  if ( !channel_output->channelReady( channel_input, policy, output_port.getPortID() ) ) {
127  log(Error) << "The input port "<< input_port.getName()
128  << " could not successfully read from the connection from output port " << output_port.getName() <<endlog();
129  output_port.disconnect( &input_port );
130  channel_output->disconnect(channel_input, false);
131  return false;
132  }
133 
134  log(Debug) << "Connected output port "<< output_port.getName()
135  << " successfully to " << input_port.getName() <<endlog();
136  return true;
137 }
138 
139 base::ChannelElementBase::shared_ptr ConnFactory::createAndCheckStream(base::OutputPortInterface& output_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr channel_input, StreamConnID* conn_id) {
140  if (policy.transport == 0 ) {
141  log(Error) << "Need a transport for creating streams." <<endlog();
143  }
144  const types::TypeInfo* type = output_port.getTypeInfo();
145  if ( type->getProtocol(policy.transport) == 0 ) {
146  log(Error) << "Could not create transport stream for port "<< output_port.getName() << " with transport id " << policy.transport <<endlog();
147  log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
149  }
150  types::TypeMarshaller* ttt = dynamic_cast<types::TypeMarshaller*> ( type->getProtocol(policy.transport) );
151  if (ttt) {
152  int size_hint = ttt->getSampleSize( output_port.getDataSource() );
153  policy.data_size = size_hint;
154  } else {
155  log(Debug) <<"Could not determine sample size for type " << type->getTypeName() << endlog();
156  }
157  RTT::base::ChannelElementBase::shared_ptr chan_stream = type->getProtocol(policy.transport)->createStream(&output_port, policy, /* is_sender = */ true);
158 
159  if ( !chan_stream ) {
160  log(Error) << "Transport failed to create remote channel for output stream of port "<<output_port.getName() << endlog();
162  }
163 
164  conn_id->name_id = policy.name_id;
165  channel_input->connectTo( chan_stream, policy.mandatory );
166 
167  if ( !output_port.addConnection( conn_id, chan_stream, policy ) ) {
168  // setup failed: manual cleanup.
169  channel_input->disconnect( chan_stream, true );
170  log(Error) << "Failed to create output stream for output port "<< output_port.getName() <<endlog();
172  }
173 
174  log(Info) << "Created output stream for output port "<< output_port.getName() <<endlog();
175  return chan_stream;
176 }
177 
178 base::ChannelElementBase::shared_ptr ConnFactory::createAndCheckStream(base::InputPortInterface& input_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr outhalf, StreamConnID* conn_id) {
179  if (policy.transport == 0 ) {
180  log(Error) << "Need a transport for creating streams." <<endlog();
182  }
183  const types::TypeInfo* type = input_port.getTypeInfo();
184  if ( type->getProtocol(policy.transport) == 0 ) {
185  log(Error) << "Could not create transport stream for port "<< input_port.getName() << " with transport id " << policy.transport <<endlog();
186  log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
188  }
189 
190  // note: don't refcount this final input chan, because no one will
191  // take a reference to it. It would be destroyed upon return of this function.
192  RTT::base::ChannelElementBase::shared_ptr chan = type->getProtocol(policy.transport)->createStream(&input_port, policy, /* is_sender = */ false);
193 
194  if ( !chan ) {
195  log(Error) << "Transport failed to create remote channel for input stream of port " << input_port.getName() << endlog();
197  }
198 
199  chan = chan->getOutputEndPoint();
200  conn_id->name_id = policy.name_id;
201 
202  chan->connectTo( outhalf, policy.mandatory );
203  if ( !outhalf->channelReady(chan, policy, conn_id) ) {
204  // setup failed: manual cleanup.
205  chan->disconnect(true);
206  log(Error) << "Failed to create input stream for input port " << input_port.getName() <<endlog();
208  }
209 
210  log(Info) << "Created input stream for input port " << input_port.getName() <<endlog();
211  return chan;
212 }
213 
214 bool ConnFactory::createSharedConnection(base::OutputPortInterface* output_port, base::InputPortInterface* input_port, SharedConnectionBase::shared_ptr shared_connection, ConnPolicy const& policy)
215 {
216  PortConnectionLock lock_output_port(output_port);
217  PortConnectionLock lock_input_port(input_port);
218  return createAndCheckSharedConnection(output_port, input_port, shared_connection, policy);
219 }
220 
221 bool ConnFactory::createAndCheckSharedConnection(base::OutputPortInterface* output_port, base::InputPortInterface* input_port, SharedConnectionBase::shared_ptr shared_connection, ConnPolicy const& policy)
222 {
223  if (!shared_connection) return false;
224 
225  // check if the found connection is compatible to the requested policy
226  if (
227  (policy.buffer_policy != Shared) ||
228  (shared_connection->getConnPolicy()->type != policy.type) ||
229  (shared_connection->getConnPolicy()->size != policy.size) ||
230  (shared_connection->getConnPolicy()->lock_policy != policy.lock_policy)
231  )
232  {
233  log(Error) << "You mixed incompatible connection policies for shared connection '" << shared_connection->getName() << "': "
234  << "The new connection requests a " << policy << " connection, "
235  << "but the existing connection is of type " << *(shared_connection->getConnPolicy()) << "." << endlog();
236  return false;
237  }
238 
239  // set name_id in ConnPolicy (mutable field)
240  policy.name_id = shared_connection->getName();
241 
242  // connect the output port...
243  if (output_port && output_port->getSharedConnection() != shared_connection) {
244  if ( !output_port->addConnection( shared_connection->getConnID(), shared_connection, policy ) ) {
245  // setup failed.
246  log(Error) << "The output port "<< output_port->getName()
247  << " could not successfully connect to shared connection '" << shared_connection->getName() << "'." << endlog();
248  return false;
249  }
250 
251  output_port->getEndpoint()->connectTo(shared_connection, policy.mandatory);
252  }
253 
254  // ... and the input port
255  if (input_port && input_port->isLocal() && input_port->getSharedConnection() != shared_connection) {
256  if ( !input_port->addConnection( shared_connection->getConnID(), shared_connection, policy ) ) {
257  // setup failed.
258  log(Error) << "The input port "<< input_port->getName()
259  << " could not successfully connect to shared connection '" << shared_connection->getName() << "'." << endlog();
260  return false;
261  }
262 
263  shared_connection->connectTo(input_port->getEndpoint(), policy.mandatory);
264  }
265 
266  return true;
267 }
268 
269 bool ConnFactory::findSharedConnection(base::OutputPortInterface *output_port, base::InputPortInterface *input_port, ConnPolicy const& policy, SharedConnectionBase::shared_ptr &shared_connection)
270 {
271  shared_connection.reset();
272 
273  if (output_port) {
274  shared_connection = output_port->getSharedConnection();
275  }
276 
277  if (input_port) {
278  if (!shared_connection) {
279  shared_connection = input_port->getSharedConnection();
280  } else {
281  assert(output_port); // must be set if shared_connection has been set before
282 
283  // For the case both, the output and the input port already have shared connections, check if it matches the one of the input port:
284  SharedConnectionBase::shared_ptr input_ports_shared_connection = input_port->getSharedConnection();
285  if (shared_connection == input_ports_shared_connection) {
286  RTT::log(RTT::Info) << "Output port '" << output_port->getName() << "' and input port '" << input_port->getName() << "' are already connected to the same shared connection." << RTT::endlog();
287  // return SharedConnectionBase::shared_ptr();
288  } else if (input_ports_shared_connection) {
289  RTT::log(RTT::Error) << "Output port '" << output_port->getName() << "' and input port '" << input_port->getName() << "' are already connected to different shared connections!" << RTT::endlog();
290  shared_connection.reset();
291  return true;
292  }
293  }
294  }
295 
296  if (!policy.name_id.empty()) {
297  if (!shared_connection) {
298  // lookup shared connection by the given name
299  shared_connection = SharedConnectionRepository::Instance()->get(policy.name_id);
300  } else if (shared_connection->getName() != policy.name_id) {
301  RTT::log(RTT::Error) << "At least one of the given ports is already connected to shared connection '" << shared_connection->getName() << "' but you requested to connect to '" << policy.name_id << "'!" << RTT::endlog();
302  shared_connection.reset();
303  return true;
304  }
305  }
306 
307  return bool(shared_connection);
308 }
virtual const types::TypeInfo * getTypeInfo() const =0
virtual bool addConnection(internal::ConnID *port_id, ChannelElementBase::shared_ptr channel, ConnPolicy const &policy)
boost::intrusive_ptr< SharedConnectionBase > shared_ptr
static base::ChannelElementBase::shared_ptr buildRemoteChannelOutput(base::OutputPortInterface &output_port, base::InputPortInterface &input_port, ConnPolicy const &policy)
Definition: ConnFactory.cpp:73
base::PortInterface const * ptr
Definition: ConnFactory.hpp:69
virtual bool isLocal() const
const std::string & getTypeName() const
Definition: TypeInfo.hpp:83
virtual internal::SharedConnectionBase::shared_ptr getSharedConnection() const
Definition: mystd.hpp:163
const std::string & getName() const
virtual DataSourceBase::shared_ptr getDataSource() const =0
virtual ChannelElementBase * getEndpoint() const =0
virtual bool connectTo(ChannelElementBase::shared_ptr const &output, bool mandatory=true)
virtual base::ChannelElementBase::shared_ptr createStream(base::PortInterface *port, const ConnPolicy &policy, bool is_sender) const =0
virtual bool addConnection(internal::ConnID *port_id, ChannelElementBase::shared_ptr channel_input, ConnPolicy const &policy)
virtual int serverProtocol() const
boost::intrusive_ptr< ChannelElementBase > shared_ptr
TypeTransporter * getProtocol(int protocol_id) const
Definition: TypeInfo.cpp:150
virtual unsigned int getSampleSize(base::DataSourceBase::shared_ptr sample, void *cookie=0) const =0
virtual internal::ConnID * getPortID() const
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:53
std::string name_id
Definition: ConnPolicy.hpp:256
static Logger & log()
Definition: Logger.hpp:350
static Logger::LogFunction endlog()
Definition: Logger.hpp:362


rtt
Author(s): RTT Developers
autogenerated on Fri Oct 25 2019 03:59:32