server.cpp
Go to the documentation of this file.
00001 #include <ros/assert.h>
00002 #include <shared_serial/server.h>
00003 #include <shared_serial/watchdog.h>
00004 
00005 SerialServerLock::SerialServerLock() :
00006   socket_(0), last_socket_(0)
00007 {
00008   pthread_mutex_init(&mutex_, NULL);
00009   pthread_cond_init(&condition_, NULL);
00010 }
00011 
00012 void SerialServerLock::checkTimeout()
00013 {
00014   pthread_mutex_lock(&mutex_);
00015 
00016   struct timeval tv;
00017   gettimeofday(&tv, NULL);
00018   
00019   // Check timeout
00020   if (socket_ && (tv.tv_sec > timeout_.tv_sec ||
00021                   (tv.tv_sec == timeout_.tv_sec && tv.tv_usec > timeout_.tv_usec)))
00022   {
00023     ROS_DEBUG_STREAM("Lock " << socket_ << " expired");
00024     socket_ = 0;
00025 
00026     pthread_cond_signal(&condition_);
00027   }
00028   
00029   pthread_mutex_unlock(&mutex_);
00030 }
00031 
00032 int SerialServerLock::lock(int socket, float timeout)
00033 {
00034   pthread_mutex_lock(&mutex_);
00035 
00036   // Check validity
00037   if (socket && socket != socket_)
00038   {
00039     ROS_ERROR_STREAM("Unknown lock " << socket);
00040     pthread_mutex_unlock(&mutex_);
00041     return -1;
00042   }
00043   
00044   if (!socket)
00045   {
00046     // Wait for port to become free
00047     while (socket_)
00048     {
00049       ROS_DEBUG_STREAM("Waiting for lock");
00050       pthread_cond_wait(&condition_, &mutex_);
00051     }
00052     
00053     // New connection
00054     socket_ = last_socket_ = std::max(last_socket_+1, 1);
00055     
00056     ROS_DEBUG_STREAM("New lock " << socket_);
00057   }
00058 
00059   if (timeout)
00060   {
00061     // Set timeout
00062     struct timeval tv;
00063     gettimeofday(&tv, NULL);
00064   
00065     tv.tv_sec += (int)timeout;
00066     tv.tv_usec += (int)((timeout-(int)timeout)*1000000);
00067     if (tv.tv_usec > 1000000)
00068     {
00069       tv.tv_sec++;
00070       tv.tv_usec -= 1000000;
00071     }
00072     timeout_ = tv;
00073     
00074     ROS_DEBUG_STREAM("Lock " << socket_ << " will expire after " << timeout << " seconds");
00075   }
00076   else
00077   {
00078     ROS_DEBUG_STREAM("Lock " << socket_ << " expired");
00079 
00080     timeout_.tv_sec = timeout_.tv_usec = socket_ = 0;
00081   }
00082   
00083   return socket_;
00084 }
00085 
00086 void SerialServerLock::unlock()
00087 {
00088   if (!socket_)
00089     pthread_cond_signal(&condition_);
00090   pthread_mutex_unlock(&mutex_);
00091 }
00092 
00093 void SerialServer::advertiseServices()
00094 {
00095   ROS_INFO("Registering services");
00096 
00097   send_topic_ = nh_.subscribe("send", 10, &SerialServer::callbackSend, this);
00098   close_topic_ = nh_.subscribe("close", 1, &SerialServer::callbackClose, this);
00099   flush_topic_ = nh_.subscribe("flush", 1, &SerialServer::callbackFlush, this);
00100   connect_service_ = nh_.advertiseService("connect", &SerialServer::callbackConnect, this);
00101   sendto_service_ = nh_.advertiseService("sendto", &SerialServer::callbackSendTo, this);
00102   recv_service_ = nh_.advertiseService("recv", &SerialServer::callbackRecv, this);
00103   sendrecv_service_ = nh_.advertiseService("sendrecv", &SerialServer::callbackSendRecv, this);
00104 }
00105 
00106 void SerialServer::readParameters()
00107 {
00108   std::string port_name, port_type;
00109   LxSerial::PortType lx_port_type;
00110   int baud_rate;
00111   double watchdog_interval;
00112   
00113   ROS_INFO("Reading parameters");
00114   
00115   ROS_ASSERT(nh_.getParam("port_name", port_name));
00116   ROS_ASSERT(nh_.getParam("port_type", port_type));
00117   ROS_ASSERT(nh_.getParam("baud_rate", baud_rate));
00118   nh_.param<double>("watchdog_interval", watchdog_interval, 10);
00119 
00120   if (port_type == "RS232")
00121     lx_port_type = LxSerial::RS232;
00122   else if (port_type == "RS485_FTDI")
00123     lx_port_type = LxSerial::RS485_FTDI;
00124   else if (port_type == "RS485_EXAR")
00125     lx_port_type = LxSerial::RS485_EXAR;
00126   else if (port_type == "RS485_SMSC")
00127     lx_port_type = LxSerial::RS485_SMSC;
00128   else if (port_type == "TCP")
00129     lx_port_type = LxSerial::TCP;
00130   else
00131   {
00132     ROS_FATAL_STREAM("Unknown port type " << port_type);
00133     ROS_BREAK();
00134     return;
00135   }
00136 
00137   ROS_ASSERT(serial_port_.port_open(port_name, lx_port_type));
00138   ROS_ASSERT(serial_port_.set_speed_int(baud_rate));
00139   ROS_ASSERT(watchdog_.set(watchdog_interval));
00140 }
00141 
00142 void SerialServer::callbackSend(const shared_serial::Send::ConstPtr& msg)
00143 {
00144   int socket = lock_.lock(msg->socket, msg->timeout);
00145   if (socket < 0) return;
00146     
00147   unsigned char *buf = new unsigned char[msg->data.size()];
00148   for (unsigned int ii=0; ii != msg->data.size(); ++ii)
00149     buf[ii] = msg->data[ii];
00150   
00151   int n = serial_port_.port_write(buf, msg->data.size());
00152   delete buf;
00153   
00154   if (n != (int)msg->data.size())
00155   {
00156     ROS_ERROR("Truncated send, flushing port");
00157     serial_port_.flush_buffer();
00158     
00159     lock_.unlock();
00160     return;
00161   }
00162 
00163   ROS_DEBUG_STREAM("Sent " << n << " bytes");
00164     
00165   lock_.unlock();
00166   return;
00167 }
00168 
00169 void SerialServer::callbackClose(const shared_serial::Close::ConstPtr& msg)
00170 {
00171   int socket = lock_.lock(msg->socket, 0);
00172   if (socket < 0) return;
00173 
00174   lock_.unlock();
00175   return;
00176 }
00177 
00178 void SerialServer::callbackFlush(const shared_serial::Flush::ConstPtr& msg)
00179 {
00180   int socket = lock_.lock(msg->socket, msg->timeout);
00181   if (socket < 0) return;
00182     
00183   serial_port_.flush_buffer();
00184   usleep(10);
00185   serial_port_.flush_buffer();
00186 
00187   ROS_INFO_STREAM("Flushed port");
00188     
00189   lock_.unlock();
00190   return;
00191 }
00192 
00193 bool SerialServer::callbackConnect(shared_serial::Connect::Request& req, shared_serial::Connect::Response& res)
00194 {
00195   int socket = lock_.lock(0, req.timeout);
00196   if (socket < 0) return false;
00197   
00198   res.socket = socket;
00199 
00200   lock_.unlock();  
00201   return true;
00202 }
00203 
00204 bool SerialServer::callbackSendTo(shared_serial::SendTo::Request& req, shared_serial::SendTo::Response& res)
00205 {
00206   int socket = lock_.lock(req.socket, req.timeout);
00207   if (socket < 0) return false;
00208 
00209   unsigned char *buf = new unsigned char[req.data.size()];
00210   for (unsigned int ii=0; ii != req.data.size(); ++ii)
00211     buf[ii] = req.data[ii];
00212     
00213   int n = serial_port_.port_write(buf, req.data.size());
00214   delete buf;
00215   
00216   if (n != (int)req.data.size())
00217   {
00218     ROS_ERROR("Truncated send, flushing port");
00219     serial_port_.flush_buffer();
00220     
00221     lock_.unlock();
00222     return false;
00223   }
00224 
00225   ROS_DEBUG_STREAM("Sent " << n << " bytes");
00226   
00227   res.socket = socket;
00228 
00229   lock_.unlock();
00230   return true;
00231 }
00232 
00233 bool SerialServer::callbackRecv(shared_serial::Recv::Request& req, shared_serial::Recv::Response& res)
00234 {
00235   int socket = lock_.lock(req.socket, req.sock_timeout);
00236   if (socket < 0) return false;
00237 
00238   ROS_ASSERT(req.length <= 65536);
00239 
00240   unsigned char *buf = new unsigned char[req.length];
00241   int n = serial_port_.port_read(buf, req.length, (int)req.recv_timeout, (int)((req.recv_timeout-(int)req.recv_timeout)*1000000));
00242   
00243   if (n < 0)
00244   {
00245     ROS_ERROR_STREAM("Error " << n << " while reading serial port");
00246     delete buf;
00247 
00248     serial_port_.flush_buffer();
00249     usleep(10);
00250     serial_port_.flush_buffer();
00251     
00252     lock_.unlock();
00253     return false;
00254   }
00255   
00256   ROS_DEBUG_STREAM("Read " << n << " bytes");
00257   
00258   res.data.resize(n);
00259   for (int ii=0; ii != n; ++ii)
00260     res.data[ii] = buf[ii];
00261   delete buf;
00262   
00263   res.socket = socket;
00264     
00265   lock_.unlock();
00266   return true;
00267 }
00268 
00269 bool SerialServer::callbackSendRecv(shared_serial::SendRecv::Request& req, shared_serial::SendRecv::Response& res)
00270 {
00271   int socket = lock_.lock(req.socket, req.sock_timeout);
00272   if (socket < 0) return false;
00273 
00274   // *** Send ***
00275   unsigned char *buf = new unsigned char[req.send_data.size()];
00276   for (unsigned int ii=0; ii != req.send_data.size(); ++ii)
00277     buf[ii] = req.send_data[ii];
00278     
00279   int n = serial_port_.port_write(buf, req.send_data.size());
00280   delete buf;
00281   
00282   if (n != (int)req.send_data.size())
00283   {
00284     ROS_ERROR("Truncated send, flushing port");
00285     serial_port_.flush_buffer();
00286     
00287     lock_.unlock();
00288     return false;
00289   }
00290 
00291   ROS_DEBUG_STREAM("Sent " << n << " bytes");
00292 
00293   // *** Receive ***
00294   ROS_ASSERT(req.length <= 65536);
00295 
00296   buf = new unsigned char[req.length];
00297   n = serial_port_.port_read(buf, req.length, (int)req.recv_timeout, (int)((req.recv_timeout-(int)req.recv_timeout)*1000000));
00298   
00299   if (n < 0)
00300   {
00301     ROS_ERROR("Error while reading serial port");
00302     delete buf;
00303 
00304     serial_port_.flush_buffer();
00305     usleep(10);
00306     serial_port_.flush_buffer();
00307     
00308     lock_.unlock();
00309     return false;
00310   }
00311   
00312   ROS_DEBUG_STREAM("Read " << n << " bytes");
00313   
00314   res.recv_data.resize(n);
00315   for (int ii=0; ii != n; ++ii)
00316     res.recv_data[ii] = buf[ii];
00317   delete buf;
00318   
00319   res.socket = socket;
00320     
00321   lock_.unlock();    
00322   return true;
00323 }
00324 
00325 int main(int argc, char **argv)
00326 {
00327   ros::init(argc, argv, "shared_serial_server");
00328   
00329   SerialServer serialServer;
00330 
00331   ROS_INFO("Spinning");
00332   
00333   ros::AsyncSpinner spinner(8);
00334   spinner.start();
00335   
00336   ros::Rate loop_rate(10);
00337   while (ros::ok())
00338   {
00339     serialServer.checkTimeout();
00340     serialServer.kickWatchdog();
00341     loop_rate.sleep();
00342   }
00343   
00344   return 0;
00345 }


shared_serial
Author(s): Wouter Caarls
autogenerated on Sun Jan 5 2014 11:06:47