dashel-win32.cpp
Go to the documentation of this file.
00001 /*
00002         Dashel
00003         A cross-platform DAta Stream Helper Encapsulation Library
00004         Copyright (C) 2007 -- 2012:
00005                 
00006                 Stephane Magnenat <stephane at magnenat dot net>
00007                         (http://stephane.magnenat.net)
00008                 Mobots group - Laboratory of Robotics Systems, EPFL, Lausanne
00009                         (http://mobots.epfl.ch)
00010                 
00011                 Sebastian Gerlach
00012                 Kenzan Technologies
00013                         (http://www.kenzantech.com)
00014         
00015         All rights reserved.
00016         
00017         Redistribution and use in source and binary forms, with or without
00018         modification, are permitted provided that the following conditions are met:
00019                 * Redistributions of source code must retain the above copyright
00020                   notice, this list of conditions and the following disclaimer.
00021                 * Redistributions in binary form must reproduce the above copyright
00022                   notice, this list of conditions and the following disclaimer in the
00023                   documentation and/or other materials provided with the distribution.
00024                 * Neither the names of "Mobots", "Laboratory of Robotics Systems", "EPFL",
00025                   "Kenzan Technologies" nor the names of the contributors may be used to
00026                   endorse or promote products derived from this software without specific
00027                   prior written permission.
00028         
00029         THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDERS ``AS IS'' AND ANY
00030         EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00031         WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00032         DISCLAIMED. IN NO EVENT SHALL COPYRIGHT HOLDERS BE LIABLE FOR ANY
00033         DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00034         (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00035         LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
00036         ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00037         (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00038         SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00039 */
00040 
00041 #include "dashel.h"
00042 
00043 #include <cassert>
00044 #include <cstdlib>
00045 #include <malloc.h>
00046 #include <map>
00047 #include <vector>
00048 #include <algorithm>
00049 #include <iostream>
00050 #include <sstream>
00051 
00052 #ifdef _MSC_VER
00053         #pragma comment(lib, "ws2_32.lib")
00054         #pragma comment(lib, "wbemuuid.lib")
00055         #pragma comment(lib, "comsuppw.lib")
00056 #endif // _MSC_VER
00057 
00058 #define _WIN32_WINNT 0x0501
00059 #include <winsock2.h>
00060 #include <windows.h>
00061 #include <setupapi.h>
00062 #include <devguid.h>
00063 #include <regstr.h>
00064 #include <winnls.h>
00065 
00066 #pragma warning(disable:4996)
00067 
00068 #include "dashel-private.h"
00069 
00073 namespace Dashel
00074 {
00076         typedef enum {
00077                 EvData,                         
00078                 EvPotentialData,        
00079                 EvClosed,                       
00080                 EvConnect,                      
00081         } EvType;
00082         
00084         template<typename Derived, typename Base>
00085         inline Derived polymorphic_downcast(Base base)
00086         {
00087                 Derived derived = dynamic_cast<Derived>(base);
00088                 assert(derived);
00089                 return derived;
00090         }
00091 
00092         void Stream::fail(DashelException::Source s, int se, const char* reason)
00093         {
00094                 char sysMessage[1024] = {0};
00095                 failedFlag = true;
00096 
00097                 if (se)
00098                         FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, se, 0, sysMessage, 1024, NULL);
00099 
00100                 failReason = reason;
00101                 failReason += " ";
00102                 failReason += sysMessage;
00103                 
00104                 throw DashelException(s, se, failReason.c_str(), this);
00105         }
00106 
00107         // Serial port enumerator
00108 
00109         std::map<int, std::pair<std::string, std::string> > SerialPortEnumerator::getPorts()
00110         {
00111                 std::map<int, std::pair<std::string, std::string> > ports;
00112 
00113                 // Mainly based on http://support.microsoft.com/kb/259695
00114                 HDEVINFO hDevInfo;
00115                 SP_DEVINFO_DATA DeviceInfoData;
00116                 DWORD i;
00117                 char* co;
00118                 char dn[1024], dcn[1024];
00119                 
00120                 // Create a HDEVINFO with all present ports.
00121                 hDevInfo = SetupDiGetClassDevs(&GUID_DEVCLASS_PORTS, 0, 0, DIGCF_PRESENT );
00122 
00123                 if (hDevInfo == INVALID_HANDLE_VALUE)
00124                         throw DashelException(DashelException::EnumerationError, GetLastError(), "Cannot list serial port devices.");
00125 
00126                 // Enumerate through all devices in Set.
00127                 DeviceInfoData.cbSize = sizeof(SP_DEVINFO_DATA);
00128                 for (i = 0; SetupDiEnumDeviceInfo(hDevInfo, i, &DeviceInfoData); i++)
00129                 {
00130                         DWORD DataT;
00131                         LPTSTR buffer = NULL;
00132                         DWORD buffersize = 0;
00133 
00134                         // Call function with null to begin with, then use the returned buffer size (doubled) to Alloc the buffer. Keep calling until
00135                         // success or an unknown failure.
00136                         // Double the returned buffersize to correct for underlying legacy CM functions that return an incorrect buffersize value on 
00137                         // DBCS/MBCS systems.
00138                         while (!SetupDiGetDeviceRegistryPropertyW(hDevInfo, &DeviceInfoData, SPDRP_FRIENDLYNAME, &DataT, (PBYTE)buffer, buffersize, &buffersize))
00139                         {
00140                                 if (GetLastError() == ERROR_INSUFFICIENT_BUFFER)
00141                                 {
00142                                         // Change the buffer size.
00143                                         if (buffer)
00144                                                 LocalFree(buffer);
00145                                         // Double the size to avoid problems on 
00146                                         // W2k MBCS systems per KB 888609. 
00147                                         buffer = (LPTSTR)LocalAlloc(LPTR,buffersize * 2);
00148                                 }
00149                                 else
00150                                         throw DashelException(DashelException::EnumerationError, GetLastError(), "Cannot get serial port properties.");
00151                         }
00152 
00153                         WideCharToMultiByte(CP_UTF8, 0, (LPCWSTR)buffer, -1, dn, 1024, NULL, NULL);
00154 
00155                         // Filter to get only the COMx ports
00156                         if((co = strstr(dn, "(COM")))
00157                         {
00158                                 strcpy(dcn, co+1);
00159                                 strtok(dcn, ")");
00160 
00161                                 int v = atoi(&dcn[3]);
00162 
00163                                 if(v > 0 && v < 256)
00164                                 {
00165                                         std::string name = std::string("\\\\.\\").append(dcn);
00166                                         ports.insert(std::pair<int, std::pair<std::string, std::string> >(v, std::pair<std::string, std::string> (name, dn)));
00167                                 }
00168                         }
00169 
00170 
00171                         if (buffer)
00172                                 LocalFree(buffer);
00173                 }
00174 
00175                 // Error ?
00176                 if ( GetLastError()!=NO_ERROR && GetLastError()!=ERROR_NO_MORE_ITEMS )
00177                         throw DashelException(DashelException::EnumerationError, GetLastError(), "Error while enumerating serial port devices.");
00178 
00179                 // Cleanup
00180                 SetupDiDestroyDeviceInfoList(hDevInfo);
00181 
00182                 return ports;
00183         };
00184 
00185 
00187         void startWinSock()
00188         {
00189                 bool started = false;
00190                 if(!started)
00191                 {
00192                         WORD ver = 0x0101;
00193                         WSADATA d;
00194                         memset(&d, 0, sizeof(d));
00195 
00196                         int rv = WSAStartup(ver, &d);
00197                         if(rv)
00198                                 throw DashelException(DashelException::Unknown, rv, "Could not start WinSock service.");
00199                         started = true;
00200                 }
00201         }
00202 
00204         class WaitableStream: virtual public Stream
00205         {
00206         public:
00208 
00210                 std::map<EvType,HANDLE> hEvents; 
00211 
00213                 bool readDone;
00214                 
00215         protected:
00217                 HANDLE hEOF;
00218 
00220 
00222                 HANDLE createEvent(EvType t) 
00223                 {
00224                         HANDLE he = CreateEvent(NULL, FALSE, FALSE, NULL);
00225                         hEvents[t] = he;
00226                         return he;
00227                 }
00228 
00230 
00233                 void addEvent(EvType t, HANDLE he) 
00234                 {
00235                         hEvents[t] = he;
00236                 }
00237                 
00238         public:
00240                 WaitableStream(const std::string& protocolName) : Stream(protocolName)
00241                 {
00242                         hEOF = createEvent(EvClosed);
00243                 }
00244 
00246 
00248                 virtual ~WaitableStream()
00249                 {
00250                         for(std::map<EvType, HANDLE>::iterator it = hEvents.begin(); it != hEvents.end(); ++it)
00251                                 CloseHandle(it->second);
00252                 }
00253                 
00255 
00258                 virtual void notifyEvent(Hub *srv, EvType& t) { }
00259         };
00260 
00262 
00265         class SocketServerStream : public WaitableStream
00266         {
00267         protected:
00269                 SOCKET sock;
00270 
00272                 HANDLE hev;
00273 
00275                 const bool resolveIncomingNames;
00276 
00277         public:
00278 
00280                 SocketServerStream(const std::string& params, const Hub& hub) : Stream("tcpin"), WaitableStream("tcpin"),
00281                         resolveIncomingNames(hub.resolveIncomingNames)
00282                 { 
00283                         target.add("tcpin:port=5000;address=0.0.0.0");
00284                         target.add(params.c_str());
00285 
00286                         startWinSock();
00287 
00288                         IPV4Address bindAddress(target.get("address"), target.get<int>("port"));
00289                         
00290                         // Create socket.
00291                         sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
00292                         if (sock == SOCKET_ERROR)
00293                                 throw DashelException(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot create socket.");
00294                         
00295                         // Reuse address.
00296                         int flag = 1;
00297                         if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char *)&flag, sizeof (flag)) < 0)
00298                                 throw DashelException(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot set address reuse flag on socket, probably the port is already in use.");
00299                         
00300                         // Bind socket.
00301                         sockaddr_in addr;
00302                         addr.sin_family = AF_INET;
00303                         addr.sin_port = htons(bindAddress.port);
00304                         addr.sin_addr.s_addr = htonl(bindAddress.address);
00305                         if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) != 0)
00306                                 throw DashelException(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot bind socket to port, probably the port is already in use.");
00307                         
00308                         // Listen on socket, backlog is sort of arbitrary.
00309                         if(listen(sock, 16) == SOCKET_ERROR)
00310                                 throw DashelException(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot listen on socket.");
00311 
00312                         // Create and register event.
00313                         hev = createEvent(EvConnect);
00314                         WSAEventSelect(sock, hev, FD_ACCEPT);
00315                 }
00316 
00318                 ~SocketServerStream()
00319                 {
00320                         if(sock)
00321                                 closesocket(sock);
00322                 }
00323 
00325 
00328                 virtual void notifyEvent(Hub *srv, EvType& t) 
00329                 { 
00330                         if(t == EvConnect)
00331                         {
00332                                 // Accept incoming connection.
00333                                 struct sockaddr_in targetAddr;
00334                                 int l = sizeof (targetAddr);
00335                                 SOCKET trg = accept (sock, (struct sockaddr *)&targetAddr, &l);
00336                                 if (trg == SOCKET_ERROR)
00337                                 {
00338                                         fail(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot accept incoming connection on socket.");
00339                                 }
00340                                 
00341                                 // create stream
00342                                 std::string ls = IPV4Address(ntohl(targetAddr.sin_addr.s_addr), ntohs(targetAddr.sin_port)).format(resolveIncomingNames);
00343                                 
00344                                 std::ostringstream buf;
00345                                 buf << ";connectionPort=";
00346                                 buf << atoi(getTargetParameter("port").c_str());
00347                                 buf << ";sock=";
00348                                 buf << (int)trg;
00349                                 ls.append(buf.str());
00350                                 srv->connect(ls);
00351                         }
00352                 }
00353 
00354                 virtual void write(const void *data, const size_t size) { }
00355                 virtual void flush() { }
00356                 virtual void read(void *data, size_t size) { }
00357         };
00358 
00360 
00363         class StdinStream : public WaitableStream
00364         {
00365         protected:
00367                 HANDLE hf;
00368 
00370                 HANDLE hev;
00371 
00372         public:
00373 
00375                 StdinStream(const std::string& params) : Stream("stdin"), WaitableStream("stdin")
00376                 { 
00377                         target.add(params.c_str());
00378 
00379                         if((hf = GetStdHandle(STD_INPUT_HANDLE)) == INVALID_HANDLE_VALUE)
00380                                 throw DashelException(DashelException::ConnectionFailed, GetLastError(), "Cannot open standard input.");
00381 
00382                         DWORD cm;
00383                         GetConsoleMode(hf, &cm);
00384                         cm &= ~(ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT);
00385                         if(!SetConsoleMode(hf, cm))
00386                                 throw DashelException(DashelException::ConnectionFailed, GetLastError(), "Cannot change standard input mode to immediate.");
00387 
00388                         // Create events.
00389                         addEvent(EvPotentialData, hf);
00390                         hev = createEvent(EvData);
00391                 }
00392 
00394                 ~StdinStream()
00395                 {
00396                         CloseHandle(hf);
00397                 }
00398 
00400 
00402                 virtual void notifyEvent(Hub *srv, EvType& t) 
00403                 { 
00404                         DWORD n = 0;
00405                         if(GetNumberOfConsoleInputEvents(hf, &n))
00406                         {
00407                                 if(n > 0)
00408                                 {
00409                                         INPUT_RECORD ir;
00410                                         PeekConsoleInput(hf, &ir, 1, &n);
00411                                         if(ir.EventType != KEY_EVENT)
00412                                                 ReadConsoleInput(hf, &ir, 1, &n);
00413                                         else
00414                                         {
00415                                                 t = EvData;
00416                                         }
00417                                 }
00418                         }
00419                 }
00420 
00422                 virtual void write(const void *data, const size_t size)
00423                 { 
00424                         throw DashelException(DashelException::InvalidOperation, GetLastError(), "Cannot write to standard input.", this);
00425                 }
00426                 
00428                 virtual void flush() 
00429                 { 
00430                         throw DashelException(DashelException::InvalidOperation, GetLastError(), "Cannot flush standard input.", this);
00431                 }
00432                 
00433                 virtual void read(void *data, size_t size)
00434                 {
00435                         char *ptr = (char *)data;
00436                         DWORD left = (DWORD)size;
00437 
00438                         // Quick check to make sure nobody is giving us funny 64-bit stuff.
00439                         assert(left == size);
00440 
00441                         readDone = true;
00442 
00443                         while (left)
00444                         {
00445                                 DWORD len = 0;
00446                                 BOOL r;
00447 
00448                                 // Blocking write.
00449                                 if((r = ReadFile(hf, ptr, left, &len, NULL)) == 0)
00450                                 {
00451                                         fail(DashelException::IOError, GetLastError(), "Read error from standard input.");
00452                                 }
00453                                 else
00454                                 {
00455                                         ptr += len;
00456                                         left -= len;
00457                                 }
00458                         }
00459                 }
00460 
00461         };
00462 
00464         class StdoutStream : public WaitableStream
00465         {
00466         protected:
00468                 HANDLE hf;
00469 
00470         public:
00471 
00473                 StdoutStream(const std::string& params) : Stream("stdout"), WaitableStream("stdout")
00474                 { 
00475                         target.add(params.c_str());
00476 
00477                         if((hf = GetStdHandle(STD_OUTPUT_HANDLE)) == INVALID_HANDLE_VALUE)
00478                                 throw DashelException(DashelException::ConnectionFailed, GetLastError(), "Cannot open standard output.");
00479                 }
00480 
00482                 ~StdoutStream()
00483                 {
00484                         CloseHandle(hf);
00485                 }
00486 
00487                 virtual void write(const void *data, const size_t size)
00488                 {
00489                         const char *ptr = (const char *)data;
00490                         DWORD left = (DWORD)size;
00491 
00492                         // Quick check to make sure nobody is giving us funny 64-bit stuff.
00493                         assert(left == size);
00494 
00495                         while (left)
00496                         {
00497                                 DWORD len = 0;
00498                                 BOOL r;
00499 
00500                                 // Blocking write.
00501                                 if((r = WriteFile(hf, ptr, left, &len, NULL)) == 0)
00502                                 {
00503                                         fail(DashelException::IOError, GetLastError(), "Write error to standard output.");
00504                                 }
00505                                 else
00506                                 {
00507                                         ptr += len;
00508                                         left -= len;
00509                                 }
00510                         }
00511                 }
00512                 
00513                 virtual void flush()
00514                 {
00515                         FlushFileBuffers(hf);
00516                 }
00517                 
00518                 virtual void read(void *data, size_t size) 
00519                 { 
00520                         fail(DashelException::InvalidOperation, GetLastError(), "Cannot read from standard output.");
00521                 }
00522 
00523         };
00524 
00526         class FileStream : public WaitableStream
00527         {
00528         protected:
00530                 HANDLE hf;
00531 
00533                 OVERLAPPED ovl;
00534 
00536                 DWORD writeOffset;
00537 
00539 
00543                 bool readyToRead;
00544 
00546                 char readByte;
00547 
00549                 bool readByteAvailable;
00550 
00551         protected:
00553 
00555                 FileStream(const std::string& protocolName, bool dummy) : Stream(protocolName), WaitableStream(protocolName) { }
00556 
00558                 void startStream(EvType et = EvData)
00559                 {
00560                         readByteAvailable = false;
00561                         memset(&ovl, 0, sizeof(ovl));
00562                         ovl.hEvent = createEvent(et);
00563                         BOOL r = ReadFile(hf, &readByte, 1, NULL, &ovl);
00564                         if(!r)
00565                         {
00566                                 DWORD err = GetLastError();
00567                                 if(err != ERROR_IO_PENDING)
00568                                         throw DashelException(DashelException::IOError, GetLastError(), "Cannot read from file stream.");
00569                         }
00570                         else
00571                                 readByteAvailable = true;
00572                 }
00573 
00574         public:
00575 
00577                 FileStream(const std::string& params) : Stream("file"), WaitableStream("file")
00578                 { 
00579                         target.add("file:name;mode=read");
00580                         target.add(params.c_str());
00581                         std::string name = target.get("name");
00582                         std::string mode = target.get("mode");
00583 
00584                         hf = NULL;
00585                         if (mode == "read")
00586                         {
00587                                 hf = CreateFile(name.c_str(), GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
00588                                 startStream();
00589                         }
00590                         else if (mode == "write")
00591                         {
00592                                 writeOffset = 0;
00593                                 hf = CreateFile(name.c_str(), GENERIC_WRITE, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, CREATE_ALWAYS, FILE_FLAG_OVERLAPPED, NULL);
00594                         }
00595                         else if (mode == "readwrite") 
00596                         {
00597                                 writeOffset = 0;
00598                                 hf = CreateFile(name.c_str(), GENERIC_WRITE | GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, FILE_FLAG_OVERLAPPED, NULL);
00599                                 startStream();
00600                         }
00601                         if(hf == INVALID_HANDLE_VALUE)
00602                                 throw DashelException(DashelException::ConnectionFailed, GetLastError(), "Cannot open file.");
00603                 }
00604 
00606                 ~FileStream()
00607                 {
00608                         CloseHandle(hf);
00609                 }
00610                 
00611                 virtual void write(const void *data, const size_t size)
00612                 {
00613                         const char *ptr = (const char *)data;
00614                         const unsigned int RETRY_LIMIT = 3;
00615                         DWORD left = (DWORD)size;
00616                         unsigned int retry = 0;
00617 
00618                         // Quick check to make sure nobody is giving us funny 64-bit stuff.
00619                         assert(left == size);
00620 
00621                         while (left)
00622                         {
00623                                 DWORD len = 0;
00624                                 OVERLAPPED o;
00625                                 memset(&o, 0, sizeof(o));
00626 
00627                                 o.Offset = writeOffset;
00628                                 o.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
00629 
00630                                 // Blocking write.
00631                                 BOOL r = WriteFile(hf, ptr, left, &len, &o);
00632                                 if(!r)
00633                                 {
00634                                         DWORD err;
00635                                         switch((err = GetLastError()))
00636                                         {
00637                                         case ERROR_IO_PENDING:
00638                                                 GetOverlappedResult(hf, &o, &len, TRUE);
00639                                                 if (len == 0)
00640                                                 {
00641                                                         if (retry++ >= RETRY_LIMIT)
00642                                                         {
00643                                                                 SetEvent(hEOF);
00644                                                                 fail(DashelException::IOError, GetLastError(), "Cannot write to file (max retry reached).");
00645                                                         }
00646                                                         else
00647                                                                 continue;
00648                                                 }
00649                                                 ptr += len;
00650                                                 left -= len;
00651                                                 break;
00652 
00653                                         default:
00654                                                 fail(DashelException::IOError, GetLastError(), "Cannot write to file.");
00655                                                 break;
00656                                         }
00657                                 }
00658                                 else
00659                                 {
00660                                         ptr += len;
00661                                         left -= len;
00662                                 }
00663                         
00664                                 writeOffset += len;     
00665                         }
00666                 }
00667                 
00668                 virtual void flush()
00669                 {
00670                         FlushFileBuffers(hf);
00671                 }
00672                 
00673                 virtual void read(void *data, size_t size)
00674                 {
00675                         char *ptr = (char *)data;
00676                         DWORD left = (DWORD)size;
00677 
00678                         // Quick check to make sure nobody is giving us funny 64-bit stuff.
00679                         assert(left == size);
00680                         
00681                         if (size == 0)
00682                                 return;
00683 
00684                         readDone = true;
00685 
00686                         if(!readByteAvailable)
00687                                 WaitForSingleObject(ovl.hEvent, INFINITE);
00688 
00689                         DWORD dataUsed;
00690                         if(!GetOverlappedResult(hf, &ovl, &dataUsed, TRUE))
00691                                 fail(DashelException::IOError, GetLastError(), "File read I/O error.");
00692 
00693                         if(dataUsed)
00694                         {
00695                                 *ptr++ = readByte;
00696                                 left--;
00697                         }
00698                         readByteAvailable = false;
00699 
00700                         while (left)
00701                         {
00702                                 DWORD len = 0;
00703                                 OVERLAPPED o;
00704                                 memset(&o, 0, sizeof(o));
00705                                 o.Offset = ovl.Offset + size - left;
00706                                 o.hEvent = ovl.hEvent ;
00707 
00708                                 // Non-blocking read.
00709                                 BOOL r = ReadFile(hf, ptr, left, &len, &o);
00710                                 if(!r)
00711                                 {
00712                                         DWORD err;
00713                                         switch((err = GetLastError()))
00714                                         {
00715                                         case ERROR_HANDLE_EOF:
00716                                                 fail(DashelException::ConnectionLost, GetLastError(), "Reached end of file.");
00717                                                 break;
00718 
00719                                         case ERROR_IO_PENDING:
00720                                                 WaitForSingleObject(ovl.hEvent, INFINITE);
00721                                                 if(!GetOverlappedResult(hf, &o, &len, TRUE))
00722                                                         fail(DashelException::IOError, GetLastError(), "File read I/O error.");
00723                                                 if(len == 0)
00724                                                         return;
00725 
00726                                                 ptr += len;
00727                                                 left -= len;
00728                                                 break;
00729 
00730                                         default:
00731                                                 fail(DashelException::IOError, GetLastError(), "File read I/O error.");
00732                                                 break;
00733                                         }
00734                                 }
00735                                 else
00736                                 {
00737                                         WaitForSingleObject(ovl.hEvent, INFINITE);
00738                                         ptr += len;
00739                                         left -= len;
00740                                 }
00741                         }
00742 
00743                         // Reset our blocking read for whatever is up next.
00744                         ovl.Offset += (DWORD)size;
00745                         BOOL r = ReadFile(hf, &readByte, 1, &dataUsed, &ovl);
00746                         if(!r)
00747                         {
00748                                 DWORD err = GetLastError();
00749                                 if(err == ERROR_HANDLE_EOF)
00750                                 {
00751                                         SetEvent(hEOF);
00752                                 }
00753                                 else if(err != ERROR_IO_PENDING)
00754                                 {
00755                                         fail(DashelException::IOError, GetLastError(), "Cannot read from file stream.");
00756                                 }
00757                         }
00758                         else
00759                                 readByteAvailable = true;
00760 
00761                 }
00762 
00764 
00766                 virtual void notifyEvent(Hub *srv, EvType& t) 
00767                 { 
                              // Called when one of this stream's events has fired. Only EvPotentialData
                              // is handled: the outcome of the pending one-byte overlapped read is
                              // collected. If a byte arrived, it is kept in readByte and the event is
                              // promoted to EvData; otherwise the one-byte read is issued again and t
                              // is left unchanged. srv is not used.
00768                         if(t == EvPotentialData)
00769                         {
00770                                 DWORD dataUsed = 0;
                                      // A failed completion is treated like an empty one, so that an
                                      // indeterminate byte count can never be reported as available data.
00771                                 if(!GetOverlappedResult(hf, &ovl, &dataUsed, TRUE))
                                              dataUsed = 0;
                                      // No byte arrived: queue the one-byte read again.
00772                                 if(dataUsed == 0)
00773                                         ReadFile(hf, &readByte, 1, NULL, &ovl);
00774                                 else
00775                                 {
00776                                         readByteAvailable = true;
00777                                         t = EvData;
00778                                 }
00779                         }
00780                 }
00781         };
00782 
00784         class SerialStream : public FileStream
00785         {
              // Stream on a serial port. The constructor resolves the device name, opens it
              // for overlapped I/O, configures it through buildDCB() and then calls
              // startStream(EvPotentialData).
00786         private:
00788 
                      // Applies line settings and timeouts to the serial port handle sp.
                      //   speed    -> DCB BaudRate
                      //   bits     -> DCB ByteSize
                      //   parity   -> "even", "odd", "space" or "mark"; any other value selects NOPARITY
                      //   stopbits -> "1.5" or "2"; any other value selects one stop bit
                      //   fc       -> "hard" enables CTS output flow control with RTS handshake;
                      //               any other value disables both
                      // Returns true. Throws DashelException (ConnectionFailed) when GetCommState,
                      // SetCommState or SetCommTimeouts fails.
00794                 bool buildDCB(HANDLE sp, int speed, int bits, const std::string& parity, const std::string& stopbits, const std::string& fc)
00795                 {
00796                         DCB dcb;
00797 
00798                         memset(&dcb, 0, sizeof(dcb));
00799                         dcb.DCBlength = sizeof(dcb);
00800 
00801                         if(!GetCommState(sp, &dcb))
00802                                 throw DashelException(DashelException::ConnectionFailed, GetLastError(), "Cannot read current serial port state.", this);
00803 
00804                         // Fill in the DCB
                              // The state read above is discarded here: the structure is rebuilt from
                              // zero, so every field not assigned below stays 0.
                              // NOTE(review): GetCommState then only serves as a check that the call
                              // succeeds on this handle - confirm this is intended.
00805                         memset(&dcb,0,sizeof(dcb));
00806                         dcb.DCBlength = sizeof(dcb);
00807                         if(fc == "hard")
00808                         {
00809                                 dcb.fOutxCtsFlow = TRUE;
00810                                 dcb.fRtsControl = RTS_CONTROL_HANDSHAKE;
00811                         }
00812                         else
00813                         {
00814                                 dcb.fOutxCtsFlow = FALSE;
00815                                 dcb.fRtsControl = RTS_CONTROL_DISABLE;
00816                         }
00817 
00818                         dcb.fOutxDsrFlow = FALSE;
00819                         dcb.fDtrControl = DTR_CONTROL_ENABLE;
00820                         dcb.fDsrSensitivity = FALSE;
00821                         dcb.fBinary = TRUE;
                              // fParity is set regardless of the parity mode chosen below.
00822                         dcb.fParity = TRUE;
00823                         dcb.BaudRate = speed;
00824                         dcb.ByteSize = bits;
00825                         if(parity == "even")
00826                                 dcb.Parity = EVENPARITY;
00827                         else if(parity == "odd")
00828                                 dcb.Parity = ODDPARITY;
00829                         else if(parity == "space")
00830                                 dcb.Parity = SPACEPARITY;
00831                         else if(parity == "mark")
00832                                 dcb.Parity = MARKPARITY;
00833                         else
00834                                 dcb.Parity = NOPARITY;
00835 
00836                         if(stopbits == "1.5")
00837                                 dcb.StopBits = ONE5STOPBITS;
00838                         else if(stopbits == "2")
00839                                 dcb.StopBits = TWOSTOPBITS;
00840                         else 
00841                                 dcb.StopBits = ONESTOPBIT;
00842 
00843                         // Set the com port state.
00844                         if(!SetCommState(sp, &dcb))
00845                                 throw DashelException(DashelException::ConnectionFailed, GetLastError(), "Cannot set new serial port state.", this);
00846 
00847                         // Set timeouts as well for good measure. Since we will effectively be woken whenever
00848                         // this happens, keep it long.
                              // All five values are set to 100000 (milliseconds according to the Win32
                              // COMMTIMEOUTS documentation).
00849                         COMMTIMEOUTS cto;
00850                         memset(&cto, 0, sizeof(cto));
00851                         cto.ReadIntervalTimeout = 100000;
00852                         cto.ReadTotalTimeoutConstant = 100000;
00853                         cto.ReadTotalTimeoutMultiplier = 100000;
00854                         cto.WriteTotalTimeoutConstant = 100000;
00855                         cto.WriteTotalTimeoutMultiplier = 100000;
00856                         if(!SetCommTimeouts(sp, &cto))
00857                                 throw DashelException(DashelException::ConnectionFailed, GetLastError(), "Cannot set new serial port timeouts.", this);
00858 
00859                         return true;
00860                 }
00861 
00862         public:
00864 
                      // Opens the serial port described by params. Defaults: port=1, baud=115200,
                      // stop=1, parity=none, fc=none, bits=8. The device is selected, in order of
                      // precedence, by "device" (used verbatim), by "name" (first enumerated port
                      // whose second string contains the name) or by "port" (\\.\COM<port>).
                      // Throws DashelException (ConnectionFailed) when no port matches the name,
                      // when the device cannot be opened, or when buildDCB() fails.
00866                 SerialStream(const std::string& params) : Stream("ser"), FileStream("ser", true)
00867                 { 
                              // Defaults first, then the caller's parameters on top of them.
00868                         target.add("ser:port=1;baud=115200;stop=1;parity=none;fc=none;bits=8");
00869                         target.add(params.c_str());
00870 
00871                         std::string devName;
00872                         if (target.isSet("device"))
00873                         {
00874                                 target.addParam("device", NULL, true);
00875                                 target.erase("port");
00876                                 
00877                                 devName = target.get("device");
00878                         }
00879                         else if (target.isSet("name"))
00880                         {
00881                                 target.addParam("name", NULL, true);
00882                                 target.erase("port");
00883                                 target.erase("device");
00884 
00885                                 // Enumerates the ports
00886                                 std::string name = target.get("name");
00887                                 std::map<int, std::pair<std::string, std:: string> > ports = SerialPortEnumerator::getPorts();
00888 
00889                                 // Iterate on all ports to found one with "name" in its description
                                      // The pair's first string is taken as the device name to open, the
                                      // second one is the text that is searched.
00890                                 std::map<int, std::pair<std::string, std:: string> >::iterator it;
00891                                 for (it = ports.begin(); it != ports.end(); it++)
00892                                 {
00893                                         if (it->second.second.find(name) != std::string::npos)
00894                                         {
00895                                                 devName = it->second.first;
                                                      // The match is reported on standard output.
00896                                                 std::cout << "Found " << name << " on port " << devName << std::endl;
00897                                                 break;
00898                                         }
00899                                 }
00900                                 if (devName.size() == 0)
00901                                         throw DashelException(DashelException::ConnectionFailed, 0, "The specified name could not be find among the serial ports.");
00902                         }
00903                         else
00904                         {
00905                                 target.erase("device");
00906                                 
                                      // Build the \\.\COM<port> device path from the port parameter.
00907                                 devName = std::string("\\\\.\\COM").append(target.get("port"));
00908                         }
00909 
                              // Read/write access, no sharing (share mode 0), overlapped I/O.
00910                         hf = CreateFile(devName.c_str(), GENERIC_WRITE | GENERIC_READ, 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
00911                         if(hf == INVALID_HANDLE_VALUE)
00912                                 throw DashelException(DashelException::ConnectionFailed, GetLastError(), "Cannot open serial port.");
00913 
                              // NOTE(review): if buildDCB throws, hf is not closed in this constructor;
                              // whether a base-class destructor releases it is not visible here.
00914                         buildDCB(hf, target.get<int>("baud"), target.get<int>("bits"), target.get("parity"), target.get("stop"), target.get("fc"));
00915 
00916                         startStream(EvPotentialData);
00917                 }
00918         };
00919 
00921         class SocketStream : public WaitableStream
00922         {
00924                 SOCKET sock;
00925 
00927                 HANDLE hev;
00928 
00930                 HANDLE hev2;
00931 
00933 
00937                 bool readyToRead;
00938 
00940                 char readByte;
00941 
00943                 bool readByteAvailable;
00944 
00945         public:
00947 
00949                 SocketStream(const std::string& params) : Stream("tcp"), WaitableStream("tcp")
00950                 { 
00951                         target.add("tcp:host;port;connectionPort=-1;sock=0");
00952                         target.add(params.c_str());
00953 
00954                         sock = target.get<SOCKET>("sock");
00955                         if(!sock)
00956                         {
00957                                 startWinSock();
00958 
00959                                 // create socket
00960                                 sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
00961                                 if (sock == SOCKET_ERROR)
00962                                         throw DashelException(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot create socket.");
00963                         
00964                                 IPV4Address remoteAddress(target.get("host"), target.get<int>("port"));
00965                                 // connect
00966                                 sockaddr_in addr;
00967                                 addr.sin_family = AF_INET;
00968                                 addr.sin_port = htons(remoteAddress.port);
00969                                 addr.sin_addr.s_addr = htonl(remoteAddress.address);
00970                                 if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) != 0)
00971                                         throw DashelException(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot connect to remote host.");
00972                                 
00973                                 // overwrite target name with a canonical one
00974                                 target.add(remoteAddress.format().c_str());
00975                                 target.erase("connectionPort");
00976                         }
00977                         else
00978                         {
00979                                 // remove file descriptor information from target name
00980                                 target.erase("sock");
00981                         }
00982 
00983                         hev2 = createEvent(EvData);
00984                         hev = createEvent(EvPotentialData);
00985 
00986                         int rv = WSAEventSelect(sock, hev, FD_READ | FD_CLOSE);
00987                         if (rv == SOCKET_ERROR)
00988                                 throw DashelException(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot select socket events.");
00989 
00990                         readyToRead = false;
00991                         readByteAvailable = false;
00992                 }
00993 
00994                 ~SocketStream()
00995                 {
00996                         shutdown(sock, SD_BOTH);
00997                         closesocket(sock);
00998                 }
00999 
01001 
01003                 virtual void notifyEvent(Hub *srv, EvType& t) 
01004                 { 
01005                         if(t == EvPotentialData)
01006                         {
01007                                 if(readByteAvailable)
01008                                         return;
01009 
01010                                 int rv = recv(sock, &readByte, 1, 0);
01011                                 if(rv <= 0)
01012                                 {
01013                                         t = EvClosed;
01014                                 }
01015                                 else
01016                                 {
01017                                         readByteAvailable = true;
01018                                         readyToRead = true;
01019                                         t = EvData;
01020                                 }
01021                         }
01022                 }
01023 
01024                 virtual void write(const void *data, const size_t size)
01025                 {
01026                         char *ptr = (char *)data;
01027                         size_t left = size;
01028                         
01029                         while (left)
01030                         {
01031                                 int len = send(sock, ptr, (int)left, 0);
01032                                 
01033                                 if (len == SOCKET_ERROR)
01034                                 {
01035                                         fail(DashelException::ConnectionLost, GetLastError(), "Connection lost on write.");
01036                                 }
01037                                 else
01038                                 {
01039                                         ptr += len;
01040                                         left -= len;
01041                                 }
01042                         }
01043                 }
01044                 
01045                 virtual void flush() { }
01046                 
01047                 virtual void read(void *data, size_t size)
01048                 {
01049                         char *ptr = (char *)data;
01050                         size_t left = size;
01051                         
01052                         if (size == 0)
01053                                 return;
01054 
01055                         readDone = true;
01056 
01057                         //std::cerr << "ready to read " << readyToRead << std::endl;
01058                         if(!readyToRead)
01059                         {
01060                                 // Block until something happens.
01061                                 WaitForSingleObject(hev, INFINITE);
01062                         }
01063                         readyToRead = false;
01064 
01065                         if(readByteAvailable)
01066                         {
01067                                 *ptr++ = readByte;
01068                                 readByteAvailable = false;
01069                                 left--;
01070                                 if(left)
01071                                         WaitForSingleObject(hev, INFINITE);
01072                         }
01073                         
01074                         while (left)
01075                         {
01076                                 //std::cerr << "ready to recv " << std::endl;
01077                                 int len = recv(sock, ptr, (int)left, 0);
01078                                 //std::cerr << "recv done " << std::endl;
01079                                 
01080                                 if (len == SOCKET_ERROR)
01081                                 {
01082                                         //std::cerr << "socket error" << std::endl;
01083                                         fail(DashelException::ConnectionLost, GetLastError(), "Connection lost on read.");
01084                                 }
01085                                 else if(len == 0)
01086                                 {
01087                                         // We have been disconnected.
01088                                 }
01089                                 else
01090                                 {
01091                                         ptr += len;
01092                                         left -= len;
01093                                 }
01094                                 if(left)
01095                                 {
01096                                         // Wait for more data.
01097                                         WaitForSingleObject(hev, INFINITE);
01098                                 }
01099                         }
01100 
01101 /*                      int rv = WSAEventSelect(sock, hev, FD_READ | FD_CLOSE);
01102                         if (rv == SOCKET_ERROR)
01103                                 throw DashelException(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot select socket events.");
01104 */
01105                 }
01106         };
01107 
01109         class UDPSocketStream: public MemoryPacketStream, public WaitableStream
01110         {
              // UDP packet stream on a Winsock datagram socket. Outgoing data is taken from
              // sendBuffer by send(); receive() stores one datagram in receptionBuffer.
01111         private:
                      // Datagram socket, either created and bound here or adopted from the "sock" parameter.
01113                 SOCKET sock;
01114 
                      // Event created for EvData and selected on the socket for FD_READ.
01116                 HANDLE hev;
01117 
01118         public:
                      // Builds the stream from targetName (defaults: port=5000, address=0.0.0.0,
                      // sock=0). With sock == 0 a new UDP socket is created and bound to
                      // address:port; otherwise the given socket is adopted and "sock" is removed
                      // from the target. Broadcast is enabled on the socket in both cases.
                      // Throws DashelException (ConnectionFailed) if the socket cannot be created,
                      // bound, or registered for events.
01120                 UDPSocketStream(const std::string& targetName) :
01121                         Stream("udp"),
01122                         MemoryPacketStream("udp"),
01123                         WaitableStream("udp")
01124                 {
01125                         target.add("udp:port=5000;address=0.0.0.0;sock=0");
01126                         target.add(targetName.c_str());
01127                         
01128                         sock = target.get<SOCKET>("sock");
01129                         if(!sock)
01130                         {
01131                                 startWinSock();
01132                                 
01133                                 // create socket
                                      // socket() reports failure with INVALID_SOCKET.
01134                                 sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
01135                                 if (sock == INVALID_SOCKET)
01136                                         throw DashelException(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot create socket.");
01137                                 
01138                                 IPV4Address bindAddress(target.get("address"), target.get<int>("port"));
01139                                 
01140                                 // bind
01141                                 sockaddr_in addr;
01142                                 addr.sin_family = AF_INET;
01143                                 addr.sin_port = htons(bindAddress.port);
01144                                 addr.sin_addr.s_addr = htonl(bindAddress.address);
01145                                 if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) != 0)
                                      {
                                              // The destructor does not run when the constructor throws, so
                                              // release the socket here. Read the error code first, before
                                              // closesocket() can replace it.
                                              const int bindError = WSAGetLastError();
                                              closesocket(sock);
01146                                         throw DashelException(DashelException::ConnectionFailed, bindError, "Cannot bind socket to port, probably the port is already in use.");
                                      }
01147                         }
01148                         else
01149                         {
01150                                 // remove file descriptor information from target name
01151                                 target.erase("sock");
01152                         }
01153                         
01154                         // enable broadcast
                              // The result of setsockopt is not checked.
01155                         int broadcastPermission = 1;
01156                         setsockopt(sock, SOL_SOCKET, SO_BROADCAST, (const char*)&broadcastPermission, sizeof(broadcastPermission));
01157                         
01158                         // Create and register event.
01159                         hev = createEvent(EvData);
01160                         
01161                         int rv = WSAEventSelect(sock, hev, FD_READ);
01162                         if (rv == SOCKET_ERROR)
01163                                 throw DashelException(DashelException::ConnectionFailed, WSAGetLastError(), "Cannot select socket events.");
01164                 }
01165 
                      // Closes the socket.
01166                 virtual ~UDPSocketStream()
01167                 {
01168                         closesocket(sock);
01169                 }
01170 
                      // Sends the whole content of sendBuffer as one datagram to dest, then clears
                      // the buffer. Reports DashelException::IOError through fail() when sendto()
                      // does not return the buffer size.
01171                 virtual void send(const IPV4Address& dest)
01172                 {
01173                         sockaddr_in addr;
01174                         addr.sin_family = AF_INET;
01175                         addr.sin_port = htons(dest.port);
01176                         addr.sin_addr.s_addr = htonl(dest.address);
01177                         
01178                         if (sendto(sock, (const char*)sendBuffer.get(), sendBuffer.size(), 0, (struct sockaddr *)&addr, sizeof(addr)) != sendBuffer.size())
01179                                 fail(DashelException::IOError, WSAGetLastError(), "UDP Socket write I/O error.");
01180                         
01181                         sendBuffer.clear();
01182                 }
01183                 
                      // Receives one datagram of up to 4096 bytes into receptionBuffer and stores
                      // the sender's address and port in source. Reports
                      // DashelException::ConnectionLost through fail() when recvfrom() returns 0
                      // or an error.
01184                 virtual void receive(IPV4Address& source)
01185                 {
                              // The length given to recvfrom() is taken from the buffer itself, so
                              // the two can never disagree and the call cannot write past the end.
01186                         unsigned char buf[4096];
01187                         sockaddr_in addr;
01188                         int addrLen = sizeof(addr);
01189                         readDone = true;
01190 
01191                         int recvCount = recvfrom(sock, (char*)buf, (int)sizeof(buf), 0, (struct sockaddr *)&addr, &addrLen);
01192                         if (recvCount <= 0)
01193                                 fail(DashelException::ConnectionLost, WSAGetLastError(), "UDP Socket read I/O error.");
01194                         
01195                         receptionBuffer.resize(recvCount);
01196                         std::copy(buf, buf+recvCount, receptionBuffer.begin());
01197                         
01198                         source = IPV4Address(ntohl(addr.sin_addr.s_addr), ntohs(addr.sin_port));
01199                 }
01200         };
01201 
01202         Hub::Hub(const bool resolveIncomingNames):
01203                 resolveIncomingNames(resolveIncomingNames)
01204         {
                      // Stores the resolveIncomingNames flag and creates the termination event:
                      // unnamed, manual-reset (second argument TRUE) and initially non-signaled
                      // (third argument FALSE). The result of CreateEvent is not checked.
01205                 hTerminate = CreateEvent(NULL, TRUE, FALSE, NULL);
01206         }
01207         
01208         Hub::~Hub()
01209         {
                      // Deletes every stream still registered in streams, then releases the
                      // termination event created by the constructor.
01210                 for (StreamsSet::iterator it = streams.begin(); it != streams.end(); ++it)
01211                         delete *it;
                      // Close the event handle after the streams are gone; without this the
                      // handle obtained from CreateEvent() is never released.
                      if (hTerminate)
                              CloseHandle(hTerminate);
01212         }
01213         
01214         Stream* Hub::connect(const std::string &target)
01215         {
                      // Creates the stream described by target ("<protocol>:<parameters>") through
                      // streamTypeRegistry, registers it in streams and returns it. For every
                      // protocol except "tcpin" the stream is also added to dataStreams and
                      // connectionCreated() is called for it.
                      // Throws DashelException (InvalidTarget) when target contains no ':' or when
                      // no WaitableStream could be created for the protocol.
01216                 std::string proto, params;
01217                 size_t c = target.find_first_of(':');
01218                 if(c == std::string::npos)
01219                         throw DashelException(DashelException::InvalidTarget, 0, "No protocol specified in target.");
01220                 proto = target.substr(0, c);
                      // NOTE(review): params is computed but not used below; the complete target
                      // string is what gets passed to the registry.
01221                 params = target.substr(c+1);
01222 
                      // s is NULL when the registry returns NULL or when the created object is
                      // not a WaitableStream.
01223                 WaitableStream *s(dynamic_cast<WaitableStream*>(streamTypeRegistry.create(proto, target, *this)));
01224                 if(!s)
01225                 {
01226                         std::string r = "Invalid protocol in target: ";
01227                         r += proto;
01228                         r += ", known protocol are: ";
01229                         r += streamTypeRegistry.list();
01230                         throw DashelException(DashelException::InvalidTarget, 0, r.c_str());
01231                 }
01232                 
01233                 streams.insert(s);
01234                 if (proto != "tcpin")
01235                 {
01236                         dataStreams.insert(s);
01237                         connectionCreated(s);
01238                 }
01239                 return s;
01240         }
01241         
01242         void Hub::run()
01243         {
                      // Calls step() with a negative timeout (wait without limit) until it
                      // returns false.
01244                 while(step(-1));
01245         }
01246         
01247         bool Hub::step(const int timeout)
01248         {
                      // Waits for the events of all registered streams and dispatches them until
                      // a wait times out or the termination event is signaled.
                      //   timeout: longest wait for the first round, in milliseconds; a negative
                      //            value waits without limit. Later rounds only poll.
                      //   returns: false when the termination event was signaled (it is reset
                      //            before returning), true when a wait timed out.
                      //   throws:  DashelException SyncError when the wait fails or when more
                      //            events are registered than one wait can take;
                      //            PreviousIncomingDataNotRead when incomingData() did not read
                      //            from the stream.
                      // Slot 0 of the three arrays belongs to the termination event.
01249                 HANDLE hEvs[MAXIMUM_WAIT_OBJECTS] = { hTerminate };
01250                 WaitableStream *strs[MAXIMUM_WAIT_OBJECTS] = { NULL };
01251                 EvType ets[MAXIMUM_WAIT_OBJECTS] = { EvClosed };
01252                 
01253                 // Wait on all our events.
01254                 DWORD ms = timeout >= 0 ? timeout : INFINITE;
01255 
01256                 // Loop in order to consume all events.
01257                 do
01258                 {
01259                         DWORD hc = 1;
01260 
01261                         // Collect all events from all our streams.
01262                         for(std::set<Stream*>::iterator it = streams.begin(); it != streams.end(); ++it)
01263                         {
01264                                 WaitableStream* stream = polymorphic_downcast<WaitableStream*>(*it);
01265                                 for(std::map<EvType,HANDLE>::iterator ei = stream->hEvents.begin(); ei != stream->hEvents.end(); ++ei)
01266                                 {
                                              // The arrays and WaitForMultipleObjects() both stop at
                                              // MAXIMUM_WAIT_OBJECTS entries; refuse to write past them.
                                              if(hc >= MAXIMUM_WAIT_OBJECTS)
                                                      throw DashelException(DashelException::SyncError, 0, "Too many events to wait on.");
01267                                         strs[hc] = stream;
01268                                         ets[hc] = ei->first;
01269                                         hEvs[hc] = ei->second;
01270                                         hc++;
01271                                 }
01272                         }
01273 
01274                         DWORD r = WaitForMultipleObjects(hc, hEvs, FALSE, ms);
01275                         
01276                         // Check for error or timeout.
01277                         if (r == WAIT_FAILED)
01278                                 throw DashelException(DashelException::SyncError, 0, "Wait failed.");
01279                         if (r == WAIT_TIMEOUT)
01280                                 return true;
01281 
01282                         // Look for what we got.
01283                         r -= WAIT_OBJECT_0;
01284                         if(r == 0)
01285                         {
01286                                 // Quit
01287                                 ResetEvent(hTerminate);
01288                                 return false;
01289                         }
01290                         else 
01291                         {
                                      // Notify the stream that its event arrived. The stream may turn
                                      // the event into EvData or into EvClosed, so this has to happen
                                      // before the checks below; an event that is already EvClosed is
                                      // not passed to the stream.
                                      if(ets[r] != EvClosed)
                                              strs[r]->notifyEvent(this, ets[r]);

01292                                 // Notify user that something happended.
01293                                 if(ets[r] == EvClosed)
01294                                 {
01295                                         try
01296                                         {
01297                                                 connectionClosed(strs[r], false);
01298                                         }
01299                                         catch (const DashelException& e) { }
01300                                         closeStream(strs[r]);
                                              // continue goes straight to the loop condition and skips the
                                              // ms = 0 below, so the next round keeps the current timeout.
01301                                         continue;
01302                                 }
01303 
01307                                 // Notify user that something happended.
01308                                 if(ets[r] == EvData)
01309                                 {
01310                                         try
01311                                         {
01312                                                 strs[r]->readDone = false;
01313                                                 incomingData(strs[r]);
01314                                         }
01315                                         catch (const DashelException& e) { }
01316                                         if(!strs[r]->readDone)
01317                                                 throw DashelException(DashelException::PreviousIncomingDataNotRead, 0, "Previous incoming data not read.", strs[r]);
01318                                         if(strs[r]->failed())
01319                                         {
01320                                                 connectionClosed(strs[r], true);
01321                                                 closeStream(strs[r]);
01322                                         }
01323                                 }
01324                         }
01325 
01326                         // No more timeouts on following rounds.
01327                         ms = 0;
01328                 }
01329                 while(true);
01330         }
01331         
01332         void Hub::lock()
01333         {
                      // Empty in this implementation: the call does nothing.
01334         }
01335         
01336         void Hub::unlock()
01337         {
                      // Empty in this implementation: the call does nothing.
01338         }
01339 
01340         void Hub::stop()
01341         {
                      // Signals the termination event; step() returns false once it sees it.
01342                 SetEvent(hTerminate);
01343         }
01344 
01345         StreamTypeRegistry::StreamTypeRegistry()
01346         {
                      // Registers the creation function of each protocol name. "tcpin" is the
                      // only entry registered through createInstanceWithHub; all others use
                      // createInstance.
01347                 reg("file", &createInstance<FileStream>);
01348                 reg("stdin", &createInstance<StdinStream>);
01349                 reg("stdout", &createInstance<StdoutStream>);
01350                 reg("ser", &createInstance<SerialStream>);
01351                 reg("tcpin", &createInstanceWithHub<SocketServerStream>);
01352                 reg("tcp", &createInstance<SocketStream>);
01353                 reg("udp", &createInstance<UDPSocketStream>);
01354         }
01355         
01356         StreamTypeRegistry streamTypeRegistry; // Registry instance that Hub::connect() uses to create streams.
01357 }


dashel
Author(s): Stéphane Magnenat
autogenerated on Sun Oct 5 2014 23:46:32