RemoteChannelElement.hpp
Go to the documentation of this file.
00001 /***************************************************************************
00002   tag: Peter Soetens  Thu Oct 22 11:59:07 CEST 2009  RemoteChannelElement.hpp
00003 
00004                         RemoteChannelElement.hpp -  description
00005                            -------------------
00006     begin                : Thu October 22 2009
00007     copyright            : (C) 2009 Peter Soetens
00008     email                : peter@thesourceworks.com
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 #ifndef CORBA_REMOTE_CHANNEL_ELEMENT_H
00040 #define CORBA_REMOTE_CHANNEL_ELEMENT_H
00041 
00042 #include "DataFlowI.h"
00043 #include "CorbaTypeTransporter.hpp"
00044 #include "CorbaDispatcher.hpp"
00045 
00046 namespace RTT {
00047 
00048     namespace corba {
00049 
00057         template<typename T>
00058         class RemoteChannelElement 
00059             : public CRemoteChannelElement_i
00060             , public base::ChannelElement<T>
00061         {
00062 
                  /*
                   * Class overview.
                   *
                   * RemoteChannelElement<T> is both a CORBA servant (it derives from
                   * CRemoteChannelElement_i) and a typed channel element (it derives
                   * from base::ChannelElement<T>). T is the sample type of the channel.
                   *
                   * The typed read()/write() overloads first try the local channel
                   * (the base::ChannelElement<T> implementation) and only then talk to
                   * 'remote_side' through CORBA, converting samples to and from
                   * CORBA::Any with 'transport'. The CORBA::Any overloads do the
                   * opposite job: they serve calls coming in from the remote side by
                   * reading from / writing to the local channel.
                   *
                   * 'remote_side', 'transport', 'mpoa' and 'mdataflow' are not declared
                   * in this class; presumably they are members of
                   * CRemoteChannelElement_i -- confirm in DataFlowI.h.
                   */

                  /**
                   * Health flag of the connection. Starts as true. It is set to false
                   * when a CORBA call towards the remote side throws (transferSamples(),
                   * read()), when remoteSignal() on the remote side returns false, or
                   * when write() fails inside transferSamples(). Once false, read()
                   * returns NoData, transferSamples() returns immediately and signal()
                   * reports false to its caller. Nothing in this class sets it back to
                   * true.
                   */
00066             bool valid;
                  /**
                   * Selects what transferSamples() does: when true it only calls
                   * remoteSignal() on the remote side; when false it reads the locally
                   * available samples and write()s each of them.
                   */
00070             bool pull;
00071 
                  /**
                   * The interface given at construction time. It is only used as the
                   * argument of CorbaDispatcher::Instance().
                   */
00072             DataFlowInterface* msender;
00073 
                  /**
                   * Object id returned by mpoa->activate_object(this) in the constructor;
                   * passed to mpoa->deactivate_object() when disconnecting.
                   */
00074             PortableServer::ObjectId_var oid;
00075 
00076         public:
                  /**
                   * Creates the channel element and activates it as a CORBA object.
                   *
                   * @param transport forwarded to the CRemoteChannelElement_i constructor.
                   * @param sender    stored in msender and used to look up the CorbaDispatcher.
                   * @param poa       forwarded to the CRemoteChannelElement_i constructor.
                   * @param is_pull   initial value of the 'pull' flag, see transferSamples().
                   */
00082             RemoteChannelElement(CorbaTypeTransporter const& transport, DataFlowInterface* sender, PortableServer::POA_ptr poa, bool is_pull)
00083         : CRemoteChannelElement_i(transport, poa)
00084         , valid(true), pull(is_pull)
00085         , msender(sender)
00086             {
00087                 // Big note about cleanup: The RTT will dispose this object through
00088                 // the ChannelElement<T> refcounting. So we only need to inform the
00089                 // POA that our object is dead in disconnect().
00090                 // CORBA refcount-managed servants must start with a refcount of
00091                 // 1
00092                 this->ref();
                      // Register this servant with the POA and remember the id so that
                      // the disconnect functions can deactivate it again.
00093                 oid = mpoa->activate_object(this);
00094                 // Force creation of dispatcher.
00095                 CorbaDispatcher::Instance(msender);
00096             }
00097 
                  /**
                   * Intentionally empty: no cleanup is done here. POA deactivation is
                   * done by remoteDisconnect() and disconnect(bool).
                   */
00098             ~RemoteChannelElement()
00099             {
00100             }
00101 
                  /** Increments the reference count by calling this->ref(). */
00103             void _add_ref()
00104             { this->ref(); }
                  /** Decrements the reference count by calling this->deref(). */
00106             void _remove_ref()
00107             { this->deref(); }
00108 
00109 
                  /**
                   * CORBA entry point for a signal coming from the remote side.
                   * It calls base::ChannelElement<T>::signal() explicitly, so the
                   * signal() override below (which would dispatch back to the remote
                   * side) is not involved.
                   *
                   * @return the result of base::ChannelElement<T>::signal().
                   */
00113             CORBA::Boolean remoteSignal() ACE_THROW_SPEC ((
00114                       CORBA::SystemException
00115                     ))
00116             { return base::ChannelElement<T>::signal(); }
00117 
                  /**
                   * Local signal that new data is available.
                   * The signal is first forwarded through
                   * base::ChannelElementBase::signal() (its return value is not used).
                   * If a remote side is set, this element is then queued on the
                   * CorbaDispatcher of msender, which is expected to call
                   * transferSamples() later.
                   *
                   * @return true when no remote side is set, otherwise the current
                   *         value of 'valid'.
                   */
00118             bool signal()
00119             {
00120                 // forward too.
00121                 base::ChannelElementBase::signal();
00122                 // intercept signal if no remote side set.
00123                 if ( CORBA::is_nil(remote_side.in()) )
00124                     return true;
00125                 // Remember that signal() is called in the context of the one
00126                 // that wrote the data, so we must decouple here to keep hard-RT happy.
00127                 // the dispatch thread must read the data and send it over by calling transferSamples().
00128                 CorbaDispatcher::Instance(msender)->dispatchChannel( this );
00129 
00130                 return valid;
00131             }
00132 
                  /**
                   * Performs the CORBA communication that signal() deferred.
                   * Does nothing when the channel is no longer valid.
                   *
                   * - pull mode: calls remoteSignal() on the remote side and stores
                   *   its result in 'valid'. A CORBA exception is logged and sets
                   *   'valid' to false.
                   * - push mode: reads samples from this element for as long as
                   *   read() reports NewData and the channel stays valid, and
                   *   write()s every sample. A failing write() sets 'valid' to false.
                   *
                   * NOTE(review): the pull branch dereferences remote_side without a
                   * nil check; signal() only dispatches when a remote side is set,
                   * which presumably makes this safe -- confirm no other caller exists.
                   */
00133             virtual void transferSamples() {
00134                 if (!valid)
00135                     return;
00136                 //log(Debug) <<"transfering..." <<endlog();
00137                 // in push mode, transfer all data, in pull mode, only signal once for each sample.
00138                 if ( pull ) {
00139                     try
00140                     { valid = remote_side->remoteSignal(); }
                          // With omniORB, system exceptions are caught separately so
                          // that the minor code string can be added to the log message.
00141 #ifdef CORBA_IS_OMNIORB
00142                     catch(CORBA::SystemException& e)
00143                     {
00144                         log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << " " << e.NP_minorString() << endlog();
00145                         valid = false;
00146                     }
00147 #endif
00148                     catch(CORBA::Exception& e)
00149                     {
00150                         log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << endlog();
00151                         valid = false;
00152                     }
00153                 } else {
                          // Scratch sample that is filled by read() and handed to write().
00155                     typename base::ChannelElement<T>::value_t sample;
00156 
00157                     //log(Debug) <<"...read..."<<endlog();
                          // copy_old_data is false: only samples reported as NewData are transferred.
00158                     while ( this->read(sample, false) == NewData && valid) {
00159                         //log(Debug) <<"...write..."<<endlog();
00160                         if ( this->write(sample) == false )
00161                             valid = false;
00162                         //log(Debug) <<"...next read?..."<<endlog();
00163                     }
00164                 }
00165                 //log(Debug) <<"... done." <<endlog();
00166 
00167             }
00168 
                  /**
                   * Disconnects the complete channel, remote side first, then this side.
                   * Both steps are best effort: any CORBA::Exception is swallowed.
                   */
00172             void disconnect() ACE_THROW_SPEC ((
00173                       CORBA::SystemException
00174                     )) {
00175                 // disconnect both local and remote side.
00176                 // !!!THIS RELIES ON BEHAVIOR OF REMOTEDISCONNECT BELOW doing both writer_to_reader and !writer_to_reader !!!
00177                 try {
00178                     if ( ! CORBA::is_nil(remote_side.in()) )
00179                         remote_side->remoteDisconnect(true);
00180                 }
00181                 catch(CORBA::Exception&) {}
00182 
00183                 try { this->remoteDisconnect(true); }
00184                 catch(CORBA::Exception&) {}
00185             }
00186 
                  /**
                   * Disconnects the local part of the channel without calling the
                   * remote side. The local channel is disconnected in BOTH directions,
                   * whatever the value of the argument, after which the channel is
                   * deregistered from mdataflow (if set) and the servant is deactivated
                   * in the POA.
                   *
                   * @param writer_to_reader direction of the first
                   *        base::ChannelElement<T>::disconnect() call; the opposite
                   *        direction is disconnected right after it.
                   */
00187             void remoteDisconnect(bool writer_to_reader) ACE_THROW_SPEC ((
00188                       CORBA::SystemException
00189                     ))
00190             {
00191                 base::ChannelElement<T>::disconnect(writer_to_reader);
00192 
00193                 // Because we support out-of-band transports, we must cleanup more thoroughly.
00194                 // an oob channel may be sitting at our other end. If not, this is a nop.
00195                 base::ChannelElement<T>::disconnect(!writer_to_reader);
00196 
00197                 // Will fail at shutdown if all objects are already deactivated
                      // NOTE(review): this teardown sequence is duplicated verbatim in
                      // disconnect(bool) below; keep both copies in sync.
00198                 try {
00199                     if (mdataflow)
00200                         mdataflow->deregisterChannel(_this());
00201                     mpoa->deactivate_object(oid);
00202                 }
00203                 catch(CORBA::Exception&) {}
00204             }
00205 
                  /**
                   * Disconnects in one direction, starting at this element.
                   * The remote side (if set) is asked to remoteDisconnect() first and
                   * any CORBA::Exception from that call is swallowed. Then the local
                   * channel is disconnected in the given direction only, the channel is
                   * deregistered from mdataflow (if set) and the servant is deactivated
                   * in the POA.
                   *
                   * @param writer_to_reader direction passed on to the remote side and
                   *        to base::ChannelElement<T>::disconnect().
                   */
00209             void disconnect(bool writer_to_reader) ACE_THROW_SPEC ((
00210                       CORBA::SystemException
00211                     ))
00212             {
00213                 try {
00214                     if ( ! CORBA::is_nil(remote_side.in()) )
00215                         remote_side->remoteDisconnect(writer_to_reader);
00216                 }
00217                 catch(CORBA::Exception&) {}
00218 
00219                 base::ChannelElement<T>::disconnect(writer_to_reader);
00220 
00221                 // Will fail at shutdown if all objects are already deactivated
00222                 try {
00223                     if (mdataflow)
00224                         mdataflow->deregisterChannel(_this());
00225                     mpoa->deactivate_object(oid);
00226                 }
00227                 catch(CORBA::Exception&) {}
00228             }
00229 
                  /**
                   * Typed read, used on the local side of the connection.
                   * The local channel is tried first; only when it yields a status
                   * that converts to false is the remote side asked through CORBA.
                   *
                   * @param sample        receives the data when new data is available,
                   *                      or old data when copy_old_data is true.
                   * @param copy_old_data passed to both the local and the remote read;
                   *                      also decides whether a COldData reply is
                   *                      copied into sample.
                   * @return NoData when the channel is not valid, when no remote side
                   *         is set, when the remote status converts to false or when a
                   *         CORBA exception was caught (which also clears 'valid');
                   *         otherwise the local status or the remote status cast to
                   *         FlowStatus.
                   */
00230             FlowStatus read(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data)
00231             {
00232                 if (!valid)
00233                     return NoData;
00234 
00235                 // try to read locally first
00236                 FlowStatus fs;
00237                 CFlowStatus cfs;
00238                 if ( (fs = base::ChannelElement<T>::read(sample, copy_old_data)) )
00239                     return fs;
00240 
00241                 // go through corba
00242                 CORBA::Any_var remote_value;
00243                 try
00244                 {
00245                     if ( remote_side && (cfs = remote_side->read(remote_value, copy_old_data) ) )
00246                     {
00247                         if (cfs == CNewData || (cfs == COldData && copy_old_data)) {
                                  // Wrap the caller's sample in a stack-allocated data source
                                  // so that the transport can fill it from the Any. ref() is
                                  // called manually on the stack object, as in write() below.
                                  // NOTE(review): the result of updateFromAny() is not used
                                  // here -- confirm whether it can report a failed conversion.
00248                             internal::LateReferenceDataSource<T> ref_data_source(&sample);
00249                             ref_data_source.ref();
00250                             transport.updateFromAny(&remote_value.in(), &ref_data_source);
00251                         }
00252                         return (FlowStatus)cfs;
00253                     }
00254                     else
00255                         return NoData;
00256                 }
00257 #ifdef CORBA_IS_OMNIORB
00258                 catch(CORBA::SystemException& e)
00259                 {
00260                     log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << " " << e.NP_minorString() << endlog();
00261                     valid = false;
00262                     return NoData;
00263                 }
00264 #endif
00265                 catch(CORBA::Exception& e)
00266                 {
00267                     log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << endlog();
00268                     valid = false;
00269                     return NoData;
00270                 }
00271             }
00272 
                  /**
                   * CORBA read, serving a read request that arrives from the remote side.
                   * Reads from the local channel only and packs the result in an Any.
                   *
                   * @param sample        always assigned: the Any created by the
                   *                      transport when data is returned, otherwise a
                   *                      newly allocated empty CORBA::Any.
                   * @param copy_old_data passed to the local read; also decides whether
                   *                      an OldData result is packed into sample.
                   * @return the status of the local read cast to CFlowStatus.
                   */
00276             CFlowStatus read(::CORBA::Any_out sample, bool copy_old_data) ACE_THROW_SPEC ((
00277                       CORBA::SystemException
00278                     ))
00279             {
00280 
00281                 FlowStatus fs;
                      // Stack-allocated data source that receives the local sample;
                      // ref() is called manually on it, as in write() below.
00282                 typename internal::ValueDataSource<T> value_data_source;
00283                 value_data_source.ref();
00284                 fs = base::ChannelElement<T>::read(value_data_source.set(), copy_old_data);
00285                 if (fs == NewData || (fs == OldData && copy_old_data)) {
00286                     sample = transport.createAny(&value_data_source);
00287                     if ( sample != 0) {
00288                         return (CFlowStatus)fs;
00289                     }
00290                     // this is a programmatic error and should never happen during run-time.
00291                     log(Error) << "CORBA Transport failed to create Any for " << value_data_source.getTypeName() << " while it should have!" <<endlog();
                          // NOTE(review): execution falls through, so the caller still
                          // receives the original status together with an empty Any.
00292                 }
00293                 // we *must* return something in sample.
00294                 sample = new CORBA::Any();
00295                 return (CFlowStatus)fs;
00296             }
00297 
                  /**
                   * Typed write, used on the local side of the connection.
                   * The local channel is tried first; when that write returns false
                   * the sample is converted to a CORBA::Any and sent to the remote side.
                   *
                   * @param sample the value to write.
                   * @return true when the local write succeeded or when the remote call
                   *         completed without throwing (the value returned by
                   *         remote_side->write() is not used); false when a CORBA
                   *         exception was caught.
                   */
00298             bool write(typename base::ChannelElement<T>::param_t sample)
00299             {
00300                 // try to write locally first
00301                 if (base::ChannelElement<T>::write(sample))
00302                     return true;
00303                 // go through corba
                      // NOTE(review): this assert is the only check on remote_side before
                      // it is dereferenced below; it is compiled out when NDEBUG is defined.
00304                 assert( remote_side.in() != 0 && "Got write() without remote side. Need buffer OR remote side but neither was present.");
00305                 try
00306                 {
00310                     CORBA::Any write_any;
00311                     internal::LateConstReferenceDataSource<T> const_ref_data_source(&sample);
00312                     const_ref_data_source.ref();
00313 
00314                     // About the ref() call above: there is a trick. We allocate the
00315                     // data source on the stack, but need to provide shared pointers,
00316                     // so we manually increment its reference count (the stack "owns" the object).
00317                     transport.updateAny(&const_ref_data_source, write_any);
00318                     remote_side->write(write_any); 
00319                     return true;
00320                 }
00321 #ifdef CORBA_IS_OMNIORB
00322                 catch(CORBA::SystemException& e)
00323                 {
00324                     log(Error) << "caught CORBA exception while marshalling: " << e._name() << " " << e.NP_minorString() << endlog();
00325                     return false;
00326                 }
00327 #endif
00328                 catch(CORBA::Exception& e)
00329                 {
00330                     log(Error) << "caught CORBA exception while marshalling: " << e._name() << endlog();
00331                     return false;
00332                 }
00333             }
00334 
                  /**
                   * CORBA write, serving a write request that arrives from the remote side.
                   * The Any is converted to a T by the transport and written into the
                   * local channel.
                   *
                   * @param sample the value received from the remote side.
                   * @return the result of base::ChannelElement<T>::write().
                   *
                   * NOTE(review): the result of updateFromAny() is not used, so the
                   * content of the data source is written whether or not the
                   * conversion worked -- confirm whether updateFromAny() can fail.
                   */
00338             bool write(const ::CORBA::Any& sample) ACE_THROW_SPEC ((
00339                       CORBA::SystemException
00340                     ))
00341             {
                      // Stack-allocated data source with a manual ref(), as in the typed write().
00342                 typename internal::ValueDataSource<T> value_data_source;
00343                 value_data_source.ref();
00344                 transport.updateFromAny(&sample, &value_data_source);
00345                 return base::ChannelElement<T>::write(value_data_source.rvalue());
00346             }
00347 
                  /**
                   * Passes a data sample on to the local output, if there is one.
                   *
                   * @param sample the data sample to forward.
                   * @return the result of base::ChannelElement<T>::data_sample() when
                   *         getOutput() is non-null, true otherwise.
                   */
00348             virtual bool data_sample(typename base::ChannelElement<T>::param_t sample)
00349             {
00350                 // we don't pass it on through CORBA (yet).
00351                 // If an oob transport is used, that one will send it through.
00352                 typename base::ChannelElement<T>::shared_ptr output =
00353                     this->getOutput();
00354                 if (output)
00355                     return base::ChannelElement<T>::data_sample(sample);
00356                 return true;
00357             }
00358 
                  /**
                   * Passes the 'input ready' notification on to the local input, if
                   * there is one.
                   *
                   * @return the result of base::ChannelElement<T>::inputReady() when
                   *         getInput() is non-null, true otherwise.
                   */
00362             virtual bool inputReady() {
00363                 // signal to oob transport if any.
00364                 typename base::ChannelElement<T>::shared_ptr input =
00365                     this->getInput();
00366                 if (input)
00367                     return base::ChannelElement<T>::inputReady();
00368                 return true;
00369             }
00370 
00371         };
00372     }
00373 }
00374 
00375 #endif
00376 


rtt
Author(s): RTT Developers
autogenerated on Sat Jun 8 2019 18:46:17