00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041 #include <string.h>
00042 #include <cassert>
00043 #include <cstdlib>
00044 #include <map>
00045 #include <vector>
00046 #include <valarray>
00047 #include <algorithm>
00048 #include <iostream>
00049 #include <sstream>
00050 #include <signal.h>
00051 #include <errno.h>
00052 #include <unistd.h>
00053 #include <termios.h>
00054 #include <fcntl.h>
00055 #include <netdb.h>
00056 #include <signal.h>
00057 #include <sys/select.h>
00058 #include <sys/time.h>
00059 #include <sys/types.h>
00060 #include <sys/stat.h>
00061 #include <pthread.h>
00062
00063 #ifdef __APPLE__
00064 #define MACOSX
00065 #endif
00066
00067 #ifdef MACOSX
00068 #define USE_POLL_EMU
00069 #endif
00070
00071 #ifdef MACOSX
00072 #include <CoreFoundation/CoreFoundation.h>
00073 #include <IOKit/IOKitLib.h>
00074 #include <IOKit/serial/IOSerialKeys.h>
00075 #endif
00076
00077 #ifdef USE_HAL
00078 #include <hal/libhal.h>
00079 #endif
00080
00081 #ifndef USE_POLL_EMU
00082 #include <poll.h>
00083 #else
00084 #include "poll_emu.h"
00085 #endif
00086
00087 #include "dashel-private.h"
00088
00089
00090
00095 namespace Dashel
00096 {
00097 using namespace std;
00098
00099
00100
00101 void Stream::fail(DashelException::Source s, int se, const char* reason)
00102 {
00103 string sysMessage;
00104 failedFlag = true;
00105
00106 if (se)
00107 sysMessage = strerror(errno);
00108
00109 failReason = reason;
00110 failReason += " ";
00111 failReason += sysMessage;
00112
00113 throw DashelException(s, se, failReason.c_str(), this);
00114 }
00115
00116
00117
00118 std::map<int, std::pair<std::string, std::string> > SerialPortEnumerator::getPorts()
00119 {
00120 std::map<int, std::pair<std::string, std::string> > ports;
00121
00122
00123
00124 #ifdef MACOSX
00125
00126
00127
00128 CFMutableDictionaryRef classesToMatch = IOServiceMatching(kIOSerialBSDServiceValue);
00129 if (classesToMatch == NULL)
00130 throw DashelException(DashelException::EnumerationError, 0, "IOServiceMatching returned a NULL dictionary");
00131
00132
00133 CFDictionarySetValue(classesToMatch, CFSTR(kIOSerialBSDTypeKey), CFSTR(kIOSerialBSDAllTypes));
00134
00135
00136 io_iterator_t matchingServices;
00137 kern_return_t kernResult = IOServiceGetMatchingServices(kIOMasterPortDefault, classesToMatch, &matchingServices);
00138 if (KERN_SUCCESS != kernResult)
00139 throw DashelException(DashelException::EnumerationError, kernResult, "IOServiceGetMatchingServices failed");
00140
00141
00142 io_object_t modemService;
00143 int index = 0;
00144 while((modemService = IOIteratorNext(matchingServices)))
00145 {
00146
00147 CFTypeRef bsdPathAsCFString = IORegistryEntryCreateCFProperty(modemService, CFSTR(kIOCalloutDeviceKey), kCFAllocatorDefault, 0);
00148 if (bsdPathAsCFString)
00149 {
00150 std::string path;
00151 char cStr[255];
00152
00153 bool res = CFStringGetCString((CFStringRef) bsdPathAsCFString, cStr, 255, kCFStringEncodingUTF8);
00154 if(res)
00155 path = cStr;
00156 else
00157 throw DashelException(DashelException::EnumerationError, 0, "CFStringGetCString failed");
00158
00159 CFRelease(bsdPathAsCFString);
00160
00161
00162
00163 ports[index++] = std::make_pair<std::string, std::string>(path, path);
00164 }
00165 else
00166 throw DashelException(DashelException::EnumerationError, 0, "IORegistryEntryCreateCFProperty returned a NULL path");
00167
00168
00169 IOObjectRelease(modemService);
00170 }
00171
00172 IOObjectRelease(matchingServices);
00173
00174
00175 #elif defined(USE_HAL)
00176
00177
00178 DBusConnection* dbusConnection = dbus_bus_get(DBUS_BUS_SYSTEM, 0);
00179 if (!dbusConnection)
00180 throw DashelException(DashelException::EnumerationError, 0, "cannot connect to D-BUS.");
00181
00182 LibHalContext* halContext = libhal_ctx_new();
00183 if (!halContext)
00184 throw DashelException(DashelException::EnumerationError, 0, "cannot create HAL context: cannot create context");
00185 if (!libhal_ctx_set_dbus_connection(halContext, dbusConnection))
00186 throw DashelException(DashelException::EnumerationError, 0, "cannot create HAL context: cannot connect to D-BUS");
00187 if (!libhal_ctx_init(halContext, 0))
00188 throw DashelException(DashelException::EnumerationError, 0, "cannot create HAL context: cannot init context");
00189
00190 int devicesCount;
00191 char** devices = libhal_find_device_by_capability(halContext, "serial", &devicesCount, 0);
00192 for (int i = 0; i < devicesCount; i++)
00193 {
00194 char* devFileName = libhal_device_get_property_string(halContext, devices[i], "serial.device", 0);
00195 char* info = libhal_device_get_property_string(halContext, devices[i], "info.product", 0);
00196 int port = libhal_device_get_property_int(halContext, devices[i], "serial.port", 0);
00197
00198 ostringstream oss;
00199 oss << info << " " << port;
00200 ports[devicesCount - i] = std::make_pair<std::string, std::string>(devFileName, oss.str());
00201
00202 libhal_free_string(info);
00203 libhal_free_string(devFileName);
00204 }
00205
00206 libhal_free_string_array(devices);
00207 libhal_ctx_shutdown(halContext, 0);
00208 libhal_ctx_free(halContext);
00209 #endif
00210
00211 return ports;
00212 };
00213
00214
00215
00217 template<typename Derived, typename Base>
00218 inline Derived polymorphic_downcast(Base base)
00219 {
00220 Derived derived = dynamic_cast<Derived>(base);
00221 assert(derived);
00222 return derived;
00223 }
00224
00225
00226
00227 #define RECV_BUFFER_SIZE 4096
00228
00230 class SelectableStream: virtual public Stream
00231 {
00232 protected:
00233 int fd;
00234 bool writeOnly;
00235 friend class Hub;
00236
00237 public:
00239 SelectableStream(const string& protocolName) :
00240 Stream(protocolName),
00241 fd(-1),
00242 writeOnly(false)
00243 {
00244
00245 }
00246
00247 virtual ~SelectableStream()
00248 {
00249
00250 if (fd >= 3)
00251 close(fd);
00252 }
00253
00255 virtual bool receiveDataAndCheckDisconnection() = 0;
00256
00258 virtual bool isDataInRecvBuffer() const = 0;
00259 };
00260
00262 class DisconnectableStream: public SelectableStream
00263 {
00264 protected:
00265 friend class Hub;
00266 unsigned char recvBuffer[RECV_BUFFER_SIZE];
00267 size_t recvBufferPos;
00268 size_t recvBufferSize;
00269
00270 public:
00272 DisconnectableStream(const string& protocolName) :
00273 Stream(protocolName),
00274 SelectableStream(protocolName),
00275 recvBufferPos(0),
00276 recvBufferSize(0)
00277 {
00278
00279 }
00280
00282 virtual bool isDataInRecvBuffer() const { return recvBufferPos != recvBufferSize; }
00283 };
00284
00286 class SocketStream: public DisconnectableStream
00287 {
00288 protected:
00289 #ifndef TCP_CORK
00290
00291 enum Consts
00292 {
00293 SEND_BUFFER_SIZE_INITIAL = 256,
00294 SEND_BUFFER_SIZE_LIMIT = 65536
00295 };
00296
00297 ExpandableBuffer sendBuffer;
00298 #endif
00299
00300 public:
00302 SocketStream(const string& targetName) :
00303 Stream("tcp"),
00304 DisconnectableStream("tcp")
00305 #ifndef TCP_CORK
00306 ,sendBuffer(SEND_BUFFER_SIZE_INITIAL)
00307 #endif
00308 {
00309 target.add("tcp:host;port;connectionPort=-1;sock=-1");
00310 target.add(targetName.c_str());
00311
00312 fd = target.get<int>("sock");
00313 if (fd < 0)
00314 {
00315
00316 fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
00317 if (fd < 0)
00318 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot create socket.");
00319
00320 IPV4Address remoteAddress(target.get("host"), target.get<int>("port"));
00321
00322
00323 sockaddr_in addr;
00324 addr.sin_family = AF_INET;
00325 addr.sin_port = htons(remoteAddress.port);
00326 addr.sin_addr.s_addr = htonl(remoteAddress.address);
00327 if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) != 0)
00328 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot connect to remote host.");
00329
00330
00331 target.add(remoteAddress.format().c_str());
00332 target.erase("connectionPort");
00333 }
00334 else
00335 {
00336
00337 target.erase("sock");
00338 }
00339
00340
00341 #ifdef TCP_CORK
00342 int flag = 1;
00343 setsockopt(fd, IPPROTO_TCP, TCP_CORK, &flag , sizeof(flag));
00344 #endif
00345 }
00346
00347 virtual ~SocketStream()
00348 {
00349 if (!failed())
00350 flush();
00351
00352 if (fd >= 0)
00353 shutdown(fd, SHUT_RDWR);
00354 }
00355
00356 virtual void write(const void *data, const size_t size)
00357 {
00358 assert(fd >= 0);
00359
00360 if (size == 0)
00361 return;
00362
00363 #ifdef TCP_CORK
00364 send(data, size);
00365 #else
00366 if (size >= SEND_BUFFER_SIZE_LIMIT)
00367 {
00368 flush();
00369 send(data, size);
00370 }
00371 else
00372 {
00373 sendBuffer.add(data, size);
00374 if (sendBuffer.size() >= SEND_BUFFER_SIZE_LIMIT)
00375 flush();
00376 }
00377 #endif
00378 }
00379
00381 void send(const void *data, size_t size)
00382 {
00383 assert(fd >= 0);
00384
00385 unsigned char *ptr = (unsigned char *)data;
00386 size_t left = size;
00387
00388 while (left)
00389 {
00390 #ifdef MACOSX
00391 ssize_t len = ::send(fd, ptr, left, 0);
00392 #else
00393 ssize_t len = ::send(fd, ptr, left, MSG_NOSIGNAL);
00394 #endif
00395
00396 if (len < 0)
00397 {
00398 fail(DashelException::IOError, errno, "Socket write I/O error.");
00399 }
00400 else if (len == 0)
00401 {
00402 fail(DashelException::ConnectionLost, 0, "Connection lost.");
00403 }
00404 else
00405 {
00406 ptr += len;
00407 left -= len;
00408 }
00409 }
00410 }
00411
00412 virtual void flush()
00413 {
00414 assert(fd >= 0);
00415
00416 #ifdef TCP_CORK
00417 int flag = 0;
00418 setsockopt(fd, IPPROTO_TCP, TCP_CORK, &flag , sizeof(flag));
00419 flag = 1;
00420 setsockopt(fd, IPPROTO_TCP, TCP_CORK, &flag , sizeof(flag));
00421 #else
00422 send(sendBuffer.get(), sendBuffer.size());
00423 sendBuffer.clear();
00424 #endif
00425 }
00426
00427 virtual void read(void *data, size_t size)
00428 {
00429 assert(fd >= 0);
00430
00431 if (size == 0)
00432 return;
00433
00434 unsigned char *ptr = (unsigned char *)data;
00435 size_t left = size;
00436
00437 if (isDataInRecvBuffer())
00438 {
00439 size_t toCopy = std::min(recvBufferSize - recvBufferPos, size);
00440 memcpy(ptr, recvBuffer + recvBufferPos, toCopy);
00441 recvBufferPos += toCopy;
00442 ptr += toCopy;
00443 left -= toCopy;
00444 }
00445
00446 while (left)
00447 {
00448 ssize_t len = recv(fd, ptr, left, 0);
00449
00450 if (len < 0)
00451 {
00452 fail(DashelException::IOError, errno, "Socket read I/O error.");
00453 }
00454 else if (len == 0)
00455 {
00456 fail(DashelException::ConnectionLost, 0, "Connection lost.");
00457 }
00458 else
00459 {
00460 ptr += len;
00461 left -= len;
00462 }
00463 }
00464 }
00465
00466 virtual bool receiveDataAndCheckDisconnection()
00467 {
00468 assert(recvBufferPos == recvBufferSize);
00469
00470 ssize_t len = recv(fd, &recvBuffer, RECV_BUFFER_SIZE, 0);
00471 if (len > 0)
00472 {
00473 recvBufferSize = len;
00474 recvBufferPos = 0;
00475 return false;
00476 }
00477 else
00478 {
00479 if (len < 0)
00480 fail(DashelException::IOError, errno, "Socket read I/O error.");
00481 return true;
00482 }
00483 }
00484 };
00485
00487
00490 class SocketServerStream : public SelectableStream
00491 {
00492 public:
00494 SocketServerStream(const std::string& targetName) :
00495 Stream("tcpin"),
00496 SelectableStream("tcpin")
00497 {
00498 target.add("tcpin:port=5000;address=0.0.0.0");
00499 target.add(targetName.c_str());
00500
00501 IPV4Address bindAddress(target.get("address"), target.get<int>("port"));
00502
00503
00504 fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
00505 if (fd < 0)
00506 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot create socket.");
00507
00508
00509 int flag = 1;
00510 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (flag)) < 0)
00511 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot set address reuse flag on socket, probably the port is already in use.");
00512
00513
00514 sockaddr_in addr;
00515 addr.sin_family = AF_INET;
00516 addr.sin_port = htons(bindAddress.port);
00517 addr.sin_addr.s_addr = htonl(bindAddress.address);
00518 if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) != 0)
00519 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot bind socket to port, probably the port is already in use.");
00520
00521
00522 if(listen(fd, 16) < 0)
00523 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot listen on socket.");
00524 }
00525
00526 virtual void write(const void *data, const size_t size) { }
00527 virtual void flush() { }
00528 virtual void read(void *data, size_t size) { }
00529 virtual bool receiveDataAndCheckDisconnection() { return false; }
00530 virtual bool isDataInRecvBuffer() const { return false; }
00531 };
00532
00534 class UDPSocketStream: public MemoryPacketStream, public SelectableStream
00535 {
00536 private:
00537 mutable bool selectWasCalled;
00538
00539 public:
00541 UDPSocketStream(const string& targetName) :
00542 Stream("udp"),
00543 MemoryPacketStream("udp"),
00544 SelectableStream("udp"),
00545 selectWasCalled(false)
00546 {
00547 target.add("udp:port=5000;address=0.0.0.0;sock=-1");
00548 target.add(targetName.c_str());
00549
00550 fd = target.get<int>("sock");
00551 if (fd < 0)
00552 {
00553
00554 fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
00555 if (fd < 0)
00556 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot create socket.");
00557
00558 IPV4Address bindAddress(target.get("address"), target.get<int>("port"));
00559
00560
00561 sockaddr_in addr;
00562 addr.sin_family = AF_INET;
00563 addr.sin_port = htons(bindAddress.port);
00564 addr.sin_addr.s_addr = htonl(bindAddress.address);
00565 if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) != 0)
00566 throw DashelException(DashelException::ConnectionFailed, errno, "Cannot bind socket to port, probably the port is already in use.");
00567 }
00568 else
00569 {
00570
00571 target.erase("sock");
00572 }
00573
00574
00575 int broadcastPermission = 1;
00576 setsockopt(fd, SOL_SOCKET, SO_BROADCAST, &broadcastPermission, sizeof(broadcastPermission));
00577 }
00578
00579 virtual void send(const IPV4Address& dest)
00580 {
00581 sockaddr_in addr;
00582 addr.sin_family = AF_INET;
00583 addr.sin_port = htons(dest.port);;
00584 addr.sin_addr.s_addr = htonl(dest.address);
00585
00586 if (sendto(fd, sendBuffer.get(), sendBuffer.size(), 0, (struct sockaddr *)&addr, sizeof(addr)) != sendBuffer.size())
00587 fail(DashelException::IOError, errno, "UDP Socket write I/O error.");
00588
00589 sendBuffer.clear();
00590 }
00591
00592 virtual void receive(IPV4Address& source)
00593 {
00594 unsigned char buf[4006];
00595 sockaddr_in addr;
00596 socklen_t addrLen = sizeof(addr);
00597 ssize_t recvCount = recvfrom(fd, buf, 4096, 0, (struct sockaddr *)&addr, &addrLen);
00598 if (recvCount <= 0)
00599 fail(DashelException::ConnectionLost, errno, "UDP Socket read I/O error.");
00600
00601 receptionBuffer.resize(recvCount);
00602 std::copy(buf, buf+recvCount, receptionBuffer.begin());
00603
00604 source = IPV4Address(ntohl(addr.sin_addr.s_addr), ntohs(addr.sin_port));
00605 }
00606
00607 virtual bool receiveDataAndCheckDisconnection() { selectWasCalled = true; return false; }
00608 virtual bool isDataInRecvBuffer() const { bool ret = selectWasCalled; selectWasCalled = false; return ret; }
00609 };
00610
00611
00613 class FileDescriptorStream: public DisconnectableStream
00614 {
00615 public:
00617 FileDescriptorStream(const string& protocolName) :
00618 Stream(protocolName),
00619 DisconnectableStream(protocolName)
00620 { }
00621
00622 virtual void write(const void *data, const size_t size)
00623 {
00624 assert(fd >= 0);
00625
00626 if (size == 0)
00627 return;
00628
00629 const char *ptr = (const char *)data;
00630 size_t left = size;
00631
00632 while (left)
00633 {
00634 ssize_t len = ::write(fd, ptr, left);
00635
00636 if (len < 0)
00637 {
00638 fail(DashelException::IOError, errno, "File write I/O error.");
00639 }
00640 else if (len == 0)
00641 {
00642 fail(DashelException::ConnectionLost, 0, "File full.");
00643 }
00644 else
00645 {
00646 ptr += len;
00647 left -= len;
00648 }
00649 }
00650 }
00651
00652 virtual void flush()
00653 {
00654 assert(fd >= 0);
00655
00656 #ifdef MACOSX
00657 if (fsync(fd) < 0)
00658 #else
00659 if (fdatasync(fd) < 0)
00660 #endif
00661 {
00662 fail(DashelException::IOError, errno, "File flush error.");
00663 }
00664 }
00665
00666 virtual void read(void *data, size_t size)
00667 {
00668 assert(fd >= 0);
00669
00670 if (size == 0)
00671 return;
00672
00673 char *ptr = (char *)data;
00674 size_t left = size;
00675
00676 if (isDataInRecvBuffer())
00677 {
00678 size_t toCopy = std::min(recvBufferSize - recvBufferPos, size);
00679 memcpy(ptr, recvBuffer + recvBufferPos, toCopy);
00680 recvBufferPos += toCopy;
00681 ptr += toCopy;
00682 left -= toCopy;
00683 }
00684
00685 while (left)
00686 {
00687 ssize_t len = ::read(fd, ptr, left);
00688
00689 if (len < 0)
00690 {
00691 fail(DashelException::IOError, errno, "File read I/O error.");
00692 }
00693 else if (len == 0)
00694 {
00695 fail(DashelException::ConnectionLost, 0, "Reached end of file.");
00696 }
00697 else
00698 {
00699 ptr += len;
00700 left -= len;
00701 }
00702 }
00703 }
00704
00705 virtual bool receiveDataAndCheckDisconnection()
00706 {
00707 assert(recvBufferPos == recvBufferSize);
00708
00709 ssize_t len = ::read(fd, &recvBuffer, RECV_BUFFER_SIZE);
00710 if (len > 0)
00711 {
00712 recvBufferSize = len;
00713 recvBufferPos = 0;
00714 return false;
00715 }
00716 else
00717 {
00718 if (len < 0)
00719 fail(DashelException::IOError, errno, "File read I/O error.");
00720 return true;
00721 }
00722 }
00723 };
00724
00726 class FileStream: public FileDescriptorStream
00727 {
00728 public:
00730 FileStream(const string& targetName) :
00731 Stream("file"),
00732 FileDescriptorStream("file")
00733 {
00734 target.add("file:name;mode=read;fd=-1");
00735 target.add(targetName.c_str());
00736 fd = target.get<int>("fd");
00737 if (fd < 0)
00738 {
00739 const std::string name = target.get("name");
00740 const std::string mode = target.get("mode");
00741
00742
00743 if (mode == "read")
00744 fd = open(name.c_str(), O_RDONLY);
00745 else if (mode == "write")
00746 fd = creat(name.c_str(), S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP), writeOnly = true;
00747 else if (mode == "readwrite")
00748 fd = open(name.c_str(), O_RDWR);
00749 else
00750 throw DashelException(DashelException::InvalidTarget, 0, "Invalid file mode.");
00751 if (fd == -1)
00752 {
00753 string errorMessage = "Cannot open file " + name + " for " + mode + ".";
00754 throw DashelException(DashelException::ConnectionFailed, errno, errorMessage.c_str());
00755 }
00756 }
00757 else
00758 {
00759
00760 target.erase("fd");
00761 }
00762 }
00763 };
00764
00766 class SerialStream: public FileDescriptorStream
00767 {
00768 protected:
00769 struct termios oldtio;
00770
00771 public:
00773 SerialStream(const string& targetName) :
00774 Stream("ser"),
00775 FileDescriptorStream("ser")
00776 {
00777 target.add("ser:port=1;baud=115200;stop=1;parity=none;fc=none;bits=8");
00778 target.add(targetName.c_str());
00779 string devFileName;
00780
00781 if (target.isSet("device"))
00782 {
00783 target.addParam("device", NULL, true);
00784 target.erase("port");
00785
00786 devFileName = target.get("device");
00787 }
00788 else
00789 {
00790 target.erase("device");
00791
00792 std::map<int, std::pair<std::string, std::string> > ports = SerialPortEnumerator::getPorts();
00793 std::map<int, std::pair<std::string, std::string> >::const_iterator it = ports.find(target.get<int>("port"));
00794 if (it != ports.end())
00795 {
00796 devFileName = it->first;
00797 }
00798 else
00799 throw DashelException(DashelException::ConnectionFailed, 0, "The specified serial port does not exists.");
00800 }
00801
00802 fd = open(devFileName.c_str(), O_RDWR);
00803
00804 if (fd == -1)
00805 throw DashelException(DashelException::ConnectionFailed, 0, "Cannot open serial port.");
00806
00807 struct termios newtio;
00808
00809
00810 tcgetattr(fd, &oldtio);
00811 memset(&newtio, 0, sizeof(newtio));
00812
00813 newtio.c_cflag |= CLOCAL;
00814 newtio.c_cflag |= CREAD;
00815 switch (target.get<int>("bits"))
00816 {
00817 case 5: newtio.c_cflag |= CS5; break;
00818 case 6: newtio.c_cflag |= CS6; break;
00819 case 7: newtio.c_cflag |= CS7; break;
00820 case 8: newtio.c_cflag |= CS8; break;
00821 default: throw DashelException(DashelException::InvalidTarget, 0, "Invalid number of bits per character, must be 5, 6, 7, or 8.");
00822 }
00823 if (target.get("stop") == "2")
00824 newtio.c_cflag |= CSTOPB;
00825 if (target.get("fc") == "hard")
00826 newtio.c_cflag |= CRTSCTS;
00827 if (target.get("parity") != "none")
00828 {
00829 newtio.c_cflag |= PARENB;
00830 if (target.get("parity") == "odd")
00831 newtio.c_cflag |= PARODD;
00832 }
00833
00834 #ifdef MACOSX
00835 if (cfsetspeed(&newtio,target.get<int>("baud")) != 0)
00836 throw DashelException(DashelException::ConnectionFailed, errno, "Invalid baud rate.");
00837 #else
00838 switch (target.get<int>("baud"))
00839 {
00840 case 50: newtio.c_cflag |= B50; break;
00841 case 75: newtio.c_cflag |= B75; break;
00842 case 110: newtio.c_cflag |= B110; break;
00843 case 134: newtio.c_cflag |= B134; break;
00844 case 150: newtio.c_cflag |= B150; break;
00845 case 200: newtio.c_cflag |= B200; break;
00846 case 300: newtio.c_cflag |= B300; break;
00847 case 600: newtio.c_cflag |= B600; break;
00848 case 1200: newtio.c_cflag |= B1200; break;
00849 case 1800: newtio.c_cflag |= B1800; break;
00850 case 2400: newtio.c_cflag |= B2400; break;
00851 case 4800: newtio.c_cflag |= B4800; break;
00852 case 9600: newtio.c_cflag |= B9600; break;
00853 case 19200: newtio.c_cflag |= B19200; break;
00854 case 38400: newtio.c_cflag |= B38400; break;
00855 case 57600: newtio.c_cflag |= B57600; break;
00856 case 115200: newtio.c_cflag |= B115200; break;
00857 case 230400: newtio.c_cflag |= B230400; break;
00858 case 460800: newtio.c_cflag |= B460800; break;
00859 case 500000: newtio.c_cflag |= B500000; break;
00860 case 576000: newtio.c_cflag |= B576000; break;
00861 case 921600: newtio.c_cflag |= B921600; break;
00862 case 1000000: newtio.c_cflag |= B1000000; break;
00863 case 1152000: newtio.c_cflag |= B1152000; break;
00864 case 1500000: newtio.c_cflag |= B1500000; break;
00865 case 2000000: newtio.c_cflag |= B2000000; break;
00866 case 2500000: newtio.c_cflag |= B2500000; break;
00867 case 3000000: newtio.c_cflag |= B3000000; break;
00868 case 3500000: newtio.c_cflag |= B3500000; break;
00869 case 4000000: newtio.c_cflag |= B4000000; break;
00870 default: throw DashelException(DashelException::ConnectionFailed, 0, "Invalid baud rate.");
00871 }
00872 #endif
00873
00874 newtio.c_iflag = IGNPAR;
00875
00876 newtio.c_oflag = 0;
00877
00878 newtio.c_lflag = 0;
00879
00880 newtio.c_cc[VTIME] = 0;
00881 newtio.c_cc[VMIN] = 1;
00882
00883
00884 if ((tcflush(fd, TCIOFLUSH) < 0) || (tcsetattr(fd, TCSANOW, &newtio) < 0))
00885 throw DashelException(DashelException::ConnectionFailed, 0, "Cannot setup serial port. The requested baud rate might not be supported.");
00886 }
00887
00889 virtual ~SerialStream()
00890 {
00891 tcsetattr(fd, TCSANOW, &oldtio);
00892 }
00893
00894 virtual void flush()
00895 {
00896 }
00897 };
00898
00899
00900
00901
00902
00903
00904
00905
00906
00907
00908
00910
00911
00913
00914
00915
00916
00917
00918
00919
00920
00921
00923
00924
00925
00927
00928
00929
00930
00931
00932
00933
00934
00935
00936
00937
00938
00939
00940
00941
00942
00943 Hub::Hub(const bool resolveIncomingNames):
00944 resolveIncomingNames(resolveIncomingNames)
00945 {
00946 int *terminationPipes = new int[2];
00947 if (pipe(terminationPipes) != 0)
00948 abort();
00949 hTerminate = terminationPipes;
00950
00951 streamsLock = new pthread_mutex_t;
00952
00953 pthread_mutex_init((pthread_mutex_t*)streamsLock, NULL);
00954
00955
00956
00957 }
00958
00959 Hub::~Hub()
00960 {
00961
00962
00963
00964 int *terminationPipes = (int*)hTerminate;
00965 close(terminationPipes[0]);
00966 close(terminationPipes[1]);
00967 delete[] terminationPipes;
00968
00969 for (StreamsSet::iterator it = streams.begin(); it != streams.end(); ++it)
00970 delete *it;
00971
00972 pthread_mutex_destroy((pthread_mutex_t*)streamsLock);
00973
00974 delete (pthread_mutex_t*) streamsLock;
00975 }
00976
00977 Stream* Hub::connect(const std::string &target)
00978 {
00979 std::string proto, params;
00980 size_t c = target.find_first_of(':');
00981 if (c == std::string::npos)
00982 throw DashelException(DashelException::InvalidTarget, 0, "No protocol specified in target.");
00983 proto = target.substr(0, c);
00984 params = target.substr(c+1);
00985
00986 SelectableStream *s = NULL;
00987 if(proto == "file")
00988 s = new FileStream(target);
00989 if(proto == "stdin")
00990 s = new FileStream("file:name=/dev/stdin;mode=read;fd=0");
00991 if(proto == "stdout")
00992 s = new FileStream("file:name=/dev/stdout;mode=write;fd=1");
00993 if(proto == "ser")
00994 s = new SerialStream(target);
00995 if(proto == "tcpin")
00996 s = new SocketServerStream(target);
00997 if(proto == "tcp")
00998 s = new SocketStream(target);
00999 if(proto == "udp")
01000 s = new UDPSocketStream(target);
01001
01002 if(!s)
01003 {
01004 std::string r = "Invalid protocol in target: ";
01005 r = r.append(proto);
01006 throw DashelException(DashelException::InvalidTarget, 0, r.c_str());
01007 }
01008
01009
01010
01011 streams.insert(s);
01012 if (proto != "tcpin")
01013 {
01014 dataStreams.insert(s);
01015 connectionCreated(s);
01016 }
01017
01018 return s;
01019 }
01020
01021 void Hub::run()
01022 {
01023 while (step(-1));
01024 }
01025
01026 bool Hub::step(const int timeout)
01027 {
01028 bool firstPoll = true;
01029 bool wasActivity = false;
01030 bool runInterrupted = false;
01031
01032 pthread_mutex_lock((pthread_mutex_t*)streamsLock);
01033
01034 do
01035 {
01036 wasActivity = false;
01037 size_t streamsCount = streams.size();
01038 valarray<struct pollfd> pollFdsArray(streamsCount+1);
01039 valarray<SelectableStream*> streamsArray(streamsCount);
01040
01041
01042 size_t i = 0;
01043 for (StreamsSet::iterator it = streams.begin(); it != streams.end(); ++it)
01044 {
01045 SelectableStream* stream = polymorphic_downcast<SelectableStream*>(*it);
01046
01047 streamsArray[i] = stream;
01048 pollFdsArray[i].fd = stream->fd;
01049 pollFdsArray[i].events = 0;
01050 if ((!stream->failed()) && (!stream->writeOnly))
01051 pollFdsArray[i].events |= POLLIN;
01052
01053 i++;
01054 }
01055
01056 int *terminationPipes = (int*)hTerminate;
01057 pollFdsArray[i].fd = terminationPipes[0];
01058 pollFdsArray[i].events = POLLIN;
01059
01060
01061 int thisPollTimeout = firstPoll ? timeout : 0;
01062 firstPoll = false;
01063
01064 pthread_mutex_unlock((pthread_mutex_t*)streamsLock);
01065
01066 #ifndef USE_POLL_EMU
01067 int ret = poll(&pollFdsArray[0], pollFdsArray.size(), thisPollTimeout);
01068 #else
01069 int ret = poll_emu(&pollFdsArray[0], pollFdsArray.size(), thisPollTimeout);
01070 #endif
01071 if (ret < 0)
01072 throw DashelException(DashelException::SyncError, errno, "Error during poll.");
01073
01074 pthread_mutex_lock((pthread_mutex_t*)streamsLock);
01075
01076
01077 for (i = 0; i < streamsCount; i++)
01078 {
01079 SelectableStream* stream = streamsArray[i];
01080
01081
01082 if (streams.find(stream) == streams.end())
01083 continue;
01084
01085 assert((pollFdsArray[i].revents & POLLNVAL) == 0);
01086
01087 if (pollFdsArray[i].revents & POLLERR)
01088 {
01089
01090 wasActivity = true;
01091
01092 try
01093 {
01094 stream->fail(DashelException::SyncError, 0, "Error on stream during poll.");
01095 }
01096 catch (DashelException e)
01097 {
01098 assert(e.stream);
01099 }
01100
01101 try
01102 {
01103 connectionClosed(stream, true);
01104 }
01105 catch (DashelException e)
01106 {
01107 assert(e.stream);
01108 }
01109
01110 closeStream(stream);
01111 }
01112 else if (pollFdsArray[i].revents & POLLHUP)
01113 {
01114
01115 wasActivity = true;
01116
01117 try
01118 {
01119 connectionClosed(stream, false);
01120 }
01121 catch (DashelException e)
01122 {
01123 assert(e.stream);
01124 }
01125
01126 closeStream(stream);
01127 }
01128 else if (pollFdsArray[i].revents & POLLIN)
01129 {
01130
01131 wasActivity = true;
01132
01133
01134 SocketServerStream* serverStream = dynamic_cast<SocketServerStream*>(stream);
01135
01136 if (serverStream)
01137 {
01138
01139 struct sockaddr_in targetAddr;
01140 socklen_t l = sizeof (targetAddr);
01141 int targetFD = accept (stream->fd, (struct sockaddr *)&targetAddr, &l);
01142 if (targetFD < 0)
01143 throw DashelException(DashelException::SyncError, errno, "Cannot accept new stream.");
01144
01145
01146 ostringstream targetName;
01147 targetName << IPV4Address(ntohl(targetAddr.sin_addr.s_addr), ntohs(targetAddr.sin_port)).format(resolveIncomingNames);
01148 targetName << ";connectionPort=";
01149 targetName << atoi(serverStream->getTargetParameter("port").c_str());
01150 targetName << ";sock=";
01151 targetName << targetFD;
01152 connect(targetName.str());
01153 }
01154 else
01155 {
01156 bool streamClosed = false;
01157 try
01158 {
01159 if (stream->receiveDataAndCheckDisconnection())
01160 {
01161
01162 connectionClosed(stream, false);
01163 streamClosed = true;
01164 }
01165 else
01166 {
01167
01168 while (stream->isDataInRecvBuffer())
01169 incomingData(stream);
01170
01171 }
01172 }
01173 catch (DashelException e)
01174 {
01175
01176 assert(e.stream);
01177 }
01178
01179 if (streamClosed)
01180 closeStream(stream);
01181 }
01182 }
01183 }
01184
01185 if (pollFdsArray[i].revents)
01186 {
01187 char c;
01188 read(pollFdsArray[i].fd, &c, 1);
01189 runInterrupted = true;
01190 }
01191
01192
01193 vector<Stream*> failedStreams;
01194 for (StreamsSet::iterator it = streams.begin(); it != streams.end();++it)
01195 if ((*it)->failed())
01196 failedStreams.push_back(*it);
01197
01198 for (size_t i = 0; i < failedStreams.size(); i++)
01199 {
01200 Stream* stream = failedStreams[i];
01201 if (streams.find(stream) == streams.end())
01202 continue;
01203 if (stream->failed())
01204 {
01205 try
01206 {
01207 connectionClosed(stream, true);
01208 }
01209 catch (DashelException e)
01210 {
01211 assert(e.stream);
01212 }
01213 closeStream(stream);
01214 }
01215 }
01216 }
01217 while (wasActivity && !runInterrupted);
01218
01219 pthread_mutex_unlock((pthread_mutex_t*)streamsLock);
01220
01221 return !runInterrupted;
01222 }
01223
01224 void Hub::lock()
01225 {
01226 pthread_mutex_lock((pthread_mutex_t*)streamsLock);
01227 }
01228
01229 void Hub::unlock()
01230 {
01231 pthread_mutex_unlock((pthread_mutex_t*)streamsLock);
01232 }
01233
01234 void Hub::stop()
01235 {
01236 int *terminationPipes = (int*)hTerminate;
01237 char c = 0;
01238 write(terminationPipes[1], &c, 1);
01239 }
01240 }