Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #ifndef CORBA_REMOTE_CHANNEL_ELEMENT_H
00040 #define CORBA_REMOTE_CHANNEL_ELEMENT_H
00041
00042 #include "DataFlowI.h"
00043 #include "CorbaTypeTransporter.hpp"
00044 #include "CorbaDispatcher.hpp"
00045
00046 namespace RTT {
00047
00048 namespace corba {
00049
00057 template<typename T>
00058 class RemoteChannelElement
00059 : public CRemoteChannelElement_i
00060 , public base::ChannelElement<T>
00061 {
00062 typename internal::ValueDataSource<T>::shared_ptr value_data_source;
00063 typename internal::LateReferenceDataSource<T>::shared_ptr ref_data_source;
00064 typename internal::LateConstReferenceDataSource<T>::shared_ptr const_ref_data_source;
00065
00069 bool valid;
00073 bool pull;
00074
00076 typename base::ChannelElement<T>::value_t sample;
00077
00078 DataFlowInterface* msender;
00079
00083 CORBA::Any* write_any;
00084
00085 PortableServer::ObjectId_var oid;
00086
00087 public:
00093 RemoteChannelElement(CorbaTypeTransporter const& transport, DataFlowInterface* sender, PortableServer::POA_ptr poa, bool is_pull)
00094 : CRemoteChannelElement_i(transport, poa),
00095 value_data_source(new internal::ValueDataSource<T>),
00096 ref_data_source(new internal::LateReferenceDataSource<T>),
00097 const_ref_data_source(new internal::LateConstReferenceDataSource<T>),
00098 valid(true), pull(is_pull),
00099 msender(sender),
00100 write_any(new CORBA::Any)
00101 {
00102
00103
00104
00105
00106
00107 this->ref();
00108 oid = mpoa->activate_object(this);
00109
00110 CorbaDispatcher::Instance(msender);
00111 }
00112
00113 ~RemoteChannelElement()
00114 {
00115 delete write_any;
00116 }
00117
00119 void _add_ref()
00120 { this->ref(); }
00122 void _remove_ref()
00123 { this->deref(); }
00124
00125
00129 CORBA::Boolean remoteSignal() ACE_THROW_SPEC ((
00130 CORBA::SystemException
00131 ))
00132 { return base::ChannelElement<T>::signal(); }
00133
00134 bool signal()
00135 {
00136
00137 base::ChannelElementBase::signal();
00138
00139 if ( CORBA::is_nil(remote_side.in()) )
00140 return true;
00141
00142
00143
00144 CorbaDispatcher::Instance(msender)->dispatchChannel( this );
00145
00146 return valid;
00147 }
00148
00149 virtual void transferSamples() {
00150 if (!valid)
00151 return;
00152
00153
00154 if ( pull ) {
00155 try
00156 { valid = remote_side->remoteSignal(); }
00157 #ifdef CORBA_IS_OMNIORB
00158 catch(CORBA::SystemException& e)
00159 {
00160 log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << " " << e.NP_minorString() << endlog();
00161 valid = false;
00162 }
00163 #endif
00164 catch(CORBA::Exception& e)
00165 {
00166 log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << endlog();
00167 valid = false;
00168 }
00169 } else {
00170
00171 while ( this->read(sample, false) == NewData && valid) {
00172
00173 if ( this->write(sample) == false )
00174 valid = false;
00175
00176 }
00177 }
00178
00179
00180 }
00181
00185 void disconnect() ACE_THROW_SPEC ((
00186 CORBA::SystemException
00187 )) {
00188
00189
00190 try {
00191 if ( ! CORBA::is_nil(remote_side.in()) )
00192 remote_side->remoteDisconnect(true);
00193 }
00194 catch(CORBA::Exception&) {}
00195
00196 try { this->remoteDisconnect(true); }
00197 catch(CORBA::Exception&) {}
00198 }
00199
00200 void remoteDisconnect(bool writer_to_reader) ACE_THROW_SPEC ((
00201 CORBA::SystemException
00202 ))
00203 {
00204 base::ChannelElement<T>::disconnect(writer_to_reader);
00205
00206
00207
00208 base::ChannelElement<T>::disconnect(!writer_to_reader);
00209
00210
00211 try {
00212 if (mdataflow)
00213 mdataflow->deregisterChannel(_this());
00214 mpoa->deactivate_object(oid);
00215 }
00216 catch(CORBA::Exception&) {}
00217 }
00218
00222 void disconnect(bool writer_to_reader) ACE_THROW_SPEC ((
00223 CORBA::SystemException
00224 ))
00225 {
00226 try {
00227 if ( ! CORBA::is_nil(remote_side.in()) )
00228 remote_side->remoteDisconnect(writer_to_reader);
00229 }
00230 catch(CORBA::Exception&) {}
00231
00232 base::ChannelElement<T>::disconnect(writer_to_reader);
00233
00234
00235 try {
00236 if (mdataflow)
00237 mdataflow->deregisterChannel(_this());
00238 mpoa->deactivate_object(oid);
00239 }
00240 catch(CORBA::Exception&) {}
00241 }
00242
00243 FlowStatus read(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data)
00244 {
00245 if (!valid)
00246 return NoData;
00247
00248
00249 FlowStatus fs;
00250 CFlowStatus cfs;
00251 if ( (fs = base::ChannelElement<T>::read(sample, copy_old_data)) )
00252 return fs;
00253
00254
00255 CORBA::Any_var remote_value;
00256 try
00257 {
00258 if ( remote_side && (cfs = remote_side->read(remote_value, copy_old_data) ) )
00259 {
00260 ref_data_source->setPointer(&sample);
00261 transport.updateFromAny(&remote_value.in(), ref_data_source);
00262 return (FlowStatus)cfs;
00263 }
00264 else
00265 return NoData;
00266 }
00267 #ifdef CORBA_IS_OMNIORB
00268 catch(CORBA::SystemException& e)
00269 {
00270 log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << " " << e.NP_minorString() << endlog();
00271 valid = false;
00272 return NoData;
00273 }
00274 #endif
00275 catch(CORBA::Exception& e)
00276 {
00277 log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << endlog();
00278 valid = false;
00279 return NoData;
00280 }
00281 }
00282
00286 CFlowStatus read(::CORBA::Any_out sample, bool copy_old_data) ACE_THROW_SPEC ((
00287 CORBA::SystemException
00288 ))
00289 {
00290
00291 FlowStatus fs;
00292 if ( (fs = base::ChannelElement<T>::read(value_data_source->set(), copy_old_data)) )
00293 {
00294 sample = transport.createAny(value_data_source);
00295 if ( sample != 0) {
00296 return (CFlowStatus)fs;
00297 }
00298
00299 log(Error) << "CORBA Transport failed to create Any for " << value_data_source->getTypeName() << " while it should have!" <<endlog();
00300 }
00301
00302 sample = new CORBA::Any();
00303 return CNoData;
00304 }
00305
00306 bool write(typename base::ChannelElement<T>::param_t sample)
00307 {
00308
00309 if (base::ChannelElement<T>::write(sample))
00310 return true;
00311
00312 assert( remote_side.in() != 0 && "Got write() without remote side. Need buffer OR remote side but neither was present.");
00313 try
00314 {
00315
00316
00317
00318 const_ref_data_source->setPointer(&sample);
00319 transport.updateAny(const_ref_data_source, *write_any);
00320 remote_side->write(*write_any);
00321 return true;
00322 }
00323 #ifdef CORBA_IS_OMNIORB
00324 catch(CORBA::SystemException& e)
00325 {
00326 log(Error) << "caught CORBA exception while marshalling: " << e._name() << " " << e.NP_minorString() << endlog();
00327 return false;
00328 }
00329 #endif
00330 catch(CORBA::Exception& e)
00331 {
00332 log(Error) << "caught CORBA exception while marshalling: " << e._name() << endlog();
00333 return false;
00334 }
00335 }
00336
00340 bool write(const ::CORBA::Any& sample) ACE_THROW_SPEC ((
00341 CORBA::SystemException
00342 ))
00343 {
00344 transport.updateFromAny(&sample, value_data_source);
00345 return base::ChannelElement<T>::write(value_data_source->rvalue());
00346 }
00347
00348 virtual bool data_sample(typename base::ChannelElement<T>::param_t sample)
00349 {
00350
00351
00352 typename base::ChannelElement<T>::shared_ptr output =
00353 this->getOutput();
00354 if (output)
00355 return base::ChannelElement<T>::data_sample(sample);
00356 return true;
00357 }
00358
00362 virtual bool inputReady() {
00363
00364 typename base::ChannelElement<T>::shared_ptr input =
00365 this->getInput();
00366 if (input)
00367 return base::ChannelElement<T>::inputReady();
00368 return true;
00369 }
00370
00371 };
00372 }
00373 }
00374
00375 #endif
00376