$search
00001 /* 00002 Dashel 00003 A cross-platform DAta Stream Helper Encatargetulation Library 00004 Copyright (C) 2007 -- 2012: 00005 00006 Stephane Magnenat <stephane at magnenat dot net> 00007 (http://stephane.magnenat.net) 00008 Mobots group - Laboratory of Robotics Systems, EPFL, Lausanne 00009 (http://mobots.epfl.ch) 00010 00011 Sebastian Gerlach 00012 Kenzan Technologies 00013 (http://www.kenzantech.com) 00014 00015 All rights reserved. 00016 00017 Redistribution and use in source and binary forms, with or without 00018 modification, are permitted provided that the following conditions are met: 00019 * Redistributions of source code must retain the above copyright 00020 notice, this list of conditions and the following disclaimer. 00021 * Redistributions in binary form must reproduce the above copyright 00022 notice, this list of conditions and the following disclaimer in the 00023 documentation and/or other materials provided with the distribution. 00024 * Neither the names of "Mobots", "Laboratory of Robotics Systems", "EPFL", 00025 "Kenzan Technologies" nor the names of the contributors may be used to 00026 endorse or promote products derived from this software without specific 00027 prior written permission. 00028 00029 THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDERS ``AS IS'' AND ANY 00030 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 00031 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 00032 DISCLAIMED. IN NO EVENT SHALL COPYRIGHT HOLDERS BE LIABLE FOR ANY 00033 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 00034 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00035 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 00036 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 00037 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00038 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00039 */ 00040 00041 #include "dashel.h" 00042 00043 #include <cassert> 00044 #include <cstdlib> 00045 #include <malloc.h> 00046 #include <map> 00047 #include <vector> 00048 #include <algorithm> 00049 #include <iostream> 00050 #include <sstream> 00051 00052 #ifdef _MSC_VER 00053 #pragma comment(lib, "ws2_32.lib") 00054 #pragma comment(lib, "wbemuuid.lib") 00055 #pragma comment(lib, "comsuppw.lib") 00056 #endif // _MSC_VER 00057 00058 #define _WIN32_WINNT 0x0501 00059 #include <winsock2.h> 00060 #include <windows.h> 00061 #include <setupapi.h> 00062 #include <devguid.h> 00063 #include <regstr.h> 00064 #include <winnls.h> 00065 00066 #pragma warning(disable:4996) 00067 00068 #include "dashel-private.h" 00069 00073 namespace Dashel 00074 { 00076 typedef enum { 00077 EvData, 00078 EvPotentialData, 00079 EvClosed, 00080 EvConnect, 00081 } EvType; 00082 00084 template<typename Derived, typename Base> 00085 inline Derived polymorphic_downcast(Base base) 00086 { 00087 Derived derived = dynamic_cast<Derived>(base); 00088 assert(derived); 00089 return derived; 00090 } 00091 00092 void Stream::fail(DashelException::Source s, int se, const char* reason) 00093 { 00094 char sysMessage[1024] = {0}; 00095 failedFlag = true; 00096 00097 if (se) 00098 FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, se, 0, sysMessage, 1024, NULL); 00099 00100 failReason = reason; 00101 failReason += " "; 00102 failReason += sysMessage; 00103 00104 throw DashelException(s, se, failReason.c_str(), this); 00105 } 00106 00107 // Serial port enumerator 00108 00109 std::map<int, std::pair<std::string, std::string> > SerialPortEnumerator::getPorts() 00110 { 00111 std::map<int, std::pair<std::string, std::string> > ports; 00112 00113 // Mainly based on http://support.microsoft.com/kb/259695 00114 HDEVINFO hDevInfo; 00115 SP_DEVINFO_DATA DeviceInfoData; 00116 DWORD i; 00117 char* co; 00118 char dn[1024], dcn[1024]; 00119 00120 // Create a HDEVINFO with all present ports. 00121 hDevInfo = SetupDiGetClassDevs(&GUID_DEVCLASS_PORTS, 0, 0, DIGCF_PRESENT ); 00122 00123 if (hDevInfo == INVALID_HANDLE_VALUE) 00124 throw DashelException(DashelException::EnumerationError, GetLastError(), "Cannot list serial port devices."); 00125 00126 // Enumerate through all devices in Set. 00127 DeviceInfoData.cbSize = sizeof(SP_DEVINFO_DATA); 00128 for (i = 0; SetupDiEnumDeviceInfo(hDevInfo, i, &DeviceInfoData); i++) 00129 { 00130 DWORD DataT; 00131 LPTSTR buffer = NULL; 00132 DWORD buffersize = 0; 00133 00134 // Call function with null to begin with, then use the returned buffer size (doubled) to Alloc the buffer. Keep calling until 00135 // success or an unknown failure. 00136 // Double the returned buffersize to correct for underlying legacy CM functions that return an incorrect buffersize value on 00137 // DBCS/MBCS systems. 00138 while (!SetupDiGetDeviceRegistryPropertyW(hDevInfo, &DeviceInfoData, SPDRP_FRIENDLYNAME, &DataT, (PBYTE)buffer, buffersize, &buffersize)) 00139 { 00140 if (GetLastError() == ERROR_INSUFFICIENT_BUFFER) 00141 { 00142 // Change the buffer size. 00143 if (buffer) 00144 LocalFree(buffer); 00145 // Double the size to avoid problems on 00146 // W2k MBCS systems per KB 888609. 00147 buffer = (LPTSTR)LocalAlloc(LPTR,buffersize * 2); 00148 } 00149 else 00150 throw DashelException(DashelException::EnumerationError, GetLastError(), "Cannot get serial port properties."); 00151 } 00152 00153 WideCharToMultiByte(CP_UTF8, 0, (LPCWSTR)buffer, -1, dn, 1024, NULL, NULL); 00154 00155 // Filter to get only the COMx ports 00156 if((co = strstr(dn, "(COM"))) 00157 { 00158 strcpy(dcn, co+1); 00159 strtok(dcn, ")"); 00160 00161 int v = atoi(&dcn[3]); 00162 00163 if(v > 0 && v < 256) 00164 { 00165 std::string name = std::string("\\\\.\\").append(dcn); 00166 ports.insert(std::pair<int, std::pair<std::string, std::string> >(v, std::pair<std::string, std::string> (name, dn))); 00167 } 00168 } 00169 00170 00171 if (buffer) 00172 LocalFree(buffer); 00173 } 00174 00175 // Error ? 00176 if ( GetLastError()!=NO_ERROR && GetLastError()!=ERROR_NO_MORE_ITEMS ) 00177 throw DashelException(DashelException::EnumerationError, GetLastError(), "Error while enumerating serial port devices."); 00178 00179 // Cleanup 00180 SetupDiDestroyDeviceInfoList(hDevInfo); 00181 00182 return ports; 00183 }; 00184 00185 00187 void startWinSock() 00188 { 00189 bool started = false; 00190 if(!started) 00191 { 00192 WORD ver = 0x0101; 00193 WSADATA d; 00194 memset(&d, 0, sizeof(d)); 00195 00196 int rv = WSAStartup(ver, &d); 00197 if(rv) 00198 throw DashelException(DashelException::Unknown, rv, "Could not start WinSock service."); 00199 started = true; 00200 } 00201 } 00202 00204 class WaitableStream: virtual public Stream 00205 { 00206 public: 00208 00210 std::map<EvType,HANDLE> hEvents; 00211 00213 bool readDone; 00214 00215 protected: 00217 HANDLE hEOF; 00218 00220 00222 HANDLE createEvent(EvType t) 00223 { 00224 HANDLE he = CreateEvent(NULL, FALSE, FALSE, NULL); 00225 hEvents[t] = he; 00226 return he; 00227 } 00228 00230 00233 void addEvent(EvType t, HANDLE he) 00234 { 00235 hEvents[t] = he; 00236 } 00237 00238 public: 00240 WaitableStream(const std::string& protocolName) : Stream(protocolName) 00241 { 00242 hEOF = createEvent(EvClosed); 00243 } 00244 00246 00248 virtual ~WaitableStream() 00249 { 00250 for(std::map<EvType, HANDLE>::iterator it = hEvents.begin(); it != hEvents.end(); ++it) 00251 CloseHandle(it->second); 00252 } 00253 00255 00258 virtual void notifyEvent(Hub *srv, EvType& t) { } 00259 }; 00260 00262 00265 class SocketServerStream : public WaitableStream 00266 { 00267 protected: 00269 SOCKET sock; 00270 00272 HANDLE hev; 00273 00275 const bool resolveIncomingNames; 00276 00277 public: 00278 00280 SocketServerStream(const std::string& params, const Hub& hub) : Stream("tcpin"), WaitableStream("tcpin"), 00281 resolveIncomingNames(hub.resolveIncomingNames) 00282 { 00283 target.add("tcpin:port=5000;address=0.0.0.0"); 00284 target.add(params.c_str()); 00285 00286 startWinSock(); 00287 00288 IPV4Address bindAddress(target.get("address"), target.get<int>("port")); 00289 00290 // Create socket. 00291 sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); 00292 if (sock == SOCKET_ERROR) 00293 throw DashelException(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot create socket."); 00294 00295 // Reuse address. 00296 int flag = 1; 00297 if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char *)&flag, sizeof (flag)) < 0) 00298 throw DashelException(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot set address reuse flag on socket, probably the port is already in use."); 00299 00300 // Bind socket. 00301 sockaddr_in addr; 00302 addr.sin_family = AF_INET; 00303 addr.sin_port = htons(bindAddress.port); 00304 addr.sin_addr.s_addr = htonl(bindAddress.address); 00305 if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) != 0) 00306 throw DashelException(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot bind socket to port, probably the port is already in use."); 00307 00308 // Listen on socket, backlog is sort of arbitrary. 00309 if(listen(sock, 16) == SOCKET_ERROR) 00310 throw DashelException(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot listen on socket."); 00311 00312 // Create and register event. 00313 hev = createEvent(EvConnect); 00314 WSAEventSelect(sock, hev, FD_ACCEPT); 00315 } 00316 00318 ~SocketServerStream() 00319 { 00320 if(sock) 00321 closesocket(sock); 00322 } 00323 00325 00328 virtual void notifyEvent(Hub *srv, EvType& t) 00329 { 00330 if(t == EvConnect) 00331 { 00332 // Accept incoming connection. 00333 struct sockaddr_in targetAddr; 00334 int l = sizeof (targetAddr); 00335 SOCKET trg = accept (sock, (struct sockaddr *)&targetAddr, &l); 00336 if (trg == SOCKET_ERROR) 00337 { 00338 fail(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot accept incoming connection on socket."); 00339 } 00340 00341 // create stream 00342 std::string ls = IPV4Address(ntohl(targetAddr.sin_addr.s_addr), ntohs(targetAddr.sin_port)).format(resolveIncomingNames); 00343 00344 std::ostringstream buf; 00345 buf << ";connectionPort="; 00346 buf << atoi(getTargetParameter("port").c_str()); 00347 buf << ";sock="; 00348 buf << (int)trg; 00349 ls.append(buf.str()); 00350 srv->connect(ls); 00351 } 00352 } 00353 00354 virtual void write(const void *data, const size_t size) { } 00355 virtual void flush() { } 00356 virtual void read(void *data, size_t size) { } 00357 }; 00358 00360 00363 class StdinStream : public WaitableStream 00364 { 00365 protected: 00367 HANDLE hf; 00368 00370 HANDLE hev; 00371 00372 public: 00373 00375 StdinStream(const std::string& params) : Stream("stdin"), WaitableStream("stdin") 00376 { 00377 target.add(params.c_str()); 00378 00379 if((hf = GetStdHandle(STD_INPUT_HANDLE)) == INVALID_HANDLE_VALUE) 00380 throw DashelException(DashelException::ConnectionFailed, GetLastError(), "Cannot open standard input."); 00381 00382 DWORD cm; 00383 GetConsoleMode(hf, &cm); 00384 cm &= ~(ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT); 00385 if(!SetConsoleMode(hf, cm)) 00386 throw DashelException(DashelException::ConnectionFailed, GetLastError(), "Cannot change standard input mode to immediate."); 00387 00388 // Create events. 00389 addEvent(EvPotentialData, hf); 00390 hev = createEvent(EvData); 00391 } 00392 00394 ~StdinStream() 00395 { 00396 CloseHandle(hf); 00397 } 00398 00400 00402 virtual void notifyEvent(Hub *srv, EvType& t) 00403 { 00404 DWORD n = 0; 00405 if(GetNumberOfConsoleInputEvents(hf, &n)) 00406 { 00407 if(n > 0) 00408 { 00409 INPUT_RECORD ir; 00410 PeekConsoleInput(hf, &ir, 1, &n); 00411 if(ir.EventType != KEY_EVENT) 00412 ReadConsoleInput(hf, &ir, 1, &n); 00413 else 00414 { 00415 t = EvData; 00416 } 00417 } 00418 } 00419 } 00420 00422 virtual void write(const void *data, const size_t size) 00423 { 00424 throw DashelException(DashelException::InvalidOperation, GetLastError(), "Cannot write to standard input.", this); 00425 } 00426 00428 virtual void flush() 00429 { 00430 throw DashelException(DashelException::InvalidOperation, GetLastError(), "Cannot flush standard input.", this); 00431 } 00432 00433 virtual void read(void *data, size_t size) 00434 { 00435 char *ptr = (char *)data; 00436 DWORD left = (DWORD)size; 00437 00438 // Quick check to make sure nobody is giving us funny 64-bit stuff. 00439 assert(left == size); 00440 00441 readDone = true; 00442 00443 while (left) 00444 { 00445 DWORD len = 0; 00446 BOOL r; 00447 00448 // Blocking write. 00449 if((r = ReadFile(hf, ptr, left, &len, NULL)) == 0) 00450 { 00451 fail(DashelException::IOError, GetLastError(), "Read error from standard input."); 00452 } 00453 else 00454 { 00455 ptr += len; 00456 left -= len; 00457 } 00458 } 00459 } 00460 00461 }; 00462 00464 class StdoutStream : public WaitableStream 00465 { 00466 protected: 00468 HANDLE hf; 00469 00470 public: 00471 00473 StdoutStream(const std::string& params) : Stream("stdout"), WaitableStream("stdout") 00474 { 00475 target.add(params.c_str()); 00476 00477 if((hf = GetStdHandle(STD_OUTPUT_HANDLE)) == INVALID_HANDLE_VALUE) 00478 throw DashelException(DashelException::ConnectionFailed, GetLastError(), "Cannot open standard output."); 00479 } 00480 00482 ~StdoutStream() 00483 { 00484 CloseHandle(hf); 00485 } 00486 00487 virtual void write(const void *data, const size_t size) 00488 { 00489 const char *ptr = (const char *)data; 00490 DWORD left = (DWORD)size; 00491 00492 // Quick check to make sure nobody is giving us funny 64-bit stuff. 00493 assert(left == size); 00494 00495 while (left) 00496 { 00497 DWORD len = 0; 00498 BOOL r; 00499 00500 // Blocking write. 00501 if((r = WriteFile(hf, ptr, left, &len, NULL)) == 0) 00502 { 00503 fail(DashelException::IOError, GetLastError(), "Write error to standard output."); 00504 } 00505 else 00506 { 00507 ptr += len; 00508 left -= len; 00509 } 00510 } 00511 } 00512 00513 virtual void flush() 00514 { 00515 FlushFileBuffers(hf); 00516 } 00517 00518 virtual void read(void *data, size_t size) 00519 { 00520 fail(DashelException::InvalidOperation, GetLastError(), "Cannot read from standard output."); 00521 } 00522 00523 }; 00524 00526 class FileStream : public WaitableStream 00527 { 00528 protected: 00530 HANDLE hf; 00531 00533 OVERLAPPED ovl; 00534 00536 DWORD writeOffset; 00537 00539 00543 bool readyToRead; 00544 00546 char readByte; 00547 00549 bool readByteAvailable; 00550 00551 protected: 00553 00555 FileStream(const std::string& protocolName, bool dummy) : Stream(protocolName), WaitableStream(protocolName) { } 00556 00558 void startStream(EvType et = EvData) 00559 { 00560 readByteAvailable = false; 00561 memset(&ovl, 0, sizeof(ovl)); 00562 ovl.hEvent = createEvent(et); 00563 BOOL r = ReadFile(hf, &readByte, 1, NULL, &ovl); 00564 if(!r) 00565 { 00566 DWORD err = GetLastError(); 00567 if(err != ERROR_IO_PENDING) 00568 throw DashelException(DashelException::IOError, GetLastError(), "Cannot read from file stream."); 00569 } 00570 else 00571 readByteAvailable = true; 00572 } 00573 00574 public: 00575 00577 FileStream(const std::string& params) : Stream("file"), WaitableStream("file") 00578 { 00579 target.add("file:name;mode=read"); 00580 target.add(params.c_str()); 00581 std::string name = target.get("name"); 00582 std::string mode = target.get("mode"); 00583 00584 hf = NULL; 00585 if (mode == "read") 00586 { 00587 hf = CreateFile(name.c_str(), GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL); 00588 startStream(); 00589 } 00590 else if (mode == "write") 00591 { 00592 writeOffset = 0; 00593 hf = CreateFile(name.c_str(), GENERIC_WRITE, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, CREATE_ALWAYS, FILE_FLAG_OVERLAPPED, NULL); 00594 } 00595 else if (mode == "readwrite") 00596 { 00597 writeOffset = 0; 00598 hf = CreateFile(name.c_str(), GENERIC_WRITE | GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, FILE_FLAG_OVERLAPPED, NULL); 00599 startStream(); 00600 } 00601 if(hf == INVALID_HANDLE_VALUE) 00602 throw DashelException(DashelException::ConnectionFailed, GetLastError(), "Cannot open file."); 00603 } 00604 00606 ~FileStream() 00607 { 00608 CloseHandle(hf); 00609 } 00610 00611 virtual void write(const void *data, const size_t size) 00612 { 00613 const char *ptr = (const char *)data; 00614 const unsigned int RETRY_LIMIT = 3; 00615 DWORD left = (DWORD)size; 00616 unsigned int retry = 0; 00617 00618 // Quick check to make sure nobody is giving us funny 64-bit stuff. 00619 assert(left == size); 00620 00621 while (left) 00622 { 00623 DWORD len = 0; 00624 OVERLAPPED o; 00625 memset(&o, 0, sizeof(o)); 00626 00627 o.Offset = writeOffset; 00628 o.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); 00629 00630 // Blocking write. 00631 BOOL r = WriteFile(hf, ptr, left, &len, &o); 00632 if(!r) 00633 { 00634 DWORD err; 00635 switch((err = GetLastError())) 00636 { 00637 case ERROR_IO_PENDING: 00638 GetOverlappedResult(hf, &o, &len, TRUE); 00639 if (len == 0) 00640 { 00641 if (retry++ >= RETRY_LIMIT) 00642 { 00643 SetEvent(hEOF); 00644 fail(DashelException::IOError, GetLastError(), "Cannot write to file (max retry reached)."); 00645 } 00646 else 00647 continue; 00648 } 00649 ptr += len; 00650 left -= len; 00651 break; 00652 00653 default: 00654 fail(DashelException::IOError, GetLastError(), "Cannot write to file."); 00655 break; 00656 } 00657 } 00658 else 00659 { 00660 ptr += len; 00661 left -= len; 00662 } 00663 00664 writeOffset += len; 00665 } 00666 } 00667 00668 virtual void flush() 00669 { 00670 FlushFileBuffers(hf); 00671 } 00672 00673 virtual void read(void *data, size_t size) 00674 { 00675 char *ptr = (char *)data; 00676 DWORD left = (DWORD)size; 00677 00678 // Quick check to make sure nobody is giving us funny 64-bit stuff. 00679 assert(left == size); 00680 00681 if (size == 0) 00682 return; 00683 00684 readDone = true; 00685 00686 if(!readByteAvailable) 00687 WaitForSingleObject(ovl.hEvent, INFINITE); 00688 00689 DWORD dataUsed; 00690 if(!GetOverlappedResult(hf, &ovl, &dataUsed, TRUE)) 00691 fail(DashelException::IOError, GetLastError(), "File read I/O error."); 00692 00693 if(dataUsed) 00694 { 00695 *ptr++ = readByte; 00696 left--; 00697 } 00698 readByteAvailable = false; 00699 00700 while (left) 00701 { 00702 DWORD len = 0; 00703 OVERLAPPED o; 00704 memset(&o, 0, sizeof(o)); 00705 o.Offset = ovl.Offset + size - left; 00706 o.hEvent = ovl.hEvent ; 00707 00708 // Non-blocking read. 00709 BOOL r = ReadFile(hf, ptr, left, &len, &o); 00710 if(!r) 00711 { 00712 DWORD err; 00713 switch((err = GetLastError())) 00714 { 00715 case ERROR_HANDLE_EOF: 00716 fail(DashelException::ConnectionLost, GetLastError(), "Reached end of file."); 00717 break; 00718 00719 case ERROR_IO_PENDING: 00720 WaitForSingleObject(ovl.hEvent, INFINITE); 00721 if(!GetOverlappedResult(hf, &o, &len, TRUE)) 00722 fail(DashelException::IOError, GetLastError(), "File read I/O error."); 00723 if(len == 0) 00724 return; 00725 00726 ptr += len; 00727 left -= len; 00728 break; 00729 00730 default: 00731 fail(DashelException::IOError, GetLastError(), "File read I/O error."); 00732 break; 00733 } 00734 } 00735 else 00736 { 00737 WaitForSingleObject(ovl.hEvent, INFINITE); 00738 ptr += len; 00739 left -= len; 00740 } 00741 } 00742 00743 // Reset our blocking read for whatever is up next. 00744 ovl.Offset += (DWORD)size; 00745 BOOL r = ReadFile(hf, &readByte, 1, &dataUsed, &ovl); 00746 if(!r) 00747 { 00748 DWORD err = GetLastError(); 00749 if(err == ERROR_HANDLE_EOF) 00750 { 00751 SetEvent(hEOF); 00752 } 00753 else if(err != ERROR_IO_PENDING) 00754 { 00755 fail(DashelException::IOError, GetLastError(), "Cannot read from file stream."); 00756 } 00757 } 00758 else 00759 readByteAvailable = true; 00760 00761 } 00762 00764 00766 virtual void notifyEvent(Hub *srv, EvType& t) 00767 { 00768 if(t == EvPotentialData) 00769 { 00770 DWORD dataUsed; 00771 GetOverlappedResult(hf, &ovl, &dataUsed, TRUE); 00772 if(dataUsed == 0) 00773 ReadFile(hf, &readByte, 1, NULL, &ovl); 00774 else 00775 { 00776 readByteAvailable = true; 00777 t = EvData; 00778 } 00779 } 00780 } 00781 }; 00782 00784 class SerialStream : public FileStream 00785 { 00786 private: 00788 00794 bool buildDCB(HANDLE sp, int speed, int bits, const std::string& parity, const std::string& stopbits, const std::string& fc) 00795 { 00796 DCB dcb; 00797 00798 memset(&dcb, 0, sizeof(dcb)); 00799 dcb.DCBlength = sizeof(dcb); 00800 00801 if(!GetCommState(sp, &dcb)) 00802 throw DashelException(DashelException::ConnectionFailed, GetLastError(), "Cannot read current serial port state.", this); 00803 00804 // Fill in the DCB 00805 memset(&dcb,0,sizeof(dcb)); 00806 dcb.DCBlength = sizeof(dcb); 00807 if(fc == "hard") 00808 { 00809 dcb.fOutxCtsFlow = TRUE; 00810 dcb.fRtsControl = RTS_CONTROL_HANDSHAKE; 00811 } 00812 else 00813 { 00814 dcb.fOutxCtsFlow = FALSE; 00815 dcb.fRtsControl = RTS_CONTROL_DISABLE; 00816 } 00817 00818 dcb.fOutxDsrFlow = FALSE; 00819 dcb.fDtrControl = DTR_CONTROL_ENABLE; 00820 dcb.fDsrSensitivity = FALSE; 00821 dcb.fBinary = TRUE; 00822 dcb.fParity = TRUE; 00823 dcb.BaudRate = speed; 00824 dcb.ByteSize = bits; 00825 if(parity == "even") 00826 dcb.Parity = EVENPARITY; 00827 else if(parity == "odd") 00828 dcb.Parity = ODDPARITY; 00829 else if(parity == "space") 00830 dcb.Parity = SPACEPARITY; 00831 else if(parity == "mark") 00832 dcb.Parity = MARKPARITY; 00833 else 00834 dcb.Parity = NOPARITY; 00835 00836 if(stopbits == "1.5") 00837 dcb.StopBits = ONE5STOPBITS; 00838 else if(stopbits == "2") 00839 dcb.StopBits = TWOSTOPBITS; 00840 else 00841 dcb.StopBits = ONESTOPBIT; 00842 00843 // Set the com port state. 00844 if(!SetCommState(sp, &dcb)) 00845 throw DashelException(DashelException::ConnectionFailed, GetLastError(), "Cannot set new serial port state.", this); 00846 00847 // Set timeouts as well for good measure. Since we will effectively be woken whenever 00848 // this happens, keep it long. 00849 COMMTIMEOUTS cto; 00850 memset(&cto, 0, sizeof(cto)); 00851 cto.ReadIntervalTimeout = 100000; 00852 cto.ReadTotalTimeoutConstant = 100000; 00853 cto.ReadTotalTimeoutMultiplier = 100000; 00854 cto.WriteTotalTimeoutConstant = 100000; 00855 cto.WriteTotalTimeoutMultiplier = 100000; 00856 if(!SetCommTimeouts(sp, &cto)) 00857 throw DashelException(DashelException::ConnectionFailed, GetLastError(), "Cannot set new serial port timeouts.", this); 00858 00859 return true; 00860 } 00861 00862 public: 00864 00866 SerialStream(const std::string& params) : Stream("ser"), FileStream("ser", true) 00867 { 00868 target.add("ser:port=1;baud=115200;stop=1;parity=none;fc=none;bits=8"); 00869 target.add(params.c_str()); 00870 00871 std::string devName; 00872 if (target.isSet("device")) 00873 { 00874 target.addParam("device", NULL, true); 00875 target.erase("port"); 00876 00877 devName = target.get("device"); 00878 } 00879 else if (target.isSet("name")) 00880 { 00881 target.addParam("name", NULL, true); 00882 target.erase("port"); 00883 target.erase("device"); 00884 00885 // Enumerates the ports 00886 std::string name = target.get("name"); 00887 std::map<int, std::pair<std::string, std:: string> > ports = SerialPortEnumerator::getPorts(); 00888 00889 // Iterate on all ports to found one with "name" in its description 00890 std::map<int, std::pair<std::string, std:: string> >::iterator it; 00891 for (it = ports.begin(); it != ports.end(); it++) 00892 { 00893 if (it->second.second.find(name) != std::string::npos) 00894 { 00895 devName = it->second.first; 00896 std::cout << "Found " << name << " on port " << devName << std::endl; 00897 break; 00898 } 00899 } 00900 if (devName.size() == 0) 00901 throw DashelException(DashelException::ConnectionFailed, 0, "The specified name could not be find among the serial ports."); 00902 } 00903 else 00904 { 00905 target.erase("device"); 00906 00907 devName = std::string("\\\\.\\COM").append(target.get("port")); 00908 } 00909 00910 hf = CreateFile(devName.c_str(), GENERIC_WRITE | GENERIC_READ, 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL); 00911 if(hf == INVALID_HANDLE_VALUE) 00912 throw DashelException(DashelException::ConnectionFailed, GetLastError(), "Cannot open serial port."); 00913 00914 buildDCB(hf, target.get<int>("baud"), target.get<int>("bits"), target.get("parity"), target.get("stop"), target.get("fc")); 00915 00916 startStream(EvPotentialData); 00917 } 00918 }; 00919 00921 class SocketStream : public WaitableStream 00922 { 00924 SOCKET sock; 00925 00927 HANDLE hev; 00928 00930 HANDLE hev2; 00931 00933 00937 bool readyToRead; 00938 00940 char readByte; 00941 00943 bool readByteAvailable; 00944 00945 public: 00947 00949 SocketStream(const std::string& params) : Stream("tcp"), WaitableStream("tcp") 00950 { 00951 target.add("tcp:host;port;connectionPort=-1;sock=0"); 00952 target.add(params.c_str()); 00953 00954 sock = target.get<SOCKET>("sock"); 00955 if(!sock) 00956 { 00957 startWinSock(); 00958 00959 // create socket 00960 sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); 00961 if (sock == SOCKET_ERROR) 00962 throw DashelException(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot create socket."); 00963 00964 IPV4Address remoteAddress(target.get("host"), target.get<int>("port")); 00965 // connect 00966 sockaddr_in addr; 00967 addr.sin_family = AF_INET; 00968 addr.sin_port = htons(remoteAddress.port); 00969 addr.sin_addr.s_addr = htonl(remoteAddress.address); 00970 if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) != 0) 00971 throw DashelException(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot connect to remote host."); 00972 00973 // overwrite target name with a canonical one 00974 target.add(remoteAddress.format().c_str()); 00975 target.erase("connectionPort"); 00976 } 00977 else 00978 { 00979 // remove file descriptor information from target name 00980 target.erase("sock"); 00981 } 00982 00983 hev2 = createEvent(EvData); 00984 hev = createEvent(EvPotentialData); 00985 00986 int rv = WSAEventSelect(sock, hev, FD_READ | FD_CLOSE); 00987 if (rv == SOCKET_ERROR) 00988 throw DashelException(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot select socket events."); 00989 00990 readyToRead = false; 00991 readByteAvailable = false; 00992 } 00993 00994 ~SocketStream() 00995 { 00996 shutdown(sock, SD_BOTH); 00997 closesocket(sock); 00998 } 00999 01001 01003 virtual void notifyEvent(Hub *srv, EvType& t) 01004 { 01005 if(t == EvPotentialData) 01006 { 01007 if(readByteAvailable) 01008 return; 01009 01010 int rv = recv(sock, &readByte, 1, 0); 01011 if(rv <= 0) 01012 { 01013 t = EvClosed; 01014 } 01015 else 01016 { 01017 readByteAvailable = true; 01018 readyToRead = true; 01019 t = EvData; 01020 } 01021 } 01022 } 01023 01024 virtual void write(const void *data, const size_t size) 01025 { 01026 char *ptr = (char *)data; 01027 size_t left = size; 01028 01029 while (left) 01030 { 01031 int len = send(sock, ptr, (int)left, 0); 01032 01033 if (len == SOCKET_ERROR) 01034 { 01035 fail(DashelException::ConnectionLost, GetLastError(), "Connection lost on write."); 01036 } 01037 else 01038 { 01039 ptr += len; 01040 left -= len; 01041 } 01042 } 01043 } 01044 01045 virtual void flush() { } 01046 01047 virtual void read(void *data, size_t size) 01048 { 01049 char *ptr = (char *)data; 01050 size_t left = size; 01051 01052 if (size == 0) 01053 return; 01054 01055 readDone = true; 01056 01057 //std::cerr << "ready to read " << readyToRead << std::endl; 01058 if(!readyToRead) 01059 { 01060 // Block until something happens. 01061 WaitForSingleObject(hev, INFINITE); 01062 } 01063 readyToRead = false; 01064 01065 if(readByteAvailable) 01066 { 01067 *ptr++ = readByte; 01068 readByteAvailable = false; 01069 left--; 01070 if(left) 01071 WaitForSingleObject(hev, INFINITE); 01072 } 01073 01074 while (left) 01075 { 01076 //std::cerr << "ready to recv " << std::endl; 01077 int len = recv(sock, ptr, (int)left, 0); 01078 //std::cerr << "recv done " << std::endl; 01079 01080 if (len == SOCKET_ERROR) 01081 { 01082 //std::cerr << "socket error" << std::endl; 01083 fail(DashelException::ConnectionLost, GetLastError(), "Connection lost on read."); 01084 } 01085 else if(len == 0) 01086 { 01087 // We have been disconnected. 01088 } 01089 else 01090 { 01091 ptr += len; 01092 left -= len; 01093 } 01094 if(left) 01095 { 01096 // Wait for more data. 01097 WaitForSingleObject(hev, INFINITE); 01098 } 01099 } 01100 01101 /* int rv = WSAEventSelect(sock, hev, FD_READ | FD_CLOSE); 01102 if (rv == SOCKET_ERROR) 01103 throw DashelException(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot select socket events."); 01104 */ 01105 } 01106 }; 01107 01109 class UDPSocketStream: public MemoryPacketStream, public WaitableStream 01110 { 01111 private: 01113 SOCKET sock; 01114 01116 HANDLE hev; 01117 01118 public: 01120 UDPSocketStream(const std::string& targetName) : 01121 Stream("udp"), 01122 MemoryPacketStream("udp"), 01123 WaitableStream("udp") 01124 { 01125 target.add("udp:port=5000;address=0.0.0.0;sock=0"); 01126 target.add(targetName.c_str()); 01127 01128 sock = target.get<SOCKET>("sock"); 01129 if(!sock) 01130 { 01131 startWinSock(); 01132 01133 // create socket 01134 sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); 01135 if (sock == SOCKET_ERROR) 01136 throw DashelException(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot create socket."); 01137 01138 IPV4Address bindAddress(target.get("address"), target.get<int>("port")); 01139 01140 // bind 01141 sockaddr_in addr; 01142 addr.sin_family = AF_INET; 01143 addr.sin_port = htons(bindAddress.port); 01144 addr.sin_addr.s_addr = htonl(bindAddress.address); 01145 if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) != 0) 01146 throw DashelException(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot bind socket to port, probably the port is already in use."); 01147 } 01148 else 01149 { 01150 // remove file descriptor information from target name 01151 target.erase("sock"); 01152 } 01153 01154 // enable broadcast 01155 int broadcastPermission = 1; 01156 setsockopt(sock, SOL_SOCKET, SO_BROADCAST, (const char*)&broadcastPermission, sizeof(broadcastPermission)); 01157 01158 // Create and register event. 01159 hev = createEvent(EvData); 01160 01161 int rv = WSAEventSelect(sock, hev, FD_READ); 01162 if (rv == SOCKET_ERROR) 01163 throw DashelException(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot select socket events."); 01164 } 01165 01166 virtual ~UDPSocketStream() 01167 { 01168 closesocket(sock); 01169 } 01170 01171 virtual void send(const IPV4Address& dest) 01172 { 01173 sockaddr_in addr; 01174 addr.sin_family = AF_INET; 01175 addr.sin_port = htons(dest.port);; 01176 addr.sin_addr.s_addr = htonl(dest.address); 01177 01178 if (sendto(sock, (const char*)sendBuffer.get(), sendBuffer.size(), 0, (struct sockaddr *)&addr, sizeof(addr)) != sendBuffer.size()) 01179 fail(DashelException::IOError, WSAGetLastError(), "UDP Socket write I/O error."); 01180 01181 sendBuffer.clear(); 01182 } 01183 01184 virtual void receive(IPV4Address& source) 01185 { 01186 unsigned char buf[4006]; 01187 sockaddr_in addr; 01188 int addrLen = sizeof(addr); 01189 readDone = true; 01190 01191 int recvCount = recvfrom(sock, (char*)buf, 4096, 0, (struct sockaddr *)&addr, &addrLen); 01192 if (recvCount <= 0) 01193 fail(DashelException::ConnectionLost, WSAGetLastError(), "UDP Socket read I/O error."); 01194 01195 receptionBuffer.resize(recvCount); 01196 std::copy(buf, buf+recvCount, receptionBuffer.begin()); 01197 01198 source = IPV4Address(ntohl(addr.sin_addr.s_addr), ntohs(addr.sin_port)); 01199 } 01200 }; 01201 01202 Hub::Hub(const bool resolveIncomingNames): 01203 resolveIncomingNames(resolveIncomingNames) 01204 { 01205 hTerminate = CreateEvent(NULL, TRUE, FALSE, NULL); 01206 } 01207 01208 Hub::~Hub() 01209 { 01210 for (StreamsSet::iterator it = streams.begin(); it != streams.end(); ++it) 01211 delete *it; 01212 } 01213 01214 Stream* Hub::connect(const std::string &target) 01215 { 01216 std::string proto, params; 01217 size_t c = target.find_first_of(':'); 01218 if(c == std::string::npos) 01219 throw DashelException(DashelException::InvalidTarget, 0, "No protocol specified in target."); 01220 proto = target.substr(0, c); 01221 params = target.substr(c+1); 01222 01223 WaitableStream *s(dynamic_cast<WaitableStream*>(streamTypeRegistry.create(proto, target, *this))); 01224 if(!s) 01225 { 01226 std::string r = "Invalid protocol in target: "; 01227 r += proto; 01228 r += ", known protocol are: "; 01229 r += streamTypeRegistry.list(); 01230 throw DashelException(DashelException::InvalidTarget, 0, r.c_str()); 01231 } 01232 01233 streams.insert(s); 01234 if (proto != "tcpin") 01235 { 01236 dataStreams.insert(s); 01237 connectionCreated(s); 01238 } 01239 return s; 01240 } 01241 01242 void Hub::run() 01243 { 01244 while(step(-1)); 01245 } 01246 01247 bool Hub::step(const int timeout) 01248 { 01249 HANDLE hEvs[64] = { hTerminate }; 01250 WaitableStream *strs[64] = { NULL }; 01251 EvType ets[64] = { EvClosed }; 01252 01253 // Wait on all our events. 01254 DWORD ms = timeout >= 0 ? timeout : INFINITE; 01255 01256 // Loop in order to consume all events. 01257 do 01258 { 01259 DWORD hc = 1; 01260 01261 // Collect all events from all our streams. 01262 for(std::set<Stream*>::iterator it = streams.begin(); it != streams.end(); ++it) 01263 { 01264 WaitableStream* stream = polymorphic_downcast<WaitableStream*>(*it); 01265 for(std::map<EvType,HANDLE>::iterator ei = stream->hEvents.begin(); ei != stream->hEvents.end(); ++ei) 01266 { 01267 strs[hc] = stream; 01268 ets[hc] = ei->first; 01269 hEvs[hc] = ei->second; 01270 hc++; 01271 } 01272 } 01273 01274 DWORD r = WaitForMultipleObjects(hc, hEvs, FALSE, ms); 01275 01276 // Check for error or timeout. 01277 if (r == WAIT_FAILED) 01278 throw DashelException(DashelException::SyncError, 0, "Wait failed."); 01279 if (r == WAIT_TIMEOUT) 01280 return true; 01281 01282 // Look for what we got. 01283 r -= WAIT_OBJECT_0; 01284 if(r == 0) 01285 { 01286 // Quit 01287 ResetEvent(hTerminate); 01288 return false; 01289 } 01290 else 01291 { 01292 // Notify user that something happended. 01293 if(ets[r] == EvClosed) 01294 { 01295 try 01296 { 01297 connectionClosed(strs[r], false); 01298 } 01299 catch (DashelException e) { } 01300 closeStream(strs[r]); 01301 continue; 01302 } 01303 01304 // Notify the stream that its event arrived. 01305 strs[r]->notifyEvent(this, ets[r]); 01306 01307 // Notify user that something happended. 01308 if(ets[r] == EvData) 01309 { 01310 try 01311 { 01312 strs[r]->readDone = false; 01313 incomingData(strs[r]); 01314 } 01315 catch (DashelException e) { } 01316 if(!strs[r]->readDone) 01317 throw DashelException(DashelException::PreviousIncomingDataNotRead, 0, "Previous incoming data not read.", strs[r]); 01318 if(strs[r]->failed()) 01319 { 01320 connectionClosed(strs[r], true); 01321 closeStream(strs[r]); 01322 } 01323 } 01324 } 01325 01326 // No more timeouts on following rounds. 01327 ms = 0; 01328 } 01329 while(true); 01330 } 01331 01332 void Hub::lock() 01333 { 01334 } 01335 01336 void Hub::unlock() 01337 { 01338 } 01339 01340 void Hub::stop() 01341 { 01342 SetEvent(hTerminate); 01343 } 01344 01345 StreamTypeRegistry::StreamTypeRegistry() 01346 { 01347 reg("file", &createInstance<FileStream>); 01348 reg("stdin", &createInstance<StdinStream>); 01349 reg("stdout", &createInstance<StdoutStream>); 01350 reg("ser", &createInstance<SerialStream>); 01351 reg("tcpin", &createInstanceWithHub<SocketServerStream>); 01352 reg("tcp", &createInstance<SocketStream>); 01353 reg("udp", &createInstance<UDPSocketStream>); 01354 } 01355 01356 StreamTypeRegistry streamTypeRegistry; 01357 }