00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #ifndef CORBA_REMOTE_CHANNEL_ELEMENT_H
00040 #define CORBA_REMOTE_CHANNEL_ELEMENT_H
00041
00042 #include "DataFlowI.h"
00043 #include "CorbaTypeTransporter.hpp"
00044 #include "CorbaDispatcher.hpp"
00045
00046 namespace RTT {
00047
00048 namespace corba {
00049
00057 template<typename T>
00058 class RemoteChannelElement
00059 : public CRemoteChannelElement_i
00060 , public base::ChannelElement<T>
00061 {
00062 typename internal::ValueDataSource<T>::shared_ptr data_source;
00063
00067 bool valid;
00071 bool pull;
00072
00074 typename base::ChannelElement<T>::value_t sample;
00075
00076 DataFlowInterface* msender;
00077
00081 CORBA::Any* write_any;
00082
00083 PortableServer::ObjectId_var oid;
00084
00085 public:
00091 RemoteChannelElement(CorbaTypeTransporter const& transport, DataFlowInterface* sender, PortableServer::POA_ptr poa, bool is_pull)
00092 : CRemoteChannelElement_i(transport, poa),
00093 data_source(new internal::ValueDataSource<T>), valid(true), pull(is_pull),
00094 msender(sender),
00095 write_any(new CORBA::Any)
00096 {
00097
00098
00099
00100
00101
00102 this->ref();
00103 oid = mpoa->activate_object(this);
00104
00105 CorbaDispatcher::Instance(msender);
00106 }
00107
00108 ~RemoteChannelElement()
00109 {
00110 delete write_any;
00111 }
00112
00114 void _add_ref()
00115 { this->ref(); }
00117 void _remove_ref()
00118 { this->deref(); }
00119
00120
00124 CORBA::Boolean remoteSignal() ACE_THROW_SPEC ((
00125 CORBA::SystemException
00126 ))
00127 { return base::ChannelElement<T>::signal(); }
00128
00129 bool signal()
00130 {
00131
00132 base::ChannelElementBase::signal();
00133
00134 if ( CORBA::is_nil(remote_side.in()) )
00135 return true;
00136
00137
00138
00139 CorbaDispatcher::Instance(msender)->dispatchChannel( this );
00140
00141 return valid;
00142 }
00143
00144 virtual void transferSamples() {
00145 if (!valid)
00146 return;
00147
00148
00149 if ( pull ) {
00150 try
00151 { valid = remote_side->remoteSignal(); }
00152 #ifdef CORBA_IS_OMNIORB
00153 catch(CORBA::SystemException& e)
00154 {
00155 log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << " " << e.NP_minorString() << endlog();
00156 valid = false;
00157 }
00158 #endif
00159 catch(CORBA::Exception& e)
00160 {
00161 log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << endlog();
00162 valid = false;
00163 }
00164 } else {
00165
00166 while ( this->read(sample, false) == NewData && valid) {
00167
00168 if ( this->write(sample) == false )
00169 valid = false;
00170
00171 }
00172 }
00173
00174
00175 }
00176
00180 void disconnect() ACE_THROW_SPEC ((
00181 CORBA::SystemException
00182 )) {
00183
00184
00185 try {
00186 if ( ! CORBA::is_nil(remote_side.in()) )
00187 remote_side->remoteDisconnect(true);
00188 }
00189 catch(CORBA::Exception&) {}
00190
00191 try { this->remoteDisconnect(true); }
00192 catch(CORBA::Exception&) {}
00193 }
00194
00195 void remoteDisconnect(bool writer_to_reader) ACE_THROW_SPEC ((
00196 CORBA::SystemException
00197 ))
00198 {
00199 base::ChannelElement<T>::disconnect(writer_to_reader);
00200
00201
00202
00203 base::ChannelElement<T>::disconnect(!writer_to_reader);
00204
00205
00206 try {
00207 if (mdataflow)
00208 mdataflow->deregisterChannel(_this());
00209 mpoa->deactivate_object(oid);
00210 }
00211 catch(CORBA::Exception&) {}
00212 }
00213
00217 void disconnect(bool writer_to_reader) ACE_THROW_SPEC ((
00218 CORBA::SystemException
00219 ))
00220 {
00221 try {
00222 if ( ! CORBA::is_nil(remote_side.in()) )
00223 remote_side->remoteDisconnect(writer_to_reader);
00224 }
00225 catch(CORBA::Exception&) {}
00226
00227 base::ChannelElement<T>::disconnect(writer_to_reader);
00228
00229
00230 try {
00231 if (mdataflow)
00232 mdataflow->deregisterChannel(_this());
00233 mpoa->deactivate_object(oid);
00234 }
00235 catch(CORBA::Exception&) {}
00236 }
00237
00238 FlowStatus read(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data)
00239 {
00240
00241 FlowStatus fs;
00242 CFlowStatus cfs;
00243 if ( (fs = base::ChannelElement<T>::read(sample, copy_old_data)) )
00244 return fs;
00245
00246 CORBA::Any_var remote_value;
00247 try
00248 {
00249 if ( remote_side && (cfs = remote_side->read(remote_value, copy_old_data) ) )
00250 {
00251 RTT::internal::ReferenceDataSource<T> data_source(sample);
00252
00253
00254
00255
00256 data_source.ref();
00257 RTT::base::DataSourceBase::shared_ptr ptr(&data_source);
00258
00259 transport.updateFromAny(&remote_value.in(), ptr);
00260 return (FlowStatus)cfs;
00261 }
00262 else
00263 return NoData;
00264 }
00265 #ifdef CORBA_IS_OMNIORB
00266 catch(CORBA::SystemException& e)
00267 {
00268 log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << " " << e.NP_minorString() << endlog();
00269 return NoData;
00270 }
00271 #endif
00272 catch(CORBA::Exception& e)
00273 {
00274 log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << endlog();
00275 return NoData;
00276 }
00277 }
00278
00282 CFlowStatus read(::CORBA::Any_out sample, bool copy_old_data) ACE_THROW_SPEC ((
00283 CORBA::SystemException
00284 ))
00285 {
00286
00287 FlowStatus fs;
00288 if ( (fs = base::ChannelElement<T>::read(data_source->set(), copy_old_data)) )
00289 {
00290 sample = transport.createAny(data_source);
00291 if ( sample != 0) {
00292 return (CFlowStatus)fs;
00293 }
00294
00295 log(Error) << "CORBA Transport failed to create Any for " << data_source->getTypeName() << " while it should have!" <<endlog();
00296 }
00297
00298 sample = new CORBA::Any();
00299 return CNoData;
00300 }
00301
00302 bool write(typename base::ChannelElement<T>::param_t sample)
00303 {
00304 data_source->set(sample);
00305
00306 if (base::ChannelElement<T>::write(sample))
00307 return true;
00308
00309 assert( remote_side.in() != 0 && "Got write() without remote side. Need buffer OR remote side but neither was present.");
00310 try
00311 {
00312 transport.updateAny(data_source, *write_any);
00313 remote_side->write(*write_any);
00314 return true;
00315 }
00316 #ifdef CORBA_IS_OMNIORB
00317 catch(CORBA::SystemException& e)
00318 {
00319 log(Error) << "caught CORBA exception while marshalling: " << e._name() << " " << e.NP_minorString() << endlog();
00320 return false;
00321 }
00322 #endif
00323 catch(CORBA::Exception& e)
00324 {
00325 log(Error) << "caught CORBA exception while marshalling: " << e._name() << endlog();
00326 return false;
00327 }
00328 }
00329
00333 bool write(const ::CORBA::Any& sample) ACE_THROW_SPEC ((
00334 CORBA::SystemException
00335 ))
00336 {
00337 transport.updateFromAny(&sample, data_source);
00338 return base::ChannelElement<T>::write(data_source->rvalue());
00339 }
00340
00341 virtual bool data_sample(typename base::ChannelElement<T>::param_t sample)
00342 {
00343
00344
00345 typename base::ChannelElement<T>::shared_ptr output =
00346 this->getOutput();
00347 if (output)
00348 return base::ChannelElement<T>::data_sample(sample);
00349 return true;
00350 }
00351
00355 virtual bool inputReady() {
00356
00357 typename base::ChannelElement<T>::shared_ptr input =
00358 this->getInput();
00359 if (input)
00360 return base::ChannelElement<T>::inputReady();
00361 return true;
00362 }
00363
00364 };
00365 }
00366 }
00367
00368 #endif
00369