Go to the documentation of this file.00001
00020 #include <coil/stringutil.h>
00021
00022 #include <rtm/OutPortPushConnector.h>
00023 #include <rtm/ConnectorListener.h>
00024
00025 namespace RTC
00026 {
00034 OutPortPushConnector::OutPortPushConnector(ConnectorInfo info,
00035 InPortConsumer* consumer,
00036 ConnectorListeners& listeners,
00037 CdrBufferBase* buffer)
00038 : OutPortConnector(info),
00039 m_consumer(consumer), m_publisher(0),
00040 m_listeners(listeners), m_buffer(buffer)
00041 {
00042
00043 m_publisher = createPublisher(info);
00044 if (m_buffer == 0)
00045 {
00046 m_buffer = createBuffer(info);
00047 }
00048 if (m_publisher == 0 || m_buffer == 0 || m_consumer == 0)
00049 { throw std::bad_alloc(); }
00050
00051 if (m_publisher->init(info.properties) != PORT_OK)
00052 {
00053 throw std::bad_alloc();
00054 }
00055 m_buffer->init(info.properties.getNode("buffer"));
00056 m_consumer->init(info.properties);
00057
00058 m_publisher->setConsumer(m_consumer);
00059 m_publisher->setBuffer(m_buffer);
00060 m_publisher->setListener(m_profile, &m_listeners);
00061
00062 onConnect();
00063 }
00064
00072 OutPortPushConnector::~OutPortPushConnector()
00073 {
00074 onDisconnect();
00075 disconnect();
00076 }
00077
00085 ConnectorBase::ReturnCode
00086 OutPortPushConnector::write(const cdrMemoryStream& data)
00087 {
00088 RTC_TRACE(("write()"));
00089 RTC_PARANOID(("data size = %d bytes", data.bufSize()));
00090
00091 return m_publisher->write(data, 0, 0);
00092 }
00093
00101 ConnectorBase::ReturnCode OutPortPushConnector::disconnect()
00102 {
00103 RTC_TRACE(("disconnect()"));
00104
00105 if (m_publisher != 0)
00106 {
00107 RTC_DEBUG(("delete publisher"));
00108 PublisherFactory& pfactory(PublisherFactory::instance());
00109 pfactory.deleteObject(m_publisher);
00110 }
00111 m_publisher = 0;
00112
00113
00114 if (m_consumer != 0)
00115 {
00116 RTC_DEBUG(("delete consumer"));
00117 InPortConsumerFactory& cfactory(InPortConsumerFactory::instance());
00118 cfactory.deleteObject(m_consumer);
00119 }
00120 m_consumer = 0;
00121
00122
00123 if (m_buffer != 0)
00124 {
00125 RTC_DEBUG(("delete buffer"));
00126 CdrBufferFactory& bfactory(CdrBufferFactory::instance());
00127 bfactory.deleteObject(m_buffer);
00128 }
00129 m_buffer = 0;
00130 RTC_TRACE(("disconnect() done"));
00131 return PORT_OK;
00132 }
00133
00143 void OutPortPushConnector::activate()
00144 {
00145 m_publisher->activate();
00146 }
00147
00157 void OutPortPushConnector::deactivate()
00158 {
00159 m_publisher->deactivate();
00160 }
00161
00175 CdrBufferBase* OutPortPushConnector::getBuffer()
00176 {
00177 return m_buffer;
00178 }
00179
00187 PublisherBase* OutPortPushConnector::createPublisher(ConnectorInfo& info)
00188 {
00189 std::string pub_type;
00190 pub_type = info.properties.getProperty("subscription_type",
00191 "flush");
00192 coil::normalize(pub_type);
00193 return PublisherFactory::instance().createObject(pub_type);
00194 }
00195
00203 CdrBufferBase* OutPortPushConnector::createBuffer(ConnectorInfo& info)
00204 {
00205 std::string buf_type;
00206 buf_type = info.properties.getProperty("buffer_type",
00207 "ring_buffer");
00208 return CdrBufferFactory::instance().createObject(buf_type);
00209 }
00210
00218 void OutPortPushConnector::onConnect()
00219 {
00220 m_listeners.connector_[ON_CONNECT].notify(m_profile);
00221 }
00222
00230 void OutPortPushConnector::onDisconnect()
00231 {
00232 m_listeners.connector_[ON_DISCONNECT].notify(m_profile);
00233 }
00234 };
00235