MQChannelElement.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Oct 22 11:59:07 CEST 2009 MQChannelElement.hpp
3 
4  MQChannelElement.hpp - description
5  -------------------
6  begin : Thu October 22 2009
7  copyright : (C) 2009 Peter Soetens
8  email : peter@thesourcworks.com
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 #ifndef MQ_CHANNEL_ELEMENT_H
40 #define MQ_CHANNEL_ELEMENT_H
41 
42 #include "MQSendRecv.hpp"
43 #include "../../Logger.hpp"
44 #include "../../base/ChannelElement.hpp"
45 #include "../../internal/DataSource.hpp"
46 #include "../../internal/DataSources.hpp"
47 #include <stdexcept>
48 
49 namespace RTT
50 {
51  namespace mqueue
52  {
61  template<typename T>
63  {
68 
69  public:
75  const ConnPolicy& policy, bool is_sender)
76  : MQSendRecv(transport)
77  , read_sample(new internal::ValueDataSource<T>)
78  , write_sample(new internal::LateConstReferenceDataSource<T>)
79 
80  {
81  Logger::In in("MQChannelElement");
82  setupStream(read_sample, port, policy, is_sender);
83  }
84 
86  cleanupStream();
87  }
88 
89  virtual bool inputReady(base::ChannelElementBase::shared_ptr const& caller) {
90  if ( mqReady(read_sample, this) ) {
91  typename base::ChannelElement<T>::shared_ptr output = caller->narrow<T>();
92  assert(output);
93  output->data_sample(read_sample->rvalue());
94  return true;
95  }
96  return false;
97  }
98 
99  virtual WriteStatus data_sample(typename base::ChannelElement<T>::param_t sample, bool reset = true)
100  {
101  // send initial data sample to the other side using a plain write.
102  if (mis_sender && (!write_sample->getRawDataConst() || reset)) {
103  write_sample->setPointer(&sample);
104  // update MQSendRecv buffer:
105  mqNewSample(write_sample);
106  return mqWrite(write_sample) ? WriteSuccess : WriteFailure;
107  }
108  return NotConnected;
109  }
110 
128  bool signal()
129  {
130  // copy messages into channel
131  if (mis_sender) {
132  // this read should always succeed since signal() means
133  // 'data available in a data element'.
135  this->getInput();
136  if( input && input->read(read_sample->set(), false) == NewData )
137  return ( this->write(read_sample->rvalue()) == WriteSuccess );
138  } else {
140  this->getOutput();
141  if (output && mqRead(read_sample))
142  return ( output->write(read_sample->rvalue()) == WriteSuccess );
143  }
144  return false;
145  }
146 
152  FlowStatus read(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data)
153  {
154  throw std::runtime_error("not implemented");
155  }
156 
163  {
164  write_sample->setPointer(&sample);
165  if (!mqWrite(write_sample)) {
166  return WriteFailure;
167  }
168  return WriteSuccess;
169  }
170 
171  virtual bool isRemoteElement() const
172  {
173  return true;
174  }
175 
176  virtual std::string getRemoteURI() const
177  {
178  //check for output element case
180  if(base->getOutput())
182 
183  return mqname;
184  }
185 
186  virtual std::string getLocalURI() const
187  {
188  //check for input element case
190  if(base->getInput())
192 
193  return mqname;
194  }
195 
196  virtual std::string getElementName() const
197  {
198  return "MQChannelElement";
199  }
200  };
201  }
202 }
203 
204 #endif
205 
WriteStatus write(typename base::ChannelElement< T >::param_t sample)
boost::call_traits< T >::param_type param_t
boost::intrusive_ptr< ChannelElement< T > > shared_ptr
virtual void mqNewSample(base::DataSourceBase::shared_ptr ds)
Definition: MQSendRecv.cpp:188
bool mqRead(base::DataSourceBase::shared_ptr ds)
Definition: MQSendRecv.cpp:246
boost::intrusive_ptr< LateConstReferenceDataSource< T > > shared_ptr
void setupStream(base::DataSourceBase::shared_ptr ds, base::PortInterface *port, ConnPolicy const &policy, bool is_sender)
Definition: MQSendRecv.cpp:69
FlowStatus
Definition: FlowStatus.hpp:56
virtual std::string getLocalURI() const
FlowStatus read(typename base::ChannelElement< T >::reference_t sample, bool copy_old_data)
virtual bool inputReady(base::ChannelElementBase::shared_ptr const &caller)
virtual std::string getElementName() const
bool mqWrite(base::DataSourceBase::shared_ptr ds)
Definition: MQSendRecv.cpp:271
void set(typename AssignableDataSource< T >::param_t t)
virtual bool mqReady(base::DataSourceBase::shared_ptr ds, base::ChannelElementBase *chan)
Definition: MQSendRecv.cpp:198
virtual std::string getRemoteURI() const
virtual WriteStatus data_sample(typename base::ChannelElement< T >::param_t sample, bool reset=true)
virtual std::string getLocalURI() const
internal::ValueDataSource< T >::shared_ptr read_sample
virtual FlowStatus read(reference_t sample, bool copy_old_data=true)
boost::call_traits< T >::reference reference_t
boost::intrusive_ptr< ChannelElementBase > shared_ptr
internal::LateConstReferenceDataSource< T >::shared_ptr write_sample
void setPointer(const typename AssignableDataSource< T >::value_t *ptr)
virtual WriteStatus data_sample(param_t sample, bool reset=true)
virtual WriteStatus write(param_t sample)
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:53
AssignableDataSource< T >::const_reference_t rvalue() const
Definition: DataSources.hpp:95
MQChannelElement(base::PortInterface *port, types::TypeMarshaller const &transport, const ConnPolicy &policy, bool is_sender)
virtual bool isRemoteElement() const
virtual std::string getRemoteURI() const
WriteStatus
Definition: FlowStatus.hpp:66
boost::intrusive_ptr< ValueDataSource< T > > shared_ptr
Definition: DataSources.hpp:72


rtt
Author(s): RTT Developers
autogenerated on Tue Jun 25 2019 19:33:25