00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
#include "dashel.h"
#include "dashel-private.h"
#include <algorithm>

#include <cstring>
#include <new>
#include <ostream>
#include <sstream>
#ifndef WIN32
#include <netdb.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#else
#include <winsock2.h>
#endif
00054
00059 namespace Dashel
00060 {
00061 using namespace std;
00062
00063
00064 ExpandableBuffer::ExpandableBuffer(size_t size) :
00065 _data((unsigned char*)malloc(size)),
00066 _size(size),
00067 _pos(0)
00068 {
00069 }
00070
00071 ExpandableBuffer::~ExpandableBuffer()
00072 {
00073 free(_data);
00074 }
00075
00076 void ExpandableBuffer::clear()
00077 {
00078 _pos = 0;
00079 }
00080
00081 void ExpandableBuffer::add(const void* data, const size_t size)
00082 {
00083 if (_pos + size > _size)
00084 {
00085 _size = max(_size * 2, _size + size);
00086 _data = (unsigned char*)realloc(_data, _size);
00087 }
00088 memcpy(_data + _pos, (unsigned char *)data, size);
00089 _pos += size;
00090 }
00091
00092
00093 DashelException::DashelException(Source s, int se, const char *reason, Stream* stream) :
00094 std::runtime_error(reason),
00095 source(s),
00096 sysError(se),
00097 stream(stream)
00098 {
00099
00100 }
00101
00102 IPV4Address::IPV4Address(unsigned addr, unsigned short prt)
00103 {
00104 address = addr;
00105 port = prt;
00106 }
00107
00108 IPV4Address::IPV4Address(const std::string& name, unsigned short port) :
00109 port(port)
00110 {
00111 hostent *he = gethostbyname(name.c_str());
00112
00113 if (he == NULL)
00114 {
00115 #ifndef WIN32
00116 struct in_addr addr;
00117 if (inet_aton(name.c_str(), &addr))
00118 {
00119 address = ntohl(addr.s_addr);
00120 }
00121 else
00122 {
00123 address = INADDR_ANY;
00124 }
00125 #else // WIN32
00126 unsigned long addr = inet_addr(name.c_str());
00127 if(addr != INADDR_NONE)
00128 address = addr;
00129 else
00130 address = INADDR_ANY;
00131 #endif // WIN32
00132 }
00133 else
00134 {
00135 #ifndef WIN32
00136 address = ntohl(*((unsigned *)he->h_addr));
00137 #else
00138 address = ntohl(*((unsigned *)he->h_addr));
00139 #endif
00140 }
00141 }
00142
00143 bool IPV4Address::operator==(const IPV4Address& o) const
00144 {
00145 return address==o.address && port==o.port;
00146 }
00147
00148 bool IPV4Address::operator<(const IPV4Address& o) const
00149 {
00150 return address<o.address || (address==o.address && port<o.port);
00151 }
00152
00153 std::string IPV4Address::hostname() const
00154 {
00155 unsigned a2 = htonl(address);
00156 struct hostent *he = gethostbyaddr((const char *)&a2, 4, AF_INET);
00157
00158 if (he == NULL)
00159 {
00160 struct in_addr addr;
00161 addr.s_addr = a2;
00162 return std::string(inet_ntoa(addr));
00163 }
00164 else
00165 {
00166 return std::string(he->h_name);
00167 }
00168 }
00169
00170 std::string IPV4Address::format(const bool resolveName) const
00171 {
00172 std::ostringstream buf;
00173 unsigned a2 = htonl(address);
00174
00175 if (resolveName)
00176 {
00177 struct hostent *he = gethostbyaddr((const char *)&a2, 4, AF_INET);
00178 if (he != NULL)
00179 {
00180 buf << "tcp:host=" << he->h_name << ";port=" << port;
00181 return buf.str();
00182 }
00183 }
00184
00185 struct in_addr addr;
00186 addr.s_addr = a2;
00187 buf << "tcp:host=" << inet_ntoa(addr) << ";port=" << port;
00188 return buf.str();
00189 }
00190
00191 void ParameterSet::add(const char *line)
00192 {
00193 char *lc = strdup(line);
00194 int spc = 0;
00195 char *param;
00196 bool storeParams = (params.size() == 0);
00197 char *protocolName = strtok(lc, ":");
00198
00199
00200 assert(protocolName);
00201
00202 while((param = strtok(NULL, ";")) != NULL)
00203 {
00204 char *sep = strchr(param, '=');
00205 if(sep)
00206 {
00207 *sep++ = 0;
00208 values[param] = sep;
00209 if (storeParams)
00210 params.push_back(param);
00211 }
00212 else
00213 {
00214 if (storeParams)
00215 params.push_back(param);
00216 values[params[spc]] = param;
00217 }
00218 ++spc;
00219 }
00220
00221 free(lc);
00222 }
00223
00224 void ParameterSet::addParam(const char *param, const char *value, bool atStart)
00225 {
00226 if (atStart)
00227 params.insert(params.begin(), 1, param);
00228 else
00229 params.push_back(param);
00230
00231 if (value)
00232 values[param] = value;
00233 }
00234
00235 bool ParameterSet::isSet(const char *key) const
00236 {
00237 return (values.find(key) != values.end());
00238 }
00239
00240 const std::string& ParameterSet::get(const char *key) const
00241 {
00242 std::map<std::string, std::string>::const_iterator it = values.find(key);
00243 if(it == values.end())
00244 {
00245 std::string r = std::string("Parameter missing: ").append(key);
00246 throw DashelException(DashelException::InvalidTarget, 0, r.c_str());
00247 }
00248 return it->second;
00249 }
00250
00251 std::string ParameterSet::getString() const
00252 {
00253 std::ostringstream oss;
00254 std::vector<std::string>::const_iterator i = params.begin();
00255 while (i != params.end())
00256 {
00257 oss << *i << "=" << values.find(*i)->second;
00258 if (++i == params.end())
00259 break;
00260 oss << ";";
00261 }
00262 return oss.str();
00263 }
00264
00265 void ParameterSet::erase(const char *key)
00266 {
00267 std::vector<std::string>::iterator i = std::find(params.begin(), params.end(), key);
00268 if (i != params.end())
00269 params.erase(i);
00270
00271 std::map<std::string, std::string>::iterator j = values.find(key);
00272 if (j != values.end())
00273 values.erase(j);
00274 }
00275
00276
00277 void MemoryPacketStream::write(const void *data, const size_t size)
00278 {
00279 sendBuffer.add(data, size);
00280 }
00281
00282 void MemoryPacketStream::read(void *data, size_t size)
00283 {
00284 if (size > receptionBuffer.size())
00285 fail(DashelException::IOError, 0, "Attempt to read past available data");
00286
00287 unsigned char* ptr = (unsigned char*)data;
00288 std::copy(receptionBuffer.begin(), receptionBuffer.begin() + size, ptr);
00289 receptionBuffer.erase(receptionBuffer.begin(), receptionBuffer.begin() + size);
00290 }
00291
00292 void Hub::closeStream(Stream* stream)
00293 {
00294 streams.erase(stream);
00295 dataStreams.erase(stream);
00296 delete stream;
00297 }
00298
00299 void StreamTypeRegistry::reg(const std::string& proto, const CreatorFunc func)
00300 {
00301 creators[proto] = func;
00302 }
00303
00304 Stream* StreamTypeRegistry::create(const std::string& proto, const std::string& target, const Hub& hub) const
00305 {
00306 typedef CreatorMap::const_iterator ConstIt;
00307 ConstIt it(creators.find(proto));
00308 if (it == creators.end())
00309 return 0;
00310 const CreatorFunc& creatorFunc(it->second);
00311 return creatorFunc(target, hub);
00312 }
00313
00314 std::string StreamTypeRegistry::list() const
00315 {
00316 std::string s;
00317 for (CreatorMap::const_iterator it = creators.begin(); it != creators.end();)
00318 {
00319 s += it->first;
00320 ++it;
00321 if (it != creators.end())
00322 s += ", ";
00323 }
00324 return s;
00325 }
00326 }
00327