MQTTClient.c
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright (c) 2009, 2020 IBM Corp. and others
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  * https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  * http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  * Ian Craggs - initial API and implementation and/or initial documentation
15  * Ian Craggs - bug 384016 - segv setting will message
16  * Ian Craggs - bug 384053 - v1.0.0.7 - stop MQTTClient_receive on socket error
17  * Ian Craggs, Allan Stockdill-Mander - add ability to connect with SSL
18  * Ian Craggs - multiple server connection support
19  * Ian Craggs - fix for bug 413429 - connectionLost not called
20  * Ian Craggs - fix for bug 421103 - trying to write to same socket, in publish/retries
21  * Ian Craggs - fix for bug 419233 - mutexes not reporting errors
22  * Ian Craggs - fix for bug 420851
23  * Ian Craggs - fix for bug 432903 - queue persistence
24  * Ian Craggs - MQTT 3.1.1 support
25  * Ian Craggs - fix for bug 438176 - MQTT version selection
26  * Rong Xiang, Ian Craggs - C++ compatibility
27  * Ian Craggs - fix for bug 443724 - stack corruption
28  * Ian Craggs - fix for bug 447672 - simultaneous access to socket structure
29  * Ian Craggs - fix for bug 459791 - deadlock in WaitForCompletion for bad client
30  * Ian Craggs - fix for bug 474905 - insufficient synchronization for subscribe, unsubscribe, connect
31  * Ian Craggs - make it clear that yield and receive are not intended for multi-threaded mode (bug 474748)
32  * Ian Craggs - SNI support, message queue unpersist bug
33  * Ian Craggs - binary will message support
34  * Ian Craggs - waitforCompletion fix #240
35  * Ian Craggs - check for NULL SSL options #334
36  * Ian Craggs - allocate username/password buffers #431
37  * Ian Craggs - MQTT 5.0 support
38  *******************************************************************************/
39 
46 #include <stdlib.h>
47 #include <string.h>
48 #if !defined(_WIN32) && !defined(_WIN64)
49  #include <sys/time.h>
50 #endif
51 
52 #include "MQTTClient.h"
53 #if !defined(NO_PERSISTENCE)
54 #include "MQTTPersistence.h"
55 #endif
56 
57 #include "utf-8.h"
58 #include "MQTTProtocol.h"
59 #include "MQTTProtocolOut.h"
60 #include "Thread.h"
61 #include "SocketBuffer.h"
62 #include "StackTrace.h"
63 #include "Heap.h"
64 
65 #if defined(OPENSSL)
66 #include <openssl/ssl.h>
67 #else
68 #define URI_SSL "ssl://"
69 #endif
70 
71 #include "OsWrapper.h"
72 
73 #define URI_TCP "tcp://"
74 #define URI_WS "ws://"
75 #define URI_WSS "wss://"
76 
77 #include "VersionInfo.h"
78 #include "WebSocket.h"
79 
80 const char *client_timestamp_eye = "MQTTClientV3_Timestamp " BUILD_TIMESTAMP;
81 const char *client_version_eye = "MQTTClientV3_Version " CLIENT_VERSION;
82 
83 int MQTTClient_init(void);
84 
86 {
88 #if defined(OPENSSL)
90 #endif
91 }
92 
94 {
95  CLIENT_VERSION, /* version */
96  NULL /* client list */
97 };
98 
100 
102 
103 #if defined(_WIN32) || defined(_WIN64)
104 static mutex_type mqttclient_mutex = NULL;
105 static mutex_type socket_mutex = NULL;
106 static mutex_type subscribe_mutex = NULL;
107 static mutex_type unsubscribe_mutex = NULL;
108 static mutex_type connect_mutex = NULL;
109 #if !defined(NO_HEAP_TRACKING)
110 extern mutex_type stack_mutex;
111 extern mutex_type heap_mutex;
112 #endif
113 extern mutex_type log_mutex;
114 
115 int MQTTClient_init(void)
116 {
117  DWORD rc = 0;
118 
119  if (mqttclient_mutex == NULL)
120  {
121  if ((mqttclient_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
122  {
123  rc = GetLastError();
124  printf("mqttclient_mutex error %d\n", rc);
125  goto exit;
126  }
127  if ((subscribe_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
128  {
129  rc = GetLastError();
130  printf("subscribe_mutex error %d\n", rc);
131  goto exit;
132  }
133  if ((unsubscribe_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
134  {
135  rc = GetLastError();
136  printf("unsubscribe_mutex error %d\n", rc);
137  goto exit;
138  }
139  if ((connect_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
140  {
141  rc = GetLastError();
142  printf("connect_mutex error %d\n", rc);
143  goto exit;
144  }
145 #if !defined(NO_HEAP_TRACKING)
146  if ((stack_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
147  {
148  rc = GetLastError();
149  printf("stack_mutex error %d\n", rc);
150  goto exit;
151  }
152  if ((heap_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
153  {
154  rc = GetLastError();
155  printf("heap_mutex error %d\n", rc);
156  goto exit;
157  }
158 #endif
159  if ((log_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
160  {
161  rc = GetLastError();
162  printf("log_mutex error %d\n", rc);
163  goto exit;
164  }
165  if ((socket_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
166  {
167  rc = GetLastError();
168  printf("socket_mutex error %d\n", rc);
169  goto exit;
170  }
171  }
172 exit:
173  return rc;
174 }
175 
176 void MQTTClient_cleanup(void)
177 {
178  if (connect_mutex)
179  CloseHandle(connect_mutex);
180  if (subscribe_mutex)
181  CloseHandle(subscribe_mutex);
182  if (unsubscribe_mutex)
183  CloseHandle(unsubscribe_mutex);
184 #if !defined(NO_HEAP_TRACKING)
185  if (stack_mutex)
186  CloseHandle(stack_mutex);
187  if (heap_mutex)
188  CloseHandle(heap_mutex);
189 #endif
190  if (log_mutex)
191  CloseHandle(log_mutex);
192  if (socket_mutex)
193  CloseHandle(socket_mutex);
194  if (mqttclient_mutex)
195  CloseHandle(mqttclient_mutex);
196 }
197 
198 #if defined(PAHO_MQTT_STATIC)
199 /* Global variable for one-time initialization structure */
200 static INIT_ONCE g_InitOnce = INIT_ONCE_STATIC_INIT; /* Static initialization */
201 
202 /* One time initialization function */
203 BOOL CALLBACK InitOnceFunction (
204  PINIT_ONCE InitOnce, /* Pointer to one-time initialization structure */
205  PVOID Parameter, /* Optional parameter passed by InitOnceExecuteOnce */
206  PVOID *lpContext) /* Receives pointer to event object */
207 {
208  int rc = MQTTClient_init();
209  return rc == 0;
210 }
211 
212 #else
213 BOOL APIENTRY DllMain(HANDLE hModule,
214  DWORD ul_reason_for_call,
215  LPVOID lpReserved)
216 {
217  switch (ul_reason_for_call)
218  {
219  case DLL_PROCESS_ATTACH:
220  MQTTClient_init();
221  break;
222  case DLL_THREAD_ATTACH:
223  break;
224  case DLL_THREAD_DETACH:
225  break;
226  case DLL_PROCESS_DETACH:
227  if (lpReserved)
228  MQTTClient_cleanup();
229  break;
230  }
231  return TRUE;
232 }
233 #endif
234 
235 #else
236 static pthread_mutex_t mqttclient_mutex_store = PTHREAD_MUTEX_INITIALIZER;
238 
239 static pthread_mutex_t socket_mutex_store = PTHREAD_MUTEX_INITIALIZER;
241 
242 static pthread_mutex_t subscribe_mutex_store = PTHREAD_MUTEX_INITIALIZER;
244 
245 static pthread_mutex_t unsubscribe_mutex_store = PTHREAD_MUTEX_INITIALIZER;
247 
248 static pthread_mutex_t connect_mutex_store = PTHREAD_MUTEX_INITIALIZER;
250 
252 {
253  pthread_mutexattr_t attr;
254  int rc;
255 
256  pthread_mutexattr_init(&attr);
257 #if !defined(_WRS_KERNEL)
258  pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
259 #else
260  /* #warning "no pthread_mutexattr_settype" */
261 #endif /* !defined(_WRS_KERNEL) */
262  if ((rc = pthread_mutex_init(mqttclient_mutex, &attr)) != 0)
263  printf("MQTTClient: error %d initializing client_mutex\n", rc);
264  else if ((rc = pthread_mutex_init(socket_mutex, &attr)) != 0)
265  printf("MQTTClient: error %d initializing socket_mutex\n", rc);
266  else if ((rc = pthread_mutex_init(subscribe_mutex, &attr)) != 0)
267  printf("MQTTClient: error %d initializing subscribe_mutex\n", rc);
268  else if ((rc = pthread_mutex_init(unsubscribe_mutex, &attr)) != 0)
269  printf("MQTTClient: error %d initializing unsubscribe_mutex\n", rc);
270  else if ((rc = pthread_mutex_init(connect_mutex, &attr)) != 0)
271  printf("MQTTClient: error %d initializing connect_mutex\n", rc);
272 
273  return rc;
274 }
275 
276 #define WINAPI
277 #endif
278 
279 static volatile int library_initialized = 0;
280 static List* handles = NULL;
281 static int running = 0;
282 static int tostop = 0;
284 
285 typedef struct
286 {
288  char* topicName;
289  int topicLen;
290  unsigned int seqno; /* only used on restore */
291 } qEntry;
292 
293 
294 typedef struct
295 {
296  char* serverURI;
297  const char* currentServerURI; /* when using HA options, set the currently used serverURI */
298 #if defined(OPENSSL)
299  int ssl;
300 #endif
306  void* context;
307 
309  void* disconnected_context; /* the context to be associated with the disconnected callback*/
310 
312  void* published_context; /* the context to be associated with the published callback */
313 
314 #if 0
315  MQTTClient_authHandle* auth_handle;
316  void* auth_handle_context; /* the context to be associated with the authHandle callback*/
317 #endif
318 
320  int rc; /* getsockopt return code in connect */
325 
326 } MQTTClients;
327 
329 {
333 };
334 
335 static void MQTTClient_terminate(void);
337 static int MQTTClient_deliverMessage(
338  int rc, MQTTClients* m,
339  char** topicName, int* topicLen,
340  MQTTClient_message** message);
341 static int clientSockCompare(void* a, void* b);
344 static int MQTTClient_stop(void);
349  const char* serverURI, int MQTTVersion,
350  START_TIME_TYPE start, ELAPSED_TIME_TYPE millisecsTimeout,
351  MQTTProperties* connectProperties, MQTTProperties* willProperties);
352 static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI,
353  MQTTProperties* connectProperties, MQTTProperties* willProperties);
354 static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int internal, int stop, enum MQTTReasonCodes, MQTTProperties*);
355 static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout);
356 static void MQTTClient_retry(void);
357 static MQTTPacket* MQTTClient_cycle(int* sock, ELAPSED_TIME_TYPE timeout, int* rc);
358 static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, ELAPSED_TIME_TYPE timeout);
359 /*static int pubCompare(void* a, void* b); */
360 static void MQTTProtocol_checkPendingWrites(void);
361 static void MQTTClient_writeComplete(int socket, int rc);
362 
363 
364 int MQTTClient_createWithOptions(MQTTClient* handle, const char* serverURI, const char* clientId,
365  int persistence_type, void* persistence_context, MQTTClient_createOptions* options)
366 {
367  int rc = 0;
368  MQTTClients *m = NULL;
369 
370 #if (defined(_WIN32) || defined(_WIN64)) && defined(PAHO_MQTT_STATIC)
371  /* initializes mutexes once. Must come before FUNC_ENTRY */
372  BOOL bStatus = InitOnceExecuteOnce(&g_InitOnce, InitOnceFunction, NULL, NULL);
373 #endif
374  FUNC_ENTRY;
375  if ((rc = Thread_lock_mutex(mqttclient_mutex)) != 0)
376  goto exit;
377 
378  if (serverURI == NULL || clientId == NULL)
379  {
381  goto exit;
382  }
383 
384  if (!UTF8_validateString(clientId))
385  {
387  goto exit;
388  }
389 
390  if (strlen(clientId) == 0 && persistence_type == MQTTCLIENT_PERSISTENCE_DEFAULT)
391  {
393  goto exit;
394  }
395 
396  if (strstr(serverURI, "://") != NULL)
397  {
398  if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) != 0
399  && strncmp(URI_WS, serverURI, strlen(URI_WS)) != 0
400 #if defined(OPENSSL)
401  && strncmp(URI_SSL, serverURI, strlen(URI_SSL)) != 0
402  && strncmp(URI_WSS, serverURI, strlen(URI_WSS)) != 0
403 #endif
404  )
405  {
407  goto exit;
408  }
409  }
410 
411  if (options && (strncmp(options->struct_id, "MQCO", 4) != 0 || options->struct_version != 0))
412  {
414  goto exit;
415  }
416 
417  if (!library_initialized)
418  {
419  #if !defined(NO_HEAP_TRACKING)
420  Heap_initialize();
421  #endif
423  bstate->clients = ListInitialize();
426  handles = ListInitialize();
427 #if defined(OPENSSL)
429 #endif
431  }
432 
433  if ((m = malloc(sizeof(MQTTClients))) == NULL)
434  {
435  rc = PAHO_MEMORY_ERROR;
436  goto exit;
437  }
438  *handle = m;
439  memset(m, '\0', sizeof(MQTTClients));
440  if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
441  serverURI += strlen(URI_TCP);
442  else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
443  {
444  serverURI += strlen(URI_WS);
445  m->websocket = 1;
446  }
447  else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
448  {
449 #if defined(OPENSSL)
450  serverURI += strlen(URI_SSL);
451  m->ssl = 1;
452 #else
454  goto exit;
455 #endif
456  }
457  else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
458  {
459 #if defined(OPENSSL)
460  serverURI += strlen(URI_WSS);
461  m->ssl = 1;
462  m->websocket = 1;
463 #else
465  goto exit;
466 #endif
467  }
468  m->serverURI = MQTTStrdup(serverURI);
469  ListAppend(handles, m, sizeof(MQTTClients));
470 
471  if ((m->c = malloc(sizeof(Clients))) == NULL)
472  {
473  ListRemove(handles, m);
474  rc = PAHO_MEMORY_ERROR;
475  goto exit;
476  }
477  memset(m->c, '\0', sizeof(Clients));
478  m->c->context = m;
479  m->c->MQTTVersion = (options) ? options->MQTTVersion : MQTTVERSION_DEFAULT;
480  m->c->outboundMsgs = ListInitialize();
481  m->c->inboundMsgs = ListInitialize();
482  m->c->messageQueue = ListInitialize();
483  m->c->clientID = MQTTStrdup(clientId);
484  m->connect_sem = Thread_create_sem(&rc);
485  m->connack_sem = Thread_create_sem(&rc);
486  m->suback_sem = Thread_create_sem(&rc);
488 
489 #if !defined(NO_PERSISTENCE)
490  rc = MQTTPersistence_create(&(m->c->persistence), persistence_type, persistence_context);
491  if (rc == 0)
492  {
494  if (rc == 0)
496  }
497 #endif
498  ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List));
499 
500 exit:
502  FUNC_EXIT_RC(rc);
503  return rc;
504 }
505 
506 
507 int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* clientId,
508  int persistence_type, void* persistence_context)
509 {
510  return MQTTClient_createWithOptions(handle, serverURI, clientId, persistence_type,
511  persistence_context, NULL);
512 }
513 
514 
515 static void MQTTClient_terminate(void)
516 {
517  FUNC_ENTRY;
518  MQTTClient_stop();
520  {
521  ListFree(bstate->clients);
522  ListFree(handles);
523  handles = NULL;
525  #if !defined(NO_HEAP_TRACKING)
526  Heap_terminate();
527  #endif
528  Log_terminate();
530  }
531  FUNC_EXIT;
532 }
533 
534 
536 {
537  FUNC_ENTRY;
538  /* empty message queue */
539  if (client->messageQueue->count > 0)
540  {
541  ListElement* current = NULL;
542  while (ListNextElement(client->messageQueue, &current))
543  {
544  qEntry* qe = (qEntry*)(current->content);
545  free(qe->topicName);
547  free(qe->msg->payload);
548  free(qe->msg);
549  }
550  ListEmpty(client->messageQueue);
551  }
552  FUNC_EXIT;
553 }
554 
555 
557 {
558  MQTTClients* m = *handle;
559 
560  FUNC_ENTRY;
563 
564  if (m == NULL)
565  goto exit;
566 
567  if (m->c)
568  {
569  int saved_socket = m->c->net.socket;
570  char* saved_clientid = MQTTStrdup(m->c->clientID);
571 #if !defined(NO_PERSISTENCE)
573 #endif
576  if (!ListRemove(bstate->clients, m->c))
577  Log(LOG_ERROR, 0, NULL);
578  else
579  Log(TRACE_MIN, 1, NULL, saved_clientid, saved_socket);
580  free(saved_clientid);
581  }
582  if (m->serverURI)
583  free(m->serverURI);
588  if (!ListRemove(handles, m))
589  Log(LOG_ERROR, -1, "free error");
590  *handle = NULL;
591  if (bstate->clients->count == 0)
593 
594 exit:
597  FUNC_EXIT;
598 }
599 
600 
602 {
603  FUNC_ENTRY;
604  MQTTProperties_free(&(*message)->properties);
605  free((*message)->payload);
606  free(*message);
607  *message = NULL;
608  FUNC_EXIT;
609 }
610 
611 
612 void MQTTClient_free(void* memory)
613 {
614  FUNC_ENTRY;
615  free(memory);
616  FUNC_EXIT;
617 }
618 
619 
621 {
622  FUNC_ENTRY;
623  if (response.reasonCodeCount > 0 && response.reasonCodes)
624  free(response.reasonCodes);
625  if (response.properties)
626  {
628  free(response.properties);
629  }
630  FUNC_EXIT;
631 }
632 
633 
634 static int MQTTClient_deliverMessage(int rc, MQTTClients* m, char** topicName, int* topicLen, MQTTClient_message** message)
635 {
636  qEntry* qe = (qEntry*)(m->c->messageQueue->first->content);
637 
638  FUNC_ENTRY;
639  *message = qe->msg;
640  *topicName = qe->topicName;
641  *topicLen = qe->topicLen;
642  if (strlen(*topicName) != *topicLen)
644 #if !defined(NO_PERSISTENCE)
645  if (m->c->persistence)
647 #endif
649  FUNC_EXIT_RC(rc);
650  return rc;
651 }
652 
653 
660 static int clientSockCompare(void* a, void* b)
661 {
662  MQTTClients* m = (MQTTClients*)a;
663  return m->c->net.socket == *(int*)b;
664 }
665 
666 
674 {
675  MQTTClients* m = (MQTTClients*)context;
676 
677  (*(m->cl))(m->context, NULL);
678  return 0;
679 }
680 
681 
683 {
684  int rc = MQTTCLIENT_SUCCESS;
685  MQTTClients* m = handle;
686 
687  FUNC_ENTRY;
689 
690  if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
691  rc = MQTTCLIENT_FAILURE;
692  else
693  {
696  }
697 
699  FUNC_EXIT_RC(rc);
700  return rc;
701 }
702 
703 
704 
712 {
713  struct props_rc_parms* pr = (struct props_rc_parms*)context;
714 
715  (*(pr->m->disconnected))(pr->m->disconnected_context, pr->properties, pr->reasonCode);
717  free(pr->properties);
718  free(pr);
719  return 0;
720 }
721 
722 
724 {
725  int rc = MQTTCLIENT_SUCCESS;
726  MQTTClients* m = handle;
727 
728  FUNC_ENTRY;
730 
731  if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
732  rc = MQTTCLIENT_FAILURE;
733  else
734  {
736  m->published = published;
737  }
738 
740  FUNC_EXIT_RC(rc);
741  return rc;
742 }
743 
744 
745 #if 0
746 int MQTTClient_setHandleAuth(MQTTClient handle, void* context, MQTTClient_handleAuth* auth_handle)
747 {
748  int rc = MQTTCLIENT_SUCCESS;
749  MQTTClients* m = handle;
750 
751  FUNC_ENTRY;
753 
754  if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
755  rc = MQTTCLIENT_FAILURE;
756  else
757  {
758  m->auth_handle_context = context;
759  m->auth_handle = auth_handle;
760  }
761 
763  FUNC_EXIT_RC(rc);
764  return rc;
765 }
766 
767 
774 static thread_return_type WINAPI call_auth_handle(void* context)
775 {
776  struct props_rc_parms* pr = (struct props_rc_parms*)context;
777 
778  (*(pr->m->auth_handle))(pr->m->auth_handle_context, pr->properties, pr->reasonCode);
780  free(pr->properties);
781  free(pr);
782  return 0;
783 }
784 #endif
785 
786 
787 /* This is the thread function that handles the calling of callback functions if set */
789 {
790  long timeout = 10L; /* first time in we have a small timeout. Gets things started more quickly */
791 
792  FUNC_ENTRY;
793  running = 1;
794  run_id = Thread_getid();
795 
797  while (!tostop)
798  {
799  int rc = SOCKET_ERROR;
800  int sock = -1;
801  MQTTClients* m = NULL;
802  MQTTPacket* pack = NULL;
803 
805  pack = MQTTClient_cycle(&sock, timeout, &rc);
807  if (tostop)
808  break;
809  timeout = 1000L;
810 
811  /* find client corresponding to socket */
812  if (ListFindItem(handles, &sock, clientSockCompare) == NULL)
813  {
814  /* assert: should not happen */
815  continue;
816  }
817  m = (MQTTClient)(handles->current->content);
818  if (m == NULL)
819  {
820  /* assert: should not happen */
821  continue;
822  }
823  if (rc == SOCKET_ERROR)
824  {
825  if (m->c->connected)
827  else
828  {
829  if (m->c->connect_state == SSL_IN_PROGRESS)
830  {
831  Log(TRACE_MIN, -1, "Posting connect semaphore for client %s", m->c->clientID);
834  }
835  if (m->c->connect_state == WAIT_FOR_CONNACK)
836  {
837  Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID);
840  }
841  }
842  }
843  else
844  {
845  if (m->c->messageQueue->count > 0 && m->ma)
846  {
847  qEntry* qe = (qEntry*)(m->c->messageQueue->first->content);
848  int topicLen = qe->topicLen;
849 
850  if (strlen(qe->topicName) == topicLen)
851  topicLen = 0;
852 
853  Log(TRACE_MIN, -1, "Calling messageArrived for client %s, queue depth %d",
854  m->c->clientID, m->c->messageQueue->count);
856  rc = (*(m->ma))(m->context, qe->topicName, topicLen, qe->msg);
858  /* if 0 (false) is returned by the callback then it failed, so we don't remove the message from
859  * the queue, and it will be retried later. If 1 is returned then the message data may have been freed,
860  * so we must be careful how we use it.
861  */
862  if (rc)
863  {
864  #if !defined(NO_PERSISTENCE)
865  if (m->c->persistence)
867  #endif
868  ListRemove(m->c->messageQueue, qe);
869  }
870  else
871  Log(TRACE_MIN, -1, "False returned from messageArrived for client %s, message remains on queue",
872  m->c->clientID);
873  }
874  if (pack)
875  {
876  if (pack->header.bits.type == CONNACK)
877  {
878  Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID);
879  m->pack = pack;
881  }
882  else if (pack->header.bits.type == SUBACK)
883  {
884  Log(TRACE_MIN, -1, "Posting suback semaphore for client %s", m->c->clientID);
885  m->pack = pack;
887  }
888  else if (pack->header.bits.type == UNSUBACK)
889  {
890  Log(TRACE_MIN, -1, "Posting unsuback semaphore for client %s", m->c->clientID);
891  m->pack = pack;
893  }
894  else if (m->c->MQTTVersion >= MQTTVERSION_5)
895  {
896  if (pack->header.bits.type == DISCONNECT && m->disconnected)
897  {
898  struct props_rc_parms* dp;
899  Ack* disc = (Ack*)pack;
900 
901  dp = malloc(sizeof(struct props_rc_parms));
902  if (dp)
903  {
904  dp->m = m;
905  dp->reasonCode = disc->rc;
906  dp->properties = malloc(sizeof(MQTTProperties));
907  if (dp->properties)
908  {
909  *(dp->properties) = disc->properties;
911  Log(TRACE_MIN, -1, "Calling disconnected for client %s", m->c->clientID);
913  }
914  else
915  free(dp);
916  }
917  free(disc);
918  }
919 #if 0
920  if (pack->header.bits.type == AUTH && m->auth_handle)
921  {
922  struct props_rc_parms dp;
923  Ack* disc = (Ack*)pack;
924 
925  dp.m = m;
926  dp.properties = &disc->properties;
927  dp.reasonCode = disc->rc;
928  free(pack);
929  Log(TRACE_MIN, -1, "Calling auth_handle for client %s", m->c->clientID);
930  Thread_start(call_auth_handle, &dp);
931  }
932 #endif
933  }
934  }
935  else if (m->c->connect_state == TCP_IN_PROGRESS)
936  {
937  int error;
938  socklen_t len = sizeof(error);
939 
940  if ((m->rc = getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len)) == 0)
941  m->rc = error;
942  Log(TRACE_MIN, -1, "Posting connect semaphore for client %s rc %d", m->c->clientID, m->rc);
945  }
946 #if defined(OPENSSL)
947  else if (m->c->connect_state == SSL_IN_PROGRESS)
948  {
949  rc = m->c->sslopts->struct_version >= 3 ?
950  SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
951  m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
952  SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
953  m->c->sslopts->verify, NULL, NULL);
954  if (rc == 1 || rc == SSL_FATAL)
955  {
956  if (rc == 1 && (m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
957  m->c->session = SSL_get1_session(m->c->net.ssl);
958  m->rc = rc;
959  Log(TRACE_MIN, -1, "Posting connect semaphore for SSL client %s rc %d", m->c->clientID, m->rc);
962  }
963  }
964 #endif
965 
966  else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS)
967  {
968  if (rc != TCPSOCKET_INTERRUPTED)
969  {
970  Log(TRACE_MIN, -1, "Posting websocket handshake for client %s rc %d", m->c->clientID, m->rc);
973  }
974  }
975  }
976  }
977  run_id = 0;
978  running = tostop = 0;
980  FUNC_EXIT;
981 #if defined(_WIN32) || defined(_WIN64)
982  ExitThread(0);
983 #endif
984  return 0;
985 }
986 
987 
988 static int MQTTClient_stop(void)
989 {
990  int rc = 0;
991 
992  FUNC_ENTRY;
993  if (running == 1 && tostop == 0)
994  {
995  int conn_count = 0;
996  ListElement* current = NULL;
997 
998  if (handles != NULL)
999  {
1000  /* find out how many handles are still connected */
1001  while (ListNextElement(handles, &current))
1002  {
1003  if (((MQTTClients*)(current->content))->c->connect_state > NOT_IN_PROGRESS ||
1004  ((MQTTClients*)(current->content))->c->connected)
1005  ++conn_count;
1006  }
1007  }
1008  Log(TRACE_MIN, -1, "Conn_count is %d", conn_count);
1009  /* stop the background thread, if we are the last one to be using it */
1010  if (conn_count == 0)
1011  {
1012  int count = 0;
1013  tostop = 1;
1014  if (Thread_getid() != run_id)
1015  {
1016  while (running && ++count < 100)
1017  {
1019  Log(TRACE_MIN, -1, "sleeping");
1020  MQTTTime_sleep(100L);
1022  }
1023  }
1024  rc = 1;
1025  }
1026  }
1027  FUNC_EXIT_RC(rc);
1028  return rc;
1029 }
1030 
1031 
1034 {
1035  int rc = MQTTCLIENT_SUCCESS;
1036  MQTTClients* m = handle;
1037 
1038  FUNC_ENTRY;
1040 
1041  if (m == NULL || ma == NULL || m->c->connect_state != NOT_IN_PROGRESS)
1042  rc = MQTTCLIENT_FAILURE;
1043  else
1044  {
1045  m->context = context;
1046  m->cl = cl;
1047  m->ma = ma;
1048  m->dc = dc;
1049  }
1050 
1052  FUNC_EXIT_RC(rc);
1053  return rc;
1054 }
1055 
1056 
1058 {
1059  FUNC_ENTRY;
1060  client->good = 0;
1061  client->ping_outstanding = 0;
1062  if (client->net.socket > 0)
1063  {
1064  if (client->connected)
1065  MQTTPacket_send_disconnect(client, reason, props);
1067  WebSocket_close(&client->net, WebSocket_CLOSE_NORMAL, NULL);
1068 
1069 #if defined(OPENSSL)
1070  SSL_SESSION_free(client->session); /* is a no-op if session is NULL */
1071  client->session = NULL; /* show the session has been freed */
1072  SSLSocket_close(&client->net);
1073 #endif
1074  Socket_close(client->net.socket);
1076  client->net.socket = 0;
1077 #if defined(OPENSSL)
1078  client->net.ssl = NULL;
1079 #endif
1080  }
1081  client->connected = 0;
1082  client->connect_state = NOT_IN_PROGRESS;
1083 
1084  if (client->MQTTVersion < MQTTVERSION_5 && client->cleansession)
1085  MQTTClient_cleanSession(client);
1086  FUNC_EXIT;
1087 }
1088 
1089 
1091 {
1092  int rc = 0;
1093 
1094  FUNC_ENTRY;
1095 #if !defined(NO_PERSISTENCE)
1096  rc = MQTTPersistence_clear(client);
1097 #endif
1101  client->msgID = 0;
1102  FUNC_EXIT_RC(rc);
1103  return rc;
1104 }
1105 
1106 
1107 void Protocol_processPublication(Publish* publish, Clients* client, int allocatePayload)
1108 {
1109  qEntry* qe = NULL;
1110  MQTTClient_message* mm = NULL;
1112 
1113  FUNC_ENTRY;
1114  qe = malloc(sizeof(qEntry));
1115  if (!qe)
1116  goto exit;
1117  mm = malloc(sizeof(MQTTClient_message));
1118  if (!mm)
1119  {
1120  free(qe);
1121  goto exit;
1122  }
1123  memcpy(mm, &initialized, sizeof(MQTTClient_message));
1124 
1125  qe->msg = mm;
1126  qe->topicName = publish->topic;
1127  qe->topicLen = publish->topiclen;
1128  publish->topic = NULL;
1129  if (allocatePayload)
1130  {
1131  mm->payload = malloc(publish->payloadlen);
1132  if (mm->payload == NULL)
1133  {
1134  free(mm);
1135  free(qe);
1136  goto exit;
1137  }
1138  memcpy(mm->payload, publish->payload, publish->payloadlen);
1139  }
1140  else
1141  mm->payload = publish->payload;
1142  mm->payloadlen = publish->payloadlen;
1143  mm->qos = publish->header.bits.qos;
1144  mm->retained = publish->header.bits.retain;
1145  if (publish->header.bits.qos == 2)
1146  mm->dup = 0; /* ensure that a QoS2 message is not passed to the application with dup = 1 */
1147  else
1148  mm->dup = publish->header.bits.dup;
1149  mm->msgid = publish->msgId;
1150 
1151  if (publish->MQTTVersion >= 5)
1152  mm->properties = MQTTProperties_copy(&publish->properties);
1153 
1154  ListAppend(client->messageQueue, qe, sizeof(qe) + sizeof(mm) + mm->payloadlen + strlen(qe->topicName)+1);
1155 #if !defined(NO_PERSISTENCE)
1156  if (client->persistence)
1158 #endif
1159 exit:
1160  FUNC_EXIT;
1161 }
1162 
1163 
1164 static MQTTResponse MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI, int MQTTVersion,
1165  START_TIME_TYPE start, ELAPSED_TIME_TYPE millisecsTimeout, MQTTProperties* connectProperties, MQTTProperties* willProperties)
1166 {
1167  MQTTClients* m = handle;
1168  int rc = SOCKET_ERROR;
1169  int sessionPresent = 0;
1171 
1172  FUNC_ENTRY;
1173  resp.reasonCode = SOCKET_ERROR;
1174  if (m->ma && !running)
1175  {
1176  Thread_start(MQTTClient_run, handle);
1177  if (MQTTTime_elapsed(start) >= millisecsTimeout)
1178  {
1179  rc = SOCKET_ERROR;
1180  goto exit;
1181  }
1182  MQTTTime_sleep(100L);
1183  }
1184 
1185  Log(TRACE_MIN, -1, "Connecting to serverURI %s with MQTT version %d", serverURI, MQTTVersion);
1186 #if defined(OPENSSL)
1187 #if defined(__GNUC__) && defined(__linux__)
1188  rc = MQTTProtocol_connect(serverURI, m->c, m->ssl, m->websocket, MQTTVersion, connectProperties, willProperties,
1189  millisecsTimeout - MQTTTime_elapsed(start));
1190 #else
1191  rc = MQTTProtocol_connect(serverURI, m->c, m->ssl, m->websocket, MQTTVersion, connectProperties, willProperties);
1192 #endif
1193 #else
1194 #if defined(__GNUC__) && defined(__linux__)
1195  rc = MQTTProtocol_connect(serverURI, m->c, m->websocket, MQTTVersion, connectProperties, willProperties,
1196  millisecsTimeout - MQTTTime_elapsed(start));
1197 #else
1198  rc = MQTTProtocol_connect(serverURI, m->c, m->websocket, MQTTVersion, connectProperties, willProperties);
1199 #endif
1200 #endif
1201  if (rc == SOCKET_ERROR)
1202  goto exit;
1203 
1204  if (m->c->connect_state == NOT_IN_PROGRESS)
1205  {
1206  rc = SOCKET_ERROR;
1207  goto exit;
1208  }
1209 
1210  if (m->c->connect_state == TCP_IN_PROGRESS) /* TCP connect started - wait for completion */
1211  {
1213  MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTTime_elapsed(start));
1215  if (rc != 0)
1216  {
1217  rc = SOCKET_ERROR;
1218  goto exit;
1219  }
1220 #if defined(OPENSSL)
1221  if (m->ssl)
1222  {
1223  int port1;
1224  size_t hostname_len;
1225  const char *topic;
1226  int setSocketForSSLrc = 0;
1227 
1228  if (m->websocket && m->c->net.https_proxy) {
1230  if ((rc = WebSocket_proxy_connect( &m->c->net, 1, serverURI)) == SOCKET_ERROR )
1231  goto exit;
1232  }
1233 
1234  hostname_len = MQTTProtocol_addressPort(serverURI, &port1, &topic, MQTT_DEFAULT_PORT);
1235  setSocketForSSLrc = SSLSocket_setSocketForSSL(&m->c->net, m->c->sslopts,
1236  serverURI, hostname_len);
1237 
1238  if (setSocketForSSLrc != MQTTCLIENT_SUCCESS)
1239  {
1240  if (m->c->session != NULL)
1241  if ((rc = SSL_set_session(m->c->net.ssl, m->c->session)) != 1)
1242  Log(TRACE_MIN, -1, "Failed to set SSL session with stored data, non critical");
1243  rc = m->c->sslopts->struct_version >= 3 ?
1244  SSLSocket_connect(m->c->net.ssl, m->c->net.socket, serverURI,
1245  m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
1246  SSLSocket_connect(m->c->net.ssl, m->c->net.socket, serverURI,
1247  m->c->sslopts->verify, NULL, NULL);
1248  if (rc == TCPSOCKET_INTERRUPTED)
1249  m->c->connect_state = SSL_IN_PROGRESS; /* the connect is still in progress */
1250  else if (rc == SSL_FATAL)
1251  {
1252  rc = SOCKET_ERROR;
1253  goto exit;
1254  }
1255  else if (rc == 1)
1256  {
1257  if (m->websocket)
1258  {
1260  rc = WebSocket_connect(&m->c->net, serverURI);
1261  if ( rc == SOCKET_ERROR )
1262  goto exit;
1263  }
1264  else
1265  {
1266  rc = MQTTCLIENT_SUCCESS;
1268  if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
1269  {
1270  rc = SOCKET_ERROR;
1271  goto exit;
1272  }
1273  if ((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
1274  m->c->session = SSL_get1_session(m->c->net.ssl);
1275  }
1276  }
1277  }
1278  else
1279  {
1280  rc = SOCKET_ERROR;
1281  goto exit;
1282  }
1283  }
1284 #endif
1285  else if (m->websocket)
1286  {
1287  if (m->c->net.http_proxy) {
1289  if ((rc = WebSocket_proxy_connect( &m->c->net, 0, serverURI)) == SOCKET_ERROR )
1290  goto exit;
1291  }
1292 
1294  if ( WebSocket_connect(&m->c->net, serverURI) == SOCKET_ERROR )
1295  {
1296  rc = SOCKET_ERROR;
1297  goto exit;
1298  }
1299  }
1300  else
1301  {
1302  m->c->connect_state = WAIT_FOR_CONNACK; /* TCP connect completed, in which case send the MQTT connect packet */
1303  if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
1304  {
1305  rc = SOCKET_ERROR;
1306  goto exit;
1307  }
1308  }
1309  }
1310 
1311 #if defined(OPENSSL)
1312  if (m->c->connect_state == SSL_IN_PROGRESS) /* SSL connect sent - wait for completion */
1313  {
1315  MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTTime_elapsed(start));
1317  if (rc != 1)
1318  {
1319  rc = SOCKET_ERROR;
1320  goto exit;
1321  }
1322  if((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
1323  m->c->session = SSL_get1_session(m->c->net.ssl);
1324 
1325  if ( m->websocket )
1326  {
1327  /* wait for websocket connect */
1329  rc = WebSocket_connect( &m->c->net, serverURI );
1330  if ( rc != 1 )
1331  {
1332  rc = SOCKET_ERROR;
1333  goto exit;
1334  }
1335  }
1336  else
1337  {
1338  m->c->connect_state = WAIT_FOR_CONNACK; /* TCP connect completed, in which case send the MQTT connect packet */
1339  if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
1340  {
1341  rc = SOCKET_ERROR;
1342  goto exit;
1343  }
1344  }
1345  }
1346 #endif
1347 
1348  if (m->c->connect_state == WEBSOCKET_IN_PROGRESS) /* websocket request sent - wait for upgrade */
1349  {
1351  MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTTime_elapsed(start));
1353  m->c->connect_state = WAIT_FOR_CONNACK; /* websocket upgrade complete */
1354  if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
1355  {
1356  rc = SOCKET_ERROR;
1357  goto exit;
1358  }
1359  }
1360 
1361  if (m->c->connect_state == WAIT_FOR_CONNACK) /* MQTT connect sent - wait for CONNACK */
1362  {
1363  MQTTPacket* pack = NULL;
1365  pack = MQTTClient_waitfor(handle, CONNACK, &rc, millisecsTimeout - MQTTTime_elapsed(start));
1367  if (pack == NULL)
1368  rc = SOCKET_ERROR;
1369  else
1370  {
1371  Connack* connack = (Connack*)pack;
1372  Log(TRACE_PROTOCOL, 1, NULL, m->c->net.socket, m->c->clientID, connack->rc);
1373  if ((rc = connack->rc) == MQTTCLIENT_SUCCESS)
1374  {
1375  m->c->connected = 1;
1376  m->c->good = 1;
1378  if (MQTTVersion == 4)
1379  sessionPresent = connack->flags.bits.sessionPresent;
1380  if (m->c->cleansession || m->c->cleanstart)
1381  rc = MQTTClient_cleanSession(m->c);
1382  if (m->c->outboundMsgs->count > 0)
1383  {
1384  ListElement* outcurrent = NULL;
1386 
1387  while (ListNextElement(m->c->outboundMsgs, &outcurrent))
1388  {
1389  Messages* m = (Messages*)(outcurrent->content);
1390  memset(&m->lastTouch, '\0', sizeof(m->lastTouch));
1391  }
1392  MQTTProtocol_retry(zero, 1, 1);
1393  if (m->c->connected != 1)
1395  }
1396  if (m->c->MQTTVersion == MQTTVERSION_5)
1397  {
1398  if ((resp.properties = malloc(sizeof(MQTTProperties))) == NULL)
1399  {
1400  rc = PAHO_MEMORY_ERROR;
1401  goto exit;
1402  }
1403  *resp.properties = MQTTProperties_copy(&connack->properties);
1404  }
1405  }
1406  MQTTPacket_freeConnack(connack);
1407  m->pack = NULL;
1408  }
1409  }
1410 exit:
1411  if (rc == MQTTCLIENT_SUCCESS)
1412  {
1413  if (options->struct_version >= 4) /* means we have to fill out return values */
1414  {
1415  options->returned.serverURI = serverURI;
1416  options->returned.MQTTVersion = MQTTVersion;
1417  options->returned.sessionPresent = sessionPresent;
1418  }
1419  }
1420  else
1421  MQTTClient_disconnect1(handle, 0, 0, (MQTTVersion == 3), MQTTREASONCODE_SUCCESS, NULL); /* don't want to call connection lost */
1422 
1423  resp.reasonCode = rc;
1424  FUNC_EXIT_RC(resp.reasonCode);
1425  return resp;
1426 }
1427 
/* Interval used by the retry loop; starts at 5 and is only ever lowered. */
static int retryLoopInterval = 5;

/**
 * Lower the retry loop interval to suit a keep-alive value.
 *
 * The candidate interval is one tenth of the keep-alive, clamped to the
 * range 1..5. The stored interval is replaced only when the candidate is
 * smaller, so repeated calls can never raise it again.
 *
 * @param keepalive the keep-alive interval to derive the candidate from
 */
static void setRetryLoopInterval(int keepalive)
{
	int candidate = keepalive / 10;

	/* clamp the candidate into [1, 5] */
	candidate = (candidate > 5) ? 5 : candidate;
	candidate = (candidate < 1) ? 1 : candidate;

	if (retryLoopInterval > candidate)
		retryLoopInterval = candidate;
}
1441 
1442 
1444  MQTTProperties* connectProperties, MQTTProperties* willProperties)
1445 {
1446  MQTTClients* m = handle;
1447  START_TIME_TYPE start;
1448  ELAPSED_TIME_TYPE millisecsTimeout = 30000L;
1450  int MQTTVersion = 0;
1451 
1452  FUNC_ENTRY;
1453  rc.reasonCode = SOCKET_ERROR;
1454  millisecsTimeout = options->connectTimeout * 1000;
1455  start = MQTTTime_start_clock();
1456 
1457  m->currentServerURI = serverURI;
1458  m->c->keepAliveInterval = options->keepAliveInterval;
1459  m->c->retryInterval = options->retryInterval;
1461  m->c->MQTTVersion = options->MQTTVersion;
1462  m->c->cleanstart = m->c->cleansession = 0;
1463  if (m->c->MQTTVersion >= MQTTVERSION_5)
1464  m->c->cleanstart = options->cleanstart;
1465  else
1466  m->c->cleansession = options->cleansession;
1467  m->c->maxInflightMessages = (options->reliable) ? 1 : 10;
1468  if (options->struct_version >= 6)
1469  {
1470  if (options->maxInflightMessages > 0)
1471  m->c->maxInflightMessages = options->maxInflightMessages;
1472  }
1473 
1474  if (options->struct_version >= 7)
1475  {
1476  m->c->net.httpHeaders = options->httpHeaders;
1477  }
1478 
1479  if (m->c->will)
1480  {
1481  free(m->c->will->payload);
1482  free(m->c->will->topic);
1483  free(m->c->will);
1484  m->c->will = NULL;
1485  }
1486 
1487  if (options->will && (options->will->struct_version == 0 || options->will->struct_version == 1))
1488  {
1489  const void* source = NULL;
1490 
1491  if ((m->c->will = malloc(sizeof(willMessages))) == NULL)
1492  {
1494  goto exit;
1495  }
1496  if (options->will->message || (options->will->struct_version == 1 && options->will->payload.data))
1497  {
1498  if (options->will->struct_version == 1 && options->will->payload.data)
1499  {
1500  m->c->will->payloadlen = options->will->payload.len;
1501  source = options->will->payload.data;
1502  }
1503  else
1504  {
1505  m->c->will->payloadlen = (int)strlen(options->will->message);
1506  source = (void*)options->will->message;
1507  }
1508  if ((m->c->will->payload = malloc(m->c->will->payloadlen)) == NULL)
1509  {
1510  free(m->c->will);
1512  goto exit;
1513  }
1514  memcpy(m->c->will->payload, source, m->c->will->payloadlen);
1515  }
1516  else
1517  {
1518  m->c->will->payload = NULL;
1519  m->c->will->payloadlen = 0;
1520  }
1521  m->c->will->qos = options->will->qos;
1522  m->c->will->retained = options->will->retained;
1523  m->c->will->topic = MQTTStrdup(options->will->topicName);
1524  }
1525 
1526 #if defined(OPENSSL)
1527  if (m->c->sslopts)
1528  {
1529  if (m->c->sslopts->trustStore)
1530  free((void*)m->c->sslopts->trustStore);
1531  if (m->c->sslopts->keyStore)
1532  free((void*)m->c->sslopts->keyStore);
1533  if (m->c->sslopts->privateKey)
1534  free((void*)m->c->sslopts->privateKey);
1535  if (m->c->sslopts->privateKeyPassword)
1536  free((void*)m->c->sslopts->privateKeyPassword);
1537  if (m->c->sslopts->enabledCipherSuites)
1538  free((void*)m->c->sslopts->enabledCipherSuites);
1539  if (m->c->sslopts->struct_version >= 2)
1540  {
1541  if (m->c->sslopts->CApath)
1542  free((void*)m->c->sslopts->CApath);
1543  }
1544  free(m->c->sslopts);
1545  m->c->sslopts = NULL;
1546  }
1547 
1548  if (options->struct_version != 0 && options->ssl)
1549  {
1550  if ((m->c->sslopts = malloc(sizeof(MQTTClient_SSLOptions))) == NULL)
1551  {
1553  goto exit;
1554  }
1555  memset(m->c->sslopts, '\0', sizeof(MQTTClient_SSLOptions));
1556  m->c->sslopts->struct_version = options->ssl->struct_version;
1557  if (options->ssl->trustStore)
1558  m->c->sslopts->trustStore = MQTTStrdup(options->ssl->trustStore);
1559  if (options->ssl->keyStore)
1560  m->c->sslopts->keyStore = MQTTStrdup(options->ssl->keyStore);
1561  if (options->ssl->privateKey)
1562  m->c->sslopts->privateKey = MQTTStrdup(options->ssl->privateKey);
1563  if (options->ssl->privateKeyPassword)
1564  m->c->sslopts->privateKeyPassword = MQTTStrdup(options->ssl->privateKeyPassword);
1565  if (options->ssl->enabledCipherSuites)
1566  m->c->sslopts->enabledCipherSuites = MQTTStrdup(options->ssl->enabledCipherSuites);
1567  m->c->sslopts->enableServerCertAuth = options->ssl->enableServerCertAuth;
1568  if (m->c->sslopts->struct_version >= 1)
1569  m->c->sslopts->sslVersion = options->ssl->sslVersion;
1570  if (m->c->sslopts->struct_version >= 2)
1571  {
1572  m->c->sslopts->verify = options->ssl->verify;
1573  if (options->ssl->CApath)
1574  m->c->sslopts->CApath = MQTTStrdup(options->ssl->CApath);
1575  }
1576  if (m->c->sslopts->struct_version >= 3)
1577  {
1578  m->c->sslopts->ssl_error_cb = options->ssl->ssl_error_cb;
1579  m->c->sslopts->ssl_error_context = options->ssl->ssl_error_context;
1580  }
1581  if (m->c->sslopts->struct_version >= 4)
1582  {
1583  m->c->sslopts->ssl_psk_cb = options->ssl->ssl_psk_cb;
1584  m->c->sslopts->ssl_psk_context = options->ssl->ssl_psk_context;
1585  m->c->sslopts->disableDefaultTrustStore = options->ssl->disableDefaultTrustStore;
1586  }
1587  if (m->c->sslopts->struct_version >= 5)
1588  {
1589  m->c->sslopts->protos = options->ssl->protos;
1590  m->c->sslopts->protos_len = options->ssl->protos_len;
1591  }
1592  }
1593 #endif
1594 
1595  if (m->c->username)
1596  free((void*)m->c->username);
1597  if (options->username)
1598  m->c->username = MQTTStrdup(options->username);
1599  if (m->c->password)
1600  free((void*)m->c->password);
1601  if (options->password)
1602  {
1603  m->c->password = MQTTStrdup(options->password);
1604  m->c->passwordlen = (int)strlen(options->password);
1605  }
1606  else if (options->struct_version >= 5 && options->binarypwd.data)
1607  {
1608  m->c->passwordlen = options->binarypwd.len;
1609  if ((m->c->password = malloc(m->c->passwordlen)) == NULL)
1610  {
1612  goto exit;
1613  }
1614  memcpy((void*)m->c->password, options->binarypwd.data, m->c->passwordlen);
1615  }
1616 
1617  if (options->struct_version >= 3)
1618  MQTTVersion = options->MQTTVersion;
1619  else
1620  MQTTVersion = MQTTVERSION_DEFAULT;
1621 
1622  if (MQTTVersion == MQTTVERSION_DEFAULT)
1623  {
1624  rc = MQTTClient_connectURIVersion(handle, options, serverURI, 4, start, millisecsTimeout,
1625  connectProperties, willProperties);
1626  if (rc.reasonCode != MQTTCLIENT_SUCCESS)
1627  {
1628  rc = MQTTClient_connectURIVersion(handle, options, serverURI, 3, start, millisecsTimeout,
1629  connectProperties, willProperties);
1630  }
1631  }
1632  else
1633  rc = MQTTClient_connectURIVersion(handle, options, serverURI, MQTTVersion, start, millisecsTimeout,
1634  connectProperties, willProperties);
1635 
1636 exit:
1638  return rc;
1639 }
1640 
1642  MQTTProperties* connectProperties, MQTTProperties* willProperties);
1643 
1645 {
1646  MQTTClients* m = handle;
1647  MQTTResponse response;
1648 
1649  if (m->c->MQTTVersion >= MQTTVERSION_5)
1651 
1652  response = MQTTClient_connectAll(handle, options, NULL, NULL);
1653 
1654  return response.reasonCode;
1655 }
1656 
1657 
1659  MQTTProperties* connectProperties, MQTTProperties* willProperties)
1660 {
1661  MQTTClients* m = handle;
1663 
1664  if (m->c->MQTTVersion < MQTTVERSION_5)
1665  {
1667  return response;
1668  }
1669 
1670  return MQTTClient_connectAll(handle, options, connectProperties, willProperties);
1671 }
1672 
1673 
1675  MQTTProperties* connectProperties, MQTTProperties* willProperties)
1676 {
1677  MQTTClients* m = handle;
1679 
1680  FUNC_ENTRY;
1683 
1684  rc.reasonCode = SOCKET_ERROR;
1685  if (!library_initialized)
1686  {
1688  goto exit;
1689  }
1690 
1691  if (options == NULL)
1692  {
1694  goto exit;
1695  }
1696 
1697  if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 7)
1698  {
1700  goto exit;
1701  }
1702 
1703 #if defined(OPENSSL)
1704  if (m->ssl && options->ssl == NULL)
1705  {
1707  goto exit;
1708  }
1709 #endif
1710 
1711  if (options->will) /* check validity of will options structure */
1712  {
1713  if (strncmp(options->will->struct_id, "MQTW", 4) != 0 || (options->will->struct_version != 0 && options->will->struct_version != 1))
1714  {
1716  goto exit;
1717  }
1718  if (options->will->qos < 0 || options->will->qos > 2)
1719  {
1721  goto exit;
1722  }
1723  if (options->will->topicName == NULL)
1724  {
1726  goto exit;
1727  } else if (strlen(options->will->topicName) == 0)
1728  {
1730  goto exit;
1731  }
1732  }
1733 
1734 
1735 #if defined(OPENSSL)
1736  if (options->struct_version != 0 && options->ssl) /* check validity of SSL options structure */
1737  {
1738  if (strncmp(options->ssl->struct_id, "MQTS", 4) != 0 || options->ssl->struct_version < 0 || options->ssl->struct_version > 5)
1739  {
1741  goto exit;
1742  }
1743  }
1744 #endif
1745 
1746  if ((options->username && !UTF8_validateString(options->username)) ||
1747  (options->password && !UTF8_validateString(options->password)))
1748  {
1750  goto exit;
1751  }
1752 
1753  if (options->MQTTVersion != MQTTVERSION_DEFAULT &&
1754  (options->MQTTVersion < MQTTVERSION_3_1 || options->MQTTVersion > MQTTVERSION_5))
1755  {
1757  goto exit;
1758  }
1759 
1760  if (options->MQTTVersion >= MQTTVERSION_5)
1761  {
1762  if (options->cleansession != 0)
1763  {
1765  goto exit;
1766  }
1767  }
1768  else if (options->cleanstart != 0)
1769  {
1771  goto exit;
1772  }
1773 
1774  if (options->struct_version < 2 || options->serverURIcount == 0)
1775  {
1776  if ( !m )
1777  {
1779  goto exit;
1780  }
1781  rc = MQTTClient_connectURI(handle, options, m->serverURI, connectProperties, willProperties);
1782  }
1783  else
1784  {
1785  int i;
1786 
1787  for (i = 0; i < options->serverURIcount; ++i)
1788  {
1789  char* serverURI = options->serverURIs[i];
1790 
1791  if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
1792  serverURI += strlen(URI_TCP);
1793  else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
1794  {
1795  serverURI += strlen(URI_WS);
1796  m->websocket = 1;
1797  }
1798 #if defined(OPENSSL)
1799  else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
1800  {
1801  serverURI += strlen(URI_SSL);
1802  m->ssl = 1;
1803  }
1804  else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
1805  {
1806  serverURI += strlen(URI_WSS);
1807  m->ssl = 1;
1808  m->websocket = 1;
1809  }
1810 #endif
1811  rc = MQTTClient_connectURI(handle, options, serverURI, connectProperties, willProperties);
1813  break;
1814  }
1815  }
1817  {
1819  {
1821  if (m->c->maxInflightMessages > recv_max)
1822  m->c->maxInflightMessages = recv_max;
1823  }
1824  }
1825 
1826 exit:
1827  if (m && m->c && m->c->will)
1828  {
1829  if (m->c->will->payload)
1830  free(m->c->will->payload);
1831  if (m->c->will->topic)
1832  free(m->c->will->topic);
1833  free(m->c->will);
1834  m->c->will = NULL;
1835  }
1839  return rc;
1840 }
1841 
1842 
/*
 * Common disconnect routine behind the disconnect entry points in this file.
 *
 * handle               - client handle; MQTTCLIENT_FAILURE is returned when it, or its
 *                        c member, is NULL
 * timeout              - upper bound on the wait for in-flight messages, compared against
 *                        MQTTTime_elapsed(start) (presumably milliseconds - TODO confirm)
 * call_connection_lost - non-zero to take the "Calling connectionLost" branch at exit
 * stop                 - non-zero to call MQTTClient_stop() at exit
 * reason, props        - passed unchanged to MQTTClient_closeSession()
 *
 * Returns MQTTCLIENT_SUCCESS, or MQTTCLIENT_FAILURE for an unusable handle.
 *
 * NOTE(review): the embedded listing numbers skip 1869, 1871 and 1883, so statements
 * appear to be missing from this copy of the function; confirm against the upstream file.
 */
1846 static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int call_connection_lost, int stop,
1847  enum MQTTReasonCodes reason, MQTTProperties* props)
1848 {
1849  MQTTClients* m = handle;
1850  START_TIME_TYPE start;
1851  int rc = MQTTCLIENT_SUCCESS;
1852  int was_connected = 0;
1853 
1854  FUNC_ENTRY;
1855  if (m == NULL || m->c == NULL)
1856  {
1857  rc = MQTTCLIENT_FAILURE;
1858  goto exit;
1859  }
 /* snapshot the connected flag before the session is closed below; it gates the exit branch */
1860  was_connected = m->c->connected; /* should be 1 */
1861  if (m->c->connected != 0)
1862  {
1863  start = MQTTTime_start_clock();
1864  m->c->connect_state = DISCONNECTING; /* indicate disconnecting */
1865  while (m->c->inboundMsgs->count > 0 || m->c->outboundMsgs->count > 0)
1866  { /* wait for all inflight message flows to finish, up to timeout */
1867  if (MQTTTime_elapsed(start) >= (ELAPSED_TIME_TYPE)timeout)
1868  break;
 /* NOTE(review): listing lines 1869 and 1871 (either side of the yield) are absent here */
1870  MQTTClient_yield();
1872  }
1873  }
1874 
1875  MQTTClient_closeSession(m->c, reason, props);
1876 
1877 exit:
1878  if (stop)
1879  MQTTClient_stop();
 /*
  * NOTE(review): the early "goto exit" above is also taken when m is NULL. In that case,
  * with call_connection_lost non-zero, m->cl is evaluated before was_connected (which is 0)
  * can short-circuit the test. Testing was_connected first would avoid the dereference.
  */
1880  if (call_connection_lost && m->cl && was_connected)
1881  {
1882  Log(TRACE_MIN, -1, "Calling connectionLost for client %s", m->c->clientID);
 /* NOTE(review): listing line 1883 is absent; only the log call is visible in this branch */
1884  }
1885  FUNC_EXIT_RC(rc);
1886  return rc;
1887 }
1888 
1889 
1893 static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout)
1894 {
1895  return MQTTClient_disconnect1(handle, timeout, 1, 1, MQTTREASONCODE_SUCCESS, NULL);
1896 }
1897 
1898 
1903 {
1905 }
1906 
1907 
/*
 * Public disconnect entry point.
 *
 * Calls MQTTClient_disconnect1() with call_connection_lost = 0 and stop = 1, a success
 * reason code and no properties, and returns its result.
 *
 * NOTE(review): the embedded listing numbers skip 1912 and 1914 (the lines either side of
 * the call), so statements appear to be missing from this copy; confirm against upstream.
 */
1908 int MQTTClient_disconnect(MQTTClient handle, int timeout)
1909 {
1910  int rc = 0;
1911 
1913  rc = MQTTClient_disconnect1(handle, timeout, 0, 1, MQTTREASONCODE_SUCCESS, NULL);
1915  return rc;
1916 }
1917 
1918 
1920 {
1921  int rc = 0;
1922 
1924  rc = MQTTClient_disconnect1(handle, timeout, 0, 1, reason, props);
1926  return rc;
1927 }
1928 
1929 
1931 {
1932  MQTTClients* m = handle;
1933  int rc = 0;
1934 
1935  FUNC_ENTRY;
1937  if (m && m->c)
1938  rc = m->c->connected;
1940  FUNC_EXIT_RC(rc);
1941  return rc;
1942 }
1943 
1944 
1947 {
1948  MQTTClients* m = handle;
1949  List* topics = NULL;
1950  List* qoss = NULL;
1951  int i = 0;
1952  int rc = MQTTCLIENT_FAILURE;
1954  int msgid = 0;
1955 
1956  FUNC_ENTRY;
1959 
1961  if (m == NULL || m->c == NULL)
1962  {
1963  rc = MQTTCLIENT_FAILURE;
1964  goto exit;
1965  }
1966  if (m->c->connected == 0)
1967  {
1969  goto exit;
1970  }
1971  for (i = 0; i < count; i++)
1972  {
1973  if (!UTF8_validateString(topic[i]))
1974  {
1976  goto exit;
1977  }
1978 
1979  if (qos[i] < 0 || qos[i] > 2)
1980  {
1981  rc = MQTTCLIENT_BAD_QOS;
1982  goto exit;
1983  }
1984  }
1985  if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
1986  {
1988  goto exit;
1989  }
1990 
1991  topics = ListInitialize();
1992  qoss = ListInitialize();
1993  for (i = 0; i < count; i++)
1994  {
1995  ListAppend(topics, topic[i], strlen(topic[i]));
1996  ListAppend(qoss, &qos[i], sizeof(int));
1997  }
1998 
1999  rc = MQTTProtocol_subscribe(m->c, topics, qoss, msgid, opts, props);
2000  ListFreeNoContent(topics);
2001  ListFreeNoContent(qoss);
2002 
2003  if (rc == TCPSOCKET_COMPLETE)
2004  {
2005  MQTTPacket* pack = NULL;
2006 
2008  pack = MQTTClient_waitfor(handle, SUBACK, &rc, 10000L);
2010  if (pack != NULL)
2011  {
2012  Suback* sub = (Suback*)pack;
2013 
2014  if (m->c->MQTTVersion == MQTTVERSION_5)
2015  {
2016  if (sub->properties.count > 0)
2017  {
2018  if ((resp.properties = malloc(sizeof(MQTTProperties))) == NULL)
2019  {
2020  rc = PAHO_MEMORY_ERROR;
2021  goto exit;
2022  }
2023  *resp.properties = MQTTProperties_copy(&sub->properties);
2024  }
2025  resp.reasonCodeCount = sub->qoss->count;
2026  resp.reasonCode = *(int*)sub->qoss->first->content;
2027  if (sub->qoss->count > 1)
2028  {
2029  ListElement* current = NULL;
2030  int rc_count = 0;
2031 
2032  if ((resp.reasonCodes = malloc(sizeof(enum MQTTReasonCodes) * (sub->qoss->count))) == NULL)
2033  {
2034  rc = PAHO_MEMORY_ERROR;
2035  goto exit;
2036  }
2037  while (ListNextElement(sub->qoss, &current))
2038  (resp.reasonCodes)[rc_count++] = *(enum MQTTReasonCodes*)(current->content);
2039  }
2040  }
2041  else
2042  {
2043  ListElement* current = NULL;
2044  i = 0;
2045  while (ListNextElement(sub->qoss, &current))
2046  {
2047  int* reqqos = (int*)(current->content);
2048  qos[i++] = *reqqos;
2049  }
2050  resp.reasonCode = rc;
2051  }
2052  rc = MQTTProtocol_handleSubacks(pack, m->c->net.socket);
2053  m->pack = NULL;
2054  }
2055  else
2056  rc = SOCKET_ERROR;
2057  }
2058 
2059  if (rc == SOCKET_ERROR)
2060  MQTTClient_disconnect_internal(handle, 0);
2061  else if (rc == TCPSOCKET_COMPLETE)
2062  rc = MQTTCLIENT_SUCCESS;
2063 
2064 exit:
2065  if (rc < 0)
2066  resp.reasonCode = rc;
2069  FUNC_EXIT_RC(resp.reasonCode);
2070  return resp;
2071 }
2072 
2073 
2074 
/*
 * Subscribe to several topics without MQTT 5 options or properties.
 *
 * For clients below MQTTVERSION_5 this forwards to MQTTClient_subscribeMany5() with NULL
 * for both trailing arguments and returns the reasonCode of the response.
 *
 * NOTE(review): the embedded listing numbers skip 2078 and 2081. The declaration of
 * `response` and the statement for the MQTTVERSION_5 branch are not visible in this copy;
 * confirm against the upstream file.
 * NOTE(review): m->c is dereferenced here without a NULL check, so the handle check inside
 * MQTTClient_subscribeMany5() cannot protect callers of this wrapper.
 */
2075 int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, int* qos)
2076 {
2077  MQTTClients* m = handle;
2079 
2080  if (m->c->MQTTVersion >= MQTTVERSION_5)
2082  else
2083  response = MQTTClient_subscribeMany5(handle, count, topic, qos, NULL, NULL);
2084 
2085  return response.reasonCode;
2086 }
2087 
2088 
2089 
2092 {
2093  MQTTResponse rc;
2094 
2095  FUNC_ENTRY;
2096  rc = MQTTClient_subscribeMany5(handle, 1, (char * const *)(&topic), &qos, opts, props);
2097  if (qos == MQTT_BAD_SUBSCRIBE) /* addition for MQTT 3.1.1 - error code from subscribe */
2100  return rc;
2101 }
2102 
2103 
/*
 * Subscribe to a single topic without MQTT 5 options or properties.
 *
 * For clients below MQTTVERSION_5 this forwards to MQTTClient_subscribe5() with NULL for
 * both trailing arguments and returns the reasonCode of the response.
 *
 * NOTE(review): the embedded listing numbers skip 2107 and 2110. The declaration of
 * `response` and the statement for the MQTTVERSION_5 branch are not visible in this copy;
 * confirm against the upstream file.
 * NOTE(review): m->c is dereferenced without a NULL check on the handle.
 */
2104 int MQTTClient_subscribe(MQTTClient handle, const char* topic, int qos)
2105 {
2106  MQTTClients* m = handle;
2108 
2109  if (m->c->MQTTVersion >= MQTTVERSION_5)
2111  else
2112  response = MQTTClient_subscribe5(handle, topic, qos, NULL, NULL);
2113 
2114  return response.reasonCode;
2115 }
2116 
2117 
2119 {
2120  MQTTClients* m = handle;
2121  List* topics = NULL;
2122  int i = 0;
2123  int rc = SOCKET_ERROR;
2125  int msgid = 0;
2126 
2127  FUNC_ENTRY;
2130 
2132  if (m == NULL || m->c == NULL)
2133  {
2134  rc = MQTTCLIENT_FAILURE;
2135  goto exit;
2136  }
2137  if (m->c->connected == 0)
2138  {
2140  goto exit;
2141  }
2142  for (i = 0; i < count; i++)
2143  {
2144  if (!UTF8_validateString(topic[i]))
2145  {
2147  goto exit;
2148  }
2149  }
2150  if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
2151  {
2153  goto exit;
2154  }
2155 
2156  topics = ListInitialize();
2157  for (i = 0; i < count; i++)
2158  ListAppend(topics, topic[i], strlen(topic[i]));
2159  rc = MQTTProtocol_unsubscribe(m->c, topics, msgid, props);
2160  ListFreeNoContent(topics);
2161 
2162  if (rc == TCPSOCKET_COMPLETE)
2163  {
2164  MQTTPacket* pack = NULL;
2165 
2167  pack = MQTTClient_waitfor(handle, UNSUBACK, &rc, 10000L);
2169  if (pack != NULL)
2170  {
2171  Unsuback* unsub = (Unsuback*)pack;
2172 
2173  if (m->c->MQTTVersion == MQTTVERSION_5)
2174  {
2175  if (unsub->properties.count > 0)
2176  {
2177  if ((resp.properties = malloc(sizeof(MQTTProperties))) == NULL)
2178  {
2179  rc = PAHO_MEMORY_ERROR;
2180  goto exit;
2181  }
2182  *resp.properties = MQTTProperties_copy(&unsub->properties);
2183  }
2184  resp.reasonCodeCount = unsub->reasonCodes->count;
2185  resp.reasonCode = *(int*)unsub->reasonCodes->first->content;
2186  if (unsub->reasonCodes->count > 1)
2187  {
2188  ListElement* current = NULL;
2189  int rc_count = 0;
2190 
2191  if ((resp.reasonCodes = malloc(sizeof(enum MQTTReasonCodes) * (unsub->reasonCodes->count))) == NULL)
2192  {
2193  rc = PAHO_MEMORY_ERROR;
2194  goto exit;
2195  }
2196  while (ListNextElement(unsub->reasonCodes, &current))
2197  (resp.reasonCodes)[rc_count++] = *(enum MQTTReasonCodes*)(current->content);
2198  }
2199  }
2200  else
2201  resp.reasonCode = rc;
2202  rc = MQTTProtocol_handleUnsubacks(pack, m->c->net.socket);
2203  m->pack = NULL;
2204  }
2205  else
2206  rc = SOCKET_ERROR;
2207  }
2208 
2209  if (rc == SOCKET_ERROR)
2210  MQTTClient_disconnect_internal(handle, 0);
2211 
2212 exit:
2213  if (rc < 0)
2214  resp.reasonCode = rc;
2217  FUNC_EXIT_RC(resp.reasonCode);
2218  return resp;
2219 }
2220 
2221 
2222 int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic)
2223 {
2224  MQTTResponse response = MQTTClient_unsubscribeMany5(handle, count, topic, NULL);
2225 
2226  return response.reasonCode;
2227 }
2228 
2229 
2231 {
2232  MQTTResponse rc;
2233 
2234  rc = MQTTClient_unsubscribeMany5(handle, 1, (char * const *)(&topic), props);
2235  return rc;
2236 }
2237 
2238 
2239 int MQTTClient_unsubscribe(MQTTClient handle, const char* topic)
2240 {
2241  MQTTResponse response = MQTTClient_unsubscribe5(handle, topic, NULL);
2242 
2243  return response.reasonCode;
2244 }
2245 
2246 
/*
 * Publish a message, optionally with MQTT 5 properties.
 *
 * Visible flow:
 *  1. Validate the handle, the connected flag and the topic name (UTF8_validateString).
 *  2. Yield repeatedly while the outbound queue holds at least maxInflightMessages entries
 *     or Socket_noPendingWrites() returns 0; fail if the client disconnects meanwhile.
 *  3. For qos > 0 obtain a message id from MQTTProtocol_assignMsgId().
 *  4. Build a heap-allocated Publish holding copies of the payload and topic, then hand it
 *     to MQTTProtocol_startPublish().
 *  5. On TCPSOCKET_INTERRUPTED, yield while connected until SocketBuffer_getWrite() returns
 *     NULL for the socket.
 *  6. Store msg->msgid in *deliveryToken when a token pointer was given and qos > 0.
 *  7. Free the local Publish; on SOCKET_ERROR disconnect internally and report success for
 *     qos > 0, failure otherwise.
 *
 * The result code is returned in resp.reasonCode.
 *
 * NOTE(review): the embedded listing numbers skip 2256, 2259, 2264, 2266, 2280, 2282, 2293,
 * 2327, 2345, 2347, 2352, 2354 and 2380. Among other things the declarations of `resp` and
 * `props`, and the bodies of two else-if branches, are not visible in this copy; confirm
 * against the upstream file before changing any code here.
 */
2247 MQTTResponse MQTTClient_publish5(MQTTClient handle, const char* topicName, int payloadlen, const void* payload,
2248  int qos, int retained, MQTTProperties* properties, MQTTClient_deliveryToken* deliveryToken)
2249 {
2250  int rc = MQTTCLIENT_SUCCESS;
2251  MQTTClients* m = handle;
2252  Messages* msg = NULL;
2253  Publish* p = NULL;
2254  int blocked = 0;
2255  int msgid = 0;
2257 
2258  FUNC_ENTRY;
2260 
 /* NOTE(review): the statements for the two else-if branches below (listing lines 2264 and 2266) are absent */
2261  if (m == NULL || m->c == NULL)
2262  rc = MQTTCLIENT_FAILURE;
2263  else if (m->c->connected == 0)
2265  else if (!UTF8_validateString(topicName))
2267 
2268  if (rc != MQTTCLIENT_SUCCESS)
2269  goto exit;
2270 
2271  /* If outbound queue is full, block until it is not */
2272  while (m->c->outboundMsgs->count >= m->c->maxInflightMessages ||
2273  Socket_noPendingWrites(m->c->net.socket) == 0) /* wait until the socket is free of large packets being written */
2274  {
 /* log the blocked state only once per call */
2275  if (blocked == 0)
2276  {
2277  blocked = 1;
2278  Log(TRACE_MIN, -1, "Blocking publish on queue full for client %s", m->c->clientID);
2279  }
 /* NOTE(review): listing lines 2280 and 2282 (either side of the yield) are absent */
2281  MQTTClient_yield();
2283  if (m->c->connected == 0)
2284  {
2285  rc = MQTTCLIENT_FAILURE;
2286  goto exit;
2287  }
2288  }
2289  if (blocked == 1)
2290  Log(TRACE_MIN, -1, "Resuming publish now queue not full for client %s", m->c->clientID);
 /* a message id is only requested for qos 1 and 2; msgid stays 0 for qos 0 */
2291  if (qos > 0 && (msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
2292  { /* this should never happen as we've waited for spaces in the queue */
 /* NOTE(review): listing line 2293 is absent; rc is not visibly changed before the jump */
2294  goto exit;
2295  }
2296 
2297  if ((p = malloc(sizeof(Publish))) == NULL)
2298  {
2299  rc = PAHO_MEMORY_ERROR;
2300  goto exit_and_free;
2301  }
 /* only mask, payload and payloadlen are initialised at this point; topic is set further down */
2302  memset(p->mask, '\0', sizeof(p->mask));
2303  p->payload = NULL;
2304  p->payloadlen = payloadlen;
2305  if (payloadlen > 0)
2306  {
2307  if ((p->payload = malloc(payloadlen)) == NULL)
2308  {
 /*
  * NOTE(review): on this path p->topic has not been assigned yet, but exit_and_free tests
  * and frees p->topic. Initialising p->topic to NULL next to p->payload would make the
  * cleanup safe - confirm against upstream.
  */
2309  rc = PAHO_MEMORY_ERROR;
2310  goto exit_and_free;
2311  }
2312  memcpy(p->payload, payload, payloadlen);
2313  }
2314  if ((p->topic = MQTTStrdup(topicName)) == NULL)
2315  {
2316  rc = PAHO_MEMORY_ERROR;
2317  goto exit_and_free;
2318  }
2319  p->msgId = msgid;
2320  p->MQTTVersion = m->c->MQTTVersion;
 /* p->properties is only assigned for MQTT 5 clients */
2321  if (m->c->MQTTVersion >= MQTTVERSION_5)
2322  {
2323  if (properties)
2324  p->properties = *properties;
2325  else
2326  {
 /* NOTE(review): listing line 2327, presumably the declaration of props, is absent */
2328  p->properties = props;
2329  }
2330  }
2331 
2332  rc = MQTTProtocol_startPublish(m->c, p, qos, retained, &msg);
2333 
2334  /* If the packet was partially written to the socket, wait for it to complete.
2335  * However, if the client is disconnected during this time and qos is not 0, still return success, as
2336  * the packet has already been written to persistence and assigned a message id so will
2337  * be sent when the client next connects.
2338  */
2339  if (rc == TCPSOCKET_INTERRUPTED)
2340  {
2341  while (m->c->connected == 1)
2342  {
2343  pending_writes* writing = NULL;
2344 
 /* NOTE(review): listing lines 2345, 2347, 2352 and 2354 are absent around the calls in this loop */
2346  writing = SocketBuffer_getWrite(m->c->net.socket);
2348 
2349  if (writing == NULL)
2350  break;
2351 
2353  MQTTClient_yield();
2355  }
2356  rc = (qos > 0 || m->c->connected == 1) ? MQTTCLIENT_SUCCESS : MQTTCLIENT_FAILURE;
2357  }
2358 
 /* assumes MQTTProtocol_startPublish always sets msg when qos > 0, even on failure - TODO confirm */
2359  if (deliveryToken && qos > 0)
2360  *deliveryToken = msg->msgid;
2361 
2362 exit_and_free:
 /* release the local Publish and the copies of topic and payload made above */
2363  if (p)
2364  {
2365  if (p->topic)
2366  free(p->topic);
2367  if (p->payload)
2368  free(p->payload);
2369  free(p);
2370  }
2371 
2372  if (rc == SOCKET_ERROR)
2373  {
2374  MQTTClient_disconnect_internal(handle, 0);
2375  /* Return success for qos > 0 as the send will be retried automatically */
2376  rc = (qos > 0) ? MQTTCLIENT_SUCCESS : MQTTCLIENT_FAILURE;
2377  }
2378 
2379 exit:
 /* NOTE(review): listing line 2380 is absent before the result is stored */
2381  resp.reasonCode = rc;
2382  FUNC_EXIT_RC(resp.reasonCode);
2383  return resp;
2384 }
2385 
2386 
/*
 * Publish a payload without MQTT 5 properties.
 *
 * For clients below MQTTVERSION_5 this forwards to MQTTClient_publish5() with NULL
 * properties and returns the reasonCode of the response.
 *
 * NOTE(review): the embedded listing numbers skip 2391 and 2394. The declaration of `rc`
 * and the statement for the MQTTVERSION_5 branch are not visible in this copy; confirm
 * against the upstream file.
 * NOTE(review): m->c is dereferenced here without a NULL check, so the handle check inside
 * MQTTClient_publish5() cannot protect callers of this wrapper.
 */
2387 int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen, const void* payload,
2388  int qos, int retained, MQTTClient_deliveryToken* deliveryToken)
2389 {
2390  MQTTClients* m = handle;
2392 
2393  if (m->c->MQTTVersion >= MQTTVERSION_5)
2395  else
2396  rc = MQTTClient_publish5(handle, topicName, payloadlen, payload, qos, retained, NULL, deliveryToken);
2397  return rc.reasonCode;
2398 }
2399 
2400 
2402  MQTTClient_deliveryToken* deliveryToken)
2403 {
2405  MQTTProperties* props = NULL;
2406 
2407  FUNC_ENTRY;
2408  if (message == NULL)
2409  {
2411  goto exit;
2412  }
2413 
2414  if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
2415  (message->struct_version != 0 && message->struct_version != 1))
2416  {
2418  goto exit;
2419  }
2420 
2421  if (message->struct_version >= 1)
2422  props = &message->properties;
2423 
2424  rc = MQTTClient_publish5(handle, topicName, message->payloadlen, message->payload,
2425  message->qos, message->retained, props, deliveryToken);
2426 exit:
2428  return rc;
2429 }
2430 
2431 
/*
 * Publish an MQTTClient_message structure.
 *
 * The structure must carry the id "MQTM" and a struct_version of 0 or 1. For clients below
 * MQTTVERSION_5 a valid message is forwarded to MQTTClient_publishMessage5() and the
 * reasonCode of the response is returned.
 *
 * NOTE(review): the embedded listing numbers skip 2436, 2440 and 2442. The declaration of
 * `rc` and the statements for the first two branches are not visible in this copy; confirm
 * against the upstream file.
 * NOTE(review): message and m->c are dereferenced here without NULL checks, although
 * MQTTClient_publishMessage5() tests message for NULL.
 */
2432 int MQTTClient_publishMessage(MQTTClient handle, const char* topicName, MQTTClient_message* message,
2433  MQTTClient_deliveryToken* deliveryToken)
2434 {
2435  MQTTClients* m = handle;
2437 
2438  if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
2439  (message->struct_version != 0 && message->struct_version != 1))
2441  else if (m->c->MQTTVersion >= MQTTVERSION_5)
2443  else
2444  rc = MQTTClient_publishMessage5(handle, topicName, message, deliveryToken);
2445  return rc.reasonCode;
2446 }
2447 
2448 
/*
 * Drive MQTTProtocol_retry() on every call.
 *
 * The second argument is 1 only when more than retryLoopInterval * 1000 has elapsed
 * (per MQTTTime_difftime) since the last such pass, and 0 otherwise. The time of the last
 * such pass is kept in a function-local static.
 *
 * NOTE(review): retryLoopInterval is presumably in seconds and the difference in
 * milliseconds, given the * 1000 - confirm against MQTTTime_difftime.
 * NOTE(review): the embedded listing numbers skip 2459, so a statement appears to be
 * missing from the first branch in this copy; confirm against the upstream file.
 */
2449 static void MQTTClient_retry(void)
2450 {
 /* time of the most recent pass that took the first branch; retained across calls */
2451  static START_TIME_TYPE last = START_TIME_ZERO;
2452  START_TIME_TYPE now;
2453 
2454  FUNC_ENTRY;
2455  now = MQTTTime_now();
2456  if (MQTTTime_difftime(now, last) > (retryLoopInterval * 1000))
2457  {
2458  last = MQTTTime_now();
2460  MQTTProtocol_retry(now, 1, 0);
2461  }
2462  else
2463  MQTTProtocol_retry(now, 0, 0);
2464  FUNC_EXIT;
2465 }
2466 
2467 
2469 {
2470  struct timeval tp = {0L, 0L};
2471  static Ack ack;
2472  MQTTPacket* pack = NULL;
2473 
2474  FUNC_ENTRY;
2475  if (timeout > 0L)
2476  {
2477  tp.tv_sec = (long)(timeout / 1000);
2478  tp.tv_usec = (long)((timeout % 1000) * 1000); /* this field is microseconds! */
2479  }
2480 
2481 #if defined(OPENSSL)
2482  if ((*sock = SSLSocket_getPendingRead()) == -1)
2483  {
2484  /* 0 from getReadySocket indicates no work to do, -1 == error, but can happen normally */
2485 #endif
2486  *sock = Socket_getReadySocket(0, &tp, socket_mutex);
2487 #if defined(OPENSSL)
2488  }
2489 #endif
2491  if (*sock > 0)
2492  {
2493  MQTTClients* m = NULL;
2494  if (ListFindItem(handles, sock, clientSockCompare) != NULL)
2495  m = (MQTTClient)(handles->current->content);
2496  if (m != NULL)
2497  {
2499  *rc = 0; /* waiting for connect state to clear */
2500  else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS)
2501  *rc = WebSocket_upgrade(&m->c->net);
2502  else
2503  {
2504  pack = MQTTPacket_Factory(m->c->MQTTVersion, &m->c->net, rc);
2505  if (*rc == TCPSOCKET_INTERRUPTED)
2506  *rc = 0;
2507  }
2508  }
2509 
2510  if (pack)
2511  {
2512  int freed = 1;
2513 
2514  /* Note that these handle... functions free the packet structure that they are dealing with */
2515  if (pack->header.bits.type == PUBLISH)
2516  *rc = MQTTProtocol_handlePublishes(pack, *sock);
2517  else if (pack->header.bits.type == PUBACK || pack->header.bits.type == PUBCOMP)
2518  {
2519  int msgid;
2520 
2521  ack = (pack->header.bits.type == PUBCOMP) ? *(Pubcomp*)pack : *(Puback*)pack;
2522  msgid = ack.msgId;
2523  if (m && m->c->MQTTVersion >= MQTTVERSION_5 && m->published)
2524  {
2525  Log(TRACE_MIN, -1, "Calling published for client %s, msgid %d", m->c->clientID, msgid);
2526  (*(m->published))(m->published_context, msgid, pack->header.bits.type, &ack.properties, ack.rc);
2527  }
2528  *rc = (pack->header.bits.type == PUBCOMP) ?
2529  MQTTProtocol_handlePubcomps(pack, *sock) : MQTTProtocol_handlePubacks(pack, *sock);
2530  if (m && m->dc)
2531  {
2532  Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid);
2533  (*(m->dc))(m->context, msgid);
2534  }
2535  }
2536  else if (pack->header.bits.type == PUBREC)
2537  {
2538  Pubrec* pubrec = (Pubrec*)pack;
2539 
2540  if (m && m->c->MQTTVersion >= MQTTVERSION_5 && m->published && pubrec->rc >= MQTTREASONCODE_UNSPECIFIED_ERROR)
2541  {
2542  Log(TRACE_MIN, -1, "Calling published for client %s, msgid %d", m->c->clientID, ack.msgId);
2543  (*(m->published))(m->published_context, pubrec->msgId, pack->header.bits.type,
2544  &pubrec->properties, pubrec->rc);
2545  }
2546  *rc = MQTTProtocol_handlePubrecs(pack, *sock);
2547  }
2548  else if (pack->header.bits.type == PUBREL)
2549  *rc = MQTTProtocol_handlePubrels(pack, *sock);
2550  else if (pack->header.bits.type == PINGRESP)
2551  *rc = MQTTProtocol_handlePingresps(pack, *sock);
2552  else
2553  freed = 0;
2554  if (freed)
2555  pack = NULL;
2556  }
2557  }
2558  MQTTClient_retry();
2560  FUNC_EXIT_RC(*rc);
2561  return pack;
2562 }
2563 
2564 
/*
 * Waits for a packet of type packet_type, or for the next connect step, on the
 * client behind `handle`. A status is stored through rc and the packet that was
 * found (or NULL) is returned.
 * NOTE(review): the signature line and the declaration of `start` are missing
 * from this listing; the log text below suggests this is the "waitfor" helper -
 * confirm against the complete file.
 *
 * - handle == NULL or timeout <= 0: MQTTCLIENT_FAILURE is stored, NULL returned.
 * - `running` set: blocks on the semaphore that matches packet_type and
 *   returns whatever is stored in m->pack.
 * - otherwise: calls MQTTClient_cycle() itself until the wanted packet shows up
 *   on this client's socket, one of the connect-state branches breaks out, a
 *   socket error is reported, or the timeout has elapsed.
 */
2566 {
2567  MQTTPacket* pack = NULL;
2568  MQTTClients* m = handle;
2570 
2571  FUNC_ENTRY;
2572  if (((MQTTClients*)handle) == NULL || timeout <= 0L)
2573  {
2574  *rc = MQTTCLIENT_FAILURE;
2575  goto exit;
2576  }
2577 
2578  if (running)
2579  {
2580  if (packet_type == CONNECT)
2581  {
/* a successful wait on connect_sem reports the result saved in m->rc */
2582  if ((*rc = Thread_wait_sem(m->connect_sem, (int)timeout)) == 0)
2583  *rc = m->rc;
2584  }
2585  else if (packet_type == CONNACK)
2586  *rc = Thread_wait_sem(m->connack_sem, (int)timeout);
2587  else if (packet_type == SUBACK)
2588  *rc = Thread_wait_sem(m->suback_sem, (int)timeout);
2589  else if (packet_type == UNSUBACK)
2590  *rc = Thread_wait_sem(m->unsuback_sem, (int)timeout);
/* the wait succeeded but no packet was left in m->pack: log it (CONNECT is exempt) */
2591  if (*rc == 0 && packet_type != CONNECT && m->pack == NULL)
2592  Log(LOG_ERROR, -1, "waitfor unexpectedly is NULL for client %s, packet_type %d, timeout %ld", m->c->clientID, packet_type, timeout);
2593  pack = m->pack;
2594  }
2595  else
2596  {
2597  *rc = TCPSOCKET_COMPLETE;
2598  while (1)
2599  {
2600  int sock = -1;
/* each cycle call gets a timeout of 100L (presumably milliseconds - TODO confirm) */
2601  pack = MQTTClient_cycle(&sock, 100L, rc);
/* only activity reported for this client's own socket is examined */
2602  if (sock == m->c->net.socket)
2603  {
2604  if (*rc == SOCKET_ERROR)
2605  break;
2606  if (pack && (pack->header.bits.type == packet_type))
2607  break;
2608  if (m->c->connect_state == TCP_IN_PROGRESS)
2609  {
2610  int error;
2611  socklen_t len = sizeof(error);
2612 
/* the socket's SO_ERROR value becomes the result when getsockopt succeeds; always leaves the loop */
2613  if ((*rc = getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len)) == 0)
2614  *rc = error;
2615  break;
2616  }
2617 #if defined(OPENSSL)
2618  else if (m->c->connect_state == SSL_IN_PROGRESS)
2619  {
2620 
/* the SSL error callback and its context are passed only when sslopts->struct_version >= 3 */
2621  *rc = m->c->sslopts->struct_version >= 3 ?
2622  SSLSocket_connect(m->c->net.ssl, sock, m->currentServerURI,
2623  m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
2624  SSLSocket_connect(m->c->net.ssl, sock, m->currentServerURI,
2625  m->c->sslopts->verify, NULL, NULL);
2626  if (*rc == SSL_FATAL)
2627  break;
2628  else if (*rc == 1) /* rc == 1 means SSL connect has finished and succeeded */
2629  {
/* keep the SSL session only when neither cleansession nor cleanstart is set and none is stored yet */
2630  if ((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
2631  m->c->session = SSL_get1_session(m->c->net.ssl);
2632  break;
2633  }
2634  }
2635 #endif
/* for the websocket and proxy states the result is set to 1 and control goes back to the caller */
2636  else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS )
2637  {
2638  *rc = 1;
2639  break;
2640  }
2641  else if (m->c->connect_state == PROXY_CONNECT_IN_PROGRESS )
2642  {
2643  *rc = 1;
2644  break;
2645  }
2646  else if (m->c->connect_state == WAIT_FOR_CONNACK)
2647  {
2648  int error;
2649  socklen_t len = sizeof(error);
/* while waiting for CONNACK only a non-zero SO_ERROR value ends the wait */
2650  if (getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len) == 0)
2651  {
2652  if (error)
2653  {
2654  *rc = error;
2655  break;
2656  }
2657  }
2658  }
2659  }
/* timed out: return no packet; the status stored through rc is left as the last cycle set it */
2660  if (MQTTTime_elapsed(start) > (ELAPSED_TIME_TYPE)timeout)
2661  {
2662  pack = NULL;
2663  break;
2664  }
2665  }
2666  }
2667 
2668 exit:
2669  FUNC_EXIT_RC(*rc);
2670  return pack;
2671 }
2672 
2673 
/*
 * Waits up to `timeout` for a message to appear in the client's message queue,
 * calling MQTTClient_cycle() to do the packet handling, and hands a queued
 * message to MQTTClient_deliverMessage() when one is available.
 *
 * Returns MQTTCLIENT_FAILURE when handle or its client structure is NULL, or
 * when `running` is set; otherwise the value left in rc by the cycle/delivery
 * calls. A result of SOCKET_ERROR also triggers an internal disconnect.
 * Note that *topicName and *message are reset to NULL only after the early
 * exit checks, so they are untouched on those paths.
 * NOTE(review): the declarations of `start` and `elapsed`, and the statement
 * inside the "not connected" branch, are not present in this listing.
 */
2674 int MQTTClient_receive(MQTTClient handle, char** topicName, int* topicLen, MQTTClient_message** message,
2675  unsigned long timeout)
2676 {
2677  int rc = TCPSOCKET_COMPLETE;
2680  MQTTClients* m = handle;
2681 
2682  FUNC_ENTRY;
2683  if (m == NULL || m->c == NULL
2684  || running) /* receive is not meant to be called in a multi-thread environment */
2685  {
2686  rc = MQTTCLIENT_FAILURE;
2687  goto exit;
2688  }
2689  if (m->c->connected == 0)
2690  {
2692  goto exit;
2693  }
2694 
2695  *topicName = NULL;
2696  *message = NULL;
2697 
2698  /* if there is already a message waiting, don't hang around but still do some packet handling */
2699  if (m->c->messageQueue->count > 0)
2700  timeout = 0L;
2701 
2702  elapsed = MQTTTime_elapsed(start);
2703  do
2704  {
2705  int sock = 0;
/* pass on whatever remains of the timeout, or 0 when it is used up */
2706  MQTTClient_cycle(&sock, (timeout > elapsed) ? timeout - elapsed : 0L, &rc);
2707 
2708  if (rc == SOCKET_ERROR)
2709  {
2710  if (ListFindItem(handles, &sock, clientSockCompare) && /* find client corresponding to socket */
2711  (MQTTClient)(handles->current->content) == handle)
2712  break; /* there was an error on the socket we are interested in */
2713  }
2714  elapsed = MQTTTime_elapsed(start);
2715  }
/* keep cycling until the timeout is reached or a message has been queued */
2716  while (elapsed < timeout && m->c->messageQueue->count == 0);
2717 
2718  if (m->c->messageQueue->count > 0)
2719  rc = MQTTClient_deliverMessage(rc, m, topicName, topicLen, message);
2720 
/* if the result at this point is SOCKET_ERROR, disconnect internally with a timeout argument of 0 */
2721  if (rc == SOCKET_ERROR)
2722  MQTTClient_disconnect_internal(handle, 0);
2723 
2724 exit:
2725  FUNC_EXIT_RC(rc);
2726  return rc;
2727 }
2728 
2729 
/*
 * MQTTClient_yield: lets the library do packet handling by calling
 * MQTTClient_cycle() repeatedly until `timeout` (100L) has elapsed.
 * When `running` is set it only sleeps for that same timeout and returns.
 * NOTE(review): this listing omits the signature line, the declarations of
 * `start` and `elapsed`, and the statement governed by the connect_state test
 * below - confirm against the complete file.
 */
2731 {
2734  ELAPSED_TIME_TYPE timeout = 100L;
2735  int rc = 0;
2736 
2737  FUNC_ENTRY;
2738  if (running) /* yield is not meant to be called in a multi-thread environment */
2739  {
2740  MQTTTime_sleep(timeout);
2741  goto exit;
2742  }
2743 
2744  elapsed = MQTTTime_elapsed(start);
2745  do
2746  {
2747  int sock = -1;
/* pass on whatever remains of the timeout, or 0 when it is used up */
2748  MQTTClient_cycle(&sock, (timeout > elapsed) ? timeout - elapsed : 0L, &rc);
/* on a socket error, look up the client that owns the reported socket */
2750  if (rc == SOCKET_ERROR && ListFindItem(handles, &sock, clientSockCompare))
2751  {
2752  MQTTClients* m = (MQTTClient)(handles->current->content);
/* NOTE(review): the action taken for a client that is not already DISCONNECTING is missing here */
2753  if (m->c->connect_state != DISCONNECTING)
2755  }
2757  elapsed = MQTTTime_elapsed(start);
2758  }
2759  while (elapsed < timeout);
2760 exit:
2761  FUNC_EXIT;
2762 }
2763 
2764 /*
2765 static int pubCompare(void* a, void* b)
2766 {
2767  Messages* msg = (Messages*)a;
2768  return msg->publish == (Publications*)b;
2769 }*/
2770 
2771 
/*
 * MQTTClient_waitForCompletion: waits until the delivery token `mdt` can no
 * longer be found in the client's outbound message list, yielding between
 * checks, or until `timeout` has elapsed.
 *
 * Returns MQTTCLIENT_SUCCESS as soon as the token is not found. rc starts as
 * MQTTCLIENT_FAILURE, which is what is returned for a NULL handle/client and
 * when the loop ends because the timeout was reached.
 * NOTE(review): the signature line, the declarations of `start` and `elapsed`,
 * the statement in the "not connected" branch and several other numbered lines
 * are absent from this listing (presumably locking calls - confirm).
 */
2773 {
2774  int rc = MQTTCLIENT_FAILURE;
2777  MQTTClients* m = handle;
2778 
2779  FUNC_ENTRY;
2781 
2782  if (m == NULL || m->c == NULL)
2783  {
2784  rc = MQTTCLIENT_FAILURE;
2785  goto exit;
2786  }
2787 
2788  elapsed = MQTTTime_elapsed(start);
2789  while (elapsed < timeout)
2790  {
/* stop waiting as soon as the client is no longer connected */
2791  if (m->c->connected == 0)
2792  {
2794  goto exit;
2795  }
2796  if (ListFindItem(m->c->outboundMsgs, &mdt, messageIDCompare) == NULL)
2797  {
2798  rc = MQTTCLIENT_SUCCESS; /* well we couldn't find it */
2799  goto exit;
2800  }
/* still outstanding: let the library process packets, then re-check */
2802  MQTTClient_yield();
2804  elapsed = MQTTTime_elapsed(start);
2805  }
2806 
2807 exit:
2809  FUNC_EXIT_RC(rc);
2810  return rc;
2811 }
2812 
2813 
/*
 * MQTTClient_getPendingDeliveryTokens: copies the message ids of all entries in
 * the client's outbound message list into a newly malloc'ed array stored in
 * *tokens. The array has one extra slot, set to -1, marking its end.
 *
 * *tokens is set to NULL first (before handle is checked, and without checking
 * tokens itself) and stays NULL when there are no outbound messages.
 * Returns MQTTCLIENT_SUCCESS, MQTTCLIENT_FAILURE for a NULL handle, or
 * PAHO_MEMORY_ERROR when the allocation fails.
 * Presumably the caller releases the array - TODO confirm.
 * NOTE(review): the signature line and two numbered lines (after FUNC_ENTRY and
 * after the exit label) are absent from this listing.
 */
2815 {
2816  int rc = MQTTCLIENT_SUCCESS;
2817  MQTTClients* m = handle;
2818  *tokens = NULL;
2819 
2820  FUNC_ENTRY;
2822 
2823  if (m == NULL)
2824  {
2825  rc = MQTTCLIENT_FAILURE;
2826  goto exit;
2827  }
2828 
2829  if (m->c && m->c->outboundMsgs->count > 0)
2830  {
2831  ListElement* current = NULL;
2832  int count = 0;
2833 
/* one slot per outbound message plus one for the -1 terminator */
2834  *tokens = malloc(sizeof(MQTTClient_deliveryToken) * (m->c->outboundMsgs->count + 1));
2835  if (!*tokens)
2836  {
2837  rc = PAHO_MEMORY_ERROR;
2838  goto exit;
2839  }
2840  while (ListNextElement(m->c->outboundMsgs, &current))
2841  {
/* this inner `m` (a Messages entry) shadows the client `m` declared above */
2842  Messages* m = (Messages*)(current->content);
2843  (*tokens)[count++] = m->msgid;
2844  }
2845  (*tokens)[count] = -1;
2846  }
2847 
2848 exit:
2850  FUNC_EXIT_RC(rc);
2851  return rc;
2852 }
2853 
2854 
/*
 * MQTTClient_setTraceLevel: forwards `level`, cast to enum LOG_LEVELS, to
 * Log_setTraceLevel().
 * NOTE(review): the signature line is absent from this listing.
 */
2856 {
2857  Log_setTraceLevel((enum LOG_LEVELS)level);
2858 }
2859 
2860 
/*
 * MQTTClient_setTraceCallback.
 * NOTE(review): both the signature line and the single body line are absent
 * from this listing; presumably the body hands the callback to the Log module -
 * confirm against the complete file.
 */
2862 {
2864 }
2865 
2866 
2868 {
2869  #define MAX_INFO_STRINGS 8
2870  static MQTTClient_nameValue libinfo[MAX_INFO_STRINGS + 1];
2871  int i = 0;
2872 
2873  libinfo[i].name = "Product name";
2874  libinfo[i++].value = "Eclipse Paho Synchronous MQTT C Client Library";
2875 
2876  libinfo[i].name = "Version";
2877  libinfo[i++].value = CLIENT_VERSION;
2878 
2879  libinfo[i].name = "Build level";
2880  libinfo[i++].value = BUILD_TIMESTAMP;
2881 #if defined(OPENSSL)
2882  libinfo[i].name = "OpenSSL version";
2883  libinfo[i++].value = SSLeay_version(SSLEAY_VERSION);
2884 
2885  libinfo[i].name = "OpenSSL flags";
2886  libinfo[i++].value = SSLeay_version(SSLEAY_CFLAGS);
2887 
2888  libinfo[i].name = "OpenSSL build timestamp";
2889  libinfo[i++].value = SSLeay_version(SSLEAY_BUILT_ON);
2890 
2891  libinfo[i].name = "OpenSSL platform";
2892  libinfo[i++].value = SSLeay_version(SSLEAY_PLATFORM);
2893 
2894  libinfo[i].name = "OpenSSL directory";
2895  libinfo[i++].value = SSLeay_version(SSLEAY_DIR);
2896 #endif
2897  libinfo[i].name = NULL;
2898  libinfo[i].value = NULL;
2899  return libinfo;
2900 }
2901 
2902 
2903 const char* MQTTClient_strerror(int code)
2904 {
2905  static char buf[30];
2906 
2907  switch (code) {
2908  case MQTTCLIENT_SUCCESS:
2909  return "Success";
2910  case MQTTCLIENT_FAILURE:
2911  return "Failure";
2913  return "Disconnected";
2915  return "Maximum in-flight messages amount reached";
2917  return "Invalid UTF8 string";
2919  return "Invalid (NULL) parameter";
2921  return "Topic containing NULL characters has been truncated";
2923  return "Bad structure";
2924  case MQTTCLIENT_BAD_QOS:
2925  return "Invalid QoS value";
2927  return "SSL is not supported";
2929  return "Unrecognized MQTT version";
2931  return "Invalid protocol scheme";
2933  return "Options for wrong MQTT version";
2935  return "Client created for another version of MQTT";
2937  return "Zero length will topic on connect";
2938  }
2939 
2940  sprintf(buf, "Unknown error code %d", code);
2941  return buf;
2942 }
2943 
2944 
/*
 * MQTTProtocol_checkPendingWrites: walks state.pending_writes and removes every
 * entry whose socket has no pending writes according to
 * Socket_noPendingWrites().
 * NOTE(review): the signature line and one numbered line just before the list
 * removal are absent from this listing - confirm against the complete file.
 */
2951 {
2952  FUNC_ENTRY;
2953  if (state.pending_writes.count > 0)
2954  {
2955  ListElement* le = state.pending_writes.first;
2956  while (le)
2957  {
2958  if (Socket_noPendingWrites(((pending_write*)(le->content))->socket))
2959  {
/* point the list's cursor at this element so the element to visit next can be read back from it after removal */
2961  state.pending_writes.current = le;
2962  ListRemove(&(state.pending_writes), le->content); /* does NextElement itself */
2963  le = state.pending_writes.current;
2964  }
2965  else
2966  ListNextElement(&(state.pending_writes), &le);
2967  }
2968  }
2969  FUNC_EXIT;
2970 }
2971 
2972 
/*
 * Called with the socket on which a partial write has completed: looks up the
 * client that uses that socket and stamps its net.lastSent with the current
 * time. The rc argument is not used in the lines visible here.
 * NOTE(review): one numbered line after the first comment below is absent from
 * this listing.
 */
2973 static void MQTTClient_writeComplete(int socket, int rc)
2974 {
2975  ListElement* found = NULL;
2976 
2977  FUNC_ENTRY;
2978  /* a partial write is now complete for a socket - this will be on a publish*/
2979 
2981 
2982  /* find the client using this socket */
2983  if ((found = ListFindItem(handles, &socket, clientSockCompare)) != NULL)
2984  {
2985  MQTTClients* m = (MQTTClients*)(found->content);
2986 
2987  m->c->net.lastSent = MQTTTime_now();
2988  }
2989  FUNC_EXIT;
2990 }
int Thread_post_sem(sem_type sem)
Definition: Thread.c:313
MQTTProperties properties
Definition: MQTTAsync.h:316
int MQTTClient_disconnect5(MQTTClient handle, int timeout, enum MQTTReasonCodes reason, MQTTProperties *props)
Definition: MQTTClient.c:1919
static int tostop
Definition: MQTTClient.c:282
int WebSocket_proxy_connect(networkHandles *net, int ssl, const char *hostname)
Definition: WebSocket.c:1445
static mutex_type connect_mutex
Definition: MQTTClient.c:249
void MQTTProtocol_removePublication(Publications *p)
MQTTProperties properties
Definition: MQTTPacket.h:176
char * topicName
Definition: MQTTAsync.c:309
int MQTTClient_waitForCompletion(MQTTClient handle, MQTTClient_deliveryToken mdt, unsigned long timeout)
Definition: MQTTClient.c:2772
List * messageQueue
Definition: Clients.h:137
int MQTTProtocol_handlePubacks(void *pack, int sock)
#define PROXY_CONNECT_IN_PROGRESS
Definition: Clients.h:110
MQTTClient_message * msg
Definition: MQTTClient.c:287
static MQTTResponse MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOptions *options, const char *serverURI, int MQTTVersion, START_TIME_TYPE start, ELAPSED_TIME_TYPE millisecsTimeout, MQTTProperties *connectProperties, MQTTProperties *willProperties)
Definition: MQTTClient.c:1164
union Connack::@62 flags
MQTTProperties properties
Definition: MQTTPacket.h:155
static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout)
Definition: MQTTClient.c:1893
#define TRACE_MIN
Definition: Log.h:66
#define MQTTCLIENT_FAILURE
Definition: MQTTClient.h:136
int MQTTClient_receive(MQTTClient handle, char **topicName, int *topicLen, MQTTClient_message **message, unsigned long timeout)
Definition: MQTTClient.c:2674
char * topic
Definition: MQTTPacket.h:200
enum MQTTReasonCodes reasonCode
Definition: MQTTClient.c:332
#define WebSocket_CLOSE_NORMAL
Definition: WebSocket.h:38
const char * currentServerURI
Definition: MQTTClient.c:297
void WebSocket_close(networkHandles *net, int status_code, const char *reason)
Definition: WebSocket.c:521
int MQTTPersistence_clear(Clients *c)
MQTTResponse MQTTClient_publish5(MQTTClient handle, const char *topicName, int payloadlen, const void *payload, int qos, int retained, MQTTProperties *properties, MQTTClient_deliveryToken *deliveryToken)
Definition: MQTTClient.c:2247
#define MQTTCLIENT_SUCCESS
Definition: MQTTClient.h:131
sem_type Thread_create_sem(int *rc)
Definition: Thread.c:190
void MQTTProtocol_freeClient(Clients *client)
MQTTReasonCodes
char * serverURI
Definition: MQTTClient.c:296
string topic
Definition: test2.py:8
#define FUNC_EXIT
Definition: StackTrace.h:59
struct MQTTClient_connectOptions::@57 returned
int MQTTClient_isConnected(MQTTClient handle)
Definition: MQTTClient.c:1930
MQTTProperties props
Definition: paho_c_pub.c:54
#define MQTT_DEFAULT_PORT
static mutex_type heap_mutex
Definition: Heap.c:55
int UTF8_validateString(const char *string)
Definition: utf-8.c:156
int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions *options)
Definition: MQTTClient.c:1644
int MQTTProtocol_handlePubcomps(void *pack, int sock)
int SSLSocket_setSocketForSSL(networkHandles *net, MQTTClient_SSLOptions *opts, const char *hostname, size_t hostname_len)
void WebSocket_terminate(void)
Definition: WebSocket.c:1257
int msgId
Definition: MQTTPacket.h:217
int MQTTVersion
Definition: Clients.h:146
static int MQTTClient_cleanSession(Clients *client)
Definition: MQTTClient.c:1090
static pthread_mutex_t socket_mutex_store
Definition: MQTTClient.c:239
int MQTTClient_publish(MQTTClient handle, const char *topicName, int payloadlen, const void *payload, int qos, int retained, MQTTClient_deliveryToken *deliveryToken)
Definition: MQTTClient.c:2387
int msgId
Definition: MQTTPacket.h:202
MQTTResponse MQTTClient_unsubscribeMany5(MQTTClient handle, int count, char *const *topic, MQTTProperties *props)
Definition: MQTTClient.c:2118
void Log_setTraceLevel(enum LOG_LEVELS level)
Definition: Log.c:224
int MQTTProtocol_connect(const char *ip_address, Clients *aClient, int websocket, int MQTTVersion, MQTTProperties *connectProperties, MQTTProperties *willProperties)
DIFF_TIME_TYPE MQTTTime_difftime(START_TIME_TYPE new, START_TIME_TYPE old)
Definition: MQTTTime.c:99
ListElement * current
Definition: LinkedList.h:69
int MQTTClient_messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
Definition: MQTTClient.h:359
void MQTTClient_connectionLost(void *context, char *cause)
Definition: MQTTClient.h:398
bool retain
Definition: MQTTPacket.h:77
LOG_LEVELS
Definition: Log.h:35
#define MQTTCLIENT_PERSISTENCE_ERROR
int MQTTClient_disconnect(MQTTClient handle, int timeout)
Definition: MQTTClient.c:1908
static mutex_type mqttclient_mutex
Definition: MQTTClient.c:237
void MQTTProperties_free(MQTTProperties *props)
#define MQTTCLIENT_BAD_QOS
Definition: MQTTClient.h:171
#define DISCONNECTING
Definition: Clients.h:112
MQTTClient_published * published
Definition: MQTTClient.c:311
static int MQTTClient_deliverMessage(int rc, MQTTClients *m, char **topicName, int *topicLen, MQTTClient_message **message)
Definition: MQTTClient.c:634
void MQTTPacket_freeConnack(Connack *pack)
int Socket_noPendingWrites(int socket)
Definition: Socket.c:417
struct Connack::@62::@63 bits
static int published
Definition: paho_c_pub.c:175
void MQTTClient_disconnected(void *context, MQTTProperties *properties, enum MQTTReasonCodes reasonCode)
Definition: MQTTClient.h:441
int msgID
Definition: Clients.h:130
#define PAHO_MEMORY_ERROR
Definition: Heap.h:26
struct Header::@59 bits
const MQTTClient_nameValue * httpHeaders
Definition: Clients.h:94
int MQTTPacket_send_connect(Clients *client, int MQTTVersion, MQTTProperties *connectProperties, MQTTProperties *willProperties)
Definition: MQTTPacketOut.c:48
const char * enabledCipherSuites
Definition: MQTTClient.h:696
int Socket_getReadySocket(int more_work, struct timeval *tp, mutex_type mutex)
Definition: Socket.c:242
void * context
Definition: MQTTClient.c:306
int MQTTProtocol_handlePublishes(void *pack, int sock)
int Thread_lock_mutex(mutex_type mutex)
Definition: Thread.c:112
char * payload
Definition: MQTTPacket.h:203
int MQTTPacket_send_disconnect(Clients *client, enum MQTTReasonCodes reason, MQTTProperties *props)
Definition: MQTTPacket.c:508
int SSLSocket_connect(SSL *ssl, int sock, const char *hostname, int verify, int(*cb)(const char *str, size_t len, void *u), void *u)
List pending_writes
Definition: MQTTProtocol.h:40
List * outboundMsgs
Definition: Clients.h:136
void * context
Definition: Clients.h:145
struct pubsub_opts opts
Definition: paho_c_pub.c:42
static void MQTTClient_emptyMessageQueue(Clients *client)
Definition: MQTTClient.c:535
#define malloc(x)
Definition: Heap.h:41
static thread_return_type WINAPI connectionLost_call(void *context)
Definition: MQTTClient.c:673
static l_noret error(LoadState *S, const char *why)
Definition: lundump.c:40
int topiclen
Definition: MQTTPacket.h:201
int MQTTClient_createWithOptions(MQTTClient *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context, MQTTClient_createOptions *options)
Definition: MQTTClient.c:364
#define MQTTCLIENT_BAD_MQTT_VERSION
Definition: MQTTClient.h:179
int MQTTClient_init(void)
Definition: MQTTClient.c:251
Header header
Definition: MQTTPacket.h:164
#define MQTTCLIENT_BAD_STRUCTURE
Definition: MQTTClient.h:167
int ListRemove(List *aList, void *content)
Definition: LinkedList.c:257
MQTTResponse MQTTClient_unsubscribe5(MQTTClient handle, const char *topic, MQTTProperties *props)
Definition: MQTTClient.c:2230
long elapsed(START_TIME_TYPE start_time)
Definition: test1.c:233
#define START_TIME_ZERO
Definition: MQTTTime.h:37
signed int connect_state
Definition: Clients.h:128
#define MQTTCLIENT_MAX_MESSAGES_INFLIGHT
Definition: MQTTClient.h:148
START_TIME_TYPE MQTTTime_start_clock(void)
Definition: MQTTTime.c:55
enum MQTTReasonCodes * reasonCodes
Definition: MQTTClient.h:993
void Log_setTraceCallback(Log_traceCallback *callback)
Definition: Log.c:218
MQTTProperties properties
Definition: MQTTPacket.h:189
int MQTTProperties_getNumericValue(MQTTProperties *props, enum MQTTPropertyCodes propid)
START_TIME_TYPE lastTouch
Definition: Clients.h:61
#define MQTTClient_message_initializer
Definition: MQTTClient.h:327
enum MQTTReasonCodes reasonCode
Definition: MQTTClient.h:991
#define MQTTCLIENT_WRONG_MQTT_VERSION
Definition: MQTTClient.h:193
#define MQTTCLIENT_BAD_UTF8_STRING
Definition: MQTTClient.h:152
void MQTTResponse_free(MQTTResponse response)
Definition: MQTTClient.c:620
#define URI_TCP
Definition: MQTTClient.c:73
const char * client_timestamp_eye
Definition: MQTTClient.c:80
static mutex_type stack_mutex
Definition: StackTrace.c:73
#define MAX_INFO_STRINGS
static mutex_type unsubscribe_mutex
Definition: MQTTClient.c:246
MQTTCLIENT_TRACE_LEVELS
Definition: MQTTClient.h:1377
#define free(x)
Definition: Heap.h:55
int passwordlen
Definition: Clients.h:121
unsigned int type
Definition: MQTTPacket.h:80
networkHandles net
Definition: Clients.h:129
unsigned int connected
Definition: Clients.h:125
int MQTTClient_setDisconnected(MQTTClient handle, void *context, MQTTClient_disconnected *disconnected)
Definition: MQTTClient.c:682
void MQTTClient_setTraceLevel(enum MQTTCLIENT_TRACE_LEVELS level)
Definition: MQTTClient.c:2855
sem_type connack_sem
Definition: MQTTClient.c:321
const char * value
Definition: MQTTClient.h:781
void MQTTClient_global_init(MQTTClient_init_options *inits)
Definition: MQTTClient.c:85
const char * privateKey
Definition: MQTTClient.h:683
static pthread_mutex_t connect_mutex_store
Definition: MQTTClient.c:248
void Socket_setWriteCompleteCallback(Socket_writeComplete *mywritecomplete)
Definition: Socket.c:852
static thread_return_type WINAPI MQTTClient_run(void *n)
Definition: MQTTClient.c:788
unsigned int good
Definition: Clients.h:126
MQTTClient_messageArrived * ma
Definition: MQTTClient.c:304
int MQTTProtocol_handleUnsubacks(void *pack, int sock)
MQTTPacket * pack
Definition: MQTTClient.c:324
int MQTTClient_setCallbacks(MQTTClient handle, void *context, MQTTClient_connectionLost *cl, MQTTClient_messageArrived *ma, MQTTClient_deliveryComplete *dc)
Definition: MQTTClient.c:1032
void ListEmpty(List *aList)
Definition: LinkedList.c:359
MQTTProperties MQTTProperties_copy(const MQTTProperties *props)
#define MQTTCLIENT_BAD_MQTT_OPTION
Definition: MQTTClient.h:189
MQTTClient_SSLOptions * ssl
Definition: MQTTClient.h:895
static MQTTPacket * MQTTClient_cycle(int *sock, ELAPSED_TIME_TYPE timeout, int *rc)
Definition: MQTTClient.c:2468
static void MQTTClient_retry(void)
Definition: MQTTClient.c:2449
void MQTTClient_setTraceCallback(MQTTClient_traceCallback *callback)
Definition: MQTTClient.c:2861
constexpr size_t count()
Definition: core.h:960
int MQTTClient_publishMessage(MQTTClient handle, const char *topicName, MQTTClient_message *message, MQTTClient_deliveryToken *deliveryToken)
Definition: MQTTClient.c:2432
static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int internal, int stop, enum MQTTReasonCodes, MQTTProperties *)
Definition: MQTTClient.c:1846
#define FUNC_EXIT_RC(x)
Definition: StackTrace.h:63
#define SOCKET_ERROR
Definition: Socket.h:76
int topicLen
Definition: MQTTAsync.c:310
List * clients
Definition: Clients.h:163
static int clientSockCompare(void *a, void *b)
Definition: MQTTClient.c:660
int count
Definition: LinkedList.h:72
static pthread_mutex_t unsubscribe_mutex_store
Definition: MQTTClient.c:245
ListElement * ListNextElement(List *aList, ListElement **pos)
Definition: LinkedList.c:411
const char * MQTTClient_strerror(int code)
Definition: MQTTClient.c:2903
List * reasonCodes
Definition: MQTTPacket.h:190
int keepAliveInterval
Definition: Clients.h:131
bool dup
Definition: MQTTPacket.h:79
void MQTTProtocol_keepalive(START_TIME_TYPE now)
#define SSL_IN_PROGRESS
Definition: Clients.h:104
ClientStates * bstate
Definition: MQTTClient.c:99
#define MQTTCLIENT_0_LEN_WILL_TOPIC
Definition: MQTTClient.h:197
static pthread_mutex_t subscribe_mutex_store
Definition: MQTTClient.c:242
void * published_context
Definition: MQTTClient.c:312
static void MQTTClient_writeComplete(int socket, int rc)
Definition: MQTTClient.c:2973
MQTTResponse MQTTClient_publishMessage5(MQTTClient handle, const char *topicName, MQTTClient_message *message, MQTTClient_deliveryToken *deliveryToken)
Definition: MQTTClient.c:2401
static mutex_type log_mutex
Definition: Log.c:128
unsigned int(* ssl_psk_cb)(const char *hint, char *identity, unsigned int max_identity_len, unsigned char *psk, unsigned int max_psk_len, void *u)
Definition: MQTTClient.h:738
MQTTClient_nameValue * MQTTClient_getVersionInfo(void)
Definition: MQTTClient.c:2867
static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions *options, const char *serverURI, MQTTProperties *connectProperties, MQTTProperties *willProperties)
Definition: MQTTClient.c:1443
#define MQTTCLIENT_BAD_PROTOCOL
Definition: MQTTClient.h:185
int payloadlen
Definition: MQTTPacket.h:204
MQTTClient_persistence * persistence
Definition: Clients.h:140
void SSLSocket_handleOpensslInit(int bool_value)
int MQTTClient_unsubscribe(MQTTClient handle, const char *topic)
Definition: MQTTClient.c:2239
static ClientStates ClientState
Definition: MQTTClient.c:93
int Thread_unlock_mutex(mutex_type mutex)
Definition: Thread.c:133
int payloadlen
Definition: Clients.h:72
int packet_type
Definition: test10.c:1111
struct MQTTClient_willOptions::@56 payload
void MQTTClient_published(void *context, int dt, int packet_type, MQTTProperties *properties, enum MQTTReasonCodes reasonCode)
Definition: MQTTClient.h:482
void Log(enum LOG_LEVELS log_level, int msgno, const char *format,...)
Definition: Log.c:417
int WebSocket_upgrade(networkHandles *net)
Definition: WebSocket.c:1306
void * MQTTPacket_Factory(int MQTTVersion, networkHandles *net, int *error)
Definition: MQTTPacket.c:103
unsigned char rc
Definition: MQTTPacket.h:218
#define MQTTCLIENT_NULL_PARAMETER
Definition: MQTTClient.h:156
static void MQTTClient_closeSession(Clients *client, enum MQTTReasonCodes reason, MQTTProperties *props)
Definition: MQTTClient.c:1057
void MQTTProtocol_retry(START_TIME_TYPE now, int doRetry, int regardless)
int msgid
Definition: Clients.h:57
int qos
Definition: test6.c:56
unsigned int protos_len
Definition: MQTTClient.h:766
MQTTProperties properties
Definition: MQTTPacket.h:206
const unsigned char * protos
Definition: MQTTClient.h:760
const char * topicName
Definition: MQTTClient.h:619
void * payload
Definition: Clients.h:73
MQTTClients * m
Definition: MQTTClient.c:330
void MQTTTime_sleep(ELAPSED_TIME_TYPE milliseconds)
Definition: MQTTTime.c:27
static thread_return_type WINAPI call_disconnected(void *context)
Definition: MQTTClient.c:711
int MQTTProtocol_unsubscribe(Clients *client, List *topics, int msgID, MQTTProperties *props)
#define START_TIME_TYPE
Definition: MQTTTime.h:36
#define mutex_type
Definition: mutex_type.h:22
int retained
Definition: Clients.h:74
void MQTTClient_freeMessage(MQTTClient_message **message)
Definition: MQTTClient.c:601
Definition: Log.h:41
const char * keyStore
Definition: MQTTClient.h:678
ListElement * ListAppend(List *aList, void *content, size_t size)
Definition: LinkedList.c:90
int SSLSocket_getPendingRead(void)
void * MQTTClient
Definition: MQTTClient.h:246
List * qoss
Definition: MQTTPacket.h:177
void MQTTClient_yield(void)
Definition: MQTTClient.c:2730
#define URI_WS
Definition: MQTTClient.c:74
static mutex_type subscribe_mutex
Definition: MQTTClient.c:243
int MQTTPersistence_restoreMessageQueue(Clients *c)
#define WEBSOCKET_IN_PROGRESS
Definition: Clients.h:106
static thread_id_type run_id
Definition: MQTTClient.c:283
void MQTTClient_destroy(MQTTClient *handle)
Definition: MQTTClient.c:556
void Protocol_processPublication(Publish *publish, Clients *client, int allocatePayload)
Definition: MQTTClient.c:1107
Clients * c
Definition: MQTTClient.c:302
int maxInflightMessages
Definition: Clients.h:133
MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions *options, MQTTProperties *connectProperties, MQTTProperties *willProperties)
Definition: MQTTClient.c:1658
int MQTTPersistence_close(Clients *c)
#define MQTTCLIENT_PERSISTENCE_DEFAULT
#define MQTTCLIENT_SSL_NOT_SUPPORTED
Definition: MQTTClient.h:175
const MQTTClient_nameValue * httpHeaders
Definition: MQTTClient.h:950
void MQTTClient_traceCallback(enum MQTTCLIENT_TRACE_LEVELS level, char *message)
Definition: MQTTClient.h:1405
MQTTProperties properties
Definition: MQTTClient.h:324
MQTTAsync client
Definition: test6.c:276
void * disconnected_context
Definition: MQTTClient.c:309
void MQTTClient_deliveryComplete(void *context, MQTTClient_deliveryToken dt)
Definition: MQTTClient.h:381
static volatile int library_initialized
Definition: MQTTClient.c:279
int SSLSocket_close(networkHandles *net)
int MQTTProperties_hasProperty(MQTTProperties *props, enum MQTTPropertyCodes propid)
#define FUNC_ENTRY
Definition: StackTrace.h:55
unsigned char rc
Definition: MQTTPacket.h:153
unsigned int cleanstart
Definition: Clients.h:124
START_TIME_TYPE lastSent
Definition: Clients.h:81
#define MQTT_BAD_SUBSCRIBE
Definition: MQTTAsync.h:211
const char * trustStore
Definition: MQTTClient.h:673
const char * client_version_eye
Definition: MQTTClient.c:81
unsigned int ping_outstanding
Definition: Clients.h:127
int MQTTClient_setPublished(MQTTClient handle, void *context, MQTTClient_published *published)
Definition: MQTTClient.c:723
static mutex_type socket_mutex
Definition: MQTTClient.c:240
char * MQTTStrdup(const char *src)
int Thread_wait_sem(sem_type sem, int timeout)
Definition: Thread.c:230
int SSLSocket_initialize(void)
MQTTProperties * properties
Definition: MQTTClient.c:331
int MQTTClient_subscribe(MQTTClient handle, const char *topic, int qos)
Definition: MQTTClient.c:2104
void Heap_terminate(void)
Definition: Heap.c:417
static int retryLoopInterval
Definition: MQTTClient.c:1428
#define MQTTVERSION_5
Definition: MQTTAsync.h:207
int MQTTProtocol_handlePingresps(void *pack, int sock)
int Heap_initialize(void)
Definition: Heap.c:406
int reasonCodeCount
Definition: MQTTClient.h:992
MQTTResponse MQTTClient_subscribe5(MQTTClient handle, const char *topic, int qos, MQTTSubscribe_options *opts, MQTTProperties *props)
Definition: MQTTClient.c:2090
int MQTTPersistence_unpersistQueueEntry(Clients *client, MQTTPersistence_qEntry *qe)
MQTTClient c
Definition: test10.c:1656
int messageIDCompare(void *a, void *b)
static void MQTTClient_terminate(void)
Definition: MQTTClient.c:515
int MQTTPersistence_create(MQTTClient_persistence **persistence, int type, void *pcontext)
int retained
Definition: test6.c:57
dictionary context
Definition: test2.py:57
pending_writes * SocketBuffer_getWrite(int socket)
Definition: SocketBuffer.c:399
void MQTTProtocol_emptyMessageList(List *msgList)
#define NOT_IN_PROGRESS
Definition: Clients.h:100
#define thread_return_type
Definition: Thread.h:44
thread_id_type Thread_getid(void)
Definition: Thread.c:176
const void * password
Definition: Clients.h:122
void MQTTClient_free(void *memory)
Definition: MQTTClient.c:612
int Log_initialize(Log_nameValue *info)
Definition: Log.c:132
static int disconnected
Definition: paho_c_pub.c:80
#define URI_SSL
Definition: MQTTClient.c:68
#define WINAPI
Definition: MQTTClient.c:276
char * topic
Definition: Clients.h:71
#define MQTTCLIENT_DISCONNECTED
Definition: MQTTClient.h:143
#define MQTTResponse_initializer
Definition: MQTTClient.h:997
START_TIME_TYPE MQTTTime_now(void)
Definition: MQTTTime.c:66
sem_t * sem_type
Definition: Thread.h:53
int MQTTProtocol_handlePubrels(void *pack, int sock)
MQTTAsync_message * msg
Definition: MQTTAsync.c:308
#define TCP_IN_PROGRESS
Definition: Clients.h:102
MQTTResponse MQTTClient_connectAll(MQTTClient handle, MQTTClient_connectOptions *options, MQTTProperties *connectProperties, MQTTProperties *willProperties)
Definition: MQTTClient.c:1674
void MQTTProtocol_closeSession(Clients *c, int sendwill)
Definition: MQTTClient.c:1902
int MQTTProtocol_subscribe(Clients *client, List *topics, List *qoss, int msgID, MQTTSubscribe_options *opts, MQTTProperties *props)
int MQTTClient_subscribeMany(MQTTClient handle, int count, char *const *topic, int *qos)
Definition: MQTTClient.c:2075
int MQTTClient_deliveryToken
Definition: MQTTClient.h:257
int MQTTClient_getPendingDeliveryTokens(MQTTClient handle, MQTTClient_deliveryToken **tokens)
Definition: MQTTClient.c:2814
List * ListInitialize(void)
Definition: LinkedList.c:52
#define ELAPSED_TIME_TYPE
Definition: MQTTTime.h:40
Header header
Definition: MQTTPacket.h:199
#define WAIT_FOR_CONNACK
Definition: Clients.h:108
int WebSocket_connect(networkHandles *net, const char *uri)
Definition: WebSocket.c:383
void Log_traceCallback(enum LOG_LEVELS level, const char *message)
Definition: Log.h:81
int MQTTProtocol_handleSubacks(void *pack, int sock)
int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char *const *topic)
Definition: MQTTClient.c:2222
char * clientID
Definition: Clients.h:119
thread_type Thread_start(thread_fn fn, void *parameter)
Definition: Thread.c:60
#define thread_id_type
Definition: Thread.h:43
size_t MQTTProtocol_addressPort(const char *uri, int *port, const char **topic, int default_port)
int Thread_destroy_sem(sem_type sem)
Definition: Thread.c:341
static void setRetryLoopInterval(int keepalive)
Definition: MQTTClient.c:1430
void ListFreeNoContent(List *aList)
Definition: LinkedList.c:392
#define MQTTCLIENT_TOPICNAME_TRUNCATED
Definition: MQTTClient.h:162
int MQTTProtocol_handlePubrecs(void *pack, int sock)
#define TCPSOCKET_COMPLETE
Definition: Socket.h:73
static void MQTTProtocol_checkPendingWrites(void)
Definition: MQTTClient.c:2950
int MQTTProtocol_startPublish(Clients *pubclient, Publish *publish, int qos, int retained, Messages **mm)
int MQTTVersion
Definition: MQTTPacket.h:205
MQTTProperties properties
Definition: MQTTPacket.h:220
ListElement * ListFindItem(List *aList, void *content, int(*callback)(void *, void *))
Definition: LinkedList.c:154
ListElement * first
Definition: LinkedList.h:69
const char * privateKeyPassword
Definition: MQTTClient.h:686
List * inboundMsgs
Definition: Clients.h:135
MQTTProtocol state
Definition: MQTTClient.c:101
char *const * serverURIs
Definition: MQTTClient.h:913
void Socket_close(int socket)
Definition: Socket.c:627
int retryInterval
Definition: Clients.h:132
sem_type suback_sem
Definition: MQTTClient.c:322
char * topics[]
MQTTClient_connectionLost * cl
Definition: MQTTClient.c:303
const char * CApath
Definition: MQTTClient.h:719
#define MQTTVERSION_3_1
Definition: MQTTAsync.h:199
const char * message
Definition: MQTTClient.h:621
static int running
Definition: MQTTClient.c:281
sem_type unsuback_sem
Definition: MQTTClient.c:323
void ListFree(List *aList)
Definition: LinkedList.c:381
enum MQTTReasonCodes rc
Definition: test10.c:1112
int MQTTPersistence_persistQueueEntry(Clients *aclient, MQTTPersistence_qEntry *qe)
void Socket_outInitialize(void)
Definition: Socket.c:122
MQTTClient_deliveryComplete * dc
Definition: MQTTClient.c:305
int MQTTClient_create(MQTTClient *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context)
Definition: MQTTClient.c:507
ELAPSED_TIME_TYPE MQTTTime_elapsed(START_TIME_TYPE milliseconds)
Definition: MQTTTime.c:109
static int MQTTClient_stop(void)
Definition: MQTTClient.c:988
const char * username
Definition: Clients.h:120
const char * name
Definition: MQTTClient.h:780
static List * handles
Definition: MQTTClient.c:280
#define MQTTVERSION_DEFAULT
Definition: MQTTAsync.h:195
MQTTClient_willOptions * will
Definition: MQTTClient.h:866
char * http_proxy
Definition: Clients.h:90
int(* ssl_error_cb)(const char *str, size_t len, void *u)
Definition: MQTTClient.h:725
unsigned int qos
Definition: MQTTPacket.h:78
int MQTTPersistence_initialize(Clients *c, const char *serverURI)
uint8_t mask[4]
Definition: MQTTPacket.h:207
sem_type connect_sem
Definition: MQTTClient.c:319
MQTTProperties * properties
Definition: MQTTClient.h:994
#define TCPSOCKET_INTERRUPTED
Definition: Socket.h:79
unsigned int cleansession
Definition: Clients.h:123
void Log_terminate(void)
Definition: Log.c:232
int len
Definition: utf-8.c:46
MQTTClient_disconnected * disconnected
Definition: MQTTClient.c:308
static pthread_mutex_t mqttclient_mutex_store
Definition: MQTTClient.c:236
MQTTResponse MQTTClient_subscribeMany5(MQTTClient handle, int count, char *const *topic, int *qos, MQTTSubscribe_options *opts, MQTTProperties *props)
Definition: MQTTClient.c:1945
static MQTTPacket * MQTTClient_waitfor(MQTTClient handle, int packet_type, int *rc, ELAPSED_TIME_TYPE timeout)
Definition: MQTTClient.c:2565
int MQTTProtocol_assignMsgId(Clients *client)
struct MQTTClient_connectOptions::@58 binarypwd
#define SSL_FATAL
Definition: Socket.h:80
struct Options options
willMessages * will
Definition: Clients.h:134
#define URI_WSS
Definition: MQTTClient.c:75
#define MQTTProperties_initializer


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 03:48:09