00001 #ifndef H_ASIO_BASE 00002 #define H_ASIO_BASE 00003 00004 #include <socketcan_interface/interface.h> 00005 #include <socketcan_interface/dispatcher.h> 00006 #include <boost/asio.hpp> 00007 #include <boost/thread/mutex.hpp> 00008 #include <boost/thread/thread.hpp> 00009 #include <boost/bind.hpp> 00010 00011 namespace can{ 00012 00013 00014 template<typename Socket> class AsioDriver : public DriverInterface{ 00015 typedef FilteredDispatcher<const unsigned int, CommInterface::FrameListener> FrameDispatcher; 00016 typedef SimpleDispatcher<StateInterface::StateListener> StateDispatcher; 00017 FrameDispatcher frame_dispatcher_; 00018 StateDispatcher state_dispatcher_; 00019 00020 State state_; 00021 boost::mutex state_mutex_; 00022 boost::mutex socket_mutex_; 00023 00024 protected: 00025 boost::asio::io_service io_service_; 00026 boost::asio::strand strand_; 00027 Socket socket_; 00028 Frame input_; 00029 00030 virtual void triggerReadSome() = 0; 00031 virtual bool enqueue(const Frame & msg) = 0; 00032 00033 void dispatchFrame(const Frame &msg){ 00034 strand_.post(boost::bind(&FrameDispatcher::dispatch, &frame_dispatcher_, msg)); // copies msg 00035 } 00036 void setErrorCode(const boost::system::error_code& error){ 00037 boost::mutex::scoped_lock lock(state_mutex_); 00038 if(state_.error_code != error){ 00039 state_.error_code = error; 00040 state_dispatcher_.dispatch(state_); 00041 } 00042 } 00043 void setInternalError(unsigned int internal_error){ 00044 boost::mutex::scoped_lock lock(state_mutex_); 00045 if(state_.internal_error != internal_error){ 00046 state_.internal_error = internal_error; 00047 state_dispatcher_.dispatch(state_); 00048 } 00049 } 00050 00051 void setDriverState(State::DriverState state){ 00052 boost::mutex::scoped_lock lock(state_mutex_); 00053 if(state_.driver_state != state){ 00054 state_.driver_state = state; 00055 state_dispatcher_.dispatch(state_); 00056 } 00057 } 00058 void setNotReady(){ 00059 setDriverState(socket_.is_open()?State::open : State::closed); 00060 } 00061 00062 void frameReceived(const boost::system::error_code& error){ 00063 if(!error){ 00064 dispatchFrame(input_); 00065 triggerReadSome(); 00066 }else{ 00067 setErrorCode(error); 00068 setNotReady(); 00069 } 00070 } 00071 00072 AsioDriver() 00073 : strand_(io_service_), socket_(io_service_) 00074 {} 00075 00076 public: 00077 virtual ~AsioDriver() { shutdown(); } 00078 00079 State getState(){ 00080 boost::mutex::scoped_lock lock(state_mutex_); 00081 return state_; 00082 } 00083 virtual void run(){ 00084 setNotReady(); 00085 00086 if(getState().driver_state == State::open){ 00087 io_service_.reset(); 00088 boost::asio::io_service::work work(io_service_); 00089 setDriverState(State::ready); 00090 00091 boost::thread post_thread(boost::bind(&boost::asio::io_service::run, &io_service_)); 00092 00093 triggerReadSome(); 00094 00095 boost::system::error_code ec; 00096 io_service_.run(ec); 00097 setErrorCode(ec); 00098 00099 setNotReady(); 00100 } 00101 state_dispatcher_.dispatch(getState()); 00102 } 00103 virtual bool send(const Frame & msg){ 00104 return getState().driver_state == State::ready && enqueue(msg); 00105 } 00106 00107 virtual void shutdown(){ 00108 if(socket_.is_open()){ 00109 socket_.cancel(); 00110 socket_.close(); 00111 } 00112 io_service_.stop(); 00113 } 00114 00115 virtual FrameListener::Ptr createMsgListener(const FrameDelegate &delegate){ 00116 return frame_dispatcher_.createListener(delegate); 00117 } 00118 virtual FrameListener::Ptr createMsgListener(const Frame::Header&h , const FrameDelegate &delegate){ 00119 return frame_dispatcher_.createListener(h, delegate); 00120 } 00121 virtual StateListener::Ptr createStateListener(const StateDelegate &delegate){ 00122 return state_dispatcher_.createListener(delegate); 00123 } 00124 00125 }; 00126 00127 } // namespace can 00128 #endif