dashel-posix.cpp
Go to the documentation of this file.
00001 /*
00002         Dashel
00003         A cross-platform DAta Stream Helper Encapsulation Library
00004         Copyright (C) 2007 -- 2012:
00005                 
00006                 Stephane Magnenat <stephane at magnenat dot net>
00007                         (http://stephane.magnenat.net)
00008                 Mobots group - Laboratory of Robotics Systems, EPFL, Lausanne
00009                         (http://mobots.epfl.ch)
00010                 
00011                 Sebastian Gerlach
00012                 Kenzan Technologies
00013                         (http://www.kenzantech.com)
00014         
00015         All rights reserved.
00016         
00017         Redistribution and use in source and binary forms, with or without
00018         modification, are permitted provided that the following conditions are met:
00019                 * Redistributions of source code must retain the above copyright
00020                   notice, this list of conditions and the following disclaimer.
00021                 * Redistributions in binary form must reproduce the above copyright
00022                   notice, this list of conditions and the following disclaimer in the
00023                   documentation and/or other materials provided with the distribution.
00024                 * Neither the names of "Mobots", "Laboratory of Robotics Systems", "EPFL",
00025                   "Kenzan Technologies" nor the names of the contributors may be used to
00026                   endorse or promote products derived from this software without specific
00027                   prior written permission.
00028         
00029         THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDERS ``AS IS'' AND ANY
00030         EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00031         WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00032         DISCLAIMED. IN NO EVENT SHALL COPYRIGHT HOLDERS BE LIABLE FOR ANY
00033         DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00034         (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00035         LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
00036         ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00037         (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00038         SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00039 */
00040 
00041 #include <string.h>
00042 #include <cassert>
00043 #include <cstdlib>
00044 #include <map>
00045 #include <vector>
00046 #include <valarray>
00047 #include <algorithm>
00048 #include <iostream>
00049 #include <sstream>
00050 #include <signal.h>
00051 #include <errno.h>
00052 #include <unistd.h>
00053 #include <termios.h>
00054 #include <fcntl.h>
00055 #include <netdb.h>
00056 #include <signal.h>
00057 #include <sys/select.h>
00058 #include <sys/time.h>
00059 #include <sys/types.h>
00060 #include <sys/stat.h>
00061 #include <pthread.h>
00062 
00063 #ifdef __APPLE__
00064 #define MACOSX
00065 #endif
00066 
00067 #ifdef MACOSX
00068         #define USE_POLL_EMU
00069 #endif
00070 
00071 #ifdef MACOSX
00072         #include <CoreFoundation/CoreFoundation.h>
00073         #include <IOKit/IOKitLib.h>
00074         #include <IOKit/serial/IOSerialKeys.h>
00075 #endif
00076 
00077 #ifdef USE_LIBUDEV
00078 extern "C" {
00079         #include <libudev.h>
00080 }
00081 #endif
00082 
00083 #ifdef USE_HAL
00084         #include <hal/libhal.h>
00085 #endif
00086 
00087 #ifndef USE_POLL_EMU
00088         #include <poll.h>
00089 #else
00090         #include "poll_emu.h"
00091 #endif
00092 
00093 #include "dashel-private.h"
00094 #include "dashel-posix.h"
00095 
00096 
00097 
00102 namespace Dashel
00103 {
00104         using namespace std;
00105         
00106         // Exception
00107         
00108         void Stream::fail(DashelException::Source s, int se, const char* reason)
00109         {
00110                 string sysMessage;
00111                 failedFlag = true;
00112                 
00113                 if (se)
00114                         sysMessage = strerror(errno);
00115                         
00116                 failReason = reason;
00117                 failReason += " ";
00118                 failReason += sysMessage;
00119                 
00120                 throw DashelException(s, se, failReason.c_str(), this);
00121         }
00122         
00123         // Serial port enumerator
00124         
00125         std::map<int, std::pair<std::string, std::string> > SerialPortEnumerator::getPorts()
00126         {
00127                 std::map<int, std::pair<std::string, std::string> > ports;
00128                 
00129 
00130 
00131 #ifdef MACOSX
00132                 // use IOKit to enumerates devices
00133                 
00134                 // get a matching dictionary to specify which IOService class we're interested in
00135                 CFMutableDictionaryRef classesToMatch = IOServiceMatching(kIOSerialBSDServiceValue);
00136                 if (classesToMatch == NULL)
00137                         throw DashelException(DashelException::EnumerationError, 0, "IOServiceMatching returned a NULL dictionary");
00138                 
00139                 // specify all types of serial devices
00140                 CFDictionarySetValue(classesToMatch, CFSTR(kIOSerialBSDTypeKey), CFSTR(kIOSerialBSDAllTypes));
00141                 
00142                 // get an iterator to serial port services
00143                 io_iterator_t matchingServices;
00144                 kern_return_t kernResult = IOServiceGetMatchingServices(kIOMasterPortDefault, classesToMatch, &matchingServices);    
00145                 if (KERN_SUCCESS != kernResult)
00146                         throw DashelException(DashelException::EnumerationError, kernResult, "IOServiceGetMatchingServices failed");
00147                         
00148                 // iterate over services
00149                 io_object_t modemService;
00150                 int index = 0;
00151                 while((modemService = IOIteratorNext(matchingServices)))
00152                 {
00153                         // get path for device
00154                         CFTypeRef bsdPathAsCFString = IORegistryEntryCreateCFProperty(modemService, CFSTR(kIOCalloutDeviceKey), kCFAllocatorDefault, 0);
00155                         if (bsdPathAsCFString)
00156                         {
00157                                 std::string path;
00158                                 char cStr[255];
00159                                 std::string name;
00160                                 
00161                                 bool res = CFStringGetCString((CFStringRef) bsdPathAsCFString, cStr, 255, kCFStringEncodingUTF8);
00162                                 if(res)
00163                                         path = cStr;
00164                                 else
00165                                         throw DashelException(DashelException::EnumerationError, 0, "CFStringGetCString failed");
00166                                 
00167                                 CFRelease(bsdPathAsCFString);
00168                                 
00169                                 CFTypeRef fn = IORegistryEntrySearchCFProperty(modemService, kIOServicePlane, CFSTR("USB Product Name"), kCFAllocatorDefault,
00170                                                                                 kIORegistryIterateRecursively | kIORegistryIterateParents);
00171                                 if(fn) {
00172                                         res = CFStringGetCString((CFStringRef) fn, cStr, 255, kCFStringEncodingUTF8);
00173                                         if(res) 
00174                                                 name = cStr;
00175                                         else
00176                                                 throw DashelException(DashelException::EnumerationError, 0, "CFStringGetString failed");
00177 
00178                                         CFRelease(fn);
00179                                 } else 
00180                                         name = "Serial Port";
00181                                 name = name + " (" + path + ")";
00182                                 ports[index++] = std::make_pair<std::string, std::string>(path, name);
00183                         }
00184                         else
00185                                 throw DashelException(DashelException::EnumerationError, 0, "IORegistryEntryCreateCFProperty returned a NULL path");
00186                         
00187                         // release service
00188                         IOObjectRelease(modemService);
00189                 }
00190 
00191                 IOObjectRelease(matchingServices);
00192 
00193                         
00194 #elif defined(USE_LIBUDEV)
00195 
00196                 struct udev *udev;
00197                 struct udev_enumerate *enumerate;
00198                 struct udev_list_entry *devices, *dev_list_entry;
00199                 struct udev_device *dev;
00200                 int index = 0;
00201 
00202                 udev = udev_new();
00203                 if(!udev)
00204                         throw DashelException(DashelException::EnumerationError, 0, "Cannot create udev context");
00205 
00206                 enumerate = udev_enumerate_new(udev);
00207                 udev_enumerate_add_match_subsystem(enumerate, "tty");
00208                 udev_enumerate_scan_devices(enumerate);
00209                 devices = udev_enumerate_get_list_entry(enumerate);
00210 
00211                 udev_list_entry_foreach(dev_list_entry, devices) {
00212                         const char *sysfs_path;
00213                         struct udev_device *usb_dev;
00214                         const char * path;
00215                         struct stat st;
00216                         unsigned int maj,min;
00217 
00218                         /* Get sysfs path and create the udev device */
00219                         sysfs_path = udev_list_entry_get_name(dev_list_entry);
00220                         dev = udev_device_new_from_syspath(udev, sysfs_path);
00221 
00222                         // Some sanity check
00223                         path = udev_device_get_devnode(dev);
00224                         if(stat(path, &st)) 
00225                                 throw DashelException(DashelException::EnumerationError, 0, "Cannot stat serial port");
00226                         
00227                         if(!S_ISCHR(st.st_mode))
00228                                 throw DashelException(DashelException::EnumerationError, 0, "Serial port is not character device");
00229 
00230                         // Get the major/minor number
00231                         maj = major(st.st_rdev);
00232                         min = minor(st.st_rdev);
00233 
00234                         // Ignore all the non physical ports
00235                         if(!(maj == 2 || (maj == 4 && min < 64) || maj == 3 || maj == 5)) {
00236                                 ostringstream oss;
00237 
00238                                 // Check if usb, if yes get the device name
00239                                 usb_dev = udev_device_get_parent_with_subsystem_devtype(dev,"usb","usb_device");
00240                                 if(usb_dev)
00241                                         oss << udev_device_get_sysattr_value(usb_dev,"product");
00242                                 else
00243                                         oss << "Serial Port";
00244 
00245                                 oss << " (" << path << ")";
00246 
00247                                 ports[index++] = std::make_pair<std::string, std::string>(path,oss.str());
00248                         }
00249                         udev_device_unref(dev);
00250                 }
00251                 
00252                 udev_enumerate_unref(enumerate);
00253 
00254                 udev_unref(udev);
00255 
00256 #elif defined(USE_HAL)
00257 
00258                 // use HAL to enumerates devices
00259                 DBusConnection* dbusConnection = dbus_bus_get(DBUS_BUS_SYSTEM, 0);
00260                 if (!dbusConnection)
00261                         throw DashelException(DashelException::EnumerationError, 0, "cannot connect to D-BUS.");
00262                 
00263                 LibHalContext* halContext = libhal_ctx_new();
00264                 if (!halContext)
00265                         throw DashelException(DashelException::EnumerationError, 0, "cannot create HAL context: cannot create context");
00266                 if (!libhal_ctx_set_dbus_connection(halContext, dbusConnection))
00267                         throw DashelException(DashelException::EnumerationError, 0, "cannot create HAL context: cannot connect to D-BUS");
00268                 if (!libhal_ctx_init(halContext, 0))
00269                         throw DashelException(DashelException::EnumerationError, 0, "cannot create HAL context: cannot init context");
00270                 
00271                 int devicesCount;
00272                 char** devices = libhal_find_device_by_capability(halContext, "serial", &devicesCount, 0);
00273                 for (int i = 0; i < devicesCount; i++)
00274                 {
00275                         char* devFileName = libhal_device_get_property_string(halContext, devices[i], "serial.device", 0);
00276                         char* info = libhal_device_get_property_string(halContext, devices[i], "info.product", 0);
00277                         int port = libhal_device_get_property_int(halContext, devices[i], "serial.port", 0);
00278                         
00279                         ostringstream oss;
00280                         oss << info << " " << port;
00281                         ports[devicesCount - i] = std::make_pair<std::string, std::string>(devFileName, oss.str());
00282                         
00283                         libhal_free_string(info);
00284                         libhal_free_string(devFileName);
00285                 }
00286                 
00287                 libhal_free_string_array(devices);
00288                 libhal_ctx_shutdown(halContext, 0);
00289                 libhal_ctx_free(halContext);
00290 #endif
00291                 
00292                 return ports;
00293         };
00294         
00295         // Asserted dynamic cast
00296         
00298         template<typename Derived, typename Base>
00299         inline Derived polymorphic_downcast(Base base)
00300         {
00301                 Derived derived = dynamic_cast<Derived>(base);
00302                 assert(derived);
00303                 return derived;
00304         }
00305         
00306         // Streams
00307         
00308         #define RECV_BUFFER_SIZE        4096
00309 
00310         SelectableStream::SelectableStream(const string& protocolName) : 
00311                 Stream(protocolName),
00312                 fd(-1),
00313                 writeOnly(false)
00314         {
00315                 
00316         }
00317         
00318         SelectableStream::~SelectableStream()
00319         {
00320                 // on POSIX, do not close stdin, stdout, nor stderr
00321                 if (fd >= 3)
00322                         close(fd);
00323         }
00324         
00326         class DisconnectableStream: public SelectableStream
00327         {
00328         protected:
00329                 friend class Hub;
00330                 unsigned char recvBuffer[RECV_BUFFER_SIZE];     
00331                 size_t recvBufferPos; 
00332                 size_t recvBufferSize; 
00333         
00334         public:
00336                 DisconnectableStream(const string& protocolName) :
00337                         Stream(protocolName),
00338                         SelectableStream(protocolName),
00339                         recvBufferPos(0),
00340                         recvBufferSize(0)
00341                 {
00342                         
00343                 }
00344                 
00346                 virtual bool isDataInRecvBuffer() const { return recvBufferPos != recvBufferSize; }
00347         };
00348         
00350         class SocketStream: public DisconnectableStream
00351         {
00352         protected:
00353                 #ifndef TCP_CORK
00354 
00355                 enum Consts
00356                 {
00357                         SEND_BUFFER_SIZE_INITIAL = 256, 
00358                         SEND_BUFFER_SIZE_LIMIT = 65536 
00359                 };
00360                 
00361                 ExpandableBuffer sendBuffer;
00362                 #endif
00363                 
00364         public:
00366                 SocketStream(const string& targetName) :
00367                         Stream("tcp"),
00368                         DisconnectableStream("tcp")
00369                         #ifndef TCP_CORK
00370                         ,sendBuffer(SEND_BUFFER_SIZE_INITIAL)
00371                         #endif
00372                 {
00373                         target.add("tcp:host;port;connectionPort=-1;sock=-1");
00374                         target.add(targetName.c_str());
00375 
00376                         fd = target.get<int>("sock");
00377                         if (fd < 0)
00378                         {
00379                                 // create socket
00380                                 fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
00381                                 if (fd < 0)
00382                                         throw DashelException(DashelException::ConnectionFailed, errno, "Cannot create socket.");
00383                                 
00384                                 IPV4Address remoteAddress(target.get("host"), target.get<int>("port"));
00385                                 
00386                                 // connect
00387                                 sockaddr_in addr;
00388                                 addr.sin_family = AF_INET;
00389                                 addr.sin_port = htons(remoteAddress.port);
00390                                 addr.sin_addr.s_addr = htonl(remoteAddress.address);
00391                                 if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) != 0)
00392                                         throw DashelException(DashelException::ConnectionFailed, errno, "Cannot connect to remote host.");
00393                                 
00394                                 // overwrite target name with a canonical one
00395                                 target.add(remoteAddress.format().c_str());
00396                                 target.erase("connectionPort");
00397                         }
00398                         else
00399                         {
00400                                 // remove file descriptor information from target name
00401                                 target.erase("sock");
00402                         }
00403                         
00404                         // setup TCP Cork for delayed sending
00405                         #ifdef TCP_CORK
00406                         int flag = 1;
00407                         setsockopt(fd, IPPROTO_TCP, TCP_CORK, &flag , sizeof(flag));
00408                         #endif
00409                 }
00410                 
00411                 virtual ~SocketStream()
00412                 {
00413                         if (!failed())
00414                                 flush();
00415                         
00416                         if (fd >= 0)
00417                                 shutdown(fd, SHUT_RDWR);
00418                 }
00419                 
00420                 virtual void write(const void *data, const size_t size)
00421                 {
00422                         assert(fd >= 0);
00423                         
00424                         if (size == 0)
00425                                 return;
00426                         
00427                         #ifdef TCP_CORK
00428                         send(data, size);
00429                         #else
00430                         if (size >= SEND_BUFFER_SIZE_LIMIT)
00431                         {
00432                                 flush();
00433                                 send(data, size);
00434                         }
00435                         else
00436                         {
00437                                 sendBuffer.add(data, size);
00438                                 if (sendBuffer.size() >= SEND_BUFFER_SIZE_LIMIT)
00439                                         flush();
00440                         }
00441                         #endif
00442                 }
00443                 
00445                 void send(const void *data, size_t size)
00446                 {
00447                         assert(fd >= 0);
00448                         
00449                         unsigned char *ptr = (unsigned char *)data;
00450                         size_t left = size;
00451                         
00452                         while (left)
00453                         {
00454 #ifdef MACOSX
00455                                 ssize_t len = ::send(fd, ptr, left, 0);
00456 #else
00457                                 ssize_t len = ::send(fd, ptr, left, MSG_NOSIGNAL);
00458 #endif
00459                                 
00460                                 if (len < 0)
00461                                 {
00462                                         fail(DashelException::IOError, errno, "Socket write I/O error.");
00463                                 }
00464                                 else if (len == 0)
00465                                 {
00466                                         fail(DashelException::ConnectionLost, 0, "Connection lost.");
00467                                 }
00468                                 else
00469                                 {
00470                                         ptr += len;
00471                                         left -= len;
00472                                 }
00473                         }
00474                 }
00475                 
00476                 virtual void flush()
00477                 {
00478                         assert(fd >= 0);
00479                         
00480                         #ifdef TCP_CORK
00481                         int flag = 0;
00482                         setsockopt(fd, IPPROTO_TCP, TCP_CORK, &flag , sizeof(flag));
00483                         flag = 1;
00484                         setsockopt(fd, IPPROTO_TCP, TCP_CORK, &flag , sizeof(flag));
00485                         #else
00486                         send(sendBuffer.get(), sendBuffer.size());
00487                         sendBuffer.clear();
00488                         #endif
00489                 }
00490                 
00491                 virtual void read(void *data, size_t size)
00492                 {
00493                         assert(fd >= 0);
00494                         
00495                         if (size == 0)
00496                                 return;
00497                         
00498                         unsigned char *ptr = (unsigned char *)data;
00499                         size_t left = size;
00500                         
00501                         if (isDataInRecvBuffer())
00502                         {
00503                                 size_t toCopy = std::min(recvBufferSize - recvBufferPos, size);
00504                                 memcpy(ptr, recvBuffer + recvBufferPos, toCopy);
00505                                 recvBufferPos += toCopy;
00506                                 ptr += toCopy;
00507                                 left -= toCopy;
00508                         }
00509                         
00510                         while (left)
00511                         {
00512                                 ssize_t len = recv(fd, ptr, left, 0);
00513                                 
00514                                 if (len < 0)
00515                                 {
00516                                         fail(DashelException::IOError, errno, "Socket read I/O error.");
00517                                 }
00518                                 else if (len == 0)
00519                                 {
00520                                         fail(DashelException::ConnectionLost, 0, "Connection lost.");
00521                                 }
00522                                 else
00523                                 {
00524                                         ptr += len;
00525                                         left -= len;
00526                                 }
00527                         }
00528                 }
00529                 
00530                 virtual bool receiveDataAndCheckDisconnection()
00531                 {
00532                         assert(recvBufferPos == recvBufferSize);
00533                         
00534                         ssize_t len = recv(fd, &recvBuffer, RECV_BUFFER_SIZE, 0);
00535                         if (len > 0)
00536                         {
00537                                 recvBufferSize = len;
00538                                 recvBufferPos = 0;
00539                                 return false;
00540                         }
00541                         else
00542                         {
00543                                 if (len < 0)
00544                                         fail(DashelException::IOError, errno, "Socket read I/O error.");
00545                                 return true;
00546                         }
00547                 }
00548         };
00549         
00551 
00554         class SocketServerStream : public SelectableStream
00555         {
00556         public:
00558                 SocketServerStream(const std::string& targetName) :
00559                         Stream("tcpin"),
00560                         SelectableStream("tcpin")
00561                 {
00562                         target.add("tcpin:port=5000;address=0.0.0.0");
00563                         target.add(targetName.c_str());
00564                         
00565                         IPV4Address bindAddress(target.get("address"), target.get<int>("port"));
00566                         
00567                         // create socket
00568                         fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
00569                         if (fd < 0)
00570                                 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot create socket.");
00571                         
00572                         // reuse address
00573                         int flag = 1;
00574                         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (flag)) < 0)
00575                                 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot set address reuse flag on socket, probably the port is already in use.");
00576                         
00577                         // bind
00578                         sockaddr_in addr;
00579                         addr.sin_family = AF_INET;
00580                         addr.sin_port = htons(bindAddress.port);
00581                         addr.sin_addr.s_addr = htonl(bindAddress.address);
00582                         if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) != 0)
00583                                 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot bind socket to port, probably the port is already in use.");
00584                         
00585                         // Listen on socket, backlog is sort of arbitrary.
00586                         if(listen(fd, 16) < 0)
00587                                 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot listen on socket.");
00588                 }
00589                 
00590                 virtual void write(const void *data, const size_t size) { }
00591                 virtual void flush() { }
00592                 virtual void read(void *data, size_t size) { }
00593                 virtual bool receiveDataAndCheckDisconnection() { return false; }
00594                 virtual bool isDataInRecvBuffer() const { return false; }
00595         };
00596         
00598         class UDPSocketStream: public MemoryPacketStream, public SelectableStream
00599         {
00600         private:
00601                 mutable bool selectWasCalled;
00602                 
00603         public:
00605                 UDPSocketStream(const string& targetName) :
00606                         Stream("udp"),
00607                         MemoryPacketStream("udp"),
00608                         SelectableStream("udp"),
00609                         selectWasCalled(false)
00610                 {
00611                         target.add("udp:port=5000;address=0.0.0.0;sock=-1");
00612                         target.add(targetName.c_str());
00613                         
00614                         fd = target.get<int>("sock");
00615                         if (fd < 0)
00616                         {
00617                                 // create socket
00618                                 fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
00619                                 if (fd < 0)
00620                                         throw DashelException(DashelException::ConnectionFailed, errno, "Cannot create socket.");
00621                                 
00622                                 IPV4Address bindAddress(target.get("address"), target.get<int>("port"));
00623                                 
00624                                 // bind
00625                                 sockaddr_in addr;
00626                                 addr.sin_family = AF_INET;
00627                                 addr.sin_port = htons(bindAddress.port);
00628                                 addr.sin_addr.s_addr = htonl(bindAddress.address);
00629                                 if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) != 0)
00630                                         throw DashelException(DashelException::ConnectionFailed, errno, "Cannot bind socket to port, probably the port is already in use.");
00631                         }
00632                         else
00633                         {
00634                                 // remove file descriptor information from target name
00635                                 target.erase("sock");
00636                         }
00637                         
00638                         // enable broadcast
00639                         int broadcastPermission = 1;
00640                         setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &broadcastPermission, sizeof(broadcastPermission));
00641                 }
00642                 
00643                 virtual void send(const IPV4Address& dest)
00644                 {
00645                         sockaddr_in addr;
00646                         addr.sin_family = AF_INET;
00647                         addr.sin_port = htons(dest.port);;
00648                         addr.sin_addr.s_addr = htonl(dest.address);
00649                         
00650                         if (sendto(fd, sendBuffer.get(), sendBuffer.size(), 0, (struct sockaddr *)&addr, sizeof(addr)) != sendBuffer.size())
00651                                 fail(DashelException::IOError, errno, "UDP Socket write I/O error.");
00652                         
00653                         sendBuffer.clear();
00654                 }
00655                 
00656                 virtual void receive(IPV4Address& source)
00657                 {
00658                         unsigned char buf[4096];
00659                         sockaddr_in addr;
00660                         socklen_t addrLen = sizeof(addr);
00661                         ssize_t recvCount = recvfrom(fd, buf, 4096, 0, (struct sockaddr *)&addr, &addrLen);
00662                         if (recvCount <= 0)
00663                                 fail(DashelException::ConnectionLost, errno, "UDP Socket read I/O error.");
00664                         
00665                         receptionBuffer.resize(recvCount);
00666                         std::copy(buf, buf+recvCount, receptionBuffer.begin());
00667                         
00668                         source = IPV4Address(ntohl(addr.sin_addr.s_addr), ntohs(addr.sin_port));
00669                 }
00670                 
00671                 virtual bool receiveDataAndCheckDisconnection() { selectWasCalled = true; return false; }
00672                 virtual bool isDataInRecvBuffer() const { bool ret = selectWasCalled; selectWasCalled = false; return ret; }
00673         };
00674         
00675         
00677         class FileDescriptorStream: public DisconnectableStream
00678         {
00679         public:
00681                 FileDescriptorStream(const string& protocolName) :
00682                         Stream(protocolName),
00683                         DisconnectableStream(protocolName)
00684                 { }
00685                 
00686                 virtual void write(const void *data, const size_t size)
00687                 {
00688                         assert(fd >= 0);
00689                         
00690                         if (size == 0)
00691                                 return;
00692                         
00693                         const char *ptr = (const char *)data;
00694                         size_t left = size;
00695                         
00696                         while (left)
00697                         {
00698                                 ssize_t len = ::write(fd, ptr, left);
00699                                 
00700                                 if (len < 0)
00701                                 {
00702                                         fail(DashelException::IOError, errno, "File write I/O error.");
00703                                 }
00704                                 else if (len == 0)
00705                                 {
00706                                         fail(DashelException::ConnectionLost, 0, "File full.");
00707                                 }
00708                                 else
00709                                 {
00710                                         ptr += len;
00711                                         left -= len;
00712                                 }
00713                         }
00714                 }
00715                 
00716                 virtual void flush()
00717                 {
00718                         assert(fd >= 0);
00719                         
00720 #ifdef MACOSX
00721                         if (fsync(fd) < 0)
00722 #else
00723                         if (fdatasync(fd) < 0)
00724 #endif
00725                         {
00726                                 fail(DashelException::IOError, errno, "File flush error.");
00727                         }
00728                 }
00729                 
00730                 virtual void read(void *data, size_t size)
00731                 {
00732                         assert(fd >= 0);
00733                         
00734                         if (size == 0)
00735                                 return;
00736                         
00737                         char *ptr = (char *)data;
00738                         size_t left = size;
00739                         
00740                         if (isDataInRecvBuffer())
00741                         {
00742                                 size_t toCopy = std::min(recvBufferSize - recvBufferPos, size);
00743                                 memcpy(ptr, recvBuffer + recvBufferPos, toCopy);
00744                                 recvBufferPos += toCopy;
00745                                 ptr += toCopy;
00746                                 left -= toCopy;
00747                         }
00748                         
00749                         while (left)
00750                         {
00751                                 ssize_t len = ::read(fd, ptr, left);
00752                                 
00753                                 if (len < 0)
00754                                 {
00755                                         fail(DashelException::IOError, errno, "File read I/O error.");
00756                                 }
00757                                 else if (len == 0)
00758                                 {
00759                                         fail(DashelException::ConnectionLost, 0, "Reached end of file.");
00760                                 }
00761                                 else
00762                                 {
00763                                         ptr += len;
00764                                         left -= len;
00765                                 }
00766                         }
00767                 }
00768                 
00769                 virtual bool receiveDataAndCheckDisconnection()
00770                 {
00771                         assert(recvBufferPos == recvBufferSize);
00772                         
00773                         ssize_t len = ::read(fd, &recvBuffer, RECV_BUFFER_SIZE);
00774                         if (len > 0)
00775                         {
00776                                 recvBufferSize = len;
00777                                 recvBufferPos = 0;
00778                                 return false;
00779                         }
00780                         else
00781                         {
00782                                 if (len < 0)
00783                                         fail(DashelException::IOError, errno, "File read I/O error.");
00784                                 return true;
00785                         }
00786                 }
00787         };
00788         
00790         class FileStream: public FileDescriptorStream
00791         {
00792         public:
00794                 FileStream(const string& targetName) :
00795                         Stream("file"),
00796                         FileDescriptorStream("file")
00797                 {
00798                         target.add("file:name;mode=read;fd=-1");
00799                         target.add(targetName.c_str());
00800                         fd = target.get<int>("fd");
00801                         if (fd < 0)
00802                         {
00803                                 const std::string name = target.get("name");
00804                                 const std::string mode = target.get("mode");
00805                                 
00806                                 // open file
00807                                 if (mode == "read")
00808                                         fd = open(name.c_str(), O_RDONLY);
00809                                 else if (mode == "write")
00810                                         fd = creat(name.c_str(), S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP), writeOnly = true;
00811                                 else if (mode == "readwrite")
00812                                         fd = open(name.c_str(), O_RDWR);
00813                                 else
00814                                         throw DashelException(DashelException::InvalidTarget, 0, "Invalid file mode.");
00815                                 if (fd == -1)
00816                                 {
00817                                         string errorMessage = "Cannot open file " + name + " for " + mode + ".";
00818                                         throw DashelException(DashelException::ConnectionFailed, errno, errorMessage.c_str());
00819                                 }
00820                         }
00821                         else
00822                         {
00823                                 // remove file descriptor information from target name
00824                                 target.erase("fd");
00825                         }
00826                 }
00827         };
00828         
00830         struct StdinStream: public FileStream
00831         {
00832                 StdinStream(const string& targetName):
00833                         Stream("file"),
00834                         FileStream("file:name=/dev/stdin;mode=read;fd=0") {}
00835         };
00836         
00838         struct StdoutStream: public FileStream
00839         {
00840                 StdoutStream(const string& targetName):
00841                         Stream("file"),
00842                         FileStream("file:name=/dev/stdout;mode=write;fd=1") {}
00843         };
00844         
00846         class SerialStream: public FileDescriptorStream
00847         {
00848         protected:
00849                 struct termios oldtio;  
00850                 
00851         public:
00853                 SerialStream(const string& targetName) :
00854                         Stream("ser"),
00855                         FileDescriptorStream("ser")
00856                 {
00857                         target.add("ser:port=1;baud=115200;stop=1;parity=none;fc=none;bits=8");
00858                         target.add(targetName.c_str());
00859                         string devFileName;
00860                         
00861                         if (target.isSet("device"))
00862                         {
00863                                 target.addParam("device", NULL, true);
00864                                 target.erase("port");
00865                                 target.erase("name");
00866                                 
00867                                 devFileName = target.get("device");
00868                         }
00869                         else if (target.isSet("name"))
00870                         {
00871                                 target.addParam("name", NULL, true);
00872                                 target.erase("port");
00873                                 target.erase("device");
00874 
00875                                 // Enumerates the ports
00876                                 std::string name = target.get("name");
00877                                 std::map<int, std::pair<std::string, std:: string> > ports = SerialPortEnumerator::getPorts();
00878 
00879                                 // Iterate on all ports to found one with "name" in its description
00880                                 std::map<int, std::pair<std::string, std:: string> >::iterator it;
00881                                 for (it = ports.begin(); it != ports.end(); it++)
00882                                 {
00883                                         if (it->second.second.find(name) != std::string::npos)
00884                                         {
00885                                                 devFileName = it->second.first;
00886                                                 std::cout << "Found " << name << " on port " << devFileName << std::endl;
00887                                                 break;
00888                                         }
00889                                 }
00890                                 if (devFileName.size() == 0)
00891                                         throw DashelException(DashelException::ConnectionFailed, 0, "The specified name could not be find among the serial ports.");
00892                         }
00893                         else // port
00894                         {
00895                                 target.erase("device");
00896                                 target.erase("name");
00897                                 
00898                                 std::map<int, std::pair<std::string, std::string> > ports = SerialPortEnumerator::getPorts();
00899                                 std::map<int, std::pair<std::string, std::string> >::const_iterator it = ports.find(target.get<int>("port"));
00900                                 if (it != ports.end())
00901                                 {
00902                                         devFileName = it->first;
00903                                 }
00904                                 else
00905                                         throw DashelException(DashelException::ConnectionFailed, 0, "The specified serial port does not exists.");
00906                         }
00907                 
00908                         fd = open(devFileName.c_str(), O_RDWR);
00909                         
00910                         if (fd == -1)
00911                                 throw DashelException(DashelException::ConnectionFailed, 0, "Cannot open serial port.");
00912                         
00913                         struct termios newtio;
00914                         
00915                         // save old serial port state and clear new one
00916                         tcgetattr(fd, &oldtio);
00917                         memset(&newtio, 0, sizeof(newtio));
00918                         
00919                         newtio.c_cflag |= CLOCAL;                       // ignore modem control lines.
00920                         newtio.c_cflag |= CREAD;                        // enable receiver.
00921                         switch (target.get<int>("bits"))                // Set amount of bits per character
00922                         {
00923                                 case 5: newtio.c_cflag |= CS5; break;
00924                                 case 6: newtio.c_cflag |= CS6; break;
00925                                 case 7: newtio.c_cflag |= CS7; break;
00926                                 case 8: newtio.c_cflag |= CS8; break;
00927                                 default: throw DashelException(DashelException::InvalidTarget, 0, "Invalid number of bits per character, must be 5, 6, 7, or 8.");
00928                         }
00929                         if (target.get("stop") == "2")
00930                                 newtio.c_cflag |= CSTOPB;               // Set two stop bits, rather than one.
00931                         if (target.get("fc") == "hard")
00932                                 newtio.c_cflag |= CRTSCTS;              // enable hardware flow control
00933                         if (target.get("parity") != "none")
00934                         {
00935                                 newtio.c_cflag |= PARENB;               // enable parity generation on output and parity checking for input.
00936                                 if (target.get("parity") == "odd")
00937                                         newtio.c_cflag |= PARODD;       // parity for input and output is odd.
00938                         }
00939                         
00940 #ifdef MACOSX
00941                         if (cfsetspeed(&newtio,target.get<int>("baud")) != 0)
00942                                 throw DashelException(DashelException::ConnectionFailed, errno, "Invalid baud rate.");
00943 #else
00944                         switch (target.get<int>("baud"))
00945                         {
00946                                 case 50: newtio.c_cflag |= B50; break;
00947                                 case 75: newtio.c_cflag |= B75; break;
00948                                 case 110: newtio.c_cflag |= B110; break;
00949                                 case 134: newtio.c_cflag |= B134; break;
00950                                 case 150: newtio.c_cflag |= B150; break;
00951                                 case 200: newtio.c_cflag |= B200; break;
00952                                 case 300: newtio.c_cflag |= B300; break;
00953                                 case 600: newtio.c_cflag |= B600; break;
00954                                 case 1200: newtio.c_cflag |= B1200; break;
00955                                 case 1800: newtio.c_cflag |= B1800; break;
00956                                 case 2400: newtio.c_cflag |= B2400; break;
00957                                 case 4800: newtio.c_cflag |= B4800; break;
00958                                 case 9600: newtio.c_cflag |= B9600; break;
00959                                 case 19200: newtio.c_cflag |= B19200; break;
00960                                 case 38400: newtio.c_cflag |= B38400; break;
00961                                 case 57600: newtio.c_cflag |= B57600; break;
00962                                 case 115200: newtio.c_cflag |= B115200; break;
00963                                 case 230400: newtio.c_cflag |= B230400; break;
00964                                 case 460800: newtio.c_cflag |= B460800; break;
00965                                 case 500000: newtio.c_cflag |= B500000; break;
00966                                 case 576000: newtio.c_cflag |= B576000; break;
00967                                 case 921600: newtio.c_cflag |= B921600; break;
00968                                 case 1000000: newtio.c_cflag |= B1000000; break;
00969                                 case 1152000: newtio.c_cflag |= B1152000; break;
00970                                 case 1500000: newtio.c_cflag |= B1500000; break;
00971                                 case 2000000: newtio.c_cflag |= B2000000; break;
00972                                 case 2500000: newtio.c_cflag |= B2500000; break;
00973                                 case 3000000: newtio.c_cflag |= B3000000; break;
00974                                 case 3500000: newtio.c_cflag |= B3500000; break;
00975                                 case 4000000: newtio.c_cflag |= B4000000; break;
00976                                 default: throw DashelException(DashelException::ConnectionFailed, 0, "Invalid baud rate.");
00977                         }
00978 #endif
00979                         
00980                         newtio.c_iflag = IGNPAR;                        // ignore parity on input
00981                         
00982                         newtio.c_oflag = 0;
00983                         
00984                         newtio.c_lflag = 0;
00985                         
00986                         newtio.c_cc[VTIME] = 0;                         // block forever if no byte
00987                         newtio.c_cc[VMIN] = 1;                          // one byte is sufficient to return
00988                         
00989                         // set attributes
00990                         if ((tcflush(fd, TCIOFLUSH) < 0) || (tcsetattr(fd, TCSANOW, &newtio) < 0))
00991                                 throw DashelException(DashelException::ConnectionFailed, 0, "Cannot setup serial port. The requested baud rate might not be supported.");
00992                 }
00993                 
00995                 virtual ~SerialStream()
00996                 {
00997                          tcsetattr(fd, TCSANOW, &oldtio);
00998                 }
00999                 
01000                 virtual void flush()
01001                 {
01002                 }
01003         };
01004         
01005         
01006         /*
01007         We have decided to let the application choose what to do with signals.
01008         Hub::stop() being thread safe, it is ok to let the user decide.
01009         
01010         // Signal handler for SIGTERM
01011         
01012         typedef std::set<Hub*> HubsSet;
01013         typedef HubsSet::iterator HubsSetIterator;
01014         
01016         static HubsSet allHubs;
01017         
01019         void termHandler(int t)
01020         {
01021                 for (HubsSetIterator it = allHubs.begin(); it != allHubs.end(); ++it)
01022                 {
01023                         (*it)->stop();
01024                 }
01025         }
01026         
01027         
01029         static class SigTermHandlerSetuper
01030         {
01031         public:
01033                 SigTermHandlerSetuper()
01034                 {
01035                         struct sigaction new_act, old_act;
01036                         new_act.sa_handler = termHandler;
01037                         sigemptyset(&new_act.sa_mask);
01038 
01039                         new_act.sa_flags = 0;
01040 
01041                         sigaction(SIGTERM, &new_act, &old_act);
01042                         sigaction(SIGINT, &new_act, &old_act);
01043                 }
01044         } staticSigTermHandlerSetuper;
01045         */
01046         
01047         // Hub
01048         
01049         Hub::Hub(const bool resolveIncomingNames):
01050                 resolveIncomingNames(resolveIncomingNames)
01051         {
01052                 int *terminationPipes = new int[2];
01053                 if (pipe(terminationPipes) != 0)
01054                         abort();
01055                 hTerminate = terminationPipes;
01056                 
01057                 streamsLock = new pthread_mutex_t;
01058                 
01059                 pthread_mutex_init((pthread_mutex_t*)streamsLock, NULL);
01060 
01061                 // commented because we let the users manage the signal themselves
01062                 //allHubs.insert(this);
01063         }
01064         
01065         Hub::~Hub()
01066         {
01067                 // commented because we let the users manage the signal themselves
01068                 //allHubs.erase(this);
01069                 
01070                 int *terminationPipes = (int*)hTerminate;
01071                 close(terminationPipes[0]);
01072                 close(terminationPipes[1]);
01073                 delete[] terminationPipes;
01074                 
01075                 for (StreamsSet::iterator it = streams.begin(); it != streams.end(); ++it)
01076                         delete *it;
01077                 
01078                 pthread_mutex_destroy((pthread_mutex_t*)streamsLock);
01079                 
01080                 delete (pthread_mutex_t*) streamsLock;
01081         }
01082         
01083         Stream* Hub::connect(const std::string &target)
01084         {
01085                 std::string proto, params;
01086                 size_t c = target.find_first_of(':');
01087                 if (c == std::string::npos)
01088                         throw DashelException(DashelException::InvalidTarget, 0, "No protocol specified in target.");
01089                 proto = target.substr(0, c);
01090                 params = target.substr(c+1);
01091                 
01092                 SelectableStream *s(dynamic_cast<SelectableStream*>(streamTypeRegistry.create(proto, target, *this)));
01093                 if(!s)
01094                 {
01095                         std::string r = "Invalid protocol in target: ";
01096                         r += proto;
01097                         r += ", known protocol are: ";
01098                         r += streamTypeRegistry.list();
01099                         throw DashelException(DashelException::InvalidTarget, 0, r.c_str());
01100                 }
01101                 
01102                 /* The caller must have the stream lock held */
01103                 
01104                 streams.insert(s);
01105                 if (proto != "tcpin")
01106                 {
01107                         dataStreams.insert(s);
01108                         connectionCreated(s);
01109                 }
01110                 
01111                 return s;
01112         }
01113         
01114         void Hub::run()
01115         {
01116                 while (step(-1));
01117         }
01118         
01119         bool Hub::step(const int timeout)
01120         {
01121                 bool firstPoll = true;
01122                 bool wasActivity = false;
01123                 bool runInterrupted = false;
01124                 
01125                 pthread_mutex_lock((pthread_mutex_t*)streamsLock);
01126                 
01127                 do
01128                 {
01129                         wasActivity = false;
01130                         size_t streamsCount = streams.size();
01131                         valarray<struct pollfd> pollFdsArray(streamsCount+1);
01132                         valarray<SelectableStream*> streamsArray(streamsCount);
01133                         
01134                         // add streams
01135                         size_t i = 0;
01136                         for (StreamsSet::iterator it = streams.begin(); it != streams.end(); ++it)
01137                         {
01138                                 SelectableStream* stream = polymorphic_downcast<SelectableStream*>(*it);
01139                                 
01140                                 streamsArray[i] = stream;
01141                                 pollFdsArray[i].fd = stream->fd;
01142                                 pollFdsArray[i].events = 0;
01143                                 if ((!stream->failed()) && (!stream->writeOnly))
01144                                         pollFdsArray[i].events |= POLLIN;
01145                                 
01146                                 i++;
01147                         }
01148                         // add pipe
01149                         int *terminationPipes = (int*)hTerminate;
01150                         pollFdsArray[i].fd = terminationPipes[0];
01151                         pollFdsArray[i].events = POLLIN;
01152                         
01153                         // do poll and check for error
01154                         int thisPollTimeout = firstPoll ? timeout : 0;
01155                         firstPoll = false;
01156                         
01157                         pthread_mutex_unlock((pthread_mutex_t*)streamsLock);
01158                         
01159                         #ifndef USE_POLL_EMU
01160                         int ret = poll(&pollFdsArray[0], pollFdsArray.size(), thisPollTimeout);
01161                         #else
01162                         int ret = poll_emu(&pollFdsArray[0], pollFdsArray.size(), thisPollTimeout);
01163                         #endif
01164                         if (ret < 0)
01165                                 throw DashelException(DashelException::SyncError, errno, "Error during poll.");
01166                         
01167                         pthread_mutex_lock((pthread_mutex_t*)streamsLock);
01168                         
01169                         // check streams for errors
01170                         for (i = 0; i < streamsCount; i++)
01171                         {
01172                                 SelectableStream* stream = streamsArray[i];
01173                                 
01174                                 // make sure we do not try to handle removed streams
01175                                 if (streams.find(stream) == streams.end())
01176                                         continue;
01177                                 
01178                                 assert((pollFdsArray[i].revents & POLLNVAL) == 0);
01179                                 
01180                                 if (pollFdsArray[i].revents & POLLERR)
01181                                 {
01182                                         //std::cerr << "POLLERR" << std::endl;
01183                                         wasActivity = true;
01184                                         
01185                                         try
01186                                         {
01187                                                 stream->fail(DashelException::SyncError, 0, "Error on stream during poll.");
01188                                         }
01189                                         catch (DashelException e)
01190                                         {
01191                                                 assert(e.stream);
01192                                         }
01193                                         
01194                                         try
01195                                         {
01196                                                 connectionClosed(stream, true);
01197                                         }
01198                                         catch (DashelException e)
01199                                         {
01200                                                 assert(e.stream);
01201                                         }
01202                                         
01203                                         closeStream(stream);
01204                                 }
01205                                 else if (pollFdsArray[i].revents & POLLHUP)
01206                                 {
01207                                         //std::cerr << "POLLHUP" << std::endl;
01208                                         wasActivity = true;
01209                                         
01210                                         try
01211                                         {
01212                                                 connectionClosed(stream, false);
01213                                         }
01214                                         catch (DashelException e)
01215                                         {
01216                                                 assert(e.stream);
01217                                         }
01218                                         
01219                                         closeStream(stream);
01220                                 }
01221                                 else if (pollFdsArray[i].revents & POLLIN)
01222                                 {
01223                                         //std::cerr << "POLLIN" << std::endl;
01224                                         wasActivity = true;
01225                                         
01226                                         // test if listen stream
01227                                         SocketServerStream* serverStream = dynamic_cast<SocketServerStream*>(stream);
01228                                         
01229                                         if (serverStream)
01230                                         {       
01231                                                 // accept connection
01232                                                 struct sockaddr_in targetAddr;
01233                                                 socklen_t l = sizeof (targetAddr);
01234                                                 int targetFD = accept (stream->fd, (struct sockaddr *)&targetAddr, &l);
01235                                                 if (targetFD < 0)
01236                                                 {
01237                                                         pthread_mutex_unlock((pthread_mutex_t*)streamsLock);
01238                                                         throw DashelException(DashelException::SyncError, errno, "Cannot accept new stream.");
01239                                                 }
01240                                                 
01241                                                 // create a target stream using the new file descriptor from accept
01242                                                 ostringstream targetName;
01243                                                 targetName << IPV4Address(ntohl(targetAddr.sin_addr.s_addr), ntohs(targetAddr.sin_port)).format(resolveIncomingNames);
01244                                                 targetName << ";connectionPort=";
01245                                                 targetName << atoi(serverStream->getTargetParameter("port").c_str());
01246                                                 targetName << ";sock=";
01247                                                 targetName << targetFD;
01248                                                 connect(targetName.str());
01249                                         }
01250                                         else
01251                                         {
01252                                                 bool streamClosed = false;
01253                                                 try
01254                                                 {
01255                                                         if (stream->receiveDataAndCheckDisconnection())
01256                                                         {
01257                                                                 //std::cerr << "connection closed" << std::endl;
01258                                                                 connectionClosed(stream, false);
01259                                                                 streamClosed = true;
01260                                                         }
01261                                                         else
01262                                                         {
01263                                                                 // read all data available on this socket
01264                                                                 while (stream->isDataInRecvBuffer())
01265                                                                         incomingData(stream);
01266                                                                 //std::cerr << "incoming data" << std::endl;
01267                                                         }
01268                                                 }
01269                                                 catch (DashelException e)
01270                                                 {
01271                                                         //std::cerr << "exception on POLLIN" << std::endl;
01272                                                         assert(e.stream);
01273                                                 }
01274                                                 
01275                                                 if (streamClosed)
01276                                                         closeStream(stream);
01277                                         }
01278                                 }
01279                         }
01280                         // check pipe for termination
01281                         if (pollFdsArray[i].revents)
01282                         {
01283                                 char c;
01284                                 const ssize_t ret = read(pollFdsArray[i].fd, &c, 1);
01285                                 if (ret != 1)
01286                                         abort(); // poll did notify us that there was something to read, but we did not read anything, this is a bug
01287                                 runInterrupted = true;
01288                         }
01289                         
01290                         // collect and remove all failed streams
01291                         vector<Stream*> failedStreams;
01292                         for (StreamsSet::iterator it = streams.begin(); it != streams.end();++it)
01293                                 if ((*it)->failed())
01294                                         failedStreams.push_back(*it);
01295                         
01296                         for (size_t i = 0; i < failedStreams.size(); i++)
01297                         {
01298                                 Stream* stream = failedStreams[i];
01299                                 if (streams.find(stream) == streams.end())
01300                                         continue;
01301                                 if (stream->failed())
01302                                 {
01303                                         try
01304                                         {
01305                                                 connectionClosed(stream, true);
01306                                         }
01307                                         catch (DashelException e)
01308                                         {
01309                                                 assert(e.stream);
01310                                         }
01311                                         closeStream(stream);
01312                                 }
01313                         }
01314                 }
01315                 while (wasActivity && !runInterrupted);
01316                 
01317                 pthread_mutex_unlock((pthread_mutex_t*)streamsLock);
01318                 
01319                 return !runInterrupted;
01320         }
01321         
01322         void Hub::lock()
01323         {
01324                 pthread_mutex_lock((pthread_mutex_t*)streamsLock);
01325         }
01326         
01327         void Hub::unlock()
01328         {
01329                 pthread_mutex_unlock((pthread_mutex_t*)streamsLock);
01330         }
01331         
01332         void Hub::stop()
01333         {
01334                 int *terminationPipes = (int*)hTerminate;
01335                 char c = 0;
01336                 const ssize_t ret = write(terminationPipes[1], &c, 1);
01337                 if (ret != 1)
01338                         throw DashelException(DashelException::IOError, ret, "Cannot write to termination pipe.");
01339         }
01340         
01341         StreamTypeRegistry::StreamTypeRegistry()
01342         {
01343                 reg("file", &createInstance<FileStream>);
01344                 reg("stdin", &createInstance<StdinStream>);
01345                 reg("stdout", &createInstance<StdoutStream>);
01346                 reg("ser", &createInstance<SerialStream>);
01347                 reg("tcpin", &createInstance<SocketServerStream>);
01348                 reg("tcp", &createInstance<SocketStream>);
01349                 reg("udp", &createInstance<UDPSocketStream>);
01350         }
01351         
01352         StreamTypeRegistry __attribute__((init_priority(1000))) streamTypeRegistry;
01353 }


dashel
Author(s): Stéphane Magnenat
autogenerated on Thu Jan 2 2014 11:17:11