$search
00001 /* 00002 Dashel 00003 A cross-platform DAta Stream Helper Encapsulation Library 00004 Copyright (C) 2007 -- 2012: 00005 00006 Stephane Magnenat <stephane at magnenat dot net> 00007 (http://stephane.magnenat.net) 00008 Mobots group - Laboratory of Robotics Systems, EPFL, Lausanne 00009 (http://mobots.epfl.ch) 00010 00011 Sebastian Gerlach 00012 Kenzan Technologies 00013 (http://www.kenzantech.com) 00014 00015 All rights reserved. 00016 00017 Redistribution and use in source and binary forms, with or without 00018 modification, are permitted provided that the following conditions are met: 00019 * Redistributions of source code must retain the above copyright 00020 notice, this list of conditions and the following disclaimer. 00021 * Redistributions in binary form must reproduce the above copyright 00022 notice, this list of conditions and the following disclaimer in the 00023 documentation and/or other materials provided with the distribution. 00024 * Neither the names of "Mobots", "Laboratory of Robotics Systems", "EPFL", 00025 "Kenzan Technologies" nor the names of the contributors may be used to 00026 endorse or promote products derived from this software without specific 00027 prior written permission. 00028 00029 THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDERS ``AS IS'' AND ANY 00030 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 00031 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 00032 DISCLAIMED. IN NO EVENT SHALL COPYRIGHT HOLDERS BE LIABLE FOR ANY 00033 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 00034 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00035 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 00036 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 00037 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00038 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00039 */ 00040 00041 #include <string.h> 00042 #include <cassert> 00043 #include <cstdlib> 00044 #include <map> 00045 #include <vector> 00046 #include <valarray> 00047 #include <algorithm> 00048 #include <iostream> 00049 #include <sstream> 00050 #include <signal.h> 00051 #include <errno.h> 00052 #include <unistd.h> 00053 #include <termios.h> 00054 #include <fcntl.h> 00055 #include <netdb.h> 00056 #include <signal.h> 00057 #include <sys/select.h> 00058 #include <sys/time.h> 00059 #include <sys/types.h> 00060 #include <sys/stat.h> 00061 #include <pthread.h> 00062 00063 #ifdef __APPLE__ 00064 #define MACOSX 00065 #endif 00066 00067 #ifdef MACOSX 00068 #define USE_POLL_EMU 00069 #endif 00070 00071 #ifdef MACOSX 00072 #include <CoreFoundation/CoreFoundation.h> 00073 #include <IOKit/IOKitLib.h> 00074 #include <IOKit/serial/IOSerialKeys.h> 00075 #endif 00076 00077 #ifdef USE_LIBUDEV 00078 extern "C" { 00079 #include <libudev.h> 00080 } 00081 #endif 00082 00083 #ifdef USE_HAL 00084 #include <hal/libhal.h> 00085 #endif 00086 00087 #ifndef USE_POLL_EMU 00088 #include <poll.h> 00089 #else 00090 #include "poll_emu.h" 00091 #endif 00092 00093 #include "dashel-private.h" 00094 #include "dashel-posix.h" 00095 00096 00097 00102 namespace Dashel 00103 { 00104 using namespace std; 00105 00106 // Exception 00107 00108 void Stream::fail(DashelException::Source s, int se, const char* reason) 00109 { 00110 string sysMessage; 00111 failedFlag = true; 00112 00113 if (se) 00114 sysMessage = strerror(errno); 00115 00116 failReason = reason; 00117 failReason += " "; 00118 failReason += sysMessage; 00119 00120 throw DashelException(s, se, failReason.c_str(), this); 00121 } 00122 00123 // Serial port enumerator 00124 00125 std::map<int, std::pair<std::string, std::string> > SerialPortEnumerator::getPorts() 00126 { 00127 std::map<int, std::pair<std::string, std::string> > ports; 00128 00129 00130 00131 #ifdef MACOSX 00132 // use IOKit to enumerates devices 00133 00134 // get a matching dictionary to specify which IOService class we're interested in 00135 CFMutableDictionaryRef classesToMatch = IOServiceMatching(kIOSerialBSDServiceValue); 00136 if (classesToMatch == NULL) 00137 throw DashelException(DashelException::EnumerationError, 0, "IOServiceMatching returned a NULL dictionary"); 00138 00139 // specify all types of serial devices 00140 CFDictionarySetValue(classesToMatch, CFSTR(kIOSerialBSDTypeKey), CFSTR(kIOSerialBSDAllTypes)); 00141 00142 // get an iterator to serial port services 00143 io_iterator_t matchingServices; 00144 kern_return_t kernResult = IOServiceGetMatchingServices(kIOMasterPortDefault, classesToMatch, &matchingServices); 00145 if (KERN_SUCCESS != kernResult) 00146 throw DashelException(DashelException::EnumerationError, kernResult, "IOServiceGetMatchingServices failed"); 00147 00148 // iterate over services 00149 io_object_t modemService; 00150 int index = 0; 00151 while((modemService = IOIteratorNext(matchingServices))) 00152 { 00153 // get path for device 00154 CFTypeRef bsdPathAsCFString = IORegistryEntryCreateCFProperty(modemService, CFSTR(kIOCalloutDeviceKey), kCFAllocatorDefault, 0); 00155 if (bsdPathAsCFString) 00156 { 00157 std::string path; 00158 char cStr[255]; 00159 std::string name; 00160 00161 bool res = CFStringGetCString((CFStringRef) bsdPathAsCFString, cStr, 255, kCFStringEncodingUTF8); 00162 if(res) 00163 path = cStr; 00164 else 00165 throw DashelException(DashelException::EnumerationError, 0, "CFStringGetCString failed"); 00166 00167 CFRelease(bsdPathAsCFString); 00168 00169 CFTypeRef fn = IORegistryEntrySearchCFProperty(modemService, kIOServicePlane, CFSTR("USB Product Name"), kCFAllocatorDefault, 00170 kIORegistryIterateRecursively | kIORegistryIterateParents); 00171 if(fn) { 00172 res = CFStringGetCString((CFStringRef) fn, cStr, 255, kCFStringEncodingUTF8); 00173 if(res) 00174 name = cStr; 00175 else 00176 throw DashelException(DashelException::EnumerationError, 0, "CFStringGetString failed"); 00177 00178 CFRelease(fn); 00179 } else 00180 name = "Serial Port"; 00181 name = name + " (" + path + ")"; 00182 ports[index++] = std::make_pair<std::string, std::string>(path, name); 00183 } 00184 else 00185 throw DashelException(DashelException::EnumerationError, 0, "IORegistryEntryCreateCFProperty returned a NULL path"); 00186 00187 // release service 00188 IOObjectRelease(modemService); 00189 } 00190 00191 IOObjectRelease(matchingServices); 00192 00193 00194 #elif defined(USE_LIBUDEV) 00195 00196 struct udev *udev; 00197 struct udev_enumerate *enumerate; 00198 struct udev_list_entry *devices, *dev_list_entry; 00199 struct udev_device *dev; 00200 int index = 0; 00201 00202 udev = udev_new(); 00203 if(!udev) 00204 throw DashelException(DashelException::EnumerationError, 0, "Cannot create udev context"); 00205 00206 enumerate = udev_enumerate_new(udev); 00207 udev_enumerate_add_match_subsystem(enumerate, "tty"); 00208 udev_enumerate_scan_devices(enumerate); 00209 devices = udev_enumerate_get_list_entry(enumerate); 00210 00211 udev_list_entry_foreach(dev_list_entry, devices) { 00212 const char *sysfs_path; 00213 struct udev_device *usb_dev; 00214 const char * path; 00215 struct stat st; 00216 unsigned int maj,min; 00217 00218 /* Get sysfs path and create the udev device */ 00219 sysfs_path = udev_list_entry_get_name(dev_list_entry); 00220 dev = udev_device_new_from_syspath(udev, sysfs_path); 00221 00222 // Some sanity check 00223 path = udev_device_get_devnode(dev); 00224 if(stat(path, &st)) 00225 throw DashelException(DashelException::EnumerationError, 0, "Cannot stat serial port"); 00226 00227 if(!S_ISCHR(st.st_mode)) 00228 throw DashelException(DashelException::EnumerationError, 0, "Serial port is not character device"); 00229 00230 // Get the major/minor number 00231 maj = major(st.st_rdev); 00232 min = minor(st.st_rdev); 00233 00234 // Ignore all the non physical ports 00235 if(!(maj == 2 || (maj == 4 && min < 64) || maj == 3 || maj == 5)) { 00236 ostringstream oss; 00237 00238 // Check if usb, if yes get the device name 00239 usb_dev = udev_device_get_parent_with_subsystem_devtype(dev,"usb","usb_device"); 00240 if(usb_dev) 00241 oss << udev_device_get_sysattr_value(usb_dev,"product"); 00242 else 00243 oss << "Serial Port"; 00244 00245 oss << " (" << path << ")"; 00246 00247 ports[index++] = std::make_pair<std::string, std::string>(path,oss.str()); 00248 } 00249 udev_device_unref(dev); 00250 } 00251 00252 udev_enumerate_unref(enumerate); 00253 00254 udev_unref(udev); 00255 00256 #elif defined(USE_HAL) 00257 00258 // use HAL to enumerates devices 00259 DBusConnection* dbusConnection = dbus_bus_get(DBUS_BUS_SYSTEM, 0); 00260 if (!dbusConnection) 00261 throw DashelException(DashelException::EnumerationError, 0, "cannot connect to D-BUS."); 00262 00263 LibHalContext* halContext = libhal_ctx_new(); 00264 if (!halContext) 00265 throw DashelException(DashelException::EnumerationError, 0, "cannot create HAL context: cannot create context"); 00266 if (!libhal_ctx_set_dbus_connection(halContext, dbusConnection)) 00267 throw DashelException(DashelException::EnumerationError, 0, "cannot create HAL context: cannot connect to D-BUS"); 00268 if (!libhal_ctx_init(halContext, 0)) 00269 throw DashelException(DashelException::EnumerationError, 0, "cannot create HAL context: cannot init context"); 00270 00271 int devicesCount; 00272 char** devices = libhal_find_device_by_capability(halContext, "serial", &devicesCount, 0); 00273 for (int i = 0; i < devicesCount; i++) 00274 { 00275 char* devFileName = libhal_device_get_property_string(halContext, devices[i], "serial.device", 0); 00276 char* info = libhal_device_get_property_string(halContext, devices[i], "info.product", 0); 00277 int port = libhal_device_get_property_int(halContext, devices[i], "serial.port", 0); 00278 00279 ostringstream oss; 00280 oss << info << " " << port; 00281 ports[devicesCount - i] = std::make_pair<std::string, std::string>(devFileName, oss.str()); 00282 00283 libhal_free_string(info); 00284 libhal_free_string(devFileName); 00285 } 00286 00287 libhal_free_string_array(devices); 00288 libhal_ctx_shutdown(halContext, 0); 00289 libhal_ctx_free(halContext); 00290 #endif 00291 00292 return ports; 00293 }; 00294 00295 // Asserted dynamic cast 00296 00298 template<typename Derived, typename Base> 00299 inline Derived polymorphic_downcast(Base base) 00300 { 00301 Derived derived = dynamic_cast<Derived>(base); 00302 assert(derived); 00303 return derived; 00304 } 00305 00306 // Streams 00307 00308 #define RECV_BUFFER_SIZE 4096 00309 00310 SelectableStream::SelectableStream(const string& protocolName) : 00311 Stream(protocolName), 00312 fd(-1), 00313 writeOnly(false) 00314 { 00315 00316 } 00317 00318 SelectableStream::~SelectableStream() 00319 { 00320 // on POSIX, do not close stdin, stdout, nor stderr 00321 if (fd >= 3) 00322 close(fd); 00323 } 00324 00326 class DisconnectableStream: public SelectableStream 00327 { 00328 protected: 00329 friend class Hub; 00330 unsigned char recvBuffer[RECV_BUFFER_SIZE]; 00331 size_t recvBufferPos; 00332 size_t recvBufferSize; 00333 00334 public: 00336 DisconnectableStream(const string& protocolName) : 00337 Stream(protocolName), 00338 SelectableStream(protocolName), 00339 recvBufferPos(0), 00340 recvBufferSize(0) 00341 { 00342 00343 } 00344 00346 virtual bool isDataInRecvBuffer() const { return recvBufferPos != recvBufferSize; } 00347 }; 00348 00350 class SocketStream: public DisconnectableStream 00351 { 00352 protected: 00353 #ifndef TCP_CORK 00354 00355 enum Consts 00356 { 00357 SEND_BUFFER_SIZE_INITIAL = 256, 00358 SEND_BUFFER_SIZE_LIMIT = 65536 00359 }; 00360 00361 ExpandableBuffer sendBuffer; 00362 #endif 00363 00364 public: 00366 SocketStream(const string& targetName) : 00367 Stream("tcp"), 00368 DisconnectableStream("tcp") 00369 #ifndef TCP_CORK 00370 ,sendBuffer(SEND_BUFFER_SIZE_INITIAL) 00371 #endif 00372 { 00373 target.add("tcp:host;port;connectionPort=-1;sock=-1"); 00374 target.add(targetName.c_str()); 00375 00376 fd = target.get<int>("sock"); 00377 if (fd < 0) 00378 { 00379 // create socket 00380 fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); 00381 if (fd < 0) 00382 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot create socket."); 00383 00384 IPV4Address remoteAddress(target.get("host"), target.get<int>("port")); 00385 00386 // connect 00387 sockaddr_in addr; 00388 addr.sin_family = AF_INET; 00389 addr.sin_port = htons(remoteAddress.port); 00390 addr.sin_addr.s_addr = htonl(remoteAddress.address); 00391 if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) != 0) 00392 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot connect to remote host."); 00393 00394 // overwrite target name with a canonical one 00395 target.add(remoteAddress.format().c_str()); 00396 target.erase("connectionPort"); 00397 } 00398 else 00399 { 00400 // remove file descriptor information from target name 00401 target.erase("sock"); 00402 } 00403 00404 // setup TCP Cork for delayed sending 00405 #ifdef TCP_CORK 00406 int flag = 1; 00407 setsockopt(fd, IPPROTO_TCP, TCP_CORK, &flag , sizeof(flag)); 00408 #endif 00409 } 00410 00411 virtual ~SocketStream() 00412 { 00413 if (!failed()) 00414 flush(); 00415 00416 if (fd >= 0) 00417 shutdown(fd, SHUT_RDWR); 00418 } 00419 00420 virtual void write(const void *data, const size_t size) 00421 { 00422 assert(fd >= 0); 00423 00424 if (size == 0) 00425 return; 00426 00427 #ifdef TCP_CORK 00428 send(data, size); 00429 #else 00430 if (size >= SEND_BUFFER_SIZE_LIMIT) 00431 { 00432 flush(); 00433 send(data, size); 00434 } 00435 else 00436 { 00437 sendBuffer.add(data, size); 00438 if (sendBuffer.size() >= SEND_BUFFER_SIZE_LIMIT) 00439 flush(); 00440 } 00441 #endif 00442 } 00443 00445 void send(const void *data, size_t size) 00446 { 00447 assert(fd >= 0); 00448 00449 unsigned char *ptr = (unsigned char *)data; 00450 size_t left = size; 00451 00452 while (left) 00453 { 00454 #ifdef MACOSX 00455 ssize_t len = ::send(fd, ptr, left, 0); 00456 #else 00457 ssize_t len = ::send(fd, ptr, left, MSG_NOSIGNAL); 00458 #endif 00459 00460 if (len < 0) 00461 { 00462 fail(DashelException::IOError, errno, "Socket write I/O error."); 00463 } 00464 else if (len == 0) 00465 { 00466 fail(DashelException::ConnectionLost, 0, "Connection lost."); 00467 } 00468 else 00469 { 00470 ptr += len; 00471 left -= len; 00472 } 00473 } 00474 } 00475 00476 virtual void flush() 00477 { 00478 assert(fd >= 0); 00479 00480 #ifdef TCP_CORK 00481 int flag = 0; 00482 setsockopt(fd, IPPROTO_TCP, TCP_CORK, &flag , sizeof(flag)); 00483 flag = 1; 00484 setsockopt(fd, IPPROTO_TCP, TCP_CORK, &flag , sizeof(flag)); 00485 #else 00486 send(sendBuffer.get(), sendBuffer.size()); 00487 sendBuffer.clear(); 00488 #endif 00489 } 00490 00491 virtual void read(void *data, size_t size) 00492 { 00493 assert(fd >= 0); 00494 00495 if (size == 0) 00496 return; 00497 00498 unsigned char *ptr = (unsigned char *)data; 00499 size_t left = size; 00500 00501 if (isDataInRecvBuffer()) 00502 { 00503 size_t toCopy = std::min(recvBufferSize - recvBufferPos, size); 00504 memcpy(ptr, recvBuffer + recvBufferPos, toCopy); 00505 recvBufferPos += toCopy; 00506 ptr += toCopy; 00507 left -= toCopy; 00508 } 00509 00510 while (left) 00511 { 00512 ssize_t len = recv(fd, ptr, left, 0); 00513 00514 if (len < 0) 00515 { 00516 fail(DashelException::IOError, errno, "Socket read I/O error."); 00517 } 00518 else if (len == 0) 00519 { 00520 fail(DashelException::ConnectionLost, 0, "Connection lost."); 00521 } 00522 else 00523 { 00524 ptr += len; 00525 left -= len; 00526 } 00527 } 00528 } 00529 00530 virtual bool receiveDataAndCheckDisconnection() 00531 { 00532 assert(recvBufferPos == recvBufferSize); 00533 00534 ssize_t len = recv(fd, &recvBuffer, RECV_BUFFER_SIZE, 0); 00535 if (len > 0) 00536 { 00537 recvBufferSize = len; 00538 recvBufferPos = 0; 00539 return false; 00540 } 00541 else 00542 { 00543 if (len < 0) 00544 fail(DashelException::IOError, errno, "Socket read I/O error."); 00545 return true; 00546 } 00547 } 00548 }; 00549 00551 00554 class SocketServerStream : public SelectableStream 00555 { 00556 public: 00558 SocketServerStream(const std::string& targetName) : 00559 Stream("tcpin"), 00560 SelectableStream("tcpin") 00561 { 00562 target.add("tcpin:port=5000;address=0.0.0.0"); 00563 target.add(targetName.c_str()); 00564 00565 IPV4Address bindAddress(target.get("address"), target.get<int>("port")); 00566 00567 // create socket 00568 fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); 00569 if (fd < 0) 00570 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot create socket."); 00571 00572 // reuse address 00573 int flag = 1; 00574 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (flag)) < 0) 00575 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot set address reuse flag on socket, probably the port is already in use."); 00576 00577 // bind 00578 sockaddr_in addr; 00579 addr.sin_family = AF_INET; 00580 addr.sin_port = htons(bindAddress.port); 00581 addr.sin_addr.s_addr = htonl(bindAddress.address); 00582 if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) != 0) 00583 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot bind socket to port, probably the port is already in use."); 00584 00585 // Listen on socket, backlog is sort of arbitrary. 00586 if(listen(fd, 16) < 0) 00587 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot listen on socket."); 00588 } 00589 00590 virtual void write(const void *data, const size_t size) { } 00591 virtual void flush() { } 00592 virtual void read(void *data, size_t size) { } 00593 virtual bool receiveDataAndCheckDisconnection() { return false; } 00594 virtual bool isDataInRecvBuffer() const { return false; } 00595 }; 00596 00598 class UDPSocketStream: public MemoryPacketStream, public SelectableStream 00599 { 00600 private: 00601 mutable bool selectWasCalled; 00602 00603 public: 00605 UDPSocketStream(const string& targetName) : 00606 Stream("udp"), 00607 MemoryPacketStream("udp"), 00608 SelectableStream("udp"), 00609 selectWasCalled(false) 00610 { 00611 target.add("udp:port=5000;address=0.0.0.0;sock=-1"); 00612 target.add(targetName.c_str()); 00613 00614 fd = target.get<int>("sock"); 00615 if (fd < 0) 00616 { 00617 // create socket 00618 fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); 00619 if (fd < 0) 00620 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot create socket."); 00621 00622 IPV4Address bindAddress(target.get("address"), target.get<int>("port")); 00623 00624 // bind 00625 sockaddr_in addr; 00626 addr.sin_family = AF_INET; 00627 addr.sin_port = htons(bindAddress.port); 00628 addr.sin_addr.s_addr = htonl(bindAddress.address); 00629 if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) != 0) 00630 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot bind socket to port, probably the port is already in use."); 00631 } 00632 else 00633 { 00634 // remove file descriptor information from target name 00635 target.erase("sock"); 00636 } 00637 00638 // enable broadcast 00639 int broadcastPermission = 1; 00640 setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &broadcastPermission, sizeof(broadcastPermission)); 00641 } 00642 00643 virtual void send(const IPV4Address& dest) 00644 { 00645 sockaddr_in addr; 00646 addr.sin_family = AF_INET; 00647 addr.sin_port = htons(dest.port);; 00648 addr.sin_addr.s_addr = htonl(dest.address); 00649 00650 if (sendto(fd, sendBuffer.get(), sendBuffer.size(), 0, (struct sockaddr *)&addr, sizeof(addr)) != sendBuffer.size()) 00651 fail(DashelException::IOError, errno, "UDP Socket write I/O error."); 00652 00653 sendBuffer.clear(); 00654 } 00655 00656 virtual void receive(IPV4Address& source) 00657 { 00658 unsigned char buf[4096]; 00659 sockaddr_in addr; 00660 socklen_t addrLen = sizeof(addr); 00661 ssize_t recvCount = recvfrom(fd, buf, 4096, 0, (struct sockaddr *)&addr, &addrLen); 00662 if (recvCount <= 0) 00663 fail(DashelException::ConnectionLost, errno, "UDP Socket read I/O error."); 00664 00665 receptionBuffer.resize(recvCount); 00666 std::copy(buf, buf+recvCount, receptionBuffer.begin()); 00667 00668 source = IPV4Address(ntohl(addr.sin_addr.s_addr), ntohs(addr.sin_port)); 00669 } 00670 00671 virtual bool receiveDataAndCheckDisconnection() { selectWasCalled = true; return false; } 00672 virtual bool isDataInRecvBuffer() const { bool ret = selectWasCalled; selectWasCalled = false; return ret; } 00673 }; 00674 00675 00677 class FileDescriptorStream: public DisconnectableStream 00678 { 00679 public: 00681 FileDescriptorStream(const string& protocolName) : 00682 Stream(protocolName), 00683 DisconnectableStream(protocolName) 00684 { } 00685 00686 virtual void write(const void *data, const size_t size) 00687 { 00688 assert(fd >= 0); 00689 00690 if (size == 0) 00691 return; 00692 00693 const char *ptr = (const char *)data; 00694 size_t left = size; 00695 00696 while (left) 00697 { 00698 ssize_t len = ::write(fd, ptr, left); 00699 00700 if (len < 0) 00701 { 00702 fail(DashelException::IOError, errno, "File write I/O error."); 00703 } 00704 else if (len == 0) 00705 { 00706 fail(DashelException::ConnectionLost, 0, "File full."); 00707 } 00708 else 00709 { 00710 ptr += len; 00711 left -= len; 00712 } 00713 } 00714 } 00715 00716 virtual void flush() 00717 { 00718 assert(fd >= 0); 00719 00720 #ifdef MACOSX 00721 if (fsync(fd) < 0) 00722 #else 00723 if (fdatasync(fd) < 0) 00724 #endif 00725 { 00726 fail(DashelException::IOError, errno, "File flush error."); 00727 } 00728 } 00729 00730 virtual void read(void *data, size_t size) 00731 { 00732 assert(fd >= 0); 00733 00734 if (size == 0) 00735 return; 00736 00737 char *ptr = (char *)data; 00738 size_t left = size; 00739 00740 if (isDataInRecvBuffer()) 00741 { 00742 size_t toCopy = std::min(recvBufferSize - recvBufferPos, size); 00743 memcpy(ptr, recvBuffer + recvBufferPos, toCopy); 00744 recvBufferPos += toCopy; 00745 ptr += toCopy; 00746 left -= toCopy; 00747 } 00748 00749 while (left) 00750 { 00751 ssize_t len = ::read(fd, ptr, left); 00752 00753 if (len < 0) 00754 { 00755 fail(DashelException::IOError, errno, "File read I/O error."); 00756 } 00757 else if (len == 0) 00758 { 00759 fail(DashelException::ConnectionLost, 0, "Reached end of file."); 00760 } 00761 else 00762 { 00763 ptr += len; 00764 left -= len; 00765 } 00766 } 00767 } 00768 00769 virtual bool receiveDataAndCheckDisconnection() 00770 { 00771 assert(recvBufferPos == recvBufferSize); 00772 00773 ssize_t len = ::read(fd, &recvBuffer, RECV_BUFFER_SIZE); 00774 if (len > 0) 00775 { 00776 recvBufferSize = len; 00777 recvBufferPos = 0; 00778 return false; 00779 } 00780 else 00781 { 00782 if (len < 0) 00783 fail(DashelException::IOError, errno, "File read I/O error."); 00784 return true; 00785 } 00786 } 00787 }; 00788 00790 class FileStream: public FileDescriptorStream 00791 { 00792 public: 00794 FileStream(const string& targetName) : 00795 Stream("file"), 00796 FileDescriptorStream("file") 00797 { 00798 target.add("file:name;mode=read;fd=-1"); 00799 target.add(targetName.c_str()); 00800 fd = target.get<int>("fd"); 00801 if (fd < 0) 00802 { 00803 const std::string name = target.get("name"); 00804 const std::string mode = target.get("mode"); 00805 00806 // open file 00807 if (mode == "read") 00808 fd = open(name.c_str(), O_RDONLY); 00809 else if (mode == "write") 00810 fd = creat(name.c_str(), S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP), writeOnly = true; 00811 else if (mode == "readwrite") 00812 fd = open(name.c_str(), O_RDWR); 00813 else 00814 throw DashelException(DashelException::InvalidTarget, 0, "Invalid file mode."); 00815 if (fd == -1) 00816 { 00817 string errorMessage = "Cannot open file " + name + " for " + mode + "."; 00818 throw DashelException(DashelException::ConnectionFailed, errno, errorMessage.c_str()); 00819 } 00820 } 00821 else 00822 { 00823 // remove file descriptor information from target name 00824 target.erase("fd"); 00825 } 00826 } 00827 }; 00828 00830 struct StdinStream: public FileStream 00831 { 00832 StdinStream(const string& targetName): 00833 Stream("file"), 00834 FileStream("file:name=/dev/stdin;mode=read;fd=0") {} 00835 }; 00836 00838 struct StdoutStream: public FileStream 00839 { 00840 StdoutStream(const string& targetName): 00841 Stream("file"), 00842 FileStream("file:name=/dev/stdout;mode=write;fd=1") {} 00843 }; 00844 00846 class SerialStream: public FileDescriptorStream 00847 { 00848 protected: 00849 struct termios oldtio; 00850 00851 public: 00853 SerialStream(const string& targetName) : 00854 Stream("ser"), 00855 FileDescriptorStream("ser") 00856 { 00857 target.add("ser:port=1;baud=115200;stop=1;parity=none;fc=none;bits=8"); 00858 target.add(targetName.c_str()); 00859 string devFileName; 00860 00861 if (target.isSet("device")) 00862 { 00863 target.addParam("device", NULL, true); 00864 target.erase("port"); 00865 target.erase("name"); 00866 00867 devFileName = target.get("device"); 00868 } 00869 else if (target.isSet("name")) 00870 { 00871 target.addParam("name", NULL, true); 00872 target.erase("port"); 00873 target.erase("device"); 00874 00875 // Enumerates the ports 00876 std::string name = target.get("name"); 00877 std::map<int, std::pair<std::string, std:: string> > ports = SerialPortEnumerator::getPorts(); 00878 00879 // Iterate on all ports to found one with "name" in its description 00880 std::map<int, std::pair<std::string, std:: string> >::iterator it; 00881 for (it = ports.begin(); it != ports.end(); it++) 00882 { 00883 if (it->second.second.find(name) != std::string::npos) 00884 { 00885 devFileName = it->second.first; 00886 std::cout << "Found " << name << " on port " << devFileName << std::endl; 00887 break; 00888 } 00889 } 00890 if (devFileName.size() == 0) 00891 throw DashelException(DashelException::ConnectionFailed, 0, "The specified name could not be find among the serial ports."); 00892 } 00893 else // port 00894 { 00895 target.erase("device"); 00896 target.erase("name"); 00897 00898 std::map<int, std::pair<std::string, std::string> > ports = SerialPortEnumerator::getPorts(); 00899 std::map<int, std::pair<std::string, std::string> >::const_iterator it = ports.find(target.get<int>("port")); 00900 if (it != ports.end()) 00901 { 00902 devFileName = it->first; 00903 } 00904 else 00905 throw DashelException(DashelException::ConnectionFailed, 0, "The specified serial port does not exists."); 00906 } 00907 00908 fd = open(devFileName.c_str(), O_RDWR); 00909 00910 if (fd == -1) 00911 throw DashelException(DashelException::ConnectionFailed, 0, "Cannot open serial port."); 00912 00913 struct termios newtio; 00914 00915 // save old serial port state and clear new one 00916 tcgetattr(fd, &oldtio); 00917 memset(&newtio, 0, sizeof(newtio)); 00918 00919 newtio.c_cflag |= CLOCAL; // ignore modem control lines. 00920 newtio.c_cflag |= CREAD; // enable receiver. 00921 switch (target.get<int>("bits")) // Set amount of bits per character 00922 { 00923 case 5: newtio.c_cflag |= CS5; break; 00924 case 6: newtio.c_cflag |= CS6; break; 00925 case 7: newtio.c_cflag |= CS7; break; 00926 case 8: newtio.c_cflag |= CS8; break; 00927 default: throw DashelException(DashelException::InvalidTarget, 0, "Invalid number of bits per character, must be 5, 6, 7, or 8."); 00928 } 00929 if (target.get("stop") == "2") 00930 newtio.c_cflag |= CSTOPB; // Set two stop bits, rather than one. 00931 if (target.get("fc") == "hard") 00932 newtio.c_cflag |= CRTSCTS; // enable hardware flow control 00933 if (target.get("parity") != "none") 00934 { 00935 newtio.c_cflag |= PARENB; // enable parity generation on output and parity checking for input. 00936 if (target.get("parity") == "odd") 00937 newtio.c_cflag |= PARODD; // parity for input and output is odd. 00938 } 00939 00940 #ifdef MACOSX 00941 if (cfsetspeed(&newtio,target.get<int>("baud")) != 0) 00942 throw DashelException(DashelException::ConnectionFailed, errno, "Invalid baud rate."); 00943 #else 00944 switch (target.get<int>("baud")) 00945 { 00946 case 50: newtio.c_cflag |= B50; break; 00947 case 75: newtio.c_cflag |= B75; break; 00948 case 110: newtio.c_cflag |= B110; break; 00949 case 134: newtio.c_cflag |= B134; break; 00950 case 150: newtio.c_cflag |= B150; break; 00951 case 200: newtio.c_cflag |= B200; break; 00952 case 300: newtio.c_cflag |= B300; break; 00953 case 600: newtio.c_cflag |= B600; break; 00954 case 1200: newtio.c_cflag |= B1200; break; 00955 case 1800: newtio.c_cflag |= B1800; break; 00956 case 2400: newtio.c_cflag |= B2400; break; 00957 case 4800: newtio.c_cflag |= B4800; break; 00958 case 9600: newtio.c_cflag |= B9600; break; 00959 case 19200: newtio.c_cflag |= B19200; break; 00960 case 38400: newtio.c_cflag |= B38400; break; 00961 case 57600: newtio.c_cflag |= B57600; break; 00962 case 115200: newtio.c_cflag |= B115200; break; 00963 case 230400: newtio.c_cflag |= B230400; break; 00964 case 460800: newtio.c_cflag |= B460800; break; 00965 case 500000: newtio.c_cflag |= B500000; break; 00966 case 576000: newtio.c_cflag |= B576000; break; 00967 case 921600: newtio.c_cflag |= B921600; break; 00968 case 1000000: newtio.c_cflag |= B1000000; break; 00969 case 1152000: newtio.c_cflag |= B1152000; break; 00970 case 1500000: newtio.c_cflag |= B1500000; break; 00971 case 2000000: newtio.c_cflag |= B2000000; break; 00972 case 2500000: newtio.c_cflag |= B2500000; break; 00973 case 3000000: newtio.c_cflag |= B3000000; break; 00974 case 3500000: newtio.c_cflag |= B3500000; break; 00975 case 4000000: newtio.c_cflag |= B4000000; break; 00976 default: throw DashelException(DashelException::ConnectionFailed, 0, "Invalid baud rate."); 00977 } 00978 #endif 00979 00980 newtio.c_iflag = IGNPAR; // ignore parity on input 00981 00982 newtio.c_oflag = 0; 00983 00984 newtio.c_lflag = 0; 00985 00986 newtio.c_cc[VTIME] = 0; // block forever if no byte 00987 newtio.c_cc[VMIN] = 1; // one byte is sufficient to return 00988 00989 // set attributes 00990 if ((tcflush(fd, TCIOFLUSH) < 0) || (tcsetattr(fd, TCSANOW, &newtio) < 0)) 00991 throw DashelException(DashelException::ConnectionFailed, 0, "Cannot setup serial port. The requested baud rate might not be supported."); 00992 } 00993 00995 virtual ~SerialStream() 00996 { 00997 tcsetattr(fd, TCSANOW, &oldtio); 00998 } 00999 01000 virtual void flush() 01001 { 01002 } 01003 }; 01004 01005 01006 /* 01007 We have decided to let the application choose what to do with signals. 01008 Hub::stop() being thread safe, it is ok to let the user decide. 01009 01010 // Signal handler for SIGTERM 01011 01012 typedef std::set<Hub*> HubsSet; 01013 typedef HubsSet::iterator HubsSetIterator; 01014 01016 static HubsSet allHubs; 01017 01019 void termHandler(int t) 01020 { 01021 for (HubsSetIterator it = allHubs.begin(); it != allHubs.end(); ++it) 01022 { 01023 (*it)->stop(); 01024 } 01025 } 01026 01027 01029 static class SigTermHandlerSetuper 01030 { 01031 public: 01033 SigTermHandlerSetuper() 01034 { 01035 struct sigaction new_act, old_act; 01036 new_act.sa_handler = termHandler; 01037 sigemptyset(&new_act.sa_mask); 01038 01039 new_act.sa_flags = 0; 01040 01041 sigaction(SIGTERM, &new_act, &old_act); 01042 sigaction(SIGINT, &new_act, &old_act); 01043 } 01044 } staticSigTermHandlerSetuper; 01045 */ 01046 01047 // Hub 01048 01049 Hub::Hub(const bool resolveIncomingNames): 01050 resolveIncomingNames(resolveIncomingNames) 01051 { 01052 int *terminationPipes = new int[2]; 01053 if (pipe(terminationPipes) != 0) 01054 abort(); 01055 hTerminate = terminationPipes; 01056 01057 streamsLock = new pthread_mutex_t; 01058 01059 pthread_mutex_init((pthread_mutex_t*)streamsLock, NULL); 01060 01061 // commented because we let the users manage the signal themselves 01062 //allHubs.insert(this); 01063 } 01064 01065 Hub::~Hub() 01066 { 01067 // commented because we let the users manage the signal themselves 01068 //allHubs.erase(this); 01069 01070 int *terminationPipes = (int*)hTerminate; 01071 close(terminationPipes[0]); 01072 close(terminationPipes[1]); 01073 delete[] terminationPipes; 01074 01075 for (StreamsSet::iterator it = streams.begin(); it != streams.end(); ++it) 01076 delete *it; 01077 01078 pthread_mutex_destroy((pthread_mutex_t*)streamsLock); 01079 01080 delete (pthread_mutex_t*) streamsLock; 01081 } 01082 01083 Stream* Hub::connect(const std::string &target) 01084 { 01085 std::string proto, params; 01086 size_t c = target.find_first_of(':'); 01087 if (c == std::string::npos) 01088 throw DashelException(DashelException::InvalidTarget, 0, "No protocol specified in target."); 01089 proto = target.substr(0, c); 01090 params = target.substr(c+1); 01091 01092 SelectableStream *s(dynamic_cast<SelectableStream*>(streamTypeRegistry.create(proto, target, *this))); 01093 if(!s) 01094 { 01095 std::string r = "Invalid protocol in target: "; 01096 r += proto; 01097 r += ", known protocol are: "; 01098 r += streamTypeRegistry.list(); 01099 throw DashelException(DashelException::InvalidTarget, 0, r.c_str()); 01100 } 01101 01102 /* The caller must have the stream lock held */ 01103 01104 streams.insert(s); 01105 if (proto != "tcpin") 01106 { 01107 dataStreams.insert(s); 01108 connectionCreated(s); 01109 } 01110 01111 return s; 01112 } 01113 01114 void Hub::run() 01115 { 01116 while (step(-1)); 01117 } 01118 01119 bool Hub::step(const int timeout) 01120 { 01121 bool firstPoll = true; 01122 bool wasActivity = false; 01123 bool runInterrupted = false; 01124 01125 pthread_mutex_lock((pthread_mutex_t*)streamsLock); 01126 01127 do 01128 { 01129 wasActivity = false; 01130 size_t streamsCount = streams.size(); 01131 valarray<struct pollfd> pollFdsArray(streamsCount+1); 01132 valarray<SelectableStream*> streamsArray(streamsCount); 01133 01134 // add streams 01135 size_t i = 0; 01136 for (StreamsSet::iterator it = streams.begin(); it != streams.end(); ++it) 01137 { 01138 SelectableStream* stream = polymorphic_downcast<SelectableStream*>(*it); 01139 01140 streamsArray[i] = stream; 01141 pollFdsArray[i].fd = stream->fd; 01142 pollFdsArray[i].events = 0; 01143 if ((!stream->failed()) && (!stream->writeOnly)) 01144 pollFdsArray[i].events |= POLLIN; 01145 01146 i++; 01147 } 01148 // add pipe 01149 int *terminationPipes = (int*)hTerminate; 01150 pollFdsArray[i].fd = terminationPipes[0]; 01151 pollFdsArray[i].events = POLLIN; 01152 01153 // do poll and check for error 01154 int thisPollTimeout = firstPoll ? timeout : 0; 01155 firstPoll = false; 01156 01157 pthread_mutex_unlock((pthread_mutex_t*)streamsLock); 01158 01159 #ifndef USE_POLL_EMU 01160 int ret = poll(&pollFdsArray[0], pollFdsArray.size(), thisPollTimeout); 01161 #else 01162 int ret = poll_emu(&pollFdsArray[0], pollFdsArray.size(), thisPollTimeout); 01163 #endif 01164 if (ret < 0) 01165 throw DashelException(DashelException::SyncError, errno, "Error during poll."); 01166 01167 pthread_mutex_lock((pthread_mutex_t*)streamsLock); 01168 01169 // check streams for errors 01170 for (i = 0; i < streamsCount; i++) 01171 { 01172 SelectableStream* stream = streamsArray[i]; 01173 01174 // make sure we do not try to handle removed streams 01175 if (streams.find(stream) == streams.end()) 01176 continue; 01177 01178 assert((pollFdsArray[i].revents & POLLNVAL) == 0); 01179 01180 if (pollFdsArray[i].revents & POLLERR) 01181 { 01182 //std::cerr << "POLLERR" << std::endl; 01183 wasActivity = true; 01184 01185 try 01186 { 01187 stream->fail(DashelException::SyncError, 0, "Error on stream during poll."); 01188 } 01189 catch (DashelException e) 01190 { 01191 assert(e.stream); 01192 } 01193 01194 try 01195 { 01196 connectionClosed(stream, true); 01197 } 01198 catch (DashelException e) 01199 { 01200 assert(e.stream); 01201 } 01202 01203 closeStream(stream); 01204 } 01205 else if (pollFdsArray[i].revents & POLLHUP) 01206 { 01207 //std::cerr << "POLLHUP" << std::endl; 01208 wasActivity = true; 01209 01210 try 01211 { 01212 connectionClosed(stream, false); 01213 } 01214 catch (DashelException e) 01215 { 01216 assert(e.stream); 01217 } 01218 01219 closeStream(stream); 01220 } 01221 else if (pollFdsArray[i].revents & POLLIN) 01222 { 01223 //std::cerr << "POLLIN" << std::endl; 01224 wasActivity = true; 01225 01226 // test if listen stream 01227 SocketServerStream* serverStream = dynamic_cast<SocketServerStream*>(stream); 01228 01229 if (serverStream) 01230 { 01231 // accept connection 01232 struct sockaddr_in targetAddr; 01233 socklen_t l = sizeof (targetAddr); 01234 int targetFD = accept (stream->fd, (struct sockaddr *)&targetAddr, &l); 01235 if (targetFD < 0) 01236 { 01237 pthread_mutex_unlock((pthread_mutex_t*)streamsLock); 01238 throw DashelException(DashelException::SyncError, errno, "Cannot accept new stream."); 01239 } 01240 01241 // create a target stream using the new file descriptor from accept 01242 ostringstream targetName; 01243 targetName << IPV4Address(ntohl(targetAddr.sin_addr.s_addr), ntohs(targetAddr.sin_port)).format(resolveIncomingNames); 01244 targetName << ";connectionPort="; 01245 targetName << atoi(serverStream->getTargetParameter("port").c_str()); 01246 targetName << ";sock="; 01247 targetName << targetFD; 01248 connect(targetName.str()); 01249 } 01250 else 01251 { 01252 bool streamClosed = false; 01253 try 01254 { 01255 if (stream->receiveDataAndCheckDisconnection()) 01256 { 01257 //std::cerr << "connection closed" << std::endl; 01258 connectionClosed(stream, false); 01259 streamClosed = true; 01260 } 01261 else 01262 { 01263 // read all data available on this socket 01264 while (stream->isDataInRecvBuffer()) 01265 incomingData(stream); 01266 //std::cerr << "incoming data" << std::endl; 01267 } 01268 } 01269 catch (DashelException e) 01270 { 01271 //std::cerr << "exception on POLLIN" << std::endl; 01272 assert(e.stream); 01273 } 01274 01275 if (streamClosed) 01276 closeStream(stream); 01277 } 01278 } 01279 } 01280 // check pipe for termination 01281 if (pollFdsArray[i].revents) 01282 { 01283 char c; 01284 const ssize_t ret = read(pollFdsArray[i].fd, &c, 1); 01285 if (ret != 1) 01286 abort(); // poll did notify us that there was something to read, but we did not read anything, this is a bug 01287 runInterrupted = true; 01288 } 01289 01290 // collect and remove all failed streams 01291 vector<Stream*> failedStreams; 01292 for (StreamsSet::iterator it = streams.begin(); it != streams.end();++it) 01293 if ((*it)->failed()) 01294 failedStreams.push_back(*it); 01295 01296 for (size_t i = 0; i < failedStreams.size(); i++) 01297 { 01298 Stream* stream = failedStreams[i]; 01299 if (streams.find(stream) == streams.end()) 01300 continue; 01301 if (stream->failed()) 01302 { 01303 try 01304 { 01305 connectionClosed(stream, true); 01306 } 01307 catch (DashelException e) 01308 { 01309 assert(e.stream); 01310 } 01311 closeStream(stream); 01312 } 01313 } 01314 } 01315 while (wasActivity && !runInterrupted); 01316 01317 pthread_mutex_unlock((pthread_mutex_t*)streamsLock); 01318 01319 return !runInterrupted; 01320 } 01321 01322 void Hub::lock() 01323 { 01324 pthread_mutex_lock((pthread_mutex_t*)streamsLock); 01325 } 01326 01327 void Hub::unlock() 01328 { 01329 pthread_mutex_unlock((pthread_mutex_t*)streamsLock); 01330 } 01331 01332 void Hub::stop() 01333 { 01334 int *terminationPipes = (int*)hTerminate; 01335 char c = 0; 01336 const ssize_t ret = write(terminationPipes[1], &c, 1); 01337 if (ret != 1) 01338 throw DashelException(DashelException::IOError, ret, "Cannot write to termination pipe."); 01339 } 01340 01341 StreamTypeRegistry::StreamTypeRegistry() 01342 { 01343 reg("file", &createInstance<FileStream>); 01344 reg("stdin", &createInstance<StdinStream>); 01345 reg("stdout", &createInstance<StdoutStream>); 01346 reg("ser", &createInstance<SerialStream>); 01347 reg("tcpin", &createInstance<SocketServerStream>); 01348 reg("tcp", &createInstance<SocketStream>); 01349 reg("udp", &createInstance<UDPSocketStream>); 01350 } 01351 01352 StreamTypeRegistry __attribute__((init_priority(1000))) streamTypeRegistry; 01353 }