MQTTProtocolOut.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright (c) 2009, 2020 IBM Corp.
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  * https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  * http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  * Ian Craggs - initial API and implementation and/or initial documentation
15  * Ian Craggs, Allan Stockdill-Mander - SSL updates
16  * Ian Craggs - fix for buffer overflow in addressPort bug #433290
17  * Ian Craggs - MQTT 3.1.1 support
18  * Rong Xiang, Ian Craggs - C++ compatibility
19  * Ian Craggs - fix for bug 479376
20  * Ian Craggs - SNI support
21  * Ian Craggs - fix for issue #164
22  * Ian Craggs - fix for issue #179
23  * Ian Craggs - MQTT 5.0 support
24  *******************************************************************************/
25 
33 #include <stdlib.h>
34 #include <string.h>
35 #include <ctype.h>
36 
37 #include "MQTTProtocolOut.h"
38 #include "StackTrace.h"
39 #include "Heap.h"
40 #include "WebSocket.h"
41 #include "Base64.h"
42 
43 extern ClientStates* bstate;
44 
45 
46 
54 size_t MQTTProtocol_addressPort(const char* uri, int* port, const char **topic, int default_port)
55 {
56  char* colon_pos = strrchr(uri, ':'); /* reverse find to allow for ':' in IPv6 addresses */
57  char* buf = (char*)uri;
58  size_t len;
59 
60  FUNC_ENTRY;
61  if (uri[0] == '[')
62  { /* ip v6 */
63  if (colon_pos < strrchr(uri, ']'))
64  colon_pos = NULL; /* means it was an IPv6 separator, not for host:port */
65  }
66 
67  if (colon_pos) /* have to strip off the port */
68  {
69  len = colon_pos - uri;
70  *port = atoi(colon_pos + 1);
71  }
72  else
73  {
74  len = strlen(buf);
75  *port = default_port;
76  }
77 
78  /* try and find topic portion */
79  if ( topic )
80  {
81  const char* addr_start = uri;
82  if ( colon_pos )
83  addr_start = colon_pos;
84  *topic = strchr( addr_start, '/' );
85  }
86 
87  if (buf[len - 1] == ']')
88  {
89  /* we are stripping off the final ], so length is 1 shorter */
90  --len;
91  }
92  FUNC_EXIT;
93  return len;
94 }
95 
96 
105 void MQTTProtocol_specialChars(char* p0, char* p1, b64_size_t *basic_auth_in_len)
106 {
107  while(*p1 != '@') {
108  if (*p1 != '%') {
109  *p0++ = *p1++;
110  }
111  else if (isxdigit(*(p1 + 1)) && isxdigit(*(p1 + 2))) {
112  /* next 2 characters are hexa digits */
113  char hex[3];
114  p1++;
115  hex[0] = *p1++;
116  hex[1] = *p1++;
117  hex[2] = '\0';
118  *p0++ = (char)strtol(hex, 0, 16);
119  /* 3 input char => 1 output char */
120  *basic_auth_in_len -= 2;
121  }
122  }
123  *p0 = 0x0;
124 }
125 
126 
136 #if defined(OPENSSL)
137 #if defined(__GNUC__) && defined(__linux__)
138 int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int ssl, int websocket, int MQTTVersion,
139  MQTTProperties* connectProperties, MQTTProperties* willProperties, long timeout)
140 #else
141 int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int ssl, int websocket, int MQTTVersion,
142  MQTTProperties* connectProperties, MQTTProperties* willProperties)
143 #endif
144 #else
145 #if defined(__GNUC__) && defined(__linux__)
146 int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int websocket, int MQTTVersion,
147  MQTTProperties* connectProperties, MQTTProperties* willProperties, long timeout)
148 #else
149 int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int websocket, int MQTTVersion,
150  MQTTProperties* connectProperties, MQTTProperties* willProperties)
151 #endif
152 #endif
153 {
154  int rc = 0,
155  port;
156  size_t addr_len;
157  b64_size_t basic_auth_in_len, basic_auth_out_len;
158  char *p0, *p1;
159  b64_data_t *basic_auth;
160 
161  FUNC_ENTRY;
162  aClient->good = 1;
163  aClient->net.http_proxy = NULL;
164  aClient->net.http_proxy_auth = NULL;
165  if ((p0 = getenv("http_proxy")))
166  {
167  p1 = strchr(p0, '@');
168  if(p1)
169  {
170  aClient->net.http_proxy = p1 + 1;
171  p1 = strchr(p0, ':') + 3;
172  basic_auth_in_len = (b64_size_t)(aClient->net.http_proxy - p1);
173  basic_auth = (b64_data_t *)malloc(sizeof(char)*basic_auth_in_len);
174  if (!basic_auth)
175  {
176  rc = PAHO_MEMORY_ERROR;
177  goto exit;
178  }
179  basic_auth_in_len--;
180  p0 = (char *)basic_auth;
181  MQTTProtocol_specialChars(p0, p1, &basic_auth_in_len);
182  basic_auth_out_len = Base64_encodeLength(basic_auth, basic_auth_in_len);
183  if ((aClient->net.http_proxy_auth = (char *)malloc(sizeof(char) * basic_auth_out_len)) == NULL)
184  {
185  free(basic_auth);
186  rc = PAHO_MEMORY_ERROR;
187  goto exit;
188  }
189  Base64_encode(aClient->net.http_proxy_auth, basic_auth_out_len, basic_auth, basic_auth_in_len);
190  free(basic_auth);
191  }
192  else {
193  p1 = strchr(p0, ':');
194  if (p1)
195  aClient->net.http_proxy = p1 + 3;
196  }
197  Log(TRACE_PROTOCOL, -1, "MQTTProtocol_connect: setting http proxy to %s", aClient->net.http_proxy);
198  }
199 #if defined(OPENSSL)
200  aClient->net.https_proxy = NULL;
201  aClient->net.https_proxy_auth = NULL;
202  if ((p0 = getenv("https_proxy"))) {
203  p1 = strchr(p0, '@');
204  if(p1) {
205  aClient->net.https_proxy = p1 + 1;
206  p1 = strchr(p0, ':') + 3;
207  basic_auth_in_len = (b64_size_t)(aClient->net.https_proxy - p1);
208  basic_auth = (b64_data_t *)malloc(sizeof(char)*basic_auth_in_len);
209  if (!basic_auth)
210  {
211  rc = PAHO_MEMORY_ERROR;
212  goto exit;
213  }
214  basic_auth_in_len--;
215  p0 = (char *)basic_auth;
216  MQTTProtocol_specialChars(p0, p1, &basic_auth_in_len);
217  basic_auth_out_len = Base64_encodeLength(basic_auth, basic_auth_in_len);
218  if ((aClient->net.https_proxy_auth = (char *)malloc(sizeof(char) * basic_auth_out_len)) == NULL)
219  {
220  free(basic_auth);
221  rc = PAHO_MEMORY_ERROR;
222  goto exit;
223  }
224  Base64_encode(aClient->net.https_proxy_auth, basic_auth_out_len, basic_auth, basic_auth_in_len);
225  free(basic_auth);
226  }
227  else {
228  p1 = strchr(p0, ':');
229  if (p1)
230  aClient->net.https_proxy = p1 + 3;
231  }
232  }
233 
234  if (!ssl && websocket && aClient->net.http_proxy) {
235 #else
236  if (websocket && aClient->net.http_proxy) {
237 #endif
238  addr_len = MQTTProtocol_addressPort(aClient->net.http_proxy, &port, NULL, WS_DEFAULT_PORT);
239 #if defined(__GNUC__) && defined(__linux__)
240  if (timeout < 0)
241  rc = -1;
242  else
243  rc = Socket_new(aClient->net.http_proxy, addr_len, port, &(aClient->net.socket), timeout);
244 #else
245  rc = Socket_new(aClient->net.http_proxy, addr_len, port, &(aClient->net.socket));
246 #endif
247  }
248 #if defined(OPENSSL)
249  else if (ssl && websocket && aClient->net.https_proxy) {
250  addr_len = MQTTProtocol_addressPort(aClient->net.https_proxy, &port, NULL, WS_DEFAULT_PORT);
251 #if defined(__GNUC__) && defined(__linux__)
252  if (timeout < 0)
253  rc = -1;
254  else
255  rc = Socket_new(aClient->net.https_proxy, addr_len, port, &(aClient->net.socket), timeout);
256 #else
257  rc = Socket_new(aClient->net.https_proxy, addr_len, port, &(aClient->net.socket));
258 #endif
259  }
260 #endif
261  else {
262 #if defined(OPENSSL)
263  addr_len = MQTTProtocol_addressPort(ip_address, &port, NULL, ssl ? SECURE_MQTT_DEFAULT_PORT : MQTT_DEFAULT_PORT);
264 #else
265  addr_len = MQTTProtocol_addressPort(ip_address, &port, NULL, MQTT_DEFAULT_PORT);
266 #endif
267 #if defined(__GNUC__) && defined(__linux__)
268  if (timeout < 0)
269  rc = -1;
270  else
271  rc = Socket_new(ip_address, addr_len, port, &(aClient->net.socket), timeout);
272 #else
273  rc = Socket_new(ip_address, addr_len, port, &(aClient->net.socket));
274 #endif
275  }
276  if (rc == EINPROGRESS || rc == EWOULDBLOCK)
277  aClient->connect_state = TCP_IN_PROGRESS; /* TCP connect called - wait for connect completion */
278  else if (rc == 0)
279  { /* TCP connect completed. If SSL, send SSL connect */
280 #if defined(OPENSSL)
281  if (ssl)
282  {
283  if (websocket && aClient->net.https_proxy) {
285  rc = WebSocket_proxy_connect( &aClient->net, 1, ip_address);
286  }
287  if (rc == 0 && SSLSocket_setSocketForSSL(&aClient->net, aClient->sslopts, ip_address, addr_len) == 1)
288  {
289  rc = aClient->sslopts->struct_version >= 3 ?
290  SSLSocket_connect(aClient->net.ssl, aClient->net.socket, ip_address,
291  aClient->sslopts->verify, aClient->sslopts->ssl_error_cb, aClient->sslopts->ssl_error_context) :
292  SSLSocket_connect(aClient->net.ssl, aClient->net.socket, ip_address,
293  aClient->sslopts->verify, NULL, NULL);
294  if (rc == TCPSOCKET_INTERRUPTED)
295  aClient->connect_state = SSL_IN_PROGRESS; /* SSL connect called - wait for completion */
296  }
297  else
298  rc = SOCKET_ERROR;
299  }
300  else if (websocket && aClient->net.http_proxy) {
301 #else
302  if (websocket && aClient->net.http_proxy) {
303 #endif
305  rc = WebSocket_proxy_connect( &aClient->net, 0, ip_address);
306  }
307  if ( websocket )
308  {
309  rc = WebSocket_connect( &aClient->net, ip_address );
310  if ( rc == TCPSOCKET_INTERRUPTED )
311  aClient->connect_state = WEBSOCKET_IN_PROGRESS; /* Websocket connect called - wait for completion */
312  }
313  if (rc == 0)
314  {
315  /* Now send the MQTT connect packet */
316  if ((rc = MQTTPacket_send_connect(aClient, MQTTVersion, connectProperties, willProperties)) == 0)
317  aClient->connect_state = WAIT_FOR_CONNACK; /* MQTT Connect sent - wait for CONNACK */
318  else
319  aClient->connect_state = NOT_IN_PROGRESS;
320  }
321  }
322 
323 exit:
324  FUNC_EXIT_RC(rc);
325  return rc;
326 }
327 
328 
335 int MQTTProtocol_handlePingresps(void* pack, int sock)
336 {
337  Clients* client = NULL;
338  int rc = TCPSOCKET_COMPLETE;
339 
340  FUNC_ENTRY;
341  client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
342  Log(LOG_PROTOCOL, 21, NULL, sock, client->clientID);
343  client->ping_outstanding = 0;
344  FUNC_EXIT_RC(rc);
345  return rc;
346 }
347 
348 
360 {
361  int rc = 0;
362 
363  FUNC_ENTRY;
364  rc = MQTTPacket_send_subscribe(topics, qoss, opts, props, msgID, 0, client);
365  FUNC_EXIT_RC(rc);
366  return rc;
367 }
368 
369 
376 int MQTTProtocol_handleSubacks(void* pack, int sock)
377 {
378  Suback* suback = (Suback*)pack;
379  Clients* client = NULL;
380  int rc = TCPSOCKET_COMPLETE;
381 
382  FUNC_ENTRY;
383  client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
384  Log(LOG_PROTOCOL, 23, NULL, sock, client->clientID, suback->msgId);
385  MQTTPacket_freeSuback(suback);
386  FUNC_EXIT_RC(rc);
387  return rc;
388 }
389 
390 
398 {
399  int rc = 0;
400 
401  FUNC_ENTRY;
402  rc = MQTTPacket_send_unsubscribe(topics, props, msgID, 0, client);
403  FUNC_EXIT_RC(rc);
404  return rc;
405 }
406 
407 
414 int MQTTProtocol_handleUnsubacks(void* pack, int sock)
415 {
416  Unsuback* unsuback = (Unsuback*)pack;
417  Clients* client = NULL;
418  int rc = TCPSOCKET_COMPLETE;
419 
420  FUNC_ENTRY;
421  client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
422  Log(LOG_PROTOCOL, 24, NULL, sock, client->clientID, unsuback->msgId);
423  MQTTPacket_freeUnsuback(unsuback);
424  FUNC_EXIT_RC(rc);
425  return rc;
426 }
427 
int WebSocket_proxy_connect(networkHandles *net, int ssl, const char *hostname)
Definition: WebSocket.c:1445
void MQTTPacket_freeSuback(Suback *pack)
Definition: MQTTPacket.c:684
#define PROXY_CONNECT_IN_PROGRESS
Definition: Clients.h:110
#define LOG_PROTOCOL
Definition: Log.h:64
int msgId
Definition: MQTTPacket.h:174
string topic
Definition: test2.py:8
#define FUNC_EXIT
Definition: StackTrace.h:59
MQTTProperties props
Definition: paho_c_pub.c:54
#define MQTT_DEFAULT_PORT
int Socket_new(const char *addr, size_t addr_len, int port, int *sock)
Definition: Socket.c:671
int SSLSocket_setSocketForSSL(networkHandles *net, MQTTClient_SSLOptions *opts, const char *hostname, size_t hostname_len)
int MQTTProtocol_connect(const char *ip_address, Clients *aClient, int websocket, int MQTTVersion, MQTTProperties *connectProperties, MQTTProperties *willProperties)
#define PAHO_MEMORY_ERROR
Definition: Heap.h:26
int MQTTPacket_send_connect(Clients *client, int MQTTVersion, MQTTProperties *connectProperties, MQTTProperties *willProperties)
Definition: MQTTPacketOut.c:48
int SSLSocket_connect(SSL *ssl, int sock, const char *hostname, int verify, int(*cb)(const char *str, size_t len, void *u), void *u)
struct pubsub_opts opts
Definition: paho_c_pub.c:42
#define malloc(x)
Definition: Heap.h:41
int msgId
Definition: MQTTPacket.h:187
signed int connect_state
Definition: Clients.h:128
#define free(x)
Definition: Heap.h:55
networkHandles net
Definition: Clients.h:129
int clientSocketCompare(void *a, void *b)
Definition: Clients.c:50
b64_size_t Base64_encodeLength(const b64_data_t *in, b64_size_t in_len)
Definition: Base64.c:250
void MQTTProtocol_specialChars(char *p0, char *p1, b64_size_t *basic_auth_in_len)
unsigned int good
Definition: Clients.h:126
int MQTTProtocol_handleUnsubacks(void *pack, int sock)
#define FUNC_EXIT_RC(x)
Definition: StackTrace.h:63
#define SOCKET_ERROR
Definition: Socket.h:76
List * clients
Definition: Clients.h:163
b64_size_t Base64_encode(char *out, b64_size_t out_len, const b64_data_t *in, b64_size_t in_len)
Definition: Base64.c:179
int MQTTPacket_send_subscribe(List *topics, List *qoss, MQTTSubscribe_options *opts, MQTTProperties *props, int msgid, int dup, Clients *client)
#define SSL_IN_PROGRESS
Definition: Clients.h:104
void MQTTPacket_freeUnsuback(Unsuback *pack)
Definition: MQTTPacket.c:700
void Log(enum LOG_LEVELS log_level, int msgno, const char *format,...)
Definition: Log.c:417
int MQTTProtocol_unsubscribe(Clients *client, List *topics, int msgID, MQTTProperties *props)
int MQTTPacket_send_unsubscribe(List *topics, MQTTProperties *props, int msgid, int dup, Clients *client)
ClientStates * bstate
Definition: MQTTAsync.c:117
#define WEBSOCKET_IN_PROGRESS
Definition: Clients.h:106
MQTTAsync client
Definition: test6.c:276
#define FUNC_ENTRY
Definition: StackTrace.h:55
unsigned int ping_outstanding
Definition: Clients.h:127
#define SECURE_MQTT_DEFAULT_PORT
int MQTTProtocol_handlePingresps(void *pack, int sock)
#define NOT_IN_PROGRESS
Definition: Clients.h:100
#define WS_DEFAULT_PORT
#define TCP_IN_PROGRESS
Definition: Clients.h:102
int MQTTProtocol_subscribe(Clients *client, List *topics, List *qoss, int msgID, MQTTSubscribe_options *opts, MQTTProperties *props)
unsigned char b64_data_t
Definition: Base64.h:23
#define WAIT_FOR_CONNACK
Definition: Clients.h:108
int WebSocket_connect(networkHandles *net, const char *uri)
Definition: WebSocket.c:383
int MQTTProtocol_handleSubacks(void *pack, int sock)
char * clientID
Definition: Clients.h:119
size_t MQTTProtocol_addressPort(const char *uri, int *port, const char **topic, int default_port)
char * http_proxy_auth
Definition: Clients.h:91
#define TCPSOCKET_COMPLETE
Definition: Socket.h:73
ListElement * ListFindItem(List *aList, void *content, int(*callback)(void *, void *))
Definition: LinkedList.c:154
char * topics[]
unsigned int b64_size_t
Definition: Base64.h:21
enum MQTTReasonCodes rc
Definition: test10.c:1112
char * http_proxy
Definition: Clients.h:90
#define TCPSOCKET_INTERRUPTED
Definition: Socket.h:79
int len
Definition: utf-8.c:46


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 03:48:09