00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041 #include <string.h>
00042 #include <cassert>
00043 #include <cstdlib>
00044 #include <map>
00045 #include <vector>
00046 #include <valarray>
00047 #include <algorithm>
00048 #include <iostream>
00049 #include <sstream>
00050 #include <signal.h>
00051 #include <errno.h>
00052 #include <unistd.h>
00053 #include <termios.h>
00054 #include <fcntl.h>
00055 #include <netdb.h>
00056 #include <signal.h>
00057 #include <sys/select.h>
00058 #include <sys/time.h>
00059 #include <sys/types.h>
00060 #include <sys/stat.h>
00061 #include <pthread.h>
00062
00063 #ifdef __APPLE__
00064 #define MACOSX
00065 #endif
00066
00067 #ifdef MACOSX
00068 #define USE_POLL_EMU
00069 #endif
00070
00071 #ifdef MACOSX
00072 #include <CoreFoundation/CoreFoundation.h>
00073 #include <IOKit/IOKitLib.h>
00074 #include <IOKit/serial/IOSerialKeys.h>
00075 #endif
00076
00077 #ifdef USE_LIBUDEV
00078 extern "C" {
00079 #include <libudev.h>
00080 }
00081 #endif
00082
00083 #ifdef USE_HAL
00084 #include <hal/libhal.h>
00085 #endif
00086
00087 #ifndef USE_POLL_EMU
00088 #include <poll.h>
00089 #else
00090 #include "poll_emu.h"
00091 #endif
00092
00093 #include "dashel-private.h"
00094 #include "dashel-posix.h"
00095
00096
00097
00102 namespace Dashel
00103 {
00104 using namespace std;
00105
00106
00107
00108 void Stream::fail(DashelException::Source s, int se, const char* reason)
00109 {
00110 string sysMessage;
00111 failedFlag = true;
00112
00113 if (se)
00114 sysMessage = strerror(errno);
00115
00116 failReason = reason;
00117 failReason += " ";
00118 failReason += sysMessage;
00119
00120 throw DashelException(s, se, failReason.c_str(), this);
00121 }
00122
00123
00124
00125 std::map<int, std::pair<std::string, std::string> > SerialPortEnumerator::getPorts()
00126 {
00127 std::map<int, std::pair<std::string, std::string> > ports;
00128
00129
00130
00131 #ifdef MACOSX
00132
00133
00134
00135 CFMutableDictionaryRef classesToMatch = IOServiceMatching(kIOSerialBSDServiceValue);
00136 if (classesToMatch == NULL)
00137 throw DashelException(DashelException::EnumerationError, 0, "IOServiceMatching returned a NULL dictionary");
00138
00139
00140 CFDictionarySetValue(classesToMatch, CFSTR(kIOSerialBSDTypeKey), CFSTR(kIOSerialBSDAllTypes));
00141
00142
00143 io_iterator_t matchingServices;
00144 kern_return_t kernResult = IOServiceGetMatchingServices(kIOMasterPortDefault, classesToMatch, &matchingServices);
00145 if (KERN_SUCCESS != kernResult)
00146 throw DashelException(DashelException::EnumerationError, kernResult, "IOServiceGetMatchingServices failed");
00147
00148
00149 io_object_t modemService;
00150 int index = 0;
00151 while((modemService = IOIteratorNext(matchingServices)))
00152 {
00153
00154 CFTypeRef bsdPathAsCFString = IORegistryEntryCreateCFProperty(modemService, CFSTR(kIOCalloutDeviceKey), kCFAllocatorDefault, 0);
00155 if (bsdPathAsCFString)
00156 {
00157 std::string path;
00158 char cStr[255];
00159 std::string name;
00160
00161 bool res = CFStringGetCString((CFStringRef) bsdPathAsCFString, cStr, 255, kCFStringEncodingUTF8);
00162 if(res)
00163 path = cStr;
00164 else
00165 throw DashelException(DashelException::EnumerationError, 0, "CFStringGetCString failed");
00166
00167 CFRelease(bsdPathAsCFString);
00168
00169 CFTypeRef fn = IORegistryEntrySearchCFProperty(modemService, kIOServicePlane, CFSTR("USB Product Name"), kCFAllocatorDefault,
00170 kIORegistryIterateRecursively | kIORegistryIterateParents);
00171 if(fn) {
00172 res = CFStringGetCString((CFStringRef) fn, cStr, 255, kCFStringEncodingUTF8);
00173 if(res)
00174 name = cStr;
00175 else
00176 throw DashelException(DashelException::EnumerationError, 0, "CFStringGetString failed");
00177
00178 CFRelease(fn);
00179 } else
00180 name = "Serial Port";
00181 name = name + " (" + path + ")";
00182 ports[index++] = std::make_pair<std::string, std::string>(path, name);
00183 }
00184 else
00185 throw DashelException(DashelException::EnumerationError, 0, "IORegistryEntryCreateCFProperty returned a NULL path");
00186
00187
00188 IOObjectRelease(modemService);
00189 }
00190
00191 IOObjectRelease(matchingServices);
00192
00193
00194 #elif defined(USE_LIBUDEV)
00195
00196 struct udev *udev;
00197 struct udev_enumerate *enumerate;
00198 struct udev_list_entry *devices, *dev_list_entry;
00199 struct udev_device *dev;
00200 int index = 0;
00201
00202 udev = udev_new();
00203 if(!udev)
00204 throw DashelException(DashelException::EnumerationError, 0, "Cannot create udev context");
00205
00206 enumerate = udev_enumerate_new(udev);
00207 udev_enumerate_add_match_subsystem(enumerate, "tty");
00208 udev_enumerate_scan_devices(enumerate);
00209 devices = udev_enumerate_get_list_entry(enumerate);
00210
00211 udev_list_entry_foreach(dev_list_entry, devices) {
00212 const char *sysfs_path;
00213 struct udev_device *usb_dev;
00214 const char * path;
00215 struct stat st;
00216 unsigned int maj,min;
00217
00218
00219 sysfs_path = udev_list_entry_get_name(dev_list_entry);
00220 dev = udev_device_new_from_syspath(udev, sysfs_path);
00221
00222
00223 path = udev_device_get_devnode(dev);
00224 if(stat(path, &st))
00225 throw DashelException(DashelException::EnumerationError, 0, "Cannot stat serial port");
00226
00227 if(!S_ISCHR(st.st_mode))
00228 throw DashelException(DashelException::EnumerationError, 0, "Serial port is not character device");
00229
00230
00231 maj = major(st.st_rdev);
00232 min = minor(st.st_rdev);
00233
00234
00235 if(!(maj == 2 || (maj == 4 && min < 64) || maj == 3 || maj == 5)) {
00236 ostringstream oss;
00237
00238
00239 usb_dev = udev_device_get_parent_with_subsystem_devtype(dev,"usb","usb_device");
00240 if(usb_dev)
00241 oss << udev_device_get_sysattr_value(usb_dev,"product");
00242 else
00243 oss << "Serial Port";
00244
00245 oss << " (" << path << ")";
00246
00247 ports[index++] = std::make_pair<std::string, std::string>(path,oss.str());
00248 }
00249 udev_device_unref(dev);
00250 }
00251
00252 udev_enumerate_unref(enumerate);
00253
00254 udev_unref(udev);
00255
00256 #elif defined(USE_HAL)
00257
00258
00259 DBusConnection* dbusConnection = dbus_bus_get(DBUS_BUS_SYSTEM, 0);
00260 if (!dbusConnection)
00261 throw DashelException(DashelException::EnumerationError, 0, "cannot connect to D-BUS.");
00262
00263 LibHalContext* halContext = libhal_ctx_new();
00264 if (!halContext)
00265 throw DashelException(DashelException::EnumerationError, 0, "cannot create HAL context: cannot create context");
00266 if (!libhal_ctx_set_dbus_connection(halContext, dbusConnection))
00267 throw DashelException(DashelException::EnumerationError, 0, "cannot create HAL context: cannot connect to D-BUS");
00268 if (!libhal_ctx_init(halContext, 0))
00269 throw DashelException(DashelException::EnumerationError, 0, "cannot create HAL context: cannot init context");
00270
00271 int devicesCount;
00272 char** devices = libhal_find_device_by_capability(halContext, "serial", &devicesCount, 0);
00273 for (int i = 0; i < devicesCount; i++)
00274 {
00275 char* devFileName = libhal_device_get_property_string(halContext, devices[i], "serial.device", 0);
00276 char* info = libhal_device_get_property_string(halContext, devices[i], "info.product", 0);
00277 int port = libhal_device_get_property_int(halContext, devices[i], "serial.port", 0);
00278
00279 ostringstream oss;
00280 oss << info << " " << port;
00281 ports[devicesCount - i] = std::make_pair<std::string, std::string>(devFileName, oss.str());
00282
00283 libhal_free_string(info);
00284 libhal_free_string(devFileName);
00285 }
00286
00287 libhal_free_string_array(devices);
00288 libhal_ctx_shutdown(halContext, 0);
00289 libhal_ctx_free(halContext);
00290 #endif
00291
00292 return ports;
00293 };
00294
00295
00296
00298 template<typename Derived, typename Base>
00299 inline Derived polymorphic_downcast(Base base)
00300 {
00301 Derived derived = dynamic_cast<Derived>(base);
00302 assert(derived);
00303 return derived;
00304 }
00305
00306
00307
00308 #define RECV_BUFFER_SIZE 4096
00309
00310 SelectableStream::SelectableStream(const string& protocolName) :
00311 Stream(protocolName),
00312 fd(-1),
00313 writeOnly(false)
00314 {
00315
00316 }
00317
00318 SelectableStream::~SelectableStream()
00319 {
00320
00321 if (fd >= 3)
00322 close(fd);
00323 }
00324
00326 class DisconnectableStream: public SelectableStream
00327 {
00328 protected:
00329 friend class Hub;
00330 unsigned char recvBuffer[RECV_BUFFER_SIZE];
00331 size_t recvBufferPos;
00332 size_t recvBufferSize;
00333
00334 public:
00336 DisconnectableStream(const string& protocolName) :
00337 Stream(protocolName),
00338 SelectableStream(protocolName),
00339 recvBufferPos(0),
00340 recvBufferSize(0)
00341 {
00342
00343 }
00344
00346 virtual bool isDataInRecvBuffer() const { return recvBufferPos != recvBufferSize; }
00347 };
00348
00350 class SocketStream: public DisconnectableStream
00351 {
00352 protected:
00353 #ifndef TCP_CORK
00354
00355 enum Consts
00356 {
00357 SEND_BUFFER_SIZE_INITIAL = 256,
00358 SEND_BUFFER_SIZE_LIMIT = 65536
00359 };
00360
00361 ExpandableBuffer sendBuffer;
00362 #endif
00363
00364 public:
00366 SocketStream(const string& targetName) :
00367 Stream("tcp"),
00368 DisconnectableStream("tcp")
00369 #ifndef TCP_CORK
00370 ,sendBuffer(SEND_BUFFER_SIZE_INITIAL)
00371 #endif
00372 {
00373 target.add("tcp:host;port;connectionPort=-1;sock=-1");
00374 target.add(targetName.c_str());
00375
00376 fd = target.get<int>("sock");
00377 if (fd < 0)
00378 {
00379
00380 fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
00381 if (fd < 0)
00382 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot create socket.");
00383
00384 IPV4Address remoteAddress(target.get("host"), target.get<int>("port"));
00385
00386
00387 sockaddr_in addr;
00388 addr.sin_family = AF_INET;
00389 addr.sin_port = htons(remoteAddress.port);
00390 addr.sin_addr.s_addr = htonl(remoteAddress.address);
00391 if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) != 0)
00392 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot connect to remote host.");
00393
00394
00395 target.add(remoteAddress.format().c_str());
00396 target.erase("connectionPort");
00397 }
00398 else
00399 {
00400
00401 target.erase("sock");
00402 }
00403
00404
00405 #ifdef TCP_CORK
00406 int flag = 1;
00407 setsockopt(fd, IPPROTO_TCP, TCP_CORK, &flag , sizeof(flag));
00408 #endif
00409 }
00410
00411 virtual ~SocketStream()
00412 {
00413 if (!failed())
00414 flush();
00415
00416 if (fd >= 0)
00417 shutdown(fd, SHUT_RDWR);
00418 }
00419
00420 virtual void write(const void *data, const size_t size)
00421 {
00422 assert(fd >= 0);
00423
00424 if (size == 0)
00425 return;
00426
00427 #ifdef TCP_CORK
00428 send(data, size);
00429 #else
00430 if (size >= SEND_BUFFER_SIZE_LIMIT)
00431 {
00432 flush();
00433 send(data, size);
00434 }
00435 else
00436 {
00437 sendBuffer.add(data, size);
00438 if (sendBuffer.size() >= SEND_BUFFER_SIZE_LIMIT)
00439 flush();
00440 }
00441 #endif
00442 }
00443
00445 void send(const void *data, size_t size)
00446 {
00447 assert(fd >= 0);
00448
00449 unsigned char *ptr = (unsigned char *)data;
00450 size_t left = size;
00451
00452 while (left)
00453 {
00454 #ifdef MACOSX
00455 ssize_t len = ::send(fd, ptr, left, 0);
00456 #else
00457 ssize_t len = ::send(fd, ptr, left, MSG_NOSIGNAL);
00458 #endif
00459
00460 if (len < 0)
00461 {
00462 fail(DashelException::IOError, errno, "Socket write I/O error.");
00463 }
00464 else if (len == 0)
00465 {
00466 fail(DashelException::ConnectionLost, 0, "Connection lost.");
00467 }
00468 else
00469 {
00470 ptr += len;
00471 left -= len;
00472 }
00473 }
00474 }
00475
00476 virtual void flush()
00477 {
00478 assert(fd >= 0);
00479
00480 #ifdef TCP_CORK
00481 int flag = 0;
00482 setsockopt(fd, IPPROTO_TCP, TCP_CORK, &flag , sizeof(flag));
00483 flag = 1;
00484 setsockopt(fd, IPPROTO_TCP, TCP_CORK, &flag , sizeof(flag));
00485 #else
00486 send(sendBuffer.get(), sendBuffer.size());
00487 sendBuffer.clear();
00488 #endif
00489 }
00490
00491 virtual void read(void *data, size_t size)
00492 {
00493 assert(fd >= 0);
00494
00495 if (size == 0)
00496 return;
00497
00498 unsigned char *ptr = (unsigned char *)data;
00499 size_t left = size;
00500
00501 if (isDataInRecvBuffer())
00502 {
00503 size_t toCopy = std::min(recvBufferSize - recvBufferPos, size);
00504 memcpy(ptr, recvBuffer + recvBufferPos, toCopy);
00505 recvBufferPos += toCopy;
00506 ptr += toCopy;
00507 left -= toCopy;
00508 }
00509
00510 while (left)
00511 {
00512 ssize_t len = recv(fd, ptr, left, 0);
00513
00514 if (len < 0)
00515 {
00516 fail(DashelException::IOError, errno, "Socket read I/O error.");
00517 }
00518 else if (len == 0)
00519 {
00520 fail(DashelException::ConnectionLost, 0, "Connection lost.");
00521 }
00522 else
00523 {
00524 ptr += len;
00525 left -= len;
00526 }
00527 }
00528 }
00529
00530 virtual bool receiveDataAndCheckDisconnection()
00531 {
00532 assert(recvBufferPos == recvBufferSize);
00533
00534 ssize_t len = recv(fd, &recvBuffer, RECV_BUFFER_SIZE, 0);
00535 if (len > 0)
00536 {
00537 recvBufferSize = len;
00538 recvBufferPos = 0;
00539 return false;
00540 }
00541 else
00542 {
00543 if (len < 0)
00544 fail(DashelException::IOError, errno, "Socket read I/O error.");
00545 return true;
00546 }
00547 }
00548 };
00549
00551
00554 class SocketServerStream : public SelectableStream
00555 {
00556 public:
00558 SocketServerStream(const std::string& targetName) :
00559 Stream("tcpin"),
00560 SelectableStream("tcpin")
00561 {
00562 target.add("tcpin:port=5000;address=0.0.0.0");
00563 target.add(targetName.c_str());
00564
00565 IPV4Address bindAddress(target.get("address"), target.get<int>("port"));
00566
00567
00568 fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
00569 if (fd < 0)
00570 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot create socket.");
00571
00572
00573 int flag = 1;
00574 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (flag)) < 0)
00575 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot set address reuse flag on socket, probably the port is already in use.");
00576
00577
00578 sockaddr_in addr;
00579 addr.sin_family = AF_INET;
00580 addr.sin_port = htons(bindAddress.port);
00581 addr.sin_addr.s_addr = htonl(bindAddress.address);
00582 if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) != 0)
00583 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot bind socket to port, probably the port is already in use.");
00584
00585
00586 if(listen(fd, 16) < 0)
00587 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot listen on socket.");
00588 }
00589
00590 virtual void write(const void *data, const size_t size) { }
00591 virtual void flush() { }
00592 virtual void read(void *data, size_t size) { }
00593 virtual bool receiveDataAndCheckDisconnection() { return false; }
00594 virtual bool isDataInRecvBuffer() const { return false; }
00595 };
00596
00598 class UDPSocketStream: public MemoryPacketStream, public SelectableStream
00599 {
00600 private:
00601 mutable bool selectWasCalled;
00602
00603 public:
00605 UDPSocketStream(const string& targetName) :
00606 Stream("udp"),
00607 MemoryPacketStream("udp"),
00608 SelectableStream("udp"),
00609 selectWasCalled(false)
00610 {
00611 target.add("udp:port=5000;address=0.0.0.0;sock=-1");
00612 target.add(targetName.c_str());
00613
00614 fd = target.get<int>("sock");
00615 if (fd < 0)
00616 {
00617
00618 fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
00619 if (fd < 0)
00620 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot create socket.");
00621
00622 IPV4Address bindAddress(target.get("address"), target.get<int>("port"));
00623
00624
00625 sockaddr_in addr;
00626 addr.sin_family = AF_INET;
00627 addr.sin_port = htons(bindAddress.port);
00628 addr.sin_addr.s_addr = htonl(bindAddress.address);
00629 if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) != 0)
00630 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot bind socket to port, probably the port is already in use.");
00631 }
00632 else
00633 {
00634
00635 target.erase("sock");
00636 }
00637
00638
00639 int broadcastPermission = 1;
00640 setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &broadcastPermission, sizeof(broadcastPermission));
00641 }
00642
00643 virtual void send(const IPV4Address& dest)
00644 {
00645 sockaddr_in addr;
00646 addr.sin_family = AF_INET;
00647 addr.sin_port = htons(dest.port);;
00648 addr.sin_addr.s_addr = htonl(dest.address);
00649
00650 if (sendto(fd, sendBuffer.get(), sendBuffer.size(), 0, (struct sockaddr *)&addr, sizeof(addr)) != sendBuffer.size())
00651 fail(DashelException::IOError, errno, "UDP Socket write I/O error.");
00652
00653 sendBuffer.clear();
00654 }
00655
00656 virtual void receive(IPV4Address& source)
00657 {
00658 unsigned char buf[4096];
00659 sockaddr_in addr;
00660 socklen_t addrLen = sizeof(addr);
00661 ssize_t recvCount = recvfrom(fd, buf, 4096, 0, (struct sockaddr *)&addr, &addrLen);
00662 if (recvCount <= 0)
00663 fail(DashelException::ConnectionLost, errno, "UDP Socket read I/O error.");
00664
00665 receptionBuffer.resize(recvCount);
00666 std::copy(buf, buf+recvCount, receptionBuffer.begin());
00667
00668 source = IPV4Address(ntohl(addr.sin_addr.s_addr), ntohs(addr.sin_port));
00669 }
00670
00671 virtual bool receiveDataAndCheckDisconnection() { selectWasCalled = true; return false; }
00672 virtual bool isDataInRecvBuffer() const { bool ret = selectWasCalled; selectWasCalled = false; return ret; }
00673 };
00674
00675
00677 class FileDescriptorStream: public DisconnectableStream
00678 {
00679 public:
00681 FileDescriptorStream(const string& protocolName) :
00682 Stream(protocolName),
00683 DisconnectableStream(protocolName)
00684 { }
00685
00686 virtual void write(const void *data, const size_t size)
00687 {
00688 assert(fd >= 0);
00689
00690 if (size == 0)
00691 return;
00692
00693 const char *ptr = (const char *)data;
00694 size_t left = size;
00695
00696 while (left)
00697 {
00698 ssize_t len = ::write(fd, ptr, left);
00699
00700 if (len < 0)
00701 {
00702 fail(DashelException::IOError, errno, "File write I/O error.");
00703 }
00704 else if (len == 0)
00705 {
00706 fail(DashelException::ConnectionLost, 0, "File full.");
00707 }
00708 else
00709 {
00710 ptr += len;
00711 left -= len;
00712 }
00713 }
00714 }
00715
00716 virtual void flush()
00717 {
00718 assert(fd >= 0);
00719
00720 #ifdef MACOSX
00721 if (fsync(fd) < 0)
00722 #else
00723 if (fdatasync(fd) < 0)
00724 #endif
00725 {
00726 fail(DashelException::IOError, errno, "File flush error.");
00727 }
00728 }
00729
00730 virtual void read(void *data, size_t size)
00731 {
00732 assert(fd >= 0);
00733
00734 if (size == 0)
00735 return;
00736
00737 char *ptr = (char *)data;
00738 size_t left = size;
00739
00740 if (isDataInRecvBuffer())
00741 {
00742 size_t toCopy = std::min(recvBufferSize - recvBufferPos, size);
00743 memcpy(ptr, recvBuffer + recvBufferPos, toCopy);
00744 recvBufferPos += toCopy;
00745 ptr += toCopy;
00746 left -= toCopy;
00747 }
00748
00749 while (left)
00750 {
00751 ssize_t len = ::read(fd, ptr, left);
00752
00753 if (len < 0)
00754 {
00755 fail(DashelException::IOError, errno, "File read I/O error.");
00756 }
00757 else if (len == 0)
00758 {
00759 fail(DashelException::ConnectionLost, 0, "Reached end of file.");
00760 }
00761 else
00762 {
00763 ptr += len;
00764 left -= len;
00765 }
00766 }
00767 }
00768
00769 virtual bool receiveDataAndCheckDisconnection()
00770 {
00771 assert(recvBufferPos == recvBufferSize);
00772
00773 ssize_t len = ::read(fd, &recvBuffer, RECV_BUFFER_SIZE);
00774 if (len > 0)
00775 {
00776 recvBufferSize = len;
00777 recvBufferPos = 0;
00778 return false;
00779 }
00780 else
00781 {
00782 if (len < 0)
00783 fail(DashelException::IOError, errno, "File read I/O error.");
00784 return true;
00785 }
00786 }
00787 };
00788
00790 class FileStream: public FileDescriptorStream
00791 {
00792 public:
00794 FileStream(const string& targetName) :
00795 Stream("file"),
00796 FileDescriptorStream("file")
00797 {
00798 target.add("file:name;mode=read;fd=-1");
00799 target.add(targetName.c_str());
00800 fd = target.get<int>("fd");
00801 if (fd < 0)
00802 {
00803 const std::string name = target.get("name");
00804 const std::string mode = target.get("mode");
00805
00806
00807 if (mode == "read")
00808 fd = open(name.c_str(), O_RDONLY);
00809 else if (mode == "write")
00810 fd = creat(name.c_str(), S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP), writeOnly = true;
00811 else if (mode == "readwrite")
00812 fd = open(name.c_str(), O_RDWR);
00813 else
00814 throw DashelException(DashelException::InvalidTarget, 0, "Invalid file mode.");
00815 if (fd == -1)
00816 {
00817 string errorMessage = "Cannot open file " + name + " for " + mode + ".";
00818 throw DashelException(DashelException::ConnectionFailed, errno, errorMessage.c_str());
00819 }
00820 }
00821 else
00822 {
00823
00824 target.erase("fd");
00825 }
00826 }
00827 };
00828
00830 struct StdinStream: public FileStream
00831 {
00832 StdinStream(const string& targetName):
00833 Stream("file"),
00834 FileStream("file:name=/dev/stdin;mode=read;fd=0") {}
00835 };
00836
00838 struct StdoutStream: public FileStream
00839 {
00840 StdoutStream(const string& targetName):
00841 Stream("file"),
00842 FileStream("file:name=/dev/stdout;mode=write;fd=1") {}
00843 };
00844
00846 class SerialStream: public FileDescriptorStream
00847 {
00848 protected:
00849 struct termios oldtio;
00850
00851 public:
00853 SerialStream(const string& targetName) :
00854 Stream("ser"),
00855 FileDescriptorStream("ser")
00856 {
00857 target.add("ser:port=1;baud=115200;stop=1;parity=none;fc=none;bits=8");
00858 target.add(targetName.c_str());
00859 string devFileName;
00860
00861 if (target.isSet("device"))
00862 {
00863 target.addParam("device", NULL, true);
00864 target.erase("port");
00865 target.erase("name");
00866
00867 devFileName = target.get("device");
00868 }
00869 else if (target.isSet("name"))
00870 {
00871 target.addParam("name", NULL, true);
00872 target.erase("port");
00873 target.erase("device");
00874
00875
00876 std::string name = target.get("name");
00877 std::map<int, std::pair<std::string, std:: string> > ports = SerialPortEnumerator::getPorts();
00878
00879
00880 std::map<int, std::pair<std::string, std:: string> >::iterator it;
00881 for (it = ports.begin(); it != ports.end(); it++)
00882 {
00883 if (it->second.second.find(name) != std::string::npos)
00884 {
00885 devFileName = it->second.first;
00886 std::cout << "Found " << name << " on port " << devFileName << std::endl;
00887 break;
00888 }
00889 }
00890 if (devFileName.size() == 0)
00891 throw DashelException(DashelException::ConnectionFailed, 0, "The specified name could not be find among the serial ports.");
00892 }
00893 else
00894 {
00895 target.erase("device");
00896 target.erase("name");
00897
00898 std::map<int, std::pair<std::string, std::string> > ports = SerialPortEnumerator::getPorts();
00899 std::map<int, std::pair<std::string, std::string> >::const_iterator it = ports.find(target.get<int>("port"));
00900 if (it != ports.end())
00901 {
00902 devFileName = it->first;
00903 }
00904 else
00905 throw DashelException(DashelException::ConnectionFailed, 0, "The specified serial port does not exists.");
00906 }
00907
00908 fd = open(devFileName.c_str(), O_RDWR);
00909
00910 if (fd == -1)
00911 throw DashelException(DashelException::ConnectionFailed, 0, "Cannot open serial port.");
00912
00913 struct termios newtio;
00914
00915
00916 tcgetattr(fd, &oldtio);
00917 memset(&newtio, 0, sizeof(newtio));
00918
00919 newtio.c_cflag |= CLOCAL;
00920 newtio.c_cflag |= CREAD;
00921 switch (target.get<int>("bits"))
00922 {
00923 case 5: newtio.c_cflag |= CS5; break;
00924 case 6: newtio.c_cflag |= CS6; break;
00925 case 7: newtio.c_cflag |= CS7; break;
00926 case 8: newtio.c_cflag |= CS8; break;
00927 default: throw DashelException(DashelException::InvalidTarget, 0, "Invalid number of bits per character, must be 5, 6, 7, or 8.");
00928 }
00929 if (target.get("stop") == "2")
00930 newtio.c_cflag |= CSTOPB;
00931 if (target.get("fc") == "hard")
00932 newtio.c_cflag |= CRTSCTS;
00933 if (target.get("parity") != "none")
00934 {
00935 newtio.c_cflag |= PARENB;
00936 if (target.get("parity") == "odd")
00937 newtio.c_cflag |= PARODD;
00938 }
00939
00940 #ifdef MACOSX
00941 if (cfsetspeed(&newtio,target.get<int>("baud")) != 0)
00942 throw DashelException(DashelException::ConnectionFailed, errno, "Invalid baud rate.");
00943 #else
00944 switch (target.get<int>("baud"))
00945 {
00946 case 50: newtio.c_cflag |= B50; break;
00947 case 75: newtio.c_cflag |= B75; break;
00948 case 110: newtio.c_cflag |= B110; break;
00949 case 134: newtio.c_cflag |= B134; break;
00950 case 150: newtio.c_cflag |= B150; break;
00951 case 200: newtio.c_cflag |= B200; break;
00952 case 300: newtio.c_cflag |= B300; break;
00953 case 600: newtio.c_cflag |= B600; break;
00954 case 1200: newtio.c_cflag |= B1200; break;
00955 case 1800: newtio.c_cflag |= B1800; break;
00956 case 2400: newtio.c_cflag |= B2400; break;
00957 case 4800: newtio.c_cflag |= B4800; break;
00958 case 9600: newtio.c_cflag |= B9600; break;
00959 case 19200: newtio.c_cflag |= B19200; break;
00960 case 38400: newtio.c_cflag |= B38400; break;
00961 case 57600: newtio.c_cflag |= B57600; break;
00962 case 115200: newtio.c_cflag |= B115200; break;
00963 case 230400: newtio.c_cflag |= B230400; break;
00964 case 460800: newtio.c_cflag |= B460800; break;
00965 case 500000: newtio.c_cflag |= B500000; break;
00966 case 576000: newtio.c_cflag |= B576000; break;
00967 case 921600: newtio.c_cflag |= B921600; break;
00968 case 1000000: newtio.c_cflag |= B1000000; break;
00969 case 1152000: newtio.c_cflag |= B1152000; break;
00970 case 1500000: newtio.c_cflag |= B1500000; break;
00971 case 2000000: newtio.c_cflag |= B2000000; break;
00972 case 2500000: newtio.c_cflag |= B2500000; break;
00973 case 3000000: newtio.c_cflag |= B3000000; break;
00974 case 3500000: newtio.c_cflag |= B3500000; break;
00975 case 4000000: newtio.c_cflag |= B4000000; break;
00976 default: throw DashelException(DashelException::ConnectionFailed, 0, "Invalid baud rate.");
00977 }
00978 #endif
00979
00980 newtio.c_iflag = IGNPAR;
00981
00982 newtio.c_oflag = 0;
00983
00984 newtio.c_lflag = 0;
00985
00986 newtio.c_cc[VTIME] = 0;
00987 newtio.c_cc[VMIN] = 1;
00988
00989
00990 if ((tcflush(fd, TCIOFLUSH) < 0) || (tcsetattr(fd, TCSANOW, &newtio) < 0))
00991 throw DashelException(DashelException::ConnectionFailed, 0, "Cannot setup serial port. The requested baud rate might not be supported.");
00992 }
00993
00995 virtual ~SerialStream()
00996 {
00997 tcsetattr(fd, TCSANOW, &oldtio);
00998 }
00999
01000 virtual void flush()
01001 {
01002 }
01003 };
01004
01005
01006
01007
01008
01009
01010
01011
01012
01013
01014
01016
01017
01019
01020
01021
01022
01023
01024
01025
01026
01027
01029
01030
01031
01033
01034
01035
01036
01037
01038
01039
01040
01041
01042
01043
01044
01045
01046
01047
01048
01049 Hub::Hub(const bool resolveIncomingNames):
01050 resolveIncomingNames(resolveIncomingNames)
01051 {
01052 int *terminationPipes = new int[2];
01053 if (pipe(terminationPipes) != 0)
01054 abort();
01055 hTerminate = terminationPipes;
01056
01057 streamsLock = new pthread_mutex_t;
01058
01059 pthread_mutex_init((pthread_mutex_t*)streamsLock, NULL);
01060
01061
01062
01063 }
01064
01065 Hub::~Hub()
01066 {
01067
01068
01069
01070 int *terminationPipes = (int*)hTerminate;
01071 close(terminationPipes[0]);
01072 close(terminationPipes[1]);
01073 delete[] terminationPipes;
01074
01075 for (StreamsSet::iterator it = streams.begin(); it != streams.end(); ++it)
01076 delete *it;
01077
01078 pthread_mutex_destroy((pthread_mutex_t*)streamsLock);
01079
01080 delete (pthread_mutex_t*) streamsLock;
01081 }
01082
01083 Stream* Hub::connect(const std::string &target)
01084 {
01085 std::string proto, params;
01086 size_t c = target.find_first_of(':');
01087 if (c == std::string::npos)
01088 throw DashelException(DashelException::InvalidTarget, 0, "No protocol specified in target.");
01089 proto = target.substr(0, c);
01090 params = target.substr(c+1);
01091
01092 SelectableStream *s(dynamic_cast<SelectableStream*>(streamTypeRegistry.create(proto, target, *this)));
01093 if(!s)
01094 {
01095 std::string r = "Invalid protocol in target: ";
01096 r += proto;
01097 r += ", known protocol are: ";
01098 r += streamTypeRegistry.list();
01099 throw DashelException(DashelException::InvalidTarget, 0, r.c_str());
01100 }
01101
01102
01103
01104 streams.insert(s);
01105 if (proto != "tcpin")
01106 {
01107 dataStreams.insert(s);
01108 connectionCreated(s);
01109 }
01110
01111 return s;
01112 }
01113
01114 void Hub::run()
01115 {
01116 while (step(-1));
01117 }
01118
01119 bool Hub::step(const int timeout)
01120 {
01121 bool firstPoll = true;
01122 bool wasActivity = false;
01123 bool runInterrupted = false;
01124
01125 pthread_mutex_lock((pthread_mutex_t*)streamsLock);
01126
01127 do
01128 {
01129 wasActivity = false;
01130 size_t streamsCount = streams.size();
01131 valarray<struct pollfd> pollFdsArray(streamsCount+1);
01132 valarray<SelectableStream*> streamsArray(streamsCount);
01133
01134
01135 size_t i = 0;
01136 for (StreamsSet::iterator it = streams.begin(); it != streams.end(); ++it)
01137 {
01138 SelectableStream* stream = polymorphic_downcast<SelectableStream*>(*it);
01139
01140 streamsArray[i] = stream;
01141 pollFdsArray[i].fd = stream->fd;
01142 pollFdsArray[i].events = 0;
01143 if ((!stream->failed()) && (!stream->writeOnly))
01144 pollFdsArray[i].events |= POLLIN;
01145
01146 i++;
01147 }
01148
01149 int *terminationPipes = (int*)hTerminate;
01150 pollFdsArray[i].fd = terminationPipes[0];
01151 pollFdsArray[i].events = POLLIN;
01152
01153
01154 int thisPollTimeout = firstPoll ? timeout : 0;
01155 firstPoll = false;
01156
01157 pthread_mutex_unlock((pthread_mutex_t*)streamsLock);
01158
01159 #ifndef USE_POLL_EMU
01160 int ret = poll(&pollFdsArray[0], pollFdsArray.size(), thisPollTimeout);
01161 #else
01162 int ret = poll_emu(&pollFdsArray[0], pollFdsArray.size(), thisPollTimeout);
01163 #endif
01164 if (ret < 0)
01165 throw DashelException(DashelException::SyncError, errno, "Error during poll.");
01166
01167 pthread_mutex_lock((pthread_mutex_t*)streamsLock);
01168
01169
01170 for (i = 0; i < streamsCount; i++)
01171 {
01172 SelectableStream* stream = streamsArray[i];
01173
01174
01175 if (streams.find(stream) == streams.end())
01176 continue;
01177
01178 assert((pollFdsArray[i].revents & POLLNVAL) == 0);
01179
01180 if (pollFdsArray[i].revents & POLLERR)
01181 {
01182
01183 wasActivity = true;
01184
01185 try
01186 {
01187 stream->fail(DashelException::SyncError, 0, "Error on stream during poll.");
01188 }
01189 catch (DashelException e)
01190 {
01191 assert(e.stream);
01192 }
01193
01194 try
01195 {
01196 connectionClosed(stream, true);
01197 }
01198 catch (DashelException e)
01199 {
01200 assert(e.stream);
01201 }
01202
01203 closeStream(stream);
01204 }
01205 else if (pollFdsArray[i].revents & POLLHUP)
01206 {
01207
01208 wasActivity = true;
01209
01210 try
01211 {
01212 connectionClosed(stream, false);
01213 }
01214 catch (DashelException e)
01215 {
01216 assert(e.stream);
01217 }
01218
01219 closeStream(stream);
01220 }
01221 else if (pollFdsArray[i].revents & POLLIN)
01222 {
01223
01224 wasActivity = true;
01225
01226
01227 SocketServerStream* serverStream = dynamic_cast<SocketServerStream*>(stream);
01228
01229 if (serverStream)
01230 {
01231
01232 struct sockaddr_in targetAddr;
01233 socklen_t l = sizeof (targetAddr);
01234 int targetFD = accept (stream->fd, (struct sockaddr *)&targetAddr, &l);
01235 if (targetFD < 0)
01236 {
01237 pthread_mutex_unlock((pthread_mutex_t*)streamsLock);
01238 throw DashelException(DashelException::SyncError, errno, "Cannot accept new stream.");
01239 }
01240
01241
01242 ostringstream targetName;
01243 targetName << IPV4Address(ntohl(targetAddr.sin_addr.s_addr), ntohs(targetAddr.sin_port)).format(resolveIncomingNames);
01244 targetName << ";connectionPort=";
01245 targetName << atoi(serverStream->getTargetParameter("port").c_str());
01246 targetName << ";sock=";
01247 targetName << targetFD;
01248 connect(targetName.str());
01249 }
01250 else
01251 {
01252 bool streamClosed = false;
01253 try
01254 {
01255 if (stream->receiveDataAndCheckDisconnection())
01256 {
01257
01258 connectionClosed(stream, false);
01259 streamClosed = true;
01260 }
01261 else
01262 {
01263
01264 while (stream->isDataInRecvBuffer())
01265 incomingData(stream);
01266
01267 }
01268 }
01269 catch (DashelException e)
01270 {
01271
01272 assert(e.stream);
01273 }
01274
01275 if (streamClosed)
01276 closeStream(stream);
01277 }
01278 }
01279 }
01280
01281 if (pollFdsArray[i].revents)
01282 {
01283 char c;
01284 const ssize_t ret = read(pollFdsArray[i].fd, &c, 1);
01285 if (ret != 1)
01286 abort();
01287 runInterrupted = true;
01288 }
01289
01290
01291 vector<Stream*> failedStreams;
01292 for (StreamsSet::iterator it = streams.begin(); it != streams.end();++it)
01293 if ((*it)->failed())
01294 failedStreams.push_back(*it);
01295
01296 for (size_t i = 0; i < failedStreams.size(); i++)
01297 {
01298 Stream* stream = failedStreams[i];
01299 if (streams.find(stream) == streams.end())
01300 continue;
01301 if (stream->failed())
01302 {
01303 try
01304 {
01305 connectionClosed(stream, true);
01306 }
01307 catch (DashelException e)
01308 {
01309 assert(e.stream);
01310 }
01311 closeStream(stream);
01312 }
01313 }
01314 }
01315 while (wasActivity && !runInterrupted);
01316
01317 pthread_mutex_unlock((pthread_mutex_t*)streamsLock);
01318
01319 return !runInterrupted;
01320 }
01321
01322 void Hub::lock()
01323 {
01324 pthread_mutex_lock((pthread_mutex_t*)streamsLock);
01325 }
01326
01327 void Hub::unlock()
01328 {
01329 pthread_mutex_unlock((pthread_mutex_t*)streamsLock);
01330 }
01331
01332 void Hub::stop()
01333 {
01334 int *terminationPipes = (int*)hTerminate;
01335 char c = 0;
01336 const ssize_t ret = write(terminationPipes[1], &c, 1);
01337 if (ret != 1)
01338 throw DashelException(DashelException::IOError, ret, "Cannot write to termination pipe.");
01339 }
01340
01341 StreamTypeRegistry::StreamTypeRegistry()
01342 {
01343 reg("file", &createInstance<FileStream>);
01344 reg("stdin", &createInstance<StdinStream>);
01345 reg("stdout", &createInstance<StdoutStream>);
01346 reg("ser", &createInstance<SerialStream>);
01347 reg("tcpin", &createInstance<SocketServerStream>);
01348 reg("tcp", &createInstance<SocketStream>);
01349 reg("udp", &createInstance<UDPSocketStream>);
01350 }
01351
01352 StreamTypeRegistry __attribute__((init_priority(1000))) streamTypeRegistry;
01353 }