00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #ifndef ROSCPP_CONNECTION_H
00036 #define ROSCPP_CONNECTION_H
00037
00038 #include "ros/header.h"
00039
00040 #include <boost/signals.hpp>
00041 #include <boost/function.hpp>
00042 #include <boost/shared_ptr.hpp>
00043 #include <boost/shared_array.hpp>
00044 #include <boost/enable_shared_from_this.hpp>
00045 #include <boost/thread/mutex.hpp>
00046 #include <boost/thread/recursive_mutex.hpp>
00047
/// Buffer size constant: 1024*64 = 65536.
/// NOTE(review): the name suggests this is the byte size of a read buffer; its use
/// is not visible in this header -- confirm against the implementation file.
/// Kept as a macro (fully parenthesized) so existing uses, including any in
/// preprocessor conditionals, keep working.
#define READ_BUFFER_SIZE (1024*64)
00049
00050 namespace ros
00051 {
00052
00053 class Transport;
00054 typedef boost::shared_ptr<Transport> TransportPtr;
00055 class Connection;
00056 typedef boost::shared_ptr<Connection> ConnectionPtr;
00057 typedef boost::function<void(const ConnectionPtr&, const boost::shared_array<uint8_t>&, uint32_t, bool)> ReadFinishedFunc;
00058 typedef boost::function<void(const ConnectionPtr&)> WriteFinishedFunc;
00059
00060 typedef boost::function<bool(const ConnectionPtr&, const Header&)> HeaderReceivedFunc;
00061
00068 class Connection : public boost::enable_shared_from_this<Connection>
00069 {
00070 public:
00071 enum DropReason
00072 {
00073 TransportDisconnect,
00074 HeaderError,
00075 Destructing,
00076 };
00077
00078 Connection();
00079 ~Connection();
00080
00084 void initialize(const TransportPtr& transport, bool is_server, const HeaderReceivedFunc& header_func);
00089 void drop(DropReason reason);
00090
00094 bool isDropped();
00095
00099 bool isSendingHeaderError() { return sending_header_error_; }
00100
00105 void sendHeaderError(const std::string& error_message);
00111 void writeHeader(const M_string& key_vals, const WriteFinishedFunc& finished_callback);
00112
00126 void read(uint32_t size, const ReadFinishedFunc& finished_callback);
00143 void write(const boost::shared_array<uint8_t>& buffer, uint32_t size, const WriteFinishedFunc& finished_callback, bool immedate = true);
00144
00145 typedef boost::signal<void(const ConnectionPtr&, DropReason reason)> DropSignal;
00146 typedef boost::function<void(const ConnectionPtr&, DropReason reason)> DropFunc;
00150 boost::signals::connection addDropListener(const DropFunc& slot);
00151 void removeDropListener(const boost::signals::connection& c);
00152
00156 void setHeaderReceivedCallback(const HeaderReceivedFunc& func);
00157
00161 const TransportPtr& getTransport() { return transport_; }
00165 Header& getHeader() { return header_; }
00166
00171 void setHeader(const Header& header) { header_ = header; }
00172
00173 std::string getCallerId();
00174 std::string getRemoteString();
00175
00176 private:
00180 void onReadable(const TransportPtr& transport);
00184 void onWriteable(const TransportPtr& transport);
00189 void onDisconnect(const TransportPtr& transport);
00190
00191
00192 void onHeaderWritten(const ConnectionPtr& conn);
00193 void onErrorHeaderWritten(const ConnectionPtr& conn);
00194 void onHeaderLengthRead(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success);
00195 void onHeaderRead(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success);
00196
00201 void readTransport();
00205 void writeTransport();
00206
00208 bool is_server_;
00210 bool dropped_;
00212 Header header_;
00214 TransportPtr transport_;
00216 HeaderReceivedFunc header_func_;
00217
00219 boost::shared_array<uint8_t> read_buffer_;
00221 uint32_t read_filled_;
00223 uint32_t read_size_;
00225 ReadFinishedFunc read_callback_;
00227 boost::recursive_mutex read_mutex_;
00229 bool reading_;
00233 volatile uint32_t has_read_callback_;
00234
00236 boost::shared_array<uint8_t> write_buffer_;
00238 uint32_t write_sent_;
00240 uint32_t write_size_;
00242 WriteFinishedFunc write_callback_;
00243 boost::mutex write_callback_mutex_;
00245 boost::recursive_mutex write_mutex_;
00247 bool writing_;
00251 volatile uint32_t has_write_callback_;
00252
00254 WriteFinishedFunc header_written_callback_;
00255
00257 DropSignal drop_signal_;
00258
00260 boost::recursive_mutex drop_mutex_;
00261
00263 bool sending_header_error_;
00264 };
00265 typedef boost::shared_ptr<Connection> ConnectionPtr;
00266
00267 }
00268
00269 #endif // ROSCPP_CONNECTION_H