Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #ifndef CORBA_REMOTE_CHANNEL_ELEMENT_H
00040 #define CORBA_REMOTE_CHANNEL_ELEMENT_H
00041
00042 #include "DataFlowI.h"
00043 #include "CorbaTypeTransporter.hpp"
00044 #include "CorbaDispatcher.hpp"
00045
00046 namespace RTT {
00047
00048 namespace corba {
00049
00057 template<typename T>
00058 class RemoteChannelElement
00059 : public CRemoteChannelElement_i
00060 , public base::ChannelElement<T>
00061 {
00062
00066 bool valid;
00070 bool pull;
00071
00072 DataFlowInterface* msender;
00073
00074 PortableServer::ObjectId_var oid;
00075
00076 public:
00082 RemoteChannelElement(CorbaTypeTransporter const& transport, DataFlowInterface* sender, PortableServer::POA_ptr poa, bool is_pull)
00083 : CRemoteChannelElement_i(transport, poa)
00084 , valid(true), pull(is_pull)
00085 , msender(sender)
00086 {
00087
00088
00089
00090
00091
00092 this->ref();
00093 oid = mpoa->activate_object(this);
00094
00095 CorbaDispatcher::Instance(msender);
00096 }
00097
00098 ~RemoteChannelElement()
00099 {
00100 }
00101
00103 void _add_ref()
00104 { this->ref(); }
00106 void _remove_ref()
00107 { this->deref(); }
00108
00109
00113 CORBA::Boolean remoteSignal() ACE_THROW_SPEC ((
00114 CORBA::SystemException
00115 ))
00116 { return base::ChannelElement<T>::signal(); }
00117
00118 bool signal()
00119 {
00120
00121 base::ChannelElementBase::signal();
00122
00123 if ( CORBA::is_nil(remote_side.in()) )
00124 return true;
00125
00126
00127
00128 CorbaDispatcher::Instance(msender)->dispatchChannel( this );
00129
00130 return valid;
00131 }
00132
00133 virtual void transferSamples() {
00134 if (!valid)
00135 return;
00136
00137
00138 if ( pull ) {
00139 try
00140 { valid = remote_side->remoteSignal(); }
00141 #ifdef CORBA_IS_OMNIORB
00142 catch(CORBA::SystemException& e)
00143 {
00144 log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << " " << e.NP_minorString() << endlog();
00145 valid = false;
00146 }
00147 #endif
00148 catch(CORBA::Exception& e)
00149 {
00150 log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << endlog();
00151 valid = false;
00152 }
00153 } else {
00155 typename base::ChannelElement<T>::value_t sample;
00156
00157
00158 while ( this->read(sample, false) == NewData && valid) {
00159
00160 if ( this->write(sample) == false )
00161 valid = false;
00162
00163 }
00164 }
00165
00166
00167 }
00168
00172 void disconnect() ACE_THROW_SPEC ((
00173 CORBA::SystemException
00174 )) {
00175
00176
00177 try {
00178 if ( ! CORBA::is_nil(remote_side.in()) )
00179 remote_side->remoteDisconnect(true);
00180 }
00181 catch(CORBA::Exception&) {}
00182
00183 try { this->remoteDisconnect(true); }
00184 catch(CORBA::Exception&) {}
00185 }
00186
00187 void remoteDisconnect(bool writer_to_reader) ACE_THROW_SPEC ((
00188 CORBA::SystemException
00189 ))
00190 {
00191 base::ChannelElement<T>::disconnect(writer_to_reader);
00192
00193
00194
00195 base::ChannelElement<T>::disconnect(!writer_to_reader);
00196
00197
00198 try {
00199 if (mdataflow)
00200 mdataflow->deregisterChannel(_this());
00201 mpoa->deactivate_object(oid);
00202 }
00203 catch(CORBA::Exception&) {}
00204 }
00205
00209 void disconnect(bool writer_to_reader) ACE_THROW_SPEC ((
00210 CORBA::SystemException
00211 ))
00212 {
00213 try {
00214 if ( ! CORBA::is_nil(remote_side.in()) )
00215 remote_side->remoteDisconnect(writer_to_reader);
00216 }
00217 catch(CORBA::Exception&) {}
00218
00219 base::ChannelElement<T>::disconnect(writer_to_reader);
00220
00221
00222 try {
00223 if (mdataflow)
00224 mdataflow->deregisterChannel(_this());
00225 mpoa->deactivate_object(oid);
00226 }
00227 catch(CORBA::Exception&) {}
00228 }
00229
00230 FlowStatus read(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data)
00231 {
00232 if (!valid)
00233 return NoData;
00234
00235
00236 FlowStatus fs;
00237 CFlowStatus cfs;
00238 if ( (fs = base::ChannelElement<T>::read(sample, copy_old_data)) )
00239 return fs;
00240
00241
00242 CORBA::Any_var remote_value;
00243 try
00244 {
00245 if ( remote_side && (cfs = remote_side->read(remote_value, copy_old_data) ) )
00246 {
00247 if (cfs == CNewData || (cfs == COldData && copy_old_data)) {
00248 internal::LateReferenceDataSource<T> ref_data_source(&sample);
00249 ref_data_source.ref();
00250 transport.updateFromAny(&remote_value.in(), &ref_data_source);
00251 }
00252 return (FlowStatus)cfs;
00253 }
00254 else
00255 return NoData;
00256 }
00257 #ifdef CORBA_IS_OMNIORB
00258 catch(CORBA::SystemException& e)
00259 {
00260 log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << " " << e.NP_minorString() << endlog();
00261 valid = false;
00262 return NoData;
00263 }
00264 #endif
00265 catch(CORBA::Exception& e)
00266 {
00267 log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << endlog();
00268 valid = false;
00269 return NoData;
00270 }
00271 }
00272
00276 CFlowStatus read(::CORBA::Any_out sample, bool copy_old_data) ACE_THROW_SPEC ((
00277 CORBA::SystemException
00278 ))
00279 {
00280
00281 FlowStatus fs;
00282 typename internal::ValueDataSource<T> value_data_source;
00283 value_data_source.ref();
00284 fs = base::ChannelElement<T>::read(value_data_source.set(), copy_old_data);
00285 if (fs == NewData || (fs == OldData && copy_old_data)) {
00286 sample = transport.createAny(&value_data_source);
00287 if ( sample != 0) {
00288 return (CFlowStatus)fs;
00289 }
00290
00291 log(Error) << "CORBA Transport failed to create Any for " << value_data_source.getTypeName() << " while it should have!" <<endlog();
00292 }
00293
00294 sample = new CORBA::Any();
00295 return (CFlowStatus)fs;
00296 }
00297
00298 bool write(typename base::ChannelElement<T>::param_t sample)
00299 {
00300
00301 if (base::ChannelElement<T>::write(sample))
00302 return true;
00303
00304 assert( remote_side.in() != 0 && "Got write() without remote side. Need buffer OR remote side but neither was present.");
00305 try
00306 {
00310 CORBA::Any write_any;
00311 internal::LateConstReferenceDataSource<T> const_ref_data_source(&sample);
00312 const_ref_data_source.ref();
00313
00314
00315
00316
00317 transport.updateAny(&const_ref_data_source, write_any);
00318 remote_side->write(write_any);
00319 return true;
00320 }
00321 #ifdef CORBA_IS_OMNIORB
00322 catch(CORBA::SystemException& e)
00323 {
00324 log(Error) << "caught CORBA exception while marshalling: " << e._name() << " " << e.NP_minorString() << endlog();
00325 return false;
00326 }
00327 #endif
00328 catch(CORBA::Exception& e)
00329 {
00330 log(Error) << "caught CORBA exception while marshalling: " << e._name() << endlog();
00331 return false;
00332 }
00333 }
00334
00338 bool write(const ::CORBA::Any& sample) ACE_THROW_SPEC ((
00339 CORBA::SystemException
00340 ))
00341 {
00342 typename internal::ValueDataSource<T> value_data_source;
00343 value_data_source.ref();
00344 transport.updateFromAny(&sample, &value_data_source);
00345 return base::ChannelElement<T>::write(value_data_source.rvalue());
00346 }
00347
00348 virtual bool data_sample(typename base::ChannelElement<T>::param_t sample)
00349 {
00350
00351
00352 typename base::ChannelElement<T>::shared_ptr output =
00353 this->getOutput();
00354 if (output)
00355 return base::ChannelElement<T>::data_sample(sample);
00356 return true;
00357 }
00358
00362 virtual bool inputReady() {
00363
00364 typename base::ChannelElement<T>::shared_ptr input =
00365 this->getInput();
00366 if (input)
00367 return base::ChannelElement<T>::inputReady();
00368 return true;
00369 }
00370
00371 };
00372 }
00373 }
00374
00375 #endif
00376