00001 #include <ros/assert.h>
00002 #include <shared_serial/server.h>
00003 #include <shared_serial/watchdog.h>
00004
00005 SerialServerLock::SerialServerLock() :
00006 socket_(0), last_socket_(0)
00007 {
00008 pthread_mutex_init(&mutex_, NULL);
00009 pthread_cond_init(&condition_, NULL);
00010 }
00011
00012 void SerialServerLock::checkTimeout()
00013 {
00014 pthread_mutex_lock(&mutex_);
00015
00016 struct timeval tv;
00017 gettimeofday(&tv, NULL);
00018
00019
00020 if (socket_ && (tv.tv_sec > timeout_.tv_sec ||
00021 (tv.tv_sec == timeout_.tv_sec && tv.tv_usec > timeout_.tv_usec)))
00022 {
00023 ROS_DEBUG_STREAM("Lock " << socket_ << " expired");
00024 socket_ = 0;
00025
00026 pthread_cond_signal(&condition_);
00027 }
00028
00029 pthread_mutex_unlock(&mutex_);
00030 }
00031
00032 int SerialServerLock::lock(int socket, float timeout)
00033 {
00034 pthread_mutex_lock(&mutex_);
00035
00036
00037 if (socket && socket != socket_)
00038 {
00039 ROS_ERROR_STREAM("Unknown lock " << socket);
00040 pthread_mutex_unlock(&mutex_);
00041 return -1;
00042 }
00043
00044 if (!socket)
00045 {
00046
00047 while (socket_)
00048 {
00049 ROS_DEBUG_STREAM("Waiting for lock");
00050 pthread_cond_wait(&condition_, &mutex_);
00051 }
00052
00053
00054 socket_ = last_socket_ = std::max(last_socket_+1, 1);
00055
00056 ROS_DEBUG_STREAM("New lock " << socket_);
00057 }
00058
00059 if (timeout)
00060 {
00061
00062 struct timeval tv;
00063 gettimeofday(&tv, NULL);
00064
00065 tv.tv_sec += (int)timeout;
00066 tv.tv_usec += (int)((timeout-(int)timeout)*1000000);
00067 if (tv.tv_usec > 1000000)
00068 {
00069 tv.tv_sec++;
00070 tv.tv_usec -= 1000000;
00071 }
00072 timeout_ = tv;
00073
00074 ROS_DEBUG_STREAM("Lock " << socket_ << " will expire after " << timeout << " seconds");
00075 }
00076 else
00077 {
00078 ROS_DEBUG_STREAM("Lock " << socket_ << " expired");
00079
00080 timeout_.tv_sec = timeout_.tv_usec = socket_ = 0;
00081 }
00082
00083 return socket_;
00084 }
00085
00086 void SerialServerLock::unlock()
00087 {
00088 if (!socket_)
00089 pthread_cond_signal(&condition_);
00090 pthread_mutex_unlock(&mutex_);
00091 }
00092
00093 void SerialServer::advertiseServices()
00094 {
00095 ROS_INFO("Registering services");
00096
00097 send_topic_ = nh_.subscribe("send", 10, &SerialServer::callbackSend, this);
00098 close_topic_ = nh_.subscribe("close", 1, &SerialServer::callbackClose, this);
00099 flush_topic_ = nh_.subscribe("flush", 1, &SerialServer::callbackFlush, this);
00100 connect_service_ = nh_.advertiseService("connect", &SerialServer::callbackConnect, this);
00101 sendto_service_ = nh_.advertiseService("sendto", &SerialServer::callbackSendTo, this);
00102 recv_service_ = nh_.advertiseService("recv", &SerialServer::callbackRecv, this);
00103 sendrecv_service_ = nh_.advertiseService("sendrecv", &SerialServer::callbackSendRecv, this);
00104 }
00105
00106 void SerialServer::readParameters()
00107 {
00108 std::string port_name, port_type;
00109 LxSerial::PortType lx_port_type;
00110 int baud_rate;
00111 double watchdog_interval;
00112
00113 ROS_INFO("Reading parameters");
00114
00115 ROS_ASSERT(nh_.getParam("port_name", port_name));
00116 ROS_ASSERT(nh_.getParam("port_type", port_type));
00117 ROS_ASSERT(nh_.getParam("baud_rate", baud_rate));
00118 nh_.param<double>("watchdog_interval", watchdog_interval, 10);
00119
00120 if (port_type == "RS232")
00121 lx_port_type = LxSerial::RS232;
00122 else if (port_type == "RS485_FTDI")
00123 lx_port_type = LxSerial::RS485_FTDI;
00124 else if (port_type == "RS485_EXAR")
00125 lx_port_type = LxSerial::RS485_EXAR;
00126 else if (port_type == "RS485_SMSC")
00127 lx_port_type = LxSerial::RS485_SMSC;
00128 else if (port_type == "TCP")
00129 lx_port_type = LxSerial::TCP;
00130 else
00131 {
00132 ROS_FATAL_STREAM("Unknown port type " << port_type);
00133 ROS_BREAK();
00134 return;
00135 }
00136
00137 ROS_ASSERT(serial_port_.port_open(port_name, lx_port_type));
00138 ROS_ASSERT(serial_port_.set_speed_int(baud_rate));
00139 ROS_ASSERT(watchdog_.set(watchdog_interval));
00140 }
00141
00142 void SerialServer::callbackSend(const shared_serial::Send::ConstPtr& msg)
00143 {
00144 int socket = lock_.lock(msg->socket, msg->timeout);
00145 if (socket < 0) return;
00146
00147 unsigned char *buf = new unsigned char[msg->data.size()];
00148 for (unsigned int ii=0; ii != msg->data.size(); ++ii)
00149 buf[ii] = msg->data[ii];
00150
00151 int n = serial_port_.port_write(buf, msg->data.size());
00152 delete buf;
00153
00154 if (n != (int)msg->data.size())
00155 {
00156 ROS_ERROR("Truncated send, flushing port");
00157 serial_port_.flush_buffer();
00158
00159 lock_.unlock();
00160 return;
00161 }
00162
00163 ROS_DEBUG_STREAM("Sent " << n << " bytes");
00164
00165 lock_.unlock();
00166 return;
00167 }
00168
00169 void SerialServer::callbackClose(const shared_serial::Close::ConstPtr& msg)
00170 {
00171 int socket = lock_.lock(msg->socket, 0);
00172 if (socket < 0) return;
00173
00174 lock_.unlock();
00175 return;
00176 }
00177
00178 void SerialServer::callbackFlush(const shared_serial::Flush::ConstPtr& msg)
00179 {
00180 int socket = lock_.lock(msg->socket, msg->timeout);
00181 if (socket < 0) return;
00182
00183 serial_port_.flush_buffer();
00184 usleep(10);
00185 serial_port_.flush_buffer();
00186
00187 ROS_INFO_STREAM("Flushed port");
00188
00189 lock_.unlock();
00190 return;
00191 }
00192
00193 bool SerialServer::callbackConnect(shared_serial::Connect::Request& req, shared_serial::Connect::Response& res)
00194 {
00195 int socket = lock_.lock(0, req.timeout);
00196 if (socket < 0) return false;
00197
00198 res.socket = socket;
00199
00200 lock_.unlock();
00201 return true;
00202 }
00203
00204 bool SerialServer::callbackSendTo(shared_serial::SendTo::Request& req, shared_serial::SendTo::Response& res)
00205 {
00206 int socket = lock_.lock(req.socket, req.timeout);
00207 if (socket < 0) return false;
00208
00209 unsigned char *buf = new unsigned char[req.data.size()];
00210 for (unsigned int ii=0; ii != req.data.size(); ++ii)
00211 buf[ii] = req.data[ii];
00212
00213 int n = serial_port_.port_write(buf, req.data.size());
00214 delete buf;
00215
00216 if (n != (int)req.data.size())
00217 {
00218 ROS_ERROR("Truncated send, flushing port");
00219 serial_port_.flush_buffer();
00220
00221 lock_.unlock();
00222 return false;
00223 }
00224
00225 ROS_DEBUG_STREAM("Sent " << n << " bytes");
00226
00227 res.socket = socket;
00228
00229 lock_.unlock();
00230 return true;
00231 }
00232
00233 bool SerialServer::callbackRecv(shared_serial::Recv::Request& req, shared_serial::Recv::Response& res)
00234 {
00235 int socket = lock_.lock(req.socket, req.sock_timeout);
00236 if (socket < 0) return false;
00237
00238 ROS_ASSERT(req.length <= 65536);
00239
00240 unsigned char *buf = new unsigned char[req.length];
00241 int n = serial_port_.port_read(buf, req.length, (int)req.recv_timeout, (int)((req.recv_timeout-(int)req.recv_timeout)*1000000));
00242
00243 if (n < 0)
00244 {
00245 ROS_ERROR_STREAM("Error " << n << " while reading serial port");
00246 delete buf;
00247
00248 serial_port_.flush_buffer();
00249 usleep(10);
00250 serial_port_.flush_buffer();
00251
00252 lock_.unlock();
00253 return false;
00254 }
00255
00256 ROS_DEBUG_STREAM("Read " << n << " bytes");
00257
00258 res.data.resize(n);
00259 for (int ii=0; ii != n; ++ii)
00260 res.data[ii] = buf[ii];
00261 delete buf;
00262
00263 res.socket = socket;
00264
00265 lock_.unlock();
00266 return true;
00267 }
00268
00269 bool SerialServer::callbackSendRecv(shared_serial::SendRecv::Request& req, shared_serial::SendRecv::Response& res)
00270 {
00271 int socket = lock_.lock(req.socket, req.sock_timeout);
00272 if (socket < 0) return false;
00273
00274
00275 unsigned char *buf = new unsigned char[req.send_data.size()];
00276 for (unsigned int ii=0; ii != req.send_data.size(); ++ii)
00277 buf[ii] = req.send_data[ii];
00278
00279 int n = serial_port_.port_write(buf, req.send_data.size());
00280 delete buf;
00281
00282 if (n != (int)req.send_data.size())
00283 {
00284 ROS_ERROR("Truncated send, flushing port");
00285 serial_port_.flush_buffer();
00286
00287 lock_.unlock();
00288 return false;
00289 }
00290
00291 ROS_DEBUG_STREAM("Sent " << n << " bytes");
00292
00293
00294 ROS_ASSERT(req.length <= 65536);
00295
00296 buf = new unsigned char[req.length];
00297 n = serial_port_.port_read(buf, req.length, (int)req.recv_timeout, (int)((req.recv_timeout-(int)req.recv_timeout)*1000000));
00298
00299 if (n < 0)
00300 {
00301 ROS_ERROR("Error while reading serial port");
00302 delete buf;
00303
00304 serial_port_.flush_buffer();
00305 usleep(10);
00306 serial_port_.flush_buffer();
00307
00308 lock_.unlock();
00309 return false;
00310 }
00311
00312 ROS_DEBUG_STREAM("Read " << n << " bytes");
00313
00314 res.recv_data.resize(n);
00315 for (int ii=0; ii != n; ++ii)
00316 res.recv_data[ii] = buf[ii];
00317 delete buf;
00318
00319 res.socket = socket;
00320
00321 lock_.unlock();
00322 return true;
00323 }
00324
00325 int main(int argc, char **argv)
00326 {
00327 ros::init(argc, argv, "shared_serial_server");
00328
00329 SerialServer serialServer;
00330
00331 ROS_INFO("Spinning");
00332
00333 ros::AsyncSpinner spinner(8);
00334 spinner.start();
00335
00336 ros::Rate loop_rate(10);
00337 while (ros::ok())
00338 {
00339 serialServer.checkTimeout();
00340 serialServer.kickWatchdog();
00341 loop_rate.sleep();
00342 }
00343
00344 return 0;
00345 }