RemoteChannelElement.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Oct 22 11:59:07 CEST 2009 RemoteChannelElement.hpp
3 
4  RemoteChannelElement.hpp - description
5  -------------------
6  begin : Thu October 22 2009
7  copyright : (C) 2009 Peter Soetens
8  email : peter@thesourcworks.com
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 #ifndef CORBA_REMOTE_CHANNEL_ELEMENT_H
40 #define CORBA_REMOTE_CHANNEL_ELEMENT_H
41 
42 #include "DataFlowI.h"
43 #include "CorbaTypeTransporter.hpp"
44 #include "CorbaDispatcher.hpp"
45 #include "CorbaConnPolicy.hpp"
46 #include "ApplicationServer.hpp"
47 
48 namespace RTT {
49 
50  namespace corba {
51 
59  template<typename T>
62  , public base::ChannelElement<T>
63  {
64 
68  bool valid;
69 
71 
72  PortableServer::ObjectId_var oid;
73 
74  std::string localUri;
75 
77 
78  public:
84  RemoteChannelElement(CorbaTypeTransporter const& transport, DataFlowInterface* sender, PortableServer::POA_ptr poa, const ConnPolicy &policy)
85  : CRemoteChannelElement_i(transport, poa)
86  , valid(true)
87  , msender(sender)
88  , policy(policy)
89  {
90  // Big note about cleanup: The RTT will dispose this object through
91  // the ChannelElement<T> refcounting. So we only need to inform the
92  // POA that our object is dead in disconnect().
93  // CORBA refcount-managed servants must start with a refcount of
94  // 1
95  this->ref();
96  oid = mpoa->activate_object(this);
97  // Force creation of dispatcher.
99 
100  localUri = static_cast<const char*>(CORBA::String_var( ApplicationServer::orb->object_to_string(_this()) ));
101  }
102 
104  {
105  }
106 
108  void _add_ref()
109  { this->ref(); }
111  void _remove_ref()
112  { this->deref(); }
113 
114 
119  CORBA::SystemException
120  ))
122 
123  bool signal()
124  {
125  // forward too.
127  // intercept signal if no remote side set.
128  if ( CORBA::is_nil(remote_side.in()) )
129  return true;
130  // Remember that signal() is called in the context of the one
131  // that wrote the data, so we must decouple here to keep hard-RT happy.
132  // the dispatch thread must read the data and send it over by calling transferSample().
133  CorbaDispatcher::Instance(msender)->dispatchChannel( this );
134 
135  return valid;
136  }
137 
138  virtual void transferSamples() {
139  if (!valid)
140  return;
141  //log(Debug) <<"transfering..." <<endlog();
142  // in push mode, transfer all data, in pull mode, only signal once for each sample.
143  if ( policy.pull == ConnPolicy::PULL ) {
144  try
145  {
146 #ifndef RTT_CORBA_PORTS_DISABLE_SIGNAL
147  remote_side->remoteSignal();
148 #endif
149  }
150 #ifdef CORBA_IS_OMNIORB
151  catch(CORBA::SystemException& e)
152  {
153  log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << " " << e.NP_minorString() << endlog();
154  valid = false;
155  }
156 #endif
157  catch(CORBA::Exception& e)
158  {
159  log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << endlog();
160  valid = false;
161  }
162  } else {
164  typename base::ChannelElement<T>::value_t sample;
165 
166  //log(Debug) <<"...read..."<<endlog();
167  while ( this->read(sample, false) == NewData && valid) {
168  //log(Debug) <<"...write..."<<endlog();
169  if ( this->write(sample) == NotConnected )
170  valid = false;
171  //log(Debug) <<"...next read?..."<<endlog();
172  }
173  }
174  //log(Debug) <<"... done." <<endlog();
175 
176  }
177 
178  void disconnect() {
179  // disconnect both local and remote side.
180  // !!!THIS RELIES ON BEHAVIOR OF REMOTEDISCONNECT BELOW doing both forward and !forward !!!
181  try {
182  if ( ! CORBA::is_nil(remote_side.in()) )
183  remote_side->remoteDisconnect(true);
184  }
185  catch(CORBA::Exception&) {}
186 
187  try { this->remoteDisconnect(true); }
188  catch(CORBA::Exception&) {}
189  }
190 
194  void remoteDisconnect(bool forward) ACE_THROW_SPEC ((
195  CORBA::SystemException
196  ))
197  {
199 
200  // Because we support out-of-band transports, we must cleanup more thoroughly.
201  // an oob channel may be sitting at our other end. If not, this is a nop.
203 
204  // Will fail at shutdown if all objects are already deactivated
205  try {
206  if (mdataflow)
207  mdataflow->deregisterChannel(_this());
208  mpoa->deactivate_object(oid);
209  }
210  catch(CORBA::Exception&) {}
211  }
212 
213  bool disconnect(const base::ChannelElementBase::shared_ptr& channel, bool forward)
214  {
215  bool success = false;
216 
217  try {
218  if ( ! CORBA::is_nil(remote_side.in()) ) {
219  remote_side->remoteDisconnect(forward);
220  success = true;
221  }
222  }
223  catch(CORBA::Exception&) {}
224 
225  if ( ! CORBA::is_nil(remote_side.in()) ) {
226  success = base::ChannelElement<T>::disconnect(channel, forward);
227  }
228 
229  // Will fail at shutdown if all objects are already deactivated
230  if (success) {
231  try {
232  if (mdataflow)
233  mdataflow->deregisterChannel(_this());
234  mpoa->deactivate_object(oid);
235  }
236  catch(CORBA::Exception&) {}
237  }
238 
239  return success;
240  }
241 
242  FlowStatus read(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data)
243  {
244  if (!valid)
245  return NoData;
246 
247  // try to read locally first
248  FlowStatus fs;
249  CFlowStatus cfs;
250  if ( (fs = base::ChannelElement<T>::read(sample, copy_old_data)) )
251  return fs;
252 
253  // can only read through corba if remote_side is known
254  if ( CORBA::is_nil(remote_side.in()) ) {
255  return NoData;
256  }
257 
258  // go through corba
259  CORBA::Any_var remote_value;
260  try
261  {
262  if ( remote_side && (cfs = remote_side->read(remote_value, copy_old_data) ) )
263  {
264  if (cfs == CNewData || (cfs == COldData && copy_old_data)) {
265  internal::LateReferenceDataSource<T> ref_data_source(&sample);
266  ref_data_source.ref();
267  transport.updateFromAny(&remote_value.in(), &ref_data_source);
268  }
269  return (FlowStatus)cfs;
270  }
271  else
272  return NoData;
273  }
274 #ifdef CORBA_IS_OMNIORB
275  catch(CORBA::SystemException& e)
276  {
277  log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << " " << e.NP_minorString() << endlog();
278  valid = false;
279  return NoData;
280  }
281 #endif
282  catch(CORBA::Exception& e)
283  {
284  log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << endlog();
285  valid = false;
286  return NoData;
287  }
288  }
289 
293  CFlowStatus read(::CORBA::Any_out sample, bool copy_old_data) ACE_THROW_SPEC ((
294  CORBA::SystemException
295  ))
296  {
297 
298  FlowStatus fs;
299  typename internal::ValueDataSource<T> value_data_source;
300  value_data_source.ref();
301  fs = base::ChannelElement<T>::read(value_data_source.set(), copy_old_data);
302  if (fs == NewData || (fs == OldData && copy_old_data)) {
303  sample = transport.createAny(&value_data_source);
304  if ( sample != 0) {
305  return (CFlowStatus)fs;
306  }
307  // this is a programmatic error and should never happen during run-time.
308  log(Error) << "CORBA Transport failed to create Any for " << value_data_source.getTypeName() << " while it should have!" <<endlog();
309  }
310  // we *must* return something in sample.
311  sample = new CORBA::Any();
312  return (CFlowStatus)fs;
313  }
314 
316  {
317  WriteStatus result;
318 
319  // try to write locally first
320  result = base::ChannelElement<T>::write(sample);
321  if (result != NotConnected)
322  return result;
323 
324  // can only write through corba if remote_side is known
325  if ( CORBA::is_nil(remote_side.in()) ) {
326  return NotConnected;
327  }
328 
329  // go through corba
330  assert( remote_side.in() != 0 && "Got write() without remote side. Need buffer OR remote side but neither was present.");
331  try
332  {
333  // This is used on the writing side, to avoid allocating an Any for
334  // each write
335  CORBA::Any write_any;
336  internal::LateConstReferenceDataSource<T> const_ref_data_source(&sample);
337  // There is a trick. We allocate on the stack, but need to
338  // provide shared pointers. Manually increment refence count
339  // (the stack "owns" the object)
340  const_ref_data_source.ref();
341 
342  if (!transport.updateAny(&const_ref_data_source, write_any)) {
343  return WriteFailure;
344  }
345 
346 #ifndef RTT_CORBA_PORTS_WRITE_ONEWAY
347  CWriteStatus cfs = remote_side->write(write_any);
348  return (WriteStatus)cfs;
349 #else
350  remote_side->writeOneway(write_any);
351  return WriteSuccess;
352 #endif
353  }
354 #ifdef CORBA_IS_OMNIORB
355  catch(CORBA::SystemException& e)
356  {
357  log(Error) << "caught CORBA exception while marshalling: " << e._name() << " " << e.NP_minorString() << endlog();
358  return NotConnected;
359  }
360 #endif
361  catch(CORBA::Exception& e)
362  {
363  log(Error) << "caught CORBA exception while marshalling: " << e._name() << endlog();
364  return NotConnected;
365  }
366  }
367 
371  CWriteStatus write(const ::CORBA::Any& sample) ACE_THROW_SPEC ((
372  CORBA::SystemException
373  ))
374  {
375  typename internal::ValueDataSource<T> value_data_source;
376  value_data_source.ref();
377  if (!transport.updateFromAny(&sample, &value_data_source)) {
378  return CWriteFailure;
379  }
380  WriteStatus fs = base::ChannelElement<T>::write(value_data_source.rvalue());
381  return (CWriteStatus)fs;
382  }
383 
387  void writeOneway(const ::CORBA::Any& sample) ACE_THROW_SPEC ((
388  CORBA::SystemException
389  ))
390  {
391  (void) write(sample);
392  }
393 
395  {
396  // we don't pass it on through CORBA (yet).
397  // If an oob transport is used, that one will send it through.
399  }
400 
402  {
403  // try locally first
405  return true;
406  }
407 
408  // if we do not have a reference to the remote side, assume that it's alright.
409  if ( CORBA::is_nil(remote_side.in()) ) return true;
410 
411  // go through corba
412  assert( remote_side.in() != 0 && "Got inputReady() without remote side.");
413  try {
414  return remote_side->inputReady();
415  }
416 #ifdef CORBA_IS_OMNIORB
417  catch(CORBA::SystemException& e)
418  {
419  log(Error) << "caught CORBA exception while checking a remote channel: " << e._name() << " " << e.NP_minorString() << endlog();
420  return false;
421  }
422 #endif
423  catch(CORBA::Exception& e)
424  {
425  log(Error) << "caught CORBA exception while checking a remote channel: " << e._name() << endlog();
426  return false;
427  }
428  }
429 
433  virtual bool inputReady()
434  {
435  // signal to oob transport if any.
437  this->getInput();
438  if (input)
440  return true;
441  }
442 
443  virtual bool channelReady(base::ChannelElementBase::shared_ptr const& caller, ConnPolicy const& policy, internal::ConnID *conn_id)
444  {
445  // try to forward locally first
446  if (base::ChannelElement<T>::channelReady(caller, policy, conn_id))
447  return true;
448 
449  // we are not using the ConnID on the remote side, so we clean it up here
450  delete conn_id;
451 
452  // go through corba
453  assert( remote_side.in() != 0 && "Got channelReady() request without remote side.");
454 
455  try
456  {
457  return remote_side->channelReady(toCORBA(policy));
458  }
459 #ifdef CORBA_IS_OMNIORB
460  catch(CORBA::SystemException& e)
461  {
462  log(Error) << "caught CORBA exception while marshalling: " << e._name() << " " << e.NP_minorString() << endlog();
463  return false;
464  }
465 #endif
466  catch(CORBA::Exception& e)
467  {
468  log(Error) << "caught CORBA exception while marshalling: " << e._name() << endlog();
469  return false;
470  }
471  }
472 
476  virtual bool channelReady(const CConnPolicy& cp) ACE_THROW_SPEC ((
477  CORBA::SystemException
478  ))
479  {
480  ConnPolicy policy = toRTT(cp);
481  return base::ChannelElement<T>::channelReady(this, policy);
482  }
483 
484  virtual bool isRemoteElement() const
485  {
486  return true;
487  }
488 
489  virtual std::string getRemoteURI() const
490  {
491  //check for output element case
493  if(base->getOutput())
495 
496  std::string uri = static_cast<const char*>(CORBA::String_var( ApplicationServer::orb->object_to_string(remote_side) ));
497  return uri;
498  }
499 
500  virtual std::string getLocalURI() const
501  {
502  //check for input element case
504  if(base->getInput())
506 
507  return localUri;
508  }
509 
510  virtual std::string getElementName() const
511  {
512  return "CorbaRemoteChannelElement";
513  }
514  };
515  }
516 }
517 
518 #endif
519 
virtual std::string getRemoteURI() const
virtual std::string getLocalURI() const
boost::call_traits< T >::param_type param_t
boost::intrusive_ptr< ChannelElement< T > > shared_ptr
virtual bool channelReady(ChannelElementBase::shared_ptr const &caller, ConnPolicy const &policy, internal::ConnID *conn_id=0)
virtual bool channelReady(const CConnPolicy &cp) ACE_THROW_SPEC((CORBA
virtual bool channelReady(base::ChannelElementBase::shared_ptr const &caller, ConnPolicy const &policy, internal::ConnID *conn_id)
static const bool PULL
Definition: ConnPolicy.hpp:120
FlowStatus
Definition: FlowStatus.hpp:56
void deregisterChannel(CChannelElement_ptr channel)
Definition: DataFlowI.cpp:232
virtual CORBA::Any_ptr createAny(base::DataSourceBase::shared_ptr source) const =0
virtual WriteStatus data_sample(typename base::ChannelElement< T >::param_t sample)
CWriteStatus write(const ::CORBA::Any &sample) ACE_THROW_SPEC((CORBA
CFlowStatus read(::CORBA::Any_out sample, bool copy_old_data) ACE_THROW_SPEC((CORBA
RTT::corba::CorbaTypeTransporter const & transport
Definition: DataFlowI.h:75
static CorbaDispatcher * Instance(DataFlowInterface *iface)
void set(typename AssignableDataSource< T >::param_t t)
RTT::ConnPolicy toRTT(RTT::corba::CConnPolicy const &corba_policy)
virtual std::string getRemoteURI() const
PortableServer::ObjectId_var oid
virtual void disconnect(bool forward)
virtual bool inputReady(base::ChannelElementBase::shared_ptr const &caller)
#define ACE_THROW_SPEC(x)
Definition: corba.h:67
void remoteSignal() ACE_THROW_SPEC((CORBA
void dispatchChannel(base::ChannelElementBase::shared_ptr chan)
bool disconnect(const base::ChannelElementBase::shared_ptr &channel, bool forward)
virtual std::string getLocalURI() const
RemoteChannelElement(CorbaTypeTransporter const &transport, DataFlowInterface *sender, PortableServer::POA_ptr poa, const ConnPolicy &policy)
virtual FlowStatus read(reference_t sample, bool copy_old_data=true)
Definition: corba.h:62
CDataFlowInterface_i * mdataflow
Definition: DataFlowI.h:77
void writeOneway(const ::CORBA::Any &sample) ACE_THROW_SPEC((CORBA
boost::call_traits< T >::reference reference_t
virtual value_t data_sample()
boost::intrusive_ptr< ChannelElementBase > shared_ptr
RTT::corba::CConnPolicy toCORBA(RTT::ConnPolicy const &policy)
virtual std::string getElementName() const
FlowStatus read(typename base::ChannelElement< T >::reference_t sample, bool copy_old_data)
CRemoteChannelElement_var remote_side
Definition: DataFlowI.h:74
virtual std::string getTypeName() const
virtual bool updateFromAny(const CORBA::Any *blob, base::DataSourceBase::shared_ptr target) const =0
virtual WriteStatus write(param_t sample)
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:53
AssignableDataSource< T >::const_reference_t rvalue() const
Definition: DataSources.hpp:95
static Logger & log()
Definition: Logger.hpp:350
virtual bool updateAny(base::DataSourceBase::shared_ptr source, CORBA::Any &any) const =0
static Logger::LogFunction endlog()
Definition: Logger.hpp:362
PortableServer::POA_var mpoa
Definition: DataFlowI.h:76
void remoteDisconnect(bool forward) ACE_THROW_SPEC((CORBA
WriteStatus
Definition: FlowStatus.hpp:66
WriteStatus write(typename base::ChannelElement< T >::param_t sample)


rtt
Author(s): RTT Developers
autogenerated on Fri Oct 25 2019 03:59:34